FlowBoxSerializer API

The Python SDK doesn't provide a dedicated FlowBoxSerializer class the way the TypeScript SDK does. Instead, serialization is handled directly with the msgpack library, and header management is done by hand.

Overview

import msgpack
from industream.flowmaker.sdk import FlowBoxCore

class MyPipe(FlowBoxCore):
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Deserialize
        input_data = msgpack.unpackb(data)

        # Process
        output_data = {"result": input_data["value"] * 2}

        # Serialize and push (reusing incoming header)
        await self.push("output", header, msgpack.packb(output_data))

The header parameter contains metadata (event ID + serialization method) and is passed through by the runtime.


Serialization Formats

The Python SDK supports two serialization formats:

MessagePack (Default)

import msgpack

# Serialize
data = msgpack.packb({"key": "value", "number": 42})

# Deserialize
obj = msgpack.unpackb(data)

Advantages:

  • Compact binary format
  • Faster than JSON
  • Native bytes support
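To put a number on the compactness claim, the following sketch packs the same dictionary both ways and compares the encoded sizes (requires the third-party msgpack package):

```python
import json
import msgpack  # third-party: pip install msgpack

sample = {"key": "value", "number": 42}

packed = msgpack.packb(sample)                   # binary MessagePack encoding
encoded = json.dumps(sample).encode("utf-8")    # JSON text encoding

print(len(packed), len(encoded))  # → 19 30
```

For this small payload MessagePack is roughly a third smaller; the gap grows with repeated keys and binary fields.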

JSON

import json

# Serialize
data = json.dumps({"key": "value"}).encode('utf-8')

# Deserialize
obj = json.loads(data.decode('utf-8'))

Advantages:

  • Human-readable
  • Universal support
  • Debugging-friendly

[NOTE!info/FORMAT SELECTION] The serialization format is specified in the FlowBox registration via supported_formats in FlowBoxIO. The runtime respects this and sends data in the requested format.


Header Structure

The header bytes contain:

  • Bytes 0-3: Event ID (little-endian uint32)
  • Bytes 4-7: Serialization method (little-endian uint32)

Event IDs:

class FlowMakerEvent(IntEnum):
    INIT_FLOW_BOX = 0x4026
    CURRENT_EVENT = 0x8200
    CAN_SEND_NEXT = 0x8001
    DESTROY_EVENT = 0x82FF
    HEARTBEAT_EVENT = 0x8210
    SCHEDULER_RESTART = 0x8220

Serialization methods:

# Typically defined as constants
SERIALIZER_MSGPACK = 0
SERIALIZER_JSON = 1
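For readability, the same codes can be wrapped in an IntEnum, mirroring the FlowMakerEvent style. This is a convenience sketch, not part of the SDK:

```python
from enum import IntEnum

class SerializationMethod(IntEnum):
    # Codes match the runtime's serialization-method field in the header
    MSGPACK = 0
    JSON = 1

# IntEnum members compare equal to plain ints, so existing
# comparisons like `ser_method == 0` keep working.
assert SerializationMethod.MSGPACK == 0
assert SerializationMethod(1) is SerializationMethod.JSON
```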

Reading the Header

def parse_header(header: bytes) -> tuple[int, int]:
    event_id = int.from_bytes(header[:4], byteorder="little")
    serialization_method = int.from_bytes(header[4:8], byteorder="little")
    return event_id, serialization_method

# Usage (header and payload are the raw bytes received by the box)
event_id, ser_method = parse_header(header)
if ser_method == 0:  # MSGPACK
    data = msgpack.unpackb(payload)
elif ser_method == 1:  # JSON
    data = json.loads(payload.decode('utf-8'))
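Equivalently, both fields can be read in a single call with the stdlib struct module; "&lt;II" describes exactly the layout above (two little-endian uint32s):

```python
import struct

def parse_header_struct(header: bytes) -> tuple[int, int]:
    # "<II" = little-endian, two unsigned 32-bit integers (8 bytes total)
    event_id, serialization_method = struct.unpack("<II", header[:8])
    return event_id, serialization_method
```

The result is identical to the byte-slicing version; which one you use is a matter of taste.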

[INFO!info/HEADER REUSE] For pipes and sinks, reuse the incoming header when pushing output. This preserves the serialization method and event ID, ensuring downstream receives data in the expected format.

Creating a Header (Sources Only)

Sources need to create their own headers:

from typing import Callable

def create_header(event_id: int, serialization_method: int) -> bytes:
    return (
        event_id.to_bytes(4, byteorder="little") +
        serialization_method.to_bytes(4, byteorder="little")
    )

# Usage in a Source
HEADER = create_header(0x8200, 0)  # CURRENT_EVENT + MSGPACK

def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    data = msgpack.packb({"value": 42})
    fn_ready_for_next_item(HEADER, data)

[WARNING!warning/HEADER SIZE] Headers must be exactly 8 bytes. Using the wrong byte order (big-endian vs little-endian) or size causes deserialization errors in the runtime.
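A quick round-trip check confirms the 8-byte little-endian layout; the two helpers are repeated here so the snippet runs on its own:

```python
def create_header(event_id: int, serialization_method: int) -> bytes:
    return (
        event_id.to_bytes(4, byteorder="little")
        + serialization_method.to_bytes(4, byteorder="little")
    )

def parse_header(header: bytes) -> tuple[int, int]:
    return (
        int.from_bytes(header[:4], byteorder="little"),
        int.from_bytes(header[4:8], byteorder="little"),
    )

header = create_header(0x8200, 0)  # CURRENT_EVENT + MSGPACK
assert len(header) == 8            # exactly 8 bytes, as required
assert parse_header(header) == (0x8200, 0)
```

If either assertion fails after a local change, the byte order or field width no longer matches what the runtime expects.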


Common Serialization Patterns

Pass-Through (Preserve Format)

async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    # Reuse header and data format
    output_data = transform(msgpack.unpackb(data))
    await self.push("output", header, msgpack.packb(output_data))

[NOTE!lightbulb/PREFER REUSE] Reusing the incoming header is the simplest and most efficient approach. The runtime handles format negotiation; you just transform the payload.

Format Conversion

async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    # Deserialize with incoming format
    event_id = int.from_bytes(header[:4], byteorder="little")
    ser_method = int.from_bytes(header[4:8], byteorder="little")

    if ser_method == 0:  # MSGPACK
        input_data = msgpack.unpackb(data)
    else:  # JSON
        input_data = json.loads(data.decode('utf-8'))

    # Transform
    output_data = transform(input_data)

    # Create new header with desired format
    new_header = create_header(event_id, SERIALIZER_JSON)
    await self.push("output", new_header, json.dumps(output_data).encode('utf-8'))

[ERROR!error/FORMAT MISMATCH] If you change the serialization format, you MUST create a new header with the correct format code. Downstream boxes expect the format specified in the header.

Binary Data

import msgpack

# Serialize binary data
binary_data = b'\x00\x01\x02\x03'
packed = msgpack.packb({"data": binary_data}, use_bin_type=True)

# Deserialize
obj = msgpack.unpackb(packed, raw=False)
binary = obj["data"]  # bytes

[NOTE!info/BIN TYPE] Use use_bin_type=True when packing to preserve bytes as binary (not strings). Use raw=False when unpacking to decode strings properly.


Metadata Convention

While not enforced by the SDK, the FlowMaker ecosystem uses a standard metadata convention:

import msgpack

payload = {
    "temperature": 21.7,
    "pressure": 1.8,
    "$$meta": {
        "temperature": {"entryId": "dc-entry-temp"},
        "pressure": {"entryId": "dc-entry-pressure"}
    },
    "$$timestamp": "2026-04-14T12:00:00Z"
}

data = msgpack.packb(payload)

Convention:

  • Root keys: measured tag values
  • $$meta: per-tag metadata (e.g., DataCatalog entry IDs)
  • $$timestamp: event timestamp in ISO 8601 format
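A small helper can assemble payloads that follow this convention. The function name build_payload is illustrative, not SDK API; only stdlib pieces are used here:

```python
from datetime import datetime, timezone

def build_payload(values: dict, entry_ids: dict) -> dict:
    """Assemble a payload following the $$meta / $$timestamp convention."""
    return {
        **values,  # root keys: measured tag values
        "$$meta": {tag: {"entryId": entry_ids[tag]} for tag in values},
        "$$timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

payload = build_payload(
    {"temperature": 21.7, "pressure": 1.8},
    {"temperature": "dc-entry-temp", "pressure": "dc-entry-pressure"},
)
```

The resulting dictionary can then be passed straight to msgpack.packb.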

[INFO!info/METADATA PREFIX] The $$ prefix avoids collisions with user data. The runtime doesn't interpret this metadata—it's purely for application logic.


Differences from TypeScript SDK

Feature            Python SDK                     TypeScript SDK
Serializer class   None (use msgpack directly)    FlowBoxSerializer with methods
Header creation    Manual (bytes manipulation)    flowBoxSerializer.createShortHeader()
Header unpacking   Manual (bytes slicing)         flowBoxSerializer.unpack(header, data)
Format detection   Manual (read header bytes)     Automatic in unpack()

[NOTE!lightbulb/MANUAL vs AUTOMATIC] TypeScript's FlowBoxSerializer.unpack() automatically detects the format from the header and uses the correct deserializer. Python requires manual format detection, giving you more control but more boilerplate.


Complete Example

import msgpack
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxCore, FlowBoxInitParams, SerializationFormat
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/serializer-example",
    display_name="Serializer Example",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="code",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input",
                         supported_formats=[SerializationFormat.MSGPACK])],
        outputs=[FlowBoxIO(name="output", display_name="Output",
                          supported_formats=[SerializationFormat.MSGPACK])]
    )
))
class SerializerExample(FlowBoxCore):
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Parse header
        event_id = int.from_bytes(header[:4], byteorder="little")
        ser_method = int.from_bytes(header[4:8], byteorder="little")

        # Deserialize based on format
        if ser_method == 0:  # MSGPACK
            input_data = msgpack.unpackb(data)
        else:
            raise ValueError(f"Unsupported serialization method: {ser_method}")

        # Transform
        output_data = {
            **input_data,
            "processed": True,
            "processor_version": "1.0.0"
        }

        # Push downstream (reuse header)
        await self.push("output", header, msgpack.packb(output_data))

Troubleshooting

TypeError: can't serialize bytes

# ❌ Risky: with msgpack < 1.0 (where use_bin_type defaulted to False),
# bytes are packed as the legacy raw type and may round-trip as str
msgpack.packb({"data": b'\x00\x01'})

# ✅ Correct: bytes preserved as binary (the default since msgpack 1.0)
msgpack.packb({"data": b'\x00\x01'}, use_bin_type=True)

UnicodeDecodeError

# ❌ Wrong
msgpack.unpackb(data, raw=True)  # Returns bytes for string fields

# ✅ Correct
msgpack.unpackb(data, raw=False)  # Returns str (the default since msgpack 1.0)

Header Size Mismatch

# ❌ Wrong: Header too short
header = b'\x00\x01\x02'  # Only 3 bytes

# ✅ Correct: Header must be 8 bytes
header = (0x8200).to_bytes(4, 'little') + (0).to_bytes(4, 'little')

[ERROR!error/HEADER VALIDATION] Always validate header length before parsing: if len(header) < 8: raise ValueError("Invalid header").
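Putting that note into code, a defensive variant of parse_header might look like this (a sketch; the name parse_header_checked is illustrative):

```python
def parse_header_checked(header: bytes) -> tuple[int, int]:
    # Reject malformed headers before slicing into them
    if len(header) < 8:
        raise ValueError(f"Invalid header: expected 8 bytes, got {len(header)}")
    event_id = int.from_bytes(header[:4], byteorder="little")
    serialization_method = int.from_bytes(header[4:8], byteorder="little")
    return event_id, serialization_method
```

Calling it at the top of on_input turns a confusing downstream deserialization failure into an immediate, descriptive error.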