Behavioral Anomaly Detection: ML on Log Data
Deterministic rules have a fundamental limitation: they detect only what has been anticipated. An attacker operating outside known patterns - using legitimate tools (living-off-the-land), stolen valid credentials, or completely new techniques - largely evades traditional SIEMs. This is where Machine Learning applied to logs comes in.
Behavioral anomaly detection does not look for specific behaviors: it looks for deviations from normality. A user accessing 10x more files than usual at 3:00 AM, a process establishing network connections never seen before, a service account attempting to enumerate Active Directory: these anomalous patterns emerge from data without any rule having explicitly anticipated them.
This article builds a complete behavioral anomaly detection system on Windows/Linux logs, using Isolation Forest for unsupervised detection, autoencoders for deep anomaly detection, and a baseline modeling framework to handle temporal variability (hours, days, seasons).
What You Will Learn
- Feature engineering on security logs for ML
- Isolation Forest: theory, implementation and tuning for log anomaly detection
- Autoencoders for complex anomaly detection
- Baseline modeling with temporal seasonality
- False positive reduction and interpretability with SHAP
- Production deployment with drift detection
The Dynamic Baseline Problem
The concept of "normal behavior" in an IT system is not static. A server with 5 simultaneous connections at 8:00 AM is "normal"; the same number at 3:00 AM might be anomalous. A user working remotely has completely different access patterns from someone working in the office.
Anomaly detection models must therefore be trained on dynamic baselines that account for:
- Hourly cyclicality: different activity during business hours vs. nighttime
- Weekly cyclicality: business days vs. weekends
- Monthly/seasonal cyclicality: periods of high activity (e.g., month-end)
- Individual user profiles: each user has unique patterns
- Geographic context: access from usual vs. new locations
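As a concrete illustration of the first two points, a minimal sketch of a dynamic baseline in pure pandas (column names `timestamp` and `user` are assumptions for illustration): it models per-user, per-hour-of-day event counts and z-scores a new observation against that slice of history.

```python
import pandas as pd

def build_hourly_baseline(events: pd.DataFrame) -> pd.DataFrame:
    """Per-user, per-hour-of-day baseline (mean/std) of hourly event counts."""
    df = events.copy()
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['hour'] = df['timestamp'].dt.hour
    # Count events per user in each calendar hour...
    counts = (
        df.groupby(['user', 'hour', df['timestamp'].dt.floor('h')])
          .size().rename('events').reset_index()
    )
    # ...then aggregate those counts per hour-of-day across days
    return counts.groupby(['user', 'hour'])['events'].agg(['mean', 'std']).reset_index()

def zscore(baseline: pd.DataFrame, user: str, hour: int, count: int) -> float:
    """How many standard deviations `count` sits from the user's baseline slice."""
    row = baseline[(baseline['user'] == user) & (baseline['hour'] == hour)]
    if row.empty:
        return 0.0  # no history for this user/hour: cannot score
    mean, std = float(row['mean'].iloc[0]), row['std'].iloc[0]
    if pd.isna(std) or std == 0:
        return 0.0  # degenerate baseline (single sample or constant)
    return (count - mean) / float(std)
```

The same pattern extends naturally to the other axes: add `day_of_week` or a location key to the groupby to get weekly or geographic baselines.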
Feature Engineering on Security Logs
The quality of feature engineering determines detection quality more than any algorithm. Raw logs (Windows events, Linux syslog, auth.log) must be transformed into meaningful numerical features for ML models.
# Feature Engineering for Security Logs
import pandas as pd
import numpy as np

class SecurityFeatureEngineer:
    def __init__(self, window_size_minutes: int = 60):
        self.window_size = window_size_minutes

    def extract_user_session_features(self, logs_df: pd.DataFrame) -> pd.DataFrame:
        """
        Input: DataFrame with [timestamp, user, event_id, host, src_ip, process_name, logon_type]
        Output: DataFrame with aggregated features per user session
        """
        logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'])
        logs_df['hour'] = logs_df['timestamp'].dt.hour
        logs_df['day_of_week'] = logs_df['timestamp'].dt.dayofweek
        logs_df['is_business_hours'] = logs_df['hour'].between(8, 18).astype(int)
        logs_df['is_weekend'] = (logs_df['day_of_week'] >= 5).astype(int)
        features = []
        for user, user_logs in logs_df.groupby('user'):
            user_logs = user_logs.sort_values('timestamp')
            # window_size is in minutes, so group into fixed time buckets
            # rather than slicing fixed numbers of rows
            grouper = pd.Grouper(key='timestamp', freq=f'{self.window_size}min')
            for _, window in user_logs.groupby(grouper):
                if len(window) == 0:
                    continue
                features.append(self._compute_window_features(user, window))
        return pd.DataFrame(features)

    def _compute_window_features(self, user: str, window: pd.DataFrame) -> dict:
        return {
            'user': user,
            'window_start': window['timestamp'].min(),
            'total_events': len(window),
            'unique_hosts': window['host'].nunique(),
            'unique_processes': window['process_name'].nunique(),
            'logon_events': (window['event_id'] == 4624).sum(),
            'failed_logons': (window['event_id'] == 4625).sum(),
            'privilege_use': (window['event_id'] == 4672).sum(),
            'process_creation': (window['event_id'] == 4688).sum(),
            'is_business_hours_ratio': window['is_business_hours'].mean(),
            'hour_entropy': self._entropy(window['hour']),
            'failed_logon_rate': (
                (window['event_id'] == 4625).sum() /
                max((window['event_id'] == 4624).sum(), 1)
            ),
            'host_diversity': window['host'].nunique() / max(len(window), 1),
            'network_logons': (window['logon_type'] == 3).sum(),
            'remote_interactive': (window['logon_type'] == 10).sum(),
        }

    def _entropy(self, series: pd.Series) -> float:
        if len(series) == 0:
            return 0.0
        counts = series.value_counts(normalize=True)
        return -sum(p * np.log2(p) for p in counts if p > 0)
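One feature above deserves a note: hour_entropy. Shannon entropy over the hours of activity separates a user active at one fixed hour (entropy 0) from one spread across the day (high entropy), which is exactly the kind of signal a fixed threshold cannot express. A standalone sketch of the same computation:

```python
import numpy as np
import pandas as pd

def hour_entropy(hours: pd.Series) -> float:
    """Shannon entropy (in bits) of the hour-of-day distribution."""
    if len(hours) == 0:
        return 0.0
    p = hours.value_counts(normalize=True)  # empirical probabilities
    return float(-(p * np.log2(p)).sum())
```

A user logging on only at 09:00 scores 0.0 bits; four equally frequent hours score exactly 2.0 bits.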
Isolation Forest for Log Anomaly Detection
Isolation Forest is one of the most widely used algorithms for unsupervised anomaly detection on high-dimensional data. The principle is elegant: anomalies, being rare and different, are easier to "isolate" with a few random splits of a decision tree.
In practical terms: a normal event requires many splits to be separated from its neighbors; an anomalous event (a true exception) is isolated quickly, with few splits. The anomaly score is inversely related to the average path length - the number of splits needed - across the trees in the ensemble.
# Isolation Forest for User Behavior Anomaly Detection
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import joblib

class UserBehaviorIsolationForest:
    NUMERIC_FEATURES = [
        'total_events', 'unique_hosts', 'unique_processes',
        'logon_events', 'failed_logons', 'privilege_use', 'process_creation',
        'is_business_hours_ratio', 'hour_entropy', 'failed_logon_rate',
        'host_diversity', 'network_logons', 'remote_interactive'
    ]

    def __init__(self, contamination: float = 0.05,
                 n_estimators: int = 200, random_state: int = 42):
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            max_samples='auto',
            random_state=random_state,
            n_jobs=-1
        )
        self.scaler = StandardScaler()
        self.is_fitted = False

    def fit(self, features_df: pd.DataFrame) -> 'UserBehaviorIsolationForest':
        """Trains the model on normal behavior data."""
        X = features_df[self.NUMERIC_FEATURES].fillna(0)
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled)
        self.is_fitted = True
        print(f"Model trained on {len(X)} samples")
        return self

    def predict(self, features_df: pd.DataFrame) -> pd.DataFrame:
        """Predicts anomalies. Returns DataFrame with scores and labels."""
        if not self.is_fitted:
            raise RuntimeError("Model not trained. Call fit() first.")
        X = features_df[self.NUMERIC_FEATURES].fillna(0)
        X_scaled = self.scaler.transform(X)
        anomaly_scores = self.model.decision_function(X_scaled)
        predictions = self.model.predict(X_scaled)  # 1=normal, -1=anomaly
        result_df = features_df.copy()
        result_df['anomaly_score'] = anomaly_scores
        # decision_function: lower = more anomalous; invert to a 0-1 scale
        # where 1 marks the most anomalous sample
        score_min, score_max = anomaly_scores.min(), anomaly_scores.max()
        result_df['anomaly_score_normalized'] = (
            1 - (anomaly_scores - score_min) / (score_max - score_min + 1e-10)
        )
        result_df['is_anomaly'] = predictions == -1
        return result_df
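A quick smoke test of the mechanism on synthetic data, using scikit-learn's IsolationForest directly rather than the wrapper class above (the three feature columns and their values are invented for illustration):

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)
# Synthetic session features: [total_events, unique_hosts, failed_logons]
normal = rng.normal(loc=[50.0, 3.0, 2.0], scale=[10.0, 1.0, 1.0], size=(500, 3))
outlier = np.array([[500.0, 40.0, 90.0]])  # e.g. a credential-stuffing burst
X = np.vstack([normal, outlier])

model = IsolationForest(contamination=0.01, n_estimators=200, random_state=42)
labels = model.fit_predict(X)        # 1 = normal, -1 = anomaly
scores = model.decision_function(X)  # lower = more anomalous
print("outlier flagged:", labels[-1] == -1)
```

The injected outlier is isolated in very few splits and receives the lowest decision score in the batch, without any rule describing "mass failed logons" ever being written.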
Autoencoder for Complex Anomaly Detection
Isolation Forest excels at "point" anomalies (single events very different from the norm), but struggles with contextual and collective anomalies. A neural autoencoder completes the picture: trained only on normal data, it learns to compress and reconstruct typical patterns. Anomalies produce high reconstruction error because the model has never seen that pattern during training.
# Autoencoder for Anomaly Detection
import torch
import torch.nn as nn
import numpy as np

class SecurityAutoencoder(nn.Module):
    def __init__(self, input_dim: int, encoding_dim: int = 8):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, encoding_dim),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, input_dim),
            nn.Sigmoid()  # outputs in [0, 1], matching the MinMaxScaler input
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.decoder(self.encoder(x))

class AutoencoderAnomalyDetector:
    def __init__(self, encoding_dim: int = 8, epochs: int = 100,
                 batch_size: int = 64, learning_rate: float = 1e-3):
        self.encoding_dim = encoding_dim
        self.epochs = epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = None
        self.threshold = None
        self.scaler = None

    def fit(self, X_normal: np.ndarray) -> 'AutoencoderAnomalyDetector':
        """Trains the autoencoder only on normal data."""
        from sklearn.preprocessing import MinMaxScaler
        from torch.utils.data import DataLoader, TensorDataset
        self.scaler = MinMaxScaler()
        X_scaled = self.scaler.fit_transform(X_normal).astype(np.float32)
        input_dim = X_scaled.shape[1]
        self.model = SecurityAutoencoder(input_dim, self.encoding_dim).to(self.device)
        dataset = TensorDataset(torch.FloatTensor(X_scaled))
        loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
        criterion = nn.MSELoss()
        self.model.train()
        for epoch in range(self.epochs):
            for batch in loader:
                x = batch[0].to(self.device)
                optimizer.zero_grad()
                loss = criterion(self.model(x), x)
                loss.backward()
                optimizer.step()
        # Set threshold at 95th percentile of reconstruction errors on normal data
        errors = self._compute_reconstruction_errors(X_scaled)
        self.threshold = np.percentile(errors, 95)
        return self

    def predict(self, X: np.ndarray) -> dict:
        X_scaled = self.scaler.transform(X).astype(np.float32)
        errors = self._compute_reconstruction_errors(X_scaled)
        return {
            'reconstruction_error': errors,
            'anomaly_score': errors / self.threshold,
            'is_anomaly': errors > self.threshold
        }

    def _compute_reconstruction_errors(self, X_scaled: np.ndarray) -> np.ndarray:
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X_scaled).to(self.device)
            reconstructed = self.model(X_tensor)
            errors = torch.mean((X_tensor - reconstructed) ** 2, dim=1)
        return errors.cpu().numpy()
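The train-on-normal / threshold-on-reconstruction-error pattern does not depend on the neural network itself. The mechanics can be made visible with a linear "autoencoder" (PCA via SVD) in pure NumPy, with no GPU required (synthetic data and dimensions are illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
# "Normal" behavior lives near a 2-D subspace of a 10-D feature space
latent = rng.normal(size=(1000, 2))
mixing = rng.normal(size=(2, 10))
X_normal = latent @ mixing + 0.05 * rng.normal(size=(1000, 10))

# "Train": fit a linear encoder/decoder from the top-2 principal components
mean = X_normal.mean(axis=0)
_, _, Vt = np.linalg.svd(X_normal - mean, full_matrices=False)
components = Vt[:2]  # rows = encoder directions

def reconstruction_error(X: np.ndarray) -> np.ndarray:
    Z = (X - mean) @ components.T   # encode (compress)
    X_hat = Z @ components + mean   # decode (reconstruct)
    return ((X - X_hat) ** 2).mean(axis=1)

# Threshold at the 95th percentile of errors on normal data, as above
threshold = np.percentile(reconstruction_error(X_normal), 95)
anomaly = 5.0 * rng.normal(size=(1, 10))  # point far off the normal subspace
print("anomaly detected:", reconstruction_error(anomaly)[0] > threshold)
```

Normal points reconstruct almost perfectly because they lie on the learned subspace; the off-subspace point cannot be compressed and reconstructed, so its error dwarfs the threshold. The autoencoder generalizes this to nonlinear manifolds.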
Interpretability with SHAP: Understanding Anomalies
An anomaly detection system that produces only "anomaly: yes/no" has limited utility for analysts. SHAP (SHapley Additive exPlanations) allows explaining why a sample was classified as anomalous, indicating which features contributed most to the anomaly score.
# SHAP Explainer for Anomaly Detection
import shap
import pandas as pd

class AnomalyExplainer:
    def __init__(self, isolation_forest_model, feature_names: list[str]):
        self.model = isolation_forest_model
        self.feature_names = feature_names
        self.explainer = None

    def fit_explainer(self, background_data: pd.DataFrame) -> None:
        X_bg = background_data[self.feature_names].fillna(0)
        # Background data conditions the explainer's expected value
        self.explainer = shap.TreeExplainer(self.model, data=X_bg)

    def explain_anomaly(self, anomalous_sample: pd.Series) -> dict:
        X = anomalous_sample[self.feature_names].fillna(0).values.reshape(1, -1)
        shap_values = self.explainer.shap_values(X)
        feature_contributions = sorted(
            zip(self.feature_names, shap_values[0]),
            key=lambda x: abs(x[1]),
            reverse=True
        )
        return {
            'top_anomaly_drivers': [
                {
                    'feature': name,
                    'shap_value': float(value),
                    'actual_value': float(anomalous_sample.get(name, 0)),
                    # IsolationForest scores are lower for anomalies, so a
                    # negative SHAP contribution pushes toward "anomaly"
                    'direction': 'increases_anomaly' if value < 0 else 'decreases_anomaly'
                }
                for name, value in feature_contributions[:5]
            ]
        }
Model Drift Management
User behaviors change over time (new tools, reorganizations, remote work). A model trained 6 months ago may generate too many false positives on behaviors that have become normal. Automatic drift detection prevents this degradation.
# Drift Detection
from scipy import stats
import numpy as np

class ModelDriftDetector:
    def __init__(self, baseline_scores: np.ndarray, drift_threshold: float = 0.05):
        self.baseline_scores = baseline_scores
        self.drift_threshold = drift_threshold

    def check_drift(self, recent_scores: np.ndarray) -> dict:
        """Uses the Kolmogorov-Smirnov test to detect distribution drift."""
        ks_statistic, p_value = stats.ks_2samp(
            self.baseline_scores, recent_scores
        )
        drift_detected = p_value < self.drift_threshold
        severity = 'none'
        if drift_detected:
            severity = 'high' if ks_statistic > 0.3 else (
                'medium' if ks_statistic > 0.15 else 'low'
            )
        return {
            'drift_detected': drift_detected,
            'ks_statistic': float(ks_statistic),
            'p_value': float(p_value),
            'severity': severity,
            'recommendation': (
                'Retraining required' if severity == 'high'
                else 'Increased monitoring' if severity == 'medium'
                else 'No action required'
            )
        }
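The underlying test is easy to sanity-check with synthetic score distributions (the shift size and sample counts below are illustrative): a recent batch drawn from the same distribution as the baseline should pass, while a shifted batch, like the one a reorganization or tooling change would produce, should trip both the p-value and the KS-statistic severity bands.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(1)
# Anomaly scores collected during the training period...
baseline = rng.normal(loc=0.0, scale=1.0, size=5000)
# ...versus two recent batches: one stable, one shifted
same = rng.normal(loc=0.0, scale=1.0, size=1000)
shifted = rng.normal(loc=0.8, scale=1.0, size=1000)

_, p_same = stats.ks_2samp(baseline, same)
ks_shift, p_shift = stats.ks_2samp(baseline, shifted)
print(f"stable: p={p_same:.3f} | shifted: KS={ks_shift:.2f}, p={p_shift:.2e}")
```

A 0.8-sigma mean shift yields a KS statistic around 0.3, landing in the "high" band that triggers the retraining recommendation.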
Anti-Pattern: Wrong Contamination Rate
The contamination parameter of Isolation Forest is critical. Setting it too high (e.g., 0.10) produces an enormous number of false positives; too low (e.g., 0.001) misses real anomalies. The best estimate comes from the historical percentage of malicious events in the environment. In the absence of historical data, start with 0.05 and calibrate based on analyst feedback during the first weeks of deployment.
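The failure mode is easy to demonstrate: contamination directly fixes the fraction of training data labeled anomalous, regardless of whether anything anomalous is actually present (purely benign synthetic data below):

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(7)
X = rng.normal(size=(2000, 5))  # purely benign synthetic sessions, no attacks

flagged = {}
for contamination in (0.10, 0.05, 0.001):
    labels = IsolationForest(contamination=contamination,
                             n_estimators=100, random_state=7).fit_predict(X)
    flagged[contamination] = float((labels == -1).mean())
    print(f"contamination={contamination}: "
          f"{flagged[contamination]:.1%} of benign data flagged")
```

With contamination=0.10, roughly 10% of perfectly normal sessions become alerts: on a fleet of thousands of users, that is an unworkable triage queue.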
Production Pipeline
The production pipeline integrates feature engineering, detection models, explanation, and alerting into a continuous flow that processes logs in near real-time.
# Production pipeline with ensemble
from dataclasses import dataclass
import pandas as pd

@dataclass
class AnomalyAlert:
    user: str
    window_start: str
    anomaly_score: float
    explanation: str
    top_features: list[dict]
    severity: str

class AnomalyDetectionPipeline:
    def __init__(self, if_model, ae_model, explainer, feature_names: list[str]):
        self.if_model = if_model
        self.ae_model = ae_model
        self.explainer = explainer
        self.feature_names = feature_names

    def process_batch(self, features_df: pd.DataFrame,
                      score_threshold: float = 0.7) -> list[AnomalyAlert]:
        alerts = []
        # Get predictions from both models; reset the index so DataFrame rows
        # line up with the autoencoder's positional arrays
        if_results = self.if_model.predict(features_df).reset_index(drop=True)
        X = features_df[self.feature_names].fillna(0).values
        ae_results = self.ae_model.predict(X)
        for pos, row in if_results.iterrows():
            if_score = row['anomaly_score_normalized']
            ae_score = min(float(ae_results['anomaly_score'][pos]), 1.0)
            # Weighted ensemble: IF more reliable for this data type
            ensemble_score = 0.6 * if_score + 0.4 * ae_score
            if ensemble_score >= score_threshold:
                drivers = self.explainer.explain_anomaly(row)['top_anomaly_drivers']
                severity = (
                    'critical' if ensemble_score >= 0.95
                    else 'high' if ensemble_score >= 0.85
                    else 'medium' if ensemble_score >= 0.75
                    else 'low'
                )
                alerts.append(AnomalyAlert(
                    user=row.get('user', 'unknown'),
                    window_start=str(row.get('window_start', '')),
                    anomaly_score=round(ensemble_score, 3),
                    explanation='Top drivers: ' + ', '.join(
                        d['feature'] for d in drivers[:3]),
                    top_features=drivers,
                    severity=severity
                ))
        return sorted(alerts, key=lambda a: a.anomaly_score, reverse=True)
Conclusions and Key Takeaways
Behavioral anomaly detection based on ML fundamentally complements the detection engineer's arsenal: it covers the blind spots of deterministic rules, detects attackers using living-off-the-land techniques, and identifies insider threats operating with valid credentials.
Key Takeaways
- Quality feature engineering matters more than algorithm choice
- Isolation Forest is the starting point for log anomaly detection: fast, scalable, unsupervised
- Autoencoders complement IF for contextual and complex anomalies
- SHAP is essential for making anomalies interpretable to analysts
- Rolling baseline prevents the model from becoming stale as behaviors evolve
- Automatic drift detection ensures quality over time
- Ensemble of multiple models reduces both false positives and false negatives
Related Articles
- Alert Triage Automation: Reducing MTTD with Graph Analysis
- Sigma Rules: Universal Detection Logic
- AI-Assisted Detection: LLMs for Sigma Rule Generation
- Detection-as-Code Pipeline with Git and CI/CD