docs: add section-by-section comments — receiver.py
This commit is contained in:
@@ -1,10 +1,24 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""HTTP webhook receiver for Alertmanager backup alerts.
|
||||||
receiver.py - HTTP webhook receiver for Alertmanager alerts.
|
|
||||||
|
|
||||||
Listens on configured host:port for POST /alert from Alertmanager.
|
This module exposes a minimal HTTP server that Alertmanager POSTs
|
||||||
On receiving NsBackupFailed or NsBackupMissing, triggers the pipeline:
|
backup-failure notifications to. On receiving a relevant alert it
|
||||||
correlator -> repo_check -> notifier
|
spawns a background daemon thread that runs the full analysis pipeline:
|
||||||
|
|
||||||
|
correlator → repo_check (only on non-SUCCESS) → notifier
|
||||||
|
|
||||||
|
Why a background thread?
|
||||||
|
------------------------
|
||||||
|
Alertmanager expects a quick HTTP 200 response. The correlation step
|
||||||
|
deliberately waits ``wait_seconds`` (default 30) so that slow backup
|
||||||
|
modules have time to write their final status into Redis before we read
|
||||||
|
it. Blocking the HTTP handler for 30+ seconds would cause Alertmanager
|
||||||
|
to retry the webhook, so the work is offloaded to a daemon thread.
|
||||||
|
|
||||||
|
Endpoints
|
||||||
|
---------
|
||||||
|
POST /alert Accepts an Alertmanager JSON payload.
|
||||||
|
All other paths return 404.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
@@ -21,18 +35,41 @@ from .utils import load_config
|
|||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Alert filter
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Only these Alertmanager alert names trigger the pipeline.
|
||||||
|
# NsBackupFailed – one or more modules reported an error.
|
||||||
|
# NsBackupMissing – expected backup did not run within the time window.
|
||||||
ALERT_NAMES = {"NsBackupFailed", "NsBackupMissing"}
|
ALERT_NAMES = {"NsBackupFailed", "NsBackupMissing"}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# HTTP request handler
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
class AlertHandler(BaseHTTPRequestHandler):
|
class AlertHandler(BaseHTTPRequestHandler):
|
||||||
|
"""Minimal HTTP handler that accepts Alertmanager webhook payloads.
|
||||||
|
|
||||||
|
The ``config`` class attribute is populated by ``run_server()`` before
|
||||||
|
the server starts so that every request handler instance shares the
|
||||||
|
same configuration dictionary without using global state.
|
||||||
|
"""
|
||||||
|
|
||||||
config: dict = {}
|
config: dict = {}
|
||||||
|
|
||||||
def do_POST(self):
|
def do_POST(self):
|
||||||
|
"""Handle POST /alert — the only supported endpoint."""
|
||||||
|
|
||||||
|
# Reject any path other than /alert.
|
||||||
if self.path != "/alert":
|
if self.path != "/alert":
|
||||||
self.send_response(404)
|
self.send_response(404)
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Read and parse the request body
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
length = int(self.headers.get("Content-Length", 0))
|
length = int(self.headers.get("Content-Length", 0))
|
||||||
body = self.rfile.read(length)
|
body = self.rfile.read(length)
|
||||||
|
|
||||||
@@ -44,9 +81,17 @@ class AlertHandler(BaseHTTPRequestHandler):
|
|||||||
self.end_headers()
|
self.end_headers()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Respond immediately so Alertmanager does not retry the webhook.
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Filter relevant alerts
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Only process alerts that are currently firing and match our alert names.
|
||||||
|
# Resolved alerts (status == "resolved") are intentionally ignored because
|
||||||
|
# the NS8 monitoring stack clears alerts once the condition is gone; we do
|
||||||
|
# not need a separate "backup OK" notification path here.
|
||||||
alerts = payload.get("alerts", [])
|
alerts = payload.get("alerts", [])
|
||||||
relevant = [
|
relevant = [
|
||||||
a for a in alerts
|
a for a in alerts
|
||||||
@@ -54,12 +99,17 @@ class AlertHandler(BaseHTTPRequestHandler):
|
|||||||
and a.get("status") == "firing"
|
and a.get("status") == "firing"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Schedule the pipeline in a background thread
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
if relevant:
|
if relevant:
|
||||||
log.info(f"Received {len(relevant)} relevant alert(s), scheduling pipeline")
|
log.info(f"Received {len(relevant)} relevant alert(s), scheduling pipeline")
|
||||||
wait = self.config.get("correlator", {}).get("wait_seconds", 30)
|
wait = self.config.get("correlator", {}).get("wait_seconds", 30)
|
||||||
t = threading.Thread(
|
t = threading.Thread(
|
||||||
target=_run_pipeline,
|
target=_run_pipeline,
|
||||||
args=(relevant, self.config, wait),
|
args=(relevant, self.config, wait),
|
||||||
|
# Daemon thread: will not prevent the process from exiting if
|
||||||
|
# systemd sends SIGTERM while a pipeline run is in progress.
|
||||||
daemon=True
|
daemon=True
|
||||||
)
|
)
|
||||||
t.start()
|
t.start()
|
||||||
@@ -67,38 +117,82 @@ class AlertHandler(BaseHTTPRequestHandler):
|
|||||||
log.debug("No relevant alerts in payload, ignoring")
|
log.debug("No relevant alerts in payload, ignoring")
|
||||||
|
|
||||||
def log_message(self, fmt, *args):
|
def log_message(self, fmt, *args):
|
||||||
|
"""Redirect BaseHTTPRequestHandler access logs to the module logger."""
|
||||||
log.debug(f"HTTP: {fmt % args}")
|
log.debug(f"HTTP: {fmt % args}")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Pipeline runner (background thread target)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _run_pipeline(alerts: list, config: dict, wait: int):
|
def _run_pipeline(alerts: list, config: dict, wait: int):
|
||||||
"""Wait for modules to finish, then run correlator -> repo_check -> notifier."""
|
"""Wait for module states to settle, then run the full analysis pipeline.
|
||||||
|
|
||||||
|
Steps
|
||||||
|
-----
|
||||||
|
1. Sleep ``wait`` seconds so backup modules finish writing to Redis.
|
||||||
|
2. Extract backup_ids from alert labels when available; fall back to
|
||||||
|
scanning Redis for recently updated plan status keys.
|
||||||
|
3. Run the correlator to classify the overall outcome.
|
||||||
|
4. If the outcome is not SUCCESS, run the repository health check to
|
||||||
|
gather additional diagnostic information.
|
||||||
|
5. Send the email notification.
|
||||||
|
"""
|
||||||
log.info(f"Waiting {wait}s before status check...")
|
log.info(f"Waiting {wait}s before status check...")
|
||||||
time.sleep(wait)
|
time.sleep(wait)
|
||||||
|
|
||||||
# Collect unique backup_ids from alert labels if available
|
# ---------------------------------------------------------------------------
|
||||||
|
# Extract backup_ids from alert labels
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Alertmanager may include a ``backup_id`` label on the alert. When present
|
||||||
|
# it is used to read the exact Redis keys for that plan. When absent the
|
||||||
|
# correlator falls back to scanning for recent plan status keys.
|
||||||
backup_ids = list({
|
backup_ids = list({
|
||||||
a["labels"].get("backup_id", "")
|
a["labels"].get("backup_id", "")
|
||||||
for a in alerts
|
for a in alerts
|
||||||
if a["labels"].get("backup_id")
|
if a["labels"].get("backup_id")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Correlation
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
log.info("Running correlator...")
|
log.info("Running correlator...")
|
||||||
correlation = correlate_backup_status(config, backup_ids)
|
correlation = correlate_backup_status(config, backup_ids)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Repository health check (non-SUCCESS outcomes only)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Skipped on SUCCESS to avoid unnecessary restic network calls.
|
||||||
repo_status = None
|
repo_status = None
|
||||||
if correlation["outcome"] != "SUCCESS":
|
if correlation["outcome"] != "SUCCESS":
|
||||||
log.info("Non-success outcome, running repo check...")
|
log.info("Non-success outcome, running repo check...")
|
||||||
repo_status = check_repositories(config, correlation)
|
repo_status = check_repositories(config, correlation)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Email notification
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
log.info("Sending notification...")
|
log.info("Sending notification...")
|
||||||
send_notification(config, alerts, correlation, repo_status)
|
send_notification(config, alerts, correlation, repo_status)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Server bootstrap
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def run_server(config: dict):
|
def run_server(config: dict):
|
||||||
|
"""Bind the HTTP server to the configured host/port and serve forever.
|
||||||
|
|
||||||
|
The host and port are read from the ``receiver`` section of the config.
|
||||||
|
Defaults to localhost:9099 to avoid accidental exposure on public interfaces.
|
||||||
|
"""
|
||||||
host = config.get("receiver", {}).get("host", "127.0.0.1")
|
host = config.get("receiver", {}).get("host", "127.0.0.1")
|
||||||
port = config.get("receiver", {}).get("port", 9099)
|
port = config.get("receiver", {}).get("port", 9099)
|
||||||
|
|
||||||
|
# Share the config with the request handler class via a class attribute
|
||||||
|
# rather than passing it through BaseHTTPRequestHandler's constructor,
|
||||||
|
# which does not support custom arguments.
|
||||||
AlertHandler.config = config
|
AlertHandler.config = config
|
||||||
|
|
||||||
server = HTTPServer((host, port), AlertHandler)
|
server = HTTPServer((host, port), AlertHandler)
|
||||||
log.info(f"ns8-backup-monitor receiver listening on {host}:{port}")
|
log.info(f"ns8-backup-monitor receiver listening on {host}:{port}")
|
||||||
server.serve_forever()
|
server.serve_forever()
|
||||||
|
|||||||
Reference in New Issue
Block a user