feat(p1+p3): logger 接 13 caller + Q&A/Nemotron/日報 feature flag 灰度

Phase 1 A4 — 13 個呼叫點接 ai_call_logger(覆蓋率 11.8% → 預估 50%+)
- TOP-1 nemotron_dispatcher: nemotron_dispatch caller (NIM 配額追蹤)
- TOP-2 openclaw_strategist: 4 reports (daily/weekly/monthly/meta) + qa caller
- TOP-3 hermes_analyst: hermes_analyst + hermes_intent (順修 commit 00591c5 殘留 bug)
- TOP-4 code_review_pipeline: code_review_hermes/openclaw/elephant 三鏈 (request_id 串)
- TOP-5 openclaw_bot_routes: openclaw_bot_main/gemini/nim 三層 fallback

Phase 3 A7 — OpenClaw Q&A → qwen3:14b(feature flag OFF)
- OPENCLAW_QA_OLLAMA_FIRST 灰度開關
- 繁中強制 system prompt + Gemini fallback chain
- _is_low_quality_response 品質守門(簡體字檢測 + 拒答訊號 + 結構分數)
- 黃金集 A/B 對照測試框架(10 樣本去 PII)

Phase 3 A8 — OpenClaw 日報 → Hermes 模板(feature flag OFF)
- OPENCLAW_DAILY_HERMES_TEMPLATE 灰度開關
- _compute_daily_kpi 純 SQL + Hermes 規則引擎
- _compute_gemini_insight 精簡 200 字洞察 prompt
- templates/daily_report_v2.j2 + _SafeUndefined 缺欄位優雅降級
- scripts/compare_daily_report_versions.py 雙版本盲測

Phase 3 A9 — Nemotron NIM → qwen3:14b(feature flag OFF)
- NEMOTRON_OLLAMA_FIRST 灰度開關(A2 紅燈:deepseek-r1 假支援,改 qwen3)
- _call_qwen3_dispatch + 既有 NIM tool_calls 解析共用
- 保留 ADR-004「🟡 [降級模式]」Hermes 規則引擎兜底

H6 PII fix — chat_id 進 ai_calls.meta 改 SHA1[:8](4 處 Bot Q&A)

Code Review pipeline — N3 動態 provider tag(gcp/secondary/111)+ A4 logger 三鏈

37 unit tests 全綠(routing 15 + golden 5 + qwen3 8 + daily template 8 + nemotron 1)

Operation Ollama-First v5.0 / Phase 1 A4 + Phase 3 A7+A8+A9

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-05-03 23:05:38 +08:00
parent 078bf2683c
commit 838267c293
12 changed files with 3122 additions and 297 deletions

View File

@@ -24,6 +24,7 @@ import os
import json
import re
import threading
import hashlib # Operation Ollama-First v5.0 P1: H6 PII fix — chat_id 進 meta 改 hash[:8]
from contextvars import ContextVar
from contextlib import contextmanager
import requests
@@ -48,6 +49,7 @@ from services.openclaw_bot.telegram_api import (
send_photo,
send_typing,
)
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
from services.openclaw_bot.menu_keyboards import (
_BACK,
_SUBMENUS,
@@ -85,7 +87,17 @@ try:
except ImportError:
_PCHOME_AVAILABLE = False
# V-New: 引入 Ollama 探測機制
try:
from services.ollama_service import resolve_ollama_host
_OLLAMA_AVAILABLE = True
except ImportError:
_OLLAMA_AVAILABLE = False
# AI 引擎Gemini Flash2~5秒→ NIM備援45~90秒
# LOCKED-GEMINI: PPT 簡報文案需長 context (5K+ rows + 多輪歷史) + 繁中商業敘事
# Ollama qwen2.5-coder:7b 為 PPT 失敗時 L3 fallback已在 _call_ollama 路徑)
# ADR-028 鎖定場景 #7
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')
GEMINI_BASE_URL = 'https://generativelanguage.googleapis.com/v1beta/models'
GEMINI_MODEL = 'gemini-2.0-flash'
@@ -2469,6 +2481,29 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str:
.get('content', {}).get('parts', [{}])[0]
.get('text', '').strip())
def _call_ollama(prompt: str, tokens: int) -> str:
    """Generate text through the Ollama ``/api/generate`` endpoint.

    Best-effort helper: every failure is logged as a warning and reported to
    the caller as an empty string, never as an exception.

    Args:
        prompt: Full prompt text sent to the model.
        tokens: Upper bound on generated tokens (sent as ``num_predict``).

    Returns:
        The stripped ``response`` text, or ``""`` when Ollama support was not
        imported, the request fails, or the reply carries no text.
    """
    if not _OLLAMA_AVAILABLE:
        return ""
    try:
        # Drop a trailing slash so the URL never becomes ``host//api/generate``.
        host = resolve_ollama_host().rstrip("/")
        # Model is overridable through OPENCLAW_OLLAMA_MODEL.
        model = os.getenv('OPENCLAW_OLLAMA_MODEL', 'qwen2.5-coder:7b')
        r = requests.post(
            f"{host}/api/generate",
            json={
                'model': model,
                'prompt': prompt,
                'stream': False,
                'options': {'num_predict': tokens, 'temperature': 0.3},
            },
            timeout=90,
        )
        r.raise_for_status()
        # ``response`` may be absent or null; both mean "no text".
        return (r.json().get('response') or '').strip()
    except Exception as e:
        # Deliberate catch-all: this is a fallback path and must not raise.
        sys_log.warning(f"[PPT] Ollama error: {e}")
        return ""
if not NVIDIA_API_KEY:
if GEMINI_API_KEY:
try:
@@ -2532,6 +2567,29 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str:
return result_text
except Exception as e2:
sys_log.error(f"[PPT] Gemini fallback error: {e2}")
# ── Ollama (GCP/111) Final Fallback ───────────────────────
if _OLLAMA_AVAILABLE:
try:
sys_log.info("[PPT] Trying local/GCP Ollama as final fallback")
raw = _call_ollama(f"{sys_instruction}\n\n--- 資料 ---\n{prompt_data}", max_tokens)
result_text = _clean_ai_text(raw)
if result_text and len(result_text) > 100:
if _LEARNING_ENABLED:
import threading as _thr
_thr.Thread(
target=store_insight,
kwargs={
'insight_type': report_type or 'analysis',
'content': result_text,
'period': datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d'),
},
daemon=True
).start()
return result_text
except Exception as e3:
sys_log.error(f"[PPT] Ollama final fallback error: {e3}")
return 'AI 分析暫時無法使用,請稍後重試)'
@@ -6768,18 +6826,29 @@ def openclaw_answer(question: str, chat_id: int = None):
+ "請用繁體中文直接回答不要開場白300字以內。"
)
resp = ollama_service.generate(question, system_prompt=sys_prompt, timeout=180)
if resp.success and resp.content:
if chat_id:
openclaw_session.append_turn(chat_id, question, resp.content)
if _LEARNING_ENABLED:
import threading as _thr
_thr.Thread(target=store_conversation,
args=(0, 0, question, resp.content, "ollama", []),
daemon=True).start()
return resp.content, None
else:
sys_log.warning(f"[Ollama] 生成失敗: {resp.error}fallback 到 Gemini")
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Bot Q&A 主鏈 Ollama
_qa_req_id = f"qa-{chat_id or 0}-{int(_time_mod.time())}"
with log_ai_call(
caller='openclaw_bot_main',
provider='gcp_ollama',
model=getattr(ollama_service, 'model', 'llama3.1:8b'),
request_id=_qa_req_id,
meta={'chat_id_hash': hashlib.sha1(str(chat_id or 0).encode()).hexdigest()[:8], 'has_db_ctx': bool(db_ctx)},
) as _ctx:
resp = ollama_service.generate(question, system_prompt=sys_prompt, timeout=180)
if resp.success and resp.content:
if chat_id:
openclaw_session.append_turn(chat_id, question, resp.content)
if _LEARNING_ENABLED:
import threading as _thr
_thr.Thread(target=store_conversation,
args=(0, 0, question, resp.content, "ollama", []),
daemon=True).start()
return resp.content, None
else:
sys_log.warning(f"[Ollama] 生成失敗: {resp.error}fallback 到 Gemini")
_ctx.set_error(f"ollama generate failed: {resp.error}")
_ctx.fallback_to_caller('openclaw_bot_gemini')
except Exception as e:
sys_log.warning(f"[Ollama] 例外發生: {e}fallback 到 Gemini")
@@ -6817,15 +6886,30 @@ def openclaw_answer(question: str, chat_id: int = None):
"tool_config": {"function_calling_config": {"mode": "AUTO"}},
"generationConfig": {"temperature": 0.3, "maxOutputTokens": 600},
}
r1 = requests.post(
f"{GEMINI_BASE_URL}/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}",
headers={"Content-Type": "application/json"},
json=payload, timeout=30,
)
r1.raise_for_status()
resp1 = r1.json()
candidate = resp1.get("candidates", [{}])[0]
parts = candidate.get("content", {}).get("parts", [])
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Gemini FC 第一輪
_qa_gemini_req_id = f"qa-{chat_id or 0}-{int(_time_mod.time())}"
with log_ai_call(
caller='openclaw_bot_gemini',
provider='gemini',
model=GEMINI_MODEL,
request_id=_qa_gemini_req_id,
meta={'chat_id_hash': hashlib.sha1(str(chat_id or 0).encode()).hexdigest()[:8], 'turn': 1},
) as _ctx_g1:
r1 = requests.post(
f"{GEMINI_BASE_URL}/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}",
headers={"Content-Type": "application/json"},
json=payload, timeout=30,
)
r1.raise_for_status()
resp1 = r1.json()
# Gemini REST: usageMetadata.{promptTokenCount, candidatesTokenCount}
_um = resp1.get("usageMetadata", {}) or {}
_ctx_g1.set_tokens(
input=_um.get("promptTokenCount", 0),
output=_um.get("candidatesTokenCount", 0),
)
candidate = resp1.get("candidates", [{}])[0]
parts = candidate.get("content", {}).get("parts", [])
# 如果沒有 function call直接回傳文字
tool_calls = [p["functionCall"] for p in parts if "functionCall" in p]
@@ -6870,15 +6954,28 @@ def openclaw_answer(question: str, chat_id: int = None):
"maxOutputTokens": 600,
},
}
r2 = requests.post(
f"{GEMINI_BASE_URL}/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}",
headers={"Content-Type": "application/json"},
json=payload2, timeout=35,
)
r2.raise_for_status()
resp2 = r2.json()
parts2 = resp2.get("candidates", [{}])[0].get("content", {}).get("parts", [])
final = "".join(p.get("text", "") for p in parts2 if "text" in p).strip()
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Gemini FC 第二輪
with log_ai_call(
caller='openclaw_bot_gemini',
provider='gemini',
model=GEMINI_MODEL,
request_id=_qa_gemini_req_id,
meta={'chat_id_hash': hashlib.sha1(str(chat_id or 0).encode()).hexdigest()[:8], 'turn': 2, 'tools_used': used_sources},
) as _ctx_g2:
r2 = requests.post(
f"{GEMINI_BASE_URL}/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}",
headers={"Content-Type": "application/json"},
json=payload2, timeout=35,
)
r2.raise_for_status()
resp2 = r2.json()
_um2 = resp2.get("usageMetadata", {}) or {}
_ctx_g2.set_tokens(
input=_um2.get("promptTokenCount", 0),
output=_um2.get("candidatesTokenCount", 0),
)
parts2 = resp2.get("candidates", [{}])[0].get("content", {}).get("parts", [])
final = "".join(p.get("text", "") for p in parts2 if "text" in p).strip()
if final:
sys_log.info(f"[FC] done tools={used_sources} reply={len(final)}chars")
@@ -6931,19 +7028,34 @@ def openclaw_answer(question: str, chat_id: int = None):
+ f"\n用戶問:{question}\n"
"請用繁體中文直接回答不要開場白300字以內。"
)
r = requests.post(
f"{NVIDIA_BASE_URL}/chat/completions",
headers={"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Content-Type": "application/json"},
json={
"model": CHAT_MODEL,
"messages": [{"role": "user", "content": nim_prompt}],
"max_tokens": 500, "temperature": 0.3,
},
timeout=20,
)
r.raise_for_status()
return r.json()["choices"][0]["message"]["content"].strip(), None
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Bot Q&A NIM 三層 fallback
_qa_nim_req_id = f"qa-{chat_id or 0}-{int(_time_mod.time())}"
with log_ai_call(
caller='openclaw_bot_nim',
provider='nim',
model=CHAT_MODEL,
request_id=_qa_nim_req_id,
meta={'chat_id_hash': hashlib.sha1(str(chat_id or 0).encode()).hexdigest()[:8], 'has_db_ctx': bool(db_ctx)},
) as _ctx_nim:
r = requests.post(
f"{NVIDIA_BASE_URL}/chat/completions",
headers={"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Content-Type": "application/json"},
json={
"model": CHAT_MODEL,
"messages": [{"role": "user", "content": nim_prompt}],
"max_tokens": 500, "temperature": 0.3,
},
timeout=20,
)
r.raise_for_status()
_body = r.json()
_u = _body.get("usage", {}) or {}
_ctx_nim.set_tokens(
input=_u.get("prompt_tokens", 0),
output=_u.get("completion_tokens", 0),
)
return _body["choices"][0]["message"]["content"].strip(), None
except Exception as e:
sys_log.error(f"[FC] NIM fallback error: {e}")

