Files
ewoooc/services/market_intel/mcp_readiness.py
OoO 921e9eeb15
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
feat(market-intel): gate manual fetch behind mcp readiness
2026-05-18 15:40:56 +08:00

261 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報 MCP 整合就緒度 preview。
只檢查 MCP 設計、feature flag、tool registry、server health 與 telemetry 表狀態;
預設不連線、不寫 DB、不呼叫外部工具。
"""
from sqlalchemy import create_engine, inspect, text
from services.market_intel.mcp_contract import (
EXPECTED_MARKET_INTEL_TOOLS,
build_mcp_tool_contract_preview,
)
EXPECTED_EXTERNAL_SERVERS = ("postgres", "omnisearch", "firecrawl", "filesystem")
def _planned_server_statuses(base_hosts):
return [
{
"server": server,
"base_url": base_hosts.get(server),
"configured": bool(base_hosts.get(server)),
"health_checked": False,
"healthy": False,
"status": "planned_no_health_check",
"error_message": None,
}
for server in EXPECTED_EXTERNAL_SERVERS
]
def _health_check_servers(base_hosts, timeout_sec):
import requests
statuses = []
for server in EXPECTED_EXTERNAL_SERVERS:
base_url = base_hosts.get(server)
status = {
"server": server,
"base_url": base_url,
"configured": bool(base_url),
"health_checked": False,
"healthy": False,
"status": "not_configured" if not base_url else "error",
"error_message": None,
}
if not base_url:
statuses.append(status)
continue
try:
response = requests.get(
f"{base_url.rstrip('/')}/health",
timeout=timeout_sec,
)
status["health_checked"] = True
status["healthy"] = response.status_code == 200
status["status"] = "healthy" if status["healthy"] else f"http_{response.status_code}"
except Exception as exc:
status["health_checked"] = True
status["status"] = "error"
status["error_message"] = str(exc)[:240]
statuses.append(status)
return statuses
def _build_telemetry_status(*, execute_requested, engine=None, database_url=None, database_type=None):
if not execute_requested:
return {
"mode": "mcp_telemetry_planned",
"table": "mcp_calls",
"read_only_query_executed": False,
"database_connection_opened": False,
"database_session_created": False,
"database_write_executed": False,
"database_commit_executed": False,
"table_exists": False,
"total_calls": None,
"recent_24h_calls": None,
"server_counts": [],
}
from config import DATABASE_PATH, DATABASE_TYPE
effective_database_url = database_url or DATABASE_PATH
effective_database_type = (database_type or DATABASE_TYPE or "").lower()
created_engine = False
connection_opened = False
try:
if engine is None:
connect_args = {}
if effective_database_type == "postgresql":
connect_args = {
"connect_timeout": 8,
"options": "-c statement_timeout=15000",
}
engine = create_engine(
effective_database_url,
isolation_level="AUTOCOMMIT",
pool_pre_ping=True,
connect_args=connect_args,
)
created_engine = True
table_exists = inspect(engine).has_table("mcp_calls")
total_calls = None
recent_24h_calls = None
server_counts = []
with engine.connect() as conn:
connection_opened = True
if table_exists:
total_calls = conn.execute(text("SELECT COUNT(*) FROM mcp_calls")).scalar()
if effective_database_type == "postgresql":
recent_24h_calls = conn.execute(
text("SELECT COUNT(*) FROM mcp_calls WHERE called_at >= NOW() - INTERVAL '24 hours'")
).scalar()
else:
recent_24h_calls = total_calls
server_counts = [
{"server": row[0], "calls": int(row[1])}
for row in conn.execute(
text(
"""
SELECT server, COUNT(*) AS calls
FROM mcp_calls
GROUP BY server
ORDER BY calls DESC
LIMIT 10
"""
)
).fetchall()
]
return {
"mode": "mcp_telemetry_read_only",
"table": "mcp_calls",
"read_only_query_executed": True,
"database_connection_opened": connection_opened,
"database_session_created": False,
"database_write_executed": False,
"database_commit_executed": False,
"table_exists": bool(table_exists),
"total_calls": int(total_calls or 0) if table_exists else None,
"recent_24h_calls": int(recent_24h_calls or 0) if table_exists else None,
"server_counts": server_counts,
}
except Exception as exc:
return {
"mode": "mcp_telemetry_error",
"table": "mcp_calls",
"read_only_query_executed": False,
"database_connection_opened": connection_opened,
"database_session_created": False,
"database_write_executed": False,
"database_commit_executed": False,
"table_exists": False,
"total_calls": None,
"recent_24h_calls": None,
"server_counts": [],
"error_message": str(exc)[:400],
}
finally:
if created_engine:
engine.dispose()
def build_mcp_readiness_plan(
*,
execute_requested=False,
timeout_sec=3,
engine=None,
database_url=None,
database_type=None,
):
"""建立市場情報 MCP readiness預設不做 health check / DB query。"""
from services.mcp_router import MCP_BASE_HOSTS, TOOL_REGISTRY, is_mcp_router_enabled
execute_requested = bool(execute_requested)
timeout_sec = max(1, min(int(timeout_sec or 3), 10))
router_enabled = bool(is_mcp_router_enabled())
server_statuses = (
_health_check_servers(MCP_BASE_HOSTS, timeout_sec)
if execute_requested
else _planned_server_statuses(MCP_BASE_HOSTS)
)
telemetry = _build_telemetry_status(
execute_requested=execute_requested,
engine=engine,
database_url=database_url,
database_type=database_type,
)
registered_callers = sorted(TOOL_REGISTRY.keys())
market_intel_tools = TOOL_REGISTRY.get("market_intel", {})
tool_contract = build_mcp_tool_contract_preview(TOOL_REGISTRY)
market_intel_tool_count = int(tool_contract["tool_count"])
external_mcp_complete = bool(
router_enabled
and execute_requested
and server_statuses
and all(item["healthy"] for item in server_statuses)
)
internal_mcp_complete = bool(
tool_contract["contract_ready"]
and telemetry.get("table_exists")
)
market_intel_mcp_integrated = bool(tool_contract["contract_ready"])
readiness_checks = {
"mcp_router_module_present": True,
"mcp_router_enabled": router_enabled,
"external_server_health_checked": execute_requested,
"external_servers_all_healthy": all(item["healthy"] for item in server_statuses),
"mcp_calls_table_exists": bool(telemetry.get("table_exists")),
"base_callers_registered": {"mcp_collector", "hermes_analyst", "openclaw_strategist"} <= set(registered_callers),
"market_intel_caller_registered": bool(
tool_contract["checks"]["market_intel_caller_registered"]
),
"market_intel_tools_registered": bool(
tool_contract["checks"]["all_router_tools_whitelisted"]
),
"market_intel_tool_contract_ready": bool(tool_contract["contract_ready"]),
}
blocked_reasons = [
key for key, passed in readiness_checks.items()
if not passed
]
if not execute_requested:
blocked_reasons.insert(0, "execute_false_planned_only")
return {
"mode": "mcp_readiness_read_only" if execute_requested else "mcp_readiness_planned",
"execute_requested": execute_requested,
"router_enabled": router_enabled,
"external_mcp_complete": external_mcp_complete,
"internal_mcp_complete": internal_mcp_complete,
"market_intel_mcp_integrated": market_intel_mcp_integrated,
"server_statuses": server_statuses,
"registered_callers": registered_callers,
"market_intel_tools": market_intel_tools,
"market_intel_tool_count": market_intel_tool_count,
"expected_market_intel_tools": list(EXPECTED_MARKET_INTEL_TOOLS),
"mcp_tool_contract": tool_contract,
"telemetry": telemetry,
"readiness_checks": readiness_checks,
"database_session_created": False,
"database_write_executed": False,
"database_commit_executed": False,
"external_network_executed": False,
"scheduler_attached": False,
"writes_executed": False,
"would_write_database": False,
"blocked_reasons": blocked_reasons,
"next_required_steps": [
"先通過 /api/market_intel/mcp_deploy_preflight 的 env、compose、port 與 fallback 檢查",
"部署並健康檢查 docker-compose.mcp.yml 的 postgres / omnisearch / firecrawl / filesystem",
"四個 MCP health endpoint 全部 200 後,才在正式環境設定 MCP_ROUTER_ENABLED=true",
"人工 fetch 必須先通過 /api/market_intel/mcp_fetch_gate再允許公開頁面限速探測",
],
}