fix(awooop): bridge signal worker observations
All checks were successful
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / tests (push) Successful in 1m4s
CD Pipeline / build-and-deploy (push) Successful in 3m23s
CD Pipeline / post-deploy-checks (push) Successful in 1m22s

This commit is contained in:
Your Name
2026-05-18 11:49:33 +08:00
parent 161e337e77
commit a023c535db
5 changed files with 665 additions and 2 deletions

View File

@@ -550,6 +550,10 @@ _SHORT_HOST_MAP: dict[str, str] = {
"120": "192.168.0.120",
"121": "192.168.0.121",
"188": "192.168.0.188",
"ollama": "192.168.0.188",
"ai-web": "192.168.0.188",
"harbor": "192.168.0.110",
"gitea": "192.168.0.110",
}
"""
Prometheus instance label 使用短主機名(如 "110:9100"
@@ -591,18 +595,43 @@ def _build_prometheus_query(alertname: str, namespace: str, pod_name: str) -> st
def _build_tool_params(incident: "Incident") -> dict[str, Any]:
"""從 Incident 提取 MCP 工具呼叫所需的公共參數。"""
labels = _get_labels(incident)
raw_host = labels.get("instance", "").split(":")[0] or labels.get("host", "")
host = _SHORT_HOST_MAP.get(raw_host, raw_host) # 短名 → 完整 IP
affected_services = getattr(incident, "affected_services", []) or []
target = (
labels.get("target")
or labels.get("node")
or (affected_services[0] if affected_services else "")
)
raw_host = (
labels.get("instance", "").split(":")[0]
or labels.get("host", "")
or labels.get("sensor_ip", "")
or (target if str(labels.get("alertname", "")).lower().startswith("host") else "")
)
host_key = str(raw_host).split(":")[0].strip().lower()
host = _SHORT_HOST_MAP.get(host_key, raw_host) # 短名/主機別名 → 完整 IP
namespace = labels.get("namespace", "awoooi-prod")
pod_name = labels.get("pod", labels.get("name", ""))
alertname = labels.get("alertname", "")
service_name = (
labels.get("service")
or labels.get("app")
or labels.get("container")
or target
or pod_name
)
return {
"namespace": namespace,
"pod_name": pod_name,
"deployment": labels.get("deployment", ""),
"host": host,
"container": labels.get("container", labels.get("name", "")),
"target": target,
"service_name": service_name,
"alertname": alertname,
"search_text": alertname,
"severity": labels.get("severity", ""),
"time_window_minutes": 30,
"limit": 100,
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: Prometheus tool 需要 query 欄位
# 原本缺少此欄位 → prometheus_query/range tool 傳入空 query → 回傳 error dict
"query": _build_prometheus_query(alertname, namespace, pod_name),

View File

@@ -0,0 +1,511 @@
"""Signal-worker observation bridge for AwoooP truth-chain.
The Redis signal worker predates AwoooP channel events. Incidents created from
that path used to stop at "source persisted" plus a seed timeline event, which
made operator history look like the alert was received but never observed. This
service records the non-Telegram path as first-class AwoooP evidence without
claiming that any auto-repair was executed.
"""
from __future__ import annotations
import asyncio
import json
import time
from typing import TYPE_CHECKING, Any
import structlog
from sqlalchemy import text
from src.db.base import get_db_context
from src.services.channel_hub import (
_db_timestamp_now,
build_external_alert_provider_event_id,
build_external_alert_run_id,
build_inbound_source_envelope,
ensure_completed_shadow_run,
mirror_inbound_event,
record_outbound_message,
)
from src.services.evidence_snapshot import EvidenceSnapshot
if TYPE_CHECKING:
from src.models.incident import Incident
# Module-level structured logger used by every helper in this file.
logger = structlog.get_logger(__name__)
# Project identifier handed to the DB context and to the channel-hub helpers
# (run id, shadow run, inbound mirror, outbound message) in this module.
PROJECT_ID = "awoooi"
# Timeout, in seconds, applied to the pre-decision investigation awaited in
# _run_pre_decision_investigation.
SIGNAL_OBSERVATION_TIMEOUT_SEC = 12.0
def _to_text(value: Any, default: str = "") -> str:
if value is None:
return default
if isinstance(value, bytes):
return value.decode(errors="replace")
text_value = str(value)
return text_value if text_value else default
def _as_dict(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return dict(value)
if value in (None, ""):
return {}
if isinstance(value, bytes):
value = value.decode(errors="replace")
if isinstance(value, str):
try:
parsed = json.loads(value)
except json.JSONDecodeError:
return {}
return dict(parsed) if isinstance(parsed, dict) else {}
return {}
def _first_signal(incident: "Incident") -> Any | None:
signals = getattr(incident, "signals", []) or []
return signals[0] if signals else None
def _severity_value(value: Any) -> str:
    """Render a severity as text, preferring an enum-style ``.value`` attribute.

    Falls back to ``"warning"`` when the text form is empty or ``value`` is ``None``.
    """
    raw = getattr(value, "value", value)
    return _to_text(raw, "warning")
def _incident_id(incident: "Incident") -> str:
    """Return the incident's ``incident_id`` as text, or ``"unknown"`` when missing."""
    raw_id = getattr(incident, "incident_id", None)
    return _to_text(raw_id, "unknown")
