Python SDK (industream-flowmaker-sdk)
Package: sdk/python.
Overview
The Python SDK provides the runtime infrastructure for building FlowBox workers: stateless or stateful components that process data streams in the FlowMaker platform. It is built on Python 3.11+ with asyncio, NetMQ-compatible ZeroMQ messaging (via the zmq package), and msgpack for serialization.
[INFO!hub/SDK SCOPE] This SDK handles the runtime loop, serialization, and logging. Your FlowBox implementations focus on business logic: transforming data, calling external APIs, or managing state.
Runtime Architecture & Data Flow
[NOTE!lightbulb/DIAGRAM CONTEXT] This diagram shows a typical three-stage pipeline. Data flows down (Source → Pipe → Sink) via NetMQ frames with headers. Logs flow right to the Logger via Socket.IO. Control events flow to the Scheduler via NetMQ.
How data flows:
- Source initiates data by calling `on_output_ready()`, pushing `{ val, $$meta, $$timestamp }` with a MessagePack header.
- Pipe receives data via `on_input_received()`, transforms it, and pushes it downstream with the same header format.
- Sink receives the final data, processes it (e.g., saves it to a database), and acknowledges it.
- Logger receives log events via Socket.IO (dashed lines) and displays them in the frontend.
- Scheduler manages the lifecycle (init, destroy, heartbeat) via NetMQ control events (dotted lines).
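The flow above can be modeled in plain asyncio to show the header-forwarding contract on its own. This is a conceptual sketch, not the SDK's transport. `asyncio.Queue` stands in for the NetMQ frames, and payloads stay as dicts instead of MessagePack bytes. The `HEADER` value, the function names, the doubling transform, and the `None` end-of-stream marker are all hypothetical.

```python
import asyncio
from typing import Any, Optional

# Hypothetical opaque header; in the real runtime it travels as a MessagePack frame.
HEADER = b"example-header"
# A queue item is a (header, payload) pair; None marks end of stream (sketch convention).
Message = Optional[tuple[bytes, dict[str, Any]]]


async def source(out_q: "asyncio.Queue[Message]", values: list[float]) -> None:
    # Source: originate records and attach a header to each one.
    for v in values:
        await out_q.put((HEADER, {"val": v}))
    await out_q.put(None)


async def pipe(in_q: "asyncio.Queue[Message]", out_q: "asyncio.Queue[Message]") -> None:
    # Pipe: transform the payload, forward the header untouched.
    while (msg := await in_q.get()) is not None:
        header, payload = msg
        await out_q.put((header, {**payload, "val": payload["val"] * 2}))
    await out_q.put(None)


async def sink(in_q: "asyncio.Queue[Message]", received: list) -> None:
    # Sink: consume final records (a real sink might write them to a database).
    while (msg := await in_q.get()) is not None:
        received.append(msg)


async def run_pipeline(values: list[float]) -> list:
    q1: "asyncio.Queue[Message]" = asyncio.Queue()
    q2: "asyncio.Queue[Message]" = asyncio.Queue()
    received: list = []
    # Run all three stages concurrently; they hand records along through the queues.
    await asyncio.gather(source(q1, values), pipe(q1, q2), sink(q2, received))
    return received


if __name__ == "__main__":
    print(asyncio.run(run_pipeline([1.0, 2.5])))
```

Each record reaches the sink with its header byte-for-byte unchanged, while only the payload has been modified by the pipe.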
Data Shape Convention
All FlowBox integrations should use this standard data shape for compatibility with DataCatalog and metadata-aware processing:
```json
{
  "temperature": 21.7,
  "pressure": 1.8,
  "$$meta": {
    "temperature": { "entryId": "dc-entry-temp" },
    "pressure": { "entryId": "dc-entry-pressure" }
  },
  "$$timestamp": "2026-04-10T16:00:00.000Z"
}
```
- `$$meta`: per-field metadata (e.g., DataCatalog entry references)
- `$$timestamp`: ISO 8601 timestamp of data creation
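A producer can assemble records in this shape with a small helper. This is a sketch. `build_record` is a hypothetical helper, not an SDK function, and it assumes that fields without a DataCatalog entry are simply left out of `$$meta`. It formats the timestamp as UTC ISO 8601 with millisecond precision and a `Z` suffix, matching the example above.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def build_record(
    values: dict[str, Any],
    entry_ids: dict[str, str],
    when: Optional[datetime] = None,
) -> dict[str, Any]:
    """Hypothetical helper: wrap plain values in the standard FlowBox data shape."""
    # Use the current UTC time unless a timezone-aware datetime is supplied.
    when = when or datetime.now(timezone.utc)
    timestamp = when.astimezone(timezone.utc).isoformat(timespec="milliseconds")
    return {
        **values,
        # Only fields with a known DataCatalog entry get a $$meta record.
        "$$meta": {name: {"entryId": entry_ids[name]} for name in values if name in entry_ids},
        # isoformat() renders UTC as "+00:00"; the convention above uses "Z".
        "$$timestamp": timestamp.replace("+00:00", "Z"),
    }


record = build_record(
    {"temperature": 21.7, "pressure": 1.8},
    {"temperature": "dc-entry-temp", "pressure": "dc-entry-pressure"},
    datetime(2026, 4, 10, 16, 0, tzinfo=timezone.utc),
)
```

With these inputs, `record` is equal to the JSON example above.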
[NOTE!lightbulb/META PROPERTY CONTRACT] The `$$meta` object must use the data property names as its keys. Mismatched keys cause DataCatalog auto-fetchers to skip enrichment.
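Because a mismatch fails quietly (enrichment is skipped rather than raising an error), a producer may want to check keys before pushing. This is a minimal sketch. `unmatched_meta_keys` is a hypothetical helper, not part of the SDK, and it checks only key names, not the contents of each metadata entry.

```python
from typing import Any

RESERVED_PREFIX = "$$"


def unmatched_meta_keys(record: dict[str, Any]) -> set[str]:
    """Return the $$meta keys that do not name a data property in the record."""
    # Data properties are every top-level key that is not runtime metadata.
    data_fields = {key for key in record if not key.startswith(RESERVED_PREFIX)}
    meta = record.get("$$meta", {})
    return set(meta) - data_fields
```

An empty result means every `$$meta` key lines up with a data property.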
[WARNING!shield/RESERVED PREFIX] Properties starting with `$$` are reserved for runtime metadata. Do not use this prefix for application data, because SDK components may overwrite or strip such properties.
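One way to follow this rule is to reject reserved keys at the point where application values enter a record. This is a sketch. `check_app_fields` is a hypothetical helper, and raising `ValueError` is a choice made for this example, not documented SDK behavior.

```python
from typing import Any


def check_app_fields(values: dict[str, Any]) -> dict[str, Any]:
    """Raise if application data uses the reserved $$ prefix; otherwise return it unchanged."""
    reserved = sorted(key for key in values if key.startswith("$$"))
    if reserved:
        raise ValueError(f"application fields use the reserved '$$' prefix: {reserved}")
    return values
```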
Quick Start
Create a minimal worker with a single pipe:
```python
import asyncio

import msgpack

from industream.flowmaker.sdk import (
    FlowMakerWorkerBuilder, FlowMakerWorkerOptions,
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxCore, FlowBoxInitParams,
)


# Register the class as a PIPE flowbox with one input port and one output port.
@flowbox(FlowBoxRegistrationInfo(
    id="myorg/message-counter",
    display_name="Message Counter",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="exposure_plus_1",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[FlowBoxIO(name="output", display_name="Output")],
    ),
))
class MessageCounter(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.counter = 0  # counter state held on the flowbox instance

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        self.counter += 1
        # Decode the MessagePack payload (expected to be a map) into a dict.
        output = msgpack.unpackb(data)
        output["counter"] = self.counter
        # Forward the original header unchanged with the re-encoded payload.
        await self.push("output", header, msgpack.packb(output))


def configure(options: FlowMakerWorkerOptions) -> None:
    options.worker_id = "fm-worker-python-1"


builder = (
    FlowMakerWorkerBuilder()
    .configure(configure)
    .declare_flowbox(MessageCounter)
)

worker = builder.build()
asyncio.run(worker.run())
```
[NOTE!lightbulb/START HERE] This is the minimal working example. Copy it and change the `@flowbox` decorator and the `on_input()` logic to build a custom worker.
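One pattern that keeps flowboxes easy to test, offered as a suggestion rather than an SDK requirement, is to move the transform into a plain class so that `on_input()` only decodes, delegates, and pushes. `CounterLogic` below is hypothetical. It reproduces the counter transform from the quick start with no dependency on the SDK or msgpack, so it can be unit-tested without starting a worker.

```python
from typing import Any


class CounterLogic:
    """Hypothetical plain-Python home for the MessageCounter transform."""

    def __init__(self) -> None:
        self.counter = 0

    def process(self, record: dict[str, Any]) -> dict[str, Any]:
        # Same steps as MessageCounter.on_input: bump the counter, then tag the record.
        self.counter += 1
        return {**record, "counter": self.counter}


if __name__ == "__main__":
    logic = CounterLogic()
    print(logic.process({"val": 1}))
    print(logic.process({"val": 2}))
```

Inside `MessageCounter`, `on_input()` would then decode with `msgpack.unpackb(data)`, pass the result to `process()`, and push `msgpack.packb(...)` of the returned dict with the original header.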
Build and packaging
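From the sdk/python directory:

```shell
# Editable install for local development
python -m pip install -e .
# Build an sdist and a wheel into dist/ (needs the PyPA "build" package: python -m pip install build)
python -m build
```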
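To confirm the install from Python (for example, in a CI step), the standard library's `importlib.metadata` can report the installed version of the distribution. This is a sketch, and `installed_sdk_version` is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_sdk_version(dist_name: str = "industream-flowmaker-sdk") -> Optional[str]:
    """Return the installed version of the distribution, or None if it is not installed."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(installed_sdk_version() or "industream-flowmaker-sdk is not installed")
```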
Package metadata:
- Build backend: `hatchling.build`
- Distribution name: `industream-flowmaker-sdk`
- Strict typing mode: configured in the `[tool.mypy]` table of `pyproject.toml`
References
- sdk/python/pyproject.toml
- sdk/python/industream/flowmaker/sdk/flowmaker_worker_builder.py
- sdk/python/industream/flowmaker/sdk/flowmaker_worker.py
- sdk/python/industream/flowmaker/sdk/flowbox.py
- sdk/python/industream/flowmaker/sdk/flowbox_core.py
- sdk/python/industream/flowmaker/sdk/flowbox_registration_info.py
- sdk/python/main.py
- sdk/python/random_data_generator_box.py