Python의 SOAR 플레이북: 사고 대응 자동화
SOAR(보안 오케스트레이션, 자동화 및 대응) 플레이북과 스크립트 이상의 기능: 보안 사고에 대한 공식적이고 반복 가능한 대응 조치 조율입니다. 경고가 플레이북을 트리거하면 조정된 작업 순서(데이터 강화, 맬웨어 분석, 자동 격리, 알림 및 문서화가 대신 몇 초 만에 수행됩니다. 시간보다 훨씬 단축되어 MTTR(평균 응답 시간)이 눈에 띄게 단축됩니다.
최근 업계 데이터는 다음과 같은 영향을 확인합니다. Python 기반 구현이 달성되었습니다. 92% 정확도의 2.5분 MTTR, 시간당 500건의 사건을 관리합니다. Cortex XSOAR, Cortex가 포함된 TheHive 및 Shuffle(오픈 소스)과 같은 플랫폼을 사용하면 다음을 수행할 수 있습니다. 보안 생태계에 대한 전체 액세스 권한을 갖고 Python으로 플레이북을 구현합니다.
이 문서에서는 세 가지 일반적인 시나리오에 대해 Python으로 완전한 SOAR 플레이북을 구축합니다. 엔드포인트 악성코드, 무차별 대입. 각각에 대해 전체 흐름이 구현됩니다. 선별, 강화, 봉쇄, 에스컬레이션 및 자동 문서화.
무엇을 배울 것인가
- Python의 모듈형 SOAR 플레이북 아키텍처
- 자동 강화: VirusTotal, Shodan, Active Directory
- 자동 격리: 엔드포인트 격리, IP 차단, 사용자 비활성화
- 오케스트레이션을 위한 TheHive 및 Cortex와의 통합
- 플레이북 테스트 및 버전 관리
- 컨텍스트가 미리 입력된 인적 에스컬레이션 패턴
모듈형 SOAR 플레이북의 아키텍처
독립적인 기능 블록으로 구성된 잘 설계된 플레이북 다른 플레이북 간에 재사용됩니다. 모듈식 구조로 테스트 및 유지 관리가 용이합니다. 그리고 시간이 지남에 따라 진화합니다.
SOAR 플레이북의 기본 구성 요소는 다음과 같습니다.
- 트리거: 플레이북을 트리거하는 이벤트(SIEM의 경고, 의심스러운 이메일, 사용자 신고)
- 농축: 추가 컨텍스트 수집(인텔 위협, 자산 정보, 사용자 기록)
- 결정: 자동 분류 논리(오탐지? 에스컬레이션? 즉시 봉쇄?)
- 행동: 대응 조치(격리, 차단, 알림)
- 선적 서류 비치: 티켓 업데이트, 플레이북 로그, 증거 수집
- 폐쇄/에스컬레이션: 자동 종료 또는 인간 분석가에게 에스컬레이션
# Framework base per Playbook SOAR
# File: soar_framework.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from enum import Enum
import logging
class PlaybookStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
ESCALATED = "escalated"
FAILED = "failed"
class ActionResult(Enum):
SUCCESS = "success"
FAILURE = "failure"
SKIPPED = "skipped"
ESCALATE = "escalate"
@dataclass
class PlaybookContext:
"""Contesto condiviso tra tutti gli step del playbook."""
alert_id: str
alert_type: str
alert_data: dict
start_time: datetime = field(default_factory=datetime.now)
enrichment_data: dict = field(default_factory=dict)
actions_taken: list[dict] = field(default_factory=list)
evidence: list[dict] = field(default_factory=list)
escalation_reason: Optional[str] = None
status: PlaybookStatus = PlaybookStatus.RUNNING
def add_action(self, action: str, result: ActionResult,
details: dict = None) -> None:
self.actions_taken.append({
'timestamp': datetime.now().isoformat(),
'action': action,
'result': result.value,
'details': details or {}
})
def add_evidence(self, evidence_type: str, data: Any,
source: str) -> None:
self.evidence.append({
'timestamp': datetime.now().isoformat(),
'type': evidence_type,
'data': data,
'source': source
})
def set_escalation(self, reason: str) -> None:
self.escalation_reason = reason
self.status = PlaybookStatus.ESCALATED
class PlaybookStep(ABC):
"""Classe base per ogni step del playbook."""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"soar.{name}")
@abstractmethod
def execute(self, context: PlaybookContext) -> ActionResult:
"""Esegue lo step. Deve essere implementato da ogni sottoclasse."""
pass
def __str__(self) -> str:
return self.name
class Playbook:
"""Orchestratore del playbook: esegue gli step in sequenza."""
def __init__(self, name: str, steps: list[PlaybookStep]):
self.name = name
self.steps = steps
self.logger = logging.getLogger(f"soar.playbook.{name}")
def run(self, alert_id: str, alert_type: str,
alert_data: dict) -> PlaybookContext:
context = PlaybookContext(
alert_id=alert_id,
alert_type=alert_type,
alert_data=alert_data
)
self.logger.info(f"Playbook '{self.name}' avviato per alert {alert_id}")
for step in self.steps:
if context.status in [PlaybookStatus.ESCALATED, PlaybookStatus.FAILED]:
self.logger.info(f"Skip step '{step}': playbook in stato {context.status}")
break
self.logger.info(f"Esecuzione step: {step}")
try:
result = step.execute(context)
context.add_action(step.name, result)
if result == ActionResult.ESCALATE:
context.status = PlaybookStatus.ESCALATED
self.logger.warning(f"Step '{step}' richiede escalation")
break
except Exception as e:
self.logger.error(f"Step '{step}' fallito: {e}", exc_info=True)
context.add_action(step.name, ActionResult.FAILURE,
{'error': str(e)})
context.status = PlaybookStatus.FAILED
break
if context.status == PlaybookStatus.RUNNING:
context.status = PlaybookStatus.COMPLETED
self.logger.info(f"Playbook completato con status: {context.status.value}")
return context
피싱 플레이북: 선별부터 격리까지
피싱은 SOC에서 가장 일반적인 시나리오입니다. 성숙한 플레이북으로 응답 시간 단축 45분(수동)에서 2분 미만(자동)까지 URL, 첨부파일 분석, 이메일 헤더를 확인하고 클릭한 사용자를 격리합니다.
# Playbook Phishing Completo
# File: playbooks/phishing_playbook.py
import httpx
import re
import base64
from email.parser import Parser
from email.policy import default as default_policy
class ExtractEmailArtifactsStep(PlaybookStep):
"""Estrae URL, allegati e header dall'email sospetta."""
def __init__(self):
super().__init__("extract_email_artifacts")
def execute(self, context: PlaybookContext) -> ActionResult:
email_raw = context.alert_data.get('email_raw', '')
if not email_raw:
return ActionResult.SKIPPED
# Parse email
msg = Parser(policy=default_policy).parsestr(email_raw)
# Estrai header
headers = {
'from': msg.get('From', ''),
'reply_to': msg.get('Reply-To', ''),
'x_originating_ip': msg.get('X-Originating-IP', ''),
'dkim': 'DKIM-Signature' in msg,
'spf': 'Received-SPF' in email_raw,
'dmarc': 'DMARC' in email_raw.upper()
}
# Estrai URL dal body
body = msg.get_body(preferencelist=('plain', 'html'))
body_text = body.get_content() if body else ''
urls = re.findall(r'https?://[^\s<>"]+', body_text)
# Estrai allegati
attachments = []
for part in msg.iter_attachments():
attachments.append({
'filename': part.get_filename(),
'content_type': part.get_content_type(),
'size': len(part.get_payload(decode=True) or b''),
'content_b64': base64.b64encode(
part.get_payload(decode=True) or b''
).decode()
})
context.enrichment_data['email_headers'] = headers
context.enrichment_data['urls'] = list(set(urls)) # dedup
context.enrichment_data['attachments'] = attachments
context.add_evidence('email_headers', headers, 'email_parser')
context.add_evidence('extracted_urls', urls, 'email_parser')
self.logger.info(f"Estratti {len(urls)} URL e {len(attachments)} allegati")
return ActionResult.SUCCESS
class VirusTotalEnrichmentStep(PlaybookStep):
"""Analizza URL e hash allegati con VirusTotal."""
def __init__(self, vt_api_key: str):
super().__init__("virustotal_enrichment")
self.vt_api_key = vt_api_key
self.base_url = "https://www.virustotal.com/api/v3"
self.headers = {"x-apikey": vt_api_key}
def execute(self, context: PlaybookContext) -> ActionResult:
vt_results = {}
# Analizza URL
for url in context.enrichment_data.get('urls', [])[:10]: # Max 10 URL
try:
result = self._check_url(url)
vt_results[url] = result
if result.get('malicious', 0) > 0:
context.add_evidence('malicious_url', {'url': url, 'vt': result}, 'virustotal')
except Exception as e:
self.logger.warning(f"VT check URL fallito per {url}: {e}")
context.enrichment_data['virustotal'] = vt_results
# Determina se e necessario containment immediato
malicious_count = sum(
1 for r in vt_results.values() if r.get('malicious', 0) > 2
)
if malicious_count > 0:
context.enrichment_data['vt_verdict'] = 'malicious'
self.logger.warning(f"Trovati {malicious_count} URL malevoli su VT")
else:
context.enrichment_data['vt_verdict'] = 'clean'
return ActionResult.SUCCESS
def _check_url(self, url: str) -> dict:
"""Controlla un URL su VirusTotal."""
import hashlib
url_id = base64.urlsafe_b64encode(url.encode()).decode().rstrip('=')
with httpx.Client() as client:
response = client.get(
f"{self.base_url}/urls/{url_id}",
headers=self.headers,
timeout=10
)
if response.status_code == 404:
# Submetti per analisi
post_response = client.post(
f"{self.base_url}/urls",
headers=self.headers,
data={"url": url},
timeout=10
)
return {"status": "submitted", "malicious": 0}
if response.status_code != 200:
return {"status": "error", "malicious": 0}
data = response.json()
stats = data.get('data', {}).get(
'attributes', {}
).get('last_analysis_stats', {})
return {
"malicious": stats.get('malicious', 0),
"suspicious": stats.get('suspicious', 0),
"harmless": stats.get('harmless', 0),
"undetected": stats.get('undetected', 0)
}
class TriageDecisionStep(PlaybookStep):
"""Decide l'azione basandosi sull'enrichment."""
def __init__(self):
super().__init__("triage_decision")
def execute(self, context: PlaybookContext) -> ActionResult:
vt_verdict = context.enrichment_data.get('vt_verdict', 'unknown')
headers = context.enrichment_data.get('email_headers', {})
# Score-based triage
risk_score = 0
# Fattori di rischio
if vt_verdict == 'malicious':
risk_score += 40
if not headers.get('dkim', True):
risk_score += 15
if not headers.get('spf', True):
risk_score += 15
if not headers.get('dmarc', True):
risk_score += 10
if context.enrichment_data.get('attachments'):
for att in context.enrichment_data['attachments']:
if any(ext in str(att.get('filename', '')).lower()
for ext in ['.exe', '.js', '.vbs', '.ps1', '.macro']):
risk_score += 20
context.enrichment_data['risk_score'] = risk_score
self.logger.info(f"Risk score calcolato: {risk_score}")
if risk_score >= 50:
context.enrichment_data['triage_result'] = 'high_risk'
return ActionResult.SUCCESS # Procedi con containment
elif risk_score >= 25:
context.enrichment_data['triage_result'] = 'medium_risk'
context.set_escalation(
f"Risk score {risk_score}: richiede review umana"
)
return ActionResult.ESCALATE
else:
context.enrichment_data['triage_result'] = 'low_risk'
return ActionResult.SUCCESS
class ContainPhishingStep(PlaybookStep):
"""Azioni di containment per phishing confermato."""
def __init__(self, exchange_client, ad_client):
super().__init__("contain_phishing")
self.exchange_client = exchange_client
self.ad_client = ad_client
def execute(self, context: PlaybookContext) -> ActionResult:
if context.enrichment_data.get('triage_result') != 'high_risk':
return ActionResult.SKIPPED
affected_user = context.alert_data.get('recipient_email', '')
actions = []
# 1. Rimuovi email simili da tutte le mailbox
if context.enrichment_data.get('urls'):
for url in context.enrichment_data['urls'][:3]:
result = self.exchange_client.search_and_delete(
sender=context.alert_data.get('sender_email', ''),
url_contains=url
)
actions.append({'type': 'email_deletion', 'url': url, 'result': result})
# 2. Blocca mittente in Exchange
sender = context.alert_data.get('sender_email', '')
if sender:
self.exchange_client.add_to_blocklist(sender)
actions.append({'type': 'sender_blocked', 'sender': sender})
# 3. Forza reset password se utente ha cliccato link
if context.alert_data.get('user_clicked_link', False):
self.ad_client.force_password_reset(affected_user)
self.ad_client.revoke_sessions(affected_user)
actions.append({
'type': 'password_reset',
'user': affected_user,
'reason': 'User clicked malicious link'
})
context.add_evidence('containment_actions', actions, 'soar_playbook')
self.logger.info(f"Containment completato: {len(actions)} azioni")
return ActionResult.SUCCESS
def build_phishing_playbook(vt_api_key: str,
exchange_client,
ad_client) -> Playbook:
"""Factory per il playbook phishing."""
return Playbook(
name="phishing_response",
steps=[
ExtractEmailArtifactsStep(),
VirusTotalEnrichmentStep(vt_api_key),
TriageDecisionStep(),
ContainPhishingStep(exchange_client, ad_client),
DocumentIncidentStep(), # Definito sotto
NotifyStakeholdersStep()
]
)
엔드포인트 플레이북의 악성 코드: 강화 및 격리
엔드포인트 악성코드 플레이북은 보다 적극적으로 대응해야 합니다: 격리 끝점의 측면 이동을 방지하는 데 중요합니다. 하지만 격리 전에는 필수입니다. 가능한 한 많은 법의학 데이터를 수집하십시오.
# Playbook Malware Endpoint
# File: playbooks/malware_endpoint_playbook.py
class CollectForensicDataStep(PlaybookStep):
"""Raccoglie dati forensi dall'endpoint prima dell'isolamento."""
def __init__(self, edr_client):
super().__init__("collect_forensic_data")
self.edr = edr_client
def execute(self, context: PlaybookContext) -> ActionResult:
endpoint = context.alert_data.get('endpoint_hostname', '')
malicious_process = context.alert_data.get('process_name', '')
forensics = {}
# Processo malevolo
forensics['process_tree'] = self.edr.get_process_tree(
endpoint, malicious_process
)
# Network connections del processo
forensics['network_connections'] = self.edr.get_process_network(
endpoint, context.alert_data.get('pid')
)
# File system changes nelle ultime 2 ore
forensics['file_changes'] = self.edr.get_recent_file_changes(
endpoint, hours_back=2
)
# Autorun entries (persistence)
forensics['autoruns'] = self.edr.get_autoruns(endpoint)
# Memory dump del processo (se disponibile)
try:
forensics['memory_dump_path'] = self.edr.dump_process_memory(
endpoint, context.alert_data.get('pid')
)
except Exception as e:
self.logger.warning(f"Memory dump non disponibile: {e}")
context.enrichment_data['forensics'] = forensics
context.add_evidence('forensic_collection', forensics, 'edr')
return ActionResult.SUCCESS
class MalwareHashAnalysisStep(PlaybookStep):
"""Analizza hash del malware su multipli servizi TI."""
def __init__(self, vt_api_key: str, malware_bazaar_key: str):
super().__init__("malware_hash_analysis")
self.vt_key = vt_api_key
self.bazaar_key = malware_bazaar_key
def execute(self, context: PlaybookContext) -> ActionResult:
file_hash = context.alert_data.get('file_hash', '')
if not file_hash:
return ActionResult.SKIPPED
results = {}
# VirusTotal
with httpx.Client() as client:
vt_resp = client.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers={"x-apikey": self.vt_key},
timeout=15
)
if vt_resp.status_code == 200:
vt_data = vt_resp.json()['data']['attributes']
stats = vt_data.get('last_analysis_stats', {})
results['virustotal'] = {
'malicious': stats.get('malicious', 0),
'total': sum(stats.values()),
'family': vt_data.get('popular_threat_classification', {}).get(
'suggested_threat_label', 'unknown'
)
}
# MalwareBazaar
with httpx.Client() as client:
bazaar_resp = client.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_info", "hash": file_hash},
timeout=15
)
if bazaar_resp.status_code == 200:
bazaar_data = bazaar_resp.json()
if bazaar_data.get('query_status') == 'ok':
results['malware_bazaar'] = bazaar_data.get('data', [{}])[0]
context.enrichment_data['malware_analysis'] = results
# Aggiorna verdict
vt_malicious = results.get('virustotal', {}).get('malicious', 0)
if vt_malicious > 5:
context.enrichment_data['malware_verdict'] = 'confirmed_malware'
elif vt_malicious > 0:
context.enrichment_data['malware_verdict'] = 'suspicious'
else:
context.enrichment_data['malware_verdict'] = 'unknown'
return ActionResult.SUCCESS
class IsolateEndpointStep(PlaybookStep):
"""Isola l'endpoint dalla rete per prevenire lateral movement."""
def __init__(self, edr_client, cmdb_client):
super().__init__("isolate_endpoint")
self.edr = edr_client
self.cmdb = cmdb_client
def execute(self, context: PlaybookContext) -> ActionResult:
verdict = context.enrichment_data.get('malware_verdict', 'unknown')
if verdict not in ['confirmed_malware', 'suspicious']:
return ActionResult.SKIPPED
endpoint = context.alert_data.get('endpoint_hostname', '')
# Verifica criticalita dell'asset prima dell'isolamento
asset_info = self.cmdb.get_asset_info(endpoint)
if asset_info.get('criticality') == 'critical':
# Non isolare automaticamente asset critici
context.set_escalation(
f"Endpoint '{endpoint}' e critico (tipo: {asset_info.get('type')}). "
f"Isolamento richiede approvazione manuale."
)
return ActionResult.ESCALATE
# Isola l'endpoint
isolation_result = self.edr.isolate_endpoint(
endpoint,
reason=f"Malware detection - alert {context.alert_id}",
allow_edr_communication=True # Mantieni canale per remediation
)
context.add_evidence('isolation', {
'endpoint': endpoint,
'result': isolation_result,
'timestamp': datetime.now().isoformat()
}, 'edr')
self.logger.info(f"Endpoint {endpoint} isolato: {isolation_result}")
return ActionResult.SUCCESS
자동 문서화 및 티켓 생성
성숙한 플레이북은 자동으로 사고 문서를 생성하여 티켓을 채웁니다. 모든 강화 데이터, 취해진 조치 및 수정 권장 사항이 포함됩니다. 이는 종종 분석가 시간의 40%를 차지하는 "보고 오버헤드"를 제거합니다.
# Documentazione Automatica
# File: steps/documentation_step.py
import jinja2
INCIDENT_REPORT_TEMPLATE = """
## Incident Report - {{ context.alert_id }}
**Status**: {{ context.status.value }}
**Start Time**: {{ context.start_time.strftime('%Y-%m-%d %H:%M:%S') }}
**Alert Type**: {{ context.alert_type }}
### Summary
{{ summary }}
### Risk Assessment
- **Risk Score**: {{ context.enrichment_data.get('risk_score', 'N/A') }}
- **Verdict**: {{ context.enrichment_data.get('vt_verdict', context.enrichment_data.get('malware_verdict', 'N/A')) }}
- **Triage Result**: {{ context.enrichment_data.get('triage_result', 'N/A') }}
### Threat Intelligence
{% if context.enrichment_data.get('virustotal') %}
**VirusTotal**: {{ context.enrichment_data.virustotal | tojson(indent=2) }}
{% endif %}
### Actions Taken
{% for action in context.actions_taken %}
- **{{ action.timestamp }}** - {{ action.action }}: {{ action.result }}
{% if action.details %} Details: {{ action.details }}{% endif %}
{% endfor %}
### Evidence Collected
{% for ev in context.evidence %}
- **{{ ev.type }}** (from {{ ev.source }}): {{ ev.timestamp }}
{% endfor %}
{% if context.escalation_reason %}
### Escalation Required
**Reason**: {{ context.escalation_reason }}
**Recommended Actions**:
1. Validate analyst judgment on enrichment data
2. Confirm containment or determine alternative
3. Initiate full forensic investigation if warranted
{% endif %}
### Indicators of Compromise
{% if context.enrichment_data.get('urls') %}
**Malicious URLs**:
{% for url in context.enrichment_data.urls %}
- {{ url }}
{% endfor %}
{% endif %}
"""
class DocumentIncidentStep(PlaybookStep):
"""Crea documentazione strutturata dell'incidente."""
def __init__(self, thehive_client=None):
super().__init__("document_incident")
self.thehive = thehive_client
self.template = jinja2.Template(INCIDENT_REPORT_TEMPLATE)
def execute(self, context: PlaybookContext) -> ActionResult:
# Genera summary basato sul tipo di alert
summary = self._generate_summary(context)
# Renderizza il report
report = self.template.render(
context=context,
summary=summary
)
context.enrichment_data['incident_report'] = report
# Crea/aggiorna caso in TheHive se disponibile
if self.thehive:
try:
case_id = context.alert_data.get('thehive_case_id')
if case_id:
self.thehive.update_case(
case_id,
description=report,
tags=self._extract_tags(context)
)
else:
new_case_id = self.thehive.create_case({
'title': f"[SOAR] {context.alert_type} - {context.alert_id}",
'description': report,
'severity': self._determine_severity(context),
'tags': self._extract_tags(context)
})
context.enrichment_data['thehive_case_id'] = new_case_id
except Exception as e:
self.logger.error(f"Errore creazione caso TheHive: {e}")
return ActionResult.SUCCESS
def _generate_summary(self, context: PlaybookContext) -> str:
alert_type = context.alert_type
if alert_type == 'phishing':
return (
f"Email di phishing rilevata con {len(context.enrichment_data.get('urls', []))} "
f"URL sospetti. Verdict VirusTotal: {context.enrichment_data.get('vt_verdict', 'N/A')}."
)
elif alert_type == 'malware_endpoint':
endpoint = context.alert_data.get('endpoint_hostname', 'N/A')
return (
f"Malware rilevato su {endpoint}. "
f"Verdict: {context.enrichment_data.get('malware_verdict', 'N/A')}."
)
return "Incidente di sicurezza processato automaticamente."
def _extract_tags(self, context: PlaybookContext) -> list[str]:
tags = [f"soar-auto", f"type:{context.alert_type}"]
if context.status == PlaybookStatus.ESCALATED:
tags.append("needs-human-review")
verdict = context.enrichment_data.get('vt_verdict') or \
context.enrichment_data.get('malware_verdict')
if verdict:
tags.append(f"verdict:{verdict}")
return tags
def _determine_severity(self, context: PlaybookContext) -> int:
score = context.enrichment_data.get('risk_score', 0)
if score >= 50:
return 3 # High
elif score >= 25:
return 2 # Medium
return 1 # Low
플레이북 테스트 및 버전 관리
SOAR 플레이북은 애플리케이션 코드와 동일하게 엄격하게 테스트되어야 합니다. 프로덕션 중 잘못된 플레이북으로 인해 과도한 격리(호스트 격리)가 발생할 수 있습니다. 적법함) 또는 불충분한 격리(실제 위협에 대한 조치 실패).
# Testing Framework per SOAR Playbook
# File: tests/test_phishing_playbook.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
class MockExchangeClient:
def __init__(self):
self.blocked_senders = []
self.deleted_emails = []
def add_to_blocklist(self, sender: str) -> None:
self.blocked_senders.append(sender)
def search_and_delete(self, sender: str, url_contains: str) -> dict:
self.deleted_emails.append({"sender": sender, "url": url_contains})
return {"deleted_count": 3, "status": "success"}
class MockADClient:
def __init__(self):
self.password_resets = []
self.revoked_sessions = []
def force_password_reset(self, user: str) -> None:
self.password_resets.append(user)
def revoke_sessions(self, user: str) -> None:
self.revoked_sessions.append(user)
class MockVirusTotalClient:
def __init__(self, malicious_urls: list[str] = None):
self.malicious_urls = malicious_urls or []
def check_url(self, url: str) -> dict:
if url in self.malicious_urls:
return {"malicious": 15, "suspicious": 3, "harmless": 0}
return {"malicious": 0, "suspicious": 0, "harmless": 50}
class TestPhishingPlaybook:
def setup_method(self):
self.exchange_client = MockExchangeClient()
self.ad_client = MockADClient()
@pytest.fixture
def high_risk_phishing_alert(self) -> dict:
return {
"alert_id": "test-001",
"alert_type": "phishing",
"sender_email": "attacker@evil.com",
"recipient_email": "victim@company.com",
"user_clicked_link": True,
"email_raw": """From: attacker@evil.com
To: victim@company.com
Subject: Urgente: Aggiorna le tue credenziali
Clicca qui: http://malicious-phish.com/steal-creds
""",
"thehive_case_id": None
}
def test_phishing_high_risk_containment(self, high_risk_phishing_alert):
"""Test: phishing ad alto rischio deve triggherare containment."""
with patch('httpx.Client') as mock_http:
# Mocka VirusTotal response con URL malevolo
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"data": {
"attributes": {
"last_analysis_stats": {
"malicious": 15, "suspicious": 2,
"harmless": 0, "undetected": 5
}
}
}
}
mock_http.return_value.__enter__.return_value.get.return_value = mock_response
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
context = playbook.run(
alert_id=high_risk_phishing_alert['alert_id'],
alert_type=high_risk_phishing_alert['alert_type'],
alert_data=high_risk_phishing_alert
)
# Verifica stato finale
assert context.status in [PlaybookStatus.COMPLETED, PlaybookStatus.ESCALATED]
# Se completato, verifica azioni intraprese
if context.status == PlaybookStatus.COMPLETED:
action_names = [a['action'] for a in context.actions_taken]
assert 'virustotal_enrichment' in action_names
assert 'triage_decision' in action_names
def test_low_risk_no_containment(self):
"""Test: phishing a basso rischio non deve triggherare containment."""
alert = {
"alert_id": "test-002",
"alert_type": "phishing",
"sender_email": "newsletter@legit.com",
"recipient_email": "user@company.com",
"user_clicked_link": False,
"email_raw": "From: newsletter@legit.com\nSubject: News\n\nHello!"
}
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
with patch('httpx.Client'):
context = playbook.run("test-002", "phishing", alert)
# Nessun isolamento su basso rischio
assert len(self.exchange_client.blocked_senders) == 0
assert len(self.ad_client.password_resets) == 0
모범 사례: 테스트 실행 플레이북
프로덕션에 배포하기 전에 각 플레이북을 2주 동안 테스트 실행 모드로 실행하세요. 연습 실행은 모든 강화 및 결정 단계를 수행하지만 포함 작업은 건너뜁니다. 실제(격리, 차단, 비밀번호 재설정), 그가 수행한 작업만 기록합니다. 이를 통해 위험 없이 분류 논리를 보정할 수 있습니다.
결론 및 주요 시사점
Python의 SOAR 플레이북은 현대 사고 대응의 최전선을 나타냅니다. 보안 통합 생태계를 갖춘 범용 언어의 유연성 가장 부유한. 모듈식 프레임워크는 재사용성, 테스트 가능성 및 시간이 지남에 따른 유지 관리 가능성.
주요 시사점
- 모듈식 아키텍처(단계 + 플레이북 + 컨텍스트)는 모든 SOAR 기업의 기초입니다.
- 정확한 분류 결정을 위해서는 다중 소스 강화(VT, EDR, CMDB, AD)가 중요합니다.
- 중요 자산에 대한 자동 에스컬레이션으로 과도한 격리로 인한 부수적 피해 방지
- 자동 문서화로 보고 오버헤드 제거(분석가 시간의 40%)
- 모의 클라이언트를 사용한 테스트는 생산 시스템에 영향을 주지 않고 품질을 보장합니다.
- 테스트 실행 모드는 실행 전에 분류 논리를 보정하는 데 필수적입니다.
관련 기사
- 경보 분류 자동화: 그래프 분석으로 MTTD 감소
- 위협 인텔리전스 수집: STIX/TAXII 피드 프로세서
- 분야로서의 탐지 엔지니어링: 스크립트에서 파이프라인까지







