Skip to content

API Reference

Core Components

Inference

Base class for an inference pipeline. Subclasses should implement preprocess, predict, and postprocess methods. This exists for a single model version. Downstream app layers (like modal) will decide how to manage multiple.

Source code in modalkit/inference_pipeline.py
class InferencePipeline:
    """
    Base class for an inference pipeline. Subclasses should implement preprocess, predict, and postprocess methods.
    This exists for a single model version. Downstream app layers (like modal) will decide how to manage multiple.
    """

    def __init__(
        self,
        model_name: str,
        all_model_data_folder: str,
        common_settings: dict,
        *args: tuple[Any, ...],
        **kwargs: dict[str, Any],
    ):
        """
        Initializes the InferencePipeline class.

        Args:
            model_name (str): Name of the model
            all_model_data_folder (str): Path to the folder containing all model data
            common_settings (dict): Common settings shared across models
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
        """
        self.common_settings = common_settings
        self.model_name = model_name
        self.all_model_data_folder = all_model_data_folder

    def on_volume_reload(self) -> None:
        """
        Hook method called after a volume reload occurs.

        This method is called by the Modal app layer after volumes have been reloaded.
        Subclasses can override this method to perform any necessary actions after
        a volume reload, such as reloading models or updating cached data.

        By default, this method does nothing.
        """
        pass

    def run_inference(self, input_list: list[BaseModel]) -> list[InferenceOutputModel]:
        """
        Runs the full inference pipeline: preprocess -> predict -> postprocess.

        Args:
            input_list (list[BaseModel]): A list of input messages to the inference pipeline.

        Returns:
            list[InferenceOutputModel]: The list of final processed results after base_inference.
        """
        preprocessed_data = self.preprocess(input_list)
        raw_output = self.predict(input_list, preprocessed_data)
        result = self.postprocess(input_list, raw_output)
        return result

    @abstractmethod
    def preprocess(self, input_list: list[BaseModel]) -> dict:
        """
        Prepares the input data for the model.

        Args:
            input_list (list[BaseModel]): The list of input data to be preprocessed.

        Returns:
            dict: The preprocessed data.
        """
        pass

    @abstractmethod
    def predict(self, input_list: list[BaseModel], preprocessed_data: dict) -> dict:
        """
        Performs the prediction using the model.

        Args:
            input_list (list[BaseModel]): The list of original input data.
            preprocessed_data (dict): The preprocessed data.

        Returns:
            Any: The raw output from the model.
        """
        pass

    @abstractmethod
    def postprocess(self, input_list: list[BaseModel], raw_output: dict) -> list[InferenceOutputModel]:
        """
        Processes the raw output from the model into usable results.

        Args:
            input_list (list[BaseModel]): The list of original input data.
            raw_output (dict): The raw output from the model.

        Returns:
            list[InferenceOutputModel]: The list of final processed results.
        """
        pass

__init__(model_name, all_model_data_folder, common_settings, *args, **kwargs)

Initializes the InferencePipeline class.

Parameters:

Name Type Description Default
model_name str

Name of the model

required
all_model_data_folder str

Path to the folder containing all model data

required
common_settings dict

Common settings shared across models

required
*args tuple[Any, ...]

Variable length argument list.

()
**kwargs dict[str, Any]

Arbitrary keyword arguments.

{}
Source code in modalkit/inference_pipeline.py
def __init__(
    self,
    model_name: str,
    all_model_data_folder: str,
    common_settings: dict,
    *args: tuple[Any, ...],
    **kwargs: dict[str, Any],
):
    """
    Initializes the InferencePipeline class.

    Args:
        model_name (str): Name of the model
        all_model_data_folder (str): Path to the folder containing all model data
        common_settings (dict): Common settings shared across models
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.
    """
    self.common_settings = common_settings
    self.model_name = model_name
    self.all_model_data_folder = all_model_data_folder

on_volume_reload()

Hook method called after a volume reload occurs.

This method is called by the Modal app layer after volumes have been reloaded. Subclasses can override this method to perform any necessary actions after a volume reload, such as reloading models or updating cached data.

By default, this method does nothing.

Source code in modalkit/inference_pipeline.py
def on_volume_reload(self) -> None:
    """
    Hook method called after a volume reload occurs.

    This method is called by the Modal app layer after volumes have been reloaded.
    Subclasses can override this method to perform any necessary actions after
    a volume reload, such as reloading models or updating cached data.

    By default, this method does nothing.
    """
    pass

postprocess(input_list, raw_output) abstractmethod

Processes the raw output from the model into usable results.

Parameters:

Name Type Description Default
input_list list[BaseModel]

The list of original input data.

required
raw_output dict

The raw output from the model.

required

Returns:

Type Description
list[InferenceOutputModel]

list[InferenceOutputModel]: The list of final processed results.

Source code in modalkit/inference_pipeline.py
@abstractmethod
def postprocess(self, input_list: list[BaseModel], raw_output: dict) -> list[InferenceOutputModel]:
    """
    Processes the raw output from the model into usable results.

    Args:
        input_list (list[BaseModel]): The list of original input data.
        raw_output (dict): The raw output from the model.

    Returns:
        list[InferenceOutputModel]: The list of final processed results.
    """
    pass

predict(input_list, preprocessed_data) abstractmethod

Performs the prediction using the model.

Parameters:

Name Type Description Default
input_list list[BaseModel]

The list of original input data.

required
preprocessed_data dict

The preprocessed data.

required

Returns:

Name Type Description
Any dict

The raw output from the model.

Source code in modalkit/inference_pipeline.py
@abstractmethod
def predict(self, input_list: list[BaseModel], preprocessed_data: dict) -> dict:
    """
    Performs the prediction using the model.

    Args:
        input_list (list[BaseModel]): The list of original input data.
        preprocessed_data (dict): The preprocessed data.

    Returns:
        Any: The raw output from the model.
    """
    pass

preprocess(input_list) abstractmethod

Prepares the input data for the model.

Parameters:

Name Type Description Default
input_list list[BaseModel]

The list of input data to be preprocessed.

required

Returns:

Name Type Description
dict dict

The preprocessed data.

Source code in modalkit/inference_pipeline.py
@abstractmethod
def preprocess(self, input_list: list[BaseModel]) -> dict:
    """
    Prepares the input data for the model.

    Args:
        input_list (list[BaseModel]): The list of input data to be preprocessed.

    Returns:
        dict: The preprocessed data.
    """
    pass

run_inference(input_list)

Runs the full inference pipeline: preprocess -> predict -> postprocess.

Parameters:

Name Type Description Default
input_list list[BaseModel]

A list of input messages to the inference pipeline.

required

Returns:

Type Description
list[InferenceOutputModel]

list[InferenceOutputModel]: The list of final processed results after base_inference.

Source code in modalkit/inference_pipeline.py
def run_inference(self, input_list: list[BaseModel]) -> list[InferenceOutputModel]:
    """
    Runs the full inference pipeline: preprocess -> predict -> postprocess.

    Args:
        input_list (list[BaseModel]): A list of input messages to the inference pipeline.

    Returns:
        list[InferenceOutputModel]: The list of final processed results after base_inference.
    """
    preprocessed_data = self.preprocess(input_list)
    raw_output = self.predict(input_list, preprocessed_data)
    result = self.postprocess(input_list, raw_output)
    return result

options: show_root_heading: true show_source: false

Settings

Bases: YamlBaseSettings

Main configuration settings for Modalkit applications.

This class manages all configuration settings for both the application and model deployment. It supports loading settings from YAML files and environment variables with proper type validation.

Attributes:

Name Type Description
app_settings AppSettings

Application-level configuration settings

