MITRE ATT&CK の統合: カバレッジ ギャップをプログラムでマッピング
枠組み マイターアタック&CK (敵対的な戦術、テクニック) および Common Knowledge) は、防御的なサイバーセキュリティの共通言語となっています。 200 を超える文書化されたエンタープライズ テクニックを 14 の戦術にまとめ、 ATT&CK は、実際の攻撃者の行動の最も完全なマップを表します。 しかし、地図を持っているだけでは十分ではありません。知っておく必要があります。 あなたの組織がどこにあるのか そのマップ上で、つまり、どのテクニックを検出でき、どのテクニックが重大なギャップを表すのかを示します。
この記事ではその方法について説明します MITRE ATT&CK をプログラムで統合する 検出エンジニアリング ワークフロー: STIX データをダウンロードして解析する方法 フレームワーク、既存のルールをテクニックにマッピングする方法、特定し、 カバレッジギャップに優先順位を付け、ヒートマップと自動レポートを生成する方法 検知態勢を経営陣に伝えるため。
この記事で学べること
- ATT&CK フレームワークの構造: 戦術、テクニック、サブテクニック、手順
- ATT&CK STIX データ: Python を介したダウンロード、解析、クエリ
- シグマルールの自動マッピング -> ATT&CK テクニック
- カバレッジ分析: 定量的なカバレッジ指標
- 脅威インテリジェンスに基づいたギャップの特定と優先順位付け
- JSON 形式での ATT&CK Navigator ヒートマップの生成
- CISO と経営陣向けの自動レポート
- 継続的なカバレッジ追跡のための DaC パイプラインとの統合
ATT&CK フレームワーク: 構造と主要な概念
MITRE ATT&CKは攻撃手法に関する知識を1つにまとめたものです 3 レベルの階層:
- 戦術 (14): いつでも攻撃者のゴールを目指します。 例: 「資格情報アクセス」(資格情報の窃取)、「水平移動」(展開) ネットワーク内)。戦術とは、悪意のある行為の「理由」です。
- テクニック (200 以上): 達成するために使用される具体的な方法 戦術の目標。例: T1003「OS 資格情報のダンプ」。テクニック 私は「方法」です。
- サブテクニック (400 以上): テクニックの特定のバリエーション。 例: T1003.001 "LSASS メモリ"、T1003.002 "セキュリティ アカウント マネージャー"。 サブテクニックは「いかに正確に」です。
- 手順: グループがどのように行動するかの具体的な事例を文書化したもの APT はテクニックを使用しました。それらは、フィールドの実際のコンテキストを提供します。
ATT&CK STIX データのダウンロードと解析
MITRE は、フレームワーク全体を STIX 2.1 形式で GitHub で公開しています。使ってみましょう
図書館 mitreattack-python ある方法でデータにアクセスする
構造化された:
pip install mitreattack-python requests pandas matplotlib
#!/usr/bin/env python3
"""
ATT&CK Framework Data Client
Scarica e analizza i dati STIX di MITRE ATT&CK.
"""
import json
import requests
from pathlib import Path
from dataclasses import dataclass, field
from mitreattack.stix20 import MitreAttackData
@dataclass
class AttackTechnique:
id: str # Es: T1003
name: str # Es: OS Credential Dumping
tactic: str # Es: credential-access
description: str
is_subtechnique: bool
parent_id: str = "" # Es: T1003 per T1003.001
platforms: list[str] = field(default_factory=list)
data_sources: list[str] = field(default_factory=list)
detection_notes: str = ""
class AttackFrameworkClient:
ENTERPRISE_URL = (
"https://raw.githubusercontent.com/mitre/cti/master/"
"enterprise-attack/enterprise-attack.json"
)
def __init__(self, cache_path: str = "attack_data.json"):
self.cache_path = Path(cache_path)
self._ensure_data()
self.attack_data = MitreAttackData(str(self.cache_path))
def _ensure_data(self) -> None:
"""Scarica i dati ATT&CK se non presenti in cache."""
if not self.cache_path.exists():
print("Downloading MITRE ATT&CK data...")
resp = requests.get(self.ENTERPRISE_URL, timeout=60)
resp.raise_for_status()
with open(self.cache_path, 'w') as f:
json.dump(resp.json(), f)
print(f"Data saved to {self.cache_path}")
def get_all_techniques(self) -> list[AttackTechnique]:
"""Recupera tutte le tecniche (incluse sub-tecniche) dal framework."""
techniques = []
stix_techniques = self.attack_data.get_techniques(remove_revoked_deprecated=True)
for t in stix_techniques:
# Estrae ID ATT&CK dalla reference esterna
attack_id = ""
for ref in t.get("external_references", []):
if ref.get("source_name") == "mitre-attack":
attack_id = ref.get("external_id", "")
break
if not attack_id:
continue
# Determina la tattica primaria
kill_chain_phases = t.get("kill_chain_phases", [])
tactic = (kill_chain_phases[0].get("phase_name", "unknown")
if kill_chain_phases else "unknown")
# Determina se e sub-tecnica
is_subtechnique = "." in attack_id
parent_id = attack_id.split(".")[0] if is_subtechnique else ""
techniques.append(AttackTechnique(
id=attack_id,
name=t.get("name", ""),
tactic=tactic,
description=t.get("description", "")[:500],
is_subtechnique=is_subtechnique,
parent_id=parent_id,
platforms=t.get("x_mitre_platforms", []),
data_sources=t.get("x_mitre_data_sources", []),
detection_notes=t.get("x_mitre_detection", "")[:300]
))
return techniques
def get_techniques_by_tactic(
self, tactic: str
) -> list[AttackTechnique]:
"""Filtra tecniche per tattica."""
all_techniques = self.get_all_techniques()
return [t for t in all_techniques if t.tactic == tactic]
def get_technique_by_id(
self, technique_id: str
) -> AttackTechnique | None:
"""Trova una tecnica specifica per ID."""
all_techniques = self.get_all_techniques()
for t in all_techniques:
if t.id == technique_id:
return t
return None
# Uso
client = AttackFrameworkClient()
techniques = client.get_all_techniques()
print(f"Tecniche caricate: {len(techniques)}")
print(f"Sub-tecniche: {sum(1 for t in techniques if t.is_subtechnique)}")
print(f"Tecniche padre: {sum(1 for t in techniques if not t.is_subtechnique)}")
シグマ ルールを ATT&CK テクニックにマッピングする
適切に作成されたシグマ ルールには、次の形式の ATT&CK タグが含まれています。
attack.t1234.001。これにより、自動的にビルドできるようになります
カバレッジマップ:
#!/usr/bin/env python3
"""
Sigma to ATT&CK Coverage Mapper
Mappa le regole Sigma alle tecniche ATT&CK e calcola la copertura.
"""
import yaml
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class RuleInfo:
path: str
title: str
status: str # experimental, test, stable
level: str # low, medium, high, critical
tags: list[str] = field(default_factory=list)
@dataclass
class TechniqueCoverage:
technique_id: str
technique_name: str
tactic: str
rules: list[RuleInfo] = field(default_factory=list)
@property
def coverage_score(self) -> float:
"""Score da 0 a 1 basato su numero e qualità delle regole."""
if not self.rules:
return 0.0
score = 0.0
for rule in self.rules:
# Peso in base allo status
status_weight = {
"stable": 1.0,
"test": 0.5,
"experimental": 0.25
}.get(rule.status, 0.1)
# Peso in base al livello di severita
level_weight = {
"critical": 1.0,
"high": 0.8,
"medium": 0.5,
"low": 0.3,
"informational": 0.1
}.get(rule.level, 0.1)
score += status_weight * level_weight
# Normalizza con cap a 1.0
return min(1.0, score)
@property
def stable_rule_count(self) -> int:
return sum(1 for r in self.rules if r.status == "stable")
def extract_attack_tags(rule: dict) -> list[str]:
"""Estrae gli ID tecnica ATT&CK dai tag della regola Sigma."""
technique_ids = []
for tag in rule.get("tags", []):
tag_lower = tag.lower()
if tag_lower.startswith("attack.t"):
# Estrae l'ID: "attack.t1003.001" -> "T1003.001"
tech_id = tag.replace("attack.", "").upper()
if len(tech_id) >= 5: # Minimo "T1234"
technique_ids.append(tech_id)
return technique_ids
def build_coverage_map(
rules_dir: str,
attack_client: AttackFrameworkClient
) -> dict[str, TechniqueCoverage]:
"""
Analizza il repository Sigma e costruisce la mappa di copertura
per ogni tecnica ATT&CK.
"""
coverage_map: dict[str, TechniqueCoverage] = {}
rules_path = Path(rules_dir)
for rule_file in rules_path.rglob("*.yml"):
try:
with open(rule_file, encoding='utf-8') as f:
rule = yaml.safe_load(f)
except Exception as e:
print(f"Error reading {rule_file}: {e}")
continue
if not isinstance(rule, dict):
continue
technique_ids = extract_attack_tags(rule)
if not technique_ids:
continue
rule_info = RuleInfo(
path=str(rule_file.relative_to(rules_path)),
title=rule.get("title", "Unknown"),
status=rule.get("status", "experimental"),
level=rule.get("level", "medium"),
tags=rule.get("tags", [])
)
for tech_id in technique_ids:
if tech_id not in coverage_map:
# Recupera info dalla libreria ATT&CK
technique = attack_client.get_technique_by_id(tech_id)
coverage_map[tech_id] = TechniqueCoverage(
technique_id=tech_id,
technique_name=technique.name if technique else "Unknown",
tactic=technique.tactic if technique else "unknown"
)
coverage_map[tech_id].rules.append(rule_info)
return coverage_map
ギャップ分析と優先順位付け
カバレッジ マップを構築したら、最も重要なステップは次のとおりです。 ギャップを特定し、優先順位を付けます。すべてのギャップが同じではない 緊急性: T1190 (公開アプリケーションの悪用) に関するギャップと多くの点 ほとんどの場合、T1586 (侵害アカウント) のギャップよりも重要です。 組織の一部。
@dataclass
class CoverageGap:
technique_id: str
technique_name: str
tactic: str
priority_score: float
reasons: list[str] = field(default_factory=list)
def calculate_gap_priority(
technique: AttackTechnique,
threat_intel: dict # Frequenza osservata nei report di threat intel
) -> float:
"""
Calcola un priority score per un gap di copertura.
Score più alto = gap più urgente da colmare.
Fattori considerati:
- Frequenza di utilizzo dagli APT group (da threat intel)
- Disponibilità di data sources necessari
- Impatto potenziale della tecnica
"""
score = 0.0
reasons = []
# Fattore 1: Frequenza nelle campagne recenti (0-40 punti)
freq = threat_intel.get(technique.id, 0)
freq_score = min(40.0, freq * 10)
score += freq_score
if freq > 2:
reasons.append(f"Observed in {freq} recent threat campaigns")
# Fattore 2: Tattiche ad alto impatto (0-30 punti)
high_impact_tactics = [
"credential-access", "lateral-movement",
"impact", "exfiltration", "command-and-control"
]
if technique.tactic in high_impact_tactics:
score += 30.0
reasons.append(f"High-impact tactic: {technique.tactic}")
# Fattore 3: Data sources disponibili (0-20 punti)
# Tecniche con molti data sources sono più facili da rilevare
available_sources = len(technique.data_sources)
if available_sources >= 3:
score += 20.0
reasons.append("Multiple data sources available for detection")
elif available_sources >= 1:
score += 10.0
# Fattore 4: Supporto piattaforme comuni (0-10 punti)
if "Windows" in technique.platforms:
score += 5.0
if "Linux" in technique.platforms or "macOS" in technique.platforms:
score += 5.0
return score
def identify_coverage_gaps(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
threat_intel: dict,
min_coverage_score: float = 0.5
) -> list[CoverageGap]:
"""
Identifica le tecniche con copertura insufficiente
e le ordina per priorità.
"""
gaps = []
for technique in all_techniques:
coverage = coverage_map.get(technique.id)
current_score = coverage.coverage_score if coverage else 0.0
if current_score < min_coverage_score:
priority = calculate_gap_priority(technique, threat_intel)
reasons = []
if current_score == 0:
reasons.append("No detection rules exist")
else:
reasons.append(
f"Insufficient coverage score: {current_score:.2f}/1.0"
)
gaps.append(CoverageGap(
technique_id=technique.id,
technique_name=technique.name,
tactic=technique.tactic,
priority_score=priority,
reasons=reasons
))
# Ordina per priority score decrescente
return sorted(gaps, key=lambda g: g.priority_score, reverse=True)
# Esempio di threat intel basata su report pubblici
THREAT_INTEL_FREQUENCY = {
"T1059.001": 8, # PowerShell - molto comune
"T1003.001": 7, # LSASS Memory - molto comune
"T1566.001": 9, # Spearphishing Attachment
"T1190": 6, # Exploit Public-Facing Application
"T1021.001": 5, # Remote Desktop Protocol
"T1053.005": 4, # Scheduled Task
"T1078": 6, # Valid Accounts
}
ATT&CK Navigator のヒートマップの生成
ATT&CKナビゲーター および公式 MITRE ツール インタラクティブなマトリックスでカバレッジを表示します。 ~への入力を受け入れる プログラムで生成できる JSON 形式:
def generate_navigator_layer(
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap],
layer_name: str = "Detection Coverage"
) -> dict:
"""
Genera un layer JSON per ATT&CK Navigator che mostra
la copertura con colori: verde = buona, rosso = gap critico.
"""
techniques_layer = []
# Tecniche con copertura
for tech_id, coverage in coverage_map.items():
score = coverage.coverage_score
if score >= 0.8:
color = "#4caf50" # Verde: copertura eccellente
comment = f"Covered by {coverage.stable_rule_count} stable rules"
elif score >= 0.5:
color = "#ff9800" # Arancione: copertura parziale
comment = "Partial coverage - needs improvement"
else:
color = "#f44336" # Rosso: copertura insufficiente
comment = "Insufficient coverage"
techniques_layer.append({
"techniqueID": tech_id,
"color": color,
"comment": comment,
"score": round(score * 100),
"metadata": [
{"name": "rule_count", "value": str(len(coverage.rules))},
{"name": "stable_rules",
"value": str(coverage.stable_rule_count)}
]
})
# Gap critici senza copertura
covered_ids = set(coverage_map.keys())
for gap in gaps[:20]: # Top 20 gap prioritari
if gap.technique_id not in covered_ids:
techniques_layer.append({
"techniqueID": gap.technique_id,
"color": "#b71c1c", # Rosso scuro: gap critico
"comment": f"PRIORITY GAP (score: {gap.priority_score:.0f}): " +
", ".join(gap.reasons),
"score": 0
})
return {
"name": layer_name,
"versions": {
"attack": "14",
"navigator": "4.9",
"layer": "4.5"
},
"domain": "enterprise-attack",
"description": f"Auto-generated coverage layer - {len(coverage_map)} techniques covered",
"filters": {"platforms": ["Windows", "Linux", "macOS"]},
"sorting": 3,
"layout": {"layout": "side", "aggregateFunction": "average"},
"hideDisabled": False,
"techniques": techniques_layer,
"gradient": {
"colors": ["#ff6666", "#ffe766", "#8ec843"],
"minValue": 0,
"maxValue": 100
},
"legendItems": [
{"label": "No coverage", "color": "#b71c1c"},
{"label": "Partial coverage", "color": "#ff9800"},
{"label": "Good coverage", "color": "#4caf50"}
]
}
管理用自動レポート
経営者が読めるレポートはセキュリティ体制を伝える必要があります 明確で実行可能な方法で:
def generate_executive_report(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap]
) -> str:
"""Genera un report testuale executive-friendly."""
total = len(all_techniques)
covered = len(coverage_map)
well_covered = sum(
1 for c in coverage_map.values() if c.coverage_score >= 0.8
)
partial = sum(
1 for c in coverage_map.values()
if 0.5 <= c.coverage_score < 0.8
)
insufficient = covered - well_covered - partial
uncovered = total - covered
coverage_pct = covered / total * 100
good_pct = well_covered / total * 100
report = f"""
=== DETECTION ENGINEERING COVERAGE REPORT ===
Generated: 2026-03-09
EXECUTIVE SUMMARY
-----------------
ATT&CK Techniques Total: {total:4d}
Covered (any level): {covered:4d} ({coverage_pct:.1f}%)
Well-covered (score >= 0.8): {well_covered:4d} ({good_pct:.1f}%)
Partial coverage: {partial:4d}
Insufficient coverage: {insufficient:4d}
Not covered: {uncovered:4d}
TOP 10 PRIORITY GAPS (by risk score)
--------------------------------------
"""
for i, gap in enumerate(gaps[:10], 1):
report += (
f"{i:2d}. [{gap.technique_id}] {gap.technique_name}\n"
f" Tactic: {gap.tactic}\n"
f" Priority Score: {gap.priority_score:.0f}/100\n"
f" Reasons: {', '.join(gap.reasons[:2])}\n\n"
)
report += "COVERAGE BY TACTIC\n------------------\n"
tactic_stats: dict[str, dict] = {}
for coverage in coverage_map.values():
tac = coverage.tactic
if tac not in tactic_stats:
tactic_stats[tac] = {"covered": 0, "total": 0, "score_sum": 0}
tactic_stats[tac]["covered"] += 1
tactic_stats[tac]["score_sum"] += coverage.coverage_score
for tac, stats in sorted(tactic_stats.items()):
avg_score = stats["score_sum"] / stats["covered"] * 100
report += f"{tac:35s} {stats['covered']:3d} rules, avg score {avg_score:.0f}%\n"
return report
DaC パイプラインへの統合
カバレッジ レポートは、メインにマージされるたびに自動的に生成される必要があります。 そして CI/CD パイプライン アーティファクトとして公開されます。
# .github/workflows/coverage-report.yml
name: ATT&CK Coverage Report
on:
push:
branches: [main]
schedule:
- cron: '0 8 * * 1' # Ogni lunedi mattina
jobs:
coverage-analysis:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: pip install mitreattack-python pyyaml requests
- name: Generate coverage report
run: |
python scripts/attck_coverage.py \
--rules-dir ./rules \
--output-json reports/coverage.json \
--output-layer reports/navigator_layer.json \
--output-report reports/executive_report.txt
- name: Comment on PR with coverage delta
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const report = fs.readFileSync('reports/executive_report.txt', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: '## ATT&CK Coverage Update\n```\n' + report + '\n```'
});
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: attck-coverage-${{ github.sha }}
path: reports/
retention-days: 90
カバレッジ分析のベストプラクティス
| 練習する | 説明 | インパクト |
|---|---|---|
| 最もよく使われるテクニックに焦点を当てる | M-Trends、Verizon DBIR などのレポートを使用して優先順位を付ける | 高い |
| データソースを検討する | 見えないものは検出できません。まずデータソースを監査する | 評論家 |
| 関連するプラットフォーマーを区別する | macOS をお持ちでない場合は、macOS 専用のテクニックを優先しないでください。 | 中くらい |
| 長期にわたる追跡 | カバレッジデルタを週ごとに測定する | 高い |
| ルールを定期的に検証する | 6 か月以上テストされていないルールは機能しなくなる可能性があります | 高い |
カバレッジ分析でよくある間違い
- 品質ではなくルールを重視します。 10の実験ルール テストされた安定したルール 2 つ未満の価値があります。加重カバレッジスコアを使用する
- データ ソースを無視します。 Sysmon が導入されていない場合は、 多くの Windows テクニックは、ルールに関係なく検出できません
- 100% をカバーすることは不可能です。 いくつかのテクニックは 検出は非常に困難です(陸地以外で生活しているため)。ギャップに注目 高いリスクと高い検出可能性
- 何もせずに報告する: すべてのギャップレポートを生成する必要があります 所有者と期限を含む検出アイテムのバックログ
結論
MITRE ATT&CK をプログラム的に検出エンジニアリング ワークフローに統合します カバレッジ分析を手動で主観的なアクティビティからプロセスに変換します。 自動化、客観的、継続的。その結果が検出プログラムです 測定可能で、実際の脅威に優先順位が付けられ、管理者に伝達可能 具体的なデータとともに。
シリーズの次の記事
次の記事では、アラートへの対応を自動化する方法について説明します。 と Python の SOAR プレイブック、の行動を調整する 自動的な封じ込め、強化、通知。







