Core Concepts: Sources, Pipes, and Sinks
FlowMaker executes every graph using three box roles:
- Source: emits data into the flow.
- Pipe: receives data, transforms/enriches/filters, emits downstream.
- Sink: consumes data and terminates a branch (storage, API call, notification, control command).
At runtime, each graph node becomes a RunNode with type 'source' | 'pipe' | 'sink', bound to a worker connection and managed by RunNodeManager.
Diagram: role responsibilities and flow direction
The diagram shows the role boundary pattern used throughout FlowMaker graphs: source emits, pipe transforms, sink commits side effects.
Why this split exists
This role separation keeps logic explicit and operationally safe:
- Sources isolate ingestion concerns (protocols, polling, offsets).
- Pipes isolate business logic (normalization, rules, enrichment, aggregation).
- Sinks isolate side effects (writes, notifications, actuation), making retries and observability clearer.
[NOTE!lightbulb/ROLE BOUNDARIES] If a behavior touches external systems (database writes, ERP updates, notifications), model it as a sink concern so retry semantics stay explicit and auditable.
Runtime mapping
class RunNode {
  readonly nodeId: string;
  readonly type: 'source' | 'sink' | 'pipe';

  async initialize(connectedInputs: string[], connectedOutputs: string[]): Promise<RunReply>;
  async sendInput(inputId: string, data: Buffer, header: Buffer): Promise<RunReply>;
  async awaitOutputAck(outputId: string): Promise<RunReply>;
  async destroy(): Promise<RunReply>;
}
Each node uses the same runtime contract, but role-specific behavior is implemented inside the worker's FlowBox class.
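As a minimal sketch of the lifecycle the runtime contract implies (initialize, then sendInput, then destroy), the following in-memory stand-in mimics the RunNode shape. It is hypothetical: the real RunNode is bound to a worker connection, and RunReply's fields are assumed here.

```typescript
// Hypothetical in-memory stand-in for a worker-bound RunNode.
// Illustrates the lifecycle order: initialize -> sendInput -> destroy.
// RunReply's fields are an assumption for this sketch.
type RunReply = { ok: boolean; detail?: string };

class StubRunNode {
  readonly nodeId: string;
  readonly type: 'source' | 'sink' | 'pipe';
  private initialized = false;

  constructor(nodeId: string, type: 'source' | 'sink' | 'pipe') {
    this.nodeId = nodeId;
    this.type = type;
  }

  async initialize(connectedInputs: string[], connectedOutputs: string[]): Promise<RunReply> {
    this.initialized = true;
    return { ok: true, detail: `${connectedInputs.length} in / ${connectedOutputs.length} out` };
  }

  async sendInput(inputId: string, data: Buffer, header: Buffer): Promise<RunReply> {
    // Rejecting input before initialize keeps the lifecycle ordering explicit.
    if (!this.initialized) return { ok: false, detail: 'not initialized' };
    return { ok: true };
  }

  async destroy(): Promise<RunReply> {
    this.initialized = false;
    return { ok: true };
  }
}
```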
Worker registration contract (TypeScript)
export interface FlowBoxRegistrationInfo {
  id: string;
  displayName: string;
  currentVersion: string;
  type: 'source' | 'sink' | 'pipe';
  ioInterfaces: {
    inputs: FlowBoxRegistrationIOInput[];
    outputs: FlowBoxRegistrationIOInput[];
  };
}
This registration contract is what allows the launcher to validate graph structure before execution and detect wiring mismatches early.
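To make the contract concrete, here is how a hypothetical unit-converter pipe might register. Only the FlowBoxRegistrationInfo shape comes from the SDK; the box id, version, IO names, and the fields of FlowBoxRegistrationIOInput are invented for this sketch.

```typescript
// FlowBoxRegistrationIOInput's fields are an assumption for this example.
interface FlowBoxRegistrationIOInput {
  id: string;
  displayName: string;
}

interface FlowBoxRegistrationInfo {
  id: string;
  displayName: string;
  currentVersion: string;
  type: 'source' | 'sink' | 'pipe';
  ioInterfaces: {
    inputs: FlowBoxRegistrationIOInput[];
    outputs: FlowBoxRegistrationIOInput[];
  };
}

// Hypothetical registration payload for a unit-converter pipe box.
const unitConverter: FlowBoxRegistrationInfo = {
  id: 'unit-converter',
  displayName: 'Unit Converter',
  currentVersion: '1.0.0',
  type: 'pipe',
  ioInterfaces: {
    inputs: [{ id: 'raw', displayName: 'Raw register values' }],
    outputs: [{ id: 'engineering', displayName: 'Engineering units' }],
  },
};
```

With the type declared as a literal union, the compiler rejects a registration whose role is misspelled before the launcher ever sees it.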
Real-life examples
Example A: Energy monitoring
- Source: Modbus poller reads active/reactive power from meters.
- Pipe: Unit converter transforms raw register values into engineering units (kW, kvar).
- Pipe: Aggregator computes 1-minute rolling averages.
- Sink: Time-series sink writes to the historian; a separate alert sink fires on threshold violations.
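The aggregation step in Example A can be sketched as a pure pipe function. This is a simplified rolling average over a fixed time window, not the actual aggregator box; sample shape and window size are assumptions.

```typescript
// Simplified 1-minute rolling average over timestamped kW samples.
// Illustrative only; a real aggregator pipe would window per meter.
interface Sample {
  timestampMs: number;
  kw: number;
}

function rollingAverage(samples: Sample[], nowMs: number, windowMs = 60_000): number | null {
  // Keep only samples inside the trailing window.
  const inWindow = samples.filter(s => nowMs - s.timestampMs <= windowMs);
  if (inWindow.length === 0) return null;
  const sum = inWindow.reduce((acc, s) => acc + s.kw, 0);
  return sum / inWindow.length;
}
```

Because the function is pure, it can be unit-tested and retried without touching any meter or historian.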
Example B: Cold chain traceability
- Source: MQTT ingestion receives truck sensor payloads.
- Pipe: Geo-enrichment adds depot/route metadata.
- Pipe: Compliance evaluator flags temperature excursions.
- Sink: ERP connector writes compliance events; notification sink emits incidents.
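The compliance-evaluator pipe in Example B reduces to a small pure check. This sketch flags readings outside a cold-chain band; the reading shape and the 2–8 °C thresholds are invented for illustration.

```typescript
// Illustrative excursion check for Example B.
// Thresholds (2-8 degrees C) are assumptions, not FlowMaker defaults.
interface Reading {
  truckId: string;
  tempC: number;
}

interface Excursion {
  truckId: string;
  tempC: number;
  limit: 'high' | 'low';
}

function flagExcursions(readings: Reading[], minC = 2, maxC = 8): Excursion[] {
  const out: Excursion[] = [];
  for (const r of readings) {
    if (r.tempC > maxC) out.push({ truckId: r.truckId, tempC: r.tempC, limit: 'high' });
    else if (r.tempC < minC) out.push({ truckId: r.truckId, tempC: r.tempC, limit: 'low' });
  }
  return out;
}
```

The pipe only flags; writing the compliance event to the ERP and emitting the incident stay in the sinks, where retries are explicit.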
Example C: Factory quality loop
- Source: Vision system events.
- Pipe: Defect classifier maps detections to defect codes.
- Sink: MES sink creates non-conformance records.
Practical guidance
- Start by choosing node roles first, then pick specific boxes.
- Keep sources and sinks thin; put reusable logic in pipes.
- Prefer multiple simple pipes over one monolithic pipe for easier testing and retries.
- If a sink might be slow, rely on backpressure (see dedicated section) instead of ad-hoc in-memory buffers.
[WARNING!schedule/PIPELINE STABILITY] A "fat" pipe that does parsing, enrichment, persistence, and alerting at once becomes hard to retry safely and usually hides the true bottleneck.
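The "multiple simple pipes" guidance can be sketched as small pure stages composed into a chain: each stage is testable and retryable on its own. The names here are illustrative, not FlowMaker APIs.

```typescript
// Each pipe stage is a small pure function; composition replaces a
// monolithic pipe. Names are illustrative, not FlowMaker APIs.
type Stage<I, O> = (input: I) => O;

function chain<A, B, C>(first: Stage<A, B>, second: Stage<B, C>): Stage<A, C> {
  return input => second(first(input));
}

// Example stages: parse a raw string reading, then convert W -> kW.
const parse: Stage<string, number> = raw => Number(raw);
const toKw: Stage<number, number> = watts => watts / 1000;

const parseThenConvert = chain(parse, toKw);
```

If the conversion stage ever misbehaves, it can be retried or replaced without touching the parser, which is exactly what a fat pipe makes impossible.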
Runtime references
- platform-backend/docs/runtime-v2.md (architecture + RunNode model)
- platform-backend/src/runner/run-node.class.ts
- platform-backend/src/runner/run-node-manager.class.ts
- sdk/typescript/src/flowbox-registration-info.ts
- sdk/typescript/src/flow-element.types.ts