Files
ns8-backup-monitor/ns8_backup_monitor/receiver.py
T

199 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""HTTP webhook receiver for Alertmanager backup alerts.
This module exposes a minimal HTTP server that Alertmanager POSTs
backup-failure notifications to. On receiving a relevant alert it
spawns a background daemon thread that runs the full analysis pipeline:
correlator → repo_check (only on non-SUCCESS) → notifier
Why a background thread?
------------------------
Alertmanager expects a quick HTTP 200 response. The correlation step
deliberately waits ``wait_seconds`` (default 30) so that slow backup
modules have time to write their final status into Redis before we read
it. Blocking the HTTP handler for 30+ seconds would cause Alertmanager
to retry the webhook, so the work is offloaded to a daemon thread.
Endpoints
---------
POST /alert    Accepts an Alertmanager JSON payload.
A POST to any other path returns 404; other HTTP methods are rejected
with 501 by BaseHTTPRequestHandler's default handling.
"""
import json
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from .correlator import correlate_backup_status
from .notifier import send_notification
from .repo_check import check_repositories
from .utils import load_config
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Alert filter
# ---------------------------------------------------------------------------
# Only these Alertmanager alert names trigger the pipeline:
#   NsBackupFailed  - one or more modules reported an error.
#   NsBackupMissing - expected backup did not run within the time window.
# A frozenset keeps this constant read-only at runtime; membership tests
# (``in``) behave exactly as they would with a plain set.
ALERT_NAMES = frozenset({"NsBackupFailed", "NsBackupMissing"})
# ---------------------------------------------------------------------------
# HTTP request handler
# ---------------------------------------------------------------------------
class AlertHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler that accepts Alertmanager webhook payloads.

    The ``config`` class attribute is populated by ``run_server()`` before
    the server starts so that every request handler instance shares the
    same configuration dictionary without using global state.
    """

    # Shared configuration dict; assigned by run_server() before serving.
    config: dict = {}

    # Per-connection socket timeout in seconds, applied by
    # StreamRequestHandler.setup(). HTTPServer handles one connection at a
    # time, so a client that stalls mid-request would otherwise block every
    # later webhook delivery. BaseHTTPRequestHandler.handle_one_request()
    # catches the resulting timeout and closes the connection.
    timeout = 30

    # Upper bound on the accepted request body size. Content-Length is
    # client-controlled, and reading it blindly lets one request force a
    # huge allocation.
    max_body_bytes = 10 * 1024 * 1024

    def do_POST(self):
        """Handle POST /alert, the only supported endpoint.

        Response codes:
            404 - path other than ``/alert``.
            400 - missing/malformed Content-Length, invalid JSON, or a JSON
                  document that is not an object.
            413 - declared body larger than ``max_body_bytes``.
            200 - payload accepted; sent *before* any processing so
                  Alertmanager does not retry. Matching firing alerts are
                  then handed to ``_run_pipeline`` on a daemon thread.
        """
        # Reject any path other than /alert.
        if self.path != "/alert":
            self._respond(404)
            return

        # ---------------------------------------------------------------
        # Read and parse the request body
        # ---------------------------------------------------------------
        length = self._parse_content_length(
            self.headers.get("Content-Length", 0)
        )
        if length is None:
            log.warning("Received invalid Content-Length header")
            self._respond(400)
            return
        if length > self.max_body_bytes:
            log.warning("Rejected oversized payload (%d bytes)", length)
            self._respond(413)
            return

        body = self.rfile.read(length)
        try:
            payload = json.loads(body)
        except ValueError:
            # Covers json.JSONDecodeError and also UnicodeDecodeError, which
            # json.loads raises for a bytes body that is not valid UTF-8/16/32.
            log.warning("Received invalid JSON payload")
            self._respond(400)
            return
        if not isinstance(payload, dict):
            log.warning("Received JSON payload that is not an object")
            self._respond(400)
            return

        # Respond immediately so Alertmanager does not retry the webhook.
        self._respond(200)

        # ---------------------------------------------------------------
        # Filter relevant alerts
        # ---------------------------------------------------------------
        # Resolved alerts (status == "resolved") are intentionally ignored
        # because the NS8 monitoring stack clears alerts once the condition
        # is gone; there is no separate "backup OK" notification path here.
        relevant = self._relevant_alerts(payload.get("alerts"), ALERT_NAMES)
        if not relevant:
            log.debug("No relevant alerts in payload, ignoring")
            return

        # ---------------------------------------------------------------
        # Schedule the pipeline in a background thread
        # ---------------------------------------------------------------
        log.info(
            "Received %d relevant alert(s), scheduling pipeline", len(relevant)
        )
        # `or {}` tolerates a present-but-null "correlator" config section.
        wait = (self.config.get("correlator") or {}).get("wait_seconds", 30)
        threading.Thread(
            target=_run_pipeline,
            args=(relevant, self.config, wait),
            name="ns8-backup-pipeline",
            # Daemon thread: will not prevent the process from exiting if
            # systemd sends SIGTERM while a pipeline run is in progress.
            daemon=True,
        ).start()

    def _respond(self, code):
        """Send status line and headers for *code* with an explicitly empty body."""
        self.send_response(code)
        # Explicit length keeps the response well-formed even if the server
        # is ever switched to HTTP/1.1 keep-alive.
        self.send_header("Content-Length", "0")
        self.end_headers()

    @staticmethod
    def _parse_content_length(raw):
        """Return *raw* as a non-negative int, or ``None`` if it is invalid.

        Negative values are rejected because ``rfile.read(-1)`` would block
        reading until the client closes the connection.
        """
        try:
            length = int(raw)
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    @staticmethod
    def _relevant_alerts(alerts, alert_names):
        """Return the firing alerts whose ``alertname`` label is in *alert_names*.

        The payload comes from the network, so a non-list ``alerts`` value,
        non-dict entries, and entries without a dict ``labels`` mapping are
        skipped rather than raising. Every returned alert is guaranteed to
        have a dict ``labels``.
        """
        if not isinstance(alerts, list):
            return []
        return [
            a for a in alerts
            if isinstance(a, dict)
            and isinstance(a.get("labels"), dict)
            and a["labels"].get("alertname") in alert_names
            and a.get("status") == "firing"
        ]

    def log_message(self, fmt, *args):
        """Redirect BaseHTTPRequestHandler access logs to the module logger."""
        # Pass args through so formatting only happens if DEBUG is enabled.
        log.debug("HTTP: " + fmt, *args)

    def log_error(self, fmt, *args):
        """Log framework-level errors (bad requests, timeouts) at WARNING.

        The default implementation forwards to ``log_message``, which would
        hide these at DEBUG level.
        """
        log.warning("HTTP: " + fmt, *args)
