Ingestion Pipelines

An ingestion pipeline is how Agentcy turns an external system into graph knowledge. A pipeline runs one or more connectors, normalizes their output into Neo4j nodes and edges, and schedules incremental resyncs. Pipelines are versioned, reversible, and observable.
Related concepts:
- Connectors & Tool Providers — what a connector is.
- Knowledge Graph & Realms — where pipeline output lands.
- Tasks, Workers & Workflows — how pipelines actually execute.
Anatomy of a pipeline
┌─────────────────────────────────────────────────────────────┐
│  Pipeline: "infra-weekly"                                   │
│  realm: infrastructure      schedule: 0 */6 * * *           │
│                                                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐   │
│  │ GitHub       │  │ Kubernetes   │  │ AWS (us-east-1)  │   │
│  │ repos, PRs,  │  │ pods, deploys│  │ EC2, S3, IAM     │   │
│  │ branches     │  │ services     │  │ CloudWatch       │   │
│  └──────┬───────┘  └──────┬───────┘  └────────┬─────────┘   │
│         │                 │                   │             │
│         └────── normalize & dedupe ───────────┘             │
│                           │                                 │
│                           ▼                                 │
│             Neo4j (realm=infrastructure)                    │
└─────────────────────────────────────────────────────────────┘

Each pipeline has:
- A name (unique per org).
- A target realm: all nodes written get `realm = "<value>"` so queries can be scoped.
- One or more steps: each step runs a connector with its own config.
- A schedule: a cron expression or `manual`.
- A retention policy: how long to keep superseded nodes before they are pruned.
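The five parts above can be pictured as a small data model. The sketch below is illustrative only: the class names, the `problems` helper, and the `retention_days` field and its default are assumptions, not Agentcy's actual schema. It only checks the shape of a definition (required parts present, schedule either `manual` or a five-field cron expression).

```python
from dataclasses import dataclass


@dataclass
class Step:
    connector: str             # e.g. "github"
    source_id: str             # e.g. "gh-primary"
    mode: str = "incremental"


@dataclass
class Pipeline:
    name: str                  # unique per org
    realm: str                 # stamped on every node the pipeline writes
    steps: list[Step]          # one or more connector steps
    schedule: str = "manual"   # cron expression or "manual"
    retention_days: int = 30   # hypothetical field name and default

    def problems(self) -> list[str]:
        """Return human-readable problems; an empty list means the shape is valid."""
        found = []
        if not self.name:
            found.append("name is required")
        if not self.realm:
            found.append("realm is required")
        if not self.steps:
            found.append("at least one step is required")
        # A standard cron expression has five whitespace-separated fields.
        if self.schedule != "manual" and len(self.schedule.split()) != 5:
            found.append("schedule must be a 5-field cron expression or 'manual'")
        return found


pipeline = Pipeline(
    name="github-weekly",
    realm="development",
    steps=[Step(connector="github", source_id="gh-primary")],
    schedule="0 3 * * 1",
)
print(pipeline.problems())  # prints []
```

A shape check like this is separate from the `validate` lifecycle stage, which involves the connectors themselves.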
Lifecycle
┌─────────┐   validate   ┌─────────┐   run   ┌──────────┐   finalize  ┌──────────┐
│  draft  │─────────────▶│  ready  │────────▶│ running  │────────────▶│  synced  │
└─────────┘              └─────────┘         └─────┬────┘             └──────────┘
                                                   │ error
                                                   ▼
                                             ┌──────────┐
                                             │  failed  │──► revert to previous run
                                             └──────────┘

- validate: every connector step runs `validate_config` (credentials, scopes, reachability). Invalid configs never get to `ready`.
- running: a worker claims the run. Each step produces a batch of nodes + edges, streamed to `agentcy-graph` in transactions sized by `PIPELINE_BATCH_SIZE`.
- finalize: after all steps succeed, the previous snapshot is superseded. Orphan nodes (present in the previous run, absent now) are soft-deleted (`archived_at` set).
- failed: nothing is published. The previous snapshot stays current. Retry or revert from the UI.
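The orphan handling in the finalize step can be sketched as a set difference over node keys. This is a minimal model, not the real implementation: snapshots are plain dicts keyed by `source_id`, and the `finalize` function and its `complete` flag are hypothetical names. The flag models the safety rail for partial provider data listed under failure modes, where missing nodes are not archived.

```python
from datetime import datetime, timezone


def finalize(previous: dict, current: dict, complete: bool = True) -> dict:
    """Build the new snapshot from the previous one and the run's output.

    Nodes present in `previous` but absent from `current` are orphans.
    They are kept and stamped with archived_at rather than removed.
    """
    now = datetime.now(timezone.utc).isoformat()
    snapshot = {sid: dict(node) for sid, node in current.items()}
    for sid, node in previous.items():
        if sid in current:
            continue
        orphan = dict(node)  # copy so the previous snapshot is not mutated
        # Only archive when the provider data is known to be complete.
        if complete and orphan.get("archived_at") is None:
            orphan["archived_at"] = now
        snapshot[sid] = orphan
    return snapshot


previous = {"repo-1": {"name": "api"}, "repo-2": {"name": "web"}}
current = {"repo-1": {"name": "api"}}

result = finalize(previous, current)
print(sorted(result))                        # prints ['repo-1', 'repo-2']
print("archived_at" in result["repo-2"])     # prints True
```

Because the orphan is stamped rather than dropped, the retention policy can decide later when it is actually pruned.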
Incremental vs. full sync
Every connector declares which it supports. Most support both.
- Full sync: re-reads everything. Slower, but catches drift (renames, deletions).
- Incremental sync: uses provider deltas (GitHub `since`, K8s resource versions, etc.). Fast; default for scheduled runs.
The first run of a pipeline is always full. Scheduled runs are incremental unless you request a full sync from the UI or pass {"mode":"full"} to the run endpoint.
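The selection rule in the paragraph above reduces to a few lines. The function name and its parameters below are hypothetical; only the rule itself (first run full, scheduled runs incremental, explicit full on request) comes from the text.

```python
from typing import Optional


def choose_mode(has_previous_run: bool, requested: Optional[str] = None) -> str:
    """Pick the sync mode for a run."""
    if not has_previous_run:
        return "full"         # the first run of a pipeline is always full
    if requested == "full":
        return "full"         # explicit request from the UI or {"mode":"full"}
    return "incremental"      # default for scheduled runs


print(choose_mode(has_previous_run=False))                    # prints full
print(choose_mode(has_previous_run=True))                     # prints incremental
print(choose_mode(has_previous_run=True, requested="full"))   # prints full
```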
Realms prevent collision
Multiple pipelines can write to the same Neo4j database safely because every node and edge carries a realm property. Queries filter by realm (or * to cross-search). A pipeline writing to design cannot clobber a node written by the infrastructure pipeline — even if the source_id matches — because realm is part of the primary key.
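A toy in-memory model makes the composite key concrete. Nothing here is Neo4j or `agentcy-graph` code: the `NodeKey` type and the `upsert` and `find` helpers are hypothetical, and a dict stands in for the database. The point is only that two writes with the same `source_id` land on different keys when the realms differ.

```python
from typing import NamedTuple


class NodeKey(NamedTuple):
    realm: str
    source_id: str


graph: dict[NodeKey, dict] = {}


def upsert(realm: str, source_id: str, props: dict) -> None:
    """Write a node; the realm is part of the key and is stamped on the node."""
    graph[NodeKey(realm, source_id)] = {**props, "realm": realm}


def find(source_id: str, realm: str = "*") -> list[dict]:
    """Look up nodes by source_id, scoped to one realm or '*' for all realms."""
    return [
        node
        for key, node in graph.items()
        if key.source_id == source_id and realm in ("*", key.realm)
    ]


upsert("infrastructure", "svc-42", {"kind": "Service"})
upsert("design", "svc-42", {"kind": "Mockup"})  # same source_id, different realm

print(len(graph))                 # prints 2
print(find("svc-42", "design"))   # prints [{'kind': 'Mockup', 'realm': 'design'}]
print(len(find("svc-42")))        # prints 2
```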
Wire a pipeline
Minimum viable pipeline over the REST API:
```bash
curl -X POST http://localhost:8080/api/v1/pipelines \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name": "github-weekly",
    "realm": "development",
    "schedule": "0 3 * * 1",
    "steps": [{
      "connector": "github",
      "source_id": "gh-primary",
      "mode": "incremental"
    }]
  }'
```

Trigger a run:
```bash
curl -X POST http://localhost:8080/api/v1/pipelines/$PIPELINE_ID/runs \
  -H "authorization: Bearer $TOKEN"
```

Stream run events (SSE); the UI uses this for live status:
```bash
curl -N http://localhost:8080/api/v1/pipelines/$PIPELINE_ID/runs/$RUN_ID/events \
  -H "authorization: Bearer $TOKEN"
```

Running without a pipeline
For one-off syncs you don't need a pipeline. POST /api/v1/sources/:id/sync runs a single connector inline (or enqueues it if workers is enabled). This is the path the Sync button in the Connectors UI uses.
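From a script, the same one-off call can be built with the Python standard library. This is a sketch under assumptions: the `build_sync_request` helper is hypothetical, the base URL mirrors the curl examples on this page, and `gh-primary` is assumed to be a valid value for `:id`. The response format is not documented here, so the example stops at constructing the request.

```python
import urllib.request


def build_sync_request(base_url: str, source_id: str, token: str) -> urllib.request.Request:
    """Build (but do not send) the one-off sync request for a single source."""
    return urllib.request.Request(
        url=f"{base_url}/api/v1/sources/{source_id}/sync",
        method="POST",
        headers={"Authorization": f"Bearer {token}"},
    )


request = build_sync_request("http://localhost:8080", "gh-primary", "example-token")
print(request.get_method(), request.full_url)
# prints POST http://localhost:8080/api/v1/sources/gh-primary/sync

# To actually send it against a running server:
#     with urllib.request.urlopen(request) as response:
#         print(response.status)
```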
Pipelines exist when you need:
- multi-step ingestion that must succeed atomically,
- scheduling with retention + revert,
- reporting ("what changed in the last run?"),
- a named configuration that other teams can subscribe to.
Observability
For every run you get:
- `started_at`, `finished_at`, `status`.
- Per-step counts: `nodes_created`, `nodes_updated`, `nodes_archived`, `edges_created`, `errors`.
- A diff against the previous successful run.
- Full `tracing` logs (filter by `run_id`).
The Activity feed (GET /api/v1/activity) includes one entry per run state change, so audit + operational dashboards share the same event stream.
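If you want run-level totals, the per-step counts can be summed client-side. The sketch below assumes each step result is available as a dict carrying the count fields listed above; the `run_totals` helper and the sample numbers are made up for illustration and do not reflect a documented response shape.

```python
from collections import Counter

COUNT_FIELDS = ("nodes_created", "nodes_updated", "nodes_archived", "edges_created", "errors")


def run_totals(step_results: list[dict]) -> dict:
    """Sum the per-step counters into one run-level summary."""
    totals = Counter()
    for step in step_results:
        for name in COUNT_FIELDS:
            totals[name] += step.get(name, 0)  # a missing counter counts as zero
    return {name: totals[name] for name in COUNT_FIELDS}


# Illustrative numbers only.
steps = [
    {"nodes_created": 12, "nodes_updated": 3, "nodes_archived": 0, "edges_created": 20, "errors": 0},
    {"nodes_created": 5, "nodes_updated": 1, "nodes_archived": 2, "edges_created": 7, "errors": 1},
]
print(run_totals(steps))
# prints {'nodes_created': 17, 'nodes_updated': 4, 'nodes_archived': 2, 'edges_created': 27, 'errors': 1}
```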
Failure modes and retries
| Failure | Behavior |
|---|---|
| Auth error on a step | Run marked failed, no writes committed. Validate the source config. |
| Rate-limit from provider | Exponential backoff, up to PIPELINE_STEP_MAX_RETRIES (default 5). |
| Neo4j write timeout | Batch retries; if repeated, run fails. Next run retries from scratch. |
| Worker crash mid-run | Redis lease expires after WORKER_LEASE_TTL (default 300s). Another worker picks up. |
| Provider returns partial data | Succeeds; archived_at is not applied to missing nodes (safety rail). Requires a full sync to prune. |
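For the rate-limit row, an exponential backoff schedule bounded by the retry limit might look like the following. Only the retry count of 5 (the `PIPELINE_STEP_MAX_RETRIES` default) comes from the table; the base delay, the cap, the optional jitter, and the function name are assumptions chosen for illustration.

```python
import random


def backoff_delays(
    max_retries: int = 5,        # PIPELINE_STEP_MAX_RETRIES default from the table
    base_seconds: float = 1.0,   # assumed starting delay
    cap_seconds: float = 60.0,   # assumed upper bound per wait
    jitter: bool = False,
) -> list[float]:
    """Return the wait before each retry, doubling every attempt up to the cap."""
    delays = []
    for attempt in range(max_retries):
        delay = min(cap_seconds, base_seconds * 2 ** attempt)
        if jitter:
            # "Full jitter": pick a random wait between 0 and the computed delay.
            delay = random.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays())  # prints [1.0, 2.0, 4.0, 8.0, 16.0]
```

Jitter is off by default so the schedule is easy to read; with many workers hitting the same provider, randomizing the waits helps avoid synchronized retries.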
Revert
Every successful run creates a snapshot. Revert re-applies the previous snapshot as the current one — useful when a provider fed bad data (e.g. a GitHub rename made half the repos disappear).
```bash
curl -X POST http://localhost:8080/api/v1/pipelines/$PIPELINE_ID/runs/$RUN_ID/revert \
  -H "authorization: Bearer $TOKEN"
```

Revert is recorded as a new run with `mode: "revert"` so it shows in history.
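One way to picture this is an append-only run history in which a revert adds a new entry instead of rewriting old ones. The model below is a simplification with hypothetical names (`Run`, `History`, `record`, `revert`): it tracks successful runs only and uses snapshot labels in place of real graph state.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Run:
    run_id: int
    mode: str        # "full", "incremental", or "revert"
    snapshot: str    # label of the snapshot this run made current


@dataclass
class History:
    runs: list = field(default_factory=list)  # append-only, successful runs

    def record(self, mode: str, snapshot: str) -> Run:
        run = Run(run_id=len(self.runs) + 1, mode=mode, snapshot=snapshot)
        self.runs.append(run)
        return run

    def current_snapshot(self) -> Optional[str]:
        return self.runs[-1].snapshot if self.runs else None

    def revert(self, run_id: int) -> Run:
        """Re-apply the snapshot that was current before `run_id`, as a new run."""
        index = next((i for i, r in enumerate(self.runs) if r.run_id == run_id), None)
        if index is None or index == 0:
            raise ValueError("no earlier snapshot to revert to")
        return self.record("revert", self.runs[index - 1].snapshot)


history = History()
history.record("full", "snap-a")
bad = history.record("incremental", "snap-b")  # suppose the provider fed bad data
history.revert(bad.run_id)

print(history.current_snapshot())        # prints snap-a
print([r.mode for r in history.runs])    # prints ['full', 'incremental', 'revert']
```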
Under the hood
- Pipelines are stored in Postgres (`pipelines`, `pipeline_runs`, `pipeline_step_results`).
- Steps run through `agentcy-ingest::PipelineRunner`, which calls `ConnectorToolProvider::ingest` on each source.
- Graph writes go through `agentcy-graph::RealmWriter`, which enforces the `realm` property and primary-key composition.
- The `ConnectorEventBus` fans out progress events to the UI over WebSocket.
Next
- How-To: Scheduled Tasks — scheduling agent runs (distinct from pipelines).
- Connectors Overview — the 26 sources available.
- Tasks, Workers & Workflows — where runs actually execute.