Message Queues - Asynchronous Communication

Overview

Message queues enable asynchronous communication between services, improving system resilience and scalability.

Core Concepts

Producer-Consumer Pattern

# Producer
class MessageProducer:
    """Publishes messages to a named queue via the underlying broker client."""

    def send_message(self, queue_name, message):
        """Hand *message* to the broker client for delivery on *queue_name*."""
        broker = self.queue
        broker.publish(queue_name, message)

# Consumer
class MessageConsumer:
    """Continuously pulls messages from a queue and hands each to process_message."""

    def consume_messages(self, queue_name):
        """Poll *queue_name* in an endless loop, processing whatever arrives.

        NOTE(review): spins without sleeping when the queue is empty —
        presumably `get` blocks; confirm against the queue client.
        """
        while True:
            message = self.queue.get(queue_name)
            if not message:
                continue
            self.process_message(message)

Message Structure

class Message:
    """Envelope for a queue payload with identity, timing, and retry metadata.

    Args:
        id: unique message identifier.
        payload: application data carried by the message.
        timestamp: creation time; defaults to the current UTC time.
        retry_count: number of delivery attempts already made.
    """

    def __init__(self, id, payload, timestamp=None, retry_count=0):
        # Local import keeps this snippet self-contained.
        from datetime import datetime, timezone

        self.id = id
        self.payload = payload
        # Fix: datetime.utcnow() is deprecated and returns a *naive* datetime;
        # produce an explicitly timezone-aware UTC timestamp instead.
        self.timestamp = timestamp or datetime.now(timezone.utc)
        self.retry_count = retry_count
        self.headers = {}  # optional transport headers (e.g. correlation IDs)

Message Queue Types

Point-to-Point (Queue)

class PointToPointQueue:
    """In-memory FIFO queue: each message is delivered to exactly one receiver."""

    def __init__(self):
        # Fix: deque gives O(1) pops from the head; list.pop(0) is O(n).
        from collections import deque
        self.queue = deque()

    def send(self, message):
        """Append *message* to the tail of the queue."""
        self.queue.append(message)

    def receive(self):
        """Pop and return the oldest message, or None when the queue is empty."""
        return self.queue.popleft() if self.queue else None

Publish-Subscribe (Topic)

