Development Guide#
Advanced reference for AI Cortex contributors and maintainers. This guide covers the internal architecture, testing strategy, CI/CD pipeline, and everything else you need to understand the codebase deeply and contribute effectively.
Architecture Overview#
Package Structure#
aicortex/
├── __init__.py                 # Public API: chat(), families(), models(), etc.
├── api.py                      # Internal _OllamaAPI wrapper class
├── chat.py                     # chat() function, Stream, StreamEvent
├── models/                     # Model metadata: one JSON file per family
│   ├── llama.json
│   ├── mistral.json
│   ├── deepseek.json
│   ├── qwen.json
│   └── gemma.json
├── tools/                      # Administrative utilities
│   ├── __init__.py
│   ├── check_models.py         # Step 1: validate server endpoints
│   ├── fetch_models.py         # Step 2: fetch current model lists
│   ├── resolve_models.py       # Step 3: merge and deduplicate metadata
│   ├── apply_valid_models.py   # Step 4: write updated JSON files
│   └── server.py               # FastAPI OpenAI-compatible server
└── stubs/                      # Type stubs for IDE autocomplete
    ├── __init__.pyi
    ├── chat.pyi
    ├── tools.pyi
    └── tools/
        ├── __init__.pyi
        ├── server.pyi
        ├── check_models.pyi
        ├── fetch_models.pyi
        ├── resolve_models.pyi
        └── apply_valid_models.pyi
Layer Responsibilities#
| Layer | Module | Responsibility |
|---|---|---|
| Public API | | Clean, stable surface: thin wrappers only |
| Internal API | | All Ollama interactions; |
| Chat Interface | | |
| Model Metadata | | Offline-available model info and server lists |
| Tools | | Administrative pipeline for keeping metadata current |
| Server | | FastAPI proxy that exposes an OpenAI-compatible REST API |
| Type Stubs | | IDE support: mirrors the public surface in |
Design Principles#
Single responsibility: each module and class does one thing well
Layered access: public callers never touch _OllamaAPI directly
Fail gracefully: server errors cascade to failover, not crashes
Type safety everywhere: mypy --strict must pass with zero errors
No state in modules: all state lives in instances or is passed explicitly
Internal API Design#
_OllamaAPI Class#
api.py contains the single internal class that owns all Ollama communication. Public functions in __init__.py create instances of this class and delegate to it; they never call ollama directly.
from __future__ import annotations

from typing import Any, Iterator


class _OllamaAPI:
    """Internal wrapper around the Ollama Python client.

    Not part of the public API; subject to change without notice.
    All public functions should go through this class for Ollama access.
    """

    def __init__(self, base_url: str = "http://localhost:11434") -> None: ...

    # Chat
    def _chat(self, model: str, prompt: str, **kwargs: Any) -> dict[str, Any]: ...
    def _stream_chat(self, model: str, prompt: str, **kwargs: Any) -> Iterator[dict[str, Any]]: ...

    # Model discovery
    def list_families(self) -> list[str]: ...
    def list_models(self, family: str | None = None) -> list[str]: ...
    def get_model_info(self, model: str) -> dict[str, Any]: ...

    # Server discovery
    def list_model_servers(self, model: str) -> list[dict[str, Any]]: ...
    def get_server_info(self, model: str, server_url: str | None = None) -> dict[str, Any]: ...
    def build_api_request(self, model: str, prompt: str, **kwargs: Any) -> dict[str, Any]: ...
    def get_llm_params(self, model: str) -> dict[str, Any]: ...
    def get_random_llm_params(self, model: str) -> dict[str, Any]: ...
Server Selection Strategy#
When a function needs to talk to an Ollama server, _OllamaAPI follows this selection order:
1. Explicit URL: if the caller passes server_url, use it directly
2. Metadata servers: try servers listed in the model's JSON entry, in order
3. Default localhost: fall back to http://localhost:11434 if all else fails
Each candidate is health-checked before use. A server is considered healthy if it responds to the model list endpoint within the configured timeout. Failed servers are skipped with a warning log; they do not raise exceptions unless all candidates are exhausted.
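The selection order and skip-on-failure behavior described above could be sketched roughly as follows. This is an illustration, not the actual `_OllamaAPI` code: `candidate_urls`, `select_server`, and the `is_healthy` callback are hypothetical names, the local `ServerError` is a stand-in for the library's exception, and the sketch assumes an explicit URL is still health-checked but never falls back to other servers.

```python
from __future__ import annotations

import logging
from typing import Callable, Iterable

logger = logging.getLogger(__name__)

DEFAULT_URL = "http://localhost:11434"


class ServerError(Exception):
    """Stand-in for the library's ServerError (assumption for this sketch)."""


def candidate_urls(explicit_url: str | None, metadata_urls: Iterable[str]) -> list[str]:
    """Return candidate servers in selection order, without duplicates."""
    if explicit_url is not None:
        return [explicit_url]  # an explicit URL bypasses metadata and default
    ordered = [*metadata_urls, DEFAULT_URL]
    return list(dict.fromkeys(ordered))  # drop repeats, keep first occurrence


def select_server(
    explicit_url: str | None,
    metadata_urls: Iterable[str],
    is_healthy: Callable[[str], bool],
) -> str:
    """Return the first healthy candidate; raise only when all have failed."""
    for url in candidate_urls(explicit_url, metadata_urls):
        if is_healthy(url):
            return url
        logger.warning("Skipping unhealthy server %s", url)
    raise ServerError("No healthy Ollama server found")
```

Passing the health check in as a callable keeps the ordering logic testable without any network access.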
Streaming Architecture#
Event System#
Streaming is modeled as a sequence of typed events rather than a raw byte stream. This makes it easy to filter, transform, and compose stream consumers.
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Iterator


class EventType(str, Enum):
    START = "start"   # Generation has begun
    TOKEN = "token"   # One text chunk has arrived
    END = "end"       # Generation completed successfully
    ERROR = "error"   # An error occurred during generation


@dataclass
class StreamEvent:
    type: EventType
    content: str | None = None   # Token text (only on TOKEN events)
    index: int | None = None     # Token position in the sequence
    model: str | None = None     # Which model generated this event
    done: bool = False           # True on the final event


@dataclass
class Stream:
    """Iterable container for a streamed model response."""

    events: list[StreamEvent] = field(default_factory=list)

    def __iter__(self) -> Iterator[StreamEvent]: ...
    def add(self, event: StreamEvent) -> None: ...
    def text(self) -> str:
        """Concatenate all TOKEN event content into a single string."""
        ...
Event Flow Diagram#
chat("...", stream=True)
    │
    ▼
_OllamaAPI._stream_chat()
    │
    │  yields raw Ollama dicts
    ▼
_build_stream_events()    ← converts raw → StreamEvent
    │
    │  yields StreamEvents:
    │    StreamEvent(type=START)
    │    StreamEvent(type=TOKEN, content="Hello")
    │    StreamEvent(type=TOKEN, content=" world")
    │    ...
    │    StreamEvent(type=END, done=True)
    ▼
Stream object returned to caller
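To make the conversion step in the diagram concrete, here is a minimal sketch of how raw chunks might be turned into typed events. It is an assumption-laden illustration rather than the real `_build_stream_events()`: the function name `build_stream_events`, the chunk shape (`{"message": {"content": ...}, "done": ...}`), and the choice to emit an ERROR event instead of raising are all guesses, and the event classes are repeated only so the block runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterable, Iterator


class EventType(str, Enum):
    START = "start"
    TOKEN = "token"
    END = "end"
    ERROR = "error"


@dataclass
class StreamEvent:
    type: EventType
    content: str | None = None
    index: int | None = None
    model: str | None = None
    done: bool = False


def build_stream_events(chunks: Iterable[dict[str, Any]], model: str) -> Iterator[StreamEvent]:
    """Lazily convert raw chunk dicts into START / TOKEN / END (or ERROR) events."""
    yield StreamEvent(type=EventType.START, model=model)
    index = 0
    try:
        for chunk in chunks:
            # Assumed chunk shape: text lives under message.content.
            text = chunk.get("message", {}).get("content", "")
            if text:
                yield StreamEvent(type=EventType.TOKEN, content=text, index=index, model=model)
                index += 1
            if chunk.get("done"):
                break
    except Exception as exc:  # the upstream iterator failed mid-stream
        yield StreamEvent(type=EventType.ERROR, content=str(exc), model=model, done=True)
        return
    yield StreamEvent(type=EventType.END, model=model, done=True)
```

Because the function is a generator, no event is produced until the caller asks for it, which matches the lazy behavior the guide describes for streaming.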
Consuming a Stream#
# Option 1: iterate events
stream = chat("Tell me a story", stream=True)
for event in stream:
    if event.type == EventType.TOKEN:
        print(event.content, end="", flush=True)

# Option 2: collect full text after completion
text = stream.text()
Model Management#
JSON Metadata Schema#
Each model family has a JSON file in aicortex/models/. The schema:
{
  "family": "llama",
  "models": [
    {
      "name": "llama3.2:3b",
      "family": "llama",
      "size": "2.0 GB",
      "parameters": "3B",
      "quantization": "Q4_K_M",
      "context_length": 131072,
      "description": "Compact, fast Llama 3.2 variant for everyday tasks.",
      "tags": ["chat", "fast", "lightweight"],
      "servers": [
        {
          "url": "http://localhost:11434",
          "status": "unknown"
        }
      ]
    }
  ]
}
Model Loading#
Model metadata is loaded from the bundled JSON files at import time. The files are included in the wheel via package_data in setup.py, so they are always available; no network access is required to get model info.
The status field in each server entry is not authoritative at load time; it reflects the last-known state from when the tools pipeline was run. Live health checks are done lazily at call time.
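As a rough illustration of the loading step, the sketch below reads every family file in a directory and skips any file that cannot be parsed, so one bad file does not break the import. The function name `load_families` and its return shape are hypothetical; the real loader may differ.

```python
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)


def load_families(models_dir: Path) -> dict[str, list[dict[str, Any]]]:
    """Map each family name to its list of model entries.

    Unreadable or malformed files are logged and skipped.
    """
    families: dict[str, list[dict[str, Any]]] = {}
    for path in sorted(models_dir.glob("*.json")):
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
            families[data["family"]] = list(data["models"])
        except (OSError, ValueError, KeyError, TypeError) as exc:
            # json.JSONDecodeError is a ValueError subclass.
            logger.error("Skipping %s: %s", path.name, exc)
    return families
```

Server `status` values loaded this way are only a snapshot from the last pipeline run, so callers would still perform a live health check before using a server.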
Keeping Metadata Current#
The four-tool pipeline in aicortex/tools/ is the mechanism for refreshing model metadata:
check_models → fetch_models → resolve_models → apply_valid_models
Run it periodically (e.g., as a cron job or pre-release step) to keep the bundled JSON files accurate. See docs/tools.md for the full pipeline reference.
Tool System#
Tool Categories#
| Tool | Module | Purpose |
|---|---|---|
| check_models | | Validate that server URLs are reachable and serving models |
| fetch_models | | Fetch the current model list from each live server |
| resolve_models | | Merge fetched data with existing metadata; deduplicate |
| apply_valid_models | | Write the resolved data back to |
| server | | Run the FastAPI OpenAI-compatible proxy |
Tool Design Constraints#
CLI-runnable: every tool exposes a main() function and a __main__ guard so it can be invoked directly: python -m aicortex.tools.check_models
Composable: each tool's output is suitable input for the next step; they can be chained in shell pipelines or called programmatically
Error-resilient: network failures for individual servers are logged and skipped; they do not abort the whole pipeline
Concurrent: check_models and fetch_models use asyncio / ThreadPoolExecutor to probe multiple servers in parallel
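The constraints above might combine in a tool roughly like this. It is a sketch under stated assumptions, not the code of any actual tool: `probe_all` and its `probe` callback are hypothetical, and the probe used in `main()` is a placeholder that only inspects the URL string instead of contacting a server.

```python
from __future__ import annotations

import argparse
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence

logger = logging.getLogger(__name__)


def probe_all(
    urls: Sequence[str],
    probe: Callable[[str], bool],
    max_workers: int = 8,
) -> dict[str, bool]:
    """Probe every URL in parallel; a failing probe marks that URL as down."""

    def safe_probe(url: str) -> bool:
        try:
            return probe(url)
        except Exception as exc:  # one bad server must not abort the run
            logger.warning("Probe failed for %s: %s", url, exc)
            return False

    if not urls:
        return {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(zip(urls, pool.map(safe_probe, urls)))


def main(argv: Sequence[str] | None = None) -> int:
    """CLI entry point: print one tab-separated result line per URL."""
    parser = argparse.ArgumentParser(description="Probe server URLs (sketch)")
    parser.add_argument("urls", nargs="*", help="server URLs to probe")
    args = parser.parse_args(argv)
    # Placeholder probe: a real tool would issue an HTTP request here.
    results = probe_all(args.urls, probe=lambda url: url.startswith("http"))
    for url, ok in results.items():
        print(f"{url}\t{'ok' if ok else 'unreachable'}")
    return 0
```

A module built this way would typically end with an `if __name__ == "__main__":` guard that calls `main()`, and its line-oriented output can be piped into the next step.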
Type System#
Stub Files#
Every public symbol has a corresponding .pyi stub. Stubs live in aicortex/stubs/ and are included in the wheel so IDEs get autocomplete without needing to read the implementation.
The stub for chat() uses @overload to express the conditional return type:
# aicortex/stubs/chat.pyi
from typing import Any, Literal, overload

from .models import Stream

@overload
def chat(prompt: str, *, stream: Literal[False] = ..., **kwargs: Any) -> str: ...
@overload
def chat(prompt: str, *, stream: Literal[True], **kwargs: Any) -> Stream: ...
# Fallback for a non-literal bool; stub files may contain only @overload variants,
# so the runtime implementation signature is not repeated here.
@overload
def chat(prompt: str, *, stream: bool, **kwargs: Any) -> str | Stream: ...
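For comparison, this is how the same overload pattern looks in a regular `.py` module, where the overloads are followed by a single runtime implementation. The example is a toy: this `chat()` only splits the prompt into words and returns an iterator instead of a `Stream`, purely to show how the `stream` flag selects the return type.

```python
from __future__ import annotations

from typing import Any, Iterator, Literal, overload


@overload
def chat(prompt: str, *, stream: Literal[False] = ..., **kwargs: Any) -> str: ...
@overload
def chat(prompt: str, *, stream: Literal[True], **kwargs: Any) -> Iterator[str]: ...
def chat(prompt: str, *, stream: bool = False, **kwargs: Any) -> str | Iterator[str]:
    """Toy implementation: echo the prompt whole or word by word."""
    words = prompt.split()
    if stream:
        return iter(words)  # stands in for a Stream of events
    return " ".join(words)
```

With these overloads a type checker infers `str` for `chat("hi")` and `Iterator[str]` for `chat("hi", stream=True)`, so callers need no casts.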
Type Checking#
mypy aicortex must exit 0 with strict mode enabled
--no-implicit-optional and --disallow-untyped-defs are both on
All Any uses must be justified with a # type: ignore[...] comment
Run:
mypy aicortex --strict
Testing Strategy#
Test Structure#
tests/
├── __init__.py
├── conftest.py              # Fixtures: mock_ollama_client, sample_model_data
├── test_chat.py             # chat(), Stream, StreamEvent behavior
├── test_api.py              # _OllamaAPI methods and error paths
├── test_models.py           # JSON loading, model lookup, family listing
├── test_tools.py            # check → fetch → resolve → apply pipeline
├── test_server.py           # FastAPI endpoints, request/response shapes
└── fixtures/
    ├── mock_responses.json  # Canned Ollama API responses
    └── test_models.json     # Minimal model JSON for unit tests
Test Categories#
Unit tests: test one function or method in isolation, with all external I/O mocked:
def test_get_model_info_returns_correct_family(mock_ollama_client):
    info = get_model_info("llama3.2:3b")
    assert info["family"] == "llama"
Integration tests: test multi-step workflows, still mocked at the network boundary:
import json
from pathlib import Path

def test_tool_pipeline_produces_valid_json(mock_server_responses):
    # Run the four pipeline steps in order, then inspect the written file.
    check_models.run()
    fetch_models.run()
    resolve_models.run()
    apply_valid_models.run()
    data = json.loads(Path("aicortex/models/llama.json").read_text())
    assert "models" in data
Server tests: test FastAPI endpoints using httpx.AsyncClient with the app mounted in-process (no real network):
import pytest
from httpx import ASGITransport, AsyncClient

@pytest.mark.asyncio
async def test_chat_completions_endpoint():
    # httpx 0.28 removed AsyncClient(app=...); mount the app via ASGITransport.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.post("/v1/chat/completions", json={
            "model": "llama3.2:3b",
            "messages": [{"role": "user", "content": "Hello"}]
        })
        assert response.status_code == 200
        assert "choices" in response.json()
Tox Environments#
tox.ini defines these environments:
| Environment | Command | Purpose |
|---|---|---|
| | | Full suite on each Python version |
| | | Build and check docs |
| | | Verify the package builds cleanly |
Run all environments:
tox
Run a single environment:
tox -e py311
tox -e docs
Performance Optimization#
Caching#
Model metadata: JSON files are read once at import time and held in module-level dicts; subsequent calls hit the in-memory cache
Server health: health check results are cached for a configurable TTL (default: 60 seconds) to avoid re-checking on every call
Client instances: ollama.Client instances are reused per base URL rather than created per call
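The TTL-based health cache mentioned above could look something like the following. This is a minimal sketch, not the library's implementation: the `HealthCache` class, its `check` callback, and the injectable `clock` are hypothetical names chosen for illustration.

```python
from __future__ import annotations

import time
from typing import Callable


class HealthCache:
    """Remember health-check results per URL for `ttl` seconds."""

    def __init__(
        self,
        check: Callable[[str], bool],
        ttl: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._check = check
        self._ttl = ttl
        self._clock = clock
        self._entries: dict[str, tuple[float, bool]] = {}

    def is_healthy(self, url: str) -> bool:
        now = self._clock()
        cached = self._entries.get(url)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]  # still fresh: skip the network round trip
        result = self._check(url)
        self._entries[url] = (now, result)
        return result
```

Using a monotonic clock avoids stale or prematurely expired entries when the system wall clock is adjusted, and injecting the clock makes expiry easy to test.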
Concurrency#
check_models and fetch_models use asyncio.gather() to probe all servers concurrently, so wall time is bounded by the slowest server (or the timeout) rather than growing with the server count
apply_valid_models writes each family JSON file atomically (write to temp, then rename) to prevent partial writes
Streaming events are yielded lazily: no buffering of the full response before returning to the caller
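The write-to-temp-then-rename technique can be sketched as below. The helper name `write_json_atomic` is hypothetical and the real tool may differ in details such as formatting or fsync behavior.

```python
from __future__ import annotations

import json
import os
import tempfile
from pathlib import Path
from typing import Any


def write_json_atomic(path: Path, data: dict[str, Any]) -> None:
    """Write JSON so readers see either the old file or the complete new one."""
    # The temp file must live in the target directory: os.replace is only
    # atomic when source and destination are on the same filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(data, handle, indent=2)
            handle.flush()
            os.fsync(handle.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_name, path)
    except BaseException:
        # Leave the original file untouched and remove the partial temp file.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

If serialization fails midway, the existing family file is left exactly as it was.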
Memory#
Model JSON files are loaded into dict objects, not dataclass instances, to minimize overhead for large model lists
Streaming yields one event at a time; the full token sequence is only materialized if the caller calls .text()
Error Handling#
Exception Hierarchy#
class AICortexError(Exception):
    """Base exception for all AI Cortex errors."""

class ModelNotFoundError(AICortexError):
    """Raised when the requested model is not available on any server."""

class ServerError(AICortexError):
    """Raised when all configured servers are unreachable."""

class ValidationError(AICortexError):
    """Raised when input parameters fail validation."""

class StreamError(AICortexError):
    """Raised when an error occurs during streaming."""
Recovery Strategies#
| Failure | Strategy |
|---|---|
| One server unreachable | Log warning, try next server in list |
| All servers unreachable | Raise |
| Model not in metadata | Raise |
| Stream interrupted mid-response | Emit |
| Malformed model JSON | Log error, skip that family; do not crash the import |
Build and Distribution#
Building#
# Install build tools
pip install build twine
# Build source distribution and wheel
python -m build
# Verify the built package
twine check dist/*
This produces:
dist/aicortex_core-1.0.3.tar.gz: source distribution
dist/aicortex_core-1.0.3-py3-none-any.whl: pure-Python wheel
What's in the Wheel#
All aicortex/ Python source files
aicortex/models/*.json: bundled model metadata
aicortex/stubs/**/*.pyi: type stubs for IDE support
README.md, LICENSE
Versioning#
AI Cortex follows Semantic Versioning:
MAJOR: breaking changes to the public API
MINOR: new features, backward-compatible
PATCH: bug fixes, backward-compatible
The version is defined in setup.py and should be updated before every release.
CI/CD Pipeline#
The GitHub Actions workflow (.github/workflows/ci.yml) runs on every push and pull request to main and develop.
Jobs#
test: matrix over Python 3.8, 3.9, 3.10, 3.11, 3.12:
1. Checkout code
2. Install: pip install -e .[dev,server]
3. Lint: flake8 aicortex tests
4. Format check: black --check aicortex tests
5. Type check: mypy aicortex
6. Test: pytest --cov=aicortex --cov-report=xml
7. Upload coverage to Codecov
build: runs after test passes:
1. Build package: python -m build
2. Store wheel and sdist as workflow artifacts
release: runs on push to main only, after test and build:
1. Build package
2. Publish to PyPI via pypa/gh-action-pypi-publish
(requires PYPI_API_TOKEN secret in repo settings)
Release Process#
1. Update version in setup.py
2. Update CHANGELOG.md with release notes
3. Commit: git commit -m "chore: release v1.1.0"
4. Tag: git tag v1.1.0 && git push --tags
5. Merge to main; CI publishes to PyPI automatically
Security Considerations#
Input Validation#
All model identifiers are validated against the known model list before use
Server URLs are validated as valid HTTP/HTTPS URLs before connection attempts
Prompt strings are passed through as-is to Ollama; sanitization is the caller's responsibility
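A check along the lines of the URL rule above might be written as follows. This is a sketch only: `validate_server_url` is a hypothetical helper, and the local `ValidationError` is a stand-in for the library's exception of the same name.

```python
from __future__ import annotations

from urllib.parse import urlsplit


class ValidationError(Exception):
    """Stand-in for the library's ValidationError (assumption for this sketch)."""


def validate_server_url(url: str) -> str:
    """Return the URL unchanged if it is an absolute HTTP(S) URL, else raise."""
    try:
        parts = urlsplit(url)
    except ValueError as exc:  # e.g. a malformed IPv6 literal
        raise ValidationError(f"Malformed URL: {url!r}") from exc
    if parts.scheme not in ("http", "https"):
        raise ValidationError(f"Unsupported scheme in {url!r}; expected http or https")
    if not parts.hostname:
        raise ValidationError(f"Missing host in {url!r}")
    return url
```

Validating before any connection attempt means that a value such as `localhost:11434` (no scheme) is rejected up front with a clear message rather than surfacing later as a connection error.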
Network#
HTTPS is supported and recommended for remote servers
All HTTP requests have an explicit timeout (default: 30 seconds)
No credentials or tokens are logged, even at debug level
Dependencies#
Core dependencies are minimal: ollama and pydantic only
Server extras (fastapi, uvicorn) are optional
Dependencies declare minimum versions in setup.py, with no upper bounds, to avoid false conflicts
Known Limitations#
The server mode has no built-in authentication; do not expose it on a public network without a reverse proxy that adds auth
Model outputs are not filtered; responsibility for downstream content handling lies with the application
HTTP (not HTTPS) is the default for localhost Ollama connections; this is intentional for zero-config local use
Future Enhancements#
Planned#
| Feature | Description | Priority |
|---|---|---|
| Async API | Full | High |
| Plugin system | Extensible tool architecture for third-party additions | Medium |
| Metrics export | Prometheus-compatible metrics endpoint on the server | Medium |
| Configuration files | YAML/TOML config for server URLs, defaults, timeouts | Medium |
| Caching layer | Optional Redis backend for response caching | Low |
| Multi-modal | Image and audio input support (pending Ollama support) | Low |
Architecture Notes for Future Contributors#
The _OllamaAPI class is intentionally not async to keep the public API simple. When async support is added, it should be a parallel _AsyncOllamaAPI class, not a modification of the existing one.
The model metadata JSON format is considered stable. New fields may be added; existing fields must not be removed without a major version bump.
The tool pipeline is designed to be run by maintainers, not end users. If a use case arises for user-facing model management, it should be a new public API function, not a thin wrapper around the tools.