fix(awooop): bridge signal worker observations
This commit is contained in:
@@ -550,6 +550,10 @@ _SHORT_HOST_MAP: dict[str, str] = {
|
||||
"120": "192.168.0.120",
|
||||
"121": "192.168.0.121",
|
||||
"188": "192.168.0.188",
|
||||
"ollama": "192.168.0.188",
|
||||
"ai-web": "192.168.0.188",
|
||||
"harbor": "192.168.0.110",
|
||||
"gitea": "192.168.0.110",
|
||||
}
|
||||
"""
|
||||
Prometheus instance label 使用短主機名(如 "110:9100"),
|
||||
@@ -591,18 +595,43 @@ def _build_prometheus_query(alertname: str, namespace: str, pod_name: str) -> st
|
||||
def _build_tool_params(incident: "Incident") -> dict[str, Any]:
|
||||
"""從 Incident 提取 MCP 工具呼叫所需的公共參數。"""
|
||||
labels = _get_labels(incident)
|
||||
raw_host = labels.get("instance", "").split(":")[0] or labels.get("host", "")
|
||||
host = _SHORT_HOST_MAP.get(raw_host, raw_host) # 短名 → 完整 IP
|
||||
affected_services = getattr(incident, "affected_services", []) or []
|
||||
target = (
|
||||
labels.get("target")
|
||||
or labels.get("node")
|
||||
or (affected_services[0] if affected_services else "")
|
||||
)
|
||||
raw_host = (
|
||||
labels.get("instance", "").split(":")[0]
|
||||
or labels.get("host", "")
|
||||
or labels.get("sensor_ip", "")
|
||||
or (target if str(labels.get("alertname", "")).lower().startswith("host") else "")
|
||||
)
|
||||
host_key = str(raw_host).split(":")[0].strip().lower()
|
||||
host = _SHORT_HOST_MAP.get(host_key, raw_host) # 短名/主機別名 → 完整 IP
|
||||
namespace = labels.get("namespace", "awoooi-prod")
|
||||
pod_name = labels.get("pod", labels.get("name", ""))
|
||||
alertname = labels.get("alertname", "")
|
||||
service_name = (
|
||||
labels.get("service")
|
||||
or labels.get("app")
|
||||
or labels.get("container")
|
||||
or target
|
||||
or pod_name
|
||||
)
|
||||
return {
|
||||
"namespace": namespace,
|
||||
"pod_name": pod_name,
|
||||
"deployment": labels.get("deployment", ""),
|
||||
"host": host,
|
||||
"container": labels.get("container", labels.get("name", "")),
|
||||
"target": target,
|
||||
"service_name": service_name,
|
||||
"alertname": alertname,
|
||||
"search_text": alertname,
|
||||
"severity": labels.get("severity", ""),
|
||||
"time_window_minutes": 30,
|
||||
"limit": 100,
|
||||
# P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: Prometheus tool 需要 query 欄位
|
||||
# 原本缺少此欄位 → prometheus_query/range tool 傳入空 query → 回傳 error dict
|
||||
"query": _build_prometheus_query(alertname, namespace, pod_name),
|
||||
|
||||
511
apps/api/src/services/signal_observation_service.py
Normal file
511
apps/api/src/services/signal_observation_service.py
Normal file
@@ -0,0 +1,511 @@
|
||||
"""Signal-worker observation bridge for AwoooP truth-chain.
|
||||
|
||||
The Redis signal worker predates AwoooP channel events. Incidents created from
|
||||
that path used to stop at "source persisted" plus a seed timeline event, which
|
||||
made operator history look like the alert was received but never observed. This
|
||||
service records the non-Telegram path as first-class AwoooP evidence without
|
||||
claiming that any auto-repair was executed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
from src.services.channel_hub import (
|
||||
_db_timestamp_now,
|
||||
build_external_alert_provider_event_id,
|
||||
build_external_alert_run_id,
|
||||
build_inbound_source_envelope,
|
||||
ensure_completed_shadow_run,
|
||||
mirror_inbound_event,
|
||||
record_outbound_message,
|
||||
)
|
||||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.models.incident import Incident
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
PROJECT_ID = "awoooi"
|
||||
SIGNAL_OBSERVATION_TIMEOUT_SEC = 12.0
|
||||
|
||||
|
||||
def _to_text(value: Any, default: str = "") -> str:
|
||||
if value is None:
|
||||
return default
|
||||
if isinstance(value, bytes):
|
||||
return value.decode(errors="replace")
|
||||
text_value = str(value)
|
||||
return text_value if text_value else default
|
||||
|
||||
|
||||
def _as_dict(value: Any) -> dict[str, Any]:
|
||||
if isinstance(value, dict):
|
||||
return dict(value)
|
||||
if value in (None, ""):
|
||||
return {}
|
||||
if isinstance(value, bytes):
|
||||
value = value.decode(errors="replace")
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
parsed = json.loads(value)
|
||||
except json.JSONDecodeError:
|
||||
return {}
|
||||
return dict(parsed) if isinstance(parsed, dict) else {}
|
||||
return {}
|
||||
|
||||
|
||||
def _first_signal(incident: "Incident") -> Any | None:
|
||||
signals = getattr(incident, "signals", []) or []
|
||||
return signals[0] if signals else None
|
||||
|
||||
|
||||
def _severity_value(value: Any) -> str:
|
||||
return _to_text(getattr(value, "value", value), "warning")
|
||||
|
||||
|
||||
def _incident_id(incident: "Incident") -> str:
|
||||
return _to_text(getattr(incident, "incident_id", None), "unknown")
|
||||
|
||||
|
||||
def _alertname(incident: "Incident", signal_data: dict[str, Any]) -> str:
|
||||
signal = _first_signal(incident)
|
||||
labels = _as_dict(getattr(signal, "labels", None))
|
||||
return (
|
||||
_to_text(signal_data.get("alert_name"))
|
||||
or _to_text(getattr(signal, "alert_name", None))
|
||||
or _to_text(labels.get("alertname"))
|
||||
or "unknown"
|
||||
)
|
||||
|
||||
|
||||
def _annotations(incident: "Incident", signal_data: dict[str, Any]) -> dict[str, Any]:
|
||||
signal = _first_signal(incident)
|
||||
merged = _as_dict(signal_data.get("annotations"))
|
||||
merged.update(_as_dict(getattr(signal, "annotations", None)))
|
||||
return merged
|
||||
|
||||
|
||||
def _labels(incident: "Incident", signal_data: dict[str, Any]) -> dict[str, Any]:
|
||||
signal = _first_signal(incident)
|
||||
merged = _as_dict(signal_data.get("labels"))
|
||||
merged.update(_as_dict(getattr(signal, "labels", None)))
|
||||
affected_services = getattr(incident, "affected_services", []) or []
|
||||
target = (
|
||||
_to_text(signal_data.get("target"))
|
||||
or _to_text(merged.get("target"))
|
||||
or (_to_text(affected_services[0]) if affected_services else "")
|
||||
)
|
||||
sensor_ip = _to_text(signal_data.get("sensor_ip"))
|
||||
sensor_host = _to_text(signal_data.get("sensor_host"))
|
||||
namespace = _to_text(signal_data.get("namespace"), "default")
|
||||
|
||||
if target:
|
||||
merged.setdefault("target", target)
|
||||
if namespace:
|
||||
merged.setdefault("namespace", namespace)
|
||||
if sensor_ip:
|
||||
merged.setdefault("sensor_ip", sensor_ip)
|
||||
if sensor_host:
|
||||
merged.setdefault("sensor_host", sensor_host)
|
||||
if not merged.get("host") and (sensor_ip or target):
|
||||
merged["host"] = sensor_ip or target
|
||||
alertname = _alertname(incident, signal_data)
|
||||
if alertname:
|
||||
merged.setdefault("alertname", alertname)
|
||||
merged.setdefault("source", _to_text(signal_data.get("source"), "signal_worker"))
|
||||
return merged
|
||||
|
||||
|
||||
def enrich_incident_for_signal_observation(
|
||||
incident: "Incident",
|
||||
signal_data: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
"""Add signal-worker context back to the local incident used by PDI."""
|
||||
labels = _labels(incident, signal_data)
|
||||
signal = _first_signal(incident)
|
||||
if signal is not None:
|
||||
try:
|
||||
signal.labels = labels
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"signal_observation_label_enrich_skipped",
|
||||
incident_id=_incident_id(incident),
|
||||
)
|
||||
return labels
|
||||
|
||||
|
||||
def _fingerprint(incident: "Incident", signal_data: dict[str, Any]) -> str:
|
||||
signal = _first_signal(incident)
|
||||
return (
|
||||
_to_text(signal_data.get("fingerprint"))
|
||||
or _to_text(getattr(signal, "fingerprint", None))
|
||||
or "no-fingerprint"
|
||||
)
|
||||
|
||||
|
||||
def build_signal_worker_provider_event_id(message_id: Any, fingerprint: str) -> str:
|
||||
raw_event_id = f"{_to_text(message_id, 'unknown')}:{fingerprint[:16]}"
|
||||
return build_external_alert_provider_event_id(
|
||||
"signal-worker",
|
||||
raw_event_id,
|
||||
"received",
|
||||
)
|
||||
|
||||
|
||||
def format_signal_worker_event_content(
|
||||
*,
|
||||
incident_id: str,
|
||||
alertname: str,
|
||||
severity: str,
|
||||
namespace: str,
|
||||
target: str,
|
||||
fingerprint: str,
|
||||
message_id: str,
|
||||
signal_source: str,
|
||||
) -> str:
|
||||
"""Render a redaction-safe internal event preview."""
|
||||
return "\n".join(
|
||||
[
|
||||
"Signal worker inbound received",
|
||||
f"Incident: {incident_id}",
|
||||
f"Alert: {alertname}",
|
||||
f"Severity: {severity}",
|
||||
f"Namespace: {namespace or 'default'}",
|
||||
f"Target: {target or '-'}",
|
||||
f"Fingerprint: {fingerprint}",
|
||||
f"Redis Message: {message_id}",
|
||||
f"Source: {signal_source or 'signal_worker'}",
|
||||
"Telegram: not sent (internal shadow mirror)",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
async def _record_signal_channel_history(
|
||||
*,
|
||||
incident: "Incident",
|
||||
signal_data: dict[str, Any],
|
||||
message_id: Any,
|
||||
labels: dict[str, Any],
|
||||
) -> None:
|
||||
incident_id = _incident_id(incident)
|
||||
alertname = _alertname(incident, signal_data)
|
||||
severity = _severity_value(
|
||||
signal_data.get("severity") or getattr(incident, "severity", None)
|
||||
)
|
||||
namespace = _to_text(labels.get("namespace"), "default")
|
||||
target = _to_text(labels.get("target")) or ",".join(
|
||||
_to_text(value) for value in (getattr(incident, "affected_services", []) or [])
|
||||
)
|
||||
fingerprint = _fingerprint(incident, signal_data)
|
||||
signal_source = _to_text(signal_data.get("source"), "signal_worker")
|
||||
safe_message_id = _to_text(message_id, "unknown")
|
||||
provider_event_id = build_signal_worker_provider_event_id(safe_message_id, fingerprint)
|
||||
run_id = build_external_alert_run_id(PROJECT_ID, provider_event_id)
|
||||
annotations = _annotations(incident, signal_data)
|
||||
content = format_signal_worker_event_content(
|
||||
incident_id=incident_id,
|
||||
alertname=alertname,
|
||||
severity=severity,
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
fingerprint=fingerprint,
|
||||
message_id=safe_message_id,
|
||||
signal_source=signal_source,
|
||||
)
|
||||
envelope = build_inbound_source_envelope(
|
||||
provider="signal-worker",
|
||||
stage="received",
|
||||
provider_event_id=provider_event_id,
|
||||
raw_event_id=safe_message_id,
|
||||
raw_content=content,
|
||||
alertname=alertname,
|
||||
severity=severity,
|
||||
namespace=namespace,
|
||||
target_resource=target,
|
||||
fingerprint=fingerprint,
|
||||
incident_id=incident_id,
|
||||
labels=labels,
|
||||
annotations=annotations,
|
||||
extra={
|
||||
"redis_stream": "awoooi:signals",
|
||||
"signal_source": signal_source,
|
||||
"sensor_host": labels.get("sensor_host"),
|
||||
"sensor_ip": labels.get("sensor_ip"),
|
||||
},
|
||||
)
|
||||
|
||||
async with get_db_context(PROJECT_ID) as db:
|
||||
await ensure_completed_shadow_run(
|
||||
db,
|
||||
project_id=PROJECT_ID,
|
||||
run_id=run_id,
|
||||
agent_id="legacy-signal-worker",
|
||||
trigger_type="signal_worker_inbound",
|
||||
trigger_ref=provider_event_id,
|
||||
input_payload={
|
||||
"incident_id": incident_id,
|
||||
"alertname": alertname,
|
||||
"severity": severity,
|
||||
"namespace": namespace,
|
||||
"target": target,
|
||||
"fingerprint": fingerprint,
|
||||
"message_id": safe_message_id,
|
||||
},
|
||||
)
|
||||
conversation_event_id = await mirror_inbound_event(
|
||||
db,
|
||||
project_id=PROJECT_ID,
|
||||
channel_type="internal",
|
||||
provider_event_id=provider_event_id,
|
||||
platform_subject_id="signal-worker",
|
||||
channel_user_id=signal_source,
|
||||
channel_chat_id=f"signal-worker:{namespace or 'default'}",
|
||||
content_type="text",
|
||||
raw_content=content,
|
||||
source_envelope=envelope,
|
||||
provider_ts=_db_timestamp_now(),
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
exists = (
|
||||
await db.execute(
|
||||
text(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM awooop_outbound_message
|
||||
WHERE project_id = :project_id
|
||||
AND triggered_by_state = 'signal_worker_observed'
|
||||
AND coalesce(source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb)
|
||||
? :incident_id
|
||||
LIMIT 1
|
||||
"""
|
||||
),
|
||||
{"project_id": PROJECT_ID, "incident_id": incident_id},
|
||||
)
|
||||
).first()
|
||||
if exists:
|
||||
return
|
||||
|
||||
outbound_envelope = {
|
||||
"schema_version": "signal_worker_observation_outbound_v1",
|
||||
"source_refs": {
|
||||
"incident_ids": [incident_id],
|
||||
"alert_ids": [provider_event_id, safe_message_id],
|
||||
"fingerprints": [fingerprint],
|
||||
},
|
||||
"automation_stage": "observed_not_executed",
|
||||
"truth_note": "internal shadow message only; no auto-repair execution claimed",
|
||||
}
|
||||
await record_outbound_message(
|
||||
db,
|
||||
project_id=PROJECT_ID,
|
||||
run_id=run_id,
|
||||
conversation_event_id=conversation_event_id,
|
||||
channel_type="internal",
|
||||
channel_chat_id=f"signal-worker:{namespace or 'default'}",
|
||||
message_type="interim",
|
||||
content=(
|
||||
f"AwoooP signal-worker observation recorded\n"
|
||||
f"Incident: {incident_id}\n"
|
||||
f"Alert: {alertname}\n"
|
||||
"Stage: observed_not_executed\n"
|
||||
"Telegram: not sent"
|
||||
),
|
||||
source_envelope=outbound_envelope,
|
||||
provider_message_id=provider_event_id[:64],
|
||||
send_status="shadow",
|
||||
triggered_by_state="signal_worker_observed",
|
||||
is_shadow=True,
|
||||
)
|
||||
|
||||
|
||||
async def _evidence_counts(incident_id: str) -> tuple[int, int]:
|
||||
async with get_db_context(PROJECT_ID) as db:
|
||||
row = (
|
||||
await db.execute(
|
||||
text(
|
||||
"""
|
||||
SELECT
|
||||
count(*)::int AS total,
|
||||
coalesce(sum(sensors_succeeded), 0)::int AS succeeded
|
||||
FROM incident_evidence
|
||||
WHERE incident_id = :incident_id
|
||||
"""
|
||||
),
|
||||
{"incident_id": incident_id},
|
||||
)
|
||||
).mappings().first()
|
||||
if not row:
|
||||
return 0, 0
|
||||
return int(row["total"] or 0), int(row["succeeded"] or 0)
|
||||
|
||||
|
||||
async def _record_raw_signal_evidence_if_needed(
|
||||
*,
|
||||
incident: "Incident",
|
||||
signal_data: dict[str, Any],
|
||||
message_id: Any,
|
||||
labels: dict[str, Any],
|
||||
pdi_duration_ms: int | None,
|
||||
) -> bool:
|
||||
incident_id = _incident_id(incident)
|
||||
try:
|
||||
_total, succeeded = await _evidence_counts(incident_id)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"signal_observation_evidence_count_failed",
|
||||
incident_id=incident_id,
|
||||
error=str(exc),
|
||||
)
|
||||
succeeded = 0
|
||||
|
||||
if succeeded > 0:
|
||||
return False
|
||||
|
||||
alertname = _alertname(incident, signal_data)
|
||||
severity = _severity_value(
|
||||
signal_data.get("severity") or getattr(incident, "severity", None)
|
||||
)
|
||||
annotations = _annotations(incident, signal_data)
|
||||
snapshot = EvidenceSnapshot(incident_id=incident_id)
|
||||
snapshot.alert_info = {
|
||||
"alert_name": alertname,
|
||||
"severity": severity,
|
||||
"affected_services": getattr(incident, "affected_services", []) or [],
|
||||
"labels": labels,
|
||||
"annotations": annotations,
|
||||
"source": _to_text(signal_data.get("source"), "signal_worker"),
|
||||
"incident_id": incident_id,
|
||||
"redis_message_id": _to_text(message_id, "unknown"),
|
||||
}
|
||||
snapshot.metrics_snapshot = {
|
||||
"source": "signal_worker_raw_event",
|
||||
"error_count": labels.get("error_count"),
|
||||
"sensor_host": labels.get("sensor_host"),
|
||||
"sensor_ip": labels.get("sensor_ip"),
|
||||
"target": labels.get("target"),
|
||||
}
|
||||
snapshot.historical_context = (
|
||||
"Raw signal-worker observation captured for truth-chain continuity. "
|
||||
"This is evidence only; no auto-repair execution was performed."
|
||||
)
|
||||
snapshot.mcp_health = {"signal_worker_raw_event": True}
|
||||
snapshot.sensors_attempted = 1
|
||||
snapshot.sensors_succeeded = 1
|
||||
snapshot.collection_duration_ms = pdi_duration_ms or 0
|
||||
snapshot.evidence_summary = snapshot.build_summary()
|
||||
await snapshot.save()
|
||||
return True
|
||||
|
||||
|
||||
async def _add_observation_timeline(
|
||||
*,
|
||||
incident: "Incident",
|
||||
signal_data: dict[str, Any],
|
||||
labels: dict[str, Any],
|
||||
raw_evidence_created: bool,
|
||||
) -> None:
|
||||
try:
|
||||
from src.services.approval_db import get_timeline_service
|
||||
|
||||
await get_timeline_service().add_event(
|
||||
event_type="agent",
|
||||
status="success",
|
||||
title=f"Signal worker observation recorded: {_alertname(incident, signal_data)}",
|
||||
description=json.dumps(
|
||||
{
|
||||
"stage": "observed_not_executed",
|
||||
"raw_evidence_created": raw_evidence_created,
|
||||
"target": labels.get("target"),
|
||||
"sensor_host": labels.get("sensor_host"),
|
||||
"sensor_ip": labels.get("sensor_ip"),
|
||||
"telegram": "not_sent_internal_shadow",
|
||||
},
|
||||
ensure_ascii=False,
|
||||
),
|
||||
actor="signal_worker",
|
||||
actor_role="signal_worker",
|
||||
risk_level=_severity_value(getattr(incident, "severity", None)),
|
||||
incident_id=_incident_id(incident),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"signal_observation_timeline_failed",
|
||||
incident_id=_incident_id(incident),
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
|
||||
async def _run_pre_decision_investigation(incident: "Incident") -> int | None:
|
||||
started = time.monotonic()
|
||||
try:
|
||||
from src.services.pre_decision_investigator import get_pre_decision_investigator
|
||||
|
||||
await asyncio.wait_for(
|
||||
get_pre_decision_investigator().investigate(incident),
|
||||
timeout=SIGNAL_OBSERVATION_TIMEOUT_SEC,
|
||||
)
|
||||
return int((time.monotonic() - started) * 1000)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
"signal_observation_pdi_timeout",
|
||||
incident_id=_incident_id(incident),
|
||||
timeout_sec=SIGNAL_OBSERVATION_TIMEOUT_SEC,
|
||||
)
|
||||
return int((time.monotonic() - started) * 1000)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"signal_observation_pdi_failed",
|
||||
incident_id=_incident_id(incident),
|
||||
error=str(exc),
|
||||
)
|
||||
return int((time.monotonic() - started) * 1000)
|
||||
|
||||
|
||||
async def record_signal_worker_observation(
|
||||
incident: "Incident",
|
||||
signal_data: dict[str, Any],
|
||||
message_id: Any,
|
||||
) -> dict[str, Any]:
|
||||
"""Record signal-worker incidents as observable AwoooP events.
|
||||
|
||||
Returns a small status payload for logs/tests. All called operations are
|
||||
best-effort from the worker perspective; callers may log failures but should
|
||||
not convert them into signal-processing failures.
|
||||
"""
|
||||
labels = enrich_incident_for_signal_observation(incident, signal_data)
|
||||
await _record_signal_channel_history(
|
||||
incident=incident,
|
||||
signal_data=signal_data,
|
||||
message_id=message_id,
|
||||
labels=labels,
|
||||
)
|
||||
pdi_duration_ms = await _run_pre_decision_investigation(incident)
|
||||
raw_evidence_created = await _record_raw_signal_evidence_if_needed(
|
||||
incident=incident,
|
||||
signal_data=signal_data,
|
||||
message_id=message_id,
|
||||
labels=labels,
|
||||
pdi_duration_ms=pdi_duration_ms,
|
||||
)
|
||||
await _add_observation_timeline(
|
||||
incident=incident,
|
||||
signal_data=signal_data,
|
||||
labels=labels,
|
||||
raw_evidence_created=raw_evidence_created,
|
||||
)
|
||||
return {
|
||||
"incident_id": _incident_id(incident),
|
||||
"alertname": _alertname(incident, signal_data),
|
||||
"raw_evidence_created": raw_evidence_created,
|
||||
"pdi_duration_ms": pdi_duration_ms,
|
||||
}
|
||||
@@ -401,6 +401,29 @@ class SignalWorker:
|
||||
affected_services=incident.affected_services,
|
||||
persisted_to_pg=getattr(incident, "persisted_to_pg", False), # 2026-04-01 ogt: BrainIncident 無此欄位 (ADR-046 P2-01)
|
||||
)
|
||||
try:
|
||||
from src.services.signal_observation_service import (
|
||||
record_signal_worker_observation,
|
||||
)
|
||||
|
||||
observation = await record_signal_worker_observation(
|
||||
incident,
|
||||
data,
|
||||
message_id,
|
||||
)
|
||||
span.set_attribute("signal.observation_recorded", True)
|
||||
span.set_attribute(
|
||||
"signal.observation_raw_evidence",
|
||||
bool(observation.get("raw_evidence_created")),
|
||||
)
|
||||
except Exception as exc:
|
||||
span.set_attribute("signal.observation_recorded", False)
|
||||
logger.warning(
|
||||
"signal_observation_record_failed",
|
||||
message_id=message_id,
|
||||
incident_id=incident.incident_id,
|
||||
error=str(exc),
|
||||
)
|
||||
else:
|
||||
span.set_attribute("signal.processing_failed", True)
|
||||
logger.warning(
|
||||
|
||||
@@ -135,6 +135,22 @@ def _stub_incident(
|
||||
return _Incident()
|
||||
|
||||
|
||||
def _stub_host_incident() -> object:
|
||||
class _Signal:
|
||||
alert_name = "HostErrorLogFlood"
|
||||
labels = {
|
||||
"alertname": "HostErrorLogFlood",
|
||||
"error_count": "30",
|
||||
}
|
||||
|
||||
class _Incident:
|
||||
incident_id = "INC-HOST"
|
||||
signals = [_Signal()]
|
||||
affected_services = ["ollama"]
|
||||
|
||||
return _Incident()
|
||||
|
||||
|
||||
def _reg(tool_name: str, provider: MCPToolProvider, dim: SensorDimension) -> RegisteredTool:
|
||||
return RegisteredTool(
|
||||
tool=_make_tool(tool_name),
|
||||
@@ -144,6 +160,16 @@ def _reg(tool_name: str, provider: MCPToolProvider, dim: SensorDimension) -> Reg
|
||||
)
|
||||
|
||||
|
||||
def test_build_tool_params_uses_host_alias_and_service_from_affected_service() -> None:
|
||||
params = _build_tool_params(_stub_host_incident())
|
||||
|
||||
assert params["host"] == "192.168.0.188"
|
||||
assert params["target"] == "ollama"
|
||||
assert params["service_name"] == "ollama"
|
||||
assert params["search_text"] == "HostErrorLogFlood"
|
||||
assert params["time_window_minutes"] == 30
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# _compute_fingerprint
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
74
apps/api/tests/test_signal_observation_service.py
Normal file
74
apps/api/tests/test_signal_observation_service.py
Normal file
@@ -0,0 +1,74 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
from src.models.incident import Severity
|
||||
from src.services.signal_observation_service import (
|
||||
build_signal_worker_provider_event_id,
|
||||
enrich_incident_for_signal_observation,
|
||||
format_signal_worker_event_content,
|
||||
)
|
||||
|
||||
|
||||
def _host_incident() -> SimpleNamespace:
|
||||
signal = SimpleNamespace(
|
||||
alert_name="HostErrorLogFlood",
|
||||
labels={"error_count": "30"},
|
||||
annotations={"summary": "30 ERROR log entries in last 5min"},
|
||||
fingerprint="b000a87cfe4bc658",
|
||||
)
|
||||
return SimpleNamespace(
|
||||
incident_id="INC-20260518-TEST01",
|
||||
severity=Severity.P2,
|
||||
affected_services=["ollama"],
|
||||
signals=[signal],
|
||||
)
|
||||
|
||||
|
||||
def test_enrich_incident_for_signal_observation_restores_host_context() -> None:
|
||||
incident = _host_incident()
|
||||
|
||||
labels = enrich_incident_for_signal_observation(
|
||||
incident,
|
||||
{
|
||||
"source": "journal",
|
||||
"namespace": "infra",
|
||||
"target": "ollama",
|
||||
"sensor_host": "ollama",
|
||||
"sensor_ip": "192.168.0.188",
|
||||
},
|
||||
)
|
||||
|
||||
assert labels["alertname"] == "HostErrorLogFlood"
|
||||
assert labels["namespace"] == "infra"
|
||||
assert labels["target"] == "ollama"
|
||||
assert labels["host"] == "192.168.0.188"
|
||||
assert labels["sensor_host"] == "ollama"
|
||||
assert incident.signals[0].labels["host"] == "192.168.0.188"
|
||||
|
||||
|
||||
def test_signal_worker_provider_event_id_is_stable_and_bounded() -> None:
|
||||
event_id = build_signal_worker_provider_event_id(
|
||||
"1747528924000-0",
|
||||
"b000a87cfe4bc658",
|
||||
)
|
||||
|
||||
assert event_id == "signal-worker:received:1747528924000-0:b000a87cfe4bc658"
|
||||
assert len(event_id) < 256
|
||||
|
||||
|
||||
def test_signal_worker_event_content_is_operator_readable() -> None:
|
||||
content = format_signal_worker_event_content(
|
||||
incident_id="INC-20260518-TEST01",
|
||||
alertname="HostErrorLogFlood",
|
||||
severity="P2",
|
||||
namespace="infra",
|
||||
target="ollama",
|
||||
fingerprint="b000a87cfe4bc658",
|
||||
message_id="1747528924000-0",
|
||||
signal_source="journal",
|
||||
)
|
||||
|
||||
assert "Incident: INC-20260518-TEST01" in content
|
||||
assert "Alert: HostErrorLogFlood" in content
|
||||
assert "Telegram: not sent" in content
|
||||
Reference in New Issue
Block a user