FlowBoxCore and FlowBoxRaw: Base Classes for C# Workers

This page explains the two base classes you can extend when implementing FlowBox workers in C#: FlowBoxRaw and FlowBoxCore. Understanding the difference is crucial for choosing the right level of abstraction.

Quick Comparison

Feature FlowBoxRaw FlowBoxCore FlowBoxCore<TInput,TOutput>
Abstraction Level Low-level, manual High-level, async Typed, automatic serialization
Input Handling Implement IFlowBoxSink.OnInputReceived() Implement abstract OnInput() Implement OnInput(string, TInput)
Output Handling Implement IFlowBoxSource.OnOutputReady() Use await Push() Use await Push(string, TOutput)
Backpressure Manual via callback Automatic via Task Automatic via Task
Serialization Manual Manual (header-driven) Automatic
Best For Custom protocols, fine-grained control Typical transforms, aggregations Well-defined data contracts

[NOTE!lightbulb/CHOOSE YOUR BASE] Start with FlowBoxCore<TInput, TOutput> for typed data flows, or FlowBoxCore for maximum flexibility. Only use FlowBoxRaw if you need to implement sources or have very specific control requirements.


FlowBoxRaw: The Low-Level Base Class

FlowBoxRaw is the minimal base class that all FlowBox implementations ultimately inherit from. It provides logging and widget management but no helper methods for data flow.

Class Definition

public abstract class FlowBoxRaw
{
    protected FlowBoxRaw(FlowBoxInitParams initParams, IServiceProvider serviceProvider);
    
    public IFlowMakerLogger flowMakerLogger { get; }
    public void Log(LogLevel level, string message);
}

Constructor

protected FlowBoxRaw(FlowBoxInitParams initParams, IServiceProvider serviceProvider)

Parameters:

  • initParams: Contains runtime context, connected I/O, and user options
  • serviceProvider: DI container for accessing services (logger, widget manager, etc.)

[INFO!info/DEPENDENCY INJECTION] The IServiceProvider parameter enables dependency injection. You can resolve custom services in your FlowBox constructor or methods.

Properties

flowMakerLogger: IFlowMakerLogger

public IFlowMakerLogger flowMakerLogger { get; }

Description: Provides access to the logger. Create typed loggers via CreateILogger<T>().

Example:

public class MyBox : FlowBoxRaw
{
    private readonly ILogger _logger;
    
    public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger<MyBox>();
    }
}

Methods

Log()

public void Log(LogLevel level, string message)

Parameters:

  • level: LogLevel enum (Debug, Information, Warning, Error, Critical)
  • message: Log message string

Description: Logs a message with the specified level. Automatically includes runtime context.

[ERROR!error/RUNTIME CONTEXT NULL] The Log() method throws ArgumentNullException if initParams.RuntimeContext is null. This happens if the FlowBox wasn't properly initialized by the scheduler.


IFlowBoxSource Interface

Sources emit data without receiving input. Implement OnOutputReady() to push data downstream.

Interface Definition

public interface IFlowBoxSource
{
    void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut);
}

Method Signature

void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)

Parameters:

  • outputId: Byte array containing the output port identifier
  • header: Header object containing event ID and serialization method
  • pushOut: Delegate to invoke when you have data ready to send

Delegate Type:

public delegate void PushOutDelegate(Header header, byte[] data);

Description: Called by the runtime when downstream is ready to receive. Invoke pushOut(header, data) with your payload.

[WARNING!warning/EAGER EMISSION PATTERN] Sources typically emit asynchronously in OnOutputReady() without waiting for input. This initiates the data flow. If you don't call pushOut(), the pipeline stalls—downstream never receives data.

Example: Source with Async Emission

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;

public class IntervalSource : FlowBoxRaw, IFlowBoxSource
{
    private readonly int _intervalMs;
    private int _counter;

    public IntervalSource(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _intervalMs = initParams.OptionsAs<SourceOptions>().IntervalMs;
    }

    public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
    {
        _ = Task.Run(async () =>
        {
            await Task.Delay(_intervalMs);
            
            _counter++;
            var data = new { Counter = _counter, Timestamp = DateTime.UtcNow };
            
            var serializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
            pushOut(header, serializer.Serialize(data));
        });
    }
    
    private record SourceOptions { public int IntervalMs { get; set; } = 1000; }
}

[ERROR!error/ASYNC FIRE_AND_FORGET] In OnOutputReady(), you must start an async task (e.g., Task.Run()). If you await directly, you block the callback and the runtime waits forever. The pattern is: start task → return immediately → task calls pushOut() when ready.