View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
scripts/compare_daily_report_versions.py
─────────────────────────────────────────────────────────────────
Operation Ollama-First v5.0 / Phase 3 / A8 — 日報雙版本盲測腳本
用途:
跑同一天的「舊版 Gemini 全文」vs「新版 Hermes 模板」
輸出兩份檔案到 reports/,供統帥盲測選擇預設模式。
使用:
python3 scripts/compare_daily_report_versions.py --date 2026-05-03
python3 scripts/compare_daily_report_versions.py # 預設昨日
紀律:
- 不寫入 ai_insights避免污染 production 資料)
- 不發 Telegram純 dry-run
- 兩版本皆獨立執行,互不干擾
- 失敗時報錯但不刪舊檔
"""
import os
import sys
import argparse
from datetime import date, timedelta, datetime
from pathlib import Path
# 確保可 import 本專案 services
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
def _ensure_reports_dir() -> Path:
    """Return the ``reports`` directory under ``ROOT``, creating it if absent."""
    target = Path(ROOT, 'reports')
    if not target.is_dir():
        # Parents are not created; only the leaf directory is made.
        target.mkdir(exist_ok=True)
    return target
def _extract_report_text(result) -> str:
    """Return the report body from whatever a report generator returned.

    Dicts are probed for ``content`` then ``report_content`` and fall back to
    their ``str()`` form; plain strings are returned unchanged; any other
    object is stringified.

    Raises:
        RuntimeError: If ``result`` is ``None`` (nothing to write to disk).
    """
    if result is None:
        raise RuntimeError("daily report generator returned None")
    if isinstance(result, str):
        return result
    if isinstance(result, dict):
        return result.get('content', '') or result.get('report_content', '') or str(result)
    return str(result)


def _run_report_variant(flag_value: str, generator_name: str, banner: str) -> str:
    """Set the feature flag, reload the strategist service and run one generator."""
    os.environ['OPENCLAW_DAILY_HERMES_TEMPLATE'] = flag_value
    # Imported lazily and reloaded so the module is re-executed after the
    # environment change (presumably the flag is read at import time -- TODO confirm).
    import importlib
    import services.openclaw_strategist_service as svc
    importlib.reload(svc)
    print(banner)
    return _extract_report_text(getattr(svc, generator_name)())


def _run_legacy(target_date: date) -> str:
    """Run the legacy report generator ``_legacy_full_gemini_daily_report``.

    Args:
        target_date: Date shown in the progress banner.
            NOTE(review): the generator is called without arguments, so this
            date is not forwarded to it -- confirm how the service picks its day.

    Returns:
        The report text.
    """
    return _run_report_variant(
        'false',
        '_legacy_full_gemini_daily_report',
        f"[legacy] 開始跑 Gemini 全文版日報 target_date={target_date}",
    )


def _run_hermes_template(target_date: date) -> str:
    """Run the new report generator ``_generate_daily_report_hermes_template``.

    Args:
        target_date: Date shown in the progress banner.
            NOTE(review): the generator is called without arguments, so this
            date is not forwarded to it -- confirm how the service picks its day.

    Returns:
        The report text.
    """
    return _run_report_variant(
        'true',
        '_generate_daily_report_hermes_template',
        f"[hermes] 開始跑 Hermes 模板版日報 target_date={target_date}",
    )
def _resolve_target_date(raw):
    """Turn the ``--date`` value into a ``date``; a missing or empty value means yesterday.

    Raises:
        ValueError: If ``raw`` is non-empty but not in ``YYYY-MM-DD`` form.
    """
    if not raw:
        return date.today() - timedelta(days=1)
    return datetime.strptime(raw, '%Y-%m-%d').date()


def main(argv=None) -> int:
    """Run both daily-report variants and write each to its own file under ``reports/``.

    The two variants run independently: a failure in one is printed to stderr
    and does not stop the other.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read ``sys.argv``.

    Returns:
        ``0`` when both variants were written, ``1`` when at least one failed.
    """
    parser = argparse.ArgumentParser(description='OpenClaw 日報雙版本盲測')
    parser.add_argument('--date', help='YYYY-MM-DD預設昨日')
    args = parser.parse_args(argv)

    try:
        target_date = _resolve_target_date(args.date)
    except ValueError:
        # parser.error prints usage and exits with status 2 instead of a traceback.
        parser.error(f"--date must be YYYY-MM-DD, got {args.date!r}")

    reports_dir = _ensure_reports_dir()
    date_tag = target_date.strftime('%Y%m%d')
    legacy_file = reports_dir / f'daily_report_legacy_{date_tag}.md'
    hermes_file = reports_dir / f'daily_report_v2_{date_tag}.md'

    failures = 0

    # Legacy variant.
    try:
        legacy_content = _run_legacy(target_date)
        legacy_file.write_text(legacy_content, encoding='utf-8')
        print(f"✅ legacy 版輸出:{legacy_file}")
    except Exception as e:
        failures += 1
        print(f"❌ legacy 版失敗:{e}", file=sys.stderr)

    # New variant.
    try:
        hermes_content = _run_hermes_template(target_date)
        hermes_file.write_text(hermes_content, encoding='utf-8')
        print(f"✅ hermes 模板版輸出:{hermes_file}")
    except Exception as e:
        failures += 1
        print(f"❌ hermes 版失敗:{e}", file=sys.stderr)

    print("\n盲測檢查(建議):")
    print(f" diff <(head -50 {legacy_file}) <(head -50 {hermes_file})")
    print(f" wc -w {legacy_file} {hermes_file}")
    print(" # 統帥盲測時可遮 caller 名稱避免偏見")

    # A non-zero status lets shell callers detect a failed variant.
    return 1 if failures else 0


if __name__ == '__main__':
    sys.exit(main())

View File

@@ -29,7 +29,12 @@ from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
from services.hermes_analyst_service import HERMES_URL as _HERMES_URL, HERMES_MODEL as _HERMES_MODEL
# ADR-027 Phase 2 N3HERMES_MODEL 仍 import純常數HERMES_URL 改 lazy
# 每次 _hermes_scan 才透過 get_hermes_url() 取最新解析GCP 優先 / 111 備援),
# 避免 import-time freeze 導致主機切換不生效。
from services.hermes_analyst_service import HERMES_MODEL as _HERMES_MODEL
from config import get_hermes_url
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
logger = logging.getLogger(__name__)
@@ -38,6 +43,9 @@ _current_pipeline: Dict[str, Any] = {}
_pipeline_lock = threading.Lock()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
# LOCKED-GEMINI: Code Review 全 repo diff 可達 100K+ tokens超過 Ollama 32K context
# 未來可升 Claude Opus 4.7 (200K context, Arena code Elo 1548) — Phase 7 任務
# ADR-028 鎖定場景 #5
REVIEW_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash")
INTERNAL_TOKEN = os.getenv("INTERNAL_WEBHOOK_TOKEN", "")
AUTO_FIX_ENABLED = os.getenv("CODE_REVIEW_AUTO_FIX_ENABLED", "true").lower() == "true"
@@ -215,14 +223,36 @@ class CodeReviewPipeline:
只輸出 JSON 陣列,不含其他文字。無問題時輸出 []"""
resp = _req.post(
f"{_HERMES_URL}/api/generate",
json={"model": _HERMES_MODEL, "prompt": prompt,
"stream": False, "options": {"temperature": 0.1}},
timeout=120,
# ADR-027 Phase 2 N3lazy resolve Hermes 主機GCP 優先 / 111 備援),
# 避開 import-time freeze。provider 標籤跟著解析結果動態決定。
hermes_url = get_hermes_url()
provider_tag = (
'gcp_ollama' if ('34.21.145.224' in hermes_url or '34.143.170.20' in hermes_url)
else 'ollama_111' if '192.168.0.111' in hermes_url
else 'ollama_other'
)
resp.raise_for_status()
raw = resp.json().get("response", "").strip()
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Code Review Hermes scan
with log_ai_call(
caller='code_review_hermes',
provider=provider_tag,
model=_HERMES_MODEL,
request_id=f"cr-{self.commit_sha[:8]}",
meta={'commit': self.commit_sha[:8], 'branch': self.branch,
'files': len(files), 'host': hermes_url},
) as _ctx:
resp = _req.post(
f"{hermes_url}/api/generate",
json={"model": _HERMES_MODEL, "prompt": prompt,
"stream": False, "options": {"temperature": 0.1}},
timeout=120,
)
resp.raise_for_status()
body = resp.json()
_ctx.set_tokens(
input=body.get("prompt_eval_count", 0),
output=body.get("eval_count", 0),
)
raw = body.get("response", "").strip()
match = re.search(r"\[.*\]", raw, re.DOTALL)
if not match:
@@ -271,36 +301,70 @@ class CodeReviewPipeline:
<b>💡 架構優化方向</b>1條長期建議
<b>✅ 本次部署亮點</b>"""
# 優先 Gemini
# 優先 Gemini — Phase 1 v5.0 logger 追蹤
if GEMINI_API_KEY:
try:
import google.generativeai as genai
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel(
model_name=REVIEW_MODEL,
generation_config=genai.types.GenerationConfig(
temperature=0.3, max_output_tokens=1500,
),
system_instruction=system,
)
resp = model.generate_content(user_prompt, request_options={"timeout": 90})
return resp.text or ""
except Exception as e:
logger.warning("[CodeReview] OpenClaw Gemini 失敗,降級 ElephantAlpha: %s", e)
with log_ai_call(
caller='code_review_openclaw',
provider='gemini',
model=REVIEW_MODEL,
request_id=f"cr-{self.commit_sha[:8]}",
meta={'commit': self.commit_sha[:8], 'branch': self.branch},
) as _ctx:
try:
import google.generativeai as genai
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel(
model_name=REVIEW_MODEL,
generation_config=genai.types.GenerationConfig(
temperature=0.3, max_output_tokens=1500,
),
system_instruction=system,
)
resp = model.generate_content(user_prompt, request_options={"timeout": 90})
try:
usage = getattr(resp, 'usage_metadata', None)
if usage is not None:
_ctx.set_tokens(
input=getattr(usage, 'prompt_token_count', 0) or 0,
output=getattr(usage, 'candidates_token_count', 0) or 0,
)
except Exception:
pass
return resp.text or ""
except Exception as e:
logger.warning("[CodeReview] OpenClaw Gemini 失敗,降級 ElephantAlpha: %s", e)
_ctx.set_error(f"{type(e).__name__}: {e}")
_ctx.fallback_to_caller('code_review_elephant')
# 降級ElephantAlpha via OpenRouterOPENROUTER_API_KEY 容器內一定有)
try:
from services.elephant_service import elephant_service
resp = elephant_service.generate(
prompt=user_prompt,
system_prompt=system,
temperature=0.3,
timeout=90,
)
if resp.success:
return resp.content or ""
except Exception as e:
logger.warning("[CodeReview] OpenClaw ElephantAlpha 降級也失敗: %s", e)
# Phase 1 v5.0 logger 追蹤
with log_ai_call(
caller='code_review_elephant',
provider='nim_via_elephant',
model='nvidia/llama-3.3-nemotron-super-49b-v1.5',
request_id=f"cr-{self.commit_sha[:8]}",
meta={'commit': self.commit_sha[:8], 'branch': self.branch},
) as _ctx:
try:
from services.elephant_service import elephant_service
resp = elephant_service.generate(
prompt=user_prompt,
system_prompt=system,
temperature=0.3,
timeout=90,
)
if resp.success:
# ElephantResponse 已含 input_tokens/output_tokens
_ctx.set_tokens(
input=getattr(resp, 'input_tokens', 0) or 0,
output=getattr(resp, 'output_tokens', 0) or 0,
)
return resp.content or ""
else:
_ctx.set_error(getattr(resp, 'error', 'elephant generate failed'))
except Exception as e:
logger.warning("[CodeReview] OpenClaw ElephantAlpha 降級也失敗: %s", e)
_ctx.set_error(f"{type(e).__name__}: {e}")
return ""

View File

@@ -23,6 +23,8 @@ from typing import Optional
import requests
from sqlalchemy import text
from services.mcp_context_service import build_mcp_context
from services.ollama_service import resolve_ollama_host, get_host_label
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
logger = logging.getLogger(__name__)
@@ -155,32 +157,48 @@ class HermesAnalystService:
"keep_alive": HERMES_KEEP_ALIVE, # ADR-012避免冷啟動 timeout
"options": {"temperature": 0.1},
}
try:
resp = requests.post(
f"{HERMES_URL}/api/generate",
json=payload,
timeout=HERMES_TIMEOUT, # 統一 config 集中讀取ADR-008keep_alive 確保熱駐留時實測 < 10s
)
resp.raise_for_status()
raw = (resp.json().get("response", "") or "").strip()
if raw.startswith("```"):
raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE)
raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE).strip()
data = json.loads(raw)
return {
"intent": data.get("intent", "unknown"),
"confidence": float(data.get("confidence", 0.5)),
"complexity_score": float(data.get("complexity_score", 0.5)),
"requires_data_fetch": bool(data.get("requires_data_fetch", False)),
"preliminary_answer": data.get("preliminary_answer", "") or "",
"metadata": {"source": "hermes_llm"},
}
except Exception as e:
logger.warning(
f"[Hermes.intent] Ollama 連線失敗,降級規則引擎"
f"host={HERMES_URL} model={HERMES_MODEL} error={type(e).__name__}: {e}"
)
return None
target_host = resolve_ollama_host()
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Hermes 意圖分類 token / fallback
with log_ai_call(
caller='hermes_intent',
provider='gcp_ollama',
model=HERMES_MODEL,
meta={'host_label': get_host_label(target_host)},
) as _ctx:
try:
resp = requests.post(
f"{target_host}/api/generate",
json=payload,
timeout=HERMES_TIMEOUT, # 統一 config 集中讀取ADR-008keep_alive 確保熱駐留時實測 < 10s
)
resp.raise_for_status()
body = resp.json()
_ctx.set_tokens(
input=body.get("prompt_eval_count", 0),
output=body.get("eval_count", 0),
)
raw = (body.get("response", "") or "").strip()
if raw.startswith("```"):
raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE)
raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE).strip()
data = json.loads(raw)
return {
"intent": data.get("intent", "unknown"),
"confidence": float(data.get("confidence", 0.5)),
"complexity_score": float(data.get("complexity_score", 0.5)),
"requires_data_fetch": bool(data.get("requires_data_fetch", False)),
"preliminary_answer": data.get("preliminary_answer", "") or "",
"metadata": {"source": "hermes_llm"},
}
except Exception as e:
# NOTE: 修補 commit 00591c5 殘留的孤立 f-string原 logger.warning 被誤刪)
logger.warning(
f"[Hermes.intent] Ollama 連線失敗,降級規則引擎"
f"model={HERMES_MODEL} error={type(e).__name__}: {e}"
)
_ctx.set_error(f"{type(e).__name__}: {e}")
_ctx.fallback_to_caller('hermes_rule_engine')
return None
def _rule_based_intent(self, message: str) -> dict:
"""Ollama 掛掉時的規則引擎 fallback — 永遠返回結構化結果。"""
@@ -416,23 +434,46 @@ class HermesAnalystService:
"options": {"temperature": 0.1},
}
resp = requests.post(
f"{HERMES_URL}/api/generate",
json=payload,
timeout=HERMES_TIMEOUT,
)
resp.raise_for_status()
target_host = resolve_ollama_host()
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Hermes 競價分析 token / fallback
with log_ai_call(
caller='hermes_analyst',
provider='gcp_ollama',
model=HERMES_MODEL,
meta={
'host_label': get_host_label(target_host),
'item_count': len(items),
'top_n': TOP_N,
},
) as _ctx:
try:
resp = requests.post(
f"{target_host}/api/generate",
json=payload,
timeout=HERMES_TIMEOUT,
)
resp.raise_for_status()
except Exception as e:
_ctx.set_error(f"{type(e).__name__}: {e}")
raise
data = resp.json()
raw = data.get("response", "").strip()
duration_sec = round(data.get("total_duration", 0) / 1e9, 1)
eval_tokens = data.get("eval_count", "?") # Ollama 推理 token 數
logger.info(
f"[Hermes] 推理耗時 {duration_sec}s"
f"輸入 {len(items)}tokens={eval_tokens},回應長度 {len(raw)}"
)
# 儲存統計供 footprint 使用(掛在 instance 上供 run() 讀取)
self._last_stats = {"duration_sec": duration_sec, "tokens": eval_tokens}
data = resp.json()
raw = data.get("response", "").strip()
duration_sec = round(data.get("total_duration", 0) / 1e9, 1)
eval_tokens_raw = data.get("eval_count", 0) # Ollama 推理 token 數
prompt_tokens_raw = data.get("prompt_eval_count", 0)
_ctx.set_tokens(input=prompt_tokens_raw, output=eval_tokens_raw)
logger.info(
f"[Hermes] 推理耗時 {duration_sec}s"
f"輸入 {len(items)}tokens={eval_tokens_raw},回應長度 {len(raw)}"
)
# 儲存統計供 footprint 使用(掛在 instance 上供 run() 讀取)
self._last_stats = {
"duration_sec": duration_sec,
"tokens": eval_tokens_raw,
"host": target_host,
"host_label": get_host_label(target_host)
}
# P0-1 修復:剝除 Hermes 可能輸出的 markdown code fence
if raw.startswith("```"):

View File

@@ -27,6 +27,7 @@ import requests
from services.mcp_context_service import build_mcp_context
from config import HERMES_URL # ADR-008 集中化:禁止硬編碼 IP
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
logger = logging.getLogger(__name__)
@@ -107,6 +108,17 @@ NIM_TIMEOUT = 60 # 秒
NIM_DAILY_LIMIT = 80 # 留 20 個給 AWOOOI100/天免費配額
_nim_call_count = {"date": "", "count": 0}
# ── Operation Ollama-First v5.0 / Phase 3 / A9 ──────────────────
# Gradual-rollout switch for routing dispatch to GCP Ollama qwen3:14b.
# - Default false → behaviour is unchanged (dispatch still goes through NIM).
# - true → qwen3 becomes the primary path, NIM is demoted to backup, and the
#   Hermes rule engine stays as the last-resort fallback (ADR-004).
# Model choice follows the A2 web-research report
# docs/phase0_research_report_20260503.md: the originally planned
# deepseek-r1:14b lacks a matching jinja chat template for Ollama tool_calls
# (GitHub Issue #10935, unresolved), so qwen3:14b is used instead (tools
# support confirmed by both the Ollama registry and qwenlm).
# All three values are read from the environment once, at import time;
# a non-numeric NEMOTRON_OLLAMA_TIMEOUT makes int() raise ValueError at import.
NEMOTRON_OLLAMA_FIRST = os.getenv("NEMOTRON_OLLAMA_FIRST", "false").lower() == "true"
NEMOTRON_OLLAMA_MODEL = os.getenv("NEMOTRON_OLLAMA_MODEL", "qwen3:14b")
NEMOTRON_OLLAMA_TIMEOUT = int(os.getenv("NEMOTRON_OLLAMA_TIMEOUT", "180")) # seconds
def _check_nim_quota() -> bool:
today = datetime.now().strftime("%Y-%m-%d")
@@ -320,6 +332,68 @@ ICON_AI = "🧠"
ICON_FOOTPRINT = "⚙️"
# ── tool_calls 解析NIM 與 qwen3 共用)──────────────────────────
def _parse_tool_calls_struct(tool_calls: list) -> list:
    """Flatten OpenAI-style ``tool_calls`` into ``[{"tool": name, "args": dict}]``.

    Each entry is expected to look like
    ``{"function": {"name": ..., "arguments": <json-str-or-dict>}}``.
    ``arguments`` may be a JSON string or an already-decoded dict; both are
    accepted.

    Args:
        tool_calls: The ``tool_calls`` array from a chat response; ``None`` or
            an empty list yields ``[]``.

    Returns:
        One ``{"tool", "args"}`` dict per entry that has a non-empty function
        name. ``args`` is always a dict: unparsable JSON, or JSON that decodes
        to something other than an object, becomes ``{}``.
    """
    results = []
    for tc in tool_calls or []:
        fn = tc.get("function") if isinstance(tc, dict) else None
        # Skip malformed entries instead of raising on a non-dict "function".
        if not fn or not isinstance(fn, dict):
            continue
        name = fn.get("name")
        if not name:
            continue
        raw_args = fn.get("arguments", {})
        if isinstance(raw_args, str):
            try:
                raw_args = json.loads(raw_args) if raw_args.strip() else {}
            except json.JSONDecodeError:
                raw_args = {}
        # "null", "[1]" etc. are valid JSON but not usable as keyword arguments.
        args = raw_args if isinstance(raw_args, dict) else {}
        results.append({"tool": name, "args": args})
    return results
def _parse_content_fallback(raw_content: str) -> list:
    """Recover tool calls that a model wrote into message ``content`` as JSON text.

    Used when a response carries no structured ``tool_calls``. The content
    must decode to a JSON list; each dict item supplies the tool name via
    ``name`` or ``function.name`` and its arguments via ``parameters`` or
    ``arguments`` (a dict, or a JSON string holding one).

    Args:
        raw_content: The message ``content`` string; empty or non-string
            input yields ``[]``.

    Returns:
        ``[{"tool": name, "args": dict}]`` for every item with a name, or
        ``[]`` when the content is not a JSON list. Never raises on
        malformed items; ``args`` is always a dict.
    """
    if not raw_content or not isinstance(raw_content, str):
        return []
    try:
        parsed = json.loads(raw_content.strip())
    except (ValueError, RecursionError) as parse_err:
        logger.error(f"[ToolCalls] content fallback JSON 解析失敗:{parse_err}")
        return []
    if not isinstance(parsed, list):
        return []

    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        name = item.get("name")
        if not name:
            # Only dereference "function" when it really is a dict; a string
            # or list there previously raised AttributeError out of this helper.
            fn = item.get("function")
            name = fn.get("name") if isinstance(fn, dict) else None
        args = item.get("parameters") or item.get("arguments") or {}
        if isinstance(args, str):
            try:
                args = json.loads(args)
            except json.JSONDecodeError:
                args = {}
        if not isinstance(args, dict):
            args = {}
        if name:
            results.append({"tool": name, "args": args})
    if results:
        logger.info(f"[ToolCalls] content fallback 解析成功,取得 {len(results)} 個 tool_calls")
    return results
def _build_footprint_json(hermes_stats: Optional[dict], nim_stats: Optional[dict]) -> dict:
"""
建立結構化運算足跡 (用於 DB model_footprint JSONB 欄位)
@@ -331,7 +405,8 @@ def _build_footprint_json(hermes_stats: Optional[dict], nim_stats: Optional[dict
if hermes_stats:
result["analyst"] = {
"model": "qwen2.5:7b-instruct",
"host": HERMES_URL, # ADR-008集中讀取禁止硬編碼 IP
"host": hermes_stats.get("host", HERMES_URL),
"host_label": hermes_stats.get("host_label", "未知"),
"duration_sec": hermes_stats.get("duration_sec", 0),
"tokens": hermes_stats.get("tokens", 0),
"cost_usd": 0,
@@ -363,12 +438,13 @@ def _build_footprint_block(hermes_stats: Optional[dict], nim_stats: Optional[dic
if hermes_stats:
dur = hermes_stats.get("duration_sec", 0)
tok = hermes_stats.get("tokens", "?")
label = hermes_stats.get("host_label", "本地 188")
lines.append(
f"• 🔍 分析: Qwen2.5 7B (本地 188) | "
f"• 🔍 分析: Qwen2.5 7B ({label}) | "
f"耗時: {dur:.1f}s | Tokens: {tok} | $0 成本"
)
else:
lines.append("• 🔍 分析: Qwen2.5 7B (本地 188) | $0 成本")
lines.append("• 🔍 分析: Qwen2.5 7B (未知主機) | $0 成本")
if nim_stats:
tok = nim_stats.get("total_tokens", "?")
@@ -464,81 +540,208 @@ class NemotronDispatcher:
]
# P1-4 修復NIM API 指數退避 retry最多 3 次)
# Phase 1 v5.0: 包 ai_call_logger 追蹤 NIM 配額/tokens/錯誤
import time as _time
last_err = None
for _attempt in range(3):
try:
resp = requests.post(
f"{NIM_BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {NIM_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": NIM_MODEL,
"messages": messages,
"tools": TOOLS,
"tool_choice": "required",
"max_tokens": 2048,
},
timeout=NIM_TIMEOUT,
)
resp.raise_for_status()
break
except (requests.Timeout, requests.HTTPError) as e:
last_err = e
# ADR-004: 429 不重試,立即拋出讓上層啟動 Hermes 規則引擎降級
if isinstance(e, requests.HTTPError) and e.response is not None \
and e.response.status_code == 429:
logger.warning("[NIM] HTTP 429 速率限制,跳出 retry 迴圈")
raise
if _attempt < 2:
_time.sleep(2 ** _attempt)
logger.warning(f"[NIM] retry {_attempt + 1}/2 after {e}")
else:
raise last_err
with log_ai_call(
caller='nemotron_dispatch',
provider='nim',
model=NIM_MODEL,
meta={'threat_count': len(threats), 'quota_used': _nim_quota_used()},
) as _ctx:
for _attempt in range(3):
try:
resp = requests.post(
f"{NIM_BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {NIM_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": NIM_MODEL,
"messages": messages,
"tools": TOOLS,
"tool_choice": "required",
"max_tokens": 2048,
},
timeout=NIM_TIMEOUT,
)
resp.raise_for_status()
break
except (requests.Timeout, requests.HTTPError) as e:
last_err = e
# ADR-004: 429 不重試,立即拋出讓上層啟動 Hermes 規則引擎降級
if isinstance(e, requests.HTTPError) and e.response is not None \
and e.response.status_code == 429:
logger.warning("[NIM] HTTP 429 速率限制,跳出 retry 迴圈")
_ctx.set_error(f"NIM 429 rate-limited")
_ctx.fallback_to_caller('hermes_rule_engine')
raise
if _attempt < 2:
_time.sleep(2 ** _attempt)
logger.warning(f"[NIM] retry {_attempt + 1}/2 after {e}")
else:
raise last_err
body = resp.json()
usage = body.get("usage", {})
nim_stats = {
"total_tokens": usage.get("total_tokens", 0),
"quota_used": _nim_quota_used(),
}
body = resp.json()
usage = body.get("usage", {})
# 記錄 token / 成本到 ai_calls 表
_ctx.set_tokens(
input=usage.get("prompt_tokens", 0),
output=usage.get("completion_tokens", 0),
)
nim_stats = {
"total_tokens": usage.get("total_tokens", 0),
"quota_used": _nim_quota_used(),
}
choices = body.get("choices", [])
tool_calls = choices[0].get("message", {}).get("tool_calls", []) if choices else []
message = choices[0].get("message", {}) if choices else {}
tool_calls = message.get("tool_calls", []) or []
results = []
for tc in tool_calls:
fn = tc.get("function", {})
try:
args = json.loads(fn.get("arguments", "{}"))
except json.JSONDecodeError:
args = {}
results.append({"tool": fn.get("name"), "args": args})
# 共用結構解析NIM / qwen3 兩邊統一走同一條)
results = _parse_tool_calls_struct(tool_calls)
if not results:
# llama-3.1-8b-instruct 有時把 tool call 寫進 content 而非 tool_calls 結構
raw_content = choices[0].get("message", {}).get("content", "") if choices else ""
raw_content = message.get("content", "") or ""
logger.warning(f"[NIM] 0 tool_calls嘗試從 content 解析:{raw_content[:120]}")
try:
parsed = json.loads(raw_content.strip())
if isinstance(parsed, list):
for item in parsed:
name = item.get("name") or item.get("function", {}).get("name")
args = item.get("parameters") or item.get("arguments") or {}
if isinstance(args, str):
args = json.loads(args)
if name:
results.append({"tool": name, "args": args})
if results:
logger.info(f"[NIM] content fallback 解析成功,取得 {len(results)} 個 tool_calls")
except Exception as parse_err:
logger.error(f"[NIM] content fallback 解析失敗:{parse_err}")
results = _parse_content_fallback(raw_content)
logger.info(f"[NIM] 收到 {len(results)} 個 tool_calls | tokens={nim_stats['total_tokens']}")
return results, nim_stats
# ──────────────────────────────────────────────
# GCP Ollama qwen3:14b Tool CallingOperation Ollama-First v5.0 / Phase 3
# ──────────────────────────────────────────────
def _call_qwen3_dispatch(self, threats: list) -> tuple:
    """
    Send the Hermes threat list to GCP Ollama qwen3:14b and collect its tool_calls decisions.

    Why qwen3:14b (A2 web-research conclusion, docs/phase0_research_report_20260503.md):
    - The Ollama registry page and qwenlm.github.io both confirm the tools capability.
    - Thinking mode can be switched off (avoids deepseek-r1's 30s thinking latency).
      NOTE(review): this payload sets no thinking-mode switch — confirm it is
      disabled elsewhere (model defaults / server config).
    - The 14B build is 9.3GB, the same class as deepseek-r1:14b.
    - Uses an OpenAI-compatible chat completion + tools schema, consistent with NIM.

    Args:
        threats: Hermes threat objects; each must expose sku, name, momo_price,
            pchome_price, gap_pct, sales_7d_delta_pct, risk, recommended_action
            and confidence.

    Returns:
        (list of {"tool": str, "args": dict}, dict ollama_stats)
        ollama_stats: {"total_tokens": int, "host": str, "model": str}

    Raises:
        Exception: any connection / HTTP / JSON-decoding failure, or a response
            body that is not a JSON object. The error is recorded on the ai_call
            log context, the host is marked unhealthy, and the exception is
            re-raised so the caller can fall back to NIM.
    """
    from services.ollama_service import resolve_ollama_host, mark_unhealthy

    host = resolve_ollama_host().rstrip("/")

    threat_summary = json.dumps(
        [
            {
                "sku": t.sku,
                "name": t.name,
                "momo_price": t.momo_price,
                "pchome_price": t.pchome_price,
                "gap_pct": t.gap_pct,
                "sales_delta": t.sales_7d_delta_pct,
                "risk": t.risk,
                "action": t.recommended_action,
                "confidence": t.confidence,
            }
            for t in threats
        ],
        ensure_ascii=False,
    )

    # Inject the MCP market context (same as the NIM path).
    mcp_ctx = build_mcp_context()

    # System prompt is kept identical to the NIM one (avoid maintaining two copies).
    system_prompt = (
        "你是台灣電商競價情報的行動派發器。"
        f"當前市場背景 (MCP)\n{mcp_ctx}\n\n"
        "根據 Hermes 分析師提供的威脅清單,決定對每支商品呼叫哪個工具。\n"
        "路由鐵律(依序判斷,命中即停):\n"
        "1. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review"
        "concern 說明『價差接近 0 但銷量大幅下滑,疑似缺貨/下架/平台流量異常,請人工走查前台』。\n"
        "2. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert填入 momo_price, comp_price\n"
        "3. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
        "4. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
        "每支商品只呼叫一個工具。\n"
        "【語言鐵律 — 台灣標準正體中文(繁體)】所有文字欄位必須遵守:\n"
        " 1. 嚴禁簡體字、嚴禁異體字(例:不可用「亊」,必須用「事」)\n"
        " 2. 嚴禁短語重複(語意坍塌)、嚴禁無意義字元組合\n"
        "若無法產出合理的繁體中文說明,直接輸出「請人工評估議價空間」。"
    )

    payload = {
        "model": NEMOTRON_OLLAMA_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"請處理以下 {len(threats)} 筆威脅清單:\n{threat_summary}"},
        ],
        "tools": TOOLS,  # reuse the existing NIM tools schema
        "stream": False,
        "options": {
            "temperature": 0.2,
            "num_predict": 2048,
        },
    }

    with log_ai_call(
        caller='nemotron_dispatch',
        provider='gcp_ollama',
        model=NEMOTRON_OLLAMA_MODEL,
        request_id=f"nem-{int(time.time())}",
        meta={
            'flag': 'NEMOTRON_OLLAMA_FIRST',
            'threats_count': len(threats),
            'host': host,
        },
    ) as ctx:
        try:
            resp = requests.post(
                f"{host}/api/chat",
                json=payload,
                timeout=NEMOTRON_OLLAMA_TIMEOUT,
            )
            resp.raise_for_status()
            body = resp.json()
            # Validate the shape here so a non-object body takes the same
            # error path as invalid JSON instead of crashing on body.get later.
            if not isinstance(body, dict):
                raise ValueError(
                    f"unexpected response body type: {type(body).__name__}"
                )
        except Exception as e:
            # Connection / HTTP / malformed-body failure: record the error, mark the
            # host unhealthy, then re-raise so dispatch can take the NIM fallback.
            ctx.set_error(f"qwen3 call failed: {type(e).__name__}: {e}")
            ctx.fallback_to_caller('nim')
            mark_unhealthy(host)
            raise

        # Read the token counters once; a missing or null counter counts as 0.
        prompt_tokens = body.get('prompt_eval_count') or 0
        completion_tokens = body.get('eval_count') or 0
        ctx.set_tokens(input=prompt_tokens, output=completion_tokens)

        # "message" may be absent or null; treat anything that is not an object as empty.
        msg = body.get('message')
        if not isinstance(msg, dict):
            msg = {}
        tool_calls = msg.get('tool_calls') or []

        # Structured tool_calls go through the parser shared with the NIM path.
        results = _parse_tool_calls_struct(tool_calls)
        if not results:
            # No structured tool_calls: try to recover them from the text content.
            raw_content = msg.get('content', '') or ''
            logger.warning(
                f"[Dispatcher][qwen3] 0 tool_calls嘗試從 content 解析:{raw_content[:120]}"
            )
            results = _parse_content_fallback(raw_content)

        ollama_stats = {
            "total_tokens": prompt_tokens + completion_tokens,
            "host": host,
            "model": NEMOTRON_OLLAMA_MODEL,
        }
        logger.info(
            f"[Dispatcher][qwen3] 收到 {len(results)} 個 tool_calls | "
            f"tokens={ollama_stats['total_tokens']} host={host}"
        )
        return results, ollama_stats
# ──────────────────────────────────────────────
# ADR-004Hermes 規則引擎降級路由
# ──────────────────────────────────────────────
@@ -1190,6 +1393,51 @@ class NemotronDispatcher:
"nim_stats": {},
}
# ── Operation Ollama-First v5.0 / Phase 3 / A9qwen3 主路徑feature flag 灰度)──
# 預設 NEMOTRON_OLLAMA_FIRST=false 時不進入此分支,行為與戰前完全相同。
# 若 qwen3 成功取得 tool_calls沿用既有 TOOL_MAP 執行邏輯(共用 footprint/threat 注入)。
# 若 qwen3 失敗或 0 tool_calls → 不直接降到 Hermes 規則,先嘗試 NIM 備援,再走 ADR-004。
qwen3_used = False
qwen3_stats: Optional[dict] = None
qwen3_tool_calls: Optional[list] = None
if NEMOTRON_OLLAMA_FIRST:
try:
qwen3_tool_calls, qwen3_stats = self._call_qwen3_dispatch(nim_candidates)
if qwen3_tool_calls:
qwen3_used = True
logger.info(
f"[Dispatcher][qwen3] 主路徑成功 tool_calls={len(qwen3_tool_calls)} "
f"tokens={qwen3_stats.get('total_tokens', 0)}"
)
else:
logger.warning("[Dispatcher][qwen3] 0 tool_callsfallback 至 NIM")
except Exception as e:
logger.warning(f"[Dispatcher][qwen3] 呼叫失敗 fallback NIM: {e}")
# log_ai_call 已在 _call_qwen3_dispatch 內標記 status=error + fallback_to=nim
qwen3_tool_calls = None
qwen3_stats = None
# qwen3 主路徑成功 → 直接進入工具執行區塊(跳過 NIM
if qwen3_used:
tool_calls = qwen3_tool_calls
# 與既有 NIM 路徑一致的 stats 結構footprint 顯示用)
nim_stats = {
"total_tokens": qwen3_stats.get("total_tokens", 0),
"quota_used": _nim_quota_used(), # 配額未動用
"provider": "gcp_ollama",
"model": qwen3_stats.get("model", NEMOTRON_OLLAMA_MODEL),
}
return self._execute_tool_calls(
tool_calls=tool_calls,
threats=threats,
hermes_stats=hermes_stats,
nim_stats=nim_stats,
pre_dispatched=dispatched,
pre_skipped=skipped,
pre_errors=errors,
)
# ── 進入 NIM 路徑flag=false 預設主路徑flag=true 則為 qwen3 失敗備援)──
if not NIM_API_KEY:
logger.warning("[Dispatcher][ADR-004] NVIDIA_API_KEY 未設定,啟動 Hermes 規則引擎降級")
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
@@ -1249,11 +1497,38 @@ class NemotronDispatcher:
"nim_stats": fb["nim_stats"],
}
# 建立運算足跡Telegram 顯示文字 + DB 結構化 JSON共用同一份
return self._execute_tool_calls(
tool_calls=tool_calls,
threats=threats,
hermes_stats=hermes_stats,
nim_stats=nim_stats,
pre_dispatched=dispatched,
pre_skipped=skipped,
pre_errors=errors,
)
# ──────────────────────────────────────────────
# tool_calls 執行區塊NIM 與 qwen3 共用)
# ──────────────────────────────────────────────
def _execute_tool_calls(
self,
tool_calls: list,
threats: list,
hermes_stats: Optional[dict],
nim_stats: dict,
pre_dispatched: int = 0,
pre_skipped: int = 0,
pre_errors: Optional[list] = None,
) -> dict:
"""執行 LLM 回傳的 tool_calls 清單,注入 Python 獨裁的客觀數字 + 金額影響。
被 NIM 路徑與 qwen3 路徑共用,避免雙路雙維護。
"""
errors = list(pre_errors or [])
dispatched = pre_dispatched
footprint_text = _build_footprint_block(hermes_stats, nim_stats)
footprint_data = _build_footprint_json(hermes_stats, nim_stats)
# 建立 SKU → threat 的查詢字典(供 add_to_recommendation 寫入快照)
threat_map = {t.sku: t for t in threats}
TOOL_MAP = {
@@ -1266,20 +1541,15 @@ class NemotronDispatcher:
for tc in tool_calls:
tool_name = tc.get("tool")
args = tc.get("args", {})
args = dict(tc.get("args", {}) or {})
handler = TOOL_MAP.get(tool_name)
if not handler:
errors.append(f"未知工具: {tool_name}")
continue
# 注入通用參數Telegram 文字 + DB JSON 足跡
args["footprint"] = footprint_text
# [2026-04-18 台北] Bug-1 防線一 保險:所有客觀數字強制由 Python 從 threat_map 注入,
# 覆蓋 LLM 可能回吐的幻覺數字(例如 $0。Layer A Hermes 根治是主防線,
# 此處為二道屏障(萬一 ground_items 有漏網,或未來走 bypass — Claude Opus 4.7
# [2026-05-02 台北] B' 軌:金額影響量化亦走 Python 獨裁注入 — Claude Opus 4.7
t = threat_map.get(args.get("sku"))
if tool_name == "trigger_price_alert" and t:
args["momo_price"] = getattr(t, "momo_price", None)
@@ -1302,7 +1572,6 @@ class NemotronDispatcher:
args["threat"] = t
elif tool_name == "route_to_km":
args["threat"] = t
# mark_for_relearn 無需注入客觀數字(僅寫 DB
try:
handler(**args)
@@ -1311,11 +1580,13 @@ class NemotronDispatcher:
errors.append(f"{tool_name}({args.get('sku', '?')}): {e}")
logger.error(f"[Dispatcher] 工具執行失敗 [{tool_name}]: {e}")
skipped = len(threats) - dispatched
skipped = max(0, len(threats) - dispatched)
# nim_stats 在 qwen3 路徑下會帶 provider='gcp_ollama'log 出處可區辨
provider = nim_stats.get("provider", "nim") if isinstance(nim_stats, dict) else "nim"
logger.info(
f"[Dispatcher] 完成 forced_review={len(forced_review)} "
f"[Dispatcher] 完成 provider={provider} "
f"dispatched={dispatched} skipped={skipped} "
f"errors={len(errors)} nim_tokens={nim_stats.get('total_tokens', 0)}"
f"errors={len(errors)} tokens={nim_stats.get('total_tokens', 0)}"
)
return {
"dispatched": dispatched,

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,63 @@
📊 momo 日報 {{ today }} ({{ weekday }})
═══════════════════════════════════════
## 📈 營收 KPI
| 指標 | 今日 | vs 昨日 | vs 7日均 |
|------|------|---------|----------|
| 營收 | {{ revenue.today | format_currency }} | {{ revenue.dod_pct | format_pct }} | {{ revenue.wow_pct | format_pct }} |
| 訂單筆數 | {{ orders.today_rows | default('—') }} | — | — |
| 上架 SKU | {{ orders.today_sku | default('—') }} | — | — |
| 平均客單 | {{ orders.avg_value_today | format_currency }} | — | — |
{% if revenue.today == 0 %}
⚠️ 今日營收為零,請檢查資料管線是否正常。
{% endif %}
## 🏆 TOP {{ top_skus | length }} 熱銷商品
{% if top_skus %}
{% for sku in top_skus %}
{{ loop.index }}. **{{ sku.name | default('—') }}**
數量:{{ sku.qty | default('—') }} 件 | 營收:{{ sku.revenue | format_currency }}
{% endfor %}
{% else %}
(今日無熱銷資料)
{% endif %}
## 🔍 競品價差警示
{% if price_gaps %}
{% for alert in price_gaps %}
- ⚠️ **{{ alert.sku_name | default(alert.sku) | default('—') }}**:我方 {{ alert.momo_price | format_currency }} vs {{ alert.competitor | default('競品') }} {{ alert.comp_price | format_currency }} (價差 {{ alert.gap_pct | format_pct }})
{% endfor %}
{% else %}
✅ 暫無重大價差警示
{% endif %}
## 📦 庫存異常
{% if inventory_alerts %}
{% for alert in inventory_alerts %}
- 🟡 {{ alert.sku_name | default(alert.sku) | default('—') }}{{ alert.reason | default('—') }}
{% endfor %}
{% else %}
✅ 庫存狀態正常
{% endif %}
## 💡 今日洞察 (AI 分析)
{{ gemini_insight | default('(本日洞察生成失敗,請查 logger)') }}
## ✅ 48h 優先事項
{% if priority_actions %}
{% for action in priority_actions %}
{{ loop.index }}. {{ action }}
{% endfor %}
{% else %}
(暫無自動產生的行動建議)
{% endif %}
═══════════════════════════════════════
🤖 Operation Ollama-First v5.0 / daily_report_v2 (Hermes 模板模式)

15
tests/conftest.py Normal file
View File

@@ -0,0 +1,15 @@
import os
import pytest
@pytest.fixture
def host():
    """Return the mail host used by non-parametric email probe tests.

    The MOOD_TEST_MAIL_HOST environment variable overrides the built-in default.
    """
    default_host = "ms1.pchome.tw"
    return os.environ.get("MOOD_TEST_MAIL_HOST", default_host)
@pytest.fixture
def port():
    """Return the mail port used by probe tests as an int.

    The MOOD_TEST_MAIL_PORT environment variable overrides the built-in default.
    """
    raw_port = os.environ.get("MOOD_TEST_MAIL_PORT", "587")
    return int(raw_port)

View File

@@ -0,0 +1,456 @@
"""
test_nemotron_qwen3_compat.py
─────────────────────────────────────────────────────────────────
Operation Ollama-First v5.0 / Phase 3 / A9 — Nemotron qwen3 切換相容性測試
驗證面:
T1. qwen3 chat 回應 OpenAI tool_calls 結構 → _parse_tool_calls_struct 正確
T2. qwen3 沒回 tool_calls → _parse_content_fallback 正確(與 NIM 同等容錯)
T3. qwen3 同時回 tool_calls + content → 優先採用 tool_calls
T4. qwen3 連線失敗 → 不丟例外給上游,自動 fallback NIM 路徑
T5. qwen3 + NIM 都失敗 → ADR-004 走 Hermes 規則引擎降級(含「🟡 [規則引擎]」標記)
T6. NEMOTRON_OLLAMA_FIRST 預設 false → 完全不呼叫 qwen3戰前行為
紀律:
- 所有 HTTP 互動 mock不實際呼叫 GCP Ollama 或 NIM
- 與 test_nemotron_fallback 共存,使用同款 FakeThreat
- assert log_ai_call 路徑可被 monkeypatch 旁路(不污染 ai_calls 表)
"""
from dataclasses import dataclass
from contextlib import contextmanager
import pytest
# ─────────────────────────────────────────────────────────────
# Fixtures
# ─────────────────────────────────────────────────────────────
# Minimal stand-in for a Hermes threat object used by the dispatcher tests.
# Every field has a default so a test can override only what it cares about,
# e.g. FakeThreat(confidence=0.45).
@dataclass
class FakeThreat:
    sku: str = "SKU-Q1"
    name: str = "qwen3 測試品"
    momo_price: float = 1200.0
    pchome_price: float = 980.0
    gap_pct: float = 22.4
    sales_7d_delta_pct: float = -35.0
    risk: str = "HIGH"
    recommended_action: str = "建議跟進降價"
    confidence: float = 0.85
    # NOTE(review): presumably current / previous 7-day sales amounts read by the
    # dispatcher's amount-impact logic — confirm against the service code.
    sales_7d_curr_amount: float = 78000.0
    sales_7d_prev_amount: float = 120000.0
class _FakeResp:
def __init__(self, payload: dict, status: int = 200):
self._payload = payload
self.status_code = status
def raise_for_status(self):
if self.status_code >= 400:
import requests
raise requests.HTTPError(f"HTTP {self.status_code}", response=self)
def json(self):
return self._payload
@contextmanager
def _noop_log_ai_call(*args, **kwargs):
"""Mock log_ai_call context manager — 不寫 ai_calls 表,回傳具備所需 setter 的 stub"""
class _Ctx:
def set_tokens(self, **_kw): pass
def set_error(self, *_a, **_kw): pass
def fallback_to_caller(self, *_a, **_kw): pass
def set_cache_hit(self, *_a, **_kw): pass
def add_meta(self, *_a, **_kw): pass
yield _Ctx()
@pytest.fixture(autouse=True)
def _reset_global_state():
    """Isolate tests from each other by wiping shared module state before and after each test.

    Root cause being guarded against: dispatch() de-duplicates alerts through the
    module-level _ALERT_CACHE, so the first test leaves "SKU-Q1" in the cache and
    later tests hit the de-dup path and fail with dispatched=0. The Ollama
    unhealthy marks and the resolved-host cache are reset for the same reason.
    """
    import services.nemoton_dispatcher_service as dispatcher_mod
    import services.ollama_service as ollama_mod

    def _wipe():
        dispatcher_mod._ALERT_CACHE.clear()
        ollama_mod._unhealthy_marks.clear()
        ollama_mod._resolved_host_cache['host'] = None
        ollama_mod._resolved_host_cache['ts'] = 0

    _wipe()
    yield
    _wipe()
def _patch_execution_methods(monkeypatch, dispatcher):
"""攔截實際 Telegram/DB 寫入,記錄被呼叫的 tool 名稱與 args與 fallback test 共用 pattern"""
calls = []
def record(kind):
def _inner(*args, **kwargs):
calls.append({"kind": kind, "args": args, "kwargs": kwargs})
return _inner
monkeypatch.setattr(dispatcher, "_exec_trigger_price_alert", record("price_alert"))
monkeypatch.setattr(dispatcher, "_exec_add_to_recommendation", record("recommendation"))
monkeypatch.setattr(dispatcher, "_exec_flag_for_human_review", record("human_review"))
return calls
def _enable_qwen3_path(monkeypatch, module):
    """Switch on NEMOTRON_OLLAMA_FIRST and neutralise the side-effecting collaborators.

    Patches the dispatcher module's flag, ``log_ai_call`` and ``build_mcp_context``,
    and replaces host resolution / unhealthy marking in ``services.ollama_service``.
    """
    import services.ollama_service as ollama_module

    dispatcher_patches = {
        "NEMOTRON_OLLAMA_FIRST": True,
        "log_ai_call": _noop_log_ai_call,
        "build_mcp_context": lambda: "MCP-MOCK",
    }
    for attr_name, replacement in dispatcher_patches.items():
        monkeypatch.setattr(module, attr_name, replacement)

    # Patched on the ollama module itself so the import path resolves even if
    # the functions end up never being called.
    monkeypatch.setattr(ollama_module, "resolve_ollama_host", lambda: "http://gcp-mock:11434")
    monkeypatch.setattr(ollama_module, "mark_unhealthy", lambda *a, **kw: None)
# ─────────────────────────────────────────────────────────────
# T1. qwen3 OpenAI tool_calls 結構 → 正確解析
# ─────────────────────────────────────────────────────────────
def test_qwen3_tool_calls_struct_parsed_and_executed(monkeypatch):
    """A standard OpenAI-style tool_calls reply from qwen3 is executed directly, skipping NIM."""
    import services.nemoton_dispatcher_service as module

    _enable_qwen3_path(monkeypatch, module)

    # Canned GCP Ollama /api/chat reply carrying one structured tool call.
    alert_arguments = {
        "sku": "SKU-Q1",
        "name": "qwen3 測試品",
        "gap_pct": 22.4,
        "sales_delta": -35.0,
        "action": "跟進降價至 $980",
        "confidence": 0.85,
    }
    ollama_reply = {
        "message": {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {"function": {"name": "trigger_price_alert", "arguments": alert_arguments}},
            ],
        },
        "prompt_eval_count": 320,
        "eval_count": 64,
        "done": True,
    }

    def _fake_post(*a, **kw):
        return _FakeResp(ollama_reply)

    monkeypatch.setattr(module.requests, "post", _fake_post)

    dispatcher = module.NemotronDispatcher()
    calls = _patch_execution_methods(monkeypatch, dispatcher)

    # The NIM path must never fire; record any hit in case the raise is swallowed.
    nim_hits = []

    def _nim_should_not_be_called(*a, **kw):
        nim_hits.append(True)
        raise AssertionError("NIM 不應被呼叫qwen3 已成功")

    monkeypatch.setattr(dispatcher, "_call_nim", _nim_should_not_be_called)

    result = dispatcher.dispatch([FakeThreat()], hermes_stats={"duration_sec": 1.0})
    stats = result["nim_stats"]

    assert not nim_hits, "qwen3 成功時 NIM 不可被觸發"
    assert result["dispatched"] == 1
    assert stats.get("provider") == "gcp_ollama"
    assert stats.get("model") == module.NEMOTRON_OLLAMA_MODEL
    assert calls and calls[0]["kind"] == "price_alert"
# ─────────────────────────────────────────────────────────────
# T2. qwen3 沒回 tool_calls 但 content 含 JSON list → fallback 解析
# ─────────────────────────────────────────────────────────────
def test_qwen3_content_only_fallback_parsing(monkeypatch):
    """When qwen3 puts the tool call inside ``content`` (a JSON list of dicts),
    ``_parse_content_fallback`` must recover it and NIM must stay untouched."""
    import services.nemoton_dispatcher_service as module
    _enable_qwen3_path(monkeypatch, module)

    content_payload = (
        '[{"name": "flag_for_human_review", '
        '"parameters": {"sku": "SKU-Q1", "name": "qwen3 測試品", '
        '"concern": "信心不足", "confidence": 0.45}}]'
    )
    fake_body = {
        "message": {"role": "assistant", "content": content_payload, "tool_calls": []},
        "prompt_eval_count": 100,
        "eval_count": 30,
    }
    monkeypatch.setattr(module.requests, "post", lambda *a, **kw: _FakeResp(fake_body))

    dispatcher = module.NemotronDispatcher()
    calls = _patch_execution_methods(monkeypatch, dispatcher)

    # Raising alone is not a reliable guard: dispatch() tolerates _call_nim
    # failures (it degrades to the rule engine), so the AssertionError may be
    # swallowed — presumably by a broad except; verify against dispatch().
    # Record the call and assert on the flag as well.
    nim_called = {"v": False}

    def _nim_should_not_be_called(threats):
        nim_called["v"] = True
        raise AssertionError("NIM 不應被呼叫")

    monkeypatch.setattr(dispatcher, "_call_nim", _nim_should_not_be_called)

    result = dispatcher.dispatch([FakeThreat(confidence=0.45)], hermes_stats={"duration_sec": 1.0})

    assert nim_called["v"] is False, "qwen3 成功時 NIM 不可被觸發"
    assert result["dispatched"] == 1
    assert calls and calls[0]["kind"] == "human_review"
# ─────────────────────────────────────────────────────────────
# T3. tool_calls + content 同時存在 → 優先 tool_calls
# ─────────────────────────────────────────────────────────────
def test_qwen3_tool_calls_takes_precedence_over_content(monkeypatch):
    """When a reply carries both tool_calls and a JSON content list, tool_calls wins."""
    import services.nemoton_dispatcher_service as module

    _enable_qwen3_path(monkeypatch, module)

    structured_call = {
        "function": {
            "name": "trigger_price_alert",
            "arguments": {
                "sku": "SKU-Q1",
                "name": "qwen3 測試品",
                "gap_pct": 22.4,
                "sales_delta": -35.0,
                "action": "降價",
                "confidence": 0.85,
            },
        }
    }
    # The content names a different tool; it must be ignored.
    competing_content = '[{"name": "flag_for_human_review", "parameters": {"sku": "X"}}]'
    ollama_reply = {
        "message": {
            "role": "assistant",
            "content": competing_content,
            "tool_calls": [structured_call],
        },
        "prompt_eval_count": 200,
        "eval_count": 40,
    }

    def _fake_post(*a, **kw):
        return _FakeResp(ollama_reply)

    monkeypatch.setattr(module.requests, "post", _fake_post)

    dispatcher = module.NemotronDispatcher()
    calls = _patch_execution_methods(monkeypatch, dispatcher)

    result = dispatcher.dispatch([FakeThreat()], hermes_stats={"duration_sec": 1.0})

    assert result["dispatched"] == 1
    assert calls[0]["kind"] == "price_alert", "tool_calls 結構必須優先於 content fallback"
# ─────────────────────────────────────────────────────────────
# T4. qwen3 連線失敗 → 不爆,自動 fallback 到 NIM
# ─────────────────────────────────────────────────────────────
def test_qwen3_connection_error_falls_back_to_nim(monkeypatch):
    """If GCP Ollama is unreachable the dispatcher quietly switches to NIM and still dispatches."""
    import requests
    import services.nemoton_dispatcher_service as module

    _enable_qwen3_path(monkeypatch, module)

    def _unreachable(*a, **kw):
        raise requests.ConnectionError("GCP unreachable")

    monkeypatch.setattr(module.requests, "post", _unreachable)

    # NIM side: a key is present, quota is available, and _call_nim returns one tool call.
    monkeypatch.setattr(module, "NIM_API_KEY", "fake-key")
    monkeypatch.setattr(module, "_check_nim_quota", lambda: True)

    dispatcher = module.NemotronDispatcher()
    calls = _patch_execution_methods(monkeypatch, dispatcher)

    nim_hits = []

    def _fake_nim(threats):
        nim_hits.append(True)
        tool_call = {
            "tool": "trigger_price_alert",
            "args": {
                "sku": "SKU-Q1", "name": "qwen3 測試品",
                "gap_pct": 22.4, "sales_delta": -35.0,
                "action": "降價", "confidence": 0.85,
            },
        }
        stats = {"total_tokens": 256, "quota_used": 5}
        return [tool_call], stats

    monkeypatch.setattr(dispatcher, "_call_nim", _fake_nim)

    result = dispatcher.dispatch([FakeThreat()], hermes_stats={"duration_sec": 1.0})

    assert nim_hits, "qwen3 失敗後必須 fallback 至 NIM"
    assert result["dispatched"] == 1
    assert result["nim_stats"].get("total_tokens") == 256
    assert calls[0]["kind"] == "price_alert"
# ─────────────────────────────────────────────────────────────
# T5. qwen3 + NIM 全失敗 → ADR-004 Hermes 規則引擎兜底
# ─────────────────────────────────────────────────────────────
def test_qwen3_and_nim_both_fail_falls_back_to_hermes_rules(monkeypatch):
    """When both qwen3 and NIM fail, the Hermes rule engine must take over and keep the 🟡 [規則引擎] marker."""
    import requests
    import services.nemoton_dispatcher_service as module
    _enable_qwen3_path(monkeypatch, module)

    # qwen3 leg: every HTTP post raises a connection error.
    monkeypatch.setattr(module.requests, "post",
                        lambda *a, **kw: (_ for _ in ()).throw(requests.ConnectionError("qwen3 down")))

    monkeypatch.setattr(module, "NIM_API_KEY", "fake-key")
    monkeypatch.setattr(module, "_check_nim_quota", lambda: True)

    dispatcher = module.NemotronDispatcher()

    # NIM leg: _call_nim raises a timeout as well.
    monkeypatch.setattr(
        dispatcher, "_call_nim",
        lambda threats: (_ for _ in ()).throw(requests.Timeout("NIM timeout")),
    )

    # Intercept the _exec_* handlers the rule engine calls and capture the
    # concern / reason text so the 🟡 marker can be checked.
    # The rule engine calls some _exec_* handlers with positional args
    # (per the original note: service lines 787-795, _exec_trigger_price_alert
    # signature: sku, name, gap_pct, sales_delta, action, confidence, ...),
    # so the recorders merge positional and keyword args before .get('action').
    # NOTE(review): the name orders below mirror handler signatures that are
    # not visible here — confirm against the service code.
    captured = []

    def _merge_positional(name_order, args, kwargs):
        merged = dict(kwargs)
        for i, val in enumerate(args):
            if i < len(name_order):
                merged.setdefault(name_order[i], val)
        return merged

    def record_review(*args, **kwargs):
        merged = _merge_positional(
            ['sku', 'name', 'concern', 'confidence', 'footprint',
             'momo_price', 'comp_price', 'gap_pct', 'sales_delta',
             'revenue_loss_7d', 'recommended_price'],
            args, kwargs)
        captured.append(("human_review", merged))

    def record_alert(*args, **kwargs):
        merged = _merge_positional(
            ['sku', 'name', 'gap_pct', 'sales_delta', 'action', 'confidence',
             'momo_price', 'comp_price', 'footprint',
             'revenue_loss_7d', 'recommended_price'],
            args, kwargs)
        captured.append(("price_alert", merged))

    def record_reco(*args, **kwargs):
        # Only keyword arguments are captured for recommendations.
        captured.append(("recommendation", kwargs))

    monkeypatch.setattr(dispatcher, "_exec_flag_for_human_review", record_review)
    monkeypatch.setattr(dispatcher, "_exec_trigger_price_alert", record_alert)
    monkeypatch.setattr(dispatcher, "_exec_add_to_recommendation", record_reco)

    # gap_pct=22.4 + risk=HIGH -> rule 2: trigger_price_alert, whose action text
    # should carry the 🟡 [規則引擎] prefix.
    result = dispatcher.dispatch([FakeThreat()], hermes_stats={"duration_sec": 1.0})

    assert result["nim_stats"].get("degraded") is True, "ADR-004 降級旗標必須存在"
    assert captured, "規則引擎必須兜底執行至少一次"
    kind, kwargs = captured[0]
    assert kind == "price_alert"
    assert "🟡 [規則引擎]" in kwargs.get("action", ""), \
        "ADR-004 鐵律Hermes 規則引擎兜底時必須帶『🟡 [規則引擎]』標記"
    # The footprint should also carry the 🟡 [降級模式 ADR-004] marker (shown in the Telegram alert header).
    assert "🟡 [降級模式 ADR-004]" in kwargs.get("footprint", "")
# ─────────────────────────────────────────────────────────────
# T6. feature flag 預設 false → 戰前行為qwen3 完全不被呼叫
# ─────────────────────────────────────────────────────────────
def test_flag_default_false_preserves_pre_war_behavior(monkeypatch):
    """With NEMOTRON_OLLAMA_FIRST off, dispatch must not touch GCP Ollama and
    nim_stats must not report provider='gcp_ollama'."""
    import services.nemoton_dispatcher_service as module

    # False is the default; it is set explicitly to be safe.
    monkeypatch.setattr(module, "NEMOTRON_OLLAMA_FIRST", False)

    # Any requests.post call is an error here (the NIM path is served by the _call_nim stub below).
    post_hits = []

    def _forbidden_post(*a, **kw):
        post_hits.append(True)
        raise AssertionError("flag=false 時不可呼叫 GCP Ollama HTTP")

    monkeypatch.setattr(module.requests, "post", _forbidden_post)
    monkeypatch.setattr(module, "NIM_API_KEY", "fake-key")
    monkeypatch.setattr(module, "_check_nim_quota", lambda: True)

    dispatcher = module.NemotronDispatcher()
    calls = _patch_execution_methods(monkeypatch, dispatcher)

    def _fake_nim(threats):
        tool_call = {
            "tool": "trigger_price_alert",
            "args": {
                "sku": "SKU-Q1", "name": "qwen3 測試品",
                "gap_pct": 22.4, "sales_delta": -35.0,
                "action": "降價", "confidence": 0.85,
            },
        }
        return [tool_call], {"total_tokens": 100, "quota_used": 1}

    monkeypatch.setattr(dispatcher, "_call_nim", _fake_nim)

    result = dispatcher.dispatch([FakeThreat()], hermes_stats={"duration_sec": 1.0})

    assert not post_hits
    assert result["dispatched"] == 1
    assert result["nim_stats"].get("provider") in (None, "nim"), \
        "flag=false 時 nim_stats 不應帶 provider='gcp_ollama'"
# ─────────────────────────────────────────────────────────────
# T7. 共用 helper 純單元測試OpenAI tool_calls schema 邊界)
# ─────────────────────────────────────────────────────────────
def test_parse_tool_calls_struct_handles_string_arguments():
    """NIM sends ``arguments`` as a JSON string, qwen3 sends a dict — both must parse."""
    from services.nemoton_dispatcher_service import _parse_tool_calls_struct as parse

    def _call(name, arguments):
        return [{"function": {"name": name, "arguments": arguments}}]

    # NIM style: arguments is a JSON string.
    assert parse(_call("foo", '{"a": 1, "b": "x"}')) == [{"tool": "foo", "args": {"a": 1, "b": "x"}}]

    # qwen3 / Ollama style: arguments is already a dict.
    assert parse(_call("bar", {"a": 2})) == [{"tool": "bar", "args": {"a": 2}}]

    # Edge cases: empty input, broken JSON, missing name -> no crash, empty or ignored.
    for empty_input in ([], None):
        assert parse(empty_input) == []

    assert parse(_call("baz", "{not json")) == [{"tool": "baz", "args": {}}]

    nameless = [{"function": {"arguments": "{}"}}]
    assert parse(nameless) == []
def test_parse_content_fallback_handles_various_shapes():
    """The content fallback accepts both list shapes and returns [] for anything else."""
    from services.nemoton_dispatcher_service import _parse_content_fallback as parse

    # Older OpenAI style: [{"name", "parameters"}].
    flat_shape = '[{"name": "foo", "parameters": {"a": 1}}]'
    assert parse(flat_shape) == [{"tool": "foo", "args": {"a": 1}}]

    # Nested "function" object with string-encoded arguments.
    nested_shape = '[{"function": {"name": "bar"}, "arguments": "{\\"b\\": 2}"}]'
    assert parse(nested_shape) == [{"tool": "bar", "args": {"b": 2}}]

    # Empty string / not JSON / JSON that is not a list -> [].
    for unusable in ("", "not json", '{"a":1}'):
        assert parse(unusable) == []

View File

@@ -0,0 +1,212 @@
"""
tests/test_openclaw_daily_template.py
─────────────────────────────────────────────────────────────────
Operation Ollama-First v5.0 / Phase 3 / A8 — 日報模板路由測試
驗證面:
T1. flag=false預設→ 走 _legacy_full_gemini_daily_reportregression
T2. flag=true → 走 _generate_daily_report_hermes_template
T3. _compute_daily_kpi 各 KPI 函數可獨立 mock 測DB 失敗回安全預設)
T4. _render_daily_template_v2 缺欄位優雅降級_SafeUndefined 不 raise
T5. _SafeUndefined 對 'X.Y.Z' 巢狀存取不爆
紀律:
- 不打真實 DB / Gemini API
- 不寫 ai_insights
- 不發 Telegram
"""
import os
from datetime import date, datetime
from unittest.mock import patch, MagicMock
import pytest
# ═══════════════════════════════════════════════════════════════════════════
# Fixtures
# ═══════════════════════════════════════════════════════════════════════════
@pytest.fixture(autouse=True)
def _reset_flag(monkeypatch):
    """Remove OPENCLAW_DAILY_HERMES_TEMPLATE before each test so tests cannot leak the flag to each other."""
    flag_name = 'OPENCLAW_DAILY_HERMES_TEMPLATE'
    monkeypatch.delenv(flag_name, raising=False)
    yield
# ═══════════════════════════════════════════════════════════════════════════
# T1+T2 — Routing
# ═══════════════════════════════════════════════════════════════════════════
class TestRouting:
    """Routing of generate_daily_report() by the OPENCLAW_DAILY_HERMES_TEMPLATE flag."""

    def test_flag_false_routes_to_legacy(self, monkeypatch):
        """flag=false -> _legacy_full_gemini_daily_report is called."""
        import importlib
        import services.openclaw_strategist_service as svc

        monkeypatch.setenv('OPENCLAW_DAILY_HERMES_TEMPLATE', 'false')
        importlib.reload(svc)

        legacy_called = {'v': False}
        hermes_called = {'v': False}

        monkeypatch.setattr(
            svc, '_legacy_full_gemini_daily_report',
            lambda: legacy_called.update(v=True) or {'status': 'ok', 'mode': 'legacy'},
        )
        monkeypatch.setattr(
            svc, '_generate_daily_report_hermes_template',
            lambda: hermes_called.update(v=True) or {'status': 'ok', 'mode': 'hermes_template'},
        )

        svc.generate_daily_report()

        assert legacy_called['v'] is True, "flag=false 必須走 legacy 路徑"
        assert hermes_called['v'] is False, "flag=false 不可走 hermes 模板"

    def test_flag_true_routes_to_hermes_template(self, monkeypatch):
        """flag=true -> _generate_daily_report_hermes_template is called."""
        import importlib
        import services.openclaw_strategist_service as svc

        monkeypatch.setenv('OPENCLAW_DAILY_HERMES_TEMPLATE', 'true')
        importlib.reload(svc)

        legacy_called = {'v': False}
        hermes_called = {'v': False}

        def fake_legacy():
            legacy_called['v'] = True
            return {'status': 'ok'}

        def fake_hermes():
            hermes_called['v'] = True
            return {'status': 'ok'}

        monkeypatch.setattr(svc, '_legacy_full_gemini_daily_report', fake_legacy)
        monkeypatch.setattr(svc, '_generate_daily_report_hermes_template', fake_hermes)

        svc.generate_daily_report()

        assert hermes_called['v'] is True, "flag=true 必須走 hermes 模板路徑"
        assert legacy_called['v'] is False, "flag=true 不可走 legacy"

    def test_flag_default_is_false(self, monkeypatch):
        """With no env value set the flag defaults to false (pre-change behaviour)."""
        import importlib
        import services.openclaw_strategist_service as svc

        # The env var is deliberately left unset (the autouse fixture removed it).
        importlib.reload(svc)

        enabled = svc._daily_hermes_template_enabled()
        assert enabled is False
# ═══════════════════════════════════════════════════════════════════════════
# T3 — KPI 計算DB 失敗安全降級)
# ═══════════════════════════════════════════════════════════════════════════
class TestKPIComputation:
    """KPI helpers: input validation and safe degradation on DB failure."""

    def test_compute_daily_kpi_invalid_date_raises(self):
        """A non-date argument is rejected with TypeError."""
        import services.openclaw_strategist_service as svc

        bad_input = "not-a-date"
        with pytest.raises(TypeError):
            svc._compute_daily_kpi(bad_input)

    def test_revenue_kpi_returns_safe_default_on_db_error(self, monkeypatch):
        """On a DB error _query_revenue_kpi returns zeros instead of raising."""
        import services.openclaw_strategist_service as svc

        class _FailingSession:
            """Session stub whose every execute() fails."""

            def execute(self, *a, **kw):
                raise RuntimeError('DB connection lost')

            def close(self):
                pass

        def _open_failing_session():
            return _FailingSession()

        monkeypatch.setattr(svc, 'get_session', _open_failing_session)

        kpi = svc._query_revenue_kpi(date(2026, 5, 3))

        assert kpi['today'] == 0.0
        assert kpi['dod_pct'] == 0.0
        assert kpi['wow_pct'] == 0.0
# ═══════════════════════════════════════════════════════════════════════════
# T4+T5 — Template 渲染與缺欄位優雅降級
# ═══════════════════════════════════════════════════════════════════════════
class TestTemplateRendering:
    """Rendering of the v2 daily-report template, including graceful handling of missing fields."""

    def test_render_with_full_context_succeeds(self):
        """A fully populated context renders every section with its values."""
        import services.openclaw_strategist_service as svc

        context = {
            'today': '2026年05月02日',
            'weekday': '週五',
            'revenue': {
                'today': 1234567.0,
                'yesterday': 1100000.0,
                'avg_7d': 1050000.0,
                'dod_pct': 12.2,
                'wow_pct': 17.6,
            },
            'orders': {
                'today_rows': 234,
                'today_sku': 187,
                'avg_value_today': 5276.0,
            },
            'top_skus': [
                {'name': 'SKU-A', 'qty': 50, 'revenue': 100000},
                {'name': 'SKU-B', 'qty': 32, 'revenue': 80000},
            ],
            'price_gaps': [
                {'sku_name': '商品X', 'momo_price': 1200, 'comp_price': 980,
                 'gap_pct': 22.4, 'competitor': 'PChome'},
            ],
            'inventory_alerts': [],
            'priority_actions': ['對 SKU-A 啟動 EA 流程', '觀察 PChome 補貼'],
            'gemini_insight': '今日營收強勁成長,建議加碼家電促銷檔期。',
        }

        rendered = svc._render_daily_template_v2(context)

        assert '2026年05月02日' in rendered
        assert '週五' in rendered
        assert 'NT$1,234,567' in rendered
        assert 'SKU-A' in rendered
        assert '商品X' in rendered
        assert 'PChome' in rendered
        assert '今日營收強勁成長' in rendered

    def test_render_with_missing_fields_does_not_raise(self):
        """Missing fields degrade gracefully (no UndefinedError) and empty lists
        render their fallback lines."""
        import services.openclaw_strategist_service as svc

        context = {
            'today': '2026年05月02日',
            'weekday': '週五',
            'revenue': {'today': 0.0, 'dod_pct': 0.0, 'wow_pct': 0.0},
            'orders': {},  # empty dict: every orders.* lookup is missing
            'top_skus': [],
            'price_gaps': [],
            'inventory_alerts': [],
            'priority_actions': [],
            'gemini_insight': '',
        }

        # Not raising is the primary expectation.
        rendered = svc._render_daily_template_v2(context)
        assert isinstance(rendered, str)
        assert len(rendered) > 0

        # The previous check ended in "or '' in rendered", which is always true.
        # Assert the fallback lines daily_report_v2.j2 emits for empty lists instead.
        assert '今日無熱銷資料' in rendered
        assert '暫無重大價差警示' in rendered
        assert '庫存狀態正常' in rendered

    def test_safe_undefined_nested_access(self):
        """Nested 'X.Y.Z' access on entirely missing variables must not raise."""
        import services.openclaw_strategist_service as svc

        # revenue / orders / top_skus etc. are deliberately omitted.
        context = {
            'today': '2026年05月02日',
            'weekday': '週五',
        }

        rendered = svc._render_daily_template_v2(context)
        assert isinstance(rendered, str)
        assert '2026年05月02日' in rendered

View File

@@ -0,0 +1,286 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tests/test_openclaw_qa_golden_set.py
OpenClaw Q&A 黃金集 A/B 對照框架
(Operation Ollama-First v5.0 — Phase 3, A7 fullstack-engineer)
目的:
在統帥盲測前,先建立 Ollama qwen3:14b vs Gemini 2.5 Flash 的「量化基線」。
10 題典型 momo 商業 Q&A雙模型各跑一次比對
- 簡體字污染數量A2 黃燈警訊核心)
- 回應長度
- 結構性指標(行數、列點數)
- 拒答訊號
- 黃金關鍵字命中率(題目自帶 expect_keywords
執行:
RUN_GOLDEN_SET=1 pytest tests/test_openclaw_qa_golden_set.py -v -s
# GCP 還沒拉 qwen3:14b 之前,預設 SKIP避免 CI 紅燈)
紀律:
- PII 紀律:題目/答案無真實 chat_id / username / 身份證 / 手機,全部去識別化
- 不對「正確性」做 hard assert本框架專做「品質量化基線」收集
- 報告印到 stdoutpytest -s 顯示),人工檢視,不卡 CI
"""
import json
import os
import sys
import time
from typing import Dict, List, Optional
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# ─────────────────────────────────────────────────────────────────────────────
# 啟用條件:須三條件齊備才實跑
# 1. RUN_GOLDEN_SET=1
# 2. OPENCLAW_QA_OLLAMA_HOST 可達
# 3. GEMINI_API_KEY 已設
# 否則 SKIP。
# ─────────────────────────────────────────────────────────────────────────────
def _ollama_reachable(host: str, timeout: float = 2.0) -> bool:
try:
import requests
r = requests.get(f"{host.rstrip('/')}/api/version", timeout=timeout)
return r.status_code == 200
except Exception:
return False
def _ollama_has_model(host: str, model: str, timeout: float = 3.0) -> bool:
"""檢查 Ollama 主機是否已 pull 指定模型。"""
try:
import requests
r = requests.get(f"{host.rstrip('/')}/api/tags", timeout=timeout)
if r.status_code != 200:
return False
tags = r.json().get('models', [])
return any(m.get('name', '').startswith(model.split(':')[0]) for m in tags)
except Exception:
return False
# Opt-in switch: the golden set only runs when RUN_GOLDEN_SET is exactly "1".
_RUN_GOLDEN = os.environ.get('RUN_GOLDEN_SET', '0') == '1'
# Ollama endpoint: OPENCLAW_QA_OLLAMA_HOST, then OLLAMA_HOST_PRIMARY, then the
# hard-coded default address.
_HOST = os.environ.get(
    'OPENCLAW_QA_OLLAMA_HOST',
    os.environ.get('OLLAMA_HOST_PRIMARY', 'http://34.143.170.20:11434'),
)
_MODEL = os.environ.get('OPENCLAW_QA_OLLAMA_MODEL', 'qwen3:14b')
# True when GEMINI_API_KEY is set to a non-empty value.
_HAS_GEMINI = os.environ.get('GEMINI_API_KEY') not in (None, '')

# Module-wide skip: only the RUN_GOLDEN_SET switch is evaluated here.
pytestmark = pytest.mark.skipif(
    not _RUN_GOLDEN,
    reason="黃金集需要 RUN_GOLDEN_SET=1 + GCP qwen3:14b ready + GEMINI_API_KEY統帥盲測前才跑",
)
# ─────────────────────────────────────────────────────────────────────────────
# 黃金集10 題;全部去 PII情境取自 momo-pro 真實 Telegram 互動模式)
# ─────────────────────────────────────────────────────────────────────────────
# Golden set of Q&A prompts. Each entry carries:
#   id               - stable identifier used in the report
#   question         - prompt sent to both models
#   expect_keywords  - substrings whose presence is counted as a keyword hit
#   category         - label printed next to the question
# Every keyword must be a non-empty string: an empty string is a substring of
# any text and would be counted as a hit for every non-empty answer.
GOLDEN_SET: List[Dict] = [
    {
        "id": "g01_weekly_trend",
        "question": "本週 momo 業績趨勢如何?跟上週比?",
        # NOTE(review): an empty-string keyword was removed here; if a real
        # third keyword was intended, restore it explicitly.
        "expect_keywords": ["業績", "成長"],
        "category": "業績趨勢",
    },
    {
        "id": "g02_competitor_threat",
        "question": "PChome 最近在 3C 類有發動補貼戰嗎?對我們影響?",
        "expect_keywords": ["PChome", "3C"],
        "category": "競品威脅",
    },
    {
        "id": "g03_pricing_strategy",
        "question": "我有一支 SKU 比競品貴 8%,銷量持續下滑,該怎麼辦?",
        "expect_keywords": ["定價", "競品"],
        "category": "定價策略",
    },
    {
        "id": "g04_seasonal",
        "question": "母親節檔期快到了,建議哪些品類加碼?",
        "expect_keywords": ["母親節", "品類"],
        "category": "季節機會",
    },
    {
        "id": "g05_command_routing",
        "question": "我想看完整週報怎麼下指令?",
        "expect_keywords": ["weekly", "週報"],
        "category": "指令導引",
    },
    {
        "id": "g06_top_threats",
        "question": "目前 TOP 5 最緊急的競價威脅是哪些?",
        "expect_keywords": ["威脅", "TOP"],
        "category": "威脅清單",
    },
    {
        "id": "g07_inventory_signal",
        "question": "如何判斷某 SKU 該促銷出清?",
        "expect_keywords": ["促銷", "出清"],
        "category": "庫存決策",
    },
    {
        "id": "g08_cross_category",
        "question": "家電 vs 生活雜貨,哪個品類本月成長動能比較強?",
        "expect_keywords": ["家電", "成長"],
        "category": "品類比較",
    },
    {
        "id": "g09_data_unavailable",
        "question": "幫我看 2030 年的銷售預測。",
        # The model is expected to admit that data is insufficient rather than
        # fabricate a forecast.
        "expect_keywords": ["資料", "無法"],
        "category": "資料邊界",
    },
    {
        "id": "g10_action_item",
        "question": "綜合本週數據,給我 3 個 48 小時內必做行動。",
        "expect_keywords": ["行動", "建議"],
        "category": "行動清單",
    },
]
# ─────────────────────────────────────────────────────────────────────────────
# Scoring helpers
# ─────────────────────────────────────────────────────────────────────────────
def _count_simplified(text: str) -> int:
    """Count the characters of ``text`` found in the service's ``_SIMPLIFIED_HINT_CHARS``.

    ``None`` or an empty string yields 0.
    """
    from services.openclaw_strategist_service import _SIMPLIFIED_HINT_CHARS
    if not text:
        return 0
    return len([ch for ch in text if ch in _SIMPLIFIED_HINT_CHARS])
def _count_keyword_hits(text: str, keywords: List[str]) -> int:
if not text:
return 0
return sum(1 for kw in keywords if kw in text)
def _is_refusal(text: str) -> bool:
    """Return True when ``text`` contains any of the service's ``_REFUSAL_PATTERNS``.

    ``None`` is treated as an empty string.
    """
    from services.openclaw_strategist_service import _REFUSAL_PATTERNS
    haystack = text or ''
    for pattern in _REFUSAL_PATTERNS:
        if pattern in haystack:
            return True
    return False
def _structure_score(text: str) -> Dict[str, int]:
"""結構性量化指標。"""
if not text:
return {"lines": 0, "bullets": 0, "tables": 0}
return {
"lines": text.count('\n') + 1,
# 條列符號粗略偵測(含中文「、」「,」開頭的列點)
"bullets": sum(text.count(s) for s in ('- ', '', '* ', '1.', '2.', '3.')),
"tables": text.count('|'),
}
def _score_response(qid: str, question: str, response: str, expect_kw: List[str]) -> Dict:
    """Collect all quantitative metrics for one model response into a flat dict.

    ``question`` is accepted but not used by the scoring itself. The ``preview``
    field holds the first 120 characters with newlines replaced by `` / ``.
    """
    body = response or ''
    shape = _structure_score(response)
    metrics = {"qid": qid, "length": len(body)}
    metrics["simplified_count"] = _count_simplified(response)
    metrics["keyword_hits"] = _count_keyword_hits(response, expect_kw)
    metrics["is_refusal"] = _is_refusal(response)
    for field in ("lines", "bullets", "tables"):
        metrics[field] = shape[field]
    metrics["preview"] = body[:120].replace('\n', ' / ')
    return metrics
# ─────────────────────────────────────────────────────────────────────────────
# Caller wrappers (使用 service 的真實函式)
# ─────────────────────────────────────────────────────────────────────────────
def _call_ollama(question: str) -> Optional[str]:
    """Send ``question`` through the service's ``_call_qwen3_qa`` with no context.

    The request id is ``golden-<unix seconds>``, so calls made within the same
    second share an id.
    """
    from services.openclaw_strategist_service import _call_qwen3_qa
    request_id = "golden-" + str(int(time.time()))
    return _call_qwen3_qa(question, None, request_id)
def _call_gemini_baseline(question: str) -> Optional[str]:
    """Ask the service's ``_call_gemini`` the same question as the baseline side.

    Uses a fixed system prompt, temperature 0.5 and the caller tag
    ``openclaw_qa_golden``.
    """
    from services.openclaw_strategist_service import _call_gemini
    prompt_parts = [
        "你是 MOMO Pro 電商情報策略師「OpenClaw」。以繁體中文台灣用語回覆使用者。",
        "嚴禁簡體字。回覆長度控制在 500 字內,可用 Markdown 條列。",
    ]
    return _call_gemini(
        "".join(prompt_parts),
        question,
        temperature=0.5,
        caller="openclaw_qa_golden",
    )
# ─────────────────────────────────────────────────────────────────────────────
# Tests
# ─────────────────────────────────────────────────────────────────────────────
def test_environment_ready():
    """Sanity check before the golden set: host reachable, model pulled, Gemini key set."""
    host_up = _ollama_reachable(_HOST)
    assert host_up, f"Ollama 主機不可達:{_HOST}"
    model_present = _ollama_has_model(_HOST, _MODEL)
    assert model_present, (
        f"GCP Ollama 尚未拉 {_MODEL}(請於 Phase 8 由 A1 完成 ollama pull"
    )
    assert _HAS_GEMINI, "GEMINI_API_KEY 未設"
def test_golden_set_ab_comparison(capsys):
    """Run every golden-set question through both models and print the metrics.

    Answer correctness is NOT asserted; the purpose is to collect a quantitative
    quality baseline (printed to stdout and exported as JSON) for human review.

    Hard assertions (minimal safety net):
      * Ollama answered at least 8 questions and Gemini at least 9.
      * No Gemini answer contains simplified-character hints (this validates
        the measurement itself).
    """
    from unittest import mock

    rows = []
    # The flag is only enabled while the models are queried (per the original
    # author's note, _call_qwen3_qa needs it to follow its real logic).
    # patch.dict restores the environment afterwards, so the override cannot
    # leak into tests that run later in the same process.
    with mock.patch.dict(os.environ, {'OPENCLAW_QA_OLLAMA_FIRST': 'true'}):
        for item in GOLDEN_SET:
            qid = item['id']
            question = item['question']
            kws = item['expect_keywords']
            ollama_resp = _call_ollama(question)
            gemini_resp = _call_gemini_baseline(question)
            rows.append({
                'qid': qid,
                'category': item['category'],
                'question': question,
                # Per-question denominator for the "kw=hits/total" column.
                'keyword_total': len(kws),
                'ollama': _score_response(qid, question, ollama_resp or '', kws),
                'gemini': _score_response(qid, question, gemini_resp or '', kws),
            })

    # Print the quantitative baseline (only visible with pytest -s).
    print("\n" + "=" * 100)
    print("OpenClaw QA 黃金集 A/B 量化基線Ollama qwen3:14b vs Gemini 2.5 Flash")
    print("=" * 100)
    for r in rows:
        print(f"\n[{r['qid']}] ({r['category']}) {r['question']}")
        for side in ('ollama', 'gemini'):
            s = r[side]
            print(
                f" {side:>7}: len={s['length']:>4} simp={s['simplified_count']:>2} "
                f"kw={s['keyword_hits']}/{r['keyword_total']} "
                f"lines={s['lines']:>2} refusal={s['is_refusal']}"
            )
            print(f" preview: {s['preview']}")

    # Export the rows as JSON for later analysis.
    out_path = os.path.join(os.path.dirname(__file__), 'logs', 'qa_golden_baseline.json')
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(rows, f, ensure_ascii=False, indent=2)
    print(f"\n基線已存:{out_path}")

    # Hard assertions (minimal safety net).
    ollama_responded = sum(1 for r in rows if r['ollama']['length'] > 0)
    gemini_responded = sum(1 for r in rows if r['gemini']['length'] > 0)
    assert ollama_responded >= 8, f"Ollama 回應率過低:{ollama_responded}/10"
    assert gemini_responded >= 9, f"Gemini 回應率過低:{gemini_responded}/10"
    # The Gemini baseline must be free of simplified-character hints.
    for r in rows:
        assert r['gemini']['simplified_count'] == 0, (
            f"Gemini baseline 簡體污染(指標可能誤判):{r['qid']} {r['gemini']['preview']}"
        )

View File

@@ -0,0 +1,358 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tests/test_openclaw_qa_routing.py
OpenClaw Q&A 路由 + 品質守門 unit tests
(Operation Ollama-First v5.0 — Phase 3, A7 fullstack-engineer)
涵蓋:
- feature flag OPENCLAW_QA_OLLAMA_FIRST=false → 走 Gemini-firstregression test
- flag=true + 高品質 Ollama 回應 → 直接回 Ollama 結果,不走 Gemini
- flag=true + 低品質 Ollama 回應 → 升級至 Gemini並標 fallback_to=openclaw_qa_gemini_fallback
- flag=true + Ollama 呼叫失敗 → 升級至 Gemini
- _is_low_quality_response 各規則:空字串 / 長度過短 / 簡體污染 / 拒答 / 流水帳
執行:
pytest tests/test_openclaw_qa_routing.py -v
"""
import os
import sys
import time
from typing import Any, Dict, Optional
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import services.openclaw_strategist_service as svc
import services.ai_call_logger as logger_mod
from services.ai_call_logger import _reset_kill_switch
# ─────────────────────────────────────────────────────────────────────────────
# Fixtures
# ─────────────────────────────────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def reset_state(monkeypatch):
    """Reset the logger kill-switch and capture ai_calls records instead of writing to the DB.

    Yields the list that receives one dict per intercepted ``_write_to_db`` call.
    """
    _reset_kill_switch()
    records = []
    copied_fields = ('caller', 'provider', 'model', 'status', 'fallback_to', 'error')

    def fake_write(state):
        entry = {name: getattr(state, name) for name in copied_fields}
        entry['meta'] = dict(state.meta)
        entry['request_id'] = state.request_id
        records.append(entry)

    monkeypatch.setattr(logger_mod, '_write_to_db', fake_write)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')
    # Default: flag unset, i.e. the pre-rollout behaviour.
    monkeypatch.delenv('OPENCLAW_QA_OLLAMA_FIRST', raising=False)
    yield records
