refactor(p1): LLM JSON parse helper 抽出 + coverage 閾值雙條件 (架構師 Review P1)
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 8m52s
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 8m52s
首席架構師 2026-04-19 Review (92/100 Grade A) 指出 P1 優化:
1. LLM JSON 3-path parse 邏輯在 4 scanner 重複 (~80 行 × 4 = 320 行)
2. coverage red>=20 觸發閾值偏低,生產 bootstrap 必觸發浪費 token
P1.1+1.2 新增 services/llm_json_parser.py (~90 行):
parse_llm_json_response(text, required_key, logger_context)
3-path fallback:
Path 1: 剝 markdown fence + 直接 JSON 含 required_key
Path 2: NemoTron wrapper (description/action_title/reasoning 內嵌 JSON)
Path 3: 所有失敗 return None + logger.warning
失敗永不 raise,呼叫者決定 fallback.
4 個 LLM scanner 改用 helper:
- hermes_rule_quality_job: required_key='recommended_actions'
- capacity_forecaster_job: required_key='priority_actions'
- compliance_scanner_job: required_key='posture_grade'
- coverage_evaluator_job: required_key='worst_dimension'
每個減少約 20 行重複.
P1.3 coverage 觸發條件改雙條件:
原: total_red >= 20 (bootstrap 必觸發)
新: red_ratio > 30% AND total_scanned >= 50
_fetch_red_summary 加 total_scanned 回傳供計算.
5/5 單元測試 parse_llm_json_response:
✅ direct / markdown fence / NemoTron wrapper / invalid / missing key
P1.4 capacity_scanner + rule_catalog_sync: 檢查後已有完整作者註解 (Review 誤判).
其他 P1 項目 (Prom HTTP helper / first_delay 錯開 / LLM budget guard) 留待下一個 session 處理.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -183,9 +183,13 @@ _LLM_FORECAST_PROMPT = """你是 AWOOOI 容量規劃專家。以下 host 過去
|
||||
|
||||
|
||||
async def _llm_analyze_risk(host: str, findings: list[dict[str, Any]]) -> dict[str, Any] | None:
|
||||
"""用 OpenClaw 分析高風險 host. 失敗回 None 不阻塞."""
|
||||
"""用 OpenClaw 分析高風險 host. 失敗回 None 不阻塞.
|
||||
|
||||
2026-04-19 P1.2 重構: 改用 llm_json_parser.parse_llm_json_response 共用 helper.
|
||||
"""
|
||||
try:
|
||||
import json as _j
|
||||
from src.services.llm_json_parser import parse_llm_json_response
|
||||
from src.services.openclaw import get_openclaw
|
||||
|
||||
prompt = _LLM_FORECAST_PROMPT.format(
|
||||
@@ -197,26 +201,14 @@ async def _llm_analyze_risk(host: str, findings: list[dict[str, Any]]) -> dict[s
|
||||
if not success or not text:
|
||||
return None
|
||||
|
||||
_raw = text.strip()
|
||||
if _raw.startswith("```"):
|
||||
_raw = _raw.strip("`").lstrip("json").strip()
|
||||
|
||||
try:
|
||||
parsed = _j.loads(_raw)
|
||||
if isinstance(parsed, dict) and "priority_actions" in parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
# NemoTron wrapper fallback
|
||||
if isinstance(parsed, dict) and "description" in parsed:
|
||||
desc = str(parsed["description"]).strip()
|
||||
if desc.startswith("{"):
|
||||
inner = _j.loads(desc)
|
||||
if isinstance(inner, dict) and "priority_actions" in inner:
|
||||
inner["_llm_provider"] = provider
|
||||
return inner
|
||||
except (_j.JSONDecodeError, ValueError) as e:
|
||||
logger.warning("forecast_llm_parse_failed", host=host, error=str(e), raw=_raw[:200])
|
||||
return None
|
||||
parsed = parse_llm_json_response(
|
||||
text,
|
||||
required_key="priority_actions",
|
||||
logger_context=f"forecaster:{host}",
|
||||
)
|
||||
if parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
except Exception as e:
|
||||
logger.warning("forecast_llm_error", host=host, error=str(e))
|
||||
return None
|
||||
|
||||
@@ -423,9 +423,13 @@ async def _llm_analyze_compliance_posture(
|
||||
warning_assets: list[dict[str, Any]],
|
||||
stats: dict[str, Any],
|
||||
) -> dict[str, Any] | None:
|
||||
"""用 LLM 分析整體 compliance posture. 失敗回 None."""
|
||||
"""用 LLM 分析整體 compliance posture. 失敗回 None.
|
||||
|
||||
2026-04-19 P1.2 重構: 改用 llm_json_parser.parse_llm_json_response.
|
||||
"""
|
||||
try:
|
||||
import json as _j
|
||||
from src.services.llm_json_parser import parse_llm_json_response
|
||||
from src.services.openclaw import get_openclaw
|
||||
|
||||
prompt = _LLM_POSTURE_PROMPT.format(
|
||||
@@ -439,26 +443,12 @@ async def _llm_analyze_compliance_posture(
|
||||
if not success or not text:
|
||||
return None
|
||||
|
||||
_raw = text.strip()
|
||||
if _raw.startswith("```"):
|
||||
_raw = _raw.strip("`").lstrip("json").strip()
|
||||
|
||||
try:
|
||||
parsed = _j.loads(_raw)
|
||||
if isinstance(parsed, dict) and "posture_grade" in parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
# NemoTron wrapper fallback
|
||||
if isinstance(parsed, dict) and "description" in parsed:
|
||||
desc = str(parsed["description"]).strip()
|
||||
if desc.startswith("{"):
|
||||
inner = _j.loads(desc)
|
||||
if isinstance(inner, dict) and "posture_grade" in inner:
|
||||
inner["_llm_provider"] = provider
|
||||
return inner
|
||||
except (_j.JSONDecodeError, ValueError) as e:
|
||||
logger.warning("compliance_llm_parse_failed", error=str(e), raw=_raw[:200])
|
||||
return None
|
||||
parsed = parse_llm_json_response(
|
||||
text, required_key="posture_grade", logger_context="compliance",
|
||||
)
|
||||
if parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
except Exception as e:
|
||||
logger.warning("compliance_llm_error", error=str(e))
|
||||
return None
|
||||
|
||||
@@ -105,14 +105,21 @@ async def evaluate_once() -> dict[str, int]:
|
||||
duration_ms = int((_time.time() - started_ms) * 1000)
|
||||
|
||||
# Gap 3.3 LLM 升級: 分析 red 分布產補覆蓋建議
|
||||
# 只在有大量 red 時才跑 LLM (避免 well-covered 集群浪費 token)
|
||||
# 2026-04-19 P1.3 閾值調整 (架構師 review): 從「total_red >= 20」改雙條件
|
||||
# - 紅佔比 > 30%: 實質有治理缺口
|
||||
# - 且總 asset_scan >= 50: 樣本量足夠
|
||||
# 避免 bootstrap 首次 scan 必觸發 LLM 浪費 token.
|
||||
red_summary = await _fetch_red_summary()
|
||||
llm_analysis: dict[str, Any] | None = None
|
||||
if red_summary and red_summary.get("total_red", 0) >= 20:
|
||||
llm_analysis = await _llm_analyze_coverage_gaps(red_summary)
|
||||
if llm_analysis:
|
||||
stats["llm_analyzed"] = True
|
||||
await _send_telegram_gaps(red_summary, llm_analysis)
|
||||
if red_summary:
|
||||
total_red = red_summary.get("total_red", 0)
|
||||
total_scanned = red_summary.get("total_scanned", 0)
|
||||
red_ratio = (total_red / total_scanned) if total_scanned > 0 else 0.0
|
||||
if red_ratio > 0.3 and total_scanned >= 50:
|
||||
llm_analysis = await _llm_analyze_coverage_gaps(red_summary)
|
||||
if llm_analysis:
|
||||
stats["llm_analyzed"] = True
|
||||
await _send_telegram_gaps(red_summary, llm_analysis)
|
||||
|
||||
await _log_aol(stats, duration_ms, error_msg)
|
||||
|
||||
@@ -136,7 +143,10 @@ async def evaluate_once() -> dict[str, int]:
|
||||
# ============================================================================
|
||||
|
||||
async def _fetch_red_summary() -> dict[str, Any] | None:
|
||||
"""撈最新 run 的 red 分佈 + top red asset type."""
|
||||
"""撈最新 run 的 red 分佈 + top red asset type.
|
||||
|
||||
2026-04-19 P1.3: 加 total_scanned 供呼叫端算 red_ratio 做雙條件觸發.
|
||||
"""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
@@ -159,6 +169,17 @@ async def _fetch_red_summary() -> dict[str, Any] | None:
|
||||
if total_red == 0:
|
||||
return None
|
||||
|
||||
# 總 snapshot 數 (for red_ratio 計算)
|
||||
total_row = await db.execute(_sql("""
|
||||
SELECT count(*) AS total
|
||||
FROM asset_coverage_snapshot
|
||||
WHERE run_id = (
|
||||
SELECT run_id FROM asset_discovery_run
|
||||
WHERE status='success' ORDER BY ended_at DESC LIMIT 1
|
||||
)
|
||||
"""))
|
||||
total_scanned = int(total_row.scalar() or 0)
|
||||
|
||||
# Top red asset: 哪些 asset 被標最多 red
|
||||
asset_rows = await db.execute(_sql("""
|
||||
SELECT ai.asset_key, ai.asset_type, count(*) AS red_dims
|
||||
@@ -179,6 +200,7 @@ async def _fetch_red_summary() -> dict[str, Any] | None:
|
||||
]
|
||||
return {
|
||||
"total_red": total_red,
|
||||
"total_scanned": total_scanned,
|
||||
"by_dimension": by_dim,
|
||||
"top_red_assets": top_assets,
|
||||
}
|
||||
@@ -219,9 +241,13 @@ _LLM_COVERAGE_PROMPT = """你是 AWOOOI 可觀察性覆蓋率專家。以下是
|
||||
|
||||
|
||||
async def _llm_analyze_coverage_gaps(red_summary: dict[str, Any]) -> dict[str, Any] | None:
|
||||
"""LLM 分析 coverage 缺口. 失敗回 None."""
|
||||
"""LLM 分析 coverage 缺口. 失敗回 None.
|
||||
|
||||
2026-04-19 P1.2 重構: 改用 llm_json_parser.parse_llm_json_response.
|
||||
"""
|
||||
try:
|
||||
import json as _j
|
||||
from src.services.llm_json_parser import parse_llm_json_response
|
||||
from src.services.openclaw import get_openclaw
|
||||
|
||||
prompt = _LLM_COVERAGE_PROMPT.format(
|
||||
@@ -234,25 +260,12 @@ async def _llm_analyze_coverage_gaps(red_summary: dict[str, Any]) -> dict[str, A
|
||||
if not success or not text:
|
||||
return None
|
||||
|
||||
_raw = text.strip()
|
||||
if _raw.startswith("```"):
|
||||
_raw = _raw.strip("`").lstrip("json").strip()
|
||||
|
||||
try:
|
||||
parsed = _j.loads(_raw)
|
||||
if isinstance(parsed, dict) and "worst_dimension" in parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
if isinstance(parsed, dict) and "description" in parsed:
|
||||
desc = str(parsed["description"]).strip()
|
||||
if desc.startswith("{"):
|
||||
inner = _j.loads(desc)
|
||||
if isinstance(inner, dict) and "worst_dimension" in inner:
|
||||
inner["_llm_provider"] = provider
|
||||
return inner
|
||||
except (_j.JSONDecodeError, ValueError) as e:
|
||||
logger.warning("coverage_llm_parse_failed", error=str(e), raw=_raw[:200])
|
||||
return None
|
||||
parsed = parse_llm_json_response(
|
||||
text, required_key="worst_dimension", logger_context="coverage",
|
||||
)
|
||||
if parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
except Exception as e:
|
||||
logger.warning("coverage_llm_error", error=str(e))
|
||||
return None
|
||||
|
||||
@@ -143,9 +143,14 @@ _LLM_ANALYZE_PROMPT = """你是 AWOOOI SRE 告警規則品質分析專家。以
|
||||
|
||||
|
||||
async def _llm_analyze_noisy_rule(rule: dict[str, Any]) -> dict[str, Any] | None:
|
||||
"""用 OpenClaw (多 provider) 分析噪音真因. 失敗回 None 不阻塞."""
|
||||
"""用 OpenClaw (多 provider) 分析噪音真因. 失敗回 None 不阻塞.
|
||||
|
||||
2026-04-19 P1.2 重構: 使用 llm_json_parser.parse_llm_json_response 共用 helper
|
||||
(原 30 行重複 3-path parse 邏輯已抽出到 services/llm_json_parser.py).
|
||||
"""
|
||||
try:
|
||||
import json as _j
|
||||
from src.services.llm_json_parser import parse_llm_json_response
|
||||
from src.services.openclaw import get_openclaw
|
||||
|
||||
prompt = _LLM_ANALYZE_PROMPT.format(
|
||||
@@ -164,26 +169,14 @@ async def _llm_analyze_noisy_rule(rule: dict[str, Any]) -> dict[str, Any] | None
|
||||
if not success or not text:
|
||||
return None
|
||||
|
||||
_raw = text.strip()
|
||||
if _raw.startswith("```"):
|
||||
_raw = _raw.strip("`").lstrip("json").strip()
|
||||
|
||||
try:
|
||||
parsed = _j.loads(_raw)
|
||||
if isinstance(parsed, dict) and "recommended_actions" in parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
# NemoTron wrapper: description 內嵌 JSON
|
||||
if isinstance(parsed, dict) and "description" in parsed:
|
||||
desc = str(parsed["description"]).strip()
|
||||
if desc.startswith("{"):
|
||||
inner = _j.loads(desc)
|
||||
if isinstance(inner, dict) and "recommended_actions" in inner:
|
||||
inner["_llm_provider"] = provider
|
||||
return inner
|
||||
except (_j.JSONDecodeError, ValueError) as e:
|
||||
logger.warning("hermes_llm_parse_failed", rule=rule["rule_name"], error=str(e), raw=_raw[:200])
|
||||
return None
|
||||
parsed = parse_llm_json_response(
|
||||
text,
|
||||
required_key="recommended_actions",
|
||||
logger_context=f"hermes:{rule['rule_name']}",
|
||||
)
|
||||
if parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
except Exception as e:
|
||||
logger.warning("hermes_llm_analyze_error", rule=rule["rule_name"], error=str(e))
|
||||
return None
|
||||
|
||||
98
apps/api/src/services/llm_json_parser.py
Normal file
98
apps/api/src/services/llm_json_parser.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
LLM JSON Response Parser — 共用 helper (Gap 3 Review P1 優化)
|
||||
==============================================================
|
||||
4 個 LLM scanner 原本各自重複 3-path JSON parse 邏輯 (~80 行 × 4 = 320 行).
|
||||
抽成統一 helper,未來擴加 LLM 的 service 直接呼叫.
|
||||
|
||||
Origin (2026-04-19 pr-review-toolkit code review 指出):
|
||||
- hermes_rule_quality_job:167-186
|
||||
- capacity_forecaster_job:204-221
|
||||
- compliance_scanner_job:446-463
|
||||
- coverage_evaluator_job:241-257
|
||||
全部重複 strip ``` → direct parse → NemoTron wrapper → description 內嵌 JSON 的 pattern.
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json as _json
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
def parse_llm_json_response(
|
||||
text: str,
|
||||
required_key: str,
|
||||
logger_context: str = "",
|
||||
) -> dict[str, Any] | None:
|
||||
"""
|
||||
解析 LLM JSON 回應,含 3-path fallback.
|
||||
|
||||
Args:
|
||||
text: LLM 回傳的原始文字
|
||||
required_key: JSON 必須含的 key (e.g. 'posture_grade', 'priority_actions')
|
||||
用來區分「真 JSON」vs 「NemoTron wrapper」
|
||||
logger_context: log warning 時附加的 context (e.g. 'compliance_scan')
|
||||
|
||||
Returns:
|
||||
dict | None — 解析成功且含 required_key 回 dict,否則 None
|
||||
|
||||
3-path fallback:
|
||||
Path 1: 直接 JSON → 有 required_key 就用
|
||||
Path 2: NemoTron wrapper (description/action_title/reasoning 內嵌 JSON 字串)
|
||||
Path 3: Path 2 的 description 含 required_key 直接採用
|
||||
|
||||
失敗時 log warning 但不 raise,呼叫者決定如何 fallback.
|
||||
"""
|
||||
if not text:
|
||||
return None
|
||||
|
||||
_raw = text.strip()
|
||||
# 剝去 markdown code fence ```json...``` 或 ```...```
|
||||
if _raw.startswith("```"):
|
||||
_raw = _raw.strip("`").lstrip("json").strip()
|
||||
|
||||
try:
|
||||
parsed = _json.loads(_raw)
|
||||
except (_json.JSONDecodeError, ValueError) as e:
|
||||
logger.warning(
|
||||
"llm_json_parse_raw_failed",
|
||||
context=logger_context,
|
||||
error=str(e),
|
||||
raw_prefix=_raw[:200],
|
||||
)
|
||||
return None
|
||||
|
||||
if not isinstance(parsed, dict):
|
||||
return None
|
||||
|
||||
# Path 1: 直接 JSON 含 required_key
|
||||
if required_key in parsed:
|
||||
return parsed
|
||||
|
||||
# Path 2+3: NemoTron wrapper — description / action_title / reasoning 可能是內嵌 JSON
|
||||
for wrapper_key in ("description", "action_title", "reasoning"):
|
||||
desc = parsed.get(wrapper_key)
|
||||
if not desc:
|
||||
continue
|
||||
desc_str = str(desc).strip()
|
||||
# Path 2: description 是內嵌 JSON 字串
|
||||
if desc_str.startswith("{"):
|
||||
try:
|
||||
inner = _json.loads(desc_str)
|
||||
if isinstance(inner, dict) and required_key in inner:
|
||||
return inner
|
||||
except (_json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
|
||||
# Path 3: 所有嘗試都失敗
|
||||
logger.warning(
|
||||
"llm_json_missing_required_key",
|
||||
context=logger_context,
|
||||
required_key=required_key,
|
||||
keys_found=list(parsed.keys())[:10],
|
||||
)
|
||||
return None
|
||||
Reference in New Issue
Block a user