# FlowBoxSerializer API
The Python SDK doesn't provide a dedicated `FlowBoxSerializer` class the way the TypeScript SDK does. Instead, serialization is handled directly via the msgpack library, and header management is done manually.
## Overview
```python
import msgpack
from industream.flowmaker.sdk import FlowBoxCore

class MyPipe(FlowBoxCore):
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Deserialize
        input_data = msgpack.unpackb(data)
        # Process
        output_data = {"result": input_data["value"] * 2}
        # Serialize and push (reusing the incoming header)
        await self.push("output", header, msgpack.packb(output_data))
```
The header parameter contains metadata (event ID + serialization method) and is passed through by the runtime.
## Serialization Formats
The Python SDK supports two serialization formats:
### MessagePack (Default)
```python
import msgpack

# Serialize
data = msgpack.packb({"key": "value", "number": 42})

# Deserialize
obj = msgpack.unpackb(data)
```
Advantages:
- Compact binary format
- Faster than JSON
- Native bytes support
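As a quick illustration of the size difference, the same payload can be encoded in both formats and compared. A minimal sketch; exact byte counts depend on the payload:

```python
import json
import msgpack

payload = {"key": "value", "number": 42}

msgpack_bytes = msgpack.packb(payload)
json_bytes = json.dumps(payload).encode("utf-8")

# MessagePack typically encodes the same structure in fewer bytes
assert len(msgpack_bytes) < len(json_bytes)

# Both roundtrip back to the original object
assert msgpack.unpackb(msgpack_bytes) == payload
```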
### JSON
```python
import json

# Serialize
data = json.dumps({"key": "value"}).encode('utf-8')

# Deserialize
obj = json.loads(data.decode('utf-8'))
```
Advantages:
- Human-readable
- Universal support
- Debugging-friendly
[NOTE!info/FORMAT SELECTION] The serialization format is specified in the FlowBox registration via `supported_formats` in `FlowBoxIO`. The runtime respects this and sends data in the requested format.
## Header Structure
The header bytes contain:
- Bytes 0-3: Event ID (little-endian uint32)
- Bytes 4-7: Serialization method (little-endian uint32)
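The same layout can be expressed with the standard-library `struct` module, which reads both fields in one call. `"<II"` means two little-endian unsigned 32-bit integers:

```python
import struct

# Build an 8-byte header: event ID 0x8200, serialization method 0 (MessagePack)
header = struct.pack("<II", 0x8200, 0)

# "<II" = two little-endian uint32 values
event_id, ser_method = struct.unpack("<II", header)
```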
Event IDs:
```python
from enum import IntEnum

class FlowMakerEvent(IntEnum):
    INIT_FLOW_BOX = 0x4026
    CURRENT_EVENT = 0x8200
    CAN_SEND_NEXT = 0x8001
    DESTROY_EVENT = 0x82FF
    HEARTBEAT_EVENT = 0x8210
    SCHEDULER_RESTART = 0x8220
```
Serialization methods:
```python
# Typically defined as constants
SERIALIZER_MSGPACK = 0
SERIALIZER_JSON = 1
```
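With these constants, a small dispatch helper can pick the right deserializer. The helper is hypothetical, not part of the SDK:

```python
import json
import msgpack

SERIALIZER_MSGPACK = 0
SERIALIZER_JSON = 1

def deserialize(payload: bytes, method: int):
    """Decode a payload according to its serialization method code."""
    if method == SERIALIZER_MSGPACK:
        return msgpack.unpackb(payload)
    if method == SERIALIZER_JSON:
        return json.loads(payload.decode("utf-8"))
    raise ValueError(f"Unsupported serialization method: {method}")
```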
### Reading the Header
```python
import json
import msgpack

def parse_header(header: bytes) -> tuple[int, int]:
    event_id = int.from_bytes(header[:4], byteorder="little")
    serialization_method = int.from_bytes(header[4:8], byteorder="little")
    return event_id, serialization_method

# Usage (header and payload come from on_input)
event_id, ser_method = parse_header(header)
if ser_method == 0:  # MSGPACK
    data = msgpack.unpackb(payload)
elif ser_method == 1:  # JSON
    data = json.loads(payload.decode('utf-8'))
```
[INFO!info/HEADER REUSE] For pipes and sinks, reuse the incoming `header` when pushing output. This preserves the serialization method and event ID, ensuring downstream boxes receive data in the expected format.
### Creating a Header (Sources Only)
Sources need to create their own headers:
```python
from typing import Callable
import msgpack

def create_header(event_id: int, serialization_method: int) -> bytes:
    return (
        event_id.to_bytes(4, byteorder="little")
        + serialization_method.to_bytes(4, byteorder="little")
    )

# Usage in a Source
HEADER = create_header(0x8200, 0)  # CURRENT_EVENT + MSGPACK

def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    data = msgpack.packb({"value": 42})
    fn_ready_for_next_item(HEADER, data)
```
[WARNING!warning/HEADER SIZE] Headers must be exactly 8 bytes. Using the wrong byte order (big-endian vs little-endian) or size causes deserialization errors in the runtime.
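A quick self-check confirms the 8-byte size and the little-endian layout. The `create_header` and `parse_header` helpers from this page are redefined here so the snippet is self-contained:

```python
def create_header(event_id: int, serialization_method: int) -> bytes:
    return (
        event_id.to_bytes(4, byteorder="little")
        + serialization_method.to_bytes(4, byteorder="little")
    )

def parse_header(header: bytes) -> tuple[int, int]:
    event_id = int.from_bytes(header[:4], byteorder="little")
    serialization_method = int.from_bytes(header[4:8], byteorder="little")
    return event_id, serialization_method

header = create_header(0x8200, 0)           # CURRENT_EVENT + MSGPACK
assert len(header) == 8                     # exactly 8 bytes
assert parse_header(header) == (0x8200, 0)  # roundtrip preserves both fields
```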
## Common Serialization Patterns

### Pass-Through (Preserve Format)
```python
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    # Reuse the incoming header and data format
    # (transform() is application-specific)
    output_data = transform(msgpack.unpackb(data))
    await self.push("output", header, msgpack.packb(output_data))
```
[NOTE!lightbulb/PREFER REUSE] Reusing the incoming header is the simplest and most efficient approach. The runtime handles format negotiation; you just transform the payload.
### Format Conversion
```python
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    # Deserialize with the incoming format
    event_id = int.from_bytes(header[:4], byteorder="little")
    ser_method = int.from_bytes(header[4:8], byteorder="little")
    if ser_method == 0:  # MSGPACK
        input_data = msgpack.unpackb(data)
    else:  # JSON
        input_data = json.loads(data.decode('utf-8'))

    # Transform
    output_data = transform(input_data)

    # Create a new header with the desired format
    new_header = create_header(event_id, SERIALIZER_JSON)
    await self.push("output", new_header, json.dumps(output_data).encode('utf-8'))
```
[ERROR!error/FORMAT MISMATCH] If you change the serialization format, you MUST create a new header with the correct format code. Downstream boxes expect the format specified in the header.
### Binary Data
```python
import msgpack

# Serialize binary data
binary_data = b'\x00\x01\x02\x03'
packed = msgpack.packb({"data": binary_data}, use_bin_type=True)

# Deserialize
obj = msgpack.unpackb(packed, raw=False)
binary = obj["data"]  # bytes
```
[NOTE!info/BIN TYPE] Use `use_bin_type=True` when packing to preserve bytes as binary (not strings). Use `raw=False` when unpacking to decode strings properly.
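A short demonstration of what those flags buy you: with `use_bin_type=True` and `raw=False`, bytes stay bytes and strings stay strings across the roundtrip:

```python
import msgpack

packed = msgpack.packb(
    {"raw": b"\x00\x01\x02\x03", "name": "sensor-1"},
    use_bin_type=True,  # encode bytes as the msgpack bin type, not str
)
obj = msgpack.unpackb(packed, raw=False)  # decode str values as str

assert isinstance(obj["raw"], bytes)   # binary survives as bytes
assert isinstance(obj["name"], str)    # text survives as str
```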
## Metadata Convention
While not enforced by the SDK, the FlowMaker ecosystem uses a standard metadata convention:
```python
import msgpack

payload = {
    "temperature": 21.7,
    "pressure": 1.8,
    "$$meta": {
        "temperature": {"entryId": "dc-entry-temp"},
        "pressure": {"entryId": "dc-entry-pressure"}
    },
    "$$timestamp": "2026-04-14T12:00:00Z"
}
data = msgpack.packb(payload)
```
Convention:
- Root keys: measured tag values
- `$$meta`: per-tag metadata (e.g., DataCatalog entry IDs)
- `$$timestamp`: event timestamp in ISO 8601 format
[INFO!info/METADATA PREFIX] The `$$` prefix avoids collisions with user data. The runtime doesn't interpret this metadata; it's purely for application logic.
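Because the runtime ignores `$$`-prefixed keys, application code typically separates them itself. A hypothetical helper illustrating the convention:

```python
from typing import Optional

def split_payload(payload: dict) -> tuple[dict, dict, Optional[str]]:
    """Separate measured tag values from $$-prefixed metadata keys."""
    values = {k: v for k, v in payload.items() if not k.startswith("$$")}
    meta = payload.get("$$meta", {})
    timestamp = payload.get("$$timestamp")
    return values, meta, timestamp

payload = {
    "temperature": 21.7,
    "$$meta": {"temperature": {"entryId": "dc-entry-temp"}},
    "$$timestamp": "2026-04-14T12:00:00Z",
}
values, meta, timestamp = split_payload(payload)
```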
## Differences from TypeScript SDK
| Feature | Python SDK | TypeScript SDK |
|---|---|---|
| Serializer class | None (use msgpack directly) | `FlowBoxSerializer` with methods |
| Header creation | Manual (bytes manipulation) | `flowBoxSerializer.createShortHeader()` |
| Header unpacking | Manual (bytes slicing) | `flowBoxSerializer.unpack(header, data)` |
| Format detection | Manual (read header bytes) | Automatic in `unpack()` |
[NOTE!lightbulb/MANUAL vs AUTOMATIC] TypeScript's `FlowBoxSerializer.unpack()` automatically detects the format from the header and uses the correct deserializer. Python requires manual format detection, giving you more control but more boilerplate.
## Complete Example
```python
import msgpack
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxCore, FlowBoxInitParams, SerializationFormat
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/serializer-example",
    display_name="Serializer Example",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="code",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input",
                          supported_formats=[SerializationFormat.MSGPACK])],
        outputs=[FlowBoxIO(name="output", display_name="Output",
                           supported_formats=[SerializationFormat.MSGPACK])]
    )
))
class SerializerExample(FlowBoxCore):
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Parse header
        event_id = int.from_bytes(header[:4], byteorder="little")
        ser_method = int.from_bytes(header[4:8], byteorder="little")

        # Deserialize based on format
        if ser_method == 0:  # MSGPACK
            input_data = msgpack.unpackb(data)
        else:
            raise ValueError(f"Unsupported serialization method: {ser_method}")

        # Transform
        output_data = {
            **input_data,
            "processed": True,
            "processor_version": "1.0.0"
        }

        # Push downstream (reuse header)
        await self.push("output", header, msgpack.packb(output_data))
```
## Troubleshooting

### `TypeError: can't serialize bytes`
```python
# ❌ Wrong
msgpack.packb({"data": b'\x00\x01'})  # May fail

# ✅ Correct
msgpack.packb({"data": b'\x00\x01'}, use_bin_type=True)
```
### `UnicodeDecodeError`
```python
# ❌ Wrong
msgpack.unpackb(data, raw=True)   # Returns bytes for strings

# ✅ Correct
msgpack.unpackb(data, raw=False)  # Returns str
```
### Header Size Mismatch
```python
# ❌ Wrong: Header too short
header = b'\x00\x01\x02'  # Only 3 bytes

# ✅ Correct: Header must be 8 bytes
header = (0x8200).to_bytes(4, 'little') + (0).to_bytes(4, 'little')
```
[ERROR!error/HEADER VALIDATION] Always validate the header length before parsing: `if len(header) < 8: raise ValueError("Invalid header")`.
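That validation can be folded into the parser itself. A sketch of a strict variant of the `parse_header` helper shown earlier:

```python
def parse_header_strict(header: bytes) -> tuple[int, int]:
    """Parse an 8-byte header, rejecting anything too short."""
    if len(header) < 8:
        raise ValueError(f"Invalid header: expected 8 bytes, got {len(header)}")
    event_id = int.from_bytes(header[:4], byteorder="little")
    ser_method = int.from_bytes(header[4:8], byteorder="little")
    return event_id, ser_method
```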