def _wait_async(captured, n=1, timeout=2.0):
deadline = time.time() + timeout
while time.time() < deadline:
if len(captured) >= n:
return True
time.sleep(0.01)
return False
# ─────────────────────────────────────────────────────────────────────────────
# 1. _is_low_quality_response 純函式規則
# ─────────────────────────────────────────────────────────────────────────────
class TestLowQualityRules:
    """Rule-level checks for ``svc._is_low_quality_response``."""

    def test_empty_string_is_low_quality(self):
        # Empty, None and whitespace-only inputs are all low quality.
        for blank in ("", None, " \n "):
            assert svc._is_low_quality_response(blank) is True

    def test_too_short_is_low_quality(self):
        # Shorter than 50 characters -> low quality.
        greeting = "你好,我是 OpenClaw"
        assert svc._is_low_quality_response(greeting) is True

    def test_acceptable_response_passes(self):
        reply = "\n".join([
            "本週 momo 業績較上週成長 12%,主要受惠於家電與生活雜貨。",
            "建議:持續關注 PChome 競價動態,必要時調整定價策略。",
            "預估下週 momo 仍有 5-8% 成長空間。",
        ])
        assert svc._is_low_quality_response(reply) is False

    def test_simplified_pollution_detected(self):
        # A sentence with >= 3 simplified-character hints -> low quality
        # (the core check for Qwen's Traditional-Chinese weakness).
        sample = (
            "本周业绩比上周增长,您可以关注这个产品的价格变动趋势,"
            "我们建议处理掉滞销库存以提高资产效率"
        )
        assert svc._is_low_quality_response(sample) is True

    def test_two_simplified_chars_still_acceptable(self):
        # Only 2 simplified hint characters (below the threshold) plus good
        # structure -> still acceptable. This avoids rejecting a normal
        # Traditional-Chinese reply over a few stray simplified characters.
        sample = (
            "本週 momo 业绩成長明顯,建議持續關注競品動向。\n"
            "重點品類家電、3C、生活雜貨。\n"
            "下週可加碼促銷檔期。"
        )
        assert svc._is_low_quality_response(sample) is False

    def test_refusal_pattern_detected(self):
        refusals = ['無法回答', '我不知道', '抱歉,我無法協助']
        for refusal in refusals:
            text = f"關於這個問題,{refusal},請改問其他內容以便我協助您。"
            verdict = svc._is_low_quality_response(text)
            assert verdict is True, f"應被判定為拒答:{refusal}"

    def test_flowing_text_no_breaks_is_low_quality(self):
        # 200+ characters without a single line break -> wall of text.
        wall = "本週業績整體呈現上升趨勢。" * 20
        assert "\n" not in wall
        assert len(wall) > 200
        assert svc._is_low_quality_response(wall) is True

    def test_long_text_with_breaks_is_acceptable(self):
        # 200+ characters but with reasonable line breaks -> well structured.
        report = (
            "本週業績整體呈現上升趨勢,主要驅動類別為家電與生活雜貨大類別。\n"
            "競品動向PChome 在 3C 類發動大規模補貼戰,預估壓縮我方 3 至 5 個百分點毛利率。\n"
            "蝦皮也在母嬰用品加碼免運券促銷,需密切觀察跟降節奏,避免市占下滑。\n"
            "建議行動:(1) 加碼家電促銷檔期,重點操作大尺寸電視與廚房家電,"
            "(2) 觀察 PChome 補貼是否延續至下週,準備二段反擊方案,"
            "(3) 對價差大於 5% 的 SKU 主動啟動 EA 流程,避免毛利持續流失。"
        )
        assert len(report) > 200
        assert svc._is_low_quality_response(report) is False
