261 lines
9.7 KiB
Python
261 lines
9.7 KiB
Python
"""市場情報 MCP 整合就緒度 preview。
|
||
|
||
只檢查 MCP 設計、feature flag、tool registry、server health 與 telemetry 表狀態;
|
||
預設不連線、不寫 DB、不呼叫外部工具。
|
||
"""
|
||
|
||
from sqlalchemy import create_engine, inspect, text
|
||
|
||
from services.market_intel.mcp_contract import (
|
||
EXPECTED_MARKET_INTEL_TOOLS,
|
||
build_mcp_tool_contract_preview,
|
||
)
|
||
|
||
|
||
# Names of the MCP servers that the readiness report expects. Both status
# builders in this module iterate this tuple and use each name as the lookup
# key into the base-host mapping, so every report has one entry per name.
EXPECTED_EXTERNAL_SERVERS = ("postgres", "omnisearch", "firecrawl", "filesystem")
|
||
|
||
|
||
def _planned_server_statuses(base_hosts):
    """Describe every expected MCP server without contacting any of them.

    Args:
        base_hosts: Mapping from server name to its base URL; a missing or
            falsy entry marks the server as not configured.

    Returns:
        A list with one dict per name in ``EXPECTED_EXTERNAL_SERVERS``. Each
        entry has ``health_checked`` and ``healthy`` set to ``False`` and the
        status ``"planned_no_health_check"``.
    """
    planned = []
    for name in EXPECTED_EXTERNAL_SERVERS:
        url = base_hosts.get(name)
        planned.append(
            {
                "server": name,
                "base_url": url,
                "configured": bool(url),
                "health_checked": False,
                "healthy": False,
                "status": "planned_no_health_check",
                "error_message": None,
            }
        )
    return planned
|
||
|
||
|
||
def _health_check_servers(base_hosts, timeout_sec):
    """Probe the ``/health`` endpoint of every expected MCP server.

    Args:
        base_hosts: Mapping from server name to its base URL.
        timeout_sec: Timeout handed to ``requests.get`` for each probe.

    Returns:
        A list with one status dict per name in ``EXPECTED_EXTERNAL_SERVERS``.
        Servers without a base URL are reported as ``"not_configured"`` and are
        never contacted. Probed servers report ``"healthy"`` on HTTP 200,
        ``"http_<code>"`` on any other response, and ``"error"`` (with a
        truncated message) when the request raises.
    """
    # Imported lazily so the planned (no-network) path never needs requests.
    import requests

    results = []
    for name in EXPECTED_EXTERNAL_SERVERS:
        url = base_hosts.get(name)
        entry = {
            "server": name,
            "base_url": url,
            "configured": bool(url),
            "health_checked": False,
            "healthy": False,
            "status": "error" if url else "not_configured",
            "error_message": None,
        }
        results.append(entry)
        if not url:
            continue

        try:
            reply = requests.get(
                f"{url.rstrip('/')}/health",
                timeout=timeout_sec,
            )
        except Exception as err:
            # Best-effort probe: any failure is recorded, never propagated.
            entry.update(
                health_checked=True,
                status="error",
                error_message=str(err)[:240],
            )
        else:
            is_ok = reply.status_code == 200
            entry.update(
                health_checked=True,
                healthy=is_ok,
                status="healthy" if is_ok else f"http_{reply.status_code}",
            )
    return results
