Saga パターン: コレオグラフィーとオーケストレーションを使用した分散トランザクション
マイクロサービスには次のようなものはありません BEGIN TRANSACTION ... COMMIT サービスの境界を越える。
オペレーションに注文サービス、在庫サービス、支払いサービスが含まれる場合、
3 つすべてが成功するか、どれも効果がないことを確認するにはどうすればよいでしょうか?
の サーガパターン が答えです。各サービスは独自のローカル トランザクションを実行します。
そして、失敗した場合には、 補償取引 すでに行われたことを元に戻すこと。
2PC がマイクロサービスで機能しない理由
Il 2フェーズコミット(2PC) 分散トランザクションの古典的なプロトコルです。 コーディネーターは参加者全員に準備をするよう求めます (準備段階)、コミットを命令します。 これは同種のデータベース (例: 2 つの PostgreSQL インスタンス) ではうまく機能しますが、マイクロサービスでは問題が発生します。
- カップリング: トランザクション中はすべてのサービスが利用可能である必要があります
- パフォーマンス: リソースはコミットされるまでロックされたままになります (分散ロック)
- 限定的なサポート: 多くのクラウド サービス、REST API、NoSQL データベースは 2PC をサポートしていません。
- スケーラビリティ: コーディネーターが単一障害点になる
Saga パターンは、1 つの大規模な分散トランザクションを 1 つのトランザクションに置き換えます。 ローカルトランザクションのシーケンス、 それぞれが独立して完了し、イベントやオーケストレーターによって調整されます。
例: 電子商取引の購買佐賀
Saga の購入手順:
- 注文の作成 (注文サービス) → OrdineCreato
- 在庫を予約する (在庫サービス) → 予約在庫
- 支払いの処理 (決済サービス) → 入金確認済み
- 注文の確認 (オーダーサービス)→注文確定
支払いが失敗した場合の補償取引は次のとおりです。
- リリースインベントリ (在庫サービス)
- 注文をキャンセルする (オーダーサービス)
コレオグラフィー サーガ: サービスの自律性
Nel 振付サーガ、中央のオーケストレーターは存在しません。 各サービスは、前のサービスによって生成されたイベントをリッスンし、それに応じて反応します。 そして新たな出来事を生み出します。調整ロジックは分散されています サービス間: 各サービスは、自分自身のステップと、それが反応するイベントのみを知っています。
プロ: 最大限のデカップリング、単一障害点がなく、実装が簡単です。 に対して: デバッグが困難 (saga ロジックが分散されている)、可視性がない この物語の現状の中心となるのは、管理が不十分な出来事が繰り返されるリスクです。
// CHOREOGRAPHY SAGA con Spring Kafka
// ===== STEP 1: Servizio Ordini - crea l'ordine =====
@Service
public class OrdiniService {
private final KafkaTemplate<String, Object> kafkaTemplate;
public String creaOrdine(CreaOrdineCommand cmd) {
String ordineId = UUID.randomUUID().toString();
// Salva l'ordine in stato PENDING
Ordine ordine = Ordine.builder()
.id(ordineId)
.clienteId(cmd.getClienteId())
.prodotti(cmd.getProdotti())
.stato(StatoOrdine.PENDING)
.build();
ordineRepository.save(ordine);
// Pubblica evento: il Servizio Inventario lo ascolterà
OrdineCreato event = new OrdineCreato(ordineId, cmd.getProdotti());
kafkaTemplate.send("ordini-events", ordineId, event);
return ordineId;
}
// Ascolta la conferma del pagamento per finalizzare l'ordine
@KafkaListener(topics = "pagamenti-events", groupId = "ordini-service")
public void onPagamento(ConsumerRecord<String, Object> record) {
if (record.value() instanceof PagamentoConfermato event) {
ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.CONFERMATO);
kafkaTemplate.send("ordini-events", event.getOrdineId(),
new OrdineConfermato(event.getOrdineId()));
}
if (record.value() instanceof PagamentoFallito event) {
// Compensating: annulla l'ordine
ordineRepository.updateStato(event.getOrdineId(), StatoOrdine.ANNULLATO);
}
}
}
// ===== STEP 2: Servizio Inventario - riserva i prodotti =====
@Service
public class InventarioService {
@KafkaListener(topics = "ordini-events", groupId = "inventario-service")
public void onOrdinCreato(ConsumerRecord<String, Object> record) {
if (record.value() instanceof OrdineCreato event) {
try {
// Tenta la prenotazione dell'inventario
prenotaInventario(event.getOrdineId(), event.getProdotti());
// Successo: pubblica evento per il Servizio Pagamenti
kafkaTemplate.send("inventario-events", event.getOrdineId(),
new InventarioRiservato(event.getOrdineId(), event.getTotale()));
} catch (InventarioInsufficienterException e) {
// Fallimento: pubblica evento di fallimento
// Il Servizio Ordini annullerà l'ordine
kafkaTemplate.send("inventario-events", event.getOrdineId(),
new InventarioNonDisponibile(event.getOrdineId(), e.getMessage()));
}
}
// Compensating: rilascia l'inventario se il pagamento fallisce
if (record.value() instanceof PagamentoFallito event) {
rilasciaInventario(event.getOrdineId());
}
}
}
// ===== STEP 3: Servizio Pagamenti - processa il pagamento =====
@Service
public class PagamentiService {
@KafkaListener(topics = "inventario-events", groupId = "pagamenti-service")
public void onInventarioRiservato(ConsumerRecord<String, Object> record) {
if (record.value() instanceof InventarioRiservato event) {
try {
processaPagamento(event.getOrdineId(), event.getTotale());
kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
new PagamentoConfermato(event.getOrdineId()));
} catch (PagamentoRifiutatoException e) {
kafkaTemplate.send("pagamenti-events", event.getOrdineId(),
new PagamentoFallito(event.getOrdineId(), e.getMessage()));
}
}
}
}
オーケストレーションの物語: 集中管理
で」オーケストレーション サーガと呼ばれる中心的なコンポーネント サガ・オーケストレーター (またはプロセスマネージャー) がシーケンス全体を調整します。オーケストレーターはすべての手順を知っており、 コマンドをサービスに送信し、その応答イベントを待ちます。失敗した場合には、 オーケストレーターは補正コマンドを明示的に送信します。
プロ: 集中化された可視ロジック、デバッグが容易、明示的なサガ状態、 より正確なエラー処理。 に対して:メジャーカップリング、 オーケストレーターは、関係するすべてのサービスを知っています。
// ORCHESTRATION SAGA con Axon Framework (Java)
// Axon gestisce automaticamente la persistenza dello stato della saga
import org.axonframework.saga.*;
import org.axonframework.eventhandling.*;
@Saga
public class AcquistoSaga {
@Autowired
private transient CommandGateway commandGateway;
// Lo stato della saga persiste automaticamente tra gli step
private String ordineId;
private List<ProdottoOrdinato> prodotti;
private BigDecimal totale;
// STEP 1: Avvio della saga quando viene creato un ordine
@StartSaga
@SagaEventHandler(associationProperty = "ordineId")
public void on(OrdineCreato event) {
this.ordineId = event.getOrdineId();
this.prodotti = event.getProdotti();
this.totale = event.getTotale();
// Invia comando al servizio inventario
commandGateway.send(new RiservaInventarioCommand(
ordineId,
prodotti
));
}
// STEP 2: Inventario riservato con successo - procedi con il pagamento
@SagaEventHandler(associationProperty = "ordineId")
public void on(InventarioRiservato event) {
commandGateway.send(new ProcessaPagamentoCommand(
ordineId,
totale,
"CARTA_CREDITO"
));
}
// STEP 3: Pagamento confermato - finalizza l'ordine
@SagaEventHandler(associationProperty = "ordineId")
public void on(PagamentoConfermato event) {
commandGateway.send(new ConfermaOrdineCommand(ordineId));
}
// Fine saga (successo)
@EndSaga
@SagaEventHandler(associationProperty = "ordineId")
public void on(OrdineConfermato event) {
System.out.println("Saga completata con successo per ordine: " + ordineId);
}
// ===== COMPENSATING TRANSACTIONS =====
// Pagamento fallito: rilascia inventario, poi annulla ordine
@SagaEventHandler(associationProperty = "ordineId")
public void on(PagamentoFallito event) {
// Compensating step 1: rilascia inventario
commandGateway.send(new RilasciaInventarioCommand(ordineId, prodotti));
}
// Inventario rilasciato - annulla l'ordine
@SagaEventHandler(associationProperty = "ordineId")
public void on(InventarioRilasciato event) {
// Compensating step 2: annulla l'ordine
commandGateway.send(new AnnullaOrdineCommand(ordineId, "Pagamento fallito"));
}
// Inventario non disponibile - annulla direttamente
@SagaEventHandler(associationProperty = "ordineId")
public void on(InventarioNonDisponibile event) {
commandGateway.send(new AnnullaOrdineCommand(ordineId, "Inventario insufficiente"));
}
// Fine saga per percorso di fallimento
@EndSaga
@SagaEventHandler(associationProperty = "ordineId")
public void on(OrdineAnnullato event) {
System.out.println("Saga fallita/annullata per ordine: " + ordineId + ". Motivo: " + event.getMotivo());
}
}
AWS Step Functions を使用した佐賀 (サーバーレス)
AWS 上のサーバーレス アーキテクチャの場合、 ステップ関数 それはマネージドサービスです オーケストレーション サーガを実装します。各ステップは Lambda 関数または AWS アクションです。 Step Functions は定義により状態、再試行、補正を自動的に管理します アマゾン州言語 (ASL) で。
{
"Comment": "Saga acquisto e-commerce con compensating transactions",
"StartAt": "CreaOrdine",
"States": {
"CreaOrdine": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:crea-ordine",
"Next": "RiservaInventario",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "FallimentoSaga"
}
]
},
"RiservaInventario": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:riserva-inventario",
"Next": "ProcessaPagamento",
"Catch": [
{
"ErrorEquals": ["InventarioInsufficienterException"],
"Next": "CompensaAnnullaOrdine"
}
]
},
"ProcessaPagamento": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:processa-pagamento",
"Retry": [
{
"ErrorEquals": ["TransientPaymentError"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Next": "ConfermaOrdine",
"Catch": [
{
"ErrorEquals": ["PagamentoRifiutatoException"],
"Next": "CompensaRilasciaInventario"
}
]
},
"ConfermaOrdine": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:conferma-ordine",
"End": true
},
"CompensaRilasciaInventario": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:rilascia-inventario",
"Next": "CompensaAnnullaOrdine"
},
"CompensaAnnullaOrdine": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456:function:annulla-ordine",
"Next": "FallimentoSaga"
},
"FallimentoSaga": {
"Type": "Fail",
"Error": "SagaFailed",
"Cause": "La transazione distribuita e fallita, compensazioni eseguite"
}
}
}
補償トランザクションにおける冪等性
補償取引は次のとおりである必要があります。 冪等: 複数回呼び出された場合 (ネットワークまたはオーケストレーターの再試行のため)、結果は同じである必要があります。 最も一般的なテクニックは、 冪等性キー: orderId + 操作タイプは、操作の重複を防ぐ一意のキーを形成します。
// RilasciaInventarioHandler.java - Compensating transaction idempotente
@CommandHandler
public void handle(RilasciaInventarioCommand cmd) {
// Controllo idempotenza: questa compensazione e gia stata eseguita?
String idempotencyKey = cmd.getOrdineId() + ":RILASCIA_INVENTARIO";
if (compensazioniEseguite.contains(idempotencyKey)) {
System.out.println("Compensazione gia eseguita per " + idempotencyKey + ", skip");
return;
}
// Esegui la compensazione
for (ProdottoOrdinato prodotto : cmd.getProdotti()) {
inventarioRepository.incrementaDisponibilita(
prodotto.getProductId(),
prodotto.getQuantita()
);
}
// Segna la compensazione come eseguita
compensazioniEseguite.add(idempotencyKey);
// Pubblica evento per continuare la catena di compensazione
eventBus.publish(new InventarioRilasciato(cmd.getOrdineId()));
}
佐賀州の追跡: 州の可視性
オーケストレーション サーガに対する批判の 1 つは、状態を追跡するのが難しいことです。 大容量システムで。明示的なサガ追跡テーブルにより、この問題が解決されます。
-- Tabella di tracking per le saga in corso
CREATE TABLE saga_state (
saga_id VARCHAR(36) PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
ordine_id VARCHAR(36) NOT NULL,
current_step VARCHAR(100) NOT NULL,
stato VARCHAR(50) NOT NULL, -- IN_CORSO, COMPLETATA, FALLITA, COMPENSATA
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
error_message TEXT
);
-- Query utili per monitoring
-- Saga in corso da piu di 5 minuti (potenzialmente bloccate)
SELECT * FROM saga_state
WHERE stato = 'IN_CORSO'
AND started_at < NOW() - INTERVAL '5 minutes';
-- Distribuzione per step corrente
SELECT current_step, COUNT(*) as count
FROM saga_state WHERE stato = 'IN_CORSO'
GROUP BY current_step;
比較: コリオグラフィーとオーケストレーション
| 待ってます | 振付 | オーケストレーション |
|---|---|---|
| カップリング | 低 (イベント) | 中 (オーケストレーターはサービスを知っています) |
| ステータスの可視性 | ハード (分散) | 高 (集中型) |
| デバッグ | ハード (分散ロジック) | 簡単 (明示的な状態) |
| サービスの複雑さ | 高い(各サービスが補償を知っています) | 低 (単純なサービス、オーケストレーターのロジック) |
| スケーラビリティ | 高 (単一ポイントなし) | 中 (規模に応じたオーケストレーター) |
| 理想的な使用例 | 少ないステップ、自律的なサービス、独立したチーム | 多くのステップ、複雑なロジック、必要な可視性 |
シリーズの次のステップ
- 第 6 条 – AWS EventBridge: Choreography Saga はメッセージ バスを使用して、 交流イベント。 EventBridge は、インテリジェントなルーティングに最適な AWS 管理のバスです Lambda と SQS の間のイベントの数。
- 第 10 条 – 送信ボックスのパターン: 佐賀イベントのアトミックなディスパッチを確保 ローカルデータベースの更新も一緒に行います。
他シリーズとの連携
- イベントソーシング + CQRS (第 4 条): サーガの各ステップがイベントを生み出します これは、集合状態を再構築するための信頼できる情報源として使用できます。 Axon Framework は、Saga + Event Sourcing の組み合わせをネイティブにサポートしています。
- Apache Kafka (シリーズ 38): Kafka は理想的なメッセージ バスです。 コレオグラフィー・サーガは制作中、耐久性と順序性は保証されています キーパーティションごとに。