IFlowBoxSink Interface

Sinks receive data without emitting output. Implement OnInputReceived() to process incoming messages.

Interface Definition

public interface IFlowBoxSink
{
    void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext);
}

Method Signature

void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)

Parameters:

  • inputId: Byte array containing the input port identifier
  • header: Header object containing event ID and serialization method
  • data: Byte array containing the serialized payload
  • onReadyForNext: Callback to invoke when ready for the next message

Description: Called when a message arrives on an input port. Process the data and call onReadyForNext() to signal readiness for the next message.

[NOTE!sync/BACKPRESSURE SIGNALING] Calling onReadyForNext() signals to the runtime that you can accept another message. For long-running processing, await completion before calling the callback to avoid overwhelming the sink.

Example: Raw Sink

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;

public class LoggingSink : FlowBoxRaw, IFlowBoxSink
{
    private readonly ILogger _logger;

    public LoggingSink(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger<LoggingSink>();
    }

    public void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
    {
        var serializer = Serialization.GetSerializer(header);
        var input = serializer.Deserialize<Dictionary<string, object>>(data);

        _logger.LogInformation("Received data: {Input}", input);

        // Signal readiness immediately (synchronous processing)
        onReadyForNext();
    }
}

FlowBoxCore: The High-Level Base Class

FlowBoxCore implements both IFlowBoxSink and IFlowBoxSource, providing async-friendly methods that hide the complexity of the low-level callbacks.

Class Definition

public abstract class FlowBoxCore : FlowBoxRaw, IFlowBoxSink, IFlowBoxSource, IFlowBoxDestroy
{
    protected FlowBoxCore(FlowBoxInitParams metadata, IServiceProvider serviceProvider);
    
    protected abstract Task OnInput(byte[] inputId, Header header, byte[] data);
    protected virtual async Task Push(string outputId, Header header, byte[] data, int outputBufferSize = 0);
    public virtual void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut);
    public virtual void OnDestroy();
}

Abstract Methods

OnInput()

protected abstract Task OnInput(byte[] inputId, Header header, byte[] data)

Parameters:

  • inputId: Byte array containing the input port identifier
  • header: Header object containing event ID and serialization method
  • data: Byte array containing the serialized payload

Description: Override this method to implement your pipe/sink logic. The runtime automatically calls onReadyForNext() after your Task completes.

[NOTE!async/AWAIT NATURALNESS] Unlike IFlowBoxSink.OnInputReceived(), you don't manually call a readiness callback. Just await your async operations and return—the base class handles the rest. This makes FlowBoxCore ideal for database queries, API calls, or any async processing.

Example: Core Pipe

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;

public class TransformPipe : FlowBoxCore
{
    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        var serializer = Serialization.GetSerializer(header);
        var input = serializer.Deserialize<Dictionary<string, object>>(data);

        // Transform
        var output = new Dictionary<string, object>(input)
        {
            ["transformed"] = true,
            ["timestamp"] = DateTime.UtcNow.ToString("O")
        };

        // Push downstream
        await Push("output", header, serializer.Serialize(output));
    }
}

Instance Methods

Push()

protected virtual async Task Push(string outputId, Header header, byte[] data, int outputBufferSize = 0)

Parameters:

  • outputId: String identifier of the output port (e.g., "output")
  • header: Header object containing event ID and serialization method
  • data: Byte array containing the serialized payload
  • outputBufferSize: Optional buffer count for flow control (default: 0)

Description: Push data to an output port. Returns a Task that completes when downstream acknowledges readiness.

[NOTE!sync/BUFFERING BEHAVIOR] The outputBufferSize parameter controls how many items can be buffered before backpressure kicks in. With outputBufferSize = 0 (default), each Push() waits for acknowledgment. Set to a positive number to allow batching, but be aware this increases memory usage.

OnOutputReady() (inherited)

public virtual void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)

Description: Implemented by FlowBoxCore to integrate with the internal Rx subject system. Typically you don't override this—use Push() instead.

OnInputReceived() (inherited)

public virtual async void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)

Description: Implemented by FlowBoxCore to call your OnInput() implementation and automatically invoke onReadyForNext() after the Task completes.

OnDestroy() (inherited)

public virtual void OnDestroy()

Description: Called when the FlowBox is being destroyed. Disposes the internal Rx subjects.