class PubSubTopic:
    """Fan-out topic: every published message goes to all registered subscribers."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, subscriber):
        """Register *subscriber* to receive future publications."""
        self.subscribers.append(subscriber)

    def publish(self, message):
        """Deliver *message* to each subscriber in registration order."""
        for listener in self.subscribers:
            listener.receive(message)

RabbitMQ

import pika

class RabbitMQClient:
    """Thin wrapper around a blocking pika connection to a local RabbitMQ broker."""

    def __init__(self):
        # Synchronous (blocking) connection to the broker on localhost.
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

    def send_message(self, queue_name, message):
        """JSON-encode *message* and publish it to the durable queue *queue_name*."""
        # queue_declare is idempotent; durable=True makes the queue itself
        # survive a broker restart.
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # Persist message
        )

    def consume_messages(self, queue_name, callback):
        """Block forever, invoking *callback* for each message on *queue_name*.

        NOTE(review): auto_ack=False means *callback* is responsible for
        acknowledging each message — confirm callers actually ack.
        """
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
            auto_ack=False
        )
        self.channel.start_consuming()

Apache Kafka

from kafka import KafkaProducer, KafkaConsumer

class KafkaClient:
    """Producer/consumer helper for a local Kafka broker with JSON-encoded values."""

    def __init__(self):
        # Producer serializes every value to UTF-8 JSON before sending.
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )

    def send_message(self, topic, message):
        """Send *message* to *topic* and block up to 10s for the broker ack.

        Returns:
            The result of the send future (record metadata on success).
        """
        future = self.producer.send(topic, message)
        return future.get(timeout=10)

    def consume_messages(self, topic, group_id):
        """Yield deserialized message values from *topic* within *group_id*.

        Generator that blocks on the consumer and never terminates on its own.
        """
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['localhost:9092'],
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        for message in consumer:
            yield message.value

Amazon SQS

import boto3

class SQSClient:
    """Amazon SQS wrapper: JSON message bodies, long-polling receives."""

    def __init__(self):
        # Credentials/region come from the standard boto3 environment config.
        self.sqs = boto3.client('sqs')

    def send_message(self, queue_url, message):
        """JSON-encode *message* and send it to the queue at *queue_url*.

        Returns:
            The SQS-assigned MessageId string.
        """
        response = self.sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message),
            DelaySeconds=0
        )
        return response['MessageId']

    def receive_messages(self, queue_url):
        """Fetch up to 10 messages, waiting up to 20s for any to arrive.

        Returns:
            A list of raw SQS message dicts; empty when nothing arrived.
        """
        response = self.sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20  # Long polling
        )
        return response.get('Messages', [])

Message Patterns

Work Queue

class WorkQueue:
    """Producer side of a work queue: serializes tasks and enqueues them."""

    def add_task(self, task):
        """Convert *task* into a plain dict and push it onto 'work_queue'."""
        payload = {
            'task_type': task.type,
            'task_data': task.data,
            'priority': task.priority,
        }
        self.queue.send('work_queue', payload)

class Worker:
    """Consumer side of the work queue: executes tasks and acknowledges them."""

    def process_tasks(self):
        """Poll 'work_queue' forever; ack each message only after execution.

        NOTE(review): if execute_task raises, the message is neither acked
        nor nacked — confirm the broker's redelivery behavior covers this.
        """
        while True:
            message = self.queue.receive('work_queue')
            if message:
                self.execute_task(message)
                self.queue.ack(message)

Request-Reply

class RequestReplyPattern:
    """Synchronous request/reply over a queue, matched by correlation ID."""

    def send_request(self, request):
        """Send *request* and block (up to 30s) for the correlated reply."""
        correlation_id = str(uuid.uuid4())
        reply_queue = f"reply_{correlation_id}"

        envelope = {
            'data': request,
            'reply_to': reply_queue,
            'correlation_id': correlation_id,
        }
        self.queue.send('request_queue', envelope)

        # Block on the per-request reply queue until the reply (or timeout).
        return self.queue.receive(reply_queue, timeout=30)

    def handle_request(self, message):
        """Process one request *message* and send the result back to its sender."""
        outcome = self.process(message['data'])

        reply_envelope = {
            'correlation_id': message['correlation_id'],
            'result': outcome,
        }
        self.queue.send(message['reply_to'], reply_envelope)

Message Durability

Persistent Messages

class DurableMessageQueue:
    """Sends messages that survive broker restarts and waits for confirms."""

    def send_persistent_message(self, queue_name, message):
        """Publish *message* durably to *queue_name* and await broker confirmation."""
        self.queue.send(
            queue_name,
            message,
            persistent=True,  # Survive broker restart
            confirm=True,     # Wait for confirmation
        )

Message Acknowledgment

class ReliableConsumer:
    """Consumer with manual acknowledgment: ack on success, nack-and-requeue on failure."""

    def consume_with_ack(self, queue_name):
        """Receive one message from *queue_name* and process it reliably.

        The message is acknowledged only after successful processing; on any
        processing error it is negatively acknowledged and requeued.
        """
        message = self.queue.receive(queue_name, auto_ack=False)
        # Fix: guard against an empty receive — the original passed None into
        # process_message when the queue had nothing to deliver.
        if message is None:
            return

        try:
            self.process_message(message)
            self.queue.ack(message)  # Acknowledge successful processing
        except Exception:
            self.queue.nack(message, requeue=True)  # Negative acknowledgment

Error Handling

Dead Letter Queues

class DeadLetterQueue:
    """Declares a main queue wired to route exhausted messages to a DLQ."""

    def setup_dlq(self, main_queue, dlq_name, max_retries=3):
        """Declare *main_queue* so failed messages dead-letter to *dlq_name*."""
        arguments = {
            'x-message-ttl': 60000,  # 1 minute
            'x-max-retries': max_retries,
            'x-dead-letter-exchange': 'dlx',
            'x-dead-letter-routing-key': dlq_name,
        }
        self.queue.declare(main_queue, arguments=arguments)

Retry Logic

class RetryableConsumer:
    """Retries failed messages with exponential backoff before dead-lettering."""

    def process_with_retry(self, message, max_retries=3):
        """Process *message*; on failure retry up to *max_retries* times.

        Each retry is delayed by 2**attempts seconds; once retries are
        exhausted the message is routed to the dead-letter queue.
        """
        attempts = message.get('retry_count', 0)

        try:
            self.process_message(message)
            return
        except Exception:
            pass

        if attempts >= max_retries:
            self.queue.send('dead_letter_queue', message)
            return

        # Exponential backoff: 1s, 2s, 4s, ...
        message['retry_count'] = attempts + 1
        self.queue.send_delayed(
            'retry_queue',
            message,
            delay=2 ** attempts
        )

Performance Optimization

Batch Processing

class BatchProcessor:
    """Accumulates messages and processes them in fixed-size batches."""

    def __init__(self, batch_size=100):
        # Number of messages collected before process_batch() fires.
        self.batch_size = batch_size
        self.batch = []

    def consume_in_batches(self, queue_name):
        """Poll *queue_name* forever, flushing every *batch_size* messages.

        NOTE(review): a partial batch is never flushed (no timeout), and the
        loop busy-spins while the queue is empty — confirm this is intended.
        """
        while True:
            message = self.queue.receive(queue_name)
            if message:
                self.batch.append(message)

                if len(self.batch) >= self.batch_size:
                    self.process_batch()
                    self.batch.clear()

Message Compression

import gzip

class CompressedMessageQueue:
    """Queue wrapper that gzip-compresses JSON payloads on the wire."""

    def send_compressed(self, queue_name, message):
        """Serialize *message* to JSON, gzip it, and enqueue the bytes."""
        encoded = json.dumps(message).encode('utf-8')
        self.queue.send(queue_name, gzip.compress(encoded))

    def receive_compressed(self, queue_name):
        """Dequeue, gunzip, and JSON-decode one message; None when queue is empty."""
        blob = self.queue.receive(queue_name)
        if blob:
            raw = gzip.decompress(blob)
            return json.loads(raw.decode('utf-8'))

Monitoring

Queue Metrics

class QueueMonitoring:
    """Aggregates health metrics for a single queue into one snapshot dict."""

    def get_metrics(self, queue_name):
        """Return a dict of current metrics for *queue_name*.

        Keys: queue_depth, message_rate, consumer_count,
        avg_processing_time, error_rate — each value is delegated to a
        helper on this class or the queue client.
        """
        return {
            'queue_depth': self.queue.get_depth(queue_name),
            'message_rate': self.get_message_rate(queue_name),
            'consumer_count': self.get_consumer_count(queue_name),
            'avg_processing_time': self.get_avg_processing_time(queue_name),
            'error_rate': self.get_error_rate(queue_name)
        }

Scaling Patterns

Competing Consumers

class CompetingConsumers:
    """Runs several consumers against one queue so they compete for messages."""

    def __init__(self, num_consumers=5):
        # One named consumer per slot; they will all share a single queue.
        self.consumers = [
            MessageConsumer(f"consumer_{i}") for i in range(num_consumers)
        ]

    def start_all_consumers(self, queue_name):
        """Launch every consumer on its own daemon-less thread for *queue_name*."""
        for consumer in self.consumers:
            worker = threading.Thread(
                target=consumer.consume_messages,
                args=[queue_name]
            )
            worker.start()

Message Routing

class MessageRouter:
    """Routes a message to a destination queue based on type and priority."""

    def route_message(self, message):
        """Return the destination queue name for *message*.

        Routing rules, in order: known types first ('order', 'payment'),
        then high-priority messages, then the default queue.
        """
        # Fix: use .get() so messages missing 'type' or 'priority' fall
        # through to the default queue instead of raising KeyError.
        msg_type = message.get('type')
        if msg_type == 'order':
            return 'order_processing_queue'
        if msg_type == 'payment':
            return 'payment_queue'
        if message.get('priority') == 'high':
            return 'high_priority_queue'
        return 'default_queue'

Best Practices

Message Design

  • Keep messages small
  • Include correlation IDs
  • Set appropriate TTL
  • Use idempotent operations
  • Handle duplicates gracefully

Queue Management

  • Monitor queue depths
  • Set up dead letter queues
  • Implement proper error handling
  • Plan for scaling
  • Use appropriate message durability

Next Steps

  1. 📚 Learn about Databases & Storage
  2. 🎯 Practice message queue patterns
  3. 🏗️ Explore event streaming
  4. 💻 Setup queue monitoring

This content will be expanded with advanced messaging patterns.