Skip to content

API Reference

This section provides detailed documentation for AmbiAlert's Python API.

Main Module

ambi_alert.main.AmbiAlert

Main class that coordinates all AmbiAlert functionality.

Source code in ambi_alert/main.py
class AmbiAlert:
    """Main class that coordinates all AmbiAlert functionality.

    It owns the database manager, the website monitor, the alert manager and
    the LLM agents, and exposes the operations used to register a query
    (``add_monitoring_query``) and to poll the monitored URLs
    (``check_for_updates`` / ``run_monitor``).
    """

    def __init__(
        self,
        model: Optional[HfApiModel] = None,
        alert_backend: Optional[AlertBackend] = None,
        db_path: str = "ambi_alert.db",
        check_interval: int = 3600,  # 1 hour
    ):
        """Initialize AmbiAlert.

        Args:
            model: Optional HfApiModel instance to share across components;
                a new ``HfApiModel`` is created when omitted
            alert_backend: Optional alert backend for notifications
            db_path: Path to the SQLite database
            check_interval: How often to check for updates (in seconds)
        """
        self.model = model or HfApiModel()
        self.db = DatabaseManager(db_path)
        self.monitor = WebsiteMonitor(self.model)
        self.alert_manager = AlertManager(alert_backend)
        self.check_interval = check_interval

        # Create webpage tool instance
        self.webpage_tool = VisitWebpageTool()

        # Setup specialized agents
        self.search_agent = ToolCallingAgent(
            tools=[DuckDuckGoSearchTool()],  # Don't include VisitWebpageTool here
            model=self.model,
            name="search_agent",
            description="This agent performs web searches to find relevant URLs.",
        )

        self.query_agent = QueryExpanderAgent(self.model)

        self.relevance_agent = ToolCallingAgent(
            tools=[],  # No tools needed, just LLM abilities
            model=self.model,
            name="relevance_agent",
            description="This agent analyzes content changes to determine relevance and generate summaries.",
        )

        # Setup manager agent to coordinate
        self.manager_agent = CodeAgent(
            tools=[],
            model=self.model,
            managed_agents=[self.search_agent, self.query_agent, self.relevance_agent],
            name="manager_agent",
            description="This agent coordinates the search, query expansion, and relevance checking process.",
        )

    async def __aenter__(self) -> "AmbiAlert":
        """Async context manager entry; initializes the database first.

        Returns:
            Self for use in async with statements
        """
        await self.db._init_db()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit; closes the monitor and the database."""
        await self.monitor.close()
        await self.db.close()

    def is_valid_url(self, url: str) -> bool:
        """Check if a URL is valid.

        A URL counts as valid when ``urlparse`` finds both a scheme and a
        network location in it.

        Args:
            url: URL to validate

        Returns:
            True if URL is valid, False otherwise (including when parsing raises)
        """
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except Exception:
            return False

    def expand_query(self, query: str) -> list[str]:
        """Use query agent to expand the search query.

        Args:
            query: The original search query

        Returns:
            List of expanded queries
        """
        return self.query_agent.run(query)

    def find_relevant_urls(self, query: str) -> list[str]:
        """Use search agent to find relevant URLs.

        Args:
            query: The search query

        Returns:
            Up to five distinct valid URLs, in the order the agent listed them
        """
        prompt = f"""Search for relevant web pages about: {query}
        Focus on official sources and news sites.
        Return only the URLs, one per line, without any quotes or additional text."""

        # The agent's final answer is not guaranteed to be a string; normalise
        # it so the line-based parsing below cannot fail on other types.
        response = str(self.search_agent.run(prompt))

        # Clean, validate and de-duplicate URLs. Duplicates are dropped before
        # the cap is applied so repeated lines cannot crowd out distinct URLs.
        urls: list[str] = []
        for line in response.strip().split("\n"):
            url = line.strip().strip('"').strip("'")
            if url in urls or not self.is_valid_url(url):
                continue
            urls.append(url)
            if len(urls) == 5:  # Limit to top 5 valid URLs
                break

        return urls

    def check_content_relevance(self, content: str, query: str) -> tuple[bool, str]:
        """Use relevance agent to check content relevance and generate summary.

        Only the first 2000 characters of ``content`` are sent to the agent.

        Args:
            content: The webpage content
            query: The original search query

        Returns:
            Tuple of (is_relevant, explanation/summary); ``is_relevant`` is True
            when the first line of the agent's reply starts with "YES"
        """
        prompt = f"""Analyze if this content is relevant to the query "{query}":
        {content[:2000]}...

        First line: Answer YES or NO
        Following lines: Brief explanation why, and if relevant, summarize the key points."""

        response = self.relevance_agent.run(prompt)
        lines = response.strip().split("\n")
        is_relevant = lines[0].strip().upper().startswith("YES")
        explanation = "\n".join(lines[1:]).strip()
        return is_relevant, explanation

    async def add_monitoring_query(self, query: str) -> None:
        """Add a new query to monitor.

        The query is expanded, each expanded query is searched for URLs, and
        every URL whose content can be fetched and hashed is stored in the
        database under the ORIGINAL query. Errors for a single URL are printed
        and do not stop the remaining URLs from being processed.

        Args:
            query: The search query to monitor
        """
        print(f"\nProcessing query: {query}")

        expanded_queries = self.expand_query(query)
        print(f"Expanded into {len(expanded_queries)} queries")

        # Expanded queries about one topic tend to return overlapping results;
        # remember what was stored so a URL is fetched and added only once.
        added_urls: set[str] = set()

        for exp_query in expanded_queries:
            print(f"\nSearching for: {exp_query}")
            urls = self.find_relevant_urls(exp_query)
            print(f"Found {len(urls)} URLs")

            for url in urls:
                if url in added_urls:
                    print(f"Already added, skipping: {url}")
                    continue
                try:
                    print(f"Checking URL: {url}")
                    content = await self.monitor.fetch_content(url)
                    if not content:
                        print(f"No content retrieved from {url}")
                        continue

                    content_hash = self.monitor.get_content_hash_from_content(content)
                    if content_hash:
                        await self.db.add_url(url, query, content_hash)
                        added_urls.add(url)
                        print(f"Added URL to monitoring: {url}")
                except Exception as e:
                    print(f"Error processing URL {url}: {e}")

    async def check_for_updates(self) -> None:
        """Check all monitored URLs for updates.

        For every URL returned by the database, the page is fetched and
        hashed. When the hash differs from the stored one, relevance is
        checked and an alert is sent for relevant changes. The stored hash is
        then updated, except when a relevant alert could not be delivered: in
        that case the old hash is kept so the change is picked up again on the
        next run. Errors for a single URL are printed and skipped.
        """
        urls = await self.db.get_urls_to_check()
        print(f"\nChecking {len(urls)} URLs for updates...")

        for url_data in urls:
            try:
                print(f"\nChecking: {url_data.url}")
                content = await self.monitor.fetch_content(url_data.url)
                if not content:
                    print(f"No content retrieved from {url_data.url}")
                    continue

                new_hash = self.monitor.get_content_hash_from_content(content)
                if not new_hash:
                    continue

                # If content has changed
                if new_hash != url_data.last_content_hash:
                    print("Content changed, checking relevance...")

                    # Check if changes are relevant
                    is_relevant, summary = await self.monitor.check_relevance(content, url_data.query)

                    if is_relevant:
                        print("Relevant changes found, sending alert...")
                        # Send alert
                        alert_sent = await self.alert_manager.send_change_alert(url_data.url, url_data.query, summary)
                        if alert_sent:
                            print("Alert sent successfully")
                        else:
                            print("Failed to send alert")
                            # Do not record the new hash: otherwise this
                            # change would never be alerted on again.
                            continue

                # Update the database
                await self.db.update_url_check(url_data.id, new_hash)
            except Exception as e:
                print(f"Error checking {url_data.url}: {e}")

    async def run_monitor(self) -> None:
        """Run the monitoring loop until it is interrupted or cancelled.

        Each iteration calls ``check_for_updates`` and sleeps for
        ``check_interval`` seconds; if an iteration raises, the error is
        printed and the loop retries after 60 seconds. On ``KeyboardInterrupt``
        or task cancellation the monitor, the database and (when it is an
        ``EmailAlertBackend``) the alert backend are closed.

        Raises:
            asyncio.CancelledError: Re-raised after cleanup when the task is
                cancelled, so the cancellation is not swallowed.
        """
        print("\nStarting AmbiAlert monitor...")
        print("Press Ctrl+C to stop monitoring.\n")

        try:
            while True:
                try:
                    await self.check_for_updates()
                    print(f"\nSleeping for {self.check_interval} seconds...")
                    await asyncio.sleep(self.check_interval)
                except Exception as e:
                    print(f"Error in monitoring loop: {e}")
                    # Sleep for a bit before retrying
                    await asyncio.sleep(60)
        except (KeyboardInterrupt, asyncio.CancelledError) as stop_signal:
            # Under asyncio.run a Ctrl+C normally reaches the coroutine as a
            # cancellation rather than as KeyboardInterrupt, so both are
            # treated as a request to stop.
            print("\nStopping AmbiAlert monitor...")
            # Clean up all resources
            await self.monitor.close()
            await self.db.close()
            if isinstance(self.alert_manager.backend, EmailAlertBackend):
                await self.alert_manager.backend.close()
            if isinstance(stop_signal, asyncio.CancelledError):
                raise

Functions

__aenter__() async

Async context manager entry.

Returns:

Type Description
AmbiAlert

Self for use in async with statements

Source code in ambi_alert/main.py
async def __aenter__(self) -> "AmbiAlert":
    """Enter the ``async with`` block after awaiting the database's ``_init_db()``.

    Returns:
        This same instance, so it can be bound with ``async with ... as``
    """
    database = self.db
    await database._init_db()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Source code in ambi_alert/main.py
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Leave the ``async with`` block, closing the monitor and then the database.

    The exception arguments are not inspected, and nothing is returned, so an
    exception raised inside the block keeps propagating.
    """
    for resource in (self.monitor, self.db):
        await resource.close()

