Files
awoooi/scripts/security/high-value-config-change-gate.py

670 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
IwoooS 高價值配置變更 Gate。
本工具只讀取 git diff 或手動指定的檔案路徑,將變更分類到 C0/C1/C2/C3
配置控管範圍,並輸出 owner response、rollback、evidence 與驗證需求。
它不修改 workflow、不讀 secret value、不 SSH、不碰 runtime。
"""
from __future__ import annotations
import argparse
import fnmatch
import json
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Fixed UTC+08:00 offset used for default report timestamps.
TAIPEI = timezone(timedelta(hours=8))

# Rank tables: a smaller rank sorts first and is treated as "strongest".
TIER_ORDER = {tier: rank for rank, tier in enumerate(("C0", "C1", "C2", "C3"))}
PRIORITY_ORDER = {priority: rank for rank, priority in enumerate(("P0", "P1", "P2", "P3"))}

# Canonical owner-response fields that evidence must provide.
REQUIRED_OWNER_FIELDS = [
    "owner_role_or_team",
    "decision",
    "decision_reason",
    "affected_scope",
    "redacted_evidence_refs",
    "followup_owner",
    "rollback_owner",
    "maintenance_window",
    "validation_plan",
]

# Accepted key spellings for each canonical owner field; any one alias with a
# truthy value satisfies the field.
OWNER_FIELD_ALIASES = {
    "owner_role_or_team": [
        "owner_role_or_team",
        "owner_role_team",
        "owner_role",
        "owner_team",
        "responsible_team",
    ],
    "decision": [
        "decision",
        "owner_decision",
        "scope_decision",
        "disposition",
    ],
    "decision_reason": [
        "decision_reason",
        "reason",
        "decision_summary",
        "owner_note",
        "rationale",
    ],
    "affected_scope": [
        "affected_scope",
        "affected_sources",
        "affected_repos",
        "host_scope",
        "target_scope",
    ],
    "redacted_evidence_refs": [
        "redacted_evidence_refs",
        "evidence_refs",
        "redacted_refs",
        "source_refs",
    ],
    "followup_owner": [
        "followup_owner",
        "next_owner",
        "followup_team",
        "resolution_owner",
    ],
    "rollback_owner": [
        "rollback_owner",
        "rollback_team",
    ],
    "maintenance_window": [
        "maintenance_window",
        "change_window",
    ],
    "validation_plan": [
        "validation_plan",
        "validation_checks",
        "verification_plan",
    ],
}

# Flags that evidence must carry with the literal value ``False``.
REQUIRED_FALSE_FLAGS = [
    "runtime_execution_authorized",
    "host_write_authorized",
    "secret_value_collection_allowed",
    "workflow_modification_authorized",
    "runner_change_authorized",
    "refs_sync_authorized",
    "force_push_authorized",
    "active_scan_authorized",
    "action_buttons_allowed",
]
@dataclass(frozen=True)
class ControlCategory:
    """Immutable description of one controlled configuration category.

    Instances are pure data: this file matches changed paths against
    ``patterns`` and copies the remaining fields into the JSON report.
    """

    # Stable identifier; impacted categories are de-duplicated by this value.
    category_id: str
    # Human-readable label copied into the report.
    label: str
    # Priority code; used as a key into PRIORITY_ORDER when sorting.
    priority: str
    # Control tier code; used as a key into TIER_ORDER when sorting.
    control_tier: str
    # Name of the gate reported for changes that fall in this category.
    required_gate: str
    # Glob patterns tested against each changed path.
    patterns: tuple[str, ...]
    # Names of validation items listed in the report (not executed here).
    required_validation: tuple[str, ...]
# Registry of every controlled category. Each changed path is tested against
# the patterns of all entries, so one path may fall into several categories
# (some patterns deliberately appear under more than one category).
CATEGORIES = [
    # --- C0 / P0 categories -------------------------------------------------
    ControlCategory(
        category_id="nginx_public_gateway",
        label="Nginx / reverse proxy / public route",
        priority="P0",
        control_tier="C0",
        required_gate="public_gateway_owner_response_required",
        patterns=(
            "infra/ansible/roles/nginx/templates/*.j2",
            "infra/ansible/playbooks/nginx-sync.yml",
            "k8s/nginx/**",
            "ops/nginx/**",
            "docs/runbooks/disaster-recovery/DR-Nginx.md",
        ),
        required_validation=(
            "rendered_diff",
            "nginx_t",
            "affected_route_smoke",
            "admin_route_smoke_if_affected",
            "acme_path_smoke_if_affected",
            "rollback_ref",
        ),
    ),
    ControlCategory(
        category_id="dns_tls_certbot",
        label="DNS / TLS / certbot / certificate path",
        priority="P0",
        control_tier="C0",
        required_gate="domain_tls_owner_response_required",
        patterns=(
            "docs/runbooks/REGISTRY-CERTBOT-188.md",
            "docs/runbooks/**/*CERTBOT*.md",
            "docs/runbooks/**/*TLS*.md",
            "scripts/ops/**/*cert*",
            "scripts/ops/**/*tls*",
            "ops/**/*cert*",
            "ops/**/*tls*",
            "infra/**/*cert*",
            "infra/**/*tls*",
            "k8s/**/*tls*",
        ),
        required_validation=(
            "domain_inventory",
            "certificate_path_check",
            "renewal_window",
            "acme_path_smoke",
            "public_https_smoke",
            "rollback_ref",
        ),
    ),
    ControlCategory(
        category_id="k8s_production_gitops",
        label="K8s / ArgoCD / production manifests",
        priority="P0",
        control_tier="C0",
        required_gate="gitops_owner_response_required",
        patterns=(
            "k8s/awoooi-prod/**",
            "k8s/argocd/**",
            # k8s/velero/** is also listed under backup_restore_credential and
            # k8s/monitoring/** under monitoring_alerting_observability.
            "k8s/velero/**",
            "k8s/monitoring/**",
        ),
        required_validation=(
            "gitops_diff",
            "argocd_health_readback",
            "sync_authorization_check",
            "rollback_revision",
            "post_deploy_health_if_executed",
        ),
    ),
    ControlCategory(
        category_id="secret_metadata",
        label="Secret metadata / injection / redaction",
        priority="P0",
        control_tier="C0",
        required_gate="secret_metadata_owner_response_required",
        patterns=(
            # Both capitalisations are listed explicitly.
            "k8s/**/*secret*",
            "k8s/**/*Secret*",
            ".gitea/workflows/*.yml",
            ".gitea/workflows/*.yaml",
            ".github/workflows/*.yml",
            ".github/workflows/*.yaml",
            "docs/runbooks/SECRETS-MANAGEMENT.md",
            "docs/security/SECRETS_REFERENCE.md",
        ),
        required_validation=(
            "secret_name_parity",
            "metadata_only_check",
            "no_secret_value_check",
            "rotation_owner",
            "injection_readback_if_deployed",
        ),
    ),
    ControlCategory(
        category_id="gitea_workflow_runner_source_control",
        label="Gitea workflow / runner / deploy key / webhook / branch protection",
        priority="P0",
        control_tier="C0",
        required_gate="workflow_source_control_owner_response_required",
        patterns=(
            ".gitea/workflows/**",
            ".github/workflows/**",
            "ops/runner/**",
            "scripts/setup-runner*.sh",
            "scripts/**/*runner*",
            "docs/security/SOURCE-CONTROL-*",
            "docs/security/GITEA-*",
            "docs/security/GITHUB-*",
        ),
        required_validation=(
            "workflow_diff",
            "runner_label_owner",
            "deploy_key_metadata_only",
            "webhook_metadata_only",
            "branch_protection_metadata",
            "no_token_value_check",
        ),
    ),
    ControlCategory(
        category_id="public_admin_api_runtime_config",
        label="Public / admin / API / frontend runtime config",
        priority="P0",
        control_tier="C0",
        required_gate="public_runtime_config_owner_response_required",
        patterns=(
            "apps/web/next.config.*",
            "apps/web/src/lib/config.*",
            "apps/api/src/core/config.py",
            "apps/api/src/api/v1/monitoring.py",
            "apps/api/src/middleware/**",
            "apps/web/src/middleware.*",
        ),
        required_validation=(
            "public_url_check",
            "frontend_internal_ip_ban",
            "cors_boundary_check",
            "admin_auth_boundary_check",
            "desktop_mobile_smoke_if_frontend",
        ),
    ),
    ControlCategory(
        category_id="backup_restore_credential",
        label="Backup / restore / escrow / retention",
        priority="P0",
        control_tier="C0",
        required_gate="backup_restore_owner_response_required",
        patterns=(
            "scripts/backup/**",
            "k8s/velero/**",
            "docs/runbooks/disaster-recovery/**",
            "docs/runbooks/**/*RESTORE*.md",
            "docs/runbooks/**/*BACKUP*.md",
        ),
        required_validation=(
            "credential_absence_check",
            "restore_drill_gate",
            "retention_policy",
            "escrow_owner",
            "rollback_ref",
        ),
    ),
    ControlCategory(
        category_id="agent_bounty_protocol_runtime",
        label="agent-bounty-protocol runtime / MCP / A2A / treasury boundary",
        priority="P0",
        control_tier="C0",
        required_gate="agent_bounty_owner_response_required",
        patterns=(
            "docs/security/AGENT-BOUNTY-IWOOOS-ONBOARDING-HANDOFF.md",
            "docs/security/agent-bounty-iwooos-onboarding-handoff.snapshot.json",
            "docs/schemas/agent_bounty_iwooos_onboarding_handoff_v1.schema.json",
            "agent-bounty-protocol/**",
        ),
        required_validation=(
            "repo_owner_scope",
            "runtime_gate_false",
            "no_payout_or_treasury_execution",
            "no_mcp_a2a_runtime_execution",
            "redacted_evidence_refs_only",
        ),
    ),
    # --- C1 / P1 categories -------------------------------------------------
    ControlCategory(
        category_id="monitoring_alerting_observability",
        label="Prometheus / Alertmanager / Grafana / SigNoz / Sentry / Langfuse",
        priority="P1",
        control_tier="C1",
        required_gate="monitoring_observability_owner_response_required",
        patterns=(
            "ops/monitoring/**",
            "ops/alertmanager/**",
            "ops/grafana/**",
            "ops/signoz/**",
            "ops/sentry-self-hosted/**",
            "infra/langfuse/**",
            "k8s/monitoring/**",
        ),
        required_validation=(
            "rule_diff",
            "receiver_diff",
            "reload_gate",
            "failure_notification_policy",
            "public_route_smoke_if_affected",
        ),
    ),
    ControlCategory(
        category_id="docker_compose_systemd_host_config",
        label="Docker Compose / systemd / host service config",
        priority="P1",
        control_tier="C1",
        required_gate="host_service_owner_response_required",
        patterns=(
            "docker-compose*.yml",
            "docker-compose*.yaml",
            "ops/**/docker-compose*.yml",
            "ops/**/docker-compose*.yaml",
            "scripts/reboot-recovery/**",
            "scripts/**/*.service",
            "ops/**/*.service",
        ),
        required_validation=(
            "port_conflict_check",
            "volume_diff",
            "env_name_diff",
            "restart_window",
            "rollback_owner",
        ),
    ),
    ControlCategory(
        category_id="ssh_firewall_network_access",
        label="SSH / sudoers / known_hosts / firewall / WireGuard / NodePort",
        priority="P1",
        control_tier="C1",
        required_gate="network_access_owner_response_required",
        patterns=(
            "infra/ansible/inventory/**",
            "infra/ansible/**/*known_hosts*",
            "infra/ansible/**/*ssh*",
            "scripts/**/*ssh*",
            "scripts/**/*known_hosts*",
            "ops/**/*wireguard*",
            "ops/**/*firewall*",
            # Both capitalisations are listed explicitly.
            "k8s/**/*network*",
            "k8s/**/*Network*",
        ),
        required_validation=(
            "target_whitelist",
            "host_key_policy",
            "ingress_egress_matrix",
            "rollback_owner",
            "maintenance_window",
        ),
    ),
    ControlCategory(
        category_id="ai_provider_model_routing",
        label="AI provider / model routing / Ollama proxy / cost and privacy",
        priority="P1",
        control_tier="C1",
        required_gate="ai_provider_owner_response_required",
        patterns=(
            "apps/api/src/services/ai_providers/**",
            "apps/api/src/services/**/*model*",
            "apps/api/src/services/**/*provider*",
            "infra/ansible/roles/nginx/templates/110-ollama-proxy.conf.j2",
            "docs/ai/**",
            "docs/**/*Ollama*",
        ),
        required_validation=(
            "dry_run",
            "benchmark",
            "cost_review",
            "privacy_review",
            "fallback_order_check",
        ),
    ),
    # --- C2 / P2 category ---------------------------------------------------
    ControlCategory(
        category_id="product_surface_runtime_routes",
        label="AWOOOI / AwoooP / IwoooS / VibeWork / other product runtime routes",
        priority="P2",
        control_tier="C2",
        required_gate="product_surface_owner_response_required",
        patterns=(
            "apps/web/src/app/**",
            "apps/web/messages/*.json",
            "docs/security/VIBEWORK-IWOOOS-ONBOARDING-HANDOFF.md",
            "docs/security/vibework-iwooos-onboarding-handoff.snapshot.json",
        ),
        required_validation=(
            "product_boundary_check",
            "i18n_traditional_chinese_check",
            "no_internal_transcript_check",
            "desktop_mobile_smoke_if_frontend",
        ),
    ),
    # --- C3 / P3 category ---------------------------------------------------
    ControlCategory(
        category_id="security_evidence_tooling",
        label="Security evidence / snapshot / guard tooling",
        priority="P3",
        control_tier="C3",
        required_gate="security_evidence_owner_review_required",
        patterns=(
            # Broad catch-alls: more specific docs/security files listed in
            # the categories above also match here.
            "docs/security/**",
            "docs/schemas/**",
            "scripts/security/**",
            "docs/LOGBOOK.md",
        ),
        required_validation=(
            "snapshot_parse",
            "guard_smoke",
            "doc_secret_sanity",
            "no_runtime_gate_increase",
        ),
    ),
]
def git_short_sha(root: Path) -> str:
    """Return the abbreviated commit hash of ``HEAD`` in *root*.

    This is best-effort metadata for the report: when git cannot be run
    (missing binary, missing directory), exits non-zero (not a repository,
    no commits yet) or prints nothing, the literal ``"unknown"`` is returned
    instead of raising.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError: git or the working directory is missing.
        # SubprocessError: git exited non-zero (CalledProcessError).
        return "unknown"
    # Never report an empty commit id.
    return result.stdout.strip() or "unknown"
