FlowBox Implementations (Source, Pipe, Sink)

This guide covers how to implement the three types of FlowBox nodes in Python: Source, Pipe, and Sink.

Worker Entrypoint & Registration

Every Python worker starts with an entrypoint that registers FlowBox implementations and starts the ZeroMQ event loop:

import asyncio
from industream.flowmaker.sdk import FlowMakerWorkerBuilder, FlowMakerWorkerOptions
from my_boxes import MyFlowBox

def configure_options(options: FlowMakerWorkerOptions) -> None:
    options.worker_id = options.worker_id or "fm-worker-python-1"
    options.router_transport_address = options.router_transport_address or "tcp://*:5560"
    options.runtime_http_address = options.runtime_http_address or "http://localhost:3120"
    options.worker_log_socket_io_endpoint = options.worker_log_socket_io_endpoint or "http://localhost:3040"

builder = (
    FlowMakerWorkerBuilder()
    .configure(configure_options)
    .declare_flowbox(MyFlowBox)
)

worker = builder.build()
asyncio.run(worker.run())

[INFO!info/RUNTIME INITIALIZATION] The worker.run() method is async and starts an infinite ZMQ message loop. Code after asyncio.run(worker.run()) won't execute—the loop keeps running until the process receives SIGINT/SIGTERM.

[WARNING!warning/SINGLETON REGISTRATION] FlowMakerWorkerBuilder maintains an internal dictionary of implementations. Declaring the same flowbox ID twice causes a collision. Always declare each box type exactly once per worker.
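
To see why the collision happens, the builder's implementation map can be modeled as a plain dictionary keyed by flowbox ID. The class below is a hypothetical stand-in for illustration, not the SDK's actual internals:

```python
class DuplicateFlowBoxError(Exception):
    """Raised when the same flowbox ID is declared twice."""


class FlowBoxRegistry:
    # Hypothetical stand-in for the builder's internal implementation map.
    def __init__(self) -> None:
        self._implementations: dict[str, type] = {}

    def declare(self, box_id: str, impl: type) -> None:
        # A second declaration of the same ID would silently overwrite (or
        # collide with) the first, so reject it loudly instead.
        if box_id in self._implementations:
            raise DuplicateFlowBoxError(f"flowbox {box_id!r} already declared")
        self._implementations[box_id] = impl


registry = FlowBoxRegistry()
registry.declare("myorg/my-python-pipe", object)
# Declaring "myorg/my-python-pipe" again raises DuplicateFlowBoxError.
```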

Environment/configurable options:

  • worker_id: Unique worker identifier (required)
  • router_transport_address: ZMQ router socket address (default: tcp://*:5560)
  • runtime_http_address: Scheduler HTTP API endpoint (default: http://localhost:3120)
  • worker_log_socket_io_endpoint: Socket.IO endpoint for logs/widgets (default: http://localhost:3040)
  • worker_transport_adv_address: Advanced transport address for data connection

Pipe Implementation

A minimal pipe that transforms data:

import msgpack
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxUIConfig, FlowBoxCore, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/my-python-pipe",
    display_name="My Python Pipe",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="transform",
    stateless=True,
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={}
    )
))
class MyPipe(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Deserialize input using msgpack
        input_data = msgpack.unpackb(data)

        # Transform
        output_data = {
            **input_data,
            "processed": True,
            "timestamp": "2026-04-14T12:00:00Z"
        }

        # Log the processing
        self.logger.log(f"Processed: {output_data}")

        # Push downstream using the same header (preserves serialization method)
        await self.push("output", header, msgpack.packb(output_data))

[NOTE!sync/BACKPRESSURE HANDLING] The await self.push() call blocks until the downstream receiver signals readiness via CAN_SEND_NEXT. Long-running processing in on_input() delays acknowledgment and can stall the entire pipeline—offload heavy work to thread pools or async queues.
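
One way to keep on_input() responsive is to push CPU-bound work onto the event loop's default thread pool. A sketch of the pattern; heavy_transform is a hypothetical blocking function standing in for your real processing:

```python
import asyncio


def heavy_transform(payload: dict) -> dict:
    # Hypothetical CPU-bound or blocking work that must not run on the event loop.
    return {**payload, "processed": True}


async def process_without_blocking(payload: dict) -> dict:
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the loop's default ThreadPoolExecutor,
    # leaving the event loop free to service other messages meanwhile.
    return await loop.run_in_executor(None, heavy_transform, payload)
```

Inside on_input() you would then write result = await process_without_blocking(input_data) before calling self.push().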

[WARNING!warning/HEADER REUSE] Reusing the incoming header bytes for output preserves the serialization method. For downstream data flow, this is correct. For control messages (e.g., custom ACKs), create a new header with the appropriate event ID.


Source Implementation

A source that emits random values periodically:

import asyncio
import msgpack
import random
from collections.abc import Callable
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxUIConfig, FlowBoxRaw, FlowBoxSource, FlowBoxInitParams,
    FlowBoxWidgetTitleElement, FlowBoxWidgetValueElement, FlowBoxWidgetIconElement,
    WidgetEvent
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/random-generator",
    display_name="Random Data Generator",
    current_version="1.0.0",
    type=FlowBoxType.SOURCE,
    icon="123",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={
            "fieldName": "random",
            "pushIntervalMs": 1000
        }
    )
))
class RandomGenerator(FlowBoxRaw, FlowBoxSource):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id
        self.field_name = str(init_params.options["fieldName"])
        self.push_interval_s = int(init_params.options["pushIntervalMs"]) / 1000

        self._create_widget()

    def _create_widget(self) -> None:
        self.widget_manager \
            .set_context_value("liveRandomNumber", 0) \
            .set_context_value("genRandomNumber", 0)

        self.widget_manager.add_widget("live-random-number", "123") \
            .add_widget_element("live-random-number", "1-title", FlowBoxWidgetTitleElement(
                title="Random Data Generator",
                subtitle="Random number live-generated"
            )) \
            .add_widget_element("live-random-number", "2-value", FlowBoxWidgetValueElement(
                contents="Random: {{liveRandomNumber}}",
                icon="arrow_circle_right"
            ))

        self.widget_manager.add_widget("on-demand-random-number", "123") \
            .add_widget_element("on-demand-random-number", "1-title", FlowBoxWidgetTitleElement(
                title="Random Data Generator",
                subtitle="Random number generated on demand"
            )) \
            .add_widget_element("on-demand-random-number", "2-value", FlowBoxWidgetValueElement(
                contents="Random: {{genRandomNumber}}",
                icon="arrow_circle_right"
            )) \
            .add_widget_element("on-demand-random-number", "3-generate", FlowBoxWidgetIconElement(
                icon="change_circle"
            ))

        # Handle widget events
        self.widget_manager.on_event_received(
            lambda e: self.widget_manager.set_context_value("genRandomNumber", random.random())
        )

        self.widget_manager.publish_widgets()

    def on_output_ready(self, output_name: str, header: bytes,
                        fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
        async def emit_async() -> None:
            await asyncio.sleep(self.push_interval_s)
            random_value = random.random()
            output = {self.field_name: random_value}

            # Update widget
            self.widget_manager.set_context_value("liveRandomNumber", random_value)

            # Emit to downstream
            fn_ready_for_next_item(header, msgpack.packb(output))

        # Keep a reference so the pending task isn't garbage-collected mid-flight
        self._emit_task = asyncio.create_task(emit_async())

[NOTE!info/EAGER EMISSION PATTERN] Source boxes like RandomGenerator typically emit asynchronously without waiting for input. The on_output_ready() callback is invoked when downstream is ready, but the actual emission happens after a delay (via asyncio.sleep). This is the key behavioral difference from pipes and sinks: sources initiate the data flow.

[ERROR!error/ASYNC FIRE_AND_FORGET] on_output_ready() is a synchronous callback, so you cannot await inside it; running the coroutine to completion there would block the runtime indefinitely. The pattern is: schedule the work with asyncio.create_task() → return immediately → the task calls fn_ready_for_next_item() when the data is ready.


Sink Implementation

A sink that processes incoming data:

import msgpack
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxUIConfig, FlowBoxCore, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/my-sink",
    display_name="My Python Sink",
    current_version="1.0.0",
    type=FlowBoxType.SINK,
    icon="save",
    stateless=True,
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={}
    )
))
class MySink(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Deserialize using msgpack
        input_data = msgpack.unpackb(data)

        # Process data (e.g., save to database)
        print(f"Received: {input_data}")

        # Log the processing
        self.logger.log({
            "message": "Processed data",
            "input": input_data
        })

    def on_destroy(self) -> None:
        super().on_destroy()
        print(f"Sink destroyed for job {self.job_id}")

[NOTE!sync/NO OUTPUT NEEDED] Sinks don't implement FlowBoxSource, so they never call push(). The pipeline ends at the sink—no downstream acknowledgment is needed.


Key Implementation Patterns

Logger Usage

# Log simple message
self.logger.log("Processing started")

# Log structured data
self.logger.log({
    "message": "Data processed",
    "count": self.counter,
    "input": input_data
})

The logger automatically includes jobId and nodeId in the payload.

Widget Management

# Create widget with title
self.widget_manager.add_widget("my-widget", "123") \
    .add_widget_element("my-widget", "title", FlowBoxWidgetTitleElement(
        title="My Widget",
        subtitle="Shows data"
    ))

# Update context value (used in widget templates)
self.widget_manager.set_context_value("myValue", 42)

# Publish to frontend
self.widget_manager.publish_widgets()

# Listen for events
self.widget_manager.on_event_received(lambda e: self.handle_event(e))

Lifecycle Hooks

def __init__(self, init_params: FlowBoxInitParams) -> None:
    super().__init__(init_params)
    # Setup: initialize counters, widgets, etc.

async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    # Process incoming data (FlowBoxCore only)
    pass

def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    # Emit data (FlowBoxSource only)
    pass

def on_destroy(self) -> None:
    super().on_destroy()  # Disposes logger and widget manager
    # Cleanup: close connections, etc.

[INFO!info/AUTO DISPOSAL] FlowBoxRaw.on_destroy() automatically disposes the logger and widget manager sockets. Always call super().on_destroy() to ensure clean shutdown and prevent resource leaks.


Serialization

The Python SDK serializes data with msgpack by default; JSON is also supported via SerializationFormat.JSON.

import msgpack

# Serialize
data = msgpack.packb({"key": "value"})

# Deserialize
obj = msgpack.unpackb(data)

[NOTE!info/BINARY DATA] msgpack handles bytes natively. If you need to send binary data, use msgpack.packb(data, use_bin_type=True) and msgpack.unpackb(data, raw=False).


Common Pitfalls

Missing @flowbox Decorator

# !!! WRONG !!!: Missing decorator
class MyBox(FlowBoxCore):
    async def on_input(...): ...

# +++ CORRECT +++: Decorated with registration info
@flowbox(FlowBoxRegistrationInfo(...))
class MyBox(FlowBoxCore):
    async def on_input(...): ...

[ERROR!error/DECORATOR REQUIRED] Without @flowbox(), the FlowMakerWorkerBuilder can't extract registration info and raises AttributeError: Missing registration info.

Forgetting async in on_input

# !!! WRONG !!!: Synchronous method
def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    await self.push(...)  # SyntaxError: 'await' outside async function

# +++ CORRECT +++: Async method
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    await self.push(...)

Blocking in on_output_ready

# !!! WRONG !!!: Blocking the callback
def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    time.sleep(1)  # Blocks runtime!
    fn_ready_for_next_item(header, data)

# +++ CORRECT +++: Async task
def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    async def emit():
        await asyncio.sleep(1)
        fn_ready_for_next_item(header, data)
    asyncio.create_task(emit())

[WARNING!warning/EVENT LOOP BLOCKING] Python's event loop is single-threaded. Blocking operations (e.g., time.sleep(), sync I/O) freeze the entire worker. Use asyncio.sleep(), async libraries, or loop.run_in_executor() for blocking code.
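
For one-off blocking calls, asyncio.to_thread() (Python 3.9+) is a convenient shorthand for the executor pattern; blocking_io below is a stand-in for real synchronous I/O:

```python
import asyncio
import time


def blocking_io() -> str:
    # Stand-in for sync I/O (e.g., a blocking HTTP client or a file read).
    time.sleep(0.05)
    return "done"


async def main() -> str:
    # to_thread() runs blocking_io in a worker thread, so the event loop
    # stays responsive while the call is in flight.
    return await asyncio.to_thread(blocking_io)


result = asyncio.run(main())
```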