__init__(model=None, alert_backend=None, db_path='ambi_alert.db', check_interval=3600)

Initialize AmbiAlert.

Parameters:

Name Type Description Default
model Optional[HfApiModel]

Optional HfApiModel instance to share across components

None
alert_backend Optional[AlertBackend]

Optional alert backend for notifications

None
db_path str

Path to the SQLite database

'ambi_alert.db'
check_interval int

How often to check for updates (in seconds)

3600
Source code in ambi_alert/main.py
def __init__(
    self,
    model: Optional[HfApiModel] = None,
    alert_backend: Optional[AlertBackend] = None,
    db_path: str = "ambi_alert.db",
    check_interval: int = 3600,  # 1 hour
):
    """Build the database manager, monitor, alert manager and agents.

    Args:
        model: Optional HfApiModel instance shared by every component; a new
            ``HfApiModel`` is created when none is given
        alert_backend: Optional backend handed to the ``AlertManager``
        db_path: Path to the SQLite database
        check_interval: Seconds to wait between update checks
    """
    shared_model = model or HfApiModel()

    self.model = shared_model
    self.db = DatabaseManager(db_path)
    self.monitor = WebsiteMonitor(shared_model)
    self.alert_manager = AlertManager(alert_backend)
    self.check_interval = check_interval

    # Standalone webpage tool; it is deliberately not given to the search agent.
    self.webpage_tool = VisitWebpageTool()

    # Specialised agents, all backed by the same model.
    search_tools = [DuckDuckGoSearchTool()]
    self.search_agent = ToolCallingAgent(
        tools=search_tools,
        model=shared_model,
        name="search_agent",
        description="This agent performs web searches to find relevant URLs.",
    )

    self.query_agent = QueryExpanderAgent(shared_model)

    # The relevance agent relies on the LLM alone, hence the empty tool list.
    self.relevance_agent = ToolCallingAgent(
        tools=[],
        model=shared_model,
        name="relevance_agent",
        description="This agent analyzes content changes to determine relevance and generate summaries.",
    )

    # Manager agent that has the three specialists registered as managed agents.
    specialists = [self.search_agent, self.query_agent, self.relevance_agent]
    self.manager_agent = CodeAgent(
        tools=[],
        model=shared_model,
        managed_agents=specialists,
        name="manager_agent",
        description="This agent coordinates the search, query expansion, and relevance checking process.",
    )

add_monitoring_query(query) async

Add a new query to monitor.

Parameters:

Name Type Description Default
query str

The search query to monitor

required
Source code in ambi_alert/main.py
async def add_monitoring_query(self, query: str) -> None:
    """Add a new query to monitor.

    The query is expanded, each expanded query is searched for URLs, and
    every URL whose content can be fetched and hashed is stored in the
    database under the ORIGINAL query. Errors for a single URL are printed
    and do not stop the remaining URLs from being processed.

    Args:
        query: The search query to monitor
    """
    print(f"\nProcessing query: {query}")

    expanded_queries = self.expand_query(query)
    print(f"Expanded into {len(expanded_queries)} queries")

    # Expanded queries about one topic tend to return overlapping results;
    # remember what was stored so a URL is fetched and added only once.
    added_urls: set[str] = set()

    for exp_query in expanded_queries:
        print(f"\nSearching for: {exp_query}")
        urls = self.find_relevant_urls(exp_query)
        print(f"Found {len(urls)} URLs")

        for url in urls:
            if url in added_urls:
                print(f"Already added, skipping: {url}")
                continue
            try:
                print(f"Checking URL: {url}")
                content = await self.monitor.fetch_content(url)
                if not content:
                    print(f"No content retrieved from {url}")
                    continue

                content_hash = self.monitor.get_content_hash_from_content(content)
                if content_hash:
                    await self.db.add_url(url, query, content_hash)
                    added_urls.add(url)
                    print(f"Added URL to monitoring: {url}")
            except Exception as e:
                print(f"Error processing URL {url}: {e}")

check_content_relevance(content, query)

Use relevance agent to check content relevance and generate summary.

Parameters:

Name Type Description Default
content str

The webpage content

required
query str

The original search query

required

Returns:

Type Description
tuple[bool, str]

Tuple of (is_relevant, explanation/summary)

Source code in ambi_alert/main.py
def check_content_relevance(self, content: str, query: str) -> tuple[bool, str]:
    """Ask the relevance agent whether ``content`` matters for ``query``.

    Only the first 2000 characters of ``content`` are included in the prompt.

    Args:
        content: The webpage content
        query: The original search query

    Returns:
        ``(is_relevant, explanation)``: the flag is True when the first line of
        the reply starts with "YES" (case-insensitive); the explanation is the
        rest of the reply with surrounding whitespace removed
    """
    prompt = f"""Analyze if this content is relevant to the query "{query}":
    {content[:2000]}...

    First line: Answer YES or NO
    Following lines: Brief explanation why, and if relevant, summarize the key points."""

    reply = self.relevance_agent.run(prompt).strip()
    verdict_line, *detail_lines = reply.split("\n")
    relevant = verdict_line.strip().upper().startswith("YES")
    return relevant, "\n".join(detail_lines).strip()

check_for_updates() async

Check all monitored URLs for updates.

