ewoooc/scripts/verify_elephant_integration.py
ogt ba86f98514
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
feat: integrate Elephant Alpha ecosystem with full ADR-012/013 compliance
- Add ElephantService, AutonomousEngine, Orchestrator, DecisionRouter (EA 4-file stack)
- Fix 10 bugs: URL typo, SQL schema mismatches (price_records JOIN), enum mapping,
  metadata_json, NemoTron PriceThreat dispatch, async/await mismatch, broken imports
- Wire ADR-012 Agent Action Ladder: EventRouter L2 → EA first + AIOrch fallback;
  all decisions dual-write DB + triaged_alert Telegram; momo: callback prefix
- Wire ADR-013 AutoHeal: resource_optimization trigger → AutoHealService
- Add W3 guards: connection cache 300s TTL, $5/hr cost hard limit
- Add W4 persistence: routing decisions + agent performance snapshots → ai_insights
- Add Migration 015: confidence + created_by columns on ai_insights
- Fix run_scheduler.py broken imports (DecisionTracker service didn't exist)
- Fix verify_elephant_integration.py: check_status() → check_connection()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-20 04:28:26 +08:00

48 lines
1.5 KiB
Python

import sys
import os
import asyncio
import json

# Add project root to path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from services.elephant_alpha_orchestrator import elephant_orchestrator
from services.elephant_service import elephant_service


async def test_orchestration():
    print("--- Testing Elephant Alpha Orchestration ---")

    # Check service availability
    status = elephant_service.check_connection()
    print(f"Elephant Service Status: {'OK' if status else 'ERROR'}")
    if not status:
        print("Skipping orchestration test due to service error.")
        return

    # Simulate a business event
    business_context = {
        "task_type": "price_optimization",
        "urgency": "high",
        "market_situation": "Competitor A dropped price of core 10 products by 20%",
        "inventory_level": "healthy"
    }

    print("Sending context to Elephant Alpha Orchestrator...")
    try:
        decision = await elephant_orchestrator.analyze_and_coordinate(business_context)
        print("\n--- Strategic Decision ---")
        print(f"Priority: {decision.priority}")
        print(f"Reasoning: {decision.reasoning}")
        print(f"Agents Required: {decision.agents_required}")
        print("\nExecution Plan:")
        for step in decision.execution_plan:
            print(f"- {step.get('agent')}: {step.get('action')}")
    except Exception as e:
        print(f"Orchestration failed: {e}")


if __name__ == "__main__":
    asyncio.run(test_orchestration())
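
Note on the availability check: the script calls `elephant_service.check_connection()` without `await`, and the source does not show whether that method is synchronous or a coroutine function. If it were a coroutine function, `status` would be a coroutine object, which is always truthy, so the script would print `OK` regardless of the real connection state. A minimal sketch of a guard that works in either case follows. The names `call_maybe_async`, `sync_check`, and `async_check` are hypothetical stand-ins and are not part of the repository.

```python
import asyncio
import inspect


async def call_maybe_async(func, *args, **kwargs):
    """Call func and await its result only if the result is awaitable."""
    result = func(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result
    return result


# Hypothetical stand-ins for the two possible shapes of check_connection().
def sync_check():
    return True


async def async_check():
    return True


async def main():
    # Both calls yield a plain bool, whichever style the callee uses.
    return [
        await call_maybe_async(sync_check),
        await call_maybe_async(async_check),
    ]


results = asyncio.run(main())
print(results)
```

Under this assumption, the check in `test_orchestration()` could be written as `status = await call_maybe_async(elephant_service.check_connection)`, which leaves the behavior unchanged if the method is already synchronous.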