Case Study: Chat System (như WhatsApp/Slack)
Tổng Quan
Chat System là ứng dụng real-time messaging cho phép users giao tiếp qua text, voice, video. Yêu cầu low latency, high availability và scalability cao.
Requirements
Functional Requirements
1. One-on-one messaging
2. Group chats (max 500 members)
3. Online/offline status
4. Message delivery status (sent, delivered, read)
5. File sharing (images, documents)
6. Message history và search
7. Push notifications
Non-Functional Requirements
- Scale: 1B users, 100M concurrent connections
- Message volume: 10B messages/day
- Latency: < 100ms message delivery
- Availability: 99.99%
- Message retention: 7 years
- End-to-end encryption
Capacity Estimation
Storage Requirements
class ChatSystemCapacity:
    """Back-of-the-envelope daily storage estimate for the chat system.

    Every input of the estimate is a constructor parameter, so the same
    formula can be re-run for other traffic assumptions. The defaults
    reproduce the numbers used in this case study.
    """

    def __init__(
        self,
        daily_active_users=100_000_000,
        messages_per_user_per_day=50,
        avg_message_size=100,
        media_message_ratio=0.2,
        avg_media_size=2 * 1024 * 1024,
    ):
        """Store the estimation inputs.

        Args:
            daily_active_users: Number of users sending messages per day.
            messages_per_user_per_day: Messages sent by each active user.
            avg_message_size: Average text message size, in bytes.
            media_message_ratio: Fraction (0..1) of messages carrying media.
            avg_media_size: Average media payload size, in bytes (2 MB).

        Raises:
            ValueError: If ``media_message_ratio`` is outside ``[0, 1]``.
        """
        if not 0 <= media_message_ratio <= 1:
            raise ValueError("media_message_ratio must be between 0 and 1")
        self.daily_active_users = daily_active_users
        self.messages_per_user_per_day = messages_per_user_per_day
        self.avg_message_size = avg_message_size  # bytes
        self.media_message_ratio = media_message_ratio
        self.avg_media_size = avg_media_size  # bytes

    def daily_storage_requirement(self):
        """Return the estimated number of bytes stored per day.

        Messages are split into text and media by ``media_message_ratio``;
        each share is multiplied by its average size and the two are summed.
        """
        total_messages = self.daily_active_users * self.messages_per_user_per_day
        text_messages = total_messages * (1 - self.media_message_ratio)
        media_messages = total_messages * self.media_message_ratio
        text_storage = text_messages * self.avg_message_size
        media_storage = media_messages * self.avg_media_size
        return text_storage + media_storage  # bytes per day
