Appearance
Building Custom Connectors
Agentcy's connector system is designed to be extensible. You can build custom connectors that integrate any external system by implementing two Rust traits: IngestionSource for ETL ingestion and ConnectorToolProvider for live tool access.
Architecture
Every Agentcy connector is a Rust struct that implements one or both of these traits:
┌──────────────────────────────────────────┐
│ Your Custom Connector │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ IngestionSource │ │ConnectorTool │ │
│ │ │ │Provider │ │
│ │ • validate_config│ │ • tools() │ │
│ │ • ingest() │ │ • execute() │ │
│ │ • source_type() │ │ │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ Registered in SourceRegistry (state.rs) │
└──────────────────────────────────────────┘Trait Definitions
IngestionSource
Handles batch ETL from the external system into the Neo4j knowledge graph.
rust
use async_trait::async_trait;
use anyhow::Result;
use serde_json::Value;
#[async_trait]
pub trait IngestionSource: Send + Sync {
/// Validate the configuration before creating the source.
/// Return an error with a clear message if credentials are invalid.
async fn validate_config(&self, config: &Value) -> Result<()>;
/// Ingest data from the source into Neo4j.
/// Called when a sync is triggered.
async fn ingest(
&self,
source: &Source,
tx: &mut Neo4jTx,
) -> Result<IngestResult>;
/// Return the source type identifier (e.g., "jira", "datadog").
fn source_type(&self) -> &str;
}ConnectorToolProvider
Exposes live tools that the AI agent can invoke during conversations.
rust
use async_trait::async_trait;
use anyhow::Result;
use serde_json::Value;
#[async_trait]
pub trait ConnectorToolProvider: Send + Sync {
/// Return the list of tools this connector provides.
/// Each tool has a name, description, and JSON Schema for its arguments.
fn tools(&self) -> Vec<ToolDefinition>;
/// Execute a specific tool with the given arguments.
/// Return the result as a structured ToolResult.
async fn execute(
&self,
tool_name: &str,
args: Value,
source: &Source,
) -> Result<ToolResult>;
}Step-by-Step Guide
1. Create the Connector Crate
Start by creating a new file in the backend/crates/agentcy-ingest/src/connectors/ directory:
rust
// backend/crates/agentcy-ingest/src/connectors/jira.rs
use async_trait::async_trait;
use anyhow::{Result, bail};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{
IngestionSource, ConnectorToolProvider,
Source, IngestResult, ToolDefinition, ToolResult, Neo4jTx,
};
/// Configuration for the Jira connector.
#[derive(Debug, Deserialize, Serialize)]
struct JiraConfig {
/// Jira instance URL (e.g., https://mycompany.atlassian.net)
base_url: String,
/// API token or PAT
api_token: String,
/// Email associated with the API token
email: String,
/// JQL filter for issue ingestion (optional)
jql_filter: Option<String>,
/// Maximum issues to ingest per sync
max_issues: Option<u32>,
}
pub struct JiraConnector {
http_client: reqwest::Client,
}
impl JiraConnector {
pub fn new() -> Self {
Self {
http_client: reqwest::Client::new(),
}
}
fn parse_config(config: &Value) -> Result<JiraConfig> {
serde_json::from_value(config.clone())
.map_err(|e| anyhow::anyhow!("Invalid Jira config: {}", e))
}
}2. Implement IngestionSource
rust
#[async_trait]
impl IngestionSource for JiraConnector {
fn source_type(&self) -> &str {
"jira"
}
async fn validate_config(&self, config: &Value) -> Result<()> {
let cfg = Self::parse_config(config)?;
// Validate required fields
if cfg.base_url.is_empty() {
bail!("base_url is required");
}
if cfg.api_token.is_empty() {
bail!("api_token is required");
}
// Test the connection
let response = self.http_client
.get(format!("{}/rest/api/3/myself", cfg.base_url))
.basic_auth(&cfg.email, Some(&cfg.api_token))
.send()
.await?;
match response.status().as_u16() {
200 => Ok(()),
401 => bail!("Authentication failed. Verify your email and API token."),
403 => bail!("Access denied. The API token lacks required permissions."),
404 => bail!("Jira instance not found at {}. Verify the base_url.", cfg.base_url),
status => bail!("Unexpected response from Jira: HTTP {}", status),
}
}
async fn ingest(
&self,
source: &Source,
tx: &mut Neo4jTx,
) -> Result<IngestResult> {
let cfg = Self::parse_config(&source.config)?;
let mut total_nodes = 0;
let mut total_relationships = 0;
// Fetch projects
let projects = self.fetch_projects(&cfg).await?;
for project in &projects {
tx.run(
"MERGE (p:JiraProject {key: $key})
SET p.name = $name, p.lead = $lead, p.source_id = $source_id",
[
("key", project.key.clone().into()),
("name", project.name.clone().into()),
("lead", project.lead.clone().into()),
("source_id", source.id.to_string().into()),
],
).await?;
total_nodes += 1;
}
// Fetch issues
let jql = cfg.jql_filter.as_deref().unwrap_or("ORDER BY updated DESC");
let max = cfg.max_issues.unwrap_or(1000);
let issues = self.fetch_issues(&cfg, jql, max).await?;
for issue in &issues {
tx.run(
"MERGE (i:JiraIssue {key: $key})
SET i.summary = $summary, i.status = $status,
i.assignee = $assignee, i.priority = $priority,
i.issue_type = $issue_type, i.created = $created,
i.updated = $updated, i.source_id = $source_id
WITH i
MATCH (p:JiraProject {key: $project_key})
MERGE (p)-[:HAS_ISSUE]->(i)",
[
("key", issue.key.clone().into()),
("summary", issue.summary.clone().into()),
("status", issue.status.clone().into()),
("assignee", issue.assignee.clone().unwrap_or_default().into()),
("priority", issue.priority.clone().into()),
("issue_type", issue.issue_type.clone().into()),
("created", issue.created.clone().into()),
("updated", issue.updated.clone().into()),
("source_id", source.id.to_string().into()),
("project_key", issue.project_key.clone().into()),
],
).await?;
total_nodes += 1;
total_relationships += 1;
}
Ok(IngestResult {
nodes_created: total_nodes,
relationships_created: total_relationships,
errors: vec![],
})
}
}3. Implement ConnectorToolProvider
rust
#[async_trait]
impl ConnectorToolProvider for JiraConnector {
fn tools(&self) -> Vec<ToolDefinition> {
vec![
ToolDefinition {
name: "jira_search_issues".into(),
description: "Search Jira issues using JQL".into(),
parameters: serde_json::json!({
"type": "object",
"properties": {
"jql": {
"type": "string",
"description": "JQL query string"
},
"max_results": {
"type": "integer",
"description": "Maximum results to return",
"default": 20
}
},
"required": ["jql"]
}),
},
ToolDefinition {
name: "jira_get_issue".into(),
description: "Get detailed information about a Jira issue".into(),
parameters: serde_json::json!({
"type": "object",
"properties": {
"issue_key": {
"type": "string",
"description": "Issue key (e.g., PROJ-123)"
}
},
"required": ["issue_key"]
}),
},
ToolDefinition {
name: "jira_create_issue".into(),
description: "Create a new Jira issue".into(),
parameters: serde_json::json!({
"type": "object",
"properties": {
"project_key": { "type": "string" },
"summary": { "type": "string" },
"description": { "type": "string" },
"issue_type": {
"type": "string",
"enum": ["Bug", "Task", "Story", "Epic"]
},
"priority": {
"type": "string",
"enum": ["Highest", "High", "Medium", "Low", "Lowest"]
},
"assignee": { "type": "string" }
},
"required": ["project_key", "summary", "issue_type"]
}),
},
ToolDefinition {
name: "jira_list_projects".into(),
description: "List all accessible Jira projects".into(),
parameters: serde_json::json!({
"type": "object",
"properties": {}
}),
},
]
}
async fn execute(
&self,
tool_name: &str,
args: Value,
source: &Source,
) -> Result<ToolResult> {
let cfg = Self::parse_config(&source.config)?;
match tool_name {
"jira_search_issues" => {
let jql = args["jql"].as_str()
.ok_or_else(|| anyhow::anyhow!("jql is required"))?;
let max = args["max_results"].as_u64().unwrap_or(20);
let issues = self.fetch_issues(&cfg, jql, max as u32).await?;
Ok(ToolResult::json(serde_json::to_value(issues)?))
}
"jira_get_issue" => {
let key = args["issue_key"].as_str()
.ok_or_else(|| anyhow::anyhow!("issue_key is required"))?;
let issue = self.fetch_issue(&cfg, key).await?;
Ok(ToolResult::json(serde_json::to_value(issue)?))
}
"jira_create_issue" => {
let result = self.create_issue(&cfg, &args).await?;
Ok(ToolResult::json(result))
}
"jira_list_projects" => {
let projects = self.fetch_projects(&cfg).await?;
Ok(ToolResult::json(serde_json::to_value(projects)?))
}
_ => bail!("Unknown tool: {}", tool_name),
}
}
}4. Register the Connector
Add the connector to the SourceRegistry in backend/crates/agentcy-api/src/state.rs:
rust
use agentcy_ingest::connectors::jira::JiraConnector;
impl AppState {
pub fn new(/* ... */) -> Self {
let mut registry = SourceRegistry::new();
// ... existing connectors ...
// Register the Jira connector
let jira = Arc::new(JiraConnector::new());
registry.register_ingestion("jira", jira.clone());
registry.register_tools("jira", jira);
// ...
}
}5. Add to the Connector Module
Export the connector in backend/crates/agentcy-ingest/src/connectors/mod.rs:
rust
pub mod jira;
// ... other connector modulesBest Practices
Config Validation
Always validate credentials by making a test API call in validate_config. Return clear, actionable error messages:
rust
// Good: specific, actionable error messages
bail!("Authentication failed. Verify your email ({}) and API token.", cfg.email);
bail!("Connection error: could not reach {} — check the URL and network.", cfg.base_url);
// Bad: generic errors
bail!("Invalid config");
bail!("Connection failed");Tool Design
Design tools with the AI agent in mind:
- Clear descriptions — the agent reads tool descriptions to decide which tool to use
- Reasonable defaults — provide defaults for optional parameters
- Structured output — return JSON objects, not raw strings
- Error context — include the tool name and relevant IDs in error messages
rust
ToolDefinition {
name: "jira_search_issues".into(),
// Good: tells the agent exactly what this tool does and when to use it
description: "Search Jira issues using JQL (Jira Query Language). \
Use this to find issues by status, assignee, project, \
labels, or any combination of fields.".into(),
// ...
}Graph Modeling
Follow these conventions for graph nodes:
- Use MERGE, not CREATE — idempotent ingestion allows safe re-syncs
- Set
source_id— tag every node with its source for provenance tracking - Use descriptive labels — prefix with the system name (e.g.,
JiraIssue, notIssue) to avoid label collisions - Model relationships explicitly —
HAS_ISSUE,ASSIGNED_TO,DEPENDS_ONare better than genericRELATED_TO
Error Handling
Handle API errors gracefully and continue ingestion when possible:
rust
async fn ingest(&self, source: &Source, tx: &mut Neo4jTx) -> Result<IngestResult> {
let mut errors = vec![];
for project in &projects {
match self.ingest_project(project, tx).await {
Ok(count) => total_nodes += count,
Err(e) => {
errors.push(format!("Failed to ingest project {}: {}", project.key, e));
// Continue with remaining projects
}
}
}
Ok(IngestResult {
nodes_created: total_nodes,
relationships_created: total_relationships,
errors,
})
}Testing
Write integration tests for your connector:
rust
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_config() {
let config = serde_json::json!({
"base_url": "https://test.atlassian.net",
"api_token": "test-token",
"email": "test@example.com"
});
let result = JiraConnector::parse_config(&config);
assert!(result.is_ok());
}
#[test]
fn test_tools_defined() {
let connector = JiraConnector::new();
let tools = connector.tools();
assert_eq!(tools.len(), 4);
assert!(tools.iter().any(|t| t.name == "jira_search_issues"));
}
#[tokio::test]
async fn test_validate_config_missing_fields() {
let connector = JiraConnector::new();
let config = serde_json::json!({"base_url": ""});
let result = connector.validate_config(&config).await;
assert!(result.is_err());
}
}File Structure
backend/crates/agentcy-ingest/src/connectors/
├── mod.rs ← Export your module here
├── github.rs
├── aws.rs
├── gcp.rs
├── kubernetes.rs
├── sql_connector.rs
├── mongodb.rs
├── vercel.rs
├── supabase.rs
├── openapi.rs
├── mcp.rs
├── csv.rs
├── json.rs
├── remote_exec.rs
└── jira.rs ← Your new connectorNext Steps
- Review existing connectors in
backend/crates/agentcy-ingest/src/connectors/for reference implementations - Read the Connectors Overview for the full connector architecture
- Check the Zero-Trust Policies docs to understand how policies apply to connector tools