Module Reference

This document provides a detailed breakdown of every module, class, and function in the Cognition Engines codebase.


1. Core Library — src/cognition_engines/

1.1 Accelerators — accelerators/

semantic_index.py

The heart of the decision memory system. Provides vector-based similarity search over decisions using ChromaDB and Google Gemini embeddings.

Module-Level Functions

| Function | Description |
| --- | --- |
| load_gemini_key() | Loads the Gemini API key from secrets files if it is not present in the environment |
| api_request(method, url, data) | Makes HTTP requests to the ChromaDB REST API |
| generate_embedding(text) | Generates a 768-dimensional embedding vector via Gemini text-embedding-004 |
| get_api_base() | Returns the ChromaDB API base URL from the environment |
| get_or_create_collection() | Ensures the cognition_decisions collection exists in ChromaDB |
| decision_to_text(decision) | Converts a decision dict into searchable text (title + category + context + reasons) |
| decision_id(decision) | Generates a deterministic MD5 hash ID from the decision title |
| get_index() | Returns the singleton SemanticIndex instance |

SemanticIndex Class

| Method | Description |
| --- | --- |
| __init__() | Initializes with collection_id = None |
| ensure_collection() | Lazily creates/gets the ChromaDB collection |
| index_decision(decision) | Indexes a single decision: generates embedding, constructs metadata, upserts to ChromaDB |
| index_decisions(decisions) | Batch indexes a list of decisions, returns count |
| query(context, n_results, category, min_confidence) | Generates embedding for query text, performs filtered vector search, returns ranked results |
| count() | Returns the number of indexed decisions in the collection |

Metadata stored per decision:

  • title, category, confidence, stakes, date, status, outcome, project, feature, pr, reason_types
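
As a rough illustration of decision_to_text() and decision_id(), the sketch below builds the searchable text and a deterministic ID for one decision. Only the listed text components and the MD5-of-title rule come from the table above; the dict keys, the "text" field on each reason, and the newline separator are assumptions made for the example.

```python
import hashlib


def decision_to_text(decision: dict) -> str:
    """Join title, category, context, and reason texts into one searchable string."""
    parts = [
        decision.get("title", ""),
        decision.get("category", ""),
        decision.get("context", ""),
    ]
    # Assumed shape: each reason is a dict with a "text" key.
    parts.extend(reason.get("text", "") for reason in decision.get("reasons", []))
    return "\n".join(part for part in parts if part)


def decision_id(decision: dict) -> str:
    """Deterministic ID: the same title always yields the same MD5 hex digest."""
    return hashlib.md5(decision["title"].encode("utf-8")).hexdigest()


# Hypothetical decision used only for illustration.
example = {
    "title": "Use ChromaDB for decision memory",
    "category": "architecture",
    "context": "Need similarity search over past decisions",
    "reasons": [{"type": "analysis", "text": "Simple REST API"}],
}
print(decision_id(example))
print(decision_to_text(example))
```

Because the ID depends only on the title, re-indexing a decision with an unchanged title would overwrite the same entry rather than create a duplicate.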

1.2 Guardrails — guardrails/

engine.py

The main guardrail evaluation engine. Loads YAML guardrail definitions and evaluates them against decision contexts.

Data Classes

| Class | Fields | Description |
| --- | --- | --- |
| GuardrailCondition | field, operator, value | A condition that must match for a guardrail to apply (e.g., stakes == "high") |
| GuardrailRequirement | field, expected | A requirement that must be met (e.g., code_review_completed == true) |
| Guardrail | id, description, conditions, requirements, scope, action, message | Full guardrail definition |
| GuardrailResult | guardrail_id, passed, action, message, failed_requirements | Result of evaluating one guardrail |

GuardrailEngine Class

| Method | Description |
| --- | --- |
| load_from_yaml(content) | Parses a YAML string and loads guardrail definitions. Returns the count loaded |
| load_from_file(path) | Loads guardrails from a single YAML file |
| load_from_directory(directory) | Recursively loads all .yaml/.yml files from a directory |
| evaluate(context) | Evaluates all guardrails against a context dict, returns a list of GuardrailResult |
| check(context) | Convenience method: returns (allowed: bool, results: list) |
| list_guardrails() | Returns a list of all loaded guardrail definitions as dicts |

Condition operators: =, !=, <, >, <=, >=, in, not in
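
To make the condition/requirement split concrete, here is a minimal sketch of how one guardrail might be evaluated against a context dict. The dataclass field names and the operator list come from this section; the operator-to-function mapping, the helper names applies() and failed_requirements(), and the choice to treat a missing field as a non-match are assumptions, not the engine's actual code.

```python
import operator
from dataclasses import dataclass
from typing import Any

# Operator names taken from the list above; the mapping itself is illustrative.
OPERATORS = {
    "=": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    ">": operator.gt,
    "<=": operator.le,
    ">=": operator.ge,
    "in": lambda actual, allowed: actual in allowed,
    "not in": lambda actual, allowed: actual not in allowed,
}


@dataclass
class GuardrailCondition:
    field: str
    operator: str
    value: Any


@dataclass
class GuardrailRequirement:
    field: str
    expected: Any


def applies(conditions, context) -> bool:
    """A guardrail applies only when every condition matches the context."""
    return all(
        c.field in context and OPERATORS[c.operator](context[c.field], c.value)
        for c in conditions
    )


def failed_requirements(requirements, context) -> list:
    """Names of required fields whose context value differs from the expected one."""
    return [r.field for r in requirements if context.get(r.field) != r.expected]


conditions = [GuardrailCondition("stakes", "=", "high")]
requirements = [GuardrailRequirement("code_review_completed", True)]
context = {"stakes": "high", "code_review_completed": False}

if applies(conditions, context):
    print(failed_requirements(requirements, context))
```

In this sketch a guardrail whose conditions do not match is simply skipped, and one that matches passes only when the list of failed requirements is empty.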

Module-Level Functions

| Function | Description |
| --- | --- |
| parse_condition(field, value) | Parses a condition from YAML format, detecting the operator from the string prefix |
| parse_guardrail(data) | Parses a full guardrail from a YAML dict (supports flat condition_* and nested formats) |
| get_engine() | Returns the singleton GuardrailEngine instance |
| load_default_guardrails() | Loads guardrails from the guardrails/ directory |

evaluators.py

Advanced condition evaluators for the v2 guardrail format. Provides pluggable evaluation strategies.

| Class | Type | Description |
| --- | --- | --- |
| ConditionEvaluator | Protocol | Abstract protocol for condition evaluators |
| FieldConditionEvaluator | | Simple field comparison with extended operators |
| SemanticConditionEvaluator | | Checks semantic similarity to past decisions matching criteria. Fields: query_field, threshold, filter_outcome, filter_since_days, min_matches |
| TemporalConditionEvaluator | | Time-window check: "Was a similar decision made within N hours?" |
| AggregateConditionEvaluator | | Statistical check across decision history: "Is category success rate below 50%?" |
| CompoundConditionEvaluator | | AND/OR logical composition of multiple conditions |

parse_condition_v2(condition_def) — Factory function that creates the appropriate evaluator from a YAML condition definition.
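
The pluggable design can be pictured as a small protocol plus evaluators that nest. The sketch below is an assumption-heavy illustration: the evaluate(context) method name, the FieldEquals stand-in, and the "and"/"or" mode strings are hypothetical and chosen only to show how AND/OR composition over a shared protocol could work.

```python
from dataclasses import dataclass
from typing import Protocol


class ConditionEvaluator(Protocol):
    """Anything with an evaluate(context) -> bool method satisfies the protocol."""

    def evaluate(self, context: dict) -> bool: ...


@dataclass
class FieldEquals:
    """Hypothetical stand-in for a simple field comparison."""

    field: str
    value: object

    def evaluate(self, context: dict) -> bool:
        return context.get(self.field) == self.value


@dataclass
class Compound:
    """AND/OR composition over any evaluators that satisfy the protocol."""

    mode: str  # "and" or "or"
    children: list

    def evaluate(self, context: dict) -> bool:
        results = (child.evaluate(context) for child in self.children)
        return all(results) if self.mode == "and" else any(results)


# critical stakes, OR (high stakes AND security category)
rule = Compound("or", [
    FieldEquals("stakes", "critical"),
    Compound("and", [FieldEquals("stakes", "high"), FieldEquals("category", "security")]),
])
print(rule.evaluate({"stakes": "high", "category": "security"}))
```

Because Compound itself satisfies the protocol, compound conditions can be nested to any depth without special cases.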

audit.py

Audit trail system for guardrail evaluations.

| Class | Description |
| --- | --- |
| GuardrailEvaluation | Record of a single guardrail check: guardrail_id, matched, passed, action, message |
| AuditRecord | Complete audit for one decision: list of evaluations, overall_allowed, optional override with reason |
| AuditLog | Manager that creates records, saves to JSON files, queries violations, and computes aggregate stats |

Output formats: JSON files (audit/YYYY-MM-DD-<decision_id>.json) and embeddable YAML blocks.


1.3 Patterns — patterns/

detector.py

Analyzes decision history for patterns, calibration accuracy, and anti-patterns.

Data Classes

| Class | Description |
| --- | --- |
| CalibrationBucket | Confidence range bucket: computes predicted rate, actual success rate, and Brier score |
| CategoryStats | Per-category statistics: count, average confidence, success rate |
| AntiPattern | Detected anti-pattern: type, description, severity, affected decisions |

PatternDetector Class

| Method | Description |
| --- | --- |
| load_from_directory(directory) | Recursively loads all YAML decision files |
| calibration_report() | Generates Brier scores for 5 confidence buckets (0-0.2, 0.2-0.4, ..., 0.8-1.0) |
| category_analysis() | Success rates and patterns per category, identifies concerning categories |
| detect_antipatterns() | Detects: overcalibration, flip-flopping, anchoring, blind spots, hot-hand fallacy |
| full_report() | Combines calibration + category + antipattern reports |
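
The five-bucket split used by calibration_report() can be sketched as follows. This is an illustration under assumptions: the input is taken to be a list of (confidence, succeeded) pairs, a confidence of exactly 1.0 is placed in the last bucket, and the output keys are hypothetical.

```python
def bucket_index(confidence: float, n_buckets: int = 5) -> int:
    """Map a confidence in [0, 1] to a bucket; 1.0 falls into the last bucket."""
    return min(int(confidence * n_buckets), n_buckets - 1)


def calibration_report(decisions):
    """Per-bucket count, mean predicted confidence, and observed success rate.

    `decisions` is a list of (confidence, succeeded) pairs; this input shape
    is an assumption made for the example.
    """
    buckets = [[] for _ in range(5)]
    for confidence, succeeded in decisions:
        buckets[bucket_index(confidence)].append((confidence, succeeded))

    report = []
    for i, items in enumerate(buckets):
        if not items:
            continue  # empty buckets are omitted from the report
        predicted = sum(c for c, _ in items) / len(items)
        actual = sum(1 for _, ok in items if ok) / len(items)
        report.append({
            "range": (i / 5, (i + 1) / 5),
            "count": len(items),
            "predicted": predicted,
            "actual": actual,
        })
    return report


# Hypothetical reviewed decisions.
history = [(0.9, True), (0.9, False), (0.7, True), (0.3, False)]
for bucket in calibration_report(history):
    print(bucket)
```

A bucket whose predicted value sits well above its actual value indicates overconfidence in that confidence range.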

2. A2A Layer — a2a/

2.1 server.py — FastAPI Application

| Function | Description |
| --- | --- |
| lifespan(app) | Async context manager: loads config, initializes AuthManager, creates CstpDispatcher, registers methods, initializes MCP StreamableHTTPSessionManager and runs it within the lifespan context |
| create_app(config) | Factory: creates FastAPI app with CORS, lifespan, routes, and MCP mount at /mcp |
| _mount_mcp(app) | Mounts the MCP Streamable HTTP handler as a raw ASGI app at /mcp; returns 503 if the MCP SDK is not installed |
| _register_routes(app) | Registers /health, /.well-known/agent.json, and POST /cstp |
| run_server(host, port, config_path) | Entry point: loads config, creates app, runs uvicorn |

2.2 mcp_server.py — MCP Server

Exposes CSTP capabilities as MCP tools for native integration with MCP-compliant agents (Claude Desktop, Claude Code, OpenClaw, etc.).

| Component | Description |
| --- | --- |
| mcp_app | Server("cstp-decisions") instance, importable for mounting in ASGI apps |
| list_tools() | Returns 5 Tool definitions with JSON Schema auto-generated from Pydantic models |
| call_tool(name, arguments) | Dispatches tool calls to _handle_* functions; returns TextContent with JSON result |
| _handle_query_decisions() | Validates input via QueryDecisionsInput, calls query_service.query_decisions() |
| _handle_check_action() | Validates input via CheckActionInput, calls guardrails_service.evaluate_guardrails() |
| _handle_log_decision() | Validates input via LogDecisionInput, calls decision_service.record_decision() |
| _handle_review_outcome() | Validates input via ReviewOutcomeInput, calls decision_service.review_decision() |
| _handle_get_stats() | Validates input via GetStatsInput, calls calibration_service.get_calibration() |
| run_stdio() | Runs the MCP server with stdio transport (async with stdio_server()) |
| main() | Entry point: asyncio.run(run_stdio()) |

Transports:

  • stdio: python -m a2a.mcp_server (run locally, or via docker exec -i cstp python -m a2a.mcp_server)
  • Streamable HTTP: mounted at /mcp on port 9991 via StreamableHTTPSessionManager in the server.py lifespan

2.3 mcp_schemas.py — MCP Input Schemas

Pydantic models that define the JSON Schema MCP clients see during tool discovery. They map to existing CSTP dataclass models but use Pydantic for automatic schema generation required by the MCP protocol.

| Schema | MCP Tool | Key Fields |
| --- | --- | --- |
| QueryDecisionsInput | query_decisions | query (str), limit (1–50), retrieval_mode (semantic/keyword/hybrid), filters (QueryFiltersInput) |
| QueryFiltersInput | (nested) | category, stakes, project, has_outcome |
| CheckActionInput | check_action | description (str), category, stakes (low/medium/high/critical), confidence (0.0–1.0) |
| LogDecisionInput | log_decision | decision (str), confidence (float), category, stakes, context, reasons (ReasonInput[]), tags, project, feature, pr |
| ReasonInput | (nested) | type (authority/analogy/analysis/pattern/intuition), text (str) |
| ReviewOutcomeInput | review_outcome | id (str), outcome (success/partial/failure/abandoned), actual_result, lessons, notes |
| GetStatsInput | get_stats | category, project, window (30d/60d/90d/all) |

2.4 config.py — Configuration Management

| Class | Description |
| --- | --- |
| AuthToken | Agent + token pair |
| AuthConfig | enabled flag + list of AuthToken; validate_token() with constant-time comparison |
| AgentConfig | Agent identity: name, description, version, URL, contact |
| ServerConfig | HTTP settings: host, port, CORS origins |
| Config | Composite config with from_yaml(path), from_env(), _from_dict(data) class methods |

Config loading priority: YAML file → Environment variables → Defaults

2.5 auth.py — Authentication

| Component | Description |
| --- | --- |
| AuthManager | Wraps Config, validates bearer tokens, returns agent ID |
| verify_bearer_token() | FastAPI Depends function for route-level auth |
| set_auth_manager() / get_auth_manager() | Global singleton management |
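
Constant-time token validation, as mentioned for AuthConfig.validate_token(), is commonly done with the standard library's hmac.compare_digest. The sketch below assumes a return value of the matching agent ID or None, and assumes the attribute names agent and token on AuthToken; the real signatures may differ.

```python
import hmac
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AuthToken:
    agent: str
    token: str


@dataclass
class AuthConfig:
    enabled: bool = True
    tokens: list = field(default_factory=list)

    def validate_token(self, presented: str) -> Optional[str]:
        """Return the matching agent ID, or None if no token matches."""
        matched = None
        for entry in self.tokens:
            # compare_digest takes time independent of where the inputs differ,
            # which avoids leaking token prefixes through response timing.
            if hmac.compare_digest(entry.token.encode(), presented.encode()):
                matched = entry.agent
        return matched


# Hypothetical agents and tokens.
config = AuthConfig(tokens=[AuthToken("agent-a", "s3cret"), AuthToken("agent-b", "t0ken")])
print(config.validate_token("t0ken"))
```

The loop deliberately checks every configured token instead of returning on the first match, so the number of comparisons does not depend on which token was presented.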

2.6 models/ — Shared Models

| File | Classes |
| --- | --- |
| jsonrpc.py | JsonRpcRequest, JsonRpcResponse, JsonRpcError, error codes (PARSE_ERROR, INVALID_REQUEST, METHOD_NOT_FOUND, INTERNAL_ERROR) |
| agent_card.py | AgentCard, AgentCapabilities for /.well-known/agent.json |
| health.py | HealthResponse with status, version, uptime, timestamp |

3. CSTP Services — a2a/cstp/

3.1 dispatcher.py — Method Router

| Component | Description |
| --- | --- |
| CstpDispatcher | Registry of method name → async handler; dispatches requests, catches errors, returns JSON-RPC responses |
| register_methods(dispatcher) | Registers all 9 method handlers: queryDecisions, checkGuardrails, listGuardrails, recordDecision, reviewDecision, getCalibration, attributeOutcomes, checkDrift, reindex |
| Custom error codes | QUERY_FAILED (-32003), RATE_LIMITED (-32002), GUARDRAIL_EVAL_FAILED (-32004), RECORD_FAILED (-32005), REVIEW_FAILED (-32006), DECISION_NOT_FOUND (-32007), ATTRIBUTION_FAILED (-32008) |
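
The registry-and-dispatch pattern described for CstpDispatcher can be sketched with the standard library alone. Everything here is illustrative: the Dispatcher class, its register() and dispatch() methods, and the "cstp." method prefix are assumptions; only the idea of mapping method names to async handlers and wrapping failures in JSON-RPC error responses comes from the table above. The two error codes used are the standard JSON-RPC 2.0 values.

```python
import asyncio

METHOD_NOT_FOUND = -32601  # standard JSON-RPC 2.0 code
INTERNAL_ERROR = -32603    # standard JSON-RPC 2.0 code


class Dispatcher:
    """Maps method names to async handlers and wraps results in JSON-RPC envelopes."""

    def __init__(self):
        self._handlers = {}

    def register(self, name, handler):
        self._handlers[name] = handler

    async def dispatch(self, request: dict) -> dict:
        response = {"jsonrpc": "2.0", "id": request.get("id")}
        handler = self._handlers.get(request.get("method"))
        if handler is None:
            response["error"] = {"code": METHOD_NOT_FOUND, "message": "Method not found"}
            return response
        try:
            response["result"] = await handler(request.get("params", {}))
        except Exception as exc:
            # Any handler failure becomes an error response instead of propagating.
            response["error"] = {"code": INTERNAL_ERROR, "message": str(exc)}
        return response


async def list_guardrails(params):
    """Hypothetical handler that echoes the requested scope."""
    return {"guardrails": [], "scope": params.get("scope")}


dispatcher = Dispatcher()
dispatcher.register("cstp.listGuardrails", list_guardrails)
reply = asyncio.run(dispatcher.dispatch(
    {"jsonrpc": "2.0", "id": 1, "method": "cstp.listGuardrails", "params": {"scope": "global"}}
))
print(reply)
```

A service-specific failure would follow the same path as the generic exception branch, with one of the custom codes listed above in place of INTERNAL_ERROR.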

3.2 query_service.py

| Component | Description |
| --- | --- |
| QueryResult | Single result: id, title, category, confidence, distance |
| QueryResponse | Wrapper with results list, query, timing |
| query_decisions() | Full query pipeline: embedding, ChromaDB search, metadata filtering, optional BM25 hybrid |
| load_all_decisions() | Loads YAML files from disk for BM25 indexing |

3.3 decision_service.py — Decision Recording & Review

| Component | Description |
| --- | --- |
| BridgeDefinition | Dataclass: structure, function, tolerance, enforcement, prevention |
| Reason | Decision reason with type, text, strength |
| PreDecisionProtocol | Tracks whether query was run and guardrails were checked before recording |
| ProjectContext | Project, feature, PR, file, line, commit associations |
| ReasoningStep | Step in a reasoning trace: step number, thought, output, confidence, tags |
| RecordDecisionRequest | Full request with validation: decision text, confidence, category, stakes, reasons, review_in |
| RecordDecisionResponse | Success indicator, generated ID, file path, index status |
| ReviewDecisionRequest | Review request: decision ID, outcome, actual result, lessons |
| review_decision() | Loads decision YAML, updates with outcome and review metadata, reindexes in ChromaDB |
| build_decision_yaml() | Constructs the YAML dictionary structure |
| write_decision_file() | Writes to decisions/YYYY/MM/YYYY-MM-DD-decision-<id>.yaml |
| generate_embedding(text) | Gemini embedding for ChromaDB indexing |
| ensure_collection_exists() | ChromaDB collection management |
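
The file layout used by write_decision_file() can be reproduced with pathlib. The helper name decision_file_path, its parameters, and the example ID are hypothetical; only the decisions/YYYY/MM/YYYY-MM-DD-decision-<id>.yaml pattern comes from the table above.

```python
from datetime import date
from pathlib import Path


def decision_file_path(root: Path, decision_id: str, day: date) -> Path:
    """Build decisions/YYYY/MM/YYYY-MM-DD-decision-<id>.yaml under `root`."""
    filename = f"{day:%Y-%m-%d}-decision-{decision_id}.yaml"
    return root / "decisions" / f"{day:%Y}" / f"{day:%m}" / filename


# "a1b2c3d4" is a made-up ID for illustration.
path = decision_file_path(Path("."), "a1b2c3d4", date(2025, 3, 7))
print(path.as_posix())
```

Grouping files by year and month keeps directory sizes manageable and lets date-range filters skip whole directories.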

3.4 calibration_service.py — Confidence Calibration

| Component | Description |
| --- | --- |
| GetCalibrationRequest | Filters: agent, category, stakes, date range, window, project, feature |
| CalibrationResult | Overall: Brier score, accuracy, calibration gap, interpretation |
| ConfidenceBucket | Per-bucket: decisions, success rate, expected rate, gap, interpretation |
| ConfidenceStats | Distribution stats: mean, std_dev, min, max, bucket counts |
| CalibrationRecommendation | Actionable recommendation with type, message, severity |
| get_reviewed_decisions() | Loads reviewed decisions matching filters from YAML files |
| calculate_calibration() | Computes Brier score: mean((confidence - outcome)²) |
| calculate_buckets() | Splits into 5 confidence buckets and computes per-bucket stats |
| calculate_confidence_stats() | Habituation detection: identifies low-variance confidence patterns |
| generate_recommendations() | Produces actionable advice based on calibration gaps |
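
The Brier score formula quoted for calculate_calibration() is short enough to show directly. In this sketch each outcome is assumed to be binary (1 for success, 0 for failure); how the service scores partial or abandoned outcomes is not stated here, and the calibration_gap definition (mean confidence minus success rate) is likewise an assumption.

```python
def brier_score(decisions):
    """Mean squared gap between stated confidence and the 0/1 outcome."""
    return sum((conf - outcome) ** 2 for conf, outcome in decisions) / len(decisions)


def calibration_gap(decisions):
    """Mean confidence minus observed success rate; positive means overconfident."""
    mean_conf = sum(conf for conf, _ in decisions) / len(decisions)
    success_rate = sum(outcome for _, outcome in decisions) / len(decisions)
    return mean_conf - success_rate


# Hypothetical reviewed decisions as (confidence, outcome) pairs.
reviewed = [(0.9, 1), (0.8, 0), (0.6, 1), (0.7, 1)]
print(round(brier_score(reviewed), 3))
print(round(calibration_gap(reviewed), 3))
```

A Brier score of 0 is perfect and 0.25 is what constant 50% guesses would earn, so lower is better. Note that a gap near zero can coexist with a mediocre Brier score, as in this example, when individual misses are large but cancel out on average.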

3.5 attribution_service.py — Outcome Attribution

| Component | Description |
| --- | --- |
| AttributeOutcomesRequest | Project, since date, stability days, dry_run flag |
| find_pending_decisions() | Finds pending decisions for a project |
| is_pr_stable() | Checks if a PR is older than the stability window (simplified) |
| update_decision_outcome() | Atomic file update: marks as reviewed with outcome + reason |
| attribute_outcomes() | Main pipeline: find pending → check stability → update files |

3.6 drift_service.py — Calibration Drift Detection

| Component | Description |
| --- | --- |
| CheckDriftRequest | Thresholds for Brier and accuracy, category/project filters |
| DriftAlert | Alert: type (brier_degradation/accuracy_drop), recent vs. historical values, change %, severity |
| check_drift() | Compares 30-day calibration against 90-day+ baseline |
| generate_drift_recommendations() | Actionable recommendations based on drift alerts |
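
Drift detection of this kind boils down to a relative-change comparison between a recent window and a baseline. The sketch below is illustrative only: the 20% threshold, the function signature, and the alert dict keys are assumptions, while the brier_degradation alert type and the recent-versus-historical comparison come from the table above.

```python
def percent_change(recent: float, baseline: float) -> float:
    """Relative change of `recent` against `baseline`, in percent."""
    return (recent - baseline) / baseline * 100.0


def check_drift(recent_brier, baseline_brier, threshold_pct=20.0):
    """Return an alert dict when the recent Brier score is worse by more than the threshold."""
    if baseline_brier <= 0:
        return None  # no meaningful baseline to compare against
    change = percent_change(recent_brier, baseline_brier)
    if change <= threshold_pct:
        return None
    return {
        "type": "brier_degradation",
        "recent": recent_brier,
        "historical": baseline_brier,
        "change_pct": round(change, 1),
    }


# Hypothetical 30-day score versus a longer-term baseline.
alert = check_drift(0.30, 0.20)
print(alert)
```

Since a higher Brier score is worse, only an increase beyond the threshold raises an alert; an accuracy_drop check would mirror this with the sign reversed.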

3.7 guardrails_service.py — Guardrail Evaluation (CSTP)

| Component | Description |
| --- | --- |
| evaluate_guardrails(context) | Loads guardrails (cached 5 min), evaluates, returns violations/warnings |
| list_guardrails(scope) | Lists active guardrails, optionally filtered by scope |
| log_guardrail_check() | Structured audit logging |
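
The five-minute guardrail cache can be pictured as a small time-to-live wrapper around the loader. The TtlCache class below is a hypothetical sketch, not the service's implementation; the injectable clock is an addition that makes the expiry behaviour easy to test.

```python
import time


class TtlCache:
    """Cache one loaded value and reload it once it is older than `ttl_seconds`."""

    def __init__(self, loader, ttl_seconds=300.0, clock=time.monotonic):
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._value = None
        self._loaded_at = None

    def get(self):
        now = self._clock()
        if self._loaded_at is None or now - self._loaded_at >= self._ttl:
            self._value = self._loader()
            self._loaded_at = now
        return self._value


def load_guardrails():
    """Stand-in loader; the real service would parse YAML files here."""
    return [{"id": "no-high-stakes-without-review"}]


guardrail_cache = TtlCache(load_guardrails)
print(guardrail_cache.get())
```

Using a monotonic clock avoids spurious expiry or staleness if the system wall clock is adjusted while the server is running.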

3.8 reindex_service.py — Collection Rebuild

| Component | Description |
| --- | --- |
| reindex_decisions() | Full pipeline: delete collection → create → load decisions → generate embeddings → batch insert |
| ReindexResult | Success, count indexed, errors, duration |

3.9 BM25 Keyword Index

| Component | Description |
| --- | --- |
| BM25Index | Wraps the rank-bm25 BM25Okapi algorithm |
| BM25Index.from_decisions() | Builds index from decision dicts |
| BM25Index.search() | Returns ranked (doc_id, score) pairs |
| tokenize(text) | Simple word tokenization with lowercasing |
| merge_results() | Weighted merge of semantic + keyword results (default 70/30) |
| get_cached_index() | 5-minute TTL cache with count-based invalidation |
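
A 70/30 weighted merge of semantic and keyword results might look like the sketch below. Several assumptions are baked in: both inputs are dicts of doc_id to a higher-is-better score (ChromaDB distances would need converting first), scores are normalized by dividing by the maximum, and a document missing from one side scores 0 there. The tokenizer regex is also an assumption.

```python
import re


def tokenize(text: str) -> list:
    """Lowercase the text and split it into alphanumeric word tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


def normalize(scores: dict) -> dict:
    """Scale scores into [0, 1] by dividing by the largest value."""
    top = max(scores.values(), default=0.0)
    return {k: (v / top if top > 0 else 0.0) for k, v in scores.items()}


def merge_results(semantic: dict, keyword: dict, semantic_weight: float = 0.7):
    """Weighted sum of normalized scores, sorted best first."""
    sem, kw = normalize(semantic), normalize(keyword)
    combined = {
        doc_id: semantic_weight * sem.get(doc_id, 0.0)
        + (1.0 - semantic_weight) * kw.get(doc_id, 0.0)
        for doc_id in set(sem) | set(kw)
    }
    return sorted(combined.items(), key=lambda item: item[1], reverse=True)


# Hypothetical scores from the two retrievers.
semantic_scores = {"d1": 0.9, "d2": 0.6}
keyword_scores = {"d2": 4.0, "d3": 2.0}
ranked = merge_results(semantic_scores, keyword_scores)
print(ranked)
```

Here d2 outranks d1 because it appears in both result sets, which is the main benefit of hybrid retrieval: agreement between the two signals is rewarded.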

3.10 deliberation_tracker.py — Deliberation Traces

Tracks input/reasoning steps across API calls to build full deliberation traces.

| Component | Description |
| --- | --- |
| TrackedInput | Dataclass: type (query/check), content, timestamp, source |
| TrackerSession | Per-agent session state with inputs list and start time |
| DeliberationTracker | Singleton manager for active deliberation sessions |
| track_query(agent_id, query) | Hooks into queryDecisions to record search inputs |
| track_check(agent_id, action) | Hooks into checkGuardrails to record constraint inputs |
| track_lookup(agent_id, id) | Hooks into getDecision to record reference inputs |
| auto_attach_deliberation(agent_id) | Returns and clears tracked inputs for recordDecision |
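
The track-then-attach flow can be sketched as a per-agent list that is drained when a decision is recorded. This is a simplified illustration: sessions are plain lists rather than TrackerSession objects, the source field and track_lookup() are omitted, and the method bodies are assumptions.

```python
import time
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class TrackedInput:
    type: str  # "query" or "check"
    content: str
    timestamp: float = field(default_factory=time.time)


class DeliberationTracker:
    """Collects inputs per agent until a decision is recorded."""

    def __init__(self):
        self._sessions = defaultdict(list)

    def track_query(self, agent_id, query):
        self._sessions[agent_id].append(TrackedInput("query", query))

    def track_check(self, agent_id, action):
        self._sessions[agent_id].append(TrackedInput("check", action))

    def auto_attach_deliberation(self, agent_id):
        """Return everything tracked for the agent and reset its session."""
        return self._sessions.pop(agent_id, [])


tracker = DeliberationTracker()
tracker.track_query("agent-a", "database migration strategy")
tracker.track_check("agent-a", "migrate production database")
inputs = tracker.auto_attach_deliberation("agent-a")
print([item.type for item in inputs])
```

Clearing the session on attach ensures that inputs gathered for one decision do not leak into the trace of the next one.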

3.11 bridge_extractor.py — Bridge Auto-Extraction

Extracts structure/function pairs from decision text when not explicitly provided.

| Component | Description |
| --- | --- |
| auto_extract_bridge(text, context) | Main entry point: heuristic extraction pipeline |
| _score_as_function(text) | Scorer: how likely is text to be a function/purpose? |
| _score_as_structure(text) | Scorer: how likely is text to be a structure/pattern? |

3.12 bridge_hook.py — Shared Hook

| Component | Description |
| --- | --- |
| maybe_auto_extract_bridge(req) | Wraps extraction logic for use in decision_service |

3.13 models.py — CSTP Data Models

| Model | Used By |
| --- | --- |
| QueryFilters | cstp.queryDecisions: category, confidence, stakes, status, project, feature, PR, has_outcome |
| QueryDecisionsRequest | cstp.queryDecisions: query text, bridge_side (structure/function), filters, limit |
| DecisionSummary | Query results: id, title, category, confidence, distance, reasons |
| QueryDecisionsResponse | Wrapper with decisions, total, timing, retrieval mode, scores |
| ActionContext | cstp.checkGuardrails: description, category, stakes, confidence, context dict |
| CheckGuardrailsRequest | Action + agent info |
| GuardrailViolation | Block/warn result with id, name, message, severity, suggestion |
| CheckGuardrailsResponse | allowed flag, violations, warnings, evaluated count |

4. Dashboard — dashboard/

app.py — Flask Routes

| Route | Method | Description |
| --- | --- | --- |
| / | GET | Redirects to /decisions |
| /health | GET | Health check (no auth) |
| /decisions | GET | Paginated decision list with category/status filters |
| /decisions/<id> | GET | Decision detail view |
| /decisions/<id>/review | GET/POST | Review form for recording outcomes |
| /calibration | GET | Calibration dashboard with drift detection |

cstp_client.py — CSTP Client

Async HTTP client for communicating with the CSTP server. Uses httpx for non-blocking requests.

| Method | Description |
| --- | --- |
| health_check() | Check CSTP server health |
| list_decisions() | Query decisions with pagination |
| get_decision(id) | Get single decision by ID |
| review_decision() | Submit decision review |
| get_calibration() | Get calibration statistics |
| check_drift() | Check for calibration drift |

5. CLI — bin/cognition

| Command | Description |
| --- | --- |
| cognition index <dir> | Index all YAML decisions from a directory |
| cognition query <context> | Search for similar decisions |
| cognition check --stakes high --confidence 0.8 | Evaluate guardrails for a context |
| cognition guardrails | List all loaded guardrails |
| cognition count | Count indexed decisions |
| cognition patterns calibration | Confidence calibration report (Brier scores) |
| cognition patterns categories | Category success analysis |
| cognition patterns antipatterns | Detect decision anti-patterns |
| cognition patterns full | Complete pattern report (JSON) |

6. Test Suite — tests/

| Test File | Coverage |
| --- | --- |
| test_a2a_server.py | FastAPI app creation, health endpoint, agent card, CSTP dispatch |
| test_decision_service.py | Decision recording, YAML generation, validation |
| test_query_service.py | Semantic query pipeline |
| test_guardrails.py | Core guardrail engine evaluation |
| test_guardrails_service.py | CSTP guardrail service |
| test_evaluators.py | v2 condition evaluators |
| test_audit.py | Audit trail creation and querying |
| test_patterns.py | Pattern detection and calibration |
| test_calibration_service.py | Calibration computation and recommendations |
| test_attribution_service.py | Outcome attribution pipeline |
| test_semantic_index.py | ChromaDB semantic index |
| test_config_env.py | Configuration loading |
| test_f002_query_decisions.py | F002 feature: query decisions |
| test_f003_check_guardrails.py | F003 feature: check guardrails |
| test_f007_record_decision.py | F007 feature: record decision |
| test_f008_review_decision.py | F008 feature: review decision |
| test_f009_get_calibration.py | F009 feature: get calibration |

Released under the Apache 2.0 License.