#!/usr/bin/env python3 """ repo_check.py - Verifies reachability and health of NS8 backup repositories. For each backup destination configured in the cluster, attempts a `restic stats` or `restic snapshots` command to verify the repo is accessible and readable. Distinguishes between: - UNREACHABLE: network/mount error, cannot connect at all - LOCKED: restic repo is locked (previous backup crashed) - CORRUPTED: repo exists but integrity check fails - OK: repo is accessible """ import logging import subprocess from typing import Optional log = logging.getLogger(__name__) def _redis_cmd(config: dict, *args) -> str: socket = config.get("redis", {}).get("socket", "/var/lib/nethserver/cluster/state/redis.sock") cmd = ["redis-cli", "-s", socket] + list(args) result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) return result.stdout.strip() def _get_backup_destinations(config: dict) -> list: """ Read all configured backup destinations from NS8 Redis. Key pattern: cluster/backup_repository//parameters Returns list of dicts with repo config. """ raw = _redis_cmd(config, "KEYS", "cluster/backup_repository/*/parameters") keys = [k for k in raw.splitlines() if k] destinations = [] for key in keys: parts = key.split("/") repo_id = parts[2] if len(parts) > 2 else "unknown" url = _redis_cmd(config, "HGET", key, "url") repopath = _redis_cmd(config, "HGET", key, "path") password = _redis_cmd(config, "HGET", key, "password") backend = _redis_cmd(config, "HGET", key, "backend") destinations.append({ "repo_id": repo_id, "url": url, "path": repopath, "password": password, "backend": backend, }) return destinations def _check_restic_repo(dest: dict, config: dict) -> dict: """Run restic snapshots --last to verify repo is accessible.""" timeout = config.get("repo_check", {}).get("timeout", 60) repo_url = dest.get("url") or dest.get("path") or "" password = dest.get("password", "") extra_flags = config.get("repo_check", {}).get("restic_flags", "") if not repo_url: return {"repo_id": dest["repo_id"], "status": "UNCONFIGURED", "error": "No URL or path found"} cmd = ["restic", "-r", repo_url, "snapshots", "--last", "--no-cache"] if extra_flags: cmd += extra_flags.split() env = {"RESTIC_PASSWORD": password} if password else {} try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, env={**__import__("os").environ, **env} ) stderr = result.stderr.lower() if result.returncode == 0: return {"repo_id": dest["repo_id"], "status": "OK", "error": ""} elif "unable to open config" in stderr or "no such file" in stderr: return {"repo_id": dest["repo_id"], "status": "UNREACHABLE", "error": result.stderr.strip()} elif "locked" in stderr or "lock" in stderr: return {"repo_id": dest["repo_id"], "status": "LOCKED", "error": result.stderr.strip()} elif "error" in stderr: return {"repo_id": dest["repo_id"], "status": "ERROR", "error": result.stderr.strip()} else: return {"repo_id": dest["repo_id"], "status": "UNKNOWN", "error": result.stderr.strip()} except subprocess.TimeoutExpired: return {"repo_id": dest["repo_id"], "status": "UNREACHABLE", "error": f"Timeout after {timeout}s"} except Exception as e: return {"repo_id": dest["repo_id"], "status": "ERROR", "error": str(e)} def check_repositories(config: dict, correlation: dict) -> dict: """ Main entry point for repository check. Returns: { "destinations": [ {"repo_id": ..., "status": OK|UNREACHABLE|LOCKED|ERROR, "error": ...}, ... ], "any_unreachable": bool, "any_locked": bool, "all_ok": bool, } """ destinations = _get_backup_destinations(config) if not destinations: log.warning("No backup destinations found in Redis") return { "destinations": [], "any_unreachable": True, "any_locked": False, "all_ok": False, "note": "No backup destinations configured or readable from Redis" } results = [] for dest in destinations: log.info(f"Checking repository {dest['repo_id']} ({dest.get('backend', 'unknown')})...") res = _check_restic_repo(dest, config) log.info(f" -> {res['status']}: {res.get('error', '')}") results.append(res) any_unreachable = any(r["status"] == "UNREACHABLE" for r in results) any_locked = any(r["status"] == "LOCKED" for r in results) all_ok = all(r["status"] == "OK" for r in results) return { "destinations": results, "any_unreachable": any_unreachable, "any_locked": any_locked, "all_ok": all_ok, }