CQRS: 独立したスケーリングのための個別の読み取りと書き込み
従来の REST API は、同じモデルを使用してデータの読み取りと書き込みを行います。しかし、ニーズは 読み取りと書き込みは多くの場合大きく異なります。書き込みは一貫性を保証する必要があり、 複雑なビジネスルールを検証する。読み取りは高速で、スケーラブルで、返される必要があります UI に最適化された形式のデータ。両方のニーズを満たすようにしてください 同じモデルは侵害につながります: ビューに対する複雑なクエリ、または「肥大化した」モデル あらゆる種類の読書をサポートします。
CQRS (コマンドクエリ責任分離) を明示的に分離します ライティングモデル (コマンド側) 読み取りモデルから (クエリ側)。 コマンド側は、状態を変更する操作 (作成、更新、削除) を管理します。 クエリ側は、通常は最適化されたデータ モデルを使用して読み取り操作を処理します。 特定のアプリケーション ビューの場合。 2 つの間の同期は次の方法で行われます。 非同期イベントまたはプロジェクション。
何を学ぶか
- CQRS アーキテクチャ: コマンド側、クエリ側、非同期同期
- 検証を使用して TypeScript でコマンド ハンドラーを実装する
- プロジェクション: ビューに最適化された読み取りモデルを構築する
- イベントによる非同期同期 (イベントソーシング + CQRS)
- イベント ソーシングを使用しない CQRS: 簡略化されたハイブリッド モデル
- 独立した読み取りおよび書き込みスケーリング
- トレードオフ: 一貫性と運用の複雑さの可能性
CQRS アーキテクチャ
CQRS では、各操作は または です。 指示 (ステータス変更) または 1 つ クエリ (状態を変更せずに読み取ります)。両方を同時には決してしないでください 方法 (Bertrand Meyer の CQS 原則):
// WRONG: un metodo che fa sia command che query
async function reserveInventory(productId: string, quantity: number): Promise<Stock> {
const stock = await db.getStock(productId);
stock.reserved += quantity; // MODIFICA
await db.saveStock(stock);
return stock; // LETTURA
}
// RIGHT CQRS: separa command e query
async function reserveInventory(productId: string, quantity: number): Promise<void> {
// Command: solo modifica, nessun ritorno di stato
const stock = await stockRepo.findById(productId);
stock.reserve(quantity);
await stockRepo.save(stock);
// Pubblica evento per aggiornare il read model
await eventBus.publish(new InventoryReservedEvent(productId, quantity));
}
async function getStockLevel(productId: string): Promise<StockLevelDto> {
// Query: legge dal read model ottimizzato, ZERO side effects
return await stockReadModel.findById(productId);
}
コマンド側: コマンド管理
コマンド側はコマンド(意図を記述する不変オブジェクト)を受け取ります。 有効であり、集約上でビジネス ロジックを実行し、イベントを発行します。パターン コマンドハンドラー そしてこのフローを調整するコンポーネント:
// ---- COMMANDS ----
// Ogni command e un DTO immutabile
class PlaceOrderCommand {
constructor(
public readonly orderId: string,
public readonly customerId: string,
public readonly items: ReadonlyArray<{
productId: string;
quantity: number;
}>,
public readonly shippingAddress: Readonly<Address>
) {}
}
// ---- COMMAND HANDLER ----
class PlaceOrderCommandHandler {
constructor(
private readonly orderRepo: OrderRepository,
private readonly productRepo: ProductRepository,
private readonly eventBus: EventBus
) {}
async handle(command: PlaceOrderCommand): Promise<void> {
// 1. Validazione: prodotti esistono?
const products = await Promise.all(
command.items.map((item) => this.productRepo.findById(item.productId))
);
if (products.some((p) => p === null)) {
throw new Error('One or more products not found');
}
// 2. Crea l'Aggregate (logica business)
const order = new OrderAggregate();
order.create(command.orderId, command.customerId);
for (let i = 0; i < command.items.length; i++) {
const product = products[i]!;
order.addItem(
command.items[i].productId,
command.items[i].quantity,
product.price
);
}
order.confirm();
// 3. Persisti gli eventi generati dall'Aggregate
await this.orderRepo.save(order);
// 4. Pubblica gli eventi per aggiornare il read model e notificare altri servizi
const events = order.getUncommittedEvents();
await this.eventBus.publishAll(events);
}
}
// ---- COMMAND BUS ----
// Dispatcher che instrada i command all'handler corretto
class CommandBus {
private handlers = new Map<string, (cmd: unknown) => Promise<void>>();
register<T>(commandType: string, handler: (cmd: T) => Promise<void>): void {
this.handlers.set(commandType, handler as (cmd: unknown) => Promise<void>);
}
async dispatch<T>(commandType: string, command: T): Promise<void> {
const handler = this.handlers.get(commandType);
if (!handler) {
throw new Error(`No handler registered for command: ${commandType}`);
}
await handler(command);
}
}
// Setup
const commandBus = new CommandBus();
commandBus.register('PlaceOrder', (cmd: PlaceOrderCommand) =>
placeOrderHandler.handle(cmd)
);
クエリ側: モデルとプロジェクションの読み取り
クエリ側は、 モデルの読み取り: データの表現 特定のアプリケーションのクエリ向けに最適化されています。コマンド側が動作する間、 集計、クエリ側は、次のように構築された非正規化ビューを操作します。 予測.
// ---- READ MODEL ----
// Viste denormalizzate ottimizzate per le query dell'UI
// View per la lista ordini: include dati del cliente + totale + stato
interface OrderListItemReadModel {
orderId: string;
customerName: string; // denormalizzato (non serve join)
customerEmail: string;
totalAmount: number;
currency: string;
status: string;
itemCount: number;
placedAt: string;
}
// View per il dettaglio ordine
interface OrderDetailReadModel {
orderId: string;
customerId: string;
customerName: string;
items: Array<{
productId: string;
productName: string; // denormalizzato
quantity: number;
unitPrice: number;
subtotal: number;
}>;
totalAmount: number;
shippingAddress: Address;
status: string;
statusHistory: Array<{ status: string; changedAt: string }>;
estimatedDelivery?: string;
}
// ---- PROIEZIONE ----
// Aggiorna il read model in risposta agli eventi del command side
class OrderReadModelProjection {
constructor(
private readonly readModelDb: ReadModelDatabase,
private readonly customerRepo: CustomerReadRepository
) {}
// Quando un ordine viene confermato, crea/aggiorna le view nel read model
async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
// Carica i dati aggiuntivi necessari per la view denormalizzata
const customer = await this.customerRepo.findById(event.customerId);
const orderState = await this.orderRepo.findById(event.orderId);
// Inserisce/aggiorna la view della lista ordini
await this.readModelDb.upsert('order_list_view', {
order_id: event.orderId,
customer_name: customer.fullName,
customer_email: customer.email,
total_amount: orderState.totalAmount,
currency: orderState.currency,
status: 'Confirmed',
item_count: orderState.items.size,
placed_at: orderState.createdAt,
});
// Inserisce la view di dettaglio
const itemsWithNames = await this.enrichItemsWithProductNames(orderState.items);
await this.readModelDb.upsert('order_detail_view', {
order_id: event.orderId,
// ... tutti i campi della view dettaglio
items: JSON.stringify(itemsWithNames),
status_history: JSON.stringify([
{ status: 'Confirmed', changedAt: event.occurredAt }
]),
});
}
async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
// Aggiorna solo lo status nella view
await this.readModelDb.update('order_list_view',
{ order_id: event.orderId },
{ status: 'Cancelled' }
);
// Aggiungi alla status history nella view dettaglio
await this.readModelDb.appendToJsonArray('order_detail_view',
{ order_id: event.orderId },
'status_history',
{ status: 'Cancelled', changedAt: event.occurredAt }
);
}
}
// ---- QUERY HANDLER ----
class OrderQueryHandler {
constructor(private readonly readModelDb: ReadModelDatabase) {}
// Query velocissima sul read model denormalizzato
async getOrderList(customerId: string, page: number, pageSize: number):
Promise<OrderListItemReadModel[]>
{
return this.readModelDb.query(
`SELECT * FROM order_list_view
WHERE customer_id = $1
ORDER BY placed_at DESC
LIMIT $2 OFFSET $3`,
[customerId, pageSize, page * pageSize]
);
}
async getOrderDetail(orderId: string): Promise<OrderDetailReadModel | null> {
return this.readModelDb.queryOne(
'SELECT * FROM order_detail_view WHERE order_id = $1',
[orderId]
);
}
}
イベントソーシングを使用しない CQRS
CQRS とイベント ソーシングは直交しています。これらはうまく連携しますが、使用することもできます。 別に。イベント ソーシングを使用しない単純化された CQRS モデルはデータベースを使用します。 書き込み用のメイン データベースと読み取り用の別のデータベース (またはマテリアライズド ビュー):
// CQRS semplificato: stesso database, modelli separati
// Write side usa le entita ORM normali
// Read side usa query SQL ottimizzate o viste materializzate
// View materializzata PostgreSQL per la lista ordini
CREATE MATERIALIZED VIEW order_list_view AS
SELECT
o.id AS order_id,
c.full_name AS customer_name,
c.email AS customer_email,
o.total_amount,
o.currency,
o.status,
COUNT(oi.id) AS item_count,
o.created_at AS placed_at
FROM orders o
JOIN customers c ON c.id = o.customer_id
LEFT JOIN order_items oi ON oi.order_id = o.id
GROUP BY o.id, c.full_name, c.email;
-- Refresh automatico della vista materializzata
CREATE INDEX idx_order_list_customer ON order_list_view (customer_email);
-- Con PostgreSQL 17: INCREMENTAL REFRESH (solo le righe cambiate)
-- REFRESH MATERIALIZED VIEW CONCURRENTLY order_list_view;
-- Query sul read model: 100x piu veloce di una query con JOIN
SELECT * FROM order_list_view
WHERE customer_email = 'mario@example.com'
ORDER BY placed_at DESC
LIMIT 20;
独立したスケーリング読み取りと書き込み
CQRS を使用すると、読み取り側と書き込み側を個別にスケーリングできます。システムの場合 書き込みよりも読み取りが 100 倍多い (電子商取引では一般的)、次のことが可能です。
- 書き込み側: PostgreSQL プライマリ データベースを備えた 2 つのインスタンス
- 読み取り面: Redis キャッシュ + 読み取り専用 PostgreSQL レプリケーションを備えた 10 個のインスタンス
// Architettura di scaling con Kubernetes
# command-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-command-service
spec:
replicas: 2 # write side: pochi ma consistenti
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://primary-db:5432/orders" # database primario
- name: SERVICE_MODE
value: "command"
---
# query-side-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-query-service
spec:
replicas: 10 # read side: molte repliche, stateless
template:
spec:
containers:
- name: api
image: company/order-service:v1
env:
- name: DB_URL
value: "postgres://read-replica:5432/orders" # replica read-only
- name: REDIS_URL
value: "redis://cache:6379"
- name: SERVICE_MODE
value: "query"
フロントエンドでの最終的な整合性を管理する
非同期同期を使用する CQRS では、短い期間 (通常はミリ秒) が発生します。 ここで、読み取りモデルは書き込み後にまだ更新されていません。フロントエンドが処理する必要がある これは正しく:
// Pattern 1: Optimistic Update
// Il frontend aggiorna l'UI immediatamente, senza aspettare la query
async function placeOrder(orderData: PlaceOrderDto) {
// 1. Ottimisticamente aggiorna l'UI locale
dispatch({ type: 'ADD_ORDER_OPTIMISTIC', order: { ...orderData, status: 'Pending' } });
try {
// 2. Invia il command al backend
const response = await api.placeOrder(orderData);
// 3. Dopo N ms, ricarica dal read model (che dovrebbe essere aggiornato)
setTimeout(async () => {
const updatedOrder = await api.getOrder(response.orderId);
dispatch({ type: 'UPDATE_ORDER', order: updatedOrder });
}, 500);
} catch (error) {
// 4. In caso di errore, reverta l'ottimistic update
dispatch({ type: 'REVERT_ORDER_OPTIMISTIC' });
throw error;
}
}
// Pattern 2: Poll finche il read model non e aggiornato
async function pollUntilUpdated(orderId: string, expectedStatus: string) {
const MAX_ATTEMPTS = 10;
const DELAY_MS = 200;
for (let i = 0; i < MAX_ATTEMPTS; i++) {
const order = await api.getOrder(orderId);
if (order.status === expectedStatus) return order;
await new Promise(resolve => setTimeout(resolve, DELAY_MS));
}
throw new Error('Read model not updated within expected time');
}
CQRS のトレードオフ
利点
- 独立したスケーリング: 実際の負荷に基づいて読み取り側と書き込み側を個別にスケールします
- 最適化されたテンプレート: 読み取りモデルはビューに対して正確に非正規化できるため、複雑なクエリが不要になります。
- 高い読み取りパフォーマンス: 読み取りモデルのクエリは、事前に計算されたテーブルに対する単純な SELECT です。
- 責任の分離: 書き込みコードと読み取りコードが互いに汚染しない
管理が複雑
- 可能な一貫性: 読み取りモデルは書き込みよりわずかに遅れる場合があります。フロントエンドがそれを処理する必要がある
- ロジックの重複: 一部の検証はコマンド ハンドラーと読み取りモデルの両方で行う必要がある場合があります。
- さらに展開するコンポーネント: コマンドサービス、クエリサービス、プロジェクション、読み取りモデルデータベース
- 単純な CRUD には適していません: CQRS の複雑さは、複雑なビジネス ロジックを持たない操作には価値がありません。
結論と次のステップ
CQRS は、読み取りおよび書き込み負荷のあるシステムにとって最も効果的なアーキテクチャの 1 つです とても違う。コマンド側の明示的な分離 (一貫性、検証済み、イベント駆動型) クエリ側から (高速、非正規化、スケーラブル)、単一モデルのトレードオフを解決します。
次の記事では、イベント ソーシングと CQRS を組み合わせます。予測がどのように行われるかを見ていきます。 イベント ストアから読み取り、読み取りモデルを構築し、再試行による投影を管理する方法 エラーが発生した場合、およびスナップショットを使用して読み取りモデルの再構築を最適化する方法 失敗した後。
イベント駆動型アーキテクチャ シリーズの今後の記事
関連シリーズ
- イベントソーシング — CQRS の自然な補数
- 実践的なソフトウェアアーキテクチャ — アーキテクチャ全体のコンテキストの中で CQRS をどこに配置するか







