FlowBoxSerializer API
The C# SDK handles serialization through the static Serialization class and the ISerializer interface. It ships with MessagePack (default) and JSON serializers and detects the format automatically from the message header.
Overview
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Serializations;
public class MyPipe : FlowBoxCore
{
protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
// Get serializer based on header (automatic format detection)
var serializer = Serialization.GetSerializer(header);
// Deserialize
var input = serializer.Deserialize<Dictionary<string, object>>(data);
// Transform
var output = new Dictionary<string, object>(input)
{
["processed"] = true
};
// Serialize and push (reuse header to preserve format)
await Push("output", header, serializer.Serialize(output));
}
}
The header-driven approach ensures downstream receivers get data in their expected format.
Header Structure
The Header class encapsulates a header of at least 8 bytes that carries the flow event ID and the serialization method, optionally followed by extra data:
public class Header
{
public Header(byte[] value);
public Header(SerializationMethods serializationMethod, FlowConstants flowConstants = FlowConstants.CURRENT_EVENT);
public FlowConstants FlowConstant { get; set; }
public SerializationMethods SerializationMethod { get; set; }
public Span<byte> Extra { get; set; }
public byte[] GetHeader();
}
Header Layout
- Bytes 0-3: Flow event ID (little-endian int32)
- Bytes 4-7: Serialization method (little-endian int32)
- Bytes 8+: Optional extra data
Event IDs:
public enum FlowConstants
{
INIT_FLOW_ELEMENT = 0x4026,
CURRENT_EVENT = 0x8200,
CAN_SEND_NEXT = 0x8001,
DESTROY_EVENT = 0x82FF,
HEARTBEAT_EVENT = 0x8210,
SCHEDULER_RESTART = 0x8220,
ACK_EVENT = 0x9000,
ERROR_EVENT = 0xFFFF
}
Serialization methods:
public enum SerializationMethods : int
{
Msgpack = 0x8000,
Json = 0x9000,
Unknown = 0x0000
}
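The byte layout above can be reproduced with plain .NET APIs. The following is a minimal sketch that does not depend on the SDK's Header class; HeaderLayout and Build are hypothetical names used only for illustration, and the two constants are copied from the enums listed above.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper (not part of the SDK): builds raw header bytes by hand.
public static class HeaderLayout
{
    public const int CurrentEvent = 0x8200; // FlowConstants.CURRENT_EVENT
    public const int Msgpack = 0x8000;      // SerializationMethods.Msgpack

    public static byte[] Build(int eventId, int method, ReadOnlySpan<byte> extra = default)
    {
        var buffer = new byte[8 + extra.Length];

        // Bytes 0-3: flow event ID, little-endian int32
        BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(0, 4), eventId);

        // Bytes 4-7: serialization method, little-endian int32
        BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(4, 4), method);

        // Bytes 8+: optional extra data
        extra.CopyTo(buffer.AsSpan(8));
        return buffer;
    }
}
```

Under these assumptions, HeaderLayout.Build(HeaderLayout.CurrentEvent, HeaderLayout.Msgpack) yields the eight bytes 00 82 00 00 00 80 00 00.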
Creating Headers
// Create header with default CURRENT_EVENT
var msgpackHeader = new Header(SerializationMethods.Msgpack);
// Create header with specific event
var jsonHeader = new Header(SerializationMethods.Json, FlowConstants.CURRENT_EVENT);
// Create from raw bytes
byte[] rawBytes = /* 8+ bytes from runtime */;
var parsedHeader = new Header(rawBytes);
// Access properties
var method = msgpackHeader.SerializationMethod; // SerializationMethods.Msgpack
var eventConstant = msgpackHeader.FlowConstant; // FlowConstants.CURRENT_EVENT
[INFO!info/HEADER REUSE] For pipes and sinks, reuse the incoming header when pushing output. This preserves the serialization method and event ID, so downstream boxes receive data in the expected format.
Header Conversion
// Header to byte[]
byte[] bytes = header.GetHeader();
// Or via the cast operator
byte[] castBytes = (byte[])header;
// byte[] to Header
byte[] raw = /* from runtime */;
Header parsed = raw; // implicit cast
Serialization Registry
The Serialization class manages a registry of serializers:
public static class Serialization
{
// Get serializer by method enum
public static ISerializer GetSerializer(SerializationMethods method);
// Get serializer by method ID
public static ISerializer GetSerializer(int method);
// Get serializer from header (automatic)
public static ISerializer GetSerializer(Header header);
// Get method enum from serializer
public static SerializationMethods GetSerializationMethod(ISerializer serializer);
// Register custom serializer
public static void RegisterSerializer(int method, ISerializer serializer);
}
Built-in Serializers
// MessagePack (default)
var msgpackSerializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
var msgpackData = msgpackSerializer.Serialize(myObject);
var fromMsgpack = msgpackSerializer.Deserialize<MyType>(msgpackData);
// JSON
var jsonSerializer = Serialization.GetSerializer(SerializationMethods.Json);
var jsonData = jsonSerializer.Serialize(myObject);
var fromJson = jsonSerializer.Deserialize<MyType>(jsonData);
[NOTE!info/AUTOMATIC SELECTION] Use Serialization.GetSerializer(header) to automatically select the correct serializer based on the incoming message format. This is the recommended approach for pipes that need to preserve format.
ISerializer Interface
public interface ISerializer
{
byte[] Serialize<T>(T obj);
T Deserialize<T>(byte[] data);
T Deserialize<T>(string data);
}
Custom Serializer Implementation
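// Sketch assuming the protobuf-net package (ProtoBuf.Serializer); substitute your own Protobuf library as needed
// Requires: using System; using System.IO;
public class ProtobufSerializer : ISerializer
{
public byte[] Serialize<T>(T obj)
{
// Write the object to an in-memory stream and return its bytes
using var stream = new MemoryStream();
ProtoBuf.Serializer.Serialize(stream, obj);
return stream.ToArray();
}
public T Deserialize<T>(byte[] data)
{
using var stream = new MemoryStream(data);
return ProtoBuf.Serializer.Deserialize<T>(stream);
}
// ISerializer also declares a string overload; treating the string as Base64 is an assumption
public T Deserialize<T>(string data) => Deserialize<T>(Convert.FromBase64String(data));
}
// Register custom serializer
Serialization.RegisterSerializer(0xA000, new ProtobufSerializer());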
[WARNING!warning/CUSTOM METHOD ID] When registering a custom serializer, choose a unique method ID that doesn't conflict with built-in ones (0x8000 for Msgpack, 0x9000 for JSON). Document your custom ID for downstream consumers.
MessagePack Serializer
public class FMMessagePackSerializer : ISerializer
{
public FMMessagePackSerializer();
public byte[] Serialize<T>(T obj);
public T Deserialize<T>(byte[] data);
}
Configuration:
- Uses MessagePackSerializerOptions.Standard
- Custom resolver: FlowMakerResolver + ContractlessStandardResolver
- Supports anonymous types and POCOs without attributes
Example:
var serializer = Serialization.GetSerializer(SerializationMethods.Msgpack);
// Serialize an anonymous type (POCOs work the same way)
var data = new { Value = 42, Name = "test" };
byte[] bytes = serializer.Serialize(data);
// Deserialize to POCO
var obj = serializer.Deserialize<MyType>(bytes);
// Deserialize to dictionary
var dict = serializer.Deserialize<Dictionary<string, object>>(bytes);
[NOTE!info/RESOLVER CONFIGURATION] The FlowMakerResolver handles special FlowMaker types. The ContractlessStandardResolver allows serialization of types without [MessagePackObject] attributes.
JSON Serializer
public class FMJsonConverterSerializer : ISerializer
{
public byte[] Serialize<T>(T obj);
public T Deserialize<T>(byte[] data);
}
Configuration:
- Uses System.Text.Json.JsonSerializer
- Property naming: camelCase
- Enum conversion: camelCase strings
- Indented output (for debugging)
Example:
var serializer = Serialization.GetSerializer(SerializationMethods.Json);
// Serialize
var data = new { Value = 42, Name = "test" };
byte[] bytes = serializer.Serialize(data);
// Output (shown compact here; the serializer emits it indented): {"value":42,"name":"test"}
// Deserialize
var obj = serializer.Deserialize<MyType>(bytes);
[INFO!info/CAMEL CASE] JSON serialization uses camelCase property names by default. This matches JavaScript/TypeScript conventions and ensures compatibility with frontend components.
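The configuration listed above can be approximated with plain System.Text.Json. The following is a sketch of equivalent options, not the actual source of FMJsonConverterSerializer; JsonConfigSketch, Reading, and Quality are hypothetical names introduced for illustration.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

public enum Quality { GoodValue, BadValue }

public record Reading(int Value, string Name, Quality Quality);

// Hypothetical stand-in (not part of the SDK) mirroring the documented JSON settings.
public static class JsonConfigSketch
{
    public static readonly JsonSerializerOptions Options = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // camelCase property names
        WriteIndented = true,                              // indented output
        Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) } // camelCase enum strings
    };

    public static byte[] Serialize<T>(T value) =>
        JsonSerializer.SerializeToUtf8Bytes(value, Options);

    public static T? Deserialize<T>(byte[] data) =>
        JsonSerializer.Deserialize<T>(data, Options);
}
```

Serializing new Reading(42, "test", Quality.GoodValue) with these options yields indented JSON whose keys are value, name, and quality, with the enum written as the string goodValue.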
Common Serialization Patterns
Pass-Through (Preserve Format)
protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
// Get serializer from incoming header
var serializer = Serialization.GetSerializer(header);
// Deserialize and transform
var input = serializer.Deserialize<Dictionary<string, object>>(data);
var output = new Dictionary<string, object>(input)
{
["processed"] = true
};
// Push with same header (preserves format)
await Push("output", header, serializer.Serialize(output));
}
[NOTE!lightbulb/PREFER REUSE] Reusing the incoming header is the simplest and most efficient approach. The runtime handles format negotiation; you just transform the payload.
Format Conversion
protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
// Deserialize with incoming format
var inputSerializer = Serialization.GetSerializer(header);
var input = inputSerializer.Deserialize<MyType>(data);
// Transform
var output = Transform(input);
// Create new header with desired format
var newHeader = new Header(SerializationMethods.Json);
var outputSerializer = Serialization.GetSerializer(SerializationMethods.Json);
// Push with new format
await Push("output", newHeader, outputSerializer.Serialize(output));
}
[ERROR!error/FORMAT MISMATCH] If you change the serialization format, you MUST create a new header with the correct format code. Downstream boxes expect the format specified in the header.
Typed Serialization (Simple Model)
public class TypedPipe : FlowBoxCore<InputData, OutputData>
{
protected override async Task OnInput(string inputId, InputData data)
{
// Serializer handled automatically
var output = new OutputData { Value = data.Value * 2 };
await Push("output", output); // No manual serialization needed
}
}
[NOTE!lightbulb/TYPE SAFETY] The simple model (FlowBoxCore<TInput, TOutput>) handles serialization automatically based on your type parameters. Use it for well-defined data contracts.
Metadata Convention
While not enforced by the SDK, the FlowMaker ecosystem uses a standard metadata convention:
var payload = new
{
temperature = 21.7,
pressure = 1.8,
Meta = new
{
temperature = new { entryId = "dc-entry-temp" },
pressure = new { entryId = "dc-entry-pressure" }
},
Timestamp = DateTime.UtcNow.ToString("O")
};
var bytes = serializer.Serialize(payload);
Convention:
- Root keys: measured tag values
- Meta: per-tag metadata (e.g., DataCatalog entry IDs)
- Timestamp: event timestamp in ISO 8601 format
[INFO!info/METADATA PREFIX] The Meta property (or $$meta in JavaScript) avoids collisions with user data. The runtime doesn't interpret this metadata; it's purely for application logic.
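A payload that follows this convention can also be assembled from dictionaries when the tag names are only known at runtime. The sketch below is one possible approach; PayloadBuilder is a hypothetical helper, not an SDK type, and rejecting tags named Meta or Timestamp is an assumption made here to keep the reserved keys free.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the SDK) that assembles a convention-compliant payload.
public static class PayloadBuilder
{
    public static Dictionary<string, object> Build(
        IReadOnlyDictionary<string, double> values,
        IReadOnlyDictionary<string, string> entryIds,
        DateTime timestampUtc)
    {
        var payload = new Dictionary<string, object>();
        var meta = new Dictionary<string, object>();

        foreach (var (tag, value) in values)
        {
            // Assumption: reserved keys must not be used as tag names
            if (tag is "Meta" or "Timestamp")
                throw new ArgumentException($"Tag name '{tag}' is reserved.", nameof(values));

            // Root keys: measured tag values
            payload[tag] = value;

            // Attach metadata only for tags with a known entry ID
            if (entryIds.TryGetValue(tag, out var entryId))
                meta[tag] = new Dictionary<string, object> { ["entryId"] = entryId };
        }

        payload["Meta"] = meta;
        payload["Timestamp"] = timestampUtc.ToString("O"); // ISO 8601 round-trip format
        return payload;
    }
}
```

The resulting dictionary can be passed to serializer.Serialize in the same way as the anonymous object shown above.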
Differences from TypeScript/Python SDKs
| Feature | C# SDK | TypeScript SDK | Python SDK |
|---|---|---|---|
| Serializer class | Serialization registry + ISerializer | FlowBoxSerializer singleton | None (use msgpack directly) |
| Header class | Header with properties | Raw Buffer | Raw bytes |
| Format detection | GetSerializer(header) | flowBoxSerializer.unpack() | Manual (read header bytes) |
| Custom serializers | RegisterSerializer() | Not documented | Not supported |
| Built-in formats | Msgpack, JSON | Msgpack, JSON | Msgpack, JSON |
[NOTE!lightbulb/REGISTRY PATTERN] The C# SDK uses a registry pattern for serializers, making it easy to add custom formats. The TypeScript SDK uses a singleton with built-in methods. The Python SDK requires manual format detection.
Complete Example
using Industream.FlowMaker.Sdk.Clients;
using Industream.FlowMaker.Sdk.Elements.Advanced;
using Industream.FlowMaker.Sdk.Serializations;
using Microsoft.Extensions.Logging;
public class FormatConverterPipe : FlowBoxCore
{
private readonly ILogger _logger;
public FormatConverterPipe(FlowBoxInitParams initParams, IServiceProvider serviceProvider)
: base(initParams, serviceProvider)
{
_logger = flowMakerLogger.CreateILogger<FormatConverterPipe>();
}
protected override async Task OnInput(byte[] inputId, Header header, byte[] data)
{
// Log incoming format
_logger.LogInformation("Received data in format: {Format}", header.SerializationMethod);
// Deserialize with incoming format
var inputSerializer = Serialization.GetSerializer(header);
var input = inputSerializer.Deserialize<Dictionary<string, object>>(data);
// Transform
var output = new Dictionary<string, object>(input)
{
["converted"] = true,
["converterVersion"] = "1.0.0"
};
// Convert to JSON (if not already)
var jsonSerializer = Serialization.GetSerializer(SerializationMethods.Json);
// Reuse the incoming header when it is already JSON; otherwise create a JSON header
Header outputHeader = header.SerializationMethod == SerializationMethods.Json
? header
: new Header(SerializationMethods.Json);
// Always serialize the transformed payload; pushing the original bytes would drop the added fields
byte[] outputData = jsonSerializer.Serialize(output);
_logger.LogInformation("Sending data in format: {Format}", outputHeader.SerializationMethod);
// Push downstream
await Push("output", outputHeader, outputData);
}
}
Troubleshooting
SerializationException: Type not found
// !!! WRONG !!!: Missing type in resolver
var obj = serializer.Deserialize<MyType>(bytes); // MyType not registered
// +++ CORRECT +++ : Ensure type is known
// Use FlowMakerResolver or register custom resolver
NullReferenceException on Deserialize
// !!! WRONG !!!: Not checking for null
var obj = serializer.Deserialize<MyType>(bytes);
Console.WriteLine(obj.Property); // NullReferenceException
// +++ CORRECT +++ : Check for null
var obj = serializer.Deserialize<MyType>(bytes);
if (obj != null)
Console.WriteLine(obj.Property);
[ERROR!error/DESERIALIZATION NULL] Deserialize<T> can return null if the data is corrupted or doesn't match the type. Always check for null, or enable nullable reference types and handle the null case explicitly.
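The null check can be wrapped in a small Try-style helper so that call sites handle both null results and malformed payloads in one place. This is a sketch only: SafeDeserialize is a hypothetical name, and because the exception types thrown by the SDK serializers are not documented here, it catches Exception broadly; narrow that in real code.

```csharp
using System;

// Hypothetical wrapper (not part of the SDK) around any deserialize delegate.
public static class SafeDeserialize
{
    public static bool TryDeserialize<T>(Func<byte[], T?> deserialize, byte[] data, out T? result)
        where T : class
    {
        try
        {
            result = deserialize(data);
            // A null result is reported as a failure
            return result is not null;
        }
        catch (Exception)
        {
            // Assumption: any exception means the payload could not be read
            result = null;
            return false;
        }
    }
}
```

With an SDK serializer, the call would look like SafeDeserialize.TryDeserialize<MyType>(serializer.Deserialize<MyType>, bytes, out var obj), after which obj is only used when the method returns true.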
Header Size Mismatch
// !!! WRONG !!!: Header too short
byte[] shortHeader = new byte[4];
var header = new Header(shortHeader); // ArgumentException
// +++ CORRECT +++ : Header must be at least 8 bytes
byte[] validHeader = new byte[8];
var header = new Header(validHeader);
[ERROR!error/HEADER VALIDATION] Always validate header length before creating a Header object: if (bytes.Length < 8) throw new ArgumentException("Invalid header");
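Where throwing is undesirable, the same length check can be expressed as a non-throwing guard that also decodes the two fixed fields. The sketch below uses only .NET APIs; HeaderGuard and TryReadFields are hypothetical names, and the field offsets are taken from the Header Layout section.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical guard (not part of the SDK): checks raw bytes before they reach new Header(bytes).
public static class HeaderGuard
{
    public const int MinimumLength = 8;

    public static bool TryReadFields(ReadOnlySpan<byte> raw, out int eventId, out int method)
    {
        eventId = 0;
        method = 0;

        // Too short to hold both little-endian int32 fields
        if (raw.Length < MinimumLength)
            return false;

        eventId = BinaryPrimitives.ReadInt32LittleEndian(raw.Slice(0, 4));
        method = BinaryPrimitives.ReadInt32LittleEndian(raw.Slice(4, 4));
        return true;
    }
}
```

A caller can log and drop the message when TryReadFields returns false, and construct the Header only when it returns true.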