FlowBox Implementations (Source, Pipe, Sink)
This guide covers how to implement the three types of FlowBox nodes in Python: Source, Pipe, and Sink.
Worker Entrypoint & Registration
Every Python worker starts with an entrypoint that registers FlowBox implementations and starts the ZeroMQ event loop:
import asyncio
from industream.flowmaker.sdk import FlowMakerWorkerBuilder, FlowMakerWorkerOptions
from my_boxes import MyFlowBox
def configure_options(options: FlowMakerWorkerOptions) -> None:
    """Fill any unset (falsy) worker options with local-development defaults."""
    fallbacks = {
        "worker_id": "fm-worker-python-1",
        "router_transport_address": "tcp://*:5560",
        "runtime_http_address": "http://localhost:3120",
        "worker_log_socket_io_endpoint": "http://localhost:3040",
    }
    for attr, default in fallbacks.items():
        # Preserve any value already supplied (e.g., via environment);
        # only substitute the default when the current value is falsy.
        setattr(options, attr, getattr(options, attr) or default)
# Assemble the worker: apply option overrides, register the flowbox
# implementation, then start the (infinite) ZMQ event loop.
builder = FlowMakerWorkerBuilder()
builder = builder.configure(configure_options)
builder = builder.declare_flowbox(MyFlowBox)
worker = builder.build()
asyncio.run(worker.run())
[INFO!info/RUNTIME INITIALIZATION] The
`worker.run()` method is async and starts an infinite ZMQ message loop. Code after `asyncio.run(worker.run())` won't execute—the loop keeps running until the process receives SIGINT/SIGTERM.
[WARNING!warning/SINGLETON REGISTRATION]
`FlowMakerWorkerBuilder` maintains an internal dictionary of implementations. Declaring the same flowbox ID twice causes a collision. Always declare each box type exactly once per worker.
Environment/configurable options:
- `worker_id`: Unique worker identifier (required)
- `router_transport_address`: ZMQ router socket address (default: `tcp://*:5560`)
- `runtime_http_address`: Scheduler HTTP API endpoint (default: `http://localhost:3120`)
- `worker_log_socket_io_endpoint`: Socket.IO endpoint for logs/widgets (default: `http://localhost:3040`)
- `worker_transport_adv_address`: Advanced transport address for data connection
Pipe Implementation
A minimal pipe that transforms data:
import msgpack
from industream.flowmaker.sdk import (
flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
FlowBoxIO, FlowBoxUIConfig, FlowBoxCore, FlowBoxInitParams
)
@flowbox(FlowBoxRegistrationInfo(
    id="myorg/my-python-pipe",
    display_name="My Python Pipe",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="transform",
    stateless=True,
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={}
    )
))
class MyPipe(FlowBoxCore):
    """Minimal pipe: deserializes each incoming item, tags it, pushes it on."""

    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        """Transform one item and forward it on the "output" port.

        `await self.push(...)` blocks until downstream is ready, so keep
        the processing here short.
        """
        from datetime import datetime, timezone

        # Deserialize input using msgpack
        input_data = msgpack.unpackb(data)
        # Transform: tag the record with a real UTC processing timestamp
        # (was a hard-coded placeholder date before).
        output_data = {
            **input_data,
            "processed": True,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        # Log the processing
        self.logger.log(f"Processed: {output_data}")
        # Push downstream using the same header (preserves serialization method)
        await self.push("output", header, msgpack.packb(output_data))
[NOTE!sync/BACKPRESSURE HANDLING] The
`await self.push()` call blocks until the downstream receiver signals readiness via `CAN_SEND_NEXT`. Long-running processing in `on_input()` delays acknowledgment and can stall the entire pipeline—offload heavy work to thread pools or async queues.
[WARNING!warning/HEADER REUSE] Reusing the incoming
`header` bytes for output preserves the serialization method. For downstream data flow, this is correct. For control messages (e.g., custom ACKs), create a new header with the appropriate event ID.
Source Implementation
A source that emits random values periodically:
import asyncio
import msgpack
import random
from collections.abc import Callable
from industream.flowmaker.sdk import (
flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
FlowBoxIO, FlowBoxUIConfig, FlowBoxRaw, FlowBoxSource, FlowBoxInitParams,
FlowBoxWidgetTitleElement, FlowBoxWidgetValueElement, FlowBoxWidgetIconElement,
WidgetEvent
)
@flowbox(FlowBoxRegistrationInfo(
    id="myorg/random-generator",
    display_name="Random Data Generator",
    current_version="1.0.0",
    type=FlowBoxType.SOURCE,
    icon="123",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={
            "fieldName": "random",
            "pushIntervalMs": 1000
        }
    )
))
class RandomGenerator(FlowBoxRaw, FlowBoxSource):
    """Source that emits {fieldName: <random float>} on a fixed interval.

    Also publishes two widgets: a live-updated value (refreshed on every
    emission) and an on-demand value regenerated from widget events.
    """

    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id
        self.field_name = str(init_params.options["fieldName"])
        # Option is configured in milliseconds; asyncio.sleep takes seconds.
        self.push_interval_s = int(init_params.options["pushIntervalMs"]) / 1000
        # BUGFIX: asyncio.create_task() only holds a weak reference to the
        # task, so a fire-and-forget emit task may be garbage-collected
        # before it runs. Hold strong references until each task completes.
        self._pending_emits: set[asyncio.Task] = set()
        self._create_widget()

    def _create_widget(self) -> None:
        """Build both widgets, wire the event handler, and publish to the UI."""
        # Seed the template context values referenced by the widgets below.
        self.widget_manager \
            .set_context_value("liveRandomNumber", 0) \
            .set_context_value("genRandomNumber", 0)
        # Widget 1: value refreshed live on every emission.
        self.widget_manager.add_widget("live-random-number", "123") \
            .add_widget_element("live-random-number", "1-title", FlowBoxWidgetTitleElement(
                title="Random Data Generator",
                subtitle="Random number live-generated"
            )) \
            .add_widget_element("live-random-number", "2-value", FlowBoxWidgetValueElement(
                contents="Random: {{liveRandomNumber}}",
                icon="arrow_circle_right"
            ))
        # Widget 2: value regenerated only when a widget event arrives
        # (it carries an extra icon element to trigger generation).
        self.widget_manager.add_widget("on-demand-random-number", "123") \
            .add_widget_element("on-demand-random-number", "1-title", FlowBoxWidgetTitleElement(
                title="Random Data Generator",
                subtitle="Random number generated on demand"
            )) \
            .add_widget_element("on-demand-random-number", "2-value", FlowBoxWidgetValueElement(
                contents="Random: {{genRandomNumber}}",
                icon="arrow_circle_right"
            )) \
            .add_widget_element("on-demand-random-number", "3-generate", FlowBoxWidgetIconElement(
                icon="change_circle"
            ))
        # Handle widget events: any event regenerates the on-demand number.
        self.widget_manager.on_event_received(
            lambda e: self.widget_manager.set_context_value("genRandomNumber", random.random())
        )
        self.widget_manager.publish_widgets()

    def on_output_ready(self, output_name: str, header: bytes,
                        fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
        """Schedule one delayed emission and return immediately.

        Must not await here: blocking this callback stalls the runtime. The
        scheduled task sleeps for the configured interval, then hands the
        serialized item to `fn_ready_for_next_item`.
        """
        async def emit_async() -> None:
            await asyncio.sleep(self.push_interval_s)
            random_value = random.random()
            output = {self.field_name: random_value}
            # Update widget with the value being emitted
            self.widget_manager.set_context_value("liveRandomNumber", random_value)
            # Emit to downstream; reusing the incoming header preserves
            # the serialization method.
            fn_ready_for_next_item(header, msgpack.packb(output))

        task = asyncio.create_task(emit_async())
        self._pending_emits.add(task)
        task.add_done_callback(self._pending_emits.discard)
[NOTE!info/EAGER EMISSION PATTERN] Source boxes like
`RandomGenerator` typically emit asynchronously without waiting for input. The `on_output_ready()` callback is invoked when downstream is ready, but the actual emission happens after a delay (via `asyncio.sleep`). This is the key behavioral difference from pipes/sinks—sources initiate the data flow.
[ERROR!error/ASYNC FIRE_AND_FORGET] In
`on_output_ready()`, you must start the async task (e.g., with `asyncio.create_task()`). If you `await` directly, you block the callback and the runtime waits forever. The pattern is: start task → return immediately → task calls `fn_ready_for_next_item()` when ready.
Sink Implementation
A sink that processes incoming data:
import msgpack
from industream.flowmaker.sdk import (
flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
FlowBoxIO, FlowBoxUIConfig, FlowBoxCore, FlowBoxInitParams
)
@flowbox(FlowBoxRegistrationInfo(
    id="myorg/my-sink",
    display_name="My Python Sink",
    current_version="1.0.0",
    type=FlowBoxType.SINK,
    icon="save",
    stateless=True,
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={}
    )
))
class MySink(FlowBoxCore):
    """Terminal node: consumes items from "input"; declares no outputs."""

    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        """Decode one item and record it (stand-in for real persistence)."""
        payload = msgpack.unpackb(data)
        print(f"Received: {payload}")
        # Structured log entry for the processed item.
        self.logger.log({"message": "Processed data", "input": payload})

    def on_destroy(self) -> None:
        """Dispose SDK resources via the base class, then report shutdown."""
        super().on_destroy()
        print(f"Sink destroyed for job {self.job_id}")