def _alertname(incident: "Incident", signal_data: dict[str, Any]) -> str:
    """Resolve the alert name for an incident.

    Sources are tried in order and the first non-empty text wins: the raw
    signal payload's ``alert_name``, the first signal's ``alert_name``
    attribute, then that signal's ``alertname`` label. ``"unknown"`` is
    returned when none of them has a value.
    """
    signal = _first_signal(incident)
    signal_labels = _as_dict(getattr(signal, "labels", None))
    candidates = (
        signal_data.get("alert_name"),
        getattr(signal, "alert_name", None),
        signal_labels.get("alertname"),
    )
    for candidate in candidates:
        name = _to_text(candidate)
        if name:
            return name
    return "unknown"
def _annotations(incident: "Incident", signal_data: dict[str, Any]) -> dict[str, Any]:
    """Merge annotations from the raw payload and the incident's first signal.

    On key collisions the first signal's annotations override the payload's.
    """
    signal = _first_signal(incident)
    return {
        **_as_dict(signal_data.get("annotations")),
        **_as_dict(getattr(signal, "annotations", None)),
    }
def _labels(incident: "Incident", signal_data: dict[str, Any]) -> dict[str, Any]:
    """Build the label set for an incident from payload and signal context.

    Payload labels are overlaid with the first signal's labels. Missing
    ``target``, ``namespace``, ``sensor_ip``, ``sensor_host``, ``alertname``
    and ``source`` keys are then filled in from the payload (existing keys
    are never overwritten), and an empty ``host`` is replaced by the sensor
    IP or, failing that, the target.
    """
    signal = _first_signal(incident)
    labels = {
        **_as_dict(signal_data.get("labels")),
        **_as_dict(getattr(signal, "labels", None)),
    }

    services = getattr(incident, "affected_services", []) or []
    target = (
        _to_text(signal_data.get("target"))
        or _to_text(labels.get("target"))
        or (_to_text(services[0]) if services else "")
    )
    sensor_ip = _to_text(signal_data.get("sensor_ip"))
    sensor_host = _to_text(signal_data.get("sensor_host"))
    namespace = _to_text(signal_data.get("namespace"), "default")

    # Fill gaps only; insertion order matters for the resulting dict.
    fillers = (
        ("target", target),
        ("namespace", namespace),
        ("sensor_ip", sensor_ip),
        ("sensor_host", sensor_host),
    )
    for key, filler in fillers:
        if filler:
            labels.setdefault(key, filler)

    # An absent or empty host is overwritten, unlike the gap-filled keys above.
    fallback_host = sensor_ip or target
    if fallback_host and not labels.get("host"):
        labels["host"] = fallback_host

    alertname = _alertname(incident, signal_data)
    if alertname:
        labels.setdefault("alertname", alertname)
    labels.setdefault("source", _to_text(signal_data.get("source"), "signal_worker"))
    return labels
