Files
ns8-backup-monitor/ns8_backup_monitor/receiver.py
T

245 lines
9.9 KiB
Python

#!/usr/bin/env python3
"""HTTP webhook receiver for Alertmanager backup alerts.
This module exposes a minimal HTTP server that Alertmanager POSTs
backup-failure notifications to. On receiving a relevant alert it
spawns a background daemon thread that runs the full analysis pipeline::

    correlator -> repo_check (only on non-SUCCESS) -> notifier

Why a background thread?
------------------------
Alertmanager expects a quick HTTP 200 response. The correlation step
deliberately waits ``wait_seconds`` (default 30) so that slow backup
modules have time to write their final status into Redis before we read
it. Blocking the HTTP handler for 30+ seconds would cause Alertmanager
to retry the webhook, so the work is offloaded to a daemon thread.
Alert name mapping
------------------
NS8 ships two sets of Prometheus alert rules that can fire backup alerts:
Native stack (node_backup_status metric, present in all NS8 clusters):
backup_failed - one or more backup plans reported result != "success".
backup_missing - expected backup did not complete within the time window.
Custom / legacy rules (added manually or present in older NS8 versions):
NsBackupFailed - same semantics as backup_failed.
NsBackupMissing - same semantics as backup_missing.
All four names are matched. Any other alertname received on this webhook
is silently ignored so unrelated Alertmanager alerts do not generate noise.
Label mapping
-------------
NS8 native alerts carry the backup plan identifier in the ``id`` label.
Custom / legacy alerts may carry it in ``backup_id`` instead.
Both labels are checked when extracting plan IDs so the correlator can
look up the correct Redis keys regardless of which rule set fired.
Endpoints
---------
POST /alert Accepts an Alertmanager JSON payload.
Any other POST path returns 404. Only POST is implemented; other HTTP
methods are rejected by ``http.server`` itself with 501.
"""
import json
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from .correlator import correlate_backup_status
from .notifier import send_notification
from .repo_check import check_repositories
from .utils import load_config
# Module-level logger, named after this module's import path (``__name__``).
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Alert filter
# ---------------------------------------------------------------------------
# Alertnames that start the analysis pipeline. Anything not listed here is
# filtered out by the webhook handler.
#
# Two rule sets can emit backup alerts and both are accepted:
#
#   * Custom / legacy rules, kept for backward compatibility with NS8
#     clusters that carry manually configured or older rule sets:
#       NsBackupFailed  - same semantic as backup_failed.
#       NsBackupMissing - same semantic as backup_missing.
#
#   * NS8 native monitoring stack (Prometheus node_backup_status rules):
#       backup_failed  - emitted when node_backup_status == 0 for one or
#                        more plans.
#       backup_missing - emitted when no backup completed within the
#                        expected window.
ALERT_NAMES = {
    # Legacy custom rule names (backward compatibility).
    "NsBackupFailed",
    "NsBackupMissing",
    # NS8 native names from the node_backup_status rules.
    "backup_failed",
    "backup_missing",
}
# ---------------------------------------------------------------------------
# HTTP request handler
# ---------------------------------------------------------------------------
class AlertHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler that accepts Alertmanager webhook payloads.

    The ``config`` class attribute is populated by ``run_server()`` before
    the server starts so that every request handler instance shares the
    same configuration dictionary without using global state.
    """

    # Shared by every handler instance; replaced by run_server() at startup.
    config: dict = {}

    def do_POST(self):
        """Handle POST /alert - the only supported endpoint.

        Responses:
            404 - the request path is not ``/alert``.
            400 - the ``Content-Length`` header is not a non-negative
                  integer, the body is not valid JSON, or the JSON document
                  is not an object.
            200 - the payload was accepted. The status is sent before any
                  filtering happens; relevant alerts are then handed to a
                  background daemon thread running ``_run_pipeline``.
        """
        # Reject any path other than /alert.
        if self.path != "/alert":
            self._reply(404)
            return

        payload = self._read_payload()
        if payload is None:
            self._reply(400)
            return

        # Respond immediately so Alertmanager does not retry the webhook.
        self._reply(200)

        relevant = self._select_relevant(payload.get("alerts", []))
        if not relevant:
            log.debug("No relevant alerts in payload, ignoring")
            return

        log.info("Received %d relevant alert(s), scheduling pipeline", len(relevant))
        wait = self.config.get("correlator", {}).get("wait_seconds", 30)
        t = threading.Thread(
            target=_run_pipeline,
            args=(relevant, self.config, wait),
            # Daemon thread: will not prevent the process from exiting if
            # systemd sends SIGTERM while a pipeline run is in progress.
            daemon=True,
        )
        t.start()

    def _reply(self, status):
        """Send a body-less response carrying only the given status code."""
        self.send_response(status)
        self.end_headers()

    def _read_payload(self):
        """Read the request body and decode it as a JSON object.

        Returns the decoded ``dict``, or ``None`` (after logging a warning)
        when the request cannot be used. The caller answers 400 in that case.
        A missing ``Content-Length`` is treated as 0, which yields an empty
        body and therefore a JSON decode failure.
        """
        raw_length = self.headers.get("Content-Length", 0)
        try:
            length = int(raw_length)
        except (TypeError, ValueError):
            log.warning("Received invalid Content-Length header: %r", raw_length)
            return None
        if length < 0:
            # rfile.read() with a negative size reads until EOF, which would
            # stall this request until the client closes the connection.
            log.warning("Received negative Content-Length header: %r", raw_length)
            return None

        body = self.rfile.read(length)
        try:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for bytes that are not valid UTF.
            payload = json.loads(body)
        except ValueError:
            log.warning("Received invalid JSON payload")
            return None

        # Valid JSON can still be a list, string, number or null; only an
        # object can carry the "alerts" key read by do_POST.
        if not isinstance(payload, dict):
            log.warning("Received JSON payload that is not an object")
            return None
        return payload

    @staticmethod
    def _select_relevant(alerts):
        """Return the alerts that are firing and named in ``ALERT_NAMES``.

        Resolved alerts (status == "resolved") are intentionally ignored:
        the NS8 monitoring stack clears alerts once the condition is gone
        and we do not need a "backup OK" notification path here.

        Entries that are not JSON objects, or whose ``labels`` is not an
        object, can never match and are skipped instead of raising.
        """
        if not isinstance(alerts, list):
            log.debug("Payload 'alerts' field is not a list, ignoring")
            return []

        relevant = []
        for alert in alerts:
            if not isinstance(alert, dict):
                log.debug("Skipping malformed alert entry: %r", alert)
                continue
            labels = alert.get("labels")
            if not isinstance(labels, dict):
                labels = {}
            alertname = labels.get("alertname", "")
            status = alert.get("status", "")
            # Log every received alert at DEBUG level to aid troubleshooting
            # without requiring packet captures or full Alertmanager debug mode.
            log.debug(
                "Received alert: alertname=%r status=%r id=%r name=%r",
                alertname,
                status,
                labels.get("id"),
                labels.get("name"),
            )
            # The isinstance guard keeps an unhashable alertname (e.g. a
            # JSON array) from raising TypeError on the set lookup.
            if (
                isinstance(alertname, str)
                and alertname in ALERT_NAMES
                and status == "firing"
            ):
                relevant.append(alert)
        return relevant

    def log_message(self, fmt, *args):
        """Redirect BaseHTTPRequestHandler access logs to the module logger."""
        log.debug("HTTP: " + fmt, *args)
# ---------------------------------------------------------------------------
# Pipeline runner (background thread target)
# ---------------------------------------------------------------------------
def _extract_backup_ids(alerts: list) -> list:
    """Return the distinct plan IDs named in *alerts*, in first-seen order.

    For each alert the ``backup_id`` label (custom / legacy rules) is
    preferred; the ``id`` label (NS8 native rules backup_failed /
    backup_missing) is used when ``backup_id`` is absent or empty.
    Alerts with neither label contribute nothing.
    """
    # A dict doubles as an insertion-ordered set, so each plan appears once.
    ordered: dict = {}
    for alert in alerts:
        # "labels": null in the payload would otherwise break .get() below.
        labels = alert.get("labels") or {}
        bid = labels.get("backup_id") or labels.get("id")
        if bid:
            ordered.setdefault(bid, None)
    return list(ordered)


def _run_pipeline(alerts: list, config: dict, wait: int):
    """Wait for module states to settle, then run the full analysis pipeline.

    Steps
    -----
    1. Sleep ``wait`` seconds so backup modules finish writing to Redis.
    2. Extract backup_ids from alert labels (``backup_id`` first, then
       ``id``), de-duplicated so each plan is queried once. When no IDs are
       found the correlator falls back to a broad Redis scan.
    3. Run the correlator to classify the overall outcome.
    4. If the outcome is not SUCCESS, run the repository health check to
       gather additional diagnostic information.
    5. Send the email notification.

    This function is the target of a background thread, so it is the last
    place a failure can be recorded. Any exception raised by a step is
    logged with its traceback through the module logger and then dropped;
    nothing is re-raised.
    """
    try:
        log.info("Waiting %ds before status check...", wait)
        time.sleep(wait)

        backup_ids = _extract_backup_ids(alerts)
        if backup_ids:
            log.info("Resolved backup_ids from alert labels: %s", backup_ids)
        else:
            log.info("No backup_id in alert labels; correlator will scan all recent Redis keys")

        log.info("Running correlator...")
        correlation = correlate_backup_status(config, backup_ids)

        # Skip repo check on SUCCESS to avoid unnecessary restic network calls.
        repo_status = None
        if correlation["outcome"] != "SUCCESS":
            log.info("Non-success outcome, running repo check...")
            repo_status = check_repositories(config, correlation)

        log.info("Sending notification...")
        send_notification(config, alerts, correlation, repo_status)
    except Exception:
        # Thread boundary: without this the traceback would bypass the
        # module logger and only reach threading's default excepthook.
        log.exception("Backup alert pipeline failed")
# ---------------------------------------------------------------------------
# Server bootstrap
# ---------------------------------------------------------------------------
def run_server(config: dict):
    """Bind the HTTP server to the configured host/port and serve forever.

    The host and port are read from the ``receiver`` section of the config
    file. Defaults to 127.0.0.1:9099 to avoid accidental exposure on public
    interfaces - change ``receiver.host`` to ``0.0.0.0`` only if the webhook
    endpoint needs to be reachable from a remote Alertmanager instance.

    The call blocks in ``serve_forever()``. If that returns or raises, the
    listening socket is closed before the exception propagates.

    Raises:
        ValueError: ``receiver.port`` cannot be converted to an integer.
        OSError: the address cannot be bound.
    """
    receiver_cfg = config.get("receiver", {})
    host = receiver_cfg.get("host", "127.0.0.1")
    # Coerce so that a port given as a string (e.g. "9099") still works with
    # socket bind and with the %d placeholder in the log line below.
    port = int(receiver_cfg.get("port", 9099))

    # Share the config with the request handler class via a class attribute
    # rather than passing it through BaseHTTPRequestHandler's constructor,
    # which does not support custom arguments.
    AlertHandler.config = config

    # The context manager calls server_close() on exit, releasing the socket.
    with HTTPServer((host, port), AlertHandler) as server:
        log.info("ns8-backup-monitor receiver listening on %s:%d", host, port)
        server.serve_forever()