アラートトリアージの自動化: グラフ分析による MTTD の削減
アラートのトリアージの問題は、SOC アナリストの人生の中で最も費用がかかり、イライラするものの 1 つです。 IBM の 2025 年のデータによると、侵害を特定するまでの平均時間 (MTTD - 平均検出時間) と、 の 194日 高度な自動化を導入していない組織向け。 一方、AI 自動化とグラフベースの相関関係を組み合わせた組織では、これが削減されます。 値を 数時間、あるいは数分 同じカテゴリの脅威について。
問題の中心はアラート疲労です。平均的な SOC は 1 日に何千ものアラートを一定の割合で処理します。 一部の環境では誤検知率が 97% を超えます。アナリストはほとんどの時間を費やします 本当の脅威を調査するのではなく、無害なアラートを評価します。グラフ分析は 1 つのアプローチを提供します 根本的に異なります。各アラートを個別に評価するのではなく、アラートをグラフで関連付けます。 アクティビティを分析し、複数段階の攻撃パターンを特定し、重大度に基づいて自動的に優先順位を付けます。 文脈的に。
何を学ぶか
- 従来のトリアージがうまくスケールできない理由と、グラフ分析がそれをどのように変えるか
- グラフベースのアラート相関システムのアーキテクチャ
- NetworkXとNeo4jによる実践的な実装
- 自動優先順位付けのためのスコアリング アルゴリズム
- 既存の SOC パイプラインとの統合
- 成功指標: MTTD、誤検知率、アナリストのスループット
警戒疲労の問題
ソリューションを構築する前に、問題を深く理解する必要があります。アラート疲労ではありません 単純に「アラートが多すぎる」: これは従来の SIEM のアーキテクチャに根ざしたシステム的な問題です そして人間の認知の限界内で。
従来の SIEM は、一連のルールに照らして各ログ イベントを個別に評価します。ルールがあるとき 一致するとアラートが生成されます。結果は次のようになります。
- 正規のネットワーク スキャン (Nessus 脆弱性スキャンなど) により、数百のアラートが生成される
Port Scan Detected - 自動パッチ適用プロセスにより数十のアラートが生成される
Suspicious Process Creation - VPN 経由で自宅からログインしたユーザーがアラートを生成する
Impossible Travel正しく構成されていない場合
グラフ分析はこの問題をエレガントに解決します。 その一部であるアラートをグループ化します 同じ攻撃シナリオの 単一のコンテキスト化されたインシデントで、 情報に基づいた意思決定を数分ではなく数秒で行うために必要なコンテキストを分析します。
セクターデータ (2025)
- 組織の 73% がアラートトリアージを自動化しています (Gurucul 2025)
- AI 自動化により、導入者の 60% で調査時間が 25 ~ 50% 削減されました
- ReliaQuest: AI 自動化を使用した場合、応答時間は 7 分未満、使用しない場合は 2.3 日
- Dropzone AI: 3 ~ 10 分の調査で 90% のアラートをカバー
グラフベースのアラート相関関係の基礎
基本的でシンプルな概念: すべてのアラートは ノード グラフでは、 関係 アラート間 (同じホスト、同じユーザー、同じ時間枠、 同じ ATT&CK テクニック) アーチ。結果のグラフから明らかになるのは、 潜在的な攻撃シナリオを表す関連アラートのクラスター。
最も有用な相関関係のタイプは次のとおりです。
| 相関の種類 | 基準 | Forza | Esempio |
|---|---|---|---|
| Temporale | ウィンドウ T 内でアラート (例: 5 分) | 低い | ポートスキャンとブルートフォースを同時に実行 |
| 実在物 | 同じホスト/IP/ユーザー | 平均 | 同じエンドポイント上の異なるアラート |
| ATT&CK キルチェーン | 論理的な順序でのテクニック | 高い | 偵察 + 初期アクセス + 永続性 |
| IOC の重複 | 同じ悪意のあるハッシュ/ドメイン/IP | 高い | 同じC2プラスアラート |
| 因果関係 | 親/子プロセス、生成されたネットワーク | 非常に高い | cmd.exe はペイロードをダウンロードする word.exe によって開始されます |
実装: NetworkX を使用したアラート グラフ
次を使用して Python の実装を始めましょう。 ネットワークX グラフ管理用。 このソリューションは、プロトタイプおよび中規模の環境 (1 日あたり最大 100,000 件のアラート) に適しています。 より大きなボリュームの場合は、Neo4j が使用されます (次のセクションを参照)。
# Sistema di Alert Graph Correlation
# File: alert_graph.py
import networkx as nx
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
import uuid
import json
@dataclass
class Alert:
id: str
timestamp: datetime
rule_name: str
severity: str # 'low', 'medium', 'high', 'critical'
host: str
user: Optional[str]
src_ip: Optional[str]
dst_ip: Optional[str]
technique_id: Optional[str] # MITRE ATT&CK ID (es. T1059.001)
raw_data: dict = field(default_factory=dict)
def severity_score(self) -> int:
return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(self.severity, 1)
@dataclass
class AlertCluster:
id: str
alerts: list[Alert]
score: float
attack_chain: list[str] # Sequenza di tecniche ATT&CK
primary_host: str
created_at: datetime
class AlertGraphCorrelator:
# Finestra temporale per correlazione (default: 30 minuti)
CORRELATION_WINDOW_MINUTES = 30
# Pesi per il calcolo del punteggio cluster
WEIGHT_SEVERITY = 3.0
WEIGHT_TECHNIQUE_CHAIN = 5.0
WEIGHT_SAME_HOST = 2.0
WEIGHT_SAME_USER = 2.5
WEIGHT_IOC_OVERLAP = 4.0
# Kill chain ATT&CK semplificata per correlazione
KILL_CHAIN_ORDER = [
'TA0043', # Reconnaissance
'TA0042', # Resource Development
'TA0001', # Initial Access
'TA0002', # Execution
'TA0003', # Persistence
'TA0004', # Privilege Escalation
'TA0005', # Defense Evasion
'TA0006', # Credential Access
'TA0007', # Discovery
'TA0008', # Lateral Movement
'TA0009', # Collection
'TA0011', # Command and Control
'TA0010', # Exfiltration
'TA0040', # Impact
]
def __init__(self):
self.graph = nx.DiGraph()
self.alerts: dict[str, Alert] = {}
def add_alert(self, alert: Alert) -> None:
"""Aggiunge un alert al grafo e crea correlazioni."""
self.alerts[alert.id] = alert
# Aggiungi nodo con attributi
self.graph.add_node(alert.id, **{
'timestamp': alert.timestamp.isoformat(),
'severity': alert.severity,
'host': alert.host,
'user': alert.user,
'technique': alert.technique_id,
'score': alert.severity_score()
})
# Cerca correlazioni con alert esistenti
for existing_id, existing in self.alerts.items():
if existing_id == alert.id:
continue
correlations = self._calculate_correlations(alert, existing)
if correlations:
total_weight = sum(c['weight'] for c in correlations)
edge_labels = [c['type'] for c in correlations]
self.graph.add_edge(
existing_id, alert.id,
weight=total_weight,
correlation_types=edge_labels
)
def _calculate_correlations(self, alert1: Alert,
alert2: Alert) -> list[dict]:
"""Calcola le correlazioni tra due alert."""
correlations = []
# 1. Correlazione temporale
time_diff = abs((alert1.timestamp - alert2.timestamp).total_seconds())
if time_diff <= self.CORRELATION_WINDOW_MINUTES * 60:
time_weight = 1.0 - (time_diff / (self.CORRELATION_WINDOW_MINUTES * 60))
correlations.append({'type': 'temporal', 'weight': time_weight})
# 2. Stesso host
if alert1.host == alert2.host:
correlations.append({'type': 'same_host', 'weight': self.WEIGHT_SAME_HOST})
# 3. Stesso utente
if (alert1.user and alert2.user and alert1.user == alert2.user):
correlations.append({'type': 'same_user', 'weight': self.WEIGHT_SAME_USER})
# 4. IOC overlap (IP)
if (alert1.src_ip and alert2.src_ip and alert1.src_ip == alert2.src_ip):
correlations.append({'type': 'ioc_overlap_ip', 'weight': self.WEIGHT_IOC_OVERLAP})
# 5. Kill chain sequenziale ATT&CK
if alert1.technique_id and alert2.technique_id:
chain_score = self._calculate_kill_chain_score(
alert1.technique_id, alert2.technique_id
)
if chain_score > 0:
correlations.append({'type': 'kill_chain', 'weight': chain_score})
return correlations
def _calculate_kill_chain_score(self, technique1: str,
technique2: str) -> float:
"""Calcola un punteggio basato sulla progressione kill chain."""
# Mapping semplificato tecnica -> tattica
# In produzione si usa la MITRE ATT&CK API
technique_to_tactic = {
'T1595': 'TA0043', # Recon - Active Scanning
'T1190': 'TA0001', # Initial Access - Exploit Public-Facing App
'T1059': 'TA0002', # Execution - Command and Scripting
'T1053': 'TA0003', # Persistence - Scheduled Task
'T1078': 'TA0004', # Privilege Escalation - Valid Accounts
'T1562': 'TA0005', # Defense Evasion - Impair Defenses
'T1003': 'TA0006', # Credential Access - OS Credential Dumping
'T1087': 'TA0007', # Discovery - Account Discovery
'T1021': 'TA0008', # Lateral Movement - Remote Services
'T1071': 'TA0011', # C2 - Application Layer Protocol
}
tactic1 = technique_to_tactic.get(technique1)
tactic2 = technique_to_tactic.get(technique2)
if not tactic1 or not tactic2:
return 0.0
try:
idx1 = self.KILL_CHAIN_ORDER.index(tactic1)
idx2 = self.KILL_CHAIN_ORDER.index(tactic2)
# Punteggio più alto se la progressione e logica (tecnica più avanzata dopo)
if idx2 > idx1:
progression = (idx2 - idx1) / len(self.KILL_CHAIN_ORDER)
return self.WEIGHT_TECHNIQUE_CHAIN * progression
except ValueError:
pass
return 0.0
def get_clusters(self, min_cluster_size: int = 2) -> list[AlertCluster]:
"""Identifica cluster di alert correlati."""
# Usa connected components sul grafo non diretto per trovare i cluster
undirected = self.graph.to_undirected()
components = list(nx.connected_components(undirected))
clusters = []
for component in components:
if len(component) < min_cluster_size:
continue
component_alerts = [self.alerts[aid] for aid in component
if aid in self.alerts]
score = self._calculate_cluster_score(component_alerts, component)
attack_chain = self._extract_attack_chain(component_alerts)
primary_host = self._find_primary_host(component_alerts)
clusters.append(AlertCluster(
id=str(uuid.uuid4()),
alerts=component_alerts,
score=score,
attack_chain=attack_chain,
primary_host=primary_host,
created_at=datetime.now()
))
# Ordina per score decrescente (massima priorità prima)
return sorted(clusters, key=lambda c: c.score, reverse=True)
def _calculate_cluster_score(self, alerts: list[Alert],
component: set) -> float:
"""Calcola il punteggio di priorità del cluster."""
score = 0.0
# 1. Contributo severita
severity_sum = sum(a.severity_score() for a in alerts)
max_severity = max(a.severity_score() for a in alerts)
score += severity_sum * self.WEIGHT_SEVERITY
score += max_severity * 2 # Bonus per alert critici
# 2. Numero di tecniche ATT&CK distinte
techniques = set(a.technique_id for a in alerts if a.technique_id)
score += len(techniques) * self.WEIGHT_TECHNIQUE_CHAIN
# 3. Peso archi nel sottografo
subgraph = self.graph.subgraph(component)
edge_weight_sum = sum(
data.get('weight', 0)
for _, _, data in subgraph.edges(data=True)
)
score += edge_weight_sum
# 4. Numero host distinti (lateral movement indicator)
hosts = set(a.host for a in alerts)
if len(hosts) > 1:
score += len(hosts) * 3.0 # Lateral movement e molto significativo
return score
def _extract_attack_chain(self, alerts: list[Alert]) -> list[str]:
"""Estrae la kill chain osservata dal cluster."""
techniques = [a.technique_id for a in alerts if a.technique_id]
# Ordina per timestamp
sorted_alerts = sorted(alerts, key=lambda a: a.timestamp)
return [a.technique_id for a in sorted_alerts if a.technique_id]
def _find_primary_host(self, alerts: list[Alert]) -> str:
"""Identifica l'host più coinvolto nel cluster."""
host_counts = {}
for alert in alerts:
host_counts[alert.host] = host_counts.get(alert.host, 0) + 1
return max(host_counts, key=host_counts.get) if host_counts else 'unknown'
多要素スコアリングによる自動優先順位付け
クラスターのスコアリングでは、個々のアラートの重大度だけでなく、 の 文脈上のコンテキスト: キルチェーンの進行状況、クリティカル度 関連する資産、既知の悪意のある IOC の存在。
# Sistema di scoring avanzato con contesto asset
# File: alert_scorer.py
@dataclass
class AssetCriticality:
hostname: str
criticality: str # 'low', 'medium', 'high', 'critical'
asset_type: str # 'workstation', 'server', 'dc', 'database', 'ot'
business_owner: str
class ContextualScorer:
# Moltiplicatori per criticalita asset
ASSET_MULTIPLIERS = {
'workstation': 1.0,
'server': 1.5,
'database': 2.0,
'dc': 3.0, # Domain Controller
'ot': 4.0 # OT/ICS systems
}
CRITICALITY_MULTIPLIERS = {
'low': 1.0,
'medium': 1.5,
'high': 2.0,
'critical': 3.0
}
def __init__(self, asset_registry: dict[str, AssetCriticality],
threat_intel_ips: set[str]):
self.asset_registry = asset_registry
self.threat_intel_ips = threat_intel_ips
def score_cluster(self, cluster: AlertCluster) -> dict:
"""Calcola score completo con breakdown."""
base_score = cluster.score
context_multiplier = 1.0
breakdown = {}
# 1. Asset criticality multiplier
asset = self.asset_registry.get(cluster.primary_host)
if asset:
type_mult = self.ASSET_MULTIPLIERS.get(asset.asset_type, 1.0)
crit_mult = self.CRITICALITY_MULTIPLIERS.get(asset.criticality, 1.0)
asset_mult = type_mult * crit_mult
context_multiplier *= asset_mult
breakdown['asset_multiplier'] = asset_mult
# 2. Threat Intel overlap
ti_hits = sum(
1 for alert in cluster.alerts
if alert.src_ip in self.threat_intel_ips
)
if ti_hits > 0:
ti_boost = 1.0 + (ti_hits * 0.5)
context_multiplier *= ti_boost
breakdown['threat_intel_boost'] = ti_boost
# 3. Kill chain completeness
chain_length = len(cluster.attack_chain)
chain_multiplier = 1.0 + (chain_length * 0.2) # +20% per ogni step
context_multiplier *= chain_multiplier
breakdown['chain_multiplier'] = chain_multiplier
# 4. Time pressure (alert recenti hanno priorità maggiore)
most_recent = max(a.timestamp for a in cluster.alerts)
age_minutes = (datetime.now() - most_recent).total_seconds() / 60
recency_multiplier = max(0.5, 1.0 - (age_minutes / 1440)) # Decade in 24h
context_multiplier *= recency_multiplier
breakdown['recency_multiplier'] = recency_multiplier
final_score = base_score * context_multiplier
breakdown['base_score'] = base_score
breakdown['context_multiplier'] = context_multiplier
breakdown['final_score'] = final_score
return breakdown
def prioritize_queue(self, clusters: list[AlertCluster]) -> list[dict]:
"""Genera la coda di lavoro prioritizzata per gli analisti."""
scored = []
for cluster in clusters:
score_breakdown = self.score_cluster(cluster)
scored.append({
'cluster': cluster,
'score': score_breakdown['final_score'],
'breakdown': score_breakdown,
'priority': self._score_to_priority(score_breakdown['final_score'])
})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _score_to_priority(self, score: float) -> str:
if score >= 100:
return 'P1 - Critical'
elif score >= 50:
return 'P2 - High'
elif score >= 20:
return 'P3 - Medium'
else:
return 'P4 - Low'
エンタープライズ ボリューム用の Neo4j との統合
1 日に何百万ものアラートが発生するエンタープライズ環境の場合、インメモリ NetworkX は拡張性がありません。 Neo4jは、最も人気のあるグラフ データベースであり、ネイティブ クエリ パフォーマンスを提供します。 複雑な相関関係と履歴データの永続性。
# Alert Graph su Neo4j
# File: neo4j_correlator.py
from neo4j import GraphDatabase
from datetime import datetime, timedelta
class Neo4jAlertCorrelator:
def __init__(self, uri: str, username: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(username, password))
self._create_indexes()
def _create_indexes(self) -> None:
"""Crea indici per query performance."""
with self.driver.session() as session:
session.run("""
CREATE INDEX alert_timestamp IF NOT EXISTS
FOR (a:Alert) ON (a.timestamp)
""")
session.run("""
CREATE INDEX alert_host IF NOT EXISTS
FOR (a:Alert) ON (a.host)
""")
session.run("""
CREATE INDEX alert_user IF NOT EXISTS
FOR (a:Alert) ON (a.user)
""")
def ingest_alert(self, alert: dict) -> None:
"""Inserisce un alert e crea relazioni di correlazione."""
with self.driver.session() as session:
# Crea il nodo Alert
session.run("""
CREATE (a:Alert {
id: $id,
timestamp: datetime($timestamp),
rule_name: $rule_name,
severity: $severity,
host: $host,
user: $user,
src_ip: $src_ip,
technique_id: $technique_id
})
""", **alert)
# Crea relazione SAME_HOST con alert recenti
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp >= datetime($cutoff)
AND NOT (a)-[:SAME_HOST]-(b)
MERGE (a)-[:SAME_HOST {weight: 2.0}]-(b)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(minutes=30)).isoformat())
# Crea relazione KILL_CHAIN per progressione logica
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp < a.timestamp
AND b.timestamp >= datetime($cutoff)
AND b.technique_id IS NOT NULL
AND a.technique_id IS NOT NULL
MERGE (b)-[:PRECEDES {weight: 3.0}]->(a)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(hours=2)).isoformat())
def find_incidents(self, min_alerts: int = 3,
hours_back: int = 24) -> list[dict]:
"""Trova cluster di alert che rappresentano potenziali incidenti."""
cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()
with self.driver.session() as session:
result = session.run("""
MATCH (a:Alert)
WHERE a.timestamp >= datetime($cutoff)
CALL apoc.path.subgraphNodes(a, {
relationshipFilter: 'SAME_HOST|SAME_USER|PRECEDES',
maxLevel: 5
}) YIELD node
WITH collect(DISTINCT node) AS cluster_nodes
WHERE size(cluster_nodes) >= $min_alerts
RETURN cluster_nodes,
reduce(s = 0, n IN cluster_nodes |
s + CASE n.severity
WHEN 'critical' THEN 4
WHEN 'high' THEN 3
WHEN 'medium' THEN 2
ELSE 1 END) AS total_score
ORDER BY total_score DESC
LIMIT 100
""", cutoff=cutoff, min_alerts=min_alerts)
return [dict(record) for record in result]
def get_attack_path(self, incident_id: str) -> list[dict]:
"""Recupera il percorso di attacco per un incidente."""
with self.driver.session() as session:
result = session.run("""
MATCH path = (start:Alert)-[:PRECEDES*]->(end:Alert)
WHERE start.id IN $incident_alerts
AND NOT ()-[:PRECEDES]->(start)
RETURN [node IN nodes(path) |
{id: node.id, technique: node.technique_id,
host: node.host, timestamp: node.timestamp}
] AS attack_path
ORDER BY length(path) DESC
LIMIT 1
""", incident_alerts=[incident_id])
return [dict(record) for record in result]
def close(self) -> None:
self.driver.close()
トリアージ ダッシュボードと API
相関および優先順位付けシステムは、SOC プラットフォーム (TheHive、 Cortex XSOAR、IBM QRadar) を利用してアナリストにワーク キューを提示できます アラートのフラットなリストではなく、インテリジェントなアラートを提供します。
# FastAPI endpoint per il triage queue
# File: triage_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
app = FastAPI(title="Alert Triage API")
class AlertIngestionRequest(BaseModel):
id: str
timestamp: str
rule_name: str
severity: str
host: str
user: str | None = None
src_ip: str | None = None
technique_id: str | None = None
raw_data: dict = {}
class TriageResponse(BaseModel):
incident_id: str
priority: str
score: float
alert_count: int
primary_host: str
attack_chain: list[str]
recommended_actions: list[str]
@app.post("/api/v1/alerts")
async def ingest_alert(alert: AlertIngestionRequest) -> dict:
"""Ingesta un alert e ritorna la correlazione risultante."""
correlator = get_correlator() # Singleton o dependency injection
alert_obj = Alert(**alert.dict())
correlator.add_alert(alert_obj)
# Ottieni cluster aggiornato
clusters = correlator.get_clusters()
affected_cluster = next(
(c for c in clusters if any(a.id == alert.id for a in c.alerts)),
None
)
if affected_cluster:
return {
"status": "correlated",
"cluster_id": affected_cluster.id,
"cluster_size": len(affected_cluster.alerts),
"cluster_score": affected_cluster.score
}
return {"status": "isolated", "cluster_id": None}
@app.get("/api/v1/triage-queue")
async def get_triage_queue(limit: int = 50, min_score: float = 0) -> list[dict]:
"""Ritorna la coda di triage prioritizzata."""
correlator = get_correlator()
scorer = get_scorer()
clusters = correlator.get_clusters()
prioritized = scorer.prioritize_queue(clusters)
return [
{
"incident_id": item['cluster'].id,
"priority": item['priority'],
"score": round(item['score'], 2),
"alert_count": len(item['cluster'].alerts),
"primary_host": item['cluster'].primary_host,
"attack_chain": item['cluster'].attack_chain,
"created_at": item['cluster'].created_at.isoformat(),
"score_breakdown": item['breakdown']
}
for item in prioritized
if item['score'] >= min_score
][:limit]
@app.get("/api/v1/incident/{incident_id}/timeline")
async def get_incident_timeline(incident_id: str) -> dict:
"""Ritorna la timeline degli eventi per un incidente."""
correlator = get_correlator()
clusters = correlator.get_clusters()
cluster = next((c for c in clusters if c.id == incident_id), None)
if not cluster:
raise HTTPException(status_code=404, detail="Incident not found")
sorted_alerts = sorted(cluster.alerts, key=lambda a: a.timestamp)
return {
"incident_id": incident_id,
"timeline": [
{
"timestamp": a.timestamp.isoformat(),
"rule": a.rule_name,
"host": a.host,
"user": a.user,
"technique": a.technique_id,
"severity": a.severity
}
for a in sorted_alerts
],
"duration_minutes": (
(sorted_alerts[-1].timestamp - sorted_alerts[0].timestamp).total_seconds() / 60
) if len(sorted_alerts) > 1 else 0
}
成功の指標とモニタリング
トリアージ自動化システムは、検証するために客観的な指標で監視する必要があります。 実際に SOC の効率が向上しており、新たな問題が発生しているわけではありません。
# Monitoring delle metriche SOC
# File: soc_metrics.py
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class SOCMetrics:
# Metriche chiave
total_alerts: int = 0
correlated_alerts: int = 0
true_positives: int = 0
false_positives: int = 0
total_incidents: int = 0
# Tempi (in minuti)
mttd_values: list[float] = field(default_factory=list) # Mean Time to Detect
mtti_values: list[float] = field(default_factory=list) # Mean Time to Investigate
mttr_values: list[float] = field(default_factory=list) # Mean Time to Respond
def correlation_rate(self) -> float:
"""% di alert correlati in incidenti."""
if self.total_alerts == 0:
return 0.0
return (self.correlated_alerts / self.total_alerts) * 100
def false_positive_rate(self) -> float:
"""% di falsi positivi sul totale investigato."""
total = self.true_positives + self.false_positives
if total == 0:
return 0.0
return (self.false_positives / total) * 100
def avg_mttd(self) -> float:
if not self.mttd_values:
return 0.0
return sum(self.mttd_values) / len(self.mttd_values)
def alert_compression_ratio(self) -> float:
"""Quanti alert per incidente in media (riduzione noise)."""
if self.total_incidents == 0:
return 1.0
return self.correlated_alerts / self.total_incidents
def report(self) -> dict:
return {
"total_alerts": self.total_alerts,
"total_incidents": self.total_incidents,
"alert_compression_ratio": f"{self.alert_compression_ratio():.1f}:1",
"correlation_rate_pct": f"{self.correlation_rate():.1f}%",
"false_positive_rate_pct": f"{self.false_positive_rate():.1f}%",
"avg_mttd_minutes": f"{self.avg_mttd():.1f}",
"avg_mtti_minutes": (
f"{sum(self.mtti_values)/len(self.mtti_values):.1f}"
if self.mtti_values else "N/A"
)
}
自動トリアージにおけるアンチパターン
- 相関しきい値が低すぎる: 24 時間以内にアラートを関連付けます 同じホスト上に巨大で役に立たないクラスターが作成されます。狭い時間枠を使用する (15 ~ 30 分) 弱い相関の場合。
- アセットコンテキストなしのスコア: ハニーポットに関する「高」アラート、およびそれ以下のアラート ドメイン コントローラー上の同じアラートの緊急。重要な資産を常に充実させます。
- フィードバックループのない自動化: システムはユーザーのフィードバックから学習する必要があります アナリスト(TP/FP)は時間の経過とともに改善します。静的なシステムは劣化します。
- 単一アラート インシデントを無視する: すべての攻撃が警戒心を高めるわけではありません。 重要な分離されたアラート (DCSync など) は、相関をバイパスして P1 キューに直接送信する必要があります。
TheHive および SOAR との統合
トリアージ システムは、TheHive や Cortex などの SOAR プラットフォームと自然に統合されます。 インシデントのライフサイクルと対応の自動化を管理します。
# Integrazione TheHive
# File: thehive_integration.py
import httpx
from datetime import datetime
class TheHiveIntegration:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def create_case_from_cluster(self, cluster: AlertCluster,
score_info: dict) -> str:
"""Crea un caso TheHive da un cluster di alert."""
severity_map = {
'P1 - Critical': 3,
'P2 - High': 2,
'P3 - Medium': 1,
'P4 - Low': 1
}
case_payload = {
"title": f"[AUTO] Incident {cluster.id[:8]} - {cluster.primary_host}",
"description": self._build_description(cluster, score_info),
"severity": severity_map.get(score_info.get('priority', 'P4 - Low'), 1),
"startDate": int(cluster.created_at.timestamp() * 1000),
"tags": [
f"auto-generated",
f"host:{cluster.primary_host}",
f"alerts:{len(cluster.alerts)}",
*[f"att&ck:{t}" for t in cluster.attack_chain[:5]]
],
"tasks": self._generate_tasks(cluster, score_info)
}
with httpx.Client() as client:
response = client.post(
f"{self.base_url}/api/case",
json=case_payload,
headers=self.headers
)
response.raise_for_status()
return response.json()['id']
def _build_description(self, cluster: AlertCluster,
score_info: dict) -> str:
alerts_summary = "\n".join(
f"- [{a.severity.upper()}] {a.rule_name} @ {a.host} ({a.timestamp.strftime('%H:%M:%S')})"
for a in sorted(cluster.alerts, key=lambda x: x.timestamp)
)
return f"""## Alert Cluster Auto-Generated
**Score**: {score_info.get('score', 'N/A')}
**Priority**: {score_info.get('priority', 'N/A')}
**Primary Host**: {cluster.primary_host}
**Alert Count**: {len(cluster.alerts)}
### Attack Chain
{' -> '.join(cluster.attack_chain) if cluster.attack_chain else 'N/A'}
### Alert Timeline
{alerts_summary}
### Score Breakdown
{chr(10).join(f"- {k}: {v}" for k, v in score_info.get('breakdown', {}).items())}
"""
def _generate_tasks(self, cluster: AlertCluster,
score_info: dict) -> list[dict]:
"""Genera task di investigazione automatici."""
tasks = [
{"title": "Verify alert legitimacy", "order": 0},
{"title": f"Investigate host: {cluster.primary_host}", "order": 1},
]
if len(set(a.host for a in cluster.alerts)) > 1:
tasks.append({"title": "Assess lateral movement scope", "order": 2})
if cluster.attack_chain:
tasks.append({"title": "Map attack progression to ATT&CK", "order": 3})
tasks.append({"title": "Document findings and close/escalate", "order": 99})
return tasks
結論と重要なポイント
グラフ分析による自動トリアージは贅沢品ではなく、運用上必要不可欠なものです。 アナリストの数を比例的に拡張せずに拡張したいと考えている SOC。 アラートノイズの低減、状況に応じた相関付け、およびインテリジェントな優先順位付け これにより、アナリストは本当に重要なこと、つまり本当の脅威の調査に集中できるようになります。
重要なポイント
- グラフ分析により、個別のアラートを状況に応じた攻撃シナリオに変換します
- 多要素スコアリング (重大度 + 資産の重要度 + キル チェーン + 脅威情報) は、重大度による単純なランキングよりも優れています。
- プロトタイプには NetworkX、エンタープライズ生産には Neo4j
- SOAR (TheHive、XSOAR) との統合により、自動化調査ループが終了します
- SOC メトリクスを常に監視: MTTD、誤検知率、アラート圧縮率
- アナリストからのフィードバックはシステムの継続的な改善の基礎です
関連記事
- Python の SOAR プレイブック: インシデント対応の自動化
- AI 支援検出: シグマ ルール生成用の LLM
- 動作異常の検出: ログ データの ML
- 専門分野としての検出エンジニアリング: スクリプトからパイプラインまで







