FlowBoxSerializer API
The FlowBoxSerializer handles MessagePack and JSON serialization with a header-based protocol. The header encodes both the action/event ID and the serialization method.
Header Format
Bytes 0-3: Action/Event ID (UInt32LE)
Bytes 4-7: Serialization Method (UInt32LE)
Event IDs (from flow.constants.ts):
INIT_FLOW_ELEMENT = 0x4026
CURRENT_EVENT = 0x8200
CAN_SEND_NEXT = 0x8201
DESTROY_EVENT = 0x8202
HEARTBEAT_EVENT = 0x8203
SCHEDULER_RESTART = 0x8204
ACK_EVENT = 0x8205
Serialization Methods:
SERIALIZER_MSGPACK = 0x8000
SERIALIZER_JSON = 0x9000
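The byte layout above can be reproduced with plain Node.js Buffer calls. The following is a minimal sketch for illustration only, not the SDK's implementation: `buildHeader` and `parseHeader` are hypothetical helper names, and the constant values are copied from the lists above.

```typescript
import { Buffer } from "node:buffer";

// Values copied from the constants listed above.
const CURRENT_EVENT = 0x8200;
const SERIALIZER_MSGPACK = 0x8000;

// Hypothetical helper: builds the 8-byte header by hand.
function buildHeader(actionId: number, method: number): Buffer {
  const header = Buffer.alloc(8);
  header.writeUInt32LE(actionId, 0); // bytes 0-3: action/event ID
  header.writeUInt32LE(method, 4); // bytes 4-7: serialization method
  return header;
}

// Hypothetical helper: reads both fields back out of a header.
function parseHeader(header: Buffer): { actionId: number; method: number } {
  if (header.length < 8) {
    throw new Error(`Header too short: expected 8 bytes, got ${header.length}`);
  }
  return { actionId: header.readUInt32LE(0), method: header.readUInt32LE(4) };
}

const exampleHeader = buildHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
console.log(exampleHeader.toString("hex")); // 0082000000800000
```

Because both fields are little-endian, the least significant byte comes first: 0x8200 is stored as 00 82 00 00.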
Singleton Instance
import { flowBoxSerializer } from "@industream/flowmaker-sdk";
flowBoxSerializer.createShortHeader()
flowBoxSerializer.createShortHeader(actionId: number, serializationMethod: number): Buffer
Parameters:
- actionId: The event/action constant (e.g., CURRENT_EVENT, INIT_FLOW_ELEMENT)
- serializationMethod: The serializer constant (SERIALIZER_MSGPACK or SERIALIZER_JSON)
Returns: 8-byte Buffer ready to be sent with payload
Creates an 8-byte header with the action ID and serialization method.
import { flowBoxSerializer, CURRENT_EVENT, SERIALIZER_MSGPACK } from "@industream/flowmaker-sdk";
const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
// header: Buffer [0x00, 0x82, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00]
// bytes 0-3: 0x8200 (CURRENT_EVENT, little-endian)
// bytes 4-7: 0x8000 (MessagePack, little-endian)
flowBoxSerializer.pack()
flowBoxSerializer.pack(header: Buffer, data: any): Buffer
Parameters:
- header: The 8-byte header containing the serializer method
- data: Any JavaScript value to serialize
Returns: Encoded Buffer
Throws: Error if serializer type is unknown
Serializes data using the serializer method encoded in the header.
import { flowBoxSerializer, CURRENT_EVENT, SERIALIZER_MSGPACK } from "@industream/flowmaker-sdk";
const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
const payload = flowBoxSerializer.pack(header, { temperature: 21.7, pressure: 1.8 });
// payload is now a MessagePack-encoded Buffer
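The header-based dispatch described for pack() (read the method from the header, pick an encoder, throw if it is unknown) might look roughly like the sketch below. This is an assumption about the general shape, not the SDK's actual code: the `encoders` map and `packWithHeader` are hypothetical, and only a JSON encoder is registered to keep the example dependency-free.

```typescript
import { Buffer } from "node:buffer";

const SERIALIZER_JSON = 0x9000; // value from the constants list above

type Encoder = (data: unknown) => Buffer;

// Hypothetical registry keyed by serialization-method constant.
const encoders = new Map<number, Encoder>([
  [SERIALIZER_JSON, (data) => Buffer.from(JSON.stringify(data), "utf8")],
]);

// Hypothetical stand-in for pack(): dispatches on bytes 4-7 of the header.
function packWithHeader(header: Buffer, data: unknown): Buffer {
  const method = header.readUInt32LE(4);
  const encode = encoders.get(method);
  if (!encode) {
    throw new Error(`Unknown serializer: 0x${method.toString(16)}`);
  }
  return encode(data);
}

// Build a header by hand: action ID 0, JSON serialization.
const jsonHeader = Buffer.alloc(8);
jsonHeader.writeUInt32LE(0, 0);
jsonHeader.writeUInt32LE(SERIALIZER_JSON, 4);

const encoded = packWithHeader(jsonHeader, { val: 42 });
console.log(encoded.toString("utf8")); // {"val":42}
```

A lookup table keeps the unknown-method case explicit: a header carrying an unregistered constant fails immediately rather than producing bytes the receiver cannot decode.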
flowBoxSerializer.unpack()
flowBoxSerializer.unpack(header: Buffer, buffer: Buffer): any
Parameters:
- header: The 8-byte header containing the serializer method
- buffer: The encoded payload Buffer
Returns: Decoded JavaScript value
Throws: Error if deserializer type is unknown
Deserializes data using the serializer method encoded in the header.
import { flowBoxSerializer, CURRENT_EVENT, SERIALIZER_MSGPACK } from "@industream/flowmaker-sdk";
const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
const payload = flowBoxSerializer.pack(header, { temperature: 21.7 });
// Later, on receive:
const data = flowBoxSerializer.unpack(header, payload);
// data: { temperature: 21.7 }
flowBoxSerializer.packNum()
flowBoxSerializer.packNum(constant: number, data: any): Buffer
Parameters:
- constant: The serializer constant (e.g., SERIALIZER_JSON)
- data: Any JavaScript value to serialize
Returns: Encoded Buffer
Direct serialization by constant value (bypasses header parsing).
import { flowBoxSerializer, SERIALIZER_JSON } from "@industream/flowmaker-sdk";
const jsonBuffer = flowBoxSerializer.packNum(SERIALIZER_JSON, { val: 42 });
flowBoxSerializer.unpackNum()
flowBoxSerializer.unpackNum(constant: number, buffer: Buffer): any
Parameters:
- constant: The serializer constant (e.g., SERIALIZER_JSON)
- buffer: The encoded payload Buffer
Returns: Decoded JavaScript value
Direct deserialization by constant value.
import { flowBoxSerializer, SERIALIZER_JSON } from "@industream/flowmaker-sdk";
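// jsonBuffer: a Buffer previously produced by flowBoxSerializer.packNum(SERIALIZER_JSON, { val: 42 })
const data = flowBoxSerializer.unpackNum(SERIALIZER_JSON, jsonBuffer);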
// data: { val: 42 }
Complete Serializer Usage Example
import {
  flowBoxSerializer,
  CURRENT_EVENT,
  SERIALIZER_MSGPACK,
  SERIALIZER_JSON
} from "@industream/flowmaker-sdk";
// ── CREATING HEADERS ────────────────────────────────────
// For runtime events (used internally by SDK)
const eventHeader = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
// For custom data emission (often use 0 for action ID)
const dataHeader = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);
// ── SENDING DATA ──────────────────────────────────────
// Create header
const header = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);
// Serialize payload
const payload = flowBoxSerializer.pack(header, {
  temperature: 21.7,
  pressure: 1.8,
  $$meta: {
    temperature: { entryId: "dc-entry-temp" }
  },
  $$timestamp: new Date().toISOString()
});
// Send header + payload over ZMQ (in custom implementations)
socket.send([identity, token, inputId, header, payload]);
// ── RECEIVING DATA ────────────────────────────────────
// Read header from message
const eventId = header.readUInt32LE(0);
const serializerMethod = header.readUInt32LE(4);
// Deserialize based on header
const data = flowBoxSerializer.unpack(header, receivedBuffer);
console.log(data.temperature); // 21.7
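On the receiving side of a custom implementation, the header and payload first have to be pulled out of the multipart message. The sketch below assumes the last two frames are the header and the payload, mirroring the send call above; the actual frame order on receive depends on your socket type, and `splitFrames` and `ParsedMessage` are hypothetical names.

```typescript
import { Buffer } from "node:buffer";

interface ParsedMessage {
  eventId: number;
  serializerMethod: number;
  header: Buffer;
  payload: Buffer;
}

// Hypothetical helper. Assumes frames end with [..., header, payload].
function splitFrames(frames: Buffer[]): ParsedMessage {
  if (frames.length < 2) {
    throw new Error(`Expected at least 2 frames, got ${frames.length}`);
  }
  const header = frames[frames.length - 2];
  const payload = frames[frames.length - 1];
  if (header.length < 8) {
    throw new Error(`Header too short: expected 8 bytes, got ${header.length}`);
  }
  return {
    eventId: header.readUInt32LE(0), // bytes 0-3
    serializerMethod: header.readUInt32LE(4), // bytes 4-7
    header,
    payload,
  };
}

// Simulated incoming message: routing frames, then header and payload.
const demoHeader = Buffer.alloc(8);
demoHeader.writeUInt32LE(0x8200, 0); // CURRENT_EVENT
demoHeader.writeUInt32LE(0x9000, 4); // SERIALIZER_JSON
const demoFrames = [
  Buffer.from("identity"),
  Buffer.from("token"),
  Buffer.from("input-1"),
  demoHeader,
  Buffer.from('{"temperature":21.7}'),
];

const message = splitFrames(demoFrames);
console.log(message.serializerMethod.toString(16)); // 9000
```

The returned `header` and `payload` can then be handed to flowBoxSerializer.unpack(header, payload).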
Using pack/unpack Directly (Performance Optimization)
[NOTE!lightbulb/WHEN TO BYPASS] Direct msgpackr usage is appropriate when:
- You control both sender and receiver (internal box-to-box communication)
- You're serializing data that doesn't traverse ZMQ (e.g., in-memory caching)
- Performance profiling shows serialization is a bottleneck (>10% of CPU)
For performance-critical paths, using msgpackr directly is acceptable:
import { pack, unpack } from "msgpackr";
// Serialize
const data = pack({ temperature: 21.7 });
// Deserialize
const obj = unpack(data);
Best practice: Use flowBoxSerializer.pack() and flowBoxSerializer.unpack() in most cases to ensure the header's serialization method is respected. Direct pack/unpack usage bypasses header-based dispatch and should be reserved for the cases listed in the note above.
[ERROR!error/HEADER MISMATCH RISK] If you use pack() directly but the receiver expects flowBoxSerializer.unpack() with a specific header, deserialization will fail or produce corrupted data. Document any direct serialization usage explicitly and ensure all parties use the same method.
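To make the mismatch concrete, the sketch below feeds hand-written MessagePack bytes to a receiver that assumes JSON. It is an illustration only and does not use the SDK: `decodeAsJson` is a hypothetical receiver, and the four bytes are the MessagePack encoding of { a: 1 }.

```typescript
import { Buffer } from "node:buffer";

// MessagePack encoding of { a: 1 }:
// 0x81 = fixmap with 1 entry, 0xa1 0x61 = fixstr "a", 0x01 = positive fixint 1.
const msgpackBytes = Buffer.from([0x81, 0xa1, 0x61, 0x01]);

// Hypothetical receiver that wrongly assumes the payload is JSON.
function decodeAsJson(buffer: Buffer): unknown {
  return JSON.parse(buffer.toString("utf8"));
}

let mismatchError: unknown = null;
try {
  decodeAsJson(msgpackBytes);
} catch (err) {
  mismatchError = err;
}

console.log(mismatchError instanceof SyntaxError); // true
```

This direction fails loudly. The reverse may not: the first byte of a JSON object, `{` (0x7b), is also a valid MessagePack positive fixint, so a MessagePack decoder could read a number instead of rejecting the input.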
Registering Custom Serializers
[WARNING!shield/TYPE SAFETY] Custom serializers bypass the SDK's type checking. Ensure your encode/decode functions are symmetric (unpack(pack(x)) should be deeply equal to x) and handle edge cases (undefined, circular references, BigInt) explicitly.
import { flowBoxSerializer, CURRENT_EVENT } from "@industream/flowmaker-sdk";
const CUSTOM_SERIALIZER = 0xA000;
// Register custom serializer
flowBoxSerializer.registerSerializer(CUSTOM_SERIALIZER, (data) => {
  // Custom encoding logic (e.g., protobuf, binary format)
  return Buffer.from(JSON.stringify(data));
});
flowBoxSerializer.registerDeserializer(CUSTOM_SERIALIZER, (buffer) => {
  // Custom decoding logic
  return JSON.parse(buffer.toString());
});
// Now you can use the custom serializer
const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, CUSTOM_SERIALIZER);
const payload = flowBoxSerializer.pack(header, { custom: true });
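Before registering a custom encode/decode pair, it can help to check the symmetry requirement against a few sample values. The following is a minimal sketch that runs without the SDK: `roundTrips` is a hypothetical helper, and the JSON-based `encode`/`decode` pair mirrors the example above.

```typescript
import { Buffer } from "node:buffer";
import { isDeepStrictEqual } from "node:util";

// Same JSON-based pair as in the registration example above.
const encode = (data: unknown): Buffer => Buffer.from(JSON.stringify(data), "utf8");
const decode = (buffer: Buffer): unknown => JSON.parse(buffer.toString("utf8"));

// Hypothetical helper: true when decode(encode(value)) deep-equals value.
function roundTrips(value: unknown): boolean {
  try {
    return isDeepStrictEqual(decode(encode(value)), value);
  } catch {
    return false; // encoding or decoding threw
  }
}

// A value with a circular reference; JSON.stringify throws on it.
const circular: Record<string, unknown> = {};
circular.self = circular;

console.log(roundTrips({ custom: true })); // true
console.log(roundTrips({ missing: undefined })); // false: JSON drops undefined properties
console.log(roundTrips(circular)); // false: JSON.stringify throws on circular structures
```

Values that fail this check either need explicit handling in the serializer or should be rejected before they reach pack().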
Summary of Critical Patterns
[INFO!hub/SUMMARY OF CRITICAL PATTERNS] Must-follow conventions:
- Call increaseRefCount() in constructor, decreaseRefCount() in onDestroy()
- Use flowBoxSerializer.unpack(header, data) to respect incoming serialization
- Reuse incoming header for downstream push() to preserve format
- onOutputReady() for sources = eager emission; onInput() for pipes/sinks = backpressured
- Logger messages queue before connect; don't rely on order for early logs
- run() starts infinite loop; process won't exit without SIGINT/SIGTERM