Sentiment Analysis Service¶
A production-ready sentiment analysis service using Modalkit, demonstrating core features with a practical use case.
Overview¶
This tutorial shows how to build a sentiment analysis API that:

- Analyzes text sentiment using pre-trained models
- Provides confidence scores and emotion detection
- Handles batch processing for multiple texts
- Includes proper error handling and monitoring
- Demonstrates cloud storage integration
Project Structure¶
sentiment-service/
├── app.py # Modal app definition
├── sentiment_model.py # Sentiment analysis implementation
├── modalkit.yaml # Configuration
├── requirements.txt # Dependencies
└── models/ # Model artifacts (optional)
1. Model Implementation¶
Create `sentiment_model.py`:
from modalkit.inference_pipeline import InferencePipeline
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
import os
# Input/Output schemas
class TextInput(BaseModel):
    """Request schema: a single piece of text to analyze."""

    # Raw text to score.
    text: str
    # Language code of the text; the pipeline treats "auto" as
    # "detect the language for me".
    language: str = "en"
    # When true, emotion scores are computed in addition to the sentiment.
    include_emotions: bool = False
class SentimentOutput(BaseModel):
    """Response schema: the analysis result for one input text."""

    # The text exactly as it was submitted.
    text: str
    # One of "positive", "negative" or "neutral".
    sentiment: str
    # Model confidence for the predicted sentiment label.
    confidence: float
    # Signed sentiment on a -1 to 1 scale.
    score: float
    # Emotion label -> score; None when emotions were not requested
    # or could not be computed.
    emotions: Optional[Dict[str, float]] = None
    # Language code reported for the text.
    language: str
    # Inference time attributed to this item, in seconds.
    processing_time: float
class SentimentAnalysisInference(InferencePipeline):
    """Sentiment analysis pipeline with optional emotion and language detection.

    The work is split into three stages whose outputs feed each other:
    ``preprocess`` cleans the texts, ``predict`` runs the models on the
    cleaned texts, and ``postprocess`` builds one ``SentimentOutput`` per
    input item.
    """

    # Hugging Face model ids used when the model config has no override.
    _DEFAULT_SENTIMENT_MODEL = "cardiffnlp/twitter-roberta-base-sentiment-latest"
    _DEFAULT_EMOTION_MODEL = "j-hartmann/emotion-english-distilroberta-base"

    # Mounted-storage locations that take precedence over the Hugging Face hub.
    _SENTIMENT_MODEL_PATH = "/mnt/models/sentiment_model"
    _EMOTION_MODEL_PATH = "/mnt/models/emotion_model"

    # Character cap applied in preprocess(); token-level truncation is
    # additionally requested from the pipelines at inference time.
    _MAX_TEXT_CHARS = 512

    # Stand-in sent to the model when the input is empty or whitespace only.
    _EMPTY_TEXT_PLACEHOLDER = "empty text"

    def __init__(self, model_name: str, all_model_data_folder: str, common_settings: dict, *args, **kwargs):
        """Load the sentiment model and the optional emotion/language helpers.

        Args:
            model_name: Name of the model entry; also the key looked up in
                ``common_settings`` to obtain this model's configuration.
            all_model_data_folder: Passed through to the base class.
            common_settings: Settings dict passed through to the base class.
        """
        super().__init__(model_name, all_model_data_folder, common_settings)
        self.model_config = common_settings.get(model_name, {})
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Required: failures here propagate to the caller.
        self.sentiment_pipeline = self._load_sentiment_model()
        # Optional: each loader returns None when the feature is unavailable.
        self.emotion_pipeline = self._load_emotion_model()
        self.language_detector = self._load_language_detector()

        print(f"Sentiment analysis service initialized on {self.device}")

    def _load_pipeline(self, task: str, kind: str, local_path: str, hub_model: str):
        """Build a transformers pipeline, preferring mounted storage over the hub.

        Args:
            task: Pipeline task name passed to ``transformers.pipeline``.
            kind: Human-readable model kind, used only in log messages.
            local_path: Directory checked first for a saved model.
            hub_model: Hugging Face model id used when ``local_path`` is absent.
        """
        device = 0 if torch.cuda.is_available() else -1
        if os.path.exists(local_path):
            print(f"Loading {kind} model from mounted storage")
            tokenizer = AutoTokenizer.from_pretrained(local_path)
            model = AutoModelForSequenceClassification.from_pretrained(local_path)
            return pipeline(task, model=model, tokenizer=tokenizer, device=device)
        print(f"Loading {kind} model from Hugging Face: {hub_model}")
        return pipeline(task, model=hub_model, device=device)

    def _load_sentiment_model(self):
        """Load sentiment analysis model"""
        model_name = self.model_config.get("sentiment_model", self._DEFAULT_SENTIMENT_MODEL)
        return self._load_pipeline(
            "sentiment-analysis", "sentiment", self._SENTIMENT_MODEL_PATH, model_name
        )

    def _load_emotion_model(self):
        """Load emotion detection model; return None if it cannot be loaded."""
        try:
            model_name = self.model_config.get("emotion_model", self._DEFAULT_EMOTION_MODEL)
            return self._load_pipeline(
                "text-classification", "emotion", self._EMOTION_MODEL_PATH, model_name
            )
        except Exception as e:
            # Emotion detection is an optional feature: degrade instead of failing startup.
            print(f"Could not load emotion model: {e}")
            return None

    def _load_language_detector(self):
        """Return langdetect's ``detect`` function, or None if not installed."""
        try:
            from langdetect import detect
        except ImportError:
            print("Language detection not available")
            return None
        return detect

    def _detect_language(self, text: str) -> str:
        """Detect language of text; return "unknown" when detection is impossible."""
        if not self.language_detector:
            return "unknown"
        try:
            return self.language_detector(text)
        except Exception:
            # Detection is best-effort, but KeyboardInterrupt/SystemExit must
            # not be swallowed, hence no bare ``except``.
            return "unknown"

    def _normalize_sentiment_score(self, label: str, score: float) -> tuple[str, float, float]:
        """Normalize sentiment score to -1 to 1 scale

        Returns:
            ``(sentiment, confidence, signed_score)``: ``signed_score`` is
            ``+score`` for positive labels, ``-score`` for negative labels and
            ``0.0`` for anything else, which is reported as "neutral".
        """
        normalized_label = label.lower()
        if normalized_label in ("positive", "pos"):
            return "positive", score, score
        if normalized_label in ("negative", "neg"):
            return "negative", score, -score
        return "neutral", score, 0.0

    def _get_emotions(self, text: str) -> Optional[Dict[str, float]]:
        """Get emotion scores for text

        Returns:
            Mapping of lower-cased emotion label to score, or None when the
            emotion model is unavailable or the call fails.
        """
        if not self.emotion_pipeline:
            return None
        try:
            # top_k=None asks for the score of every label instead of only the
            # best one; truncation keeps long inputs within the model limit.
            results = self.emotion_pipeline(text, top_k=None, truncation=True)
            # For a single string the result may be one dict, a flat list of
            # dicts, or that list wrapped once more, depending on the
            # transformers version.
            if isinstance(results, dict):
                results = [results]
            elif results and isinstance(results[0], list):
                results = results[0]
            return {result["label"].lower(): result["score"] for result in results}
        except Exception as e:
            print(f"Emotion detection error: {e}")
            return None

    def preprocess(self, input_list: List[TextInput]) -> dict:
        """Preprocess text inputs for sentiment analysis

        Returns:
            Dict with parallel lists ``texts`` (cleaned), ``languages`` and
            ``include_emotions_flags``, plus ``preprocessing_time`` in seconds.
        """
        import time

        start_time = time.time()
        texts = []
        languages = []
        include_emotions_flags = []

        for input_item in input_list:
            # Clean the text; empty input is replaced by a placeholder so the
            # model always receives something to score.
            cleaned_text = input_item.text.strip() or self._EMPTY_TEXT_PLACEHOLDER
            # Cap very long texts (slicing is a no-op for short ones).
            cleaned_text = cleaned_text[: self._MAX_TEXT_CHARS]
            texts.append(cleaned_text)

            # Detect the language only when the caller asked for it.
            if input_item.language == "auto":
                languages.append(self._detect_language(cleaned_text))
            else:
                languages.append(input_item.language)

            include_emotions_flags.append(input_item.include_emotions)

        return {
            "texts": texts,
            "languages": languages,
            "include_emotions_flags": include_emotions_flags,
            "preprocessing_time": time.time() - start_time,
        }

    def predict(self, input_list: List[TextInput], preprocessed_data: dict) -> dict:
        """Perform sentiment analysis on preprocessed texts

        Returns:
            Dict with ``sentiment_results`` and ``emotion_results`` (parallel
            to the inputs), the ``languages`` resolved during preprocessing,
            and ``inference_time`` in seconds for the whole batch.
        """
        import time

        start_time = time.time()
        texts = preprocessed_data["texts"]
        include_emotions_flags = preprocessed_data["include_emotions_flags"]

        # Batch sentiment analysis; skip the model call for an empty batch.
        # truncation=True enforces the model's token limit, which the
        # character cap applied in preprocess() cannot guarantee on its own.
        sentiment_results = self.sentiment_pipeline(texts, truncation=True) if texts else []

        # Emotion detection only for the texts that requested it.
        emotion_results = [
            self._get_emotions(text) if include_emotions else None
            for text, include_emotions in zip(texts, include_emotions_flags)
        ]

        return {
            "sentiment_results": sentiment_results,
            "emotion_results": emotion_results,
            # Forwarded so postprocess() does not have to detect languages again.
            "languages": preprocessed_data.get("languages"),
            "inference_time": time.time() - start_time,
        }

    def postprocess(self, input_list: List[TextInput], raw_output: dict) -> List[SentimentOutput]:
        """Format sentiment analysis outputs

        Returns:
            One ``SentimentOutput`` per input item, in input order. Each item
            is attributed an equal share of the batch inference time.
        """
        sentiment_results = raw_output["sentiment_results"]
        emotion_results = raw_output["emotion_results"]
        inference_time = raw_output["inference_time"]
        # Tolerate raw outputs that carry no languages: resolve them below.
        languages = raw_output.get("languages") or [None] * len(input_list)
        per_item_time = inference_time / len(input_list) if input_list else 0.0

        outputs = []
        for input_item, sentiment_result, emotions, language in zip(
            input_list, sentiment_results, emotion_results, languages
        ):
            sentiment_label, confidence, normalized_score = self._normalize_sentiment_score(
                sentiment_result["label"], sentiment_result["score"]
            )

            if language is None:
                language = input_item.language
                if language == "auto":
                    language = self._detect_language(input_item.text)

            outputs.append(SentimentOutput(
                text=input_item.text,
                sentiment=sentiment_label,
                confidence=confidence,
                score=normalized_score,
                emotions=emotions,
                language=language,
                processing_time=per_item_time,
            ))
        return outputs
2. Configuration¶
Create `modalkit.yaml`:
# NOTE(review): the nesting below was reconstructed from the section comments
# and key order — confirm it against the Modalkit configuration schema.
app_settings:
  app_prefix: "sentiment-service"

  # Authentication
  auth_config:
    ssm_key: "/sentiment-service/api-key"
    auth_header: "x-api-key"

  # Container configuration
  build_config:
    image: "python:3.11"
    tag: "latest"
    workdir: "/app"
    env:
      TRANSFORMERS_CACHE: "/tmp/transformers_cache"
      HF_HOME: "/tmp/huggingface"
    extra_run_commands:
      # Install PyTorch
      - "pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118"
      # Install NLP libraries
      - "pip install transformers tokenizers"
      # Install language detection
      - "pip install langdetect"

  # Deployment configuration
  deployment_config:
    gpu: "T4"  # Cost-effective GPU for transformer models
    concurrency_limit: 10
    container_idle_timeout: 600  # 10 minutes
    retries: 3
    memory: 8192  # 8GB RAM

    # Mount pre-trained models from cloud storage
    cloud_bucket_mounts:
      - mount_point: "/mnt/models"
        bucket_name: "sentiment-models"
        secret: "aws-credentials"
        key_prefix: "production/"
        read_only: true

    # Cache for model downloads
    volumes:
      "/tmp/transformers_cache": "transformers-cache"
      "/tmp/huggingface": "huggingface-cache"
    volume_reload_interval_seconds: 3600

  # Batch processing configuration
  batch_config:
    max_batch_size: 16  # Process multiple texts efficiently
    wait_ms: 100

  # Queue configuration for async processing
  queue_config:
    backend: "taskiq"
    broker_url: "redis://redis:6379"

# Model configuration
model_settings:
  local_model_repository_folder: "./models"
  common:
    device: "cuda"
    max_length: 512
  model_entries:
    sentiment_model:
      sentiment_model: "cardiffnlp/twitter-roberta-base-sentiment-latest"
      emotion_model: "j-hartmann/emotion-english-distilroberta-base"
      supported_languages: ["en", "es", "fr", "de", "it"]
3. Modal App¶
Create `app.py`:
import modal
from modalkit.modal_service import ModalService, create_web_endpoints
from modalkit.modal_config import ModalConfig
from sentiment_model import SentimentAnalysisInference, TextInput, SentimentOutput
# Initialize Modalkit: the config object supplies the Modal app name here and
# the decorator settings used by the class and functions defined below.
modal_config = ModalConfig()
app = modal.App(name=modal_config.app_name)
# Define Modal app class
@app.cls(**modal_config.get_app_cls_settings())
class SentimentApp(ModalService):
    """Modal service class that serves ``SentimentAnalysisInference``."""

    # Inference pipeline class served by this app.
    inference_implementation = SentimentAnalysisInference
    # Model entry to serve; defaults to the "sentiment_model" entry.
    model_name: str = modal.parameter(default="sentiment_model")
    modal_utils: ModalConfig = modal_config

    # Optional: Add queue backend for async processing
    # def __init__(self, queue_backend=None):
    #     super().__init__(queue_backend=queue_backend)
    #     # Custom initialization here
# Create API endpoints
@app.function(**modal_config.get_handler_settings())
@modal.asgi_app(**modal_config.get_asgi_app_settings())
def web_endpoints():
    """Return the web app exposing ``SentimentApp`` with typed request/response models."""
    endpoints = create_web_endpoints(
        app_cls=SentimentApp,
        input_model=TextInput,
        output_model=SentimentOutput,
    )
    return endpoints
# Health check endpoint
@app.function()
def health_check():
    """Return a static payload reporting that the service is up."""
    status_payload = {"status": "healthy", "service": "sentiment-analysis"}
    return status_payload
if __name__ == "__main__":
    # For local development
    # NOTE(review): confirm that `modal.enable_local_development` exists in the
    # installed Modal version; the block body is intentionally empty.
    with modal.enable_local_development():
        pass
4. Dependencies¶
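Create `requirements.txt`:

The dependency list itself is missing from this page. Based on the imports used in `sentiment_model.py` and `app.py` above, it needs at least the following packages (pin versions as appropriate for your environment):

modalkit
modal
torch
transformers
pydantic
numpy
langdetect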
5. Usage Examples¶
Single Text Analysis¶
import requests

headers = {"x-api-key": "your-api-key"}

# Analyze sentiment of a single text
response = requests.post(
    "https://your-org--sentiment-service.modal.run/predict_sync",
    json={
        "text": "I absolutely love this new product! It's amazing.",
        "language": "en",
        "include_emotions": True,
    },
    headers=headers,
    timeout=30,  # requests waits indefinitely unless a timeout is given
)
# Fail loudly on HTTP errors instead of trying to parse an error body.
response.raise_for_status()
result = response.json()

print(f"Text: {result['text']}")
print(f"Sentiment: {result['sentiment']} (confidence: {result['confidence']:.3f})")
print(f"Score: {result['score']:.3f}")

# "emotions" is null when emotion detection was unavailable.
if result.get("emotions"):
    print("Emotions:")
    for emotion, score in result["emotions"].items():
        print(f" {emotion}: {score:.3f}")
Batch Processing¶
import requests

headers = {"x-api-key": "your-api-key"}

# Analyze multiple texts at once
texts = [
    {"text": "This is the best day ever!", "include_emotions": True},
    {"text": "I'm feeling really disappointed about this.", "include_emotions": True},
    {"text": "The weather is okay today.", "include_emotions": False},
    {"text": "I can't believe how terrible this service is!", "include_emotions": True},
]

response = requests.post(
    "https://your-org--sentiment-service.modal.run/predict_batch",
    json=texts,
    headers=headers,
    timeout=60,  # requests waits indefinitely unless a timeout is given
)
# Fail loudly on HTTP errors instead of trying to parse an error body.
response.raise_for_status()
results = response.json()

# Results come back in the same order as the submitted texts.
for position, result in enumerate(results, start=1):
    print(f"Text {position}: {result['sentiment']} (score: {result['score']:.2f})")
Async Processing¶
Modalkit supports flexible async processing with optional queue backends:
# Option 1: No queues (default) - async requests are processed but responses aren't queued
class SentimentApp(ModalService):
    inference_implementation = SentimentAnalysisInference


# Option 2: Configuration-based queues (uses modalkit.yaml settings)
# Configure in modalkit.yaml:
# queue_config:
#   backend: "memory"  # or "sqs"


# Option 3: Custom queue backend (TaskIQ, RabbitMQ, etc.)
# `your_queue_backend` is a placeholder for your own module.
from your_queue_backend import TaskIQBackend

taskiq_backend = TaskIQBackend("redis://redis:6379")


class SentimentApp(ModalService):
    inference_implementation = SentimentAnalysisInference

    def __init__(self):
        # Hand the custom backend to the service base class.
        super().__init__(queue_backend=taskiq_backend)
Basic async usage:
import requests

headers = {"x-api-key": "your-api-key"}

# Submit for async processing
response = requests.post(
    "https://your-org--sentiment-service.modal.run/predict_async",
    json={
        # The payload to analyze, in the same shape as a sync request.
        "message": {
            "text": "This is a long text that might take time to process...",
            "language": "en",
            "include_emotions": True,
        },
        "success_queue": "sentiment_results",
        "failure_queue": "sentiment_errors",
        "meta": {"user_id": "123", "request_id": "req_001"},
    },
    headers=headers,
    timeout=30,  # requests waits indefinitely unless a timeout is given
)
# Fail loudly on HTTP errors instead of hitting a KeyError on "job_id" below.
response.raise_for_status()

job_id = response.json()["job_id"]
print(f"Analysis submitted: {job_id}")
For complete queue backend examples, see:

- Queue Backend Patterns
- TaskIQ Integration
6. Production Features¶
Error Handling¶
import requests

headers = {"x-api-key": "your-api-key"}

# Test error handling
try:
    response = requests.post(
        "https://your-org--sentiment-service.modal.run/predict_sync",
        json={
            "text": "",  # Empty text
            "language": "en",
        },
        headers=headers,
        timeout=30,  # requests waits indefinitely unless a timeout is given
    )
    if response.status_code == 200:
        result = response.json()
        print(f"Handled empty text: {result['sentiment']}")
    else:
        print(f"Error: {response.status_code} - {response.text}")
except requests.RequestException as e:
    # Connection problems, timeouts and undecodable responses end up here;
    # programming errors are deliberately not swallowed.
    print(f"Request failed: {e}")
Monitoring and Metrics¶
# In production, you can add monitoring
import requests
import time

headers = {"x-api-key": "your-api-key"}

# Measure response time; perf_counter is monotonic, so the measured interval
# cannot be distorted by system clock adjustments.
start_time = time.perf_counter()
response = requests.post(
    "https://your-org--sentiment-service.modal.run/predict_sync",
    json={
        "text": "This is a test for monitoring response times.",
        "language": "en",
    },
    headers=headers,
    timeout=30,  # requests waits indefinitely unless a timeout is given
)
end_time = time.perf_counter()

if response.status_code == 200:
    result = response.json()
    # Round-trip time seen by the client vs. inference time reported by the service.
    print(f"Response time: {end_time - start_time:.3f}s")
    print(f"Model processing time: {result['processing_time']:.3f}s")
7. Advanced Configuration¶
Multi-Language Support¶
# modalkit.yaml - Multi-language configuration
model_settings:
  model_entries:
    sentiment_model:
      sentiment_model: "cardiffnlp/twitter-xlm-roberta-base-sentiment"  # Multilingual model
      # The emotion model below is English-only.
      emotion_model: "j-hartmann/emotion-english-distilroberta-base"
      supported_languages: ["en", "es", "fr", "de", "it", "pt", "ja", "zh"]
High-Performance Configuration¶
# modalkit.yaml - High-performance setup
# NOTE(review): fragment only — presumably both sections sit under
# `app_settings`; confirm against the full modalkit.yaml.
deployment_config:
  gpu: "A10G"  # More powerful GPU
  concurrency_limit: 20
  container_idle_timeout: 1800  # 30 minutes

batch_config:
  max_batch_size: 32  # Larger batches
  wait_ms: 200
Cost-Optimized Configuration¶
# modalkit.yaml - Cost-optimized setup
# NOTE(review): fragment only — presumably this section sits under
# `app_settings`; confirm against the full modalkit.yaml.
deployment_config:
  gpu: null  # Use CPU for cost savings
  cpu: 4.0
  memory: 8192
  concurrency_limit: 15
  container_idle_timeout: 300  # 5 minutes
8. Deployment¶
Local Testing¶
# Start local server
modal serve app.py
# Test the service
curl -X POST http://localhost:8000/predict_sync \
-H "Content-Type: application/json" \
-H "x-api-key: dev-key" \
-d '{"text": "I love this tutorial!", "include_emotions": true}'
Production Deployment¶
# Deploy to Modal
modal deploy app.py
# Check deployment status
modal app list
# View logs
modal logs -f sentiment-service
Key Features Demonstrated¶
- Practical Use Case: Real-world sentiment analysis service
- Batch Processing: Efficient processing of multiple texts
- Cloud Storage: Model loading from S3/GCS
- Error Handling: Graceful handling of edge cases
- Multi-modal Output: Sentiment + emotion detection
- Language Detection: Automatic language detection
- Production Ready: Proper configuration for different environments
- Monitoring: Built-in performance metrics
This example provides a solid foundation for building production ML services with Modalkit, demonstrating core concepts while solving a real business problem.