FlowBoxCore and FlowBoxRaw: Base Classes for Worker Implementations
This page explains the two base classes you can extend when implementing FlowBox workers: FlowBoxRaw and FlowBoxCore. Understanding the difference is crucial for choosing the right level of abstraction for your use case.
Quick Comparison
| Feature | FlowBoxRaw | FlowBoxCore |
|---|---|---|
| Abstraction Level | Low-level, manual | High-level, async-friendly |
| Input Handling | Implement onInputReceived() directly | Implement abstract onInput() |
| Output Handling | Implement onOutputReady() directly | Use await this.push() |
| Backpressure | Manual via fnReadyForNextItem callback | Automatic via Promise resolution |
| Best For | Custom protocols, fine-grained control | Typical transforms, aggregations |
[NOTE!lightbulb/CHOOSE YOUR BASE] Start with FlowBoxCore for 90% of use cases. Only use FlowBoxRaw if you need to implement custom protocols or have very specific control requirements (e.g., the ConstantSource example).
FlowBoxRaw: The Low-Level Base Class
FlowBoxRaw is the minimal abstract base class for FlowBox implementations. It provides no helper methods: you implement the lifecycle callbacks directly.
Class Definition
```typescript
export abstract class FlowBoxRaw {
  constructor(protected initStruct: FlowBoxInitParams) { }
  initialize?: () => Promise<void> | void;
}
```
Properties
initStruct: FlowBoxInitParams (protected constructor parameter), containing:
- runtimeContext: Job ID, attempt number, node reference
- widgetManager: For widget updates
- flowElementId: Full flow element identifier
- workerDataConnectionString: Data connection address
Optional Methods
initialize()
```typescript
initialize?: () => Promise<void> | void;
```
Optional lifecycle hook called after construction but before the FlowBox starts receiving events. Use for async setup like database connections or API client initialization.
[INFO!info/INITIALIZATION TIMING] The initialize() method is called by the FlowBoxEventLoopHandler after instantiation. If it returns a Promise, the handler waits for it to resolve before starting the event loop. For synchronous setup, a regular function works fine.
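The timing rule can be illustrated with a small, self-contained model. This is a sketch under stated assumptions, not the SDK's code: the Box interface and the startBox() function are hypothetical stand-ins for a FlowBox and the FlowBoxEventLoopHandler. It shows why a single await covers both cases: awaiting a non-Promise value resumes right away (after a microtask), while awaiting a Promise holds back the event loop until setup completes.

```typescript
// Hypothetical model of "await initialize() before starting the event loop".
// Box and startBox() are illustrative names, not part of the SDK.
interface Box {
  initialize?: () => Promise<void> | void;
}

async function startBox(box: Box, startEventLoop: () => void): Promise<void> {
  // The hook is optional; `await` accepts both a Promise and a plain return value.
  if (box.initialize) {
    await box.initialize();
  }
  startEventLoop();
}

const log: string[] = [];

const asyncBox: Box = {
  initialize: async () => {
    await Promise.resolve(); // stands in for e.g. opening a database connection
    log.push("async init done");
  },
};

const syncBox: Box = {
  initialize: () => {
    log.push("sync init done");
  },
};

// Start the boxes one after another and record the order of events.
const ready = startBox(asyncBox, () => log.push("async loop started"))
  .then(() => startBox(syncBox, () => log.push("sync loop started")))
  .then(() => startBox({}, () => log.push("no-init loop started")));
```

In every case the "init done" entry is logged before the matching "loop started" entry, and a box without initialize() starts immediately.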
FlowBoxSource Interface
Sources emit data without receiving input. Implement onOutputReady() to push data downstream.
Method Signature
```typescript
onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): any;
```
Parameters:
- outputId: Buffer containing the output port identifier (e.g., Buffer.from("default"))
- header: Buffer containing the message header (serialization method + event ID)
- fnReadyForNextItem: Callback to invoke when you have data ready to send
Description:
Called by the runtime when downstream is ready to receive. Invoke fnReadyForNextItem(data, header) with your payload.
[WARNING!warning/EAGER EMISSION PATTERN] Sources typically emit immediately in onOutputReady() without waiting for input. This initiates the data flow. If you don't call fnReadyForNextItem(), the pipeline stalls: downstream never receives data.
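To make the pull-based contract concrete, here is a simplified model of it. It is an illustration only: MiniSource, OnceSource, SilentSource and pullTimes() are hypothetical names, strings replace the Buffer arguments, and pullTimes() merely imitates a runtime that signals "downstream ready" several times. OnceSource mirrors the ConstantSource example below; SilentSource never calls the callback, so nothing reaches downstream.

```typescript
// Simplified model of the FlowBoxSource protocol (strings instead of Buffers).
// All names here are hypothetical; this is not the SDK's runtime.
type Emit = (data: string, header: string) => void;

interface MiniSource {
  onOutputReady(outputId: string, header: string, fnReadyForNextItem: Emit): void;
}

// Emits once on the first "downstream ready" signal, then stays idle.
class OnceSource implements MiniSource {
  private emitted = false;

  public onOutputReady(_outputId: string, _header: string, fnReadyForNextItem: Emit): void {
    if (this.emitted) return;
    this.emitted = true;
    fnReadyForNextItem("Hello, World!", "hdr");
  }
}

// Bug on purpose: never calls fnReadyForNextItem(), so downstream starves.
class SilentSource implements MiniSource {
  public onOutputReady(): void {
    // nothing is emitted
  }
}

// Stand-in for the runtime: signals readiness n times and collects the payloads.
function pullTimes(source: MiniSource, n: number): string[] {
  const received: string[] = [];
  for (let i = 0; i < n; i++) {
    source.onOutputReady("default", "", (data) => received.push(data));
  }
  return received;
}

const fromOnce = pullTimes(new OnceSource(), 3);     // ["Hello, World!"]
const fromSilent = pullTimes(new SilentSource(), 3); // []
```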
Example: Constant Source
```typescript
import {
  FlowBoxRaw,
  FlowBoxSource,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxSerializer,
  SERIALIZER_MSGPACK
} from "@industream/flowmaker-sdk";
import { pack } from "msgpackr";

@FlowBoxRegistration({
  id: "myorg/constant-source",
  type: "source",
  // ... registration config
})
export class ConstantSource extends FlowBoxRaw implements FlowBoxSource {
  // Guards against emitting more than once
  private emitted = false;
  // Header reused for the outgoing message (msgpack serialization)
  private header: Buffer;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    this.header = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);
  }

  // Called when downstream is ready: emit the constant payload once, then stay idle
  public onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void {
    if (this.emitted) return;
    this.emitted = true;
    fnReadyForNextItem(pack({ message: "Hello, World!" }), this.header);
  }
}
```
FlowBoxSink Interface
Sinks receive data without emitting output. Implement onInputReceived() to process incoming messages.
Method Signature
```typescript
onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): any;
```
Parameters:
- inputId: Buffer containing the input port identifier
- header: Buffer containing the message header
- data: Buffer containing the serialized payload
- fnReadyForNextItem: Callback to invoke when ready for the next message
Description:
Called when a message arrives on an input port. Process the data and call fnReadyForNextItem() to signal readiness for the next message.
[NOTE!sync/BACKPRESSURE SIGNALING] Calling fnReadyForNextItem() signals to the runtime that you can accept another message. For long-running or async processing, call it only after the work has completed (for example, from the Promise's then handler). Calling it early lets new messages arrive while earlier ones are still being processed.
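Because onInputReceived() returns synchronously, async work has to call fnReadyForNextItem() from a Promise handler. The sketch below models that under assumptions of its own: AsyncRawSink, saveRecord() and deliverAll() are hypothetical names, strings stand in for Buffers, and deliverAll() imitates a runtime that delivers the next message only after readiness is signaled. Continuing after a failure (rather than stopping the pipeline) is also an assumption of this sketch.

```typescript
// Hypothetical raw-style sink with async processing and deferred readiness.
type Ready = () => void;

class AsyncRawSink {
  public inFlight = 0;    // messages currently being processed
  public maxInFlight = 0; // highest concurrency observed
  public readonly saved: string[] = [];
  public readonly errors: string[] = [];

  public onInputReceived(_inputId: string, _header: string, data: string, fnReadyForNextItem: Ready): void {
    this.inFlight++;
    this.maxInFlight = Math.max(this.maxInFlight, this.inFlight);

    const finish = (): void => {
      this.inFlight--;
      fnReadyForNextItem(); // only now may the next message be delivered
    };

    this.saveRecord(data).then(finish, (err: unknown) => {
      // Assumption: record the failure and keep the pipeline moving.
      this.errors.push(String(err));
      finish();
    });
  }

  // Hypothetical async work, e.g. a database write.
  private async saveRecord(data: string): Promise<void> {
    await Promise.resolve();
    this.saved.push(data);
  }
}

// Stand-in for the runtime: delivers message i + 1 only after readiness for message i.
function deliverAll(sink: AsyncRawSink, messages: string[]): Promise<void> {
  return new Promise<void>((resolve) => {
    let i = 0;
    const next = (): void => {
      if (i >= messages.length) {
        resolve();
        return;
      }
      sink.onInputReceived("in", "", messages[i++], next);
    };
    next();
  });
}

const sink = new AsyncRawSink();
const done = deliverAll(sink, ["a", "b", "c"]);
```

With this pattern, maxInFlight stays at 1: the next message is only delivered after the previous saveRecord() call has finished.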
Example: Raw Sink
```typescript
import {
  FlowBoxRaw,
  FlowBoxSink,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxLogger,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/raw-sink",
  type: "sink",
  // ... registration config
})
export class RawSink extends FlowBoxRaw implements FlowBoxSink {
  private jobId: string;

  constructor(protected override initStruct: FlowBoxInitParams) {
    super(initStruct);
    this.jobId = this.initStruct.runtimeContext.jobId;
    flowBoxLogger.increaseRefCount(this.jobId);
  }

  public onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): void {
    const input = flowBoxSerializer.unpack(header, data);
    // Process synchronously
    console.log("Received:", input);
    flowBoxLogger.logMessageTo(this.jobId, {
      from: this.initStruct.runtimeContext,
      data: { message: "Processed", input }
    });
    // Signal readiness
    fnReadyForNextItem();
  }

  public onDestroy(): void {
    flowBoxLogger.decreaseRefCount(this.jobId);
  }
}
```
FlowBoxCore: The High-Level Base Class
FlowBoxCore implements both FlowBoxSource and FlowBoxSink, providing async-friendly methods that hide the complexity of the low-level callbacks.
Class Definition
```typescript
export abstract class FlowBoxCore implements FlowBoxSource, FlowBoxSink {
  constructor(protected initStruct: FlowBoxInitParams) {}
  initialize?: () => Promise<void> | void;
  protected abstract onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void>;
}
```
Abstract Methods
onInput()
```typescript
protected abstract onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void>;
```
Parameters:
- inputId: Buffer containing the input port identifier
- header: Buffer containing the message header
- data: Buffer containing the serialized payload
Description:
Implement this abstract method with your pipe or sink logic. FlowBoxCore calls fnReadyForNextItem() for you after your Promise resolves.
[NOTE!async/AWAIT NATURALNESS] Unlike FlowBoxSink.onInputReceived(), you don't manually call a readiness callback. Just await your async operations and return; the base class handles the rest. This makes FlowBoxCore ideal for database queries, API calls, or any async processing.
Example: Core Pipe
```typescript
import {
  FlowBoxCore,
  FlowBoxRegistration,
  FlowBoxInitParams,
  flowBoxSerializer
} from "@industream/flowmaker-sdk";

@FlowBoxRegistration({
  id: "myorg/core-pipe",
  type: "pipe",
  // ... registration config
})
export class CorePipe extends FlowBoxCore {
  protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
    const input = flowBoxSerializer.unpack(header, data);
    // Async processing
    const output = await this.transform(input);
    // Push downstream
    const outputPayload = flowBoxSerializer.pack(header, output);
    await this.push("output", outputPayload, header);
  }

  private async transform(input: any): Promise<any> {
    // Simulate async work
    await new Promise(resolve => setTimeout(resolve, 100));
    return { ...input, transformed: true };
  }
}
```
Instance Methods
push()
```typescript
public async push(outputId: string, data: Buffer, header: Buffer, bufferCount: number = 0): Promise<void>
```
Parameters:
- outputId: String identifier of the output port (e.g., "output")
- data: Buffer containing the serialized payload
- header: Buffer containing the message header
- bufferCount: Optional buffer count for flow control (default: 0)
Description: Push data to an output port. Returns a Promise that resolves when downstream acknowledges readiness.
[NOTE!sync/BUFFERING BEHAVIOR] The bufferCount parameter controls how many items can be buffered before backpressure kicks in. With bufferCount = 0 (the default), each push() waits for acknowledgment. Set it to a positive number to allow batching, but be aware that this increases memory usage.
onOutputReady() (inherited)
```typescript
onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void
```
Description:
Implemented by FlowBoxCore to integrate with the internal RxJS subject system. Typically you don't override this—use push() instead.
onInputReceived() (inherited)
```typescript
onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): void
```
Description:
Implemented by FlowBoxCore to call your onInput() implementation and automatically invoke fnReadyForNextItem() after the Promise resolves.
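The adapter described here can be sketched in a few lines. This is a hypothetical model rather than FlowBoxCore's actual source: MiniCore, UpperCaseSink and receive() are illustrative names, strings replace Buffers, and the choice to record a rejection and still signal readiness is an assumption (the SDK's own error handling is not documented on this page).

```typescript
// Hypothetical base class adapting the callback API to an async onInput().
abstract class MiniCore {
  public readonly errors: string[] = [];

  protected abstract onInput(inputId: string, header: string, data: string): Promise<void>;

  public onInputReceived(inputId: string, header: string, data: string, fnReadyForNextItem: () => void): void {
    this.onInput(inputId, header, data).then(
      () => fnReadyForNextItem(), // resolved: ready for the next message
      (err: unknown) => {
        // Assumption: record the rejection and still signal readiness.
        this.errors.push(String(err));
        fnReadyForNextItem();
      },
    );
  }
}

// A subclass only writes async logic; it never touches the callback.
class UpperCaseSink extends MiniCore {
  public readonly received: string[] = [];

  protected override async onInput(_inputId: string, _header: string, data: string): Promise<void> {
    await Promise.resolve(); // simulated async work
    if (data === "") throw new Error("empty payload");
    this.received.push(data.toUpperCase());
  }
}

// Stand-in for the runtime: resolves once the box signals readiness.
function receive(box: MiniCore, data: string): Promise<void> {
  return new Promise<void>((resolve) => box.onInputReceived("in", "", data, () => resolve()));
}

const core = new UpperCaseSink();
const finished = receive(core, "hello").then(() => receive(core, ""));
```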
Key Differences: Core vs Raw
1. Input Handling
FlowBoxRaw + FlowBoxSink:
```typescript
public onInputReceived(inputId: Buffer, header: Buffer, data: Buffer, fnReadyForNextItem: () => any): void {
  const input = flowBoxSerializer.unpack(header, data);
  // Synchronous processing
  fnReadyForNextItem(); // Manual callback
}
```
FlowBoxCore:
```typescript
protected async onInput(inputId: Buffer, header: Buffer, data: Buffer): Promise<void> {
  const input = flowBoxSerializer.unpack(header, data);
  // Async processing with await (process() is your own helper)
  await this.process(input);
  // No callback needed: FlowBoxCore signals readiness when this Promise resolves
}
```
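[ERROR!error/CALLBACK FORGETTING] With FlowBoxRaw, forgetting to call fnReadyForNextItem() blocks the entire pipeline. With FlowBoxCore, the Promise-based approach removes this mistake: you either resolve or reject, and the runtime handles both cases. A Promise that never settles (for example, awaiting a request that has no timeout) still stalls the box, because readiness is only signaled once onInput() completes.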
2. Output Handling
FlowBoxRaw + FlowBoxSource:
```typescript
public onOutputReady(outputId: Buffer, header: Buffer, fnReadyForNextItem: (data: Buffer, header: Buffer) => any): void {
  const data = pack({ value: 42 });
  fnReadyForNextItem(data, this.header); // Immediate push
}
```
FlowBoxCore:
```typescript
// In onInput() or initialize()
await this.push("output", data, header); // Async push with backpressure handling
```
[INFO!info/PUSH SEMANTICS] FlowBoxCore.push() is async and waits for downstream acknowledgment. FlowBoxRaw's fnReadyForNextItem() is synchronous: the data is queued immediately, but you lose the ability to await completion.
3. Backpressure
FlowBoxRaw: Manual control via callback timing. You decide when to call fnReadyForNextItem().
FlowBoxCore: Automatic via Promise resolution. The base class tracks clearCount and resolves push() only when downstream is ready, or immediately while clearCount < bufferCount.
[WARNING!warning/DEADLOCK RISK] With FlowBoxRaw, circular dependencies (A pushes to B, B pushes to A in the same callback) cause deadlocks. FlowBoxCore mitigates this with its RxJS subject-based buffering, but circular data flow is still an anti-pattern.
When to Use Each
Use FlowBoxCore when:
- Building typical pipes that transform data
- Need async processing (DB queries, API calls)
- Want simpler, more maintainable code
- Don't need fine-grained control over message timing
Use FlowBoxRaw when:
- Implementing sources that emit on schedule
- Need custom protocol handling
- Require precise control over message timing
- Building specialized boxes with unique lifecycle needs
[NOTE!lightbulb/MOSTLY CORE] The ConstantSource example uses FlowBoxRaw because it needs to emit immediately in onOutputReady(). For pipes and sinks, FlowBoxCore is almost always the better choice.
Lifecycle Hooks
Both FlowBoxRaw and FlowBoxCore support these lifecycle methods:
Constructor
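```typescript
private jobId: string;

constructor(protected override initStruct: FlowBoxInitParams) {
  // Derived classes must call super() before using `this`
  super(initStruct);
  // Access runtime context; keep the job ID for onDestroy()
  this.jobId = initStruct.runtimeContext.jobId;
  // Increment logger ref count if using flowBoxLogger
  flowBoxLogger.increaseRefCount(this.jobId);
}
```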
initialize()
```typescript
async initialize() {
  // Async setup (this.db stands for a client your class creates itself)
  await this.db.connect();
}
```
onDestroy() (via FlowBoxDestroy interface)
```typescript
public onDestroy(): void {
  // Cleanup
  flowBoxLogger.decreaseRefCount(this.jobId);
}
```
[ERROR!error/REF COUNT LEAK] Always pair increaseRefCount() with decreaseRefCount() in onDestroy(). Forgetting this keeps the logger socket alive, preventing worker shutdown.
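The pairing rule is ordinary reference counting: the socket for a job stays open while its count is above zero. The class below is a hypothetical model of that rule only (RefCountedLogger and canShutDown() are not SDK names, and the "socket" is just a set entry). It shows how one box that skips decreaseRefCount() keeps shutdown blocked even after other boxes have cleaned up.

```typescript
// Hypothetical model of per-job reference counting; not the real flowBoxLogger.
class RefCountedLogger {
  private readonly counts = new Map<string, number>();
  private readonly openSockets = new Set<string>(); // simulated open sockets, one per job

  public increaseRefCount(jobId: string): void {
    this.counts.set(jobId, (this.counts.get(jobId) ?? 0) + 1);
    this.openSockets.add(jobId); // any live reference keeps the socket open
  }

  public decreaseRefCount(jobId: string): void {
    const n = (this.counts.get(jobId) ?? 0) - 1;
    if (n <= 0) {
      this.counts.delete(jobId);
      this.openSockets.delete(jobId); // last reference gone: socket may close
    } else {
      this.counts.set(jobId, n);
    }
  }

  public canShutDown(): boolean {
    return this.openSockets.size === 0;
  }
}

const logger = new RefCountedLogger();
logger.increaseRefCount("job-1"); // box A constructor
logger.increaseRefCount("job-1"); // box B constructor
logger.decreaseRefCount("job-1"); // box A onDestroy()
const blocked = logger.canShutDown();  // false: box B still holds a reference
logger.decreaseRefCount("job-1"); // box B onDestroy()
const released = logger.canShutDown(); // true
```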
Internal Implementation Details
FlowBoxCore's RxJS Subjects
FlowBoxCore uses RxJS Subject instances to coordinate input/output:
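```typescript
// Excerpt from the FlowBoxCore class body (uses Subject and zip from rxjs)
private outputSubjects: { [outputId: string]: InOutSubject } = {};

private createIOSubjects(): InOutSubject {
  const newIOSubject = {
    // Fed by onOutputReady(): downstream's fnReadyForNextItem callback
    outputSubject: new Subject<[(data: Buffer, header: Buffer) => any]>(),
    // Fed by push(): payload, header, and the resolver of push()'s Promise
    inputSubject: new Subject<[Buffer, Buffer, () => any]>(),
    clearCount: 0
  };
  // Pair the nth pushed item with the nth downstream-ready callback
  zip([newIOSubject.inputSubject, newIOSubject.outputSubject]).subscribe(([inputs, outputs]) => {
    const [outputResolver] = outputs;
    const [inputData, inputHeader, inputResolver] = inputs;
    newIOSubject.clearCount--;              // one fewer item waiting
    outputResolver(inputData, inputHeader); // hand the item to downstream
    inputResolver();                        // resolve the matching push() Promise
  });
  return newIOSubject;
}
```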
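[INFO!info/ZIP COORDINATION] The zip() operator pairs every push() (queued on inputSubject) with an onOutputReady() call from downstream (queued on outputSubject), in arrival order. An item is delivered only once both halves are available, which creates a lock-step flow control mechanism.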
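For readers less familiar with RxJS, the same lock-step pairing can be modeled without dependencies. In this sketch (LockStepChannel is a hypothetical name, and strings stand in for Buffers), two FIFO queues play the roles of inputSubject and outputSubject, and drain() pairs their oldest entries the way zip() pairs the nth emissions of each source. Buffering via clearCount is deliberately left out.

```typescript
// Dependency-free model of zip()-style lock-step pairing (not the SDK's code).
type Deliver = (data: string, header: string) => void;

class LockStepChannel {
  // Like inputSubject: entries from push() as [data, header, resolver of push()'s Promise]
  private readonly pushed: Array<[string, string, () => void]> = [];
  // Like outputSubject: fnReadyForNextItem callbacks from onOutputReady()
  private readonly readyCallbacks: Deliver[] = [];

  public push(data: string, header: string): Promise<void> {
    return new Promise<void>((resolve) => {
      this.pushed.push([data, header, () => resolve()]);
      this.drain();
    });
  }

  public onOutputReady(fnReadyForNextItem: Deliver): void {
    this.readyCallbacks.push(fnReadyForNextItem);
    this.drain();
  }

  // Pair oldest with oldest while both queues are non-empty.
  private drain(): void {
    while (this.pushed.length > 0 && this.readyCallbacks.length > 0) {
      const [data, header, resolvePush] = this.pushed.shift()!;
      const deliver = this.readyCallbacks.shift()!;
      deliver(data, header); // hand the item to downstream
      resolvePush();         // let the producer continue
    }
  }
}

const channel = new LockStepChannel();
const delivered: string[] = [];
let pushSettled = false;

void channel.push("a", "hdr").then(() => {
  pushSettled = true;
});
const deliveredBeforeReady = delivered.length; // 0: no onOutputReady() call yet

channel.onOutputReady((data) => delivered.push(data)); // pairs with "a" and delivers it
```

If onOutputReady() arrives first, its callback simply waits in readyCallbacks until the next push().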
clearCount Mechanism
The clearCount field tracks buffered items:
- Incremented on push()
- Decremented when downstream acknowledges (the zip() subscriber runs clearCount--)
- If clearCount < bufferCount, push() returns immediately without waiting
This enables configurable batching while preventing unbounded memory growth.
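The clearCount rules above can be modeled as a small bounded buffer. This sketch makes its assumptions explicit: BufferedOutput and acknowledge() are hypothetical names, strings stand in for Buffers, and whether FlowBoxCore compares clearCount before or after incrementing it is a guess (comparing before incrementing lets exactly bufferCount items go unacknowledged without waiting).

```typescript
// Hypothetical model of clearCount/bufferCount flow control (not the SDK's code).
class BufferedOutput {
  private clearCount = 0;                           // items pushed but not yet acknowledged
  private readonly waiters: Array<() => void> = []; // resolvers of push() calls that must wait
  private readonly queue: string[] = [];            // items waiting for downstream

  constructor(private readonly bufferCount: number = 0) {}

  public push(data: string): Promise<void> {
    // Assumption: compare before incrementing.
    const hasRoom = this.clearCount < this.bufferCount;
    this.clearCount++;
    this.queue.push(data);
    if (hasRoom) {
      return Promise.resolve(); // buffered: the caller does not wait
    }
    return new Promise<void>((resolve) => {
      this.waiters.push(() => resolve());
    });
  }

  // Downstream consumed the oldest item: free one slot and release one waiting push().
  public acknowledge(): string | undefined {
    const item = this.queue.shift();
    if (item === undefined) return undefined;
    this.clearCount--;
    const waiter = this.waiters.shift();
    if (waiter) waiter();
    return item;
  }
}

const out = new BufferedOutput(2);
const settled: string[] = [];
for (const item of ["a", "b", "c"]) {
  void out.push(item).then(() => {
    settled.push(item);
  });
}
// "a" and "b" settle right away; "c" settles only after the first acknowledge().
```

With bufferCount = 0 the same class behaves like the default lock-step mode: every push() waits until its own item is acknowledged.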