Traffic Estimation
Messages per day: 100M DAU × 50 messages/user = 5B messages/day
Average QPS: 5B / (24 × 3600) ≈ 58K messages/second; Peak QPS (giả định peak gấp 3 lần trung bình): ≈ 58K × 3 ≈ 174K messages/second
Concurrent connections: 100M users
WebSocket connections: 100M persistent connections
Database Design
Message Storage
-- Messages table (partitioned by date)
-- MySQL dialect (inline ENUM). MySQL requires every unique key, including the
-- primary key, to contain all columns used in the partitioning expression, so
-- created_at is part of the primary key. A TIMESTAMP column may only be used
-- for partitioning through UNIX_TIMESTAMP(), and RANGE partitioning needs an
-- explicit partition list.
CREATE TABLE messages (
    message_id BIGINT NOT NULL,
    chat_id BIGINT NOT NULL,
    sender_id BIGINT NOT NULL,
    content TEXT,                                           -- nullable: media messages may carry no text
    message_type ENUM('text', 'image', 'file', 'voice'),
    media_url TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- partitioning column
    updated_at TIMESTAMP,
    is_deleted BOOLEAN DEFAULT FALSE,                       -- soft-delete flag
    PRIMARY KEY (message_id, created_at)
) PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
    -- Example monthly partitions; add one per month ahead of time.
    PARTITION p_2024_01 VALUES LESS THAN (UNIX_TIMESTAMP('2024-02-01 00:00:00')),
    PARTITION p_2024_02 VALUES LESS THAN (UNIX_TIMESTAMP('2024-03-01 00:00:00')),
    -- Catch-all so inserts never fail for dates beyond the last defined month.
    PARTITION p_max VALUES LESS THAN (MAXVALUE)
);
-- Chats table
-- One row per conversation, keyed by chat_id.
CREATE TABLE chats (
chat_id BIGINT PRIMARY KEY,
chat_type ENUM('one_on_one', 'group'), -- conversation kind: one of these two values
chat_name VARCHAR(100), -- nullable; presumably only meaningful for group chats (confirm)
created_by BIGINT, -- presumably the creating user's id; no foreign key is declared here
created_at TIMESTAMP,
last_message_at TIMESTAMP, -- presumably updated by the write path; not enforced by this schema
member_count INT DEFAULT 0 -- starts at 0; nothing in this DDL keeps it in sync with chat_members
);
-- Chat members
-- One row per (chat, user) pair; the composite primary key prevents a user
-- from being listed twice in the same chat.
CREATE TABLE chat_members (
chat_id BIGINT,
user_id BIGINT,
joined_at TIMESTAMP,
role ENUM('admin', 'member'), -- membership role: one of these two values
last_read_message_id BIGINT, -- nullable; presumably the read cursor for unread counts (confirm)
PRIMARY KEY (chat_id, user_id)
);
-- Message delivery status
-- One row per (message, user) pair; the composite primary key means each user
-- has a single current status per message rather than a status history.
CREATE TABLE message_status (
message_id BIGINT,
user_id BIGINT,
status ENUM('sent', 'delivered', 'read'), -- delivery state: one of these three values
timestamp TIMESTAMP, -- presumably when the current status was recorded (confirm)
PRIMARY KEY (message_id, user_id)
);
Real-time Architecture
WebSocket Management
class WebSocketManager:
    """Registry of the WebSocket connections held by one chat server.

    ``connections`` maps user_id -> websocket for sockets registered on this
    instance; ``user_servers`` maps user_id -> server_id.
    """

    def __init__(self, server_id=None):
        """Create an empty registry.

        Args:
            server_id: Identifier written to ``user_servers`` for every user
                that connects through this manager. When omitted, a random
                hex id is generated so the attribute is always defined.
        """
        import uuid  # local import keeps this block self-contained

        self.connections = {}  # user_id -> websocket
        self.user_servers = {}  # user_id -> server_id
        self.server_id = server_id if server_id is not None else uuid.uuid4().hex

    async def handle_connection(self, websocket, user_id):
        """Serve one client connection until it closes.

        Registers the socket, marks the user online, passes every incoming
        frame to ``handle_message`` and finally runs ``cleanup_connection``.
        """
        self.connections[user_id] = websocket
        self.user_servers[user_id] = self.server_id
        try:
            # Inside the try block so that a failing status update still
            # triggers cleanup of the registration made above.
            await self.update_user_status(user_id, 'online')
            async for message in websocket:
                await self.handle_message(user_id, message)
        except websockets.exceptions.ConnectionClosed:
            # A closed connection is the normal way for the loop to end.
            pass
        finally:
            await self.cleanup_connection(user_id)

    async def send_message(self, recipient_id, message):
        """Deliver ``message`` (JSON-encoded) to ``recipient_id``.

        Sends directly when the recipient's socket is registered here;
        otherwise hands the message to ``forward_to_server``.
        """
        websocket = self.connections.get(recipient_id)
        if websocket is not None:
            # User connected to this server
            await websocket.send(json.dumps(message))
        else:
            # User connected to different server
            await self.forward_to_server(recipient_id, message)