Source code in ambi_alert/main.py
async def check_for_updates(self) -> None:
    """Check all monitored URLs for updates.

    For every URL returned by the database, the page is fetched and hashed.
    When the hash differs from the stored one, relevance is checked and an
    alert is sent for relevant changes. The stored hash is then updated,
    except when a relevant alert could not be delivered: in that case the old
    hash is kept so the change is picked up again on the next run. Errors for
    a single URL are printed and skipped.
    """
    urls = await self.db.get_urls_to_check()
    print(f"\nChecking {len(urls)} URLs for updates...")

    for url_data in urls:
        try:
            print(f"\nChecking: {url_data.url}")
            content = await self.monitor.fetch_content(url_data.url)
            if not content:
                print(f"No content retrieved from {url_data.url}")
                continue

            new_hash = self.monitor.get_content_hash_from_content(content)
            if not new_hash:
                continue

            # If content has changed
            if new_hash != url_data.last_content_hash:
                print("Content changed, checking relevance...")

                # Check if changes are relevant
                is_relevant, summary = await self.monitor.check_relevance(content, url_data.query)

                if is_relevant:
                    print("Relevant changes found, sending alert...")
                    # Send alert
                    alert_sent = await self.alert_manager.send_change_alert(url_data.url, url_data.query, summary)
                    if alert_sent:
                        print("Alert sent successfully")
                    else:
                        print("Failed to send alert")
                        # Do not record the new hash: otherwise this change
                        # would never be alerted on again.
                        continue

            # Update the database
            await self.db.update_url_check(url_data.id, new_hash)
        except Exception as e:
            print(f"Error checking {url_data.url}: {e}")

expand_query(query)

Use query agent to expand the search query.

Parameters:

Name Type Description Default
query str

The original search query

required

Returns:

Type Description
list[str]

List of expanded queries

Source code in ambi_alert/main.py
def expand_query(self, query: str) -> list[str]:
    """Delegate query expansion to the query agent.

    Args:
        query: The original search query

    Returns:
        Whatever the query agent's ``run`` returns for ``query`` (a list of
        expanded queries)
    """
    expanded = self.query_agent.run(query)
    return expanded

find_relevant_urls(query)

Use search agent to find relevant URLs.

Parameters:

Name Type Description Default
query str

The search query

required

Returns:

Type Description
list[str]

List of relevant URLs

Source code in ambi_alert/main.py
def find_relevant_urls(self, query: str) -> list[str]:
    """Use search agent to find relevant URLs.

    Args:
        query: The search query

    Returns:
        Up to five distinct valid URLs, in the order the agent listed them
    """
    prompt = f"""Search for relevant web pages about: {query}
    Focus on official sources and news sites.
    Return only the URLs, one per line, without any quotes or additional text."""

    # The agent's final answer is not guaranteed to be a string; normalise it
    # so the line-based parsing below cannot fail on other types.
    response = str(self.search_agent.run(prompt))

    # Clean, validate and de-duplicate URLs. Duplicates are dropped before the
    # cap is applied so repeated lines cannot crowd out distinct URLs.
    urls: list[str] = []
    for line in response.strip().split("\n"):
        url = line.strip().strip('"').strip("'")
        if url in urls or not self.is_valid_url(url):
            continue
        urls.append(url)
        if len(urls) == 5:  # Limit to top 5 valid URLs
            break

    return urls

is_valid_url(url)

Check if a URL is valid.

Parameters:

Name Type Description Default
url str

URL to validate

required

Returns:

Type Description
bool

True if URL is valid

Source code in ambi_alert/main.py
def is_valid_url(self, url: str) -> bool:
    """Report whether ``url`` parses into both a scheme and a network location.

    Args:
        url: URL to validate

    Returns:
        True when ``urlparse`` yields a non-empty scheme and netloc; False
        otherwise, including when parsing raises
    """
    try:
        parts = urlparse(url)
    except Exception:
        return False
    return bool(parts.scheme) and bool(parts.netloc)

run_monitor() async

Run the monitoring loop indefinitely.

Source code in ambi_alert/main.py
async def run_monitor(self) -> None:
    """Run the monitoring loop until it is interrupted or cancelled.

    Each iteration calls ``check_for_updates`` and sleeps for
    ``check_interval`` seconds; if an iteration raises, the error is printed
    and the loop retries after 60 seconds. On ``KeyboardInterrupt`` or task
    cancellation the monitor, the database and (when it is an
    ``EmailAlertBackend``) the alert backend are closed.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when the task is
            cancelled, so the cancellation is not swallowed.
    """
    print("\nStarting AmbiAlert monitor...")
    print("Press Ctrl+C to stop monitoring.\n")

    try:
        while True:
            try:
                await self.check_for_updates()
                print(f"\nSleeping for {self.check_interval} seconds...")
                await asyncio.sleep(self.check_interval)
            except Exception as e:
                print(f"Error in monitoring loop: {e}")
                # Sleep for a bit before retrying
                await asyncio.sleep(60)
    except (KeyboardInterrupt, asyncio.CancelledError) as stop_signal:
        # Under asyncio.run a Ctrl+C normally reaches the coroutine as a
        # cancellation rather than as KeyboardInterrupt, so both are treated
        # as a request to stop.
        print("\nStopping AmbiAlert monitor...")
        # Clean up all resources
        await self.monitor.close()
        await self.db.close()
        if isinstance(self.alert_manager.backend, EmailAlertBackend):
            await self.alert_manager.backend.close()
        if isinstance(stop_signal, asyncio.CancelledError):
            raise

options: show_root_heading: true show_source: true

Query Expansion

ambi_alert.query_expander.QueryExpanderAgent

Bases: MultiStepAgent

Agent that expands user queries into more comprehensive search terms.

Source code in ambi_alert/query_expander.py
class QueryExpanderAgent(MultiStepAgent):
    """Agent that expands user queries into more comprehensive search terms."""

    def __init__(self, model: Optional[HfApiModel] = None):
        """Initialize the query expander agent.

        Args:
            model: Optional HfApiModel instance. If None, creates a new one.
        """
        super().__init__(
            tools=[],  # No tools needed, just LLM abilities
            model=model or HfApiModel(),
            name="query_expander",
            description="""This agent specializes in expanding search queries to cover different aspects of a topic.
            It takes a user query and generates multiple related search queries that help capture the full scope
            of what the user wants to monitor.""",
        )

    def initialize_system_prompt(self) -> str:
        """Get the system prompt for this agent.

        Returns:
            The system prompt describing the agent's role
        """
        return """You are a query expansion specialist. Your role is to take user queries and expand them
        into multiple specific search queries that cover different aspects of the topic. You focus on
        creating queries that are:
        1. Specific and well-defined
        2. Cover different aspects of the topic
        3. Include related terms and synonyms
        4. Consider both current state and future developments

        When you receive a query, analyze it and return expanded queries using the final_answer tool.
        Return just the expanded queries, one per line, without any additional text or formatting.

        Example:
        User: "next iPhone"
        Assistant: Let me expand this query.
        <tool>final_answer
        iPhone 16 release date
        iPhone 16 specifications leak
        iPhone 16 Pro features
        Apple smartphone roadmap 2024</tool>"""

    def step(self, memory_step: ActionStep) -> Optional[list[str]]:
        """Execute one step of query expansion.

        The model is called with the current memory, its message is stored on
        ``memory_step`` and, if the reply contains a ``final_answer`` marker,
        the lines after the last marker are returned as queries.

        Args:
            memory_step: The current memory step

        Returns:
            List of expanded queries if final step, None otherwise. Falls back
            to ``[self.task]`` when the final answer contains no usable line.
        """
        # Imported locally so this method does not depend on a module-level import.
        import re

        messages = self.write_memory_to_messages()
        chat_message = self.model(messages)
        memory_step.model_output_message = chat_message

        # A message without text content cannot contain a final answer.
        response = chat_message.content or ""

        # Split case-insensitively so the split agrees with the (previously
        # lower-cased) presence check; more than one segment means the marker
        # was found.
        segments = re.split(r"final_answer", response, flags=re.IGNORECASE)
        if len(segments) == 1:
            return None

        # Keep only the text after the last marker and stop at the closing
        # tag, which the prompt's example format appends to the last query.
        queries_text = re.split(r"</tool>", segments[-1], maxsplit=1, flags=re.IGNORECASE)[0]
        expanded_queries = [
            q.strip()
            for q in queries_text.split("\n")
            if q.strip() and not q.strip().startswith(("-", "*", "•", "<", ">"))
        ]

        # Ensure we have at least one query
        if not expanded_queries:
            expanded_queries = [self.task]  # Fall back to original query

        return expanded_queries

    def run(self, query: str) -> list[str]:
        """Expand a user query into multiple search-optimized queries.

        Args:
            query: The original user query

        Returns:
            A list of expanded search queries, or ``[query]`` when the base
            class's ``run`` does not return a list
        """
        result = super().run(query)
        if isinstance(result, list):
            return result
        return [query]  # Fallback to original query if something goes wrong

