動作異常の検出: ログ データの ML
決定論的ルールには基本的な制限があります。つまり、予測されたもののみを検出するということです。 攻撃者は既知のパターンの外で活動し、正規のツールを使用して (地上に生息せず)、 有効な資格情報の盗難、またはまったく新しい技術は、従来の SIEM をほぼ完全に回避します。 ここは、 ログに適用された機械学習.
動作異常検出は特定の動作を探すのではなく、検索します。 正常からの逸脱。 午前 3 時にユーザーが通常の 10 倍のファイルにアクセスし、接続を確立するプロセス 以前は見られなかったネットワーク パターン、サービス アカウントが Active Directory を列挙しようとしている: これらのパターン 異常は、いかなるルールも明示的に予測することなくデータから出現します。
この記事では、Windows/Linux ログ上に完全な動作異常検出システムを構築します。 教師なし検出には Isolation Forest を使用し、深い検出にはオートエンコーダーを使用し、 一時的な変動(時間、日、季節)を管理するためのベースライン モデリング フレームワーク。
何を学ぶか
- ML のセキュリティ ログの特徴エンジニアリング
- Isolation Forest: ログ異常検出の理論、実装、調整
- 複雑な異常検出のためのオートエンコーダー
- 時間的な季節性を考慮したベースライン モデリング
- SHAP による誤検知の削減と解釈可能性
- ドリフト検出による本番環境への導入
動的ベースラインの問題
IT システムにおける「通常の動作」の概念は静的なものではありません。朝8時にサーバーが 同時接続数は 5 で、「通常」です。午前 3 時の同じ数値は異常である可能性があります。 リモートで働くユーザーは、オフィスで働くユーザーとはまったく異なるアクセス パターンを持っています。
したがって、異常検出モデルは次のようにトレーニングする必要があります。 動的ベースライン 以下を考慮します。
- 時間サイクル: 勤務時間中のさまざまな活動と夜間の活動
- 週間サイクル: 営業日と週末の比較
- 月次/季節周期性: 活動が活発な時期(月末など)
- 個々のユーザープロファイル: 各ユーザーには固有のパターンがあります
- 地理的背景: 通常の場所からのアクセスと新しい場所からのアクセス
セキュリティログの特徴エンジニアリング
特徴エンジニアリングの品質は、どのアルゴリズムよりも検出の品質を決定します。 生のログ (Windows イベント、Linux syslog、auth.log) は数値特徴に変換する必要がある ML モデルにとって重要です。
# Feature Engineering per Log di Sicurezza
# File: security_feature_engineer.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
class SecurityFeatureEngineer:
def __init__(self, window_size_minutes: int = 60):
self.window_size = window_size_minutes
def extract_user_session_features(self, logs_df: pd.DataFrame) -> pd.DataFrame:
"""
Input: DataFrame con colonne [timestamp, user, event_id, host,
src_ip, process_name, logon_type]
Output: DataFrame con features aggregate per sessione utente
"""
logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'])
logs_df['hour'] = logs_df['timestamp'].dt.hour
logs_df['day_of_week'] = logs_df['timestamp'].dt.dayofweek
logs_df['is_business_hours'] = logs_df['hour'].between(8, 18).astype(int)
logs_df['is_weekend'] = (logs_df['day_of_week'] >= 5).astype(int)
# Aggregazione per user-window
features = []
for user, user_logs in logs_df.groupby('user'):
# Finestre temporali mobili
user_logs = user_logs.sort_values('timestamp')
for i in range(0, len(user_logs), self.window_size):
window = user_logs.iloc[i:i+self.window_size]
if len(window) == 0:
continue
feature_row = self._compute_window_features(user, window)
features.append(feature_row)
return pd.DataFrame(features)
def _compute_window_features(self, user: str,
window: pd.DataFrame) -> dict:
"""Calcola features per una finestra temporale."""
return {
'user': user,
'window_start': window['timestamp'].min(),
# Volume features
'total_events': len(window),
'unique_hosts': window['host'].nunique(),
'unique_processes': window['process_name'].nunique(),
'unique_src_ips': window['src_ip'].nunique(),
# Event type distribution
'logon_events': (window['event_id'] == 4624).sum(),
'failed_logons': (window['event_id'] == 4625).sum(),
'logoff_events': (window['event_id'] == 4634).sum(),
'privilege_use': (window['event_id'] == 4672).sum(),
'process_creation': (window['event_id'] == 4688).sum(),
# Temporal features
'is_business_hours_ratio': window['is_business_hours'].mean(),
'is_weekend_ratio': window['is_weekend'].mean(),
'hour_entropy': self._entropy(window['hour']),
# Logon type distribution
'interactive_logons': (window['logon_type'] == 2).sum(),
'network_logons': (window['logon_type'] == 3).sum(),
'remote_interactive': (window['logon_type'] == 10).sum(),
# Ratios e derived features
'failed_logon_rate': (
(window['event_id'] == 4625).sum() /
max((window['event_id'] == 4624).sum(), 1)
),
'host_diversity': (
window['host'].nunique() / max(len(window), 1)
),
}
def _entropy(self, series: pd.Series) -> float:
"""Calcola l'entropia di Shannon di una serie categorica."""
if len(series) == 0:
return 0.0
counts = series.value_counts(normalize=True)
return -sum(p * np.log2(p) for p in counts if p > 0)
def extract_network_features(self, netflow_df: pd.DataFrame) -> pd.DataFrame:
"""Features da NetFlow/zeek logs."""
netflow_df['timestamp'] = pd.to_datetime(netflow_df['timestamp'])
features = netflow_df.groupby(['src_ip', pd.Grouper(
key='timestamp', freq=f'{self.window_size}min'
)]).agg(
total_bytes=('bytes', 'sum'),
total_packets=('packets', 'sum'),
unique_dst_ips=('dst_ip', 'nunique'),
unique_dst_ports=('dst_port', 'nunique'),
connection_count=('dst_ip', 'count'),
avg_duration=('duration', 'mean'),
# Beaconing indicator: bassa varianza in intervalli di connessione
duration_std=('duration', 'std'),
# Port scanning indicator
high_port_count=(
'dst_port',
lambda x: (x > 1024).sum()
),
).reset_index()
# Beaconing score (bassa std = possibile C2)
features['beaconing_score'] = 1 / (features['duration_std'] + 1)
# Port scan score
features['port_scan_score'] = (
features['unique_dst_ports'] /
features['connection_count'].clip(lower=1)
)
return features
ログ異常検出のための隔離フォレスト
孤立の森 教師なし異常検出のための最も普及しているアルゴリズム 高次元データについて。原則はエレガントです: 異常は稀であり、異なっており、 デシジョン ツリーをいくつかランダムに分割することで、それらを「分離」するのが簡単になります。
実際的な観点から言えば、通常のイベントでは、多くの分割を他の分割から分離する必要があります。 異常なイベント (真の例外) は、ほとんど分割されずにすぐに分離されます。 異常スコアは、必要な分割数の逆数に比例します。
# Isolation Forest per User Behavior Anomaly Detection
# File: isolation_forest_detector.py
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import joblib
from pathlib import Path
class UserBehaviorIsolationForest:
def __init__(self,
contamination: float = 0.05, # 5% atteso anomalie
n_estimators: int = 200,
random_state: int = 42):
self.model = IsolationForest(
contamination=contamination,
n_estimators=n_estimators,
max_samples='auto',
max_features=1.0,
random_state=random_state,
n_jobs=-1
)
self.scaler = StandardScaler()
self.feature_names: list[str] = []
self.is_fitted = False
# Feature numeriche usate per il modello
NUMERIC_FEATURES = [
'total_events', 'unique_hosts', 'unique_processes', 'unique_src_ips',
'logon_events', 'failed_logons', 'privilege_use', 'process_creation',
'is_business_hours_ratio', 'hour_entropy', 'failed_logon_rate',
'host_diversity', 'interactive_logons', 'network_logons'
]
def fit(self, features_df: pd.DataFrame) -> 'UserBehaviorIsolationForest':
"""Addestra il modello sul comportamento normale."""
X = features_df[self.NUMERIC_FEATURES].fillna(0)
self.feature_names = self.NUMERIC_FEATURES
# Normalizza le features
X_scaled = self.scaler.fit_transform(X)
# Addestra Isolation Forest
self.model.fit(X_scaled)
self.is_fitted = True
print(f"Modello addestrato su {len(X)} campioni")
return self
def predict(self, features_df: pd.DataFrame) -> pd.DataFrame:
"""Predice anomalie. Ritorna DataFrame con score e label."""
if not self.is_fitted:
raise RuntimeError("Modello non addestrato. Chiama fit() prima.")
X = features_df[self.NUMERIC_FEATURES].fillna(0)
X_scaled = self.scaler.transform(X)
# Score: più negativo = più anomalo
anomaly_scores = self.model.decision_function(X_scaled)
predictions = self.model.predict(X_scaled) # 1=normale, -1=anomalia
result_df = features_df.copy()
result_df['anomaly_score'] = anomaly_scores
# Normalizza score in [0, 1] dove 1 = massima anomalia
score_min = anomaly_scores.min()
score_max = anomaly_scores.max()
result_df['anomaly_score_normalized'] = (
1 - (anomaly_scores - score_min) / (score_max - score_min + 1e-10)
)
result_df['is_anomaly'] = predictions == -1
result_df['anomaly_label'] = predictions
return result_df
def fit_predict_with_rolling_baseline(
self,
all_features_df: pd.DataFrame,
training_days: int = 30,
evaluation_window_days: int = 1
) -> pd.DataFrame:
"""
Addestra su una finestra mobile e predice sulla finestra successiva.
Simula il deployment rolling in produzione.
"""
all_features_df = all_features_df.sort_values('window_start')
all_features_df['window_start'] = pd.to_datetime(all_features_df['window_start'])
all_results = []
start_date = all_features_df['window_start'].min()
end_date = all_features_df['window_start'].max()
current_date = start_date + timedelta(days=training_days)
while current_date <= end_date:
# Training window: ultimi N giorni
train_start = current_date - timedelta(days=training_days)
train_mask = (
(all_features_df['window_start'] >= train_start) &
(all_features_df['window_start'] < current_date)
)
train_df = all_features_df[train_mask]
# Evaluation window: prossimo giorno
eval_end = current_date + timedelta(days=evaluation_window_days)
eval_mask = (
(all_features_df['window_start'] >= current_date) &
(all_features_df['window_start'] < eval_end)
)
eval_df = all_features_df[eval_mask]
if len(train_df) < 100 or len(eval_df) == 0:
current_date += timedelta(days=evaluation_window_days)
continue
# Addestra e predice
model = UserBehaviorIsolationForest()
model.fit(train_df)
results = model.predict(eval_df)
all_results.append(results)
current_date += timedelta(days=evaluation_window_days)
return pd.concat(all_results, ignore_index=True) if all_results else pd.DataFrame()
def save(self, path: str) -> None:
Path(path).parent.mkdir(parents=True, exist_ok=True)
joblib.dump({
'model': self.model,
'scaler': self.scaler,
'feature_names': self.feature_names
}, path)
@classmethod
def load(cls, path: str) -> 'UserBehaviorIsolationForest':
data = joblib.load(path)
instance = cls()
instance.model = data['model']
instance.scaler = data['scaler']
instance.feature_names = data['feature_names']
instance.is_fitted = True
return instance
複雑な異常検出のためのオートエンコーダー
隔離の森は「時間厳守」異常(通常とは大きく異なる単一の出来事)に優れています。 しかし異常と闘っている 文脈に応じた e 集団的。オートエンコーダー ニューラルは全体像を完成させます。通常のデータのみでトレーニングされ、圧縮と再構成を学習します。 典型的なパターン。モデルがそうでないため、異常により高い再構成誤差が発生します。 トレーニング中にそのパターンを見たことがありますか。
# Autoencoder per Anomaly Detection
# File: autoencoder_detector.py
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset
class SecurityAutoencoder(nn.Module):
def __init__(self, input_dim: int, encoding_dim: int = 8):
super(SecurityAutoencoder, self).__init__()
# Encoder: comprime l'input in una rappresentazione latente
self.encoder = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.BatchNorm1d(64),
nn.Dropout(0.2),
nn.Linear(64, 32),
nn.ReLU(),
nn.BatchNorm1d(32),
nn.Linear(32, encoding_dim),
nn.ReLU()
)
# Decoder: ricostruisce l'input dalla rappresentazione latente
self.decoder = nn.Sequential(
nn.Linear(encoding_dim, 32),
nn.ReLU(),
nn.BatchNorm1d(32),
nn.Linear(32, 64),
nn.ReLU(),
nn.BatchNorm1d(64),
nn.Dropout(0.2),
nn.Linear(64, input_dim),
nn.Sigmoid() # Output normalizzato [0, 1]
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
def encode(self, x: torch.Tensor) -> torch.Tensor:
return self.encoder(x)
class AutoencoderAnomalyDetector:
def __init__(self, encoding_dim: int = 8, epochs: int = 100,
batch_size: int = 64, learning_rate: float = 1e-3,
device: str = 'auto'):
self.encoding_dim = encoding_dim
self.epochs = epochs
self.batch_size = batch_size
self.learning_rate = learning_rate
self.device = (
torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device == 'auto' else torch.device(device)
)
self.model: SecurityAutoencoder = None
self.threshold: float = None
self.scaler = None
def fit(self, X_normal: np.ndarray) -> 'AutoencoderAnomalyDetector':
"""Addestra l'autoencoder solo su dati normali."""
from sklearn.preprocessing import MinMaxScaler
self.scaler = MinMaxScaler()
X_scaled = self.scaler.fit_transform(X_normal).astype(np.float32)
input_dim = X_scaled.shape[1]
self.model = SecurityAutoencoder(input_dim, self.encoding_dim).to(self.device)
# Training
dataset = TensorDataset(torch.FloatTensor(X_scaled))
loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)
optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
criterion = nn.MSELoss()
self.model.train()
for epoch in range(self.epochs):
total_loss = 0
for batch in loader:
x = batch[0].to(self.device)
optimizer.zero_grad()
reconstructed = self.model(x)
loss = criterion(reconstructed, x)
loss.backward()
optimizer.step()
total_loss += loss.item()
if epoch % 20 == 0:
avg_loss = total_loss / len(loader)
print(f"Epoch {epoch}/{self.epochs}, Loss: {avg_loss:.6f}")
# Calcola threshold come percentile 95 degli errori di ricostruzione
# sul training set normale
reconstruction_errors = self._compute_reconstruction_errors(X_scaled)
self.threshold = np.percentile(reconstruction_errors, 95)
print(f"Threshold anomalia: {self.threshold:.6f}")
return self
def _compute_reconstruction_errors(self, X_scaled: np.ndarray) -> np.ndarray:
"""Calcola errori di ricostruzione elemento per elemento."""
self.model.eval()
with torch.no_grad():
X_tensor = torch.FloatTensor(X_scaled).to(self.device)
reconstructed = self.model(X_tensor)
errors = torch.mean((X_tensor - reconstructed) ** 2, dim=1)
return errors.cpu().numpy()
def predict(self, X: np.ndarray) -> dict:
"""Predice anomalie e restituisce score e labels."""
X_scaled = self.scaler.transform(X).astype(np.float32)
reconstruction_errors = self._compute_reconstruction_errors(X_scaled)
is_anomaly = reconstruction_errors > self.threshold
anomaly_score = reconstruction_errors / self.threshold # Score normalizzato
return {
'reconstruction_error': reconstruction_errors,
'anomaly_score': anomaly_score,
'is_anomaly': is_anomaly,
'threshold': self.threshold
}
SHAP による解釈可能性: 異常を理解する
「異常: はい/いいえ」のみを生成する異常検出システムは、アナリストにとっての有用性が限られています。 SHAP (シャープリー添加剤の説明) サンプルがなぜそうなったのかを説明できるようになります 異常として分類され、どの特徴が異常スコアに最も寄与したかを示します。
# Interpretabilita con SHAP per Anomaly Detection
# File: shap_explainer.py
import shap
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
class AnomalyExplainer:
def __init__(self, isolation_forest_model,
feature_names: list[str]):
self.model = isolation_forest_model
self.feature_names = feature_names
self.explainer = None
def fit_explainer(self, background_data: pd.DataFrame) -> None:
"""Inizializza lo SHAP explainer con dati di background."""
X_bg = background_data[self.feature_names].fillna(0)
self.explainer = shap.TreeExplainer(self.model)
def explain_anomaly(self, anomalous_sample: pd.Series) -> dict:
"""Spiega perchè un campione e anomalo."""
if self.explainer is None:
raise RuntimeError("Chiama fit_explainer() prima.")
X = anomalous_sample[self.feature_names].fillna(0).values.reshape(1, -1)
shap_values = self.explainer.shap_values(X)
feature_contributions = sorted(
zip(self.feature_names, shap_values[0]),
key=lambda x: abs(x[1]),
reverse=True
)
return {
'top_anomaly_drivers': [
{
'feature': name,
'shap_value': float(value),
'actual_value': float(anomalous_sample.get(name, 0)),
'direction': 'increases_anomaly' if value < 0 else 'decreases_anomaly'
}
for name, value in feature_contributions[:5]
],
'explanation': self._generate_natural_language_explanation(
feature_contributions[:3], anomalous_sample
)
}
def _generate_natural_language_explanation(
self,
top_features: list[tuple],
sample: pd.Series
) -> str:
"""Genera una spiegazione in linguaggio naturale."""
explanations = []
for feature, shap_val in top_features:
value = sample.get(feature, 0)
if feature == 'failed_logon_rate' and value > 0.3:
explanations.append(
f"Tasso di logon falliti anomalmente alto ({value:.1%})"
)
elif feature == 'unique_hosts' and value > 5:
explanations.append(
f"Accesso a {int(value)} host distinti (inusuale)"
)
elif feature == 'is_business_hours_ratio' and value < 0.2:
explanations.append(
f"Attivita prevalentemente fuori orario lavorativo ({value:.1%})"
)
elif feature == 'hour_entropy' and value > 2.0:
explanations.append(
f"Pattern orario molto irregolare (entropia: {value:.2f})"
)
return "; ".join(explanations) if explanations else "Pattern comportamentale inusuale rilevato"
完全なパイプラインとデプロイメント
本番パイプラインには、特徴エンジニアリング、検出モデル、説明が統合されています ほぼリアルタイムでログを処理する継続的なストリームでアラートを送信します。
# Pipeline completa di produzione
# File: anomaly_detection_pipeline.py
import logging
from dataclasses import dataclass
@dataclass
class AnomalyAlert:
user: str
window_start: str
anomaly_score: float
reconstruction_error: float
explanation: str
top_features: list[dict]
severity: str
class AnomalyDetectionPipeline:
def __init__(self,
if_model: UserBehaviorIsolationForest,
ae_model: AutoencoderAnomalyDetector,
feature_names: list[str]):
self.if_model = if_model
self.ae_model = ae_model
self.feature_names = feature_names
self.explainer = AnomalyExplainer(if_model.model, feature_names)
self.logger = logging.getLogger(__name__)
def process_batch(self, features_df: pd.DataFrame,
score_threshold: float = 0.7) -> list[AnomalyAlert]:
"""Processa un batch di features e ritorna gli alert."""
alerts = []
# Isolation Forest predictions
if_results = self.if_model.predict(features_df)
# Autoencoder predictions
X = features_df[self.feature_names].fillna(0).values
ae_results = self.ae_model.predict(X)
# Combina i due modelli con ensemble voting
for idx, row in if_results.iterrows():
if_score = row['anomaly_score_normalized']
ae_score = ae_results['anomaly_score'][idx]
# Ensemble: media pesata (IF più affidabile su questo tipo di dati)
ensemble_score = 0.6 * if_score + 0.4 * min(ae_score, 1.0)
if ensemble_score >= score_threshold:
# Genera spiegazione SHAP
try:
explanation = self.explainer.explain_anomaly(row)
except Exception as e:
self.logger.warning(f"SHAP explain failed: {e}")
explanation = {'explanation': 'N/A', 'top_anomaly_drivers': []}
severity = self._score_to_severity(ensemble_score)
alerts.append(AnomalyAlert(
user=row.get('user', 'unknown'),
window_start=str(row.get('window_start', '')),
anomaly_score=round(ensemble_score, 3),
reconstruction_error=float(ae_results['reconstruction_error'][idx]),
explanation=explanation.get('explanation', ''),
top_features=explanation.get('top_anomaly_drivers', []),
severity=severity
))
return sorted(alerts, key=lambda a: a.anomaly_score, reverse=True)
def _score_to_severity(self, score: float) -> str:
if score >= 0.95:
return 'critical'
elif score >= 0.85:
return 'high'
elif score >= 0.75:
return 'medium'
else:
return 'low'
モデルドリフト管理
ユーザーの行動は時間の経過とともに変化します (新しいツール、組織の再編、リモートワークなど)。 6 か月前にトレーニングされたモデルでは、動作に関して誤検知が多すぎる可能性があります それが普通になってしまった。の ドリフト検出 自動的にこの劣化を防ぎます。
# Drift Detection per Anomaly Models
# File: drift_detector.py
from scipy import stats
class ModelDriftDetector:
def __init__(self, baseline_scores: np.ndarray,
drift_threshold: float = 0.05):
self.baseline_scores = baseline_scores
self.drift_threshold = drift_threshold
def check_drift(self, recent_scores: np.ndarray) -> dict:
"""
Usa Kolmogorov-Smirnov test per rilevare drift nella distribuzione
degli anomaly score.
"""
ks_statistic, p_value = stats.ks_2samp(
self.baseline_scores, recent_scores
)
drift_detected = p_value < self.drift_threshold
severity = 'none'
if drift_detected:
if ks_statistic > 0.3:
severity = 'high'
elif ks_statistic > 0.15:
severity = 'medium'
else:
severity = 'low'
return {
'drift_detected': drift_detected,
'ks_statistic': float(ks_statistic),
'p_value': float(p_value),
'severity': severity,
'recommendation': (
'Retraining necessario' if severity == 'high'
else 'Monitoraggio aumentato' if severity == 'medium'
else 'Nessuna azione richiesta'
)
}
def detect_false_positive_spike(self, fp_rate_history: list[float],
window: int = 7) -> bool:
"""Rileva spike nel tasso di falsi positivi."""
if len(fp_rate_history) < window:
return False
recent = np.mean(fp_rate_history[-window:])
historical = np.mean(fp_rate_history[:-window])
return recent > historical * 2 # FP rate raddoppiato = alert
アンチパターン: 不正確な汚染率
パラメータ contamination 孤独の森主宰、評論家。設定値が高すぎる (0.10 など)
膨大な数の誤検知が発生します。低すぎると (0.001 など)、実際の異常が漏れてしまいます。
正しい推定値は、環境内の悪意のあるイベントの過去の割合から得られます。不在時
履歴データの値を 0.05 から開始し、アナリストのフィードバックに基づいて調整することをお勧めします。
導入の最初の数週間。
結論と重要なポイント
ML ベースの動作異常検出は、基本的に次の武器を補完します。 検出エンジニア: 決定論的ルールの盲点をカバーし、次の方法を使用して攻撃者を検出します。 地上に生息する技術を利用し、有効な認証情報を使用して活動する内部関係者による脅威を特定します。
重要なポイント
- アルゴリズムの選択よりも品質の特徴エンジニアリングの方が重要です
- Isolation Forest はログ異常検出の開始点です: 高速、スケーラブル、監視なし
- オートエンコーダーは、コンテキスト異常および複雑な異常に対して IF を補完します。
- SHAP は、アナリストが異常を解釈できるようにするために不可欠です
- ベースラインローリングにより、動作の進化に伴うモデルの老化を防止します
- 自動ドリフト検出により長期にわたって品質を保証
- 複数のモデルのアンサンブルにより、偽陽性と偽陰性の両方が削減されます。
関連記事
- アラートトリアージの自動化: グラフ分析による MTTD の削減
- シグマ ルール: ユニバーサル検出ロジック
- AI 支援検出: シグマ ルール生成用の LLM
- Git と CI/CD を使用したコードとしての検出パイプライン







