docs: add section-by-section comments — correlator.py
This commit is contained in:
@@ -1,19 +1,33 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""Read NS8 cluster Redis state to determine the backup outcome.
|
||||||
correlator.py - Reads NS8 cluster Redis state to determine backup outcome.
|
|
||||||
|
|
||||||
For each backup plan/schedule, reads the per-module backup status and
|
For each backup plan/schedule, this module reads the per-module backup
|
||||||
produces a classified outcome: SUCCESS, PARTIAL, or REPO_FAILURE.
|
status hashes from the cluster Redis and produces a classified outcome:
|
||||||
|
|
||||||
NS8 Redis key patterns:
|
SUCCESS – All modules finished without errors.
|
||||||
cluster/backup/<backup_id>/status -> last overall plan status (hash)
|
PARTIAL – Some modules failed, others succeeded.
|
||||||
module/<module_id>/backup/<backup_id>/status -> per-module status (hash)
|
REPO_FAILURE – All modules failed, total is zero, or no status was
|
||||||
|
found in Redis at all (possible repository-level issue).
|
||||||
|
|
||||||
Fields in status hash:
|
NS8 Redis key patterns
|
||||||
result : success | error
|
-----------------------
|
||||||
timestamp: ISO8601
|
cluster/backup/<backup_id>/status
|
||||||
error : error message if any
|
Overall plan status hash. Fields: result, timestamp, errors.
|
||||||
errors : number of module errors (in plan status)
|
|
||||||
|
module/<module_id>/backup/<backup_id>/status
|
||||||
|
Per-module status hash. Fields: result, timestamp, error.
|
||||||
|
|
||||||
|
Redis hash fields
|
||||||
|
-----------------
|
||||||
|
result : "success" | "error"
|
||||||
|
timestamp : ISO 8601 string
|
||||||
|
error : human-readable error message (empty on success)
|
||||||
|
errors : integer count of module errors (plan-level hash only)
|
||||||
|
|
||||||
|
Dependencies
|
||||||
|
------------
|
||||||
|
Only the standard library and ``redis-cli`` (installed with NS8) are required.
|
||||||
|
No Python Redis client library is needed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
@@ -24,25 +38,73 @@ from typing import Optional
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Redis helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# These thin wrappers call redis-cli via subprocess instead of using a Python
|
||||||
|
# Redis client, keeping the dependency list to zero and staying consistent with
|
||||||
|
# how other NS8 scripts interact with the cluster Redis.
|
||||||
|
|
||||||
def _redis_cmd(config: dict, *args) -> str:
|
def _redis_cmd(config: dict, *args) -> str:
|
||||||
"""Run a redis-cli command against the NS8 cluster Redis socket."""
|
"""Run a redis-cli command against the NS8 cluster Redis Unix socket.
|
||||||
socket = config.get("redis", {}).get("socket", "/var/lib/nethserver/cluster/state/redis.sock")
|
|
||||||
|
Args:
|
||||||
|
config: Parsed configuration dictionary (reads ``redis.socket``).
|
||||||
|
*args: Redis command and arguments (e.g. "KEYS", "cluster/backup/*").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Raw stdout string, stripped of leading/trailing whitespace.
|
||||||
|
"""
|
||||||
|
socket = config.get("redis", {}).get(
|
||||||
|
"socket", "/var/lib/nethserver/cluster/state/redis.sock"
|
||||||
|
)
|
||||||
cmd = ["redis-cli", "-s", socket] + list(args)
|
cmd = ["redis-cli", "-s", socket] + list(args)
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||||||
return result.stdout.strip()
|
return result.stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
def _redis_hgetall(config: dict, key: str) -> dict:
|
def _redis_hgetall(config: dict, key: str) -> dict:
|
||||||
"""Return all fields of a Redis hash as a dict in a single redis-cli call."""
|
"""Return all fields of a Redis hash as a Python dict.
|
||||||
socket = config.get("redis", {}).get("socket", "/var/lib/nethserver/cluster/state/redis.sock")
|
|
||||||
|
``redis-cli HGETALL`` outputs alternating field / value lines.
|
||||||
|
This function zips consecutive pairs into a dict.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: Parsed configuration dictionary.
|
||||||
|
key: Full Redis key of the hash to read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict mapping field names to values, or an empty dict if the key
|
||||||
|
does not exist or the hash is empty.
|
||||||
|
"""
|
||||||
|
socket = config.get("redis", {}).get(
|
||||||
|
"socket", "/var/lib/nethserver/cluster/state/redis.sock"
|
||||||
|
)
|
||||||
cmd = ["redis-cli", "-s", socket, "HGETALL", key]
|
cmd = ["redis-cli", "-s", socket, "HGETALL", key]
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||||||
lines = [l for l in result.stdout.strip().splitlines() if l]
|
lines = [l for l in result.stdout.strip().splitlines() if l]
|
||||||
|
# redis-cli HGETALL returns alternating key / value lines:
|
||||||
|
# line 0 → field name, line 1 → value, line 2 → field name, …
|
||||||
return dict(zip(lines[::2], lines[1::2]))
|
return dict(zip(lines[::2], lines[1::2]))
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Recent backup discovery
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _get_recent_backup_ids(config: dict, window: int) -> list:
|
def _get_recent_backup_ids(config: dict, window: int) -> list:
|
||||||
"""Scan Redis for backup plan status keys updated within window seconds."""
|
"""Scan Redis for plan status keys updated within the last ``window`` seconds.
|
||||||
|
|
||||||
|
Used as a fallback when Alertmanager does not include a ``backup_id``
|
||||||
|
label on the alert (older NS8 versions or custom alert rules).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: Parsed configuration dictionary.
|
||||||
|
window: Look-back window in seconds (from ``correlator.recent_window``).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of backup_id strings whose plan status was updated recently.
|
||||||
|
"""
|
||||||
raw = _redis_cmd(config, "KEYS", "cluster/backup/*/status")
|
raw = _redis_cmd(config, "KEYS", "cluster/backup/*/status")
|
||||||
keys = [k for k in raw.splitlines() if k]
|
keys = [k for k in raw.splitlines() if k]
|
||||||
now = datetime.now(timezone.utc).timestamp()
|
now = datetime.now(timezone.utc).timestamp()
|
||||||
@@ -54,8 +116,11 @@ def _get_recent_backup_ids(config: dict, window: int) -> list:
|
|||||||
if not ts_raw:
|
if not ts_raw:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
|
# Parse ISO 8601 timestamp; replace trailing 'Z' with '+00:00'
|
||||||
|
# for compatibility with Python < 3.11 fromisoformat().
|
||||||
ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")).timestamp()
|
ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")).timestamp()
|
||||||
if (now - ts) <= window:
|
if (now - ts) <= window:
|
||||||
|
# Key format: cluster/backup/<backup_id>/status
|
||||||
parts = key.split("/")
|
parts = key.split("/")
|
||||||
if len(parts) >= 3:
|
if len(parts) >= 3:
|
||||||
recent.append(parts[2])
|
recent.append(parts[2])
|
||||||
@@ -65,14 +130,37 @@ def _get_recent_backup_ids(config: dict, window: int) -> list:
|
|||||||
return recent
|
return recent
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Per-module status collection
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _get_module_statuses(config: dict, backup_id: str) -> list:
|
def _get_module_statuses(config: dict, backup_id: str) -> list:
|
||||||
"""Get all per-module status entries for a given backup_id via HGETALL."""
|
"""Return all per-module status entries for a given backup_id.
|
||||||
|
|
||||||
|
Scans Redis for keys matching ``module/*/backup/<backup_id>/status``
|
||||||
|
and reads each hash with HGETALL.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: Parsed configuration dictionary.
|
||||||
|
backup_id: The backup plan identifier (e.g. "1", "2").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of dicts, one per module:
|
||||||
|
{
|
||||||
|
"module_id" : str,
|
||||||
|
"backup_id" : str,
|
||||||
|
"result" : "success" | "error" | "unknown",
|
||||||
|
"error" : str,
|
||||||
|
"timestamp" : str (ISO 8601),
|
||||||
|
}
|
||||||
|
"""
|
||||||
pattern = f"module/*/backup/{backup_id}/status"
|
pattern = f"module/*/backup/{backup_id}/status"
|
||||||
raw = _redis_cmd(config, "KEYS", pattern)
|
raw = _redis_cmd(config, "KEYS", pattern)
|
||||||
keys = [k for k in raw.splitlines() if k]
|
keys = [k for k in raw.splitlines() if k]
|
||||||
statuses = []
|
statuses = []
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
|
# Key format: module/<module_id>/backup/<backup_id>/status
|
||||||
module_id = key.split("/")[1]
|
module_id = key.split("/")[1]
|
||||||
fields = _redis_hgetall(config, key)
|
fields = _redis_hgetall(config, key)
|
||||||
if not fields:
|
if not fields:
|
||||||
@@ -89,28 +177,47 @@ def _get_module_statuses(config: dict, backup_id: str) -> list:
|
|||||||
return statuses
|
return statuses
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Main correlator entry point
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) -> dict:
|
def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) -> dict:
|
||||||
"""
|
"""Classify the overall backup outcome by reading per-module Redis state.
|
||||||
Main correlator entry point.
|
|
||||||
|
Args:
|
||||||
|
config: Parsed configuration dictionary.
|
||||||
|
backup_ids: List of backup plan IDs from Alertmanager alert labels.
|
||||||
|
When empty or None, the function falls back to scanning
|
||||||
|
Redis for recently updated plan status keys.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
{
|
A dict with the following keys:
|
||||||
"outcome" : "SUCCESS" | "PARTIAL" | "REPO_FAILURE",
|
|
||||||
"backup_ids" : [...],
|
outcome : "SUCCESS" | "PARTIAL" | "REPO_FAILURE"
|
||||||
"modules" : [{module_id, backup_id, result, error, timestamp}, ...],
|
backup_ids : list of plan IDs that were evaluated
|
||||||
"failed_modules" : [...],
|
modules : list of per-module status dicts (see _get_module_statuses)
|
||||||
"total" : int,
|
failed_modules : subset of ``modules`` where result != "success"
|
||||||
"failed" : int,
|
total : total number of module status entries found
|
||||||
"succeeded" : int,
|
failed : number of failed modules
|
||||||
"note" : str # optional
|
succeeded : number of succeeded modules
|
||||||
}
|
note : optional human-readable explanation string
|
||||||
|
|
||||||
|
Outcome classification rules
|
||||||
|
----------------------------
|
||||||
|
failed == 0 and total > 0 → SUCCESS
|
||||||
|
failed == total or total == 0 → REPO_FAILURE (all failed or nothing found)
|
||||||
|
otherwise → PARTIAL
|
||||||
"""
|
"""
|
||||||
window = config.get("correlator", {}).get("recent_window", 3600)
|
window = config.get("correlator", {}).get("recent_window", 3600)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Resolve backup_ids
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
if not backup_ids:
|
if not backup_ids:
|
||||||
log.info("No backup_ids from alert labels, scanning Redis for recent backups...")
|
log.info("No backup_ids from alert labels, scanning Redis for recent backups...")
|
||||||
backup_ids = _get_recent_backup_ids(config, window)
|
backup_ids = _get_recent_backup_ids(config, window)
|
||||||
|
|
||||||
|
# If still empty, no relevant Redis state exists — treat as full failure.
|
||||||
if not backup_ids:
|
if not backup_ids:
|
||||||
log.warning("No recent backup status keys found in Redis")
|
log.warning("No recent backup status keys found in Redis")
|
||||||
return {
|
return {
|
||||||
@@ -121,15 +228,21 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) ->
|
|||||||
"total": 0,
|
"total": 0,
|
||||||
"failed": 0,
|
"failed": 0,
|
||||||
"succeeded": 0,
|
"succeeded": 0,
|
||||||
"note": "No backup status found in Redis - possible repo or scheduling failure",
|
"note": "No backup status found in Redis — possible repo or scheduling failure",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Collect per-module statuses across all plans
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
all_modules = []
|
all_modules = []
|
||||||
for bid in backup_ids:
|
for bid in backup_ids:
|
||||||
modules = _get_module_statuses(config, bid)
|
modules = _get_module_statuses(config, bid)
|
||||||
log.info(f"backup_id={bid}: found {len(modules)} module status entries")
|
log.info(f"backup_id={bid}: found {len(modules)} module status entries")
|
||||||
all_modules.extend(modules)
|
all_modules.extend(modules)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Outcome classification
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
total = len(all_modules)
|
total = len(all_modules)
|
||||||
failed_modules = [m for m in all_modules if m["result"] != "success"]
|
failed_modules = [m for m in all_modules if m["result"] != "success"]
|
||||||
succeeded = total - len(failed_modules)
|
succeeded = total - len(failed_modules)
|
||||||
@@ -137,11 +250,15 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) ->
|
|||||||
if len(failed_modules) == 0 and total > 0:
|
if len(failed_modules) == 0 and total > 0:
|
||||||
outcome = "SUCCESS"
|
outcome = "SUCCESS"
|
||||||
elif len(failed_modules) == total or total == 0:
|
elif len(failed_modules) == total or total == 0:
|
||||||
|
# All modules failed, or no modules were found at all.
|
||||||
outcome = "REPO_FAILURE"
|
outcome = "REPO_FAILURE"
|
||||||
else:
|
else:
|
||||||
outcome = "PARTIAL"
|
outcome = "PARTIAL"
|
||||||
|
|
||||||
log.info(f"Correlation result: outcome={outcome}, total={total}, succeeded={succeeded}, failed={len(failed_modules)}")
|
log.info(
|
||||||
|
f"Correlation result: outcome={outcome}, total={total}, "
|
||||||
|
f"succeeded={succeeded}, failed={len(failed_modules)}"
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"outcome": outcome,
|
"outcome": outcome,
|
||||||
|
|||||||
Reference in New Issue
Block a user