# ─────────────────────────────────────────────────────────────────────────────
# 2. Routingfeature flag = false 時維持 Gemini-first 路徑regression
# ─────────────────────────────────────────────────────────────────────────────
class TestFlagOff:
    """Regression tests: with the flag off, Q&A keeps using the legacy Gemini-first path."""

    def test_flag_false_routes_to_legacy(self, monkeypatch, reset_state):
        """flag=false must not call _call_qwen3_qa; it goes straight to _legacy_gemini_first_qa."""
        monkeypatch.setenv('OPENCLAW_QA_OLLAMA_FIRST', 'false')
        legacy_called = {'count': 0}
        ollama_called = {'count': 0}

        def fake_legacy(q, ctx, request_id=None):
            legacy_called['count'] += 1
            return "[legacy gemini reply]"

        def fake_ollama(q, ctx, rid):
            ollama_called['count'] += 1
            return "[should not be called]"

        monkeypatch.setattr(svc, '_legacy_gemini_first_qa', fake_legacy)
        monkeypatch.setattr(svc, '_call_qwen3_qa', fake_ollama)
        result = svc.generate_strategy_response("本週業績如何?")
        assert result == "[legacy gemini reply]"
        assert legacy_called['count'] == 1
        assert ollama_called['count'] == 0

    def test_flag_unset_defaults_to_off(self, monkeypatch, reset_state):
        """With the variable unset the flag defaults to off and the legacy path is used."""
        monkeypatch.delenv('OPENCLAW_QA_OLLAMA_FIRST', raising=False)
        legacy_called = {'count': 0}
        ollama_called = {'count': 0}

        def fake_legacy(q, ctx, request_id=None):
            legacy_called['count'] += 1
            return "[legacy reply]"

        def fake_ollama(q, ctx, rid):
            # Stubbed so a routing regression is counted instead of hitting the
            # real network. Returning None also mirrors a failed Ollama call:
            # if that is followed by a fall back to the legacy path, the legacy
            # assertions alone would still pass, so the explicit count
            # assertion below is what catches the regression.
            ollama_called['count'] += 1
            return None

        monkeypatch.setattr(svc, '_legacy_gemini_first_qa', fake_legacy)
        monkeypatch.setattr(svc, '_call_qwen3_qa', fake_ollama)
        result = svc.generate_strategy_response("競品分析")
        assert legacy_called['count'] == 1
        assert result == "[legacy reply]"
        assert ollama_called['count'] == 0

    def test_empty_query_short_circuits(self, monkeypatch, reset_state):
        """An empty query must not trigger any LLM call."""
        monkeypatch.setenv('OPENCLAW_QA_OLLAMA_FIRST', 'true')
        legacy_called = {'count': 0}
        ollama_called = {'count': 0}

        def fake_legacy(q, ctx, request_id=None):
            legacy_called['count'] += 1
            return ""

        def fake_ollama(q, ctx, rid):
            ollama_called['count'] += 1
            return ""

        monkeypatch.setattr(svc, '_legacy_gemini_first_qa', fake_legacy)
        monkeypatch.setattr(svc, '_call_qwen3_qa', fake_ollama)
        out = svc.generate_strategy_response("")
        assert "請輸入您的問題" in out
        assert legacy_called['count'] == 0
        assert ollama_called['count'] == 0
