FlowBox Implementations (Source, Pipe, Sink)

This guide covers how to implement the three types of FlowBox nodes: Source, Pipe, and Sink.

Worker Entrypoint & Registration

Every TypeScript worker starts with an entrypoint that registers FlowBox implementations and starts the ZeroMQ event loop:

// src/index.ts
import { FlowBoxEventLoopHandler, FlowBoxFactory } from "@industream/flowmaker-sdk";
import { MyFlowBox } from "./my-flowbox";

const implementations = [MyFlowBox];

const handler = new FlowBoxEventLoopHandler(
  process.env.FM_ROUTER_TRANSPORT_ADDRESS || 'tcp://*:5560',
  process.env.FM_RUNTIME_HTTP_ADDRESS || 'http://localhost:3100',
  new FlowBoxFactory(implementations)
);

handler.run();

[INFO!info/RUNTIME INITIALIZATION] The run() method is non-blocking for the event loop setup but starts an infinite async dispatch loop. Code after run() will execute, but the process won't exit naturally—the ZMQ loop keeps the event loop alive.

[WARNING!warning/SINGLETON REGISTRATION] FlowBoxFactory registers all implementations globally. Calling new FlowBoxFactory() multiple times with overlapping IDs causes registration conflicts. Always instantiate exactly once per worker process.

Environment variables:

  • FM_ROUTER_TRANSPORT_ADDRESS: ZMQ router socket address (default: tcp://*:5560)
  • FM_RUNTIME_HTTP_ADDRESS: Scheduler HTTP API endpoint (default: http://localhost:3100)
  • FM_WORKER_ID: Unique worker identifier (required)
  • FM_WORKER_TRANSPORT_ADV_ADDRESS: Advanced transport address for data connection

Pipe Implementation

A minimal pipe that transforms data:

// src/my-pipe.ts
import {
  FlowBoxCore,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxSerializer,
  SERIALIZER_MSGPACK
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/my-ts-pipe",
  displayName: "My TypeScript Pipe",
  currentVersion: "1.0.0",
  type: "pipe",
  icon: "transform",
  stateless: true,
  agent: "flowmaker-node/1.0.0",
  workerId: process.env.FM_WORKER_ID!,
  workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
  ioInterfaces: {
    inputs: [{ name: "input", supportedFormats: ["msgpack", "json"] }],
    outputs: [{ name: "output", supportedFormats: ["msgpack", "json"] }]
  },
  uiConfig: { defaultOptions: {}, implementation: null }
})
export class MyPipe extends FlowBoxCore {
  protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
    // Deserialize input using the header from the message
    const input = flowBoxSerializer.unpack(header, data);

    // Transform
    const output = {
      ...input,
      processed: true,
      timestamp: new Date().toISOString()
    };

    // Push downstream using the same header (preserves serialization method)
    const outputPayload = flowBoxSerializer.pack(header, output);
    await this.push("output", outputPayload, header);
  }
}

[NOTE!sync/BACKPRESSURE HANDLING] The await this.push() call blocks until the downstream receiver signals readiness via CAN_SEND_NEXT. Long-running processing in onInput() delays acknowledgment and can stall the entire pipeline—offload heavy work to worker threads or async queues.

[WARNING!warning/HEADER REUSE] Reusing the incoming header buffer for output preserves the serialization method but copies the event ID. For downstream data flow, this is correct. For control messages (e.g., custom ACKs), create a new header with the appropriate event ID.


Source Implementation

A source that emits constant values:

// src/constant-source.ts
import {
  FlowBoxRaw,
  FlowBoxDestroy,
  FlowBoxRegistration,
  FlowBoxInitParams,
  FlowBoxSource,
  flowBoxLogger,
  flowBoxSerializer,
  SERIALIZER_MSGPACK
} from "@industream/flowmaker-sdk";
import { pack } from "msgpackr";

@FlowBoxRegistration({
  id: "myorg/constant-source",
  displayName: "Constant Source",
  currentVersion: "1.0.0",
  type: "source",
  icon: "data_array",
  stateless: true,
  agent: "flowmaker-node/1.0.0",
  workerId: process.env.FM_WORKER_ID!,
  workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
  ioInterfaces: {
    inputs: [],
    outputs: [{ name: "default", supportedFormats: ["msgpack"] }]
  },
  uiConfig: { defaultOptions: { value: "Hello" }, implementation: null }
})
export class ConstantSource extends FlowBoxRaw implements FlowBoxSource, FlowBoxDestroy {
  private emitted = false;
  private header: Buffer;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);

    // Increment logger ref count
    flowBoxLogger.increaseRefCount(this.initStruct.runtimeContext.jobId);

    // Create header for serialization
    this.header = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);
  }

  public onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void {
    if (this.emitted) return;
    this.emitted = true;

    // Emit constant value
    fnReadyForNextItem(pack({ message: "Hello, World!" }), this.header);
  }

  public onDestroy(): void {
    // Decrement logger ref count
    flowBoxLogger.decreaseRefCount(this.initStruct.runtimeContext.jobId);
  }
}

[NOTE!info/EAGER EMISSION] Source boxes like ConstantSource typically emit immediately in onOutputReady() without waiting for input. This is the key behavioral difference from pipes/sinks—they initiate the data flow.

[ERROR!error/REF COUNT LEAK] Forgetting decreaseRefCount() in onDestroy() keeps the logger socket alive indefinitely, causing resource leaks and preventing clean worker shutdown. Always pair every increaseRefCount() with a matching decreaseRefCount().


Sink Implementation

A sink that processes incoming data:

// src/my-sink.ts
import {
  FlowBoxCore,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxLogger,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/my-sink",
  displayName: "My Sink",
  currentVersion: "1.0.0",
  type: "sink",
  icon: "save",
  stateless: true,
  agent: "flowmaker-node/1.0.0",
  workerId: process.env.FM_WORKER_ID!,
  workerDataConnectionString: process.env.FM_WORKER_TRANSPORT_ADV_ADDRESS!,
  ioInterfaces: {
    inputs: [{ name: "input", supportedFormats: ["msgpack"] }],
    outputs: []
  },
  uiConfig: { defaultOptions: {}, implementation: null }
})
export class MySink extends FlowBoxCore {
  private jobId: string;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    this.jobId = this.initStruct.runtimeContext.jobId;

    // Increment logger ref count in constructor
    flowBoxLogger.increaseRefCount(this.jobId);
  }

  protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
    // Deserialize using the header from the message
    const input = flowBoxSerializer.unpack(header, data);

    // Process data (e.g., save to database)
    console.log("Received:", input);

    // Log the processing
    flowBoxLogger.logMessageTo(this.jobId, {
      from: this.initStruct.runtimeContext,
      data: { message: "Processed data", input }
    });
  }

  public onDestroy(): void {
    // Decrement logger ref count
    flowBoxLogger.decreaseRefCount(this.jobId);
  }
}