def diff_files(root: Path, base: str, head: str) -> list[str]:
    """Return the sorted, de-duplicated paths changed between *base* and *head*.

    Runs ``git diff --name-only`` on the range ``base..head`` inside *root*.

    Raises:
        subprocess.CalledProcessError: git exited with a non-zero status.
        OSError: git or *root* could not be found.
    """
    result = subprocess.run(
        [
            "git",
            "diff",
            "--name-only",
            # Report a rename as delete + add so the *old* path is listed too;
            # otherwise moving a file out of a controlled directory would
            # only show its new, uncontrolled location.
            "--no-renames",
            # NUL-terminated, unquoted output: without -z git C-quotes paths
            # with non-ASCII or special characters, and the quoted form
            # would never match a category pattern.
            "-z",
            f"{base}..{head}",
            # Mark the end of revisions so the range is never read as a path.
            "--",
        ],
        cwd=root,
        check=True,
        capture_output=True,
        text=True,
        # Raw path bytes are emitted with -z; decode them deterministically
        # and never fail on an undecodable file name.
        encoding="utf-8",
        errors="replace",
    )
    return sorted({path for path in result.stdout.split("\0") if path})
def working_tree_files(root: Path) -> list[str]:
    """Return the sorted, de-duplicated paths changed in the working tree.

    Combines three views of *root*: changes relative to ``HEAD``, staged
    changes, and untracked files that are not ignored.

    Raises:
        subprocess.CalledProcessError: a git command exited non-zero.
        OSError: git or *root* could not be found.
    """
    # -z: NUL-terminated, unquoted paths (git otherwise C-quotes unusual
    #     names, which would then never match a category pattern).
    # --no-renames: list both the old and the new path of a moved file so a
    #     move out of a controlled directory is still classified.
    commands = [
        ["git", "diff", "--name-only", "--no-renames", "-z", "HEAD", "--"],
        ["git", "diff", "--name-only", "--no-renames", "-z", "--cached", "--"],
        ["git", "ls-files", "--others", "--exclude-standard", "-z"],
    ]
    changed: set[str] = set()
    for command in commands:
        result = subprocess.run(
            command,
            cwd=root,
            check=True,
            capture_output=True,
            text=True,
            # Raw path bytes are emitted with -z; decode deterministically.
            encoding="utf-8",
            errors="replace",
        )
        changed.update(path for path in result.stdout.split("\0") if path)
    return sorted(changed)
