Monitoring and Observability Guide
Phlow provides comprehensive monitoring and observability features to help you understand your agent's performance, security posture, and operational health.
Overview
The monitoring system includes:
- Structured Logging - JSON-formatted logs with request tracing
- Metrics Collection - Prometheus-compatible metrics for monitoring
- Request Tracing - Track requests across distributed systems
- Security Events - Authentication and authorization monitoring
- Performance Metrics - Response times and throughput tracking
Quick Setup
Install Monitoring Dependencies
# Install monitoring features
pip install "phlow[monitoring]"
# Or install specific dependencies
pip install structlog prometheus-client
Basic Configuration
from phlow.monitoring import configure_logging, configure_metrics
# Configure structured logging
logger = configure_logging(
log_level="INFO",
output_format="json", # or "console" for development
enable_metrics=True,
enable_tracing=True
)
# Configure metrics collection
metrics = configure_metrics(enable_prometheus=True)
Structured Logging
Configuration
from phlow.monitoring import configure_logging
# Development configuration (human-readable)
logger = configure_logging(
log_level="DEBUG",
output_format="console",
enable_metrics=True,
enable_tracing=True
)
# Production configuration (JSON output)
logger = configure_logging(
log_level="INFO",
output_format="json",
enable_metrics=True,
enable_tracing=True
)
Log Output Examples
Console Format (Development):
2025-01-15 10:30:45 [INFO ] Authentication succeeded [phlow] agent_id=agent-123 request_id=abc-def-123 success=True
JSON Format (Production):
{
"timestamp": 1705320645.123,
"level": "info",
"logger": "phlow",
"message": "Authentication succeeded",
"request_id": "abc-def-123",
"agent_id": "agent-123",
"event_type": "authentication",
"success": true,
"token_hash": "sha256:abc123..."
}
Manual Logging
from phlow.monitoring import get_logger
logger = get_logger()
# Set request context
logger.set_request_context(
req_id="request-123",
ag_id="agent-456"
)
# Log structured events
logger.log_authentication_event(
agent_id="agent-123",
success=True,
token_hash="abc123"
)
logger.log_rate_limit_event(
identifier="192.168.1.1",
limit_type="api",
exceeded=False,
current_count=45,
limit=100
)
Metrics Collection
Prometheus Integration
from phlow.monitoring import configure_metrics
# Enable Prometheus metrics
metrics = configure_metrics(enable_prometheus=True)
# Metrics are automatically collected by Phlow middleware
# Access metrics endpoint in your FastAPI app:
from fastapi import FastAPI, Response
from phlow.monitoring import get_metrics_collector
app = FastAPI()
@app.get("/metrics")
async def metrics_endpoint():  # avoid shadowing the `metrics` object above
collector = get_metrics_collector()
return Response(
content=collector.get_metrics_text(),
media_type="text/plain"
)
Available Metrics
Metric | Type | Description | Labels
---|---|---|---
phlow_auth_attempts_total | Counter | Authentication attempts | agent_id, success
phlow_auth_duration_seconds | Histogram | Authentication latency | agent_id
phlow_rate_limit_checks_total | Counter | Rate limit checks | limit_type, exceeded
phlow_did_resolutions_total | Counter | DID resolutions | cached, success
phlow_external_api_calls_total | Counter | External API calls | service, status_code
phlow_database_operations_total | Counter | Database operations | operation, table, success
Manual Metrics
from phlow.monitoring import get_metrics_collector, MetricsTimer
collector = get_metrics_collector()
# Record custom metrics
collector.record_auth_attempt("agent-123", True, 0.045)
collector.record_rate_limit_check("api", False)
# Use timing context manager
with MetricsTimer(collector, "external_api", service="supabase"):
# Make API call
response = await supabase_client.query()
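The timing context manager pattern is easy to reason about: record a start time on entry, compute the elapsed time on exit, and report it even when the wrapped code raises. A minimal sketch of how such a helper could be built (the names here are illustrative, not Phlow's actual implementation):

```python
import time

class SimpleTimer:
    """Context manager that reports elapsed wall-clock time via a callback."""

    def __init__(self, record):
        self._record = record  # called with the duration in seconds on exit
        self.elapsed = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs on both success and failure, so the metric is always recorded.
        self.elapsed = time.perf_counter() - self._start
        self._record(self.elapsed)
        return False  # never swallow exceptions

durations = []
with SimpleTimer(durations.append):
    sum(range(1000))  # stand-in for the timed operation
```

Because `__exit__` returns False, exceptions from the timed block still propagate after the duration is recorded.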
FastAPI Integration
Add Logging Middleware
from fastapi import FastAPI
from phlow.monitoring import LoggingMiddleware, get_logger
app = FastAPI()
# Add logging middleware
logger = get_logger()
app.middleware("http")(LoggingMiddleware(logger))
@app.post("/api/agent")
async def agent_endpoint():
# Request context is automatically set
logger.info("Processing agent request")
return {"status": "success"}
Complete Integration Example
from fastapi import FastAPI, Depends, Response
from phlow import PhlowMiddleware, PhlowConfig
from phlow.monitoring import (
configure_logging,
configure_metrics,
LoggingMiddleware,
get_logger,
get_metrics_collector
)
from phlow.integrations.fastapi import create_phlow_dependency
# Configure monitoring
configure_logging(output_format="json", log_level="INFO")
configure_metrics(enable_prometheus=True)
app = FastAPI()
# Add logging middleware
app.middleware("http")(LoggingMiddleware(get_logger()))
# Configure Phlow
config = PhlowConfig(...) # Your config
middleware = PhlowMiddleware(config)
auth_required = create_phlow_dependency(middleware)
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
collector = get_metrics_collector()
return Response(
content=collector.get_metrics_text(),
media_type="text/plain"
)
@app.get("/health")
async def health():
"""Health check endpoint."""
collector = get_metrics_collector()
metrics_data = collector.get_metrics_dict()
return {
"status": "healthy",
"metrics_summary": {
"auth_attempts": sum(metrics_data["counters"].values()),
"active_connections": metrics_data["gauges"].get("active_connections", 0)
}
}
@app.post("/api/secure")
async def secure_endpoint(context = Depends(auth_required)):
# All authentication events are automatically logged
return {"message": "Success", "agent": context.agent.name}
Log Analysis
Searching Logs
Find authentication failures:
# Using jq for JSON logs
cat app.log | jq 'select(.event_type == "authentication" and .success == false)'
# Using grep for console logs
grep "Authentication failed" app.log
Track specific agent:
cat app.log | jq 'select(.agent_id == "agent-123")'
Rate limit violations:
cat app.log | jq 'select(.event_type == "rate_limit" and .exceeded == true)'
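If jq is not available, the same filters can be applied with a few lines of Python. A sketch that reads newline-delimited JSON logs and matches on arbitrary fields (the helper name is illustrative):

```python
import json

def filter_events(lines, **criteria):
    """Yield parsed log records whose fields match all given criteria."""
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines (e.g. console-format output)
        if all(record.get(k) == v for k, v in criteria.items()):
            yield record

logs = [
    '{"event_type": "authentication", "success": false, "agent_id": "agent-123"}',
    '{"event_type": "rate_limit", "exceeded": true}',
    'not json',
]
failures = list(filter_events(logs, event_type="authentication", success=False))
```

In practice you would pass an open file object as `lines` instead of an in-memory list.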
Log Aggregation
ELK Stack Configuration:
# logstash.conf
input {
file {
path => "/app/logs/phlow.log"
codec => json
}
}
filter {
if [event_type] {
mutate {
add_tag => ["phlow", "%{event_type}"]
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "phlow-logs-%{+YYYY.MM.dd}"
}
}
Prometheus Monitoring
Grafana Dashboard
Example queries for Grafana:
Authentication Rate:
rate(phlow_auth_attempts_total[5m])
Authentication Success Rate:
rate(phlow_auth_attempts_total{success="true"}[5m]) /
rate(phlow_auth_attempts_total[5m])
P95 Authentication Latency:
histogram_quantile(0.95, rate(phlow_auth_duration_seconds_bucket[5m]))
Rate Limit Violations:
rate(phlow_rate_limit_checks_total{exceeded="true"}[5m])
Alerting Rules
# prometheus.rules.yml
groups:
- name: phlow
rules:
- alert: HighAuthFailureRate
expr: rate(phlow_auth_attempts_total{success="false"}[5m]) > 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "High authentication failure rate"
description: "Authentication failure rate is {{ $value }} per second"
- alert: RateLimitViolations
expr: rate(phlow_rate_limit_checks_total{exceeded="true"}[5m]) > 0.05
for: 1m
labels:
severity: warning
annotations:
summary: "Rate limit violations detected"
- alert: HighAuthLatency
expr: histogram_quantile(0.95, rate(phlow_auth_duration_seconds_bucket[5m])) > 1.0
for: 5m
labels:
severity: warning
annotations:
summary: "High authentication latency"
description: "P95 auth latency is {{ $value }}s"
Distributed Tracing
Request Correlation
Phlow automatically generates request IDs for distributed tracing:
from phlow.monitoring import get_logger
logger = get_logger()
# Request ID is automatically generated and included in all logs
# You can also set custom request context:
logger.set_request_context(
req_id="custom-request-id",
ag_id="agent-123"
)
Integration with External Systems
Forward request ID to external services:
import httpx
from phlow.monitoring import request_id
async def call_external_service():
req_id = request_id.get()
headers = {"X-Request-ID": req_id} if req_id else {}
async with httpx.AsyncClient() as client:
response = await client.get(
"https://external-service.com/api",
headers=headers
)
return response
Performance Monitoring
Response Time Tracking
from phlow.monitoring import MetricsTimer, get_metrics_collector
collector = get_metrics_collector()
# Track operation timing
with MetricsTimer(collector, "did_resolution", cached=False):
did_document = await resolve_did(did)
# Manual timing
import time
start = time.time()
result = await operation()
duration = time.time() - start
collector.record_external_api_call("service", 200, duration)
Memory and Resource Monitoring
import psutil
from phlow.monitoring import get_metrics_collector
collector = get_metrics_collector()
# Monitor resource usage
def collect_system_metrics():
process = psutil.Process()
# Set gauges for current resource usage
collector.set_active_connections(len(process.connections()))
# You can extend this with custom gauges
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
# collector.set_memory_usage(memory_usage)
Troubleshooting
Common Issues
Logs not structured:
- Ensure output_format="json" in the logging configuration
- Check that structlog is installed

Metrics not appearing:
- Verify enable_prometheus=True in the metrics configuration
- Install prometheus-client: pip install prometheus-client

Request IDs missing:
- Add LoggingMiddleware to your FastAPI app
- Set the request context manually if you are not using the middleware
Debug Logging
Enable debug logging to see internal Phlow operations:
from phlow.monitoring import configure_logging
# Enable debug logging
configure_logging(log_level="DEBUG", output_format="console")
Best Practices
- Use JSON format in production for better log parsing
- Set up log rotation to manage disk space
- Monitor authentication patterns for security insights
- Set up alerting for rate limit violations and auth failures
- Use request tracing to correlate events across services
- Monitor performance metrics to identify bottlenecks
- Regularly review logs for security anomalies
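For the log-rotation recommendation above, Python's standard library already covers the basics. A sketch using RotatingFileHandler (the path, size, and backup count are placeholders to adjust for your deployment):

```python
import logging
import logging.handlers

handler = logging.handlers.RotatingFileHandler(
    "phlow.log",                # placeholder path
    maxBytes=10 * 1024 * 1024,  # rotate after ~10 MB
    backupCount=5,              # keep five rotated files, then delete oldest
)
handler.setFormatter(logging.Formatter("%(message)s"))  # lines are already JSON

logger = logging.getLogger("phlow")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```

In containerized deployments, logging to stdout and letting the platform handle rotation is a common alternative.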