Event-Driven Architecture - Asynchronous System Design
Overview
Event-driven architecture uses events to trigger behavior across loosely coupled microservices.
Core Concepts
Events
# Event structure
class OrderCreatedEvent:
    """Domain event emitted after an order has been persisted.

    Carries all the context downstream consumers need (order, buyer,
    line items, total) so they never have to call back into the
    order service. Treat instances as immutable after construction.
    """

    def __init__(self, order_id, user_id, items, total):
        from datetime import timezone  # local import: top-of-file imports not visible here

        self.event_type = "OrderCreated"
        # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated
        # (Python 3.12+) and returns a naive datetime that is easy to
        # misinterpret when compared against aware datetimes.
        self.timestamp = datetime.now(timezone.utc)
        self.order_id = order_id
        self.user_id = user_id
        self.items = items
        self.total = total
Event Bus
class EventBus:
    """Minimal in-process pub/sub registry keyed by event type string."""

    def __init__(self):
        # event_type -> list of callables, each invoked with the event object.
        self.subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        """Register *handler* to be invoked for every event of *event_type*."""
        self.subscribers[event_type].append(handler)

    def publish(self, event):
        """Synchronously invoke all handlers for ``event.event_type`` in
        subscription order.

        Uses ``.get()`` instead of indexing so publishing an event type
        with no subscribers does not insert an empty list into the
        defaultdict, and iterates a snapshot so a handler may subscribe
        or unsubscribe during dispatch without corrupting the iteration.
        """
        for handler in list(self.subscribers.get(event.event_type, ())):
            handler(event)
Patterns
Pub/Sub (Publisher-Subscriber)
# Publisher
class OrderService:
    """Owns order persistence and announces new orders on the event bus."""

    def create_order(self, order_data):
        """Persist *order_data*, broadcast an OrderCreated event, and
        return the saved order."""
        order = self.save_order(order_data)
        created = OrderCreatedEvent(
            order.id,
            order.user_id,
            order.items,
            order.total,
        )
        self.event_bus.publish(created)
        return order
# Subscribers
class EmailService:
    """Reacts to OrderCreated events by emailing the buyer."""

    def handle_order_created(self, event):
        """Send the order-confirmation email for the order on *event*."""
        recipient = event.user_id
        order_ref = event.order_id
        self.send_confirmation_email(recipient, order_ref)
class InventoryService:
    """Reacts to OrderCreated events by placing a hold on purchased stock."""

    def handle_order_created(self, event):
        """Reserve every line item carried on *event*."""
        line_items = event.items
        self.reserve_items(line_items)
Event Sourcing
class EventStore:
    """Append-only persistence for domain events, grouped into streams.

    Relies on ``self.db`` exposing ``insert(row)`` and ``query(sql, params)``,
    and on ``self.get_next_version(stream_id)`` — presumably returning a
    monotonically increasing per-stream version (defined elsewhere; confirm).
    """

    def append(self, stream_id, events):
        """Persist *events* to *stream_id*, one row per event.

        Each row records the event's class name, its attributes serialized
        as JSON, a per-stream version, and the time of appending.
        """
        from datetime import timezone  # local import: top-of-file imports not visible here

        for event in events:
            self.db.insert({
                'stream_id': stream_id,
                'event_type': event.__class__.__name__,
                'event_data': json.dumps(event.__dict__),
                'version': self.get_next_version(stream_id),
                # Timezone-aware UTC; datetime.utcnow() is deprecated and
                # produces a naive datetime.
                'timestamp': datetime.now(timezone.utc)
            })

    def get_events(self, stream_id):
        """Return all stored rows for *stream_id* ordered by version."""
        return self.db.query(
            "SELECT * FROM events WHERE stream_id = ? ORDER BY version",
            [stream_id]
        )
class OrderAggregate:
    """Order aggregate whose state is rebuilt from its event history."""

    def replay_events(self, events):
        """Re-apply each recorded event, in order, to rebuild state."""
        for recorded in events:
            self.apply(recorded)
Message Queues
RabbitMQ Example
import pika
class RabbitMQPublisher:
    """Publishes events to the RabbitMQ topic exchange named 'events'."""

    def publish_event(self, routing_key, event):
        """Serialize *event* (its ``__dict__`` as JSON) and publish it
        under *routing_key*.

        Opens a fresh blocking connection per call; the ``try/finally``
        guarantees the connection is closed even if declaring the
        exchange or publishing raises, fixing a connection leak in the
        original (close was skipped on any exception).
        """
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        try:
            channel = connection.channel()
            channel.exchange_declare(exchange='events', exchange_type='topic')
            channel.basic_publish(
                exchange='events',
                routing_key=routing_key,
                body=json.dumps(event.__dict__)
            )
        finally:
            connection.close()
Apache Kafka Example
from kafka import KafkaProducer, KafkaConsumer
class KafkaEventBus:
    """Event bus backed by Apache Kafka topics (JSON-encoded payloads)."""

    def __init__(self):
        # Outgoing payloads are JSON-encoded to UTF-8 bytes by the producer.
        encode = lambda payload: json.dumps(payload).encode('utf-8')
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=encode,
        )

    def publish(self, topic, event):
        """Fire-and-forget: send the event's attribute dict to *topic*."""
        self.producer.send(topic, event.__dict__)

    def subscribe(self, topic, handler):
        """Consume *topic* indefinitely, passing each decoded payload to
        *handler*. NOTE: this loop blocks the calling thread forever.
        """
        decode = lambda raw: json.loads(raw.decode('utf-8'))
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['localhost:9092'],
            value_deserializer=decode,
        )
        for record in consumer:
            handler(record.value)
Best Practices
Event Design
- Events should be immutable
- Include all necessary context
- Use clear, descriptive names
- Version your events
Error Handling
class EventHandler:
    """Base handler that retries processing before giving up on an event."""

    def handle_with_retry(self, event, max_retries=3):
        """Attempt ``self.process(event)`` up to *max_retries* times.

        Returns as soon as one attempt succeeds. Between failed attempts
        it backs off exponentially (1s, 2s, 4s, ...); after the final
        failure the event is routed to the dead-letter queue instead.
        """
        last_attempt = max_retries - 1
        attempt = 0
        while attempt < max_retries:
            try:
                self.process(event)
                return
            except Exception as exc:
                if attempt == last_attempt:
                    self.send_to_dead_letter_queue(event, exc)
                else:
                    time.sleep(2 ** attempt)  # exponential backoff
            attempt += 1
Saga Pattern
class PaymentSaga:
    """Payment step of the order saga: charges the buyer and emits the
    success/failure event that drives the next saga step."""

    def handle_order_created(self, event):
        """Charge ``event.total`` to ``event.user_id`` and publish the
        outcome for ``event.order_id``."""
        try:
            payment = self.process_payment(event.user_id, event.total)
            outcome = PaymentSuccessEvent(event.order_id, payment.id)
            self.publish(outcome)
        except PaymentFailedException:
            # Compensating event: downstream steps undo/skip their work.
            self.publish(PaymentFailedEvent(event.order_id))
Next Steps
- 📚 Learn about Serverless Architecture
- 🎯 Practice event modeling
- 🏗️ Explore CQRS patterns
- 💻 Setup event monitoring
This content will be expanded with detailed event-driven patterns.