FlowBoxLogger API
The FlowBoxLogger provides Socket.IO-based logging for FlowBox workers. It manages per-namespace socket connections with automatic reconnection, message queuing, and reference-counted socket lifecycle.
Singleton Instance
import { flowBoxLogger } from "@industream/flowmaker-sdk";
The default singleton connects to FM_WORKER_LOG_SOCKET_IO_ENDPOINT (default: http://localhost:4123).
flowBoxLogger.increaseRefCount()
flowBoxLogger.increaseRefCount(namespace: string)
Parameters:
namespace: The job/flow element namespace (typicallyjobId)
Increments the reference count for a namespace. Creates a new socket connection if it doesn't exist.
const jobId = this.initStruct.runtimeContext.jobId;
flowBoxLogger.increaseRefCount(jobId);
When to use:
- Call at the start of constructor (after
super()) - Each call should be matched with
decreaseRefCount
flowBoxLogger.decreaseRefCount()
flowBoxLogger.decreaseRefCount(namespace: string)
Parameters:
namespace: The job/flow element namespace (typicallyjobId)
Decrements the reference count. When count reaches zero, the socket closes after sending any queued messages.
const jobId = this.initStruct.runtimeContext.jobId;
flowBoxLogger.decreaseRefCount(jobId);
When to use:
- Call in
onDestroy()lifecycle method - Multiple calls can be made if
increaseRefCountwas called multiple times
flowBoxLogger.logMessageTo()
flowBoxLogger.logMessageTo(namespace: string, message: any, emitType: string = 'log')
Parameters:
namespace: The job/flow element namespace (typicallyjobId)message: Any serializable data objectemitType: Optional event name (default:'log')
Sends a log message to a namespace. Messages are queued if the socket isn't connected yet.
const jobId = this.initStruct.runtimeContext.jobId;
// Basic log
flowBoxLogger.logMessageTo(jobId, { message: "Starting processing" });
// With from context
flowBoxLogger.logMessageTo(jobId, {
from: this.initStruct.runtimeContext,
data: { temperature: 21.7 }
});
// Custom event type
flowBoxLogger.logMessageTo(jobId, { error: "Connection failed" }, "error");
Behavior:
- If socket is connected: message is sent immediately
- If socket is disconnected: message is queued and sent on reconnect
- No error is thrown if namespace doesn't exist (silently ignored)
[NOTE!info/LOG QUEUEING] Messages sent via
logMessageTo()before the socket connects are queued and sent on reconnect. This means early constructor logs may arrive out of order with later logs. For critical sequencing, wait for connection or log after initialization completes.
flowBoxLogger.onMessage()
flowBoxLogger.onMessage(namespace: string, message: string, callback: (message: any) => void)
Parameters:
namespace: The namespace to listen onmessage: The event name to listen forcallback: Function called with received data
Registers a listener for incoming messages on a namespace. Used for receiving widget events or bidirectional communication.
flowBoxLogger.onMessage(jobId, "widgetEvent", (eventData) => {
console.log("Received widget event:", eventData);
});
[WARNING!error/ASYNC DESTRUCTOR]
onDestroy()is called synchronously by the runtime. Async cleanup (e.g.,await socket.close()) may not complete before the process exits. Use fire-and-forget patterns or register shutdown hooks with theFlowBoxEventLoopHandler.
Complete Logger Usage Example
import {
FlowBoxCore,
FlowBoxRegistration,
FlowBoxInitParams,
flowBoxLogger,
flowBoxSerializer
} from "@industream/flowmaker-sdk";
@FlowBoxRegistration({
id: "myorg/logger-example",
displayName: "Logger Example",
currentVersion: "1.0.0",
type: "pipe",
icon: "bolt",
stateless: true,
agent: "flowmaker-node/1.0.0",
workerId: process.env.FM_WORKER_ID!,
workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
ioInterfaces: {
inputs: [{ name: "input", supportedFormats: ["msgpack"] }],
outputs: [{ name: "output", supportedFormats: ["msgpack"] }]
},
uiConfig: { defaultOptions: {}, implementation: null }
})
export class LoggerExample extends FlowBoxCore {
private jobId: string;
constructor(protected override initStruct: FlowBoxInitParams) {
super(initStruct);
this.jobId = this.initStruct.runtimeContext.jobId;
// Increment ref count
flowBoxLogger.increaseRefCount(this.jobId);
// Log initialization
flowBoxLogger.logMessageTo(this.jobId, {
from: this.initStruct.runtimeContext,
data: { message: "FlowBox initialized" }
});
}
protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
// Deserialize using the header
const input = flowBoxSerializer.unpack(header, data);
// Log incoming data
flowBoxLogger.logMessageTo(this.jobId, {
from: this.initStruct.runtimeContext,
data: { message: "Received input", input }
});
// Process and push (reuse header to preserve serialization)
const outputPayload = flowBoxSerializer.pack(header, input);
await this.push("output", outputPayload, header);
}
public onDestroy(): void {
// Log shutdown
flowBoxLogger.logMessageTo(this.jobId, {
from: this.initStruct.runtimeContext,
data: { message: "FlowBox destroyed" }
});
// Decrement ref count
flowBoxLogger.decreaseRefCount(this.jobId);
}
}
Next Steps
- FlowBoxSerializer API — Serialization and headers