FlowBoxSerializer API

The C# SDK provides a comprehensive serialization system via the Serialization class and ISerializer interface. It supports MessagePack (default) and JSON, with automatic format detection from headers.

Overview

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Serializations;

public class MyPipe : FlowBoxCore
{
    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        // Get serializer based on header (automatic format detection)
        var serializer = Serialization.GetSerializer(header);
        
        // Deserialize
        var input = serializer.Deserialize<Dictionary<string, object>>(data);
        
        // Transform
        var output = new Dictionary<string, object>(input)
        {
            ["processed"] = true
        };
        
        // Serialize and push (reuse header to preserve format)
        await Push("output", header, serializer.Serialize(output));
    }
}

The header-driven approach ensures downstream receivers get data in their expected format.


Header Structure

The Header class encapsulates 8-byte headers containing event ID and serialization method:

public class Header
{
    public Header(byte[] value);
    public Header(SerializationMethods serializationMethod, FlowConstants flowConstants = FlowConstants.CURRENT_EVENT);
    
    public FlowConstants FlowConstant { get; set; }
    public SerializationMethods SerializationMethod { get; set; }
    public Span<byte> Extra { get; set; }
    public byte[] GetHeader();
}

Header Layout

  • Bytes 0-3: Flow event ID (little-endian int32)
  • Bytes 4-7: Serialization method (little-endian int32)
  • Bytes 8+: Optional extra data

Event IDs:

public enum FlowConstants
{
    INIT_FLOW_ELEMENT = 0x4026,
    CURRENT_EVENT = 0x8200,
    CAN_SEND_NEXT = 0x8001,
    DESTROY_EVENT = 0x82FF,
    HEARTBEAT_EVENT = 0x8210,
    SCHEDULER_RESTART = 0x8220,
    ACK_EVENT = 0x9000,
    ERROR_EVENT = 0xFFFF
}

Serialization methods:

public enum SerializationMethods : int
{
    Msgpack = 0x8000,
    Json = 0x9000,
    Unknown = 0x0000
}

Creating Headers

// Create header with default CURRENT_EVENT
var header = new Header(SerializationMethods.Msgpack);

// Create header with specific event
var header = new Header(SerializationMethods.Json, FlowConstants.CURRENT_EVENT);

// Create from raw bytes
byte[] rawBytes = /* 8+ bytes from runtime */;
var header = new Header(rawBytes);

// Access properties
var method = header.SerializationMethod; // SerializationMethods.Msgpack
var eventConstant = header.FlowConstant; // FlowConstants.CURRENT_EVENT

[INFO!info/HEADER REUSE] For pipes and sinks, reuse the incoming header when pushing output. This preserves the serialization method and event ID, ensuring downstream receives data in the expected format.

Header Conversion

// Header to byte[]
byte[] bytes = header.GetHeader();
// Or implicit cast
byte[] bytes = (byte[])header;

// byte[] to Header
byte[] raw = /* from runtime */;
Header header = raw; // implicit cast

Serialization Registry

The Serialization class manages a registry of serializers:

public static class Serialization
{
    // Get serializer by method enum
    public static ISerializer GetSerializer(SerializationMethods method);
    
    // Get serializer by method ID
    public static ISerializer GetSerializer(int method);
    
    // Get serializer from header (automatic)
    public static ISerializer GetSerializer(Header header);
    
    // Get method enum from serializer
    public static SerializationMethods GetSerializationMethod(ISerializer serializer);
    
    // Register custom serializer
    public static void RegisterSerializer(int method, ISerializer serializer);
}

Built-in Serializers

// MessagePack (default)
var msgpackSerializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
var data = msgpackSerializer.Serialize(myObject);
var obj = msgpackSerializer.Deserialize<MyType>(bytes);

// JSON
var jsonSerializer = Serialization.GetSerializer(SerializationMethods.Json);
var data = jsonSerializer.Serialize(myObject);
var obj = jsonSerializer.Deserialize<MyType>(bytes);

[NOTE!info/AUTOMATIC SELECTION] Use Serialization.GetSerializer(header) to automatically select the correct serializer based on the incoming message format. This is the recommended approach for pipes that need to preserve format.


ISerializer Interface

public interface ISerializer
{
    byte[] Serialize<T>(T obj);
    T Deserialize<T>(byte[] data);
    T Deserialize<T>(string data);
}

