105 lines
3.1 KiB
Python
105 lines
3.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
receiver.py - HTTP webhook receiver for Alertmanager alerts.
|
|
|
|
Listens on configured host:port for POST /alert from Alertmanager.
|
|
On receiving NsBackupFailed or NsBackupMissing, triggers the pipeline:
|
|
correlator -> repo_check -> notifier
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from typing import Any
|
|
|
|
from .correlator import correlate_backup_status
|
|
from .notifier import send_notification
|
|
from .repo_check import check_repositories
|
|
from .utils import load_config
|
|
|
|
# Logger for this module, named after the module itself.
log = logging.getLogger(name=__name__)

# Values of labels.alertname that, while firing, trigger the backup pipeline.
ALERT_NAMES = {
    "NsBackupMissing",
    "NsBackupFailed",
}
|
|
|
|
|
|
class AlertHandler(BaseHTTPRequestHandler):
    """Handle Alertmanager webhook POSTs on ``/alert``.

    Configuration is held as a class attribute so that every per-request
    handler instance sees it; ``run_server`` assigns it before serving.
    """

    # Shared configuration mapping; assigned by run_server() at startup.
    config: dict = {}

    def do_POST(self):
        """Accept an Alertmanager payload and schedule the pipeline.

        Responds 404 for any path other than ``/alert`` and 400 for an
        invalid Content-Length, undecodable/invalid JSON, or a JSON body that
        is not an object. Otherwise it responds 200 *before* doing any work,
        so Alertmanager is not kept waiting. Firing NsBackupFailed /
        NsBackupMissing alerts are then handed to ``_run_pipeline`` on a
        daemon thread.
        """
        if self.path != "/alert":
            self._reply(404)
            return

        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            length = -1
        # A negative length would make rfile.read() block until the client
        # closes the connection, so reject it like a non-numeric header.
        if length < 0:
            log.warning("Received request with invalid Content-Length")
            self._reply(400)
            return

        body = self.rfile.read(length)

        try:
            payload = json.loads(body)
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            log.warning("Received invalid JSON payload")
            self._reply(400)
            return

        if not isinstance(payload, dict):
            log.warning("Received JSON payload that is not an object")
            self._reply(400)
            return

        self._reply(200)

        alerts = payload.get("alerts")
        if not isinstance(alerts, list):
            alerts = []
        relevant = [a for a in alerts if self._is_relevant(a)]

        if not relevant:
            log.debug("No relevant alerts in payload, ignoring")
            return

        log.info("Received %d relevant alert(s), scheduling pipeline", len(relevant))
        wait = (self.config.get("correlator") or {}).get("wait_seconds", 30)
        # Daemon thread: the pipeline sleeps for `wait` seconds and must not
        # block the (single-threaded) server or keep the process alive.
        t = threading.Thread(
            target=_run_pipeline,
            args=(relevant, self.config, wait),
            daemon=True,
        )
        t.start()

    @staticmethod
    def _is_relevant(alert) -> bool:
        """Return True for a well-formed, firing alert named in ALERT_NAMES."""
        if not isinstance(alert, dict):
            return False
        labels = alert.get("labels")
        if not isinstance(labels, dict):
            return False
        name = labels.get("alertname")
        # isinstance guard: an unhashable value would raise in a set lookup.
        return (
            isinstance(name, str)
            and name in ALERT_NAMES
            and alert.get("status") == "firing"
        )

    def _reply(self, code: int) -> None:
        """Send an empty response with the given HTTP status code."""
        self.send_response(code)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, fmt, *args):
        """Route http.server's access log to this module's logger at DEBUG."""
        # Pass args through so formatting only happens if DEBUG is enabled.
        log.debug("HTTP: " + fmt, *args)
|
|
|
|
|
|
def _run_pipeline(alerts: list, config: dict, wait: int):
    """Wait for modules to finish, then run correlator -> repo_check -> notifier.

    This is used as a daemon-thread target, so it is a top-level boundary.
    Any exception is logged through the module logger here rather than
    escaping to threading's default excepthook, which only prints to stderr.

    Args:
        alerts: Relevant firing alerts; each is expected to carry a
            ``labels`` dict (the webhook handler filters for this).
        config: Configuration mapping passed through to every stage.
        wait: Seconds to sleep before checking backup status.
    """
    try:
        log.info("Waiting %ss before status check...", wait)
        time.sleep(wait)

        # Unique, non-empty backup_id labels. They are sorted so that the
        # correlator gets a deterministic order, because set iteration order
        # of strings varies between interpreter runs.
        backup_ids = sorted(
            {a["labels"]["backup_id"] for a in alerts if a["labels"].get("backup_id")},
            key=str,
        )

        log.info("Running correlator...")
        correlation = correlate_backup_status(config, backup_ids)

        repo_status = None
        if correlation["outcome"] != "SUCCESS":
            log.info("Non-success outcome, running repo check...")
            repo_status = check_repositories(config, correlation)

        log.info("Sending notification...")
        send_notification(config, alerts, correlation, repo_status)
    except Exception:
        log.exception("Backup alert pipeline failed")
|
|
|
|
|
|
def run_server(config: dict):
    """Serve the Alertmanager webhook until interrupted (blocking).

    Reads ``receiver.host`` (default ``127.0.0.1``) and ``receiver.port``
    (default 9099, coerced to int so string values from config also work).
    It shares ``config`` with request handlers via ``AlertHandler.config``.
    The listening socket is closed when ``serve_forever`` exits, including
    on KeyboardInterrupt.
    """
    receiver_cfg = config.get("receiver") or {}
    host = receiver_cfg.get("host", "127.0.0.1")
    port = int(receiver_cfg.get("port", 9099))

    AlertHandler.config = config
    with HTTPServer((host, port), AlertHandler) as server:
        log.info("ns8-backup-monitor receiver listening on %s:%s", host, port)
        server.serve_forever()
|