Message Delivery
class MessageDelivery:
    """Persist a chat message and fan it out to the chat's members."""

    def __init__(self):
        self.message_queue = MessageQueue()
        self.websocket_manager = WebSocketManager()

    async def send_message(self, chat_id, sender_id, content):
        """Create a message, push it to online members and return it.

        Members reported online receive a ``new_message`` event over the
        WebSocket manager; the rest are handed to
        ``queue_push_notifications``. Members reached over WebSocket are then
        recorded as ``delivered``.
        """
        message = await self.create_message(chat_id, sender_id, content)
        recipients = await self.get_chat_members(chat_id)

        reached = []
        unreached = []
        for recipient in recipients:
            if not await self.is_user_online(recipient):
                unreached.append(recipient)
                continue
            envelope = {
                'type': 'new_message',
                'message': message.to_dict()
            }
            await self.websocket_manager.send_message(recipient, envelope)
            reached.append(recipient)

        # Offline members only get a push notification.
        if unreached:
            await self.queue_push_notifications(message, unreached)

        await self.update_delivery_status(message.id, reached, 'delivered')
        return message
Message Routing
Server Discovery
class MessageRouter:
    """Route a message to the server that holds the recipient's connection."""

    def __init__(self, redis_client=None):
        """Set up the router's collaborators.

        Args:
            redis_client: Client used for the user -> server lookup. When
                omitted, a default ``Redis()`` client is created so that
                ``find_user_server`` always has ``self.redis`` available.
        """
        self.service_discovery = ServiceDiscovery()
        self.load_balancer = LoadBalancer()
        self.redis = redis_client if redis_client is not None else Redis()

    async def route_message(self, recipient_id, message):
        """Forward ``message`` to the recipient's server, or store it offline.

        A falsy lookup result (no mapping) is treated as "user is offline".
        """
        # Find which server the user is connected to
        server_id = await self.find_user_server(recipient_id)
        if server_id:
            # Forward message to appropriate server
            await self.forward_message(server_id, recipient_id, message)
        else:
            # User is offline, store for later delivery
            await self.store_offline_message(recipient_id, message)

    async def find_user_server(self, user_id):
        """Return the value stored under ``user_server:<user_id>`` in Redis."""
        # NOTE(review): depending on client configuration this may be bytes
        # rather than str - confirm what forward_message expects.
        return await self.redis.get(f"user_server:{user_id}")
Message Ordering
class MessageOrdering:
    """Hand out per-chat sequence numbers and order messages by them."""

    def __init__(self):
        # chat_id -> last sequence number handed out
        self.sequence_generators = {}

    def get_next_sequence(self, chat_id):
        """Return the next sequence number for ``chat_id`` (first call gives 1)."""
        upcoming = self.sequence_generators.get(chat_id, 0) + 1
        self.sequence_generators[chat_id] = upcoming
        return upcoming

    def ensure_message_order(self, chat_id, messages):
        """Return a new list of ``messages`` ordered by ``sequence_number``."""
        ordered = list(messages)
        ordered.sort(key=lambda item: item.sequence_number)
        return ordered
Group Chat Optimization
Fan-out Strategy
class GroupChatHandler:
    """Fan a group message out to members, choosing a strategy by group size."""

    def __init__(self, max_group_size=500, immediate_fanout_threshold=100,
                 fanout_batch_size=50):
        """Configure the fan-out strategy.

        Args:
            max_group_size: Stored group size limit; the methods in this
                class do not enforce it.
            immediate_fanout_threshold: Groups with at most this many active
                members are delivered inline; larger groups are queued.
            fanout_batch_size: Number of members per queued batch.

        Raises:
            ValueError: If ``fanout_batch_size`` is smaller than 1.
        """
        if fanout_batch_size < 1:
            raise ValueError("fanout_batch_size must be at least 1")
        self.max_group_size = max_group_size
        self.immediate_fanout_threshold = immediate_fanout_threshold
        self.fanout_batch_size = fanout_batch_size

    async def send_group_message(self, chat_id, sender_id, content):
        """Deliver ``content`` to the active members of ``chat_id``.

        ``sender_id`` is accepted for interface compatibility; this method
        does not use it.
        """
        members = await self.get_active_members(chat_id)
        if len(members) <= self.immediate_fanout_threshold:
            # Small group: immediate fan-out
            await self.immediate_fanout(chat_id, members, content)
        else:
            # Large group: async fan-out
            await self.async_fanout(chat_id, members, content)

    async def immediate_fanout(self, chat_id, members, content):
        """Send to every member concurrently and wait for all sends."""
        await asyncio.gather(
            *(self.send_to_member(member_id, content) for member_id in members)
        )

    async def async_fanout(self, chat_id, members, content):
        """Queue the members for background delivery in fixed-size batches."""
        batch_size = self.fanout_batch_size
        for start in range(0, len(members), batch_size):
            batch = members[start:start + batch_size]
            # Send batch to background worker
            await self.queue_batch_delivery(chat_id, batch, content)