[ERROR!error/ALWAYS CALL SUPER] Always call base.OnDestroy() in subclasses to ensure proper cleanup. Forgetting this keeps Rx subjects alive, causing resource leaks and preventing clean worker shutdown.


FlowBoxCore<TInput, TOutput>: Typed Base Class

The simple model provides a typed wrapper that handles serialization automatically:

public abstract class FlowBoxCore<TInput, TOutput> : FlowBoxCore
{
    protected FlowBoxCore(FlowBoxInitParams metadata, IServiceProvider serviceProvider);
    
    protected ISerializer Serializer { get; init; }
    protected async Task Push(string outputId, TOutput data, int bufferCount = 0);
    protected abstract Task OnInput(string inputId, TInput data);
}

Example: Typed Pipe

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;

public class MultiplyPipe : FlowBoxCore<MultiplyInput, MultiplyOutput>
{
    protected override async Task OnInput(string inputId, MultiplyInput data)
    {
        var output = new MultiplyOutput
        {
            Value = data.Value * data.Multiplier,
            Timestamp = DateTime.UtcNow
        };

        // Serialization handled automatically
        await Push("output", output);
    }
}

public record MultiplyInput(double Value, double Multiplier);
public record MultiplyOutput(double Value, DateTime Timestamp);

[NOTE!lightbulb/TYPE SAFETY] The typed model automatically handles serialization/deserialization based on your type parameters. Use it when you have well-defined data contracts and want compile-time type safety.


OutputSynchronizer: Internal Mechanism

FlowBoxCore uses InOutSubject instances (Rx subjects) to coordinate input/output for each output port:

protected class InOutSubject
{
    public int MsgCountInBuffer { get; set; }
    public Subject<(Header, byte[], Action)> InputSubject = new();
    public Subject<PushOutDelegate> OutputSubject = new();

    public InOutSubject()
    {
        InputSubject.Zip(OutputSubject, (inputs, outputs) => (Inputs: inputs, Outputs: outputs))
            .Subscribe(pair =>
            {
                var (inputHeader, inputData, inputResolver) = pair.Inputs;
                var outputResolver = pair.Outputs;

                MsgCountInBuffer--;
                outputResolver(inputHeader, inputData);
                inputResolver();
            });
    }
}

[INFO!info/ZIP COORDINATION] The Zip() operator (from System.Reactive) ensures that every Push() (output) is matched with an OnOutputReady() (input from downstream). This creates a lock-step flow control mechanism.

MsgCountInBuffer Mechanism

The MsgCountInBuffer field tracks buffered items:

  • Incremented on Push()
  • Decreased when downstream acknowledges
  • If MsgCountInBuffer < outputBufferSize, Push() completes immediately without waiting

This enables configurable batching while preventing unbounded memory growth.


Key Differences: Core vs Raw

1. Input Handling

FlowBoxRaw + IFlowBoxSink:

public void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
{
    var serializer = Serialization.GetSerializer(header);
    var input = serializer.Deserialize<Dictionary<string, object>>(data);
    
    // Synchronous processing
    Log(LogLevel.Information, "Processed data");
    
    onReadyForNext(); // Manual callback
}

FlowBoxCore:

protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
    var serializer = Serialization.GetSerializer(header);
    var input = serializer.Deserialize<Dictionary<string, object>>(data);
    
    // Async processing with await
    await ProcessData(input);
    
    // No callback needed—runtime handles it
}

[ERROR!error/CALLBACK FORGETTING] With FlowBoxRaw, forgetting to call onReadyForNext() blocks the entire pipeline. With FlowBoxCore, the async approach makes this impossible—you either complete or throw, and the runtime handles both cases.

2. Output Handling

FlowBoxRaw + IFlowBoxSource:

public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
{
    var data = serializer.Serialize(new { Value = 42 });
    pushOut(header, data); // Immediate push
}

FlowBoxCore:

// In OnInput() or other methods
await Push("output", header, data); // Async push with backpressure handling

[INFO!info/PUSH SEMANTICS] FlowBoxCore.Push() is async and waits for downstream acknowledgment. FlowBoxRaw's pushOut delegate is synchronous—the data is queued immediately, but you lose the ability to await completion.

3. Backpressure

FlowBoxRaw: Manual control via callback timing. You decide when to call onReadyForNext().

FlowBoxCore: Automatic via Task completion. The runtime tracks MsgCountInBuffer and only resolves Push() when downstream is ready.