def enrich_incident_for_signal_observation(
    incident: "Incident",
    signal_data: dict[str, Any],
) -> dict[str, Any]:
    """Write the merged signal-worker labels back onto the incident's first signal.

    Args:
        incident: Incident whose first signal (if any) receives the labels.
        signal_data: Raw signal payload used to fill in missing context.

    Returns:
        The merged label dict, whether or not the assignment succeeded.
    """
    enriched = _labels(incident, signal_data)
    first = _first_signal(incident)
    if first is None:
        return enriched
    try:
        first.labels = enriched
    except Exception:
        # Best-effort: a signal object that rejects assignment is only noted.
        logger.debug(
            "signal_observation_label_enrich_skipped",
            incident_id=_incident_id(incident),
        )
    return enriched
def _fingerprint(incident: "Incident", signal_data: dict[str, Any]) -> str:
    """Return the payload fingerprint, else the first signal's, else ``"no-fingerprint"``."""
    from_payload = _to_text(signal_data.get("fingerprint"))
    if from_payload:
        return from_payload
    signal = _first_signal(incident)
    from_signal = _to_text(getattr(signal, "fingerprint", None))
    if from_signal:
        return from_signal
    return "no-fingerprint"
def build_signal_worker_provider_event_id(message_id: Any, fingerprint: str) -> str:
    """Build the provider event id for a signal-worker "received" event.

    The raw event id combines the message id (``"unknown"`` when empty) with
    the first 16 characters of ``fingerprint``; the final id is produced by
    ``build_external_alert_provider_event_id``.
    """
    message_part = _to_text(message_id, "unknown")
    fingerprint_prefix = fingerprint[:16]
    return build_external_alert_provider_event_id(
        "signal-worker",
        f"{message_part}:{fingerprint_prefix}",
        "received",
    )
def format_signal_worker_event_content(
    *,
    incident_id: str,
    alertname: str,
    severity: str,
    namespace: str,
    target: str,
    fingerprint: str,
    message_id: str,
    signal_source: str,
) -> str:
    """Render the multi-line text preview stored for an inbound signal-worker event.

    Empty ``namespace``, ``target`` and ``signal_source`` are shown as
    ``default``, ``-`` and ``signal_worker`` respectively.
    """
    shown_namespace = namespace or "default"
    shown_target = target or "-"
    shown_source = signal_source or "signal_worker"
    return (
        "Signal worker inbound received\n"
        f"Incident: {incident_id}\n"
        f"Alert: {alertname}\n"
        f"Severity: {severity}\n"
        f"Namespace: {shown_namespace}\n"
        f"Target: {shown_target}\n"
        f"Fingerprint: {fingerprint}\n"
        f"Redis Message: {message_id}\n"
        f"Source: {shown_source}\n"
        "Telegram: not sent (internal shadow mirror)"
    )
