FlowBoxCore and FlowBoxRaw: Base Classes for Python Workers

This page explains the two base classes you can extend when implementing FlowBox workers in Python: FlowBoxRaw and FlowBoxCore. Understanding the difference is crucial for choosing the right level of abstraction.

Quick Comparison

Feature           | FlowBoxRaw                             | FlowBoxCore
------------------|----------------------------------------|------------------------------------
Abstraction Level | Low-level, manual                      | High-level, async-friendly
Input Handling    | Implement on_input_received() directly | Implement abstract on_input()
Output Handling   | Implement on_output_ready() directly   | Use await self.push()
Backpressure      | Manual via callback                    | Automatic via coroutine completion
Best For          | Custom protocols, fine-grained control | Typical transforms, aggregations

[NOTE!lightbulb/CHOOSE YOUR BASE] Start with FlowBoxCore for 90% of use cases. Only use FlowBoxRaw if you need to implement custom protocols or have very specific control requirements (e.g., the RandomDataGeneratorBox source example).


FlowBoxRaw: The Low-Level Base Class

FlowBoxRaw is the minimal abstract base class that all FlowBox implementations ultimately inherit from. It provides lazy-initialized logger and widget_manager properties but no helper methods for data flow.

Class Definition

class FlowBoxRaw(ABC):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        self._init_params = init_params
        self._logger: FlowMakerLogger | None = None
        self._widget_manager: FlowMakerWidgetManager | None = None

    @property
    def logger(self) -> FlowMakerLogger: ...
    @property
    def widget_manager(self) -> FlowMakerWidgetManager: ...
    def on_destroy(self) -> None: ...

Properties

logger: FlowMakerLogger

@property
def logger(self) -> FlowMakerLogger:
    if self._logger is None:
        self._logger = FlowMakerLogger(
            self._init_params._worker_options.worker_log_socket_io_endpoint,
            self._init_params.runtime_context
        )
    return self._logger

Description: Lazy-initialized logger for sending log messages to the runtime. Created on first access.

[INFO!info/LAZY INITIALIZATION] The logger is only created when you first access self.logger. This saves resources for FlowBoxes that don't need logging.

widget_manager: FlowMakerWidgetManager

@property
def widget_manager(self) -> FlowMakerWidgetManager:
    if self._widget_manager is None:
        self._widget_manager = FlowMakerWidgetManager(
            self._init_params._worker_options.worker_log_socket_io_endpoint,
            self._init_params.runtime_context
        )
    return self._widget_manager

Description: Lazy-initialized widget manager for updating the FlowBox UI. Created on first access.

Methods

on_destroy()

def on_destroy(self) -> None:
    if self._logger is not None:
        self._logger.dispose()

    if self._widget_manager is not None:
        self._widget_manager.dispose()

Description: Called when the FlowBox is being destroyed. Disposes the logger and widget manager sockets.

[ERROR!error/ALWAYS CALL SUPER] Always call super().on_destroy() in subclasses to ensure proper cleanup. Forgetting this keeps sockets alive, causing resource leaks and preventing clean worker shutdown.


FlowBoxSource Interface

Sources emit data without receiving input. Implement on_output_ready() to push data downstream.

Method Signature

class FlowBoxSource(ABC):
    @abstractmethod
    def on_output_ready(self, output_name: str, header: bytes,
                        fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None: ...

Parameters:

  • output_name: String identifier of the output port (e.g., "output")
  • header: Bytes containing the message header (serialization method + event ID)
  • fn_ready_for_next_item: Callback to invoke when you have data ready to send

Description: Called by the runtime when downstream is ready to receive. Invoke fn_ready_for_next_item(data, header) with your payload.

[WARNING!warning/EAGER EMISSION PATTERN] Sources typically emit asynchronously in on_output_ready() without waiting for input. This initiates the data flow. If you don't call fn_ready_for_next_item(), the pipeline stalls—downstream never receives data.

Example: Source with Async Emission

import asyncio
import msgpack
from collections.abc import Callable
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxUIConfig, FlowBoxRaw, FlowBoxSource, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/interval-source",
    display_name="Interval Source",
    current_version="1.0.0",
    type=FlowBoxType.SOURCE,
    icon="timer",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    ),
    ui_config=FlowBoxUIConfig(
        default_options={"interval_ms": 1000}
    )
))
class IntervalSource(FlowBoxRaw, FlowBoxSource):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.interval_s = int(init_params.options["interval_ms"]) / 1000
        self.counter = 0

    def on_output_ready(self, output_name: str, header: bytes,
                        fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
        async def emit():
            await asyncio.sleep(self.interval_s)
            self.counter += 1
            data = msgpack.packb({"counter": self.counter})
            fn_ready_for_next_item(data, header)

        asyncio.create_task(emit())

[ERROR!error/ASYNC FIRE-AND-FORGET] on_output_ready() is a synchronous callback, so you cannot await inside it. The pattern is: start a task (e.g., asyncio.create_task()) → return immediately → let the task call fn_ready_for_next_item() when the data is ready. Blocking inside the callback stalls the runtime.


FlowBoxSink Interface

Sinks receive data without emitting output. Implement on_input_received() to process incoming messages.

Method Signature

class FlowBoxSink(ABC):
    @abstractmethod
    def on_input_received(self, input_name: str, header: bytes, data: bytes,
                          fn_ready_for_next_item: Callable[[], None]) -> None: ...

Parameters:

  • input_name: String identifier of the input port
  • header: Bytes containing the message header
  • data: Bytes containing the serialized payload
  • fn_ready_for_next_item: Callback to invoke when ready for the next message

Description: Called when a message arrives on an input port. Process the data and call fn_ready_for_next_item() to signal readiness for the next message.

[NOTE!sync/BACKPRESSURE SIGNALING] Calling fn_ready_for_next_item() signals to the runtime that you can accept another message. For long-running processing, await completion before calling the callback to avoid overwhelming the sink.
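The deferral pattern can be sketched with stdlib asyncio alone. This is a hedged illustration, not SDK code: `process_async` is a hypothetical stand-in for any slow awaitable step (DB write, HTTP call), and the stub callback only records ordering.

```python
import asyncio

# Hypothetical sketch: defer fn_ready_for_next_item() until async work finishes.
# `process_async` stands in for any slow awaitable step; it is not part of the SDK.

async def process_async(payload: bytes) -> None:
    await asyncio.sleep(0.01)  # simulate slow I/O

def on_input_received(input_name, header, data, fn_ready_for_next_item) -> None:
    async def handle():
        await process_async(data)   # finish the slow work first...
        fn_ready_for_next_item()    # ...then signal readiness for the next message

    asyncio.create_task(handle())   # schedule and return immediately

async def main() -> list[str]:
    events: list[str] = []
    on_input_received("input", b"", b"payload", lambda: events.append("ready"))
    events.append("returned")       # the callback itself returns right away
    await asyncio.sleep(0.05)       # give the task time to complete
    return events

print(asyncio.run(main()))  # ['returned', 'ready']
```

The readiness signal arrives only after the simulated I/O completes, so a slow sink never accepts more than one in-flight message.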

Example: Raw Sink

import msgpack
from collections.abc import Callable
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxRaw, FlowBoxSink, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/raw-sink",
    display_name="Raw Sink",
    current_version="1.0.0",
    type=FlowBoxType.SINK,
    icon="save",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[]
    )
))
class RawSink(FlowBoxRaw, FlowBoxSink):
    def on_input_received(self, input_name: str, header: bytes, data: bytes,
                          fn_ready_for_next_item: Callable[[], None]) -> None:
        input_data = msgpack.unpackb(data)

        # Synchronous processing
        print(f"Received: {input_data}")
        self.logger.log({"message": "Processed", "data": input_data})

        # Signal readiness immediately
        fn_ready_for_next_item()

    def on_destroy(self) -> None:
        super().on_destroy()
        print("Sink destroyed")

FlowBoxCore: The High-Level Base Class

FlowBoxCore implements both FlowBoxSource and FlowBoxSink, providing async-friendly methods that hide the complexity of the low-level callbacks.

Class Definition

class FlowBoxCore(FlowBoxRaw, FlowBoxSink, FlowBoxSource):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self._output_synchronizers: dict[str, OutputSynchronizer] = {}

    @abstractmethod
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None: ...

Abstract Methods

on_input()

@abstractmethod
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None: ...

Parameters:

  • input_name: String identifier of the input port
  • header: Bytes containing the message header
  • data: Bytes containing the serialized payload

Description: Override this method to implement your pipe/sink logic. The runtime automatically calls fn_ready_for_next_item() after your coroutine completes.

[NOTE!async/AWAIT NATURALNESS] Unlike FlowBoxSink.on_input_received(), you don't manually call a readiness callback. Just await your async operations and return—the base class handles the rest. This makes FlowBoxCore ideal for database queries, API calls, or any async processing.

Example: Core Pipe

import asyncio
import msgpack
from industream.flowmaker.sdk import (
    flowbox, FlowBoxRegistrationInfo, FlowBoxType, FlowBoxIOInterfaces,
    FlowBoxIO, FlowBoxCore, FlowBoxInitParams
)

@flowbox(FlowBoxRegistrationInfo(
    id="myorg/core-pipe",
    display_name="Core Pipe",
    current_version="1.0.0",
    type=FlowBoxType.PIPE,
    icon="transform",
    io_interfaces=FlowBoxIOInterfaces(
        inputs=[FlowBoxIO(name="input", display_name="Input")],
        outputs=[FlowBoxIO(name="output", display_name="Output")]
    )
))
class CorePipe(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        input_data = msgpack.unpackb(data)

        # Async processing
        output_data = await self.transform(input_data)

        # Push downstream
        await self.push("output", header, msgpack.packb(output_data))

    async def transform(self, data: dict) -> dict:
        # Simulate async work (e.g., API call, DB query)
        await asyncio.sleep(0.1)
        return {**data, "transformed": True}

Instance Methods

push()

async def push(self, output_name: str, header: bytes, data: bytes,
               output_buffer_size: int = 0) -> None:

Parameters:

  • output_name: String identifier of the output port (e.g., "output")
  • header: Bytes containing the message header
  • data: Bytes containing the serialized payload
  • output_buffer_size: Optional buffer count for flow control (default: 0)

Description: Push data to an output port. Awaiting the call completes once downstream acknowledges readiness (or immediately while buffer space remains).

[NOTE!sync/BUFFERING BEHAVIOR] The output_buffer_size parameter controls how many items can be buffered before backpressure kicks in. With output_buffer_size = 0 (default), each push() waits for acknowledgment. Set to a positive number to allow batching, but be aware this increases memory usage.

on_output_ready() (inherited)

def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:

Description: Implemented by FlowBoxCore to integrate with the internal RxPY subject system. Typically you don't override this—use push() instead.

on_input_received() (inherited)

def on_input_received(self, input_name: str, header: bytes, data: bytes,
                      fn_ready_for_next_item: Callable[[], None]) -> None:

Description: Implemented by FlowBoxCore to call your on_input() implementation and automatically invoke fn_ready_for_next_item() after the coroutine completes.
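A plausible sketch of this bridge (an assumption for illustration, not the SDK's actual source): schedule on_input() as a task and fire the readiness callback from its done-callback.

```python
import asyncio
from collections.abc import Callable

# Plausible sketch (NOT the SDK's actual implementation) of how FlowBoxCore
# could bridge callback-style on_input_received() to the async on_input() hook.

class CoreBridgeSketch:
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        await asyncio.sleep(0.01)  # stand-in for subclass processing

    def on_input_received(self, input_name: str, header: bytes, data: bytes,
                          fn_ready_for_next_item: Callable[[], None]) -> None:
        task = asyncio.create_task(self.on_input(input_name, header, data))
        # Signal readiness only once the coroutine has finished (or raised).
        task.add_done_callback(lambda _t: fn_ready_for_next_item())

async def demo() -> bool:
    done = asyncio.Event()
    box = CoreBridgeSketch()
    box.on_input_received("input", b"", b"data", done.set)
    await asyncio.wait_for(done.wait(), timeout=1.0)  # fires after on_input completes
    return True

print(asyncio.run(demo()))  # True
```

The done-callback fires whether on_input() returns or raises, which is why forgetting the readiness signal is impossible with FlowBoxCore.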


OutputSynchronizer: Internal Mechanism

FlowBoxCore uses OutputSynchronizer instances to coordinate input/output for each output port:

class OutputSynchronizer:
    def __init__(self) -> None:
        self.msg_count_in_buffer = 0
        self.push_subject: Subject[tuple[bytes, bytes, Callable[[], None]]] = Subject()
        self.output_ready_subject: Subject[Callable[[bytes, bytes], None]] = Subject()

        zip(self.push_subject, self.output_ready_subject) \
            .subscribe(lambda pair: self._on_next(*pair))

    def _on_next(self, push_info: tuple[bytes, bytes, Callable[[], None]],
                 fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
        data_to_push, header_to_push, complete_push = push_info

        self.msg_count_in_buffer -= 1
        fn_ready_for_next_item(data_to_push, header_to_push)
        complete_push()

[INFO!info/ZIP COORDINATION] The zip() operator (from RxPY) ensures that every push() (output) is matched with an on_output_ready() (input from downstream). This creates a lock-step flow control mechanism.
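The lock-step pairing can be modeled with the stdlib alone. The sketch below (illustrative names, not the SDK's internals) queues pushes and downstream-ready callbacks and delivers only when one of each is available, which is exactly what zip() does with the two subjects.

```python
from collections import deque
from collections.abc import Callable

# Stdlib-only model of zip()'s lock-step pairing: a push is delivered only once
# a matching downstream-ready callback exists, and vice versa.
# Names are illustrative, not the SDK's internals.

class ZipPairing:
    def __init__(self) -> None:
        self.pending_pushes: deque[bytes] = deque()
        self.pending_ready: deque[Callable[[bytes], None]] = deque()

    def push(self, data: bytes) -> None:
        self.pending_pushes.append(data)
        self._drain()

    def output_ready(self, deliver: Callable[[bytes], None]) -> None:
        self.pending_ready.append(deliver)
        self._drain()

    def _drain(self) -> None:
        # Deliver only while BOTH sides have an item queued: lock-step.
        while self.pending_pushes and self.pending_ready:
            self.pending_ready.popleft()(self.pending_pushes.popleft())

delivered: list[bytes] = []
pairing = ZipPairing()
pairing.push(b"a")                      # buffered: downstream not ready yet
pairing.push(b"b")
pairing.output_ready(delivered.append)  # pairs with b"a"
pairing.output_ready(delivered.append)  # pairs with b"b"
pairing.output_ready(delivered.append)  # waits for a third push
print(delivered)  # [b'a', b'b']
```

The third readiness callback stays queued until another push arrives, mirroring how a downstream box waits when the upstream has nothing to emit.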

msg_count_in_buffer Mechanism

The msg_count_in_buffer field tracks buffered items:

  • Incremented on push()
  • Decremented when downstream acknowledges
  • If msg_count_in_buffer < output_buffer_size, push() completes immediately without waiting

This enables configurable batching while preventing unbounded memory growth.
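A simplified runnable model of this rule (an assumption for illustration, not SDK code): pushes complete immediately while the in-flight count is below the buffer size, and block otherwise until downstream acknowledges an item.

```python
import asyncio

# Simplified model (assumptions, not SDK code) of the output_buffer_size rule:
# while fewer than buffer_size items are in flight, push() returns at once;
# beyond that it blocks until downstream acknowledges one item.

class BufferedOutput:
    def __init__(self, buffer_size: int) -> None:
        self.buffer_size = buffer_size
        self.msg_count_in_buffer = 0
        self._ack = asyncio.Event()

    async def push(self, data: bytes) -> None:
        if self.msg_count_in_buffer >= self.buffer_size:
            self._ack.clear()
            await self._ack.wait()       # backpressure: wait for an ack
        self.msg_count_in_buffer += 1

    def acknowledge(self) -> None:
        self.msg_count_in_buffer -= 1
        self._ack.set()

async def demo() -> int:
    out = BufferedOutput(buffer_size=2)
    await out.push(b"1")                           # completes immediately
    await out.push(b"2")                           # completes immediately
    waiter = asyncio.create_task(out.push(b"3"))   # buffer full: must wait
    await asyncio.sleep(0)                         # let the waiter start
    assert not waiter.done()                       # blocked on backpressure
    out.acknowledge()                              # downstream consumed one item
    await waiter                                   # now the third push completes
    return out.msg_count_in_buffer

print(asyncio.run(demo()))  # 2 items still buffered
```

With buffer_size=0 (the default) every push blocks until acknowledged, which is the lock-step behavior described above.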


Key Differences: Core vs Raw

1. Input Handling

FlowBoxRaw + FlowBoxSink:

def on_input_received(self, input_name: str, header: bytes, data: bytes,
                      fn_ready_for_next_item: Callable[[], None]) -> None:
    input_data = msgpack.unpackb(data)
    # Synchronous processing
    self.logger.log({"data": input_data})
    fn_ready_for_next_item()  # Manual callback

FlowBoxCore:

async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    input_data = msgpack.unpackb(data)
    # Async processing with await
    await self.process(input_data)
    # No callback needed—runtime handles it

[ERROR!error/CALLBACK FORGETTING] With FlowBoxRaw, forgetting to call fn_ready_for_next_item() blocks the entire pipeline. With FlowBoxCore, the async approach makes this impossible—you either return or raise, and the runtime handles both cases.

2. Output Handling

FlowBoxRaw + FlowBoxSource:

def on_output_ready(self, output_name: str, header: bytes,
                    fn_ready_for_next_item: Callable[[bytes, bytes], None]) -> None:
    data = msgpack.packb({"value": 42})
    fn_ready_for_next_item(data, header)  # Immediate push

FlowBoxCore:

# In on_input() or other methods
await self.push("output", header, data)  # Async push with backpressure handling

[INFO!info/PUSH SEMANTICS] FlowBoxCore.push() is async and waits for downstream acknowledgment. FlowBoxRaw's fn_ready_for_next_item() is synchronous—the data is queued immediately, but you lose the ability to await completion.

3. Backpressure

FlowBoxRaw: Manual control via callback timing. You decide when to call fn_ready_for_next_item().

FlowBoxCore: Automatic via coroutine resolution. The runtime tracks msg_count_in_buffer and only resolves push() when downstream is ready.

[WARNING!warning/DEADLOCK RISK] With FlowBoxRaw, circular dependencies (A pushes to B, B pushes to A in the same callback) cause deadlocks. FlowBoxCore mitigates this with its RxPY subject-based buffering, but circular data flow is still an anti-pattern.


When to Use Each

Use FlowBoxCore when:

  • Building typical pipes that transform data
  • Need async processing (DB queries, API calls)
  • Want simpler, more maintainable code
  • Don't need fine-grained control over message timing

Use FlowBoxRaw when:

  • Implementing sources that emit on schedule
  • Need custom protocol handling
  • Require precise control over message timing
  • Building specialized boxes with unique lifecycle needs

[NOTE!lightbulb/MOSTLY CORE] The IntervalSource example uses FlowBoxRaw because it needs to emit asynchronously in on_output_ready(). For pipes and sinks, FlowBoxCore is almost always the better choice.


Lifecycle Hooks

Both FlowBoxRaw and FlowBoxCore support these lifecycle methods:

Constructor

def __init__(self, init_params: FlowBoxInitParams) -> None:
    super().__init__(init_params)
    # Access runtime context
    job_id = init_params.runtime_context.job_id
    # Access options
    my_option = init_params.options.get("myOption", "default")

on_destroy()

def on_destroy(self) -> None:
    super().on_destroy()  # Disposes logger and widget manager
    # Additional cleanup: close DB connections, etc.

[ERROR!error/SOCKET LEAK] Always call super().on_destroy() to ensure proper disposal of logger and widget manager sockets. Forgetting this keeps sockets alive, preventing worker shutdown.


Common Patterns

Accessing Runtime Context

class MyBox(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.job_id = init_params.runtime_context.job_id
        self.node_id = init_params.runtime_context.node_id
        self.flowbox_id = init_params.runtime_context.flowbox_id
        self.options = init_params.options  # User-provided options

Combining with Widgets

class WidgetBox(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        self.widget_manager \
            .add_widget("counter", "123") \
            .add_widget_element("counter", "title", FlowBoxWidgetTitleElement(
                title="Counter",
                subtitle="Counts messages"
            )) \
            .add_widget_element("counter", "value", FlowBoxWidgetValueElement(
                contents="Count: {{count}}",
                icon="exposure_plus_1"
            )) \
            .set_context_value("count", 0) \
            .publish_widgets()

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        count = self._get_count() + 1
        self._set_count(count)
        self.widget_manager.set_context_value("count", count)

    def _get_count(self) -> int:
        # Retrieve from context or state
        return 0

    def _set_count(self, count: int) -> None:
        # Store in state
        pass

Differences from TypeScript SDK

Feature         | Python SDK                     | TypeScript SDK
----------------|--------------------------------|---------------------------------------
Base class name | FlowBoxRaw, FlowBoxCore        | FlowBoxRaw, FlowBoxCore
Input method    | async def on_input()           | protected async onInput()
Output method   | async def push()               | public async push()
Rx library      | RxPY (reactivex)               | RxJS
Callback type   | Callable[[bytes, bytes], None] | (data: Buffer, header: Buffer) => any

[INFO!info/SAME PATTERN] Python and TypeScript SDKs follow the same design pattern. The main differences are Python-specific (async/await syntax, type hints, RxPY vs RxJS).