FlowBoxCore and FlowBoxRaw: Base Classes for Worker Implementations

This page explains the two base classes you can extend when implementing FlowBox workers: FlowBoxRaw and FlowBoxCore. Understanding the difference is crucial for choosing the right level of abstraction for your use case.

Quick Comparison

| Feature | FlowBoxRaw | FlowBoxCore |
|---|---|---|
| Abstraction level | Low-level, manual | High-level, async-friendly |
| Input handling | Implement onInputReceived() directly | Implement the abstract onInput() |
| Output handling | Implement onOutputReady() directly | Call await this.push() |
| Backpressure | Manual, via the fnReadyForNextItem callback | Automatic, via Promise resolution |
| Best for | Custom protocols, fine-grained control | Typical transforms, aggregations |

[NOTE!lightbulb/CHOOSE YOUR BASE] Start with FlowBoxCore for most use cases. Use FlowBoxRaw only when you need a custom protocol or very specific control requirements (e.g., the ConstantSource example).


FlowBoxRaw: The Low-Level Base Class

FlowBoxRaw is the minimal abstract base class for FlowBox workers. It provides no helper methods: you implement the lifecycle callbacks of FlowBoxSource and/or FlowBoxSink directly.

Class Definition

export abstract class FlowBoxRaw {
    constructor(protected initStruct: FlowBoxInitParams) { }
    initialize?: () => Promise<void> | void;
}

Properties

initStruct: FlowBoxInitParams — Protected constructor parameter containing:

  • runtimeContext: Job ID, attempt number, node reference
  • widgetManager: For widget updates
  • flowElementId: Full flow element identifier
  • workerDataConnectionString: Data connection address

Optional Methods

initialize()

initialize?: () => Promise<void> | void;

Optional lifecycle hook called after construction but before the FlowBox starts receiving events. Use for async setup like database connections or API client initialization.

[INFO!info/INITIALIZATION TIMING] The initialize() method is called by the FlowBoxEventLoopHandler after instantiation. If it returns a Promise, the handler waits for resolution before starting the event loop. For synchronous setup, a regular function works fine.
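
The handler's code is not shown on this page, so the following is only a hypothetical sketch of the behavior described above: call initialize() if it exists, wait if it returns a Promise, and only then start delivering events. The names startBox and startEventLoop are invented for illustration and are not SDK APIs.

```typescript
// Hypothetical sketch (not the SDK's FlowBoxEventLoopHandler): honor an optional,
// possibly async initialize() before any events are delivered.
interface InitializableBox {
  initialize?: () => Promise<void> | void;
}

async function startBox(box: InitializableBox, startEventLoop: () => void): Promise<void> {
  if (box.initialize) {
    // Awaiting a non-Promise value just continues, so sync and async hooks share one path.
    await box.initialize();
  }
  startEventLoop();
}

// Usage: record the order in which setup and the event loop run.
const order: string[] = [];
const box: InitializableBox = {
  initialize: async () => {
    await Promise.resolve(); // stands in for e.g. opening a database connection
    order.push("initialized");
  },
};
const started = startBox(box, () => order.push("event loop started"));
```

A synchronous initialize() goes through the same code path; the event loop still starts only after it returns.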


FlowBoxSource Interface

Sources emit data without receiving input. Implement onOutputReady() to push data downstream.

Method Signature

onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): any;

Parameters:

  • outputId: Buffer containing the output port identifier (e.g., Buffer.from("default"))
  • header: Buffer containing the message header (serialization method + event ID)
  • fnReadyForNextItem: Callback to invoke when you have data ready to send

Description: Called by the runtime when downstream is ready to receive. Invoke fnReadyForNextItem(data, header) with your payload.

[WARNING!warning/EAGER EMISSION PATTERN] Sources typically emit immediately in onOutputReady() without waiting for input. This initiates the data flow. If you don't call fnReadyForNextItem(), the pipeline stalls—downstream never receives data.

Example: Constant Source

import {
  FlowBoxRaw,
  FlowBoxSource,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxSerializer,
  SERIALIZER_MSGPACK
} from "@industream/flowmaker-sdk";
import { pack } from "msgpackr";

@FlowBoxRegistration({
  id: "myorg/constant-source",
  type: "source",
  // ... registration config
})
export class ConstantSource extends FlowBoxRaw implements FlowBoxSource {
  private emitted = false;
  private header: Buffer;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    this.header = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);
  }

  public onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void {
    if (this.emitted) return;
    this.emitted = true;

    fnReadyForNextItem(pack({ message: "Hello, World!" }), this.header);
  }
}
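
ConstantSource stops after one message. The same callback pattern extends to a finite sequence: emit one item per onOutputReady() call and stop once the items run out. The sketch below is hypothetical and self-contained: payloads and headers are plain strings instead of Buffer, and drive() is a stand-in for the runtime that assumes onOutputReady() is called once per downstream readiness signal.

```typescript
// Hypothetical sketch: a Raw-style source that emits a fixed sequence,
// one item per onOutputReady() call. Strings replace Buffer for brevity.
type Emit = (data: string, header: string) => void;

class SequenceSource {
  private index = 0;

  constructor(private readonly items: string[], private readonly header: string) {}

  // Called each time downstream is ready: emit exactly one item, or nothing once exhausted.
  onOutputReady(_outputId: string, _header: string, fnReadyForNextItem: Emit): void {
    if (this.index >= this.items.length) return;
    fnReadyForNextItem(this.items[this.index++], this.header);
  }
}

// Simulated runtime (assumption): signals readiness a fixed number of times.
function drive(source: SequenceSource, readinessSignals: number): string[] {
  const received: string[] = [];
  for (let i = 0; i < readinessSignals; i++) {
    source.onOutputReady("default", "", (data) => received.push(data));
  }
  return received;
}

const received = drive(new SequenceSource(["a", "b", "c"], "hdr"), 5);
```

The two extra readiness signals produce nothing, which mirrors how ConstantSource ignores calls after its single emission.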

FlowBoxSink Interface

Sinks receive data without emitting output. Implement onInputReceived() to process incoming messages.

Method Signature

onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): any;

Parameters:

  • inputId: Buffer containing the input port identifier
  • header: Buffer containing the message header
  • data: Buffer containing the serialized payload
  • fnReadyForNextItem: Callback to invoke when ready for the next message

Description: Called when a message arrives on an input port. Process the data and call fnReadyForNextItem() to signal readiness for the next message.

[NOTE!sync/BACKPRESSURE SIGNALING] Calling fnReadyForNextItem() tells the runtime that you can accept another message. If your processing is asynchronous, call the callback only after it completes; calling it earlier lets the runtime deliver new messages faster than your sink can process them.

Example: Raw Sink

import {
  FlowBoxRaw,
  FlowBoxSink,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxLogger,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/raw-sink",
  type: "sink",
  // ... registration config
})
export class RawSink extends FlowBoxRaw implements FlowBoxSink {
  private jobId: string;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    this.jobId = this.initStruct.runtimeContext.jobId;
    flowBoxLogger.increaseRefCount(this.jobId);
  }

  public onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): void {
    const input = flowBoxSerializer.unpack(header, data);

    // Process synchronously
    console.log("Received:", input);

    flowBoxLogger.logMessageTo(this.jobId, {
      from: this.initStruct.runtimeContext,
      data: { message: "Processed", input }
    });

    // Signal readiness
    fnReadyForNextItem();
  }

  public onDestroy(): void {
    flowBoxLogger.decreaseRefCount(this.jobId);
  }
}
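
RawSink finishes its work synchronously, so it can signal readiness at the end of onInputReceived(). When the work is asynchronous, the backpressure note above applies: call fnReadyForNextItem() from the completion handler instead. This is a hypothetical, self-contained sketch with string payloads; deliver() simulates a runtime that sends the next message only after readiness is signaled, and save() stands in for real I/O.

```typescript
// Hypothetical sketch: a Raw-style sink whose work is async.
// It signals readiness only after the work settles.
class AsyncRawSink {
  inFlight = 0;
  maxInFlight = 0;
  readonly processed: string[] = [];

  onInputReceived(_inputId: string, _header: string, data: string, fnReadyForNextItem: () => void): void {
    this.inFlight++;
    this.maxInFlight = Math.max(this.maxInFlight, this.inFlight);
    const release = (): void => {
      this.inFlight--;
      fnReadyForNextItem(); // ask for the next message only now
    };
    this.save(data).then(release, (err: unknown) => {
      console.error("save failed:", err); // illustrative; a real sink might retry or report
      release();
    });
  }

  private async save(data: string): Promise<void> {
    await Promise.resolve(); // stands in for a DB write or API call
    this.processed.push(data);
  }
}

// Simulated runtime (assumption): delivers the next message only after readiness.
function deliver(sink: AsyncRawSink, messages: string[]): Promise<void> {
  return new Promise<void>((resolve) => {
    let next = 0;
    const sendNext = (): void => {
      if (next >= messages.length) {
        resolve();
        return;
      }
      sink.onInputReceived("in", "", messages[next++], sendNext);
    };
    sendNext();
  });
}

const sink = new AsyncRawSink();
const done = deliver(sink, ["m1", "m2", "m3"]);
```

If fnReadyForNextItem() were called at the top of onInputReceived() instead, the simulated runtime would deliver m2 and m3 while m1 was still being saved.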

FlowBoxCore: The High-Level Base Class

FlowBoxCore implements both FlowBoxSource and FlowBoxSink, providing async-friendly methods that hide the complexity of the low-level callbacks.

Class Definition

export abstract class FlowBoxCore implements FlowBoxSource, FlowBoxSink {
    constructor(protected initStruct: FlowBoxInitParams) {}
    initialize?: () => Promise<void> | void;
    protected abstract onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void>;
}

Abstract Methods

onInput()

protected abstract onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void>;

Parameters:

  • inputId: Buffer containing the input port identifier
  • header: Buffer containing the message header
  • data: Buffer containing the serialized payload

Description: Implement this abstract method with your pipe or sink logic. You never call fnReadyForNextItem() yourself: FlowBoxCore's onInputReceived() invokes it after your Promise resolves.

[NOTE!async/AWAIT NATURALNESS] Unlike FlowBoxSink.onInputReceived(), onInput() has no readiness callback to call. Await your async operations and return; the base class handles the rest. This makes FlowBoxCore a good fit for database queries, API calls, or any other async processing.

Example: Core Pipe

import {
  FlowBoxCore,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/core-pipe",
  type: "pipe",
  // ... registration config
})
export class CorePipe extends FlowBoxCore {
  protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
    const input = flowBoxSerializer.unpack(header, data);

    // Async processing
    const output = await this.transform(input);

    // Push downstream
    const outputPayload = flowBoxSerializer.pack(header, output);
    await this.push("output", outputPayload, header);
  }

  private async transform(input: any): Promise<any> {
    // Simulate async work
    await new Promise(resolve => setTimeout(resolve, 100));
    return { ...input, transformed: true };
  }
}

Instance Methods

push()

public async push(outputId: string, data: Buffer, header: Buffer, bufferCount: number = 0): Promise<void>

Parameters:

  • outputId: String identifier of the output port (e.g., "output")
  • data: Buffer containing the serialized payload
  • header: Buffer containing the message header
  • bufferCount: Optional limit on how many pushed items may await downstream before push() starts waiting (default: 0)

Description: Push data to an output port. Returns a Promise that resolves when downstream acknowledges readiness.

[NOTE!sync/BUFFERING BEHAVIOR] The bufferCount parameter controls how many items can be buffered before backpressure kicks in. With bufferCount = 0 (the default), each push() waits for acknowledgment. A positive value lets that many pushes return without waiting, at the cost of holding more unsent items in memory.
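
A pipe that emits several messages per input can rely on this behavior by awaiting each push() in turn. The sketch below is hypothetical: PushFn only mirrors the shape of push() (string payloads, no bufferCount), fanOut is an invented helper, and mockPush simulates a downstream that acknowledges each item after a microtask.

```typescript
// Hypothetical sketch: fan-out inside an onInput()-style handler, awaiting each push.
type PushFn = (outputId: string, data: string, header: string) => Promise<void>;

async function fanOut(push: PushFn, header: string, items: string[]): Promise<void> {
  for (const item of items) {
    // Awaiting each push keeps at most one unacknowledged item in flight.
    await push("output", item, header);
  }
}

// Mock downstream: acknowledges after a microtask and records peak concurrency.
let outstanding = 0;
let maxOutstanding = 0;
const delivered: string[] = [];
const mockPush: PushFn = async (_outputId, data) => {
  outstanding++;
  maxOutstanding = Math.max(maxOutstanding, outstanding);
  await Promise.resolve();
  delivered.push(data);
  outstanding--;
};

const fanned = fanOut(mockPush, "hdr", ["x", "y", "z"]);
```

Firing the pushes without awaiting them (for example with Promise.all over items.map(...)) would let all three items be outstanding at once.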

onOutputReady() (inherited)

onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void

Description: Implemented by FlowBoxCore to integrate with the internal RxJS subject system. Typically you don't override this—use push() instead.

onInputReceived() (inherited)

onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): void

Description: Implemented by FlowBoxCore to call your onInput() implementation and automatically invoke fnReadyForNextItem() after the Promise resolves.
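
This adapter pattern can be sketched without the SDK. The model below is hypothetical and simplified (string payloads, an invented PromiseSinkBase class), not FlowBoxCore's source. In particular, how the real base class treats a rejected onInput() Promise is not documented on this page, so the rejection branch is an assumption.

```typescript
// Hypothetical sketch: a base class whose onInputReceived() runs an async
// onInput() and signals readiness only after that Promise settles.
abstract class PromiseSinkBase {
  protected abstract onInput(inputId: string, header: string, data: string): Promise<void>;

  onInputReceived(inputId: string, header: string, data: string, fnReadyForNextItem: () => void): void {
    this.onInput(inputId, header, data).then(
      () => fnReadyForNextItem(),
      (err: unknown) => {
        // Assumption: log and still signal readiness; the real behavior may differ.
        console.error("onInput failed:", err);
        fnReadyForNextItem();
      }
    );
  }
}

class RecordingSink extends PromiseSinkBase {
  readonly seen: string[] = [];

  protected async onInput(_inputId: string, _header: string, data: string): Promise<void> {
    await Promise.resolve(); // simulated async work
    this.seen.push(data);
  }
}

// Usage: readiness fires once, after the work has finished.
const recorder = new RecordingSink();
let readyCalls = 0;
let seenWhenReady = -1;
const readyPromise = new Promise<void>((resolve) => {
  recorder.onInputReceived("in", "", "payload", () => {
    readyCalls++;
    seenWhenReady = recorder.seen.length;
    resolve();
  });
});
```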


Key Differences: Core vs Raw

1. Input Handling

FlowBoxRaw + FlowBoxSink:

public onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): void {
  const input = flowBoxSerializer.unpack(header, data);
  // Synchronous processing
  fnReadyForNextItem(); // Manual callback
}

FlowBoxCore:

protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
  const input = flowBoxSerializer.unpack(header, data);
  // Async processing with await
  await this.process(input); // process() stands for your own async helper
  // No callback needed: FlowBoxCore signals readiness once this Promise resolves
}

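[ERROR!error/CALLBACK FORGETTING] With FlowBoxRaw, forgetting to call fnReadyForNextItem() blocks the entire pipeline. With FlowBoxCore there is no callback to forget: readiness is signaled when your Promise settles, and the runtime handles both resolution and rejection. An onInput() that never settles, such as one awaiting a request with no timeout, still stalls that input.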
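
Because readiness is signaled only once the onInput() Promise settles, an await that never completes holds up that input. One common guard is to race the work against a timer. withTimeout is a hypothetical helper, not part of the SDK, and fetchEnrichment and the 5000 ms value in the comment are illustrative only.

```typescript
// Hypothetical helper (not an SDK API): reject if `work` has not settled within `ms`.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
    work.then(
      (value) => {
        clearTimeout(timer); // work finished first: cancel the timer
        resolve(value);
      },
      (err: unknown) => {
        clearTimeout(timer);
        reject(err);
      }
    );
  });
}

// Inside a FlowBoxCore subclass (sketch; fetchEnrichment is hypothetical):
//   protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
//     const extra = await withTimeout(this.fetchEnrichment(data), 5000);
//     // ...transform and push as usual
//   }
```

The timer only ends the wait; it does not cancel the underlying operation, which keeps running in the background.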

2. Output Handling

FlowBoxRaw + FlowBoxSource:

public onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void {
  const data = pack({ value: 42 });
  fnReadyForNextItem(data, this.header); // Immediate push
}

FlowBoxCore:

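// Typically inside onInput(). The handler waits for initialize() to resolve before
// starting the event loop, so awaiting push() inside initialize() may never complete.
await this.push("output", data, header); // Async push with backpressure handling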

[INFO!info/PUSH SEMANTICS] FlowBoxCore.push() is async and, with the default bufferCount of 0, waits for downstream acknowledgment. FlowBoxRaw's fnReadyForNextItem() is synchronous: the data is queued immediately, but you lose the ability to await completion.

3. Backpressure

FlowBoxRaw: Manual control via callback timing. You decide when to call fnReadyForNextItem().

FlowBoxCore: Automatic via Promise resolution. FlowBoxCore tracks a clearCount for each output and resolves push() only when downstream is ready (or immediately, while clearCount is below bufferCount).

[WARNING!warning/DEADLOCK RISK] With FlowBoxRaw, circular dependencies (A pushes to B and B pushes to A in the same callback) can deadlock. FlowBoxCore mitigates this with its RxJS subject-based buffering, but circular data flow is still an anti-pattern.
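
Manual control via callback timing makes patterns like batching straightforward with FlowBoxRaw: acknowledge items immediately while the batch has room, and hold the acknowledgment for the item that fills it until the flush completes. This is a hypothetical, self-contained sketch with string payloads; flush() stands in for a bulk write, and deliverInOrder() simulates a runtime that delivers the next message only after readiness is signaled.

```typescript
// Hypothetical sketch: manual backpressure in a Raw-style batching sink.
class BatchingSink {
  private batch: string[] = [];
  readonly flushed: string[][] = [];

  constructor(private readonly batchSize: number) {}

  onInputReceived(_inputId: string, _header: string, data: string, fnReadyForNextItem: () => void): void {
    this.batch.push(data);
    if (this.batch.length < this.batchSize) {
      fnReadyForNextItem(); // room left: ask for the next item right away
      return;
    }
    const full = this.batch;
    this.batch = [];
    // Hold readiness until the flush settles (illustrative error handling only).
    this.flush(full).then(fnReadyForNextItem, (err: unknown) => {
      console.error("flush failed:", err);
      fnReadyForNextItem();
    });
  }

  private async flush(items: string[]): Promise<void> {
    await Promise.resolve(); // stands in for a bulk write
    this.flushed.push(items);
  }
}

// Simulated runtime (assumption): next message only after readiness.
function deliverInOrder(sink: BatchingSink, messages: string[]): Promise<void> {
  return new Promise<void>((resolve) => {
    let next = 0;
    const sendNext = (): void => {
      if (next >= messages.length) {
        resolve();
        return;
      }
      sink.onInputReceived("in", "", messages[next++], sendNext);
    };
    sendNext();
  });
}

const batchSink = new BatchingSink(2);
const batched = deliverInOrder(batchSink, ["a", "b", "c", "d", "e"]);
```

Items left in a partial batch (here, e) are still buffered when input stops; a real sink would flush them in onDestroy() or on a timer.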


When to Use Each

Use FlowBoxCore when:

  • Building typical pipes that transform data
  • Need async processing (DB queries, API calls)
  • Want simpler, more maintainable code
  • Don't need fine-grained control over message timing

Use FlowBoxRaw when:

  • Implementing sources that emit on schedule
  • Need custom protocol handling
  • Require precise control over message timing
  • Building specialized boxes with unique lifecycle needs

[NOTE!lightbulb/MOSTLY CORE] The ConstantSource example uses FlowBoxRaw because it needs to emit immediately in onOutputReady(). For pipes and sinks, FlowBoxCore is almost always the better choice.


Lifecycle Hooks

Both FlowBoxRaw and FlowBoxCore support these lifecycle methods:

Constructor

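// Assumes a `private jobId: string` field, as in RawSink above
constructor(protected override initStruct: FlowBoxInitParams) {
  super(initStruct); // required before using `this` in a subclass
  // Access runtime context; keep the job ID for onDestroy()
  this.jobId = initStruct.runtimeContext.jobId;
  // Increment logger ref count if using flowBoxLogger
  flowBoxLogger.increaseRefCount(this.jobId);
}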

initialize()

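// Declared as a property to match the base class's `initialize?: () => ...` declaration
initialize = async (): Promise<void> => {
  // Async setup; this.db stands for your own client (e.g., a database pool)
  await this.db.connect();
};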

onDestroy() (via FlowBoxDestroy interface)

public onDestroy(): void {
  // Cleanup
  flowBoxLogger.decreaseRefCount(this.jobId);
}

[ERROR!error/REF COUNT LEAK] Always pair increaseRefCount() with decreaseRefCount() in onDestroy(). Forgetting this keeps the logger socket alive, preventing worker shutdown.
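
The reason the pairing matters shows up in a minimal model of per-job reference counting. RefCountedResource is hypothetical and is not flowBoxLogger's implementation; it only assumes the behavior described above, where the shared resource stays open while any box still holds a reference.

```typescript
// Hypothetical sketch of per-job reference counting for a shared resource.
class RefCountedResource {
  private counts = new Map<string, number>();
  readonly closed: string[] = [];

  increaseRefCount(jobId: string): void {
    this.counts.set(jobId, (this.counts.get(jobId) ?? 0) + 1);
  }

  decreaseRefCount(jobId: string): void {
    const current = this.counts.get(jobId) ?? 0;
    if (current === 0) return; // unmatched decrease: ignore
    if (current === 1) {
      this.counts.delete(jobId);
      this.closed.push(jobId); // last user gone: release the underlying connection
    } else {
      this.counts.set(jobId, current - 1);
    }
  }

  isOpen(jobId: string): boolean {
    return this.counts.has(jobId);
  }
}

// Usage: two boxes in the same job share the resource.
const shared = new RefCountedResource();
shared.increaseRefCount("job-1"); // box A constructor
shared.increaseRefCount("job-1"); // box B constructor
shared.decreaseRefCount("job-1"); // box A onDestroy
const openAfterFirstRelease = shared.isOpen("job-1"); // box B still holds a reference
shared.decreaseRefCount("job-1"); // box B onDestroy
// Had box B skipped decreaseRefCount(), "job-1" would stay open indefinitely.
```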


Internal Implementation Details

FlowBoxCore's RxJS Subjects

FlowBoxCore uses RxJS Subject instances to coordinate input/output:

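// Excerpt from FlowBoxCore (Subject and zip are imported from "rxjs")
private outputSubjects: { [outputId: string]: InOutSubject } = {};

private createIOSubjects(): InOutSubject {
  const newIOSubject = {
    // Receives downstream readiness callbacks from onOutputReady()
    outputSubject: new Subject<[(data: Buffer, header: Buffer) => any]>(),
    // Receives [data, header, resolver] tuples from push()
    inputSubject: new Subject<[Buffer, Buffer, () => any]>(),
    clearCount: 0
  };

  // zip() emits one pair per (push, onOutputReady) match, in arrival order
  zip([newIOSubject.inputSubject, newIOSubject.outputSubject]).subscribe(([inputs, outputs]) => {
    const [outputResolver] = outputs;
    const [inputData, inputHeader, inputResolver] = inputs;

    newIOSubject.clearCount--;              // one fewer item waiting for downstream
    outputResolver(inputData, inputHeader); // hand the data to downstream
    inputResolver();                        // resolve the awaiting push()
  });

  return newIOSubject;
}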

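[INFO!info/ZIP COORDINATION] The zip() operator pairs each push() call (queued on inputSubject) with one onOutputReady() call from downstream (queued on outputSubject), in arrival order. Each pair forwards the data downstream and resolves the waiting push(), which creates a lock-step flow control mechanism.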
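
To see the lock-step behavior without RxJS, the pairing can be modeled with two FIFO queues that are drained whenever both are non-empty, which is what zip() does with its sources. This is a hypothetical, simplified model (string payloads, a single output, no clearCount), not FlowBoxCore's code.

```typescript
// Hypothetical, dependency-free model of the zip() pairing.
type ReadyCallback = (data: string, header: string) => void;

class LockStepOutput {
  private pendingPushes: Array<{ data: string; header: string; resolve: () => void }> = [];
  private pendingReady: ReadyCallback[] = [];

  // Like push(): resolves only once a downstream slot has consumed the item.
  push(data: string, header: string): Promise<void> {
    return new Promise<void>((resolve) => {
      this.pendingPushes.push({ data, header, resolve });
      this.drain();
    });
  }

  // Like onOutputReady(): downstream offers a slot for one item.
  onOutputReady(fnReadyForNextItem: ReadyCallback): void {
    this.pendingReady.push(fnReadyForNextItem);
    this.drain();
  }

  // Emit pairs in FIFO order whenever both queues are non-empty.
  private drain(): void {
    while (this.pendingPushes.length > 0 && this.pendingReady.length > 0) {
      const item = this.pendingPushes.shift()!;
      const ready = this.pendingReady.shift()!;
      ready(item.data, item.header); // hand the data downstream
      item.resolve();                // unblock the awaiting push()
    }
  }
}
```

A push() made before any downstream slot exists stays pending until onOutputReady() supplies one.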

clearCount Mechanism

The clearCount field tracks buffered items:

  • Incremented on push()
  • Decremented when downstream takes the item (in the zip() subscriber)
  • If clearCount < bufferCount, push() returns immediately without waiting

This allows a configurable amount of buffering while preventing unbounded memory growth.
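
The counter can be added to the same kind of queue-pairing model. In this hypothetical sketch, push() compares clearCount with bufferCount before incrementing it; whether the real FlowBoxCore checks before or after the increment is an assumption, as are the string payloads and the BufferedOutput name.

```typescript
// Hypothetical sketch of bufferCount gating on top of lock-step pairing.
type ReadyCallback = (data: string, header: string) => void;

class BufferedOutput {
  clearCount = 0; // items pushed but not yet handed downstream
  private queue: Array<{ data: string; header: string; resolve: () => void }> = [];
  private slots: ReadyCallback[] = [];

  push(data: string, header: string, bufferCount = 0): Promise<void> {
    const mayReturnEarly = this.clearCount < bufferCount; // check before incrementing (assumption)
    this.clearCount++;
    const acknowledged = new Promise<void>((resolve) => {
      this.queue.push({ data, header, resolve });
    });
    this.drain();
    // Below the limit, don't wait for downstream; otherwise wait for the pairing.
    return mayReturnEarly ? Promise.resolve() : acknowledged;
  }

  onOutputReady(fnReadyForNextItem: ReadyCallback): void {
    this.slots.push(fnReadyForNextItem);
    this.drain();
  }

  private drain(): void {
    while (this.queue.length > 0 && this.slots.length > 0) {
      const item = this.queue.shift()!;
      const slot = this.slots.shift()!;
      this.clearCount--; // downstream took the item
      slot(item.data, item.header);
      item.resolve();
    }
  }
}
```

With the default bufferCount of 0, the comparison is never true, so every push() waits for its own pairing, which matches the lock-step behavior described earlier.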