async def _record_signal_channel_history(
    *,
    incident: "Incident",
    signal_data: dict[str, Any],
    message_id: Any,
    labels: dict[str, Any],
) -> None:
    """Persist the signal-worker event as internal AwoooP channel history.

    Inside one DB context this ensures a completed shadow run, mirrors the
    inbound event on the ``internal`` channel, and then records a single
    shadow outbound message. The outbound message is skipped when a
    ``signal_worker_observed`` outbound row already references this incident.

    Args:
        incident: Incident being observed.
        signal_data: Raw signal payload.
        message_id: Stream message id of the signal; ``"unknown"`` when empty.
        labels: Merged labels for the incident.
    """
    incident_id = _incident_id(incident)
    alertname = _alertname(incident, signal_data)
    raw_severity = signal_data.get("severity") or getattr(incident, "severity", None)
    severity = _severity_value(raw_severity)
    namespace = _to_text(labels.get("namespace"), "default")

    # Prefer the explicit target label; otherwise join every affected service.
    target = _to_text(labels.get("target"))
    if not target:
        services = getattr(incident, "affected_services", []) or []
        target = ",".join(_to_text(service) for service in services)

    fingerprint = _fingerprint(incident, signal_data)
    signal_source = _to_text(signal_data.get("source"), "signal_worker")
    safe_message_id = _to_text(message_id, "unknown")
    provider_event_id = build_signal_worker_provider_event_id(safe_message_id, fingerprint)
    run_id = build_external_alert_run_id(PROJECT_ID, provider_event_id)
    annotations = _annotations(incident, signal_data)
    internal_chat_id = f"signal-worker:{namespace or 'default'}"

    content = format_signal_worker_event_content(
        incident_id=incident_id,
        alertname=alertname,
        severity=severity,
        namespace=namespace,
        target=target,
        fingerprint=fingerprint,
        message_id=safe_message_id,
        signal_source=signal_source,
    )
    envelope_extra = {
        "redis_stream": "awoooi:signals",
        "signal_source": signal_source,
        "sensor_host": labels.get("sensor_host"),
        "sensor_ip": labels.get("sensor_ip"),
    }
    envelope = build_inbound_source_envelope(
        provider="signal-worker",
        stage="received",
        provider_event_id=provider_event_id,
        raw_event_id=safe_message_id,
        raw_content=content,
        alertname=alertname,
        severity=severity,
        namespace=namespace,
        target_resource=target,
        fingerprint=fingerprint,
        incident_id=incident_id,
        labels=labels,
        annotations=annotations,
        extra=envelope_extra,
    )
    run_input = {
        "incident_id": incident_id,
        "alertname": alertname,
        "severity": severity,
        "namespace": namespace,
        "target": target,
        "fingerprint": fingerprint,
        "message_id": safe_message_id,
    }

    async with get_db_context(PROJECT_ID) as db:
        await ensure_completed_shadow_run(
            db,
            project_id=PROJECT_ID,
            run_id=run_id,
            agent_id="legacy-signal-worker",
            trigger_type="signal_worker_inbound",
            trigger_ref=provider_event_id,
            input_payload=run_input,
        )
        conversation_event_id = await mirror_inbound_event(
            db,
            project_id=PROJECT_ID,
            channel_type="internal",
            provider_event_id=provider_event_id,
            platform_subject_id="signal-worker",
            channel_user_id=signal_source,
            channel_chat_id=internal_chat_id,
            content_type="text",
            raw_content=content,
            source_envelope=envelope,
            provider_ts=_db_timestamp_now(),
            run_id=run_id,
        )

        # One observation message per incident: look for an earlier one first.
        duplicate_check = text(
            """
SELECT 1
FROM awooop_outbound_message
WHERE project_id = :project_id
AND triggered_by_state = 'signal_worker_observed'
AND coalesce(source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb)
? :incident_id
LIMIT 1
"""
        )
        check_result = await db.execute(
            duplicate_check,
            {"project_id": PROJECT_ID, "incident_id": incident_id},
        )
        if check_result.first():
            return

        outbound_envelope = {
            "schema_version": "signal_worker_observation_outbound_v1",
            "source_refs": {
                "incident_ids": [incident_id],
                "alert_ids": [provider_event_id, safe_message_id],
                "fingerprints": [fingerprint],
            },
            "automation_stage": "observed_not_executed",
            "truth_note": "internal shadow message only; no auto-repair execution claimed",
        }
        outbound_text = (
            "AwoooP signal-worker observation recorded\n"
            f"Incident: {incident_id}\n"
            f"Alert: {alertname}\n"
            "Stage: observed_not_executed\n"
            "Telegram: not sent"
        )
        await record_outbound_message(
            db,
            project_id=PROJECT_ID,
            run_id=run_id,
            conversation_event_id=conversation_event_id,
            channel_type="internal",
            channel_chat_id=internal_chat_id,
            message_type="interim",
            content=outbound_text,
            source_envelope=outbound_envelope,
            # Only the first 64 characters of the event id are stored here.
            provider_message_id=provider_event_id[:64],
            send_status="shadow",
            triggered_by_state="signal_worker_observed",
            is_shadow=True,
        )