model_settings ModelSettings

Model-specific configuration settings

Configuration is loaded from
  • Environment variables with MODALKIT_ prefix
  • modalkit.yaml file. This location can be overridden by the MODALKIT_CONFIG environment variable.
  • .env file
Source code in modalkit/settings.py
class Settings(YamlBaseSettings):
    """
    Main configuration settings for Modalkit applications.

    This class manages all configuration settings for both the application and
    model deployment. It supports loading settings from YAML files and environment
    variables with proper type validation.

    Attributes:
        app_settings (AppSettings): Application-level configuration settings
        model_settings (ModelSettings): Model-specific configuration settings

    Configuration is loaded from:
        - Environment variables with MODALKIT_ prefix
        - modalkit.yaml file. This location can
            be overridden by the
            MODALKIT_CONFIG environment variable.
        - .env file
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        yaml_file="modalkit.yaml",
        env_prefix="MODALKIT_",
        env_nested_delimiter="__",
        protected_namespaces=("settings_",),
        case_sensitive=False,
    )
    app_settings: AppSettings
    model_settings: ModelSettings

    def __init__(self, **kwargs: Any) -> None:
        """Initialize Settings with optional direct values or load from config sources."""
        super().__init__(**kwargs)

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> tuple[PydanticBaseSettingsSource, ...]:
        yaml_files = os.getenv("MODALKIT_CONFIG", "modalkit.yaml").split(",")
        yaml_settings = [YamlConfigSettingsSource(settings_cls, yaml_file=yaml_file) for yaml_file in yaml_files]
        yaml_settings.reverse()
        return (
            init_settings,
            env_settings,
            *yaml_settings,
            file_secret_settings,
        )

__init__(**kwargs)

Initialize Settings with optional direct values or load from config sources.

Source code in modalkit/settings.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize Settings with optional direct values or load from config sources."""
    super().__init__(**kwargs)

options: show_root_heading: true show_source: false

Bases: BaseModel

Represents the application settings for a model.

Attributes:

Name Type Description
app_prefix str

Application prefix for the model

build_config BuildConfig

Build configuration for the model

deployment_config DeploymentConfig

Deployment configuration for the model

batch_config BatchConfig

Batch endpoint configuration

queue_config QueueConfig

Queue configuration for async messaging

Source code in modalkit/settings.py
class AppSettings(BaseModel):
    """
    Represents the application settings for a model.

    Attributes:
        app_prefix (str): Application prefix for the model
        build_config (BuildConfig): Build configuration for the model
        deployment_config (DeploymentConfig): Deployment configuration for the model
        batch_config (BatchConfig): Batch endpoint configuration
        queue_config (QueueConfig): Queue configuration for async messaging
    """

    app_prefix: str
    build_config: BuildConfig
    deployment_config: DeploymentConfig
    batch_config: BatchConfig
    queue_config: QueueConfig = QueueConfig()

options: show_root_heading: true show_source: false

Bases: BaseModel

Represents the model settings for a model.

Attributes:

Name Type Description
local_model_repository_folder Path

Local model repository folder for the model

model_entries dict[str, Any]

Model entries for the model

common dict[str, Any]

Common settings for the model

Source code in modalkit/settings.py
class ModelSettings(BaseModel):
    """
    Represents the model settings for a model.

    Attributes:
        local_model_repository_folder (Path): Local model repository folder for the model
        model_entries (dict[str, Any]): Model entries for the model
        common (dict[str, Any]): Common settings for the model
    """

    local_model_repository_folder: Path
    model_entries: dict[str, Any]
    common: dict[str, Any]

options: show_root_heading: true show_source: false

Base class for Modal-based ML application deployment.

This class provides the foundation for deploying ML models using Modal, handling model loading, inference, and API endpoint creation. It integrates with the InferencePipeline class to standardize model serving.

The queue backend is fully optional and supports dependency injection for maximum flexibility:

Usage Examples:

  1. No Queues (Default):

    class MyService(ModalService):
        inference_implementation = MyInferencePipeline
    
    service = MyService()  # No queues, async requests just don't send responses
    

  2. Configuration-Based Queues:

    # Uses modalkit.yaml settings for queue configuration
    service = MyService()  # Automatically uses configured backend (SQS, etc.)
    

  3. Dependency Injection with TaskIQ:

    from taskiq_redis import AsyncRedisTaskiqBroker
    
    class TaskIQBackend:
        def __init__(self, broker_url="redis://localhost:6379"):
            self.broker = AsyncRedisTaskiqBroker(broker_url)
    
        async def send_message(self, queue_name: str, message: str) -> bool:
            @self.broker.task(task_name=f"process_{queue_name}")
            async def process_message(msg: str) -> None:
                # Your custom task processing logic
                logger.info(f"Processing: {msg}")
    
            await process_message.kiq(message)
            return True
    
    # Inject your TaskIQ backend
    taskiq_backend = TaskIQBackend("redis://localhost:6379")
    service = MyService(queue_backend=taskiq_backend)
    

  4. Custom Queue Implementation:

    class MyCustomQueue:
        async def send_message(self, queue_name: str, message: str) -> bool:
            # Send to your custom queue system (RabbitMQ, Kafka, etc.)
            await my_queue_system.send(queue_name, message)
            return True
    
    service = MyService(queue_backend=MyCustomQueue())
    

Attributes:

Name Type Description
model_name str

Name of the model to be served

inference_implementation type[InferencePipeline]

Implementation class of the inference pipeline

modal_utils ModalConfig

Modal config object, containing the settings and config functions

queue_backend Optional[QueueBackend]

Optional queue backend for dependency injection

