TaskIQ Queue Integration¶
This tutorial shows how to integrate TaskIQ with ModalKit for production-ready async ML processing. TaskIQ is the recommended queue system for ModalKit due to its simplicity and powerful features.
Why TaskIQ?¶
TaskIQ is a modern async task queue for Python that provides:

- **Native async support**: built for modern Python async/await patterns
- **Multiple brokers**: Redis, SQS, RabbitMQ, and more
- **Result backends**: store and retrieve task results
- **Production ready**: retries, monitoring, and error handling
- **Simple integration**: clean APIs that work well with dependency injection
Quick Setup¶
1. Install TaskIQ with Redis¶
# Install core TaskIQ
pip install taskiq
# Install Redis broker (recommended)
pip install taskiq-redis
2. Basic TaskIQ Backend for ModalKit¶
from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend
import json
class TaskIQBackend:
    """Simple TaskIQ backend for ModalKit integration.

    Owns a Redis stream broker plus a result backend and exposes
    ``send_message``, which ModalKit calls to enqueue ML results/errors.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        # The result backend stores task outcomes; the stream broker delivers messages.
        self.result_backend = RedisAsyncResultBackend(redis_url=redis_url)
        self.broker = RedisStreamBroker(url=redis_url).with_result_backend(self.result_backend)
        # Register the task processors on the broker.
        self._setup_tasks()

    def _setup_tasks(self):
        """Define and register the TaskIQ tasks that consume ModalKit messages."""

        @self.broker.task
        async def process_ml_result(message: str) -> str:
            """Handle one successful ML inference result."""
            try:
                result = json.loads(message)
                # Custom processing hooks go here: notifications, database
                # updates, downstream ML pipelines, analytics storage, ...
                print(f"🎯 Processing ML result:")
                print(f" Status: {result.get('status')}")
                print(f" Confidence: {result.get('confidence')}")
                return "success"
            except Exception as exc:
                print(f"Error processing result: {exc}")
                raise  # TaskIQ will handle retries

        @self.broker.task
        async def process_ml_error(message: str) -> str:
            """Handle one ML inference error message."""
            try:
                error_result = json.loads(message)
                # Error hooks go here: monitoring, alerting, retrying failed requests.
                print(f"❌ Processing ML error:")
                print(f" Error: {error_result.get('error')}")
                print(f" Request ID: {error_result.get('request_id')}")
                return "error_handled"
            except Exception as exc:
                print(f"Error processing error: {exc}")
                raise

        # Keep references so send_message can dispatch to the tasks.
        self.process_ml_result = process_ml_result
        self.process_ml_error = process_ml_error

    async def send_message(self, queue_name: str, message: str) -> bool:
        """Enqueue ``message`` on the task matching ``queue_name``'s suffix.

        Returns True on success, False if enqueueing raised.
        """
        try:
            if queue_name.endswith("_errors"):
                await self.process_ml_error.kiq(message)
            else:
                # "_results" queues and any unknown queue both use the result task.
                await self.process_ml_result.kiq(message)
            return True
        except Exception as exc:
            print(f"TaskIQ send error: {exc}")
            return False
Integration with ModalKit¶
1. Simple ML Pipeline¶
from modalkit.inference_pipeline import InferencePipeline
from modalkit.iomodel import InferenceOutputModel
from pydantic import BaseModel
from typing import List
class TextRequest(BaseModel):
    # Raw text payload to be processed by the pipeline.
    text: str
class TextResponse(InferenceOutputModel):
    # Transformed text plus the model's confidence score.
    result: str
    confidence: float
class SimplePipeline(InferencePipeline):
    """Toy pipeline: upper-cases each input text with a constant confidence."""

    def preprocess(self, input_list: List[BaseModel]) -> dict:
        # Fall back to str(req) for inputs without a `text` attribute.
        return {"texts": [getattr(req, "text", str(req)) for req in input_list]}

    def predict(self, input_list: List[BaseModel], preprocessed_data: dict) -> dict:
        # "Inference" here is just an upper-case transform with a fixed score.
        predictions = [
            {"result": text.upper(), "confidence": 0.95}
            for text in preprocessed_data["texts"]
        ]
        return {"predictions": predictions}

    def postprocess(self, input_list: List[BaseModel], raw_output: dict) -> List[InferenceOutputModel]:
        # Wrap each raw prediction in the response model ModalKit expects.
        return [
            TextResponse(
                status="success",
                result=prediction["result"],
                confidence=prediction["confidence"],
            )
            for prediction in raw_output["predictions"]
        ]
2. ModalService with TaskIQ¶
from modalkit.modal_service import ModalService
class TextProcessingService(ModalService):
    """ModalService wired to SimplePipeline; the queue backend is injected."""

    inference_implementation = SimplePipeline

    def __init__(self, queue_backend=None):
        # Pass the (optional) queue backend straight through to ModalService.
        super().__init__(queue_backend=queue_backend)
        self.model_name = "text-processor"
# Instantiate the TaskIQ backend (defaults here to a local Redis instance).
taskiq_backend = TaskIQBackend("redis://localhost:6379")
# Inject the backend into the service via its queue_backend parameter.
service = TextProcessingService(queue_backend=taskiq_backend)
3. Complete Modal App¶
import modal
from modalkit.modal_service import ModalService, create_web_endpoints
from modalkit.modal_config import ModalConfig
# TaskIQ backend shared by the Modal app class below.
taskiq_backend = TaskIQBackend("redis://redis:6379")  # Use Redis hostname in Modal
# Modal configuration; app-class and handler settings are read from it below.
modal_config = ModalConfig()
app = modal.App(name="text-processing-with-taskiq")
@app.cls(**modal_config.get_app_cls_settings())
class TextProcessingApp(ModalService):
    """Modal container class running SimplePipeline with TaskIQ publishing."""

    inference_implementation = SimplePipeline

    def __init__(self):
        # Inject the module-level TaskIQ backend defined above.
        super().__init__(queue_backend=taskiq_backend)
        self.model_name = "text-processor"
@app.function(**modal_config.get_handler_settings())
@modal.asgi_app()
def web_endpoints():
    """Build the ASGI app exposing ModalKit's prediction endpoints."""
    return create_web_endpoints(
        app_cls=TextProcessingApp,
        input_model=TextRequest,
        output_model=TextResponse,
    )