Custom Serializer Implementation

public class ProtobufSerializer : ISerializer
{
    public byte[] Serialize<T>(T obj)
    {
        // Protobuf serialization logic
        return ProtobufSerializer.Serialize(obj);
    }
    
    public T Deserialize<T>(byte[] data)
    {
        // Protobuf deserialization logic
        return ProtobufSerializer.Deserialize<T>(data);
    }
}

// Register custom serializer
Serialization.RegisterSerializer(0xA000, new ProtobufSerializer());

[WARNING!warning/CUSTOM METHOD ID] When registering a custom serializer, choose a unique method ID that doesn't conflict with built-in ones (0x8000 for Msgpack, 0x9000 for JSON). Document your custom ID for downstream consumers.


MessagePack Serializer

public class FMMessagePackSerializer : ISerializer
{
    public FMMessagePackSerializer();
    public byte[] Serialize<T>(T obj);
    public T Deserialize<T>(byte[] data);
}

Configuration:

  • Uses MessagePackSerializerOptions.Standard
  • Custom resolver: FlowMakerResolver + ContractlessStandardResolver
  • Supports anonymous types and POCOs without attributes

Example:

var serializer = Serialization.GetSerializer(SerializationMethods.Msgpack);

// Serialize POCO
var data = new { Value = 42, Name = "test" };
byte[] bytes = serializer.Serialize(data);

// Deserialize to POCO
var obj = serializer.Deserialize<MyType>(bytes);

// Deserialize to dictionary
var dict = serializer.Deserialize<Dictionary<string, object>>(bytes);

[NOTE!info/RESOLVER CONFIGURATION] The FlowMakerResolver handles special FlowMaker types. The ContractlessStandardResolver allows serialization of types without [MessagePackObject] attributes.


JSON Serializer

public class FMJsonConverterSerializer : ISerializer
{
    public byte[] Serialize<T>(T obj);
    public T Deserialize<T>(byte[] data);
}

Configuration:

  • Uses System.Text.Json.JsonSerializer
  • Property naming: camelCase
  • Enum conversion: camelCase strings
  • Indented output (for debugging)

Example:

var serializer = Serialization.GetSerializer(SerializationMethods.Json);

// Serialize
var data = new { Value = 42, Name = "test" };
byte[] bytes = serializer.Serialize(data);
// Output: {"value":42,"name":"test"}

// Deserialize
var obj = serializer.Deserialize<MyType>(bytes);

[INFO!info/CAMEL CASE] JSON serialization uses camelCase property names by default. This matches JavaScript/TypeScript conventions and ensures compatibility with frontend components.


Common Serialization Patterns

Pass-Through (Preserve Format)

protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
    // Get serializer from incoming header
    var serializer = Serialization.GetSerializer(header);
    
    // Deserialize and transform
    var input = serializer.Deserialize<Dictionary<string, object>>(data);
    var output = new Dictionary<string, object>(input)
    {
        ["processed"] = true
    };
    
    // Push with same header (preserves format)
    await Push("output", header, serializer.Serialize(output));
}

[NOTE!lightbulb/PREFER REUSE] Reusing the incoming header is the simplest and most efficient approach. The runtime handles format negotiation; you just transform the payload.

Format Conversion

protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
    // Deserialize with incoming format
    var inputSerializer = Serialization.GetSerializer(header);
    var input = inputSerializer.Deserialize<MyType>(data);
    
    // Transform
    var output = Transform(input);
    
    // Create new header with desired format
    var newHeader = new Header(SerializationMethods.Json);
    var outputSerializer = Serialization.GetSerializer(SerializationMethods.Json);
    
    // Push with new format
    await Push("output", newHeader, outputSerializer.Serialize(output));
}

[ERROR!error/FORMAT MISMATCH] If you change the serialization format, you MUST create a new header with the correct format code. Downstream boxes expect the format specified in the header.

Typed Serialization (Simple Model)

public class TypedPipe : FlowBoxCore<InputData, OutputData>
{
    protected override async Task OnInput(string inputId, InputData data)
    {
        // Serializer handled automatically
        var output = new OutputData { Value = data.Value * 2 };
        await Push("output", output); // No manual serialization needed
    }
}

[NOTE!lightbulb/TYPE SAFETY] The simple model (FlowBoxCore<TInput, TOutput>) handles serialization automatically based on your type parameters. Use it for well-defined data contracts.


