Files
ns8-backup-monitor/ns8_backup_monitor/repo_check.py
T

321 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Verify reachability and health of NS8 backup repositories.

For each backup destination configured in the NS8 cluster, this module
runs ``restic snapshots --last --no-cache`` to confirm that the repository
is accessible and readable.

Status values returned per destination
---------------------------------------
OK            Repository is reachable and returned a valid response.
UNREACHABLE   Network/mount error or timeout — cannot connect at all.
LOCKED        restic repo is locked (a previous backup crashed mid-run).
CORRUPTED     restic reported a pack error — the repository may be corrupted.
ERROR         restic reported an error not covered by the above categories.
UNCONFIGURED  No URL or path found in the Redis hash for this destination.
UNKNOWN       Non-zero exit with unrecognised stderr output.

NS8 multi-backend credential mapping
--------------------------------------
local / fs    Path only, no credentials.
S3 / AWS      aws_access_key_id + aws_secret_access_key from Redis hash.
B2            b2_account_id + b2_account_key from Redis hash.
SFTP          URL with sftp: prefix; relies on SSH keys already in place.
rclone        rclone: prefix; RCLONE_CONFIG env var set from Redis hash.

Dependencies
------------
Only the standard library. ``restic`` must be in PATH (installed on NS8 nodes).
"""
import logging
import os
import shlex
import subprocess
from typing import Optional
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Redis helpers (local copies)
# ---------------------------------------------------------------------------
# Duplicated from correlator.py to keep repo_check.py self-contained and
# avoid a circular import. If the Redis access layer is ever extracted into
# a shared helper, these can be removed.
def _redis_cmd(config: dict, *args) -> str:
    """Invoke ``redis-cli`` over a Unix socket and return its trimmed stdout.

    The socket path is read from ``config["redis"]["socket"]`` and falls back
    to ``/var/lib/nethserver/cluster/state/redis.sock`` when not configured.

    Args:
        config: Parsed configuration dictionary.
        *args: Command name and arguments, handed to ``redis-cli`` verbatim.

    Returns:
        Standard output of ``redis-cli`` with surrounding whitespace removed.
        The exit status and stderr are not inspected.

    Raises:
        subprocess.TimeoutExpired: If ``redis-cli`` runs longer than 10 seconds.
    """
    redis_settings = config.get("redis", {})
    socket_path = redis_settings.get(
        "socket", "/var/lib/nethserver/cluster/state/redis.sock"
    )
    completed = subprocess.run(
        ["redis-cli", "-s", socket_path, *args],
        capture_output=True,
        text=True,
        timeout=10,
    )
    return completed.stdout.strip()
def _redis_hgetall(config: dict, key: str) -> dict:
    """Return all fields of a Redis hash as a Python dict.

    ``redis-cli HGETALL`` prints the hash as alternating field / value lines;
    consecutive lines are paired into ``{field: value}`` entries.

    Blank lines are kept on purpose: a field whose value is the empty string
    is printed as a blank line, and dropping it would shift every following
    field/value pair by one position.

    Args:
        config: Parsed configuration dictionary; ``config["redis"]["socket"]``
            overrides the default Redis Unix socket path.
        key: Name of the Redis hash to read.

    Returns:
        Dict mapping field names to values. Empty when ``redis-cli`` printed
        no complete field/value pair. A trailing line without a partner is
        ignored.

    Raises:
        subprocess.TimeoutExpired: If ``redis-cli`` runs longer than 10 seconds.
    """
    socket_path = config.get("redis", {}).get(
        "socket", "/var/lib/nethserver/cluster/state/redis.sock"
    )
    cmd = ["redis-cli", "-s", socket_path, "HGETALL", key]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    # splitlines() already discards the final line terminator. Do not strip()
    # or filter here: empty values would vanish and the pairing below would
    # go out of step.
    lines = result.stdout.splitlines()
    return dict(zip(lines[::2], lines[1::2]))
# ---------------------------------------------------------------------------
# Destination discovery
# ---------------------------------------------------------------------------
def _get_backup_destinations(config: dict) -> list:
    """Read all configured backup repository destinations from NS8 Redis.

    Key pattern: ``cluster/backup_repository/<repo_id>/parameters``

    Matching keys are listed with ``KEYS`` and each hash is read with
    ``HGETALL``. Each hash contains the URL/path, password, backend type, and
    any cloud-provider credentials needed to invoke restic.

    Args:
        config: Parsed configuration dictionary.

    Returns:
        List of dicts, one per configured destination (fields missing from
        the Redis hash are returned as empty strings):
            {
                "repo_id"       : str,
                "url"           : str (cloud URL or empty),
                "path"          : str (local/SFTP path or empty),
                "password"      : str (restic repo password),
                "backend"       : str ("s3", "b2", "sftp", "rclone", "local", ...),
                "aws_access_key": str (S3 key ID or B2 account ID),
                "aws_secret_key": str (S3 secret or B2 account key),
                "rclone_config" : str (path or inline rclone config),
                "extra_env"     : str (optional additional environment variables),
            }
    """
    raw = _redis_cmd(config, "KEYS", "cluster/backup_repository/*/parameters")

    destinations = []
    for key in raw.splitlines():
        if not key:
            continue
        # Key format: cluster/backup_repository/<repo_id>/parameters
        parts = key.split("/")
        repo_id = parts[2] if len(parts) > 2 else "unknown"
        fields = _redis_hgetall(config, key)
        destinations.append({
            "repo_id": repo_id,
            "url": fields.get("url", ""),
            "path": fields.get("path", ""),
            "password": fields.get("password", ""),
            "backend": fields.get("backend", ""),
            # S3 and B2 use different field names in NS8 Redis; normalise
            # both to a single aws_access_key / aws_secret_key pair.
            # ``or`` (not a .get() default) so that an S3 field that exists
            # but is empty still falls back to the B2 field.
            "aws_access_key": (fields.get("aws_access_key_id")
                               or fields.get("b2_account_id", "")),
            "aws_secret_key": (fields.get("aws_secret_access_key")
                               or fields.get("b2_account_key", "")),
            # rclone and miscellaneous extras
            "rclone_config": fields.get("rclone_config", ""),
            "extra_env": fields.get("extra_env", ""),
        })
    return destinations
# ---------------------------------------------------------------------------
# Environment builder
# ---------------------------------------------------------------------------
def _build_env(dest: dict) -> dict:
    """Build the environment dict that restic needs based on the backend type.

    Always starts from a copy of ``os.environ`` so that system-level settings
    (PATH, HOME, proxy variables, etc.) are preserved.

    The backend is taken from ``dest["backend"]``. When that field is empty,
    it is inferred from the ``<scheme>:`` prefix of the repository location
    (``url``, else ``path``), e.g. ``s3:``, ``b2:`` or ``rclone:``.

    Variables set:
        RESTIC_PASSWORD                            when ``password`` is non-empty.
        AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY  backend ``s3`` / ``aws``.
        B2_ACCOUNT_ID / B2_ACCOUNT_KEY             backend ``b2`` / ``backblaze``.
        RCLONE_CONFIG                              backend ``rclone``.

    Credential variables are only set when the corresponding access key (or
    rclone config) is non-empty. The ``extra_env`` field of ``dest`` is not
    applied here.

    Args:
        dest: A destination dict as returned by ``_get_backup_destinations()``.
            Missing or ``None`` fields are treated as empty.

    Returns:
        A dict suitable for passing as the ``env`` argument to subprocess.run().
    """
    env = dict(os.environ)

    backend = (dest.get("backend") or "").lower()
    if not backend:
        # No explicit backend: fall back to the scheme prefix of the
        # repository location. A plain filesystem path has no ":" prefix and
        # leaves the backend empty.
        location = dest.get("url") or dest.get("path") or ""
        scheme, separator, _ = location.partition(":")
        if separator:
            backend = scheme.lower()

    # RESTIC_PASSWORD is used by all backends to unlock the repository.
    if dest.get("password"):
        env["RESTIC_PASSWORD"] = dest["password"]

    access_key = dest.get("aws_access_key")
    # Environment values must be strings, so a missing secret becomes "".
    secret_key = dest.get("aws_secret_key") or ""

    if backend in ("s3", "aws") and access_key:
        # S3 / AWS backend credentials.
        env["AWS_ACCESS_KEY_ID"] = access_key
        env["AWS_SECRET_ACCESS_KEY"] = secret_key
    elif backend in ("b2", "backblaze") and access_key:
        # Backblaze B2 backend credentials.
        env["B2_ACCOUNT_ID"] = access_key
        env["B2_ACCOUNT_KEY"] = secret_key
    elif backend == "rclone" and dest.get("rclone_config"):
        # rclone backend: point restic to the rclone config file.
        env["RCLONE_CONFIG"] = dest["rclone_config"]

    return env
# ---------------------------------------------------------------------------
# Single-repository check
# ---------------------------------------------------------------------------
def _restic_result(repo_id: str, status: str, error: str = "") -> dict:
    """Assemble the per-repository result dict."""
    return {"repo_id": repo_id, "status": status, "error": error}


def _classify_restic_failure(stderr: str) -> str:
    """Map the stderr text of a failed restic run to a status value.

    Rules are evaluated in order and the first match wins, so a message that
    mentions both a network problem and a lock is reported as UNREACHABLE.
    """
    text = stderr.lower()
    # Network / connectivity errors.
    connectivity_markers = (
        "unable to open config", "no such file", "does not exist",
        "connection refused", "network", "timeout", "no route",
    )
    if any(marker in text for marker in connectivity_markers):
        return "UNREACHABLE"
    # Repository locked by a previous crashed backup run.
    # "lock" also matches "locked", so a single substring test is enough.
    if "lock" in text:
        return "LOCKED"
    # Pack / data integrity error: repository may be corrupted.
    if "pack" in text and "error" in text:
        return "CORRUPTED"
    # Generic restic error not covered by the specific cases above.
    if "error" in text or "fatal" in text:
        return "ERROR"
    # Non-zero exit with unrecognised output.
    return "UNKNOWN"


def _check_restic_repo(dest: dict, config: dict) -> dict:
    """Run ``restic snapshots --last --no-cache`` to verify one repository.

    ``--no-cache`` is intentional: the cache may be stale or missing on the
    host, and we want a live check against the actual backend.

    ``repo_check.restic_flags`` may be a shell-style string (quoted values
    such as ``--cacert "/path with space/ca.pem"`` are honoured) or a list of
    already-separated arguments.

    Args:
        dest: Destination dict from ``_get_backup_destinations()``.
        config: Parsed configuration dictionary (reads ``repo_check.timeout``,
            default 60 seconds, and ``repo_check.restic_flags``).

    Returns:
        Dict: {"repo_id": str, "status": str, "error": str}
        ``status`` is one of OK, UNREACHABLE, LOCKED, CORRUPTED, ERROR,
        UNCONFIGURED or UNKNOWN. ``error`` is empty for OK; otherwise it holds
        restic's stderr or a short description of the local failure
        (timeout, missing binary, malformed ``restic_flags``).

    Raises:
        KeyError: If ``dest`` has no ``repo_id`` entry.
    """
    repo_id = dest["repo_id"]
    settings = config.get("repo_check", {})
    timeout = settings.get("timeout", 60)
    extra_flags = settings.get("restic_flags", "")

    # Prefer ``url`` (cloud backends) over ``path`` (local/SFTP).
    repo_url = dest.get("url") or dest.get("path") or ""
    if not repo_url:
        return _restic_result(
            repo_id, "UNCONFIGURED",
            "No URL or path found in Redis for this destination",
        )

    cmd = ["restic", "-r", repo_url, "snapshots", "--last", "--no-cache"]
    if extra_flags:
        # Allow the operator to append flags like --cacert or --option
        # via the config without modifying the code.
        if isinstance(extra_flags, str):
            try:
                # shlex keeps quoted values together, unlike str.split().
                cmd += shlex.split(extra_flags)
            except ValueError as exc:
                return _restic_result(
                    repo_id, "ERROR",
                    f"Invalid repo_check.restic_flags: {exc}",
                )
        elif isinstance(extra_flags, (list, tuple)):
            cmd += [str(flag) for flag in extra_flags]
        else:
            return _restic_result(
                repo_id, "ERROR",
                "Invalid repo_check.restic_flags: expected a string or a list",
            )

    env = _build_env(dest)

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
    except subprocess.TimeoutExpired:
        return _restic_result(repo_id, "UNREACHABLE", f"Timeout after {timeout}s")
    except FileNotFoundError:
        # restic binary is not installed or not in PATH.
        return _restic_result(repo_id, "ERROR", "restic binary not found in PATH")
    except Exception as e:
        # Any other failure to launch restic is reported, not raised, so one
        # bad destination cannot abort the checks of the remaining ones.
        return _restic_result(repo_id, "ERROR", str(e))

    if result.returncode == 0:
        return _restic_result(repo_id, "OK")
    return _restic_result(
        repo_id,
        _classify_restic_failure(result.stderr),
        result.stderr.strip(),
    )
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def check_repositories(config: dict, correlation: dict) -> dict:
    """Check all configured NS8 backup destinations and return a status summary.

    Called by the pipeline only when the correlator outcome is not SUCCESS,
    so restic network calls are avoided on clean backup runs.

    Args:
        config: Parsed configuration dictionary.
        correlation: Correlation result dict (from correlator.py); currently
            unused but kept for future filtering by plan/module.

    Returns:
        A dict with the following keys:
            destinations    : list of per-repo result dicts (see _check_restic_repo)
            any_unreachable : bool — True if at least one repo is UNREACHABLE;
                              also True when no destination could be read
            any_locked      : bool — True if at least one repo is LOCKED
            all_ok          : bool — True only if every repo returned OK
            note            : optional str present when no destinations are configured
    """
    destinations = _get_backup_destinations(config)
    if not destinations:
        log.warning("No backup destinations found in Redis")
        return {
            "destinations": [],
            "any_unreachable": True,
            "any_locked": False,
            "all_ok": False,
            "note": "No backup destinations configured or readable from Redis",
        }

    results = []
    for dest in destinations:
        # ``or`` rather than a .get() default: the backend key is normally
        # present but may be an empty string.
        log.info(
            "Checking repository %s (backend=%s)...",
            dest["repo_id"],
            dest.get("backend") or "unknown",
        )
        res = _check_restic_repo(dest, config)
        log.info(" -> %s: %s", res["status"], res.get("error", ""))
        results.append(res)

    # Aggregate flags for quick consumption by the notifier.
    statuses = [res["status"] for res in results]
    return {
        "destinations": results,
        "any_unreachable": "UNREACHABLE" in statuses,
        "any_locked": "LOCKED" in statuses,
        "all_ok": all(status == "OK" for status in statuses),
    }