Alert Triage Automation: Reducing MTTD with Graph Analysis
Alert triage is one of the costliest and most frustrating tasks in a SOC analyst's day. According to IBM's 2025 data, the average Mean Time to Detect (MTTD) still sits at 194 days for organizations that have not implemented advanced automation. By contrast, organizations that combine AI automation with graph-based correlation cut this figure to hours, or even minutes, for the same threat categories.
The core of the problem is alert fatigue: an average SOC manages thousands of alerts per day, with a false positive rate that in some environments exceeds 97%. Analysts spend most of their time evaluating benign alerts instead of investigating real threats. Graph analysis offers a radically different approach: instead of evaluating each alert in isolation, it correlates alerts into activity graphs, identifies multi-step attack patterns, and automatically prioritizes based on contextual severity.
What You Will Learn
- Why traditional triage does not scale and how graph analysis transforms it
- Architecture of a graph-based alert correlation system
- Practical implementation with NetworkX and Neo4j
- Multi-factor scoring algorithms for automatic prioritization
- Integration with existing SOC pipelines
- Success metrics: MTTD, false positive rate, analyst throughput
The Alert Fatigue Problem
Before building solutions, it is necessary to deeply understand the problem. Alert fatigue is not simply "too many alerts": it is a systemic problem rooted in the architecture of traditional SIEMs and human cognitive limits.
A traditional SIEM evaluates each log event in isolation against a set of rules. When a rule matches, it generates an alert. The result is:
- A legitimate network scan (e.g., a Nessus vulnerability scan) generates hundreds of `Port Scan Detected` alerts
- An automated patching process generates dozens of `Suspicious Process Creation` alerts
- A user connecting from home via VPN triggers `Impossible Travel` alerts if the rule is not properly tuned
Graph analysis elegantly solves this problem by grouping alerts that are part of the same attack scenario into a single contextualized incident, giving analysts the context needed to make informed decisions in seconds rather than minutes.
Industry Data (2025)
- 73% of organizations have automated alert triage (Gurucul 2025)
- AI automation reduces investigation time by 25-50% for 60% of adopters
- ReliaQuest: with AI automation, response time < 7 minutes vs 2.3 days without
- Dropzone AI: 90% alert coverage with 3-10 minute investigations
Fundamentals of Graph-Based Alert Correlation
The fundamental concept is simple: each alert is a node in the graph, and the relationships between alerts (same host, same user, same time window, same ATT&CK technique) are the edges. The resulting graph reveals clusters of correlated alerts representing potential attack scenarios.
| Correlation Type | Criterion | Strength | Example |
|---|---|---|---|
| Temporal | Alerts within window T (e.g. 5 min) | Low | Port scan + brute force in same hour |
| Entity | Same host/IP/user | Medium | Different alerts on same endpoint |
| ATT&CK Kill Chain | Techniques in logical sequence | High | Recon + Initial Access + Persistence |
| IOC Overlap | Same hash/domain/malicious IP | High | Same C2 in multiple alerts |
| Causal | Parent/child process, originating network | Very high | cmd.exe launched from word.exe that downloads payload |
Implementation: Alert Graph with NetworkX
We start with a Python implementation using NetworkX for graph management. This solution is suitable for prototypes and environments with moderate volumes (up to ~100k alerts/day). For higher volumes, Neo4j is used (see next section).
# Alert Graph Correlation System
# File: alert_graph.py
import networkx as nx
from collections import Counter
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
import uuid


@dataclass
class Alert:
    id: str
    timestamp: datetime
    rule_name: str
    severity: str  # 'low', 'medium', 'high', 'critical'
    host: str
    user: Optional[str]
    src_ip: Optional[str]
    technique_id: Optional[str]
    raw_data: dict = field(default_factory=dict)

    def severity_score(self) -> int:
        return {'low': 1, 'medium': 2, 'high': 3, 'critical': 4}.get(self.severity, 1)


@dataclass
class AlertCluster:
    id: str
    alerts: list[Alert]
    score: float
    attack_chain: list[str]
    primary_host: str
    created_at: datetime


class AlertGraphCorrelator:
    CORRELATION_WINDOW_MINUTES = 30
    WEIGHT_SEVERITY = 3.0
    WEIGHT_TECHNIQUE_CHAIN = 5.0
    WEIGHT_SAME_HOST = 2.0
    WEIGHT_SAME_USER = 2.5
    WEIGHT_IOC_OVERLAP = 4.0

    def __init__(self):
        self.graph = nx.DiGraph()
        self.alerts: dict[str, Alert] = {}

    def add_alert(self, alert: Alert) -> None:
        """Adds an alert to the graph and creates correlations."""
        self.alerts[alert.id] = alert
        self.graph.add_node(alert.id, **{
            'timestamp': alert.timestamp.isoformat(),
            'severity': alert.severity,
            'host': alert.host,
            'user': alert.user,
            'technique': alert.technique_id,
            'score': alert.severity_score()
        })
        for existing_id, existing in self.alerts.items():
            if existing_id == alert.id:
                continue
            correlations = self._calculate_correlations(alert, existing)
            if correlations:
                total_weight = sum(c['weight'] for c in correlations)
                edge_labels = [c['type'] for c in correlations]
                self.graph.add_edge(
                    existing_id, alert.id,
                    weight=total_weight,
                    correlation_types=edge_labels
                )

    def _calculate_correlations(self, new: Alert, old: Alert) -> list[dict]:
        """Returns the typed, weighted correlations between two alerts."""
        window = timedelta(minutes=self.CORRELATION_WINDOW_MINUTES)
        if abs(new.timestamp - old.timestamp) > window:
            return []
        correlations = []
        if new.host == old.host:
            correlations.append(
                {'type': 'same_host', 'weight': self.WEIGHT_SAME_HOST})
        if new.user and new.user == old.user:
            correlations.append(
                {'type': 'same_user', 'weight': self.WEIGHT_SAME_USER})
        if new.src_ip and new.src_ip == old.src_ip:
            correlations.append(
                {'type': 'ioc_overlap', 'weight': self.WEIGHT_IOC_OVERLAP})
        if (new.technique_id and old.technique_id
                and new.technique_id != old.technique_id):
            correlations.append(
                {'type': 'technique_chain', 'weight': self.WEIGHT_TECHNIQUE_CHAIN})
        return correlations

    def get_clusters(self, min_cluster_size: int = 2) -> list[AlertCluster]:
        """Identifies correlated alert clusters."""
        undirected = self.graph.to_undirected()
        components = list(nx.connected_components(undirected))
        clusters = []
        for component in components:
            if len(component) < min_cluster_size:
                continue
            component_alerts = [self.alerts[aid] for aid in component
                                if aid in self.alerts]
            score = self._calculate_cluster_score(component_alerts, component)
            attack_chain = self._extract_attack_chain(component_alerts)
            primary_host = self._find_primary_host(component_alerts)
            clusters.append(AlertCluster(
                id=str(uuid.uuid4()),
                alerts=component_alerts,
                score=score,
                attack_chain=attack_chain,
                primary_host=primary_host,
                created_at=datetime.now()
            ))
        return sorted(clusters, key=lambda c: c.score, reverse=True)

    def _calculate_cluster_score(self, alerts: list[Alert],
                                 component: set) -> float:
        """Severity-weighted node score plus the weight of internal edges."""
        node_score = sum(a.severity_score() for a in alerts) * self.WEIGHT_SEVERITY
        edge_score = sum(
            data['weight']
            for _, _, data in self.graph.subgraph(component).edges(data=True)
        )
        return node_score + edge_score

    def _extract_attack_chain(self, alerts: list[Alert]) -> list[str]:
        """ATT&CK techniques in order of first appearance."""
        chain: list[str] = []
        for a in sorted(alerts, key=lambda a: a.timestamp):
            if a.technique_id and a.technique_id not in chain:
                chain.append(a.technique_id)
        return chain

    def _find_primary_host(self, alerts: list[Alert]) -> str:
        """Host appearing most often in the cluster."""
        return Counter(a.host for a in alerts).most_common(1)[0][0]
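At its core, the clustering step in `get_clusters` is just connected components over the correlation edges. A dependency-free sketch of that step (a minimal union-find standing in for `nx.connected_components`; the alert IDs and correlated pairs are made up for illustration) shows how pairwise correlations collapse into incident clusters:

```python
# Union-find sketch: correlated alert pairs -> incident clusters
# (illustrative stand-in for nx.connected_components; IDs are hypothetical)

def cluster_alerts(alert_ids, correlated_pairs):
    parent = {a: a for a in alert_ids}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    for a, b in correlated_pairs:
        parent[find(a)] = find(b)  # merge the two components

    clusters = {}
    for a in alert_ids:
        clusters.setdefault(find(a), set()).add(a)
    return sorted(clusters.values(), key=len, reverse=True)

ids = ['a1', 'a2', 'a3', 'a4', 'a5']
pairs = [('a1', 'a2'), ('a2', 'a3')]  # e.g. same_host + technique_chain edges
print(cluster_alerts(ids, pairs))  # largest cluster ({'a1','a2','a3'}) first
```

Two pairwise correlations are enough to chain three alerts into one cluster; the uncorrelated alerts remain singletons and, with `min_cluster_size=2`, would never reach an analyst as incidents.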
Automatic Prioritization with Multi-Factor Scoring
Cluster scoring must capture not only the severity of individual alerts, but also the operational context: kill chain progression, the criticality of the assets involved, and the presence of known malicious IOCs.
# Contextual scorer with asset enrichment
from dataclasses import dataclass

from alert_graph import AlertCluster


@dataclass
class Asset:
    asset_type: str   # 'workstation', 'server', 'database', 'dc', 'ot'
    criticality: str  # 'low', 'medium', 'high', 'critical'


class ContextualScorer:
    ASSET_MULTIPLIERS = {
        'workstation': 1.0,
        'server': 1.5,
        'database': 2.0,
        'dc': 3.0,  # Domain Controller
        'ot': 4.0   # OT/ICS systems
    }
    CRITICALITY_MULTIPLIERS = {
        'low': 1.0,
        'medium': 1.5,
        'high': 2.0,
        'critical': 3.0
    }

    def __init__(self, asset_registry: dict[str, Asset],
                 threat_intel_ips: set[str]):
        self.asset_registry = asset_registry
        self.threat_intel_ips = threat_intel_ips

    def score_cluster(self, cluster: AlertCluster) -> dict:
        """Calculates the final score with a per-factor breakdown."""
        base_score = cluster.score
        context_multiplier = 1.0
        breakdown = {}
        # 1. Asset criticality multiplier
        asset = self.asset_registry.get(cluster.primary_host)
        if asset:
            type_mult = self.ASSET_MULTIPLIERS.get(asset.asset_type, 1.0)
            crit_mult = self.CRITICALITY_MULTIPLIERS.get(asset.criticality, 1.0)
            asset_mult = type_mult * crit_mult
            context_multiplier *= asset_mult
            breakdown['asset_multiplier'] = asset_mult
        # 2. Threat intel overlap
        ti_hits = sum(
            1 for alert in cluster.alerts
            if alert.src_ip in self.threat_intel_ips
        )
        if ti_hits > 0:
            ti_boost = 1.0 + (ti_hits * 0.5)
            context_multiplier *= ti_boost
            breakdown['threat_intel_boost'] = ti_boost
        # 3. Kill chain completeness
        chain_length = len(cluster.attack_chain)
        chain_multiplier = 1.0 + (chain_length * 0.2)
        context_multiplier *= chain_multiplier
        breakdown['chain_multiplier'] = chain_multiplier
        final_score = base_score * context_multiplier
        breakdown['final_score'] = final_score
        breakdown['priority'] = self._score_to_priority(final_score)
        return breakdown

    def _score_to_priority(self, score: float) -> str:
        if score >= 100:
            return 'P1 - Critical'
        elif score >= 50:
            return 'P2 - High'
        elif score >= 20:
            return 'P3 - Medium'
        else:
            return 'P4 - Low'
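To make the multiplier chain concrete, here is a worked example of the arithmetic (all input numbers are hypothetical, chosen only to illustrate how the factors compound):

```python
# Worked example of the contextual multiplier chain (inputs are hypothetical)

def contextual_score(base: float, asset_type_mult: float,
                     asset_crit_mult: float,
                     ti_hits: int, chain_length: int) -> float:
    multiplier = asset_type_mult * asset_crit_mult  # asset context
    if ti_hits > 0:
        multiplier *= 1.0 + ti_hits * 0.5           # threat intel boost
    multiplier *= 1.0 + chain_length * 0.2          # kill chain completeness
    return base * multiplier

# Cluster with base score 12 on a critical Domain Controller (3.0 x 3.0),
# two threat-intel IP hits, and a 3-stage ATT&CK chain:
score = contextual_score(12.0, 3.0, 3.0, ti_hits=2, chain_length=3)
print(score)  # 12 * 9 * 2.0 * 1.6 -> roughly 345.6, well inside the P1 band
```

The same base score of 12 on a low-criticality workstation with no threat-intel hits and no chain would stay at 12, i.e. P4: identical alerts, radically different priorities once context is applied.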
Neo4j Integration for Enterprise Volumes
For enterprise environments with millions of alerts per day, in-memory NetworkX does not scale. Neo4j, the most widely adopted graph database, provides native graph storage with fast traversals for complex correlation queries, plus persistence of historical data.
# Alert Graph on Neo4j
from neo4j import GraphDatabase
from datetime import datetime, timedelta


class Neo4jAlertCorrelator:
    def __init__(self, uri: str, username: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))
        self._create_indexes()

    def close(self) -> None:
        self.driver.close()

    def _create_indexes(self) -> None:
        """Indexes on the keys used by the correlation queries."""
        with self.driver.session() as session:
            session.run("CREATE INDEX alert_id IF NOT EXISTS "
                        "FOR (a:Alert) ON (a.id)")
            session.run("CREATE INDEX alert_host IF NOT EXISTS "
                        "FOR (a:Alert) ON (a.host)")
            session.run("CREATE INDEX alert_timestamp IF NOT EXISTS "
                        "FOR (a:Alert) ON (a.timestamp)")

    def ingest_alert(self, alert: dict) -> None:
        """Inserts an alert and creates correlation relationships."""
        with self.driver.session() as session:
            session.run("""
                CREATE (a:Alert {
                    id: $id,
                    timestamp: datetime($timestamp),
                    rule_name: $rule_name,
                    severity: $severity,
                    host: $host,
                    technique_id: $technique_id
                })
            """, **alert)
            # Create SAME_HOST relationships with recent alerts
            # (MERGE keeps the operation idempotent)
            session.run("""
                MATCH (a:Alert {id: $id})
                MATCH (b:Alert)
                WHERE b.id <> $id
                  AND b.host = a.host
                  AND b.timestamp >= datetime($cutoff)
                MERGE (a)-[:SAME_HOST {weight: 2.0}]-(b)
            """, id=alert['id'],
                 cutoff=(datetime.fromisoformat(alert['timestamp'])
                         - timedelta(minutes=30)).isoformat())

    def find_incidents(self, min_alerts: int = 3,
                       hours_back: int = 24) -> list[dict]:
        """Finds alert clusters representing potential incidents.

        Requires the APOC plugin. Clusters seeded from different alerts
        may overlap; deduplicate downstream if needed.
        """
        cutoff = (datetime.now() - timedelta(hours=hours_back)).isoformat()
        with self.driver.session() as session:
            result = session.run("""
                MATCH (a:Alert)
                WHERE a.timestamp >= datetime($cutoff)
                CALL apoc.path.subgraphNodes(a, {
                    relationshipFilter: 'SAME_HOST|SAME_USER|PRECEDES',
                    maxLevel: 5
                }) YIELD node
                WITH a, collect(DISTINCT node) AS cluster_nodes
                WHERE size(cluster_nodes) >= $min_alerts
                RETURN cluster_nodes,
                       reduce(s = 0, n IN cluster_nodes |
                           s + CASE n.severity
                               WHEN 'critical' THEN 4
                               WHEN 'high' THEN 3
                               WHEN 'medium' THEN 2
                               ELSE 1 END) AS total_score
                ORDER BY total_score DESC
                LIMIT 100
            """, cutoff=cutoff, min_alerts=min_alerts)
            return [dict(record) for record in result]
Anti-Patterns in Triage Automation
- Correlation threshold too low: Correlating any alert within 24 hours on the same host creates enormous and useless clusters. Use tight time windows (15-30 min) for weak correlations.
- Score without asset context: A "high" alert on a honeypot is much less urgent than the same alert on a Domain Controller. Always enrich with asset criticality.
- Automation without feedback loop: The system must learn from analyst feedback (TP/FP) to improve over time. A static system degrades.
- Ignoring single-alert incidents: Not every attack leaves multiple alerts. Critical isolated alerts (e.g., DCSync) must bypass correlation and go directly to P1 queue.
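The last anti-pattern deserves a concrete shape. One way to implement the bypass is a small routing check in front of the correlator; the rule names below are examples only and should be tuned to your own detection stack:

```python
# Correlation bypass for high-confidence single alerts
# (rule names are illustrative examples, not a canonical list)
BYPASS_RULES = {
    'DCSync Attempt',
    'LSASS Memory Dump',
    'Golden Ticket Usage',
}

def route_alert(rule_name: str, severity: str) -> str:
    """Critical, high-confidence detections skip the correlation queue."""
    if rule_name in BYPASS_RULES and severity == 'critical':
        return 'P1-direct'        # straight to the P1 queue
    return 'correlation-queue'    # everything else goes to the graph

print(route_alert('DCSync Attempt', 'critical'))   # P1-direct
print(route_alert('Port Scan Detected', 'low'))    # correlation-queue
```

The bypass runs before clustering, so a lone DCSync alert reaches an analyst immediately even if it never accumulates correlation edges.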
SOC Metrics and Monitoring
Any triage automation system must be monitored with objective metrics to verify that it is actually improving SOC efficiency and not introducing new issues.
# SOC metrics monitoring
from dataclasses import dataclass, field


@dataclass
class SOCMetrics:
    total_alerts: int = 0
    correlated_alerts: int = 0
    true_positives: int = 0
    false_positives: int = 0
    total_incidents: int = 0
    mttd_values: list[float] = field(default_factory=list)

    def false_positive_rate(self) -> float:
        total = self.true_positives + self.false_positives
        if total == 0:
            return 0.0
        return (self.false_positives / total) * 100

    def alert_compression_ratio(self) -> float:
        """How many alerts per incident on average (noise reduction)."""
        if self.total_incidents == 0:
            return 1.0
        return self.correlated_alerts / self.total_incidents

    def report(self) -> dict:
        return {
            "total_alerts": self.total_alerts,
            "total_incidents": self.total_incidents,
            "alert_compression_ratio": f"{self.alert_compression_ratio():.1f}:1",
            "false_positive_rate_pct": f"{self.false_positive_rate():.1f}%",
            "avg_mttd_minutes": (
                f"{sum(self.mttd_values)/len(self.mttd_values):.1f}"
                if self.mttd_values else "N/A"
            )
        }
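As a quick sanity check, the two ratios above reduce to simple formulas; the monthly numbers in this example are hypothetical:

```python
# Sanity check of the two SOC ratios (input numbers are hypothetical)

def false_positive_rate(tp: int, fp: int) -> float:
    total = tp + fp
    return (fp / total) * 100 if total else 0.0

def compression_ratio(correlated_alerts: int, incidents: int) -> float:
    return correlated_alerts / incidents if incidents else 1.0

# 12,000 correlated alerts collapsed into 300 incidents; analysts marked
# 210 incidents as true positives and 90 as false positives
print(compression_ratio(12_000, 300))  # 40.0 -> a 40:1 noise reduction
print(false_positive_rate(210, 90))    # ~30% FPR after correlation
```

A rising compression ratio with a falling false positive rate is the signal you want; a rising compression ratio alone may just mean the correlation threshold is too loose (anti-pattern #1).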
Conclusions and Key Takeaways
Automating triage through graph analysis is not a luxury: it is an operational necessity for any SOC that wants to scale without proportionally scaling the number of analysts. Noise reduction, contextual correlation, and intelligent prioritization allow analysts to focus on what really matters: investigating real threats.
Key Takeaways
- Graph analysis transforms isolated alerts into contextualized attack scenarios
- Multi-factor scoring (severity + asset criticality + kill chain + threat intel) outperforms simple severity ranking
- NetworkX for prototypes, Neo4j for enterprise production
- Integration with SOAR (TheHive, XSOAR) closes the automation-investigation loop
- Always monitor SOC metrics: MTTD, false positive rate, alert compression ratio
- Analyst feedback is fundamental for continuous system improvement
Related Articles
- SOAR Playbook in Python: Incident Response Automation
- AI-Assisted Detection: LLMs for Sigma Rule Generation
- Behavioral Anomaly Detection: ML on Log Data
- Detection Engineering as a Discipline: From Scripts to Pipeline