
Ingestion Pipelines

Figure: the Ingest page, listing configured pipelines with last-run status, next-run schedule, and per-step counts.

An ingestion pipeline is how Agentcy turns an external system into graph knowledge. A pipeline runs one or more connectors, normalizes their output into Context Engine nodes and edges, and schedules incremental resyncs. Pipelines are versioned, reversible, and observable.

Two flavors of pipeline

This page describes the original, connector-driven pipelines that run on a schedule. Agentcy also ships a newer, continuous-ingest pipeline model — webhook URLs, S3 file drops, and (soon) OTLP / Kafka — exposed under Explore → Pipelines and /api/v1/context/pipelines. Both write through the same Context Engine and share the realm/run-history model. See the dedicated Pipelines how-to section for the continuous flavor.

Figure: a pipeline run flows connector → validate → fetch → normalize → RealmWriter → Context Engine + Postgres, alongside the draft → ready → running → synced/failed state machine and its revert path.

Anatomy of a pipeline

Figure: Ingestion pipeline anatomy. Three connectors feed in parallel into a normalize-and-dedupe step: GitHub (repos, PRs, branches), Kubernetes (pods, deploys, services), and AWS (EC2, S3, IAM, CloudWatch). The step schema-maps, merges, and diffs records under realm-scoped primary keys, then writes batches into Neo4j (nodes, edges, snapshots) through RealmWriter, which enforces realm=…
Connectors feed a normalize-and-dedupe step that writes realm-scoped nodes into the Context Engine.

Each pipeline has:

  • A name (unique per org).
  • A target realm — all nodes written get realm = "<value>" so queries can be scoped.
  • One or more steps — each step runs a connector with its own config.
  • A schedule — cron expression or manual.
  • A retention policy: how long to keep superseded nodes before they are pruned.
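The retention policy can be pictured as a filter over archived nodes. The sketch below is a minimal illustration, assuming that "superseded" means a node whose archived_at timestamp is set and that retention is a plain time window; prune_superseded and its inputs are hypothetical, not Agentcy's implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def prune_superseded(
    archived_at: Dict[str, Optional[datetime]],
    retention: timedelta,
    now: datetime,
) -> Dict[str, Optional[datetime]]:
    """Return the nodes that survive pruning (hypothetical helper).

    archived_at maps a node key to its archive timestamp, or None if the
    node is still live. Archived nodes older than `retention` are dropped.
    """
    return {
        key: ts
        for key, ts in archived_at.items()
        if ts is None or now - ts <= retention  # keep live and recently archived nodes
    }


now = datetime(2024, 1, 31, tzinfo=timezone.utc)
nodes = {
    "live": None,
    "recent": now - timedelta(days=3),
    "old": now - timedelta(days=40),
}
print(sorted(prune_superseded(nodes, timedelta(days=30), now)))  # ['live', 'recent']
```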

Lifecycle

Figure: Pipeline lifecycle. Happy path: draft (config pending) → validate → ready (config valid) → run → running (worker active) → finalize → synced (snapshot current). Error path: running → error → failed (no writes published) → revert → revert (prior snapshot restored).
Pipeline lifecycle states and transitions.
  • validate — every connector step runs validate_config (credentials, scopes, reachability). Invalid configs never get to ready.
  • running — a worker claims the run. Each step produces a batch of nodes + edges, streamed to agentcy-graph in transactions sized by PIPELINE_BATCH_SIZE.
  • finalize — after all steps succeed, the previous snapshot is superseded. Orphan nodes (present in the previous run, absent now) are soft-deleted (archived_at set).
  • failed — nothing is published. The previous snapshot stays current. Retry or revert from the UI.
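The transitions in the figure can be encoded as a lookup table. This is a minimal sketch for illustration: PipelineState, the action names, and the REVERTED state name are hypothetical labels for the figure's nodes and edges, and the retry path mentioned above is not modeled because the figure does not show where it leads.

```python
from enum import Enum


class PipelineState(Enum):
    DRAFT = "draft"
    READY = "ready"
    RUNNING = "running"
    SYNCED = "synced"
    FAILED = "failed"
    REVERTED = "reverted"  # hypothetical name for the figure's "revert" state


# (current state, action) -> next state, mirroring the lifecycle figure.
# "validate" should only be issued once validate_config has passed for every step.
TRANSITIONS = {
    (PipelineState.DRAFT, "validate"): PipelineState.READY,
    (PipelineState.READY, "run"): PipelineState.RUNNING,
    (PipelineState.RUNNING, "finalize"): PipelineState.SYNCED,
    (PipelineState.RUNNING, "error"): PipelineState.FAILED,
    (PipelineState.FAILED, "revert"): PipelineState.REVERTED,
}


def advance(state: PipelineState, action: str) -> PipelineState:
    """Return the next state, or raise if the figure shows no such edge."""
    try:
        return TRANSITIONS[(state, action)]
    except KeyError:
        raise ValueError(f"illegal transition: {state.value} --{action}-->") from None


state = PipelineState.DRAFT
for action in ("validate", "run", "error", "revert"):
    state = advance(state, action)
print(state.value)  # reverted
```

Keeping the edges in data rather than in branching code makes an illegal jump (for example draft straight to running) fail loudly.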

Incremental vs. full sync

Every connector declares which sync modes it supports; most support both.

  • Full sync: re-reads everything. Slower, but catches drift (renames, deletions).
  • Incremental sync: uses provider deltas (the GitHub since parameter, Kubernetes resource versions, etc.). Fast, and the default for scheduled runs.

The first run of a pipeline is always full. Scheduled runs are incremental unless you request a full sync from the UI or pass {"mode":"full"} to the run endpoint.
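The mode-selection rule above fits in a few lines. The function name and its is_first_run / requested_mode parameters are hypothetical; the sketch only restates the documented rules.

```python
from typing import Optional


def choose_mode(is_first_run: bool, requested_mode: Optional[str] = None) -> str:
    """Return "full" or "incremental" for a pipeline run (hypothetical helper)."""
    if is_first_run:
        return "full"  # the first run of a pipeline is always full
    if requested_mode == "full":
        return "full"  # explicit request from the UI or a {"mode":"full"} body
    return "incremental"  # default for scheduled runs


print(choose_mode(is_first_run=False))  # incremental
```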

Realms prevent collision

Multiple pipelines can write to the same Context Engine safely because every node and edge carries a realm property. Queries filter by realm (or use * to search across realms). A pipeline writing to design cannot clobber a node written by the infrastructure pipeline, even if the source_id matches, because realm is part of the primary key.

See Knowledge Graph & Realms.
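To see why the composite key prevents clobbering, here is a toy in-memory store keyed by (realm, source_id). It is an illustration only, assuming a node's identity is exactly that pair; GraphStore and NodeKey are hypothetical names, and the real Context Engine stores nodes in Neo4j rather than a dict.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class NodeKey:
    realm: str
    source_id: str


@dataclass
class GraphStore:
    nodes: Dict[NodeKey, dict] = field(default_factory=dict)

    def upsert(self, realm: str, source_id: str, props: dict) -> None:
        # realm is part of the key, so equal source_ids in different realms stay distinct
        self.nodes[NodeKey(realm, source_id)] = {**props, "realm": realm}

    def query(self, realm: str) -> List[dict]:
        # "*" searches across all realms; anything else filters to one realm
        return [p for k, p in self.nodes.items() if realm == "*" or k.realm == realm]


store = GraphStore()
store.upsert("design", "repo-1", {"name": "spec"})
store.upsert("infrastructure", "repo-1", {"name": "cluster"})
print(len(store.query("*")), store.query("design"))
# 2 [{'name': 'spec', 'realm': 'design'}]
```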

Wire a pipeline

Minimum viable pipeline over the REST API:

```bash
curl -X POST http://localhost:8080/api/v1/pipelines \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name": "github-weekly",
    "realm": "development",
    "schedule": "0 3 * * 1",
    "steps": [{
      "connector": "github",
      "source_id": "gh-primary",
      "mode": "incremental"
    }]
  }'
```

Trigger a run:

```bash
curl -X POST http://localhost:8080/api/v1/pipelines/$PIPELINE_ID/runs \
  -H "authorization: Bearer $TOKEN"
```
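The {"mode":"full"} body mentioned under incremental vs. full sync goes to this same run endpoint. Below is a Python sketch using only the standard library's urllib.request; it assumes the endpoint accepts a JSON body with a mode field exactly as described above, and build_run_request is a hypothetical helper. The request is only built here; sending it is left to urlopen.

```python
import json
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8080/api/v1"


def build_run_request(
    pipeline_id: str, token: str, mode: Optional[str] = None
) -> urllib.request.Request:
    """Build POST /pipelines/{id}/runs, optionally forcing a sync mode."""
    body = json.dumps({"mode": mode}).encode() if mode else None
    req = urllib.request.Request(
        f"{BASE_URL}/pipelines/{pipeline_id}/runs",
        data=body,
        method="POST",
        headers={"authorization": f"Bearer {token}"},
    )
    if body is not None:
        req.add_header("content-type", "application/json")
    return req


req = build_run_request("my-pipeline-id", "dummy-token", mode="full")
print(req.get_method(), req.full_url, req.data)
# To send it: urllib.request.urlopen(req)
```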

Stream run events (SSE) — the UI uses this for live status:

```bash
curl -N http://localhost:8080/api/v1/pipelines/$PIPELINE_ID/runs/$RUN_ID/events \
  -H "authorization: Bearer $TOKEN"
```
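If you consume the event stream from code rather than curl, you need a small text/event-stream parser. The sketch below follows standard SSE framing (field lines, a blank line dispatches the buffered event, lines starting with a colon are comments); it makes no assumption about which event names or payload shapes Agentcy emits, and parse_sse is a hypothetical helper.

```python
from typing import Iterable, Iterator, List, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event, data) pairs from the lines of a text/event-stream body."""
    event, data = "message", []  # type: str, List[str]
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:  # blank line: dispatch the buffered event, if any
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
            continue
        if line.startswith(":"):  # comment or keep-alive
            continue
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if name == "event":
            event = value
        elif name == "data":
            data.append(value)
        # "id" and "retry" fields are ignored in this sketch
    # an event without a terminating blank line is discarded, as in the SSE spec


sample = ["event: progress\n", "data: first\n", "data: second\n", "\n"]
print(list(parse_sse(sample)))
```

With urllib, feed it (line.decode("utf-8") for line in resp), where resp is the object returned by urllib.request.urlopen.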

Running without a pipeline

For one-off syncs you don't need a pipeline. POST /api/v1/sources/:id/sync runs a single connector inline, or enqueues the sync when workers are enabled. The Connectors UI's Sync button uses this path.

Pipelines exist when you need:

  • multi-step ingestion that must succeed atomically,
  • scheduling with retention + revert,
  • reporting ("what changed in the last run?"),
  • a named configuration that other teams can subscribe to.

Observability

For every run you get:

  • started_at, finished_at, status.
  • Per-step counts: nodes_created, nodes_updated, nodes_archived, edges_created, errors.
  • A diff against the previous successful run.
  • Full tracing logs (filter by run_id).
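The per-step counts and the run diff can be derived by comparing two snapshots keyed by node primary key. This is a simplified sketch, assuming each snapshot is a mapping from key to node properties; diff_runs and its complete flag are hypothetical, with complete=False standing in for the partial-data safety rail in the failure-modes table, where missing nodes are not archived.

```python
from typing import Dict


def diff_runs(
    previous: Dict[str, dict], current: Dict[str, dict], complete: bool = True
) -> Dict[str, int]:
    """Count created, updated, and archived nodes between two runs."""
    prev_keys, curr_keys = set(previous), set(current)
    updated = sum(1 for k in prev_keys & curr_keys if previous[k] != current[k])
    # Orphans: present in the previous run, absent now. They are only archived
    # when the provider data is known to be complete.
    archived = len(prev_keys - curr_keys) if complete else 0
    return {
        "nodes_created": len(curr_keys - prev_keys),
        "nodes_updated": updated,
        "nodes_archived": archived,
    }


prev = {"a": {"v": 1}, "b": {"v": 1}, "c": {"v": 1}}
curr = {"a": {"v": 1}, "b": {"v": 2}, "d": {"v": 1}}
print(diff_runs(prev, curr))
# {'nodes_created': 1, 'nodes_updated': 1, 'nodes_archived': 1}
```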

The Activity feed (GET /api/v1/activity) includes one entry per run state change, so audit + operational dashboards share the same event stream.

Failure modes and retries

| Failure | Behavior |
| --- | --- |
| Auth error on a step | Run marked failed; no writes committed. Validate the source config. |
| Rate limit from provider | Exponential backoff, up to PIPELINE_STEP_MAX_RETRIES (default 5). |
| Context Engine write timeout | The batch is retried; if timeouts repeat, the run fails. The next run retries from scratch. |
| Worker crash mid-run | The Redis lease expires after WORKER_LEASE_TTL (default 300s), and another worker picks up the run. |
| Provider returns partial data | The run succeeds, but archived_at is not applied to missing nodes (a safety rail). Pruning them requires a full sync. |
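The rate-limit row describes exponential backoff capped by PIPELINE_STEP_MAX_RETRIES. A minimal sketch follows, assuming the retry count excludes the first attempt and that delays double from one second up to a cap; RateLimited, with_backoff, base, and cap are hypothetical, and production code commonly adds random jitter, which is left out here so the delays stay predictable.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical error a connector raises when the provider throttles it."""


def with_backoff(
    call: Callable[[], T],
    max_retries: int = 5,  # mirrors the documented PIPELINE_STEP_MAX_RETRIES default
    base: float = 1.0,
    cap: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call`, retrying on RateLimited with exponentially growing delays."""
    for attempt in range(max_retries + 1):
        try:
            return call()
        except RateLimited:
            if attempt == max_retries:
                raise  # retries exhausted: let the step fail
            sleep(min(cap, base * 2 ** attempt))  # 1s, 2s, 4s, ... up to cap


attempts = {"n": 0}


def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimited()
    return "ok"


delays: List[float] = []
print(with_backoff(flaky, sleep=delays.append), delays)  # ok [1.0, 2.0]
```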

Revert

Every successful run creates a snapshot. Revert re-applies the previous snapshot as the current one — useful when a provider fed bad data (e.g. a GitHub rename made half the repos disappear).

```bash
curl -X POST http://localhost:8080/api/v1/pipelines/$PIPELINE_ID/runs/$RUN_ID/revert \
  -H "authorization: Bearer $TOKEN"
```

Revert is recorded as a new run with mode: "revert" so it shows in history.
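Revert semantics can be modeled as an append-only run history. This sketch assumes each successful run stores a whole snapshot and that reverting run N restores the snapshot of the latest successful run before N; Run, PipelineHistory, and the snapshot dicts are hypothetical stand-ins for the stored run records.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Run:
    run_id: int
    mode: str                  # "full", "incremental", or "revert"
    snapshot: Optional[dict]   # None for a failed run: nothing was published


@dataclass
class PipelineHistory:
    runs: List[Run] = field(default_factory=list)
    current: Optional[dict] = None

    def record(self, mode: str, snapshot: Optional[dict]) -> Run:
        run = Run(len(self.runs) + 1, mode, snapshot)
        self.runs.append(run)
        if snapshot is not None:
            self.current = snapshot  # a successful run's snapshot becomes current
        return run

    def revert(self, run_id: int) -> Run:
        """Re-apply the last successful snapshot before `run_id` as a new run."""
        earlier = [r for r in self.runs if r.run_id < run_id and r.snapshot is not None]
        if not earlier:
            raise ValueError(f"no snapshot before run {run_id} to restore")
        return self.record("revert", earlier[-1].snapshot)


history = PipelineHistory()
history.record("full", {"repos": 40})
bad = history.record("incremental", {"repos": 20})  # e.g. a rename hid half the repos
restored = history.revert(bad.run_id)
print(restored.run_id, restored.mode, history.current)  # 3 revert {'repos': 40}
```

Because the revert is itself appended as a run, history keeps the bad run and the restoration side by side.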

Under the hood

  • Pipelines are stored in Postgres (pipelines, pipeline_runs, pipeline_step_results).
  • Steps run through agentcy-ingest::PipelineRunner, which calls ConnectorToolProvider::ingest on each source.
  • Graph writes go through agentcy-graph::RealmWriter, which enforces the realm property and primary-key composition.
  • The ConnectorEventBus fans out progress events to the UI over WebSocket.
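Putting the pieces together, the runner's all-or-nothing behavior can be sketched as follows. It is an illustration under stated assumptions, not the agentcy-ingest code: each step is a plain callable standing in for ConnectorToolProvider::ingest, staging into a dict stands in for the batched graph transactions sized by PIPELINE_BATCH_SIZE, and run_pipeline and batched are hypothetical names.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Node = Dict[str, str]
Step = Callable[[], Iterable[Node]]  # stand-in for one connector step
Key = Tuple[str, str]                # (realm, source_id)


def batched(items: List[Node], size: int) -> Iterator[List[Node]]:
    """Split items into chunks of at most `size`."""
    for i in range(0, len(items), size):
        yield items[i : i + size]


def run_pipeline(
    previous: Dict[Key, Node], realm: str, steps: List[Step], batch_size: int
) -> Tuple[Dict[Key, Node], str, int]:
    """Run every step; return (current snapshot, status, batches written)."""
    staged: Dict[Key, Node] = {}
    batches = 0
    try:
        for step in steps:
            for batch in batched(list(step()), batch_size):
                batches += 1  # one write transaction per batch
                for node in batch:
                    # stamp the realm and make it part of the key, as described for RealmWriter
                    staged[(realm, node["source_id"])] = {**node, "realm": realm}
    except Exception:
        return previous, "failed", batches  # nothing published; old snapshot stays current
    return staged, "synced", batches


def repos() -> List[Node]:
    return [{"source_id": f"repo-{i}"} for i in range(5)]


snapshot, status, n = run_pipeline({}, "development", [repos], batch_size=2)
print(status, n, len(snapshot))  # synced 3 5
```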


Built by AgentcyLabs. For in-house deployment or Agentcy Cloud (PaaS) access, visit agentcylabs.com.