C# / .NET SDK (Industream.FlowMaker.Sdk)

Package: sdk/dotnet/Industream.FlowMaker.Sdk.

Overview

The C# SDK provides the runtime infrastructure for building FlowBox workers—stateless or stateful components that process data streams in the FlowMaker platform. Built on .NET 9.0 with NetMQ, System.Reactive, and MessagePack.

[INFO!hub/SDK SCOPE] This SDK handles the runtime loop, serialization, and logging. Your FlowBox implementations focus on business logic: transforming data, calling external APIs, or managing state.

Quick Start

Create a minimal worker with a single pipe:

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

var services = new ServiceCollection();
services.AddLogging();
services.AddSingleton<IFlowMakerLogger, FlowMakerLogger>();
services.AddSingleton<SocketIOFactory>();
var serviceProvider = services.BuildServiceProvider();

var clientBuilder = new FlowMakerClientBuilder(serviceProvider)
    .SetRouterTransportAddress("tcp://*:5560")
    .SetRuntimeHttpAddress("http://localhost:3120")
    .SetLoggerAddress("http://localhost:3040");

var registration = new RegistrationInfoBuilder(FlowElementType.Pipe)
    .SetId("myorg/message-counter")
    .SetDisplayName("Message Counter")
    .SetCurrentVersion("1.0.0")
    .SetInputs(new[] { new TypeDefinition("input") })
    .SetOutputs(new[] { new TypeDefinition("output") })
    .Build((initParams, sp) => new MessageCounter(initParams, sp));

clientBuilder.DeclareFlowElement(registration);
var client = clientBuilder.Build();
await client.Run();

public class MessageCounter : FlowBoxCore
{
    private int _counter;
    
    public MessageCounter(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider) { }
    
    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        _counter++;
        var serializer = Serialization.GetSerializer(header);
        var input = serializer.Deserialize<Dictionary<string, object>>(data);
        input["counter"] = _counter;
        await Push("output", header, serializer.Serialize(input));
    }
}

[NOTE!lightbulb/START HERE] This is the minimal working example. Copy it, modify the registration and OnInput() logic, and you have a custom worker.


Documentation Pages


Runtime Architecture & Data Flow

[NOTE!lightbulb/DIAGRAM CONTEXT] This diagram shows a typical three-stage pipeline. Data flows down (Source → Pipe → Sink) via NetMQ frames with headers. Logs flow right to the Logger via Socket.IO. Control events flow to the Scheduler via NetMQ.

{ val, $$meta, $$timestamp }header: Msgpack{ transformedVal , $$meta, $$timestamp }header: Msgpackemit(log)CAN_SEND_NEXTCURRENT_EVENTSourcemethodsOnOutputReady(id, hdr, cb)PipemethodsOnInputReceived(id, hdr, data)OnOutputReady(id, hdr, cb)SinkmethodsOnInputReceived(id, hdr, data)LoggerserverSocket.IO serverSchedulereventsNetMQ Events
NetMQ/Frames
Socket.IO Events

Key points:

  • Source: Emits data via OnOutputReady() when downstream signals CAN_SEND_NEXT
  • Pipe: Receives via OnInputReceived(), transforms, pushes downstream
  • Sink: Terminal node, receives data without pushing
  • Logger: Socket.IO transport for all log messages
  • Scheduler: Manages flow lifecycle, sends control events

Data Shape Convention

{
  "temperature": 21.7,
  "pressure": 1.8,
  "$$meta": {
    "temperature": { "entryId": "dc-entry-temp" },
    "pressure": { "entryId": "dc-entry-pressure" }
  },
  "$$timestamp": "2026-04-10T16:00:00.000Z"
}
  • Root keys: measured tag values
  • $$meta: per-tag metadata (e.g., DataCatalog entry IDs)
  • $$timestamp: event timestamp in ISO 8601 format

Build and packaging

From sdk/dotnet:

dotnet restore Industream.FlowMaker.Sdk.sln
dotnet build Industream.FlowMaker.Sdk.sln -c Release

Target framework and language version:

  • TargetFramework: net9.0
  • LangVersion: 13.0

References

  • sdk/dotnet/Industream.FlowMaker.Sdk/Industream.FlowMaker.Sdk.csproj
  • sdk/dotnet/Industream.FlowMaker.Sdk/Clients/FlowMakerClient.cs
  • sdk/dotnet/Industream.FlowMaker.Sdk/Clients/FlowBoxRegistrationInfo.cs
  • sdk/dotnet/Industream.FlowMaker.Sdk/Clients/Header.cs
  • sdk/dotnet/Industream.FlowMaker.Sdk/Serializations/Serialization.cs
  • sdk/dotnet/Industream.FlowMaker.Sdk/Elements/Advanced/FlowBoxCore.cs
  • sdk/dotnet/Industream.FlowMaker.Sdk/Elements/Simple/FlowBoxCore.cs