Python SDK (industream-flowmaker-sdk)

Package: sdk/python.

Overview

The Python SDK provides the runtime infrastructure for building FlowBox workers: stateless or stateful components that process data streams in the FlowMaker platform. It requires Python 3.11+ and builds on asyncio, NetMQ-compatible ZeroMQ messaging (via zmq), and msgpack.

[INFO!hub/SDK SCOPE] This SDK handles the runtime loop, serialization, and logging. Your FlowBox implementations focus on business logic: transforming data, calling external APIs, or managing state.


Runtime Architecture & Data Flow

[NOTE!lightbulb/DIAGRAM CONTEXT] This diagram shows a typical three-stage pipeline. Data flows down (Source → Pipe → Sink) via NetMQ frames with headers. Logs flow right to the Logger via Socket.IO. Control events flow to the Scheduler via NetMQ.

[Diagram: Source → Pipe → Sink pipeline with Logger and Scheduler]
  • Source (methods: on_output_ready(id, hdr, cb)) sends { val, $$meta, $$timestamp } to Pipe, header: Msgpack
  • Pipe (methods: on_input_received(id, hdr, data), on_output_ready(id, hdr, cb)) sends { transformedVal, $$meta, $$timestamp } to Sink, header: Msgpack
  • Sink (methods: on_input_received(id, hdr, data))
  • Logger (Socket.IO server): emit(log)
  • Scheduler (NetMQ events): CAN_SEND_NEXT, CURRENT_EVENT
  • Legend: NetMQ/Frames, Socket.IO Events

How data flows:

  1. Source originates data in on_output_ready(), pushing { val, $$meta, $$timestamp } with a MessagePack header
  2. Pipe receives it via on_input_received(), transforms it, and pushes the result downstream with the same header format
  3. Sink receives the final data, processes it (e.g., saves it to a database), and acknowledges
  4. Logger receives log events via Socket.IO (dashed lines) and displays them in the frontend
  5. Scheduler manages lifecycle (init, destroy, heartbeat) via NetMQ control events (dotted lines)
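The three data stages above can be illustrated without the SDK. The following is a minimal, transport-free sketch: asyncio queues stand in for the NetMQ frames, plain dicts stand in for MessagePack payloads, and the function names (source, pipe, sink, run_pipeline) and the "dc-entry-val" entry ID are hypothetical, not SDK APIs.

```python
import asyncio
from datetime import datetime, timezone

# Assumption: asyncio queues replace the NetMQ frames, and plain dicts replace
# the MessagePack-encoded payloads used by the real runtime.
STOP = object()  # sentinel marking the end of the stream


async def source(out_q: asyncio.Queue, values: list) -> None:
    """Stage 1: originate records in the { val, $$meta, $$timestamp } shape."""
    for val in values:
        await out_q.put({
            "val": val,
            "$$meta": {"val": {"entryId": "dc-entry-val"}},  # hypothetical ID
            "$$timestamp": datetime.now(timezone.utc).isoformat(),
        })
    await out_q.put(STOP)


async def pipe(in_q: asyncio.Queue, out_q: asyncio.Queue) -> None:
    """Stage 2: transform the value and carry the metadata through."""
    while (record := await in_q.get()) is not STOP:
        await out_q.put({
            "transformedVal": record["val"] * 2,
            # Re-key $$meta so it still matches the renamed data field
            "$$meta": {"transformedVal": record["$$meta"]["val"]},
            "$$timestamp": record["$$timestamp"],
        })
    await out_q.put(STOP)


async def sink(in_q: asyncio.Queue, store: list) -> None:
    """Stage 3: consume final records (here, append them to a list)."""
    while (record := await in_q.get()) is not STOP:
        store.append(record)


async def run_pipeline(values: list) -> list:
    """Wire the three stages together and return what the sink stored."""
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    store: list = []
    await asyncio.gather(source(q1, values), pipe(q1, q2), sink(q2, store))
    return store
```

Calling asyncio.run(run_pipeline([1.0, 2.5])) returns two records whose transformedVal fields are the doubled inputs. Logging and Scheduler control events are deliberately left out of this sketch.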

Data Shape Convention

All FlowBox integrations should use this standard data shape for compatibility with DataCatalog and metadata-aware processing:

{
  "temperature": 21.7,
  "pressure": 1.8,
  "$$meta": {
    "temperature": { "entryId": "dc-entry-temp" },
    "pressure": { "entryId": "dc-entry-pressure" }
  },
  "$$timestamp": "2026-04-10T16:00:00.000Z"
}
  • $$meta: Per-field metadata (e.g., DataCatalog entry references)
  • $$timestamp: ISO 8601 timestamp of data creation
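As an illustration of the shape above, here is a small sketch of a helper that assembles such a record. The helper name make_record and its parameters are hypothetical and not part of the SDK; the millisecond precision and trailing "Z" simply mirror the sample timestamp.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def make_record(
    values: dict,
    entry_ids: dict,
    created_at: Optional[datetime] = None,
) -> dict:
    """Build a record in the standard shape (hypothetical helper).

    values     -- data fields, e.g. {"temperature": 21.7}
    entry_ids  -- DataCatalog entry ID per field name
    created_at -- timezone-aware creation time; defaults to now (UTC)
    """
    created_at = created_at or datetime.now(timezone.utc)
    # ISO 8601 in UTC with millisecond precision and a trailing "Z"
    timestamp = (
        created_at.astimezone(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )
    record: dict = dict(values)
    # Only fields that have a known entry ID get a $$meta entry
    record["$$meta"] = {
        name: {"entryId": entry_ids[name]} for name in values if name in entry_ids
    }
    record["$$timestamp"] = timestamp
    return record


sample = make_record(
    {"temperature": 21.7, "pressure": 1.8},
    {"temperature": "dc-entry-temp", "pressure": "dc-entry-pressure"},
    datetime(2026, 4, 10, 16, 0, 0, tzinfo=timezone.utc),
)
```

With the fixed creation time shown, sample reproduces the JSON example above.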

[NOTE!lightbulb/META PROPERTY CONTRACT] Each key in the $$meta object must exactly match the name of a data property in the same record. Mismatched keys will cause DataCatalog auto-fetchers to skip enrichment.

[WARNING!shield/RESERVED PREFIX] Properties starting with $$ are reserved for runtime metadata. Do not use this prefix for application data—it may be overwritten or stripped by SDK components.
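The two rules above (matching $$meta keys, reserved $$ prefix) can be checked before a record is pushed. The following is a sketch only: check_record is a hypothetical helper, and treating $$meta and $$timestamp as the only legitimate $$ keys is an assumption based on the fields named in this document.

```python
RESERVED_PREFIX = "$$"
# Assumption: only the two runtime keys named in this document are allowed
KNOWN_RESERVED = {"$$meta", "$$timestamp"}


def check_record(record: dict) -> list:
    """Return a list of human-readable problems; empty means the record passes."""
    problems = []
    data_fields = {key for key in record if not key.startswith(RESERVED_PREFIX)}

    # Rule 1: application data must not use the reserved "$$" prefix
    for key in record:
        if key.startswith(RESERVED_PREFIX) and key not in KNOWN_RESERVED:
            problems.append(f"reserved prefix used for application field: {key}")

    # Rule 2: every $$meta key must name an existing data property
    for key in record.get("$$meta", {}):
        if key not in data_fields:
            problems.append(f"$$meta key has no matching data field: {key}")

    return problems


good = {
    "temperature": 21.7,
    "$$meta": {"temperature": {"entryId": "dc-entry-temp"}},
    "$$timestamp": "2026-04-10T16:00:00.000Z",
}
bad = {
    "temperature": 21.7,
    "$$custom": 1,                                    # violates rule 1
    "$$meta": {"temp": {"entryId": "dc-entry-temp"}},  # violates rule 2
}
```

Here check_record(good) yields no problems, while check_record(bad) reports one problem per violated rule.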


Quick Start

Create a minimal worker with a single pipe:

import msgpack
import asyncio
from industream.flowmaker.sdk import (
    FlowMakerWorkerBuilder, FlowMakerWorkerOptions,
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxCore, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/message-counter",
    display_name="Message Counter",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="exposure_plus_1",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    )
))
class MessageCounter(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.counter = 0

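    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Count every message seen by this instance
        self.counter += 1
        # Decode the MessagePack payload and attach the running count
        output = msgpack.unpackb(data)
        output["counter"] = self.counter
        # Forward the re-encoded payload on "output", reusing the incoming header
        await self.push("output", header, msgpack.packb(output))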

def configure(options: FlowMakerWorkerOptions) -> None:
    options.worker_id = "fm-worker-python-1"

builder = (
    FlowMakerWorkerBuilder()
    .configure(configure)
    .declare_flowbox(MessageCounter)
)

worker = builder.build()
asyncio.run(worker.run())

[NOTE!lightbulb/START HERE] This is the minimal working example. Copy it, modify the @flowbox decorator and on_input() logic, and you have a custom worker.
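The on_input() logic can be exercised without a running worker. The sketch below is not SDK code: RecordingBox is a hypothetical test double that mimics only the push() call used in the example, and json stands in for msgpack so the snippet runs on the standard library alone.

```python
import asyncio
import json


class RecordingBox:
    """Hypothetical test double: records push() calls instead of sending them."""

    def __init__(self) -> None:
        self.pushed: list = []

    async def push(self, output_name: str, header: bytes, data: bytes) -> None:
        self.pushed.append((output_name, header, data))


class CounterUnderTest(RecordingBox):
    """Same counting logic as MessageCounter; json replaces msgpack here."""

    def __init__(self) -> None:
        super().__init__()
        self.counter = 0

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        self.counter += 1
        output = json.loads(data)
        output["counter"] = self.counter
        await self.push("output", header, json.dumps(output).encode())


async def feed(box: CounterUnderTest, payloads: list) -> list:
    """Send each payload to the box and return the decoded pushed outputs."""
    for payload in payloads:
        await box.on_input("input", b"hdr", json.dumps(payload).encode())
    return [json.loads(data) for _, _, data in box.pushed]


outputs = asyncio.run(feed(CounterUnderTest(), [{"val": 1}, {"val": 2}]))
```

Each decoded output keeps its original fields and gains a counter that increases by one per message. Against the real SDK, the same idea would need whatever FlowBoxInitParams the base class expects, which this sketch does not attempt to reproduce.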


Build and Packaging

From sdk/python:

python -m pip install -e .
python -m build

Package metadata:

  • Build backend: hatchling.build
  • Distribution name: industream-flowmaker-sdk
  • Strict typing mode configured in [tool.mypy]
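After the editable install, one way to confirm that the distribution is visible to the interpreter is to query its metadata with the standard library. This is a small sketch; installed_version is a hypothetical helper name, and the only SDK-specific detail it relies on is the distribution name listed above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str = "industream-flowmaker-sdk") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    print(found if found is not None else "industream-flowmaker-sdk is not installed")
```

Note that python -m build relies on the separate build package being installed in the environment.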

References

  • sdk/python/pyproject.toml
  • sdk/python/industream/flowmaker/sdk/flowmaker_worker_builder.py
  • sdk/python/industream/flowmaker/sdk/flowmaker_worker.py
  • sdk/python/industream/flowmaker/sdk/flowbox.py
  • sdk/python/industream/flowmaker/sdk/flowbox_core.py
  • sdk/python/industream/flowmaker/sdk/flowbox_registration_info.py
  • sdk/python/main.py
  • sdk/python/random_data_generator_box.py