# ─────────────────────────────────────────────────────────────────────────────
# 3. Routingfeature flag = true + Ollama 高/低品質
# ─────────────────────────────────────────────────────────────────────────────
class TestFlagOn:
    """Routing with the flag on: Ollama first, Gemini as the fallback."""

    def test_flag_true_high_quality_returns_ollama(self, monkeypatch, reset_state):
        """flag=true + high-quality Ollama reply -> returned directly, Gemini untouched."""
        monkeypatch.setenv('OPENCLAW_QA_OLLAMA_FIRST', 'true')
        gemini_calls = []
        good_reply = (
            "本週 momo 業績成長 12%,主要驅動類別為家電。\n"
            "建議:持續關注 PChome 競價並加碼家電促銷檔期。\n"
            "下週預估仍有 5-8% 成長空間。"
        )

        def fake_ollama(q, ctx, rid):
            return good_reply

        def fake_legacy(q, ctx, request_id=None):
            gemini_calls.append(q)
            return "[gemini fallback]"

        monkeypatch.setattr(svc, '_call_qwen3_qa', fake_ollama)
        monkeypatch.setattr(svc, '_legacy_gemini_first_qa', fake_legacy)
        answer = svc.generate_strategy_response("本週業績如何?")
        assert answer == good_reply
        assert len(gemini_calls) == 0  # Gemini was never called

    def test_flag_true_low_quality_falls_back_to_gemini(self, monkeypatch, reset_state):
        """flag=true + low-quality Ollama reply (simplified pollution) -> Gemini fallback."""
        monkeypatch.setenv('OPENCLAW_QA_OLLAMA_FIRST', 'true')
        bad_reply = "本周业绩增长,您可以关注这个产品的价格变动,我们建议处理库存"
        gemini_calls = []

        def fake_ollama(q, ctx, rid):
            return bad_reply

        def fake_legacy(q, ctx, request_id=None):
            gemini_calls.append(q)
            return "[gemini high quality reply]"

        monkeypatch.setattr(svc, '_call_qwen3_qa', fake_ollama)
        monkeypatch.setattr(svc, '_legacy_gemini_first_qa', fake_legacy)
        answer = svc.generate_strategy_response("本週業績如何?")
        assert answer == "[gemini high quality reply]"
        assert len(gemini_calls) == 1

    def test_flag_true_ollama_returns_none_falls_back(self, monkeypatch, reset_state):
        """flag=true + failed Ollama call (returns None) -> Gemini fallback."""
        monkeypatch.setenv('OPENCLAW_QA_OLLAMA_FIRST', 'true')
        gemini_calls = []

        def fake_ollama(q, ctx, rid):
            return None

        def fake_legacy(q, ctx, request_id=None):
            gemini_calls.append(q)
            return "[gemini reply after ollama down]"

        monkeypatch.setattr(svc, '_call_qwen3_qa', fake_ollama)
        monkeypatch.setattr(svc, '_legacy_gemini_first_qa', fake_legacy)
        answer = svc.generate_strategy_response("test")
        assert answer == "[gemini reply after ollama down]"
        assert len(gemini_calls) == 1