[WARNING!warning/DEADLOCK RISK] With FlowBoxRaw, circular dependencies (A pushes to B, B pushes to A in the same callback) cause deadlocks. FlowBoxCore mitigates this with its Rx subject-based buffering, but circular data flow is still an anti-pattern.


When to Use Each

Use FlowBoxCore<TInput, TOutput> when:

  • You have well-defined data contracts (records, POCOs)
  • Want type safety and automatic serialization
  • Building typical pipes that transform data
  • Don't need fine-grained control over headers

Use FlowBoxCore when:

  • Building typical pipes that transform data
  • Need async processing (DB queries, API calls)
  • Want flexibility with header/serialization control
  • Don't need fine-grained control over message timing

Use FlowBoxRaw when:

  • Implementing sources that emit on schedule
  • Need custom protocol handling
  • Require precise control over message timing
  • Building specialized boxes with unique lifecycle needs

[NOTE!lightbulb/MOSTLY CORE] The IntervalSource example uses FlowBoxRaw because it needs to emit asynchronously in OnOutputReady(). For pipes and sinks, FlowBoxCore or FlowBoxCore<TInput, TOutput> is almost always the better choice.


Lifecycle Hooks

Both FlowBoxRaw and FlowBoxCore support these lifecycle methods:

Constructor

public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
    : base(initParams, serviceProvider)
{
    // Access runtime context
    var jobId = initParams.RuntimeContext.JobId;
    var nodeId = initParams.RuntimeContext.NodeId;
    
    // Access options
    var options = initParams.OptionsAs<MyOptions>();
    
    // Access connected I/O
    var outputs = initParams.ConnectedIO?.OutputIds;
}

OnDestroy()

public override void OnDestroy()
{
    base.OnDestroy(); // Disposes Rx subjects
    // Additional cleanup: close DB connections, etc.
}

[ERROR!error/RESOURCE LEAK] Always call base.OnDestroy() to ensure proper disposal of Rx subjects. Forgetting this keeps subjects alive, preventing worker shutdown and causing memory leaks.


Common Patterns

Accessing Runtime Context

public class MyBox : FlowBoxCore
{
    private readonly string _jobId;
    private readonly string _nodeId;

    public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _jobId = initParams.RuntimeContext?.JobId ?? throw new ArgumentNullException();
        _nodeId = initParams.RuntimeContext?.NodeId ?? throw new ArgumentNullException();
    }
}

Accessing Options

public record PipeOptions { public int BufferSize { get; set; } = 10; public bool EnableLogging { get; set; } = true; }

public class ConfigurablePipe : FlowBoxCore
{
    private readonly PipeOptions _options;

    public ConfigurablePipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _options = initParams.OptionsAs<PipeOptions>();
    }
}

Combining with Widgets

public class WidgetBox : FlowBoxCore
{
    private readonly FlowMakerWidgetManager _widgetManager;

    public WidgetBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _widgetManager = new FlowMakerWidgetManager(
            initParams.RuntimeContext,
            serviceProvider.GetRequiredService<SocketIOFactory>(),
            loggerAddress);

        _widgetManager.AddWidget("counter", "exposure_plus_1")
            .AddWidgetElement("counter", "title", new FlowBoxWidgetTitleElement
            {
                Title = "Counter",
                Subtitle = "Counts messages"
            })
            .AddWidgetElement("counter", "value", new FlowBoxWidgetValueElement
            {
                Contents = "Count: {{count}}",
                Icon = "exposure_plus_1"
            })
            .SetContextValue("count", 0)
            .PublishWidgets();

        _widgetManager.EventReceived += (sender, e) =>
        {
            // Handle widget events
        };
    }
}

Differences from TypeScript/Python SDKs

Feature C# SDK TypeScript SDK Python SDK
Base class name FlowBoxRaw, FlowBoxCore FlowBoxRaw, FlowBoxCore FlowBoxRaw, FlowBoxCore
Input method protected abstract Task OnInput() protected abstract async Task onInput() async def on_input()
Output method protected virtual async Task Push() public async push() async def push()
Rx library System.Reactive (Rx.NET) RxJS RxPY
Callback type Action, PushOutDelegate (data: Buffer, header: Buffer) => any Callable[[], None]
Typed model FlowBoxCore<TInput, TOutput> Not available Not available

[INFO!info/SAME PATTERN] C#, TypeScript, and Python SDKs follow the same design pattern. The main differences are language-specific (async/await syntax, type systems, Rx variants). C# adds a typed model not available in other SDKs.