330 lines
13 KiB
Python
330 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
IwoooS Monitoring / Alerting / Observability owner request draft 產生器。
|
|
|
|
本工具讀取 monitoring-alerting-observability repo-only 清冊,將 60 個 surface
|
|
轉成人工送件前 request draft。它不連 live Prometheus、不 reload Alertmanager、
|
|
不改 Grafana / SigNoz / Sentry / Langfuse、不發 Telegram、不建立 silence。
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Fixed UTC+08:00 offset; build_report uses it for the default report timestamp.
TAIPEI = timezone(offset=timedelta(hours=8))
|
|
|
|
# Ordered field names of one owner request draft.  The list is embedded in each
# draft and in the report, and its length is reported in the summary.
REQUEST_FIELDS = [
    # Request / surface identity.
    "request_id", "surface_id", "label",
    # Classification of the surface.
    "expected_scope", "config_kind", "observability_scope", "control_tier",
    # Repo provenance of the surface.
    "repo_source_path", "repo_sha256",
    # Owner-supplied answers (the same names appear in REQUIRED_OWNER_FIELDS).
    "owner_role_or_team", "decision", "decision_reason", "affected_scope",
    "redacted_evidence_refs", "live_config_hash_ref",
    "reload_owner", "receiver_owner", "route_smoke_plan",
    "maintenance_window", "rollback_owner", "validation_plan",
    "noise_budget_owner", "followup_owner",
    # Explicit "this is not an approval" marker.
    "not_approval",
]
|
|
|
|
# Fields listed as required from the owner.  The list is embedded in each draft
# and in the report, and its length is reported in the summary.
REQUIRED_OWNER_FIELDS = [
    # Who answers, and what they decided.
    "owner_role_or_team", "decision", "decision_reason", "affected_scope",
    # Evidence references.
    "redacted_evidence_refs", "live_config_hash_ref",
    # Operational ownership and plans.
    "reload_owner", "receiver_owner", "route_smoke_plan",
    "maintenance_window", "rollback_owner", "validation_plan",
    "noise_budget_owner", "followup_owner",
]
|
|
|
|
# Action names recorded as blocked.  The list is embedded in each draft and in
# the report, and its length is reported in the summary.
BLOCKED_ACTIONS = [
    # Reloading or applying monitoring-stack configuration.
    "prometheus_reload", "alertmanager_reload", "grafana_dashboard_apply",
    "signoz_rule_apply", "sentry_deploy", "langfuse_config_change",
    "otel_collector_reload",
    # Routing, silencing and notification delivery.
    "receiver_route_change", "silence_policy_change", "telegram_send",
    "notification_route_change", "webhook_receiver_change",
    # Data pipeline and exporters.
    "remote_write_change", "exporter_deploy",
    # Live alert exercises.
    "live_alert_fire", "alert_chain_smoke",
    # Host, cluster and secret access.
    "ssh_read", "ssh_write", "kubectl_action", "secret_value_collection",
    "host_write", "active_scan", "production_write",
    # Runtime gate.
    "runtime_gate_open",
]
|
|
|
|
|
|
def git_short_sha(root: Path) -> str:
    """Return the abbreviated commit hash of ``HEAD`` for the repo at *root*.

    Runs ``git rev-parse --short HEAD`` with *root* as the working directory.
    The lookup is best effort: the literal ``"unknown"`` is returned when git
    cannot be started, exits non-zero, yields output that cannot be decoded,
    or prints nothing.

    Args:
        root: Directory in which git is invoked.

    Returns:
        The stripped stdout of the git command, or ``"unknown"``.
    """
    try:
        completed = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: git is missing or *root* is unusable.
        # SubprocessError: non-zero exit (CalledProcessError).
        # ValueError: invalid arguments or undecodable output.
        # Any other exception is a programming error and is allowed to surface
        # rather than being masked as "unknown".
        return "unknown"
    # Never hand back an empty commit id; fall back to the same sentinel.
    return completed.stdout.strip() or "unknown"
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 encoded JSON document stored at *path*.

    Args:
        path: File to read.

    Returns:
        The decoded JSON value (annotated as a dict; the top-level type is not
        checked here).
    """
    with path.open(encoding="utf-8") as handle:
        return json.load(handle)
|
|
|
|
|
|
def request_for(surface: dict[str, Any], write_capable_ids: set[str]) -> dict[str, Any]:
    """Build the undispatched owner request draft for one inventory surface.

    Args:
        surface: Inventory entry.  It must provide ``surface_id``, ``label``,
            ``expected_scope``, ``config_kind``, ``observability_scope``,
            ``control_tier``, ``source_path``, ``sha256``, ``line_count`` and
            ``requires_live_evidence``.
        write_capable_ids: Surface ids treated as write capable; membership
            sets the draft's ``write_capable_surface`` flag.

    Returns:
        A dict holding the copied surface facts, the shared field / action
        lists, placeholder owner answers, ``not_approval=True`` and every
        dispatch, acceptance and authorization flag set to ``False``.

    Raises:
        KeyError: If *surface* lacks one of the keys listed above.
    """
    sid = surface["surface_id"]
    is_write_capable = sid in write_capable_ids

    draft: dict[str, Any] = {
        "request_id": f"monitoring_owner_request:{sid}",
        "status": "draft_not_dispatched",
        "surface_id": sid,
    }

    # Facts copied from the inventory entry: (draft key, inventory key).
    for draft_key, inventory_key in (
        ("label", "label"),
        ("expected_scope", "expected_scope"),
        ("config_kind", "config_kind"),
        ("observability_scope", "observability_scope"),
        ("control_tier", "control_tier"),
        ("repo_source_path", "source_path"),
        ("repo_sha256", "sha256"),
        ("source_line_count", "line_count"),
        ("requires_live_evidence", "requires_live_evidence"),
    ):
        draft[draft_key] = surface[inventory_key]

    draft["write_capable_surface"] = is_write_capable
    draft["source_inventory_ref"] = "docs/security/monitoring-alerting-observability-inventory.snapshot.json"
    draft["request_fields"] = REQUEST_FIELDS
    draft["required_owner_fields"] = REQUIRED_OWNER_FIELDS
    draft["blocked_actions"] = BLOCKED_ACTIONS

    # Placeholder owner answers.  "decision" does not follow the
    # "pending_<field>" pattern, so the first group is spelled out.
    draft["owner_role_or_team"] = "pending_owner_role_or_team"
    draft["decision"] = "pending_owner_decision"
    draft["decision_reason"] = "pending_decision_reason"
    draft["affected_scope"] = "pending_affected_scope"
    draft["redacted_evidence_refs"] = []  # fresh list for every draft
    draft["live_config_hash_ref"] = None
    for field in (
        "reload_owner",
        "receiver_owner",
        "route_smoke_plan",
        "maintenance_window",
        "rollback_owner",
        "validation_plan",
        "noise_budget_owner",
        "followup_owner",
    ):
        draft[field] = f"pending_{field}"

    draft["not_approval"] = True

    # Every dispatch, acceptance and authorization flag starts out False.
    draft.update(
        dict.fromkeys(
            (
                "request_sent",
                "recipient_confirmed",
                "owner_response_received",
                "owner_response_accepted",
                "live_evidence_received",
                "reload_owner_accepted",
                "receiver_owner_accepted",
                "route_smoke_accepted",
                "maintenance_window_accepted",
                "rollback_owner_accepted",
                "validation_plan_accepted",
                "prometheus_reload_authorized",
                "alertmanager_reload_authorized",
                "grafana_dashboard_apply_authorized",
                "signoz_rule_apply_authorized",
                "sentry_deploy_authorized",
                "langfuse_config_change_authorized",
                "otel_collector_reload_authorized",
                "receiver_route_change_authorized",
                "silence_policy_change_authorized",
                "telegram_send_authorized",
                "notification_route_change_authorized",
                "webhook_receiver_change_authorized",
                "remote_write_change_authorized",
                "exporter_deploy_authorized",
                "live_alert_fire_authorized",
                "alert_chain_smoke_authorized",
                "ssh_read_authorized",
                "ssh_write_authorized",
                "kubectl_action_authorized",
                "secret_value_collection_allowed",
                "host_write_authorized",
                "active_scan_authorized",
                "production_write_authorized",
                "runtime_gate",
                "action_buttons_allowed",
            ),
            False,
        )
    )
    return draft
|
|
|
|
|
|
def build_report(root: Path, inventory: dict[str, Any], generated_at: str | None) -> dict[str, Any]:
    """Assemble the owner request draft report from an inventory snapshot.

    Args:
        root: Repo root; passed to ``git_short_sha`` for the ``git_commit`` field.
        inventory: Parsed inventory.  ``write_capable_surfaces`` and
            ``observability_surfaces`` are required; ``schema_version`` and
            ``status`` are copied when present.
        generated_at: Timestamp to record verbatim.  When empty or ``None`` the
            current time in the ``TAIPEI`` zone is used, at second precision.

    Returns:
        The report dict: metadata, a summary of counts, the execution
        boundary flags, the shared field / action lists, one draft per
        observability surface, and the next-step notes.

    Raises:
        KeyError: If a required inventory key is missing.
    """
    if generated_at:
        stamp = generated_at
    else:
        stamp = datetime.now(TAIPEI).isoformat(timespec="seconds")

    writable_ids = {entry["surface_id"] for entry in inventory["write_capable_surfaces"]}
    drafts = [request_for(entry, writable_ids) for entry in inventory["observability_surfaces"]]
    writable_total = sum(1 for draft in drafts if draft["write_capable_surface"])
    live_evidence_total = sum(1 for draft in drafts if draft["requires_live_evidence"])

    summary: dict[str, Any] = {
        "request_draft_count": len(drafts),
        "write_capable_request_draft_count": writable_total,
        "live_evidence_required_request_count": live_evidence_total,
        "request_field_count": len(REQUEST_FIELDS),
        "required_owner_field_count": len(REQUIRED_OWNER_FIELDS),
        "blocked_action_count": len(BLOCKED_ACTIONS),
    }
    # These counters are fixed at zero in a draft report.
    summary.update(
        dict.fromkeys(
            (
                "request_sent_count",
                "recipient_confirmed_count",
                "owner_response_received_count",
                "owner_response_accepted_count",
                "live_evidence_received_count",
                "reload_owner_accepted_count",
                "receiver_owner_accepted_count",
                "route_smoke_accepted_count",
                "maintenance_window_accepted_count",
                "rollback_owner_accepted_count",
                "validation_plan_accepted_count",
                "prometheus_reload_authorized_count",
                "alertmanager_reload_authorized_count",
                "grafana_dashboard_apply_authorized_count",
                "signoz_rule_apply_authorized_count",
                "sentry_deploy_authorized_count",
                "langfuse_config_change_authorized_count",
                "otel_collector_reload_authorized_count",
                "receiver_route_change_authorized_count",
                "silence_policy_change_authorized_count",
                "telegram_send_authorized_count",
                "notification_route_change_authorized_count",
                "webhook_receiver_change_authorized_count",
                "remote_write_change_authorized_count",
                "exporter_deploy_authorized_count",
                "live_alert_fire_authorized_count",
                "alert_chain_smoke_authorized_count",
                "ssh_read_authorized_count",
                "ssh_write_authorized_count",
                "kubectl_action_authorized_count",
                "secret_value_collection_allowed_count",
                "host_write_authorized_count",
                "active_scan_authorized_count",
                "production_write_authorized_count",
                "runtime_gate_count",
                "action_button_count",
            ),
            0,
        )
    )

    # Report-level boundary flags: all False, then the explicit marker.
    boundaries: dict[str, Any] = dict.fromkeys(
        (
            "request_sent",
            "recipient_confirmed",
            "owner_response_received",
            "owner_response_accepted",
            "live_evidence_received",
            "prometheus_reload_authorized",
            "alertmanager_reload_authorized",
            "grafana_dashboard_apply_authorized",
            "signoz_rule_apply_authorized",
            "sentry_deploy_authorized",
            "langfuse_config_change_authorized",
            "otel_collector_reload_authorized",
            "receiver_route_change_authorized",
            "silence_policy_change_authorized",
            "telegram_send_authorized",
            "notification_route_change_authorized",
            "webhook_receiver_change_authorized",
            "remote_write_change_authorized",
            "exporter_deploy_authorized",
            "live_alert_fire_authorized",
            "alert_chain_smoke_authorized",
            "ssh_read_authorized",
            "ssh_write_authorized",
            "kubectl_action_authorized",
            "secret_value_collection_allowed",
            "host_write_authorized",
            "active_scan_authorized",
            "production_write_authorized",
            "runtime_execution_authorized",
            "action_buttons_allowed",
        ),
        False,
    )
    boundaries["not_authorization"] = True

    return {
        "schema_version": "monitoring_owner_request_draft_v1",
        "generated_at": stamp,
        "git_commit": git_short_sha(root),
        "source_inventory_schema_version": inventory.get("schema_version"),
        "source_inventory_status": inventory.get("status"),
        "status": "owner_request_draft_ready_not_dispatched",
        "summary": summary,
        "execution_boundaries": boundaries,
        "request_fields": REQUEST_FIELDS,
        "required_owner_fields": REQUIRED_OWNER_FIELDS,
        "blocked_actions": BLOCKED_ACTIONS,
        "request_drafts": drafts,
        "next_steps": [
            "人工送件前確認 monitoring / alerting / notification owner role 與回覆窗口。",
            "owner 只能提供脫敏 live config hash、receiver diff、route smoke plan、noise budget、maintenance window 與 rollback owner。",
            "收到回覆後先做欄位完整性、敏感 payload 隔離、reload / receiver / route smoke gate 檢查,不得直接 reload、send Telegram 或 fire alert。",
        ],
    }
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: load the inventory, build and emit the report.

    The inventory path is resolved relative to ``--root``.  The JSON report is
    written to ``--output`` (used as given; parent directories are created)
    or printed to stdout when no output path is supplied.  A one-line status
    summary always goes to stderr.

    Returns:
        ``0`` once the report has been emitted.
    """
    cli = argparse.ArgumentParser(description="IwoooS monitoring owner request draft 產生器")
    cli.add_argument("--root", default=".", help="repo root")
    cli.add_argument(
        "--inventory-report",
        default="docs/security/monitoring-alerting-observability-inventory.snapshot.json",
        help="monitoring-alerting-observability-inventory.py 輸出的 JSON",
    )
    cli.add_argument("--output", help="寫出 JSON 報告")
    cli.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    options = cli.parse_args()

    repo_root = Path(options.root).resolve()
    inventory = load_json(repo_root / options.inventory_report)
    report = build_report(repo_root, inventory, options.generated_at)
    # sort_keys gives a stable key order in the serialized report.
    rendered = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)

    if not options.output:
        print(rendered)
    else:
        destination = Path(options.output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")

    counts = report["summary"]
    status_line = " ".join(
        (
            "MONITORING_OWNER_REQUEST_DRAFT_OK",
            f"drafts={counts['request_draft_count']}",
            f"write_capable={counts['write_capable_request_draft_count']}",
            f"fields={counts['required_owner_field_count']}",
            f"sent={counts['request_sent_count']}",
            f"runtime_gate={counts['runtime_gate_count']}",
        )
    )
    # Status goes to stderr so it never mixes with a report printed on stdout.
    print(status_line, file=sys.stderr)
    return 0
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|