From 7b0675bf4b319172d5253a5858c14969ad0c444a Mon Sep 17 00:00:00 2001 From: admin Date: Mon, 18 May 2026 15:11:09 +0000 Subject: [PATCH] feat: add alertmanager webhook receiver --- ns8_backup_monitor/receiver.py | 104 +++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 ns8_backup_monitor/receiver.py diff --git a/ns8_backup_monitor/receiver.py b/ns8_backup_monitor/receiver.py new file mode 100644 index 0000000..af1bc26 --- /dev/null +++ b/ns8_backup_monitor/receiver.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +receiver.py - HTTP webhook receiver for Alertmanager alerts. + +Listens on configured host:port for POST /alert from Alertmanager. +On receiving NsBackupFailed or NsBackupMissing, triggers the pipeline: + correlator -> repo_check -> notifier +""" + +import json +import logging +import threading +import time +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Any + +from .correlator import correlate_backup_status +from .notifier import send_notification +from .repo_check import check_repositories +from .utils import load_config + +log = logging.getLogger(__name__) + +ALERT_NAMES = {"NsBackupFailed", "NsBackupMissing"} + + +class AlertHandler(BaseHTTPRequestHandler): + config: dict = {} + + def do_POST(self): + if self.path != "/alert": + self.send_response(404) + self.end_headers() + return + + length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(length) + + try: + payload = json.loads(body) + except json.JSONDecodeError: + log.warning("Received invalid JSON payload") + self.send_response(400) + self.end_headers() + return + + self.send_response(200) + self.end_headers() + + alerts = payload.get("alerts", []) + relevant = [ + a for a in alerts + if a.get("labels", {}).get("alertname") in ALERT_NAMES + and a.get("status") == "firing" + ] + + if relevant: + log.info(f"Received {len(relevant)} relevant alert(s), scheduling pipeline") + wait = self.config.get("correlator", {}).get("wait_seconds", 30) + t = threading.Thread( + target=_run_pipeline, + args=(relevant, self.config, wait), + daemon=True + ) + t.start() + else: + log.debug("No relevant alerts in payload, ignoring") + + def log_message(self, fmt, *args): + log.debug(f"HTTP: {fmt % args}") + + +def _run_pipeline(alerts: list, config: dict, wait: int): + """Wait for modules to finish, then run correlator -> repo_check -> notifier.""" + log.info(f"Waiting {wait}s before status check...") + time.sleep(wait) + + # Collect unique backup_ids from alert labels if available + backup_ids = list({ + a["labels"].get("backup_id", "") + for a in alerts + if a["labels"].get("backup_id") + }) + + log.info("Running correlator...") + correlation = correlate_backup_status(config, backup_ids) + + repo_status = None + if correlation["outcome"] != "SUCCESS": + log.info("Non-success outcome, running repo check...") + repo_status = check_repositories(config, correlation) + + log.info("Sending notification...") + send_notification(config, alerts, correlation, repo_status) + + +def run_server(config: dict): + host = config.get("receiver", {}).get("host", "127.0.0.1") + port = config.get("receiver", {}).get("port", 9099) + + AlertHandler.config = config + server = HTTPServer((host, port), AlertHandler) + log.info(f"ns8-backup-monitor receiver listening on {host}:{port}") + server.serve_forever()