diff --git a/apps/api/models.json b/apps/api/models.json index 200e2c93..5f6446e2 100644 --- a/apps/api/models.json +++ b/apps/api/models.json @@ -17,8 +17,8 @@ "endpoint": "http://192.168.0.111:11434", "api_path": "/api/generate", "models": { - "default": "deepseek-r1:14b", - "rca": "deepseek-r1:14b", + "default": "qwen2.5:7b-instruct", + "rca": "qwen2.5:7b-instruct", "summary": "gemma3:4b", "drift_summary": "qwen2.5:7b-instruct", "drift_intent": "qwen2.5:7b-instruct", diff --git a/apps/api/src/api/v1/drift.py b/apps/api/src/api/v1/drift.py index 63a242a8..e450620a 100644 --- a/apps/api/src/api/v1/drift.py +++ b/apps/api/src/api/v1/drift.py @@ -180,6 +180,7 @@ async def _analyze_and_notify(report: DriftReport) -> None: # 2026-04-24: 嘗試低風險自動採納 auto_adopted = False + auto_block_reason = "" try: adopt_svc = get_drift_adopt_service() auto_result = await adopt_svc.auto_adopt_if_safe(report) @@ -197,19 +198,28 @@ async def _analyze_and_notify(report: DriftReport) -> None: pr_url=auto_result.get("pr_url"), ) else: + auto_block_reason = auto_result.get("reason", "") or "auto adopt skipped" _logger.info( "drift_auto_adopt_skipped", report_id=report.report_id, - reason=auto_result.get("reason", ""), + reason=auto_block_reason, skipped=auto_result.get("skipped", True), ) except Exception as e: + auto_block_reason = f"auto adopt error: {str(e)[:120]}" _logger.warning("drift_auto_adopt_error", report_id=report.report_id, error=str(e)) if auto_adopted: # 自動採納成功,Telegram 通知已在 auto_adopt_if_safe 內發出,不再推送按鈕卡片 return + if auto_block_reason: + await _escalate_drift_auto_adopt_blocked( + report=report, + reason=auto_block_reason, + interpretation=interpretation, + ) + # ADR-075: drift_narrator_service 負責發送 TYPE-4D 卡片(含按鈕) # 舊的 send_text() 已移除,改由 narrate_and_notify() 統一處理 try: @@ -224,6 +234,25 @@ async def _analyze_and_notify(report: DriftReport) -> None: structlog.get_logger(__name__).error("drift_analyze_notify_failed", error=str(e)) +async def _escalate_drift_auto_adopt_blocked( + *, + report: DriftReport, + reason: str, + interpretation, +) -> None: + """Delegate drift emergency escalation to the service layer.""" + + from src.services.emergency_escalation_service import ( + escalate_drift_auto_adopt_blocked, + ) + + await escalate_drift_auto_adopt_blocked( + report=report, + reason=reason, + interpretation=interpretation, + ) + + async def _run_full_scan(namespaces: list[str]) -> None: """背景:完整漂移掃描""" detector = get_drift_detector() diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index da34c680..d1077d90 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -110,6 +110,34 @@ def _should_bypass_alertmanager_llm( and alert_category == "host_resource" ) + +async def _escalate_auto_repair_unavailable( + *, + incident_id: str, + approval_id: str, + alert_type: str, + target_resource: str, + namespace: str, + failure_reason: str, + attempted_actions: str, +) -> None: + """Delegate automation-blocker escalation to the service layer.""" + + from src.services.emergency_escalation_service import ( + escalate_auto_repair_unavailable, + ) + + await escalate_auto_repair_unavailable( + incident_id=incident_id, + approval_id=approval_id, + alert_type=alert_type, + target_resource=target_resource, + namespace=namespace, + failure_reason=failure_reason, + attempted_actions=attempted_actions, + ) + + async def _try_auto_repair_background( incident_id: str, approval_id: str, @@ -165,6 +193,15 @@ async def _try_auto_repair_background( "playbook_id": decision.playbook.playbook_id if decision.playbook else None, }, ) + await _escalate_auto_repair_unavailable( + incident_id=incident_id, + approval_id=approval_id, + alert_type=alert_type, + target_resource=target_resource, + namespace=namespace, + failure_reason=decision.reason, + attempted_actions=f"evaluate_auto_repair -> blocked:{decision.blocked_by}", + ) return # 記錄自動修復觸發 (Sprint 5.1 Q10: 加入 Langfuse trace_id 追蹤) @@ -228,6 +265,16 @@ async def _try_auto_repair_background( "namespace": namespace, }, ) + if not result.success: + await _escalate_auto_repair_unavailable( + incident_id=incident_id, + approval_id=approval_id, + alert_type=alert_type, + target_resource=target_resource, + namespace=namespace, + failure_reason=result.error or "auto repair execution failed", + attempted_actions=f"execute_auto_repair playbook:{result.playbook_id}", + ) # ADR-073 Phase 2-3: 自動修復結果 → 寫入 Incident outcome (2026-04-12 ogt) # 讓 KMConversionService 可依 outcome 判斷是否為 EXECUTION_SUCCESS diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py index 33cb7cc6..d207b8ac 100644 --- a/apps/api/src/core/config.py +++ b/apps/api/src/core/config.py @@ -525,7 +525,7 @@ class Settings(BaseSettings): description="OpenClaw AI Agent service URL", ) OPENCLAW_DEFAULT_MODEL: str = Field( - default="deepseek-r1:14b", # 2026-04-08 ogt: SRE最強推理,M1 Pro實測 13 tok/s + default="qwen2.5:7b-instruct", # 2026-04-30: DIAGNOSE/RCA 優先低延遲本地模型 description="Default Ollama model for RCA analysis", ) OPENCLAW_TIMEOUT: int = Field( diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py index 6db38e13..a36dad0f 100644 --- a/apps/api/src/services/decision_manager.py +++ b/apps/api/src/services/decision_manager.py @@ -54,6 +54,26 @@ def _fire_and_forget(coro) -> asyncio.Task: return task +def _phase2_fallback_reason(package: Any) -> str | None: + """Return why a Phase 2 package should continue to Playbook/LLM fallback. + + Phase 2 is a deliberation layer, not the final brake. If the debate times + out or produces no actionable recommendation, the flywheel must still try + deterministic Playbook/LLM/expert paths before opening a manual gate. + """ + + status = getattr(package, "session_status", None) + status_value = status.value if status and hasattr(status, "value") else str(status or "") + if status_value in {"timeout", "failed"}: + return f"session_{status_value}" + if getattr(package, "all_agents_degraded", False): + return "all_agents_degraded" + action = str(getattr(package, "recommended_action", "") or "").strip() + if not action and getattr(package, "requires_human_approval", False): + return "empty_action_requires_human" + return None + + # ============================================================================= # Phase 31 (ADR-067 2026-04-10): Log 異常摘要 — NemoTron deepseek-r1:14b # ============================================================================= @@ -2455,13 +2475,26 @@ class DecisionManager: # evidence_snapshot 攜帶進 proposal_data,避免 singleton 並發污染 _p2_result = _package_to_proposal_data(package) _p2_result["_evidence_snapshot_ref"] = p2_snapshot - return _p2_result + _p2_fallback = _phase2_fallback_reason(package) + if not _p2_fallback: + return _p2_result + + logger.warning( + "p2_degraded_fallback_to_playbook_llm", + incident_id=incident.incident_id, + reason=_p2_fallback, + confidence=_p2_result.get("confidence"), + blocked_reason=_p2_result.get("blocked_reason", ""), + ) # snapshot 仍為 None → 降級繼續走原路徑(不阻塞) - logger.warning("p2_no_snapshot_fallback", incident_id=incident.incident_id) + if p2_snapshot is None: + logger.warning("p2_no_snapshot_fallback", incident_id=incident.incident_id) # Phase 7.5: 先嘗試 Playbook 匹配 playbook_result = await self._try_playbook_match(incident) if playbook_result: + if evidence_snapshot is not None: + playbook_result["_evidence_snapshot_ref"] = evidence_snapshot return playbook_result # MCP Phase 4c: Playbook 無命中 → 非同步產生 AI 草稿 Playbook (2026-04-11 Claude Sonnet 4.6) diff --git a/apps/api/src/services/emergency_escalation_service.py b/apps/api/src/services/emergency_escalation_service.py new file mode 100644 index 00000000..13a136bd --- /dev/null +++ b/apps/api/src/services/emergency_escalation_service.py @@ -0,0 +1,154 @@ +"""Emergency escalation service for automation blockers. + +Keeps Redis dedup, Telegram fanout, and operation-log writes out of API +routers while giving auto-repair / drift paths a single emergency channel. +""" + +from __future__ import annotations + +from typing import Any + +import structlog + +from src.core.config import settings +from src.core.redis_client import get_redis + +logger = structlog.get_logger(__name__) + + +async def escalate_auto_repair_unavailable( + *, + incident_id: str, + approval_id: str, + alert_type: str, + target_resource: str, + namespace: str, + failure_reason: str, + attempted_actions: str, +) -> None: + """Open an emergency channel when auto repair cannot safely continue.""" + + dedup_key = f"auto_repair:emergency_escalated:{incident_id}" + if not await _dedup_first_send(dedup_key, ttl=900, event="auto_repair"): + logger.info( + "auto_repair_escalation_dedup_skipped", + incident_id=incident_id, + approval_id=approval_id, + ) + return + + try: + from src.repositories.alert_operation_log_repository import ( + get_alert_operation_log_repository, + ) + from src.services.telegram_gateway import get_telegram_gateway + + await get_telegram_gateway().send_escalation_card( + incident_id=incident_id, + original_alertname=alert_type or "AutoRepairBlocked", + duration_min=0, + priority=0, + attempted_actions=attempted_actions, + failure_reason=failure_reason, + current_impact=f"target={target_resource or 'unknown'} namespace={namespace or 'unknown'}", + group_chat_id=settings.SRE_GROUP_CHAT_ID or None, + ) + + await get_alert_operation_log_repository().append( + "EMERGENCY_ESCALATED", + incident_id=incident_id, + approval_id=approval_id, + actor="auto_repair", + action_detail="auto_repair_unavailable_emergency_channel", + success=True, + context={ + "alert_type": alert_type, + "target_resource": target_resource, + "namespace": namespace, + "failure_reason": failure_reason, + "attempted_actions": attempted_actions, + }, + ) + logger.warning( + "auto_repair_emergency_escalated", + incident_id=incident_id, + approval_id=approval_id, + reason=failure_reason, + ) + except Exception as exc: + logger.warning( + "auto_repair_emergency_escalation_failed", + incident_id=incident_id, + approval_id=approval_id, + error=str(exc), + ) + + +async def escalate_drift_auto_adopt_blocked( + *, + report: Any, + reason: str, + interpretation: Any, +) -> None: + """Notify the emergency channel when drift cannot be auto-adopted safely.""" + + dedup_key = f"drift:auto_adopt_emergency:{report.report_id}" + if not await _dedup_first_send(dedup_key, ttl=3600, event="drift"): + logger.info("drift_emergency_escalation_dedup_skipped", report_id=report.report_id) + return + + try: + from src.services.telegram_gateway import get_telegram_gateway + + actionable_count = sum( + 1 for item in report.items + if not getattr(item, "is_allowlisted", False) + ) + intent = getattr(getattr(interpretation, "intent", None), "value", "unknown") + confidence = getattr(interpretation, "confidence", 0.0) if interpretation else 0.0 + risk = getattr(interpretation, "risk", "unknown") if interpretation else "unknown" + + await get_telegram_gateway().send_escalation_card( + incident_id=report.report_id, + original_alertname="ConfigDriftAutoAdoptBlocked", + duration_min=0, + priority=0 if report.high_count > 0 else 1, + attempted_actions="drift_interpreter -> auto_adopt_if_safe -> emergency_escalation", + failure_reason=reason, + current_impact=( + f"namespace={report.namespace} high={report.high_count} " + f"medium={report.medium_count} actionable={actionable_count} " + f"intent={intent} confidence={confidence:.0%} risk={risk}" + ), + group_chat_id=settings.SRE_GROUP_CHAT_ID or None, + ) + logger.warning( + "drift_auto_adopt_emergency_escalated", + report_id=report.report_id, + reason=reason, + high=report.high_count, + medium=report.medium_count, + actionable=actionable_count, + ) + except Exception as exc: + logger.warning( + "drift_emergency_escalation_failed", + report_id=report.report_id, + error=str(exc), + ) + + +async def _dedup_first_send(key: str, *, ttl: int, event: str) -> bool: + """Return True when this is the first escalation in the dedup window.""" + + try: + redis = get_redis() + return bool(await redis.set(key, "1", ex=ttl, nx=True)) + except Exception as exc: + logger.warning( + "emergency_escalation_dedup_failed_open", + key=key, + event=event, + error=str(exc), + ) + return True diff --git a/apps/api/src/services/failover_alerter.py b/apps/api/src/services/failover_alerter.py index 6dbb9f8a..4409a6af 100644 --- a/apps/api/src/services/failover_alerter.py +++ b/apps/api/src/services/failover_alerter.py @@ -131,9 +131,9 @@ class FailoverAlerter: msg = ( f"*Gemini 每日配額耗盡*\n\n" - f"日期:{date_str}\n" - f"上限:{quota} calls/day\n" - f"當前用量:{current_count}\n" + f"日期:{_escape_md(date_str)}\n" + f"上限:{_escape_md(str(quota))} calls/day\n" + f"當前用量:{_escape_md(str(current_count))}\n" f"降級目標:Nemotron → Claude \\(Gemini 不可用\\)\n\n" f"進入容災模式至明日 0:00\n" f"建議檢查是否有異常流量,評估是否升級 Gemini 配額" diff --git a/apps/api/tests/test_failover_alerter.py b/apps/api/tests/test_failover_alerter.py index 7039eee4..c21b195f 100644 --- a/apps/api/tests/test_failover_alerter.py +++ b/apps/api/tests/test_failover_alerter.py @@ -145,6 +145,7 @@ async def test_quota_alert_dedup_24h(mock_redis, mock_telegram_send): assert "Gemini 每日配額耗盡" in sent assert "1000" in sent assert "1003" in sent + assert "\\-" in sent # MarkdownV2 date hyphens must be escaped # 驗證 dedup TTL = 24h assert mock_redis.set.await_args_list[0].kwargs["ex"] == QUOTA_DEDUP_TTL_SEC diff --git a/apps/api/tests/test_phase2_fallback.py b/apps/api/tests/test_phase2_fallback.py new file mode 100644 index 00000000..ca4ad9b8 --- /dev/null +++ b/apps/api/tests/test_phase2_fallback.py @@ -0,0 +1,37 @@ +from types import SimpleNamespace + +from src.agents.protocol import AgentSessionStatus +from src.services.decision_manager import _phase2_fallback_reason + + +def _package(**kwargs): + data = { + "session_status": AgentSessionStatus.COMPLETED, + "all_agents_degraded": False, + "recommended_action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod", + "requires_human_approval": False, + } + data.update(kwargs) + return SimpleNamespace(**data) + + +def test_phase2_timeout_continues_to_fallback() -> None: + pkg = _package( + session_status=AgentSessionStatus.TIMEOUT, + recommended_action=None, + requires_human_approval=True, + ) + + assert _phase2_fallback_reason(pkg) == "session_timeout" + + +def test_phase2_empty_manual_gate_continues_to_fallback() -> None: + pkg = _package(recommended_action="", requires_human_approval=True) + + assert _phase2_fallback_reason(pkg) == "empty_action_requires_human" + + +def test_phase2_actionable_package_stays_primary() -> None: + pkg = _package() + + assert _phase2_fallback_reason(pkg) is None diff --git a/k8s/awoooi-prod/06-deployment-api.yaml b/k8s/awoooi-prod/06-deployment-api.yaml index 9ea43816..b82a877c 100644 --- a/k8s/awoooi-prod/06-deployment-api.yaml +++ b/k8s/awoooi-prod/06-deployment-api.yaml @@ -70,7 +70,7 @@ spec: - name: OLLAMA_URL value: "http://192.168.0.111:11434" - name: OPENCLAW_DEFAULT_MODEL - value: "deepseek-r1:14b" + value: "qwen2.5:7b-instruct" - name: OPENCLAW_TIMEOUT value: "120" - name: AGENT_DIAGNOSTICIAN_TIMEOUT_SEC