Python の SOAR プレイブック: インシデント対応の自動化
Security Orchestration, Automation and Response (SOAR) プレイブックは、単なるスクリプトではありません。 そして、セキュリティインシデントに対する対応アクションの正式で反復可能なオーケストレーション。 アラートがプレイブックをトリガーすると、調整された一連の操作 (データ強化、 マルウェア分析、自動封じ込め、通知と文書化 - 代わりに数秒で実行されます 時間単位よりも短縮され、平均応答時間 (MTTR) が大幅に短縮されます。
最近の業界データはその影響を裏付けています: Python ベースの実装は次のような成果を達成しました 92% の精度で 2.5 分の MTTR、1 時間あたり 500 件のインシデントを管理します。 Cortex XSOAR、Cortex を使用した TheHive、および Shuffle (オープンソース) などのプラットフォームを使用すると、次のことが可能になります。 セキュリティ エコシステムに完全にアクセスできる Python でプレイブックを実装します。
この記事では、次の 3 つの一般的なシナリオに対応する完全な SOAR プレイブックを Python で構築します。 エンドポイントマルウェア、ブルートフォース。それぞれについて、フロー全体が実装されます。 トリアージ、強化、封じ込め、エスカレーション、自動文書化。
何を学ぶか
- Python でのモジュール型 SOAR プレイブックのアーキテクチャ
- 自動エンリッチメント: VirusTotal、Shodan、Active Directory
- 自動封じ込め: エンドポイントの分離、IP ブロック、ユーザーの無効化
- オーケストレーションのための TheHive および Cortex との統合
- プレイブックのテストとバージョン管理
- コンテキストが事前に設定された人間によるエスカレーション パターン
モジュール型 SOAR プレイブックのアーキテクチャ
適切に設計されたプレイブックは、独立した機能ブロックで構成されており、 異なる Playbook 間で再利用されます。モジュール構造によりテストとメンテナンスが容易になります そして時間の経過による進化。
SOAR プレイブックの基本的な構成要素は次のとおりです。
- トリガー: プレイブックをトリガーするイベント (SIEM からのアラート、不審なメール、ユーザー レポート)
- エンリッチメント: 追加のコンテキスト (インテルの脅威、資産情報、ユーザー履歴) のコレクション
- 決断: 自動トリアージ ロジック (誤検知?エスカレーション?即時封じ込め?)
- アクション: 応答アクション (分離、ブロック、通知)
- ドキュメント: チケット更新、プレイブックログ、証拠収集
- クローズ/エスカレーション: 自動終了または人間のアナリストへのエスカレーション
# Framework base per Playbook SOAR
# File: soar_framework.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from enum import Enum
import logging
class PlaybookStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
ESCALATED = "escalated"
FAILED = "failed"
class ActionResult(Enum):
SUCCESS = "success"
FAILURE = "failure"
SKIPPED = "skipped"
ESCALATE = "escalate"
@dataclass
class PlaybookContext:
"""Contesto condiviso tra tutti gli step del playbook."""
alert_id: str
alert_type: str
alert_data: dict
start_time: datetime = field(default_factory=datetime.now)
enrichment_data: dict = field(default_factory=dict)
actions_taken: list[dict] = field(default_factory=list)
evidence: list[dict] = field(default_factory=list)
escalation_reason: Optional[str] = None
status: PlaybookStatus = PlaybookStatus.RUNNING
def add_action(self, action: str, result: ActionResult,
details: dict = None) -> None:
self.actions_taken.append({
'timestamp': datetime.now().isoformat(),
'action': action,
'result': result.value,
'details': details or {}
})
def add_evidence(self, evidence_type: str, data: Any,
source: str) -> None:
self.evidence.append({
'timestamp': datetime.now().isoformat(),
'type': evidence_type,
'data': data,
'source': source
})
def set_escalation(self, reason: str) -> None:
self.escalation_reason = reason
self.status = PlaybookStatus.ESCALATED
class PlaybookStep(ABC):
"""Classe base per ogni step del playbook."""
def __init__(self, name: str):
self.name = name
self.logger = logging.getLogger(f"soar.{name}")
@abstractmethod
def execute(self, context: PlaybookContext) -> ActionResult:
"""Esegue lo step. Deve essere implementato da ogni sottoclasse."""
pass
def __str__(self) -> str:
return self.name
class Playbook:
"""Orchestratore del playbook: esegue gli step in sequenza."""
def __init__(self, name: str, steps: list[PlaybookStep]):
self.name = name
self.steps = steps
self.logger = logging.getLogger(f"soar.playbook.{name}")
def run(self, alert_id: str, alert_type: str,
alert_data: dict) -> PlaybookContext:
context = PlaybookContext(
alert_id=alert_id,
alert_type=alert_type,
alert_data=alert_data
)
self.logger.info(f"Playbook '{self.name}' avviato per alert {alert_id}")
for step in self.steps:
if context.status in [PlaybookStatus.ESCALATED, PlaybookStatus.FAILED]:
self.logger.info(f"Skip step '{step}': playbook in stato {context.status}")
break
self.logger.info(f"Esecuzione step: {step}")
try:
result = step.execute(context)
context.add_action(step.name, result)
if result == ActionResult.ESCALATE:
context.status = PlaybookStatus.ESCALATED
self.logger.warning(f"Step '{step}' richiede escalation")
break
except Exception as e:
self.logger.error(f"Step '{step}' fallito: {e}", exc_info=True)
context.add_action(step.name, ActionResult.FAILURE,
{'error': str(e)})
context.status = PlaybookStatus.FAILED
break
if context.status == PlaybookStatus.RUNNING:
context.status = PlaybookStatus.COMPLETED
self.logger.info(f"Playbook completato con status: {context.status.value}")
return context
フィッシング ハンドブック: トリアージから封じ込めまで
フィッシングは、SOC で最も一般的なシナリオです。成熟したプレイブックにより応答時間が短縮されます 45 分 (手動) から 2 分未満 (自動)、URL、添付ファイル、 電子メールのヘッダーを検証し、クリックしたユーザーを隔離します。
# Playbook Phishing Completo
# File: playbooks/phishing_playbook.py
import httpx
import re
import base64
from email.parser import Parser
from email.policy import default as default_policy
class ExtractEmailArtifactsStep(PlaybookStep):
"""Estrae URL, allegati e header dall'email sospetta."""
def __init__(self):
super().__init__("extract_email_artifacts")
def execute(self, context: PlaybookContext) -> ActionResult:
email_raw = context.alert_data.get('email_raw', '')
if not email_raw:
return ActionResult.SKIPPED
# Parse email
msg = Parser(policy=default_policy).parsestr(email_raw)
# Estrai header
headers = {
'from': msg.get('From', ''),
'reply_to': msg.get('Reply-To', ''),
'x_originating_ip': msg.get('X-Originating-IP', ''),
'dkim': 'DKIM-Signature' in msg,
'spf': 'Received-SPF' in email_raw,
'dmarc': 'DMARC' in email_raw.upper()
}
# Estrai URL dal body
body = msg.get_body(preferencelist=('plain', 'html'))
body_text = body.get_content() if body else ''
urls = re.findall(r'https?://[^\s<>"]+', body_text)
# Estrai allegati
attachments = []
for part in msg.iter_attachments():
attachments.append({
'filename': part.get_filename(),
'content_type': part.get_content_type(),
'size': len(part.get_payload(decode=True) or b''),
'content_b64': base64.b64encode(
part.get_payload(decode=True) or b''
).decode()
})
context.enrichment_data['email_headers'] = headers
context.enrichment_data['urls'] = list(set(urls)) # dedup
context.enrichment_data['attachments'] = attachments
context.add_evidence('email_headers', headers, 'email_parser')
context.add_evidence('extracted_urls', urls, 'email_parser')
self.logger.info(f"Estratti {len(urls)} URL e {len(attachments)} allegati")
return ActionResult.SUCCESS
class VirusTotalEnrichmentStep(PlaybookStep):
"""Analizza URL e hash allegati con VirusTotal."""
def __init__(self, vt_api_key: str):
super().__init__("virustotal_enrichment")
self.vt_api_key = vt_api_key
self.base_url = "https://www.virustotal.com/api/v3"
self.headers = {"x-apikey": vt_api_key}
def execute(self, context: PlaybookContext) -> ActionResult:
vt_results = {}
# Analizza URL
for url in context.enrichment_data.get('urls', [])[:10]: # Max 10 URL
try:
result = self._check_url(url)
vt_results[url] = result
if result.get('malicious', 0) > 0:
context.add_evidence('malicious_url', {'url': url, 'vt': result}, 'virustotal')
except Exception as e:
self.logger.warning(f"VT check URL fallito per {url}: {e}")
context.enrichment_data['virustotal'] = vt_results
# Determina se e necessario containment immediato
malicious_count = sum(
1 for r in vt_results.values() if r.get('malicious', 0) > 2
)
if malicious_count > 0:
context.enrichment_data['vt_verdict'] = 'malicious'
self.logger.warning(f"Trovati {malicious_count} URL malevoli su VT")
else:
context.enrichment_data['vt_verdict'] = 'clean'
return ActionResult.SUCCESS
def _check_url(self, url: str) -> dict:
"""Controlla un URL su VirusTotal."""
import hashlib
url_id = base64.urlsafe_b64encode(url.encode()).decode().rstrip('=')
with httpx.Client() as client:
response = client.get(
f"{self.base_url}/urls/{url_id}",
headers=self.headers,
timeout=10
)
if response.status_code == 404:
# Submetti per analisi
post_response = client.post(
f"{self.base_url}/urls",
headers=self.headers,
data={"url": url},
timeout=10
)
return {"status": "submitted", "malicious": 0}
if response.status_code != 200:
return {"status": "error", "malicious": 0}
data = response.json()
stats = data.get('data', {}).get(
'attributes', {}
).get('last_analysis_stats', {})
return {
"malicious": stats.get('malicious', 0),
"suspicious": stats.get('suspicious', 0),
"harmless": stats.get('harmless', 0),
"undetected": stats.get('undetected', 0)
}
class TriageDecisionStep(PlaybookStep):
"""Decide l'azione basandosi sull'enrichment."""
def __init__(self):
super().__init__("triage_decision")
def execute(self, context: PlaybookContext) -> ActionResult:
vt_verdict = context.enrichment_data.get('vt_verdict', 'unknown')
headers = context.enrichment_data.get('email_headers', {})
# Score-based triage
risk_score = 0
# Fattori di rischio
if vt_verdict == 'malicious':
risk_score += 40
if not headers.get('dkim', True):
risk_score += 15
if not headers.get('spf', True):
risk_score += 15
if not headers.get('dmarc', True):
risk_score += 10
if context.enrichment_data.get('attachments'):
for att in context.enrichment_data['attachments']:
if any(ext in str(att.get('filename', '')).lower()
for ext in ['.exe', '.js', '.vbs', '.ps1', '.macro']):
risk_score += 20
context.enrichment_data['risk_score'] = risk_score
self.logger.info(f"Risk score calcolato: {risk_score}")
if risk_score >= 50:
context.enrichment_data['triage_result'] = 'high_risk'
return ActionResult.SUCCESS # Procedi con containment
elif risk_score >= 25:
context.enrichment_data['triage_result'] = 'medium_risk'
context.set_escalation(
f"Risk score {risk_score}: richiede review umana"
)
return ActionResult.ESCALATE
else:
context.enrichment_data['triage_result'] = 'low_risk'
return ActionResult.SUCCESS
class ContainPhishingStep(PlaybookStep):
"""Azioni di containment per phishing confermato."""
def __init__(self, exchange_client, ad_client):
super().__init__("contain_phishing")
self.exchange_client = exchange_client
self.ad_client = ad_client
def execute(self, context: PlaybookContext) -> ActionResult:
if context.enrichment_data.get('triage_result') != 'high_risk':
return ActionResult.SKIPPED
affected_user = context.alert_data.get('recipient_email', '')
actions = []
# 1. Rimuovi email simili da tutte le mailbox
if context.enrichment_data.get('urls'):
for url in context.enrichment_data['urls'][:3]:
result = self.exchange_client.search_and_delete(
sender=context.alert_data.get('sender_email', ''),
url_contains=url
)
actions.append({'type': 'email_deletion', 'url': url, 'result': result})
# 2. Blocca mittente in Exchange
sender = context.alert_data.get('sender_email', '')
if sender:
self.exchange_client.add_to_blocklist(sender)
actions.append({'type': 'sender_blocked', 'sender': sender})
# 3. Forza reset password se utente ha cliccato link
if context.alert_data.get('user_clicked_link', False):
self.ad_client.force_password_reset(affected_user)
self.ad_client.revoke_sessions(affected_user)
actions.append({
'type': 'password_reset',
'user': affected_user,
'reason': 'User clicked malicious link'
})
context.add_evidence('containment_actions', actions, 'soar_playbook')
self.logger.info(f"Containment completato: {len(actions)} azioni")
return ActionResult.SUCCESS
def build_phishing_playbook(vt_api_key: str,
exchange_client,
ad_client) -> Playbook:
"""Factory per il playbook phishing."""
return Playbook(
name="phishing_response",
steps=[
ExtractEmailArtifactsStep(),
VirusTotalEnrichmentStep(vt_api_key),
TriageDecisionStep(),
ContainPhishingStep(exchange_client, ad_client),
DocumentIncidentStep(), # Definito sotto
NotifyStakeholdersStep()
]
)
エンドポイント上のマルウェア プレイブック: 強化と分離
エンドポイントのマルウェア プレイブックは、より積極的に対応する必要があります: 隔離 エンドポイントの横方向の動きを防ぐために重要です。しかし、隔離する前に必要なことは、 可能な限り多くのフォレンジックデータを収集します。
# Playbook Malware Endpoint
# File: playbooks/malware_endpoint_playbook.py
class CollectForensicDataStep(PlaybookStep):
"""Raccoglie dati forensi dall'endpoint prima dell'isolamento."""
def __init__(self, edr_client):
super().__init__("collect_forensic_data")
self.edr = edr_client
def execute(self, context: PlaybookContext) -> ActionResult:
endpoint = context.alert_data.get('endpoint_hostname', '')
malicious_process = context.alert_data.get('process_name', '')
forensics = {}
# Processo malevolo
forensics['process_tree'] = self.edr.get_process_tree(
endpoint, malicious_process
)
# Network connections del processo
forensics['network_connections'] = self.edr.get_process_network(
endpoint, context.alert_data.get('pid')
)
# File system changes nelle ultime 2 ore
forensics['file_changes'] = self.edr.get_recent_file_changes(
endpoint, hours_back=2
)
# Autorun entries (persistence)
forensics['autoruns'] = self.edr.get_autoruns(endpoint)
# Memory dump del processo (se disponibile)
try:
forensics['memory_dump_path'] = self.edr.dump_process_memory(
endpoint, context.alert_data.get('pid')
)
except Exception as e:
self.logger.warning(f"Memory dump non disponibile: {e}")
context.enrichment_data['forensics'] = forensics
context.add_evidence('forensic_collection', forensics, 'edr')
return ActionResult.SUCCESS
class MalwareHashAnalysisStep(PlaybookStep):
"""Analizza hash del malware su multipli servizi TI."""
def __init__(self, vt_api_key: str, malware_bazaar_key: str):
super().__init__("malware_hash_analysis")
self.vt_key = vt_api_key
self.bazaar_key = malware_bazaar_key
def execute(self, context: PlaybookContext) -> ActionResult:
file_hash = context.alert_data.get('file_hash', '')
if not file_hash:
return ActionResult.SKIPPED
results = {}
# VirusTotal
with httpx.Client() as client:
vt_resp = client.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers={"x-apikey": self.vt_key},
timeout=15
)
if vt_resp.status_code == 200:
vt_data = vt_resp.json()['data']['attributes']
stats = vt_data.get('last_analysis_stats', {})
results['virustotal'] = {
'malicious': stats.get('malicious', 0),
'total': sum(stats.values()),
'family': vt_data.get('popular_threat_classification', {}).get(
'suggested_threat_label', 'unknown'
)
}
# MalwareBazaar
with httpx.Client() as client:
bazaar_resp = client.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_info", "hash": file_hash},
timeout=15
)
if bazaar_resp.status_code == 200:
bazaar_data = bazaar_resp.json()
if bazaar_data.get('query_status') == 'ok':
results['malware_bazaar'] = bazaar_data.get('data', [{}])[0]
context.enrichment_data['malware_analysis'] = results
# Aggiorna verdict
vt_malicious = results.get('virustotal', {}).get('malicious', 0)
if vt_malicious > 5:
context.enrichment_data['malware_verdict'] = 'confirmed_malware'
elif vt_malicious > 0:
context.enrichment_data['malware_verdict'] = 'suspicious'
else:
context.enrichment_data['malware_verdict'] = 'unknown'
return ActionResult.SUCCESS
class IsolateEndpointStep(PlaybookStep):
"""Isola l'endpoint dalla rete per prevenire lateral movement."""
def __init__(self, edr_client, cmdb_client):
super().__init__("isolate_endpoint")
self.edr = edr_client
self.cmdb = cmdb_client
def execute(self, context: PlaybookContext) -> ActionResult:
verdict = context.enrichment_data.get('malware_verdict', 'unknown')
if verdict not in ['confirmed_malware', 'suspicious']:
return ActionResult.SKIPPED
endpoint = context.alert_data.get('endpoint_hostname', '')
# Verifica criticalita dell'asset prima dell'isolamento
asset_info = self.cmdb.get_asset_info(endpoint)
if asset_info.get('criticality') == 'critical':
# Non isolare automaticamente asset critici
context.set_escalation(
f"Endpoint '{endpoint}' e critico (tipo: {asset_info.get('type')}). "
f"Isolamento richiede approvazione manuale."
)
return ActionResult.ESCALATE
# Isola l'endpoint
isolation_result = self.edr.isolate_endpoint(
endpoint,
reason=f"Malware detection - alert {context.alert_id}",
allow_edr_communication=True # Mantieni canale per remediation
)
context.add_evidence('isolation', {
'endpoint': endpoint,
'result': isolation_result,
'timestamp': datetime.now().isoformat()
}, 'edr')
self.logger.info(f"Endpoint {endpoint} isolato: {isolation_result}")
return ActionResult.SUCCESS
自動ドキュメントとチケット作成
成熟したプレイブックはインシデント文書を自動的に生成し、チケットを入力します。 すべてのエンリッチメント データ、実行されたアクション、および修復の推奨事項が含まれます。 これにより、アナリストの時間の 40% を占めることが多い「レポート作成のオーバーヘッド」が排除されます。
# Documentazione Automatica
# File: steps/documentation_step.py
import jinja2
INCIDENT_REPORT_TEMPLATE = """
## Incident Report - {{ context.alert_id }}
**Status**: {{ context.status.value }}
**Start Time**: {{ context.start_time.strftime('%Y-%m-%d %H:%M:%S') }}
**Alert Type**: {{ context.alert_type }}
### Summary
{{ summary }}
### Risk Assessment
- **Risk Score**: {{ context.enrichment_data.get('risk_score', 'N/A') }}
- **Verdict**: {{ context.enrichment_data.get('vt_verdict', context.enrichment_data.get('malware_verdict', 'N/A')) }}
- **Triage Result**: {{ context.enrichment_data.get('triage_result', 'N/A') }}
### Threat Intelligence
{% if context.enrichment_data.get('virustotal') %}
**VirusTotal**: {{ context.enrichment_data.virustotal | tojson(indent=2) }}
{% endif %}
### Actions Taken
{% for action in context.actions_taken %}
- **{{ action.timestamp }}** - {{ action.action }}: {{ action.result }}
{% if action.details %} Details: {{ action.details }}{% endif %}
{% endfor %}
### Evidence Collected
{% for ev in context.evidence %}
- **{{ ev.type }}** (from {{ ev.source }}): {{ ev.timestamp }}
{% endfor %}
{% if context.escalation_reason %}
### Escalation Required
**Reason**: {{ context.escalation_reason }}
**Recommended Actions**:
1. Validate analyst judgment on enrichment data
2. Confirm containment or determine alternative
3. Initiate full forensic investigation if warranted
{% endif %}
### Indicators of Compromise
{% if context.enrichment_data.get('urls') %}
**Malicious URLs**:
{% for url in context.enrichment_data.urls %}
- {{ url }}
{% endfor %}
{% endif %}
"""
class DocumentIncidentStep(PlaybookStep):
"""Crea documentazione strutturata dell'incidente."""
def __init__(self, thehive_client=None):
super().__init__("document_incident")
self.thehive = thehive_client
self.template = jinja2.Template(INCIDENT_REPORT_TEMPLATE)
def execute(self, context: PlaybookContext) -> ActionResult:
# Genera summary basato sul tipo di alert
summary = self._generate_summary(context)
# Renderizza il report
report = self.template.render(
context=context,
summary=summary
)
context.enrichment_data['incident_report'] = report
# Crea/aggiorna caso in TheHive se disponibile
if self.thehive:
try:
case_id = context.alert_data.get('thehive_case_id')
if case_id:
self.thehive.update_case(
case_id,
description=report,
tags=self._extract_tags(context)
)
else:
new_case_id = self.thehive.create_case({
'title': f"[SOAR] {context.alert_type} - {context.alert_id}",
'description': report,
'severity': self._determine_severity(context),
'tags': self._extract_tags(context)
})
context.enrichment_data['thehive_case_id'] = new_case_id
except Exception as e:
self.logger.error(f"Errore creazione caso TheHive: {e}")
return ActionResult.SUCCESS
def _generate_summary(self, context: PlaybookContext) -> str:
alert_type = context.alert_type
if alert_type == 'phishing':
return (
f"Email di phishing rilevata con {len(context.enrichment_data.get('urls', []))} "
f"URL sospetti. Verdict VirusTotal: {context.enrichment_data.get('vt_verdict', 'N/A')}."
)
elif alert_type == 'malware_endpoint':
endpoint = context.alert_data.get('endpoint_hostname', 'N/A')
return (
f"Malware rilevato su {endpoint}. "
f"Verdict: {context.enrichment_data.get('malware_verdict', 'N/A')}."
)
return "Incidente di sicurezza processato automaticamente."
def _extract_tags(self, context: PlaybookContext) -> list[str]:
tags = [f"soar-auto", f"type:{context.alert_type}"]
if context.status == PlaybookStatus.ESCALATED:
tags.append("needs-human-review")
verdict = context.enrichment_data.get('vt_verdict') or \
context.enrichment_data.get('malware_verdict')
if verdict:
tags.append(f"verdict:{verdict}")
return tags
def _determine_severity(self, context: PlaybookContext) -> int:
score = context.enrichment_data.get('risk_score', 0)
if score >= 50:
return 3 # High
elif score >= 25:
return 2 # Medium
return 1 # Low
Playbook のテストとバージョン管理
SOAR プレイブックは、アプリケーション コードと同じ厳密さでテストする必要があります。 本番環境でプレイブックに欠陥があると、過剰な封じ込め (ホストの分離) が発生する可能性があります。 合法的)または不十分な封じ込め(実際の脅威に対する行動の失敗)。
# Testing Framework per SOAR Playbook
# File: tests/test_phishing_playbook.py
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
class MockExchangeClient:
def __init__(self):
self.blocked_senders = []
self.deleted_emails = []
def add_to_blocklist(self, sender: str) -> None:
self.blocked_senders.append(sender)
def search_and_delete(self, sender: str, url_contains: str) -> dict:
self.deleted_emails.append({"sender": sender, "url": url_contains})
return {"deleted_count": 3, "status": "success"}
class MockADClient:
def __init__(self):
self.password_resets = []
self.revoked_sessions = []
def force_password_reset(self, user: str) -> None:
self.password_resets.append(user)
def revoke_sessions(self, user: str) -> None:
self.revoked_sessions.append(user)
class MockVirusTotalClient:
def __init__(self, malicious_urls: list[str] = None):
self.malicious_urls = malicious_urls or []
def check_url(self, url: str) -> dict:
if url in self.malicious_urls:
return {"malicious": 15, "suspicious": 3, "harmless": 0}
return {"malicious": 0, "suspicious": 0, "harmless": 50}
class TestPhishingPlaybook:
def setup_method(self):
self.exchange_client = MockExchangeClient()
self.ad_client = MockADClient()
@pytest.fixture
def high_risk_phishing_alert(self) -> dict:
return {
"alert_id": "test-001",
"alert_type": "phishing",
"sender_email": "attacker@evil.com",
"recipient_email": "victim@company.com",
"user_clicked_link": True,
"email_raw": """From: attacker@evil.com
To: victim@company.com
Subject: Urgente: Aggiorna le tue credenziali
Clicca qui: http://malicious-phish.com/steal-creds
""",
"thehive_case_id": None
}
def test_phishing_high_risk_containment(self, high_risk_phishing_alert):
"""Test: phishing ad alto rischio deve triggherare containment."""
with patch('httpx.Client') as mock_http:
# Mocka VirusTotal response con URL malevolo
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"data": {
"attributes": {
"last_analysis_stats": {
"malicious": 15, "suspicious": 2,
"harmless": 0, "undetected": 5
}
}
}
}
mock_http.return_value.__enter__.return_value.get.return_value = mock_response
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
context = playbook.run(
alert_id=high_risk_phishing_alert['alert_id'],
alert_type=high_risk_phishing_alert['alert_type'],
alert_data=high_risk_phishing_alert
)
# Verifica stato finale
assert context.status in [PlaybookStatus.COMPLETED, PlaybookStatus.ESCALATED]
# Se completato, verifica azioni intraprese
if context.status == PlaybookStatus.COMPLETED:
action_names = [a['action'] for a in context.actions_taken]
assert 'virustotal_enrichment' in action_names
assert 'triage_decision' in action_names
def test_low_risk_no_containment(self):
"""Test: phishing a basso rischio non deve triggherare containment."""
alert = {
"alert_id": "test-002",
"alert_type": "phishing",
"sender_email": "newsletter@legit.com",
"recipient_email": "user@company.com",
"user_clicked_link": False,
"email_raw": "From: newsletter@legit.com\nSubject: News\n\nHello!"
}
playbook = build_phishing_playbook(
vt_api_key="test-key",
exchange_client=self.exchange_client,
ad_client=self.ad_client
)
with patch('httpx.Client'):
context = playbook.run("test-002", "phishing", alert)
# Nessun isolamento su basso rischio
assert len(self.exchange_client.blocked_senders) == 0
assert len(self.ad_client.password_resets) == 0
ベスト プラクティス: ドライラン ハンドブック
本番環境にデプロイする前に、各 Playbook をドライラン モードで 2 週間実行します。 ドライランでは、エンリッチメントと意思決定のすべてのステップが実行されますが、封じ込めアクションはスキップされます。 実際 (隔離、ブロック、パスワードのリセット)、彼が行うであろうことのみを記録します。 これにより、リスクを負わずにトリアージ ロジックを調整できます。
結論と重要なポイント
Python の SOAR プレイブックは、現代のインシデント対応の最前線を表します。 セキュリティ統合のエコシステムを備えた汎用言語の柔軟性 利用可能な最も豊富な。モジュール式フレームワークにより、再利用性、テスト容易性、および 長期にわたるメンテナンス性。
重要なポイント
- モジュラー アーキテクチャ (ステップ + プレイブック + コンテキスト) は、SOAR エンタープライズの基礎です
- 正確なトリアージの決定には、マルチソースのエンリッチメント (VT、EDR、CMDB、AD) が重要です
- 重要な資産の自動エスカレーションにより、過剰な封じ込めによる巻き添え被害を防止します
- 自動文書化によりレポートのオーバーヘッドが排除されます (アナリスト時間の 40%)
- 模擬クライアントを使用したテストにより、実稼働システムに影響を与えることなく品質を保証します
- ドライラン モードは、本番稼働前にトリアージ ロジックを調整するために不可欠です
関連記事
- アラートトリアージの自動化: グラフ分析による MTTD の削減
- 脅威インテリジェンスの取り込み: STIX/TAXII フィード プロセッサ
- 専門分野としての検出エンジニアリング: スクリプトからパイプラインまで