Source code in modalkit/modal_service.py
class ModalService:
    """
    Base class for Modal-based ML application deployment.

    This class provides the foundation for deploying ML models using Modal,
    handling model loading, inference, and API endpoint creation. It integrates
    with the InferencePipeline class to standardize model serving.

    The queue backend is fully optional and supports dependency injection for
    maximum flexibility:

    **Usage Examples:**

    1. **No Queues (Default):**
    ```python
    class MyService(ModalService):
        inference_implementation = MyInferencePipeline

    service = MyService()  # No queues, async requests just don't send responses
    ```

    2. **Configuration-Based Queues:**
    ```python
    # Uses modalkit.yaml settings for queue configuration
    service = MyService()  # Automatically uses configured backend (SQS, etc.)
    ```

    3. **Dependency Injection with TaskIQ:**
    ```python
    from taskiq_redis import AsyncRedisTaskiqBroker

    class TaskIQBackend:
        def __init__(self, broker_url="redis://localhost:6379"):
            self.broker = AsyncRedisTaskiqBroker(broker_url)

        async def send_message(self, queue_name: str, message: str) -> bool:
            @self.broker.task(task_name=f"process_{queue_name}")
            async def process_message(msg: str) -> None:
                # Your custom task processing logic
                logger.info(f"Processing: {msg}")

            await process_message.kiq(message)
            return True

    # Inject your TaskIQ backend
    taskiq_backend = TaskIQBackend("redis://localhost:6379")
    service = MyService(queue_backend=taskiq_backend)
    ```

    4. **Custom Queue Implementation:**
    ```python
    class MyCustomQueue:
        async def send_message(self, queue_name: str, message: str) -> bool:
            # Send to your custom queue system (RabbitMQ, Kafka, etc.)
            await my_queue_system.send(queue_name, message)
            return True

    service = MyService(queue_backend=MyCustomQueue())
    ```

    Attributes:
        model_name (str): Name of the model to be served
        inference_implementation (type[InferencePipeline]): Implementation class of the inference pipeline
        modal_utils (ModalConfig): Modal config object, containing the settings and config functions
        queue_backend (Optional[QueueBackend]): Optional queue backend for dependency injection
    """

    model_name: str
    inference_implementation: type[InferencePipeline]
    modal_utils: ModalConfig
    queue_backend: Optional[QueueBackend] = None

    def __init__(self, queue_backend: Optional[QueueBackend] = None):
        """
        Initialize ModalService with optional queue backend.

        Args:
            queue_backend: Optional queue backend for dependency injection.
                          If None, will use configuration-based approach or skip queues.
        """
        self.queue_backend = queue_backend

    @modal.enter()
    def load_artefacts(self) -> None:
        """
        Loads model artifacts and initializes the inference instance.

        This method is called when the Modal container starts up. It:
        1. Retrieves model-specific settings from configuration
        2. Initializes the inference implementation with the model settings
        3. Sets up the model for inference
        4. Initializes volume reloading if configured

        The method is decorated with @modal.enter() to ensure it runs during container startup.
        """
        settings = self.modal_utils.settings
        self._model_inference_kwargs = settings.model_settings.model_entries[self.model_name]

        self._model_inference_instance: InferencePipeline = self.inference_implementation(
            model_name=self.model_name,
            all_model_data_folder=str(settings.model_settings.local_model_repository_folder),
            common_settings=settings.model_settings.common,
            **self._model_inference_kwargs,
        )

        # Initialize volume reloading if configured
        self._last_reload_time = time.time()
        self._reload_interval = settings.app_settings.deployment_config.volume_reload_interval_seconds

    def _reload_volumes_if_needed(self) -> None:
        """
        Reloads all configured volumes if the time since last reload exceeds the configured interval.
        After reloading, calls the on_volume_reload hook on the inference instance.
        If the hook raises an error, it is logged but does not prevent request processing.
        """
        # If reload interval is None, volume reloading is disabled
        if self._reload_interval is None:
            return

        current_time = time.time()
        if current_time - self._last_reload_time >= self._reload_interval:
            logger.info(
                f"Time since last reload {current_time - self._last_reload_time}s exceeded interval {self._reload_interval}s, reloading volumes"
            )
            self.modal_utils.reload_volumes()
            self._last_reload_time = current_time

            # Call the on_volume_reload hook
            try:
                logger.info("Calling on_volume_reload hook")
                self._model_inference_instance.on_volume_reload()
            except Exception:
                logger.exception("Error in on_volume_reload hook, continuing with request processing")

    @modal.batched(
        max_batch_size=modal_utils.settings.app_settings.batch_config.max_batch_size,
        wait_ms=modal_utils.settings.app_settings.batch_config.wait_ms,
    )
    def process_request(self, input_list: list[Union[SyncInputModel, AsyncInputModel]]) -> list[InferenceOutputModel]:
        """
        Processes a batch of inference requests.

        Args:
            input_list (list[Union[SyncInputModel, AsyncInputModel]]): The list of input models containing either
                sync or async requests

        Returns:
            list[InferenceOutputModel]: The list of processed outputs conforming to the model's output schema
        """
        batch_size = len(input_list)
        logger.info(f"Received batch of {batch_size} input requests")

        try:
            # Reload volumes if needed before processing the request
            self._reload_volumes_if_needed()

            # Run Inference. Outputs are expected to be in the same order as the inputs
            messages = [input_data.message for input_data in input_list]
            raw_output_list = self._model_inference_instance.run_inference(messages)
            logger.info(
                f"Statuses of the {batch_size} processed requests: {[output.status for output in raw_output_list]}"
            )

            # For any requests that were async, return the response to the appropriate queue
            for message_idx, (input_data, raw_output_data) in enumerate(zip(input_list, raw_output_list)):
                if isinstance(input_data, AsyncInputModel):
                    self.send_async_response(message_idx, raw_output_data, input_data)

        # Unhappy path: On internal error, return error outputs to the queues of all async messages
        # and kill the container if a CUDA error was encountered
        except Exception as e:
            if "CUDA error" in str(e):
                logger.error("Exiting container due to CUDA error. This is potentially due to a hardware issue")
                modal.experimental.stop_fetching_inputs()
            err_msg = f"Internal Server Error. Error log: {e}"
            logger.error(f"Error processing batch: {err_msg}")

            for message_idx, input_data in enumerate(input_list):
                if isinstance(input_data, AsyncInputModel):
                    error_response = DelayedFailureOutputModel(
                        status="error", error=err_msg, original_message=input_data
                    )
                    self.send_async_response(message_idx, error_response, input_data)
            raise HTTPException(status_code=500, detail=err_msg) from e
        else:
            return raw_output_list

    async def _send_to_queue(self, queue_name: str, message: str) -> bool:
        """
        Send message to queue using dependency injection or configuration-based approach.

        Args:
            queue_name: Name of the queue to send to
            message: JSON message to send

        Returns:
            bool: True if message was sent successfully, False otherwise
        """
        if self.queue_backend:
            # Use injected queue backend
            try:
                return await self.queue_backend.send_message(queue_name, message)
            except Exception as e:
                logger.error(f"Failed to send message using injected queue backend: {e}")
                return False
        else:
            # Fall back to configuration-based approach
            try:
                return send_response_queue(queue_name, message)
            except Exception as e:
                logger.error(f"Failed to send message using configuration-based queue: {e}")
                return False

    def send_async_response(
        self, message_idx: int, raw_output_data: InferenceOutputModel, input_data: AsyncInputModel
    ) -> None:
        """
        Sends the inference result to the success or failure queues depending on the message status.
        Queue functionality is optional - only attempts to send if queue names are provided.

        Args:
            message_idx: Index of the message in the batch (for logging)
            raw_output_data (InferenceOutputModel): The processed output result
            input_data (AsyncInputModel): Object containing the async input data
        """
        # Only append metadata for regular inference outputs, not DelayedFailureOutputModel
        # DelayedFailureOutputModel already contains the original message with its metadata
        if not isinstance(raw_output_data, DelayedFailureOutputModel):
            # InferenceOutputModel allows extra fields - use setattr to avoid type checker issues
            raw_output_data.meta = input_data.meta

        if raw_output_data.status == "success":
            success_queue = input_data.success_queue
            if success_queue:  # Only send if queue name is provided
                # Use asyncio.create_task to avoid blocking the batch processing
                import asyncio

                try:
                    loop = asyncio.get_running_loop()
                    task = loop.create_task(
                        self._send_to_queue(success_queue, raw_output_data.model_dump_json(exclude={"error"}))
                    )
                    # Don't await - fire and forget to avoid blocking batch processing
                    task.add_done_callback(lambda t: self._log_queue_result(t, success_queue, "success"))
                except RuntimeError:
                    # No running loop, use synchronous fallback
                    success = asyncio.run(
                        self._send_to_queue(success_queue, raw_output_data.model_dump_json(exclude={"error"}))
                    )
                    if not success:
                        logger.warning(f"Failed to send success response to queue: {success_queue}")
            else:
                logger.debug("No success queue specified, skipping queue response")
        else:
            failure_queue = input_data.failure_queue
            if failure_queue:  # Only send if queue name is provided
                # Use asyncio.create_task to avoid blocking the batch processing
                import asyncio

                try:
                    loop = asyncio.get_running_loop()
                    task = loop.create_task(self._send_to_queue(failure_queue, raw_output_data.model_dump_json()))
                    # Don't await - fire and forget to avoid blocking batch processing
                    task.add_done_callback(lambda t: self._log_queue_result(t, failure_queue, "failure"))
                except RuntimeError:
                    # No running loop, use synchronous fallback
                    success = asyncio.run(self._send_to_queue(failure_queue, raw_output_data.model_dump_json()))
                    if not success:
                        logger.warning(f"Failed to send failure response to queue: {failure_queue}")
            else:
                logger.debug("No failure queue specified, skipping queue response")

    def _log_queue_result(self, task: "asyncio.Task", queue_name: str, response_type: str) -> None:
        """Log the result of a queue sending task."""
        try:
            success = task.result()
            if not success:
                logger.warning(f"Failed to send {response_type} response to queue: {queue_name}")
        except Exception as e:
            logger.error(f"Error sending {response_type} response to queue {queue_name}: {e}")

    @staticmethod
    def async_call(cls: type["ModalService"]) -> Callable[[str, BaseModel], Awaitable[AsyncOutputModel]]:
        """
        Creates an asynchronous callable function for processing and returning inference results via queues.

        This method generates a function that spawns an asynchronous task for the `process_request` method.
        It allows triggering an async inference job while returning a job ID for tracking purposes.

        Args:
            cls (type[ModalService]): The class reference for creating an instance of `ModalService`.

        Returns:
            Callable: A function that, when called, spawns an asynchronous task and returns an AsyncOutputModel with job ID.

        Example:
            >>> async_fn = ModalService.async_call(MyApp)
            >>> result = async_fn(model_name="example_model", input_data)
            >>> print(result)
            AsyncOutputModel(job_id="some_job_id")
        """

        async def fn(model_name: str, input_data: BaseModel) -> AsyncOutputModel:
            # input_data should be an AsyncInputModel based on FastAPI usage
            if isinstance(input_data, AsyncInputModel):
                input_list = [input_data]
            else:
                # Fallback for other BaseModel types - shouldn't happen in normal usage
                input_list = [AsyncInputModel(message=input_data)]

            # Create a mock instance for spawning - this is a limitation of the current design
            # In practice, this would need to be refactored to work with Modal's class instantiation
            service_instance = cls()
            service_instance.model_name = model_name
            call = await service_instance.process_request.spawn.aio(input_list)
            return AsyncOutputModel(job_id=call.object_id)

        return fn

    @staticmethod
    def sync_call(cls: type["ModalService"]) -> Callable[[str, BaseModel], Awaitable[BaseModel]]:
        """
        Creates a synchronous callable function for processing inference requests.
        Each request is processed individually to maintain immediate response times.
        For batch processing, use async endpoints.

        This method generates a function that triggers the `process` method of the `ModalService` class.
        It allows synchronous inference processing with input data passed to the model.

        Args:
            cls (type[ModalService]): The class reference for creating an instance of `ModalService`.

        Returns:
            Callable: A function that, when called, executes a synchronous inference call and returns the result.

        Example:
            >>> sync_fn = ModalService.sync_call(MyApp)
            >>> result = sync_fn(model_name="example_model", input_data)
            >>> print(result)
            InferenceOutputModel(status="success", ...)
        """

        async def fn(model_name: str, input_data: BaseModel) -> BaseModel:
            # input_data should be a SyncInputModel based on FastAPI usage
            if isinstance(input_data, SyncInputModel):
                input_list = [input_data]
            else:
                # Fallback for other BaseModel types
                input_list = [SyncInputModel(message=input_data)]

            # Create a mock instance for remote calls - this is a limitation of the current design
            service_instance = cls()
            service_instance.model_name = model_name
            results = await service_instance.process_request.remote.aio(input_list)
            # Return the first (and only) result for sync calls
            return results[0]

        return fn