|
||
|
||
|
||
def _build_telemetry_status(*, execute_requested, engine=None, database_url=None, database_type=None):
    """Summarize the state of the ``mcp_calls`` telemetry table.

    Args:
        execute_requested: When falsy, nothing is imported or connected and a
            "planned" payload is returned immediately.
        engine: Optional SQLAlchemy engine to reuse. When omitted, a temporary
            AUTOCOMMIT engine is created and disposed before returning.
        database_url: Overrides ``config.DATABASE_PATH`` for the temporary engine.
        database_type: Overrides ``config.DATABASE_TYPE``; compared
            case-insensitively against ``"postgresql"``.

    Returns:
        A dict whose ``mode`` is ``"mcp_telemetry_planned"``,
        ``"mcp_telemetry_read_only"`` or ``"mcp_telemetry_error"``. The error
        payload additionally carries a truncated ``error_message``.
    """
    if not execute_requested:
        return {
            "mode": "mcp_telemetry_planned",
            "table": "mcp_calls",
            "read_only_query_executed": False,
            "database_connection_opened": False,
            "database_session_created": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "table_exists": False,
            "total_calls": None,
            "recent_24h_calls": None,
            "server_counts": [],
        }

    from config import DATABASE_PATH, DATABASE_TYPE

    effective_database_url = database_url or DATABASE_PATH
    effective_database_type = (database_type or DATABASE_TYPE or "").lower()
    created_engine = False
    connection_opened = False
    try:
        if engine is None:
            connect_args = {}
            if effective_database_type == "postgresql":
                # Bound both the connect attempt and each statement so a slow
                # database cannot stall the readiness report.
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            engine = create_engine(
                effective_database_url,
                isolation_level="AUTOCOMMIT",
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True

        total_calls = None
        recent_24h_calls = None
        server_counts = []
        with engine.connect() as conn:
            connection_opened = True
            # Inspect through the already-open connection so the
            # ``database_connection_opened`` flag is accurate even when the
            # table check itself fails, and only one connection is used.
            table_exists = inspect(conn).has_table("mcp_calls")
            if table_exists:
                total_calls = conn.execute(text("SELECT COUNT(*) FROM mcp_calls")).scalar()
                if effective_database_type == "postgresql":
                    recent_24h_calls = conn.execute(
                        text("SELECT COUNT(*) FROM mcp_calls WHERE called_at >= NOW() - INTERVAL '24 hours'")
                    ).scalar()
                else:
                    # No time-window query is issued for other backends; the
                    # total is reported as the 24h figure.
                    recent_24h_calls = total_calls
                server_counts = [
                    {"server": row[0], "calls": int(row[1])}
                    for row in conn.execute(
                        text(
                            """
                            SELECT server, COUNT(*) AS calls
                            FROM mcp_calls
                            GROUP BY server
                            ORDER BY calls DESC
                            LIMIT 10
                            """
                        )
                    ).fetchall()
                ]

        return {
            "mode": "mcp_telemetry_read_only",
            "table": "mcp_calls",
            "read_only_query_executed": True,
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "table_exists": bool(table_exists),
            "total_calls": int(total_calls or 0) if table_exists else None,
            "recent_24h_calls": int(recent_24h_calls or 0) if table_exists else None,
            "server_counts": server_counts,
        }
    except Exception as exc:
        # Readiness reporting is best-effort: surface the failure in the
        # payload instead of raising to the caller.
        return {
            "mode": "mcp_telemetry_error",
            "table": "mcp_calls",
            "read_only_query_executed": False,
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "table_exists": False,
            "total_calls": None,
            "recent_24h_calls": None,
            "server_counts": [],
            "error_message": str(exc)[:400],
        }
    finally:
        # Only dispose engines created here; a caller-supplied engine stays open.
        if created_engine:
            engine.dispose()
|
||
|
||
|
||
def build_mcp_readiness_plan(
    *,
    execute_requested=False,
    timeout_sec=3,
    engine=None,
    database_url=None,
    database_type=None,
):
    """Build the market-intel MCP readiness report.

    By default no health check or DB query is performed; both only happen
    when ``execute_requested`` is truthy.

    Args:
        execute_requested: When truthy, probe each server's health endpoint
            and run the read-only telemetry queries.
        timeout_sec: Per-server health-check timeout. Clamped to 1..10 seconds;
            values that cannot be converted to an int fall back to 3.
        engine: Optional SQLAlchemy engine forwarded to the telemetry check.
        database_url: Optional database URL forwarded to the telemetry check.
        database_type: Optional database type forwarded to the telemetry check.

    Returns:
        A dict holding the server statuses, tool-contract preview, telemetry
        summary, individual readiness checks, the names of the failed checks
        (``blocked_reasons``) and the next required manual steps.
    """
    from services.mcp_router import MCP_BASE_HOSTS, TOOL_REGISTRY, is_mcp_router_enabled

    execute_requested = bool(execute_requested)
    try:
        requested_timeout = int(timeout_sec or 3)
    except (TypeError, ValueError, OverflowError):
        # A malformed timeout must not abort the whole report.
        requested_timeout = 3
    timeout_sec = max(1, min(requested_timeout, 10))
    router_enabled = bool(is_mcp_router_enabled())
    server_statuses = (
        _health_check_servers(MCP_BASE_HOSTS, timeout_sec)
        if execute_requested
        else _planned_server_statuses(MCP_BASE_HOSTS)
    )
    telemetry = _build_telemetry_status(
        execute_requested=execute_requested,
        engine=engine,
        database_url=database_url,
        database_type=database_type,
    )

    registered_callers = sorted(TOOL_REGISTRY.keys())
    market_intel_tools = TOOL_REGISTRY.get("market_intel", {})
    tool_contract = build_mcp_tool_contract_preview(TOOL_REGISTRY)
    market_intel_tool_count = int(tool_contract["tool_count"])

    # Guard against an empty list, where a bare all() would be vacuously True;
    # the same value feeds both the summary flag and the readiness check.
    all_servers_healthy = bool(server_statuses) and all(
        item["healthy"] for item in server_statuses
    )
    # True only when at least one HTTP health probe was actually issued.
    health_probe_executed = any(item["health_checked"] for item in server_statuses)

    external_mcp_complete = bool(
        router_enabled
        and execute_requested
        and all_servers_healthy
    )
    internal_mcp_complete = bool(
        tool_contract["contract_ready"]
        and telemetry.get("table_exists")
    )
    market_intel_mcp_integrated = bool(tool_contract["contract_ready"])

    readiness_checks = {
        "mcp_router_module_present": True,
        "mcp_router_enabled": router_enabled,
        "external_server_health_checked": execute_requested,
        "external_servers_all_healthy": all_servers_healthy,
        "mcp_calls_table_exists": bool(telemetry.get("table_exists")),
        "base_callers_registered": {"mcp_collector", "hermes_analyst", "openclaw_strategist"} <= set(registered_callers),
        "market_intel_caller_registered": bool(
            tool_contract["checks"]["market_intel_caller_registered"]
        ),
        "market_intel_tools_registered": bool(
            tool_contract["checks"]["all_router_tools_whitelisted"]
        ),
        "market_intel_tool_contract_ready": bool(tool_contract["contract_ready"]),
    }
    blocked_reasons = [
        key for key, passed in readiness_checks.items()
        if not passed
    ]
    if not execute_requested:
        # Make it explicit that a planned run can never count as ready.
        blocked_reasons.insert(0, "execute_false_planned_only")

    return {
        "mode": "mcp_readiness_read_only" if execute_requested else "mcp_readiness_planned",
        "execute_requested": execute_requested,
        "router_enabled": router_enabled,
        "external_mcp_complete": external_mcp_complete,
        "internal_mcp_complete": internal_mcp_complete,
        "market_intel_mcp_integrated": market_intel_mcp_integrated,
        "server_statuses": server_statuses,
        "registered_callers": registered_callers,
        "market_intel_tools": market_intel_tools,
        "market_intel_tool_count": market_intel_tool_count,
        "expected_market_intel_tools": list(EXPECTED_MARKET_INTEL_TOOLS),
        "mcp_tool_contract": tool_contract,
        "telemetry": telemetry,
        "readiness_checks": readiness_checks,
        "database_session_created": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "external_network_executed": health_probe_executed,
        "scheduler_attached": False,
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "next_required_steps": [
            "先通過 /api/market_intel/mcp_deploy_preflight 的 env、compose、port 與 fallback 檢查",
            "部署並健康檢查 docker-compose.mcp.yml 的 postgres / omnisearch / firecrawl / filesystem",
            "四個 MCP health endpoint 全部 200 後,才在正式環境設定 MCP_ROUTER_ENABLED=true",
            "人工 fetch 必須先通過 /api/market_intel/mcp_fetch_gate,再允許公開頁面限速探測",
        ],
    }
|