diff --git a/ns8_backup_monitor/receiver.py b/ns8_backup_monitor/receiver.py
index af1bc26..8677ae0 100644
--- a/ns8_backup_monitor/receiver.py
+++ b/ns8_backup_monitor/receiver.py
@@ -1,10 +1,24 @@
 #!/usr/bin/env python3
-"""
-receiver.py - HTTP webhook receiver for Alertmanager alerts.
+"""HTTP webhook receiver for Alertmanager backup alerts.
 
-Listens on configured host:port for POST /alert from Alertmanager.
-On receiving NsBackupFailed or NsBackupMissing, triggers the pipeline:
-  correlator -> repo_check -> notifier
+This module exposes a minimal HTTP server that Alertmanager POSTs
+backup-failure notifications to. On receiving a relevant alert it
+spawns a background daemon thread that runs the full analysis pipeline:
+
+    correlator → repo_check (only on non-SUCCESS) → notifier
+
+Why a background thread?
+------------------------
+Alertmanager expects a quick HTTP 200 response. The correlation step
+deliberately waits ``wait_seconds`` (default 30) so that slow backup
+modules have time to write their final status into Redis before we read
+it. Blocking the HTTP handler for 30+ seconds would cause Alertmanager
+to retry the webhook, so the work is offloaded to a daemon thread.
+
+Endpoints
+---------
+POST /alert   Accepts an Alertmanager JSON payload.
+              All other paths return 404.
 """
 
 import json
@@ -21,18 +35,41 @@ from .utils import load_config
 
 log = logging.getLogger(__name__)
 
+# ---------------------------------------------------------------------------
+# Alert filter
+# ---------------------------------------------------------------------------
+# Only these Alertmanager alert names trigger the pipeline.
+#   NsBackupFailed  – one or more modules reported an error.
+#   NsBackupMissing – expected backup did not run within the time window.
 ALERT_NAMES = {"NsBackupFailed", "NsBackupMissing"}
 
+# ---------------------------------------------------------------------------
+# HTTP request handler
+# ---------------------------------------------------------------------------
+
 
 class AlertHandler(BaseHTTPRequestHandler):
+    """Minimal HTTP handler that accepts Alertmanager webhook payloads.
+
+    The ``config`` class attribute is populated by ``run_server()`` before
+    the server starts so that every request handler instance shares the
+    same configuration dictionary without a module-level global.
+    """
+
     config: dict = {}
 
     def do_POST(self):
+        """Handle POST /alert — the only supported endpoint."""
+
+        # Reject any path other than /alert.
         if self.path != "/alert":
             self.send_response(404)
             self.end_headers()
             return
 
+        # ---------------------------------------------------------------------------
+        # Read and parse the request body
+        # ---------------------------------------------------------------------------
         length = int(self.headers.get("Content-Length", 0))
         body = self.rfile.read(length)
 
@@ -44,9 +81,17 @@ class AlertHandler(BaseHTTPRequestHandler):
             self.end_headers()
             return
 
+        # Respond immediately so Alertmanager does not retry the webhook.
         self.send_response(200)
         self.end_headers()
 
+        # ---------------------------------------------------------------------------
+        # Filter relevant alerts
+        # ---------------------------------------------------------------------------
+        # Only process alerts that are currently firing and match our alert names.
+        # Resolved alerts (status == "resolved") are intentionally ignored because
+        # the NS8 monitoring stack clears alerts once the condition is gone; we do
+        # not need a separate "backup OK" notification path here.
         alerts = payload.get("alerts", [])
         relevant = [
             a for a in alerts
@@ -54,12 +99,17 @@ class AlertHandler(BaseHTTPRequestHandler):
             and a.get("status") == "firing"
         ]
 
+        # ---------------------------------------------------------------------------
+        # Schedule the pipeline in a background thread
+        # ---------------------------------------------------------------------------
         if relevant:
             log.info(f"Received {len(relevant)} relevant alert(s), scheduling pipeline")
             wait = self.config.get("correlator", {}).get("wait_seconds", 30)
             t = threading.Thread(
                 target=_run_pipeline,
                 args=(relevant, self.config, wait),
+                # Daemon thread: does not keep the interpreter alive at exit, so a
+                # pipeline run still in flight at shutdown is abandoned, not awaited.
                 daemon=True
             )
             t.start()
@@ -67,38 +117,82 @@ class AlertHandler(BaseHTTPRequestHandler):
             log.debug("No relevant alerts in payload, ignoring")
 
     def log_message(self, fmt, *args):
+        """Redirect BaseHTTPRequestHandler access logs to the module logger."""
         log.debug(f"HTTP: {fmt % args}")
 
+# ---------------------------------------------------------------------------
+# Pipeline runner (background thread target)
+# ---------------------------------------------------------------------------
+
 
 def _run_pipeline(alerts: list, config: dict, wait: int):
-    """Wait for modules to finish, then run correlator -> repo_check -> notifier."""
+    """Wait for module states to settle, then run the full analysis pipeline.
+
+    Steps
+    -----
+    1. Sleep ``wait`` seconds so backup modules finish writing to Redis.
+    2. Extract backup_ids from alert labels when available; when none are
+       present the correlator is expected to find recent plans itself.
+    3. Run the correlator to classify the overall outcome.
+    4. If the outcome is not SUCCESS, run the repository health check to
+       gather additional diagnostic information.
+    5. Send the email notification.
+    """
     log.info(f"Waiting {wait}s before status check...")
     time.sleep(wait)
 
-    # Collect unique backup_ids from alert labels if available
+    # ---------------------------------------------------------------------------
+    # Extract backup_ids from alert labels
+    # ---------------------------------------------------------------------------
+    # Alertmanager may include a ``backup_id`` label on the alert. When present
+    # it is used to read the exact Redis keys for that plan. When absent the
+    # correlator falls back to scanning for recent plan status keys.
     backup_ids = list({
         a["labels"].get("backup_id", "")
         for a in alerts
         if a["labels"].get("backup_id")
     })
 
+    # ---------------------------------------------------------------------------
+    # Correlation
+    # ---------------------------------------------------------------------------
     log.info("Running correlator...")
     correlation = correlate_backup_status(config, backup_ids)
 
+    # ---------------------------------------------------------------------------
+    # Repository health check (non-SUCCESS outcomes only)
+    # ---------------------------------------------------------------------------
+    # Skipped on SUCCESS to avoid unnecessary restic network calls.
     repo_status = None
     if correlation["outcome"] != "SUCCESS":
         log.info("Non-success outcome, running repo check...")
         repo_status = check_repositories(config, correlation)
 
+    # ---------------------------------------------------------------------------
+    # Email notification
+    # ---------------------------------------------------------------------------
     log.info("Sending notification...")
     send_notification(config, alerts, correlation, repo_status)
 
+# ---------------------------------------------------------------------------
+# Server bootstrap
+# ---------------------------------------------------------------------------
+
 
 def run_server(config: dict):
+    """Bind the HTTP server to the configured host/port and serve forever.
+
+    The host and port are read from the ``receiver`` section of the config.
+    Defaults to localhost:9099 to avoid accidental exposure on public interfaces.
+    """
     host = config.get("receiver", {}).get("host", "127.0.0.1")
     port = config.get("receiver", {}).get("port", 9099)
 
+    # Share the config with the request handler class via a class attribute
+    # rather than passing it through BaseHTTPRequestHandler's constructor,
+    # which does not support custom arguments.
     AlertHandler.config = config
+
     server = HTTPServer((host, port), AlertHandler)
     log.info(f"ns8-backup-monitor receiver listening on {host}:{port}")
     server.serve_forever()