FlowBoxLogger API

The FlowMakerLogger class provides logging capabilities for FlowBox workers. It sends log messages to the runtime over a Socket.IO connection, where they appear in the FlowMaker UI.

Overview

from industream.flowmaker.sdk import FlowMakerLogger, FlowRuntimeContext

# Logger is automatically created as a property on FlowBoxRaw
class MyBox(FlowBoxCore):
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        self.logger.log("Processing data")  # ✅ Use the logger property

The logger is lazily initialized on first access and automatically disposed when the FlowBox is destroyed.


Class: FlowMakerLogger

Constructor

def __init__(self, socketio_url: str, runtime_context: FlowRuntimeContext) -> None:

Parameters:

  • socketio_url: Socket.IO endpoint URL (e.g., http://localhost:3040)
  • runtime_context: Contains job_id and node_id for log attribution

[INFO!info/AUTO INITIALIZATION] You don't typically construct FlowMakerLogger directly. It's created automatically when you access self.logger on a FlowBoxRaw subclass. The constructor uses the worker's Socket.IO endpoint and the FlowBox's runtime context.


Instance Methods

log()

def log(self, message: Any) -> None:

Parameters:

  • message: Any serializable data (str, dict, list, etc.)

Description: Emits a log message to the Socket.IO channel log. The message is wrapped with metadata including the job ID and node ID.

Payload format:

{
    "from": {
        "jobId": "job-123",
        "nodeId": "node-abc"
    },
    "data": message  # Your actual log content
}

Examples:

# Simple string message
self.logger.log("Processing started")

# Structured data
self.logger.log({
    "message": "Data processed",
    "count": 42,
    "input": {"temperature": 21.7}
})

# Error with traceback
import traceback
try:
    risky_operation()
except Exception as e:
    self.logger.log({
        "error": str(e),
        "traceback": traceback.format_exc()
    })

[NOTE!info/LOG STRUCTURE] All log messages include a from field with jobId and nodeId. This allows the runtime to route logs to the correct job view in the UI. The data field contains your actual log content.

[WARNING!warning/SYNC OPERATION] The log() method is synchronous—it emits via Socket.IO and returns immediately. However, if the Socket.IO connection is slow or disconnected, this can block the event loop. For high-frequency logging, consider batching or sampling.


dispose()

def dispose(self) -> None:

Description: Closes the Socket.IO connection and releases resources. Called automatically by FlowBoxRaw.on_destroy().

Example:

def on_destroy(self) -> None:
    super().on_destroy()  # Automatically disposes logger and widget manager
    # No need to call self.logger.dispose() manually

[ERROR!error/RESOURCE LEAK] If you create a FlowMakerLogger manually (not via self.logger), you must call dispose() when done. Failing to do so keeps the Socket.IO connection alive, causing resource leaks and preventing clean worker shutdown.


Lazy Initialization

The logger is created on first access:

class MyBox(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        # Logger not created yet

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Logger created on first access
        self.logger.log("First log message")

Implementation:

@property
def logger(self) -> FlowMakerLogger:
    if self._logger is None:
        self._logger = FlowMakerLogger(
            self._init_params._worker_options.worker_log_socket_io_endpoint,
            self._init_params.runtime_context
        )
    return self._logger

[NOTE!lightbulb/LAZY LOADING] Lazy initialization means the Socket.IO connection is only established when you actually log something. This saves resources for FlowBoxes that don't need logging.


Socket.IO Connection Details

The logger uses Socket.IO to emit events:

  • Channel: log
  • Event payload: {"from": {...}, "data": ...}
  • Connection URL: From worker_log_socket_io_endpoint option
  • Namespace: Job ID (from runtime_context.job_id)

Connection lifecycle:

  1. Created on first self.logger access
  2. Remains open for the FlowBox's lifetime
  3. Closed when dispose() is called (via on_destroy())

[INFO!info/CONNECTION SHARING] Each FlowBox instance has its own logger with its own Socket.IO connection. Multiple FlowBoxes in the same worker process each maintain separate connections.


Common Patterns

Logging Progress

async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    self.logger.log(f"Starting processing of {input_name}")

    # Process in stages
    stage1_result = await self.process_stage1(data)
    self.logger.log({"stage": "stage1", "result": stage1_result})

    stage2_result = await self.process_stage2(stage1_result)
    self.logger.log({"stage": "stage2", "result": stage2_result})

    self.logger.log("Processing complete")

Error Handling

async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    try:
        result = await self.process(data)
        self.logger.log({"status": "success", "result": result})
    except Exception as e:
        self.logger.log({
            "status": "error",
            "error": str(e),
            "input_name": input_name
        })
        raise  # Re-raise to fail the message

Structured Logging

# Instead of:
self.logger.log(f"Processed {count} items in {duration_ms}ms")

# Use:
self.logger.log({
    "action": "process_batch",
    "count": count,
    "duration_ms": duration_ms,
    "success": True
})

[NOTE!lightbulb/STRUCTURED LOGS] The runtime can parse structured logs (dicts) for filtering and aggregation. Plain strings are harder to query. Prefer structured logs for metrics and events.


Differences from TypeScript SDK

Feature Python FlowMakerLogger TypeScript FlowBoxLogger
Initialization Lazy via self.logger property Manual via flowBoxLogger singleton
Ref counting Not needed (per-instance) Required (increaseRefCount/decreaseRefCount)
Disposal Auto via on_destroy() Manual or auto via on_destroy()
Socket.IO channel log log

[INFO!info/NO REF COUNTING] Python's FlowMakerLogger is per-instance and doesn't use reference counting. TypeScript's flowBoxLogger is a singleton shared across all FlowBoxes, hence the need for ref counting to manage the shared socket lifecycle.


Troubleshooting

Logs Not Appearing

  1. Check Socket.IO endpoint: Ensure worker_log_socket_io_endpoint points to the correct runtime URL
  2. Verify job ID: Logs are scoped to jobId—check you're viewing the right job in the UI
  3. Connection errors: Check worker logs for Socket.IO connection failures

Duplicate Logs

If logs appear twice, check:

  • Multiple worker instances processing the same job
  • on_destroy() being called multiple times (shouldn't happen)

Memory Leaks

If memory grows over time:

  • Ensure on_destroy() calls super().on_destroy()
  • Check for manually created loggers that aren't disposed