Functions

__init__(model=None)

Initialize the query expander agent.

Parameters:

Name Type Description Default
model Optional[HfApiModel]

Optional HfApiModel instance. If None, creates a new one.

None
Source code in ambi_alert/query_expander.py
def __init__(self, model: Optional[HfApiModel] = None):
    """Set up the agent with an empty tool list and a fixed name and description.

    Args:
        model: Optional HfApiModel instance. A new ``HfApiModel`` is created
            when none is given.
    """
    agent_description = """This agent specializes in expanding search queries to cover different aspects of a topic.
        It takes a user query and generates multiple related search queries that help capture the full scope
        of what the user wants to monitor."""
    llm = model or HfApiModel()

    # The tool list is empty: this agent relies on the LLM alone.
    super().__init__(
        tools=[],
        model=llm,
        name="query_expander",
        description=agent_description,
    )

initialize_system_prompt()

Get the system prompt for this agent.

Returns:

Type Description
str

The system prompt describing the agent's role

Source code in ambi_alert/query_expander.py
def initialize_system_prompt(self) -> str:
    """Return the fixed system prompt that defines this agent's role.

    Returns:
        The prompt text: the query-expansion instructions followed by a worked
        example that uses the ``final_answer`` tool format
    """
    system_prompt = """You are a query expansion specialist. Your role is to take user queries and expand them
    into multiple specific search queries that cover different aspects of the topic. You focus on
    creating queries that are:
    1. Specific and well-defined
    2. Cover different aspects of the topic
    3. Include related terms and synonyms
    4. Consider both current state and future developments

    When you receive a query, analyze it and return expanded queries using the final_answer tool.
    Return just the expanded queries, one per line, without any additional text or formatting.

    Example:
    User: "next iPhone"
    Assistant: Let me expand this query.
    <tool>final_answer
    iPhone 16 release date
    iPhone 16 specifications leak
    iPhone 16 Pro features
    Apple smartphone roadmap 2024</tool>"""
    return system_prompt

run(query)

Expand a user query into multiple search-optimized queries.

Parameters:

Name Type Description Default
query str

The original user query

required

Returns:

Type Description
list[str]

A list of expanded search queries

Source code in ambi_alert/query_expander.py
def run(self, query: str) -> list[str]:
    """Run the agent on ``query`` and always hand back a list of queries.

    Args:
        query: The original user query

    Returns:
        The base class's result when it is a list; otherwise a one-element
        list holding the original query
    """
    outcome = super().run(query)
    # Anything that is not a list is treated as a failed expansion.
    return outcome if isinstance(outcome, list) else [query]

step(memory_step)

Execute one step of query expansion.

Parameters:

Name Type Description Default
memory_step ActionStep

The current memory step

required

Returns:

Type Description
Optional[list[str]]

List of expanded queries if final step, None otherwise

Source code in ambi_alert/query_expander.py
def step(self, memory_step: ActionStep) -> Optional[list[str]]:
    """Execute one step of query expansion.

    The model is called with the current memory, its message is stored on
    ``memory_step`` and, if the reply contains a ``final_answer`` marker, the
    lines after the last marker are returned as queries.

    Args:
        memory_step: The current memory step

    Returns:
        List of expanded queries if final step, None otherwise. Falls back to
        ``[self.task]`` when the final answer contains no usable line.
    """
    # Imported locally so this method does not depend on a module-level import.
    import re

    messages = self.write_memory_to_messages()
    chat_message = self.model(messages)
    memory_step.model_output_message = chat_message

    # A message without text content cannot contain a final answer.
    response = chat_message.content or ""

    # Split case-insensitively so the split agrees with the (previously
    # lower-cased) presence check; more than one segment means the marker was
    # found.
    segments = re.split(r"final_answer", response, flags=re.IGNORECASE)
    if len(segments) == 1:
        return None

    # Keep only the text after the last marker and stop at the closing tag,
    # which the prompt's example format appends to the last query.
    queries_text = re.split(r"</tool>", segments[-1], maxsplit=1, flags=re.IGNORECASE)[0]
    expanded_queries = [
        q.strip()
        for q in queries_text.split("\n")
        if q.strip() and not q.strip().startswith(("-", "*", "•", "<", ">"))
    ]

    # Ensure we have at least one query
    if not expanded_queries:
        expanded_queries = [self.task]  # Fall back to original query

    return expanded_queries

options: show_root_heading: true show_source: true

Website Monitoring

ambi_alert.monitor.WebsiteMonitor

Monitors websites for changes and determines relevance.

