System Architecture

Cognition Engines Architecture

Component Breakdown

1. A2A Layer (a2a/)

The Agent-to-Agent layer is the network-facing surface of Cognition Engines.

| File | Purpose |
| --- | --- |
| server.py | FastAPI application with lifespan management, CORS, route registration, and MCP mount at /mcp |
| config.py | Multi-source configuration: YAML file → environment variables → defaults |
| auth.py | Bearer token authentication with constant-time comparison (secrets.compare_digest) |
| mcp_server.py | MCP Server with 5 tool handlers; exposes mcp_app Server instance for mounting |
| mcp_schemas.py | Pydantic input schemas for MCP tool definitions (auto-generates JSON Schema for tool discovery) |
| __init__.py | Package exports |
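
The constant-time comparison mentioned for auth.py can be sketched as follows. This is a minimal illustration, not the project's actual code: the function name `is_authorized` and the header-parsing details are assumptions, and only `secrets.compare_digest` comes from the table above.

```python
import secrets
from typing import Optional


def is_authorized(authorization_header: Optional[str], expected_token: str) -> bool:
    """Return True if the header carries the expected bearer token.

    Hypothetical helper: the real auth.py may parse headers differently.
    """
    if not authorization_header:
        return False
    scheme, _, presented = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not presented:
        return False
    # compare_digest is designed so its running time does not reveal
    # where the two values first differ.
    return secrets.compare_digest(presented.encode(), expected_token.encode())
```

Comparing bytes rather than `str` keeps the check valid even if a token contains non-ASCII characters.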

Endpoints:

  • POST /cstp — JSON-RPC 2.0 dispatch (authenticated)
  • POST|GET /mcp — MCP Streamable HTTP transport (tool calls + event streams)
  • GET /health — Health check with uptime (unauthenticated)
  • GET /.well-known/agent.json — A2A agent card for discovery (unauthenticated)
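
A client call to the authenticated endpoint might look like the sketch below. The envelope follows JSON-RPC 2.0 and the method name comes from the services table in this document; the helper name `build_cstp_request`, the token value, and the shape of `params` are illustrative assumptions.

```python
import json
import urllib.request
from typing import Any, Dict


def build_cstp_request(
    base_url: str,
    token: str,
    method: str,
    params: Dict[str, Any],
    request_id: int = 1,
) -> urllib.request.Request:
    """Build (but do not send) a JSON-RPC 2.0 request for POST /cstp."""
    body = {"jsonrpc": "2.0", "id": request_id, "method": method, "params": params}
    return urllib.request.Request(
        url=f"{base_url}/cstp",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


# The params below are an assumed shape, not the documented schema.
request = build_cstp_request(
    "http://localhost:9991",
    "example-token",
    "cstp.queryDecisions",
    {"query": "database migration"},
)
# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```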

2. CSTP Services (a2a/cstp/)

Each JSON-RPC method is backed by a dedicated service module:

| Service | Method | Description |
| --- | --- | --- |
| query_service.py | cstp.queryDecisions | Semantic search over ChromaDB with optional BM25 hybrid |
| decision_service.py | cstp.recordDecision | Record new decisions as YAML + ChromaDB index |
| decision_service.py | cstp.reviewDecision | Record outcome of a past decision for calibration |
| guardrails_service.py | cstp.checkGuardrails | Evaluate context against all loaded guardrails |
| guardrails_service.py | cstp.listGuardrails | List active guardrail definitions |
| calibration_service.py | cstp.getCalibration | Compute Brier scores, confidence buckets, and variance |
| attribution_service.py | cstp.attributeOutcomes | Auto-attribute outcomes via PR stability |
| drift_service.py | cstp.checkDrift | Compare 30-day vs. 90-day+ calibration |
| reindex_service.py | cstp.reindex | Delete and rebuild ChromaDB collection |
| deliberation_tracker.py | (auto-hook) | Tracks inputs and steps for reasoning traces |
| bridge_extractor.py | (auto-hook) | Extracts structure/function from decision text |
| dispatcher.py | (router) | Maps JSON-RPC method names to async handlers |
| models.py | (shared) | Pydantic-style dataclasses for request/response objects |
| bm25_index.py | (internal) | BM25Okapi keyword index with caching and score merging |
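
The routing role of dispatcher.py can be illustrated with a small registry that maps method names to async handlers. This is a sketch under assumptions: the `register` decorator, the `_HANDLERS` dict, and the placeholder handler are hypothetical, and only the method name and the JSON-RPC 2.0 "Method not found" code (-32601) are taken from the table and the specification.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[Any]]
_HANDLERS: Dict[str, Handler] = {}


def register(method: str) -> Callable[[Handler], Handler]:
    """Decorator that records an async handler under a JSON-RPC method name."""
    def decorator(fn: Handler) -> Handler:
        _HANDLERS[method] = fn
        return fn
    return decorator


@register("cstp.listGuardrails")
async def list_guardrails(params: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder result; a real handler would call the guardrails service.
    return {"guardrails": []}


async def dispatch(request: Dict[str, Any]) -> Dict[str, Any]:
    """Route one JSON-RPC 2.0 request to its handler and wrap the result."""
    request_id = request.get("id")
    handler = _HANDLERS.get(request.get("method", ""))
    if handler is None:
        error = {"code": -32601, "message": "Method not found"}
        return {"jsonrpc": "2.0", "id": request_id, "error": error}
    result = await handler(request.get("params") or {})
    return {"jsonrpc": "2.0", "id": request_id, "result": result}
```

Calling `asyncio.run(dispatch({"jsonrpc": "2.0", "id": 1, "method": "cstp.listGuardrails"}))` returns the wrapped placeholder result.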

3. MCP Layer (a2a/mcp_server.py + a2a/mcp_schemas.py)

The MCP layer provides Model Context Protocol access to CSTP capabilities. It is a thin bridge — Pydantic schemas validate inputs, then tool handlers delegate directly to CSTP service functions.

| Component | File | Description |
| --- | --- | --- |
| mcp_app | mcp_server.py | Server("cstp-decisions") instance, importable for ASGI mounting |
| list_tools() | mcp_server.py | Returns 5 Tool definitions with JSON Schema from Pydantic models |
| call_tool() | mcp_server.py | Dispatches tool calls to _handle_* functions |
| run_stdio() | mcp_server.py | Runs the MCP server with stdio transport |
| StreamableHTTPSessionManager | server.py | Manages Streamable HTTP sessions; mounted at /mcp via lifespan |
| QueryDecisionsInput | mcp_schemas.py | Schema for query_decisions tool |
| CheckActionInput | mcp_schemas.py | Schema for check_action tool |
| LogDecisionInput | mcp_schemas.py | Schema for log_decision tool |
| ReviewOutcomeInput | mcp_schemas.py | Schema for review_outcome tool |
| GetStatsInput | mcp_schemas.py | Schema for get_stats tool |

MCP tools → CSTP method mapping:

| MCP Tool | CSTP Method | Service |
| --- | --- | --- |
| query_decisions | cstp.queryDecisions | query_service.py |
| check_action | cstp.checkGuardrails | guardrails_service.py |
| log_decision | cstp.recordDecision | decision_service.py |
| review_outcome | cstp.reviewDecision | decision_service.py |
| get_stats | cstp.getCalibration | calibration_service.py |
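
For client authors, the mapping can be expressed as a lookup table. The sketch below translates an MCP tool call into the equivalent JSON-RPC request body. Note the assumption: the server itself delegates to service functions directly rather than going through JSON-RPC, so `to_jsonrpc` is a hypothetical helper that only shows the correspondence, and passing tool arguments through unchanged as `params` is a simplification.

```python
from typing import Any, Dict

# Tool names and method names are taken from the mapping table.
TOOL_TO_CSTP_METHOD: Dict[str, str] = {
    "query_decisions": "cstp.queryDecisions",
    "check_action": "cstp.checkGuardrails",
    "log_decision": "cstp.recordDecision",
    "review_outcome": "cstp.reviewDecision",
    "get_stats": "cstp.getCalibration",
}


def to_jsonrpc(tool_name: str, arguments: Dict[str, Any], request_id: int = 1) -> Dict[str, Any]:
    """Build the JSON-RPC 2.0 body that corresponds to an MCP tool call."""
    try:
        method = TOOL_TO_CSTP_METHOD[tool_name]
    except KeyError:
        raise ValueError(f"unknown MCP tool: {tool_name}") from None
    return {"jsonrpc": "2.0", "id": request_id, "method": method, "params": arguments}
```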

4. Core Engines (src/cognition_engines/)

The library-level logic, usable independently of the HTTP server.

Accelerators (accelerators/)

| Class | File | Description |
| --- | --- | --- |
| SemanticIndex | semantic_index.py | ChromaDB HTTP API wrapper with Gemini embedding generation, decision indexing, filtered vector query |

Guardrails (guardrails/)

| Class | File | Description |
| --- | --- | --- |
| GuardrailEngine | engine.py | Loads YAML guardrails, evaluates conditions + requirements, returns pass/block/warn results |
| GuardrailCondition | engine.py | Parsed condition with operator support (<, >, =, !=, in) |
| GuardrailRequirement | engine.py | Boolean requirement check (field must be true) |
| Guardrail | engine.py | Full guardrail definition with scope, conditions, requirements, action, message |
| ConditionEvaluator | evaluators.py | Protocol for pluggable evaluators |
| FieldCondition | evaluators.py | v2 field comparison with extended operators |
| SemanticCondition | evaluators.py | Checks semantic similarity to past decisions |
| TemporalCondition | evaluators.py | Time-window based conditions |
| AggregateCondition | evaluators.py | Statistical aggregate checks (e.g., success rate < 50%) |
| CompoundCondition | evaluators.py | AND/OR logical composition of conditions |
| AuditLog | audit.py | Manages JSON audit trail for guardrail evaluations |
| AuditRecord | audit.py | Per-decision audit record with override support |
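
The interplay of conditions and requirements can be sketched as below: conditions decide whether a guardrail applies, and requirements decide whether an applicable guardrail passes. The class `Condition`, the function `evaluate`, and the context keys are hypothetical stand-ins for GuardrailCondition and GuardrailEngine; only the operator set (<, >, =, !=, in) and the "field must be true" rule come from the table. The warn action is left out for brevity.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict, Mapping, Sequence

_OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "<": operator.lt,
    ">": operator.gt,
    "=": operator.eq,
    "!=": operator.ne,
    "in": lambda value, options: value in options,
}


@dataclass(frozen=True)
class Condition:
    field: str
    op: str
    expected: Any

    def matches(self, context: Mapping[str, Any]) -> bool:
        """True when the context has the field and the comparison holds."""
        if self.field not in context:
            return False
        return _OPERATORS[self.op](context[self.field], self.expected)


def evaluate(
    context: Mapping[str, Any],
    conditions: Sequence[Condition],
    requirements: Sequence[str],
) -> str:
    """Return "block" if the guardrail applies and a requirement is unmet, else "pass"."""
    if not all(condition.matches(context) for condition in conditions):
        return "pass"  # guardrail does not apply to this context
    unmet = [name for name in requirements if context.get(name) is not True]
    return "block" if unmet else "pass"
```

For example, with `[Condition("environment", "=", "production")]` and the requirement `"code_review"`, a production context without `code_review: True` is blocked, while a staging context passes because the guardrail does not apply.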

Patterns (patterns/)

| Class | File | Description |
| --- | --- | --- |
| PatternDetector | detector.py | Loads YAML decisions, generates calibration reports, detects anti-patterns, produces category analysis |
| CalibrationBucket | detector.py | Confidence bucket with Brier score computation |
| AntiPattern | detector.py | Detected anti-pattern (e.g., overcalibration, flip-flopping) |
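
The Brier score used for calibration is the mean squared difference between stated confidence and the observed outcome (1 for success, 0 for failure); lower is better. The sketch below shows that formula plus one possible way to assign a decision to a confidence bucket. The function names and the choice of ten equal-width buckets are assumptions, not the detector's actual layout.

```python
from typing import Iterable, Tuple


def brier_score(forecasts: Iterable[Tuple[float, bool]]) -> float:
    """Mean of (confidence - outcome)^2 over reviewed decisions."""
    pairs = list(forecasts)
    if not pairs:
        raise ValueError("need at least one reviewed decision")
    return sum((confidence - float(success)) ** 2 for confidence, success in pairs) / len(pairs)


def bucket_index(confidence: float, buckets: int = 10) -> int:
    """Map a confidence in [0, 1] to one of `buckets` equal-width buckets.

    Works on integer percentages to avoid float artifacts such as 0.7 / 0.1 < 7.
    """
    percent = int(round(confidence * 100))
    return min(percent * buckets // 100, buckets - 1)
```

A decision recorded at 0.9 confidence that succeeded contributes 0.01 to the sum; one recorded at 0.6 that failed contributes 0.36.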

5. Web Dashboard (dashboard/)

A Flask-based web UI for human-friendly decision browsing and outcome review.

| File | Description |
| --- | --- |
| app.py | Flask routes: decisions list, detail, review, calibration |
| config.py | Dashboard-specific config (CSTP URL, auth, port) |
| auth.py | HTTP Basic Auth decorator |
| cstp_client.py | Async CSTP client for backend communication |
| models.py | Dashboard-specific data models |
| templates/ | Jinja2 HTML templates (base, decisions, decision, review, calibration) |
| static/ | CSS/JS assets |

6. Guardrail Definitions (guardrails/)

| File | Description |
| --- | --- |
| cornerstone.yaml | Non-negotiable block-level rules (production review, confidence minimum, backtest requirement) |
| templates/financial.yaml | Template for financial/trading projects (risk assessment, position limits, audit trail) |
| templates/production-safety.yaml | Template for production deployments (code review, CI, rollback, no-Friday deploys) |

7. CLI (bin/cognition)

A standalone Python CLI providing all core operations without the HTTP server.


Data Flow

Query Flow

mermaid
sequenceDiagram
    participant Agent
    participant FastAPI as POST /cstp
    participant Dispatcher
    participant QuerySvc as query_service
    participant Gemini as Gemini API
    participant Chroma as ChromaDB
    participant BM25 as BM25 Index

    Agent->>FastAPI: JSON-RPC queryDecisions
    FastAPI->>Dispatcher: dispatch
    Dispatcher->>QuerySvc: query_decisions()
    QuerySvc->>Gemini: generate embedding
    Gemini-->>QuerySvc: 768-dim vector
    QuerySvc->>Chroma: semantic search
    Chroma-->>QuerySvc: ranked results
    opt hybrid mode
        QuerySvc->>BM25: keyword search
        BM25-->>QuerySvc: BM25 scores
        QuerySvc->>QuerySvc: merge & rank
    end
    QuerySvc-->>Agent: JSON-RPC response
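
The "merge & rank" step in hybrid mode could work along these lines. This is one common approach (min-max normalization followed by a weighted sum), shown as an assumption rather than the algorithm in bm25_index.py. The function names, the 0.7 default weight, and the convention that both inputs are "higher is better" scores are all illustrative.

```python
from typing import Dict, List, Tuple


def _normalize(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale scores to [0, 1] so the two sources are comparable."""
    if not scores:
        return {}
    low, high = min(scores.values()), max(scores.values())
    if high == low:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (score - low) / (high - low) for doc_id, score in scores.items()}


def merge_scores(
    semantic: Dict[str, float],
    bm25: Dict[str, float],
    semantic_weight: float = 0.7,
) -> List[Tuple[str, float]]:
    """Combine semantic and keyword scores, returning documents best first."""
    sem, kw = _normalize(semantic), _normalize(bm25)
    merged = {
        doc_id: semantic_weight * sem.get(doc_id, 0.0)
        + (1.0 - semantic_weight) * kw.get(doc_id, 0.0)
        for doc_id in sem.keys() | kw.keys()
    }
    return sorted(merged.items(), key=lambda item: item[1], reverse=True)
```

A document found by only one of the two searches still appears in the result, with a zero contribution from the source that missed it.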

Record Flow

mermaid
sequenceDiagram
    participant Agent
    participant FastAPI as POST /cstp
    participant Dispatcher
    participant DecSvc as decision_service
    participant Guard as guardrails_service
    participant Gemini as Gemini API
    participant Chroma as ChromaDB
    participant Disk as YAML File

    Agent->>FastAPI: JSON-RPC recordDecision
    FastAPI->>Dispatcher: dispatch
    Dispatcher->>DecSvc: record_decision()
    DecSvc->>Guard: check guardrails
    Guard-->>DecSvc: pass/fail
    DecSvc->>DecSvc: build YAML structure
    DecSvc->>Disk: write YAML file
    DecSvc->>Gemini: generate embedding
    Gemini-->>DecSvc: 768-dim vector
    DecSvc->>Chroma: index decision
    DecSvc-->>Agent: decision ID + path

MCP Data Flow

mermaid
sequenceDiagram
    participant Client as MCP Client
    participant Transport as Transport Layer
    participant MCPSrv as MCP Server (mcp_app)
    participant Services as CSTP Services
    participant Storage as ChromaDB + YAML

    alt Streamable HTTP
        Client->>Transport: POST /mcp (HTTP)
        Transport->>MCPSrv: StreamableHTTPSessionManager
    else stdio
        Client->>Transport: stdin/stdout
        Transport->>MCPSrv: stdio_server
    end

    MCPSrv->>MCPSrv: validate via Pydantic schemas
    MCPSrv->>Services: direct function call
    Services->>Storage: read/write
    Storage-->>Services: result
    Services-->>MCPSrv: response dict
    MCPSrv-->>Client: TextContent (JSON)

Guardrail Flow

mermaid
sequenceDiagram
    participant Agent
    participant FastAPI as POST /cstp
    participant Dispatcher
    participant GuardSvc as guardrails_service
    participant Engine as GuardrailEngine
    participant Audit as AuditLog

    Agent->>FastAPI: JSON-RPC checkGuardrails
    FastAPI->>Dispatcher: dispatch
    Dispatcher->>GuardSvc: evaluate_guardrails()
    GuardSvc->>Engine: load YAML (cached 5 min)
    Engine->>Engine: match conditions
    Engine->>Engine: evaluate requirements
    Engine-->>GuardSvc: violations + warnings
    GuardSvc->>Audit: log audit trail
    GuardSvc-->>Agent: allowed/blocked + details
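
The "cached 5 min" step in the diagram can be approximated with a small time-to-live cache around the YAML loader. The class below is a sketch, not the service's implementation: `TTLCache` and its injectable `clock` parameter are hypothetical, and only the 300-second lifetime is taken from the diagram.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class TTLCache(Generic[T]):
    """Reload a value through `loader` once it is older than `ttl_seconds`."""

    def __init__(
        self,
        loader: Callable[[], T],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests need not sleep
        self._value: Optional[T] = None
        self._loaded_at: Optional[float] = None

    def get(self) -> T:
        now = self._clock()
        if self._loaded_at is None or now - self._loaded_at >= self._ttl:
            self._value = self._loader()
            self._loaded_at = now
        return self._value  # type: ignore[return-value]
```

Using a monotonic clock avoids spurious reloads or stale entries when the system wall clock is adjusted.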

Deliberation & Bridge Flow

mermaid
sequenceDiagram
    participant Agent
    participant FastAPI as POST /cstp
    participant Tracker as DeliberationTracker
    participant DecSvc as decision_service
    participant Extractor as BridgeExtractor
    participant Storage as ChromaDB + YAML

    Agent->>FastAPI: queryDecisions("...")
    FastAPI->>Tracker: track_query(agent_id, query)
    FastAPI-->>Agent: results

    Agent->>FastAPI: checkGuardrails("...")
    FastAPI->>Tracker: track_check(agent_id, action)
    FastAPI-->>Agent: allowed/blocked

    Agent->>FastAPI: recordDecision("...", bridge=None)
    FastAPI->>DecSvc: record_decision()
    
    DecSvc->>Tracker: auto_attach_deliberation(agent_id)
    Tracker-->>DecSvc: Deliberation object (inputs + steps)
    
    alt No explicit bridge
        DecSvc->>Extractor: auto_extract_bridge(decision, context)
        Extractor-->>DecSvc: BridgeDefinition (structure + function)
    end

    DecSvc->>Storage: write YAML (with deliberation + bridge)
    DecSvc->>Storage: index (embedding: structure + function)
    DecSvc-->>Agent: success
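
The tracking half of this flow can be sketched with an in-memory tracker keyed by agent ID. The method names `track_query`, `track_check`, and `auto_attach_deliberation` come from the diagram; everything else (the `Deliberation` fields as plain string lists, and clearing an agent's pending trace once it is attached) is an assumption about behavior the diagram does not specify.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import DefaultDict, List


@dataclass
class Deliberation:
    inputs: List[str] = field(default_factory=list)  # what the agent consulted
    steps: List[str] = field(default_factory=list)   # ordered trace of actions


class DeliberationTracker:
    def __init__(self) -> None:
        self._pending: DefaultDict[str, Deliberation] = defaultdict(Deliberation)

    def track_query(self, agent_id: str, query: str) -> None:
        trace = self._pending[agent_id]
        trace.inputs.append(query)
        trace.steps.append(f"queryDecisions: {query}")

    def track_check(self, agent_id: str, action: str) -> None:
        trace = self._pending[agent_id]
        trace.inputs.append(action)
        trace.steps.append(f"checkGuardrails: {action}")

    def auto_attach_deliberation(self, agent_id: str) -> Deliberation:
        """Hand over the agent's pending trace and start a fresh one."""
        return self._pending.pop(agent_id, Deliberation())
```

Popping the trace on attach means each recorded decision carries only the queries and checks made since the previous one.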

Deployment Architecture

mermaid
graph TB
    subgraph Docker["Docker Compose"]
        subgraph CSTP["cstp-server (Python 3.11)"]
            PORT_9991["Port: 9991"]
            EP_CSTP["/cstp — JSON-RPC 2.0"]
            EP_MCP["/mcp — MCP Streamable HTTP"]
            EP_HEALTH["/health"]
            VOL_CONFIG["config/ (ro)"]
            VOL_GUARD["guardrails/"]
            VOL_DEC["decisions/"]
        end

        subgraph CHROMA["chromadb (chromadb/chroma)"]
            PORT_8000["Port: 8000"]
            VOL_CHROMA["chroma_data"]
            HB["/api/v1/heartbeat"]
        end

        CSTP -->|HTTP| CHROMA
    end

    NET["Network: cstp-network (bridge)"]
    Docker --- NET

Docker Image: Multi-stage build (python:3.11-slim)

  • Builder stage: Installs uv and Python dependencies from pyproject.toml
  • Runtime stage: Copies installed packages, app code, creates non-root user
  • Security: Runs as appuser (non-root), read-only config mounts
  • MCP: Both /cstp (JSON-RPC) and /mcp (MCP Streamable HTTP) are served on port 9991

Security Model

| Layer | Mechanism |
| --- | --- |
| Transport | HTTPS (reverse proxy recommended for production) |
| Authentication | Bearer token per agent, validated with secrets.compare_digest |
| Authorization | Agent ID embedded in each request; audit trail records who did what |
| MCP Auth | MCP Streamable HTTP inherits FastAPI bearer token middleware |
| Secrets | Environment variables or ${VAR} expansion in YAML config |
| Container | Non-root user, minimal base image, no-cache pip installs |
| CORS | Configurable allowed origins (defaults to *) |
| CSRF | Dashboard uses Flask-WTF CSRF protection |
Released under the Apache 2.0 License.