Files
ns8-backup-monitor/ns8_backup_monitor/repo_check.py
T

325 lines
13 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Verify reachability and health of NS8 backup repositories.
For each backup destination configured in the NS8 cluster, this module
invokes ``restic snapshots --last --no-cache`` to confirm that the
repository is accessible and readable.
Status values returned per destination
---------------------------------------
OK - Repository is reachable and returned a valid response.
UNREACHABLE - Network or mount error; cannot connect at all.
LOCKED - restic repository is locked (a previous backup crashed mid-run).
CORRUPTED - restic's stderr mentions both "pack" and "error", suggesting damaged pack data (no separate integrity check is run).
ERROR - restic reported an error not covered by the above categories.
UNCONFIGURED - No URL or path found in Redis for this destination.
UNKNOWN - Non-zero exit with unrecognised stderr output.
NS8 multi-backend credential mapping
--------------------------------------
local / fs - path only, no credentials.
S3 / AWS - AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY from Redis hash.
B2 - B2_ACCOUNT_ID + B2_ACCOUNT_KEY from Redis hash.
SFTP - sftp: URL prefix; relies on SSH keys already deployed.
rclone - rclone: URL prefix; RCLONE_CONFIG env var from Redis hash.
Why ``runagent`` is NOT used here
-----------------------------------
restic is invoked directly rather than through ``runagent`` because repo_check
runs on the cluster leader and reads repository credentials from the cluster
Redis. The restic binary is available system-wide on NS8 nodes (not inside a
module container), so a direct subprocess call is both simpler and correct.
``runagent`` is used by other NS8 scripts to run commands inside rootless
Podman module containers - that indirection is not needed here.
Dependencies
------------
Only the standard library. ``restic`` must be present in PATH (installed with
NS8 or manually on the leader node).
"""
import logging
import os
import shlex
import subprocess
from typing import Optional
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Redis helpers
# ---------------------------------------------------------------------------
# These are local copies of the helpers defined in correlator.py, kept here
# to make repo_check.py self-contained and avoid a circular import. If the
# Redis access layer is extracted into utils.py in the future, remove these.
def _redis_cmd(config: dict, *args) -> str:
    """Execute one ``redis-cli`` command and return its trimmed stdout.

    The command is sent over the Unix socket named by
    ``config["redis"]["socket"]``; when that setting is absent the path
    ``/var/lib/nethserver/cluster/state/redis.sock`` is used.  The exit
    status and stderr of ``redis-cli`` are not inspected.

    Args:
        config: Parsed configuration dictionary.
        *args: Command name and arguments, handed to ``redis-cli`` verbatim.

    Returns:
        Standard output of ``redis-cli`` with surrounding whitespace removed.
    """
    redis_settings = config.get("redis", {})
    socket_path = redis_settings.get(
        "socket", "/var/lib/nethserver/cluster/state/redis.sock"
    )
    completed = subprocess.run(
        ["redis-cli", "-s", socket_path, *args],
        capture_output=True,
        text=True,
        timeout=10,
    )
    return completed.stdout.strip()
def _redis_hgetall(config: dict, key: str) -> dict:
"""Return all fields of a Redis hash as a Python dict.
``redis-cli HGETALL`` outputs alternating field / value lines;
this function zips consecutive pairs into a dict.
"""
socket = config.get("redis", {}).get(
"socket", "/var/lib/nethserver/cluster/state/redis.sock"
)
cmd = ["redis-cli", "-s", socket, "HGETALL", key]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
lines = [l for l in result.stdout.strip().splitlines() if l]
return dict(zip(lines[::2], lines[1::2]))
# ---------------------------------------------------------------------------
# Destination discovery
# ---------------------------------------------------------------------------
def _get_backup_destinations(config: dict) -> list:
    """Read all configured backup repository destinations from NS8 Redis.

    Key pattern: ``cluster/backup_repository/<repo_id>/parameters``

    Every matching key is read with ``_redis_hgetall()`` and reduced to the
    fields needed to invoke restic.  Fields missing from the hash default to
    an empty string.

    Args:
        config: Parsed configuration dictionary.

    Returns:
        List of dicts, one per configured destination::

            {
                "repo_id"       : str,
                "url"           : str  (hash field ``url``),
                "path"          : str  (hash field ``path``),
                "password"      : str  (hash field ``password``),
                "backend"       : str  (hash field ``backend``),
                "aws_access_key": str  (``aws_access_key_id``, else
                                        ``b2_account_id``),
                "aws_secret_key": str  (``aws_secret_access_key``, else
                                        ``b2_account_key``),
                "rclone_config" : str  (hash field ``rclone_config``),
                "extra_env"     : str  (hash field ``extra_env``),
            }
    """
    # NOTE(review): KEYS walks the whole keyspace in one blocking call; the
    # Redis documentation recommends SCAN for production use.  Kept as-is
    # here; consider ``redis-cli --scan --pattern`` if the keyspace grows.
    raw = _redis_cmd(config, "KEYS", "cluster/backup_repository/*/parameters")

    destinations = []
    for key in raw.splitlines():
        if not key:
            continue
        # Key format: cluster/backup_repository/<repo_id>/parameters
        parts = key.split("/")
        repo_id = parts[2] if len(parts) > 2 else "unknown"
        fields = _redis_hgetall(config, key)
        destinations.append({
            "repo_id": repo_id,
            "url": fields.get("url", ""),
            "path": fields.get("path", ""),
            "password": fields.get("password", ""),
            "backend": fields.get("backend", ""),
            # S3 and B2 use different field names; normalise both to a single
            # aws_access_key / aws_secret_key pair so _build_env() can handle
            # them uniformly.  ``or`` (rather than a nested .get() default)
            # makes the B2 field win when the S3 field exists but is empty.
            "aws_access_key": (fields.get("aws_access_key_id")
                               or fields.get("b2_account_id", "")),
            "aws_secret_key": (fields.get("aws_secret_access_key")
                               or fields.get("b2_account_key", "")),
            "rclone_config": fields.get("rclone_config", ""),
            "extra_env": fields.get("extra_env", ""),
        })
    return destinations
# ---------------------------------------------------------------------------
# Environment builder
# ---------------------------------------------------------------------------
def _build_env(dest: dict) -> dict:
"""Build the environment dict that restic needs for a given backend.
Always starts from a copy of ``os.environ`` so system-level settings
(PATH, HOME, proxy variables, etc.) are inherited.
Args:
dest: A destination dict as returned by ``_get_backup_destinations()``.
Returns:
A dict suitable for the ``env`` argument of ``subprocess.run()``.
"""
env = dict(os.environ)
backend = dest.get("backend", "").lower()
# RESTIC_PASSWORD unlocks the repository for all backends.
if dest.get("password"):
env["RESTIC_PASSWORD"] = dest["password"]
# S3 / AWS backend.
if backend in ("s3", "aws") and dest.get("aws_access_key"):
env["AWS_ACCESS_KEY_ID"] = dest["aws_access_key"]
env["AWS_SECRET_ACCESS_KEY"] = dest["aws_secret_key"]
# Backblaze B2 backend.
elif backend in ("b2", "backblaze") and dest.get("aws_access_key"):
env["B2_ACCOUNT_ID"] = dest["aws_access_key"]
env["B2_ACCOUNT_KEY"] = dest["aws_secret_key"]
# rclone backend: point restic to the rclone config file.
elif backend == "rclone" and dest.get("rclone_config"):
env["RCLONE_CONFIG"] = dest["rclone_config"]
return env
# ---------------------------------------------------------------------------
# Single-repository health check
# ---------------------------------------------------------------------------
def _check_restic_repo(dest: dict, config: dict) -> dict:
"""Run ``restic snapshots --last --no-cache`` to verify one repository.
``--no-cache`` is intentional: the local cache may be stale or missing
on the cluster leader, and we always want a live check against the backend.
Args:
dest: Destination dict from ``_get_backup_destinations()``.
config: Parsed configuration dictionary (reads ``repo_check.*``).
Returns:
Dict with keys: ``repo_id``, ``status``, ``error``.
"""
timeout = config.get("repo_check", {}).get("timeout", 60)
extra_flags = config.get("repo_check", {}).get("restic_flags", "")
# Prefer ``url`` (cloud backends) over ``path`` (local / SFTP).
repo_url = dest.get("url") or dest.get("path") or ""
if not repo_url:
return {
"repo_id": dest["repo_id"],
"status": "UNCONFIGURED",
"error": "No URL or path found in Redis for this destination",
}
cmd = ["restic", "-r", repo_url, "snapshots", "--last", "--no-cache"]
if extra_flags:
# Allow operators to append flags (e.g. --cacert, --option) via the
# config file without modifying the source code.
cmd += extra_flags.split()
env = _build_env(dest)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
env=env,
)
stderr = result.stderr.lower()
if result.returncode == 0:
return {"repo_id": dest["repo_id"], "status": "OK", "error": ""}
# Network / connectivity errors.
elif any(x in stderr for x in (
"unable to open config", "no such file", "does not exist",
"connection refused", "network", "timeout", "no route",
)):
return {"repo_id": dest["repo_id"], "status": "UNREACHABLE",
"error": result.stderr.strip()}
# Repository locked by a previous crashed backup run.
# Run ``restic unlock`` manually to recover.
elif "locked" in stderr or "lock" in stderr:
return {"repo_id": dest["repo_id"], "status": "LOCKED",
"error": result.stderr.strip()}
# Pack / data integrity error - repository may be corrupted.
elif "pack" in stderr and "error" in stderr:
return {"repo_id": dest["repo_id"], "status": "CORRUPTED",
"error": result.stderr.strip()}
# Generic restic error.
elif "error" in stderr or "fatal" in stderr:
return {"repo_id": dest["repo_id"], "status": "ERROR",
"error": result.stderr.strip()}
else:
return {"repo_id": dest["repo_id"], "status": "UNKNOWN",
"error": result.stderr.strip()}
except subprocess.TimeoutExpired:
return {"repo_id": dest["repo_id"], "status": "UNREACHABLE",
"error": f"restic timed out after {timeout}s"}
except FileNotFoundError:
return {"repo_id": dest["repo_id"], "status": "ERROR",
"error": "restic not found in PATH - install restic on the cluster leader"}
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def check_repositories(config: dict, correlation: dict) -> dict:
    """Probe every configured NS8 backup repository and summarise the results.

    Destinations come from ``_get_backup_destinations()`` and each one is
    probed with ``_check_restic_repo()``; progress is written to the module
    logger at INFO level.

    Args:
        config: Parsed configuration dictionary.
        correlation: Output dict from ``correlate_backup_status()``.  It is
            accepted for interface compatibility; this function does not
            read it.

    Returns:
        Dict with keys:
            destinations : list of per-destination result dicts
                           (each has "repo_id", "status", "error")
            any_ok       : bool - True if at least one destination is OK
            all_ok       : bool - True if every destination is OK
            summary      : human-readable one-line summary string

        When no destinations are found the list is empty and both flags are
        False.
    """
    repos = _get_backup_destinations(config)
    if not repos:
        log.warning("No backup_repository keys found in Redis")
        return {
            "destinations": [],
            "any_ok": False,
            "all_ok": False,
            "summary": "No backup repositories configured in NS8",
        }

    outcomes = []
    for repo in repos:
        location = repo.get("url") or repo.get("path") or "(empty)"
        log.info("Checking repository repo_id=%s url=%s",
                 repo["repo_id"], location)
        outcome = _check_restic_repo(repo, config)
        log.info(" -> %s", outcome["status"])
        outcomes.append(outcome)

    total = len(outcomes)
    ok_count = len([o for o in outcomes if o["status"] == "OK"])
    every_ok = ok_count == total
    if every_ok:
        headline = "All repositories reachable"
    else:
        headline = f"{ok_count}/{total} repositories OK"

    return {
        "destinations": outcomes,
        "any_ok": ok_count > 0,
        "all_ok": every_ok,
        "summary": headline,
    }