Source code in ambi_alert/monitor.py
class WebsiteMonitor:
    """Monitors websites for changes and determines relevance."""

    def __init__(self, model: Optional[HfApiModel] = None):
        """Initialize the website monitor.

        Args:
            model: Optional HfApiModel instance for relevance checking; a new
                ``HfApiModel`` is created when omitted
        """
        self.model = model or HfApiModel()
        self.webpage_tool = VisitWebpageTool()
        # Created lazily by _get_session().
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get the current aiohttp session, creating one if none is open."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self) -> None:
        """Close the aiohttp session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()
            self._session = None

    def get_content_hash_from_content(self, content: str) -> str:
        """Get a hash of the relevant content from HTML content.

        Script, style, nav, header and footer elements are removed before the
        remaining text is hashed with SHA-256.

        Args:
            content: The HTML content to hash

        Returns:
            The hex digest of the page's text, or "" when ``content`` is empty
            or hashing fails
        """
        if not content:
            return ""

        try:
            # Parse with BeautifulSoup to get main content
            soup = BeautifulSoup(content, "html.parser")

            # Remove scripts, styles, and navigation elements
            for element in soup(["script", "style", "nav", "header", "footer"]):
                element.decompose()

            # Get the main text content
            text = soup.get_text()
            return hashlib.sha256(text.encode()).hexdigest()
        except Exception as e:
            print(f"Error hashing content: {e}")
            return ""

    async def get_content_hash(self, url: str) -> str:
        """Get a hash of the relevant content from a URL.

        Args:
            url: The URL to check

        Returns:
            A hash of the page's main content, or "" when the request fails or
            does not return HTTP 200
        """
        try:
            session = await self._get_session()
            async with session.get(url) as response:
                if response.status == 200:
                    content = await response.text()
                    return self.get_content_hash_from_content(content)
                print(f"Error fetching {url}: HTTP {response.status}")
                return ""
        except Exception as e:
            print(f"Error getting content from {url}: {e}")
            return ""

    async def check_relevance(self, content: str, query: str) -> tuple[bool, str]:
        """Check if content changes are relevant to the original query.

        Only the first 1000 characters of ``content`` are sent to the model.

        Args:
            content: The content to check
            query: The original search query

        Returns:
            Tuple of (is_relevant, explanation). ``is_relevant`` is True when
            the first line of the reply starts with "YES". The explanation is
            the text after the first line or, for a single-line reply such as
            "YES, because ...", the text after the verdict word.
        """
        # Imported locally so this method does not depend on module-level imports.
        import asyncio
        import re

        prompt = f"""Analyze if the following content is relevant to the query "{query}".
        Content: {content[:1000]}...

        Answer with YES or NO, followed by a brief explanation."""

        # model.generate is synchronous; run it in a worker thread so it does
        # not block the event loop.
        response = await asyncio.to_thread(self.model.generate, prompt)

        first_line, _, remainder = response.strip().partition("\n")
        first_line = first_line.strip()
        is_relevant = first_line.upper().startswith("YES")
        explanation = remainder.strip()
        if not explanation:
            # The prompt does not ask for a line break after the verdict, so
            # the explanation often shares the first line with it.
            verdict = re.match(r"(?:YES|NO)\b[\s.,:;!\-\u2013\u2014]*", first_line, re.IGNORECASE)
            if verdict:
                explanation = first_line[verdict.end():].strip()
        return is_relevant, explanation

    async def get_content_summary(self, content: str) -> str:
        """Generate a summary of the changed content.

        Only the first 2000 characters of ``content`` are sent to the model.

        Args:
            content: The content to summarize

        Returns:
            A brief summary of the content
        """
        # Imported locally so this method does not depend on module-level imports.
        import asyncio

        prompt = "Summarize the following content in 2-3 sentences:\n\n" + content[:2000]
        # model.generate is synchronous; run it in a worker thread so it does
        # not block the event loop.
        summary = await asyncio.to_thread(self.model.generate, prompt)
        return summary.strip()

    async def fetch_content(self, url: str) -> Optional[str]:
        """Fetch content from a URL asynchronously.

        Args:
            url: The URL to fetch

        Returns:
            The page content if the response is HTTP 200, None otherwise
            (including when the request raises)
        """
        try:
            session = await self._get_session()
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.text()
                print(f"Error fetching {url}: HTTP {response.status}")
                return None
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

Functions

__init__(model=None)

Initialize the website monitor.

Parameters:

Name Type Description Default
model Optional[HfApiModel]

Optional HfApiModel instance for relevance checking

None
Source code in ambi_alert/monitor.py
def __init__(self, model: Optional[HfApiModel] = None):
    """Store the model, create the webpage tool and start without an HTTP session.

    Args:
        model: Optional HfApiModel instance for relevance checking; a new
            ``HfApiModel`` is created when none is given
    """
    self.model = model if model else HfApiModel()
    self.webpage_tool = VisitWebpageTool()
    # No aiohttp session exists yet.
    self._session: Optional[aiohttp.ClientSession] = None

_get_session() async

Get or create an aiohttp session.

Source code in ambi_alert/monitor.py
async def _get_session(self) -> aiohttp.ClientSession:
    """Return the cached aiohttp session, opening a new one when needed.

    A new session is created when none is stored yet or the stored one
    reports itself as closed.
    """
    current = self._session
    if current is not None and not current.closed:
        return current
    self._session = aiohttp.ClientSession()
    return self._session

check_relevance(content, query) async

Check if content changes are relevant to the original query.

Parameters:

Name Type Description Default
content str

The content to check

required
query str

The original search query

required

Returns:

Type Description
tuple[bool, str]

Tuple of (is_relevant, explanation)

Source code in ambi_alert/monitor.py
async def check_relevance(self, content: str, query: str) -> tuple[bool, str]:
    """Check if content changes are relevant to the original query.

    The model is asked to answer YES or NO followed by an explanation. The
    verdict is read from the first line of the reply. The explanation is the
    remaining lines; when the reply is a single line (for example
    ``"YES - it covers the topic"``) the text after the verdict is used so
    the explanation is not lost.

    Args:
        content: The content to check (only the first 1000 characters are
            included in the prompt)
        query: The original search query

    Returns:
        Tuple of (is_relevant, explanation)
    """
    prompt = f"""Analyze if the following content is relevant to the query "{query}".
    Content: {content[:1000]}...

    Answer with YES or NO, followed by a brief explanation."""

    # Note: Using synchronous model.generate for now as smolagents doesn't support async yet
    response = self.model.generate(prompt)
    first_line, _, remainder = response.strip().partition("\n")
    first_line = first_line.strip()
    is_relevant = first_line.upper().startswith("YES")
    explanation = remainder.strip()
    if not explanation:
        # One-line replies carry the explanation on the verdict line itself.
        # Strip a leading YES/NO word (only when it is a whole word) and the
        # punctuation that follows it; otherwise keep the line as it is.
        explanation = first_line
        for verdict in ("YES", "NO"):
            tail = first_line[len(verdict):]
            if first_line.upper().startswith(verdict) and not tail[:1].isalnum():
                explanation = tail.lstrip(" \t,.:;!-").strip()
                break
    return is_relevant, explanation

close() async

Close the aiohttp session.

Source code in ambi_alert/monitor.py
async def close(self) -> None:
    """Close the stored aiohttp session if it is still open.

    Does nothing when no session is stored or the stored one is already
    closed; after closing, the stored session is cleared.
    """
    session = self._session
    if not session or session.closed:
        return
    await session.close()
    self._session = None

fetch_content(url) async

Fetch content from a URL asynchronously.

Parameters:

Name Type Description Default
url str

The URL to fetch

required

Returns:

Type Description
Optional[str]

The page content if successful, None otherwise

Source code in ambi_alert/monitor.py
async def fetch_content(self, url: str) -> Optional[str]:
    """Fetch content from a URL asynchronously.

    Args:
        url: The URL to fetch

    Returns:
        The page content if successful, None otherwise
    """
    try:
        session = await self._get_session()
        async with session.get(url) as response:
            if response.status == 200:
                return await response.text()
            print(f"Error fetching {url}: HTTP {response.status}")
            return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

get_content_hash(url) async

Get a hash of the relevant content from a URL.

Parameters:

Name Type Description Default
url str

The URL to check

required

Returns:

Type Description
str

A hash of the page's main content

Source code in ambi_alert/monitor.py
async def get_content_hash(self, url: str) -> str:
    """Get a hash of the relevant content from a URL.

    Args:
        url: The URL to check

    Returns:
        A hash of the page's main content
    """
    try:
        session = await self._get_session()
        async with session.get(url) as response:
            if response.status == 200:
                content = await response.text()
                return self.get_content_hash_from_content(content)
            print(f"Error fetching {url}: HTTP {response.status}")
            return ""
    except Exception as e:
        print(f"Error getting content from {url}: {e}")
        return ""

get_content_hash_from_content(content)

Get a hash of the relevant content from HTML content.

Parameters:

Name Type Description Default
content str

The HTML content to hash

required

Returns:

Type Description
str

A hash of the page's main content

Source code in ambi_alert/monitor.py
def get_content_hash_from_content(self, content: str) -> str:
    """Hash the text of an HTML document, ignoring non-content markup.

    Args:
        content: The HTML content to hash

    Returns:
        SHA-256 hex digest of the document text after removing script,
        style, nav, header and footer elements; an empty string when
        ``content`` is empty or hashing fails
    """
    if not content:
        return ""

    try:
        document = BeautifulSoup(content, "html.parser")
        # Drop elements that are not part of the page's main text
        for tag in document(["script", "style", "nav", "header", "footer"]):
            tag.decompose()
        digest = hashlib.sha256(document.get_text().encode())
        return digest.hexdigest()
    except Exception as exc:
        print(f"Error hashing content: {exc}")
        return ""

get_content_summary(content) async

Generate a summary of the changed content.

Parameters:

Name Type Description Default
content str

The content to summarize

required

Returns:

Type Description
str

A brief summary of the content

Source code in ambi_alert/monitor.py
async def get_content_summary(self, content: str) -> str:
    """Ask the model for a short summary of ``content``.

    Only the first 2000 characters of ``content`` are included in the prompt.

    Args:
        content: The text to summarize

    Returns:
        The model's reply with surrounding whitespace removed
    """
    instruction = "Summarize the following content in 2-3 sentences:\n\n"
    excerpt = content[:2000]
    # The model call is synchronous: smolagents has no async API yet
    raw_summary = self.model.generate(instruction + excerpt)
    return raw_summary.strip()

options: show_root_heading: true show_source: true

Database Management

ambi_alert.database.DatabaseManager

Manages the SQLite database for storing monitored URLs.

Source code in ambi_alert/database.py
class DatabaseManager:
    """Manages the SQLite database for storing monitored URLs.

    The connection is opened lazily on first use. The schema is created by
    ``_init_db``, which ``__aenter__`` awaits, so the manager is intended to
    be used as ``async with DatabaseManager(...) as db``.
    """

    def __init__(self, db_path: str = "ambi_alert.db"):
        """Initialize the database manager.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = Path(db_path)
        self._connection: Optional[aiosqlite.Connection] = None

    async def _get_connection(self) -> aiosqlite.Connection:
        """Get or create a database connection.

        Returns:
            An aiosqlite connection whose rows support access by column name
        """
        if self._connection is None:
            self._connection = await aiosqlite.connect(self.db_path)
            self._connection.row_factory = aiosqlite.Row
        return self._connection

    async def close(self) -> None:
        """Close the database connection, if one is open."""
        if self._connection:
            await self._connection.close()
            self._connection = None

    async def _init_db(self) -> None:
        """Initialize the database schema if it doesn't exist."""
        conn = await self._get_connection()
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS monitored_urls (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT NOT NULL,
                query TEXT NOT NULL,
                last_check TIMESTAMP NOT NULL,
                last_content_hash TEXT NOT NULL
            )
        """)
        await conn.commit()

    async def add_url(self, url: str, query: str, content_hash: str) -> None:
        """Add a new URL to monitor.

        Args:
            url: The URL to monitor
            query: The search query that found this URL
            content_hash: Hash of the initial content
        """
        conn = await self._get_connection()
        # Store the timestamp as explicit ISO text. Binding a datetime object
        # relies on sqlite3's default adapter, deprecated since Python 3.12;
        # isoformat(sep=" ") yields the same text that adapter produced.
        checked_at = datetime.now().isoformat(sep=" ")
        await conn.execute(
            "INSERT INTO monitored_urls (url, query, last_check, last_content_hash) VALUES (?, ?, ?, ?)",
            (url, query, checked_at, content_hash),
        )
        await conn.commit()

    async def get_urls_to_check(self) -> list[MonitoredURL]:
        """Get all URLs that need to be checked.

        Every row of the table is returned; no filtering is applied here.

        Returns:
            List of MonitoredURL objects
        """
        conn = await self._get_connection()
        async with conn.execute("SELECT * FROM monitored_urls") as cursor:
            rows = await cursor.fetchall()
            return [
                MonitoredURL(
                    id=row["id"],
                    url=row["url"],
                    query=row["query"],
                    # last_check is stored as ISO-formatted text
                    last_check=datetime.fromisoformat(row["last_check"]),
                    last_content_hash=row["last_content_hash"],
                )
                for row in rows
            ]

    async def update_url_check(self, url_id: int, content_hash: str) -> None:
        """Update the last check time and content hash for a URL.

        Args:
            url_id: The ID of the URL in the database
            content_hash: The new content hash
        """
        conn = await self._get_connection()
        # Explicit ISO text instead of the deprecated datetime adapter
        checked_at = datetime.now().isoformat(sep=" ")
        await conn.execute(
            "UPDATE monitored_urls SET last_check = ?, last_content_hash = ? WHERE id = ?",
            (checked_at, content_hash, url_id),
        )
        await conn.commit()

    async def get_all_urls(self) -> list[tuple[str, str, str]]:
        """Get all monitored URLs with their queries and hashes.

        Returns:
            List of tuples containing (url, query, content_hash)
        """
        conn = await self._get_connection()
        async with conn.execute("SELECT url, query, last_content_hash FROM monitored_urls") as cursor:
            rows = await cursor.fetchall()
            return [(row["url"], row["query"], row["last_content_hash"]) for row in rows]

    async def update_url_hash(self, url: str, new_hash: str) -> None:
        """Update the content hash for a URL.

        Every row whose ``url`` matches is updated; ``last_check`` is left
        unchanged.

        Args:
            url: The URL to update
            new_hash: The new content hash
        """
        conn = await self._get_connection()
        await conn.execute(
            "UPDATE monitored_urls SET last_content_hash = ? WHERE url = ?",
            (new_hash, url),
        )
        await conn.commit()

    async def __aenter__(self) -> "DatabaseManager":
        """Async context manager entry.

        Creates the schema if needed before handing the manager to the caller.

        Returns:
            Self for use in async with statements
        """
        await self._init_db()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit: closes the connection."""
        await self.close()

