FlowBox Implementations (Source, Pipe, Sink)
This guide covers how to implement the three types of FlowBox nodes: Source, Pipe, and Sink.
Worker Entrypoint & Registration
Every TypeScript worker starts with an entrypoint that registers FlowBox implementations and starts the ZeroMQ event loop:
// src/index.ts
import { FlowBoxEventLoopHandler, FlowBoxFactory } from "@industream/flowmaker-sdk";
import { MyFlowBox } from "./my-flowbox";

// Every FlowBox class this worker should serve
const implementations = [MyFlowBox];

const handler = new FlowBoxEventLoopHandler(
  process.env.FM_ROUTER_TRANSPORT_ADDRESS || 'tcp://*:5560',
  process.env.FM_RUNTIME_HTTP_ADDRESS || 'http://localhost:3100',
  new FlowBoxFactory(implementations)
);

handler.run();
[INFO!info/RUNTIME INITIALIZATION] The run() method returns without blocking, but it starts an infinite async dispatch loop. Code after run() will execute, but the process won't exit naturally: the ZMQ loop keeps the event loop alive.
[WARNING!warning/SINGLETON REGISTRATION] FlowBoxFactory registers all implementations globally. Calling new FlowBoxFactory() multiple times with overlapping IDs causes registration conflicts. Always instantiate it exactly once per worker process.
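One way to catch overlapping IDs early is to check the list of IDs before the factory is built. The sketch below is not part of the SDK: findDuplicateIds is a hypothetical helper, and it assumes the registration IDs are available as plain strings (for example, kept in a constant next to each class).

```typescript
// Hypothetical helper (not an SDK function): returns every ID that occurs more than once.
function findDuplicateIds(ids: readonly string[]): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();
  for (const id of ids) {
    if (seen.has(id)) {
      duplicates.add(id);
    }
    seen.add(id);
  }
  return Array.from(duplicates);
}

// Illustrative input: the same pipe ID has been listed twice by mistake.
const registeredIds = ["myorg/my-ts-pipe", "myorg/constant-source", "myorg/my-ts-pipe"];
const duplicateIds = findDuplicateIds(registeredIds);
if (duplicateIds.length > 0) {
  console.warn(`Duplicate FlowBox IDs: ${duplicateIds.join(", ")}`);
}
```

Running a check like this before constructing the factory turns a registration conflict into an explicit message at startup.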
Environment variables:
- FM_ROUTER_TRANSPORT_ADDRESS: ZMQ router socket address (default: tcp://*:5560)
- FM_RUNTIME_HTTP_ADDRESS: Scheduler HTTP API endpoint (default: http://localhost:3100)
- FM_WORKER_ID: Unique worker identifier (required)
- FM_WORKER_TRANSPORT_ADV_ADDRESS: Advanced transport address for data connection
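Because FM_WORKER_ID is required, it can help to fail fast when it is missing instead of relying on the non-null assertion (!) in the registration decorators. The following is a minimal sketch under stated assumptions: readWorkerEnv and the WorkerEnv shape are hypothetical names, not SDK types, and the defaults are the ones listed above.

```typescript
// Hypothetical config shape; the field names are illustrative, not SDK types.
interface WorkerEnv {
  routerAddress: string;
  runtimeHttpAddress: string;
  workerId: string;
  workerDataAddress: string | undefined;
}

// Reads the documented variables, applies the documented defaults,
// and throws if the required worker ID is absent or empty.
function readWorkerEnv(env: Record<string, string | undefined>): WorkerEnv {
  const workerId = env["FM_WORKER_ID"];
  if (!workerId) {
    throw new Error("FM_WORKER_ID is required");
  }
  return {
    routerAddress: env["FM_ROUTER_TRANSPORT_ADDRESS"] || "tcp://*:5560",
    runtimeHttpAddress: env["FM_RUNTIME_HTTP_ADDRESS"] || "http://localhost:3100",
    workerId,
    workerDataAddress: env["FM_WORKER_TRANSPORT_ADV_ADDRESS"],
  };
}
```

In a worker entrypoint this would typically be called once as readWorkerEnv(process.env) before any FlowBox module is imported.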
Pipe Implementation
A minimal pipe that transforms data:
// src/my-pipe.ts
import {
  FlowBoxCore,
  FlowBoxRegistration,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/my-ts-pipe",
  displayName: "My TypeScript Pipe",
  currentVersion: "1.0.0",
  type: "pipe",
  icon: "transform",
  stateless: true,
  agent: "flowmaker-node/1.0.0",
  workerId: process.env.FM_WORKER_ID!,
  workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
  ioInterfaces: {
    inputs: [{ name: "input", supportedFormats: ["msgpack", "json"] }],
    outputs: [{ name: "output", supportedFormats: ["msgpack", "json"] }]
  },
  uiConfig: { defaultOptions: {}, implementation: null }
})
export class MyPipe extends FlowBoxCore {
  protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
    // Deserialize input using the header from the message
    const input = flowBoxSerializer.unpack(header, data);

    // Transform
    const output = {
      ...input,
      processed: true,
      timestamp: new Date().toISOString()
    };

    // Push downstream using the same header (preserves serialization method)
    const outputPayload = flowBoxSerializer.pack(header, output);
    await this.push("output", outputPayload, header);
  }
}
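The transform step in onInput() does not depend on the SDK, so it can be pulled out into a pure function and tested without a running worker. This is only a sketch: markProcessed is a hypothetical name, and the optional now parameter is added here purely to make the timestamp deterministic in tests.

```typescript
// Hypothetical helper mirroring the transform in MyPipe.onInput():
// copies the input and adds the "processed" flag and an ISO timestamp.
function markProcessed<T extends object>(
  input: T,
  now: Date = new Date()
): T & { processed: boolean; timestamp: string } {
  return { ...input, processed: true, timestamp: now.toISOString() };
}

// With a fixed clock the result is fully predictable.
const processedExample = markProcessed({ temperature: 21.5 }, new Date(0));
console.log(processedExample.timestamp); // "1970-01-01T00:00:00.000Z"
```

Inside the pipe, the body of onInput() would then reduce to unpack, markProcessed, pack, and push.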
[NOTE!sync/BACKPRESSURE HANDLING] The await this.push() call blocks until the downstream receiver signals readiness via CAN_SEND_NEXT. Long-running processing in onInput() delays acknowledgment and can stall the entire pipeline, so offload heavy work to worker threads or async queues.
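The waiting behaviour described in this note can be pictured as a one-credit gate: a send consumes the credit, and the next send waits until the receiver hands a credit back. The sketch below is a self-contained model of that idea, not the SDK's implementation; CreditGate, acquire(), and release() are hypothetical names, with release() standing in for the arrival of CAN_SEND_NEXT.

```typescript
// Illustrative model of credit-based backpressure (not SDK code).
class CreditGate {
  private credits: number;
  private waiters: Array<() => void> = [];

  constructor(initialCredits = 1) {
    this.credits = initialCredits;
  }

  // Resolves at once if a credit is available, otherwise waits for release().
  acquire(): Promise<void> {
    if (this.credits > 0) {
      this.credits -= 1;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(resolve);
    });
  }

  // Models the receiver signalling that it can take the next item.
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next();
    } else {
      this.credits += 1;
    }
  }

  // Number of senders currently blocked.
  get waiting(): number {
    return this.waiters.length;
  }
}

const demoGate = new CreditGate(1);
void demoGate.acquire();       // first send uses the only credit
void demoGate.acquire();       // second send has to wait
console.log(demoGate.waiting); // 1
demoGate.release();            // receiver is ready again
console.log(demoGate.waiting); // 0
```

In this model, a slow receiver that never calls release() leaves every later sender queued, which is the stall the note warns about.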
[WARNING!warning/HEADER REUSE] Reusing the incoming header buffer for output preserves the serialization method but also copies the event ID. For downstream data flow, this is correct. For control messages (e.g., custom ACKs), create a new header with the appropriate event ID.
Source Implementation
A source that emits constant values:
// src/constant-source.ts
import {
  FlowBoxRaw,
  FlowBoxDestroy,
  FlowBoxRegistration,
  FlowBoxInitParams,
  FlowBoxSource,
  flowBoxLogger,
  flowBoxSerializer,
  SERIALIZER_MSGPACK
} from "@industream/flowmaker-sdk";
import { pack } from "msgpackr";

@FlowBoxRegistration({
  id: "myorg/constant-source",
  displayName: "Constant Source",
  currentVersion: "1.0.0",
  type: "source",
  icon: "data_array",
  stateless: true,
  agent: "flowmaker-node/1.0.0",
  workerId: process.env.FM_WORKER_ID!,
  workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
  ioInterfaces: {
    inputs: [],
    outputs: [{ name: "default", supportedFormats: ["msgpack"] }]
  },
  uiConfig: { defaultOptions: { value: "Hello" }, implementation: null }
})
export class ConstantSource extends FlowBoxRaw implements FlowBoxSource, FlowBoxDestroy {
  private emitted = false;
  private header: Buffer;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    // Increment logger ref count
    flowBoxLogger.increaseRefCount(this.initStruct.runtimeContext.jobId);
    // Create header for serialization
    this.header = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);
  }

  public onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void {
    // Emit only once
    if (this.emitted) return;
    this.emitted = true;
    // Emit constant value
    fnReadyForNextItem(pack({ message: "Hello, World!" }), this.header);
  }

  public onDestroy(): void {
    // Decrement logger ref count
    flowBoxLogger.decreaseRefCount(this.initStruct.runtimeContext.jobId);
  }
}
[NOTE!info/EAGER EMISSION] Source boxes like ConstantSource typically emit immediately in onOutputReady() without waiting for input. This is the key behavioral difference from pipes and sinks: sources initiate the data flow.
[ERROR!error/REF COUNT LEAK] Forgetting decreaseRefCount() in onDestroy() keeps the logger socket alive indefinitely, causing resource leaks and preventing clean worker shutdown. Always pair every increaseRefCount() with a matching decreaseRefCount().
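The pairing rule is easier to see with a small model of reference counting. The sketch below is an illustration only and makes no claim about how flowBoxLogger is implemented internally; JobRefCounter and its callback are hypothetical names. The shared resource is released only when the last holder for a job lets go, so a single missing decrease keeps it open.

```typescript
// Illustrative per-job reference counter (not SDK code).
class JobRefCounter {
  private counts = new Map<string, number>();
  private onLastRelease: (jobId: string) => void;

  constructor(onLastRelease: (jobId: string) => void) {
    this.onLastRelease = onLastRelease;
  }

  increase(jobId: string): void {
    this.counts.set(jobId, (this.counts.get(jobId) ?? 0) + 1);
  }

  decrease(jobId: string): void {
    const current = this.counts.get(jobId);
    if (current === undefined) return; // unmatched decrease is ignored
    if (current <= 1) {
      this.counts.delete(jobId);
      this.onLastRelease(jobId); // last holder gone: release the resource
    } else {
      this.counts.set(jobId, current - 1);
    }
  }

  isHeld(jobId: string): boolean {
    return this.counts.has(jobId);
  }
}

const releasedJobs: string[] = [];
const refCounter = new JobRefCounter((jobId) => {
  releasedJobs.push(jobId);
});

refCounter.increase("job-1"); // e.g. a source box is constructed
refCounter.increase("job-1"); // e.g. a sink box is constructed
refCounter.decrease("job-1"); // one box is destroyed
console.log(refCounter.isHeld("job-1")); // true: one holder remains
refCounter.decrease("job-1"); // the other box is destroyed
console.log(releasedJobs); // ["job-1"]
```

If the second decrease() were skipped, isHeld("job-1") would stay true for the lifetime of the process, which is the leak described in the warning.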
Sink Implementation
A sink that processes incoming data:
// src/my-sink.ts
import {
  FlowBoxCore,
  FlowBoxDestroy,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxLogger,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/my-sink",
  displayName: "My Sink",
  currentVersion: "1.0.0",
  type: "sink",
  icon: "save",
  stateless: true,
  agent: "flowmaker-node/1.0.0",
  workerId: process.env.FM_WORKER_ID!,
  workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
  ioInterfaces: {
    inputs: [{ name: "input", supportedFormats: ["msgpack"] }],
    outputs: []
  },
  uiConfig: { defaultOptions: {}, implementation: null }
})
// Declares FlowBoxDestroy explicitly, matching ConstantSource, since the class defines onDestroy()
export class MySink extends FlowBoxCore implements FlowBoxDestroy {
  private jobId: string;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    this.jobId = this.initStruct.runtimeContext.jobId;
    // Increment logger ref count in constructor
    flowBoxLogger.increaseRefCount(this.jobId);
  }

  protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
    // Deserialize using the header from the message
    const input = flowBoxSerializer.unpack(header, data);

    // Process data (e.g., save to database)
    console.log("Received:", input);

    // Log the processing
    flowBoxLogger.logMessageTo(this.jobId, {
      from: this.initStruct.runtimeContext,
      data: { message: "Processed data", input }
    });
  }

  public onDestroy(): void {
    // Decrement logger ref count
    flowBoxLogger.decreaseRefCount(this.jobId);
  }
}