FlowBoxLogger API

FlowBoxLogger provides Socket.IO-based logging for FlowBox workers. It manages one socket connection per namespace, with automatic reconnection, message queuing, and a reference-counted socket lifecycle.

Singleton Instance

import { flowBoxLogger } from "@industream/flowmaker-sdk";

The default singleton connects to the endpoint set in FM_WORKER_LOG_SOCKET_IO_ENDPOINT (default: http://localhost:4123).
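
The fallback to the documented default can be pictured with a small stand-alone function. This is only a sketch: resolveLogEndpoint and DEFAULT_LOG_ENDPOINT are hypothetical names that are not part of the SDK, and treating an empty value like an unset one is an assumption.

```typescript
// Hypothetical helper mirroring the documented default; not part of the SDK.
const DEFAULT_LOG_ENDPOINT = "http://localhost:4123";

function resolveLogEndpoint(env: Record<string, string | undefined>): string {
  const configured = env["FM_WORKER_LOG_SOCKET_IO_ENDPOINT"];
  // Assumption: an unset or empty variable falls back to the default.
  return configured !== undefined && configured !== "" ? configured : DEFAULT_LOG_ENDPOINT;
}

// No variable set: the default endpoint is used.
const fromDefault = resolveLogEndpoint({});

// Variable set: the configured endpoint wins (the URL here is made up).
const fromEnv = resolveLogEndpoint({
  FM_WORKER_LOG_SOCKET_IO_ENDPOINT: "http://logs.internal:4123",
});
```

In a Node.js worker the argument would typically be process.env.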


flowBoxLogger.increaseRefCount()

flowBoxLogger.increaseRefCount(namespace: string)

Parameters:

  • namespace: The job/flow element namespace (typically jobId)

Increments the reference count for a namespace. Creates a new socket connection for that namespace if one doesn't already exist.

const jobId = this.initStruct.runtimeContext.jobId;
flowBoxLogger.increaseRefCount(jobId);

When to use:

  • Call at the start of the constructor (after super())
  • Match each call with exactly one decreaseRefCount call on the same namespace
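
The reference-counting contract can be illustrated with a stand-alone model. This is a sketch under stated assumptions, not the SDK's implementation: RefCountModel and its methods are hypothetical, and a plain counter stands in for the real socket.

```typescript
// Hypothetical stand-alone model of per-namespace reference counting.
// It does not use the SDK; a counter stands in for the socket.
class RefCountModel {
  private counts = new Map<string, number>();

  // Returns true when this call would create the connection (count 0 -> 1).
  increase(namespace: string): boolean {
    const next = (this.counts.get(namespace) ?? 0) + 1;
    this.counts.set(namespace, next);
    return next === 1;
  }

  // Returns true when this call would close the connection (count 1 -> 0).
  decrease(namespace: string): boolean {
    const current = this.counts.get(namespace);
    if (current === undefined) return false; // unknown namespace: nothing to do
    if (current <= 1) {
      this.counts.delete(namespace);
      return true;
    }
    this.counts.set(namespace, current - 1);
    return false;
  }

  isOpen(namespace: string): boolean {
    return this.counts.has(namespace);
  }
}

const model = new RefCountModel();
const createdByFirst = model.increase("job-1");  // first holder opens the connection
const createdBySecond = model.increase("job-1"); // second holder shares it
const closedByFirst = model.decrease("job-1");   // one holder remains
const closedBySecond = model.decrease("job-1");  // last holder releases it
```

In this model, two FlowBoxes in the same job share one connection, and it goes away only when the last of them releases its reference.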

flowBoxLogger.decreaseRefCount()

flowBoxLogger.decreaseRefCount(namespace: string)

Parameters:

  • namespace: The job/flow element namespace (typically jobId)

Decrements the reference count for a namespace. When the count reaches zero, the socket closes after sending any queued messages.

const jobId = this.initStruct.runtimeContext.jobId;
flowBoxLogger.decreaseRefCount(jobId);

When to use:

  • Call in the onDestroy() lifecycle method
  • Call it once for every earlier increaseRefCount call on the same namespace
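
One way to keep the two calls balanced is to wrap them in a release function that can only fire once. The sketch below assumes nothing beyond the two method names documented above. acquireLoggerRef is a hypothetical helper, and fakeLogger is a stand-in used so the example runs without the SDK.

```typescript
// Minimal shape of the two calls this sketch relies on (names from the docs above).
interface RefCountedLogger {
  increaseRefCount(namespace: string): void;
  decreaseRefCount(namespace: string): void;
}

// Hypothetical helper: takes one reference and returns a release function
// that gives it back at most once, even if called repeatedly.
function acquireLoggerRef(logger: RefCountedLogger, namespace: string): () => void {
  logger.increaseRefCount(namespace);
  let released = false;
  return () => {
    if (released) return;
    released = true;
    logger.decreaseRefCount(namespace);
  };
}

// Stand-in logger that only records the net count, for demonstration.
const counts = new Map<string, number>();
const fakeLogger: RefCountedLogger = {
  increaseRefCount: (ns) => { counts.set(ns, (counts.get(ns) ?? 0) + 1); },
  decreaseRefCount: (ns) => { counts.set(ns, (counts.get(ns) ?? 0) - 1); },
};

const release = acquireLoggerRef(fakeLogger, "job-1");
const afterAcquire = counts.get("job-1");
release();
release(); // second call is a no-op
const afterRelease = counts.get("job-1");
```

A FlowBox could store the returned function in a field during construction and call it from onDestroy(), so a repeated teardown cannot decrement the count twice.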

flowBoxLogger.logMessageTo()

flowBoxLogger.logMessageTo(namespace: string, message: any, emitType: string = 'log')

Parameters:

  • namespace: The job/flow element namespace (typically jobId)
  • message: Any serializable data object
  • emitType: Optional event name (default: 'log')

Sends a log message to a namespace. Messages are queued if the socket isn't connected yet.

const jobId = this.initStruct.runtimeContext.jobId;

// Basic log
flowBoxLogger.logMessageTo(jobId, { message: "Starting processing" });

// Include the runtime context in the `from` field
flowBoxLogger.logMessageTo(jobId, {
  from: this.initStruct.runtimeContext,
  data: { temperature: 21.7 }
});

// Custom event type
flowBoxLogger.logMessageTo(jobId, { error: "Connection failed" }, "error");

Behavior:

  • If the socket is connected: the message is sent immediately
  • If the socket is disconnected: the message is queued and sent on reconnect
  • If the namespace doesn't exist: the message is silently ignored and no error is thrown
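
The send-or-queue behavior can be modeled in a few lines. This is a simplified sketch, not the SDK's code: QueueingSender is a hypothetical class, the emit callback stands in for the socket, and first-in-first-out flushing is an assumption.

```typescript
type Emit = (eventName: string, payload: unknown) => void;

// Hypothetical model of "send if connected, otherwise queue".
class QueueingSender {
  private pending: Array<{ eventName: string; payload: unknown }> = [];
  private connected = false;

  constructor(private readonly emit: Emit) {}

  send(payload: unknown, eventName: string = "log"): void {
    if (this.connected) {
      this.emit(eventName, payload);
    } else {
      this.pending.push({ eventName, payload });
    }
  }

  // Called when the connection is (re)established: flush in FIFO order.
  onConnect(): void {
    this.connected = true;
    for (const item of this.pending.splice(0)) {
      this.emit(item.eventName, item.payload);
    }
  }

  onDisconnect(): void {
    this.connected = false;
  }

  get queued(): number {
    return this.pending.length;
  }
}

// `wire` records what would have gone over the socket.
const wire: string[] = [];
const sender = new QueueingSender((eventName, payload) => {
  wire.push(`${eventName}:${JSON.stringify(payload)}`);
});

sender.send({ message: "early" });        // queued: not connected yet
const queuedBeforeConnect = sender.queued;
sender.onConnect();                        // flushes the queue
sender.send({ error: "boom" }, "error");   // sent immediately
```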

[NOTE!info/LOG QUEUEING] Messages sent via logMessageTo() before the socket connects are queued and sent once the connection is established. As a result, early constructor logs may arrive out of order relative to later logs. Where ordering is critical, wait for the connection or log only after initialization completes.
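
Another option, not described by the SDK documentation, is to stamp each payload at the call site so that a consumer can restore the original order. The sketch below is hypothetical throughout: createSequencer, seq, and ts are illustrative names, and the fixed clock exists only to make the example reproducible.

```typescript
// Hypothetical helper: tags each payload with a per-namespace sequence number
// and a timestamp taken at the call site.
function createSequencer(now: () => number = Date.now) {
  const next = new Map<string, number>();
  return function stamp<T extends object>(
    namespace: string,
    data: T
  ): T & { seq: number; ts: number } {
    const seq = next.get(namespace) ?? 0;
    next.set(namespace, seq + 1);
    return { ...data, seq, ts: now() };
  };
}

const stamp = createSequencer(() => 1000); // fixed clock for a reproducible demo
const first = stamp("job-1", { message: "constructor" });
const second = stamp("job-1", { message: "first input" });

// A consumer that receives these out of order can sort by `seq`.
const arrival = [second, first];
const restored = arrival
  .slice()
  .sort((a, b) => a.seq - b.seq)
  .map((m) => m.message);
```

The stamped object could then be passed as the data field of a logMessageTo() payload. Whether the receiving side honors such fields depends on the consumer.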


flowBoxLogger.onMessage()

flowBoxLogger.onMessage(namespace: string, message: string, callback: (message: any) => void)

Parameters:

  • namespace: The namespace to listen on
  • message: The event name to listen for
  • callback: Function called with received data

Registers a listener for incoming messages on a namespace. Used for receiving widget events or bidirectional communication.

flowBoxLogger.onMessage(jobId, "widgetEvent", (eventData) => {
  console.log("Received widget event:", eventData);
});
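
Because the callback receives untyped data, it can help to validate the payload before using it. The sketch below shows one way to do that with a type guard. The WidgetEvent shape is an assumption (the real widget event format is not specified here), and isWidgetEvent and guarded are hypothetical helpers.

```typescript
// Hypothetical payload shape; the real widget event format is not specified here.
interface WidgetEvent {
  widgetId: string;
  value: number;
}

// Runtime check that narrows an untyped payload to WidgetEvent.
function isWidgetEvent(payload: unknown): payload is WidgetEvent {
  if (typeof payload !== "object" || payload === null) return false;
  const candidate = payload as Record<string, unknown>;
  return typeof candidate["widgetId"] === "string" && typeof candidate["value"] === "number";
}

// Wraps a typed handler so that malformed payloads are dropped instead of crashing.
function guarded(handler: (e: WidgetEvent) => void): (payload: unknown) => void {
  return (payload) => {
    if (isWidgetEvent(payload)) handler(payload);
  };
}

const seen: WidgetEvent[] = [];
const callback = guarded((e) => {
  seen.push(e);
});

callback({ widgetId: "slider-1", value: 42 }); // accepted
callback("not an event");                      // dropped
callback({ widgetId: 7 });                     // dropped: wrong field types
```

The wrapped callback has the same shape as the third argument of onMessage(), so it could be passed there directly.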

[WARNING!error/ASYNC DESTRUCTOR] onDestroy() is called synchronously by the runtime. Async cleanup (e.g., await socket.close()) may not complete before the process exits. Use fire-and-forget patterns or register shutdown hooks with the FlowBoxEventLoopHandler.
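
A fire-and-forget teardown can be sketched as follows. This is an illustration only: flushBuffers and onDestroySketch are hypothetical names, and Promise.resolve() stands in for real asynchronous work. The point is that the synchronous method starts the work, attaches an error handler, and returns without awaiting.

```typescript
// Collects failures from background cleanup; a real worker might log them instead.
const cleanupErrors: unknown[] = [];
// Records the order in which things happen, for demonstration.
const trace: string[] = [];

// Hypothetical async cleanup step (for example, flushing a buffer).
async function flushBuffers(): Promise<void> {
  trace.push("flush started");
  await Promise.resolve(); // stands in for real asynchronous work
  trace.push("flush finished");
}

// Synchronous teardown: start the async work, attach an error handler, return.
function onDestroySketch(): void {
  void flushBuffers().catch((err: unknown) => {
    cleanupErrors.push(err);
  });
  trace.push("onDestroy returned");
}

onDestroySketch();
// Snapshot taken immediately after the synchronous call returns.
const traceAtReturn = trace.slice();
```

At the moment onDestroySketch() returns, the cleanup has started but not finished, which is exactly the window in which a process exit would cut it short. The .catch() handler keeps a failed cleanup from surfacing as an unhandled rejection.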


Complete Logger Usage Example

import {
  FlowBoxCore,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxLogger,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/logger-example",
  displayName: "Logger Example",
  currentVersion: "1.0.0",
  type: "pipe",
  icon: "bolt",
  stateless: true,
  agent: "flowmaker-node/1.0.0",
  workerId: process.env.FM_WORKER_ID!,
  workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
  ioInterfaces: {
    inputs: [{ name: "input", supportedFormats: ["msgpack"] }],
    outputs: [{ name: "output", supportedFormats: ["msgpack"] }]
  },
  uiConfig: { defaultOptions: {}, implementation: null }
})
export class LoggerExample extends FlowBoxCore {
  private jobId: string;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    this.jobId = this.initStruct.runtimeContext.jobId;

    // Increment ref count
    flowBoxLogger.increaseRefCount(this.jobId);

    // Log initialization
    flowBoxLogger.logMessageTo(this.jobId, {
      from: this.initStruct.runtimeContext,
      data: { message: "FlowBox initialized" }
    });
  }

  protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
    // Deserialize using the header
    const input = flowBoxSerializer.unpack(header, data);

    // Log incoming data
    flowBoxLogger.logMessageTo(this.jobId, {
      from: this.initStruct.runtimeContext,
      data: { message: "Received input", input }
    });

    // Process and push (reuse header to preserve serialization)
    const outputPayload = flowBoxSerializer.pack(header, input);
    await this.push("output", outputPayload, header);
  }

  public onDestroy(): void {
    // Log shutdown
    flowBoxLogger.logMessageTo(this.jobId, {
      from: this.initStruct.runtimeContext,
      data: { message: "FlowBox destroyed" }
    });

    // Decrement ref count
    flowBoxLogger.decreaseRefCount(this.jobId);
  }
}

Next Steps