イベント駆動設計
目次
- 概要
- イベント駆動設計とは何か
- イベント、コマンド、クエリ
- ブローカー、チャネル、コンシューマ
- 同期通信との違い
- イベントの設計
- 配信方式
- 冪等性
- 順序と重複
- エラーハンドリング
- イベントソーシング
- CQRS
- 観測可能性
- 使うべき場面
- イベントスキーマ
- アウトボックスパターン
- コンシューマ設計
- AsyncAPI仕様による非同期API設計
- AWS イベント駆動アーキテクチャ
- Microsoft Azure イベント駆動設計
- Reactive Manifesto と宣言的設計
- マイクロサービス通信パターン
- イベント駆動設計のベストプラクティス
- 実装基盤を選ぶときの整理
- イベント駆動設計のトレードオフ
- まとめ
- 参考文献
概要
出来事を中心にシステムをつなぐ
イベント駆動設計は、システム内で起きた出来事をイベントとして発行し、それに関心のある処理が購読する設計です。サービス同士を直接呼び合うのではなく、出来事を介して疎結合にします。
イベント駆動設計の中心は「何が起きたか」を明示し、後続処理を疎結合にすることです。ただし、非同期化は複雑さも増やすため、冪等性、順序、再試行、観測可能性を同時に設計します。
この章で重視すること
- イベント、コマンド、クエリを分ける
- 非同期化の利点と複雑さを両方見る
- 冪等性、重複、順序を最初から設計する
- ドメインイベントとメッセージング基盤を混同しない
イベント駆動設計とは何か
イベントは、過去に起きた事実です。
OrderCreated
PaymentFailed
InventoryReserved
UserRegistered
イベント駆動では、発行者は受信者を直接知りません。
AsyncAPIのイベント駆動アーキテクチャ解説では、イベントは「何かが起きた」ことを伝えるためのメッセージであり、サービス間の通信をブローカーやチャネルで整理します。RESTの同期リクエストとは違い、受信者が後から処理できる点が特徴です。
イベント、コマンド、クエリ
| 種類 | 意味 | 例 |
|---|---|---|
| コマンド | 何かをしてほしい | CreateOrder |
| イベント | 何かが起きた | OrderCreated |
| クエリ | 情報がほしい | GetOrder |
イベントを命令のように使うと、責務が曖昧になります。イベントは「起きた事実」として設計します。
AsyncAPIの文献でも、messageは構造としては共通でも、eventとcommandは概念として異なると説明されています。eventは「起きた事実」を通知し、commandは「何かをしてほしい」という依頼です。名前も過去形と命令形で分けると読みやすくなります。
| 種類 | 名前の例 | 意味 |
|---|---|---|
| event | OrderPlaced |
注文が確定した |
| command | ShipOrder |
注文を出荷してほしい |
| query | GetOrder |
注文を取得したい |
ブローカー、チャネル、コンシューマ
イベント駆動では、producer、broker、channel、consumerを分けて考えます。
brokerはmessageを受け取り、関心を示したsubscriberへ届ける基盤です。channelはtopic、routing key、event typeなどサービスや製品によって呼び名が違いますが、特定種類のmessageを流す場所として設計します。
channelに複数の意味を混ぜると、consumer側が不要なmessageを大量に受け取り、filterや分岐が増えます。
避けたい:
business-events
読みやすい:
order.placed
order.cancelled
invoice.issued
channel名は実装都合より、業務上の事実や関心に合わせます。
同期通信との違い
同期通信は結果がすぐ分かる一方、呼び出し先の遅延や障害を受けやすいです。
非同期イベントは呼び出し元を軽くできますが、結果整合性になります。
| 方式 | 利点 | 注意点 |
|---|---|---|
| 同期API | 分かりやすい、即時結果 | 密結合、障害伝播 |
| 非同期イベント | 疎結合、スケールしやすい | 遅延、重複、順序、追跡 |
イベントの設計
イベントには、最低限次の情報を入れます。
{
"eventId": "evt_123",
"eventType": "OrderCreated",
"occurredAt": "2026-04-29T10:00:00Z",
"version": 1,
"data": {
"orderId": "ord_123",
"customerId": "cus_456",
"totalAmount": 12000
}
}
設計上の注意:
- イベントIDを持つ
- 発生時刻を持つ
- スキーマバージョンを持つ
- 個人情報を入れすぎない
- 内部DB構造をそのまま出さない
AsyncAPIで契約を明文化する
イベントもAPIです。AsyncAPI Specificationは、非同期APIやイベント駆動アーキテクチャのメッセージ、チャネル、スキーマを記述するための仕様です。OpenAPIがHTTP APIの契約を文書化するように、AsyncAPIはイベントの契約を文書化します。
asyncapi: 3.0.0
info:
title: Order Events
version: 1.0.0
channels:
order.created:
address: order.created
messages:
OrderCreated:
payload:
type: object
properties:
orderId:
type: string
customerId:
type: string
イベント契約を明文化すると、発行側と購読側の認識ずれを減らせます。
AsyncAPI Specificationは、receive側の定義をsend側へ機械的に反転することを推奨していません。受信に使うchannelと送信に使うchannelが同じとは限らず、operation名やsummaryも意味が変わるためです。producerとconsumerは、それぞれの責務としてdocumentを持つ方が安全です。
Receiver document:
operation: onUserSignedUp
action: receive
Sender document:
operation: publishUserSignedUp
action: send
「同じeventを扱う」ことと「同じAPI契約を持つ」ことは別です。broker設定、topic権限、DLQ、retry policyなど、AsyncAPIだけでは表しきれない運用設定も別途管理します。
配信方式
| 方式 | 例 | 向いている場面 |
|---|---|---|
| キュー | SQS, RabbitMQ | 1つの処理者に渡す |
| Pub/Sub | SNS, Google Pub/Sub | 複数購読者に配る |
| ストリーム | Kafka, Kinesis | 順序、再処理、履歴 |
| イベントバス | EventBridge | サービス間連携 |
キューとストリームは似ていますが、履歴を読む設計かどうかが大きく違います。
冪等性
イベント処理では、同じイベントが複数回届くことを前提にします。冪等性とは、同じ処理を複数回実行しても結果が変わらない性質です。
eventId = evt_123 を処理済みとして記録
同じ eventId が来たらスキップ
決済、在庫、通知では特に重要です。
順序と重複
分散システムでは、イベントが順番通りに届くとは限りません。
対策:
- 集約単位で順序キーを使う
- バージョン番号を持つ
- 古いイベントを無視する
- 順序が必要な範囲を小さくする
すべてのイベントに全体順序を求めると、スケーラビリティが落ちます。
エラーハンドリング
イベント処理では失敗が必ず起きます。
- リトライ
- デッドレターキュー
- 遅延再試行
- 手動復旧
- 補償トランザクション
- アラート
AWSのイベント駆動アーキテクチャ資料でも、疎結合化、スケール、非同期処理の利点と同時に、失敗時の再試行、デッドレター、監視を設計することが重要です。
イベントソーシング
イベントソーシングは、現在状態ではなくイベント列を正として保存する方式です。
OrderCreated
ItemAdded
PaymentAuthorized
OrderShipped
利点:
- 状態の履歴が完全に残る
- 監査に強い
- 過去イベントから別のビューを再構築できる
注意点:
- 実装が複雑
- スキーマ進化が難しい
- 読み取りモデルが別途必要
CQRS
CQRSは、更新モデルと参照モデルを分ける考え方です。
読み取りが多いシステムや、画面ごとに最適化された参照モデルが必要な場合に有効です。
観測可能性
イベント駆動では、処理が非同期になるため追跡が難しくなります。
必要なもの:
- correlationId
- causationId
- eventId
- traceId
- 処理遅延
- キュー長
- リトライ回数
- デッドレター件数
オブザーバビリティを設計しないと、「どこで止まったか」が分からなくなります。
使うべき場面
向いている:
- 後続処理が複数ある
- 即時完了しなくてよい
- 外部サービス障害を分離したい
- 監査ログや履歴が重要
- ストリーム処理が必要
向いていない:
- 強い一貫性が必要
- 即時結果が必要
- 処理フローが単純
- チームが非同期運用に慣れていない
zure Architecture: Event-driven architecture style](https://learn.microsoft.com/en-us/azure/architecture/guide/architecture-styles/event-driven)
- microservices.io: Transactional Outbox
- Reactive Manifesto
- AsyncAPI: Event-Driven Architectures
- AsyncAPI: About
- AsyncAPI Specification
- AWS Prescriptive Guidance: Event-driven architecture
- AWS: Event-driven architectures
イベントスキーマ
イベント駆動では、イベント名だけでなくschemaが契約になります。producerとconsumerが別々に進化するため、互換性を意識して設計します。
{
"eventId": "evt_123",
"eventType": "OrderConfirmed",
"occurredAt": "2026-04-29T10:00:00Z",
"version": 1,
"payload": {
"orderId": "ord_123",
"customerId": "cus_456"
}
}
入れるべき情報:
- event id
- event type
- occurred at
- schema version
- correlation id
- causation id
- producer
- payload
アウトボックスパターン
DB更新とイベント発行を別々に行うと、片方だけ成功する問題が起きます。Outbox patternでは、業務DBの同じtransaction内にoutbox tableへイベントを書き、別processがそれをbrokerへ送ります。
これにより、DB更新とイベント記録の原子性を保ちやすくなります。ただし、consumer側では重複配信を前提に冪等に処理します。
コンシューマ設計
consumerは、イベントを受け取ったら必ず1回だけ処理できるとは限りません。
- 同じeventが複数回来る
- 順序が入れ替わる
- 古いschemaが届く
- 下流APIが失敗する
- 処理途中でprocessが落ちる
そのため、idempotency key、処理済みevent table、retry policy、dead letter queue、schema互換性を設計します。
AsyncAPI仕様による非同期API設計
AsyncAPI (asyncapi.com) は、非同期メッセージングプロトコルのための OpenAPI等価物です。
AsyncAPI 3.0 スキーマ構造
asyncapi: '3.0.0'
info:
title: 'E-Commerce Event Broker'
version: '1.0.0'
description: 'イベント駆動型ECシステムのメッセージング仕様'
defaultContentType: 'application/json'
servers:
rabbitmq:
host: 'rabbitmq.example.com:5672'
protocol: amqp
protocolVersion: '0.9.1'
kafka:
host: 'kafka.example.com:9092'
protocol: kafka
channels:
orders/created:
description: '注文作成イベント'
address: 'orders.created'
publish:
summary: '新規注文のイベント発行'
message:
$ref: '#/components/messages/OrderCreated'
inventory/updated:
description: '在庫更新イベント'
address: 'inventory.updated'
subscribe:
message:
$ref: '#/components/messages/InventoryUpdated'
components:
messages:
OrderCreated:
payload:
type: object
properties:
orderId:
type: string
format: uuid
customerId:
type: string
totalAmount:
type: number
format: float
items:
type: array
items:
type: object
properties:
productId:
type: string
quantity:
type: integer
unitPrice:
type: number
timestamp:
type: string
format: date-time
メッセージング仕様の重要性
- プロデューサー・コンシューマー間の契約管理
- スキーマ進化への対応
- API第一設計による堅牢性
参考: AsyncAPI Community は、各プロトコル(RabbitMQ, Kafka, MQTT)の実装例を提供しています。
AWS イベント駆動アーキテクチャ
AWS (docs.aws.amazon.com, aws.amazon.com) は、フルマネージドなイベント駆動サービスを提供します。
Amazon EventBridge
EventBridgeは、アプリケーション間のイベント配信を自動化します。
import boto3
import json
from datetime import datetime
client = boto3.client('events', region_name='us-east-1')
# イベントルールの作成
response = client.put_rule(
Name='order-processing-rule',
EventPattern=json.dumps({
'source': ['orders.service'],
'detail-type': ['Order Placed'],
'detail': {
'status': ['pending']
}
}),
State='ENABLED'
)
# ターゲット(SQS)の指定
client.put_targets(
Rule='order-processing-rule',
Targets=[
{
'Id': '1',
'Arn': 'arn:aws:sqs:us-east-1:123456789012:order-queue',
'RoleArn': 'arn:aws:iam::123456789012:role/EventBridgeRole'
}
]
)
イベントパターンマッチング:
{
"source": ["myapp"],
"detail-type": ["User Action"],
"detail": {
"action": ["login", "logout"],
"region": [{"prefix": "us-"}]
}
}
Amazon SNS/SQS メッセージング
SNS(Simple Notification Service): パブリッシュ・サブスクライブモデル
sns = boto3.client('sns')
topic_arn = 'arn:aws:sns:us-east-1:123456789012:user-events'
# メッセージの発行
response = sns.publish(
TopicArn=topic_arn,
Message=json.dumps({
'event_type': 'user.registered',
'user_id': '12345',
'email': 'user@example.com',
'timestamp': datetime.utcnow().isoformat()
}),
MessageAttributes={
'event_type': {'DataType': 'String', 'StringValue': 'user.registered'}
}
)
SQS(Simple Queue Service): メッセージキュー(メッセージ保持)
sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/order-queue'
# メッセージの送信
sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
'order_id': 'ORD-001',
'amount': 99.99
}),
MessageAttributes={
'priority': {'StringValue': 'high', 'DataType': 'String'},
'retry_count': {'StringValue': '0', 'DataType': 'Number'}
}
)
# メッセージの受信
messages = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20 # ロングポーリング
)
AWS Lambda + EventBridge統合
def lambda_handler(event, context):
'''EventBridgeイベント処理ハンドラ'''
detail = event['detail']
order_id = detail['orderId']
# 在庫確認
if check_inventory(order_id):
# 決済処理
process_payment(detail)
# 完了イベント発行
publish_event('order.confirmed', detail)
else:
# キャンセルイベント発行
publish_event('order.cancelled', detail)
return {'statusCode': 200}
Microsoft Azure イベント駆動設計
Azure (learn.microsoft.com) のイベント駆動パターン実装:
Azure Event Grid
Event Gridは、イベントソースからイベントハンドラへの配信を管理します。
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import CloudEvent
endpoint = "https://{topic-name}.eventgrid.azure.net/api/events"
credential = AzureKeyCredential("{access-key}")
client = EventGridPublisherClient(endpoint, credential)
# カスタムイベントの発行
event = CloudEvent(
type="user.registered",
source="my-application",
data={
"user_id": "12345",
"email": "user@example.com",
"registration_date": "2026-05-02"
}
)
client.send(event)
Azure Service Bus
メッセージング:トピック・サブスクリプション
from azure.servicebus import ServiceBusClient, ServiceBusMessage
connstr = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=..."
client = ServiceBusClient.from_connection_string(connstr)
# メッセージ送信
with client.get_topic_sender(topic_name="orders") as sender:
message = ServiceBusMessage(
body="Order data",
subject="order.created",
session_id="order-123",
correlation_id="req-456"
)
sender.send_messages(message)
# メッセージ受信
with client.get_subscription_receiver(
topic_name="orders",
subscription_name="inventory-service"
) as receiver:
messages = receiver.receive_messages(max_wait_time=10)
for msg in messages:
print(msg.subject, msg.body)
msg.complete()
Reactive Manifesto と宣言的設計
Reactive Manifesto (reactivemanifesto.org) は、分散システムの4つの特性を定義しています。
4つの特性
- Responsive(応答性): システムはリアルタイムレスポンスを提供
- Resilient(復元力): コンポーネント障害時にシステムは継続稼働
- Elastic(弾性): 負荷変動に応じてスケール
- Message Driven(メッセージ駆動): 非同期メッセージングによる疎結合
実装パターン
from asyncio import Queue
import asyncio
class ReactiveEventBus:
'''イベント駆動型リアクティブシステム'''
def __init__(self):
self.subscribers = {}
self.event_queue = Queue()
async def subscribe(self, event_type, handler):
'''イベント購読'''
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
async def publish(self, event_type, data):
'''イベント発行(非同期)'''
await self.event_queue.put((event_type, data))
async def start(self):
'''イベント処理ループ'''
while True:
event_type, data = await self.event_queue.get()
if event_type in self.subscribers:
# 全購読者に並行実行で通知
tasks = [
handler(data)
for handler in self.subscribers[event_type]
]
await asyncio.gather(*tasks, return_exceptions=True)
# 使用例
bus = ReactiveEventBus()
async def handle_order(data):
print(f"処理: {data}")
await asyncio.sleep(1) # 非同期処理
await bus.subscribe('order.created', handle_order)
await bus.publish('order.created', {'order_id': 123})
マイクロサービス通信パターン
Microservices.io (microservices.io) の通信パターン実装:
Saga パターン (分散トランザクション)
class OrderSaga:
'''注文処理Saga(分散トランザクション)'''
async def execute_order(self, order_data):
try:
# ステップ1: 在庫予約
reservation_id = await self.inventory_service.reserve(
order_data['items']
)
# ステップ2: 決済
payment_id = await self.payment_service.charge(
order_data['amount']
)
# ステップ3: 配送
shipment_id = await self.shipping_service.create_shipment(
order_data
)
# 完了
await self.order_service.confirm(order_data['order_id'])
except Exception as e:
# 補償トランザクション実行
await self.compensate(reservation_id, payment_id)
raise
async def compensate(self, reservation_id, payment_id):
'''ロールバック処理'''
await self.inventory_service.release(reservation_id)
await self.payment_service.refund(payment_id)
Circuit Breaker パターン
import asyncio
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = 'closed' # 正常
OPEN = 'open' # 故障
HALF_OPEN = 'half_open' # 復旧テスト中
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.state = CircuitState.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.last_failure_time = None
async def call(self, coro):
if self.state == CircuitState.OPEN:
# 復旧時間経過をチェック
if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await coro
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
# 使用例
breaker = CircuitBreaker(failure_threshold=3)
async def call_external_api():
return await breaker.call(some_async_operation())
イベント駆動設計のベストプラクティス
-
イベント粒度の最適化
- 粗粒度: パフォーマンス向上、但し再利用性低い
- 細粒度: 再利用性高い、但しイベント数増加
-
冪等性の確保
# イベント再処理時に同じ結果を生じる def process_payment(event_id, amount): existing = db.query(Payment).filter(id=event_id).first() if existing: return existing return db.insert(Payment(id=event_id, amount=amount)) -
イベント順序の保証
- パーティショニングキー(customer_id等)で順序保証
- Kafka: 同一パーティション内で順序保証
-
デッドレター処理
# 処理失敗イベントをDLQに移動 async def handle_with_dlq(event, max_retries=3): for attempt in range(max_retries): try: await process_event(event) return except Exception as e: if attempt == max_retries - 1: await dead_letter_queue.send(event, error=str(e)) await asyncio.sleep(2 ** attempt) # 指数バックオフ
実装基盤を選ぶときの整理
AsyncAPI、AWS EventBridge、Azure Service Bus、Kafkaのような技術は、どれもイベント駆動を支える道具ですが、役割は同じではありません。AsyncAPIは契約を記述する仕様、EventBridgeやService Busはマネージドな配送基盤、Kafkaはログ指向のストリーミング基盤です。
| 選択肢 | 向いていること | 注意点 |
|---|---|---|
| AsyncAPI | イベント契約、チャネル、メッセージ形式の文書化 | broker設定や権限までは別管理が必要 |
| EventBridge | SaaS連携、疎結合なイベントルーティング | 厳密な順序や高スループット処理には別設計が必要 |
| Azure Service Bus | enterprise messaging、queue、topic、dead-letter | メッセージ設計と再試行方針を明示する |
| Kafka | 大量イベント、再処理、stream processing | 運用、partition設計、schema進化が難しい |
技術選定では、まず「イベントを契約として管理したいのか」「業務イベントを配送したいのか」「履歴を再処理したいのか」を分けます。ここを混ぜると、軽い通知のために重いstreaming基盤を持ったり、再処理が必要なデータを一過性のqueueだけに流したりしがちです。
イベント駆動設計のトレードオフ
スケーラビリティ vs 複雑性
| 特性 | 同期API | 非同期イベント |
|---|---|---|
| 実装の単純さ | 高い | 低い(分散追跡、補償トランザクション等) |
| 結果の即時性 | 即座 | 遅延(最終一貫性) |
| スケーラビリティ | 制限される(直結) | 優れている(疎結合) |
| 障害の隔離 | 困難(波及する) | 容易(独立処理) |
| 監視・デバッグ | 直線的 | 複雑(分散イベント追跡) |
使い分け原則
-
同期APIを選ぶべき場合:
- 即座の応答が必要
- 結果が確定的でなければならない
- オブジェクト数が少ない
-
非同期イベントを選ぶべき場合:
- 高スケーラビリティが必須
- 結果整合性で許容される
- サービス間の疎結合が重要
- イベントソーシングで監査径跡が必要
まとめ
イベント駆動設計は、サービス間を疎結合にし、拡張しやすい構造を作る強力な方法です。一方で、重複、順序、再試行、観測可能性を設計しないと、問題の追跡が難しくなります。イベントは「起きた事実」として扱い、必要な複雑さだけを導入することが重要です。