Event-Driven Architecture - Asynchronous System Design

Tổng quan

Event-driven architecture sử dụng events để trigger behavior across loosely coupled microservices.

Core Concepts

Events

# Event structure
class OrderCreatedEvent:
    def __init__(self, order_id, user_id, items, total):
        self.event_type = "OrderCreated"
        self.timestamp = datetime.utcnow()
        self.order_id = order_id
        self.user_id = user_id
        self.items = items
        self.total = total

Event Bus

class EventBus:
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self.subscribers[event_type].append(handler)

    def publish(self, event):
        for handler in self.subscribers[event.event_type]:
            handler(event)

Patterns

Pub/Sub (Publisher-Subscriber)

# Publisher
class OrderService:
    def create_order(self, order_data):
        order = self.save_order(order_data)

        event = OrderCreatedEvent(
            order.id, order.user_id, 
            order.items, order.total
        )
        self.event_bus.publish(event)
        return order

# Subscribers
class EmailService:
    def handle_order_created(self, event):
        self.send_confirmation_email(event.user_id, event.order_id)

class InventoryService:
    def handle_order_created(self, event):
        self.reserve_items(event.items)

Event Sourcing

class EventStore:
    def append(self, stream_id, events):
        for event in events:
            self.db.insert({
                'stream_id': stream_id,
                'event_type': event.__class__.__name__,
                'event_data': json.dumps(event.__dict__),
                'version': self.get_next_version(stream_id),
                'timestamp': datetime.utcnow()
            })

    def get_events(self, stream_id):
        return self.db.query(
            "SELECT * FROM events WHERE stream_id = ? ORDER BY version",
            [stream_id]
        )

class OrderAggregate:
    def replay_events(self, events):
        for event in events:
            self.apply(event)

Message Queues

RabbitMQ Example

import pika

class RabbitMQPublisher:
    def publish_event(self, routing_key, event):
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        channel = connection.channel()

        channel.exchange_declare(exchange='events', exchange_type='topic')

        channel.basic_publish(
            exchange='events',
            routing_key=routing_key,
            body=json.dumps(event.__dict__)
        )
        connection.close()

Apache Kafka Example

from kafka import KafkaProducer, KafkaConsumer

class KafkaEventBus:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )

    def publish(self, topic, event):
        self.producer.send(topic, event.__dict__)

    def subscribe(self, topic, handler):
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        for message in consumer:
            handler(message.value)

Best Practices

Event Design

  • Events should be immutable
  • Include all necessary context
  • Use clear, descriptive names
  • Version your events

Error Handling

class EventHandler:
    def handle_with_retry(self, event, max_retries=3):
        for attempt in range(max_retries):
            try:
                self.process(event)
                return
            except Exception as e:
                if attempt == max_retries - 1:
                    self.send_to_dead_letter_queue(event, e)
                else:
                    time.sleep(2 ** attempt)  # Exponential backoff

Saga Pattern

class PaymentSaga:
    def handle_order_created(self, event):
        try:
            payment = self.process_payment(event.user_id, event.total)
            self.publish(PaymentSuccessEvent(event.order_id, payment.id))
        except PaymentFailedException:
            self.publish(PaymentFailedEvent(event.order_id))

Next Steps

  1. 📚 Học về Serverless Architecture
  2. 🎯 Practice event modeling
  3. 🏗️ Explore CQRS patterns
  4. 💻 Setup event monitoring

Content sẽ được expand với detailed event-driven patterns.