경보 분류 자동화: 그래프 분석으로 MTTD 감소
경보 분류 문제는 SOC 분석가의 삶에서 가장 비용이 많이 들고 좌절감을 주는 문제 중 하나입니다. IBM 2025 데이터에 따르면 침해를 식별하는 평균 시간(MTTD - Mean Time to Detect)과 의 194일 고급 자동화를 구현하지 않은 조직의 경우. 반면, AI 자동화와 그래프 기반 상관관계를 결합한 조직은 이러한 현상을 줄입니다. 가치를 몇 시간 또는 심지어 몇 분 동일한 위협 카테고리에 대해
문제의 핵심은 경보 피로입니다. 평균 SOC는 하루에 수천 개의 경보를 일정한 속도로 처리합니다. 일부 환경에서는 97%를 초과하는 오탐률이 발생합니다. 분석가는 대부분의 시간을 보낸다. 실제 위협을 조사하는 대신 양성 경고를 평가합니다. 그래프 분석은 한 가지 접근 방식을 제공합니다. 근본적으로 다릅니다. 각 경고를 개별적으로 평가하는 대신 그래프로 경고의 상관 관계를 분석합니다. 다단계 공격 패턴을 식별하고 심각도에 따라 자동으로 우선순위를 지정합니다. 상황에 맞는.
무엇을 배울 것인가
- 기존 분류가 제대로 확장되지 않는 이유와 그래프 분석을 통해 이를 변환하는 방법
- 그래프 기반 경보 상관관계 시스템 아키텍처
- NetworkX 및 Neo4j를 사용한 실제 구현
- 자동 우선순위 지정을 위한 점수 알고리즘
- 기존 SOC 파이프라인과 통합
- 성공 지표: MTTD, 거짓 긍정 비율, 분석가 처리량
경보 피로 문제
솔루션을 구축하기 전에 문제를 깊이 이해하는 것이 필요합니다. 경고 피로는 그렇지 않습니다. 단순히 "경보가 너무 많음": 이는 기존 SIEM 아키텍처에 뿌리를 둔 시스템적 문제입니다. 그리고 인간의 인지 한계 내에서.
기존 SIEM은 일련의 규칙에 따라 개별적으로 각 로그 이벤트를 평가합니다. 규칙이 있을 때 일치하면 경고가 생성됩니다. 결과는 다음과 같습니다.
- 합법적인 네트워크 스캔(예: Nessus 취약점 스캔)은 수백 개의 경고를 생성합니다.
Port Scan Detected - 자동 패치 프로세스는 수십 개의 경고를 생성합니다.
Suspicious Process Creation - VPN을 통해 집에서 로그인하는 사용자는 경고를 생성합니다.
Impossible Travel올바르게 구성되지 않은 경우
그래프 분석은 이 문제를 우아하게 해결합니다. 그 일부인 경고를 그룹화합니다. 동일한 공격 시나리오 단일 상황화된 사건에서 다음을 제공합니다. 분석가는 몇 분이 아닌 몇 초 만에 정보에 근거한 결정을 내리는 데 필요한 상황을 파악합니다.
부문 데이터(2025)
- 조직의 73%가 자동화된 경고 분류 기능을 갖추고 있습니다(Gurucul 2025)
- AI 자동화로 채택자의 60%에 대해 조사 시간이 25~50% 단축됩니다.
- ReliaQuest: AI 자동화 사용 시 응답 시간 < 7분 vs. AI 자동화 사용 시 2.3일
- Dropzone AI: 3~10분 조사로 90% 경고 적용 범위
그래프 기반 경고 상관관계의 기본
기본적이고 간단한 개념: 모든 경고는 마디 그래프에서, 그리고 처지 경고 간(동일한 호스트, 동일한 사용자, 동일한 기간, 동일한 ATT&CK 기술)은 아치. 결과 그래프는 다음과 같습니다. 잠재적인 공격 시나리오를 나타내는 관련 경고 클러스터입니다.
가장 유용한 상관관계 유형은 다음과 같습니다.
| 상관관계 유형 | 표준 | Forza | Esempio |
|---|---|---|---|
| 폭풍 | 창 T 내의 경고(예: 5분) | 낮은 | 포트 스캔 + 무차별 공격을 동시에 수행 |
| 실재 | 동일한 호스트/IP/사용자 | 평균 | 동일한 엔드포인트에 대한 다른 경고 |
| ATT&CK 킬 체인 | 논리적 순서의 기술 | 높은 | 정찰 + 초기 액세스 + 지속성 |
| IOC 중복 | 동일한 악성 해시/도메인/IP | 높은 | 동일한 C2 및 경고 |
| 인과관계 | 상위/하위 프로세스, 원본 네트워크 | 매우 높음 | cmd.exe는 페이로드를 다운로드하는 word.exe로 시작됩니다. |
구현: NetworkX를 사용한 경고 그래프
다음을 사용하여 Python 구현부터 시작하겠습니다. 네트워크X 그래프 관리를 위해 이 솔루션은 프로토타입 및 중간 규모 환경(일일 최대 100,000개의 경고)에 적합합니다. 더 높은 볼륨의 경우 Neo4j가 사용됩니다(다음 섹션 참조).
# Sistema di Alert Graph Correlation
# File: alert_graph.py
import networkx as nx
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
import uuid
import json
@dataclass
class Alert:
id: str
timestamp: datetime
rule_name: str
severity: str # 'low', 'medium', 'high', 'critical'
host: str
user: Optional[str]
src_ip: Optional[str]
dst_ip: Optional[str]
technique_id: Optional[str] # MITRE ATT&CK ID (es. T1059.001)
raw_data: dict = field(default_factory=dict)
def severity_score(self) -> int:
return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(self.severity, 1)
@dataclass
class AlertCluster:
id: str
alerts: list[Alert]
score: float
attack_chain: list[str] # Sequenza di tecniche ATT&CK
primary_host: str
created_at: datetime
class AlertGraphCorrelator:
# Finestra temporale per correlazione (default: 30 minuti)
CORRELATION_WINDOW_MINUTES = 30
# Pesi per il calcolo del punteggio cluster
WEIGHT_SEVERITY = 3.0
WEIGHT_TECHNIQUE_CHAIN = 5.0
WEIGHT_SAME_HOST = 2.0
WEIGHT_SAME_USER = 2.5
WEIGHT_IOC_OVERLAP = 4.0
# Kill chain ATT&CK semplificata per correlazione
KILL_CHAIN_ORDER = [
'TA0043', # Reconnaissance
'TA0042', # Resource Development
'TA0001', # Initial Access
'TA0002', # Execution
'TA0003', # Persistence
'TA0004', # Privilege Escalation
'TA0005', # Defense Evasion
'TA0006', # Credential Access
'TA0007', # Discovery
'TA0008', # Lateral Movement
'TA0009', # Collection
'TA0011', # Command and Control
'TA0010', # Exfiltration
'TA0040', # Impact
]
def __init__(self):
self.graph = nx.DiGraph()
self.alerts: dict[str, Alert] = {}
def add_alert(self, alert: Alert) -> None:
"""Aggiunge un alert al grafo e crea correlazioni."""
self.alerts[alert.id] = alert
# Aggiungi nodo con attributi
self.graph.add_node(alert.id, **{
'timestamp': alert.timestamp.isoformat(),
'severity': alert.severity,
'host': alert.host,
'user': alert.user,
'technique': alert.technique_id,
'score': alert.severity_score()
})
# Cerca correlazioni con alert esistenti
for existing_id, existing in self.alerts.items():
if existing_id == alert.id:
continue
correlations = self._calculate_correlations(alert, existing)
if correlations:
total_weight = sum(c['weight'] for c in correlations)
edge_labels = [c['type'] for c in correlations]
self.graph.add_edge(
existing_id, alert.id,
weight=total_weight,
correlation_types=edge_labels
)
def _calculate_correlations(self, alert1: Alert,
alert2: Alert) -> list[dict]:
"""Calcola le correlazioni tra due alert."""
correlations = []
# 1. Correlazione temporale
time_diff = abs((alert1.timestamp - alert2.timestamp).total_seconds())
if time_diff <= self.CORRELATION_WINDOW_MINUTES * 60:
time_weight = 1.0 - (time_diff / (self.CORRELATION_WINDOW_MINUTES * 60))
correlations.append({'type': 'temporal', 'weight': time_weight})
# 2. Stesso host
if alert1.host == alert2.host:
correlations.append({'type': 'same_host', 'weight': self.WEIGHT_SAME_HOST})
# 3. Stesso utente
if (alert1.user and alert2.user and alert1.user == alert2.user):
correlations.append({'type': 'same_user', 'weight': self.WEIGHT_SAME_USER})
# 4. IOC overlap (IP)
if (alert1.src_ip and alert2.src_ip and alert1.src_ip == alert2.src_ip):
correlations.append({'type': 'ioc_overlap_ip', 'weight': self.WEIGHT_IOC_OVERLAP})
# 5. Kill chain sequenziale ATT&CK
if alert1.technique_id and alert2.technique_id:
chain_score = self._calculate_kill_chain_score(
alert1.technique_id, alert2.technique_id
)
if chain_score > 0:
correlations.append({'type': 'kill_chain', 'weight': chain_score})
return correlations
def _calculate_kill_chain_score(self, technique1: str,
technique2: str) -> float:
"""Calcola un punteggio basato sulla progressione kill chain."""
# Mapping semplificato tecnica -> tattica
# In produzione si usa la MITRE ATT&CK API
technique_to_tactic = {
'T1595': 'TA0043', # Recon - Active Scanning
'T1190': 'TA0001', # Initial Access - Exploit Public-Facing App
'T1059': 'TA0002', # Execution - Command and Scripting
'T1053': 'TA0003', # Persistence - Scheduled Task
'T1078': 'TA0004', # Privilege Escalation - Valid Accounts
'T1562': 'TA0005', # Defense Evasion - Impair Defenses
'T1003': 'TA0006', # Credential Access - OS Credential Dumping
'T1087': 'TA0007', # Discovery - Account Discovery
'T1021': 'TA0008', # Lateral Movement - Remote Services
'T1071': 'TA0011', # C2 - Application Layer Protocol
}
tactic1 = technique_to_tactic.get(technique1)
tactic2 = technique_to_tactic.get(technique2)
if not tactic1 or not tactic2:
return 0.0
try:
idx1 = self.KILL_CHAIN_ORDER.index(tactic1)
idx2 = self.KILL_CHAIN_ORDER.index(tactic2)
# Punteggio più alto se la progressione e logica (tecnica più avanzata dopo)
if idx2 > idx1:
progression = (idx2 - idx1) / len(self.KILL_CHAIN_ORDER)
return self.WEIGHT_TECHNIQUE_CHAIN * progression
except ValueError:
pass
return 0.0
def get_clusters(self, min_cluster_size: int = 2) -> list[AlertCluster]:
"""Identifica cluster di alert correlati."""
# Usa connected components sul grafo non diretto per trovare i cluster
undirected = self.graph.to_undirected()
components = list(nx.connected_components(undirected))
clusters = []
for component in components:
if len(component) < min_cluster_size:
continue
component_alerts = [self.alerts[aid] for aid in component
if aid in self.alerts]
score = self._calculate_cluster_score(component_alerts, component)
attack_chain = self._extract_attack_chain(component_alerts)
primary_host = self._find_primary_host(component_alerts)
clusters.append(AlertCluster(
id=str(uuid.uuid4()),
alerts=component_alerts,
score=score,
attack_chain=attack_chain,
primary_host=primary_host,
created_at=datetime.now()
))
# Ordina per score decrescente (massima priorità prima)
return sorted(clusters, key=lambda c: c.score, reverse=True)
def _calculate_cluster_score(self, alerts: list[Alert],
component: set) -> float:
"""Calcola il punteggio di priorità del cluster."""
score = 0.0
# 1. Contributo severita
severity_sum = sum(a.severity_score() for a in alerts)
max_severity = max(a.severity_score() for a in alerts)
score += severity_sum * self.WEIGHT_SEVERITY
score += max_severity * 2 # Bonus per alert critici
# 2. Numero di tecniche ATT&CK distinte
techniques = set(a.technique_id for a in alerts if a.technique_id)
score += len(techniques) * self.WEIGHT_TECHNIQUE_CHAIN
# 3. Peso archi nel sottografo
subgraph = self.graph.subgraph(component)
edge_weight_sum = sum(
data.get('weight', 0)
for _, _, data in subgraph.edges(data=True)
)
score += edge_weight_sum
# 4. Numero host distinti (lateral movement indicator)
hosts = set(a.host for a in alerts)
if len(hosts) > 1:
score += len(hosts) * 3.0 # Lateral movement e molto significativo
return score
def _extract_attack_chain(self, alerts: list[Alert]) -> list[str]:
"""Estrae la kill chain osservata dal cluster."""
techniques = [a.technique_id for a in alerts if a.technique_id]
# Ordina per timestamp
sorted_alerts = sorted(alerts, key=lambda a: a.timestamp)
return [a.technique_id for a in sorted_alerts if a.technique_id]
def _find_primary_host(self, alerts: list[Alert]) -> str:
"""Identifica l'host più coinvolto nel cluster."""
host_counts = {}
for alert in alerts:
host_counts[alert.host] = host_counts.get(alert.host, 0) + 1
return max(host_counts, key=host_counts.get) if host_counts else 'unknown'
다단계 채점을 통한 자동 우선순위 지정
클러스터 점수는 개별 경고의 심각도뿐만 아니라 는 상황적 맥락: 킬 체인의 진행, 관련된 자산, 알려진 악성 IOC의 존재.
# Sistema di scoring avanzato con contesto asset
# File: alert_scorer.py
@dataclass
class AssetCriticality:
hostname: str
criticality: str # 'low', 'medium', 'high', 'critical'
asset_type: str # 'workstation', 'server', 'dc', 'database', 'ot'
business_owner: str
class ContextualScorer:
# Moltiplicatori per criticalita asset
ASSET_MULTIPLIERS = {
'workstation': 1.0,
'server': 1.5,
'database': 2.0,
'dc': 3.0, # Domain Controller
'ot': 4.0 # OT/ICS systems
}
CRITICALITY_MULTIPLIERS = {
'low': 1.0,
'medium': 1.5,
'high': 2.0,
'critical': 3.0
}
def __init__(self, asset_registry: dict[str, AssetCriticality],
threat_intel_ips: set[str]):
self.asset_registry = asset_registry
self.threat_intel_ips = threat_intel_ips
def score_cluster(self, cluster: AlertCluster) -> dict:
"""Calcola score completo con breakdown."""
base_score = cluster.score
context_multiplier = 1.0
breakdown = {}
# 1. Asset criticality multiplier
asset = self.asset_registry.get(cluster.primary_host)
if asset:
type_mult = self.ASSET_MULTIPLIERS.get(asset.asset_type, 1.0)
crit_mult = self.CRITICALITY_MULTIPLIERS.get(asset.criticality, 1.0)
asset_mult = type_mult * crit_mult
context_multiplier *= asset_mult
breakdown['asset_multiplier'] = asset_mult
# 2. Threat Intel overlap
ti_hits = sum(
1 for alert in cluster.alerts
if alert.src_ip in self.threat_intel_ips
)
if ti_hits > 0:
ti_boost = 1.0 + (ti_hits * 0.5)
context_multiplier *= ti_boost
breakdown['threat_intel_boost'] = ti_boost
# 3. Kill chain completeness
chain_length = len(cluster.attack_chain)
chain_multiplier = 1.0 + (chain_length * 0.2) # +20% per ogni step
context_multiplier *= chain_multiplier
breakdown['chain_multiplier'] = chain_multiplier
# 4. Time pressure (alert recenti hanno priorità maggiore)
most_recent = max(a.timestamp for a in cluster.alerts)
age_minutes = (datetime.now() - most_recent).total_seconds() / 60
recency_multiplier = max(0.5, 1.0 - (age_minutes / 1440)) # Decade in 24h
context_multiplier *= recency_multiplier
breakdown['recency_multiplier'] = recency_multiplier
final_score = base_score * context_multiplier
breakdown['base_score'] = base_score
breakdown['context_multiplier'] = context_multiplier
breakdown['final_score'] = final_score
return breakdown
def prioritize_queue(self, clusters: list[AlertCluster]) -> list[dict]:
"""Genera la coda di lavoro prioritizzata per gli analisti."""
scored = []
for cluster in clusters:
score_breakdown = self.score_cluster(cluster)
scored.append({
'cluster': cluster,
'score': score_breakdown['final_score'],
'breakdown': score_breakdown,
'priority': self._score_to_priority(score_breakdown['final_score'])
})
return sorted(scored, key=lambda x: x['score'], reverse=True)
def _score_to_priority(self, score: float) -> str:
if score >= 100:
return 'P1 - Critical'
elif score >= 50:
return 'P2 - High'
elif score >= 20:
return 'P3 - Medium'
else:
return 'P4 - Low'
Enterprise Volumes용 Neo4j와의 통합
하루에 수백만 건의 경고가 발생하는 엔터프라이즈 환경의 경우 인메모리 NetworkX는 확장 가능하지 않습니다. 네오4j가장 널리 사용되는 그래프 데이터베이스인 는 기본 쿼리 성능을 제공합니다. 복잡한 상관관계와 과거 데이터의 지속성.
# Alert Graph su Neo4j
# File: neo4j_correlator.py
from neo4j import GraphDatabase
from datetime import datetime, timedelta
class Neo4jAlertCorrelator:
def __init__(self, uri: str, username: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(username, password))
self._create_indexes()
def _create_indexes(self) -> None:
"""Crea indici per query performance."""
with self.driver.session() as session:
session.run("""
CREATE INDEX alert_timestamp IF NOT EXISTS
FOR (a:Alert) ON (a.timestamp)
""")
session.run("""
CREATE INDEX alert_host IF NOT EXISTS
FOR (a:Alert) ON (a.host)
""")
session.run("""
CREATE INDEX alert_user IF NOT EXISTS
FOR (a:Alert) ON (a.user)
""")
def ingest_alert(self, alert: dict) -> None:
"""Inserisce un alert e crea relazioni di correlazione."""
with self.driver.session() as session:
# Crea il nodo Alert
session.run("""
CREATE (a:Alert {
id: $id,
timestamp: datetime($timestamp),
rule_name: $rule_name,
severity: $severity,
host: $host,
user: $user,
src_ip: $src_ip,
technique_id: $technique_id
})
""", **alert)
# Crea relazione SAME_HOST con alert recenti
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp >= datetime($cutoff)
AND NOT (a)-[:SAME_HOST]-(b)
MERGE (a)-[:SAME_HOST {weight: 2.0}]-(b)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(minutes=30)).isoformat())
# Crea relazione KILL_CHAIN per progressione logica
session.run("""
MATCH (a:Alert {id: $id})
MATCH (b:Alert)
WHERE b.id <> $id
AND b.host = a.host
AND b.timestamp < a.timestamp
AND b.timestamp >= datetime($cutoff)
AND b.technique_id IS NOT NULL
AND a.technique_id IS NOT NULL
MERGE (b)-[:PRECEDES {weight: 3.0}]->(a)
""", id=alert['id'],
cutoff=(datetime.fromisoformat(alert['timestamp'])
- timedelta(hours=2)).isoformat())
def find_incidents(self, min_alerts: int = 3,
hours_back: int = 24) -> list[dict]:
"""Trova cluster di alert che rappresentano potenziali incidenti."""
cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()
with self.driver.session() as session:
result = session.run("""
MATCH (a:Alert)
WHERE a.timestamp >= datetime($cutoff)
CALL apoc.path.subgraphNodes(a, {
relationshipFilter: 'SAME_HOST|SAME_USER|PRECEDES',
maxLevel: 5
}) YIELD node
WITH collect(DISTINCT node) AS cluster_nodes
WHERE size(cluster_nodes) >= $min_alerts
RETURN cluster_nodes,
reduce(s = 0, n IN cluster_nodes |
s + CASE n.severity
WHEN 'critical' THEN 4
WHEN 'high' THEN 3
WHEN 'medium' THEN 2
ELSE 1 END) AS total_score
ORDER BY total_score DESC
LIMIT 100
""", cutoff=cutoff, min_alerts=min_alerts)
return [dict(record) for record in result]
def get_attack_path(self, incident_id: str) -> list[dict]:
"""Recupera il percorso di attacco per un incidente."""
with self.driver.session() as session:
result = session.run("""
MATCH path = (start:Alert)-[:PRECEDES*]->(end:Alert)
WHERE start.id IN $incident_alerts
AND NOT ()-[:PRECEDES]->(start)
RETURN [node IN nodes(path) |
{id: node.id, technique: node.technique_id,
host: node.host, timestamp: node.timestamp}
] AS attack_path
ORDER BY length(path) DESC
LIMIT 1
""", incident_alerts=[incident_id])
return [dict(record) for record in result]
def close(self) -> None:
self.driver.close()
심사 대시보드 및 API
상관 관계 및 우선 순위 지정 시스템은 SOC 플랫폼(TheHive, Cortex XSOAR, IBM QRadar)를 사용하여 분석가에게 작업 대기열을 제공할 수 있습니다. 단순 알림 목록 대신 지능적입니다.
# FastAPI endpoint per il triage queue
# File: triage_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
app = FastAPI(title="Alert Triage API")
class AlertIngestionRequest(BaseModel):
id: str
timestamp: str
rule_name: str
severity: str
host: str
user: str | None = None
src_ip: str | None = None
technique_id: str | None = None
raw_data: dict = {}
class TriageResponse(BaseModel):
incident_id: str
priority: str
score: float
alert_count: int
primary_host: str
attack_chain: list[str]
recommended_actions: list[str]
@app.post("/api/v1/alerts")
async def ingest_alert(alert: AlertIngestionRequest) -> dict:
"""Ingesta un alert e ritorna la correlazione risultante."""
correlator = get_correlator() # Singleton o dependency injection
alert_obj = Alert(**alert.dict())
correlator.add_alert(alert_obj)
# Ottieni cluster aggiornato
clusters = correlator.get_clusters()
affected_cluster = next(
(c for c in clusters if any(a.id == alert.id for a in c.alerts)),
None
)
if affected_cluster:
return {
"status": "correlated",
"cluster_id": affected_cluster.id,
"cluster_size": len(affected_cluster.alerts),
"cluster_score": affected_cluster.score
}
return {"status": "isolated", "cluster_id": None}
@app.get("/api/v1/triage-queue")
async def get_triage_queue(limit: int = 50, min_score: float = 0) -> list[dict]:
"""Ritorna la coda di triage prioritizzata."""
correlator = get_correlator()
scorer = get_scorer()
clusters = correlator.get_clusters()
prioritized = scorer.prioritize_queue(clusters)
return [
{
"incident_id": item['cluster'].id,
"priority": item['priority'],
"score": round(item['score'], 2),
"alert_count": len(item['cluster'].alerts),
"primary_host": item['cluster'].primary_host,
"attack_chain": item['cluster'].attack_chain,
"created_at": item['cluster'].created_at.isoformat(),
"score_breakdown": item['breakdown']
}
for item in prioritized
if item['score'] >= min_score
][:limit]
@app.get("/api/v1/incident/{incident_id}/timeline")
async def get_incident_timeline(incident_id: str) -> dict:
"""Ritorna la timeline degli eventi per un incidente."""
correlator = get_correlator()
clusters = correlator.get_clusters()
cluster = next((c for c in clusters if c.id == incident_id), None)
if not cluster:
raise HTTPException(status_code=404, detail="Incident not found")
sorted_alerts = sorted(cluster.alerts, key=lambda a: a.timestamp)
return {
"incident_id": incident_id,
"timeline": [
{
"timestamp": a.timestamp.isoformat(),
"rule": a.rule_name,
"host": a.host,
"user": a.user,
"technique": a.technique_id,
"severity": a.severity
}
for a in sorted_alerts
],
"duration_minutes": (
(sorted_alerts[-1].timestamp - sorted_alerts[0].timestamp).total_seconds() / 60
) if len(sorted_alerts) > 1 else 0
}
성공 지표 및 모니터링
모든 심사 자동화 시스템은 객관적인 지표로 모니터링하여 검증해야 합니다. 실제로 SOC의 효율성이 향상되고 새로운 문제가 발생하지 않는다는 것입니다.
# Monitoring delle metriche SOC
# File: soc_metrics.py
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class SOCMetrics:
# Metriche chiave
total_alerts: int = 0
correlated_alerts: int = 0
true_positives: int = 0
false_positives: int = 0
total_incidents: int = 0
# Tempi (in minuti)
mttd_values: list[float] = field(default_factory=list) # Mean Time to Detect
mtti_values: list[float] = field(default_factory=list) # Mean Time to Investigate
mttr_values: list[float] = field(default_factory=list) # Mean Time to Respond
def correlation_rate(self) -> float:
"""% di alert correlati in incidenti."""
if self.total_alerts == 0:
return 0.0
return (self.correlated_alerts / self.total_alerts) * 100
def false_positive_rate(self) -> float:
"""% di falsi positivi sul totale investigato."""
total = self.true_positives + self.false_positives
if total == 0:
return 0.0
return (self.false_positives / total) * 100
def avg_mttd(self) -> float:
if not self.mttd_values:
return 0.0
return sum(self.mttd_values) / len(self.mttd_values)
def alert_compression_ratio(self) -> float:
"""Quanti alert per incidente in media (riduzione noise)."""
if self.total_incidents == 0:
return 1.0
return self.correlated_alerts / self.total_incidents
def report(self) -> dict:
return {
"total_alerts": self.total_alerts,
"total_incidents": self.total_incidents,
"alert_compression_ratio": f"{self.alert_compression_ratio():.1f}:1",
"correlation_rate_pct": f"{self.correlation_rate():.1f}%",
"false_positive_rate_pct": f"{self.false_positive_rate():.1f}%",
"avg_mttd_minutes": f"{self.avg_mttd():.1f}",
"avg_mtti_minutes": (
f"{sum(self.mtti_values)/len(self.mtti_values):.1f}"
if self.mtti_values else "N/A"
)
}
분류 자동화의 안티 패턴
- 상관관계 임계값이 너무 낮습니다.: 24시간 이내에 모든 경고의 상관 관계를 분석합니다. 동일한 호스트에 거대하고 쓸모없는 클러스터가 생성됩니다. 좁은 시간 창(15~30분)을 사용하세요. 약한 상관 관계에 대해.
- 자산 맥락 없이 점수를 매기세요: 허니팟에 대한 "높음" 경고 및 훨씬 더 적은 경고 도메인 컨트롤러에 동일한 경고가 긴급합니다. 항상 중요한 자산으로 풍요롭게 하세요.
- 피드백 루프 없는 자동화: 시스템은 사용자 피드백을 통해 학습해야 합니다. 분석가(TP/FP)는 시간이 지남에 따라 개선됩니다. 정적 시스템의 성능이 저하됩니다.
- 단일 경고 사고 무시: 모든 공격이 당신을 더 경계하게 만드는 것은 아닙니다. 중요하고 격리된 경고(예: DCSync)는 상관 관계를 우회하고 P1 대기열로 직접 이동해야 합니다.
TheHive 및 SOAR과 통합
분류 시스템은 TheHive 또는 Cortex와 같은 SOAR 플랫폼과 자연스럽게 통합됩니다. 사고 수명주기 및 대응 자동화를 관리합니다.
# Integrazione TheHive
# File: thehive_integration.py
import httpx
from datetime import datetime
class TheHiveIntegration:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def create_case_from_cluster(self, cluster: AlertCluster,
score_info: dict) -> str:
"""Crea un caso TheHive da un cluster di alert."""
severity_map = {
'P1 - Critical': 3,
'P2 - High': 2,
'P3 - Medium': 1,
'P4 - Low': 1
}
case_payload = {
"title": f"[AUTO] Incident {cluster.id[:8]} - {cluster.primary_host}",
"description": self._build_description(cluster, score_info),
"severity": severity_map.get(score_info.get('priority', 'P4 - Low'), 1),
"startDate": int(cluster.created_at.timestamp() * 1000),
"tags": [
f"auto-generated",
f"host:{cluster.primary_host}",
f"alerts:{len(cluster.alerts)}",
*[f"att&ck:{t}" for t in cluster.attack_chain[:5]]
],
"tasks": self._generate_tasks(cluster, score_info)
}
with httpx.Client() as client:
response = client.post(
f"{self.base_url}/api/case",
json=case_payload,
headers=self.headers
)
response.raise_for_status()
return response.json()['id']
def _build_description(self, cluster: AlertCluster,
score_info: dict) -> str:
alerts_summary = "\n".join(
f"- [{a.severity.upper()}] {a.rule_name} @ {a.host} ({a.timestamp.strftime('%H:%M:%S')})"
for a in sorted(cluster.alerts, key=lambda x: x.timestamp)
)
return f"""## Alert Cluster Auto-Generated
**Score**: {score_info.get('score', 'N/A')}
**Priority**: {score_info.get('priority', 'N/A')}
**Primary Host**: {cluster.primary_host}
**Alert Count**: {len(cluster.alerts)}
### Attack Chain
{' -> '.join(cluster.attack_chain) if cluster.attack_chain else 'N/A'}
### Alert Timeline
{alerts_summary}
### Score Breakdown
{chr(10).join(f"- {k}: {v}" for k, v in score_info.get('breakdown', {}).items())}
"""
def _generate_tasks(self, cluster: AlertCluster,
score_info: dict) -> list[dict]:
"""Genera task di investigazione automatici."""
tasks = [
{"title": "Verify alert legitimacy", "order": 0},
{"title": f"Investigate host: {cluster.primary_host}", "order": 1},
]
if len(set(a.host for a in cluster.alerts)) > 1:
tasks.append({"title": "Assess lateral movement scope", "order": 2})
if cluster.attack_chain:
tasks.append({"title": "Map attack progression to ATT&CK", "order": 3})
tasks.append({"title": "Document findings and close/escalate", "order": 99})
return tasks
결론 및 주요 시사점
그래프 분석을 통한 분류 자동화는 사치가 아닙니다. 분석가 수를 비례적으로 확장하지 않고 확장하려는 SOC. 경고 소음 감소, 상황에 따른 상관 관계 및 지능형 우선 순위 지정 이를 통해 분석가는 실제로 중요한 것, 즉 실제 위협을 조사하는 데 집중할 수 있습니다.
주요 시사점
- 그래프 분석은 격리된 경고를 상황에 맞는 공격 시나리오로 변환합니다.
- 다중 요소 채점(심각도 + 자산 중요도 + 킬 체인 + 위협 정보)은 심각도에 따른 단순 순위보다 우수합니다.
- 프로토타입용 NetworkX, 엔터프라이즈 프로덕션용 Neo4j
- SOAR(TheHive, XSOAR)과의 통합으로 자동화 조사 루프가 종료됩니다.
- 항상 SOC 지표 모니터링: MTTD, 거짓 긍정 비율, 경고 압축 비율
- 분석가 피드백은 시스템의 지속적인 개선에 필수적입니다.
관련 기사
- Python의 SOAR 플레이북: 사고 대응 자동화
- AI 지원 탐지: 시그마 규칙 생성을 위한 LLM
- 이상 행동 탐지: 로그 데이터에 대한 ML
- 분야로서의 탐지 엔지니어링: 스크립트에서 파이프라인까지