__init__(queue_backend=None)

Initialize ModalService with optional queue backend.

Parameters:

Name Type Description Default
queue_backend Optional[QueueBackend]

Optional queue backend for dependency injection. If None, will use configuration-based approach or skip queues.

None
Source code in modalkit/modal_service.py
def __init__(self, queue_backend: Optional[QueueBackend] = None):
    """
    Initialize ModalService with optional queue backend.

    Args:
        queue_backend: Optional queue backend for dependency injection.
                      If None, will use configuration-based approach or skip queues.
    """
    self.queue_backend = queue_backend

async_call() staticmethod

Creates an asynchronous callable function for processing and returning inference results via queues.

This method generates a function that spawns an asynchronous task for the process_request method. It allows triggering an async inference job while returning a job ID for tracking purposes.

Parameters:

Name Type Description Default
cls type[ModalService]

The class reference for creating an instance of ModalService.

required

Returns:

Name Type Description
Callable Callable[[str, BaseModel], Awaitable[AsyncOutputModel]]

A function that, when called, spawns an asynchronous task and returns an AsyncOutputModel with job ID.

Example

async_fn = ModalService.async_call(MyApp) result = async_fn(model_name="example_model", input_data) print(result) AsyncOutputModel(job_id="some_job_id")

Source code in modalkit/modal_service.py
@staticmethod
def async_call(cls: type["ModalService"]) -> Callable[[str, BaseModel], Awaitable[AsyncOutputModel]]:
    """
    Creates an asynchronous callable function for processing and returning inference results via queues.

    This method generates a function that spawns an asynchronous task for the `process_request` method.
    It allows triggering an async inference job while returning a job ID for tracking purposes.

    Args:
        cls (type[ModalService]): The class reference for creating an instance of `ModalService`.

    Returns:
        Callable: A function that, when called, spawns an asynchronous task and returns an AsyncOutputModel with job ID.

    Example:
        >>> async_fn = ModalService.async_call(MyApp)
        >>> result = async_fn(model_name="example_model", input_data)
        >>> print(result)
        AsyncOutputModel(job_id="some_job_id")
    """

    async def fn(model_name: str, input_data: BaseModel) -> AsyncOutputModel:
        # input_data should be an AsyncInputModel based on FastAPI usage
        if isinstance(input_data, AsyncInputModel):
            input_list = [input_data]
        else:
            # Fallback for other BaseModel types - shouldn't happen in normal usage
            input_list = [AsyncInputModel(message=input_data)]

        # Create a mock instance for spawning - this is a limitation of the current design
        # In practice, this would need to be refactored to work with Modal's class instantiation
        service_instance = cls()
        service_instance.model_name = model_name
        call = await service_instance.process_request.spawn.aio(input_list)
        return AsyncOutputModel(job_id=call.object_id)

    return fn

load_artefacts()

Loads model artifacts and initializes the inference instance.

This method is called when the Modal container starts up. It: 1. Retrieves model-specific settings from configuration 2. Initializes the inference implementation with the model settings 3. Sets up the model for inference 4. Initializes volume reloading if configured

The method is decorated with @modal.enter() to ensure it runs during container startup.

Source code in modalkit/modal_service.py
@modal.enter()
def load_artefacts(self) -> None:
    """
    Loads model artifacts and initializes the inference instance.

    This method is called when the Modal container starts up. It:
    1. Retrieves model-specific settings from configuration
    2. Initializes the inference implementation with the model settings
    3. Sets up the model for inference
    4. Initializes volume reloading if configured

    The method is decorated with @modal.enter() to ensure it runs during container startup.
    """
    settings = self.modal_utils.settings
    self._model_inference_kwargs = settings.model_settings.model_entries[self.model_name]

    self._model_inference_instance: InferencePipeline = self.inference_implementation(
        model_name=self.model_name,
        all_model_data_folder=str(settings.model_settings.local_model_repository_folder),
        common_settings=settings.model_settings.common,
        **self._model_inference_kwargs,
    )

    # Initialize volume reloading if configured
    self._last_reload_time = time.time()
    self._reload_interval = settings.app_settings.deployment_config.volume_reload_interval_seconds

process_request(input_list)

Processes a batch of inference requests.

Parameters:

Name Type Description Default
input_list list[Union[SyncInputModel, AsyncInputModel]]

