FlowBoxCore and FlowBoxRaw: Base Classes for C# Workers
This page explains the two base classes you can extend when implementing FlowBox workers in C#: FlowBoxRaw and FlowBoxCore. Understanding the difference is crucial for choosing the right level of abstraction.
Quick Comparison
| Feature | FlowBoxRaw |
FlowBoxCore |
FlowBoxCore<TInput,TOutput> |
|---|---|---|---|
| Abstraction Level | Low-level, manual | High-level, async | Typed, automatic serialization |
| Input Handling | Implement IFlowBoxSink.OnInputReceived() |
Implement abstract OnInput() |
Implement OnInput(string, TInput) |
| Output Handling | Implement IFlowBoxSource.OnOutputReady() |
Use await Push() |
Use await Push(string, TOutput) |
| Backpressure | Manual via callback | Automatic via Task | Automatic via Task |
| Serialization | Manual | Manual (header-driven) | Automatic |
| Best For | Custom protocols, fine-grained control | Typical transforms, aggregations | Well-defined data contracts |
[NOTE!lightbulb/CHOOSE YOUR BASE] Start with
FlowBoxCore<TInput, TOutput>for typed data flows, orFlowBoxCorefor maximum flexibility. Only useFlowBoxRawif you need to implement sources or have very specific control requirements.
FlowBoxRaw: The Low-Level Base Class
FlowBoxRaw is the minimal base class that all FlowBox implementations ultimately inherit from. It provides logging and widget management but no helper methods for data flow.
Class Definition
public abstract class FlowBoxRaw
{
protected FlowBoxRaw(FlowBoxInitParams initParams, IServiceProvider serviceProvider);
public IFlowMakerLogger flowMakerLogger { get; }
public void Log(LogLevel level, string message);
}
Constructor
protected FlowBoxRaw(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
Parameters:
initParams: Contains runtime context, connected I/O, and user optionsserviceProvider: DI container for accessing services (logger, widget manager, etc.)
[INFO!info/DEPENDENCY INJECTION] The
IServiceProviderparameter enables dependency injection. You can resolve custom services in your FlowBox constructor or methods.
Properties
flowMakerLogger: IFlowMakerLogger
public IFlowMakerLogger flowMakerLogger { get; }
Description:
Provides access to the logger. Create typed loggers via CreateILogger<T>().
Example:
public class MyBox : FlowBoxRaw
{
private readonly ILogger _logger;
public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
_logger = flowMakerLogger.CreateILogger<MyBox>();
}
}
Methods
Log()
public void Log(LogLevel level, string message)
Parameters:
level:LogLevelenum (Debug, Information, Warning, Error, Critical)message: Log message string
Description: Logs a message with the specified level. Automatically includes runtime context.
[ERROR!error/RUNTIME CONTEXT NULL] The
Log()method throwsArgumentNullExceptionifinitParams.RuntimeContextis null. This happens if the FlowBox wasn't properly initialized by the scheduler.
IFlowBoxSource Interface
Sources emit data without receiving input. Implement OnOutputReady() to push data downstream.
Interface Definition
public interface IFlowBoxSource
{
void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut);
}
Method Signature
void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
Parameters:
outputId: Byte array containing the output port identifierheader: Header object containing event ID and serialization methodpushOut: Delegate to invoke when you have data ready to send
Delegate Type:
public delegate void PushOutDelegate(Header header, byte[] data);
Description:
Called by the runtime when downstream is ready to receive. Invoke pushOut(header, data) with your payload.
[WARNING!warning/EAGER EMISSION PATTERN] Sources typically emit asynchronously in
OnOutputReady()without waiting for input. This initiates the data flow. If you don't callpushOut(), the pipeline stalls—downstream never receives data.
Example: Source with Async Emission
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
public class IntervalSource : FlowBoxRaw, IFlowBoxSource
{
private readonly int _intervalMs;
private int _counter;
public IntervalSource(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
_intervalMs = initParams.OptionsAs<SourceOptions>().IntervalMs;
}
public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
{
_ = Task.Run(async () =>
{
await Task.Delay(_intervalMs);
_counter++;
var data = new { Counter = _counter, Timestamp = DateTime.UtcNow };
var serializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
pushOut(header, serializer.Serialize(data));
});
}
private record SourceOptions { public int IntervalMs { get; set; } = 1000; }
}
[ERROR!error/ASYNC FIRE_AND_FORGET] In
OnOutputReady(), you must start an async task (e.g.,Task.Run()). If youawaitdirectly, you block the callback and the runtime waits forever. The pattern is: start task → return immediately → task callspushOut()when ready.
IFlowBoxSink Interface
Sinks receive data without emitting output. Implement OnInputReceived() to process incoming messages.
Interface Definition
public interface IFlowBoxSink
{
void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext);
}
Method Signature
void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
Parameters:
inputId: Byte array containing the input port identifierheader: Header object containing event ID and serialization methoddata: Byte array containing the serialized payloadonReadyForNext: Callback to invoke when ready for the next message
Description:
Called when a message arrives on an input port. Process the data and call onReadyForNext() to signal readiness for the next message.
[NOTE!sync/BACKPRESSURE SIGNALING] Calling
onReadyForNext()signals to the runtime that you can accept another message. For long-running processing, await completion before calling the callback to avoid overwhelming the sink.
Example: Raw Sink
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
public class LoggingSink : FlowBoxRaw, IFlowBoxSink
{
private readonly ILogger _logger;
public LoggingSink(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
_logger = flowMakerLogger.CreateILogger<LoggingSink>();
}
public void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
{
var serializer = Serialization.GetSerializer(header);
var input = serializer.Deserialize<Dictionary<string, object>>(data);
_logger.LogInformation("Received data: {Input}", input);
// Signal readiness immediately (synchronous processing)
onReadyForNext();
}
}
FlowBoxCore: The High-Level Base Class
FlowBoxCore implements both IFlowBoxSink and IFlowBoxSource, providing async-friendly methods that hide the complexity of the low-level callbacks.
Class Definition
public abstract class FlowBoxCore : FlowBoxRaw, IFlowBoxSink, IFlowBoxSource, IFlowBoxDestroy
{
protected FlowBoxCore(FlowBoxInitParams metadata, IServiceProvider serviceProvider);
protected abstract Task OnInput(byte[] inputId, Header header, byte[] data);
protected virtual async Task Push(string outputId, Header header, byte[] data, int outputBufferSize = 0);
public virtual void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut);
public virtual void OnDestroy();
}
Abstract Methods
OnInput()
protected abstract Task OnInput(byte[] inputId, Header header, byte[] data)
Parameters:
inputId: Byte array containing the input port identifierheader: Header object containing event ID and serialization methoddata: Byte array containing the serialized payload
Description:
Override this method to implement your pipe/sink logic. The runtime automatically calls onReadyForNext() after your Task completes.
[NOTE!async/AWAIT NATURALNESS] Unlike
IFlowBoxSink.OnInputReceived(), you don't manually call a readiness callback. Justawaityour async operations and return—the base class handles the rest. This makesFlowBoxCoreideal for database queries, API calls, or any async processing.
Example: Core Pipe
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
public class TransformPipe : FlowBoxCore
{
protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
var serializer = Serialization.GetSerializer(header);
var input = serializer.Deserialize<Dictionary<string, object>>(data);
// Transform
var output = new Dictionary<string, object>(input)
{
["transformed"] = true,
["timestamp"] = DateTime.UtcNow.ToString("O")
};
// Push downstream
await Push("output", header, serializer.Serialize(output));
}
}
Instance Methods
Push()
protected virtual async Task Push(string outputId, Header header, byte[] data, int outputBufferSize = 0)
Parameters:
outputId: String identifier of the output port (e.g.,"output")header: Header object containing event ID and serialization methoddata: Byte array containing the serialized payloadoutputBufferSize: Optional buffer count for flow control (default: 0)
Description: Push data to an output port. Returns a Task that completes when downstream acknowledges readiness.
[NOTE!sync/BUFFERING BEHAVIOR] The
outputBufferSizeparameter controls how many items can be buffered before backpressure kicks in. WithoutputBufferSize = 0(default), eachPush()waits for acknowledgment. Set to a positive number to allow batching, but be aware this increases memory usage.
OnOutputReady() (inherited)
public virtual void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
Description:
Implemented by FlowBoxCore to integrate with the internal Rx subject system. Typically you don't override this—use Push() instead.
OnInputReceived() (inherited)
public virtual async void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
Description:
Implemented by FlowBoxCore to call your OnInput() implementation and automatically invoke onReadyForNext() after the Task completes.
OnDestroy() (inherited)
public virtual void OnDestroy()
Description: Called when the FlowBox is being destroyed. Disposes the internal Rx subjects.
[ERROR!error/ALWAYS CALL SUPER] Always call
base.OnDestroy()in subclasses to ensure proper cleanup. Forgetting this keeps Rx subjects alive, causing resource leaks and preventing clean worker shutdown.
FlowBoxCore<TInput, TOutput>: Typed Base Class
The simple model provides a typed wrapper that handles serialization automatically:
public abstract class FlowBoxCore<TInput, TOutput> : FlowBoxCore
{
protected FlowBoxCore(FlowBoxInitParams metadata, IServiceProvider serviceProvider);
protected ISerializer Serializer { get; init; }
protected async Task Push(string outputId, TOutput data, int bufferCount = 0);
protected abstract Task OnInput(string inputId, TInput data);
}
Example: Typed Pipe
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;
public class MultiplyPipe : FlowBoxCore<MultiplyInput, MultiplyOutput>
{
protected override async Task OnInput(string inputId, MultiplyInput data)
{
var output = new MultiplyOutput
{
Value = data.Value * data.Multiplier,
Timestamp = DateTime.UtcNow
};
// Serialization handled automatically
await Push("output", output);
}
}
public record MultiplyInput(double Value, double Multiplier);
public record MultiplyOutput(double Value, DateTime Timestamp);
[NOTE!lightbulb/TYPE SAFETY] The typed model automatically handles serialization/deserialization based on your type parameters. Use it when you have well-defined data contracts and want compile-time type safety.
OutputSynchronizer: Internal Mechanism
FlowBoxCore uses InOutSubject instances (Rx subjects) to coordinate input/output for each output port:
protected class InOutSubject
{
public int MsgCountInBuffer { get; set; }
public Subject<(Header, byte[], Action)> InputSubject = new();
public Subject<PushOutDelegate> OutputSubject = new();
public InOutSubject()
{
InputSubject.Zip(OutputSubject, (inputs, outputs) => (Inputs: inputs, Outputs: outputs))
.Subscribe(pair =>
{
var (inputHeader, inputData, inputResolver) = pair.Inputs;
var outputResolver = pair.Outputs;
MsgCountInBuffer--;
outputResolver(inputHeader, inputData);
inputResolver();
});
}
}
[INFO!info/ZIP COORDINATION] The
Zip()operator (from System.Reactive) ensures that everyPush()(output) is matched with anOnOutputReady()(input from downstream). This creates a lock-step flow control mechanism.
MsgCountInBuffer Mechanism
The MsgCountInBuffer field tracks buffered items:
- Incremented on
Push() - Decreased when downstream acknowledges
- If
MsgCountInBuffer < outputBufferSize,Push()completes immediately without waiting
This enables configurable batching while preventing unbounded memory growth.
Key Differences: Core vs Raw
1. Input Handling
FlowBoxRaw + IFlowBoxSink:
public void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
{
var serializer = Serialization.GetSerializer(header);
var input = serializer.Deserialize<Dictionary<string, object>>(data);
// Synchronous processing
Log(LogLevel.Information, "Processed data");
onReadyForNext(); // Manual callback
}
FlowBoxCore:
protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
var serializer = Serialization.GetSerializer(header);
var input = serializer.Deserialize<Dictionary<string, object>>(data);
// Async processing with await
await ProcessData(input);
// No callback needed—runtime handles it
}
[ERROR!error/CALLBACK FORGETTING] With
FlowBoxRaw, forgetting to callonReadyForNext()blocks the entire pipeline. WithFlowBoxCore, the async approach makes this impossible—you either complete or throw, and the runtime handles both cases.
2. Output Handling
FlowBoxRaw + IFlowBoxSource:
public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
{
var data = serializer.Serialize(new { Value = 42 });
pushOut(header, data); // Immediate push
}
FlowBoxCore:
// In OnInput() or other methods
await Push("output", header, data); // Async push with backpressure handling
[INFO!info/PUSH SEMANTICS]
FlowBoxCore.Push()is async and waits for downstream acknowledgment.FlowBoxRaw'spushOutdelegate is synchronous—the data is queued immediately, but you lose the ability toawaitcompletion.
3. Backpressure
FlowBoxRaw: Manual control via callback timing. You decide when to call onReadyForNext().
FlowBoxCore: Automatic via Task completion. The runtime tracks MsgCountInBuffer and only resolves Push() when downstream is ready.
[WARNING!warning/DEADLOCK RISK] With
FlowBoxRaw, circular dependencies (A pushes to B, B pushes to A in the same callback) cause deadlocks.FlowBoxCoremitigates this with its Rx subject-based buffering, but circular data flow is still an anti-pattern.
When to Use Each
Use FlowBoxCore<TInput, TOutput> when:
- You have well-defined data contracts (records, POCOs)
- Want type safety and automatic serialization
- Building typical pipes that transform data
- Don't need fine-grained control over headers
Use FlowBoxCore when:
- Building typical pipes that transform data
- Need async processing (DB queries, API calls)
- Want flexibility with header/serialization control
- Don't need fine-grained control over message timing
Use FlowBoxRaw when:
- Implementing sources that emit on schedule
- Need custom protocol handling
- Require precise control over message timing
- Building specialized boxes with unique lifecycle needs
[NOTE!lightbulb/MOSTLY CORE] The
IntervalSourceexample usesFlowBoxRawbecause it needs to emit asynchronously inOnOutputReady(). For pipes and sinks,FlowBoxCoreorFlowBoxCore<TInput, TOutput>is almost always the better choice.
Lifecycle Hooks
Both FlowBoxRaw and FlowBoxCore support these lifecycle methods:
Constructor
public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
// Access runtime context
var jobId = initParams.RuntimeContext.JobId;
var nodeId = initParams.RuntimeContext.NodeId;
// Access options
var options = initParams.OptionsAs<MyOptions>();
// Access connected I/O
var outputs = initParams.ConnectedIO?.OutputIds;
}
OnDestroy()
public override void OnDestroy()
{
base.OnDestroy(); // Disposes Rx subjects
// Additional cleanup: close DB connections, etc.
}
[ERROR!error/RESOURCE LEAK] Always call
base.OnDestroy()to ensure proper disposal of Rx subjects. Forgetting this keeps subjects alive, preventing worker shutdown and causing memory leaks.
Common Patterns
Accessing Runtime Context
public class MyBox : FlowBoxCore
{
private readonly string _jobId;
private readonly string _nodeId;
public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
_jobId = initParams.RuntimeContext?.JobId ?? throw new ArgumentNullException();
_nodeId = initParams.RuntimeContext?.NodeId ?? throw new ArgumentNullException();
}
}
Accessing Options
public record PipeOptions { public int BufferSize { get; set; } = 10; public bool EnableLogging { get; set; } = true; }
public class ConfigurablePipe : FlowBoxCore
{
private readonly PipeOptions _options;
public ConfigurablePipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
_options = initParams.OptionsAs<PipeOptions>();
}
}
Combining with Widgets
public class WidgetBox : FlowBoxCore
{
private readonly FlowMakerWidgetManager _widgetManager;
public WidgetBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
_widgetManager = new FlowMakerWidgetManager(
initParams.RuntimeContext,
serviceProvider.GetRequiredService<SocketIOFactory>(),
loggerAddress);
_widgetManager.AddWidget("counter", "exposure_plus_1")
.AddWidgetElement("counter", "title", new FlowBoxWidgetTitleElement
{
Title = "Counter",
Subtitle = "Counts messages"
})
.AddWidgetElement("counter", "value", new FlowBoxWidgetValueElement
{
Contents = "Count: {{count}}",
Icon = "exposure_plus_1"
})
.SetContextValue("count", 0)
.PublishWidgets();
_widgetManager.EventReceived += (sender, e) =>
{
// Handle widget events
};
}
}
Differences from TypeScript/Python SDKs
| Feature | C# SDK | TypeScript SDK | Python SDK |
|---|---|---|---|
| Base class name | FlowBoxRaw, FlowBoxCore |
FlowBoxRaw, FlowBoxCore |
FlowBoxRaw, FlowBoxCore |
| Input method | protected abstract Task OnInput() |
protected abstract async Task onInput() |
async def on_input() |
| Output method | protected virtual async Task Push() |
public async push() |
async def push() |
| Rx library | System.Reactive (Rx.NET) | RxJS | RxPY |
| Callback type | Action, PushOutDelegate |
(data: Buffer, header: Buffer) => any |
Callable[[], None] |
| Typed model | FlowBoxCore<TInput, TOutput> |
Not available | Not available |
[INFO!info/SAME PATTERN] C#, TypeScript, and Python SDKs follow the same design pattern. The main differences are language-specific (async/await syntax, type systems, Rx variants). C# adds a typed model not available in other SDKs.