Push Notifications
Notification Service
class PushNotificationService:
    """Send push notifications to a user's registered devices."""

    def __init__(self):
        self.apns = APNSClient()  # iOS
        self.fcm = FCMClient()  # Android

    async def send_notification(self, user_id, message):
        """Notify every device of ``user_id`` about ``message``.

        Devices whose platform is neither ``'ios'`` nor ``'android'`` are
        skipped.
        """
        user_devices = await self.get_user_devices(user_id)
        for device in user_devices:
            if device.platform == 'ios':
                await self.send_ios_notification(device.token, message)
            elif device.platform == 'android':
                await self.send_android_notification(device.token, message)

    async def send_ios_notification(self, device_token, message):
        """Build the APNs payload for ``message`` and send it to one device.

        The alert body is the first 100 characters of the message content.
        A message without text content (``None``) yields an empty body
        instead of raising ``TypeError``.
        """
        preview = (message.content or '')[:100]  # Truncate long messages
        payload = {
            'aps': {
                'alert': {
                    'title': f"New message from {message.sender_name}",
                    'body': preview
                },
                'badge': await self.get_unread_count(message.recipient_id),
                'sound': 'default'
            },
            'chat_id': message.chat_id,
            'message_id': message.id
        }
        await self.apns.send(device_token, payload)
Encryption
End-to-End Encryption
class E2EEncryption:
    """Hybrid encryption sketch: a per-message key wrapped for the recipient."""

    def __init__(self):
        self.key_exchange = DiffieHellmanKeyExchange()

    def encrypt_message(self, message, recipient_public_key):
        """Encrypt ``message`` and return the ciphertext with its wrapped key.

        A fresh symmetric key is generated per call, used to encrypt the
        message, and itself encrypted with ``recipient_public_key``.
        """
        session_key = self.generate_symmetric_key()
        envelope = {}
        envelope['encrypted_message'] = AES.encrypt(message, session_key)
        envelope['encrypted_key'] = RSA.encrypt(session_key, recipient_public_key)
        return envelope

    def decrypt_message(self, encrypted_data, private_key):
        """Unwrap the symmetric key with ``private_key`` and decrypt the message."""
        wrapped_key = encrypted_data['encrypted_key']
        session_key = RSA.decrypt(wrapped_key, private_key)
        return AES.decrypt(encrypted_data['encrypted_message'], session_key)
Caching Strategy
Message Caching
class MessageCache:
    """Redis-backed cache of each chat's most recent messages."""

    def __init__(self):
        self.redis = Redis()
        self.cache_ttl = 3600  # 1 hour

    async def cache_recent_messages(self, chat_id, messages):
        """Add ``messages`` to the chat's sorted set and refresh its TTL.

        Messages are stored as JSON, scored by their creation timestamp; only
        the 100 highest-scored entries are kept. An empty ``messages`` list is
        a no-op.
        """
        if not messages:
            return
        cache_key = f"recent_messages:{chat_id}"
        # One ZADD for the whole batch instead of one round trip per message.
        scored = {
            message.to_json(): message.created_at.timestamp()
            for message in messages
        }
        await self.redis.zadd(cache_key, scored)
        # Keep only last 100 messages
        await self.redis.zremrangebyrank(cache_key, 0, -101)
        await self.redis.expire(cache_key, self.cache_ttl)

    async def get_recent_messages(self, chat_id, limit=50):
        """Return up to ``limit`` recent messages as dicts, newest first.

        Served from the cache when possible; otherwise loaded through
        ``fetch_from_database`` and cached. Both paths return decoded JSON
        dicts in the same order, so callers see one shape regardless of cache
        state. A non-positive ``limit`` returns an empty list (an end index of
        -1 would otherwise select the whole sorted set).
        """
        if limit <= 0:
            return []
        cache_key = f"recent_messages:{chat_id}"
        cached_messages = await self.redis.zrevrange(cache_key, 0, limit - 1)
        if cached_messages:
            return [json.loads(raw) for raw in cached_messages]
        # Cache miss: fetch from database
        messages = await self.fetch_from_database(chat_id, limit)
        await self.cache_recent_messages(chat_id, messages)
        newest_first = sorted(
            messages, key=lambda message: message.created_at, reverse=True
        )
        return [json.loads(message.to_json()) for message in newest_first]