The list of input models containing either sync or async requests

required

Returns:

Type Description
list[InferenceOutputModel]

list[InferenceOutputModel]: The list of processed outputs conforming to the model's output schema

Source code in modalkit/modal_service.py
@modal.batched(
    max_batch_size=modal_utils.settings.app_settings.batch_config.max_batch_size,
    wait_ms=modal_utils.settings.app_settings.batch_config.wait_ms,
)
def process_request(self, input_list: list[Union[SyncInputModel, AsyncInputModel]]) -> list[InferenceOutputModel]:
    """
    Processes a batch of inference requests.

    Args:
        input_list (list[Union[SyncInputModel, AsyncInputModel]]): The list of input models containing either
            sync or async requests

    Returns:
        list[InferenceOutputModel]: The list of processed outputs conforming to the model's output schema
    """
    batch_size = len(input_list)
    logger.info(f"Received batch of {batch_size} input requests")

    try:
        # Reload volumes if needed before processing the request
        self._reload_volumes_if_needed()

        # Run Inference. Outputs are expected to be in the same order as the inputs
        messages = [input_data.message for input_data in input_list]
        raw_output_list = self._model_inference_instance.run_inference(messages)
        logger.info(
            f"Statuses of the {batch_size} processed requests: {[output.status for output in raw_output_list]}"
        )

        # For any requests that were async, return the response to the appropriate queue
        for message_idx, (input_data, raw_output_data) in enumerate(zip(input_list, raw_output_list)):
            if isinstance(input_data, AsyncInputModel):
                self.send_async_response(message_idx, raw_output_data, input_data)

    # Unhappy path: On internal error, return error outputs to the queues of all async messages
    # and kill the container if a CUDA error was encountered
    except Exception as e:
        if "CUDA error" in str(e):
            logger.error("Exiting container due to CUDA error. This is potentially due to a hardware issue")
            modal.experimental.stop_fetching_inputs()
        err_msg = f"Internal Server Error. Error log: {e}"
        logger.error(f"Error processing batch: {err_msg}")

        for message_idx, input_data in enumerate(input_list):
            if isinstance(input_data, AsyncInputModel):
                error_response = DelayedFailureOutputModel(
                    status="error", error=err_msg, original_message=input_data
                )
                self.send_async_response(message_idx, error_response, input_data)
        raise HTTPException(status_code=500, detail=err_msg) from e
    else:
        return raw_output_list

send_async_response(message_idx, raw_output_data, input_data)

Sends the inference result to the success or failure queues depending on the message status. Queue functionality is optional - only attempts to send if queue names are provided.

Parameters:

Name Type Description Default
message_idx int

Index of the message in the batch (for logging)

required
raw_output_data InferenceOutputModel

The processed output result

required
input_data AsyncInputModel

Object containing the async input data

required
Source code in modalkit/modal_service.py
def send_async_response(
    self, message_idx: int, raw_output_data: InferenceOutputModel, input_data: AsyncInputModel
) -> None:
    """
    Sends the inference result to the success or failure queues depending on the message status.
    Queue functionality is optional - only attempts to send if queue names are provided.

    Args:
        message_idx: Index of the message in the batch (for logging)
        raw_output_data (InferenceOutputModel): The processed output result
        input_data (AsyncInputModel): Object containing the async input data
    """
    # Only append metadata for regular inference outputs, not DelayedFailureOutputModel
    # DelayedFailureOutputModel already contains the original message with its metadata
    if not isinstance(raw_output_data, DelayedFailureOutputModel):
        # InferenceOutputModel allows extra fields - use setattr to avoid type checker issues
        raw_output_data.meta = input_data.meta

    if raw_output_data.status == "success":
        success_queue = input_data.success_queue
        if success_queue:  # Only send if queue name is provided
            # Use asyncio.create_task to avoid blocking the batch processing
            import asyncio

            try:
                loop = asyncio.get_running_loop()
                task = loop.create_task(
                    self._send_to_queue(success_queue, raw_output_data.model_dump_json(exclude={"error"}))
                )
                # Don't await - fire and forget to avoid blocking batch processing
                task.add_done_callback(lambda t: self._log_queue_result(t, success_queue, "success"))
            except RuntimeError:
                # No running loop, use synchronous fallback
                success = asyncio.run(
                    self._send_to_queue(success_queue, raw_output_data.model_dump_json(exclude={"error"}))
                )
                if not success:
                    logger.warning(f"Failed to send success response to queue: {success_queue}")
        else:
            logger.debug("No success queue specified, skipping queue response")
    else:
        failure_queue = input_data.failure_queue
        if failure_queue:  # Only send if queue name is provided
            # Use asyncio.create_task to avoid blocking the batch processing
            import asyncio

            try:
                loop = asyncio.get_running_loop()
                task = loop.create_task(self._send_to_queue(failure_queue, raw_output_data.model_dump_json()))
                # Don't await - fire and forget to avoid blocking batch processing
                task.add_done_callback(lambda t: self._log_queue_result(t, failure_queue, "failure"))
            except RuntimeError:
                # No running loop, use synchronous fallback
                success = asyncio.run(self._send_to_queue(failure_queue, raw_output_data.model_dump_json()))
                if not success:
                    logger.warning(f"Failed to send failure response to queue: {failure_queue}")
        else:
            logger.debug("No failure queue specified, skipping queue response")

sync_call() staticmethod

Creates a synchronous callable function for processing inference requests. Each request is processed individually to maintain immediate response times. For batch processing, use async endpoints.

This method generates a function that triggers the process method of the ModalService class. It allows synchronous inference processing with input data passed to the model.

Parameters:

Name Type Description Default
cls type[ModalService]

The class reference for creating an instance of ModalService.

required

Returns:

Name Type Description
Callable Callable[[str, BaseModel], Awaitable[BaseModel]]

A function that, when called, executes a synchronous inference call and returns the result.

Example

sync_fn = ModalService.sync_call(MyApp) result = sync_fn(model_name="example_model", input_data) print(result) InferenceOutputModel(status="success", ...)

Source code in modalkit/modal_service.py
@staticmethod
def sync_call(cls: type["ModalService"]) -> Callable[[str, BaseModel], Awaitable[BaseModel]]:
    """
    Creates a synchronous callable function for processing inference requests.
    Each request is processed individually to maintain immediate response times.
    For batch processing, use async endpoints.

    This method generates a function that triggers the `process` method of the `ModalService` class.
    It allows synchronous inference processing with input data passed to the model.

    Args:
        cls (type[ModalService]): The class reference for creating an instance of `ModalService`.

    Returns:
        Callable: A function that, when called, executes a synchronous inference call and returns the result.

    Example:
        >>> sync_fn = ModalService.sync_call(MyApp)
        >>> result = sync_fn(model_name="example_model", input_data)
        >>> print(result)
        InferenceOutputModel(status="success", ...)
    """

    async def fn(model_name: str, input_data: BaseModel) -> BaseModel:
        # input_data should be a SyncInputModel based on FastAPI usage
        if isinstance(input_data, SyncInputModel):
            input_list = [input_data]
        else:
            # Fallback for other BaseModel types
            input_list = [SyncInputModel(message=input_data)]

        # Create a mock instance for remote calls - this is a limitation of the current design
        service_instance = cls()
        service_instance.model_name = model_name
        results = await service_instance.process_request.remote.aio(input_list)
        # Return the first (and only) result for sync calls
        return results[0]

    return fn

options: show_root_heading: true show_source: false

Configuration class for handling Modal-specific operations. Provide many helper methods to permit shorthand usage of Modal, in app code

Additionally, has some staticmethods that can be used without instantiating the class.

