Message Queues - Asynchronous Communication
Tổng quan
Message queues enable asynchronous communication between services, improving system resilience và scalability.
Core Concepts
Producer-Consumer Pattern
# Producer
class MessageProducer:
def send_message(self, queue_name, message):
self.queue.publish(queue_name, message)
# Consumer
class MessageConsumer:
def consume_messages(self, queue_name):
while True:
message = self.queue.get(queue_name)
if message:
self.process_message(message)
Message Structure
class Message:
def __init__(self, id, payload, timestamp=None, retry_count=0):
self.id = id
self.payload = payload
self.timestamp = timestamp or datetime.utcnow()
self.retry_count = retry_count
self.headers = {}
Message Queue Types
Point-to-Point (Queue)
class PointToPointQueue:
def __init__(self):
self.queue = []
def send(self, message):
self.queue.append(message)
def receive(self):
return self.queue.pop(0) if self.queue else None
Publish-Subscribe (Topic)
class PubSubTopic:
def __init__(self):
self.subscribers = []
def subscribe(self, subscriber):
self.subscribers.append(subscriber)
def publish(self, message):
for subscriber in self.subscribers:
subscriber.receive(message)
Popular Message Queue Systems
RabbitMQ
import pika
class RabbitMQClient:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
def send_message(self, queue_name, message):
self.channel.queue_declare(queue=queue_name, durable=True)
self.channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2) # Persist message
)
def consume_messages(self, queue_name, callback):
self.channel.queue_declare(queue=queue_name, durable=True)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=False
)
self.channel.start_consuming()
Apache Kafka
from kafka import KafkaProducer, KafkaConsumer
class KafkaClient:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
def send_message(self, topic, message):
future = self.producer.send(topic, message)
return future.get(timeout=10)
def consume_messages(self, topic, group_id):
consumer = KafkaConsumer(
topic,
bootstrap_servers=['localhost:9092'],
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
yield message.value
Amazon SQS
import boto3
class SQSClient:
def __init__(self):
self.sqs = boto3.client('sqs')
def send_message(self, queue_url, message):
response = self.sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message),
DelaySeconds=0
)
return response['MessageId']
def receive_messages(self, queue_url):
response = self.sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20 # Long polling
)
return response.get('Messages', [])
Message Patterns
Work Queue
class WorkQueue:
def add_task(self, task):
message = {
'task_type': task.type,
'task_data': task.data,
'priority': task.priority
}
self.queue.send('work_queue', message)
class Worker:
def process_tasks(self):
while True:
message = self.queue.receive('work_queue')
if message:
self.execute_task(message)
self.queue.ack(message)
Request-Reply
class RequestReplyPattern:
def send_request(self, request):
correlation_id = str(uuid.uuid4())
reply_queue = f"reply_{correlation_id}"
message = {
'data': request,
'reply_to': reply_queue,
'correlation_id': correlation_id
}
self.queue.send('request_queue', message)
# Wait for reply
reply = self.queue.receive(reply_queue, timeout=30)
return reply
def handle_request(self, message):
# Process request
result = self.process(message['data'])
# Send reply
reply = {
'correlation_id': message['correlation_id'],
'result': result
}
self.queue.send(message['reply_to'], reply)
Message Durability
Persistent Messages
class DurableMessageQueue:
def send_persistent_message(self, queue_name, message):
self.queue.send(
queue_name,
message,
persistent=True, # Survive broker restart
confirm=True # Wait for confirmation
)
Message Acknowledgment
class ReliableConsumer:
def consume_with_ack(self, queue_name):
message = self.queue.receive(queue_name, auto_ack=False)
try:
self.process_message(message)
self.queue.ack(message) # Acknowledge successful processing
except Exception as e:
self.queue.nack(message, requeue=True) # Negative acknowledgment
Error Handling
Dead Letter Queues
class DeadLetterQueue:
def setup_dlq(self, main_queue, dlq_name, max_retries=3):
queue_config = {
'x-message-ttl': 60000, # 1 minute
'x-max-retries': max_retries,
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': dlq_name
}
self.queue.declare(main_queue, arguments=queue_config)
Retry Logic
class RetryableConsumer:
def process_with_retry(self, message, max_retries=3):
retry_count = message.get('retry_count', 0)
try:
self.process_message(message)
except Exception as e:
if retry_count < max_retries:
# Exponential backoff
delay = 2 ** retry_count
message['retry_count'] = retry_count + 1
self.queue.send_delayed(
'retry_queue',
message,
delay=delay
)
else:
self.queue.send('dead_letter_queue', message)
Performance Optimization
Batch Processing
class BatchProcessor:
def __init__(self, batch_size=100):
self.batch_size = batch_size
self.batch = []
def consume_in_batches(self, queue_name):
while True:
message = self.queue.receive(queue_name)
if message:
self.batch.append(message)
if len(self.batch) >= self.batch_size:
self.process_batch()
self.batch.clear()
Message Compression
import gzip
class CompressedMessageQueue:
def send_compressed(self, queue_name, message):
compressed_data = gzip.compress(
json.dumps(message).encode('utf-8')
)
self.queue.send(queue_name, compressed_data)
def receive_compressed(self, queue_name):
compressed_data = self.queue.receive(queue_name)
if compressed_data:
decompressed = gzip.decompress(compressed_data)
return json.loads(decompressed.decode('utf-8'))
Monitoring
Queue Metrics
class QueueMonitoring:
def get_metrics(self, queue_name):
return {
'queue_depth': self.queue.get_depth(queue_name),
'message_rate': self.get_message_rate(queue_name),
'consumer_count': self.get_consumer_count(queue_name),
'avg_processing_time': self.get_avg_processing_time(queue_name),
'error_rate': self.get_error_rate(queue_name)
}
Scaling Patterns
Competing Consumers
class CompetingConsumers:
def __init__(self, num_consumers=5):
self.consumers = []
for i in range(num_consumers):
consumer = MessageConsumer(f"consumer_{i}")
self.consumers.append(consumer)
def start_all_consumers(self, queue_name):
for consumer in self.consumers:
threading.Thread(
target=consumer.consume_messages,
args=[queue_name]
).start()
Message Routing
class MessageRouter:
def route_message(self, message):
if message['type'] == 'order':
return 'order_processing_queue'
elif message['type'] == 'payment':
return 'payment_queue'
elif message['priority'] == 'high':
return 'high_priority_queue'
else:
return 'default_queue'
Best Practices
Message Design
- Keep messages small
- Include correlation IDs
- Set appropriate TTL
- Use idempotent operations
- Handle duplicates gracefully
Queue Management
- Monitor queue depths
- Set up dead letter queues
- Implement proper error handling
- Plan for scaling
- Use appropriate message durability
Next Steps
- 📚 Học về Databases & Storage
- 🎯 Practice message queue patterns
- 🏗️ Explore event streaming
- 💻 Setup queue monitoring
Content sẽ được expand với advanced messaging patterns.