feat: add alertmanager webhook receiver
This commit is contained in:
@@ -0,0 +1,104 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
receiver.py - HTTP webhook receiver for Alertmanager alerts.
|
||||
|
||||
Listens on configured host:port for POST /alert from Alertmanager.
|
||||
On receiving NsBackupFailed or NsBackupMissing, triggers the pipeline:
|
||||
correlator -> repo_check -> notifier
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from typing import Any
|
||||
|
||||
from .correlator import correlate_backup_status
|
||||
from .notifier import send_notification
|
||||
from .repo_check import check_repositories
|
||||
from .utils import load_config
|
||||
|
||||
# Module-level logger shared by the handler, the pipeline and the server.
log = logging.getLogger(__name__)

# Alert names that trigger the pipeline. A frozenset keeps this module-level
# constant immutable; membership tests and iteration behave as before.
ALERT_NAMES = frozenset({"NsBackupFailed", "NsBackupMissing"})
|
||||
|
||||
|
||||
class AlertHandler(BaseHTTPRequestHandler):
    """Handle Alertmanager webhook deliveries on ``POST /alert``.

    Firing alerts whose ``alertname`` label is in ``ALERT_NAMES`` start the
    pipeline on a background daemon thread; everything else is ignored.
    """

    # Shared configuration; ``run_server`` assigns the real config before
    # the server starts handling requests.
    config: dict = {}

    def do_POST(self):
        """Validate the request, acknowledge it, and schedule the pipeline.

        Responds 404 for any path other than ``/alert``, 400 when the
        ``Content-Length`` header is unusable or the body is not a JSON
        object, and 200 otherwise. The 200 is sent before the pipeline is
        scheduled, so the sender never waits on the pipeline itself.
        """
        if self.path != "/alert":
            self._reply(404)
            return

        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            log.warning("Received invalid Content-Length header")
            self._reply(400)
            return
        if length < 0:
            # A negative size would make rfile.read() read until EOF and
            # stall this single-threaded server on an open connection.
            log.warning("Received negative Content-Length header")
            self._reply(400)
            return

        body = self.rfile.read(length)

        try:
            payload = json.loads(body)
        except ValueError:
            # ValueError covers json.JSONDecodeError and the
            # UnicodeDecodeError raised for bodies that are not valid UTF-8.
            log.warning("Received invalid JSON payload")
            self._reply(400)
            return

        if not isinstance(payload, dict):
            log.warning("Received JSON payload that is not an object")
            self._reply(400)
            return

        self._reply(200)

        alerts = payload.get("alerts", [])
        if not isinstance(alerts, list):
            # e.g. "alerts": null -- nothing to iterate over.
            alerts = []
        relevant = [a for a in alerts if self._is_relevant(a)]

        if not relevant:
            log.debug("No relevant alerts in payload, ignoring")
            return

        log.info(f"Received {len(relevant)} relevant alert(s), scheduling pipeline")
        wait = self.config.get("correlator", {}).get("wait_seconds", 30)
        # Daemon thread: a pending pipeline must not keep the process alive.
        worker = threading.Thread(
            target=_run_pipeline,
            args=(relevant, self.config, wait),
            daemon=True,
        )
        worker.start()

    def log_message(self, fmt, *args):
        """Send http.server's per-request log lines to the module logger at DEBUG."""
        log.debug(f"HTTP: {fmt % args}")

    def _reply(self, code: int) -> None:
        """Send a body-less response with the given status code."""
        self.send_response(code)
        self.end_headers()

    @staticmethod
    def _is_relevant(alert: Any) -> bool:
        """Return True for a firing alert whose alertname is in ``ALERT_NAMES``.

        Entries that are not dicts, or whose ``labels`` is not a dict, are
        treated as irrelevant instead of raising on malformed input.
        """
        if not isinstance(alert, dict):
            return False
        labels = alert.get("labels")
        if not isinstance(labels, dict):
            return False
        alertname = labels.get("alertname")
        return (
            isinstance(alertname, str)
            and alertname in ALERT_NAMES
            and alert.get("status") == "firing"
        )
|
||||
|
||||
|
||||
def _run_pipeline(alerts: list, config: dict, wait: int):
    """Sleep, then run correlator -> repo_check -> notifier for ``alerts``.

    The sleep is a grace period intended to let the backup modules finish
    before their status is checked.

    Args:
        alerts: Alert dicts. Each one is indexed with ``["labels"]``, so
            that key must be present.
        config: Configuration passed unchanged to every pipeline stage.
        wait: Seconds to sleep before the correlator runs.

    Raises:
        Exception: Whatever a stage raises, re-raised after being logged.
    """
    try:
        log.info(f"Waiting {wait}s before status check...")
        time.sleep(wait)

        # Collect unique backup_ids from alert labels if available.
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # list handed to the correlator is the same on every run.
        backup_ids = list(dict.fromkeys(
            backup_id
            for backup_id in (a["labels"].get("backup_id") for a in alerts)
            if backup_id
        ))

        log.info("Running correlator...")
        correlation = correlate_backup_status(config, backup_ids)

        # The repository check only runs when the correlator did not
        # report success; the notifier receives None otherwise.
        repo_status = None
        if correlation["outcome"] != "SUCCESS":
            log.info("Non-success outcome, running repo check...")
            repo_status = check_repositories(config, correlation)

        log.info("Sending notification...")
        send_notification(config, alerts, correlation, repo_status)
    except Exception:
        # AlertHandler starts this function on a daemon thread after the
        # webhook has already been answered with 200, so record the failure
        # through the module logger before letting it propagate.
        log.exception("Alert pipeline failed")
        raise
|
||||
|
||||
|
||||
def run_server(config: dict):
    """Serve the alert webhook until the process is stopped.

    Reads ``receiver.host`` (default ``127.0.0.1``) and ``receiver.port``
    (default ``9099``) from ``config``, publishes ``config`` on
    ``AlertHandler`` and then blocks in ``serve_forever``.

    Args:
        config: Configuration dict; it is also what request handlers see as
            ``AlertHandler.config``.
    """
    receiver_cfg = config.get("receiver", {})
    host = receiver_cfg.get("host", "127.0.0.1")
    port = receiver_cfg.get("port", 9099)

    # Handlers are instantiated per request by the server, so the config is
    # shared through the class attribute.
    AlertHandler.config = config

    # The context manager closes the listening socket when serve_forever()
    # exits, including on KeyboardInterrupt or any other exception.
    with HTTPServer((host, port), AlertHandler) as server:
        log.info(f"ns8-backup-monitor receiver listening on {host}:{port}")
        server.serve_forever()
|
||||
Reference in New Issue
Block a user