Source code in modalkit/modal_config.py
class ModalConfig:
    """
    Configuration class for handling Modal-specific operations.
    Provide many helper methods to permit shorthand usage of Modal, in app code

    Additionally, has some staticmethods that can be used without instantiating the class.
    """

    def __init__(self, settings: Optional[Settings] = None):
        if settings is None:
            self.settings = Settings()
        else:
            self.settings = settings

        # Volumes
        self._volumes: dict[str, modal.Volume] = {}

    @property
    def cloud_bucket_mounts(self) -> dict[str, modal.CloudBucketMount]:
        """
        Gets the Modal cloud bucket mounts based on configuration.

        Returns:
            dict: A dictionary mapping mount points to Modal CloudBucketMount objects
        """
        cloud_mounts = {}
        for mount_config in self.app_settings.deployment_config.cloud_bucket_mounts:
            # Build CloudBucketMount with explicit parameters
            cloud_mount = modal.CloudBucketMount(
                bucket_name=mount_config.bucket_name,
                read_only=mount_config.read_only,
                requester_pays=mount_config.requester_pays,
                bucket_endpoint_url=mount_config.bucket_endpoint_url,
                key_prefix=mount_config.key_prefix,
                secret=modal.Secret.from_name(mount_config.secret) if mount_config.secret else None,
                oidc_auth_role_arn=mount_config.oidc_auth_role_arn,
            )

            cloud_mounts[mount_config.mount_point] = cloud_mount

        return cloud_mounts

    @property
    def volumes(self) -> dict[str, modal.Volume]:
        """
        Gets the Modal volumes based on the deployment config.
        Returns cached volumes if already computed.

        Returns:
            dict[str, modal.Volume]: Dictionary of Modal volumes
        """
        if not self._volumes:
            logger.info("Initializing Modal volumes")
            if not self.settings.app_settings.deployment_config.volumes:
                logger.info("No volumes configured in deployment config")
                self._volumes = {}
            else:
                try:
                    self._volumes = {
                        Path(key).as_posix(): modal.Volume.from_name(val)
                        for key, val in self.settings.app_settings.deployment_config.volumes.items()
                    }
                    logger.info(f"Successfully initialized {len(self._volumes)} volumes")
                    for mount_point in self._volumes:
                        logger.debug(f"Volume initialized at mount point: {mount_point}")
                except Exception:
                    logger.exception("Failed to initialize volume")
                    raise
        return self._volumes

    @property
    def all_volumes(self) -> dict[str, Union[modal.Volume, modal.CloudBucketMount]]:
        """
        Gets all volume mounts including both regular volumes and cloud bucket mounts.

        Returns:
            dict: Combined dictionary of Modal volumes and CloudBucketMounts
        """
        combined_volumes: dict[str, Union[modal.Volume, modal.CloudBucketMount]] = {}

        # Add regular volumes
        for key, volume in self.volumes.items():
            combined_volumes[key] = volume

        # Add cloud bucket mounts
        for key, mount in self.cloud_bucket_mounts.items():
            combined_volumes[key] = mount

        return combined_volumes

    @property
    def app_settings(self) -> "AppSettings":
        """
        Gets the application-specific settings.

        Returns:
            AppSettings: The application configuration settings
        """
        return self.settings.app_settings

    @property
    def model_settings(self) -> "ModelSettings":
        """
        Gets the model-specific settings.

        Returns:
            ModelSettings: The model configuration settings
        """
        return self.settings.model_settings

    @property
    def app_name(self) -> str:
        """
        Gets the complete application name.

        Returns:
            str: The application name with prefix and postfix
        """
        return self.app_settings.app_prefix + self.app_postfix

    @property
    def app_postfix(self) -> str:
        """
        Gets the application postfix from environment.

        Returns:
            str: The application postfix, defaults to "-dev"
        """
        return os.getenv("MODALKIT_APP_POSTFIX", "-dev")

    # Removed CustomAPIKey property - using Modal proxy auth only

    @property
    def region(self) -> Optional[str]:
        """
        Gets the Modal deployment region.

        Returns:
            Optional[str]: String of the modal deployment region.
        """
        return self.app_settings.deployment_config.region

    def get_image(self) -> modal.Image:
        """
        Creates a Modal container image configuration.

        Returns:
            modal.Image: Configured Modal container image with:
                - Base image (either from registry or debian_slim)
                - Build commands
                - Environment variables
                - Working directory
                - Local file/directory mounts (added via Modal 1.0 API)
        """
        extra_run_commands = []
        if isinstance(self.app_settings.build_config.extra_run_commands, str):
            extra_run_commands.append(self.app_settings.build_config.extra_run_commands)

        envvars = self.app_settings.build_config.env.copy()
        modalkit_config_path = os.getenv("MODALKIT_CONFIG")
        if modalkit_config_path:
            envvars["MODALKIT_CONFIG"] = modalkit_config_path

        # Use Modal's debian_slim as default, or from_registry for custom registries
        if self.app_settings.build_config.image and self.app_settings.build_config.tag:
            # If image looks like a registry URL, use from_registry
            image_ref = f"{self.app_settings.build_config.image}:{self.app_settings.build_config.tag}"
            if "/" in self.app_settings.build_config.image:
                image = modal.Image.from_registry(image_ref)
            else:
                # Default to debian_slim for simple image names
                image = modal.Image.debian_slim()
        else:
            # Default to debian_slim if no image specified
            image = modal.Image.debian_slim()

        image = image.env(envvars).run_commands(extra_run_commands).workdir(self.app_settings.build_config.workdir)

        # Add local mounts using Modal 1.0 API (add files/directories to image)
        for mnt in self.app_settings.deployment_config.mounts:
            if mnt.type == MountType.file:
                image = image.add_local_file(mnt.local_path, mnt.remote_path)
            else:
                image = image.add_local_dir(mnt.local_path, remote_path=mnt.remote_path)

        return image

    def get_app_cls_settings(self) -> dict[str, Any]:
        """
        Gets Modal application class settings.

        Returns:
            dict: Application settings with None values removed, including:
                - Container image configuration (with local mounts embedded)
                - GPU requirements
                - Secrets and concurrency settings
                - Volume configurations
        """
        settings = {
            "region": self.region,
            "image": self.get_image(),
            "gpu": self.app_settings.deployment_config.gpu,
            "secrets": [modal.Secret.from_name(secret) for secret in self.app_settings.deployment_config.secrets],
            "concurrency_limit": self.app_settings.deployment_config.concurrency_limit,
            "allow_concurrent_inputs": self.app_settings.deployment_config.allow_concurrent_inputs,
            "container_idle_timeout": self.app_settings.deployment_config.container_idle_timeout,
            "retries": self.app_settings.deployment_config.retries,
            "volumes": self.all_volumes,
        }
        return {k: v for k, v in settings.items() if v is not None}

    def get_handler_settings(self) -> dict[str, Any]:
        """
        Gets Modal request handler settings.

        Returns:
            dict: Handler settings including:
                - Application image (with local mounts embedded)
                - Required secrets
                - Concurrency settings
        """
        settings = {
            "image": self.get_image(),
            "secrets": [modal.Secret.from_name(secret) for secret in self.app_settings.deployment_config.secrets],
            "allow_concurrent_inputs": self.app_settings.deployment_config.allow_concurrent_inputs_handler,
        }

        return settings

    def get_batched_method_settings(self) -> dict[str, Any]:
        """
        Gets Modal batched method settings.

        Returns:
            dict: batched method including:
                - max_batch_size
                - wait_ms
        """
        return self.app_settings.batch_config.model_dump()

    def get_asgi_app_settings(self) -> dict[str, Any]:
        """
        Gets Modal ASGI app settings for web endpoints.

        Returns:
            dict: ASGI app settings including:
                - requires_proxy_auth: Whether to enable Modal proxy authentication
        """
        return {"requires_proxy_auth": self.app_settings.deployment_config.secure}

    def reload_volumes(self) -> None:
        """
        Reloads the Modal volumes.
        Handles errors gracefully and provides detailed logging of the process.
        """
        if not self._volumes:
            logger.info("No volumes initialized yet, skipping reload")
            return

        logger.info(f"Starting volume reload for {len(self._volumes)} volumes")
        for mount_point, volume in self._volumes.items():
            try:
                logger.debug(f"Reloading volume at mount point: {mount_point}")
                volume.reload()
                logger.debug(f"Successfully reloaded volume at mount point: {mount_point}")
            except Exception:
                logger.exception(f"Failed to reload volume at mount point {mount_point}")
                # We don't re-raise as we want to continue with other volumes
                continue
        logger.info("Volume reload completed")