def matches(pattern: str, path: str) -> bool:
    """Return True when *path* satisfies the glob *pattern*.

    The pattern is tried as written first. If it contains ``**``, a second
    form with every ``**/`` segment removed is also tried, so that
    ``a/**/b`` accepts ``a/b`` as well as deeper paths.
    """
    candidates = [pattern]
    if "**" in pattern:
        candidates.append(pattern.replace("**/", ""))
    return any(fnmatch.fnmatch(path, candidate) for candidate in candidates)
def classify_path(path: str) -> list[ControlCategory]:
    """Return every category whose patterns match *path*, strongest first.

    The result is ordered by control tier rank, then priority rank, then
    category id; an unmatched path yields an empty list.
    """
    hits = [
        category
        for category in CATEGORIES
        if any(matches(pattern, path) for pattern in category.patterns)
    ]

    def rank(category: ControlCategory) -> tuple[int, int, str]:
        return (
            TIER_ORDER[category.control_tier],
            PRIORITY_ORDER[category.priority],
            category.category_id,
        )

    hits.sort(key=rank)
    return hits
def strongest_tier(categories: list[dict[str, Any]]) -> str | None:
    """Return the lowest-ranked ``control_tier`` among *categories*.

    Rank comes from TIER_ORDER; an empty input yields ``None``.
    """
    if categories:
        tiers = (entry["control_tier"] for entry in categories)
        return min(tiers, key=TIER_ORDER.__getitem__)
    return None
