
The Agentic Engineering Playbook: Building Custom Security Agents for Enterprise

An end-to-end guide to designing, building, testing, and deploying custom security automation agents — from agent architecture patterns and tool integration to governance guardrails and production observability.

Agentic Engineering, Security Automation, Python, Agent Development, Enterprise, Cymantis

By Cymantis Labs

The security industry is entering the agentic era. Large language models, tool-use frameworks, and orchestration libraries have matured to the point where autonomous software agents can triage alerts, investigate incidents, validate compliance, and draft response actions — all without a human touching a keyboard until the final approval step.

But here's the uncomfortable truth that the vendor ecosystem won't tell you: the real competitive advantage doesn't come from buying an agentic product off the shelf. It comes from building custom agents that are tuned to your environment — your network topology, your compliance obligations, your tool stack, your threat model, and your operational playbooks.

Off-the-shelf agentic solutions give you 80% of the capability with 20% of the effort. That last 20% — the part where an agent understands that your PCI segment has different escalation rules than your corporate network, or that your CrowdStrike instance uses custom response policies that don't map to any vendor's default playbook — is where custom engineering separates the leaders from the followers.

This playbook is the engineering guide we wish existed when we started building security agents at Cymantis Labs. It covers architecture patterns, tool integration, prompt engineering, testing, deployment, and governance. Every code example is production-tested. Every pattern has been deployed against real enterprise workloads.

Let's build.


Why Custom Security Agents?

The Off-the-Shelf Ceiling

Vendor-provided agentic solutions — Microsoft Copilot for Security, CrowdStrike Charlotte AI, Google Gemini in Chronicle — are impressive. They provide natural-language interfaces over proprietary telemetry and can answer questions, generate queries, and summarize incidents.

But they share a common limitation: they are general-purpose agents operating inside walled gardens. They know their own data and their own APIs. They don't know:

  • Your custom Splunk macros and lookup tables that encode years of institutional knowledge
  • Your internal asset criticality scoring derived from CMDB, business unit ownership, and data classification
  • Your compliance crosswalk that maps CIS benchmarks to NIST 800-53 controls to your specific POA&M items
  • Your escalation policies that vary by business unit, time of day, asset type, and threat severity
  • Your custom detection logic in Sigma rules, YARA signatures, or proprietary analytics

The Custom Agent Advantage

A custom agent is purpose-built software that embeds your organization's specific knowledge, connects to your exact tool stack, and follows your operational doctrine. The advantages are concrete:

  1. Tool Integration Depth: Direct API access to every tool in your stack — SIEM, EDR, SOAR, CMDB, ticketing, threat intel, vulnerability management — through purpose-built connectors that understand your data models.

  2. Institutional Knowledge Encoding: System prompts and retrieval-augmented generation (RAG) pipelines that encode your SOPs, escalation matrices, and tribal knowledge.

  3. Compliance Alignment: Agents that generate audit-ready artifacts by default because they're designed against your specific regulatory framework.

  4. Cost Control: Self-hosted or API-based inference with transparent token economics, rather than opaque per-seat pricing that scales with headcount.

  5. Evolutionary Architecture: You own the code. You control the iteration speed. When a new threat vector emerges, you don't file a feature request — you ship a patch.

Pro Tip: Start with the gap analysis. List every manual, repetitive decision your security team makes daily. Rank them by frequency, cognitive complexity, and data availability. The sweet spot for your first custom agent is high frequency, moderate complexity, and high data availability — typically alert triage, compliance validation, or access review.


Agent Architecture Patterns

Not every security use case demands the same architecture. The pattern you choose determines the agent's reasoning depth, latency, cost, and failure modes. Here are the four patterns we deploy most frequently at Cymantis Labs, each with working Python implementations.

Pattern 1: ReAct Agent (Reason-Act Loop)

The ReAct (Reasoning + Acting) pattern is the workhorse of security automation. The agent receives a task, reasons about what to do next, selects a tool, observes the result, and loops until it reaches a conclusion.

Best for: Alert triage, threat investigation, ad-hoc queries.

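Trade-offs: Simple to implement, but can loop inefficiently on complex multi-step tasks. Each iteration resends the full reasoning trajectory, so the prompt grows with every step and cumulative token cost grows faster than linearly with reasoning depth.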

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger("react_agent")

@dataclass
class AgentState:
    """Tracks the agent's reasoning trajectory."""
    task: str
    thoughts: list[str] = field(default_factory=list)
    actions: list[dict] = field(default_factory=list)
    observations: list[str] = field(default_factory=list)
    final_answer: str | None = None
    iteration: int = 0
    max_iterations: int = 10

class ReActAgent:
    """
    Security-focused ReAct agent with bounded iteration
    and structured reasoning traces.
    """

    def __init__(self, llm_client: Any, tools: dict[str, Callable], 
                 system_prompt: str, max_iterations: int = 10):
        self.llm = llm_client
        self.tools = tools
        self.system_prompt = system_prompt
        self.max_iterations = max_iterations

    def run(self, task: str) -> AgentState:
        """Main reasoning loop: reason, act, observe until conclusion."""
        state = AgentState(task=task, max_iterations=self.max_iterations)

        while state.iteration < state.max_iterations:
            state.iteration += 1
            logger.info(f"Iteration {state.iteration}/{state.max_iterations}")

            messages = self._build_messages(state)
            response = self.llm.chat(messages=messages)
            parsed = self._parse_response(response)

            if parsed["type"] == "final_answer":
                state.final_answer = parsed["content"]
                state.thoughts.append(parsed.get("thought", ""))
                logger.info("Agent reached final answer.")
                break

            if parsed["type"] == "action":
                state.thoughts.append(parsed["thought"])
                tool_name = parsed["action"]
                tool_input = parsed["action_input"]
                observation = self._execute_tool(tool_name, tool_input)
                state.actions.append({
                    "tool": tool_name,
                    "input": tool_input,
                    "iteration": state.iteration,
                })
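                state.observations.append(observation)
            else:
                # Unknown response type: record it so the next prompt can self-correct
                # instead of silently burning an iteration.
                state.thoughts.append(parsed.get("thought", ""))
                state.actions.append({"tool": None, "input": None,
                                      "iteration": state.iteration})
                state.observations.append(
                    f"Error: unrecognized response type {parsed.get('type')!r}"
                )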

        if state.final_answer is None:
            state.final_answer = (
                "Investigation incomplete — max iterations reached. "
                "Escalate to human analyst."
            )
        return state

    # Other methods:
    # _execute_tool(tool_name, tool_input) -> str: Execute tool with error handling
    # _build_messages(state) -> list[dict]: Build prompt with full reasoning trajectory
    # _parse_response(response) -> dict: Parse LLM output into structured format

Pro Tip: Always set max_iterations conservatively. A runaway ReAct loop burning tokens on circular reasoning is one of the most common failure modes in production. We default to 10 iterations and alert on any task that exceeds 7.
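
The two helpers that matter most for loop stability are response parsing and tool execution. The following is a minimal sketch, not the implementation behind the class above: it assumes the LLM is instructed to reply with a JSON object carrying a `type` field, and the standalone function names `parse_response` and `execute_tool` are illustrative. The point is that neither helper ever raises into the loop; a malformed reply or a failing tool becomes an observation the agent can react to.

```python
import json
from typing import Any, Callable

VALID_TYPES = {"action", "final_answer"}

def parse_response(raw: str) -> dict:
    """Parse LLM output into a dict; malformed output becomes an error record."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        return {"type": "error", "thought": "", "content": f"Invalid JSON: {exc.msg}"}
    if not isinstance(parsed, dict) or parsed.get("type") not in VALID_TYPES:
        return {"type": "error", "thought": "", "content": "Missing or unknown 'type' field"}
    return parsed

def execute_tool(tools: dict[str, Callable[..., Any]], name: str, tool_input: dict) -> str:
    """Run a registered tool and always return a string observation."""
    if name not in tools:
        return f"Error: unknown tool '{name}'. Available: {sorted(tools)}"
    try:
        # default=str keeps datetimes and similar values serializable
        return json.dumps(tools[name](**tool_input), default=str)
    except Exception as exc:
        # Surface the failure to the agent instead of crashing the loop
        return f"Error: {name} failed: {exc}"
```

Returning errors as observations keeps the iteration bound as the only hard stop, which makes the loop's worst-case cost easy to reason about.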

Pattern 2: Plan-and-Execute Agent

The Plan-and-Execute pattern separates strategic planning from tactical execution. A planning LLM decomposes the task into ordered steps, then an execution agent handles each step independently. This produces more predictable behavior for complex, multi-step investigations.

Best for: Incident response workflows, compliance audits, vulnerability assessment campaigns.

Trade-offs: Higher latency (two LLM calls minimum), but much better at long-horizon tasks.

import logging
from dataclasses import dataclass, field
from enum import Enum

logger = logging.getLogger("plan_execute_agent")

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class PlanStep:
    description: str
    tools_needed: list[str]
    status: StepStatus = StepStatus.PENDING
    result: str | None = None
    depends_on: list[int] = field(default_factory=list)

@dataclass
class InvestigationPlan:
    objective: str
    steps: list[PlanStep]
    context: dict = field(default_factory=dict)

class PlanAndExecuteAgent:
    """
    Two-phase agent: strategic planning followed by
    step-by-step execution with replanning capability.
    """

    def __init__(self, planner_llm, executor_llm, tools, system_prompt):
        self.planner = planner_llm
        self.executor = executor_llm
        self.tools = tools
        self.system_prompt = system_prompt

    def run(self, objective: str, context: dict | None = None) -> InvestigationPlan:
        """Generate plan and execute steps with dependency checking and replanning."""
        plan = self._generate_plan(objective, context or {})
        logger.info(f"Plan generated: {len(plan.steps)} steps")

        # Walk the plan by index so that steps added by _replan() are executed too.
        # (Iterating over plan.steps directly would keep walking the pre-replan list.)
        # Assumes _replan() preserves the steps up to and including current_idx.
        idx = 0
        while idx < len(plan.steps):
            step = plan.steps[idx]
            idx += 1
            if not self._dependencies_met(plan, step):
                step.status = StepStatus.SKIPPED
                step.result = "Skipped: dependency not met."
                continue

            step.status = StepStatus.RUNNING
            logger.info(f"Executing step {idx}: {step.description}")

            try:
                result = self._execute_step(step, plan)
                step.result = result
                step.status = StepStatus.COMPLETED

                if self._should_replan(plan, step, result):
                    logger.info("Replanning based on new findings...")
                    plan = self._replan(plan, idx - 1)
            except Exception as e:
                step.status = StepStatus.FAILED
                step.result = f"Failed: {e}"
                logger.error(f"Step {idx} failed: {e}")

        return plan

    # Other methods:
    # _generate_plan(objective, context) -> InvestigationPlan: Generate investigation plan via LLM
    # _dependencies_met(plan, step) -> bool: Check if step dependencies are satisfied
    # _should_replan(plan, step, result) -> bool: Determine if replanning needed based on findings
    # _execute_step(step, plan) -> str: Execute individual step using ReAct agent
    # _replan(plan, current_idx) -> InvestigationPlan: Generate additional steps based on findings

Pro Tip: The replanning trigger is critical for security use cases. A triage step that discovers indicators of active compromise should immediately reprioritize the investigation toward containment, not continue with the original enrichment plan. Design your _should_replan() logic around your incident severity matrix.

Pattern 3: Multi-Agent Pipeline

The Multi-Agent Pipeline distributes responsibilities across specialized agents, each with its own tools, prompts, and reasoning boundaries. This mirrors the structure of mature SOC teams: triage analysts, investigators, responders, and documenters.

Best for: End-to-end incident lifecycle, large-scale operations, organizations with well-defined role separation.

Trade-offs: Highest engineering complexity, but best separation of concerns and easiest to govern.

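import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone

# ReActAgent and PlanAndExecuteAgent are the classes defined in Patterns 1 and 2.
logger = logging.getLogger("agent_pipeline")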

@dataclass
class AgentMessage:
    sender: str
    recipient: str
    content: dict
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    priority: str = "normal"

class SecurityAgent(ABC):
    """Base class for all agents in the pipeline."""

    def __init__(self, name: str, llm_client, tools: dict, system_prompt: str):
        self.name = name
        self.llm = llm_client
        self.tools = tools
        self.system_prompt = system_prompt

    @abstractmethod
    def process(self, message: AgentMessage) -> AgentMessage:
        """Process incoming message and return result."""
        pass

class TriageAgent(SecurityAgent):
    """First-pass triage: classify severity, extract IOCs, determine if investigation is warranted."""

    def process(self, message: AgentMessage) -> AgentMessage:
        react = ReActAgent(
            llm_client=self.llm, tools=self.tools,
            system_prompt=self.system_prompt, max_iterations=5,
        )
        result = react.run(
            f"Triage this alert and determine severity, IOCs, "
            f"and whether investigation is needed:\n{json.dumps(message.content)}"
        )
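        try:
            triage_result = json.loads(result.final_answer)
        except (json.JSONDecodeError, TypeError):
            # Fail safe: unparseable triage output (for example the max-iterations
            # message) is treated as high severity and sent on for investigation.
            triage_result = {"needs_investigation": True, "severity": "high",
                             "summary": str(result.final_answer)}
        needs_investigation = triage_result.get("needs_investigation", True)
        return AgentMessage(
            sender=self.name,
            recipient="investigator" if needs_investigation else "closer",
            content={"alert": message.content, "triage": triage_result},
            priority=triage_result.get("severity", "medium"),
        )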

class InvestigationAgent(SecurityAgent):
    """Deep-dive investigation: correlate across data sources, build timeline, assess blast radius."""

    def process(self, message: AgentMessage) -> AgentMessage:
        plan_agent = PlanAndExecuteAgent(
            planner_llm=self.llm, executor_llm=self.llm,
            tools=self.tools, system_prompt=self.system_prompt,
        )
        plan = plan_agent.run(
            objective=f"Investigate: {message.content['triage']['summary']}",
            context=message.content,
        )
        investigation = {
            "findings": [s.result for s in plan.steps if s.result],
            "timeline": self._build_timeline(plan),
            "iocs_discovered": self._extract_iocs(plan),
            "recommended_actions": self._recommend_actions(plan),
        }
        return AgentMessage(
            sender=self.name, recipient="responder",
            content={**message.content, "investigation": investigation},
            priority=message.priority,
        )

    # Other methods:
    # _build_timeline(plan): Build chronological event timeline from plan steps
    # _extract_iocs(plan): Extract indicators of compromise from investigation results
    # _recommend_actions(plan): Generate recommended response actions based on findings

class ResponseAgent(SecurityAgent):
    """Execute response actions with human approval gates for high-severity actions."""

    REQUIRES_APPROVAL = {"isolate_host", "disable_account", "block_ip_range"}

    def process(self, message: AgentMessage) -> AgentMessage:
        actions = message.content["investigation"]["recommended_actions"]
        results = []
        for action in actions:
            if action["type"] in self.REQUIRES_APPROVAL:
                approval = self._request_human_approval(action)
                if not approval.approved:
                    results.append({"action": action["type"], "status": "blocked",
                                   "reason": "Human denied approval"})
                    continue
            result = self._execute_action(action)
            results.append(result)
        return AgentMessage(
            sender=self.name, recipient="documenter",
            content={**message.content, "response_results": results},
            priority=message.priority,  # carry the severity through to documentation
        )

    # Other methods:
    # _request_human_approval(action): Queue action for human approval workflow
    # _execute_action(action): Execute approved response action via tool

class AgentPipeline:
    """Orchestrates message passing between agents."""

    def __init__(self):
        self.agents: dict[str, SecurityAgent] = {}
        self.message_log: list[AgentMessage] = []

    def register(self, name: str, agent: SecurityAgent):
        self.agents[name] = agent

    def run(self, initial_message: AgentMessage, max_hops: int = 10) -> list[AgentMessage]:
        """Route messages through the pipeline until the recipient is not a registered agent."""
        current = initial_message
        self.message_log.append(current)
        hops = 0
        # max_hops bounds the loop in case agents route messages to each other in a cycle.
        while current.recipient in self.agents and hops < max_hops:
            agent = self.agents[current.recipient]
            logger.info(f"[Pipeline] {current.sender} -> {agent.name}")
            result = agent.process(current)
            self.message_log.append(result)
            current = result
            hops += 1
        return self.message_log

Pattern 4: Autonomous Agent with Guardrails

The most powerful — and most dangerous — pattern. A fully autonomous agent with explicit safety boundaries, rate limits, and blast-radius controls. This is the pattern you use when you want an agent operating continuously (e.g., 24/7 alert processing) without real-time human oversight.

Best for: Continuous monitoring, automated triage at scale, low-severity auto-remediation.

Trade-offs: Requires the most rigorous testing and governance. A misconfigured guardrail can cause either inaction (missing real threats) or overaction (isolating production assets).

from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from collections import defaultdict

@dataclass
class Guardrails:
    """Safety boundaries for autonomous operation."""
    max_isolations_per_hour: int = 3
    max_account_disables_per_hour: int = 5
    max_firewall_rules_per_hour: int = 10
    max_hosts_affected_per_action: int = 10
    protected_hosts: set[str] = field(default_factory=lambda: {
        "dc01.corp.local", "ca01.corp.local", "dns01.corp.local",
    })
    protected_accounts: set[str] = field(default_factory=lambda: {
        "krbtgt", "administrator", "svc_backup",
    })
    allowed_actions: set[str] = field(default_factory=lambda: {
        "enrich_ioc", "query_siem", "query_edr", "create_ticket",
        "isolate_host", "disable_account", "block_ip",
    })
    autonomous_actions: set[str] = field(default_factory=lambda: {
        "enrich_ioc", "query_siem", "query_edr", "create_ticket",
    })
    requires_approval: set[str] = field(default_factory=lambda: {
        "isolate_host", "disable_account", "block_ip",
    })
    autonomous_hours: tuple[int, int] = (6, 22)  # 6 AM – 10 PM

class GuardedAgent:
    """Autonomous agent with enforced safety boundaries."""

    def __init__(self, agent: ReActAgent, guardrails: Guardrails):
        self.agent = agent
        self.guardrails = guardrails
        self.action_counts: dict[str, list[datetime]] = defaultdict(list)

    def execute_action(self, action_name: str, params: dict) -> dict:
        """Execute action after passing all guardrail checks."""
        if action_name not in self.guardrails.allowed_actions:
            return self._deny(action_name, "Action not in allowed set")
        # Hard limits apply to every action, including those a human will approve.
        # Checking them only after the approval branch would mean the isolation and
        # account-disable limits and the protected lists are never consulted.
        if self._targets_protected_asset(params):
            return self._deny(action_name, "Target is a protected asset")
        if not self._check_blast_radius(action_name, params):
            return self._deny(action_name, "Blast radius exceeded")
        if not self._check_rate_limit(action_name):
            return self._deny(action_name, "Rate limit exceeded")
        if action_name in self.guardrails.requires_approval:
            if not self._is_within_autonomous_hours():
                return self._deny(action_name, "Outside autonomous hours")
            return self._queue_for_approval(action_name, params)
        return self.agent._execute_tool(action_name, params)

    # Other methods:
    # _check_rate_limit(action) -> bool: Verify action hasn't exceeded hourly rate limit
    # _check_blast_radius(action, params) -> bool: Ensure action doesn't affect too many hosts
    # _targets_protected_asset(params) -> bool: Check if action targets protected assets
    # _is_within_autonomous_hours() -> bool: Verify current time is within autonomous operation window
    # _deny(action, reason) -> dict: Return denial response with reason
    # _queue_for_approval(action, params) -> dict: Queue action for human approval workflow

Pro Tip: Guardrails should be defined in external configuration, not hardcoded. Use a YAML or JSON policy file that can be version-controlled, peer-reviewed, and updated without redeploying the agent. Treat guardrail configuration changes with the same rigor as firewall rule changes.
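
One way to externalize the policy is sketched below, using JSON so the example needs only the standard library. The `GuardrailPolicy` class, the `load_policy` function, and the subset of fields shown are illustrative assumptions; the idea is that an inconsistent policy should be rejected when it is loaded, before the agent acts on it.

```python
import json
from dataclasses import dataclass

@dataclass(frozen=True)
class GuardrailPolicy:
    """Immutable subset of the guardrail settings, loaded from a policy file."""
    max_isolations_per_hour: int
    protected_hosts: frozenset[str]
    allowed_actions: frozenset[str]
    requires_approval: frozenset[str]

def load_policy(text: str) -> GuardrailPolicy:
    """Parse and validate a JSON policy document."""
    raw = json.loads(text)
    policy = GuardrailPolicy(
        max_isolations_per_hour=int(raw["max_isolations_per_hour"]),
        # Hostnames are lowercased so comparisons are case-insensitive
        protected_hosts=frozenset(h.lower() for h in raw["protected_hosts"]),
        allowed_actions=frozenset(raw["allowed_actions"]),
        requires_approval=frozenset(raw["requires_approval"]),
    )
    if policy.max_isolations_per_hour < 0:
        raise ValueError("max_isolations_per_hour must not be negative")
    unknown = policy.requires_approval - policy.allowed_actions
    if unknown:
        raise ValueError(
            f"requires_approval lists actions that are not allowed: {sorted(unknown)}"
        )
    return policy
```

Because the dataclass is frozen, the running agent cannot mutate its own limits; a policy change means loading a new, reviewed file.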


Building the Tool Registry

An agent is only as useful as the tools it can access. The tool registry is the bridge between the agent's reasoning and your security infrastructure. Every SIEM query, EDR action, threat intel lookup, and ticket creation flows through registered tools.

The Tool Decorator Pattern

We use a decorator-based registry that enforces consistent interfaces, automatic input validation, and audit logging across all tools.

import functools
import logging
import time
from typing import Any, Callable
from dataclasses import dataclass

logger = logging.getLogger("tool_registry")

@dataclass
class ToolMetadata:
    name: str
    description: str
    parameters: dict
    category: str
    requires_approval: bool = False
    rate_limit: int | None = None
    timeout: int = 30

_TOOL_REGISTRY: dict[str, tuple[Callable, ToolMetadata]] = {}

def security_tool(name: str, description: str, parameters: dict,
                  category: str = "general", requires_approval: bool = False,
                  rate_limit: int | None = None, timeout: int = 30):
    """Decorator to register a function as an agent-accessible tool."""

    def decorator(func: Callable) -> Callable:
        metadata = ToolMetadata(
            name=name, description=description, parameters=parameters,
            category=category, requires_approval=requires_approval,
            rate_limit=rate_limit, timeout=timeout,
        )

        @functools.wraps(func)
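        def wrapper(**kwargs) -> dict[str, Any]:
            # Input validation: reject arguments the tool does not declare.
            unknown = sorted(set(kwargs) - set(parameters))
            if unknown:
                return {"status": "error", "error": f"Unknown parameters: {unknown}"}
            start = time.perf_counter()
            logger.info(f"[Tool:{name}] Invoked with {list(kwargs.keys())}")
            try:
                result = func(**kwargs)
                elapsed = time.perf_counter() - start
                logger.info(f"[Tool:{name}] Completed in {elapsed:.2f}s")
                return {"status": "success", "data": result, "elapsed": elapsed}
            except TimeoutError:
                # Only fires if the tool itself raises TimeoutError; this wrapper
                # does not enforce `timeout` or `rate_limit` on its own.
                logger.error(f"[Tool:{name}] Timed out after {timeout}s")
                return {"status": "error", "error": f"Timeout after {timeout}s"}
            except Exception as e:
                logger.error(f"[Tool:{name}] Error: {e}")
                return {"status": "error", "error": str(e)}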

        _TOOL_REGISTRY[name] = (wrapper, metadata)
        return wrapper
    return decorator

# Other functions:
# get_tool_registry() -> dict[str, Callable]: Return all registered tools as name -> function mapping
# get_tool_descriptions() -> list[dict]: Return tool metadata formatted for LLM system prompts
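
The `parameters` metadata can also drive argument validation before a tool runs. The sketch below is an assumption about how that might look, based on the schema shape used in this article (a `type`, a `description`, and an optional `default` per parameter). The function name `validate_arguments` is hypothetical, and only `string` and `boolean` types are checked here.

```python
# Declared types this sketch knows how to check; others are passed through.
TYPE_MAP = {"string": str, "boolean": bool}

def validate_arguments(parameters: dict, kwargs: dict) -> list[str]:
    """Return a list of validation errors; an empty list means the call is valid."""
    errors = [f"unknown parameter: {name}" for name in kwargs if name not in parameters]
    for name, spec in parameters.items():
        if name not in kwargs:
            # A parameter without a declared default is treated as required
            if "default" not in spec:
                errors.append(f"missing required parameter: {name}")
            continue
        expected = TYPE_MAP.get(spec.get("type"))
        if expected is not None and not isinstance(kwargs[name], expected):
            errors.append(f"parameter {name} must be of type {spec['type']}")
    return errors
```

Returning a list rather than raising on the first problem lets the agent see every issue with a malformed call in a single observation.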

Security Tool Implementations

Here are production-ready tool wrappers for the most common security integrations.

import httpx

# --- Splunk SIEM ---
@security_tool(
    name="query_splunk",
    description="Execute an SPL query against Splunk and return results.",
    parameters={
        "spl": {"type": "string", "description": "SPL query to execute"},
        "earliest": {"type": "string", "description": "Time range start", "default": "-24h"},
        "latest": {"type": "string", "description": "Time range end", "default": "now"},
    },
    category="siem", rate_limit=30, timeout=120,
)
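def query_splunk(spl: str, earliest: str = "-24h", latest: str = "now") -> dict:
    # `settings` is the application's configuration module (not shown here).
    # A search string must start with "search" or a generating command
    # ("| tstats ..."), so only prepend "search" when neither is present.
    query = spl.strip()
    if not query.startswith(("search ", "|")):
        query = f"search {query}"
    client = httpx.Client(
        base_url=settings.SPLUNK_URL,
        headers={"Authorization": f"Bearer {settings.SPLUNK_TOKEN}"},
        verify=settings.SPLUNK_VERIFY_SSL,
        timeout=120,  # match the tool timeout; httpx defaults to 5 seconds
    )
    try:
        response = client.post("/services/search/jobs", data={
            "search": query,
            "earliest_time": earliest,
            "latest_time": latest,
            "output_mode": "json",
            "exec_mode": "oneshot",
        })
        response.raise_for_status()
        return response.json()
    finally:
        client.close()  # release the connection pool even on errors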

# Other security tool implementations:
# crowdstrike_host_lookup(identifier) -> dict: Look up host details in CrowdStrike Falcon by hostname or AID
# lookup_ioc(indicator, indicator_type) -> dict: Check indicator against VirusTotal and AbuseIPDB
# create_incident_ticket(short_description, description, severity, assignment_group) -> dict: Create ServiceNow incident ticket

Pro Tip: Every tool should return structured data, not raw API responses. Your agent's reasoning is only as good as the observations it receives. Normalize tool outputs into a consistent schema — status, data, error — so the agent's parsing logic doesn't need to handle 15 different response formats.
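
A normalizer for search results might look like the sketch below. It assumes a raw response shaped like the mocked payload used in this article's unit tests (a top-level `results` list); the optional `messages` list with `type` and `text` fields, the `max_rows` cap, and the function name are assumptions for illustration.

```python
def normalize_search_response(raw: dict, max_rows: int = 50) -> dict:
    """Map a raw search response onto the shared status / data / error schema."""
    messages = raw.get("messages", [])
    errors = [
        m.get("text", "")
        for m in messages
        if str(m.get("type", "")).upper() in {"ERROR", "FATAL"}
    ]
    if errors:
        return {"status": "error", "data": None, "error": "; ".join(errors)}
    rows = raw.get("results", [])
    return {
        "status": "success",
        "data": {
            "row_count": len(rows),
            "rows": rows[:max_rows],           # cap what reaches the prompt
            "truncated": len(rows) > max_rows,
        },
        "error": None,
    }
```

Capping the rows and reporting `truncated` explicitly keeps large result sets from flooding the context window while still telling the agent that more data exists.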


Prompt Engineering for Security Agents — The Cymantis View

Prompt engineering for security agents is fundamentally different from prompt engineering for chatbots. You're not optimizing for helpfulness or conversational fluency. You're optimizing for precision, safety, and auditable reasoning. A chatbot that hallucinates gives a bad answer. An agent that hallucinates executes a bad action.

The Cymantis Prompt Architecture

At Cymantis Labs, we use a four-layer prompt architecture for every security agent:

  1. Identity Layer — Who the agent is, what it does, what it must never do.
  2. Knowledge Layer — Organizational context: network topology, asset criticality, compliance requirements.
  3. Methodology Layer — How the agent should reason: investigation frameworks, escalation criteria, decision thresholds.
  4. Output Layer — Structured output format requirements for downstream consumption.

System Prompt Template: Threat Investigation Agent

# Layer 1: Identity
identity:
  role: "Senior Security Analyst Agent"
  organization: "{{org_name}}"
  clearance: "Authorized to query SIEM, EDR, threat intel, and CMDB"
  constraints:
    - "Never execute destructive actions without human approval"
    - "Never exfiltrate, store, or log raw credentials or PII"
    - "If confidence is below 0.70, escalate to human analyst"
    - "Always cite evidence for every conclusion"

# Layer 2: Knowledge (injected via RAG or direct context)
knowledge:
  network_segments:  # PCI Zone falls inside the Corporate range, so match the most specific CIDR first
    - name: "PCI Zone"
      cidr: "10.100.0.0/16"
      escalation: "PCI-IR team, 15-min SLA"
    - name: "Corporate"
      cidr: "10.0.0.0/8"
      escalation: "SOC Tier 2, 1-hour SLA"
  critical_assets:
    - pattern: "dc*.corp.local"
      classification: "Tier 0 — Domain Controllers"
    - pattern: "sql-prod-*"
      classification: "Tier 1 — Production Databases"

# Layer 3: Methodology
methodology:
  investigation_framework: |
    1. CONTEXTUALIZE — Gather alert details, map to MITRE ATT&CK.
    2. SCOPE — Identify affected entities (hosts, users, IPs).
    3. ENRICH — Query SIEM, EDR, and threat intel for each entity.
    4. CORRELATE — Cross-reference findings across data sources.
    5. ASSESS — Determine severity, confidence, and blast radius.
    6. RECOMMEND — Propose containment and remediation actions.
    7. DOCUMENT — Produce structured investigation report.
  severity_matrix:
    critical: "Active compromise, data exfiltration, lateral movement to Tier 0"
    high: "Confirmed malicious activity, single host, no lateral movement"
    medium: "Suspicious activity, requires further investigation"
    low: "Policy violation, informational finding"

# Layer 4: Output
output_format:
  type: "json"
  schema:
    summary: "string — one-paragraph finding"
    severity: "critical | high | medium | low"
    confidence: "float 0.0–1.0"
    mitre_techniques: "array of T-codes"
    affected_entities: "array of {type, identifier, risk_score}"
    evidence: "array of {source, query, finding}"
    recommended_actions: "array of {action, priority, requires_approval}"
    timeline: "array of {timestamp, event, source}"

Prompt Templates for Common Security Tasks

Compliance Validation Prompt:

You are a compliance validation agent for {{framework}} (e.g., NIST 800-53, PCI DSS 4.0, SOC 2).

Your task is to evaluate whether the following control implementation satisfies the control requirement.

Control ID: {{control_id}}
Control Requirement: {{requirement_text}}
Implementation Evidence: {{evidence}}

Evaluate the implementation against the control requirement. For each gap identified:
1. Cite the specific sub-requirement that is not met
2. Classify the gap severity (critical, major, minor)
3. Recommend a specific remediation action
4. Estimate remediation effort (hours)

Output your assessment as structured JSON.

Incident Documentation Prompt:

Generate an incident report from the following investigation data.
Follow the organization's IR documentation standard (NIST SP 800-61 format).

Investigation data: {{investigation_json}}

The report MUST include:
1. Executive Summary (3–5 sentences, no jargon)
2. Technical Timeline (UTC timestamps, source-cited)
3. Impact Assessment (affected systems, data, users)
4. Root Cause Analysis
5. Containment Actions Taken
6. Remediation Recommendations
7. Indicators of Compromise (table format)
8. Lessons Learned

Ensure every factual claim references a specific evidence item from the investigation data.
Do not speculate. Mark any gaps in evidence as "[REQUIRES MANUAL VERIFICATION]".
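
Templates with `{{placeholder}}` fields are easy to render incorrectly: a missing value silently leaves a literal placeholder in the prompt. The sketch below is one way to render them strictly with the standard library; `render_prompt` is a hypothetical helper, not part of any framework.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")

def render_prompt(template: str, values: dict[str, str]) -> str:
    """Fill {{name}} placeholders, failing loudly if any value is missing."""
    missing = sorted({n for n in PLACEHOLDER.findall(template) if n not in values})
    if missing:
        raise KeyError(f"Missing template values: {missing}")
    # A function replacement inserts values verbatim (no backslash processing),
    # and substituted text is not rescanned for further placeholders.
    return PLACEHOLDER.sub(lambda m: values[m.group(1)], template)
```

The single-pass substitution matters for untrusted inputs such as evidence text: a value that itself contains `{{control_id}}` is inserted as literal text rather than expanded a second time.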

Cymantis Recommendations: Prompt Engineering Principles

  1. Explicit Negatives Over Implicit Positives. Don't say "be careful." Say "Never execute isolate_host on assets matching pattern dc*.corp.local."

  2. Evidence-Linked Reasoning. Every conclusion the agent produces must reference a specific tool output. This makes hallucination detectable in post-hoc review.

  3. Confidence Calibration. Force the agent to output a numerical confidence score. Calibrate the threshold through testing. Agents that are 90% confident and wrong 30% of the time need prompt tuning, not more tools.

  4. Fail-Safe Defaults. If the agent cannot determine severity, it should default to HIGH, not LOW. If it cannot determine scope, it should assume BROAD, not NARROW. Design for the failure mode that causes the least harm.

  5. Version Control Your Prompts. Prompts are code. They belong in version control with semantic versioning, changelogs, and peer review. A prompt change that shifts the agent's severity assessment threshold is a production-impacting change.
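
Confidence calibration (principle 3) can be checked with a simple binned comparison of stated confidence against observed accuracy on a labeled evaluation set. The sketch below assumes you have a list of (confidence, was_correct) pairs from such a set; the function name and the choice of five bins are arbitrary.

```python
def calibration_report(results: list[tuple[float, bool]], bins: int = 5) -> list[dict]:
    """Group (confidence, was_correct) pairs into bins and report accuracy per bin."""
    report = []
    for b in range(bins):
        lo, hi = b / bins, (b + 1) / bins
        is_last = b == bins - 1
        bucket = [
            ok for conf, ok in results
            if lo <= conf < hi or (is_last and conf == 1.0)  # include 1.0 in the top bin
        ]
        if bucket:
            report.append({
                "range": (lo, hi),
                "n": len(bucket),
                "accuracy": sum(bucket) / len(bucket),
            })
    return report
```

A well-calibrated agent shows accuracy close to the midpoint of each bin. A top bin with accuracy far below its range is the "90% confident and wrong 30% of the time" case described above.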

Pro Tip: Run your system prompts through a red-team exercise before production deployment. Have a colleague write adversarial inputs designed to make the agent violate its constraints. If your agent can be prompt-injected into executing unscoped actions, your guardrails are insufficient regardless of how good the system prompt looks.


Testing Security Agents

Security agents operate in environments where errors have real consequences. A test suite isn't optional — it's a prerequisite for production deployment. We test at four levels: unit, integration, adversarial, and evaluation.

Level 1: Unit Tests — Tool Functions

Every tool function gets standard unit tests with mocked external dependencies.

import pytest
from unittest.mock import patch, MagicMock
from tools.siem import query_splunk
from tools.threat_intel import lookup_ioc

class TestSplunkTool:
    """Unit tests for the Splunk query tool."""

    @patch("tools.siem.httpx.Client")
    def test_query_splunk_success(self, mock_client_class):
        mock_client = MagicMock()
        mock_client_class.return_value = mock_client
        mock_client.post.return_value = MagicMock(
            status_code=200,
            json=lambda: {"results": [{"src_ip": "10.0.0.1", "count": 42}]},
        )
        result = query_splunk(
            spl='index=firewall action=blocked | stats count by src_ip',
            earliest="-1h",
        )
        assert result["status"] == "success"
        assert len(result["data"]["results"]) == 1
        mock_client.post.assert_called_once()

    # Other test methods:
    # test_query_splunk_timeout(): Verify timeout handling when Splunk API times out
    # test_query_splunk_auth_failure(): Verify error handling for authentication failures

class TestThreatIntelTool:
    """Unit tests for threat intelligence lookups."""

    @patch("tools.threat_intel.httpx.Client")
    def test_lookup_known_malicious_ip(self, mock_client_class):
        mock_client = MagicMock()
        mock_client_class.return_value = mock_client
        mock_client.get.return_value = MagicMock(
            status_code=200,
            json=lambda: {
                "data": {
                    "attributes": {
                        "last_analysis_stats": {"malicious": 45, "harmless": 2}
                    }
                }
            },
        )
        result = lookup_ioc(indicator="198.51.100.1", indicator_type="ip")
        assert result["status"] == "success"
        assert "virustotal" in result["data"]
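The timeout case listed among the Splunk test methods above follows the same mocking pattern. The sketch below is self-contained, so it uses a hypothetical stand-in (`query_backend`, with an injected client and a made-up `/search` path) rather than the real `query_splunk`. The `{"status": "error", ...}` envelope is an assumption that mirrors the `status` field asserted in the success test.

```python
from unittest.mock import MagicMock

def query_backend(client, spl: str) -> dict:
    """Hypothetical stand-in for a tool function that wraps a client call in a status envelope."""
    try:
        response = client.post("/search", data={"search": spl})
    except TimeoutError as exc:
        # Return a structured error so the agent can reason about the failure
        return {"status": "error", "error_type": "timeout", "message": str(exc)}
    return {"status": "success", "data": response.json()}

def test_query_backend_timeout():
    client = MagicMock()
    # Simulate the backend timing out on the first call
    client.post.side_effect = TimeoutError("search timed out")
    result = query_backend(client, "index=firewall | head 1")
    assert result["status"] == "error"
    assert result["error_type"] == "timeout"
    client.post.assert_called_once()
```

Returning a structured error rather than raising keeps the failure visible to the agent's reasoning loop as an observation, which is usually what you want a tool to do.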

Level 2: Integration Tests — Agent Reasoning

Integration tests validate that the agent correctly chains tool calls and produces sound reasoning. We use deterministic LLM responses (mocked or few-shot-prompted) to test specific scenarios.

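import json

# MockLLM and ReActAgent come from your agent package; the import path depends on your layout.
class TestReActAgentIntegration: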
    """Integration tests for the ReAct agent reasoning loop."""

    def setup_method(self):
        self.mock_llm = MockLLM(responses=[
            json.dumps({
                "type": "action",
                "thought": "Need to check SIEM for related alerts on this IP.",
                "action": "query_splunk",
                "action_input": {"spl": "index=firewall src_ip=198.51.100.1"},
            }),
            json.dumps({
                "type": "action",
                "thought": "SIEM shows 500 blocked connections. Check threat intel.",
                "action": "lookup_ioc",
                "action_input": {"indicator": "198.51.100.1", "indicator_type": "ip"},
            }),
            json.dumps({
                "type": "final_answer",
                "thought": "Known malicious IP with active scanning. High severity.",
                "content": json.dumps({
                    "severity": "high", "confidence": 0.92,
                    "summary": "Active scanning from known malicious IP.",
                }),
            }),
        ])
        self.mock_tools = {
            "query_splunk": lambda **kw: {"results": [{"count": 500}]},
            "lookup_ioc": lambda **kw: {"virustotal": {"malicious": 45}},
        }

    def test_agent_investigates_suspicious_ip(self):
        agent = ReActAgent(
            llm_client=self.mock_llm, tools=self.mock_tools,
            system_prompt="You are a security analyst.", max_iterations=10,
        )
        state = agent.run("Investigate suspicious IP 198.51.100.1")
        assert state.final_answer is not None
        assert state.iteration == 3
        assert len(state.actions) == 2
        assert state.actions[0]["tool"] == "query_splunk"
        assert state.actions[1]["tool"] == "lookup_ioc"
        result = json.loads(state.final_answer)
        assert result["severity"] == "high"
        assert result["confidence"] > 0.8

Level 3: Adversarial Testing — Red Teaming the Agent

Most teams stop before this level, and this is where the real vulnerabilities live. Adversarial testing validates that the agent's guardrails hold under hostile conditions.

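from unittest.mock import MagicMock

# Guardrails and GuardedAgent come from your agent package; the import path depends on your layout.
class TestGuardrailEnforcement: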
    """Red team tests for agent guardrails."""

    def setup_method(self):
        self.guardrails = Guardrails(
            protected_hosts={"dc01.corp.local", "ca01.corp.local"},
            protected_accounts={"krbtgt", "administrator"},
            max_isolations_per_hour=3,
        )
        self.guarded = GuardedAgent(agent=MagicMock(), guardrails=self.guardrails)

    def test_cannot_isolate_domain_controller(self):
        result = self.guarded.execute_action(
            "isolate_host", {"target": "dc01.corp.local"}
        )
        assert result["status"] == "denied"
        assert "protected asset" in result["reason"]

    # Other test methods:
    # test_rate_limit_enforcement(): Verify agent cannot exceed isolation rate limit
    # test_blast_radius_limit(): Verify agent cannot affect more hosts than blast radius allows
    # test_unknown_action_blocked(): Verify agent cannot execute actions not in allowed set

Level 4: Evaluation Harness — Benchmarking Agent Quality

Beyond pass/fail tests, you need a benchmark that measures the agent's quality across a representative set of scenarios.

import csv
import json
from dataclasses import dataclass
from pathlib import Path

@dataclass
class EvalCase:
    case_id: str
    description: str
    input_data: dict
    expected_severity: str
    expected_actions: list[str]
    expected_iocs: list[str]

class AgentEvaluator:
    """Benchmark agent performance against labeled evaluation cases."""

    def __init__(self, agent, eval_cases: list[EvalCase]):
        self.agent = agent
        self.cases = eval_cases
        self.results = []

    def run_evaluation(self) -> dict:
        """Run agent against all evaluation cases and compute aggregate scores."""
        for case in self.cases:
            state = self.agent.run(json.dumps(case.input_data))
            result = json.loads(state.final_answer) if state.final_answer else {}
            score = {
                "case_id": case.case_id,
                "severity_correct": result.get("severity") == case.expected_severity,
                "actions_precision": self._precision(
                    result.get("recommended_actions", []), case.expected_actions
                ),
                "ioc_recall": self._recall(
                    result.get("iocs_discovered", []), case.expected_iocs
                ),
                "iterations_used": state.iteration,
                "completed": state.final_answer is not None,
            }
            self.results.append(score)
        return self._aggregate_scores()

    # Other methods:
    # _precision(predicted, expected) -> float: Calculate precision score for predicted vs expected actions
    # _recall(predicted, expected) -> float: Calculate recall score for IOCs discovered vs expected
    # _aggregate_scores() -> dict: Compute aggregate metrics across all evaluation cases
    # export_report(path): Export evaluation results to CSV file
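The `_precision` and `_recall` helpers listed above can be as simple as set arithmetic. This is a minimal sketch written as free functions. How to score empty inputs is a convention, and the one chosen here is an assumption you may want to change.

```python
def precision(predicted: list[str], expected: list[str]) -> float:
    """Fraction of predicted items that were expected."""
    predicted_set, expected_set = set(predicted), set(expected)
    if not predicted_set:
        # Assumed convention: predicting nothing is perfect only when nothing was expected
        return 1.0 if not expected_set else 0.0
    return len(predicted_set & expected_set) / len(predicted_set)

def recall(predicted: list[str], expected: list[str]) -> float:
    """Fraction of expected items that were found."""
    predicted_set, expected_set = set(predicted), set(expected)
    if not expected_set:
        # Assumed convention: nothing to find means nothing was missed
        return 1.0
    return len(predicted_set & expected_set) / len(expected_set)
```

Precision on recommended actions penalizes an agent that over-recommends. Recall on IOCs penalizes one that stops investigating too early. Tracking both keeps one from masking the other.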

Pro Tip: Build your evaluation dataset from real incidents, not synthetic scenarios. Take 50 closed incidents from your ticketing system, extract the initial alert data, and label each with the investigation outcome. This gives you a benchmark grounded in your actual threat landscape. Update the dataset quarterly as your threat profile evolves.


Production Deployment

A security agent running on a developer's laptop is a prototype. A security agent running in production is infrastructure that requires the same operational rigor as any other critical system: containerization, observability, health checks, and failure handling.

Containerization

# Dockerfile for security agent service
FROM python:3.12-slim AS base

# Security hardening
RUN groupadd -r agent && useradd -r -g agent agent
RUN apt-get update && apt-get install -y --no-install-recommends \
    ca-certificates curl && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install dependencies first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Drop privileges
USER agent

# Health check endpoint
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD curl -f http://localhost:8080/health || exit 1

EXPOSE 8080

CMD ["python", "-m", "uvicorn", "agent.api:app", "--host", "0.0.0.0", "--port", "8080"]

# docker-compose.yml: agent with supporting services
version: "3.9"

services:
  security-agent:
    build: .
    ports:
      - "8080:8080"
    environment:
      - AGENT_ENV=production
      - LOG_LEVEL=INFO
      - SPLUNK_URL=${SPLUNK_URL}
      - SPLUNK_TOKEN=${SPLUNK_TOKEN}
      - CS_CLIENT_ID=${CS_CLIENT_ID}
      - CS_CLIENT_SECRET=${CS_CLIENT_SECRET}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379/0
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - redis
      - otel-collector
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 4G
      restart_policy:
        condition: on-failure
        max_attempts: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    ports:
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
      - "8889:8889"   # Prometheus metrics
    volumes:
      - ./config/otel-collector.yaml:/etc/otelcol-contrib/config.yaml

volumes:
  redis-data:

Observability Stack

Agents are non-deterministic systems. You can't debug them with stack traces alone. You need structured logs, metrics, and distributed traces that capture the full reasoning trajectory.

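import time

import structlog
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider

# --- Tracing Setup ---
# Attach a span processor and exporter (for example OTLP) to the provider so spans are exported.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("security-agent")

# --- Metrics Setup ---
# Register the SDK meter provider; pass metric_readers=[...] so measurements are exported.
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter("security-agent")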
agent_invocations = meter.create_counter("agent.invocations", description="Total agent invocations")
agent_duration = meter.create_histogram("agent.duration_seconds", description="Agent execution duration")
tool_calls = meter.create_counter("agent.tool_calls", description="Tool invocations by tool name")
guardrail_denials = meter.create_counter("agent.guardrail_denials", description="Actions denied by guardrails")
token_usage = meter.create_histogram("agent.token_usage", description="LLM tokens consumed per invocation")

# --- Structured Logging ---
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)
log = structlog.get_logger()

class ObservableAgent:
    """Wrapper that adds observability to any agent."""

    def __init__(self, agent, agent_name: str):
        self.agent = agent
        self.name = agent_name

    def run(self, task: str) -> AgentState:
        """Run agent with full observability: tracing, metrics, and structured logging."""
        with tracer.start_as_current_span(
            f"agent.run.{self.name}",
            attributes={"agent.name": self.name, "agent.task": task[:200]},
        ) as span:
            start = time.time()
            agent_invocations.add(1, {"agent": self.name})
            try:
                state = self.agent.run(task)
                duration = time.time() - start
                agent_duration.record(duration, {"agent": self.name})
                span.set_attribute("agent.iterations", state.iteration)
                span.set_attribute("agent.tools_used", len(state.actions))
                span.set_attribute("agent.completed", state.final_answer is not None)
                for action in state.actions:
                    tool_calls.add(1, {"agent": self.name, "tool": action["tool"]})
                log.info("agent_completed", agent=self.name, iterations=state.iteration,
                         duration=duration, tools_used=[a["tool"] for a in state.actions])
                return state
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                log.error("agent_failed", agent=self.name, error=str(e))
                raise

Health Checks and Circuit Breakers

from fastapi import FastAPI, HTTPException
from datetime import datetime, timezone
import asyncio

app = FastAPI(title="Security Agent Service")

class CircuitBreaker:
    """Circuit breaker for external dependencies. Prevents cascading failures when a tool's backend is down."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures: dict[str, int] = {}
        self.open_since: dict[str, datetime] = {}

    def is_open(self, service: str) -> bool:
        """Check if circuit breaker is open for a service."""
        if service not in self.open_since:
            return False
        # total_seconds() covers the whole interval; .seconds drops the days component
        elapsed = (datetime.now(timezone.utc) - self.open_since[service]).total_seconds()
        if elapsed > self.recovery_timeout:
            del self.open_since[service]
            self.failures[service] = 0
            return False
        return True

    # Other methods:
    # record_failure(service): Record failure and open circuit if threshold exceeded
    # record_success(service): Reset failure count and close circuit on successful request

circuit_breaker = CircuitBreaker()

@app.get("/health")
async def health_check():
    """Health check endpoint for orchestrators and load balancers."""
    checks = {
        "agent": "healthy",
        "llm_api": await _check_llm_health(),
        "splunk": await _check_splunk_health(),
        "redis": await _check_redis_health(),
    }
    all_healthy = all(v == "healthy" for v in checks.values())
    return {
        "status": "healthy" if all_healthy else "degraded",
        "checks": checks,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

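@app.post("/investigate")
async def investigate(request: InvestigationRequest):
    """Submit an investigation task to the agent."""
    if circuit_breaker.is_open("llm_api"):
        raise HTTPException(status_code=503, detail="LLM API circuit breaker is open. Retry later.")
    agent = ObservableAgent(agent=build_react_agent(), agent_name="threat_investigator")
    # Open a span here so a valid trace ID is still available when the response is built.
    with tracer.start_as_current_span("api.investigate") as span:
        # agent.run is synchronous; run it in a worker thread so the event loop is not blocked.
        state = await asyncio.to_thread(agent.run, request.task)
        # trace_id is an integer; format it as the 32-character hex string tracing backends display.
        trace_id = format(span.get_span_context().trace_id, "032x")
    return {
        "result": state.final_answer,
        "iterations": state.iteration,
        "tools_used": [a["tool"] for a in state.actions],
        "trace_id": trace_id,
    }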
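The `record_failure` and `record_success` methods listed for the circuit breaker above could look like the following. This is a self-contained sketch under stated assumptions: the class name `CircuitBreakerSketch` and the injectable `now` callable are illustrative, and the circuit opens when the failure count reaches the threshold rather than exceeding it.

```python
from datetime import datetime, timedelta, timezone

class CircuitBreakerSketch:
    """Illustrative circuit breaker with an injectable clock for testing."""

    def __init__(self, failure_threshold=5, recovery_timeout=60,
                 now=lambda: datetime.now(timezone.utc)):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.now = now
        self.failures: dict[str, int] = {}
        self.open_since: dict[str, datetime] = {}

    def record_failure(self, service: str) -> None:
        """Count a failure and open the circuit once the threshold is reached."""
        self.failures[service] = self.failures.get(service, 0) + 1
        if self.failures[service] >= self.failure_threshold and service not in self.open_since:
            self.open_since[service] = self.now()

    def record_success(self, service: str) -> None:
        """A successful call resets the count and closes the circuit."""
        self.failures[service] = 0
        self.open_since.pop(service, None)

    def is_open(self, service: str) -> bool:
        opened = self.open_since.get(service)
        if opened is None:
            return False
        if (self.now() - opened).total_seconds() > self.recovery_timeout:
            # Recovery window elapsed: close the circuit and start counting again
            del self.open_since[service]
            self.failures[service] = 0
            return False
        return True
```

Tool wrappers would call `record_failure` in their exception handlers and `record_success` after a good response, so the `/investigate` endpoint can fail fast while a backend is down.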

Pro Tip: Expose the OpenTelemetry trace ID in every API response. When an analyst questions an agent's conclusion, you can pull the full reasoning trace — every thought, tool call, and observation — by trace ID. This is the difference between "the agent said so" and "here's exactly why the agent reached that conclusion, step by step."


Governance & Compliance

Autonomous agents operating in security environments must meet a higher governance bar than typical software. Regulatory frameworks (SOC 2, NIST CSF, PCI DSS, FedRAMP) increasingly require evidence of control over automated decision-making. The governance layer isn't optional — it's a production requirement.

Audit Trail Architecture

Every agent action must produce an immutable, queryable audit record.

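import hashlib
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone

# frozen=True blocks attribute reassignment after creation, matching the "immutable" intent.
@dataclass(frozen=True)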
class AuditRecord:
    """Immutable audit record for agent actions."""
    record_id: str
    timestamp: str
    agent_name: str
    task_id: str
    action_type: str  # "tool_call", "decision", "escalation", "approval"
    action_detail: dict
    input_data: dict
    output_data: dict
    reasoning: str
    confidence: float | None
    human_approved: bool
    approved_by: str | None
    guardrails_evaluated: list[str]
    guardrails_triggered: list[str]
    model_id: str
    prompt_version: str
    trace_id: str

    def integrity_hash(self) -> str:
        """SHA-256 hash for tamper detection."""
        record_str = json.dumps(asdict(self), sort_keys=True, default=str)
        return hashlib.sha256(record_str.encode()).hexdigest()

class AuditLogger:
    """Write audit records to immutable storage."""

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.chain_hash: str | None = None

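    def log(self, record: AuditRecord):
        """Write audit record with chain hashing for tamper detection."""
        record_dict = asdict(record)
        record_dict["previous_hash"] = self.chain_hash
        # Hash the record together with previous_hash so that deleting or
        # reordering records breaks the chain, not just editing a field.
        payload = json.dumps(record_dict, sort_keys=True, default=str)
        record_dict["integrity_hash"] = hashlib.sha256(payload.encode()).hexdigest()
        self.storage.write(record_dict)
        self.chain_hash = record_dict["integrity_hash"]
        log.info("audit_record", record_id=record.record_id,
                 action=record.action_type, agent=record.agent_name)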
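A hash chain only helps if something verifies it. The sketch below shows one way an auditor-side check might work. It assumes each stored record's `integrity_hash` covers every other field, including `previous_hash`. The helper names `record_hash`, `append_record`, and `verify_chain` are hypothetical and not part of the `AuditLogger` shown above.

```python
import hashlib
import json

def record_hash(record: dict) -> str:
    """Hash every field except the stored hash itself (previous_hash is included)."""
    body = {k: v for k, v in record.items() if k != "integrity_hash"}
    encoded = json.dumps(body, sort_keys=True, default=str).encode()
    return hashlib.sha256(encoded).hexdigest()

def append_record(chain: list[dict], fields: dict) -> None:
    """Append a record linked to the hash of the previous record."""
    previous = chain[-1]["integrity_hash"] if chain else None
    record = {**fields, "previous_hash": previous}
    record["integrity_hash"] = record_hash(record)
    chain.append(record)

def verify_chain(chain: list[dict]) -> list[int]:
    """Return the indexes of records whose own hash or chain link fails to verify."""
    bad = []
    previous = None
    for index, record in enumerate(chain):
        link_ok = record.get("previous_hash") == previous
        hash_ok = record.get("integrity_hash") == record_hash(record)
        if not (link_ok and hash_ok):
            bad.append(index)
        previous = record.get("integrity_hash")
    return bad
```

Running a check like this on a schedule, from a system the agent cannot write to, turns the audit log from a claim into evidence.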

Human-in-the-Loop Approval Workflows

High-impact actions require explicit human approval. The approval system must be asynchronous (analysts aren't always watching), time-bounded (approvals shouldn't wait forever), and auditable (who approved what, when).

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class ApprovalRequest:
    request_id: str
    agent_name: str
    action: str
    params: dict
    justification: str
    severity: str
    created_at: datetime
    expires_at: datetime
    status: str = "pending"  # pending, approved, denied, expired
    decided_by: str | None = None
    decided_at: datetime | None = None

class ApprovalManager:
    """Manages human approval workflows for agent actions."""

    def __init__(self, notification_service, storage, default_ttl: int = 3600):
        self.notifications = notification_service
        self.storage = storage
        self.default_ttl = default_ttl

    async def request_approval(
        self, agent_name: str, action: str, params: dict,
        justification: str, severity: str,
    ) -> ApprovalRequest:
        """Create approval request and notify via appropriate channel based on severity."""
        now = datetime.now(timezone.utc)
        request = ApprovalRequest(
            request_id=f"apr-{now.timestamp():.0f}",
            agent_name=agent_name, action=action, params=params,
            justification=justification, severity=severity,
            created_at=now, expires_at=now + timedelta(seconds=self.default_ttl),
        )
        self.storage.save(request)
        await self.notifications.send(
            channel=self._get_channel(severity),
            message=self._format_approval_request(request),
        )
        return request

    # Other methods:
    # wait_for_decision(request_id, poll_interval) -> ApprovalRequest: Poll for human decision with expiration
    # _get_channel(severity) -> str: Map severity to notification channel (PagerDuty/Slack/email)
    # _format_approval_request(request) -> str: Format approval request message for notification
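The `wait_for_decision` method listed above is a polling loop with a deadline. Here is a self-contained sketch. `PendingApproval` is a trimmed-down stand-in for `ApprovalRequest`, and the `get()` method on the storage backend is an assumption, since only `save()` appears in the code above.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class PendingApproval:
    """Hypothetical, trimmed-down stand-in for ApprovalRequest."""
    request_id: str
    expires_at: datetime
    status: str = "pending"  # pending, approved, denied, expired

class InMemoryApprovalStore:
    """Hypothetical storage backend exposing get() and save()."""

    def __init__(self):
        self._items = {}

    def save(self, request: PendingApproval) -> None:
        self._items[request.request_id] = request

    def get(self, request_id: str) -> PendingApproval:
        return self._items[request_id]

async def wait_for_decision(storage, request_id: str,
                            poll_interval: float = 5.0) -> PendingApproval:
    """Poll until a human decides or the request expires."""
    while True:
        request = storage.get(request_id)
        if request.status != "pending":
            return request
        if datetime.now(timezone.utc) >= request.expires_at:
            # Fail closed: an expired request is never treated as approved
            request.status = "expired"
            storage.save(request)
            return request
        await asyncio.sleep(poll_interval)
```

The caller should execute the action only when the returned status is `approved`. Both `denied` and `expired` mean the action does not run.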

Data Handling Policies

Agents process sensitive data — alert payloads, user identities, IP addresses, potentially PII. Your data handling policy must be enforced programmatically, not just documented in a wiki.

import re

class DataSanitizer:
    """Enforce data handling policies on agent inputs and outputs."""

    PII_PATTERNS = {
        "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
        "credit_card": re.compile(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b"),
        # Inside a character class "|" is a literal pipe, so the TLD class is [A-Za-z]
        "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    }
    RESTRICTED_FIELDS = {"password", "secret", "token", "private_key", "ssn"}

    @classmethod
    def sanitize_for_llm(cls, data: dict) -> dict:
        """Remove sensitive fields and redact PII before LLM inference."""
        sanitized = {}
        for key, value in data.items():
            if key.lower() in cls.RESTRICTED_FIELDS:
                sanitized[key] = "[REDACTED]"
            elif isinstance(value, str):
                sanitized[key] = cls._redact_pii(value)
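            elif isinstance(value, dict):
                sanitized[key] = cls.sanitize_for_llm(value)
            elif isinstance(value, list):
                # Alert payloads often nest lists of events or strings; sanitize each element
                sanitized[key] = [
                    cls.sanitize_for_llm(v) if isinstance(v, dict)
                    else cls._redact_pii(v) if isinstance(v, str) else v
                    for v in value
                ]
            else:
                sanitized[key] = value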
        return sanitized

    # Other methods:
    # _redact_pii(text) -> str: Redact PII patterns from text using regex substitution
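The `_redact_pii` helper listed above amounts to a loop of regex substitutions. The sketch below is a standalone version. The labeled placeholder format (`[REDACTED_SSN]` and so on) is an assumption chosen so the LLM can still tell what kind of value was removed.

```python
import re

PII_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "credit_card": re.compile(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b"),
    "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
}

def redact_pii(text: str) -> str:
    """Replace each PII match with a labeled placeholder."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[REDACTED_{label.upper()}]", text)
    return text
```

Regex redaction is a floor, not a ceiling. It catches well-formed identifiers but misses free-text names and unusual formats, so treat it as one layer of the data handling policy.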

Pro Tip: Run a data-flow audit before deploying any agent. Trace every piece of data from ingestion to LLM call to output. Map which external APIs receive what data. If your agent sends alert payloads containing internal IP addresses to an external LLM API, that's a data leakage risk that needs to be evaluated against your security policy and any relevant regulatory requirements.


The Agent Development Lifecycle

Building a production security agent isn't a weekend project. It's a disciplined engineering effort with five distinct phases. Skipping phases creates technical debt that compounds rapidly when the agent is operating autonomously.

Phase 1: Discovery & Scoping (1–2 Weeks)

Objective: Define what the agent will do, what it won't do, and how success is measured.

Activities:

  1. Shadow analysts performing the target workflow. Document every decision point, data source queried, and tool used.
  2. Identify the decision boundary: which decisions can the agent make autonomously, which require human approval, which are out of scope.
  3. Define success metrics: accuracy, latency, token cost, analyst time saved, false positive/negative rates.
  4. Inventory available data sources and APIs. Validate access, rate limits, and data freshness.
  5. Draft the agent's threat model — what can go wrong if the agent makes a mistake?

Deliverables: Agent design document, tool inventory, success criteria, risk assessment.

Phase 2: Prototype & Validate (2–3 Weeks)

Objective: Build a working prototype that handles the happy path and validates the core reasoning loop.

Activities:

  1. Implement the core agent architecture (choose from the patterns above).
  2. Build tool connectors for primary data sources.
  3. Write the system prompt and test against 10–20 representative scenarios.
  4. Iterate on prompt and tool design based on failure analysis.
  5. Validate with subject matter experts: show the agent's reasoning traces to senior analysts and incorporate feedback.

Deliverables: Working prototype, initial evaluation results, prompt v0.1, SME feedback log.

Phase 3: Harden & Test (2–4 Weeks)

Objective: Make the agent production-safe through systematic testing and guardrail implementation.

Activities:

  1. Implement the full guardrail framework (rate limits, blast radius, protected assets, approval workflows).
  2. Build the evaluation harness with 50+ labeled test cases from real incidents.
  3. Conduct adversarial testing: prompt injection, edge cases, malformed inputs, cascading tool failures.
  4. Performance testing: measure latency, token consumption, and throughput under load.
  5. Security review: audit data flows, credential handling, API key management, and network access.

Deliverables: Test suite (all four levels), guardrail configuration, security review report, benchmark results.

Phase 4: Deploy & Monitor (1–2 Weeks)

Objective: Deploy to production with full observability and a controlled rollout.

Activities:

  1. Containerize the agent with health checks and resource limits.
  2. Deploy observability stack: structured logging, metrics, distributed tracing.
  3. Shadow mode deployment: the agent processes real alerts and produces outputs, but no actions are executed. Human analysts compare agent recommendations to their own decisions.
  4. Gradual rollout: start with low-severity alerts, expand scope as confidence grows.
  5. Establish on-call procedures for agent incidents (misbehavior, outages, guardrail triggers).

Deliverables: Production deployment, monitoring dashboards, runbook, shadow mode evaluation report.

Phase 5: Operate & Evolve (Ongoing)

Objective: Continuously improve the agent based on production feedback and evolving threats.

Activities:

  1. Weekly review of agent decisions: sample 10% of processed alerts and validate accuracy.
  2. Monthly prompt tuning based on false positive/negative trends.
  3. Quarterly evaluation harness refresh with new incident data.
  4. Tool registry updates as your security stack evolves.
  5. Guardrail policy reviews aligned with compliance audit cycles.

Deliverables: Monthly performance reports, updated prompts and tools, quarterly evaluation results.

Pro Tip: Shadow mode is non-negotiable. We've seen organizations skip straight from prototype to autonomous operation and regret it within the first week. Two weeks of shadow mode — where the agent runs in parallel with human analysts, producing recommendations but not acting — gives you the data to calibrate guardrails, tune prompts, and build analyst trust.
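To turn shadow mode into a go/no-go decision, compare what the agent recommended with what the analyst decided on the same alert. The following is a minimal sketch, assuming you log `(agent_severity, analyst_severity)` pairs. The function name, the severity scale, and the report fields are all illustrative.

```python
from collections import Counter

SEVERITY_ORDER = {"low": 0, "medium": 1, "high": 2, "critical": 3}

def shadow_mode_report(pairs) -> dict:
    """Summarize agreement between agent and analyst severity calls."""
    pairs = list(pairs)
    total = len(pairs)
    agreed = sum(1 for agent, analyst in pairs if agent == analyst)
    # Under-triage (agent rated lower than the analyst) is the dangerous direction
    under = sum(1 for agent, analyst in pairs
                if SEVERITY_ORDER[agent] < SEVERITY_ORDER[analyst])
    over = sum(1 for agent, analyst in pairs
               if SEVERITY_ORDER[agent] > SEVERITY_ORDER[analyst])
    disagreements = Counter((agent, analyst) for agent, analyst in pairs
                            if agent != analyst)
    return {
        "total": total,
        "agreement_rate": agreed / total if total else 0.0,
        "under_triaged": under,
        "over_triaged": over,
        "top_disagreements": disagreements.most_common(3),
    }
```

Reviewing the under-triaged cases one by one is usually more informative than the headline agreement rate, because those are the alerts the agent would have let through.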


The Cymantis View: Where Agentic Engineering Is Headed

We're at the beginning of a fundamental shift in how security operations are engineered. The organizations that invest in agentic engineering capability now will have a compounding advantage over the next 3–5 years. Here is our view on where this is headed:

Custom agents become infrastructure, not experiments. Within two years, every mature SOC will have a portfolio of purpose-built agents — not one monolithic AI, but a team of specialized agents each owning a slice of the operational workflow. The engineering discipline of building, testing, and governing these agents will be as foundational as detection engineering is today.

Agent-to-agent collaboration will replace human handoffs. The multi-agent pipeline pattern will evolve into fully autonomous agent teams that negotiate priorities, share context, and coordinate response actions across organizational boundaries. The human role shifts from operator to governor.

Evaluation and governance become the bottleneck. The limiting factor won't be building agents — it will be proving they work safely. Organizations that invest in evaluation harnesses, audit infrastructure, and compliance frameworks for autonomous systems will ship agents faster than those that treat governance as an afterthought.

Prompt engineering matures into a formal discipline. System prompts for security agents will be version-controlled, peer-reviewed, tested, and audited with the same rigor as firewall rules and detection logic. Prompt drift — unintended behavioral changes from model updates — will become a tracked risk.

The talent model shifts. Security teams will need engineers who can build agents, not just analysts who can investigate alerts. The hybrid security-engineer/ML-engineer role — what we call the "agentic engineer" — will be the most in-demand position in cybersecurity by 2028.

Build the capability now. The threat landscape isn't waiting.


Final Thoughts

Building custom security agents is hard. It requires security domain expertise, software engineering discipline, ML literacy, and operational rigor — all applied simultaneously. There are no shortcuts.

But the payoff is transformative. An organization with a well-governed portfolio of custom security agents doesn't just respond faster — it operates at a fundamentally different level. Investigations that took hours complete in seconds. Compliance validations that consumed analyst-weeks happen continuously. Threat detection that was limited by human attention becomes bounded only by data availability and compute.

The playbook in this guide gives you the blueprints: four architecture patterns to choose from, a tool registry pattern that scales, prompt engineering principles battle-tested in production, a testing methodology that covers unit through adversarial, deployment infrastructure that provides observability and resilience, and governance frameworks that satisfy auditors without strangling innovation.

Start with one agent. Pick the workflow with the highest frequency and the most available data. Build the prototype. Run shadow mode. Earn trust. Then expand.

The agentic era rewards builders. Start building.



Cymantis Labs builds security automation for teams that refuse to compromise between speed and rigor. If your organization is ready to move from vendor-dependent AI to custom-built agentic capability, reach out — we'll help you build it right.