
Building Custom Connectors

Agentcy's connector system is designed to be extensible. You can build a custom connector for any external system by implementing one or both of two Rust traits: IngestionSource for ETL ingestion and ConnectorToolProvider for live tool access.

Architecture

Every Agentcy connector is a Rust struct that implements one or both of these traits:

[Figure: Custom connector architecture. A connector struct (pub struct MyConnector { … }) implements IngestionSource (validate_config, ingest, source_type; batch ETL into Neo4j) and ConnectorToolProvider (tools, execute; live agent tool calls), and is registered once in the SourceRegistry in state.rs, which holds all registered connectors as shared trait objects.]
A custom connector implements one or both traits and is registered with the SourceRegistry.

Trait Definitions

IngestionSource

Handles batch ETL from the external system into the Neo4j knowledge graph.

rust
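use async_trait::async_trait;
use anyhow::Result;
use serde_json::Value;
// Source, Neo4jTx, and IngestResult are crate-level types
// (see the imports in step 1 of the guide below).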

#[async_trait]
pub trait IngestionSource: Send + Sync {
    /// Validate the configuration before creating the source.
    /// Return an error with a clear message if credentials are invalid.
    async fn validate_config(&self, config: &Value) -> Result<()>;

    /// Ingest data from the source into Neo4j.
    /// Called when a sync is triggered.
    async fn ingest(
        &self,
        source: &Source,
        tx: &mut Neo4jTx,
    ) -> Result<IngestResult>;

    /// Return the source type identifier (e.g., "jira", "datadog").
    fn source_type(&self) -> &str;
}

ConnectorToolProvider

Exposes live tools that the AI agent can invoke during conversations.

rust
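use async_trait::async_trait;
use anyhow::Result;
use serde_json::Value;
// Source, ToolDefinition, and ToolResult are crate-level types
// (see the imports in step 1 of the guide below).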

#[async_trait]
pub trait ConnectorToolProvider: Send + Sync {
    /// Return the list of tools this connector provides.
    /// Each tool has a name, description, and JSON Schema for its arguments.
    fn tools(&self) -> Vec<ToolDefinition>;

    /// Execute a specific tool with the given arguments.
    /// Return the result as a structured ToolResult.
    async fn execute(
        &self,
        tool_name: &str,
        args: Value,
        source: &Source,
    ) -> Result<ToolResult>;
}

Step-by-Step Guide

1. Create the Connector Module

Start by creating a new file in the backend/crates/agentcy-ingest/src/connectors/ directory:

rust
// backend/crates/agentcy-ingest/src/connectors/jira.rs

use async_trait::async_trait;
use anyhow::{Result, bail};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{
    IngestionSource, ConnectorToolProvider,
    Source, IngestResult, ToolDefinition, ToolResult, Neo4jTx,
};

/// Configuration for the Jira connector.
#[derive(Debug, Deserialize, Serialize)]
struct JiraConfig {
    /// Jira instance URL (e.g., https://mycompany.atlassian.net)
    base_url: String,
    /// API token or PAT
    api_token: String,
    /// Email associated with the API token
    email: String,
    /// JQL filter for issue ingestion (optional)
    jql_filter: Option<String>,
    /// Maximum issues to ingest per sync
    max_issues: Option<u32>,
}

pub struct JiraConnector {
    http_client: reqwest::Client,
}

impl JiraConnector {
    pub fn new() -> Self {
        Self {
            http_client: reqwest::Client::new(),
        }
    }

    fn parse_config(config: &Value) -> Result<JiraConfig> {
        serde_json::from_value(config.clone())
            .map_err(|e| anyhow::anyhow!("Invalid Jira config: {}", e))
    }
}

2. Implement IngestionSource

rust
#[async_trait]
impl IngestionSource for JiraConnector {
    fn source_type(&self) -> &str {
        "jira"
    }

    async fn validate_config(&self, config: &Value) -> Result<()> {
        let cfg = Self::parse_config(config)?;

        // Validate required fields
        if cfg.base_url.is_empty() {
            bail!("base_url is required");
        }
        if cfg.api_token.is_empty() {
            bail!("api_token is required");
        }

        // Test the connection
        let response = self.http_client
            .get(format!("{}/rest/api/3/myself", cfg.base_url))
            .basic_auth(&cfg.email, Some(&cfg.api_token))
            .send()
            .await?;

        match response.status().as_u16() {
            200 => Ok(()),
            401 => bail!("Authentication failed. Verify your email and API token."),
            403 => bail!("Access denied. The API token lacks required permissions."),
            404 => bail!("Jira instance not found at {}. Verify the base_url.", cfg.base_url),
            status => bail!("Unexpected response from Jira: HTTP {}", status),
        }
    }

    async fn ingest(
        &self,
        source: &Source,
        tx: &mut Neo4jTx,
    ) -> Result<IngestResult> {
        let cfg = Self::parse_config(&source.config)?;
        let mut total_nodes = 0;
        let mut total_relationships = 0;

        // Fetch projects
        let projects = self.fetch_projects(&cfg).await?;
        for project in &projects {
            tx.run(
                "MERGE (p:JiraProject {key: $key})
                 SET p.name = $name, p.lead = $lead, p.source_id = $source_id",
                [
                    ("key", project.key.clone().into()),
                    ("name", project.name.clone().into()),
                    ("lead", project.lead.clone().into()),
                    ("source_id", source.id.to_string().into()),
                ],
            ).await?;
            total_nodes += 1;
        }

        // Fetch issues
        let jql = cfg.jql_filter.as_deref().unwrap_or("ORDER BY updated DESC");
        let max = cfg.max_issues.unwrap_or(1000);
        let issues = self.fetch_issues(&cfg, jql, max).await?;
        for issue in &issues {
            tx.run(
                "MERGE (i:JiraIssue {key: $key})
                 SET i.summary = $summary, i.status = $status,
                     i.assignee = $assignee, i.priority = $priority,
                     i.issue_type = $issue_type, i.created = $created,
                     i.updated = $updated, i.source_id = $source_id
                 WITH i
                 MATCH (p:JiraProject {key: $project_key})
                 MERGE (p)-[:HAS_ISSUE]->(i)",
                [
                    ("key", issue.key.clone().into()),
                    ("summary", issue.summary.clone().into()),
                    ("status", issue.status.clone().into()),
                    ("assignee", issue.assignee.clone().unwrap_or_default().into()),
                    ("priority", issue.priority.clone().into()),
                    ("issue_type", issue.issue_type.clone().into()),
                    ("created", issue.created.clone().into()),
                    ("updated", issue.updated.clone().into()),
                    ("source_id", source.id.to_string().into()),
                    ("project_key", issue.project_key.clone().into()),
                ],
            ).await?;
            total_nodes += 1;
            total_relationships += 1;
        }

        Ok(IngestResult {
            nodes_created: total_nodes,
            relationships_created: total_relationships,
            errors: vec![],
        })
    }
}
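
One detail worth guarding against in validate_config: joining the URL with format!("{}/rest/api/3/myself", cfg.base_url) produces a double slash when the user enters base_url with a trailing slash. A minimal sketch of a joining helper follows; join_url is a hypothetical name used for illustration, not part of Agentcy.

```rust
/// Join a base URL and an API path with exactly one slash between them.
/// `join_url` is a hypothetical helper, not an Agentcy API.
fn join_url(base_url: &str, path: &str) -> String {
    format!(
        "{}/{}",
        base_url.trim_end_matches('/'),
        path.trim_start_matches('/')
    )
}
```

With this in place, the connection test could call join_url(&cfg.base_url, "rest/api/3/myself") and get the same URL whether or not the configured value ends in a slash.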

3. Implement ConnectorToolProvider

rust
#[async_trait]
impl ConnectorToolProvider for JiraConnector {
    fn tools(&self) -> Vec<ToolDefinition> {
        vec![
            ToolDefinition {
                name: "jira_search_issues".into(),
                description: "Search Jira issues using JQL".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {
                        "jql": {
                            "type": "string",
                            "description": "JQL query string"
                        },
                        "max_results": {
                            "type": "integer",
                            "description": "Maximum results to return",
                            "default": 20
                        }
                    },
                    "required": ["jql"]
                }),
            },
            ToolDefinition {
                name: "jira_get_issue".into(),
                description: "Get detailed information about a Jira issue".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {
                        "issue_key": {
                            "type": "string",
                            "description": "Issue key (e.g., PROJ-123)"
                        }
                    },
                    "required": ["issue_key"]
                }),
            },
            ToolDefinition {
                name: "jira_create_issue".into(),
                description: "Create a new Jira issue".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {
                        "project_key": { "type": "string" },
                        "summary": { "type": "string" },
                        "description": { "type": "string" },
                        "issue_type": {
                            "type": "string",
                            "enum": ["Bug", "Task", "Story", "Epic"]
                        },
                        "priority": {
                            "type": "string",
                            "enum": ["Highest", "High", "Medium", "Low", "Lowest"]
                        },
                        "assignee": { "type": "string" }
                    },
                    "required": ["project_key", "summary", "issue_type"]
                }),
            },
            ToolDefinition {
                name: "jira_list_projects".into(),
                description: "List all accessible Jira projects".into(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {}
                }),
            },
        ]
    }

    async fn execute(
        &self,
        tool_name: &str,
        args: Value,
        source: &Source,
    ) -> Result<ToolResult> {
        let cfg = Self::parse_config(&source.config)?;

        match tool_name {
            "jira_search_issues" => {
                let jql = args["jql"].as_str()
                    .ok_or_else(|| anyhow::anyhow!("jql is required"))?;
                // Clamp before narrowing so an oversized value cannot wrap around.
                let max = args["max_results"].as_u64().unwrap_or(20).min(u64::from(u32::MAX)) as u32;
                let issues = self.fetch_issues(&cfg, jql, max).await?;
                Ok(ToolResult::json(serde_json::to_value(issues)?))
            }
            "jira_get_issue" => {
                let key = args["issue_key"].as_str()
                    .ok_or_else(|| anyhow::anyhow!("issue_key is required"))?;
                let issue = self.fetch_issue(&cfg, key).await?;
                Ok(ToolResult::json(serde_json::to_value(issue)?))
            }
            "jira_create_issue" => {
                let result = self.create_issue(&cfg, &args).await?;
                Ok(ToolResult::json(result))
            }
            "jira_list_projects" => {
                let projects = self.fetch_projects(&cfg).await?;
                Ok(ToolResult::json(serde_json::to_value(projects)?))
            }
            _ => bail!("Unknown tool: {}", tool_name),
        }
    }
}
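
The match in execute dispatches on raw strings, so the names in tools() and the arms in execute can drift apart unnoticed. One possible way to keep them in sync is to route both through a small enum. The sketch below uses only the standard library; JiraTool is a hypothetical type introduced for illustration, and the four names are the ones defined in this guide.

```rust
use std::str::FromStr;

/// The four tools the Jira connector in this guide exposes.
/// `JiraTool` is a hypothetical helper type for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JiraTool {
    SearchIssues,
    GetIssue,
    CreateIssue,
    ListProjects,
}

impl JiraTool {
    /// Every variant, so the advertised list and the dispatcher share one source.
    const ALL: [JiraTool; 4] = [
        JiraTool::SearchIssues,
        JiraTool::GetIssue,
        JiraTool::CreateIssue,
        JiraTool::ListProjects,
    ];

    /// The name advertised to the agent.
    fn name(self) -> &'static str {
        match self {
            JiraTool::SearchIssues => "jira_search_issues",
            JiraTool::GetIssue => "jira_get_issue",
            JiraTool::CreateIssue => "jira_create_issue",
            JiraTool::ListProjects => "jira_list_projects",
        }
    }
}

impl FromStr for JiraTool {
    type Err = String;

    /// Resolve an incoming tool name, or report it as unknown.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        JiraTool::ALL
            .iter()
            .copied()
            .find(|tool| tool.name() == s)
            .ok_or_else(|| format!("Unknown tool: {}", s))
    }
}
```

Inside execute, tool_name.parse::<JiraTool>() would then replace the string match, and the compiler flags any variant that a later match forgets to handle.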

4. Register the Connector

Add the connector to the SourceRegistry in backend/crates/agentcy-api/src/state.rs:

rust
use agentcy_ingest::connectors::jira::JiraConnector;

impl AppState {
    pub fn new(/* ... */) -> Self {
        let mut registry = SourceRegistry::new();

        // ... existing connectors ...

        // Register the Jira connector
        let jira = Arc::new(JiraConnector::new());
        registry.register_ingestion("jira", jira.clone());
        registry.register_tools("jira", jira);

        // ...
    }
}
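
The registration above passes the same Arc to both calls, which works because one connector value can be viewed through either trait. The SourceRegistry internals are not shown on this page, so the following is only a sketch of how such a registry might look: the traits are simplified, synchronous stand-ins, and the field names and lookup methods (ingestion_for, tools_for) are assumptions.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified, synchronous stand-ins for the real traits (assumption:
// the real ones are async and carry more methods, as shown earlier).
trait IngestionSource: Send + Sync {
    fn source_type(&self) -> &str;
}

trait ConnectorToolProvider: Send + Sync {
    fn tool_names(&self) -> Vec<String>;
}

struct JiraConnector;

impl IngestionSource for JiraConnector {
    fn source_type(&self) -> &str {
        "jira"
    }
}

impl ConnectorToolProvider for JiraConnector {
    fn tool_names(&self) -> Vec<String> {
        vec!["jira_search_issues".to_string(), "jira_get_issue".to_string()]
    }
}

/// Hypothetical registry: two maps keyed by source type.
#[derive(Default)]
struct SourceRegistry {
    ingestion: HashMap<String, Arc<dyn IngestionSource>>,
    tools: HashMap<String, Arc<dyn ConnectorToolProvider>>,
}

impl SourceRegistry {
    fn new() -> Self {
        Self::default()
    }

    fn register_ingestion(&mut self, source_type: &str, source: Arc<dyn IngestionSource>) {
        self.ingestion.insert(source_type.to_string(), source);
    }

    fn register_tools(&mut self, source_type: &str, provider: Arc<dyn ConnectorToolProvider>) {
        self.tools.insert(source_type.to_string(), provider);
    }

    fn ingestion_for(&self, source_type: &str) -> Option<Arc<dyn IngestionSource>> {
        self.ingestion.get(source_type).cloned()
    }

    fn tools_for(&self, source_type: &str) -> Option<Arc<dyn ConnectorToolProvider>> {
        self.tools.get(source_type).cloned()
    }
}

/// Mirrors the registration in step 4: one Arc, coerced to two trait objects.
fn build_registry() -> SourceRegistry {
    let mut registry = SourceRegistry::new();
    let jira = Arc::new(JiraConnector);
    registry.register_ingestion("jira", jira.clone());
    registry.register_tools("jira", jira);
    registry
}
```

A connector that implements only one trait would simply skip the other register call, which is why the two maps are kept separate in this sketch.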

5. Add to the Connector Module

Export the connector in backend/crates/agentcy-ingest/src/connectors/mod.rs:

rust
pub mod jira;
// ... other connector modules

Best Practices

Config Validation

Always validate credentials by making a test API call in validate_config. Return clear, actionable error messages:

rust
// Good: specific, actionable error messages
bail!("Authentication failed. Verify your email ({}) and API token.", cfg.email);
bail!("Connection error: could not reach {} — check the URL and network.", cfg.base_url);

// Bad: generic errors
bail!("Invalid config");
bail!("Connection failed");
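
Error messages are easier to keep specific when the mapping from HTTP status to message lives in a pure function that can be tested without a network call. The sketch below is one way to do that; describe_status is a hypothetical helper, it returns a plain String error to stay dependency-free, and the messages mirror the ones used in validate_config earlier in this guide.

```rust
/// Map the HTTP status from the connection test to an actionable message.
/// `describe_status` is a hypothetical helper, not an Agentcy API.
fn describe_status(status: u16, base_url: &str) -> Result<(), String> {
    match status {
        200 => Ok(()),
        401 => Err("Authentication failed. Verify your email and API token.".to_string()),
        403 => Err("Access denied. The API token lacks required permissions.".to_string()),
        404 => Err(format!(
            "Jira instance not found at {}. Verify the base_url.",
            base_url
        )),
        other => Err(format!("Unexpected response from Jira: HTTP {}", other)),
    }
}
```

In the real connector the Err values could be wrapped with anyhow::anyhow! at the call site, so validate_config keeps its Result<()> signature.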

Tool Design

Design tools with the AI agent in mind:

  • Clear descriptions: the agent reads tool descriptions to decide which tool to use
  • Reasonable defaults: provide defaults for optional parameters
  • Structured output: return JSON objects, not raw strings
  • Error context: include the tool name and relevant IDs in error messages

rust
ToolDefinition {
    name: "jira_search_issues".into(),
    // Good: tells the agent exactly what this tool does and when to use it
    description: "Search Jira issues using JQL (Jira Query Language). \
                  Use this to find issues by status, assignee, project, \
                  labels, or any combination of fields.".into(),
    // ...
}
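
For the "reasonable defaults" point, it can help to resolve an optional argument in one place so that a missing, zero, or oversized value never reaches the API call. A minimal sketch, assuming the default of 20 from the jira_search_issues schema in this guide and an upper cap of 100 chosen purely for illustration; resolve_max_results is a hypothetical helper.

```rust
/// Default taken from the tool schema in this guide.
const DEFAULT_MAX_RESULTS: u32 = 20;
/// Assumed upper bound; pick whatever limit suits the target API.
const MAX_RESULTS_CAP: u32 = 100;

/// Resolve the optional `max_results` argument to a safe value.
fn resolve_max_results(requested: Option<u64>) -> u32 {
    match requested {
        None => DEFAULT_MAX_RESULTS,
        // Treat 0 as "at least one result" and clamp oversized requests,
        // so the narrowing cast below can never wrap.
        Some(n) => n.clamp(1, u64::from(MAX_RESULTS_CAP)) as u32,
    }
}
```

In execute this would be called as resolve_max_results(args["max_results"].as_u64()).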

Graph Modeling

Follow these conventions for graph nodes:

  • Use MERGE, not CREATE: idempotent ingestion allows safe re-syncs
  • Set source_id: tag every node with its source for provenance tracking
  • Use descriptive labels: prefix with the system name (e.g., JiraIssue, not Issue) to avoid label collisions
  • Model relationships explicitly: HAS_ISSUE, ASSIGNED_TO, DEPENDS_ON are better than generic RELATED_TO
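
The first three conventions can be applied mechanically by generating the upsert query from the system name, the entity name, and the property list. The sketch below shows one way this might look; merge_node_query is a hypothetical helper, not an Agentcy API, and it assumes nodes are keyed by a key property as in the Jira example.

```rust
/// Build an idempotent upsert query for a node keyed by `key`.
/// `merge_node_query` is a hypothetical helper, not an Agentcy API.
/// `system`, `entity`, and `properties` are spliced into the query text,
/// so they must be trusted identifiers, never user input.
fn merge_node_query(system: &str, entity: &str, properties: &[&str]) -> String {
    // Prefix the label with the system name to avoid collisions.
    let label = format!("{}{}", system, entity);
    let mut assignments: Vec<String> = properties
        .iter()
        .map(|p| format!("n.{} = ${}", p, p))
        .collect();
    // Every node carries its source for provenance tracking.
    assignments.push("n.source_id = $source_id".to_string());
    format!(
        "MERGE (n:{} {{key: $key}}) SET {}",
        label,
        assignments.join(", ")
    )
}
```

Values still travel as query parameters ($key, $summary, and so on); only the label and property names are interpolated.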

Error Handling

Handle API errors gracefully and continue ingestion when possible:

rust
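async fn ingest(&self, source: &Source, tx: &mut Neo4jTx) -> Result<IngestResult> {
    let cfg = Self::parse_config(&source.config)?;
    let projects = self.fetch_projects(&cfg).await?;
    let mut total_nodes = 0;
    // Update this the same way wherever relationships are written.
    let total_relationships = 0;
    let mut errors = vec![];

    for project in &projects {
        // ingest_project is a per-project helper (not shown here) that
        // returns the number of nodes it wrote.
        match self.ingest_project(project, tx).await {
            Ok(count) => total_nodes += count,
            Err(e) => {
                errors.push(format!("Failed to ingest project {}: {}", project.key, e));
                // Continue with remaining projects
            }
        }
    }

    Ok(IngestResult {
        nodes_created: total_nodes,
        relationships_created: total_relationships,
        errors,
    })
}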
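
The same continue-on-error pattern can be exercised without Neo4j or HTTP by passing the per-project step in as a closure. The following is a dependency-free sketch: this IngestResult is a stand-in whose field types are assumed, and ingest_all is a hypothetical function, not part of Agentcy.

```rust
/// Stand-in for the guide's IngestResult (field types are assumptions).
#[derive(Debug, Default, PartialEq)]
struct IngestResult {
    nodes_created: u64,
    relationships_created: u64,
    errors: Vec<String>,
}

/// Run `ingest_one` for each project key, recording failures instead of
/// aborting. `ingest_one` returns (nodes, relationships) on success.
fn ingest_all<F>(project_keys: &[&str], mut ingest_one: F) -> IngestResult
where
    F: FnMut(&str) -> Result<(u64, u64), String>,
{
    let mut result = IngestResult::default();
    for &key in project_keys {
        match ingest_one(key) {
            Ok((nodes, relationships)) => {
                result.nodes_created += nodes;
                result.relationships_created += relationships;
            }
            // Record the failure with its project key and keep going.
            Err(e) => result
                .errors
                .push(format!("Failed to ingest project {}: {}", key, e)),
        }
    }
    result
}
```

Because one failing project only adds an entry to errors, a sync over three projects where the middle one fails still reports the counts from the other two.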

Testing

Write unit tests for your connector in the same file as the implementation:

rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_config() {
        let config = serde_json::json!({
            "base_url": "https://test.atlassian.net",
            "api_token": "test-token",
            "email": "test@example.com"
        });
        let result = JiraConnector::parse_config(&config);
        assert!(result.is_ok());
    }

    #[test]
    fn test_tools_defined() {
        let connector = JiraConnector::new();
        let tools = connector.tools();
        assert_eq!(tools.len(), 4);
        assert!(tools.iter().any(|t| t.name == "jira_search_issues"));
    }

    #[tokio::test]
    async fn test_validate_config_missing_fields() {
        let connector = JiraConnector::new();
        let config = serde_json::json!({"base_url": ""});
        let result = connector.validate_config(&config).await;
        assert!(result.is_err());
    }
}

File Structure

backend/crates/agentcy-ingest/src/connectors/
├── mod.rs              ← Export your module here
├── github.rs
├── aws.rs
├── gcp.rs
├── kubernetes.rs
├── sql_connector.rs
├── mongodb.rs
├── vercel.rs
├── supabase.rs
├── openapi.rs
├── mcp.rs
├── csv.rs
├── json.rs
├── remote_exec.rs
└── jira.rs             ← Your new connector

Next Steps

  • Review existing connectors in backend/crates/agentcy-ingest/src/connectors/ for reference implementations
  • Read the Connectors Overview for the full connector architecture
  • Check the Zero-Trust Policies docs to understand how policies apply to connector tools

Built by AgentcyLabs. For in-house deployment or Agentcy Cloud (PaaS) access, visit agentcylabs.com.