Data Ops
[Diagram: Sources (Postgres, MongoDB, S3) -> every 2h -> Agentcy (data agent + alert rules, backed by a context graph of tribal knowledge: freshness contracts, suppressed alerts, pipeline owners) -> on anomaly -> Output (Slack #data)]
At a glance
The agent's job: every 2 hours, check freshness + row-count anomalies + missing files across the data stack, post to #data-platform only when something's off — with a sane suppression window so a single broken pipeline doesn't blow up the channel.
Stack
- SQL — warehouse tables (Postgres, MySQL, Snowflake, ClickHouse, etc.).
- MongoDB — collections, indexes, recent doc counts.
- AWS (S3) — data-lake bucket inventory, recently-arrived files.
- Slack — outbound to #data-platform.
- Scheduled Tasks — every 2 hours.
- Memory — record what we already alerted on, suppress duplicates.
What you'll build
A cron task that:
- Loads the freshness contract from a config (table → expected `max(created_at)` lag).
- Runs `sql.query` per table to fetch the latest timestamp + row count for the last hour.
- For each table, compares to the contract. If lag > SLA or row count is out of band, flags it.
- Does the same for MongoDB collections (using the `{ ts: -1 }` index).
- Does the same for S3 prefixes (last `LastModified`).
- Before alerting, recalls memory for "did we already alert on this in the last 30 min?" If yes, skips.
- If it's new, posts to Slack with the affected pipeline + last-known-good time + suggested fix.
- Writes the alert to memory so duplicates are suppressed.
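The per-table comparison above can be sketched in a few lines. This is a minimal sketch assuming a contract entry shaped like the SLA examples in step 2; the `FreshnessRule` shape and field names are illustrative, not part of any product API:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

@dataclass
class FreshnessRule:
    """One line of the SLA contract (hypothetical shape)."""
    table: str
    max_lag: timedelta                            # e.g. 30 minutes for public.user_events
    row_range: Optional[tuple[int, int]] = None   # (min, max) rows in the last hour, or None

def check(rule: FreshnessRule, latest_ts: datetime, rows_last_hour: int,
          now: Optional[datetime] = None) -> Optional[str]:
    """Return a violation message, or None if the table is within SLA."""
    now = now or datetime.now(timezone.utc)
    lag = now - latest_ts
    if lag > rule.max_lag:
        return f"{rule.table} behind SLA: expected {rule.max_lag}, actual {lag}"
    if rule.row_range:
        lo, hi = rule.row_range
        if not (lo <= rows_last_hour <= hi):
            return f"{rule.table} row count out of band: {rows_last_hour} not in [{lo}, {hi}]"
    return None

# Example: the user_events loader stalled for 2h 14m against a 30-min SLA.
rule = FreshnessRule("public.user_events", timedelta(minutes=30), (1000, 50000))
now = datetime(2026, 4, 25, 16, 16, tzinfo=timezone.utc)
stale = datetime(2026, 4, 25, 14, 2, tzinfo=timezone.utc)
print(check(rule, stale, rows_last_hour=4200, now=now))
# → public.user_events behind SLA: expected 0:30:00, actual 2:14:00
```

A healthy table (fresh timestamp, in-band row count) returns `None`, which is what makes silent runs possible.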
Prerequisites
- SQL connector configured for each warehouse instance.
- MongoDB connector configured.
- AWS with S3 read-only role for the data-lake buckets.
- Slack configured.
- Memory enabled (it is by default — only requires storage backend).
- Realm — `data` recommended.
Step-by-step
1. Configure the connectors
```text
1. Open /connectors → "+ Add Connector".
2. Pick SQL → fill connection string. Realm: data.
3. Repeat for MongoDB (connection string).
4. AWS already exists in another realm? Add a new one in realm: data
   so the audit task and data task don't share permissions.
5. Hit Test on each.
```
```bash
# SQL warehouse
curl -X POST http://localhost:8080/api/v1/sources \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"warehouse","connector":"sql","realm":"data",
    "config":{
      "url":"postgres://reader:****@warehouse.internal:5432/analytics",
      "schemas":["public","mart"]
    }
  }'

# MongoDB
curl -X POST http://localhost:8080/api/v1/sources \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"mongo-events","connector":"mongodb","realm":"data",
    "config":{
      "url":"mongodb://reader:****@mongo.internal/events",
      "databases":["events","sessions"]
    }
  }'

# AWS (S3 only, scoped role)
curl -X POST http://localhost:8080/api/v1/sources \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"data-lake","connector":"aws","realm":"data",
    "config":{
      "auth":{"kind":"assume_role","role_arn":"arn:aws:iam::1234:role/agentcy-data-reader"},
      "regions":["us-east-1"],
      "services":["s3"]
    }
  }'
```

2. Define your freshness contract
Memory is the right place for this — it's data the agent should always remember.
```text
1. Open /memory → "+ New memory".
2. Kind: fact. Realm: data.
3. Text: paste the contract from the API tab below.
4. Save. The agent will recall it on every run.
```
```bash
curl -X POST http://localhost:8080/api/v1/memory \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "kind":"fact",
    "realm":"data",
    "text":"Freshness SLA contract:\n- public.user_events: lag <= 30 min, row count last hour 1000-50000\n- public.transactions: lag <= 10 min, row count last hour > 10\n- mart.daily_revenue: lag <= 25h (daily refresh)\n- events.sessions (mongo): lag <= 5 min\n- s3://data-lake/raw/clickstream/: file every 15 min\nIf any are out of band, alert."
  }'
```

3. Create the freshness task
```text
1. Open /tasks → "+ New Task".
2. Trigger Type: Schedule.
3. Name: data-freshness-watch. Realm: data.
4. Cron: 0 */2 * * * (every 2 hours).
5. Task Prompt: paste the instruction from the API tab.
6. Connectors: warehouse, mongo-events, data-lake, slack.
7. Memory enabled (default).
8. Save.
```
```bash
curl -X POST http://localhost:8080/api/v1/tasks \
  -H "authorization: Bearer $TOKEN" -H 'content-type: application/json' \
  -d '{
    "name":"data-freshness-watch",
    "agent":"default",
    "realm":"data",
    "trigger":{"kind":"schedule","cron":"0 */2 * * *"},
    "input_template":{
      "instruction":"Recall the freshness SLA contract from memory. For each table/collection/S3 prefix, run the appropriate read tool to get max(timestamp) and last-hour row count or last-modified file. Compare to SLA. For any out-of-band: check memory for an existing alert in the last 30 minutes (recall query: alert <table>) — if found, skip. Otherwise, post to Slack #data-platform with: pipeline name, expected lag, actual lag, last-known-good time, suggested next step (which Airflow DAG / Dagster job to retry). After posting, write a memory entry kind=incident summarizing the alert (so duplicates are suppressed). If everything is healthy, do nothing — silent runs are good runs."
    },
    "approval_defaults":{"write":"allow"},
    "cost_cap_usd_per_day": 8.00,
    "min_interval_secs": 30
  }'
```

4. Test by simulating staleness
```text
1. In your warehouse, intentionally pause the loader for one table.
2. Wait until the lag exceeds SLA.
3. Open /tasks → data-freshness-watch → "Run now".
4. Confirm Slack receives one alert.
5. Run again 5 min later → Slack should NOT receive a duplicate
   (memory suppression worked).
6. Resume the loader. Next run should be silent.
```
```bash
curl -X POST "http://localhost:8080/api/v1/tasks/$TASK_ID/run" \
  -H "authorization: Bearer $TOKEN" -d '{}'
```

Worked example
```json
{
  "name": "data-freshness-watch",
  "agent": "default",
  "realm": "data",
  "trigger": { "kind": "schedule", "cron": "0 */2 * * *" },
  "input_template": { "instruction": "...as above..." },
  "approval_defaults": { "write": "allow" },
  "cost_cap_usd_per_day": 8.00,
  "min_interval_secs": 30,
  "max_concurrent_runs": 1
}
```

A scoped policy that prevents the agent from running expensive queries:
```rego
package agentcy.data_ops

import future.keywords.in

# Block any SQL query without LIMIT — this agent should never
# scan the warehouse end-to-end.
deny[msg] {
    input.subject.task == "data-freshness-watch"
    input.resource.connector == "sql"
    input.resource.tool == "sql.query"
    not contains(lower(input.resource.args.query), "limit")
    msg := "data-freshness-watch must use LIMIT on SQL queries"
}

# Block writes to data stores entirely
deny[msg] {
    input.subject.task == "data-freshness-watch"
    input.resource.connector in {"sql", "mongodb"}
    input.resource.tool_effect != "read"
    msg := "data-freshness-watch is read-only"
}
```

What good looks like
A typical alert in #data-platform:
⚠️ public.user_events behind SLA · expected 30 min, actual 2h 14m.
Last-known-good: 2026-04-25 14:02 UTC. Likely upstream: the `clickstream-loader` Dagster job. Same incident pattern as 2026-03-12 (root cause: Kinesis throttling — runbook in acme/runbooks/kinesis).
Action: check Dagster `clickstream_loader_dag` for failed runs in the last 3 hours.
A silent run looks like nothing — and that's the point. The Activity feed shows the run completed in 18s with tool_calls: 7, slack_messages: 0.
Variations
- Per-team channels. Split the contract by team. Run multiple tasks, each scoped to one team's tables and one channel.
- Anomaly scoring instead of strict SLA. Replace "lag > X" with "z-score > 2 over rolling 7-day window". The agent already has access to historical data via the warehouse.
- Add Power BI for downstream impact. When a table is stale, also surface which Power BI reports depend on it (via the `power-bi` connector's lineage tools).
- Auto-trigger a backfill. Risky — but with the right approval gate, the agent can call `airflow.trigger_dag` or whatever orchestrator you use to retry.
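The z-score variation can be sketched in a few lines. This is a minimal sketch assuming you have already pulled a rolling window of hourly row counts from the warehouse; the function name and default threshold are illustrative:

```python
from statistics import mean, stdev

def is_anomalous(history: list[float], current: float, threshold: float = 2.0) -> bool:
    """Flag `current` if it sits more than `threshold` standard deviations
    from the mean of the rolling window (e.g. 7 days of hourly row counts)."""
    if len(history) < 2:
        return False  # not enough history to score
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return current != mu  # flat history: any change counts as anomalous
    return abs(current - mu) / sigma > threshold

# Hourly row counts hovering around 1200, then a collapse to 40:
history = [1180, 1220, 1190, 1240, 1210, 1195, 1225]
print(is_anomalous(history, 40))    # → True
print(is_anomalous(history, 1205))  # → False
```

Unlike the strict SLA, this adapts to seasonal volume shifts, at the cost of needing enough clean history to score against.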
Troubleshooting
Slack is silent during a known outage. Check the run transcript. Most often: the agent recalled an old memory entry and suppressed the alert. Memory recall is fuzzy by similarity, not exact match — tighten the recall query in the instruction (e.g. "recall query: alert in last 30m").
Alerts are noisy after a long outage. Memory suppression works on a 30-min window. If a table stays broken for hours, the alert re-fires. Bump min_interval_secs on the task or extend the suppression window in the instruction ("suppress for 4 hours instead of 30 minutes").
SQL query returns 0 rows. The reader role doesn't have access to the table, or the schema name is wrong. The transcript shows the SQL — debug it directly in your warehouse.
S3 freshness check shows the wrong region. Default region in the source config is us-east-1. Set regions explicitly per source.
Next
- Concept: Memory System — why suppression works.
- Concept: Tasks, Workers & Workflows
- SQL connector — query patterns.
- MongoDB connector