Skip to content

Building Custom Connectors

Agentcy's connector system is designed to be extensible. You can build custom connectors that integrate any external system by implementing two Rust traits: IngestionSource for ETL ingestion and ConnectorToolProvider for live tool access.

Architecture

Every Agentcy connector is a Rust struct that implements one or both of these traits:

┌──────────────────────────────────────────┐
│           Your Custom Connector           │
│                                           │
│  ┌─────────────────┐ ┌─────────────────┐ │
│  │ IngestionSource  │ │ConnectorTool    │ │
│  │                  │ │Provider         │ │
│  │ • validate_config│ │ • tools()       │ │
│  │ • ingest()       │ │ • execute()     │ │
│  │ • source_type()  │ │                 │ │
│  └─────────────────┘ └─────────────────┘ │
│                                           │
│  Registered in SourceRegistry (state.rs)  │
└──────────────────────────────────────────┘

Trait Definitions

IngestionSource

Handles batch ETL from the external system into the Neo4j knowledge graph.

rust
use async_trait::async_trait;
use anyhow::Result;
use serde_json::Value;

#[async_trait]
/// Batch-ingestion half of a connector.
///
/// Implementors must be `Send + Sync`.
pub trait IngestionSource: Send + Sync {
    /// Validate the configuration before creating the source.
    ///
    /// `config` is the connector configuration as raw JSON.
    ///
    /// # Errors
    ///
    /// Return an error with a clear message if credentials are invalid.
    async fn validate_config(&self, config: &Value) -> Result<()>;

    /// Ingest data from the source into Neo4j.
    /// Called when a sync is triggered.
    ///
    /// `source` describes the configured source being synced and `tx` is the
    /// Neo4j transaction handle the implementation writes through.
    ///
    /// On success, returns an [`IngestResult`] summarising the sync.
    async fn ingest(
        &self,
        source: &Source,
        tx: &mut Neo4jTx,
    ) -> Result<IngestResult>;

    /// Return the source type identifier (e.g., "jira", "datadog").
    fn source_type(&self) -> &str;
}

ConnectorToolProvider

Exposes live tools that the AI agent can invoke during conversations.

rust
use async_trait::async_trait;
use anyhow::Result;
use serde_json::Value;

#[async_trait]
/// Live-tool half of a connector.
///
/// Implementors must be `Send + Sync`.
pub trait ConnectorToolProvider: Send + Sync {
    /// Return the list of tools this connector provides.
    /// Each tool has a name, description, and JSON Schema for its arguments.
    fn tools(&self) -> Vec<ToolDefinition>;

    /// Execute a specific tool with the given arguments.
    /// Return the result as a structured ToolResult.
    ///
    /// * `tool_name` - name of the tool to run; presumably one of the names
    ///   returned by [`tools`](Self::tools) (confirm against the caller).
    /// * `args` - the tool arguments as JSON.
    /// * `source` - the configured source the tool runs against.
    ///
    /// # Errors
    ///
    /// Returns an error when the tool cannot be executed.
    async fn execute(
        &self,
        tool_name: &str,
        args: Value,
        source: &Source,
    ) -> Result<ToolResult>;
}

Step-by-Step Guide

1. Create the Connector Module

Start by creating a new file in the backend/crates/agentcy-ingest/src/connectors/ directory:

rust
// backend/crates/agentcy-ingest/src/connectors/jira.rs

use async_trait::async_trait;
use anyhow::{Result, bail};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{
    IngestionSource, ConnectorToolProvider,
    Source, IngestResult, ToolDefinition, ToolResult, Neo4jTx,
};

/// Configuration for the Jira connector.
///
/// `Debug` is implemented by hand (below) instead of derived so that the
/// API token is never written to logs or error output.
#[derive(Deserialize, Serialize)]
struct JiraConfig {
    /// Jira instance URL (e.g., https://mycompany.atlassian.net)
    base_url: String,
    /// API token or PAT
    api_token: String,
    /// Email associated with the API token
    email: String,
    /// JQL filter for issue ingestion (optional)
    jql_filter: Option<String>,
    /// Maximum issues to ingest per sync
    max_issues: Option<u32>,
}

/// Debug output that shows every field except the secret `api_token`,
/// which is replaced by a fixed placeholder.
impl std::fmt::Debug for JiraConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JiraConfig")
            .field("base_url", &self.base_url)
            .field("api_token", &"<redacted>")
            .field("email", &self.email)
            .field("jql_filter", &self.jql_filter)
            .field("max_issues", &self.max_issues)
            .finish()
    }
}

/// Connector for Jira.
pub struct JiraConnector {
    /// HTTP client used for the connector's requests to Jira.
    http_client: reqwest::Client,
}

impl JiraConnector {
    pub fn new() -> Self {
        Self {
            http_client: reqwest::Client::new(),
        }
    }

    fn parse_config(config: &Value) -> Result<JiraConfig> {
        serde_json::from_value(config.clone())
            .map_err(|e| anyhow::anyhow!("Invalid Jira config: {}", e))
    }
}

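2. Implement IngestionSource

Note: the code in steps 2 and 3 calls fetch_projects, fetch_issues, fetch_issue, and create_issue on JiraConnector. These helper methods are not shown in this guide; implement them on the connector alongside parse_config.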

rust
#[async_trait]
impl IngestionSource for JiraConnector {
    fn source_type(&self) -> &str {
        "jira"
    }

    async fn validate_config(&self, config: &Value) -> Result<()> {
        let cfg = Self::parse_config(config)?;

        // Validate required fields
        if cfg.base_url.is_empty() {
            bail!("base_url is required");
        }
        if cfg.api_token.is_empty() {
            bail!("api_token is required");
        }

        // Test the connection
        let response = self.http_client
            .get(format!("{}/rest/api/3/myself", cfg.base_url))
            .basic_auth(&cfg.email, Some(&cfg.api_token))
            .send()
            .await?;

        match response.status().as_u16() {
            200 => Ok(()),
            401 => bail!("Authentication failed. Verify your email and API token."),
            403 => bail!("Access denied. The API token lacks required permissions."),
            404 => bail!("Jira instance not found at {}. Verify the base_url.", cfg.base_url),
            status => bail!("Unexpected response from Jira: HTTP {}", status),
        }
    }

    async fn ingest(
        &self,
        source: &Source,
        tx: &mut Neo4jTx,
    ) -> Result<IngestResult> {
        let cfg = Self::parse_config(&source.config)?;
        let mut total_nodes = 0;
        let mut total_relationships = 0;

        // Fetch projects
        let projects = self.fetch_projects(&cfg).await?;
        for project in &projects {
            tx.run(
                "MERGE (p:JiraProject {key: $key})
                 SET p.name = $name, p.lead = $lead, p.source_id = $source_id",
                [
                    ("key", project.key.clone().into()),
                    ("name", project.name.clone().into()),
                    ("lead", project.lead.clone().into()),
                    ("source_id", source.id.to_string().into()),
                ],
            ).await?;
            total_nodes += 1;
        }

        // Fetch issues
        let jql = cfg.jql_filter.as_deref().unwrap_or("ORDER BY updated DESC");
        let max = cfg.max_issues.unwrap_or(1000);
        let issues = self.fetch_issues(&cfg, jql, max).await?;
        for issue in &issues {
            tx.run(
                "MERGE (i:JiraIssue {key: $key})
                 SET i.summary = $summary, i.status = $status,
                     i.assignee = $assignee, i.priority = $priority,
                     i.issue_type = $issue_type, i.created = $created,
                     i.updated = $updated, i.source_id = $source_id
                 WITH i
                 MATCH (p:JiraProject {key: $project_key})
                 MERGE (p)-[:HAS_ISSUE]->(i)",
                [
                    ("key", issue.key.clone().into()),
                    ("summary", issue.summary.clone().into()),
                    ("status", issue.status.clone().into()),
                    ("assignee", issue.assignee.clone().unwrap_or_default().into()),
                    ("priority", issue.priority.clone().into()),
                    ("issue_type", issue.issue_type.clone().into()),
                    ("created", issue.created.clone().into()),
                    ("updated", issue.updated.clone().into()),
                    ("source_id", source.id.to_string().into()),
                    ("project_key", issue.project_key.clone().into()),
                ],
            ).await?;
            total_nodes += 1;
            total_relationships += 1;
        }

        Ok(IngestResult {
            nodes_created: total_nodes,
            relationships_created: total_relationships,
            errors: vec![],
        })
    }
}

3. Implement ConnectorToolProvider

rust
#[async_trait]
impl ConnectorToolProvider for JiraConnector {
    /// Describes the four Jira tools: search, get, create and list-projects.
    ///
    /// Each definition carries a JSON Schema for its arguments; the names
    /// here are the ones `execute` dispatches on.
    fn tools(&self) -> Vec<ToolDefinition> {
        vec![
            ToolDefinition {
                name: "jira_search_issues".into(),
                description: "Search Jira issues using JQL".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {
                        "jql": {
                            "type": "string",
                            "description": "JQL query string"
                        },
                        "max_results": {
                            "type": "integer",
                            "description": "Maximum results to return",
                            "default": 20
                        }
                    },
                    "required": ["jql"]
                }),
            },
            ToolDefinition {
                name: "jira_get_issue".into(),
                description: "Get detailed information about a Jira issue".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {
                        "issue_key": {
                            "type": "string",
                            "description": "Issue key (e.g., PROJ-123)"
                        }
                    },
                    "required": ["issue_key"]
                }),
            },
            ToolDefinition {
                name: "jira_create_issue".into(),
                description: "Create a new Jira issue".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {
                        "project_key": { "type": "string" },
                        "summary": { "type": "string" },
                        "description": { "type": "string" },
                        "issue_type": {
                            "type": "string",
                            "enum": ["Bug", "Task", "Story", "Epic"]
                        },
                        "priority": {
                            "type": "string",
                            "enum": ["Highest", "High", "Medium", "Low", "Lowest"]
                        },
                        "assignee": { "type": "string" }
                    },
                    "required": ["project_key", "summary", "issue_type"]
                }),
            },
            ToolDefinition {
                name: "jira_list_projects".into(),
                description: "List all accessible Jira projects".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {}
                }),
            },
        ]
    }

    /// Runs the tool called `tool_name` with the JSON `args`, using the
    /// configuration stored on `source`.
    ///
    /// # Errors
    ///
    /// Fails when the source config is invalid, when a required argument is
    /// missing or not a string, when the underlying Jira call fails, or when
    /// `tool_name` is not one of the names returned by `tools`.
    async fn execute(
        &self,
        tool_name: &str,
        args: Value,
        source: &Source,
    ) -> Result<ToolResult> {
        let cfg = Self::parse_config(&source.config)?;

        match tool_name {
            "jira_search_issues" => {
                let jql = args["jql"].as_str()
                    .ok_or_else(|| anyhow::anyhow!("jql is required"))?;
                // Absent or non-integer `max_results` falls back to the
                // schema default of 20.
                let requested = args["max_results"].as_u64().unwrap_or(20);
                // Clamp before narrowing: a bare `as u32` would silently wrap
                // oversized values (4294967297 would become 1).
                let max = requested.min(u64::from(u32::MAX)) as u32;
                let issues = self.fetch_issues(&cfg, jql, max).await?;
                Ok(ToolResult::json(serde_json::to_value(issues)?))
            }
            "jira_get_issue" => {
                let key = args["issue_key"].as_str()
                    .ok_or_else(|| anyhow::anyhow!("issue_key is required"))?;
                let issue = self.fetch_issue(&cfg, key).await?;
                Ok(ToolResult::json(serde_json::to_value(issue)?))
            }
            "jira_create_issue" => {
                // The raw arguments are forwarded unchanged to `create_issue`.
                let result = self.create_issue(&cfg, &args).await?;
                Ok(ToolResult::json(result))
            }
            "jira_list_projects" => {
                let projects = self.fetch_projects(&cfg).await?;
                Ok(ToolResult::json(serde_json::to_value(projects)?))
            }
            _ => bail!("Unknown tool: {}", tool_name),
        }
    }
}

4. Register the Connector

Add the connector to the SourceRegistry in backend/crates/agentcy-api/src/state.rs:

rust
use agentcy_ingest::connectors::jira::JiraConnector;

impl AppState {
    /// Constructor excerpt showing where connectors are registered.
    /// Parameters and the returned value are elided in this snippet.
    pub fn new(/* ... */) -> Self {
        let mut registry = SourceRegistry::new();

        // ... existing connectors ...

        // Register the Jira connector
        // A single shared instance serves both roles: the `Arc` is cloned for
        // the ingestion registration and moved into the tools registration.
        // NOTE(review): the "jira" key presumably has to match the value
        // returned by `source_type()` — confirm against SourceRegistry.
        let jira = Arc::new(JiraConnector::new());
        registry.register_ingestion("jira", jira.clone());
        registry.register_tools("jira", jira);

        // ...
    }
}

5. Add to the Connector Module

Export the connector in backend/crates/agentcy-ingest/src/connectors/mod.rs:

rust
pub mod jira;
// ... other connector modules

Best Practices

Config Validation

Always validate credentials by making a test API call in validate_config. Return clear, actionable error messages:

rust
// Illustrative alternatives, not a sequence: each `bail!` returns from the
// enclosing function immediately.

// Good: specific, actionable error messages
// (each one names what failed and which setting to check)
bail!("Authentication failed. Verify your email ({}) and API token.", cfg.email);
bail!("Connection error: could not reach {} — check the URL and network.", cfg.base_url);

// Bad: generic errors
// (no hint about which field or system is at fault)
bail!("Invalid config");
bail!("Connection failed");

Tool Design

Design tools with the AI agent in mind:

  • Clear descriptions — the agent reads tool descriptions to decide which tool to use
  • Reasonable defaults — provide defaults for optional parameters
  • Structured output — return JSON objects, not raw strings
  • Error context — include the tool name and relevant IDs in error messages
rust
ToolDefinition {
    name: "jira_search_issues".into(),
    // Good: tells the agent exactly what this tool does and when to use it
    // (the trailing `\` joins the lines, so the description is one line of text)
    description: "Search Jira issues using JQL (Jira Query Language). \
                  Use this to find issues by status, assignee, project, \
                  labels, or any combination of fields.".into(),
    // ... (remaining fields, e.g. `parameters`, elided)
}

Graph Modeling

Follow these conventions for graph nodes:

  • Use MERGE, not CREATE — idempotent ingestion allows safe re-syncs
  • Set source_id — tag every node with its source for provenance tracking
  • Use descriptive labels — prefix with the system name (e.g., JiraIssue, not Issue) to avoid label collisions
  • Model relationships explicitly — HAS_ISSUE, ASSIGNED_TO, DEPENDS_ON are better than generic RELATED_TO

Error Handling

Handle API errors gracefully and continue ingestion when possible:

rust
// Excerpt: collects per-project failures instead of aborting the whole sync.
// NOTE(review): `projects`, `total_nodes` and `total_relationships` are used
// but not declared in this excerpt — presumably set up as in the step 2
// implementation; `ingest_project` is a helper that is not shown.
async fn ingest(&self, source: &Source, tx: &mut Neo4jTx) -> Result<IngestResult> {
    // Human-readable failure messages, reported back in the result.
    let mut errors = vec![];

    for project in &projects {
        match self.ingest_project(project, tx).await {
            // On success the helper's returned count is added to the node total.
            Ok(count) => total_nodes += count,
            Err(e) => {
                // Record which project failed and why.
                errors.push(format!("Failed to ingest project {}: {}", project.key, e));
                // Continue with remaining projects
            }
        }
    }

    // Returns `Ok` even when some projects failed; the failures are carried
    // in `errors`.
    Ok(IngestResult {
        nodes_created: total_nodes,
        relationships_created: total_relationships,
        errors,
    })
}

Testing

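Write tests for your connector. The unit tests below run without a live Jira instance, because each one finishes or fails before any HTTP request is made: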

rust
#[cfg(test)]
mod tests {
    use super::*;

    /// A config with only the required keys parses, the parsed values match
    /// the input, and the optional keys come back as `None`.
    #[test]
    fn test_parse_config() {
        let config = serde_json::json!({
            "base_url": "https://test.atlassian.net",
            "api_token": "test-token",
            "email": "test@example.com"
        });
        let cfg = JiraConnector::parse_config(&config)
            .expect("config with all required fields should parse");
        assert_eq!(cfg.base_url, "https://test.atlassian.net");
        assert_eq!(cfg.api_token, "test-token");
        assert_eq!(cfg.email, "test@example.com");
        assert!(cfg.jql_filter.is_none());
        assert!(cfg.max_issues.is_none());
    }

    /// The connector advertises exactly the four tools that `execute` handles.
    #[test]
    fn test_tools_defined() {
        let connector = JiraConnector::new();
        let tools = connector.tools();
        assert_eq!(tools.len(), 4);
        for expected in &[
            "jira_search_issues",
            "jira_get_issue",
            "jira_create_issue",
            "jira_list_projects",
        ] {
            assert!(
                tools.iter().any(|t| t.name == *expected),
                "missing tool definition: {}",
                expected
            );
        }
    }

    /// Required keys that are absent are rejected during deserialization,
    /// before any network request is attempted.
    #[tokio::test]
    async fn test_validate_config_missing_fields() {
        let connector = JiraConnector::new();
        let config = serde_json::json!({"base_url": ""});
        let result = connector.validate_config(&config).await;
        assert!(result.is_err());
    }

    /// A config that deserializes but has an empty `base_url` is rejected by
    /// the explicit field check, again without touching the network.
    #[tokio::test]
    async fn test_validate_config_empty_base_url() {
        let connector = JiraConnector::new();
        let config = serde_json::json!({
            "base_url": "",
            "api_token": "test-token",
            "email": "test@example.com"
        });
        let err = connector
            .validate_config(&config)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("base_url is required"));
    }
}

File Structure

backend/crates/agentcy-ingest/src/connectors/
├── mod.rs              ← Export your module here
├── github.rs
├── aws.rs
├── gcp.rs
├── kubernetes.rs
├── sql_connector.rs
├── mongodb.rs
├── vercel.rs
├── supabase.rs
├── openapi.rs
├── mcp.rs
├── csv.rs
├── json.rs
├── remote_exec.rs
└── jira.rs             ← Your new connector

Next Steps

  • Review existing connectors in backend/crates/agentcy-ingest/src/connectors/ for reference implementations
  • Read the Connectors Overview for the full connector architecture
  • Check the Zero-Trust Policies docs to understand how policies apply to connector tools

Built by AgentcyLabs. For in-house deployment or Agentcy Cloud (PaaS) access, visit agentcylabs.com.