def strongest_priority(categories: list[dict[str, Any]]) -> str | None:
    """Return the lowest-ranked ``priority`` among *categories*.

    Rank comes from PRIORITY_ORDER; an empty input yields ``None``.
    """
    if categories:
        priorities = (entry["priority"] for entry in categories)
        return min(priorities, key=PRIORITY_ORDER.__getitem__)
    return None
def load_evidence(path: Path | None) -> dict[str, Any] | None:
    """Load owner-response evidence from a UTF-8 JSON file.

    Args:
        path: Evidence file, or ``None`` when no evidence was supplied.

    Returns:
        The decoded JSON object, or ``None`` when *path* is ``None``.

    Raises:
        OSError: the file cannot be read.
        ValueError: the file is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError``) or its top-level value is not a JSON object.
    """
    if path is None:
        return None
    evidence = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(evidence, dict):
        # Fail here with a clear message rather than later with an
        # AttributeError when the evidence is queried with ``.get``.
        raise ValueError(
            f"owner evidence must be a JSON object, got {type(evidence).__name__}: {path}"
        )
    return evidence
def has_owner_field(evidence: dict[str, Any], field: str) -> bool:
    """Return True when *evidence* holds a truthy value for *field*.

    Every alias registered for *field* in OWNER_FIELD_ALIASES is accepted; a
    field with no registered aliases is looked up under its own name.
    """
    for alias in OWNER_FIELD_ALIASES.get(field, [field]):
        if evidence.get(alias):
            return True
    return False