all_volumes property

Gets all volume mounts including both regular volumes and cloud bucket mounts.

Returns:

Name Type Description
dict dict[str, Union[Volume, CloudBucketMount]]

Combined dictionary of Modal volumes and CloudBucketMounts

app_name property

Gets the complete application name.

Returns:

Name Type Description
str str

The application name with prefix and postfix

app_postfix property

Gets the application postfix from environment.

Returns:

Name Type Description
str str

The application postfix, defaults to "-dev"

app_settings property

Gets the application-specific settings.

Returns:

Name Type Description
AppSettings AppSettings

The application configuration settings

cloud_bucket_mounts property

Gets the Modal cloud bucket mounts based on configuration.

Returns:

Name Type Description
dict dict[str, CloudBucketMount]

A dictionary mapping mount points to Modal CloudBucketMount objects

model_settings property

Gets the model-specific settings.

Returns:

Name Type Description
ModelSettings ModelSettings

The model configuration settings

region property

Gets the Modal deployment region.

Returns:

Type Description
Optional[str]

Optional[str]: String of the modal deployment region.

volumes property

Gets the Modal volumes based on the deployment config. Returns cached volumes if already computed.

Returns:

Type Description
dict[str, Volume]

dict[str, modal.Volume]: Dictionary of Modal volumes

get_app_cls_settings()

Gets Modal application class settings.

Returns:

Name Type Description
dict dict[str, Any]

Application settings with None values removed, including: - Container image configuration (with local mounts embedded) - GPU requirements - Secrets and concurrency settings - Volume configurations

Source code in modalkit/modal_config.py
def get_app_cls_settings(self) -> dict[str, Any]:
    """
    Gets Modal application class settings.

    Returns:
        dict: Application settings with None values removed, including:
            - Container image configuration (with local mounts embedded)
            - GPU requirements
            - Secrets and concurrency settings
            - Volume configurations
    """
    settings = {
        "region": self.region,
        "image": self.get_image(),
        "gpu": self.app_settings.deployment_config.gpu,
        "secrets": [modal.Secret.from_name(secret) for secret in self.app_settings.deployment_config.secrets],
        "concurrency_limit": self.app_settings.deployment_config.concurrency_limit,
        "allow_concurrent_inputs": self.app_settings.deployment_config.allow_concurrent_inputs,
        "container_idle_timeout": self.app_settings.deployment_config.container_idle_timeout,
        "retries": self.app_settings.deployment_config.retries,
        "volumes": self.all_volumes,
    }
    return {k: v for k, v in settings.items() if v is not None}

get_asgi_app_settings()

Gets Modal ASGI app settings for web endpoints.

Returns:

Name Type Description
dict dict[str, Any]

ASGI app settings including: - requires_proxy_auth: Whether to enable Modal proxy authentication

Source code in modalkit/modal_config.py
def get_asgi_app_settings(self) -> dict[str, Any]:
    """
    Gets Modal ASGI app settings for web endpoints.

    Returns:
        dict: ASGI app settings including:
            - requires_proxy_auth: Whether to enable Modal proxy authentication
    """
    return {"requires_proxy_auth": self.app_settings.deployment_config.secure}

get_batched_method_settings()

Gets Modal batched method settings.

Returns:

Name Type Description
dict dict[str, Any]

batched method including: - max_batch_size - wait_ms

Source code in modalkit/modal_config.py
def get_batched_method_settings(self) -> dict[str, Any]:
    """
    Gets Modal batched method settings.

    Returns:
        dict: batched method including:
            - max_batch_size
            - wait_ms
    """
    return self.app_settings.batch_config.model_dump()

get_handler_settings()

Gets Modal request handler settings.

Returns:

Name Type Description
dict dict[str, Any]

Handler settings including: - Application image (with local mounts embedded) - Required secrets - Concurrency settings

Source code in modalkit/modal_config.py
def get_handler_settings(self) -> dict[str, Any]:
    """
    Gets Modal request handler settings.

    Returns:
        dict: Handler settings including:
            - Application image (with local mounts embedded)
            - Required secrets
            - Concurrency settings
    """
    settings = {
        "image": self.get_image(),
        "secrets": [modal.Secret.from_name(secret) for secret in self.app_settings.deployment_config.secrets],
        "allow_concurrent_inputs": self.app_settings.deployment_config.allow_concurrent_inputs_handler,
    }

    return settings

get_image()

Creates a Modal container image configuration.

Returns:

Type Description
Image

modal.Image: Configured Modal container image with: - Base image (either from registry or debian_slim) - Build commands - Environment variables - Working directory - Local file/directory mounts (added via Modal 1.0 API)

Source code in modalkit/modal_config.py
def get_image(self) -> modal.Image:
    """
    Creates a Modal container image configuration.

    Returns:
        modal.Image: Configured Modal container image with:
            - Base image (either from registry or debian_slim)
            - Build commands
            - Environment variables
            - Working directory
            - Local file/directory mounts (added via Modal 1.0 API)
    """
    extra_run_commands = []
    if isinstance(self.app_settings.build_config.extra_run_commands, str):
        extra_run_commands.append(self.app_settings.build_config.extra_run_commands)

    envvars = self.app_settings.build_config.env.copy()
    modalkit_config_path = os.getenv("MODALKIT_CONFIG")
    if modalkit_config_path:
        envvars["MODALKIT_CONFIG"] = modalkit_config_path

    # Use Modal's debian_slim as default, or from_registry for custom registries
    if self.app_settings.build_config.image and self.app_settings.build_config.tag:
        # If image looks like a registry URL, use from_registry
        image_ref = f"{self.app_settings.build_config.image}:{self.app_settings.build_config.tag}"
        if "/" in self.app_settings.build_config.image:
            image = modal.Image.from_registry(image_ref)
        else:
            # Default to debian_slim for simple image names
            image = modal.Image.debian_slim()
    else:
        # Default to debian_slim if no image specified
        image = modal.Image.debian_slim()

    image = image.env(envvars).run_commands(extra_run_commands).workdir(self.app_settings.build_config.workdir)

    # Add local mounts using Modal 1.0 API (add files/directories to image)
    for mnt in self.app_settings.deployment_config.mounts:
        if mnt.type == MountType.file:
            image = image.add_local_file(mnt.local_path, mnt.remote_path)
        else:
            image = image.add_local_dir(mnt.local_path, remote_path=mnt.remote_path)

    return image

reload_volumes()

Reloads the Modal volumes. Handles errors gracefully and provides detailed logging of the process.

