FlowBox Implementations (Source, Pipe, Sink)

This guide covers how to implement the three types of FlowBox nodes in C#: Source, Pipe, and Sink.

Worker Entrypoint & Registration

Every C# worker starts with an entrypoint that registers FlowBox implementations and starts the NetMQ event loop:

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

var services = new ServiceCollection();
services.AddLogging();
services.AddSingleton<IFlowMakerLogger, FlowMakerLogger>();
services.AddSingleton<SocketIOFactory>();

var serviceProvider = services.BuildServiceProvider();

var clientBuilder = new FlowMakerClientBuilder(serviceProvider)
    .SetRouterTransportAddress("tcp://*:5560")
    .SetRuntimeHttpAddress("http://localhost:3120")
    .SetLoggerAddress("http://localhost:3040");

var registration = new RegistrationInfoBuilder(FlowElementType.Pipe)
    .SetId("myorg/my-csharp-pipe")
    .SetDisplayName("My C# Pipe")
    .SetCurrentVersion("1.0.0")
    .SetIcon("transform")
    .SetInputs(new[] { new TypeDefinition("input") })
    .SetOutputs(new[] { new TypeDefinition("output") })
    .Build((initParams, sp) => new MyPipe(initParams, sp));

clientBuilder.DeclareFlowElement(registration);

var client = clientBuilder.Build();
await client.Run();

[INFO!info/RUNTIME INITIALIZATION] The client.Run() method starts a NetMQ poller thread that runs indefinitely. Code after await client.Run() won't execute—the poller keeps running until the process receives SIGINT/SIGTERM.

[WARNING!warning/SINGLETON REGISTRATION] FlowMakerClientBuilder maintains an internal registry of FlowBox implementations. Declaring the same FlowBox ID twice causes a collision. Always declare each box type exactly once per worker.