async def _evidence_counts(incident_id: str) -> tuple[int, int]:
    """Count evidence rows for an incident.

    Returns:
        ``(total, succeeded)``: the number of ``incident_evidence`` rows for
        ``incident_id`` and the sum of their ``sensors_succeeded`` values.
        ``(0, 0)`` when the query yields no row.
    """
    async with get_db_context(PROJECT_ID) as db:
        result = await db.execute(
            text(
                """
SELECT
count(*)::int AS total,
coalesce(sum(sensors_succeeded), 0)::int AS succeeded
FROM incident_evidence
WHERE incident_id = :incident_id
"""
            ),
            {"incident_id": incident_id},
        )
        row = result.mappings().first()
        if not row:
            return 0, 0
        total = int(row["total"] or 0)
        succeeded = int(row["succeeded"] or 0)
        return total, succeeded
async def _record_raw_signal_evidence_if_needed(
    *,
    incident: "Incident",
    signal_data: dict[str, Any],
    message_id: Any,
    labels: dict[str, Any],
    pdi_duration_ms: int | None,
) -> bool:
    """Save a raw-signal evidence snapshot when no sensor evidence succeeded.

    If the evidence count lookup fails, the failure is logged and the
    incident is treated as having no successful evidence.

    Args:
        incident: Incident being observed.
        signal_data: Raw signal payload.
        message_id: Stream message id of the signal.
        labels: Merged labels for the incident.
        pdi_duration_ms: Investigation duration stored on the snapshot
            (``0`` when falsy).

    Returns:
        ``True`` when a snapshot was saved, ``False`` when existing evidence
        already reports at least one succeeded sensor.
    """
    incident_id = _incident_id(incident)

    succeeded = 0
    try:
        _, succeeded = await _evidence_counts(incident_id)
    except Exception as exc:
        logger.warning(
            "signal_observation_evidence_count_failed",
            incident_id=incident_id,
            error=str(exc),
        )
        succeeded = 0
    if succeeded > 0:
        return False

    alertname = _alertname(incident, signal_data)
    raw_severity = signal_data.get("severity") or getattr(incident, "severity", None)
    severity = _severity_value(raw_severity)
    annotations = _annotations(incident, signal_data)

    snapshot = EvidenceSnapshot(incident_id=incident_id)
    snapshot.alert_info = {
        "alert_name": alertname,
        "severity": severity,
        "affected_services": getattr(incident, "affected_services", []) or [],
        "labels": labels,
        "annotations": annotations,
        "source": _to_text(signal_data.get("source"), "signal_worker"),
        "incident_id": incident_id,
        "redis_message_id": _to_text(message_id, "unknown"),
    }
    snapshot.metrics_snapshot = {
        "source": "signal_worker_raw_event",
        "error_count": labels.get("error_count"),
        "sensor_host": labels.get("sensor_host"),
        "sensor_ip": labels.get("sensor_ip"),
        "target": labels.get("target"),
    }
    snapshot.historical_context = (
        "Raw signal-worker observation captured for truth-chain continuity. "
        "This is evidence only; no auto-repair execution was performed."
    )
    snapshot.mcp_health = {"signal_worker_raw_event": True}
    # The raw event itself is recorded as the single attempted/succeeded sensor.
    snapshot.sensors_attempted = 1
    snapshot.sensors_succeeded = 1
    snapshot.collection_duration_ms = pdi_duration_ms or 0
    # Summary is built last, after every other field has been assigned.
    snapshot.evidence_summary = snapshot.build_summary()
    await snapshot.save()
    return True
async def _add_observation_timeline(
    *,
    incident: "Incident",
    signal_data: dict[str, Any],
    labels: dict[str, Any],
    raw_evidence_created: bool,
) -> None:
    """Append an "observation recorded" event to the incident timeline.

    Any failure (including the lazy import of the timeline service) is logged
    as a warning and swallowed, so this step never raises to the caller.
    """
    try:
        from src.services.approval_db import get_timeline_service

        timeline = get_timeline_service()
        details = {
            "stage": "observed_not_executed",
            "raw_evidence_created": raw_evidence_created,
            "target": labels.get("target"),
            "sensor_host": labels.get("sensor_host"),
            "sensor_ip": labels.get("sensor_ip"),
            "telegram": "not_sent_internal_shadow",
        }
        await timeline.add_event(
            event_type="agent",
            status="success",
            title=f"Signal worker observation recorded: {_alertname(incident, signal_data)}",
            description=json.dumps(details, ensure_ascii=False),
            actor="signal_worker",
            actor_role="signal_worker",
            risk_level=_severity_value(getattr(incident, "severity", None)),
            incident_id=_incident_id(incident),
        )
    except Exception as exc:
        logger.warning(
            "signal_observation_timeline_failed",
            incident_id=_incident_id(incident),
            error=str(exc),
        )