def validate_evidence(evidence: dict[str, Any] | None) -> dict[str, Any]:
    """Check owner-response evidence for required fields and false flags.

    Args:
        evidence: Decoded evidence object, or ``None`` when none was given.

    Returns:
        A dict with ``provided``, ``complete``, ``missing_owner_fields``,
        ``invalid_false_flags`` and ``note``. ``complete`` is True only when
        no owner field is missing and every required flag is exactly
        ``False``.
    """
    if evidence is None:
        return {
            "provided": False,
            "complete": False,
            # Copy so callers cannot mutate the module-level constant.
            "missing_owner_fields": list(REQUIRED_OWNER_FIELDS),
            "invalid_false_flags": [],
            "note": "未提供 owner response evidence本階段只能分類不得執行 runtime 變更。",
        }
    missing = [field for field in REQUIRED_OWNER_FIELDS if not has_owner_field(evidence, field)]
    false_flags = evidence.get("false_flags")
    if not isinstance(false_flags, dict):
        # A missing, null or malformed "false_flags" value asserts nothing:
        # report every required flag as invalid instead of crashing on .get.
        false_flags = {}
    # Only the literal False is accepted; 0, None or a missing key is not.
    invalid_false_flags = [
        flag for flag in REQUIRED_FALSE_FLAGS if false_flags.get(flag) is not False
    ]
    return {
        "provided": True,
        "complete": not missing and not invalid_false_flags,
        "missing_owner_fields": missing,
        "invalid_false_flags": invalid_false_flags,
        "note": "owner response evidence 僅驗證欄位與 false flags不代表 runtime 授權。",
    }