[NOTE!sync/NO OUTPUT NEEDED] Sinks don't implement
`FlowBoxSource`, so they never call `push()`. The pipeline ends at the sink—no downstream acknowledgment is needed.
Key Implementation Patterns
Logger Usage
# Log a simple string message
self.logger.log("Processing started")
# Log structured data — pass a dict with arbitrary fields
self.logger.log({
    "message": "Data processed",
    "count": self.counter,
    "input": input_data
})
The logger automatically includes jobId and nodeId in the payload.
Widget Management
# Create a widget and give it a title element
# (the second argument "123" appears to be an icon id, matching the box
# icon usage — verify against the SDK reference)
self.widget_manager.add_widget("my-widget", "123") \
    .add_widget_element("my-widget", "title", FlowBoxWidgetTitleElement(
        title="My Widget",
        subtitle="Shows data"
    ))
# Update a context value — referenced from widget templates as {{myValue}}
self.widget_manager.set_context_value("myValue", 42)
# Publish the widget definitions to the frontend
self.widget_manager.publish_widgets()
# Listen for widget events coming back from the UI
self.widget_manager.on_event_received(lambda e: self.handle_event(e))
Lifecycle Hooks
# Constructor: runs once when the runtime instantiates the box for a job.
def __init__(self, init_params: FlowBoxInitParams) -> None:
    super().__init__(init_params)
    # Setup: initialize counters, widgets, etc.

