Async Processing with Queue Backends
ModalKit supports asynchronous processing for long-running inference tasks with flexible queue backend integration. This guide demonstrates how to implement async processing with custom queue systems like TaskIQ.
Overview
Async processing in ModalKit involves:
- Optional Queue Backends: Use any queue system or none at all
- Dependency Injection: Inject custom queue backends like TaskIQ
- Flexible Configuration: Configure via YAML or code
- TaskIQ Native Support: Proper task definitions and workers
Queue Backend Options
1. No Queues (Default)
Perfect for sync-only APIs or when you don't need async processing:
```python
from modalkit.modal_service import ModalService


class MyService(ModalService):
    inference_implementation = MyInferencePipeline  # your InferencePipeline subclass


# No queue backend: async requests are processed, but responses aren't queued
service = MyService()
```
2. Configuration-Based Queues
Uses the queue settings from modalkit.yaml (see Configuration below).
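3. TaskIQ Integration (Recommended)
For production async processing with TaskIQ:
```python
import json

from taskiq_redis import ListQueueBroker


async def process_ml_result(queue_name: str, msg: str) -> str:
    # Your custom processing logic; runs inside a TaskIQ worker
    result = json.loads(msg)
    # Process the ML inference result:
    # - Send notifications
    # - Update databases
    # - Trigger downstream systems
    return f"Processed {result['status']} result from {queue_name}"


class TaskIQBackend:
    def __init__(self, broker_url: str = "redis://localhost:6379"):
        self.broker = ListQueueBroker(broker_url)
        # Register the task once (not on every send) so producers and
        # workers that build the same backend share the same task name
        self.task = self.broker.task(task_name="process_ml_result")(process_ml_result)
        self._started = False

    async def send_message(self, queue_name: str, message: str) -> bool:
        if not self._started:
            await self.broker.startup()  # open broker connections on first use
            self._started = True
        # Enqueue only; a separately running TaskIQ worker executes the task
        await self.task.kiq(queue_name, message)
        return True


# Inject TaskIQ backend
taskiq_backend = TaskIQBackend("redis://localhost:6379")
service = MyService(queue_backend=taskiq_backend)
```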
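4. Custom Queue Implementation
Any object with an async `send_message(queue_name, message)` method that returns `True` on success can act as a queue backend:
```python
class MyCustomQueueBackend:
    async def send_message(self, queue_name: str, message: str) -> bool:
        # Send to your custom queue system (RabbitMQ, Kafka, etc.);
        # `my_queue_system` is a placeholder for your own client
        await my_queue_system.send(queue_name, message)
        return True


service = MyService(queue_backend=MyCustomQueueBackend())
```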
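For local tests it can help to have a backend that needs no external services. The sketch below is a hypothetical `InMemoryQueueBackend` (not part of ModalKit) that satisfies the same duck-typed `send_message` interface by keeping one `asyncio.Queue` per queue name, plus a `receive` helper so a test can read back what was sent.

```python
import asyncio
from collections import defaultdict


class InMemoryQueueBackend:
    """Hypothetical test backend: keeps one asyncio.Queue per queue name."""

    def __init__(self) -> None:
        # Queues are created lazily the first time a name is used
        self.queues = defaultdict(asyncio.Queue)

    async def send_message(self, queue_name: str, message: str) -> bool:
        # put() never blocks on an unbounded queue
        await self.queues[queue_name].put(message)
        return True

    async def receive(self, queue_name: str) -> str:
        # Wait for the next message on the given queue
        return await self.queues[queue_name].get()


async def demo() -> str:
    backend = InMemoryQueueBackend()
    await backend.send_message("sentiment_results", '{"status": "success"}')
    return await backend.receive("sentiment_results")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because messages stay in process memory, this only suits tests; it would be injected the same way, e.g. `MyService(queue_backend=InMemoryQueueBackend())`.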
Complete Example
1. Define Models
```python
from modalkit.iomodel import AsyncInputModel, InferenceOutputModel
from pydantic import BaseModel


class SentimentRequest(BaseModel):
    text: str


class SentimentResponse(InferenceOutputModel):
    sentiment: str
    confidence: float
```
2. Implement Inference Pipeline
```python
from typing import List

from modalkit.inference_pipeline import InferencePipeline
from modalkit.iomodel import InferenceOutputModel
from pydantic import BaseModel


class SentimentPipeline(InferencePipeline):
    def preprocess(self, input_list: List[BaseModel]) -> dict:
        # Pull the text field from each request in the batch
        texts = [getattr(req, "text", str(req)) for req in input_list]
        return {"texts": texts}

    def predict(self, input_list: List[BaseModel], preprocessed_data: dict) -> dict:
        # Toy keyword rules: only the substrings "good" and "bad" are detected
        predictions = []
        for text in preprocessed_data["texts"]:
            if "good" in text.lower():
                predictions.append({"sentiment": "positive", "confidence": 0.95})
            elif "bad" in text.lower():
                predictions.append({"sentiment": "negative", "confidence": 0.90})
            else:
                predictions.append({"sentiment": "neutral", "confidence": 0.70})
        return {"predictions": predictions}

    def postprocess(self, input_list: List[BaseModel], raw_output: dict) -> List[InferenceOutputModel]:
        # One SentimentResponse (defined above) per input, in input order
        results = []
        for prediction in raw_output["predictions"]:
            results.append(SentimentResponse(
                status="success",
                sentiment=prediction["sentiment"],
                confidence=prediction["confidence"],
            ))
        return results
```
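To check what these toy rules return without deploying anything, the sketch below re-implements the predict and postprocess steps on plain strings. It is a stand-alone illustration, not ModalKit code: plain dicts stand in for the pydantic request and response models.

```python
from typing import Dict, List


def predict(texts: List[str]) -> List[Dict[str, object]]:
    """Same keyword rules as SentimentPipeline.predict, on plain strings."""
    predictions = []
    for text in texts:
        lowered = text.lower()
        if "good" in lowered:
            predictions.append({"sentiment": "positive", "confidence": 0.95})
        elif "bad" in lowered:
            predictions.append({"sentiment": "negative", "confidence": 0.90})
        else:
            predictions.append({"sentiment": "neutral", "confidence": 0.70})
    return predictions


def postprocess(predictions: List[Dict[str, object]]) -> List[Dict[str, object]]:
    """Dict stand-in for building one SentimentResponse per input."""
    return [{"status": "success", **p} for p in predictions]


for row in postprocess(predict(["Good film", "Bad acting", "This movie is great!"])):
    print(row)
```

Note that "This movie is great!", the text used in the usage example, comes out neutral: the rules match only the literal substrings "good" and "bad".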
3. Configure Modal Service
```python
import os

import modal
from modalkit.modal_config import ModalConfig
from modalkit.modal_service import ModalService, create_web_endpoints

# Toggle TaskIQ via an environment variable (illustrative; any flag works)
USE_TASKIQ = os.environ.get("USE_TASKIQ", "false").lower() == "true"


class SentimentService(ModalService):
    inference_implementation = SentimentPipeline

    def __init__(self, queue_backend=None):
        # Use TaskIQ when enabled and no backend was injected explicitly.
        # (A lambda cannot be subclassed, so the choice is made here.)
        if queue_backend is None and USE_TASKIQ:
            queue_backend = TaskIQBackend("redis://localhost:6379")
        super().__init__(queue_backend=queue_backend)
        self.model_name = "sentiment-analyzer"


# Modal configuration
modal_config = ModalConfig()
app = modal.App(name="sentiment-service")


@app.cls(**modal_config.get_app_cls_settings())
class SentimentApp(SentimentService):
    modal_utils: ModalConfig = modal_config


@app.function(**modal_config.get_handler_settings())
@modal.asgi_app()
def web_endpoints():
    return create_web_endpoints(
        app_cls=SentimentApp,
        input_model=SentimentRequest,
        output_model=SentimentResponse,
    )
```
Usage Examples
Basic Async Request
```python
import requests

# Submit async job with queue processing
response = requests.post(
    "http://localhost:8000/async/predict",
    json={
        "message": {"text": "This movie is great!"},
        "success_queue": "sentiment_results",
        "failure_queue": "sentiment_errors",
        "meta": {"user_id": "12345", "request_id": "req_001"},
    },
    headers={"Authorization": "Bearer your-token"},
)
job_id = response.json()["job_id"]
print(f"Job submitted: {job_id}")
```
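TaskIQ Worker Setup
For production TaskIQ usage, run workers as separate processes. TaskIQ's CLI starts a worker with `taskiq worker <module>:<broker>`; the worker imports that module, so every task it should execute must be registered on the broker at import time, not inside a request handler.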
Working Examples
See complete working examples in the documentation:
- Queue Backend Patterns - Basic queue backend patterns and dependency injection
- TaskIQ Integration - Full TaskIQ integration tutorial
Follow the tutorials to build complete working examples with your own ML models.
Error Handling
Queue Send Failures
Queue failures don't break inference processing:
```python
import logging

logger = logging.getLogger(__name__)


# Excerpt from the service (signature abridged).
# If the queue send fails, inference still completes;
# errors are logged but don't affect the response.
async def send_async_response(self, ...):
    try:
        success = await self._send_to_queue(queue_name, message)
        if not success:
            logger.warning(f"Failed to send to queue: {queue_name}")
    except Exception as e:
        logger.error(f"Queue error: {e}")
    # Processing continues normally
```
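TaskIQ Task Failures
Handle task failures in your TaskIQ workers:
```python
import json
import logging

logger = logging.getLogger(__name__)


# `broker` is your TaskIQ broker instance. The retry_on_error label is
# read by taskiq's SimpleRetryMiddleware, if it is installed on the broker.
@broker.task(retry_on_error=True)
async def process_ml_result(message: str) -> str:
    try:
        result = json.loads(message)
        # Process result
        return "success"
    except Exception as e:
        # Log the error, then re-raise so TaskIQ records the failure
        logger.error(f"Task failed: {e}")
        raise  # Retried only if a retry middleware is configured
```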
Best Practices
1. Queue Backend Selection
- No Queues: Simple sync APIs, testing
- TaskIQ: Production async processing with workers
- Configuration-based: When you need SQS/existing infrastructure
- Custom: Specialized queue systems (RabbitMQ, Kafka, etc.)
2. Error Resilience
```python
# Always handle queue failures gracefully
# (method on your service class; assumes a module-level `logger`)
async def _send_to_queue(self, queue_name: str, message: str) -> bool:
    if self.queue_backend:
        try:
            return await self.queue_backend.send_message(queue_name, message)
        except Exception as e:
            logger.error(f"Queue backend error: {e}")
            return False
    return False  # No backend configured
```
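Because `_send_to_queue` is written as a method, the sketch below copies it into a hypothetical `QueueSender` stand-in and pairs it with a backend that always raises, so both failure paths (backend error, no backend configured) can be run without ModalKit. `FailingBackend` and `QueueSender` are illustrative names only.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FailingBackend:
    """Hypothetical backend that always raises, to exercise the error path."""

    async def send_message(self, queue_name: str, message: str) -> bool:
        raise ConnectionError("broker unreachable")


class QueueSender:
    """Stand-in for the service: holds an optional queue backend."""

    def __init__(self, queue_backend=None) -> None:
        self.queue_backend = queue_backend

    async def _send_to_queue(self, queue_name: str, message: str) -> bool:
        if self.queue_backend:
            try:
                return await self.queue_backend.send_message(queue_name, message)
            except Exception as e:
                # The exception is logged and swallowed; callers only see False
                logger.error(f"Queue backend error: {e}")
                return False
        return False  # No backend configured


if __name__ == "__main__":
    print(asyncio.run(QueueSender(FailingBackend())._send_to_queue("q", "{}")))  # False
    print(asyncio.run(QueueSender()._send_to_queue("q", "{}")))  # False
```

Returning `False` instead of propagating the exception is what keeps a broker outage from turning into a failed inference response.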
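3. TaskIQ Production Setup
```python
# Use a Redis-backed broker in production
from taskiq import SimpleRetryMiddleware
from taskiq_redis import ListQueueBroker

# Production Redis setup
broker = ListQueueBroker(
    url="redis://redis-cluster:6379",
    max_connection_pool_size=20,
).with_middlewares(
    # Re-sends failed tasks labelled retry_on_error=True, up to 3 times (no backoff)
    SimpleRetryMiddleware(default_retry_count=3),
)
```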
4. Message Structure
Ensure your queue messages contain all necessary metadata:
```json
{
  "status": "success",
  "result": {...},
  "meta": {
    "user_id": "12345",
    "request_id": "req_001",
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
```
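A producer-side helper that emits this shape might look like the sketch below. `build_queue_message` is a hypothetical name, and stamping the timestamp at send time is an assumption; ModalKit may assemble this message itself.

```python
import json
from datetime import datetime, timezone


def build_queue_message(status: str, result: dict, meta: dict) -> str:
    """Hypothetical helper: serialize a result in the message shape shown above."""
    # Copy the caller's metadata and stamp it with a UTC ISO-8601 timestamp
    enriched_meta = {
        **meta,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    return json.dumps({"status": status, "result": result, "meta": enriched_meta})


message = build_queue_message(
    "success",
    {"sentiment": "positive", "confidence": 0.95},
    {"user_id": "12345", "request_id": "req_001"},
)
print(message)
```

Serializing a dict with `json.dumps`, rather than formatting a string by hand, keeps the message valid JSON even when metadata values contain quotes.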
Configuration
Configure queue settings in modalkit.yaml:
```yaml
app_settings:
  # Optional: fallback queue configuration
  queue_config:
    backend: "memory"  # or "sqs"
  # Optional: async processing settings
  async_config:
    timeout: 3600
    result_ttl: 86400
```
Advanced Features
Multiple Queue Backends
You can also route messages to different backends, for example by queue-name prefix:
```python
class MultiQueueBackend:
    def __init__(self):
        self.taskiq_backend = TaskIQBackend()
        self.sqs_backend = SQSBackend()  # placeholder: your own SQS backend class

    async def send_message(self, queue_name: str, message: str) -> bool:
        # Queues prefixed with "ml_" go to TaskIQ; everything else goes to SQS
        if queue_name.startswith("ml_"):
            return await self.taskiq_backend.send_message(queue_name, message)
        return await self.sqs_backend.send_message(queue_name, message)
```
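The hard-coded `if`/`else` generalizes to a prefix table. In the runnable sketch below, a hypothetical `RecordingBackend` stands in for both `TaskIQBackend` and the placeholder SQS backend, so the routing can be checked without Redis or AWS; `PrefixRouter` is likewise an illustrative name.

```python
import asyncio
from typing import Dict, List, Tuple


class RecordingBackend:
    """Hypothetical backend that records messages instead of sending them."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.sent: List[Tuple[str, str]] = []

    async def send_message(self, queue_name: str, message: str) -> bool:
        self.sent.append((queue_name, message))
        return True


class PrefixRouter:
    """Route by queue-name prefix; unmatched names go to the default backend."""

    def __init__(self, routes: Dict[str, object], default) -> None:
        self.routes = routes  # prefix -> backend, checked in insertion order
        self.default = default

    async def send_message(self, queue_name: str, message: str) -> bool:
        for prefix, backend in self.routes.items():
            if queue_name.startswith(prefix):
                return await backend.send_message(queue_name, message)
        return await self.default.send_message(queue_name, message)


taskiq_like = RecordingBackend("taskiq")
sqs_like = RecordingBackend("sqs")
router = PrefixRouter({"ml_": taskiq_like}, default=sqs_like)


async def main() -> None:
    await router.send_message("ml_results", "a")
    await router.send_message("sentiment_results", "b")


asyncio.run(main())
print(taskiq_like.sent)  # [('ml_results', 'a')]
print(sqs_like.sent)  # [('sentiment_results', 'b')]
```

Since the router itself exposes `send_message`, it can be injected like any single backend; when prefixes overlap, list the more specific one first because the first match wins.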
Because a backend is just an object with an async send_message method, you can combine or swap queue systems to suit your use case without touching the inference pipeline.