Source code in modalkit/modal_config.py
def reload_volumes(self) -> None:
    """
    Reloads the Modal volumes.
    Handles errors gracefully and provides detailed logging of the process.
    """
    if not self._volumes:
        logger.info("No volumes initialized yet, skipping reload")
        return

    logger.info(f"Starting volume reload for {len(self._volumes)} volumes")
    for mount_point, volume in self._volumes.items():
        try:
            logger.debug(f"Reloading volume at mount point: {mount_point}")
            volume.reload()
            logger.debug(f"Successfully reloaded volume at mount point: {mount_point}")
        except Exception:
            logger.exception(f"Failed to reload volume at mount point {mount_point}")
            # We don't re-raise as we want to continue with other volumes
            continue
    logger.info("Volume reload completed")

options: show_root_heading: true show_source: false

Input/Output Models

Async Models

Bases: BaseModel, Generic[T]

Model for asynchronous operation inputs, wrapping the message with queue information.

Attributes:

Name Type Description
message T

The actual input data, must be a Pydantic model

success_queue str

SQS queue name for successful results

failure_queue str

SQS queue name for error messages

meta dict

Additional metadata to be passed through the processing pipeline

Notes

The model_config ensures no extra fields are allowed in the input

Source code in modalkit/iomodel.py
class AsyncInputModel(BaseModel, Generic[T]):
    """
    Model for asynchronous operation inputs, wrapping the message with queue information.

    Attributes:
        message (T): The actual input data, must be a Pydantic model
        success_queue (str): SQS queue name for successful results
        failure_queue (str): SQS queue name for error messages
        meta (dict): Additional metadata to be passed through the processing pipeline

    Notes:
        The model_config ensures no extra fields are allowed in the input
    """

    model_config = ConfigDict(extra="forbid")
    message: T
    success_queue: str = ""
    failure_queue: str = ""
    meta: dict = {}

options: show_root_heading: true show_source: false

Bases: BaseModel

Model for asynchronous operation outputs.

Attributes:

Name Type Description
job_id str

Unique identifier for tracking the asynchronous job

Source code in modalkit/iomodel.py
class AsyncOutputModel(BaseModel):
    """
    Model for asynchronous operation outputs.

    Attributes:
        job_id (str): Unique identifier for tracking the asynchronous job
    """

    job_id: str

options: show_root_heading: true show_source: false

Task Queue

Bases: Protocol

Protocol for queue backends.

Implement this interface to create custom queue backends. The interface is intentionally minimal - just message sending.

Source code in modalkit/task_queue.py
class QueueBackend(Protocol):
    """
    Protocol for queue backends.

    Implement this interface to create custom queue backends.
    The interface is intentionally minimal - just message sending.
    """

    async def send_message(self, queue_name: str, message: str) -> bool:
        """
        Send a message to the specified queue.

        Args:
            queue_name: Name/identifier of the queue
            message: Message content (JSON string)

        Returns:
            True if message was sent successfully, False otherwise
        """
        ...

send_message(queue_name, message) async

Send a message to the specified queue.

Parameters:

Name Type Description Default
queue_name str

Name/identifier of the queue

required
message str

Message content (JSON string)

required

Returns:

Type Description
bool

True if message was sent successfully, False otherwise

Source code in modalkit/task_queue.py
async def send_message(self, queue_name: str, message: str) -> bool:
    """
    Send a message to the specified queue.

    Args:
        queue_name: Name/identifier of the queue
        message: Message content (JSON string)

    Returns:
        True if message was sent successfully, False otherwise
    """
    ...

options: show_root_heading: true show_source: false

Simple in-memory backend for testing and development. Messages are just logged, not actually queued.

Source code in modalkit/task_queue.py
class InMemoryBackend:
    """
    Simple in-memory backend for testing and development.
    Messages are just logged, not actually queued.
    """

    async def send_message(self, queue_name: str, message: str) -> bool:
        """Send message to in-memory log"""
        if not isinstance(message, str):
            raise TypeValidationError(f"Expected string, got {type(message)}")

        logger.info(f"[IN-MEMORY QUEUE] {queue_name}: {message}")
        return True

send_message(queue_name, message) async

Send message to in-memory log

Source code in modalkit/task_queue.py
async def send_message(self, queue_name: str, message: str) -> bool:
    """Send message to in-memory log"""
    if not isinstance(message, str):
        raise TypeValidationError(f"Expected string, got {type(message)}")

    logger.info(f"[IN-MEMORY QUEUE] {queue_name}: {message}")
    return True

options: show_root_heading: true show_source: false

Direct AWS SQS backend implementation.

This is a basic implementation - for production use, consider implementing a custom backend with proper error handling, retry logic, etc.

Source code in modalkit/task_queue.py
class SQSBackend:
    """
    Direct AWS SQS backend implementation.

    This is a basic implementation - for production use, consider
    implementing a custom backend with proper error handling,
    retry logic, etc.
    """

    def __init__(self, **kwargs: Any) -> None:
        try:
            import boto3  # type: ignore

            self.client = boto3.client("sqs")
            self.available = True
        except ImportError:
            self.available = False
            logger.warning("boto3 not available for SQS backend")

    async def send_message(self, queue_name: str, message: str) -> bool:
        """Send message to SQS queue"""
        if not self.available:
            logger.warning("boto3 not available, cannot send SQS message")
            return False

        if not isinstance(message, str):
            raise TypeValidationError(f"Expected string, got {type(message)}")

        # Get or create queue
        try:
            sqs_response = self.client.get_queue_url(QueueName=queue_name)
        except self.client.exceptions.QueueDoesNotExist:
            logger.debug(f"Queue: {queue_name} does not exist. Creating it.")
            try:
                sqs_response = self.client.create_queue(QueueName=queue_name)
                logger.debug(f"Created queue with url: {sqs_response['QueueUrl']}")
            except Exception as create_error:
                logger.error(f"Failed to create SQS queue {queue_name}: {create_error}")
                return False
        except Exception as e:
            logger.error(f"Error while fetching SQS queue URL: {e}")
            return False
        else:
            queue_url = sqs_response["QueueUrl"]

        # Send message
        try:
            sqs_response = self.client.send_message(QueueUrl=queue_url, MessageBody=message)
        except Exception as e:
            logger.error(f"Failed to send message to SQS queue: {e}")
            return False
        else:
            logger.info(f"Message ID: {sqs_response['MessageId']} sent to queue: {queue_url}")
            return True

send_message(queue_name, message) async

Send message to SQS queue

Source code in modalkit/task_queue.py
async def send_message(self, queue_name: str, message: str) -> bool:
    """Send message to SQS queue"""
    if not self.available:
        logger.warning("boto3 not available, cannot send SQS message")
        return False

    if not isinstance(message, str):
        raise TypeValidationError(f"Expected string, got {type(message)}")

    # Get or create queue
    try:
        sqs_response = self.client.get_queue_url(QueueName=queue_name)
    except self.client.exceptions.QueueDoesNotExist:
        logger.debug(f"Queue: {queue_name} does not exist. Creating it.")
        try:
            sqs_response = self.client.create_queue(QueueName=queue_name)
            logger.debug(f"Created queue with url: {sqs_response['QueueUrl']}")
        except Exception as create_error:
            logger.error(f"Failed to create SQS queue {queue_name}: {create_error}")
            return False
    except Exception as e:
        logger.error(f"Error while fetching SQS queue URL: {e}")
        return False
    else:
        queue_url = sqs_response["QueueUrl"]

    # Send message
    try:
        sqs_response = self.client.send_message(QueueUrl=queue_url, MessageBody=message)
    except Exception as e:
        logger.error(f"Failed to send message to SQS queue: {e}")
        return False
    else:
        logger.info(f"Message ID: {sqs_response['MessageId']} sent to queue: {queue_url}")
        return True

options: show_root_heading: true show_source: false