TaskIQ Worker Setup¶
1. Worker Script¶
Create `worker.py`:
import asyncio
from your_app import taskiq_backend
async def main():
    """Start the broker and keep the worker process alive until interrupted."""
    print("🚀 Starting TaskIQ worker...")
    await taskiq_backend.broker.startup()
    try:
        # NOTE(review): this loop only keeps the process alive; actual task
        # consumption is handled by the TaskIQ CLI worker (see below) —
        # confirm this script matches your deployment model.
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("👋 Shutting down worker...")
    finally:
        # Always release broker resources, even on an unexpected exit.
        await taskiq_backend.broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
2. Run Worker¶
# Option 1: Use the worker script
python worker.py
# Option 2: Use TaskIQ CLI (recommended)
taskiq worker your_app:taskiq_backend.broker
Usage Examples¶
Async Processing¶
import requests

headers = {"Authorization": "Bearer your-token"}

# Submit an async request; results/errors flow to the named TaskIQ queues.
payload = {
    "message": {"text": "Process this text asynchronously"},
    "success_queue": "text_results",
    "failure_queue": "text_errors",
    "meta": {"user_id": "123", "request_id": "req_001"},
}
response = requests.post(
    "https://your-org--text-processing-with-taskiq.modal.run/predict_async",
    json=payload,
    headers=headers,
)

job_id = response.json()["job_id"]
print(f"Job submitted: {job_id}")
When this request is processed:
1. ModalKit processes the ML inference
2. Results are sent to TaskIQ via the TaskIQBackend
3. TaskIQ workers process the results asynchronously
4. You can implement custom logic in the TaskIQ tasks
Advanced TaskIQ Features¶
1. Different Broker Types¶
The taskiq-redis package provides multiple broker types:
# taskiq-redis ships several broker implementations with different guarantees:
from taskiq_redis import (
    RedisStreamBroker,     # Recommended: supports acknowledgments
    RedisListQueueBroker,  # Simple: no acknowledgments
    RedisPubSubBroker      # Broadcasting: no acknowledgments
)

# Stream broker (recommended for production)
broker = RedisStreamBroker(url="redis://localhost:6379")
# List queue broker (simpler, but no delivery guarantees)
broker = RedisListQueueBroker(url="redis://localhost:6379")
# PubSub broker (for broadcasting a message to multiple workers)
broker = RedisPubSubBroker(url="redis://localhost:6379")
2. Production Configuration¶
from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend
class ProductionTaskIQBackend:
    """TaskIQ backend with production settings: result TTL, stream broker."""

    def __init__(self):
        redis_url = "redis://redis-cluster:6379"
        self.result_backend = RedisAsyncResultBackend(
            redis_url=redis_url,
            result_ex_time=3600,  # results expire after one hour
            keep_results=False,   # clean results up after they are read
        )
        self.broker = RedisStreamBroker(url=redis_url).with_result_backend(self.result_backend)
3. Error Handling and Retries¶
from taskiq import ExponentialBackoff
# NOTE(review): confirm `retry_policy=ExponentialBackoff(...)` matches the
# retry API of your installed TaskIQ version before deploying.
@broker.task(retry_policy=ExponentialBackoff(max_retries=3))
async def robust_ml_processor(message: str) -> str:
    """Task whose failures are retried automatically by TaskIQ."""
    try:
        # Process the ML result here.
        return "success"
    except Exception as exc:
        # Re-raising hands control back to TaskIQ's retry machinery.
        print(f"Task failed, will retry: {exc}")
        raise
Alternative: Simple Queue Backends¶
If you don't need TaskIQ's full features, ModalKit also provides simple built-in backends:
from modalkit.task_queue import InMemoryBackend, SQSBackend

# In-memory backend: for local testing.
memory_backend = InMemoryBackend()
service = TextProcessingService(queue_backend=memory_backend)

# AWS SQS backend: point it at an existing queue URL.
sqs_backend = SQSBackend(
    queue_url="https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
)
service = TextProcessingService(queue_backend=sqs_backend)
Summary¶
TaskIQ integration with ModalKit is straightforward:
1. **Install**: `pip install taskiq taskiq-redis`
2. **Create backend**: implement `TaskIQBackend` with your task logic
3. **Inject**: pass the backend to `ModalService(queue_backend=taskiq_backend)`
4. **Deploy**: run TaskIQ workers separately from your Modal app
The taskiq-redis package makes Redis integration simple and provides production-ready features like acknowledgments, retries, and result storage.
Key Benefits:

- ✅ **Native async**: perfect for modern ML workflows
- ✅ **Production ready**: retries, monitoring, persistence
- ✅ **Flexible**: multiple broker types and configurations
- ✅ **Scalable**: independent worker scaling
- ✅ **Simple**: clean integration with ModalKit's dependency injection