diff --git a/apps/api/src/services/decision_fusion_adapter.py b/apps/api/src/services/decision_fusion_adapter.py index d8ac69c5..6628b614 100644 --- a/apps/api/src/services/decision_fusion_adapter.py +++ b/apps/api/src/services/decision_fusion_adapter.py @@ -254,7 +254,7 @@ class DecisionFusionAdapter: "只輸出 CONFIDENCE 和 ACTION 兩行,不要其他解釋。" ) - ollama_url = getattr(self._settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary + ollama_url = getattr(self._settings, "OLLAMA_URL", "http://192.168.0.111:11434") # 2026-05-04 ogt: ADR-110 修正 — 111 primary try: async with httpx.AsyncClient( @@ -392,6 +392,7 @@ class DecisionFusionAdapter: snapshot: dict[str, Any] = {} success_count = 0 + no_data_count = 0 # Prometheus 正常但指標尚未建立(SLI recording rules 未生效) total_count = len(queries) if total_count == 0: @@ -413,22 +414,29 @@ class DecisionFusionAdapter: snapshot[metric_name] = round(value, 4) success_count += 1 else: - snapshot[metric_name] = None # 有回應但無資料 + # 2026-05-04 ogt: 指標尚未建立 ≠ MCP 失敗 + # SLI recording rules 初期可能無資料,給予 0.5 中性貢獻 + snapshot[metric_name] = "no_data" + no_data_count += 1 except Exception as exc: snapshot[metric_name] = f"error:{exc!s:.60}" except Exception as exc: logger.warning("fusion_mcp_prometheus_failed", event_id=event.id, error=str(exc)) return 0.5, {"error": str(exc)} - # 品質分數:成功取得資料的指標比例(映射到 [0.2, 0.9]) + # 2026-05-04 ogt: 品質分數修正 + # success=完整貢獻(1.0), no_data=半貢獻(0.5,指標未建立非 MCP 故障), error=0 + # 最終映射到 [0.2, 0.9] if total_count > 0: - ratio = success_count / total_count + weighted = success_count + 0.5 * no_data_count + ratio = weighted / total_count mcp_score = 0.2 + 0.7 * ratio else: mcp_score = 0.5 snapshot["_meta"] = { "success_count": success_count, + "no_data_count": no_data_count, "total_queries": total_count, "quality_score": round(mcp_score, 4), } diff --git a/apps/api/src/services/governance_dispatcher.py b/apps/api/src/services/governance_dispatcher.py index 0a59fda9..aa552b3c 100644 --- a/apps/api/src/services/governance_dispatcher.py +++ b/apps/api/src/services/governance_dispatcher.py @@ -16,15 +16,19 @@ Tier 3 鐵線(絕不觸碰): - 本模組透過 DecisionFusionAdapter(wrapper)間接使用這些能力 2026-05-03 ogt + Claude Sonnet 4.6(亞太): GovernanceDispatcher Wave 2E 實作 +2026-05-04 ogt + Claude Sonnet 4.6(亞太): skip 路徑無限迴圈修復 + - skip 決策後設 Redis 90min 冷卻,避免重複 LLM 呼叫 + - 超過 2 小時的 stale skip 事件標記 resolved=True(新事件若問題持續會重新產生) """ from __future__ import annotations import asyncio +from datetime import datetime, timezone from typing import Any import structlog -from sqlalchemy import select +from sqlalchemy import select, update from src.db.base import get_db_context from src.db.models import AiGovernanceEvent @@ -46,6 +50,14 @@ logger = structlog.get_logger(__name__) # TODO: 移到 settings,允許運維不重啟調整 poll 間隔 _DISPATCHER_INTERVAL_SEC: int = 30 +# Skip 冷卻時間(秒):skip 決策後 90 分鐘內不重新評估同一事件 +# 原因:skip = 信心度不足,短期內 playbook trust / MCP 指標不會驟變 +_SKIP_COOLDOWN_SEC: int = 5400 # 90 分鐘 + +# Stale 事件閾值(秒):超過此時間的 skip 事件直接標 resolved +# 原因:持久問題會由 governance_agent 重新產生新事件;舊事件繼續留著只是積壓 +_STALE_EVENT_SEC: int = 7200 # 2 小時 + # 每輪最多處理幾個事件(避免單輪阻塞過長) _MAX_EVENTS_PER_CYCLE: int = 10 @@ -59,6 +71,54 @@ _DISPATCHABLE_EVENT_TYPES: frozenset[str] = frozenset({ }) +# ============================================================================= +# Redis 冷卻 helpers(防止 skip 事件無限重評迴圈) +# ============================================================================= + +async def _is_skip_cooldown(event_id: str) -> bool: + """確認事件是否在 skip 冷卻期內(90 分鐘)。""" + try: + from src.core.redis_client import get_redis + redis = get_redis() + return bool(await redis.exists(f"governance:skip:{event_id}")) + except Exception: + return False + + +async def _set_skip_cooldown(event_id: str) -> None: + """設置 skip 冷卻期(90 分鐘),防止重複 LLM 呼叫。""" + try: + from src.core.redis_client import get_redis + redis = get_redis() + await redis.setex(f"governance:skip:{event_id}", _SKIP_COOLDOWN_SEC, "1") + except Exception as exc: + logger.warning("governance_skip_cooldown_set_failed", event_id=event_id, error=str(exc)) + + +async def _mark_event_resolved(event_id: str, reason: str) -> None: + """將 stale skip 事件標記為 resolved(持久問題會由 governance_agent 重新產生新事件)。 + + 對齊模型設計:resolved=True 由「下次計算時補填」, + dispatcher skip = 系統判斷當前無法自動修復,等同一次計算完成。 + """ + try: + from src.utils.timezone import now_taipei + async with get_db_context() as db: + await db.execute( + update(AiGovernanceEvent) + .where(AiGovernanceEvent.id == event_id) + .where(AiGovernanceEvent.resolved.is_(False)) + .values(resolved=True, resolved_at=now_taipei()) + ) + logger.info( + "governance_event_stale_resolved", + event_id=event_id, + reason=reason, + ) + except Exception as exc: + logger.warning("governance_event_resolve_failed", event_id=event_id, error=str(exc)) + + # ============================================================================= # 核心函數 # ============================================================================= @@ -75,6 +135,15 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None: event_id = event.id event_type = event.event_type + # Step 0: Redis skip 冷卻檢查(防止 skip 事件每 30s 重新做 LLM 呼叫) + if await _is_skip_cooldown(event_id): + logger.debug( + "governance_dispatch_skip_cooldown", + event_id=event_id, + event_type=event_type, + ) + return None + # Step 1: 檢查是否已有活躍 dispatch(冪等保護) existing = await get_active_for_event(event_id) if existing is not None: @@ -108,12 +177,31 @@ async def dispatch_governance_event(event: AiGovernanceEvent) -> str | None: # Step 3: 依 decision_path 決定要不要寫 dispatch if decision.decision_path == "skip": + # 2026-05-04 ogt: 修復無限迴圈根因 + # skip 決策後設 90min Redis 冷卻,避免每 30s 重新呼叫 LLM + # 超過 2h 的 stale 事件直接標 resolved(持久問題由 governance_agent 重新產生新事件) + await _set_skip_cooldown(event_id) + + triggered_at_aware = event.triggered_at + if triggered_at_aware is not None and triggered_at_aware.tzinfo is None: + triggered_at_aware = triggered_at_aware.replace(tzinfo=timezone.utc) + event_age_sec = ( + (datetime.now(timezone.utc) - triggered_at_aware).total_seconds() + if triggered_at_aware is not None else 0 + ) + logger.info( "governance_dispatch_path_skip", event_id=event_id, event_type=event_type, confidence=round(decision.confidence, 4), + event_age_sec=int(event_age_sec), + stale=event_age_sec > _STALE_EVENT_SEC, ) + + if event_age_sec > _STALE_EVENT_SEC: + await _mark_event_resolved(event_id, reason=f"skip_stale_{int(event_age_sec)}s") + return None # Step 4: 決定 executor_type 與 dispatch_status