# ---------------------------------------------------------------------------
# Pipeline runner (background thread target)
# ---------------------------------------------------------------------------
def _run_pipeline(alerts: list, config: dict, wait: int):
    """Wait for module states to settle, then run the full analysis pipeline.

    Intended as the target of the daemon thread started by the HTTP handler,
    i.e. it runs after the webhook response has already been sent.

    Steps
    -----
    1. Sleep ``wait`` seconds (negative values are treated as 0) so backup
       modules finish writing to Redis.
    2. Extract backup_ids from alert labels when available; fall back to
       scanning Redis for recently updated plan status keys.
    3. Run the correlator to classify the overall outcome.
    4. If the outcome is not SUCCESS, run the repository health check to
       gather additional diagnostic information.
    5. Send the email notification.

    Args:
        alerts: Firing Alertmanager alerts; each is expected to carry a dict
            ``labels`` mapping (presumably already filtered by the HTTP
            handler - confirm if called from elsewhere).
        config: Full application configuration dictionary.
        wait: Seconds to sleep before reading backup status.

    Any exception is logged with its traceback and not re-raised. An
    uncaught exception in a daemon thread would only reach
    ``threading.excepthook`` (stderr), bypassing the configured logger.
    """
    try:
        log.info("Waiting %ss before status check...", wait)
        time.sleep(max(0, wait))

        # -------------------------------------------------------------------
        # Extract backup_ids from alert labels
        # -------------------------------------------------------------------
        # Alertmanager may include a ``backup_id`` label on the alert. When
        # present it is used to read the exact Redis keys for that plan. When
        # absent the correlator falls back to scanning for recent plan status
        # keys. dict.fromkeys de-duplicates while keeping first-seen order;
        # a set would yield a hash-dependent, run-to-run varying order.
        backup_ids = list(dict.fromkeys(
            a["labels"]["backup_id"]
            for a in alerts
            if a["labels"].get("backup_id")
        ))

        # -------------------------------------------------------------------
        # Correlation
        # -------------------------------------------------------------------
        log.info("Running correlator...")
        correlation = correlate_backup_status(config, backup_ids)

        # -------------------------------------------------------------------
        # Repository health check (non-SUCCESS outcomes only)
        # -------------------------------------------------------------------
        # Skipped on SUCCESS to avoid unnecessary restic network calls.
        repo_status = None
        if correlation["outcome"] != "SUCCESS":
            log.info("Non-success outcome, running repo check...")
            repo_status = check_repositories(config, correlation)

        # -------------------------------------------------------------------
        # Email notification
        # -------------------------------------------------------------------
        log.info("Sending notification...")
        send_notification(config, alerts, correlation, repo_status)
    except Exception:
        # Thread top-level boundary: record the failure in the service log.
        log.exception("Backup analysis pipeline failed")
# ---------------------------------------------------------------------------
# Server bootstrap
# ---------------------------------------------------------------------------
def run_server(config: dict):
    """Bind the HTTP server to the configured host/port and serve forever.

    The host and port are read from the ``receiver`` section of the config;
    a missing or null section falls back to the defaults. Defaults to
    127.0.0.1:9099 to avoid accidental exposure on public interfaces.
    The listening socket is closed when ``serve_forever()`` returns or
    raises (e.g. on KeyboardInterrupt).
    """
    receiver_cfg = config.get("receiver") or {}
    host = receiver_cfg.get("host", "127.0.0.1")
    # socket.bind() requires an int port; accept numeric strings from config.
    port = int(receiver_cfg.get("port", 9099))
    # Share the config with the request handler class via a class attribute
    # rather than passing it through BaseHTTPRequestHandler's constructor,
    # which does not support custom arguments.
    AlertHandler.config = config
    with HTTPServer((host, port), AlertHandler) as server:
        log.info("ns8-backup-monitor receiver listening on %s:%s", host, port)
        server.serve_forever()