Pipelines
A flow describes how to transform a document; a pipeline describes where the data comes from and where it goes. `weavster run` executes pipelines: it reads from a source, transforms each document with a flow, and writes to a sink.
Pipelines live one per file in `pipelines/`, alongside `flows/` and `fixtures/`:
```yaml
# pipelines/order.yaml
source:
  type: file
  path: in/order.json
flow: order            # flows/order.yaml
sink:
  type: file
  path: out/order.json
```
```sh
weavster run order     # run pipelines/order.yaml
weavster run           # run every pipeline
```
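As a rough model of how the two commands pick their targets, the Python sketch below maps a pipeline name to `pipelines/<name>.yaml` and falls back to every pipeline when no name is given. It assumes only the one-file-per-pipeline layout described above; `resolve_pipelines` is a hypothetical helper, not part of weavster.

```python
from pathlib import Path
from typing import List, Optional


def resolve_pipelines(project_root: Path, name: Optional[str] = None) -> List[Path]:
    """Return the pipeline files that `weavster run [name]` would target.

    Hypothetical helper: assumes each pipeline lives at pipelines/<name>.yaml.
    """
    pipelines_dir = project_root / "pipelines"
    if name is not None:
        path = pipelines_dir / f"{name}.yaml"
        if not path.is_file():
            # Corresponds to the documented startup error "pipeline not found".
            raise FileNotFoundError(f"pipeline not found: {path}")
        return [path]
    # No name given: every pipeline. Sorting is this sketch's choice, not documented.
    return sorted(pipelines_dir.glob("*.yaml"))
```

With `pipelines/order.yaml` and `pipelines/invoice.yaml` on disk, `resolve_pipelines(root, "order")` returns just the order file, and `resolve_pipelines(root)` returns both.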
Connectors
Each source and sink has a `type`:
| Type | As source | As sink |
|---|---|---|
| `file` | reads `path` | writes `path` (creates missing directories) |
| `stdin` | reads process stdin | not supported |
| `stdout` | not supported | writes process stdout |
```yaml
source: { type: stdin, format: json }
flow: order
sink: { type: stdout }
```
(Network connectors such as REST and SFTP are planned and will use the same shape.)
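The connector table amounts to a small compatibility check. Here is a minimal Python sketch of it, run against an already-parsed pipeline (a plain `dict`); `SOURCE_TYPES`, `SINK_TYPES`, and `check_connectors` are illustrative names, not weavster's API.

```python
from typing import List

# Hypothetical encoding of the connector table.
SOURCE_TYPES = {"file", "stdin"}   # types that can be read from
SINK_TYPES = {"file", "stdout"}    # types that can be written to


def check_connectors(pipeline: dict) -> List[str]:
    """Return problems with a parsed pipeline's source and sink types."""
    problems = []
    source_type = pipeline.get("source", {}).get("type")
    sink_type = pipeline.get("sink", {}).get("type")
    if source_type not in SOURCE_TYPES:
        problems.append(f"{source_type!r} cannot be used as a source")
    if sink_type not in SINK_TYPES:
        problems.append(f"{sink_type!r} cannot be used as a sink")
    return problems
```

The `stdin` to `stdout` pipeline shown earlier passes with no problems. Swapping the two types yields one problem per misplaced connector.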
How `run` works
A source yields a stream of documents, and `run` processes each one as it arrives (parse → flow → serialize → sink), staying live until the source closes:
- A `file` source is bounded: it yields the whole file as one document and then ends, so the loop runs once and `run` exits.
- A `stdin` source is unbounded and line-delimited: each non-empty line is one document, processed as it arrives. `run` blocks waiting for the next line and exits at end of stream. (To feed it, pipe in newline-delimited JSON: `cat orders.ndjson | weavster run orders`.)
This is the extension point for future always-on connectors (REST, SFTP, queues): the same loop, fed by an unbounded source.
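The loop can be modeled with Python generators. This is a sketch, not weavster's implementation: it assumes JSON documents and uses plain callables in place of a flow and a sink. `file_source`, `stdin_source`, and `run` are hypothetical names.

```python
import io
import json
from typing import Callable, Iterable, Iterator, TextIO


def file_source(contents: str) -> Iterator[str]:
    """Bounded source: the whole file is one document, then the stream ends."""
    yield contents


def stdin_source(stream: TextIO) -> Iterator[str]:
    """Unbounded, line-delimited source: each non-empty line is one document.

    With sys.stdin, iteration waits for the next line and stops at end of stream.
    """
    for line in stream:
        if line.strip():
            yield line


def run(documents: Iterable[str],
        flow: Callable[[dict], dict],
        sink: Callable[[str], None]) -> None:
    """Handle each document as it arrives: parse -> flow -> serialize -> sink."""
    for raw in documents:
        parsed = json.loads(raw)          # parse (this sketch assumes JSON)
        transformed = flow(parsed)        # apply the flow
        sink(json.dumps(transformed))     # serialize and write


# Usage: feed line-delimited JSON through a toy flow that tags each document.
collected = []
run(stdin_source(io.StringIO('{"id": 1}\n\n{"id": 2}\n')),
    flow=lambda doc: {**doc, "seen": True},
    sink=collected.append)
```

Because `run` only consumes an iterator, the same loop serves bounded and unbounded sources. The source alone decides when the stream ends.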
Formats
The source format selects the parser and the sink format selects the serializer, so a pipeline can convert between formats (for example, XML in, JSON out).
- Source `file`: inferred from the path extension (`.json`/`.xml`); set `format:` to override.
- Source `stdin`: `format:` is required, since there is no extension to infer from.
- Sink: defaults to the source format; a `file` sink whose path has a recognized extension uses that extension's format instead; an explicit `format:` always wins.
A `file` sink overwrites any existing file at its `path`. Converting to XML requires the document to have a single root element (see the Format Packs limitations).
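The precedence rules above can be written as two small functions. This Python sketch works on parsed `source`/`sink` dicts and recognizes only the two extensions the docs mention. The names are hypothetical, and raising an error for an unrecognized source extension is an assumption, since the docs do not say what happens in that case.

```python
from pathlib import PurePath

# Extensions this sketch recognizes; the docs mention .json and .xml.
EXTENSION_FORMATS = {".json": "json", ".xml": "xml"}


def source_format(source: dict) -> str:
    """Pick the parser format for a source."""
    if "format" in source:                     # explicit format: overrides
        return source["format"]
    if source["type"] == "file":
        ext = PurePath(source["path"]).suffix
        if ext in EXTENSION_FORMATS:
            return EXTENSION_FORMATS[ext]
        # Assumption: an unrecognized extension with no format: is an error.
        raise ValueError(f"cannot infer format from {source['path']!r}; set format:")
    # stdin has no extension to infer from, so format: is required.
    raise ValueError(f"{source['type']} source requires format:")


def sink_format(sink: dict, source_fmt: str) -> str:
    """Pick the serializer format for a sink."""
    if "format" in sink:                       # explicit format: always wins
        return sink["format"]
    if sink["type"] == "file":
        ext = PurePath(sink["path"]).suffix
        if ext in EXTENSION_FORMATS:           # recognized extension
            return EXTENSION_FORMATS[ext]
    return source_fmt                          # default: same as the source
```

For example, an `.xml` file source paired with an `out/order.json` file sink resolves to XML in and JSON out, while a `stdout` sink simply inherits the source format.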
Errors
Errors fall into two classes, depending on when they occur:
- Startup errors (pipeline not found or schema-invalid, source fails to open) abort the pipeline before the loop starts and exit non-zero.
- Per-document errors (a parse, transform, or write failure for one document) are scoped to that document. On a bounded `file` source, that single failure fails the run; on an unbounded stream, it is logged and the loop continues with the next document.
`run` reports which pipeline, which document, and which stage failed.
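To make the per-document policy concrete, here is a Python sketch. It assumes each document passes through a list of named stages (for example `parse`, `transform`, `serialize`, `write`) and that the caller knows whether the source is bounded. `process_all`, `DocumentError`, and the `bounded` flag are hypothetical. The error records the pipeline, document index, and stage, matching what `run` reports.

```python
import logging
from typing import Callable, Iterable, List, Optional, Tuple

log = logging.getLogger("pipeline-sketch")

Stage = Tuple[str, Callable[[object], object]]   # (stage name, function)


class DocumentError(Exception):
    """A failure scoped to one document (hypothetical type for this sketch)."""

    def __init__(self, pipeline: str, index: int, stage: Optional[str], cause: Exception):
        super().__init__(f"pipeline {pipeline!r}, document {index}, stage {stage}: {cause}")
        self.pipeline = pipeline
        self.index = index
        self.stage = stage


def process_all(pipeline: str, documents: Iterable[object],
                stages: List[Stage], bounded: bool) -> int:
    """Push each document through the named stages; return the failure count."""
    failures = 0
    for index, doc in enumerate(documents):
        value: object = doc
        stage_name: Optional[str] = None
        try:
            for stage_name, stage in stages:
                value = stage(value)
        except Exception as exc:
            err = DocumentError(pipeline, index, stage_name, exc)
            if bounded:
                raise err from exc    # bounded source: one failure fails the run
            log.error("%s", err)       # unbounded: log it and move on
            failures += 1
    return failures
```

On an unbounded stream with one malformed line among valid ones, the valid documents still reach the `write` stage and the function returns `1`. With `bounded=True`, the same malformed document raises immediately with `stage == "parse"`.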
Validation
`weavster validate` checks every `pipelines/*.yaml` file against the pipeline schema, alongside `weavster.yaml` and your flows.