Functions

__aenter__() async

Async context manager entry.

Returns:

Type Description
DatabaseManager

Self for use in async with statements

Source code in ambi_alert/database.py
async def __aenter__(self) -> "DatabaseManager":
    """Enter the async context after initializing the database.

    Returns:
        Self for use in async with statements
    """
    # _init_db is awaited before the manager is handed to the caller
    await self._init_db()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Source code in ambi_alert/database.py
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit the async context by closing the manager.

    The exception arguments are not inspected and nothing is returned, so
    an exception raised inside the ``async with`` body is not suppressed.
    """
    await self.close()

__init__(db_path='ambi_alert.db')

Initialize the database manager.

Parameters:

Name Type Description Default
db_path str

Path to the SQLite database file

'ambi_alert.db'
Source code in ambi_alert/database.py
def __init__(self, db_path: str = "ambi_alert.db"):
    """Record where the database lives; no connection is opened here.

    Args:
        db_path: Path to the SQLite database file
    """
    # The connection slot starts out empty
    self._connection: Optional[aiosqlite.Connection] = None
    self.db_path = Path(db_path)

_get_connection() async

Get or create a database connection.

Returns:

Type Description
Connection

An aiosqlite connection

Source code in ambi_alert/database.py
async def _get_connection(self) -> aiosqlite.Connection:
    """Return the stored connection, opening it on first use.

    Returns:
        An aiosqlite connection; a newly opened one has its row factory
        set to ``aiosqlite.Row``
    """
    if self._connection is not None:
        return self._connection

    self._connection = await aiosqlite.connect(self.db_path)
    self._connection.row_factory = aiosqlite.Row
    return self._connection

_init_db() async

Initialize the database schema if it doesn't exist.

Source code in ambi_alert/database.py
async def _init_db(self) -> None:
    """Create the ``monitored_urls`` table when it is missing, then commit."""
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS monitored_urls (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT NOT NULL,
            query TEXT NOT NULL,
            last_check TIMESTAMP NOT NULL,
            last_content_hash TEXT NOT NULL
        )
    """
    db = await self._get_connection()
    await db.execute(create_table_sql)
    await db.commit()

add_url(url, query, content_hash) async

Add a new URL to monitor.

Parameters:

Name Type Description Default
url str

The URL to monitor

required
query str

The search query that found this URL

required
content_hash str

Hash of the initial content

required
Source code in ambi_alert/database.py
async def add_url(self, url: str, query: str, content_hash: str) -> None:
    """Add a new URL to monitor.

    Inserts one row with the current local time as ``last_check`` and
    commits.

    Args:
        url: The URL to monitor
        query: The search query that found this URL
        content_hash: Hash of the initial content
    """
    conn = await self._get_connection()
    # Store the timestamp as explicit ISO text. Binding a datetime object
    # relies on sqlite3's default adapter, deprecated since Python 3.12;
    # isoformat(sep=" ") yields the same text that adapter produced.
    checked_at = datetime.now().isoformat(sep=" ")
    await conn.execute(
        "INSERT INTO monitored_urls (url, query, last_check, last_content_hash) VALUES (?, ?, ?, ?)",
        (url, query, checked_at, content_hash),
    )
    await conn.commit()

close() async

Close the database connection.

