perf: correlator - use HGETALL to batch-read module status (fewer redis-cli calls)
This commit is contained in:
@@ -5,9 +5,9 @@ correlator.py - Reads NS8 cluster Redis state to determine backup outcome.
|
|||||||
For each backup plan/schedule, reads the per-module backup status and
|
For each backup plan/schedule, reads the per-module backup status and
|
||||||
produces a classified outcome: SUCCESS, PARTIAL, or REPO_FAILURE.
|
produces a classified outcome: SUCCESS, PARTIAL, or REPO_FAILURE.
|
||||||
|
|
||||||
NS8 Redis key pattern for backup status (community/dev research):
|
NS8 Redis key patterns:
|
||||||
cluster/backup/<backup_id>/status -> last overall status
|
cluster/backup/<backup_id>/status -> last overall plan status (hash)
|
||||||
module/<module_id>/backup/<backup_id>/status -> per-module status
|
module/<module_id>/backup/<backup_id>/status -> per-module status (hash)
|
||||||
|
|
||||||
Fields in status hash:
|
Fields in status hash:
|
||||||
result : success | error
|
result : success | error
|
||||||
@@ -16,10 +16,8 @@ Fields in status hash:
|
|||||||
errors : number of module errors (in plan status)
|
errors : number of module errors (in plan status)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -34,6 +32,15 @@ def _redis_cmd(config: dict, *args) -> str:
|
|||||||
return result.stdout.strip()
|
return result.stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _redis_hgetall(config: dict, key: str) -> dict:
|
||||||
|
"""Return all fields of a Redis hash as a dict in a single redis-cli call."""
|
||||||
|
socket = config.get("redis", {}).get("socket", "/var/lib/nethserver/cluster/state/redis.sock")
|
||||||
|
cmd = ["redis-cli", "-s", socket, "HGETALL", key]
|
||||||
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||||||
|
lines = [l for l in result.stdout.strip().splitlines() if l]
|
||||||
|
return dict(zip(lines[::2], lines[1::2]))
|
||||||
|
|
||||||
|
|
||||||
def _get_recent_backup_ids(config: dict, window: int) -> list:
|
def _get_recent_backup_ids(config: dict, window: int) -> list:
|
||||||
"""Scan Redis for backup plan status keys updated within window seconds."""
|
"""Scan Redis for backup plan status keys updated within window seconds."""
|
||||||
raw = _redis_cmd(config, "KEYS", "cluster/backup/*/status")
|
raw = _redis_cmd(config, "KEYS", "cluster/backup/*/status")
|
||||||
@@ -42,13 +49,13 @@ def _get_recent_backup_ids(config: dict, window: int) -> list:
|
|||||||
recent = []
|
recent = []
|
||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
ts_raw = _redis_cmd(config, "HGET", key, "timestamp")
|
fields = _redis_hgetall(config, key)
|
||||||
|
ts_raw = fields.get("timestamp", "")
|
||||||
if not ts_raw:
|
if not ts_raw:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")).timestamp()
|
ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")).timestamp()
|
||||||
if (now - ts) <= window:
|
if (now - ts) <= window:
|
||||||
# Extract backup_id from key: cluster/backup/<id>/status
|
|
||||||
parts = key.split("/")
|
parts = key.split("/")
|
||||||
if len(parts) >= 3:
|
if len(parts) >= 3:
|
||||||
recent.append(parts[2])
|
recent.append(parts[2])
|
||||||
@@ -59,7 +66,7 @@ def _get_recent_backup_ids(config: dict, window: int) -> list:
|
|||||||
|
|
||||||
|
|
||||||
def _get_module_statuses(config: dict, backup_id: str) -> list:
|
def _get_module_statuses(config: dict, backup_id: str) -> list:
|
||||||
"""Get all per-module status entries for a given backup_id."""
|
"""Get all per-module status entries for a given backup_id via HGETALL."""
|
||||||
pattern = f"module/*/backup/{backup_id}/status"
|
pattern = f"module/*/backup/{backup_id}/status"
|
||||||
raw = _redis_cmd(config, "KEYS", pattern)
|
raw = _redis_cmd(config, "KEYS", pattern)
|
||||||
keys = [k for k in raw.splitlines() if k]
|
keys = [k for k in raw.splitlines() if k]
|
||||||
@@ -67,15 +74,16 @@ def _get_module_statuses(config: dict, backup_id: str) -> list:
|
|||||||
|
|
||||||
for key in keys:
|
for key in keys:
|
||||||
module_id = key.split("/")[1]
|
module_id = key.split("/")[1]
|
||||||
result = _redis_cmd(config, "HGET", key, "result")
|
fields = _redis_hgetall(config, key)
|
||||||
error = _redis_cmd(config, "HGET", key, "error")
|
if not fields:
|
||||||
timestamp = _redis_cmd(config, "HGET", key, "timestamp")
|
log.debug(f"Empty or missing status hash for {key}")
|
||||||
|
continue
|
||||||
statuses.append({
|
statuses.append({
|
||||||
"module_id": module_id,
|
"module_id": module_id,
|
||||||
"backup_id": backup_id,
|
"backup_id": backup_id,
|
||||||
"result": result,
|
"result": fields.get("result", "unknown"),
|
||||||
"error": error,
|
"error": fields.get("error", ""),
|
||||||
"timestamp": timestamp,
|
"timestamp": fields.get("timestamp", ""),
|
||||||
})
|
})
|
||||||
|
|
||||||
return statuses
|
return statuses
|
||||||
@@ -85,18 +93,16 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) ->
|
|||||||
"""
|
"""
|
||||||
Main correlator entry point.
|
Main correlator entry point.
|
||||||
|
|
||||||
Returns a dict:
|
Returns:
|
||||||
{
|
{
|
||||||
"outcome": "SUCCESS" | "PARTIAL" | "REPO_FAILURE",
|
"outcome" : "SUCCESS" | "PARTIAL" | "REPO_FAILURE",
|
||||||
"backup_ids": [...],
|
"backup_ids" : [...],
|
||||||
"modules": [
|
"modules" : [{module_id, backup_id, result, error, timestamp}, ...],
|
||||||
{"module_id": ..., "backup_id": ..., "result": ..., "error": ..., "timestamp": ...},
|
"failed_modules" : [...],
|
||||||
...
|
"total" : int,
|
||||||
],
|
"failed" : int,
|
||||||
"failed_modules": [...],
|
"succeeded" : int,
|
||||||
"total": int,
|
"note" : str # optional
|
||||||
"failed": int,
|
|
||||||
"succeeded": int,
|
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
window = config.get("correlator", {}).get("recent_window", 3600)
|
window = config.get("correlator", {}).get("recent_window", 3600)
|
||||||
@@ -120,7 +126,9 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) ->
|
|||||||
|
|
||||||
all_modules = []
|
all_modules = []
|
||||||
for bid in backup_ids:
|
for bid in backup_ids:
|
||||||
all_modules.extend(_get_module_statuses(config, bid))
|
modules = _get_module_statuses(config, bid)
|
||||||
|
log.info(f"backup_id={bid}: found {len(modules)} module status entries")
|
||||||
|
all_modules.extend(modules)
|
||||||
|
|
||||||
total = len(all_modules)
|
total = len(all_modules)
|
||||||
failed_modules = [m for m in all_modules if m["result"] != "success"]
|
failed_modules = [m for m in all_modules if m["result"] != "success"]
|
||||||
@@ -133,6 +141,8 @@ def correlate_backup_status(config: dict, backup_ids: Optional[list] = None) ->
|
|||||||
else:
|
else:
|
||||||
outcome = "PARTIAL"
|
outcome = "PARTIAL"
|
||||||
|
|
||||||
|
log.info(f"Correlation result: outcome={outcome}, total={total}, succeeded={succeeded}, failed={len(failed_modules)}")
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"outcome": outcome,
|
"outcome": outcome,
|
||||||
"backup_ids": backup_ids,
|
"backup_ids": backup_ids,
|
||||||
|
|||||||
Reference in New Issue
Block a user