FlowMakerLogger API
The FlowMakerLogger class provides logging capabilities for FlowBox workers. It sends log messages to the runtime over a Socket.IO connection, where they appear in the FlowMaker UI.
Overview
```python
from industream.flowmaker.sdk import FlowMakerLogger, FlowRuntimeContext

# Logger is automatically created as a property on FlowBoxRaw
class MyBox(FlowBoxCore):
    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        self.logger.log("Processing data")  # ✅ Use the logger property
```
The logger is lazily initialized on first access and automatically disposed when the FlowBox is destroyed.
Class: FlowMakerLogger
Constructor
```python
def __init__(self, socketio_url: str, runtime_context: FlowRuntimeContext) -> None:
```
Parameters:
- `socketio_url`: Socket.IO endpoint URL (e.g., `http://localhost:3040`)
- `runtime_context`: Contains `job_id` and `node_id` for log attribution
[INFO!info/AUTO INITIALIZATION] You don't typically construct `FlowMakerLogger` directly. It's created automatically when you access `self.logger` on a `FlowBoxRaw` subclass. The constructor uses the worker's Socket.IO endpoint and the FlowBox's runtime context.
Instance Methods
log()
```python
def log(self, message: Any) -> None:
```
Parameters:
message: Any serializable data (str, dict, list, etc.)
Description:
Emits the message on the `log` Socket.IO channel, wrapped with metadata including the job ID and node ID.
Payload format:
```python
{
    "from": {
        "jobId": "job-123",
        "nodeId": "node-abc"
    },
    "data": message  # Your actual log content
}
```
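As a sketch of how this wrapping might look, the helper below builds the same payload shape. The function name and standalone form are illustrative, not the SDK's actual implementation:

```python
from typing import Any

def build_log_payload(job_id: str, node_id: str, message: Any) -> dict:
    """Wrap a log message with job/node attribution, mirroring the
    payload format shown above. Hypothetical helper, not SDK code."""
    return {
        "from": {"jobId": job_id, "nodeId": node_id},
        "data": message,
    }

payload = build_log_payload("job-123", "node-abc", {"count": 42})
```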
Examples:
```python
# Simple string message
self.logger.log("Processing started")

# Structured data
self.logger.log({
    "message": "Data processed",
    "count": 42,
    "input": {"temperature": 21.7}
})

# Error with traceback
import traceback

try:
    risky_operation()
except Exception as e:
    self.logger.log({
        "error": str(e),
        "traceback": traceback.format_exc()
    })
```
[NOTE!info/LOG STRUCTURE] All log messages include a `from` field with `jobId` and `nodeId`. This allows the runtime to route logs to the correct job view in the UI. The `data` field contains your actual log content.
[WARNING!warning/SYNC OPERATION] The `log()` method is synchronous: it emits via Socket.IO and returns immediately. However, if the Socket.IO connection is slow or disconnected, the emit can block the event loop. For high-frequency logging, consider batching or sampling.
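One way to reduce emit frequency is a small batching wrapper around the logger. This is a sketch, not part of the SDK: `BatchingLogger` and its thresholds are hypothetical, and it assumes only that the wrapped callable behaves like `log()`:

```python
import time
from typing import Any, Callable

class BatchingLogger:
    """Buffer messages and flush them as a single log() call.
    Sketch only: wraps any log-like callable, e.g. self.logger.log."""

    def __init__(self, log_fn: Callable[[Any], None],
                 max_batch: int = 50, max_age_s: float = 1.0) -> None:
        self._log_fn = log_fn
        self._max_batch = max_batch
        self._max_age_s = max_age_s
        self._buffer: list = []
        self._last_flush = time.monotonic()

    def log(self, message: Any) -> None:
        self._buffer.append(message)
        # Flush when the buffer is full or the oldest entry is too old
        if (len(self._buffer) >= self._max_batch
                or time.monotonic() - self._last_flush >= self._max_age_s):
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._log_fn({"batch": self._buffer})
            self._buffer = []
        self._last_flush = time.monotonic()
```

Usage might look like `batched = BatchingLogger(self.logger.log)`, calling `batched.flush()` in `on_destroy()` so buffered entries aren't lost.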
dispose()
```python
def dispose(self) -> None:
```
Description:
Closes the Socket.IO connection and releases resources. Called automatically by FlowBoxRaw.on_destroy().
Example:
```python
def on_destroy(self) -> None:
    super().on_destroy()  # Automatically disposes logger and widget manager
    # No need to call self.logger.dispose() manually
```
[ERROR!error/RESOURCE LEAK] If you create a `FlowMakerLogger` manually (not via `self.logger`), you must call `dispose()` when done. Failing to do so keeps the Socket.IO connection alive, causing resource leaks and preventing clean worker shutdown.
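If you do construct a logger manually, a context manager can guarantee `dispose()` runs even when logging raises. This is a sketch: `managed_logger` is not part of the SDK, and `factory` stands in for `FlowMakerLogger(socketio_url, runtime_context)`:

```python
from contextlib import contextmanager

@contextmanager
def managed_logger(factory):
    """Yield a logger from `factory` and guarantee dispose() on exit.
    Hypothetical helper: factory is any callable returning an object
    with log() and dispose(), such as a FlowMakerLogger constructor."""
    logger = factory()
    try:
        yield logger
    finally:
        logger.dispose()  # always release the Socket.IO connection
```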
Lazy Initialization
The logger is created on first access:
```python
class MyBox(FlowBoxCore):
    def __init__(self, init_params: FlowBoxInitParams) -> None:
        super().__init__(init_params)
        # Logger not created yet

    async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
        # Logger created on first access
        self.logger.log("First log message")
```
Implementation:
```python
@property
def logger(self) -> FlowMakerLogger:
    if self._logger is None:
        self._logger = FlowMakerLogger(
            self._init_params._worker_options.worker_log_socket_io_endpoint,
            self._init_params.runtime_context
        )
    return self._logger
```
[NOTE!lightbulb/LAZY LOADING] Lazy initialization means the Socket.IO connection is only established when you actually log something. This saves resources for FlowBoxes that don't need logging.
Socket.IO Connection Details
The logger uses Socket.IO to emit events:
- Channel: `log`
- Event payload: `{"from": {...}, "data": ...}`
- Connection URL: from the `worker_log_socket_io_endpoint` option
- Namespace: job ID (from `runtime_context.job_id`)
Connection lifecycle:
- Created on first `self.logger` access
- Remains open for the FlowBox's lifetime
- Closed when `dispose()` is called (via `on_destroy()`)
[INFO!info/CONNECTION SHARING] Each FlowBox instance has its own logger with its own Socket.IO connection. Multiple FlowBoxes in the same worker process each maintain separate connections.
Common Patterns
Logging Progress
```python
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    self.logger.log(f"Starting processing of {input_name}")

    # Process in stages
    stage1_result = await self.process_stage1(data)
    self.logger.log({"stage": "stage1", "result": stage1_result})

    stage2_result = await self.process_stage2(stage1_result)
    self.logger.log({"stage": "stage2", "result": stage2_result})

    self.logger.log("Processing complete")
```
Error Handling
```python
async def on_input(self, input_name: str, header: bytes, data: bytes) -> None:
    try:
        result = await self.process(data)
        self.logger.log({"status": "success", "result": result})
    except Exception as e:
        self.logger.log({
            "status": "error",
            "error": str(e),
            "input_name": input_name
        })
        raise  # Re-raise to fail the message
```
Structured Logging
```python
# Instead of:
self.logger.log(f"Processed {count} items in {duration_ms}ms")

# Use:
self.logger.log({
    "action": "process_batch",
    "count": count,
    "duration_ms": duration_ms,
    "success": True
})
```
[NOTE!lightbulb/STRUCTURED LOGS] The runtime can parse structured logs (dicts) for filtering and aggregation. Plain strings are harder to query. Prefer structured logs for metrics and events.
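A small helper can keep structured entries consistently shaped across a FlowBox. This is a sketch: `make_event` and its key names are conventions of this example, not SDK requirements:

```python
import time
from typing import Any

def make_event(action: str, **fields: Any) -> dict:
    """Build a consistently shaped structured log entry.
    Hypothetical convention: an 'action' name, a timestamp, and
    arbitrary extra fields merged in."""
    return {"action": action, "timestamp": time.time(), **fields}

# self.logger.log(make_event("process_batch", count=42,
#                            duration_ms=12.5, success=True))
```

Centralizing the shape this way makes it easier to filter and aggregate logs by `action` in the UI later.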
Differences from TypeScript SDK
| Feature | Python `FlowMakerLogger` | TypeScript `FlowBoxLogger` |
|---|---|---|
| Initialization | Lazy via `self.logger` property | Manual via `flowBoxLogger` singleton |
| Ref counting | Not needed (per-instance) | Required (`increaseRefCount`/`decreaseRefCount`) |
| Disposal | Auto via `on_destroy()` | Manual or auto via `on_destroy()` |
| Socket.IO channel | `log` | `log` |
[INFO!info/NO REF COUNTING] Python's `FlowMakerLogger` is per-instance and doesn't use reference counting. TypeScript's `flowBoxLogger` is a singleton shared across all FlowBoxes, hence the need for ref counting to manage the shared socket lifecycle.
Troubleshooting
Logs Not Appearing
- Check Socket.IO endpoint: ensure `worker_log_socket_io_endpoint` points to the correct runtime URL
- Verify job ID: logs are scoped to `jobId`; check you're viewing the right job in the UI
- Connection errors: check worker logs for Socket.IO connection failures
Duplicate Logs
If logs appear twice, check:
- Multiple worker instances processing the same job
- `on_destroy()` being called multiple times (shouldn't happen)
Memory Leaks
If memory grows over time:
- Ensure `on_destroy()` calls `super().on_destroy()`
- Check for manually created loggers that aren't disposed