FlowBoxSerializer API

The FlowBoxSerializer handles MessagePack and JSON serialization with a header-based protocol. The header encodes both the action/event ID and the serialization method.

Header Format

Bytes 0-3: Action/Event ID (UInt32LE)
Bytes 4-7: Serialization Method (UInt32LE)

Event IDs (from flow.constants.ts):

  • INIT_FLOW_ELEMENT = 0x4026
  • CURRENT_EVENT = 0x8200
  • CAN_SEND_NEXT = 0x8201
  • DESTROY_EVENT = 0x8202
  • HEARTBEAT_EVENT = 0x8203
  • SCHEDULER_RESTART = 0x8204
  • ACK_EVENT = 0x8205

Serialization Methods:

  • SERIALIZER_MSGPACK = 0x8000
  • SERIALIZER_JSON = 0x9000

Singleton Instance

import { flowBoxSerializer } from "@industream/flowmaker-sdk";

flowBoxSerializer.createShortHeader()

flowBoxSerializer.createShortHeader(actionId: number, serializationMethod: number): Buffer

Parameters:

  • actionId: The event/action constant (e.g., CURRENT_EVENT, INIT_FLOW_ELEMENT)
  • serializationMethod: The serializer constant (SERIALIZER_MSGPACK or SERIALIZER_JSON)

Returns: 8-byte Buffer ready to be sent with payload

Creates an 8-byte header with the action ID and serialization method.

import { flowBoxSerializer, CURRENT_EVENT, SERIALIZER_MSGPACK } from "@industream/flowmaker-sdk";

const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
// header: Buffer [0x26, 0x40, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00]
//         bytes 0-3: 0x4026 (CURRENT_EVENT)
//         bytes 4-7: 0x8000 (MessagePack)

flowBoxSerializer.pack()

flowBoxSerializer.pack(header: Buffer, data: any): Buffer

Parameters:

  • header: The 8-byte header containing the serializer method
  • data: Any JavaScript value to serialize

Returns: Encoded Buffer

Throws: Error if serializer type is unknown

Serializes data using the serializer method encoded in the header.

import { flowBoxSerializer, CURRENT_EVENT, SERIALIZER_MSGPACK } from "@industream/flowmaker-sdk";

const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
const payload = flowBoxSerializer.pack(header, { temperature: 21.7, pressure: 1.8 });

// payload is now a MessagePack-encoded Buffer

flowBoxSerializer.unpack()

flowBoxSerializer.unpack(header: Buffer, buffer: Buffer): any

Parameters:

  • header: The 8-byte header containing the serializer method
  • buffer: The encoded payload Buffer

Returns: Decoded JavaScript value

Throws: Error if deserializer type is unknown

Deserializes data using the serializer method encoded in the header.

import { flowBoxSerializer, CURRENT_EVENT, SERIALIZER_MSGPACK } from "@industream/flowmaker-sdk";

const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);
const payload = flowBoxSerializer.pack(header, { temperature: 21.7 });

// Later, on receive:
const data = flowBoxSerializer.unpack(header, payload);
// data: { temperature: 21.7 }

flowBoxSerializer.packNum()

flowBoxSerializer.packNum(constant: number, data: any): Buffer

Parameters:

  • constant: The serializer constant (e.g., SERIALIZER_JSON)
  • data: Any JavaScript value to serialize

Returns: Encoded Buffer

Direct serialization by constant value (bypasses header parsing).

import { flowBoxSerializer, SERIALIZER_JSON } from "@industream/flowmaker-sdk";

const jsonBuffer = flowBoxSerializer.packNum(SERIALIZER_JSON, { val: 42 });

flowBoxSerializer.unpackNum()

flowBoxSerializer.unpackNum(constant: number, buffer: Buffer): any

Parameters:

  • constant: The serializer constant (e.g., SERIALIZER_JSON)
  • buffer: The encoded payload Buffer

Returns: Decoded JavaScript value

Direct deserialization by constant value.

import { flowBoxSerializer, SERIALIZER_JSON } from "@industream/flowmaker-sdk";

const data = flowBoxSerializer.unpackNum(SERIALIZER_JSON, jsonBuffer);
// data: { val: 42 }

