FlowBoxCore and FlowBoxRaw: Base Classes for Python Workers
This page explains the two base classes you can extend when implementing FlowBox workers in Python: FlowBoxRaw and FlowBoxCore. Understanding the difference is crucial for choosing the right level of abstraction.
Quick Comparison
| Feature | FlowBoxRaw | FlowBoxCore |
|---|---|---|
| Abstraction Level | Low-level, manual | High-level, async-friendly |
| Input Handling | Implement on_input_received() directly | Implement abstract on_input() |
| Output Handling | Implement on_output_ready() directly | Use await self.push() |
| Backpressure | Manual via callback timing | Automatic via coroutine resolution |
| Best For | Custom protocols, fine-grained control | Typical transforms, aggregations |
[NOTE!lightbulb/CHOOSE YOUR BASE] Start with FlowBoxCore for 90% of use cases. Only use FlowBoxRaw if you need to implement custom protocols or have very specific control requirements (e.g., the RandomDataGeneratorBox source example).
FlowBoxRaw: The Low-Level Base Class
FlowBoxRaw is the minimal abstract base class that all FlowBox implementations ultimately inherit from. It provides lazy-initialized logger and widget_manager properties but no helper methods for data flow.
Class Definition
class FlowBoxRaw(ABC):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        self._init_params = init_params
        self._logger: FlowMakerLogger | None = None
        self._widget_manager: FlowMakerWidgetManager | None = None

    @property
    def logger(self) -> FlowMakerLogger: ...

    @property
    def widget_manager(self) -> FlowMakerWidgetManager: ...

    def on_destroy(self) -> None: ...
Properties
logger: FlowMakerLogger
@property
def logger(self) -> FlowMakerLogger:
    if self._logger is None:
        self._logger = FlowMakerLogger(
            self._init_params._worker_options.worker_log_socket_io_endpoint,
            self._init_params.runtime_context
        )
    return self._logger
Description: Lazy-initialized logger for sending log messages to the runtime. Created on first access.
[INFO!info/LAZY INITIALIZATION] The logger is only created when you first access
self.logger. This saves resources for FlowBoxes that don't need logging.
widget_manager: FlowMakerWidgetManager
@property
def widget_manager(self) -> FlowMakerWidgetManager:
    if self._widget_manager is None:
        self._widget_manager = FlowMakerWidgetManager(
            self._init_params._worker_options.worker_log_socket_io_endpoint,
            self._init_params.runtime_context
        )
    return self._widget_manager
Description: Lazy-initialized widget manager for updating the FlowBox UI. Created on first access.
Methods
on_destroy()
def on_destroy(self) -> None:
    if self._logger is not None:
        self._logger.dispose()
    if self._widget_manager is not None:
        self._widget_manager.dispose()
Description: Called when the FlowBox is being destroyed. Disposes the logger and widget manager sockets.
[ERROR!error/ALWAYS CALL SUPER] Always call super().on_destroy() in subclasses to ensure proper cleanup. Forgetting this keeps sockets alive, causing resource leaks and preventing clean worker shutdown.
FlowBoxSource Interface
Sources emit data without receiving input. Implement on_output_ready() to push data downstream.
Method Signature
class FlowBoxSource(ABC):
    @abstractmethod
    def on_output_ready(self, output_name: str, header: bytes,
                        fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None: ...
Parameters:
- output_name: String identifier of the output port (e.g., "output")
- header: Bytes containing the message header (serialization method + event ID)
- fn_ready_for_next_item: Callback to invoke when you have data ready to send
Description:
Called by the runtime when downstream is ready to receive. Invoke fn_ready_for_next_item(data, header) with your payload.
[WARNING!warning/EAGER EMISSION PATTERN] Sources typically emit asynchronously in on_output_ready() without waiting for input. This initiates the data flow. If you don't call fn_ready_for_next_item(), the pipeline stalls: downstream never receives data.
Example: Source with Async Emission
import asyncio
import msgpack
from collections.abc import Callable
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxUIConfig, FlowBoxRaw, FlowBoxSource, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/interval-source",
    display_name="Interval Source",
    current_version="1.0.0",
    type=FlowBoxType.SOURCE,
    icon="timer",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={"interval_ms": 1000}
    )
))
class IntervalSource(FlowBoxRaw, FlowBoxSource):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.interval_s = int(init_params.options["interval_ms"]) / 1000
        self.counter = 0

    def on_output_ready(self, output_name: str, header: bytes,
                        fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
        async def emit():
            await asyncio.sleep(self.interval_s)
            self.counter += 1
            data = msgpack.packb({"counter": self.counter})
            fn_ready_for_next_item(data, header)  # callback takes (data, header)

        asyncio.create_task(emit())
[ERROR!error/ASYNC FIRE-AND-FORGET] In on_output_ready(), you must start an async task (e.g., asyncio.create_task()). If you block the callback waiting for the coroutine to finish, the runtime waits forever. The pattern is: start task → return immediately → task calls fn_ready_for_next_item() when ready.
FlowBoxSink Interface
Sinks receive data without emitting output. Implement on_input_received() to process incoming messages.
Method Signature
class FlowBoxSink(ABC):
    @abstractmethod
    def on_input_received(self, input_name: str, header: bytes, data: bytes,
                          fn_ready_for_next_item: Callable[[], None]) -> None: ...
Parameters:
- input_name: String identifier of the input port
- header: Bytes containing the message header
- data: Bytes containing the serialized payload
- fn_ready_for_next_item: Callback to invoke when ready for the next message
Description:
Called when a message arrives on an input port. Process the data and call fn_ready_for_next_item() to signal readiness for the next message.
[NOTE!sync/BACKPRESSURE SIGNALING] Calling fn_ready_for_next_item() signals to the runtime that you can accept another message. For long-running processing, await completion before calling the callback to avoid overwhelming the sink.
Example: Raw Sink
import msgpack
from collections.abc import Callable
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxRaw, FlowBoxSink, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/raw-sink",
    display_name="Raw Sink",
    current_version="1.0.0",
    type=FlowBoxType.SINK,
    icon="save",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[]
    )
))
class RawSink(FlowBoxRaw, FlowBoxSink):
    def on_input_received(self, input_name: str, header: bytes, data: bytes,
                          fn_ready_for_next_item: Callable[[], None]) -> None:
        input_data = msgpack.unpackb(data)
        # Synchronous processing
        print(f"Received: {input_data}")
        self.logger.log({"message": "Processed", "data": input_data})
        # Signal readiness immediately
        fn_ready_for_next_item()

    def on_destroy(self) -> None:
        super().on_destroy()
        print("Sink destroyed")
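The RawSink above acknowledges immediately after synchronous work. For long-running processing, the backpressure note recommends acknowledging only after the work completes. The following self-contained sketch shows that pattern; the fake_runtime driver, the processed list, and the integer payloads are illustrative stand-ins, not SDK code:

```python
import asyncio
from collections.abc import Callable

processed: list[int] = []

def on_input_received(data: int, fn_ready_for_next_item: Callable[[], None]) -> None:
    """Raw-sink pattern: schedule async work, ack only when it finishes."""
    async def process() -> None:
        await asyncio.sleep(0)        # placeholder for slow async work
        processed.append(data)
        fn_ready_for_next_item()      # backpressure released only here

    asyncio.create_task(process())

async def fake_runtime() -> None:
    # A toy runtime: deliver one item at a time, waiting for each ack
    for item in (1, 2, 3):
        ack = asyncio.get_running_loop().create_future()
        on_input_received(item, lambda ack=ack: ack.set_result(None))
        await ack

asyncio.run(fake_runtime())
print(processed)  # prints [1, 2, 3]
```

Because each acknowledgment is deferred until the coroutine finishes, the driver never delivers a new item while the previous one is still being processed.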
FlowBoxCore: The High-Level Base Class
FlowBoxCore implements both FlowBoxSource and FlowBoxSink, providing async-friendly methods that hide the complexity of the low-level callbacks.
Class Definition
class FlowBoxCore(FlowBoxRaw, FlowBoxSink, FlowBoxSource):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self._output_synchronizers: dict[str, OutputSynchronizer] = {}

    @abstractmethod
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None: ...
Abstract Methods
on_input()
@abstractmethod
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None: ...
Parameters:
- input_name: String identifier of the input port
- header: Bytes containing the message header
- data: Bytes containing the serialized payload
Description:
Override this method to implement your pipe/sink logic. The runtime automatically calls fn_ready_for_next_item() after your coroutine completes.
[NOTE!async/AWAIT NATURALNESS] Unlike FlowBoxSink.on_input_received(), you don't manually call a readiness callback. Just await your async operations and return; the base class handles the rest. This makes FlowBoxCore ideal for database queries, API calls, or any async processing.
Example: Core Pipe
import asyncio
import msgpack
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxCore, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/core-pipe",
    display_name="Core Pipe",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="transform",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    )
))
class CorePipe(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        input_data = msgpack.unpackb(data)
        # Async processing
        output_data = await self.transform(input_data)
        # Push downstream
        await self.push("output", header, msgpack.packb(output_data))

    async def transform(self, data: dict) -> dict:
        # Simulate async work (e.g., API call, DB query)
        await asyncio.sleep(0.1)
        return {**data, "transformed": True}
Instance Methods
push()
async def push(self, output_name: str, header: bytes, data: bytes,
output_buffer_size: int = 0) -> None:
Parameters:
- output_name: String identifier of the output port (e.g., "output")
- header: Bytes containing the message header
- data: Bytes containing the serialized payload
- output_buffer_size: Optional buffer count for flow control (default: 0)
Description: Push data to an output port. Returns a coroutine that completes when downstream acknowledges readiness.
[NOTE!sync/BUFFERING BEHAVIOR] The output_buffer_size parameter controls how many items can be buffered before backpressure kicks in. With output_buffer_size = 0 (default), each push() waits for acknowledgment. Set it to a positive number to allow batching, but be aware this increases memory usage.
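As a usage sketch, here is a pipe that fans one incoming list out into several pushes with a buffer of 8. FlowBoxCoreStub is a minimal stand-in so the example runs without the SDK; only push() and its output_buffer_size parameter mirror the real API, and the list payload shape is an assumption:

```python
import asyncio

class FlowBoxCoreStub:
    """Stand-in for FlowBoxCore; the real push() additionally applies backpressure."""
    def __init__(self) -> None:
        self.sent: list[tuple[str, object]] = []

    async def push(self, output_name: str, header: bytes, data: object,
                   output_buffer_size: int = 0) -> None:
        self.sent.append((output_name, data))

class FanOutPipe(FlowBoxCoreStub):
    async def on_input(self, input_name: str, header: bytes, data: list) -> None:
        for record in data:
            # Up to 8 items may sit in the output buffer before push() blocks
            await self.push("output", header, record, output_buffer_size=8)

pipe = FanOutPipe()
asyncio.run(pipe.on_input("input", b"", ["a", "b", "c"]))
print([d for _, d in pipe.sent])  # prints ['a', 'b', 'c']
```

With the real base class, a buffer of 8 lets the loop race ahead of downstream by up to eight messages before a push() call suspends.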
on_output_ready() (inherited)
def on_output_ready(self, output_name: str, header: bytes,
fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
Description:
Implemented by FlowBoxCore to integrate with the internal RxPY subject system. Typically you don't override this—use push() instead.
on_input_received() (inherited)
def on_input_received(self, input_name: str, header: bytes, data: bytes,
fn_ready_for_next_item: Callable[[], None]) -> None:
Description:
Implemented by FlowBoxCore to call your on_input() implementation and automatically invoke fn_ready_for_next_item() after the coroutine completes.
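Conceptually, the inherited wrapper behaves like this simplified sketch (not the SDK's actual source; the try/finally acknowledgment is an assumption consistent with the return-or-raise behavior described on this page):

```python
import asyncio
from collections.abc import Callable

def on_input_received_sketch(box, input_name: str, header: bytes, data: bytes,
                             fn_ready_for_next_item: Callable[[], None]) -> None:
    """Simplified model: run the user's on_input() coroutine, then signal
    readiness for the next message."""
    async def run() -> None:
        try:
            await box.on_input(input_name, header, data)  # user code
        finally:
            fn_ready_for_next_item()  # ack whether on_input returned or raised

    asyncio.create_task(run())

class EchoBox:
    """Tiny stand-in for a user FlowBoxCore subclass."""
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        self.last = data

async def demo() -> tuple[bytes, list[bool]]:
    box, acks = EchoBox(), []
    on_input_received_sketch(box, "input", b"", b"payload",
                             lambda: acks.append(True))
    await asyncio.sleep(0)  # let the scheduled task run
    return box.last, acks

last, acks = asyncio.run(demo())
print(last, acks)  # b'payload' [True]
```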
OutputSynchronizer: Internal Mechanism
FlowBoxCore uses OutputSynchronizer instances to coordinate input/output for each output port:
class OutputSynchronizer:
    def __init__(self) -> None:
        self.msg_count_in_buffer = 0
        self.push_subject: Subject[tuple[bytes, bytes, Callable[[], None]]] = Subject()
        self.output_ready_subject: Subject[Callable[[bytes, bytes], None]] = Subject()
        zip(self.push_subject, self.output_ready_subject) \
            .subscribe(lambda pair: self._on_next(*pair))

    def _on_next(self, push_info: tuple[bytes, bytes, Callable[[], None]],
                 fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
        data_to_push, header_to_push, complete_push = push_info
        self.msg_count_in_buffer -= 1
        fn_ready_for_next_item(data_to_push, header_to_push)
        complete_push()
[INFO!info/ZIP COORDINATION] The zip() operator (from RxPY) ensures that every push() (output) is matched with an on_output_ready() (input from downstream). This creates a lock-step flow control mechanism.
msg_count_in_buffer Mechanism
The msg_count_in_buffer field tracks buffered items:
- Incremented on push()
- Decreased when downstream acknowledges
- If msg_count_in_buffer < output_buffer_size, push() completes immediately without waiting
This enables configurable batching while preventing unbounded memory growth.
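The buffering rule can be modeled in a few lines of plain asyncio. This ToySynchronizer is an illustrative reimplementation of the msg_count_in_buffer logic, not the SDK's RxPY code:

```python
import asyncio

class ToySynchronizer:
    """Toy model of the buffer rule: push() completes immediately while under
    the buffer limit, otherwise it waits for a downstream-ready signal."""
    def __init__(self, output_buffer_size: int = 0) -> None:
        self.limit = output_buffer_size
        self.msg_count_in_buffer = 0
        self._ready: asyncio.Queue = asyncio.Queue()

    async def push(self, item: object) -> None:
        self.msg_count_in_buffer += 1
        if self.msg_count_in_buffer <= self.limit:
            return                   # buffered: no waiting
        await self._ready.get()      # lock-step: wait for downstream
        self.msg_count_in_buffer -= 1

    def downstream_ready(self) -> None:
        self._ready.put_nowait(None)

async def demo() -> int:
    sync = ToySynchronizer(output_buffer_size=2)
    await sync.push("a")         # fits in buffer, returns immediately
    await sync.push("b")         # fits in buffer, returns immediately
    sync.downstream_ready()      # downstream acknowledges one item
    await sync.push("c")         # over the limit: consumes the ack, then proceeds
    return sync.msg_count_in_buffer

count = asyncio.run(demo())
print(count)  # 2
```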
Key Differences: Core vs Raw
1. Input Handling
FlowBoxRaw + FlowBoxSink:
def on_input_received(self, input_name: str, header: bytes, data: bytes,
                      fn_ready_for_next_item: Callable[[], None]) -> None:
    input_data = msgpack.unpackb(data)
    # Synchronous processing
    self.logger.log({"data": input_data})
    fn_ready_for_next_item()  # Manual callback
FlowBoxCore:
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    input_data = msgpack.unpackb(data)
    # Async processing with await
    await self.process(input_data)
    # No callback needed; the runtime handles it
[ERROR!error/CALLBACK FORGETTING] With FlowBoxRaw, forgetting to call fn_ready_for_next_item() blocks the entire pipeline. With FlowBoxCore, the async approach avoids this failure mode: you either return or raise, and the runtime handles both cases.
2. Output Handling
FlowBoxRaw + FlowBoxSource:
def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    data = msgpack.packb({"value": 42})
    fn_ready_for_next_item(data, header)  # Immediate push
FlowBoxCore:
# In on_input() or other methods
await self.push("output", header, data) # Async push with backpressure handling
[INFO!info/PUSH SEMANTICS] FlowBoxCore.push() is async and waits for downstream acknowledgment. FlowBoxRaw's fn_ready_for_next_item() is synchronous: the data is queued immediately, but you lose the ability to await completion.
3. Backpressure
FlowBoxRaw: Manual control via callback timing. You decide when to call fn_ready_for_next_item().
FlowBoxCore: Automatic via coroutine resolution. The runtime tracks msg_count_in_buffer and only resolves push() when downstream is ready.
[WARNING!warning/DEADLOCK RISK] With FlowBoxRaw, circular dependencies (A pushes to B, B pushes to A in the same callback) cause deadlocks. FlowBoxCore mitigates this with its RxPY subject-based buffering, but circular data flow is still an anti-pattern.
When to Use Each
Use FlowBoxCore when:
- Building typical pipes that transform data
- Need async processing (DB queries, API calls)
- Want simpler, more maintainable code
- Don't need fine-grained control over message timing
Use FlowBoxRaw when:
- Implementing sources that emit on schedule
- Need custom protocol handling
- Require precise control over message timing
- Building specialized boxes with unique lifecycle needs
[NOTE!lightbulb/MOSTLY CORE] The IntervalSource example uses FlowBoxRaw because it needs to emit asynchronously in on_output_ready(). For pipes and sinks, FlowBoxCore is almost always the better choice.
Lifecycle Hooks
Both FlowBoxRaw and FlowBoxCore support these lifecycle methods:
Constructor
def __init__(self, init_params: FlowBoxInitParams) -> None:
    super().__init__(init_params)
    # Access runtime context
    job_id = init_params.runtime_context.job_id
    # Access options
    my_option = init_params.options.get("myOption", "default")
on_destroy()
def on_destroy(self) -> None:
    super().on_destroy()  # Disposes logger and widget manager
    # Additional cleanup: close DB connections, etc.
[ERROR!error/REF COUNT LEAK] Always call super().on_destroy() to ensure proper disposal of logger and widget manager sockets. Forgetting this keeps sockets alive, preventing worker shutdown.
Common Patterns
Accessing Runtime Context
class MyBox(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id
        self.node_id = init_params.runtime_context.node_id
        self.flowbox_id = init_params.runtime_context.flowbox_id
        self.options = init_params.options  # User-provided options
Combining with Widgets
class WidgetBox(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.count = 0
        self.widget_manager \
            .add_widget("counter", "123") \
            .add_widget_element("counter", "title", FlowBoxWidgetTitleElement(
                title="Counter",
                subtitle="Counts messages"
            )) \
            .add_widget_element("counter", "value", FlowBoxWidgetValueElement(
                contents="Count: {{count}}",
                icon="exposure_plus_1"
            )) \
            .set_context_value("count", 0) \
            .publish_widgets()

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        self.count += 1
        self.widget_manager.set_context_value("count", self.count)
Differences from TypeScript SDK
| Feature | Python SDK | TypeScript SDK |
|---|---|---|
| Base class name | FlowBoxRaw, FlowBoxCore | FlowBoxRaw, FlowBoxCore |
| Input method | async def on_input() | protected async onInput() |
| Output method | async def push() | public async push() |
| Rx library | RxPY (reactivex) | RxJS |
| Callback type | Callable[[bytes, bytes], None] | (data: Buffer, header: Buffer) => any |
[INFO!info/SAME PATTERN] Python and TypeScript SDKs follow the same design pattern. The main differences are Python-specific (async/await syntax, type hints, RxPY vs RxJS).