Core Concepts: Sources, Pipes, and Sinks

FlowMaker executes every graph using three box roles:

  • Source: emits data into the flow.
  • Pipe: receives data, transforms/enriches/filters, emits downstream.
  • Sink: consumes data and terminates a branch (storage, API call, notification, control command).

At runtime, each graph node becomes a RunNode with type 'source' | 'pipe' | 'sink', bound to a worker connection and managed by RunNodeManager.

Diagram: role responsibilities and flow direction

Source to pipe to sink flow

The diagram shows the role boundary pattern used throughout FlowMaker graphs: source emits, pipe transforms, sink commits side effects.

Why this split exists

This role separation keeps logic explicit and operationally safe:

  • Sources isolate ingestion concerns (protocols, polling, offsets).
  • Pipes isolate business logic (normalization, rules, enrichment, aggregation).
  • Sinks isolate side effects (writes, notifications, actuation), making retries and observability clearer.

[NOTE!lightbulb/ROLE BOUNDARIES] If a behavior touches external systems (database writes, ERP updates, notifications), model it as a sink concern so retry semantics stay explicit and auditable.

Runtime mapping

class RunNode {
  readonly nodeId: string;
  readonly type: 'source' | 'sink' | 'pipe';

  async initialize(connectedInputs: string[], connectedOutputs: string[]): Promise<RunReply>;
  async sendInput(inputId: string, data: Buffer, header: Buffer): Promise<RunReply>;
  async awaitOutputAck(outputId: string): Promise<RunReply>;
  async destroy(): Promise<RunReply>;
}

Each node uses the same runtime contract, but role-specific behavior is implemented inside the worker's FlowBox class.

Worker registration contract (TypeScript)

export interface FlowBoxRegistrationInfo {
  id: string;
  displayName: string;
  currentVersion: string;
  type: 'source' | 'sink' | 'pipe';
  ioInterfaces: {
    inputs: FlowBoxRegistrationIOInput[];
    outputs: FlowBoxRegistrationIOInput[];
  };
}

This is what allows the launcher to validate graph structure before execution and detect mismatches early.

Real-life examples

Example A: Energy monitoring

  • Source: Modbus poller reads active/reactive power from meters.
  • Pipe: Unit converter transforms raw register values into engineering units (kW, kvar).
  • Pipe: Aggregator computes 1-minute rolling averages.
  • Sink: Time-series sink writes into historian + alert sink for threshold violations.

Example B: Cold chain traceability

  • Source: MQTT ingestion receives truck sensor payloads.
  • Pipe: Geo-enrichment adds depot/route metadata.
  • Pipe: Compliance evaluator flags temperature excursions.
  • Sink: ERP connector writes compliance events; notification sink emits incidents.

Example C: Factory quality loop

  • Source: Vision system events.
  • Pipe: Defect classifier maps detections to defect codes.
  • Sink: MES sink creates non-conformance records.

Practical guidance

  • Start by choosing node roles first, then pick specific boxes.
  • Keep sources and sinks thin; put reusable logic in pipes.
  • Prefer multiple simple pipes over one monolithic pipe for easier testing and retries.
  • If a sink might be slow, rely on backpressure (see dedicated section) instead of ad-hoc in-memory buffers.

[WARNING!schedule/PIPELINE STABILITY] A "fat" pipe that does parsing, enrichment, persistence, and alerting at once becomes hard to retry safely and usually hides the true bottleneck.

Runtime references

  • platform-backend/docs/runtime-v2.md (architecture + RunNode model)
  • platform-backend/src/runner/run-node.class.ts
  • platform-backend/src/runner/run-node-manager.class.ts
  • sdk/typescript/src/flowbox-registration-info.ts
  • sdk/typescript/src/flow-element.types.ts