MITRE ATT&CK 통합: 프로그래밍 방식으로 적용 범위 격차 매핑
프레임워크 마이터 공격&CK (적대적 전술, 기술 및 Common Knowledge)는 방어 사이버 보안의 공통 언어가 되었습니다. 14가지 전술로 구성된 200개 이상의 문서화된 기업 기술을 통해 ATT&CK는 실제 공격자의 행동에 대한 가장 완벽한 지도를 나타냅니다. 하지만 지도를 갖는 것만으로는 충분하지 않습니다. 알아야 할 사항 귀하의 조직이 위치한 곳 즉, 어떤 기술을 감지할 수 있고 어떤 기술이 중요한 격차를 나타내는지 알려줍니다.
이 기사에서는 방법을 살펴 보겠습니다. 프로그래밍 방식으로 MITRE ATT&CK 통합 탐지 엔지니어링 워크플로: STIX 데이터를 다운로드하고 구문 분석하는 방법 프레임워크, 기존 규칙을 기술에 매핑하는 방법, 식별 및 적용 범위 격차의 우선순위 지정, 히트맵 및 자동 보고서 생성 방법 탐지 상황을 경영진에게 전달합니다.
이 기사에서 배울 내용
- ATT&CK 프레임워크의 구조: 전술, 기술, 하위 기술, 절차
- ATT&CK STIX 데이터: Python을 통해 다운로드, 구문 분석 및 쿼리
- 시그마 규칙 자동 매핑 -> ATT&CK 기술
- 커버리지 분석: 정량적 커버리지 지표
- 위협 인텔리전스를 기반으로 격차 식별 및 우선순위 지정
- JSON 형식의 ATT&CK Navigator 히트맵 생성
- CISO 및 경영진을 위한 자동 보고
- 지속적인 커버리지 추적을 위해 DaC 파이프라인과 통합
ATT&CK 프레임워크: 구조 및 주요 개념
MITRE ATT&CK는 공격 기법에 대한 지식을 하나로 정리합니다. 3단계 계층구조:
- 전술 (14): 언제든지 공격자의 목표를 달성할 수 있습니다. 예: "Credential Access"(자격 증명 도용), "Lateral Movement"(확장 네트워크에서). 전술은 악의적인 행동의 "이유"입니다.
- 기술(200개 이상): 달성하기 위해 사용되는 구체적인 방법 전술의 목표. 예: T1003 "OS 자격 증명 덤핑". 기술 나는 "어떻게"입니다.
- 하위 기술(400개 이상): 기술의 특정 변형. 예: T1003.001 "LSASS 메모리", T1003.002 "보안 계정 관리자". 하위 기술은 "정확히"입니다.
- 절차: 그룹이 어떻게 행동하는지에 대한 구체적인 사례를 문서화했습니다. APT는 기법을 사용했습니다. 그들은 현장의 실제 상황을 제공합니다.
ATT&CK STIX 데이터 다운로드 및 구문 분석
MITRE는 GitHub에 STIX 2.1 형식으로 전체 프레임워크를 게시합니다. 사용하자
도서관 mitreattack-python 어떤 방식으로든 데이터에 접근하려면
구조:
pip install mitreattack-python requests pandas matplotlib
#!/usr/bin/env python3
"""
ATT&CK Framework Data Client
Scarica e analizza i dati STIX di MITRE ATT&CK.
"""
import json
import requests
from pathlib import Path
from dataclasses import dataclass, field
from mitreattack.stix20 import MitreAttackData
@dataclass
class AttackTechnique:
id: str # Es: T1003
name: str # Es: OS Credential Dumping
tactic: str # Es: credential-access
description: str
is_subtechnique: bool
parent_id: str = "" # Es: T1003 per T1003.001
platforms: list[str] = field(default_factory=list)
data_sources: list[str] = field(default_factory=list)
detection_notes: str = ""
class AttackFrameworkClient:
ENTERPRISE_URL = (
"https://raw.githubusercontent.com/mitre/cti/master/"
"enterprise-attack/enterprise-attack.json"
)
def __init__(self, cache_path: str = "attack_data.json"):
self.cache_path = Path(cache_path)
self._ensure_data()
self.attack_data = MitreAttackData(str(self.cache_path))
def _ensure_data(self) -> None:
"""Scarica i dati ATT&CK se non presenti in cache."""
if not self.cache_path.exists():
print("Downloading MITRE ATT&CK data...")
resp = requests.get(self.ENTERPRISE_URL, timeout=60)
resp.raise_for_status()
with open(self.cache_path, 'w') as f:
json.dump(resp.json(), f)
print(f"Data saved to {self.cache_path}")
def get_all_techniques(self) -> list[AttackTechnique]:
"""Recupera tutte le tecniche (incluse sub-tecniche) dal framework."""
techniques = []
stix_techniques = self.attack_data.get_techniques(remove_revoked_deprecated=True)
for t in stix_techniques:
# Estrae ID ATT&CK dalla reference esterna
attack_id = ""
for ref in t.get("external_references", []):
if ref.get("source_name") == "mitre-attack":
attack_id = ref.get("external_id", "")
break
if not attack_id:
continue
# Determina la tattica primaria
kill_chain_phases = t.get("kill_chain_phases", [])
tactic = (kill_chain_phases[0].get("phase_name", "unknown")
if kill_chain_phases else "unknown")
# Determina se e sub-tecnica
is_subtechnique = "." in attack_id
parent_id = attack_id.split(".")[0] if is_subtechnique else ""
techniques.append(AttackTechnique(
id=attack_id,
name=t.get("name", ""),
tactic=tactic,
description=t.get("description", "")[:500],
is_subtechnique=is_subtechnique,
parent_id=parent_id,
platforms=t.get("x_mitre_platforms", []),
data_sources=t.get("x_mitre_data_sources", []),
detection_notes=t.get("x_mitre_detection", "")[:300]
))
return techniques
def get_techniques_by_tactic(
self, tactic: str
) -> list[AttackTechnique]:
"""Filtra tecniche per tattica."""
all_techniques = self.get_all_techniques()
return [t for t in all_techniques if t.tactic == tactic]
def get_technique_by_id(
self, technique_id: str
) -> AttackTechnique | None:
"""Trova una tecnica specifica per ID."""
all_techniques = self.get_all_techniques()
for t in all_techniques:
if t.id == technique_id:
return t
return None
# Uso
client = AttackFrameworkClient()
techniques = client.get_all_techniques()
print(f"Tecniche caricate: {len(techniques)}")
print(f"Sub-tecniche: {sum(1 for t in techniques if t.is_subtechnique)}")
print(f"Tecniche padre: {sum(1 for t in techniques if not t.is_subtechnique)}")
시그마 규칙을 ATT&CK 기술에 매핑
잘 작성된 시그마 규칙에는 다음 형식의 ATT&CK 태그가 포함되어 있습니다.
attack.t1234.001. 이를 통해 자동으로 빌드할 수 있습니다.
적용 범위 지도:
#!/usr/bin/env python3
"""
Sigma to ATT&CK Coverage Mapper
Mappa le regole Sigma alle tecniche ATT&CK e calcola la copertura.
"""
import yaml
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class RuleInfo:
path: str
title: str
status: str # experimental, test, stable
level: str # low, medium, high, critical
tags: list[str] = field(default_factory=list)
@dataclass
class TechniqueCoverage:
technique_id: str
technique_name: str
tactic: str
rules: list[RuleInfo] = field(default_factory=list)
@property
def coverage_score(self) -> float:
"""Score da 0 a 1 basato su numero e qualità delle regole."""
if not self.rules:
return 0.0
score = 0.0
for rule in self.rules:
# Peso in base allo status
status_weight = {
"stable": 1.0,
"test": 0.5,
"experimental": 0.25
}.get(rule.status, 0.1)
# Peso in base al livello di severita
level_weight = {
"critical": 1.0,
"high": 0.8,
"medium": 0.5,
"low": 0.3,
"informational": 0.1
}.get(rule.level, 0.1)
score += status_weight * level_weight
# Normalizza con cap a 1.0
return min(1.0, score)
@property
def stable_rule_count(self) -> int:
return sum(1 for r in self.rules if r.status == "stable")
def extract_attack_tags(rule: dict) -> list[str]:
"""Estrae gli ID tecnica ATT&CK dai tag della regola Sigma."""
technique_ids = []
for tag in rule.get("tags", []):
tag_lower = tag.lower()
if tag_lower.startswith("attack.t"):
# Estrae l'ID: "attack.t1003.001" -> "T1003.001"
tech_id = tag.replace("attack.", "").upper()
if len(tech_id) >= 5: # Minimo "T1234"
technique_ids.append(tech_id)
return technique_ids
def build_coverage_map(
rules_dir: str,
attack_client: AttackFrameworkClient
) -> dict[str, TechniqueCoverage]:
"""
Analizza il repository Sigma e costruisce la mappa di copertura
per ogni tecnica ATT&CK.
"""
coverage_map: dict[str, TechniqueCoverage] = {}
rules_path = Path(rules_dir)
for rule_file in rules_path.rglob("*.yml"):
try:
with open(rule_file, encoding='utf-8') as f:
rule = yaml.safe_load(f)
except Exception as e:
print(f"Error reading {rule_file}: {e}")
continue
if not isinstance(rule, dict):
continue
technique_ids = extract_attack_tags(rule)
if not technique_ids:
continue
rule_info = RuleInfo(
path=str(rule_file.relative_to(rules_path)),
title=rule.get("title", "Unknown"),
status=rule.get("status", "experimental"),
level=rule.get("level", "medium"),
tags=rule.get("tags", [])
)
for tech_id in technique_ids:
if tech_id not in coverage_map:
# Recupera info dalla libreria ATT&CK
technique = attack_client.get_technique_by_id(tech_id)
coverage_map[tech_id] = TechniqueCoverage(
technique_id=tech_id,
technique_name=technique.name if technique else "Unknown",
tactic=technique.tactic if technique else "unknown"
)
coverage_map[tech_id].rules.append(rule_info)
return coverage_map
격차 분석 및 우선순위 지정
커버리지 맵이 구축되면 가장 중요한 단계는 다음과 같습니다. 격차를 파악하고 우선순위를 정합니다. 모든 격차가 동일하지는 않습니다. 긴급성: T1190(공용 애플리케이션 활용)의 격차 및 많은 대부분의 경우 T1586(손상 계정)의 격차보다 더 중요합니다. 조직의 일부.
@dataclass
class CoverageGap:
technique_id: str
technique_name: str
tactic: str
priority_score: float
reasons: list[str] = field(default_factory=list)
def calculate_gap_priority(
technique: AttackTechnique,
threat_intel: dict # Frequenza osservata nei report di threat intel
) -> float:
"""
Calcola un priority score per un gap di copertura.
Score più alto = gap più urgente da colmare.
Fattori considerati:
- Frequenza di utilizzo dagli APT group (da threat intel)
- Disponibilità di data sources necessari
- Impatto potenziale della tecnica
"""
score = 0.0
reasons = []
# Fattore 1: Frequenza nelle campagne recenti (0-40 punti)
freq = threat_intel.get(technique.id, 0)
freq_score = min(40.0, freq * 10)
score += freq_score
if freq > 2:
reasons.append(f"Observed in {freq} recent threat campaigns")
# Fattore 2: Tattiche ad alto impatto (0-30 punti)
high_impact_tactics = [
"credential-access", "lateral-movement",
"impact", "exfiltration", "command-and-control"
]
if technique.tactic in high_impact_tactics:
score += 30.0
reasons.append(f"High-impact tactic: {technique.tactic}")
# Fattore 3: Data sources disponibili (0-20 punti)
# Tecniche con molti data sources sono più facili da rilevare
available_sources = len(technique.data_sources)
if available_sources >= 3:
score += 20.0
reasons.append("Multiple data sources available for detection")
elif available_sources >= 1:
score += 10.0
# Fattore 4: Supporto piattaforme comuni (0-10 punti)
if "Windows" in technique.platforms:
score += 5.0
if "Linux" in technique.platforms or "macOS" in technique.platforms:
score += 5.0
return score
def identify_coverage_gaps(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
threat_intel: dict,
min_coverage_score: float = 0.5
) -> list[CoverageGap]:
"""
Identifica le tecniche con copertura insufficiente
e le ordina per priorità.
"""
gaps = []
for technique in all_techniques:
coverage = coverage_map.get(technique.id)
current_score = coverage.coverage_score if coverage else 0.0
if current_score < min_coverage_score:
priority = calculate_gap_priority(technique, threat_intel)
reasons = []
if current_score == 0:
reasons.append("No detection rules exist")
else:
reasons.append(
f"Insufficient coverage score: {current_score:.2f}/1.0"
)
gaps.append(CoverageGap(
technique_id=technique.id,
technique_name=technique.name,
tactic=technique.tactic,
priority_score=priority,
reasons=reasons
))
# Ordina per priority score decrescente
return sorted(gaps, key=lambda g: g.priority_score, reverse=True)
# Esempio di threat intel basata su report pubblici
THREAT_INTEL_FREQUENCY = {
"T1059.001": 8, # PowerShell - molto comune
"T1003.001": 7, # LSASS Memory - molto comune
"T1566.001": 9, # Spearphishing Attachment
"T1190": 6, # Exploit Public-Facing Application
"T1021.001": 5, # Remote Desktop Protocol
"T1053.005": 4, # Scheduled Task
"T1078": 6, # Valid Accounts
}
ATT&CK Navigator용 히트맵 생성
ATT&CK 네비게이터 공식 MITRE 도구 대화형 매트릭스에서 적용 범위를 확인하세요. 입력 수락 프로그래밍 방식으로 생성할 수 있는 JSON 형식:
def generate_navigator_layer(
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap],
layer_name: str = "Detection Coverage"
) -> dict:
"""
Genera un layer JSON per ATT&CK Navigator che mostra
la copertura con colori: verde = buona, rosso = gap critico.
"""
techniques_layer = []
# Tecniche con copertura
for tech_id, coverage in coverage_map.items():
score = coverage.coverage_score
if score >= 0.8:
color = "#4caf50" # Verde: copertura eccellente
comment = f"Covered by {coverage.stable_rule_count} stable rules"
elif score >= 0.5:
color = "#ff9800" # Arancione: copertura parziale
comment = "Partial coverage - needs improvement"
else:
color = "#f44336" # Rosso: copertura insufficiente
comment = "Insufficient coverage"
techniques_layer.append({
"techniqueID": tech_id,
"color": color,
"comment": comment,
"score": round(score * 100),
"metadata": [
{"name": "rule_count", "value": str(len(coverage.rules))},
{"name": "stable_rules",
"value": str(coverage.stable_rule_count)}
]
})
# Gap critici senza copertura
covered_ids = set(coverage_map.keys())
for gap in gaps[:20]: # Top 20 gap prioritari
if gap.technique_id not in covered_ids:
techniques_layer.append({
"techniqueID": gap.technique_id,
"color": "#b71c1c", # Rosso scuro: gap critico
"comment": f"PRIORITY GAP (score: {gap.priority_score:.0f}): " +
", ".join(gap.reasons),
"score": 0
})
return {
"name": layer_name,
"versions": {
"attack": "14",
"navigator": "4.9",
"layer": "4.5"
},
"domain": "enterprise-attack",
"description": f"Auto-generated coverage layer - {len(coverage_map)} techniques covered",
"filters": {"platforms": ["Windows", "Linux", "macOS"]},
"sorting": 3,
"layout": {"layout": "side", "aggregateFunction": "average"},
"hideDisabled": False,
"techniques": techniques_layer,
"gradient": {
"colors": ["#ff6666", "#ffe766", "#8ec843"],
"minValue": 0,
"maxValue": 100
},
"legendItems": [
{"label": "No coverage", "color": "#b71c1c"},
{"label": "Partial coverage", "color": "#ff9800"},
{"label": "Good coverage", "color": "#4caf50"}
]
}
관리를 위한 자동 보고
경영진이 읽을 수 있는 보고서는 보안 상태를 전달해야 합니다. 명확하고 실행 가능한 방식으로:
def generate_executive_report(
all_techniques: list[AttackTechnique],
coverage_map: dict[str, TechniqueCoverage],
gaps: list[CoverageGap]
) -> str:
"""Genera un report testuale executive-friendly."""
total = len(all_techniques)
covered = len(coverage_map)
well_covered = sum(
1 for c in coverage_map.values() if c.coverage_score >= 0.8
)
partial = sum(
1 for c in coverage_map.values()
if 0.5 <= c.coverage_score < 0.8
)
insufficient = covered - well_covered - partial
uncovered = total - covered
coverage_pct = covered / total * 100
good_pct = well_covered / total * 100
report = f"""
=== DETECTION ENGINEERING COVERAGE REPORT ===
Generated: 2026-03-09
EXECUTIVE SUMMARY
-----------------
ATT&CK Techniques Total: {total:4d}
Covered (any level): {covered:4d} ({coverage_pct:.1f}%)
Well-covered (score >= 0.8): {well_covered:4d} ({good_pct:.1f}%)
Partial coverage: {partial:4d}
Insufficient coverage: {insufficient:4d}
Not covered: {uncovered:4d}
TOP 10 PRIORITY GAPS (by risk score)
--------------------------------------
"""
for i, gap in enumerate(gaps[:10], 1):
report += (
f"{i:2d}. [{gap.technique_id}] {gap.technique_name}\n"
f" Tactic: {gap.tactic}\n"
f" Priority Score: {gap.priority_score:.0f}/100\n"
f" Reasons: {', '.join(gap.reasons[:2])}\n\n"
)
report += "COVERAGE BY TACTIC\n------------------\n"
tactic_stats: dict[str, dict] = {}
for coverage in coverage_map.values():
tac = coverage.tactic
if tac not in tactic_stats:
tactic_stats[tac] = {"covered": 0, "total": 0, "score_sum": 0}
tactic_stats[tac]["covered"] += 1
tactic_stats[tac]["score_sum"] += coverage.coverage_score
for tac, stats in sorted(tactic_stats.items()):
avg_score = stats["score_sum"] / stats["covered"] * 100
report += f"{tac:35s} {stats['covered']:3d} rules, avg score {avg_score:.0f}%\n"
return report
DaC 파이프라인에 통합
적용 범위 보고서는 기본으로 병합될 때마다 자동으로 생성되어야 합니다. CI/CD 파이프라인 아티팩트로 게시됩니다.
# .github/workflows/coverage-report.yml
name: ATT&CK Coverage Report
on:
push:
branches: [main]
schedule:
- cron: '0 8 * * 1' # Ogni lunedi mattina
jobs:
coverage-analysis:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: pip install mitreattack-python pyyaml requests
- name: Generate coverage report
run: |
python scripts/attck_coverage.py \
--rules-dir ./rules \
--output-json reports/coverage.json \
--output-layer reports/navigator_layer.json \
--output-report reports/executive_report.txt
- name: Comment on PR with coverage delta
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const report = fs.readFileSync('reports/executive_report.txt', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: '## ATT&CK Coverage Update\n```\n' + report + '\n```'
});
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: attck-coverage-${{ github.sha }}
path: reports/
retention-days: 90
커버리지 분석 모범 사례
| 관행 | 설명 | 영향 |
|---|---|---|
| 가장 많이 사용되는 기술에 집중 | M-Trends, Verizon DBIR과 같은 보고서를 사용하여 우선순위를 정하세요. | 높은 |
| 데이터 소스 고려 | 보이지 않는 것을 감지할 수 없습니다. 먼저 데이터 소스를 감사하세요. | 비평가 |
| 관련 플랫폼 게임 구별 | macOS가 없다면 macOS 전용 기술을 우선시하지 마세요 | 중간 |
| 시간 경과에 따른 추적 | 주 단위로 적용 범위 델타 측정 | 높은 |
| 정기적으로 규칙 유효성을 검사합니다. | 6개월 이상 테스트되지 않은 규칙은 더 이상 작동하지 않을 수 있습니다. | 높은 |
커버리지 분석의 일반적인 실수
- 품질이 아닌 규칙을 중요하게 생각하세요. 10가지 실험 규칙 테스트된 안정적인 규칙 2개 미만의 가치가 있습니다. 가중치 적용 범위_점수 사용
- 데이터 소스를 무시합니다. Sysmon을 배포하지 않은 경우 규칙에 관계없이 많은 Windows 기술을 감지할 수 없습니다.
- 100% 커버리지는 불가능합니다: 일부 기술에는 감지가 매우 어렵습니다(토지에서 생활). 격차에 집중하라 높은 위험과 높은 탐지 가능성
- 조치 없이 신고: 모든 격차 보고서는 생성되어야 합니다. 소유자 및 기한이 포함된 탐지 항목의 백로그
결론
MITRE ATT&CK를 탐지 엔지니어링 워크플로에 프로그래밍 방식으로 통합하세요. 커버리지 분석을 수동적이고 주관적인 활동에서 프로세스로 전환합니다. 자동화되고 객관적이며 지속적입니다. 결과는 탐지 프로그램입니다 측정 가능하고, 실제 위협에 우선순위를 부여하며, 경영진에 전달 가능 구체적인 데이터로.
시리즈의 다음 기사
다음 기사에서는 경고에 대한 응답을 자동화하는 방법을 살펴보겠습니다. 와 Python의 SOAR 플레이북, 작업 조정 자동 격리, 강화 및 알림.







