Data Ops

Context Graph tribal knowledge: freshness contracts · suppressed alerts · pipeline owners

[Diagram: Sources (Postgres, MongoDB, S3) → every 2h → Agentcy data agent + alert rules → on anomaly → Output: Slack #data]
At a glance

The agent's job: every 2 hours, check freshness + row-count anomalies + missing files across the data stack, post to #data-platform only when something's off — with a sane suppression window so a single broken pipeline doesn't blow up the channel.

Stack

  • SQL — warehouse tables (Postgres, MySQL, Snowflake, ClickHouse, etc.).
  • MongoDB — collections, indexes, recent doc counts.
  • AWS (S3) — data-lake bucket inventory, recently-arrived files.
  • Slack — outbound to #data-platform.
  • Scheduled Tasks — every 2 hours.
  • Memory — record what we already alerted on, suppress duplicates.

What you'll build

A cron task that:

  1. Loads the freshness contract from a config (table → expected max(created_at) lag).
  2. Runs sql.query per table to fetch the latest timestamp + row count for the last hour (the manual equivalent is sketched after this list).
  3. For each table, compares to the contract. If lag > SLA or row count is out of band, flag it.
  4. Same for MongoDB collections (using the { ts: -1 } index).
  5. Same for S3 prefixes (most recent LastModified).
  6. Before alerting, recall memory for "did we already alert on this in the last 30 min?" If yes, skip.
  7. If it's new, post to Slack with the affected pipeline + last-known-good time + suggested fix.
  8. Write the alert to memory so duplicates are suppressed.
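
A quick way to see what the agent will do on each run is to run the equivalent of items 2-5 by hand. The table, collection, and prefix below come from the example freshness contract defined later in the walkthrough and are illustrative; psql, mongosh, and the AWS CLI are assumed to be installed:

bash
# Warehouse table: latest timestamp + row count for the last hour
psql "$WAREHOUSE_URL" -c "
SELECT max(created_at) AS latest,
       count(*) FILTER (WHERE created_at >= now() - interval '1 hour') AS rows_last_hour
FROM public.user_events;
"

# MongoDB collection: newest document via the { ts: -1 } index
mongosh "$MONGO_URL" --quiet --eval \
  'db.sessions.find({}, { ts: 1 }).sort({ ts: -1 }).limit(1)'

# S3 prefix: most recently arrived file
aws s3api list-objects-v2 \
  --bucket data-lake --prefix raw/clickstream/ \
  --query 'sort_by(Contents, &LastModified)[-1].[Key, LastModified]'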

Prerequisites

  • SQL connector configured for each warehouse instance.
  • MongoDB connector configured.
  • AWS with S3 read-only role for the data-lake buckets.
  • Slack configured.
  • Memory enabled (it is by default — only requires a storage backend).
  • A dedicated data realm is recommended.

Step-by-step

1. Configure the connectors

text
1. Open /connectors → "+ Add Connector".
2. Pick SQL → fill connection string. Realm: data.
3. Repeat for MongoDB (connection string).
4. AWS already exists in another realm? Add a new one in realm: data
   so the audit task and data task don't share permissions.
5. Hit Test on each.
bash
# SQL warehouse
curl -X POST http://localhost:8080/api/v1/sources \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"warehouse","connector":"sql","realm":"data",
    "config":{
      "url":"postgres://reader:****@warehouse.internal:5432/analytics",
      "schemas":["public","mart"]
    }
  }'

# MongoDB
curl -X POST http://localhost:8080/api/v1/sources \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"mongo-events","connector":"mongodb","realm":"data",
    "config":{
      "url":"mongodb://reader:****@mongo.internal/events",
      "databases":["events","sessions"]
    }
  }'

# AWS (S3 only, scoped role)
curl -X POST http://localhost:8080/api/v1/sources \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"data-lake","connector":"aws","realm":"data",
    "config":{
      "auth":{"kind":"assume_role","role_arn":"arn:aws:iam::1234:role/agentcy-data-reader"},
      "regions":["us-east-1"],
      "services":["s3"]
    }
  }'
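
The AWS example above assumes the agentcy-data-reader role already exists. If it doesn't, a minimal read-only policy for the data-lake bucket might look like the following (bucket and policy names are illustrative; attach it to whichever role the connector assumes):

bash
aws iam put-role-policy \
  --role-name agentcy-data-reader \
  --policy-name s3-data-lake-read-only \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetObject"],
      "Resource": [
        "arn:aws:s3:::data-lake",
        "arn:aws:s3:::data-lake/*"
      ]
    }]
  }'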

2. Define your freshness contract

Memory is the right place for this — it's data the agent should always remember.

text
1. Open /memory → "+ New memory".
2. Kind: fact. Realm: data.
3. Text: paste the contract from the API tab below.
4. Save. The agent will recall it on every run.
bash
curl -X POST http://localhost:8080/api/v1/memory \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "kind":"fact",
    "realm":"data",
    "text":"Freshness SLA contract:\n- public.user_events: lag <= 30 min, row count last hour 1000-50000\n- public.transactions: lag <= 10 min, row count last hour > 10\n- mart.daily_revenue: lag <= 25h (daily refresh)\n- events.sessions (mongo): lag <= 5 min\n- s3://data-lake/raw/clickstream/: file every 15 min\nIf any are out of band, alert."
  }'

3. Create the freshness task

text
1. Open /tasks → "+ New Task".
2. Trigger Type: Schedule.
3. Name: data-freshness-watch. Realm: data.
4. Cron: 0 */2 * * *  (every 2 hours).
5. Task Prompt: paste the instruction from the API tab.
6. Connectors: warehouse, mongo-events, data-lake, slack.
7. Memory enabled (default).
8. Save.
bash
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"data-freshness-watch",
    "agent":"default",
    "realm":"data",
    "trigger":{"kind":"schedule","cron":"0 */2 * * *"},
    "input_template":{
      "instruction":"Recall the freshness SLA contract from memory. For each table/collection/S3 prefix, run the appropriate read tool to get max(timestamp) and last-hour row count or last-modified file. Compare to SLA. For any out-of-band: check memory for an existing alert in the last 30 minutes (recall query: alert <table>) — if found, skip. Otherwise, post to Slack #data-platform with: pipeline name, expected lag, actual lag, last-known-good time, suggested next step (which Airflow DAG / Dagster job to retry). After posting, write a memory entry kind=incident summarizing the alert (so duplicates are suppressed). If everything is healthy, do nothing — silent runs are good runs."
    },
    "approval_defaults":{"write":"allow"},
    "cost_cap_usd_per_day": 8.00,
    "min_interval_secs": 30
  }'
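
The suppression record the agent writes after posting (the kind=incident entry from the prompt above) is an ordinary memory entry. The exact wording is composed by the agent at run time; the shape is roughly:

bash
curl -X POST http://localhost:8080/api/v1/memory \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "kind":"incident",
    "realm":"data",
    "text":"alert public.user_events: lag 2h14m exceeded SLA of 30 min; posted to #data-platform at 2026-04-25 16:16 UTC"
  }'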

4. Test by simulating staleness

text
1. In your warehouse, intentionally pause the loader for one table.
2. Wait until the lag exceeds SLA.
3. Open /tasks → data-freshness-watch → "Run now".
4. Confirm Slack receives one alert.
5. Run again 5 min later → Slack should NOT receive a duplicate
   (memory suppression worked).
6. Resume the loader. Next run should be silent.
bash
curl -X POST "http://localhost:8080/api/v1/tasks/$TASK_ID/run" \
  -H "authorization: Bearer $TOKEN" -d '{}'

Worked example

json
{
  "name": "data-freshness-watch",
  "agent": "default",
  "realm": "data",
  "trigger": { "kind": "schedule", "cron": "0 */2 * * *" },
  "input_template": { "instruction": "...as above..." },
  "approval_defaults": { "write": "allow" },
  "cost_cap_usd_per_day": 8.00,
  "min_interval_secs": 30,
  "max_concurrent_runs": 1
}

A scoped policy that prevents the agent from running expensive queries:

rego
package agentcy.data_ops

# "in" (used below) is a future keyword in OPA < 1.0; import it explicitly.
import future.keywords.in

# Block any SQL query without LIMIT — this agent should never
# scan the warehouse end-to-end.
deny[msg] {
  input.subject.task == "data-freshness-watch"
  input.resource.connector == "sql"
  input.resource.tool == "sql.query"
  not contains(lower(input.resource.args.query), "limit")
  msg := "data-freshness-watch must use LIMIT on SQL queries"
}

# Block writes to data stores entirely
deny[msg] {
  input.subject.task == "data-freshness-watch"
  input.resource.connector in {"sql", "mongodb"}
  input.resource.tool_effect != "read"
  msg := "data-freshness-watch is read-only"
}
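
You can exercise this policy locally with the OPA CLI before wiring it in. Save it as data_ops.rego; the input below only mirrors the fields the policy references and is otherwise an assumption about how Agentcy shapes policy input:

bash
# Hypothetical policy input: a SQL read without a LIMIT clause.
cat > input.json <<'EOF'
{
  "subject":  { "task": "data-freshness-watch" },
  "resource": {
    "connector": "sql",
    "tool": "sql.query",
    "tool_effect": "read",
    "args": { "query": "SELECT max(created_at) FROM public.user_events" }
  }
}
EOF

opa eval --data data_ops.rego --input input.json 'data.agentcy.data_ops.deny'
# Expect one denial: the query has no LIMIT clause.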

What good looks like

A typical alert in #data-platform:

⚠️ public.user_events behind SLA · expected 30 min, actual 2h 14m.

Last-known-good: 2026-04-25 14:02 UTC. Likely upstream: the clickstream-loader Dagster job. Same incident pattern as 2026-03-12 (root cause: Kinesis throttling — runbook in acme/runbooks/kinesis).

Action: check Dagster clickstream_loader_dag for failed runs in the last 3 hours.

A silent run looks like nothing — and that's the point. The Activity feed shows the run completed in 18s with tool_calls: 7, slack_messages: 0.

Variations

  • Per-team channels. Split the contract by team. Run multiple tasks, each scoped to one team's tables and one channel.
  • Anomaly scoring instead of strict SLA. Replace "lag > X" with "z-score > 2 over a rolling 7-day window"; a query sketch follows this list. The agent already has access to historical data via the warehouse.
  • Add Power BI for downstream impact. When a table is stale, also surface which Power BI reports depend on it (via the power-bi connector's lineage tools).
  • Auto-trigger a backfill. Risky — but with the right approval gate, the agent can call airflow.trigger_dag or whatever orchestrator you use to retry.
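
For the anomaly-scoring variation, a rough shape of the z-score query against the example table (flag the table when the absolute z-score exceeds 2; NULLIF guards against a zero standard deviation):

bash
psql "$WAREHOUSE_URL" -c "
WITH hourly AS (
  SELECT date_trunc('hour', created_at) AS hr, count(*) AS row_count
  FROM public.user_events
  WHERE created_at >= now() - interval '7 days'
  GROUP BY 1
)
SELECT (latest.row_count - stats.mean) / NULLIF(stats.stddev, 0) AS z_score
FROM (SELECT row_count FROM hourly ORDER BY hr DESC LIMIT 1) AS latest,
     (SELECT avg(row_count) AS mean, stddev_samp(row_count) AS stddev
      FROM hourly) AS stats;
"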

Troubleshooting

Slack is silent during a known outage. Check the run transcript. Most often: the agent recalled an old memory entry and suppressed the alert. Memory recall is similarity-based, not an exact match — tighten the recall query in the instruction (e.g. "recall query: alert in last 30m").

Alerts are noisy after a long outage. Memory suppression works on a 30-min window. If a table stays broken for hours, the alert re-fires. Bump min_interval_secs on the task or extend the suppression window in the instruction ("suppress for 4 hours instead of 30 minutes").

SQL query returns 0 rows. The reader role doesn't have access to the table, or the schema name is wrong. The transcript shows the SQL — debug it directly in your warehouse.

S3 freshness check shows the wrong region. Default region in the source config is us-east-1. Set regions explicitly per source.