# ─────────────────────────────────────────────────────────────────────────────
# 4. _call_qwen3_qa: ai_call_logger 整合 + fallback_to 標記
# ─────────────────────────────────────────────────────────────────────────────
class TestCallQwen3Telemetry:
    """``_call_qwen3_qa`` integration with ai_call_logger, including the fallback_to marker."""

    def test_qwen3_logs_ok_status_on_success(self, monkeypatch, reset_state):
        """A good reply is logged with status=ok, caller=openclaw_qa, provider=gcp_ollama."""
        records = reset_state
        payload = {
            'response': '本週 momo 業績成長 12%,建議加碼家電促銷。',
            'prompt_eval_count': 150,
            'eval_count': 60,
        }

        class FakeResp:
            status_code = 200

            def raise_for_status(self):
                pass

            def json(self):
                return payload

        def fake_post(*a, **kw):
            return FakeResp()

        monkeypatch.setattr(svc.requests, 'post', fake_post)
        reply = svc._call_qwen3_qa("本週業績?", None, "qa-test123")
        assert reply is not None
        assert "業績成長" in reply
        assert _wait_async(records, 1)
        assert len(records) == 1
        entry = records[0]
        assert entry['caller'] == 'openclaw_qa'
        assert entry['provider'] == 'gcp_ollama'
        assert entry['model'] == svc.OPENCLAW_QA_OLLAMA_MODEL
        assert entry['status'] == 'ok'
        assert entry['fallback_to'] is None
        assert entry['meta'].get('flag') == 'OPENCLAW_QA_OLLAMA_FIRST'
        assert entry['request_id'] == "qa-test123"

    def test_qwen3_logs_fallback_on_exception(self, monkeypatch, reset_state):
        """A connection failure is logged with status=fallback and fallback_to=openclaw_qa_gemini_fallback."""
        records = reset_state

        def boom(*a, **kw):
            raise svc.requests.ConnectionError("connection refused")

        monkeypatch.setattr(svc.requests, 'post', boom)
        reply = svc._call_qwen3_qa("test", None, "qa-fail123")
        assert reply is None
        assert _wait_async(records, 1)
        entry = records[0]
        assert entry['status'] == 'fallback'
        assert entry['fallback_to'] == 'openclaw_qa_gemini_fallback'
        assert entry['error'] is not None
        assert 'ConnectionError' in entry['error']

    def test_qwen3_logs_fallback_on_empty_response(self, monkeypatch, reset_state):
        """An empty Ollama response is logged as error=empty_response with the fallback marker."""
        records = reset_state
        payload = {'response': '', 'prompt_eval_count': 100, 'eval_count': 0}

        class FakeResp:
            status_code = 200

            def raise_for_status(self):
                pass

            def json(self):
                return payload

        def fake_post(*a, **kw):
            return FakeResp()

        monkeypatch.setattr(svc.requests, 'post', fake_post)
        reply = svc._call_qwen3_qa("test", None, "qa-empty")
        assert reply is None
        assert _wait_async(records, 1)
        entry = records[0]
        assert entry['status'] == 'fallback'
        assert entry['fallback_to'] == 'openclaw_qa_gemini_fallback'
        assert entry['error'] == 'empty_response'