async def _run_pre_decision_investigation(incident: "Incident") -> int | None:
    """Run the pre-decision investigator with a timeout and report elapsed time.

    Timeouts and other exceptions are logged as warnings and do not propagate.

    Returns:
        Elapsed wall time in milliseconds, measured with a monotonic clock,
        whether the investigation finished, timed out, or failed.
    """
    started = time.monotonic()
    try:
        from src.services.pre_decision_investigator import get_pre_decision_investigator

        investigation = get_pre_decision_investigator().investigate(incident)
        await asyncio.wait_for(investigation, timeout=SIGNAL_OBSERVATION_TIMEOUT_SEC)
    except asyncio.TimeoutError:
        logger.warning(
            "signal_observation_pdi_timeout",
            incident_id=_incident_id(incident),
            timeout_sec=SIGNAL_OBSERVATION_TIMEOUT_SEC,
        )
    except Exception as exc:
        logger.warning(
            "signal_observation_pdi_failed",
            incident_id=_incident_id(incident),
            error=str(exc),
        )
    # Every outcome reports the same measurement, taken after any logging.
    elapsed_ms = int((time.monotonic() - started) * 1000)
    return elapsed_ms
async def record_signal_worker_observation(
    incident: "Incident",
    signal_data: dict[str, Any],
    message_id: Any,
) -> dict[str, Any]:
    """Record a signal-worker incident as an observable AwoooP event.

    Steps, in order: enrich the incident's labels, record channel history,
    run the pre-decision investigation, save raw evidence if none succeeded,
    and add a timeline event. Exceptions from the channel-history and
    raw-evidence steps are not caught here; callers are expected to log them
    rather than treat them as signal-processing failures.

    Args:
        incident: Incident created from the signal.
        signal_data: Raw signal payload.
        message_id: Stream message id of the signal.

    Returns:
        A status payload with ``incident_id``, ``alertname``,
        ``raw_evidence_created`` and ``pdi_duration_ms``.
    """
    labels = enrich_incident_for_signal_observation(incident, signal_data)

    await _record_signal_channel_history(
        incident=incident,
        signal_data=signal_data,
        message_id=message_id,
        labels=labels,
    )

    pdi_duration_ms = await _run_pre_decision_investigation(incident)

    raw_evidence_created = await _record_raw_signal_evidence_if_needed(
        incident=incident,
        signal_data=signal_data,
        message_id=message_id,
        labels=labels,
        pdi_duration_ms=pdi_duration_ms,
    )

    await _add_observation_timeline(
        incident=incident,
        signal_data=signal_data,
        labels=labels,
        raw_evidence_created=raw_evidence_created,
    )

    status: dict[str, Any] = {
        "incident_id": _incident_id(incident),
        "alertname": _alertname(incident, signal_data),
        "raw_evidence_created": raw_evidence_created,
        "pdi_duration_ms": pdi_duration_ms,
    }
    return status

View File