def category_inventory() -> list[dict[str, Any]]:
    """Return one JSON-ready dict per entry of CATEGORIES, in registry order."""
    inventory: list[dict[str, Any]] = []
    for entry in CATEGORIES:
        inventory.append(
            {
                "category_id": entry.category_id,
                "label": entry.label,
                "priority": entry.priority,
                "control_tier": entry.control_tier,
                "required_gate": entry.required_gate,
                # Tuples are converted to lists for the report.
                "path_patterns": list(entry.patterns),
                "required_owner_fields": REQUIRED_OWNER_FIELDS,
                "required_validation": list(entry.required_validation),
            }
        )
    return inventory
def build_report(
    root: Path,
    changed_files: list[str],
    generated_at: str | None,
    base: str | None,
    head: str | None,
    evidence: dict[str, Any] | None,
) -> dict[str, Any]:
    """Assemble the classification report for a set of changed paths.

    Args:
        root: Repository root, used only to look up the current commit id.
        changed_files: Changed paths; duplicates are collapsed and the
            result is processed in sorted order.
        generated_at: Fixed timestamp string; when falsy the current time in
            the TAIPEI zone (second precision) is used.
        base: Diff base, recorded verbatim in the report.
        head: Diff head, recorded verbatim in the report.
        evidence: Owner-response evidence, or ``None``.

    Returns:
        A JSON-serialisable dict. All execution-boundary values and
        ``runtime_execution_authorized`` are hard-coded to ``False``.
    """
    timestamp = generated_at if generated_at else datetime.now(TAIPEI).isoformat(timespec="seconds")

    file_entries: list[dict[str, Any]] = []
    first_seen: dict[str, dict[str, Any]] = {}
    for changed_path in sorted(set(changed_files)):
        summaries: list[dict[str, Any]] = [
            {
                "category_id": category.category_id,
                "label": category.label,
                "priority": category.priority,
                "control_tier": category.control_tier,
                "required_gate": category.required_gate,
                "required_validation": list(category.required_validation),
            }
            for category in classify_path(changed_path)
        ]
        # Keep the first summary produced for each category id.
        for summary in summaries:
            if summary["category_id"] not in first_seen:
                first_seen[summary["category_id"]] = summary
        file_entries.append(
            {
                "path": changed_path,
                "matched": bool(summaries),
                "strongest_tier": strongest_tier(summaries),
                "strongest_priority": strongest_priority(summaries),
                "categories": summaries,
            }
        )

    def rank(item: dict[str, Any]) -> tuple[int, int, str]:
        return (
            TIER_ORDER[item["control_tier"]],
            PRIORITY_ORDER[item["priority"]],
            item["category_id"],
        )

    impacted = list(first_seen.values())
    impacted.sort(key=rank)

    evidence_check = validate_evidence(evidence)
    impacted_tiers = [item["control_tier"] for item in impacted]
    c0_total = impacted_tiers.count("C0")
    c1_total = impacted_tiers.count("C1")
    file_total = len(file_entries)
    matched_total = len([entry for entry in file_entries if entry["matched"]])

    return {
        "schema_version": "high_value_config_change_gate_v1",
        "generated_at": timestamp,
        "git_commit": git_short_sha(root),
        "mode": "classification_only",
        "diff": {
            "base": base,
            "head": head,
            "changed_file_count": file_total,
        },
        # This tool only classifies; every boundary is reported as untouched.
        "execution_boundaries": {
            "ssh_executed": False,
            "host_write_executed": False,
            "workflow_modified": False,
            "runtime_deploy_executed": False,
            "nginx_reload_executed": False,
            "dns_tls_modified": False,
            "secret_value_collected": False,
            "active_scan_executed": False,
            "runtime_gate_opened": False,
        },
        "required_false_flags": REQUIRED_FALSE_FLAGS,
        "required_owner_fields": REQUIRED_OWNER_FIELDS,
        "summary": {
            "changed_file_count": file_total,
            "matched_high_value_file_count": matched_total,
            "impacted_category_count": len(impacted),
            "impacted_c0_category_count": c0_total,
            "impacted_c1_category_count": c1_total,
            "strongest_tier": strongest_tier(impacted),
            "strongest_priority": strongest_priority(impacted),
            "owner_evidence_provided": evidence_check["provided"],
            "owner_evidence_complete": evidence_check["complete"],
            "runtime_execution_authorized": False,
        },
        "impacted_categories": impacted,
        "changed_files": file_entries,
        "owner_evidence_validation": evidence_check,
        "control_category_inventory": category_inventory(),
        "next_steps": [
            "若 impacted_c0_category_count > 0先建立 owner response packet不得直接 reload、deploy、sync 或修改主機。",
            "owner response 必須包含 owner role / team、decision、decision reason、affected scope、redacted evidence refs、followup owner、rollback owner、maintenance window 與 validation plan。",
            "若只有 C3 security evidence / tooling仍需跑 guard 與 doc secret sanity但不得藉此提高 runtime gate。",
        ],
    }
