AI 支援検出: シグマ ルール生成用の LLM
大規模言語モデルの検出エンジニアリングへの統合は、近年の SOC 環境における最も根本的な変化の 1 つを表しています。 これは単に反復的なタスクを自動化するだけの問題ではありません。LLM を使用してシグマ ルールを生成するということは、プロセスを数秒に圧縮することを意味します。 専門アナリストは、脅威レポートの読み取りから、展開可能なテスト済みルールの作成までを数時間で完了します。
のようなフレームワーク シグマジェンは、MITRE ATT&CK APAC 2025 で発表され、微調整されたモデルがどのようにレポートを取り込むことができるかを示しています。 脅威インテリジェンスを作成し、ATT&CK テクニックを抽出し、高精度にマップされたシグマ ルールを生成します。同時に、オープンソース ツールと n8n ベースのワークフローにより、小規模チームでも企業の投資なしで AI 支援パイプラインを構築できます。
この記事では、検出ルールを生成するための AI 支援システムのアーキテクチャについて説明します。エンジニアリング プロンプトから、 自動検証、合成ログを使用したテスト、既存の CI/CD パイプラインへの統合まで。
何を学ぶか
- LLM はシグマ形式についてどのように考えているのか、またなぜ LLM が直接 SPL よりも優れた出力を生成するのか
- 検出ルールを生成するための具体的なプロンプトエンジニアリング手法
- エンドツーエンドの AI 支援パイプラインのアーキテクチャ
- 生成されたルールの自動検証とテスト
- SigmaGen、pySigma、CI/CD ワークフローとの統合
- 回避すべきアンチパターンとセキュリティ コンテキストにおける LLM の実際の制限
LLM がシグマ ルールの生成に優れている理由
応用研究から明らかになった (そして実践によって確認された) 最も興味深い観察の 1 つは、LLM が高品質の出力を生成するということです。 生成すると大幅に高くなる シグマ 直接の SPL または KQL クエリと比較して。その理由は構造的なものです。
Sigma の YAML 形式では、次のものを明確に分離します。
- タイトルと説明: モデルは、何を検出しているのか、なぜ検出しているのかを明確にする必要があります。
- ログソース: データソース (カテゴリ、製品、サービス) を指定します。
- 検出: ブール一致ロジック
- 状態: セレクターの組み合わせ方法
- 誤検知: エッジケースに対する明示的な推論
この構造により、モデルは逐次的かつ宣言的に「考える」ことが強制され、次のような場合に発生する論理エラーが軽減されます。 プラットフォーム固有のクエリ言語で出力を直接要求します。実際には、シグマは次のように機能します。 暗黙の思考連鎖 検出に適用される LLM の場合。
ベンチマークデータ
LLMCloudHunter プロジェクト (2024 年) の研究者は、GPT-4 のようなジェネラリスト LLM が 73% のケースで有効なシグマ ルールを生成することを実証しました。 構造化された CTI レポートでは 41% でしたが、直接 SPL 出力を求めた場合は 41% でした。データセットを微調整すると、この割合は 89% に上昇します 安全仕様。
AI 支援パイプラインのアーキテクチャ
検出ルールを生成するための AI 支援パイプラインは、次の 5 つの異なるステージで構成されます。
- 摂取: 脅威レポート、CTI ブログ投稿、CVE アドバイザリーの取り込み
- 抽出: IOC 抽出、ATT&CK テクニック、動作の説明
- 世代: LLM によるシグマ ルールの生成
- 検証: 自動構文および意味検証
- テスト: 合成ログと CI/CD 統合を使用したテスト
# Architettura base della pipeline AI-assisted
# File: ai_sigma_pipeline.py
import openai
import yaml
import subprocess
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
@dataclass
class ThreatReport:
content: str
source_url: str
report_type: str # 'cti_blog', 'advisory', 'malware_analysis'
@dataclass
class GeneratedRule:
sigma_yaml: str
mitre_techniques: list[str]
confidence: float
validation_passed: bool
test_results: Optional[dict] = None
class AISigmaPipeline:
def __init__(self, openai_api_key: str, rules_output_dir: str):
self.client = openai.OpenAI(api_key=openai_api_key)
self.output_dir = Path(rules_output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
def process_report(self, report: ThreatReport) -> list[GeneratedRule]:
"""Pipeline completa da report a regole validate."""
# Stage 1: Extraction
techniques = self._extract_attack_techniques(report)
behaviors = self._extract_behaviors(report)
# Stage 2: Generation
rules = []
for technique in techniques:
rule = self._generate_sigma_rule(
report=report,
technique=technique,
behaviors=behaviors
)
if rule:
rules.append(rule)
# Stage 3: Validation + Testing
return [self._validate_and_test(r) for r in rules]
def _extract_attack_techniques(self, report: ThreatReport) -> list[str]:
"""Estrae tecniche ATT&CK dal report tramite LLM."""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": (
"Sei un analista di threat intelligence esperto in MITRE ATT&CK. "
"Estrai SOLO le tecniche ATT&CK (formato T1234 o T1234.001) "
"esplicitamente descritte nel testo. Rispondi solo con una lista JSON."
)
},
{
"role": "user",
"content": f"Report:\n{report.content[:4000]}"
}
],
temperature=0.1 # Bassa temperatura per output deterministico
)
import json
try:
return json.loads(response.choices[0].message.content)
except json.JSONDecodeError:
return []
高品質のための迅速なエンジニアリング Sigma Rules
出力の品質は、プロンプトの構造に大きく依存します。結果を出すための基本的なパターンは 3 つあります シグマ ルールの生成で構成されます。
パターン 1: 構造化されたシステム プロンプト
システム プロンプトには、モデルが有効なシグマを生成するために必要なメタ情報が正確に含まれている必要があります。
YAML 構造、有効な値 category e productのベストプラクティス
falsepositives 許可される重大度のレベル。
# Prompt di sistema ottimizzato per generazione Sigma rules
SIGMA_SYSTEM_PROMPT = """
Sei un Detection Engineer esperto nella scrittura di Sigma rules.
Quando generi una Sigma rule, rispetta SEMPRE questa struttura YAML:
title: [titolo descrittivo, max 80 char]
id: [UUID v4]
status: experimental
description: [descrizione dettagliata del comportamento rilevato]
references:
- [URL del report originale se disponibile]
author: AI-Assisted Detection
date: [data odierna in formato YYYY-MM-DD]
tags:
- attack.[tattica]
- attack.[tecnica]
logsource:
category: [process_creation | network_connection | file_event | registry_event]
product: [windows | linux | macos]
detection:
[nome_selettore]:
[campo]: [valore o lista valori]
condition: [nome_selettore]
falsepositives:
- [casi legittimi plausibili]
level: [informational | low | medium | high | critical]
REGOLE CRITICHE:
- Usa SEMPRE wildcards (*) nei valori stringa per evitare match esatti fragili
- Preferisci campi con alta disponibilità (Image, CommandLine, ParentImage)
- Indica sempre almeno un falso positivo realistico
- Il campo 'condition' deve essere semplice e leggibile
- Non usare regex complesse se un approccio con keywords e sufficiente
"""
def build_generation_prompt(technique_id: str, behaviors: list[str],
logsource_hint: str, report_excerpt: str) -> str:
return f"""Genera una Sigma rule per rilevare la tecnica MITRE ATT&CK {technique_id}.
Comportamenti osservati nel report:
{chr(10).join(f'- {b}' for b in behaviors[:5])}
Tipo di log suggerito: {logsource_hint}
Estratto del report originale:
{report_excerpt[:1500]}
Genera UNA SOLA Sigma rule in formato YAML valido. Non aggiungere spiegazioni fuori dal YAML."""
パターン 2: ショット数が少なく、質の高い作例
プロンプト (少数ショット) に 2 ~ 3 の高品質なルールの例を含めると、出力の一貫性が大幅に向上します。 特に異常なログソースや複雑な条件の場合に最適です。
# Few-shot: esempio di regola di qualità inclusa nel prompt
FEW_SHOT_EXAMPLE = """
Esempio di regola di alta qualità per ispirazione:
title: Suspicious PowerShell Encoded Command Execution
id: 5b4f6d89-1234-4321-ab12-fedcba987654
status: stable
description: >
Rileva l'esecuzione di PowerShell con parametri di encoding (-enc, -EncodedCommand)
frequentemente usati da malware per offuscare payload malevoli.
references:
- https://attack.mitre.org/techniques/T1059/001/
author: SigmaHQ Community
date: 2025-01-15
tags:
- attack.execution
- attack.t1059.001
logsource:
category: process_creation
product: windows
detection:
selection:
Image|endswith:
- '\\\\powershell.exe'
- '\\\\pwsh.exe'
CommandLine|contains:
- ' -enc '
- ' -EncodedCommand '
- ' -ec '
condition: selection
falsepositives:
- Software legittimo che usa PowerShell con encoding per configurazioni complesse
- Script di deployment enterprise
level: medium
"""
パターン 3: 明示的な思考連鎖
複雑な手法の場合、ルールを記述する前にモデルに推論を要求すると、より正確な出力が生成されます。 このアプローチではレイテンシーは増加しますが、必要な反復回数は大幅に削減されます。
def generate_with_cot(self, technique_id: str, report: ThreatReport) -> GeneratedRule:
"""Generazione con Chain-of-Thought esplicito."""
# Step 1: Chiedi al modello di ragionare
reasoning_response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": SIGMA_SYSTEM_PROMPT},
{
"role": "user",
"content": f"""Prima di scrivere la regola per {technique_id}, analizza:
1. Quali artefatti forensi questa tecnica lascia nei log?
2. Qual è il logsource più appropriato?
3. Quali campi hanno la maggiore discriminazione signal/noise?
4. Quali sono i falsi positivi più comuni?
Report: {report.content[:2000]}"""
}
],
temperature=0.3
)
reasoning = reasoning_response.choices[0].message.content
# Step 2: Usa il ragionamento per guidare la generazione
rule_response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": SIGMA_SYSTEM_PROMPT},
{"role": "user", "content": f"Analisi tecnica:\n{reasoning}"},
{
"role": "assistant",
"content": "Basandomi su questa analisi, genero la Sigma rule ottimale:"
},
{"role": "user", "content": "Procedi con la generazione YAML."}
],
temperature=0.1
)
return GeneratedRule(
sigma_yaml=rule_response.choices[0].message.content,
mitre_techniques=[technique_id],
confidence=0.0, # Calcolata nella validazione
validation_passed=False
)
生成されたルールの自動検証
LLM は、構文的には有効でも意味的に正しくない YAML を生成する可能性があります: 存在しないログソース、フィールド 名前が間違っています。セレクターを正しく参照しない条件です。自動検証 ルールがリポジトリに入る前のクリティカル ゲート。
import yaml
from sigma.rule import SigmaRule
from sigma.exceptions import SigmaError
import re
import uuid
class SigmaRuleValidator:
# Logsource validi più comuni
VALID_CATEGORIES = {
'process_creation', 'network_connection', 'file_event',
'registry_event', 'registry_add', 'registry_set',
'dns_query', 'image_load', 'pipe_created', 'raw_access_read'
}
VALID_LEVELS = {'informational', 'low', 'medium', 'high', 'critical'}
VALID_STATUSES = {'stable', 'test', 'experimental', 'deprecated', 'unsupported'}
def validate(self, sigma_yaml: str) -> tuple[bool, list[str]]:
"""Valida una Sigma rule. Restituisce (valida, lista errori)."""
errors = []
# 1. Validazione YAML sintattico
try:
rule_dict = yaml.safe_load(sigma_yaml)
except yaml.YAMLError as e:
return False, [f"YAML invalido: {str(e)}"]
# 2. Campi obbligatori
required_fields = ['title', 'description', 'logsource', 'detection']
for field in required_fields:
if field not in rule_dict:
errors.append(f"Campo obbligatorio mancante: {field}")
if errors:
return False, errors
# 3. Validazione logsource
logsource = rule_dict.get('logsource', {})
if 'category' in logsource:
if logsource['category'] not in self.VALID_CATEGORIES:
errors.append(
f"Categoria logsource non valida: {logsource['category']}. "
f"Valide: {', '.join(self.VALID_CATEGORIES)}"
)
# 4. Validazione detection
detection = rule_dict.get('detection', {})
if 'condition' not in detection:
errors.append("Campo 'condition' mancante in detection")
else:
condition = detection['condition']
# Verifica che i selettori nella condition esistano
selectors = [k for k in detection.keys() if k != 'condition']
# Parse base della condition per trovare riferimenti
referenced = re.findall(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\b', condition)
for ref in referenced:
if ref not in ['and', 'or', 'not', '1', 'of', 'all', 'them', 'filter']:
if ref not in selectors:
errors.append(
f"Condition referenzia '{ref}' che non esiste nei selettori: {selectors}"
)
# 5. Validazione level
level = rule_dict.get('level', '')
if level and level not in self.VALID_LEVELS:
errors.append(f"Level non valido: {level}. Validi: {self.VALID_LEVELS}")
# 6. UUID check
rule_id = rule_dict.get('id', '')
if rule_id:
try:
uuid.UUID(str(rule_id))
except ValueError:
errors.append(f"ID non e un UUID valido: {rule_id}")
else:
errors.append("Campo 'id' mancante - genera un UUID v4")
# 7. Validazione pySigma (se disponibile)
try:
SigmaRule.from_yaml(sigma_yaml)
except SigmaError as e:
errors.append(f"Errore pySigma: {str(e)}")
return len(errors) == 0, errors
アンチパターン: LLM 出力を盲目的に信頼する
AI 支援検出を実装するチームでよくある間違いは、生成されたルールを検証せずに展開することです。 LLM はシグマ生成において特定の反復可能な間違いを犯します。
- フィールドを使用する
ProcessNameの代わりにImage(シスモン) - 存在しないセレクターを参照する条件の書き込み
- 非標準のログソース カテゴリの発明
- 使用
containsターゲット ログソースでワイルドカードをサポートしていないフィールド
実際のログを使用した自動検証とテストはオプションではありません。
合成ログを使用した自動テスト
構文検証後、両方の動作をシミュレートするログで各ルールをテストする必要があります。 通常のトラフィック (偽陽性テスト) よりも悪意のあるトラフィック (真陽性テスト) が予想されます。このアプローチは、 呼ばれた ルール単体テスト、そして成熟したパイプラインと成熟したパイプラインを区別する実践 実験。
import json
from sigma.collection import SigmaCollection
from sigma.backends.test import TextQueryTestBackend
from typing import Any
class SigmaRuleTester:
def __init__(self):
self.backend = TextQueryTestBackend()
def generate_test_events(self, sigma_yaml: str,
llm_client) -> dict[str, list[dict]]:
"""Genera eventi di test tramite LLM basandosi sulla regola."""
rule_dict = yaml.safe_load(sigma_yaml)
prompt = f"""Data questa Sigma rule:
{sigma_yaml}
Genera in formato JSON due liste di eventi di log:
1. "true_positives": 3 eventi che DEVONO triggherare la regola
2. "false_positives": 3 eventi legittimi che NON devono triggherare la regola
Ogni evento deve avere i campi esatti che la regola usa per il matching.
Formato richiesto:
{
"true_positives": [
{"Image": "C:\\\\Windows\\\\System32\\\\cmd.exe", "CommandLine": "...", ...}
],
"false_positives": [
{"Image": "C:\\\\Program Files\\\\...", "CommandLine": "...", ...}
]
}"""
response = llm_client.chat.completions.create(
model="gpt-4o-mini", # Modello più economico per i test
messages=[{"role": "user", "content": prompt}],
temperature=0.2
)
try:
return json.loads(response.choices[0].message.content)
except json.JSONDecodeError:
return {"true_positives": [], "false_positives": []}
def run_tests(self, sigma_yaml: str, test_events: dict) -> dict[str, Any]:
"""Esegue i test e restituisce risultati dettagliati."""
results = {
"tp_tests": {"passed": 0, "failed": 0, "details": []},
"fp_tests": {"passed": 0, "failed": 0, "details": []},
"overall_pass": False
}
# Nota: questa e una simulazione del meccanismo di test.
# In produzione si usano tool come sigma-test o un SIEM sandbox.
rule_dict = yaml.safe_load(sigma_yaml)
detection = rule_dict.get('detection', {})
for event in test_events.get('true_positives', []):
matched = self._simulate_match(event, detection)
if matched:
results["tp_tests"]["passed"] += 1
results["tp_tests"]["details"].append({"event": event, "result": "PASS"})
else:
results["tp_tests"]["failed"] += 1
results["tp_tests"]["details"].append({"event": event, "result": "FAIL - non matchato"})
for event in test_events.get('false_positives', []):
matched = self._simulate_match(event, detection)
if not matched:
results["fp_tests"]["passed"] += 1
results["fp_tests"]["details"].append({"event": event, "result": "PASS"})
else:
results["fp_tests"]["failed"] += 1
results["fp_tests"]["details"].append({"event": event, "result": "FAIL - falso positivo"})
tp_ok = results["tp_tests"]["failed"] == 0
fp_ok = results["fp_tests"]["failed"] == 0
results["overall_pass"] = tp_ok and fp_ok
return results
def _simulate_match(self, event: dict, detection: dict) -> bool:
"""Simulazione semplificata del match. Per produzione: usa sigma-test."""
# Logica di matching semplificata per demo
for selector_name, selector_criteria in detection.items():
if selector_name == 'condition':
continue
if not isinstance(selector_criteria, dict):
continue
for field, value in selector_criteria.items():
actual_field = field.split('|')[0]
modifier = field.split('|')[1] if '|' in field else 'exact'
event_value = event.get(actual_field, '')
if isinstance(value, list):
for v in value:
if self._apply_modifier(str(event_value), str(v), modifier):
return True
else:
if self._apply_modifier(str(event_value), str(value), modifier):
return True
return False
def _apply_modifier(self, event_val: str, pattern: str, modifier: str) -> bool:
pattern_clean = pattern.replace('*', '')
if modifier == 'contains':
return pattern_clean.lower() in event_val.lower()
elif modifier == 'endswith':
return event_val.lower().endswith(pattern_clean.lower())
elif modifier == 'startswith':
return event_val.lower().startswith(pattern_clean.lower())
return event_val.lower() == pattern_clean.lower()
SigmaGen: AI 支援検出のためのオープンソース フレームワーク
シグマジェンMITRE ATT&CK APAC 2025 で発表された、フレームワークの最先端技術を表す AI による検出ルール生成のためのオープンソース。このプロジェクトでは、厳選されたデータセットの微調整と、 ルールのライフサイクル全体をカバーするパイプライン アーキテクチャ。
# Integrazione con SigmaGen (workflow concettuale)
# SigmaGen usa un approccio in tre fasi:
# 1. Ingestion di CTI (blog, advisory, STIX feeds)
# 2. Extraction di tecniche ATT&CK tramite NER specializzato
# 3. Generazione Sigma tramite modello fine-tuned
# Workflow alternativo con n8n e LLM generici:
# n8n Workflow JSON (estratto concettuale)
N8N_WORKFLOW_STRUCTURE = {
"nodes": [
{
"name": "RSS_CTI_Feed",
"type": "n8n-nodes-base.rssFeedRead",
"parameters": {
"url": "https://example-cti-blog.com/feed.xml"
}
},
{
"name": "Extract_Techniques",
"type": "n8n-nodes-base.openAi",
"parameters": {
"model": "gpt-4o",
"prompt": "Estrai tecniche ATT&CK da: {{$json.content}}",
"system_prompt": "Sei un analista CTI esperto..."
}
},
{
"name": "Generate_Sigma",
"type": "n8n-nodes-base.openAi",
"parameters": {
"model": "gpt-4o",
"prompt": "Genera Sigma rule per: {{$json.techniques}}",
"system_prompt": SIGMA_SYSTEM_PROMPT
}
},
{
"name": "Validate_Rule",
"type": "n8n-nodes-base.code",
"parameters": {
"code": "// Chiama API di validazione Python"
}
},
{
"name": "GitHub_PR",
"type": "n8n-nodes-base.github",
"parameters": {
"operation": "createPullRequest",
"repository": "org/detection-rules"
}
}
]
}
# Pipeline Python completa con gestione errori
class FullAISigmaPipeline:
def __init__(self, config: dict):
self.llm_client = openai.OpenAI(api_key=config['openai_key'])
self.validator = SigmaRuleValidator()
self.tester = SigmaRuleTester()
self.max_retries = config.get('max_retries', 3)
def generate_validated_rule(self, technique_id: str,
report: ThreatReport) -> Optional[GeneratedRule]:
"""Genera, valida e testa una regola con retry automatico."""
errors_history = []
for attempt in range(self.max_retries):
# Genera la regola (con errori precedenti nel prompt se disponibili)
sigma_yaml = self._generate_with_error_feedback(
technique_id, report, errors_history
)
# Valida
is_valid, errors = self.validator.validate(sigma_yaml)
if not is_valid:
errors_history.extend(errors)
continue
# Test con eventi sintetici
test_events = self.tester.generate_test_events(sigma_yaml, self.llm_client)
test_results = self.tester.run_tests(sigma_yaml, test_events)
if test_results['overall_pass']:
return GeneratedRule(
sigma_yaml=sigma_yaml,
mitre_techniques=[technique_id],
confidence=self._calculate_confidence(test_results),
validation_passed=True,
test_results=test_results
)
else:
errors_history.append(f"Test falliti: {test_results}")
return None # Non e riuscito a generare una regola valida
def _generate_with_error_feedback(self, technique_id: str,
report: ThreatReport,
errors: list[str]) -> str:
"""Genera con feedback sugli errori precedenti."""
error_context = ""
if errors:
error_context = f"\n\nTentativo precedente fallito con errori:\n" + \
"\n".join(f"- {e}" for e in errors[-3:]) # Ultimi 3 errori
prompt = build_generation_prompt(
technique_id=technique_id,
behaviors=[],
logsource_hint="process_creation",
report_excerpt=report.content[:2000] + error_context
)
response = self.llm_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": SIGMA_SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
temperature=0.1
)
return response.choices[0].message.content
def _calculate_confidence(self, test_results: dict) -> float:
"""Calcola un confidence score basato sui test."""
tp = test_results["tp_tests"]
fp = test_results["fp_tests"]
total_tp = tp["passed"] + tp["failed"]
total_fp = fp["passed"] + fp["failed"]
if total_tp == 0 or total_fp == 0:
return 0.5
tp_rate = tp["passed"] / total_tp
fp_ok_rate = fp["passed"] / total_fp
return (tp_rate + fp_ok_rate) / 2
CI/CD パイプラインへの統合
生成および検証されたルールは自動的に本番環境には導入されません。ルールはプロセスを経る必要があります。 人間によるレビューとコードとしての検出パイプラインの CI/CD ゲートの説明。おすすめのフローはAI 直接マージではなくプルリクエストを生成します。
# GitHub Actions workflow per AI-generated rules
# File: .github/workflows/ai-sigma-generation.yml
"""
name: AI Sigma Rule Generation
on:
schedule:
- cron: '0 6 * * *' # Ogni giorno alle 6:00 UTC
workflow_dispatch:
inputs:
cti_url:
description: 'URL del CTI report da processare'
required: false
jobs:
generate-rules:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: pip install openai pySigma pySigma-backend-splunk pyyaml
- name: Run AI pipeline
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: python scripts/ai_sigma_pipeline.py
- name: Validate generated rules
run: python scripts/validate_all_rules.py rules/ai-generated/
- name: Create Pull Request
uses: peter-evans/create-pull-request@v6
with:
title: '[AI-Generated] Detection rules da CTI feed'
body: |
## Regole generate automaticamente da AI
Tecniche ATT&CK rilevate e tradotte in Sigma rules.
**REVIEW OBBLIGATORIA** - Verificare prima del merge:
- [ ] Logsource corretto per il SIEM target
- [ ] Condition logicamente corretta
- [ ] Falsi positivi accettabili
- [ ] Test su ambiente staging completato
labels: 'ai-generated,needs-review'
branch: 'ai-generated-rules'
"""
# Script di validazione bulk
# File: scripts/validate_all_rules.py
import sys
from pathlib import Path
def validate_directory(rules_dir: str) -> int:
"""Valida tutte le regole in una directory. Restituisce exit code."""
validator = SigmaRuleValidator()
rules_path = Path(rules_dir)
failed = []
for rule_file in rules_path.glob("**/*.yml"):
content = rule_file.read_text()
is_valid, errors = validator.validate(content)
if not is_valid:
failed.append((rule_file.name, errors))
print(f"FAIL: {rule_file.name}")
for e in errors:
print(f" - {e}")
else:
print(f"OK: {rule_file.name}")
print(f"\nRisultati: {len(failed)} fallite su {len(list(rules_path.glob('**/*.yml')))} totali")
return 1 if failed else 0
if __name__ == "__main__":
sys.exit(validate_directory(sys.argv[1] if len(sys.argv) > 1 else "rules/"))
代替モデルとコストの考慮事項
すべての組織が機密 CTI データをクラウド API に送信できるわけではありませんし、送信したいわけでもありません。テンプレートの使用 敷地内経由 オラマ o vLLM 環境に対する具体的な代替案 データ常駐要件あり。
| モデル | シグマ品質 | 100 ルールあたりのコスト | 平均遅延 | 滞在日 |
|---|---|---|---|---|
| GPT-4o | 高 (87% 有効) | ~2.50ドル | 3-8秒 | クラウド(OpenAI) |
| GPT-4o-ミニ | 良い (71% 有効) | ~0.15ドル | 1-3秒 | クラウド(OpenAI) |
| クロード 3.5 ソネット | 高 (84% 有効) | ~$3.00 | 3-6秒 | クラウド (人類) |
| ラマ 3.1 70B (ローカル) | まあまあ (58% 有効) | ~$0 (以下) | 15~45秒 | オンプレミス |
| ミストラル 7B 微調整済み | 良い (69% 有効) | ~$0 (以下) | 5~15秒 | オンプレミス |
費用対効果の高い戦略
最初の生成と再試行には GPT-4o-mini を使用します (低コスト、高品質)。 GPT-4o は、mini が 2 回の試行後に失敗した場合にのみ適用されます。このハイブリッドアプローチにより、 同等の品質を維持しながら、平均コストは GPT-4o のみを使用する場合と比較して 30% に下がります。
実際の制限とアンチパターン
このテクノロジーの評価では、正直であることが基本です。 LLM は絶対確実ではありません 検出ルールの生成、その制限を知ることは、その制限を利用するのと同じくらい重要です 強み。
検出エンジニアリングにおける LLM の制限事項の文書化
-
野原の幻覚: モデルは存在しないフィールド名を作り出すことができます
実際のログ内 (例:
ProcessHashの代わりにHashesシズモン)。 - レポートの過学習: 生成されたルールは具体的すぎる可能性があります (IOC ベース) 一般的な動作をキャプチャする代わりに。
- 非現実的な誤検知: LLM によって生成される「偽陽性」は、多くの場合、 一般的なものであり、組織の特定の状況における実際のケースを表すものではありません。
- 闇の術: まれな、またはごく最近の (カットオフ後の) ATT&CK テクニックの場合、 取得量を増やさないと品質が大幅に低下します。
- SIEM コンテキストの欠如: モデルは特定の正規化を認識していません SIEM の (例: Elastic ECS と比較して Splunk が Windows フィールドを正規化する方法)。
SIEM コンテキスト固有の RAG
取得拡張生成 (RAG) を使用すると、組織固有のコンテキストをプロンプトに挿入できます。 SIEM フィールドの正規化、既存のルール、誤検知キャリブレーション データ。 このアプローチにより、コンテキストの欠如に関連するエラーが大幅に減少します。
# RAG per generazione Sigma contestualizzata
from chromadb import Client
from chromadb.utils import embedding_functions
class RAGSigmaGenerator:
def __init__(self, chroma_persist_dir: str, openai_key: str):
self.chroma_client = Client()
self.embed_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key=openai_key,
model_name="text-embedding-3-small"
)
# Collection di regole esistenti per few-shot contestuale
self.rules_collection = self.chroma_client.get_or_create_collection(
name="sigma_rules",
embedding_function=self.embed_fn
)
# Collection di field mappings SIEM-specifici
self.field_mappings_collection = self.chroma_client.get_or_create_collection(
name="siem_field_mappings",
embedding_function=self.embed_fn
)
def index_existing_rules(self, rules_dir: str) -> None:
"""Indicizza le regole esistenti per few-shot retrieval."""
for rule_file in Path(rules_dir).glob("**/*.yml"):
content = rule_file.read_text()
rule_dict = yaml.safe_load(content)
self.rules_collection.add(
documents=[content],
metadatas=[{
"title": rule_dict.get('title', ''),
"category": rule_dict.get('logsource', {}).get('category', ''),
"tags": str(rule_dict.get('tags', []))
}],
ids=[str(rule_file)]
)
def generate_with_rag(self, technique_id: str, report: ThreatReport) -> str:
"""Genera Sigma rule con contesto recuperato dal RAG."""
# Recupera regole simili come few-shot examples
similar_rules = self.rules_collection.query(
query_texts=[report.content[:500]],
n_results=3
)
# Recupera field mappings SIEM-specifici
field_mappings = self.field_mappings_collection.query(
query_texts=[f"process_creation windows {technique_id}"],
n_results=2
)
# Costruisce prompt arricchito con contesto RAG
context = "\n\n".join(similar_rules['documents'][0][:2])
mappings_context = "\n".join(field_mappings['documents'][0])
enhanced_prompt = f"""
Regole simili esistenti nel nostro repository (usa come ispirazione):
{context}
Field mappings specifici del nostro SIEM:
{mappings_context}
Ora genera una nuova regola per la tecnica {technique_id}
basandoti sul seguente report: {report.content[:2000]}
"""
# Continua con la generazione standard...
return enhanced_prompt
結論と次のステップ
AI 支援検出エンジニアリングはトレンドではありません。これは、次のことを行う必要があるチームにとって運用上必要なものです。 新しい攻撃手法が出現するスピードに遅れないようにしてください。の組み合わせ LLM を使用した生成、厳密な自動検証、および合成ログを使用したテストにより、 新たに公開された脅威レポートにより、検出までの時間が数日から数時間に短縮されます。
重要なポイント
- LLM は、YAML 構造のおかげで、直接 SPL クエリよりも高品質のシグマ ルールを生成します。
- 迅速なエンジニアリング (構造化されたシステムの即時、少数のショット、思考の連鎖) が品質にとって重要です
- 自動構文および意味検証はオプションではありません
- AI が生成した合成ログを使用したテストで品質ループを閉じる
- 生成されたルールは、自動マージではなく人間による PR レビューを通過する必要があります。
- SIEM コンテキスト固有の RAG により、フィールド エラーと誤検知が削減されます
- ローカル モデル (Llama、微調整された Mistral) はデータ常駐の有効な代替手段です
関連記事
- シグマ ルール: ユニバーサル検出ロジックと SIEM 変換
- Git と CI/CD を使用したコードとしての検出パイプライン
- 検出ルールのテスト: セキュリティ ロジックの単体テスト
- 脅威インテリジェンスの取り込み: STIX/TAXII フィード プロセッサ