@@ -401,6 +401,29 @@ class SignalWorker:
affected_services=incident.affected_services,
persisted_to_pg=getattr(incident, "persisted_to_pg", False), # 2026-04-01 ogt: BrainIncident 無此欄位 (ADR-046 P2-01)
)
try:
from src.services.signal_observation_service import (
record_signal_worker_observation,
)
observation = await record_signal_worker_observation(
incident,
data,
message_id,
)
span.set_attribute("signal.observation_recorded", True)
span.set_attribute(
"signal.observation_raw_evidence",
bool(observation.get("raw_evidence_created")),
)
except Exception as exc:
span.set_attribute("signal.observation_recorded", False)
logger.warning(
"signal_observation_record_failed",
message_id=message_id,
incident_id=incident.incident_id,
error=str(exc),
)
else:
span.set_attribute("signal.processing_failed", True)
logger.warning(

View File

@@ -135,6 +135,22 @@ def _stub_incident(
return _Incident()
def _stub_host_incident() -> object:
class _Signal:
alert_name = "HostErrorLogFlood"
labels = {
"alertname": "HostErrorLogFlood",
"error_count": "30",
}
class _Incident:
incident_id = "INC-HOST"
signals = [_Signal()]
affected_services = ["ollama"]
return _Incident()
def _reg(tool_name: str, provider: MCPToolProvider, dim: SensorDimension) -> RegisteredTool:
return RegisteredTool(
tool=_make_tool(tool_name),
@@ -144,6 +160,16 @@ def _reg(tool_name: str, provider: MCPToolProvider, dim: SensorDimension) -> Reg
)
def test_build_tool_params_uses_host_alias_and_service_from_affected_service() -> None:
    """A host alert with only an affected service resolves host, target and service."""
    tool_params = _build_tool_params(_stub_host_incident())

    # The "ollama" alias is expected to resolve to its mapped IP.
    assert tool_params["host"] == "192.168.0.188"
    for key in ("target", "service_name"):
        assert tool_params[key] == "ollama"
    assert tool_params["search_text"] == "HostErrorLogFlood"
    assert tool_params["time_window_minutes"] == 30
# ─────────────────────────────────────────────────────────────────────────────
# _compute_fingerprint
# ─────────────────────────────────────────────────────────────────────────────

View File

@@ -0,0 +1,74 @@
from __future__ import annotations
from types import SimpleNamespace
from src.models.incident import Severity
from src.services.signal_observation_service import (
build_signal_worker_provider_event_id,
enrich_incident_for_signal_observation,
format_signal_worker_event_content,
)
def _host_incident() -> SimpleNamespace:
    """Return a P2 host incident with one signal that lacks host/target labels."""
    host_signal = SimpleNamespace(
        alert_name="HostErrorLogFlood",
        labels={"error_count": "30"},
        annotations={"summary": "30 ERROR log entries in last 5min"},
        fingerprint="b000a87cfe4bc658",
    )
    incident = SimpleNamespace(
        incident_id="INC-20260518-TEST01",
        severity=Severity.P2,
        affected_services=["ollama"],
        signals=[host_signal],
    )
    return incident
def test_enrich_incident_for_signal_observation_restores_host_context() -> None:
    """Payload context is merged into the labels and written back to the signal."""
    incident = _host_incident()
    payload = {
        "source": "journal",
        "namespace": "infra",
        "target": "ollama",
        "sensor_host": "ollama",
        "sensor_ip": "192.168.0.188",
    }

    labels = enrich_incident_for_signal_observation(incident, payload)

    assert labels["alertname"] == "HostErrorLogFlood"
    assert labels["namespace"] == "infra"
    assert labels["target"] == "ollama"
    assert labels["host"] == "192.168.0.188"
    assert labels["sensor_host"] == "ollama"
    # The enrichment must also be visible on the incident's own signal.
    assert incident.signals[0].labels["host"] == "192.168.0.188"
def test_signal_worker_provider_event_id_is_stable_and_bounded() -> None:
    """The event id has a fixed, predictable form and stays under 256 characters."""
    redis_message_id = "1747528924000-0"
    fingerprint = "b000a87cfe4bc658"

    event_id = build_signal_worker_provider_event_id(redis_message_id, fingerprint)

    assert event_id == "signal-worker:received:1747528924000-0:b000a87cfe4bc658"
    assert len(event_id) < 256
def test_signal_worker_event_content_is_operator_readable() -> None:
    """The rendered preview names the incident, the alert and the Telegram status."""
    fields = {
        "incident_id": "INC-20260518-TEST01",
        "alertname": "HostErrorLogFlood",
        "severity": "P2",
        "namespace": "infra",
        "target": "ollama",
        "fingerprint": "b000a87cfe4bc658",
        "message_id": "1747528924000-0",
        "signal_source": "journal",
    }

    content = format_signal_worker_event_content(**fields)

    assert "Incident: INC-20260518-TEST01" in content
    assert "Alert: HostErrorLogFlood" in content
    assert "Telegram: not sent" in content