# Invoked for incoming data items (FlowBoxCore only).
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    # Process incoming data (FlowBoxCore only)
    pass

# Invoked when downstream is ready to receive (FlowBoxSource only).
def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    # Emit data (FlowBoxSource only)
    pass

# Teardown hook.
def on_destroy(self) -> None:
    super().on_destroy()  # Disposes logger and widget manager
    # Cleanup: close connections, etc.
[INFO!info/AUTO DISPOSAL]
`FlowBoxRaw.on_destroy()` automatically disposes the logger and widget manager sockets. Always call `super().on_destroy()` to ensure clean shutdown and prevent resource leaks.
Serialization
Python SDK uses msgpack by default. JSON is also supported via SerializationFormat.JSON.
import msgpack
# Serialize a Python object to msgpack bytes
data = msgpack.packb({"key": "value"})
# Deserialize msgpack bytes back into a Python object
obj = msgpack.unpackb(data)
[NOTE!info/BINARY DATA]
`msgpack` handles bytes natively. If you need to send binary data, use `msgpack.packb(data, use_bin_type=True)` and `msgpack.unpackb(data, raw=False)`.
Common Pitfalls
Missing @flowbox Decorator
# !!! WRONG !!!: Missing decorator — the builder has no registration info to read
class MyBox(FlowBoxCore):
    async def on_input(...): ...

# +++ CORRECT +++: Decorated with registration info
@flowbox(FlowBoxRegistrationInfo(...))
class MyBox(FlowBoxCore):
    async def on_input(...): ...
[ERROR!error/DECORATOR REQUIRED] Without
`@flowbox()`, the `FlowMakerWorkerBuilder` can't extract registration info and raises `AttributeError: Missing registration info`.
Forgetting async in on_input
# !!! WRONG !!!: Synchronous method
def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    await self.push(...)  # SyntaxError: 'await' outside async function

# +++ CORRECT +++: Async method
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    await self.push(...)
Blocking in on_output_ready
# !!! WRONG !!!: Blocking the callback
def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    time.sleep(1)  # Blocks the single-threaded event loop — freezes the worker!
    fn_ready_for_next_item(header, data)

# +++ CORRECT +++: Schedule an async task and return immediately
def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    async def emit():
        await asyncio.sleep(1)  # Yields control back to the event loop while waiting
        fn_ready_for_next_item(header, data)
    asyncio.create_task(emit())
[WARNING!warning/EVENT LOOP BLOCKING] Python's event loop is single-threaded. Blocking operations (e.g.,
`time.sleep()`, sync I/O) freeze the entire worker. Use `asyncio.sleep()`, async libraries, or `loop.run_in_executor()` for blocking code.