Complete Serializer Usage Example

import {
  flowBoxSerializer,
  CURRENT_EVENT,
  SERIALIZER_MSGPACK,
  SERIALIZER_JSON
} from "@industream/flowmaker-sdk";

// ── CREATING HEADERS ────────────────────────────────────

// For runtime events (used internally by SDK)
const eventHeader = flowBoxSerializer.createShortHeader(CURRENT_EVENT, SERIALIZER_MSGPACK);

// For custom data emission (often use 0 for action ID)
const dataHeader = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);

// ── SENDING DATA ──────────────────────────────────────

// Create header
const header = flowBoxSerializer.createShortHeader(0, SERIALIZER_MSGPACK);

// Serialize payload
const payload = flowBoxSerializer.pack(header, {
  temperature: 21.7,
  pressure: 1.8,
  $$meta: {
    temperature: { entryId: "dc-entry-temp" }
  },
  $$timestamp: new Date().toISOString()
});

// Send header + payload over ZMQ (in custom implementations)
socket.send([identity, token, inputId, header, payload]);

// ── RECEIVING DATA ────────────────────────────────────

// Read header from message
const eventId = header.readUInt32LE(0);
const serializerMethod = header.readUInt32LE(4);

// Deserialize based on header
const data = flowBoxSerializer.unpack(header, receivedBuffer);

console.log(data.temperature); // 21.7

Using pack/unpack Directly (Performance Optimization)

[NOTE!lightbulb/WHEN TO BYPASS] Direct msgpackr usage is appropriate when:

  • You control both sender and receiver (internal box-to-box communication)
  • You're serializing data that doesn't traverse ZMQ (e.g., in-memory caching)
  • Performance profiling shows serialization is a bottleneck (>10% of CPU)

For performance-critical paths, using msgpackr directly is acceptable:

import { pack, unpack } from "msgpackr";

// Serialize
const data = pack({ temperature: 21.7 });

// Deserialize
const obj = unpack(data);

Best practice: Use flowBoxSerializer.pack() and flowBoxSerializer.unpack() in most cases to ensure the header's serialization method is respected. Direct pack/unpack usage bypasses header-based dispatch and should only be used when:

  • You control both sender and receiver
  • Performance profiling shows serialization is a bottleneck
  • You're working with internal data that doesn't traverse the ZMQ protocol

[ERROR!error/HEADER MISMATCH RISK] If you use pack() directly but the receiver expects flowBoxSerializer.unpack() with a specific header, deserialization will fail or produce corrupted data. Document any direct serialization usage explicitly and ensure all parties use the same method.


Registering Custom Serializers

[WARNING!shield/TYPE SAFETY] Custom serializers bypass the SDK's type checking. Ensure your encode/decode functions are symmetric (unpack(pack(x)) === x) and handle edge cases (undefined, circular references, BigInt) explicitly.

import { flowBoxSerializer } from "@industream/flowmaker-sdk";

const CUSTOM_SERIALIZER = 0xA000;

// Register custom serializer
flowBoxSerializer.registerSerializer(CUSTOM_SERIALIZER, (data) => {
  // Custom encoding logic (e.g., protobuf, binary format)
  return Buffer.from(JSON.stringify(data));
});

flowBoxSerializer.registerDeserializer(CUSTOM_SERIALIZER, (buffer) => {
  // Custom decoding logic
  return JSON.parse(buffer.toString());
});

// Now you can use the custom serializer
const header = flowBoxSerializer.createShortHeader(CURRENT_EVENT, CUSTOM_SERIALIZER);
const payload = flowBoxSerializer.pack(header, { custom: true });

Summary of Critical Patterns

[INFO!hub/SUMMARY OF CRITICAL PATTERNS] Must-follow conventions:

  1. Call increaseRefCount() in constructor, decreaseRefCount() in onDestroy()
  2. Use flowBoxSerializer.unpack(header, data) to respect incoming serialization
  3. Reuse incoming header for downstream push() to preserve format
  4. onOutputReady() for sources = eager emission; onInput() for pipes/sinks = backpressured
  5. Logger messages queue before connect—don't rely on order for early logs
  6. run() starts infinite loop—process won't exit without SIGINT/SIGTERM