#!/usr/bin/env python3
"""HTTP webhook receiver for Alertmanager backup alerts.

This module exposes a minimal HTTP server that Alertmanager POSTs
backup-failure notifications to. On receiving a relevant alert it spawns a
background daemon thread that runs the full analysis pipeline:

    correlator → repo_check (only on non-SUCCESS) → notifier

Why a background thread?
------------------------
Alertmanager expects a quick HTTP 200 response. The correlation step
deliberately waits ``wait_seconds`` (default 30) so that slow backup modules
have time to write their final status into Redis before we read it. Blocking
the HTTP handler for 30+ seconds would cause Alertmanager to retry the
webhook, so the work is offloaded to a daemon thread.

Endpoints
---------
POST /alert
    Accepts an Alertmanager JSON payload.

POST requests to any other path return 404; other HTTP methods receive the
``BaseHTTPRequestHandler`` default 501 response.
"""

import json
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, ClassVar

from .correlator import correlate_backup_status
from .notifier import send_notification
from .repo_check import check_repositories
from .utils import load_config

log = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Alert filter
# ---------------------------------------------------------------------------

# Only these Alertmanager alert names trigger the pipeline.
#   NsBackupFailed  – one or more modules reported an error.
#   NsBackupMissing – expected backup did not run within the time window.
ALERT_NAMES = {"NsBackupFailed", "NsBackupMissing"}


def _select_relevant_alerts(payload: Any) -> list:
    """Return the firing alerts in *payload* whose ``alertname`` is in ALERT_NAMES.

    Resolved alerts (``status == "resolved"``) are intentionally ignored
    because the NS8 monitoring stack clears alerts once the condition is
    gone; there is no separate "backup OK" notification path here.

    The payload arrives over the network, so malformed shapes are skipped
    instead of raising. This covers a non-dict payload, a missing or
    non-list ``alerts`` value, non-dict alert entries, missing or non-dict
    ``labels``, and a non-string ``alertname``.

    Args:
        payload: Decoded JSON body of an Alertmanager webhook request.

    Returns:
        The matching alert dicts, in payload order. Every returned alert has
        a dict stored under ``"labels"``.
    """
    if not isinstance(payload, dict):
        return []
    alerts = payload.get("alerts")
    if not isinstance(alerts, list):
        return []

    relevant = []
    for alert in alerts:
        if not isinstance(alert, dict) or alert.get("status") != "firing":
            continue
        labels = alert.get("labels")
        if not isinstance(labels, dict):
            continue
        name = labels.get("alertname")
        # isinstance guard: an unhashable label value would make the set
        # membership test raise TypeError.
        if isinstance(name, str) and name in ALERT_NAMES:
            relevant.append(alert)
    return relevant


def _extract_backup_ids(alerts: list) -> list:
    """Return the distinct, non-empty ``backup_id`` labels of *alerts*, sorted.

    Alertmanager may include a ``backup_id`` label on the alert. When present
    it is used to read the exact Redis keys for that plan. When no alert
    carries one, the returned list is empty and the correlator is expected to
    fall back to scanning for recent plan status keys.

    Sorting makes the order handed to the correlator deterministic; a bare
    set would iterate in arbitrary order.

    Args:
        alerts: Alerts as returned by ``_select_relevant_alerts`` (each has a
            dict under ``"labels"``).
    """
    return sorted(
        {
            alert["labels"]["backup_id"]
            for alert in alerts
            if alert["labels"].get("backup_id")
        },
        key=str,
    )


# ---------------------------------------------------------------------------
# HTTP request handler
# ---------------------------------------------------------------------------

class AlertHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler that accepts Alertmanager webhook payloads.

    ``run_server()`` populates the ``config`` class attribute before the
    server starts. Every request handler instance therefore shares the same
    configuration dictionary, even though ``BaseHTTPRequestHandler``'s
    constructor cannot take extra arguments.
    """

    # Shared configuration; replaced wholesale by run_server() before serving.
    config: ClassVar[dict] = {}

    def do_POST(self):
        """Handle POST /alert — the only supported endpoint.

        The response codes are:

        - 404 for any other path.
        - 400 when the Content-Length header is not a non-negative integer,
          or the body is not a JSON object. A missing header means an empty
          body, which is invalid JSON.
        - 200 otherwise.

        The 200 is sent *before* alert filtering and before the pipeline
        starts. Alertmanager therefore never waits on the pipeline, or
        retries because of it.
        """
        if self.path != "/alert":
            self._reply(404)
            return

        # -------------------------------------------------------------------
        # Read and parse the request body
        # -------------------------------------------------------------------
        payload = self._read_json_body()
        if payload is None:
            self._reply(400)
            return

        # Respond immediately so Alertmanager does not retry the webhook.
        self._reply(200)

        # -------------------------------------------------------------------
        # Filter relevant alerts
        # -------------------------------------------------------------------
        relevant = _select_relevant_alerts(payload)
        if not relevant:
            log.debug("No relevant alerts in payload, ignoring")
            return

        # -------------------------------------------------------------------
        # Schedule the pipeline in a background thread
        # -------------------------------------------------------------------
        log.info("Received %d relevant alert(s), scheduling pipeline", len(relevant))
        wait = self.config.get("correlator", {}).get("wait_seconds", 30)
        worker = threading.Thread(
            target=_run_pipeline,
            args=(relevant, self.config, wait),
            name="backup-pipeline",
            # Daemon thread: will not prevent the process from exiting if
            # systemd sends SIGTERM while a pipeline run is in progress.
            daemon=True,
        )
        worker.start()

    def _read_json_body(self) -> Any:
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded dict, or ``None`` in any of these cases (each is
            logged as a warning):

            - The Content-Length header is not a non-negative integer.
            - The body is not valid JSON, including bytes that are not valid
              UTF-8.
            - The decoded JSON value is not an object.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            length = -1
        if length < 0:
            # A negative length would make rfile.read() read until EOF,
            # i.e. block until the client closes the connection.
            log.warning("Received request with invalid Content-Length header")
            return None

        body = self.rfile.read(length)
        try:
            payload = json.loads(body)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError (body bytes
            # that are not valid UTF-8); both subclass ValueError.
            log.warning("Received invalid JSON payload")
            return None

        if not isinstance(payload, dict):
            log.warning("Received JSON payload that is not an object")
            return None
        return payload

    def _reply(self, status: int) -> None:
        """Send a body-less response with the given HTTP *status* code."""
        self.send_response(status)
        self.end_headers()

    def log_message(self, fmt, *args):
        """Redirect BaseHTTPRequestHandler access logs to the module logger."""
        log.debug("HTTP: " + fmt, *args)


# ---------------------------------------------------------------------------
# Pipeline runner (background thread target)
# ---------------------------------------------------------------------------

def _run_pipeline(alerts: list, config: dict, wait: float):
    """Wait for module states to settle, then run the full analysis pipeline.

    Steps
    -----
    1. Sleep ``wait`` seconds so backup modules finish writing to Redis.
    2. Extract backup_ids from alert labels when available. When none are
       present, the correlator falls back to scanning Redis for recently
       updated plan status keys.
    3. Run the correlator to classify the overall outcome.
    4. If the outcome is not SUCCESS, run the repository health check to
       gather additional diagnostic information.
    5. Send the email notification.

    This function is a thread entry point, so any exception is caught here
    and logged through the module logger. Otherwise it would reach
    ``threading.excepthook``, which writes to stderr and bypasses the
    configured logging handlers. A failed run is not retried.

    Args:
        alerts: Relevant firing alerts (each has a dict under ``"labels"``).
        config: Full application configuration dictionary.
        wait: Seconds to sleep before reading module states.
    """
    try:
        log.info("Waiting %ss before status check...", wait)
        time.sleep(wait)

        backup_ids = _extract_backup_ids(alerts)

        # -------------------------------------------------------------------
        # Correlation
        # -------------------------------------------------------------------
        log.info("Running correlator...")
        correlation = correlate_backup_status(config, backup_ids)

        # -------------------------------------------------------------------
        # Repository health check (non-SUCCESS outcomes only)
        # -------------------------------------------------------------------
        # Skipped on SUCCESS to avoid unnecessary restic network calls.
        repo_status = None
        if correlation["outcome"] != "SUCCESS":
            log.info("Non-success outcome, running repo check...")
            repo_status = check_repositories(config, correlation)

        # -------------------------------------------------------------------
        # Email notification
        # -------------------------------------------------------------------
        log.info("Sending notification...")
        send_notification(config, alerts, correlation, repo_status)
    except Exception:
        # Top-level thread boundary: record the failure with its traceback.
        log.exception("Backup alert pipeline failed")


# ---------------------------------------------------------------------------
# Server bootstrap
# ---------------------------------------------------------------------------

def run_server(config: dict):
    """Bind the HTTP server to the configured host/port and serve forever.

    The host and port are read from the ``receiver`` section of the config.
    They default to 127.0.0.1:9099 to avoid accidental exposure on public
    interfaces. The port may be given as an int or a numeric string. The
    listening socket is closed when ``serve_forever()`` exits, e.g. on
    KeyboardInterrupt.
    """
    receiver_cfg = config.get("receiver", {})
    host = receiver_cfg.get("host", "127.0.0.1")
    port = int(receiver_cfg.get("port", 9099))

    # Share the config with the request handler class via a class attribute
    # rather than passing it through BaseHTTPRequestHandler's constructor,
    # which does not support custom arguments.
    AlertHandler.config = config

    server = HTTPServer((host, port), AlertHandler)
    log.info("ns8-backup-monitor receiver listening on %s:%s", host, port)
    try:
        server.serve_forever()
    finally:
        server.server_close()