File Sharing
Media Upload
class MediaUploadService:
    """Store uploaded chat media in S3 and expose it through the CDN."""

    def __init__(self):
        self.s3_client = S3Client()
        self.cdn = CDNClient()
        self.max_file_size = 100 * 1024 ** 2  # 100MB

    async def upload_file(self, file_data, user_id, chat_id):
        """Upload ``file_data`` for a chat and return its id, CDN URL and size.

        Raises:
            FileTooLargeError: If the payload is larger than ``max_file_size``.
        """
        size = len(file_data)
        if size > self.max_file_size:
            raise FileTooLargeError("File exceeds maximum size limit")

        # Object key: <chat_id>/<random uuid>.<detected extension>
        file_id = uuid.uuid4()
        extension = self.get_file_extension(file_data)
        object_key = f"{chat_id}/{file_id}.{extension}"

        await self.s3_client.upload(object_key, file_data)
        cdn_url = await self.cdn.generate_url(object_key)
        await self.create_file_record(file_id, object_key, size, user_id, chat_id)

        result = {'file_id': file_id}
        result['cdn_url'] = cdn_url
        result['file_size'] = size
        return result
Monitoring và Analytics
Real-time Metrics
class ChatMetrics:
    """Report chat delivery, connection and throughput metrics."""

    def __init__(self):
        self.metrics_collector = MetricsCollector()

    def track_message_delivery(self, message_id, latency, message_type='text'):
        """Record one delivery latency sample.

        Args:
            message_id: Accepted for interface compatibility; not reported.
            latency: Observed delivery latency for the histogram.
            message_type: Value of the ``message_type`` tag. Defaults to
                ``'text'``, the value previously hard-coded for every sample.
        """
        self.metrics_collector.histogram(
            'message_delivery_latency',
            latency,
            tags={'message_type': message_type}
        )

    def track_connection_count(self, server_id, count):
        """Report the current connection count, tagged with ``server_id``."""
        self.metrics_collector.gauge(
            'active_connections',
            count,
            tags={'server_id': server_id}
        )

    def track_message_throughput(self, messages_per_second):
        """Report the current message throughput as an untagged gauge."""
        self.metrics_collector.gauge(
            'message_throughput',
            messages_per_second
        )
Scalability Solutions
Database Sharding
class ChatSharding:
    """Map chats, users and message timestamps to their storage location."""

    def __init__(self):
        self.num_shards = 1000

    def get_chat_shard(self, chat_id):
        """Return the shard index for ``chat_id`` (``chat_id`` modulo shard count)."""
        shard_count = self.num_shards
        return chat_id % shard_count

    def get_user_shard(self, user_id):
        """Return the shard index for ``user_id`` (``user_id`` modulo shard count)."""
        shard_count = self.num_shards
        return user_id % shard_count

    def shard_messages_by_time(self, timestamp):
        """Return the monthly table name for ``timestamp``, e.g. ``messages_2024_03``."""
        month_suffix = timestamp.strftime('%Y_%m')
        return "messages_" + month_suffix
Next Steps
Nội dung này sẽ được mở rộng thêm với:
- Voice và video calling architecture
- Advanced encryption protocols
- Multi-device synchronization
- Offline message handling
- Chat backup và restore