# ─────────────────────────────────────────────────────────────────────────────
# 5. 環境變數讀取即時性runtime toggle
# ─────────────────────────────────────────────────────────────────────────────
class TestRuntimeToggle:
    """The feature flag is read from the environment on every call (runtime toggle)."""

    def test_flag_helper_reads_env_each_call(self, monkeypatch):
        """_qa_ollama_first_enabled() must re-read the env each call so the rollout can be toggled live."""
        flag = 'OPENCLAW_QA_OLLAMA_FIRST'
        monkeypatch.setenv(flag, 'false')
        assert svc._qa_ollama_first_enabled() is False
        monkeypatch.setenv(flag, 'true')
        assert svc._qa_ollama_first_enabled() is True

        # Strings that count as true.
        truthy = ('TRUE', 'True', '1', 'yes', 'on')
        for raw in truthy:
            monkeypatch.setenv(flag, raw)
            assert svc._qa_ollama_first_enabled() is True, f"應視為 true: {raw!r}"

        # Everything else, including the empty string and unknown words, is false.
        falsy = ('false', '0', 'no', 'off', '', 'foo')
        for raw in falsy:
            monkeypatch.setenv(flag, raw)
            assert svc._qa_ollama_first_enabled() is False, f"應視為 false: {raw!r}"