Middleware Pipeline
Overview
agentguard's middleware.py provides a composable guard pipeline — a sequence of middleware functions that can be combined, reordered, and extended. This is the underlying mechanism that powers @guard.
What Is Middleware?
Each guard (validator, rate limiter, circuit breaker, etc.) is a middleware function with the signature:
async def middleware(call: ToolCall, next: Callable) -> ToolResult:
    # Before the call
    ...
    result = await next(call)
    # After the call
    ...
    return result
Middleware wraps the next function in the chain. This is the classic "onion" pattern:
Request  → [Validator] → [RateLimiter] → [CircuitBreaker] → [YourFunction] → ...
                ↑              ↑                ↑                 ↑
            pre-hooks      pre-hooks        pre-hooks         execution
                ↓              ↓                ↓                 ↓
Response ← [Validator] ← [RateLimiter] ← [CircuitBreaker] ← [YourFunction]
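The onion pattern can be sketched without agentguard at all. The example below is an illustration under stated assumptions, not the library's implementation: `ToolCall` and `ToolResult` are simplified stand-ins, and `compose` and `tracing` are hypothetical helpers. It shows that the first middleware in the list is the outermost layer, so it runs first on the way in and last on the way out.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, List


# Simplified stand-ins for agentguard's ToolCall / ToolResult (assumed shapes).
@dataclass
class ToolCall:
    tool_name: str
    kwargs: dict = field(default_factory=dict)


@dataclass
class ToolResult:
    value: Any


Handler = Callable[[ToolCall], Awaitable[ToolResult]]
Middleware = Callable[[ToolCall, Handler], Awaitable[ToolResult]]


def compose(middlewares: List[Middleware], handler: Handler) -> Handler:
    """Wrap handler so that middlewares[0] becomes the outermost layer."""
    chain = handler
    for mw in reversed(middlewares):
        # Default arguments bind the current mw and chain for this layer.
        def layer(call: ToolCall, mw: Middleware = mw, nxt: Handler = chain):
            return mw(call, nxt)
        chain = layer
    return chain


def tracing(name: str, log: List[str]) -> Middleware:
    """Build a middleware that records when it runs before and after the call."""
    async def middleware(call: ToolCall, next: Handler) -> ToolResult:
        log.append(f"{name}:before")
        result = await next(call)
        log.append(f"{name}:after")
        return result
    return middleware


async def handler(call: ToolCall) -> ToolResult:
    # The innermost function: the actual tool being guarded.
    return ToolResult(value=call.kwargs["query"].upper())


log: List[str] = []
pipeline = compose([tracing("outer", log), tracing("inner", log)], handler)
result = asyncio.run(pipeline(ToolCall("search", {"query": "test"})))
print(result.value, log)
```

Reordering the list passed to `compose` changes which guard sees the call first, which is why the order of the built-in middleware matters.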
Built-in Middleware
agentguard ships these middleware components, applied in order:
1. CircuitBreakerMiddleware: reject calls when the circuit is OPEN
2. RateLimiterMiddleware: enforce token bucket rate limits
3. BudgetMiddleware: check and update spend budgets
4. ValidationMiddleware: validate inputs/outputs
5. HallucinationMiddleware: detect hallucinated responses
6. RetryMiddleware: retry on failure with backoff
7. TimeoutMiddleware: enforce per-call timeout
8. TraceMiddleware: record calls to trace files
9. HookMiddleware: call before_call and after_call hooks
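To make the "reject calls when the circuit is OPEN" behaviour concrete, here is a minimal sketch of a breaker written as middleware. It is not agentguard's CircuitBreakerMiddleware: the names `SimpleCircuitBreaker` and `CircuitOpenError` are hypothetical, and the sketch omits recovery timeouts and any half-open state. It only shows how a guard can short-circuit the chain by never calling `next`.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class CircuitOpenError(RuntimeError):
    """Raised when the breaker rejects a call without running it (hypothetical name)."""


class SimpleCircuitBreaker:
    """Opens after `failure_threshold` consecutive failures; no half-open state."""

    def __init__(self, failure_threshold: int = 5) -> None:
        self.failure_threshold = failure_threshold
        self.failures = 0

    @property
    def is_open(self) -> bool:
        return self.failures >= self.failure_threshold

    async def __call__(self, call: Any, next: Callable[[Any], Awaitable[Any]]) -> Any:
        if self.is_open:
            # Short-circuit: the wrapped function is never invoked.
            raise CircuitOpenError("circuit is OPEN")
        try:
            result = await next(call)
        except Exception:
            self.failures += 1
            raise
        self.failures = 0  # any success resets the failure streak
        return result


async def flaky(call: Any) -> Any:
    # Stand-in for a tool whose backend is unavailable.
    raise ValueError("backend down")


async def demo() -> List[str]:
    breaker = SimpleCircuitBreaker(failure_threshold=2)
    outcomes = []
    for _ in range(3):
        try:
            await breaker("call", flaky)
        except CircuitOpenError:
            outcomes.append("rejected")
        except ValueError:
            outcomes.append("failed")
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```

The first two calls reach the function and fail; the third is rejected before the function runs.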
Using MiddlewarePipeline Directly
For advanced use cases, build a pipeline manually:
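import asyncio

from agentguard.middleware import MiddlewarePipeline
# Assumption: CircuitBreakerConfig is importable from the same module as
# GuardConfig; adjust the import if it is defined elsewhere.
from agentguard.core.types import CircuitBreakerConfig, GuardConfig

config = GuardConfig(
    validate_input=True,
    max_retries=2,
    circuit_breaker=CircuitBreakerConfig(failure_threshold=5),
)
pipeline = MiddlewarePipeline(config=config)

async def main():
    # my_function is the tool function being guarded; await needs a coroutine.
    return await pipeline.execute(my_function, args=(), kwargs={"query": "test"})

result = asyncio.run(main())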
Writing Custom Middleware
from agentguard.middleware import GuardMiddleware
from agentguard.core.types import ToolCall, ToolResult
from typing import Callable, Awaitable

class AuditMiddleware(GuardMiddleware):
    """Log every tool call to an audit trail."""

    def __init__(self, audit_service):
        self.audit = audit_service

    async def __call__(
        self,
        call: ToolCall,
        next: Callable[[ToolCall], Awaitable[ToolResult]],
    ) -> ToolResult:
        # Record the attempt
        audit_id = self.audit.start(call.tool_name, call.kwargs, call.session_id)
        try:
            result = await next(call)
            self.audit.success(audit_id, result.execution_time_ms)
            return result
        except Exception as exc:
            self.audit.failure(audit_id, str(exc))
            raise
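AuditMiddleware only needs an object with `start`, `success`, and `failure` methods. For local testing, one option is an in-memory stand-in such as the sketch below. `InMemoryAuditService` and its record layout are hypothetical and not part of agentguard; only the three method names and their arguments are taken from the calls made in the example above.

```python
import itertools
from typing import Any, Dict, Optional


class InMemoryAuditService:
    """Hypothetical audit backend exposing the three methods AuditMiddleware calls."""

    def __init__(self) -> None:
        self.records: Dict[int, Dict[str, Any]] = {}
        self._ids = itertools.count(1)

    def start(self, tool_name: str, kwargs: Dict[str, Any], session_id: Optional[str]) -> int:
        audit_id = next(self._ids)
        self.records[audit_id] = {
            "tool": tool_name,
            "kwargs": dict(kwargs),  # copy so later mutation does not alter the record
            "session": session_id,
            "status": "started",
        }
        return audit_id

    def success(self, audit_id: int, execution_time_ms: float) -> None:
        self.records[audit_id].update(status="ok", duration_ms=execution_time_ms)

    def failure(self, audit_id: int, error: str) -> None:
        self.records[audit_id].update(status="error", error=error)


# Exercise the service the same way the middleware would.
audit = InMemoryAuditService()
first = audit.start("search", {"query": "test"}, "session-1")
audit.success(first, 12.5)
second = audit.start("search", {"query": "boom"}, "session-1")
audit.failure(second, "backend down")
print(audit.records[first]["status"], audit.records[second]["status"])
```

An instance of this class could be passed as `audit_service` when constructing AuditMiddleware, and `records` inspected in assertions afterwards.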
Composing Pipelines
from agentguard.middleware import MiddlewarePipeline
from agentguard.core.types import GuardConfig
config = GuardConfig(validate_input=True, max_retries=2)
# Add custom middleware
audit = AuditMiddleware(my_audit_service)
pipeline = MiddlewarePipeline(config=config, extra_middleware=[audit])
Per-Environment Pipelines
import os

def build_pipeline(config: GuardConfig) -> MiddlewarePipeline:
    extra = []
    if os.getenv("ENV") == "production":
        extra.append(AuditMiddleware(audit_service))
        extra.append(TelemetryMiddleware(otel_tracer))
    return MiddlewarePipeline(config=config, extra_middleware=extra)
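TelemetryMiddleware is referenced above but not defined on this page. As a rough idea of what a telemetry-style middleware might look like, the sketch below times each call and appends the duration to a list. `TimingMiddleware` and its `sink` parameter are hypothetical; it does not use OpenTelemetry or any agentguard API, and a real implementation would likely emit spans through a tracer instead.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, List, Tuple


class TimingMiddleware:
    """Hypothetical telemetry-style middleware: records (label, elapsed seconds) per call."""

    def __init__(self, sink: List[Tuple[str, float]]) -> None:
        self.sink = sink

    async def __call__(self, call: Any, next: Callable[[Any], Awaitable[Any]]) -> Any:
        started = time.perf_counter()
        try:
            return await next(call)
        finally:
            # Runs on success and on failure, so failed calls are timed too.
            self.sink.append((str(call), time.perf_counter() - started))


async def slow_tool(call: Any) -> str:
    # Stand-in for a guarded tool that takes some time to respond.
    await asyncio.sleep(0.01)
    return "done"


timings: List[Tuple[str, float]] = []
result = asyncio.run(TimingMiddleware(timings)("search", slow_tool))
print(result, timings)
```

Putting the measurement in a `finally` block keeps the timing code in one place regardless of whether the inner call returns or raises.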
Async vs Sync
The pipeline is async-first. For sync functions, GuardedTool.__call__ runs the async pipeline in a thread: