FlowBox Implementations (Source, Pipe, Sink)
This guide covers how to implement the three types of FlowBox nodes in C#: Source, Pipe, and Sink.
Worker Entrypoint & Registration
Every C# worker starts with an entrypoint that registers FlowBox implementations and starts the NetMQ event loop:
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

// Register the services the SDK resolves at runtime
var services = new ServiceCollection();
services.AddLogging();
services.AddSingleton<IFlowMakerLogger, FlowMakerLogger>();
services.AddSingleton<SocketIOFactory>();
var serviceProvider = services.BuildServiceProvider();

// Configure the worker's network endpoints
var clientBuilder = new FlowMakerClientBuilder(serviceProvider)
    .SetRouterTransportAddress("tcp://*:5560")
    .SetRuntimeHttpAddress("http://localhost:3120")
    .SetLoggerAddress("http://localhost:3040");

// Describe the FlowBox and supply the factory that creates it
var registration = new RegistrationInfoBuilder(FlowElementType.Pipe)
    .SetId("myorg/my-csharp-pipe")
    .SetDisplayName("My C# Pipe")
    .SetCurrentVersion("1.0.0")
    .SetIcon("transform")
    .SetInputs(new[] { new TypeDefinition("input") })
    .SetOutputs(new[] { new TypeDefinition("output") })
    .Build((initParams, sp) => new MyPipe(initParams, sp));

clientBuilder.DeclareFlowElement(registration);

// Build the client and start the event loop
var client = clientBuilder.Build();
await client.Run();
[INFO!info/RUNTIME INITIALIZATION] The client.Run() method starts a NetMQ poller thread that runs indefinitely. Code placed after await client.Run() does not execute while the worker is running; the poller keeps going until the process receives SIGINT/SIGTERM.
[WARNING!warning/SINGLETON REGISTRATION] FlowMakerClientBuilder maintains an internal registry of FlowBox implementations. Declaring the same FlowBox ID twice causes a collision, so declare each box type exactly once per worker.
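The collision described above can be pictured with a small stand-in registry. This is only a sketch: FlowBoxRegistry and its Declare method are hypothetical names, not SDK types, and how the real builder reports a duplicate ID is an assumption here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry; it only illustrates why a duplicate ID is a collision.
var registry = new FlowBoxRegistry();
registry.Declare("myorg/my-csharp-pipe", () => "pipe instance");

try
{
    // Declaring the same ID a second time is rejected.
    registry.Declare("myorg/my-csharp-pipe", () => "another pipe instance");
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message);
}

Console.WriteLine(registry.Count); // prints 1

// Stand-in for the builder's internal registry (not the SDK type).
public sealed class FlowBoxRegistry
{
    private readonly Dictionary<string, Func<object>> _factories = new();

    public int Count => _factories.Count;

    public void Declare(string id, Func<object> factory)
    {
        // TryAdd returns false when the key already exists.
        if (!_factories.TryAdd(id, factory))
        {
            throw new InvalidOperationException($"FlowBox '{id}' is already declared.");
        }
    }
}
```

Keeping all DeclareFlowElement calls in one place in the entrypoint makes an accidental second declaration easy to spot.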
Configuration options:
RouterTransportAddress: NetMQ router socket address (default: tcp://*:5560)
RuntimeHttpAddress: Scheduler HTTP API endpoint (default: http://localhost:3120)
LoggerAddress: Socket.IO endpoint for logs/widgets (default: http://localhost:3040)
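One way to avoid hard-coding these addresses is to read them from environment variables and fall back to the defaults listed above. The variable names below are hypothetical; the SDK does not define them.

```csharp
using System;

// Hypothetical variable names, chosen for this sketch only.
var router = FromEnv("FLOWMAKER_ROUTER_ADDRESS", "tcp://*:5560");
var runtime = FromEnv("FLOWMAKER_RUNTIME_HTTP_ADDRESS", "http://localhost:3120");
var logger = FromEnv("FLOWMAKER_LOGGER_ADDRESS", "http://localhost:3040");

Console.WriteLine($"router={router} runtime={runtime} logger={logger}");

// Returns the variable's value, or the fallback when it is unset or blank.
static string FromEnv(string name, string fallback)
{
    var value = Environment.GetEnvironmentVariable(name);
    return string.IsNullOrWhiteSpace(value) ? fallback : value;
}
```

The three resulting strings can then be passed to SetRouterTransportAddress, SetRuntimeHttpAddress, and SetLoggerAddress in the entrypoint.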
Pipe Implementation (Advanced Model)
A minimal pipe that transforms data:
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;

public class MyPipe : FlowBoxCore
{
    private readonly ILogger _logger;

    public MyPipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        var serializer = Serialization.GetSerializer(header);
        var input = serializer.Deserialize<Dictionary<string, object>>(data);

        // Transform
        var output = new Dictionary<string, object>(input)
        {
            ["processed"] = true,
            ["timestamp"] = DateTime.UtcNow.ToString("O")
        };
        _logger.LogInformation("Processed data: {Output}", output);

        // Push downstream using the same header (preserves serialization method)
        await Push("output", header, serializer.Serialize(output));
    }
}
[NOTE!sync/BACKPRESSURE HANDLING] The await Push() call does not complete until the downstream receiver signals readiness via CAN_SEND_NEXT. Long-running processing in OnInput() delays acknowledgment and can stall the entire pipeline, so offload heavy work to Task.Run() or an async queue.
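The "async queue" option could look like the following sketch, which uses a bounded System.Threading.Channels queue. OnInputSketch is a hypothetical stand-in for OnInput, and the integer items and 10 ms delay are placeholders for real payloads and real work.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded queue: when it is full, WriteAsync waits, so the handler still
// slows down instead of buffering without limit.
var queue = Channel.CreateBounded<int>(capacity: 8);
var processed = 0;

// Background worker that does the slow part outside the input handler.
var worker = Task.Run(async () =>
{
    await foreach (var item in queue.Reader.ReadAllAsync())
    {
        await Task.Delay(10); // stand-in for heavy work
        processed++;
    }
});

// Stand-in for OnInput: hand the item over and return quickly.
async Task OnInputSketch(int item) => await queue.Writer.WriteAsync(item);

for (var i = 0; i < 5; i++)
{
    await OnInputSketch(i);
}

queue.Writer.Complete(); // no more input (for example during shutdown)
await worker;            // drain the remaining items
Console.WriteLine(processed); // prints 5
```

The trade-off is that items are acknowledged before they are fully processed, so anything still in the queue is lost if the worker crashes.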
[WARNING!warning/HEADER REUSE] Reusing the incoming header for output preserves the serialization method, which is correct for downstream data flow. For control messages (e.g., custom ACKs), create a new header with the appropriate event ID.
Pipe Implementation (Simple Model)
The simple model provides typed wrappers for easier implementation:
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;
using Microsoft.Extensions.Logging;

public class MyTypedPipe : FlowBoxCore<InputData, OutputData>
{
    private readonly ILogger _logger;

    public MyTypedPipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    protected override async Task OnInput(string inputId, InputData data)
    {
        // Transform (positional records are created through their constructor)
        var output = new OutputData(
            Value: data.Value * 2,
            Processed: true,
            Timestamp: DateTime.UtcNow);
        _logger.LogInformation("Transformed: {Output}", output);

        // Push downstream (serializer handled automatically)
        await Push("output", output);
    }
}

public record InputData(double Value);
public record OutputData(double Value, bool Processed, DateTime Timestamp);
[NOTE!lightbulb/TYPED ABSTRACTION] The simple model automatically handles serialization/deserialization based on your type parameters. Use it when you have well-defined data contracts.
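To give a feel for what such a typed wrapper does, the sketch below wraps a typed handler between a deserialize step and a serialize step. It is not the SDK's implementation: RunTyped, Reading, and Doubled are hypothetical names, and System.Text.Json is used only to keep the example self-contained, whereas the SDK defaults to MessagePack.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;

// Typed handler, comparable to a typed OnInput.
static Task<Doubled> Handle(Reading input) =>
    Task.FromResult(new Doubled(input.Value * 2, Processed: true));

// Generic adapter: bytes in, typed handler, bytes out.
static async Task<byte[]> RunTyped<TIn, TOut>(byte[] payload, Func<TIn, Task<TOut>> handler)
    where TIn : class
{
    var input = JsonSerializer.Deserialize<TIn>(payload)
        ?? throw new InvalidOperationException("Payload deserialized to null.");
    var output = await handler(input);
    return JsonSerializer.SerializeToUtf8Bytes(output);
}

var inputBytes = JsonSerializer.SerializeToUtf8Bytes(new Reading(21));
var outputBytes = await RunTyped<Reading, Doubled>(inputBytes, Handle);
var result = JsonSerializer.Deserialize<Doubled>(outputBytes);
Console.WriteLine(result?.Value); // prints 42

public record Reading(double Value);
public record Doubled(double Value, bool Processed);
```

The handler never sees raw bytes, which is the main benefit the simple model offers when the data contract is stable.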
Source Implementation
A source that emits values periodically:
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

public class IntervalSource : FlowBoxRaw, IFlowBoxSource
{
    private readonly ILogger _logger;
    private readonly int _intervalMs;
    private int _counter;

    public IntervalSource(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
        _intervalMs = initParams.OptionsAs<SourceOptions>().IntervalMs;
    }

    public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
    {
        // Fire-and-forget: return immediately and emit from a background task
        _ = Task.Run(async () =>
        {
            try
            {
                await Task.Delay(_intervalMs);
                var counter = Interlocked.Increment(ref _counter);
                var data = new { Counter = counter, Timestamp = DateTime.UtcNow };
                _logger.LogInformation("Emitting counter: {Counter}", counter);
                var serializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
                pushOut(header, serializer.Serialize(data));
            }
            catch (Exception ex)
            {
                // Without this, a failure in the background task would go unnoticed
                _logger.LogError(ex, "Failed to emit value");
            }
        });
    }

    private record SourceOptions { public int IntervalMs { get; set; } = 1000; }
}
[NOTE!info/EAGER EMISSION PATTERN] Sources typically emit asynchronously in OnOutputReady() without waiting for input. This initiates the data flow. The pattern is: start task → return immediately → task calls pushOut() when ready.
[ERROR!error/ASYNC FIRE_AND_FORGET] In OnOutputReady(), you must start an async task (e.g., Task.Run()). If you wait for the work inside the callback itself (for example, by blocking on a task), you block the callback and the runtime waits forever.
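The "start task, return immediately, push later" sequence can be observed without the SDK. In this sketch, OnOutputReadySketch and the Action<int> delegate are hypothetical stand-ins for OnOutputReady and PushOutDelegate.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

var emitted = new TaskCompletionSource<int>();

// Stand-in for OnOutputReady: schedule the work, then return at once.
void OnOutputReadySketch(Action<int> pushOut)
{
    _ = Task.Run(async () =>
    {
        try
        {
            await Task.Delay(200); // wait for the next value
            pushOut(1);            // emit when ready
        }
        catch (Exception ex)
        {
            // A fire-and-forget task hides exceptions unless they are handled here.
            Console.Error.WriteLine(ex);
        }
    });
}

var clock = Stopwatch.StartNew();
OnOutputReadySketch(value => emitted.TrySetResult(value));
var callbackMs = clock.ElapsedMilliseconds; // time spent inside the callback

var received = await emitted.Task; // completes once pushOut has run
Console.WriteLine($"callback returned after {callbackMs} ms, received {received}");
```

The callback time should be close to zero while the value arrives roughly 200 ms later, which is the behavior the runtime expects from a source.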
Sink Implementation
A sink that processes incoming data:
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;

public class MySink : FlowBoxRaw, IFlowBoxSink
{
    private readonly ILogger _logger;

    public MySink(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    public void OnInputReceived(byte[] inputId, Header header, byte[] data, Action onReadyForNext)
    {
        var serializer = Serialization.GetSerializer(header);
        var input = serializer.Deserialize<Dictionary<string, object>>(data);

        // Process data (e.g., save to database)
        _logger.LogInformation("Received data: {Input}", input);

        // Signal readiness immediately (synchronous processing)
        onReadyForNext();
    }
}
[NOTE!sync/NO OUTPUT NEEDED] Sinks don't implement IFlowBoxSource, so they never call Push(). The pipeline ends at the sink, so no downstream acknowledgment is needed.
Simple Model: AbstractSource and AbstractSink
The simple model provides base classes that handle serialization automatically:
AbstractSource
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;
using Industream.FlowMaker.Sdk.Serializations;

public class RandomSource : AbstractSource<RandomData>
{
    public RandomSource(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
    }

    public override void OnOutputReady(string outputId, Action<RandomData> pushOut)
    {
        var random = new Random();
        // RandomData is a positional record, so pass the value to its constructor
        pushOut(new RandomData(random.NextDouble()));
    }
}

public record RandomData(double Value);
AbstractSink
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Simple;
using Microsoft.Extensions.Logging;

public class LoggingSink : AbstractSink<LogData>
{
    private readonly ILogger _logger;

    public LoggingSink(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger(GetType().Name);
    }

    public override void OnInputReceived(string inputId, LogData data, Action onReadyForNext)
    {
        _logger.LogInformation("Log entry: {Message} at {Timestamp}",
            data.Message, data.Timestamp);
        onReadyForNext();
    }
}

public record LogData(string Message, DateTime Timestamp);
[NOTE!lightbulb/WHEN TO USE SIMPLE MODEL] Use AbstractSource<T> and AbstractSink<T> when you want typed data handling without manual serialization. For pipes with complex logic, FlowBoxCore (advanced model) provides more control.
Key Implementation Patterns
Logger Usage
// Log with LogLevel
Log(LogLevel.Information, "Processing started");
// Use injected ILogger
var logger = flowMakerLogger.CreateILogger(GetType().Name);
logger.LogInformation("Processed {Count} items", count);
Widget Management
var widgetManager = new FlowMakerWidgetManager(
    initParams.RuntimeContext,
    serviceProvider.GetRequiredService<SocketIOFactory>(),
    loggerAddress);

widgetManager.AddWidget("counter", "exposure_plus_1")
    .AddWidgetElement("counter", "title", new FlowBoxWidgetTitleElement
    {
        Title = "Counter",
        Subtitle = "Counts messages"
    })
    .AddWidgetElement("counter", "value", new FlowBoxWidgetValueElement
    {
        Contents = "Count: {{count}}",
        Icon = "exposure_plus_1"
    })
    .SetContextValue("count", 0)
    .PublishWidgets();

// Local counter mirrored into the widget context
var count = 0;
widgetManager.EventReceived += (sender, e) =>
{
    // Handle widget events
    count++;
    widgetManager.SetContextValue("count", count);
};
Lifecycle Hooks
public class MyBox : FlowBoxCore
{
    public MyBox(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        // Constructor: initialize state
    }

    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        // Process incoming data
    }

    public override void OnDestroy()
    {
        base.OnDestroy(); // Disposes subjects
        // Additional cleanup: close DB connections, etc.
    }
}
[INFO!info/AUTO DISPOSAL] FlowBoxCore.OnDestroy() automatically disposes the internal Rx subjects. Always call base.OnDestroy() to ensure proper cleanup and prevent resource leaks.
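The ordering of base cleanup and your own cleanup can be sketched with plain classes. BoxBase, DatabaseBox, and FakeConnection are hypothetical stand-ins; only the shape of the OnDestroy override is meant to carry over to a real FlowBox.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();
var box = new DatabaseBox(log);
box.OnDestroy();
Console.WriteLine(string.Join(" -> ", log)); // prints "base cleanup -> connection closed"

// Stand-in for the SDK base class; only the virtual hook matters here.
public class BoxBase
{
    protected readonly List<string> Log;

    public BoxBase(List<string> log) => Log = log;

    public virtual void OnDestroy() => Log.Add("base cleanup");
}

public sealed class DatabaseBox : BoxBase
{
    private readonly FakeConnection _connection;

    public DatabaseBox(List<string> log) : base(log) => _connection = new FakeConnection(log);

    public override void OnDestroy()
    {
        try
        {
            base.OnDestroy();      // base cleanup runs first
        }
        finally
        {
            _connection.Dispose(); // own resources are released even if base throws
        }
    }
}

public sealed class FakeConnection : IDisposable
{
    private readonly List<string> _log;

    public FakeConnection(List<string> log) => _log = log;

    public void Dispose() => _log.Add("connection closed");
}
```

Wrapping the base call in try/finally is a design choice, not an SDK requirement; it guards against leaking the connection if the base cleanup fails.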
Serialization
The C# SDK uses MessagePack by default and also supports JSON:
using Industream.FlowMaker.Sdk.Serializations;
// Get serializer from header (respects incoming format)
var serializer = Serialization.GetSerializer(header);
var data = serializer.Deserialize<MyType>(bytes);
// Or use specific serializer
var msgpackSerializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
var jsonSerializer = Serialization.GetSerializer(SerializationMethods.Json);
[NOTE!info/HEADER-DRIVEN FORMAT] The Header class contains the serialization method. Use Serialization.GetSerializer(header) to automatically select the correct serializer based on the incoming message format.
Common Pitfalls
Missing Builder in Registration
// !!! WRONG !!!: Missing builder
var registration = new RegistrationInfoBuilder(FlowElementType.Pipe)
    .SetId("myorg/my-pipe")
    .SetCurrentVersion("1.0.0");
// Missing .Build((initParams, sp) => new MyPipe(initParams, sp))

// +++ CORRECT +++: Include builder
var registration = new RegistrationInfoBuilder(FlowElementType.Pipe)
    .SetId("myorg/my-pipe")
    .SetCurrentVersion("1.0.0")
    .Build((initParams, sp) => new MyPipe(initParams, sp));
[ERROR!error/BUILDER REQUIRED] Without .Build(), the FlowMakerClient can't instantiate your FlowBox and throws an exception during initialization.
Blocking in OnOutputReady
// !!! WRONG !!!: Blocking the callback
public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
{
    Thread.Sleep(1000); // Blocks runtime!
    pushOut(header, data);
}

// +++ CORRECT +++: Async task
public void OnOutputReady(byte[] outputId, Header header, PushOutDelegate pushOut)
{
    _ = Task.Run(async () =>
    {
        await Task.Delay(1000);
        pushOut(header, data);
    });
}
[WARNING!warning/EVENT LOOP BLOCKING] The NetMQ poller in a C# worker processes messages on a single thread. Blocking operations (e.g., Thread.Sleep(), sync I/O) freeze the entire worker. Use Task.Run() or async methods instead.
Not Awaiting Push
// !!! WRONG !!!: Fire-and-forget push
protected override Task OnInput(byte[] inputId, Header header, byte[] data)
{
    Push("output", header, data); // Not awaited!
    return Task.CompletedTask;
}

// +++ CORRECT +++: Await push
protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
    await Push("output", header, data);
}
[ERROR!error/BACKPRESSURE BYPASS] Not awaiting Push() bypasses backpressure handling. The runtime won't know when downstream is ready, potentially causing data loss or memory issues.
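The effect of awaiting can be made visible by counting how many pushes are unacknowledged at the same time. The Downstream class below is a hypothetical simulation in which a 50 ms delay stands in for the downstream box signalling readiness; it does not use the SDK's Push.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Awaited: each push completes before the next one starts.
var awaited = new Downstream();
for (var i = 0; i < 5; i++)
{
    await awaited.Push(i);
}

// Not awaited: all pushes start before any of them has been acknowledged.
var unawaited = new Downstream();
var pending = Enumerable.Range(0, 5).Select(i => unawaited.Push(i)).ToArray();
await Task.WhenAll(pending); // only so the demo can read the counter afterwards

Console.WriteLine($"awaited max in flight: {awaited.MaxInFlight}");     // 1
Console.WriteLine($"unawaited max in flight: {unawaited.MaxInFlight}"); // greater than 1

// Stand-in for a downstream box; Push completes when it is ready again.
public sealed class Downstream
{
    private readonly object _gate = new();
    private int _inFlight;

    public int MaxInFlight { get; private set; }

    public async Task Push(int item)
    {
        lock (_gate)
        {
            _inFlight++;
            MaxInFlight = Math.Max(MaxInFlight, _inFlight);
        }

        await Task.Delay(50); // simulated processing before "ready for next"

        lock (_gate)
        {
            _inFlight--;
        }
    }
}
```

With awaiting, at most one message is outstanding; without it, the number of outstanding messages grows with the input rate, which is where the memory pressure comes from.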