Configuration options:

  • RouterTransportAddress: NetMQ router socket address (default: tcp://*:5560)
  • RuntimeHttpAddress: Scheduler HTTP API endpoint (default: http://localhost:3120)
  • LoggerAddress: Socket.IO endpoint for logs/widgets (default: http://localhost:3040)

Pipe Implementation (Advanced Model)

A minimal pipe that transforms data:

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;

public class MyPipe : FlowBoxCore
{
    private readonly ILogger _logger;

    public MyPipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        var serializer = Serialization.GetSerializer(header);
        var input = serializer.Deserialize<Dictionary<string, object>>(data);

        // Transform
        var output = new Dictionary<string, object>(input)
        {
            ["processed"] = true,
            ["timestamp"] = DateTime.UtcNow.ToString("O")
        };

        _logger.LogInformation("Processed data: {Output}", output);

        // Push downstream using the same header (preserves serialization method)
        await Push("output", header, serializer.Serialize(output));
    }
}

[NOTE!sync/BACKPRESSURE HANDLING] The await Push() call blocks until the downstream receiver signals readiness via CAN_SEND_NEXT. Long-running processing in OnInput() delays acknowledgment and can stall the entire pipeline—offload heavy work to Task.Run() or async queues.

[WARNING!warning/HEADER REUSE] Reusing the incoming header for output preserves the serialization method. For downstream data flow, this is correct. For control messages (e.g., custom ACKs), create a new header with the appropriate event ID.


Pipe Implementation (Simple Model)

The simple model provides typed wrappers for easier implementation:

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;
using Microsoft.Extensions.Logging;

public class MyTypedPipe : FlowBoxCore<InputData, OutputData>
{
    private readonly ILogger _logger;

    public MyTypedPipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    protected override async Task OnInput(string inputId, InputData data)
    {
        // Transform
        var output = new OutputData
        {
            Value = data.Value * 2,
            Processed = true,
            Timestamp = DateTime.UtcNow
        };

        _logger.LogInformation("Transformed: {Output}", output);

        // Push downstream (serializer handled automatically)
        await Push("output", output);
    }
}

public record InputData(double Value);
public record OutputData(double Value, bool Processed, DateTime Timestamp);

[NOTE!lightbulb/TYPED ABSTRACTION] The simple model automatically handles serialization/deserialization based on your type parameters. Use it when you have well-defined data contracts.


Source Implementation

A source that emits values periodically:

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

public class IntervalSource : FlowBoxRaw, IFlowBoxSource
{
    private readonly ILogger _logger;
    private readonly int _intervalMs;
    private int _counter;

    public IntervalSource(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
        _intervalMs = initParams.OptionsAs<SourceOptions>().IntervalMs;
    }

    public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
    {
        _ = Task.Run(async () =>
        {
            await Task.Delay(_intervalMs);

            _counter++;
            var data = new { Counter = _counter, Timestamp = DateTime.UtcNow };

            _logger.LogInformation("Emitting counter: {Counter}", _counter);

            var serializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
            pushOut(header, serializer.Serialize(data));
        });
    }

    private record SourceOptions { public int IntervalMs { get; set; } = 1000; }
}

[NOTE!info/EAGER EMISSION PATTERN] Sources typically emit asynchronously in OnOutputReady() without waiting for input. This initiates the data flow. The pattern is: start task → return immediately → task calls pushOut() when ready.

[ERROR!error/ASYNC FIRE_AND_FORGET] In OnOutputReady(), you must start an async task (e.g., Task.Run()). If you await directly, you block the callback and the runtime waits forever.


Sink Implementation

A sink that processes incoming data:

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;

public class MySink : FlowBoxRaw, IFlowBoxSink
{
    private readonly ILogger _logger;

    public MySink(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    public void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
    {
        var serializer = Serialization.GetSerializer(header);
        var input = serializer.Deserialize<Dictionary<string, object>>(data);

        // Process data (e.g., save to database)
        _logger.LogInformation("Received data: {Input}", input);

        // Signal readiness immediately (synchronous processing)
        onReadyForNext();
    }
}

[NOTE!sync/NO OUTPUT NEEDED] Sinks don't implement IFlowBoxSource, so they never call Push(). The pipeline ends at the sink—no downstream acknowledgment is needed.


Simple Model: AbstractSource and AbstractSink

The simple model provides base classes that handle serialization automatically:

AbstractSource

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;
using Industream.FlowMaker.Sdk.Serializations;

public class RandomSource : AbstractSource<RandomData>
{
    public RandomSource(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
    }

    public override void OnOutputReady(string outputId, Action<RandomData> pushOut)
    {
        var random = new Random();
        pushOut(new RandomData { Value = random.NextDouble() });
    }
}

public record RandomData(double Value);

AbstractSink

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;

public class LoggingSink : AbstractSink<LogData>
{
    private readonly ILogger _logger;

    public LoggingSink(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    public override void OnInputReceived(string inputId, LogData data, Action onReadyForNext)
    {
        _logger.LogInformation("Log entry: {Message} at {Timestamp}",
            data.Message, data.Timestamp);

        onReadyForNext();
    }
}

public record LogData(string Message, DateTime Timestamp);

[NOTE!lightbulb/WHEN TO USE SIMPLE MODEL] Use AbstractSource<T> and AbstractSink<T> when you want typed data handling without manual serialization. For pipes with complex logic, FlowBoxCore (advanced model) provides more control.


Key Implementation Patterns

Logger Usage

// Log with LogLevel
Log(LogLevel.Information, "Processing started");

// Use injected ILogger
var logger = flowMakerLogger.CreateILogger(GetType().Name);
logger.LogInformation("Processed {Count} items", count);

Widget Management

var widgetManager = new FlowMakerWidgetManager(
    initParams.RuntimeContext,
    serviceProvider.GetRequiredService<SocketIOFactory>(),
    loggerAddress);

widgetManager.AddWidget("counter", "exposure_plus_1")
    .AddWidgetElement("counter", "title", new FlowBoxWidgetTitleElement
    {
        Title = "Counter",
        Subtitle = "Counts messages"
    })
    .AddWidgetElement("counter", "value", new FlowBoxWidgetValueElement
    {
        Contents = "Count: {{count}}",
        Icon = "exposure_plus_1"
    })
    .SetContextValue("count", 0)
    .PublishWidgets();

widgetManager.EventReceived += (sender, e) =>
{
    // Handle widget events
    widgetManager.SetContextValue("count", currentValue + 1);
};

Lifecycle Hooks

public class MyBox : FlowBoxCore
{
    public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        // Constructor: initialize state
    }

    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        // Process incoming data
    }

    public override void OnDestroy()
    {
        base.OnDestroy(); // Disposes subjects
        // Additional cleanup: close DB connections, etc.
    }
}

[INFO!info/AUTO DISPOSAL] FlowBoxCore.OnDestroy() automatically disposes the internal Rx subjects. Always call base.OnDestroy() to ensure proper cleanup and prevent resource leaks.


Serialization

C# SDK uses MessagePack by default with JSON support:

using Industream.FlowMaker.Sdk.Serializations;

// Get serializer from header (respects incoming format)
var serializer = Serialization.GetSerializer(header);
var data = serializer.Deserialize<MyType>(bytes);

// Or use specific serializer
var msgpackSerializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
var jsonSerializer = Serialization.GetSerializer(SerializationMethods.Json);

[NOTE!info/HEADER-DRIVEN FORMAT] The Header class contains the serialization method. Use Serialization.GetSerializer(header) to automatically select the correct serializer based on the incoming message format.


Common Pitfalls

Missing Builder in Registration

// !!! WRONG !!! : Missing builder
var registration = new RegistrationInfoBuilder(FlowElementType.Pipe)
    .SetId("myorg/my-pipe")
    .SetCurrentVersion("1.0.0");
// Missing .Build((initParams, sp) => new MyPipe(initParams, sp))

// +++ CORRECT +++ : Include builder
var registration = new RegistrationInfoBuilder(FlowElementType.Pipe)
    .SetId("myorg/my-pipe")
    .SetCurrentVersion("1.0.0")
    .Build((initParams, sp) => new MyPipe(initParams, sp));

[ERROR!error/BUILDER REQUIRED] Without .Build(), the FlowMakerClient can't instantiate your FlowBox and throws an exception during initialization.

Blocking in OnOutputReady

// !!! WRONG !!!: Blocking the callback
public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
{
    Thread.Sleep(1000); // Blocks runtime!
    pushOut(header, data);
}

// +++ CORRECT +++: Async task
public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
{
    _ = Task.Run(async () =>
    {
        await Task.Delay(1000);
        pushOut(header, data);
    });
}

[WARNING!warning/EVENT LOOP BLOCKING] C#'s NetMQ poller is single-threaded for message processing. Blocking operations (e.g., Thread.Sleep(), sync I/O) freeze the entire worker. Use Task.Run() or async methods.

Not Awaiting Push

// !!! WRONG !!!: Fire-and-forget push
protected override Task OnInput(...)
{
    Push("output", header, data); // Not awaited!
    return Task.CompletedTask;
}

// +++ CORRECT +++ Await push
protected override async Task OnInput(...)
{
    await Push("output", header, data);
}

[ERROR!error/BACKPRESSURE BYPASS] Not awaiting Push() bypasses backpressure handling. The runtime won't know when downstream is ready, potentially causing data loss or memory issues.