Metadata Convention

While not enforced by the SDK, the FlowMaker ecosystem uses a standard metadata convention:

var payload = new
{
    temperature = 21.7,
    pressure = 1.8,
    Meta = new
    {
        temperature = new { entryId = "dc-entry-temp" },
        pressure = new { entryId = "dc-entry-pressure" }
    },
    Timestamp = DateTime.UtcNow.ToString("O")
};

var bytes = serializer.Serialize(payload);

Convention:

  • Root keys: measured tag values
  • Meta: per-tag metadata (e.g., DataCatalog entry IDs)
  • Timestamp: event timestamp in ISO 8601 format

[INFO!info/METADATA PREFIX] The Meta property (or $$meta in JavaScript) avoids collisions with user data. The runtime doesn't interpret this metadata—it's purely for application logic.


Differences from TypeScript/Python SDKs

Feature C# SDK TypeScript SDK Python SDK
Serializer class Serialization registry + ISerializer FlowBoxSerializer singleton None (use msgpack directly)
Header class Header with properties Raw Buffer Raw bytes
Format detection GetSerializer(header) flowBoxSerializer.unpack() Manual (read header bytes)
Custom serializers RegisterSerializer() Not documented Not supported
Built-in formats Msgpack, JSON Msgpack, JSON Msgpack, JSON

[NOTE!lightbulb/REGISTRY PATTERN] C# SDK uses a registry pattern for serializers, making it easy to add custom formats. TypeScript uses a singleton with built-in methods. Python requires manual format detection.


Complete Example

using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;

public class FormatConverterPipe : FlowBoxCore
{
    private readonly ILogger _logger;

    public FormatConverterPipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
        : base(initParams, serviceProvider)
    {
        _logger = flowMakerLogger.CreateILogger<FormatConverterPipe>();
    }

    protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
    {
        // Log incoming format
        _logger.LogInformation("Received data in format: {Format}", header.SerializationMethod);
        
        // Deserialize with incoming format
        var inputSerializer = Serialization.GetSerializer(header);
        var input = inputSerializer.Deserialize<Dictionary<string, object>>(data);
        
        // Transform
        var output = new Dictionary<string, object>(input)
        {
            ["converted"] = true,
            ["converterVersion"] = "1.0.0"
        };
        
        // Convert to JSON (if not already)
        Header outputHeader;
        byte[] outputData;
        
        if (header.SerializationMethod == SerializationMethods.Json)
        {
            // Already JSON, reuse header
            outputHeader = header;
            outputData = data;
        }
        else
        {
            // Convert to JSON
            outputHeader = new Header(SerializationMethods.Json);
            var jsonSerializer = Serialization.GetSerializer(SerializationMethods.Json);
            outputData = jsonSerializer.Serialize(output);
        }
        
        _logger.LogInformation("Sending data in format: {Format}", outputHeader.SerializationMethod);
        
        // Push downstream
        await Push("output", outputHeader, outputData);
    }
}

Troubleshooting

SerializationException: Type not found

// !!! WRONG !!!: Missing type in resolver
var obj = serializer.Deserialize<MyType>(bytes); // MyType not registered

// +++ CORRECT +++ : Ensure type is known
// Use FlowMakerResolver or register custom resolver

NullReferenceException on Deserialize

// !!! WRONG !!!: Not checking for null
var obj = serializer.Deserialize<MyType>(bytes);
Console.WriteLine(obj.Property); // NullReferenceException

// +++ CORRECT +++ : Check for null
var obj = serializer.Deserialize<MyType>(bytes);
if (obj != null)
    Console.WriteLine(obj.Property);

[ERROR!error/DESERIALIZATION NULL] Deserialize<T> can return null if the data is corrupted or doesn't match the type. Always check for null or use the non-nullable reference types with proper null handling.

Header Size Mismatch

// !!! WRONG !!!: Header too short
byte[] shortHeader = new byte[4];
var header = new Header(shortHeader); // ArgumentException

// +++ CORRECT +++ : Header must be at least 8 bytes
byte[] validHeader = new byte[8];
var header = new Header(validHeader);

[ERROR!error/HEADER VALIDATION] Always validate header length before creating a Header object: if (bytes.Length < 8) throw new ArgumentException("Invalid header");.