Source code in ambi_alert/database.py
async def close(self) -> None:
    """Close the stored database connection and clear it.

    Does nothing when no connection is stored.
    """
    connection = self._connection
    if not connection:
        return
    await connection.close()
    self._connection = None

get_all_urls() async

Get all monitored URLs with their queries and hashes.

Returns:

Type Description
list[tuple[str, str, str]]

List of tuples containing (url, query, content_hash)

Source code in ambi_alert/database.py
async def get_all_urls(self) -> list[tuple[str, str, str]]:
    """Get all monitored URLs with their queries and hashes.

    Returns:
        List of tuples containing (url, query, content_hash)
    """
    conn = await self._get_connection()
    async with conn.execute("SELECT url, query, last_content_hash FROM monitored_urls") as cursor:
        rows = await cursor.fetchall()
        return [(row["url"], row["query"], row["last_content_hash"]) for row in rows]

get_urls_to_check() async

Get all URLs that need to be checked.

Returns:

Type Description
list[MonitoredURL]

List of MonitoredURL objects

Source code in ambi_alert/database.py
async def get_urls_to_check(self) -> list[MonitoredURL]:
    """Get all URLs that need to be checked.

    Returns:
        List of MonitoredURL objects
    """
    conn = await self._get_connection()
    async with conn.execute("SELECT * FROM monitored_urls") as cursor:
        rows = await cursor.fetchall()
        return [
            MonitoredURL(
                id=row["id"],
                url=row["url"],
                query=row["query"],
                last_check=datetime.fromisoformat(row["last_check"]),
                last_content_hash=row["last_content_hash"],
            )
            for row in rows
        ]

update_url_check(url_id, content_hash) async

Update the last check time and content hash for a URL.

Parameters:

Name Type Description Default
url_id int

The ID of the URL in the database

required
content_hash str

The new content hash

required
Source code in ambi_alert/database.py
async def update_url_check(self, url_id: int, content_hash: str) -> None:
    """Update the last check time and content hash for a URL.

    Sets ``last_check`` to the current local time and commits.

    Args:
        url_id: The ID of the URL in the database
        content_hash: The new content hash
    """
    conn = await self._get_connection()
    # Store the timestamp as explicit ISO text. Binding a datetime object
    # relies on sqlite3's default adapter, deprecated since Python 3.12;
    # isoformat(sep=" ") yields the same text that adapter produced.
    checked_at = datetime.now().isoformat(sep=" ")
    await conn.execute(
        "UPDATE monitored_urls SET last_check = ?, last_content_hash = ? WHERE id = ?",
        (checked_at, content_hash, url_id),
    )
    await conn.commit()

update_url_hash(url, new_hash) async

Update the content hash for a URL.

Parameters:

Name Type Description Default
url str

The URL to update

required
new_hash str

The new content hash

required
Source code in ambi_alert/database.py
async def update_url_hash(self, url: str, new_hash: str) -> None:
    """Store a new content hash for every row matching ``url`` and commit.

    The ``last_check`` column is not touched by this statement.

    Args:
        url: The URL to update
        new_hash: The new content hash
    """
    statement = "UPDATE monitored_urls SET last_content_hash = ? WHERE url = ?"
    values = (new_hash, url)
    db = await self._get_connection()
    await db.execute(statement, values)
    await db.commit()

options: show_root_heading: true show_source: true

ambi_alert.database.MonitoredURL dataclass

Represents a URL being monitored.

Source code in ambi_alert/database.py
@dataclass
class MonitoredURL:
    """Represents a URL being monitored.

    One instance corresponds to one row of the ``monitored_urls`` table.
    """

    url: str  # address of the monitored page
    query: str  # search query associated with this URL
    last_check: datetime  # time of the most recent check
    last_content_hash: str  # content hash recorded at the most recent check
    id: Optional[int] = None  # database row id; defaults to None

options: show_root_heading: true show_source: true

Alerting System

ambi_alert.alerting.AlertManager

Manages sending alerts through configured backends.

Source code in ambi_alert/alerting.py
class AlertManager:
    """Manages sending alerts through configured backends."""

    def __init__(self, backend: Optional[AlertBackend] = None):
        """Initialize the alert manager.

        Args:
            backend: Optional alert backend. Defaults to MockAlertBackend
        """
        self.backend = backend or MockAlertBackend()

    async def send_change_alert(self, url: str, query: str, summary: str) -> bool:
        """Send an alert about a relevant change.

        Args:
            url: The URL that changed
            query: The original search query
            summary: Summary of the changes

        Returns:
            True if the alert was sent successfully
        """
        subject = f"AmbiAlert Update: New information about '{query}'"
        # Assemble the body from explicit lines. An indented triple-quoted
        # literal would carry the source indentation into every line after
        # the first of the delivered message.
        message = "\n".join(
            [
                "Hello!",
                "",
                f"We've detected relevant changes related to your query: \"{query}\"",
                "",
                f"URL: {url}",
                "",
                "What's New:",
                summary,
                "",
                "Best regards,",
                "Your AmbiAlert System",
            ]
        )

        return await self.backend.send_alert(subject, message)

Functions

__init__(backend=None)

Initialize the alert manager.

Parameters:

Name Type Description Default
backend Optional[AlertBackend]

Optional alert backend. Defaults to MockAlertBackend

None
Source code in ambi_alert/alerting.py
def __init__(self, backend: Optional[AlertBackend] = None):
    """Store the backend that alerts will be sent through.

    Args:
        backend: Optional alert backend. A MockAlertBackend is created
            when none (or a falsy value) is given
    """
    self.backend = backend if backend else MockAlertBackend()

send_change_alert(url, query, summary) async

Send an alert about a relevant change.

Parameters:

Name Type Description Default
url str

The URL that changed

required
query str

The original search query

required
summary str

Summary of the changes

required

Returns:

Type Description
bool

True if the alert was sent successfully

Source code in ambi_alert/alerting.py
async def send_change_alert(self, url: str, query: str, summary: str) -> bool:
    """Send an alert about a relevant change.

    Args:
        url: The URL that changed
        query: The original search query
        summary: Summary of the changes

    Returns:
        True if the alert was sent successfully
    """
    subject = f"AmbiAlert Update: New information about '{query}'"
    # Assemble the body from explicit lines. An indented triple-quoted
    # literal would carry the source indentation into every line after the
    # first of the delivered message.
    message = "\n".join(
        [
            "Hello!",
            "",
            f"We've detected relevant changes related to your query: \"{query}\"",
            "",
            f"URL: {url}",
            "",
            "What's New:",
            summary,
            "",
            "Best regards,",
            "Your AmbiAlert System",
        ]
    )

    return await self.backend.send_alert(subject, message)

options: show_root_heading: true show_source: true

ambi_alert.alerting.AlertBackend

Bases: Protocol

Protocol for alert backends.

Source code in ambi_alert/alerting.py
class AlertBackend(Protocol):
    """Protocol for alert backends.

    Any object that provides a matching async ``send_alert`` method
    satisfies this protocol; no inheritance is required.
    """

    async def send_alert(self, subject: str, message: str) -> bool:
        """Send an alert with the given subject and message.

        Args:
            subject: Alert subject
            message: Alert message

        Returns:
            True if the alert was sent successfully
        """
        # Protocol stub: implementations supply the body
        ...

Functions

send_alert(subject, message) async

Send an alert with the given subject and message.

Parameters:

Name Type Description Default
subject str

Alert subject

required
message str

Alert message

required

Returns:

Type Description
bool

True if the alert was sent successfully

Source code in ambi_alert/alerting.py
async def send_alert(self, subject: str, message: str) -> bool:
    """Send an alert with the given subject and message.

    Args:
        subject: Alert subject
        message: Alert message

    Returns:
        True if the alert was sent successfully
    """
    # Protocol stub: implementations supply the body
    ...

options: show_root_heading: true show_source: true

ambi_alert.alerting.EmailAlertBackend

Email-based alert backend.