def main() -> int:
    """Command-line entry point; returns the process exit status.

    The changed-file set is built from ``--changed-file`` values plus, when
    ``--base`` is given, the ``base..head`` diff; with neither, the working
    tree is inspected. The JSON report goes to ``--output`` or stdout, and a
    one-line summary goes to stderr.

    Returns:
        1 when ``--fail-on-c0`` is set and a C0 category is impacted, or when
        ``--fail-on-missing-evidence`` is set, a C0/C1 category is impacted
        and the owner evidence is incomplete; otherwise 0.
    """
    cli = argparse.ArgumentParser(description="IwoooS 高價值配置變更 Gate")
    cli.add_argument("--root", default=".", help="repo root")
    cli.add_argument("--base", help="git diff base")
    cli.add_argument("--head", default="HEAD", help="git diff head")
    cli.add_argument("--changed-file", action="append", default=[], help="手動指定變更檔案,可重複")
    cli.add_argument("--evidence", help="owner response evidence JSON只驗證欄位與 false flags")
    cli.add_argument("--output", help="寫出 JSON 報告")
    cli.add_argument("--generated-at", help="固定報告時間,供 committed snapshot 使用")
    cli.add_argument("--fail-on-c0", action="store_true", help="若有 C0 分類則回傳 1")
    cli.add_argument(
        "--fail-on-missing-evidence",
        action="store_true",
        help="若命中 C0/C1 但 owner evidence 不完整則回傳 1",
    )
    options = cli.parse_args()

    repo_root = Path(options.root).resolve()

    # Manual paths first; a diff range adds to them, and the working tree is
    # consulted only when nothing else was specified.
    targets = list(options.changed_file)
    if options.base:
        targets.extend(diff_files(repo_root, options.base, options.head))
    elif not targets:
        targets.extend(working_tree_files(repo_root))

    if options.evidence:
        owner_evidence = load_evidence(Path(options.evidence))
    else:
        owner_evidence = None

    report = build_report(
        repo_root,
        targets,
        options.generated_at,
        options.base,
        options.head,
        owner_evidence,
    )
    rendered = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    if not options.output:
        print(rendered)
    else:
        destination = Path(options.output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered + "\n", encoding="utf-8")

    summary = report["summary"]
    c0_hits = summary["impacted_c0_category_count"]
    c1_hits = summary["impacted_c1_category_count"]
    status_line = " ".join(
        [
            "HIGH_VALUE_CONFIG_CHANGE_GATE_OK",
            f"changed_files={summary['changed_file_count']}",
            f"matched={summary['matched_high_value_file_count']}",
            f"categories={summary['impacted_category_count']}",
            f"c0={c0_hits}",
            f"c1={c1_hits}",
        ]
    )
    print(status_line, file=sys.stderr)

    if options.fail_on_c0 and c0_hits > 0:
        return 1
    evidence_needed = c0_hits > 0 or c1_hits > 0
    if options.fail_on_missing_evidence and evidence_needed and not summary["owner_evidence_complete"]:
        return 1
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())