diff --git a/ns8_backup_monitor/correlator.py b/ns8_backup_monitor/correlator.py index a5ece2f..2700ca0 100644 --- a/ns8_backup_monitor/correlator.py +++ b/ns8_backup_monitor/correlator.py @@ -5,9 +5,9 @@ correlator.py - Reads NS8 cluster Redis state to determine backup outcome. For each backup plan/schedule, reads the per-module backup status and produces a classified outcome: SUCCESS, PARTIAL, or REPO_FAILURE. -NS8 Redis key pattern for backup status (community/dev research): - cluster/backup//status -> last overall status - module//backup//status -> per-module status +NS8 Redis key patterns: + cluster/backup//status -> last overall plan status (hash) + module//backup//status -> per-module status (hash) Fields in status hash: result : success | error @@ -16,10 +16,8 @@ Fields in status hash: errors : number of module errors (in plan status) """ -import json import logging import subprocess -import time from datetime import datetime, timezone from typing import Optional @@ -34,6 +32,15 @@ def _redis_cmd(config: dict, *args) -> str: return result.stdout.strip() +def _redis_hgetall(config: dict, key: str) -> dict: + """Return all fields of a Redis hash as a dict in a single redis-cli call.""" + socket = config.get("redis", {}).get("socket", "/var/lib/nethserver/cluster/state/redis.sock") + cmd = ["redis-cli", "-s", socket, "HGETALL", key] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + lines = [l for l in result.stdout.strip().splitlines() if l] + return dict(zip(lines[::2], lines[1::2])) + + def _get_recent_backup_ids(config: dict, window: int) -> list: """Scan Redis for backup plan status keys updated within window seconds.""" raw = _redis_cmd(config, "KEYS", "cluster/backup/*/status") @@ -42,13 +49,13 @@ def _get_recent_backup_ids(config: dict, window: int) -> list: recent = [] for key in keys: - ts_raw = _redis_cmd(config, "HGET", key, "timestamp") + fields = _redis_hgetall(config, key) + ts_raw = fields.get("timestamp", "") if not ts_raw: continue try: ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")).timestamp() if (now - ts) <= window: - # Extract backup_id from key: cluster/backup//status parts = key.split("/") if len(parts) >= 3: recent.append(parts[2]) @@ -59,7 +66,7 @@ def _get_recent_backup_ids(config: dict, window: int) -> list: def _get_module_statuses(config: dict, backup_id: str) -> list: - """Get all per-module status entries for a given backup_id.""" + """Get all per-module status entries for a given backup_id via HGETALL.""" pattern = f"module/*/backup/{backup_id}/status" raw = _redis_cmd(config, "KEYS", pattern) keys = [k for k in raw.splitlines() if k] @@ -67,15 +74,16 @@ def _get_module_statuses(config: dict, backup_id: str) -> list: for key in keys: module_id = key.split("/")[1] - result = _redis_cmd(config, "HGET", key, "result") - error = _redis_cmd(config, "HGET", key, "error") - timestamp = _redis_cmd(config, "HGET", key, "timestamp") + fields = _redis_hgetall(config, key) + if not fields: + log.debug(f"Empty or missing status hash for {key}") + continue statuses.append({ "module_id": module_id, "backup_id": backup_id, - "result": result, - "error": error, - "timestamp": timestamp, + "result": fields.get("result", "unknown"), + "error": fields.get("error", ""), + "timestamp": fields.get("timestamp", ""), }) return statuses @@ -85,18 +93,16 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) -> """ Main correlator entry point. - Returns a dict: + Returns: { - "outcome": "SUCCESS" | "PARTIAL" | "REPO_FAILURE", - "backup_ids": [...], - "modules": [ - {"module_id": ..., "backup_id": ..., "result": ..., "error": ..., "timestamp": ...}, - ... - ], - "failed_modules": [...], - "total": int, - "failed": int, - "succeeded": int, + "outcome" : "SUCCESS" | "PARTIAL" | "REPO_FAILURE", + "backup_ids" : [...], + "modules" : [{module_id, backup_id, result, error, timestamp}, ...], + "failed_modules" : [...], + "total" : int, + "failed" : int, + "succeeded" : int, + "note" : str # optional } """ window = config.get("correlator", {}).get("recent_window", 3600) @@ -120,7 +126,9 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) -> all_modules = [] for bid in backup_ids: - all_modules.extend(_get_module_statuses(config, bid)) + modules = _get_module_statuses(config, bid) + log.info(f"backup_id={bid}: found {len(modules)} module status entries") + all_modules.extend(modules) total = len(all_modules) failed_modules = [m for m in all_modules if m["result"] != "success"] @@ -133,6 +141,8 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) -> else: outcome = "PARTIAL" + log.info(f"Correlation result: outcome={outcome}, total={total}, succeeded={succeeded}, failed={len(failed_modules)}") + return { "outcome": outcome, "backup_ids": backup_ids,