Source code in ambi_alert/alerting.py
class EmailAlertBackend:
    """Email-based alert backend.

    Keeps one SMTP client that is connected and logged in on first use and
    reused afterwards. Can be used as an async context manager, which
    connects on entry and closes the connection on exit.
    """

    def __init__(
        self,
        smtp_server: str,
        smtp_port: int,
        username: str,
        password: str,
        from_email: str,
        to_email: str,
    ):
        """Initialize the email alert backend.

        Args:
            smtp_server: SMTP server hostname
            smtp_port: SMTP server port
            username: SMTP authentication username
            password: SMTP authentication password
            from_email: Sender email address
            to_email: Recipient email address
        """
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.from_email = from_email
        self.to_email = to_email
        self._client: Optional[aiosmtplib.SMTP] = None

    async def _get_client(self) -> aiosmtplib.SMTP:
        """Get or create an SMTP client.

        A new client is cached only after both connect and login succeed.
        Caching it earlier would let a failed login leave a connected but
        unauthenticated client behind, which later calls would reuse.

        Returns:
            An SMTP client instance that is connected and logged in

        Raises:
            Exception: Whatever connect or login raises is propagated
        """
        if self._client is None or not self._client.is_connected:
            # Forget any stale client until a fully set-up one replaces it
            self._client = None
            client = aiosmtplib.SMTP(
                hostname=self.smtp_server,
                port=self.smtp_port,
                use_tls=True,
            )
            await client.connect()
            try:
                await client.login(self.username, self.password)
            except Exception:
                # Do not leak the half-initialized connection
                client.close()
                raise
            self._client = client
        return self._client

    async def close(self) -> None:
        """Close the SMTP connection if a connected client is stored."""
        if self._client and self._client.is_connected:
            await self._client.quit()
            self._client = None

    async def send_alert(self, subject: str, message: str) -> bool:
        """Send an email alert.

        Args:
            subject: Email subject
            message: Email body

        Returns:
            True if the email was sent successfully, False if any step
            (building the message, connecting, sending) raised
        """
        try:
            msg = MIMEMultipart()
            msg["From"] = self.from_email
            msg["To"] = self.to_email
            msg["Subject"] = subject
            msg.attach(MIMEText(message, "plain"))

            client = await self._get_client()
            await client.send_message(msg)
        except Exception as e:
            # Failures are reported through the return value, not raised
            print(f"Failed to send email alert: {e}")
            return False
        else:
            return True

    async def __aenter__(self) -> "EmailAlertBackend":
        """Async context manager entry: connects and logs in."""
        await self._get_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit: closes the SMTP connection."""
        await self.close()

Functions

__aenter__() async

Async context manager entry.

Source code in ambi_alert/alerting.py
async def __aenter__(self) -> "EmailAlertBackend":
    """Enter the async context after obtaining the SMTP client.

    Returns:
        Self for use in async with statements
    """
    # The client is requested up front; its result is not needed here
    await self._get_client()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Source code in ambi_alert/alerting.py
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Exit the async context by closing the backend.

    The exception arguments are not inspected and nothing is returned, so
    an exception raised inside the ``async with`` body is not suppressed.
    """
    await self.close()

__init__(smtp_server, smtp_port, username, password, from_email, to_email)

Initialize the email alert backend.

Parameters:

Name Type Description Default
smtp_server str

SMTP server hostname

required
smtp_port int

SMTP server port

required
username str

SMTP authentication username

required
password str

SMTP authentication password

required
from_email str

Sender email address

required
to_email str

Recipient email address

required
Source code in ambi_alert/alerting.py
def __init__(
    self,
    smtp_server: str,
    smtp_port: int,
    username: str,
    password: str,
    from_email: str,
    to_email: str,
):
    """Store the SMTP settings; no connection is made here.

    Args:
        smtp_server: SMTP server hostname
        smtp_port: SMTP server port
        username: SMTP authentication username
        password: SMTP authentication password
        from_email: Sender email address
        to_email: Recipient email address
    """
    # The client slot starts out empty
    self._client: Optional[aiosmtplib.SMTP] = None
    # Server endpoint
    self.smtp_server, self.smtp_port = smtp_server, smtp_port
    # Credentials
    self.username, self.password = username, password
    # Message addressing
    self.from_email, self.to_email = from_email, to_email

_get_client() async

Get or create an SMTP client.

Returns:

Type Description
SMTP

An SMTP client instance

Source code in ambi_alert/alerting.py
async def _get_client(self) -> aiosmtplib.SMTP:
    """Get or create an SMTP client.

    A new client is cached only after both connect and login succeed.
    Caching it earlier would let a failed login leave a connected but
    unauthenticated client behind, which later calls would reuse.

    Returns:
        An SMTP client instance that is connected and logged in

    Raises:
        Exception: Whatever connect or login raises is propagated
    """
    if self._client is None or not self._client.is_connected:
        # Forget any stale client until a fully set-up one replaces it
        self._client = None
        client = aiosmtplib.SMTP(
            hostname=self.smtp_server,
            port=self.smtp_port,
            use_tls=True,
        )
        await client.connect()
        try:
            await client.login(self.username, self.password)
        except Exception:
            # Do not leak the half-initialized connection
            client.close()
            raise
        self._client = client
    return self._client

close() async

Close the SMTP connection.

Source code in ambi_alert/alerting.py
async def close(self) -> None:
    """Quit the SMTP session if a connected client is stored, then clear it.

    Does nothing when no client is stored or the stored one is not
    connected.
    """
    client = self._client
    if not client or not client.is_connected:
        return
    await client.quit()
    self._client = None

send_alert(subject, message) async

Send an email alert.

Parameters:

Name Type Description Default
subject str

Email subject

required
message str

Email body

required

Returns:

Type Description
bool

True if the email was sent successfully

Source code in ambi_alert/alerting.py
async def send_alert(self, subject: str, message: str) -> bool:
    """Build a plain-text email and send it through the SMTP client.

    Args:
        subject: Email subject
        message: Email body

    Returns:
        True if the email was sent successfully; False when building the
        message, obtaining the client or sending raised an exception
    """
    try:
        email = MIMEMultipart()
        headers = (
            ("From", self.from_email),
            ("To", self.to_email),
            ("Subject", subject),
        )
        for header, value in headers:
            email[header] = value
        email.attach(MIMEText(message, "plain"))

        smtp = await self._get_client()
        await smtp.send_message(email)
    except Exception as exc:
        # Failures are reported through the return value, not raised
        print(f"Failed to send email alert: {exc}")
        return False
    return True

options: show_root_heading: true show_source: true

ambi_alert.alerting.MockAlertBackend

Mock alert backend for testing.

Source code in ambi_alert/alerting.py
class MockAlertBackend:
    """Alert backend for tests that writes alerts to the console."""

    async def send_alert(self, subject: str, message: str) -> bool:
        """Print a framed copy of the alert instead of delivering it.

        Args:
            subject: Alert subject
            message: Alert message

        Returns:
            Always returns True
        """
        framed = (
            "\n=== MOCK ALERT ===",
            f"Subject: {subject}",
            "Message:",
            message,
            "================\n",
        )
        for line in framed:
            print(line)
        return True

Functions

send_alert(subject, message) async

Print the alert to console instead of sending it.

Parameters:

Name Type Description Default
subject str

Alert subject

required
message str

Alert message

required

Returns:

Type Description
bool

Always returns True

Source code in ambi_alert/alerting.py
async def send_alert(self, subject: str, message: str) -> bool:
    """Print a framed copy of the alert instead of delivering it.

    Args:
        subject: Alert subject
        message: Alert message

    Returns:
        Always returns True
    """
    framed = (
        "\n=== MOCK ALERT ===",
        f"Subject: {subject}",
        "Message:",
        message,
        "================\n",
    )
    for line in framed:
        print(line)
    return True

options: show_root_heading: true show_source: true