Case Study: Chat System (như WhatsApp/Slack)

Tổng Quan

Chat System là ứng dụng real-time messaging cho phép users giao tiếp qua text, voice, video. Yêu cầu low latency, high availability và scalability cao.

Requirements

Functional Requirements

1. One-on-one messaging
2. Group chats (max 500 members)
3. Online/offline status
4. Message delivery status (sent, delivered, read)
5. File sharing (images, documents)
6. Message history và search
7. Push notifications

Non-Functional Requirements

- Scale: 1B users, 100M concurrent connections
- Message volume: 10B messages/day
- Latency: < 100ms message delivery
- Availability: 99.99%
- Message retention: 7 years
- End-to-end encryption

Capacity Estimation

Storage Requirements

class ChatSystemCapacity:
    """Back-of-the-envelope estimate of daily message storage.

    Every input is a constructor parameter whose default reproduces the
    baseline scenario of this case study (100M DAU, 50 messages/user/day),
    so the same model can be re-run for other targets, e.g. the 10B
    messages/day non-functional requirement.
    """

    def __init__(
        self,
        daily_active_users=100_000_000,
        messages_per_user_per_day=50,
        avg_message_size=100,
        media_message_ratio=0.2,
        avg_media_size=2 * 1024 * 1024,
    ):
        """Store the estimation inputs.

        Args:
            daily_active_users: Users sending messages on a given day.
            messages_per_user_per_day: Average messages each of them sends.
            avg_message_size: Average text message size, in bytes.
            media_message_ratio: Fraction (0..1) of messages carrying media.
            avg_media_size: Average media attachment size, in bytes (2MB).

        Raises:
            ValueError: If ``media_message_ratio`` is outside [0, 1].
        """
        if not 0 <= media_message_ratio <= 1:
            raise ValueError("media_message_ratio must be between 0 and 1")
        self.daily_active_users = daily_active_users
        self.messages_per_user_per_day = messages_per_user_per_day
        self.avg_message_size = avg_message_size  # bytes
        self.media_message_ratio = media_message_ratio
        self.avg_media_size = avg_media_size  # bytes

    def daily_storage_requirement(self):
        """Return the estimated new storage per day, in bytes.

        Text messages are counted at ``avg_message_size`` and media messages
        at ``avg_media_size`` only (their caption bytes are not added).
        The result is a float because the media ratio is fractional.
        """
        total_messages = self.daily_active_users * self.messages_per_user_per_day

        text_messages = total_messages * (1 - self.media_message_ratio)
        media_messages = total_messages * self.media_message_ratio

        text_storage = text_messages * self.avg_message_size
        media_storage = media_messages * self.avg_media_size

        return text_storage + media_storage  # bytes per day

Traffic Estimation

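Messages per day: 100M DAU * 50 messages/user = 5B messages/day (baseline; the 10B messages/day target in the Non-Functional Requirements is 2x this)
Peak QPS: 5B / (24 * 3600) * 3 (peak-to-average factor) = ~174K messages/second (~347K messages/second at the 10B/day target)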

Concurrent connections: 100M users
WebSocket connections: 100M persistent connections

Database Design

Message Storage

-- Messages table, range-partitioned by creation time (one partition per month).
-- MySQL requires every PRIMARY/UNIQUE key of a partitioned table to include all
-- columns of the partitioning expression, so created_at is part of the primary
-- key; message_id itself must therefore be generated globally unique by the
-- application (e.g. snowflake-style IDs) -- the database no longer enforces it alone.
-- A TIMESTAMP column can only be RANGE-partitioned through UNIX_TIMESTAMP().
-- NOTE(review): MySQL TIMESTAMP ends in 2038; with 7-year retention consider DATETIME
-- + PARTITION BY RANGE COLUMNS(created_at).
CREATE TABLE messages (
    message_id BIGINT NOT NULL,
    chat_id BIGINT NOT NULL,
    sender_id BIGINT NOT NULL,
    content TEXT,
    message_type ENUM('text', 'image', 'file', 'voice'),
    media_url TEXT,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP,
    is_deleted BOOLEAN DEFAULT FALSE,
    PRIMARY KEY (message_id, created_at),
    -- Chat history is read as "latest N messages of a chat": index that access path.
    KEY idx_chat_created (chat_id, created_at)
) PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
    -- Example monthly partitions; new ones must be added ahead of time.
    PARTITION p2024_01 VALUES LESS THAN (UNIX_TIMESTAMP('2024-02-01 00:00:00')),
    PARTITION p2024_02 VALUES LESS THAN (UNIX_TIMESTAMP('2024-03-01 00:00:00')),
    PARTITION p_max VALUES LESS THAN MAXVALUE
);

-- Chats table: one row per conversation, either one-on-one or group (chat_type).
-- last_message_at and member_count appear to be denormalized summaries of the
-- messages / chat_members tables. NOTE(review): confirm they are updated together
-- with the rows they summarize.
CREATE TABLE chats (
    chat_id BIGINT PRIMARY KEY,
    chat_type ENUM('one_on_one', 'group'),
    chat_name VARCHAR(100),  -- presumably only meaningful for groups; confirm
    created_by BIGINT,       -- presumably the creator's user_id; confirm
    created_at TIMESTAMP,
    last_message_at TIMESTAMP,
    member_count INT DEFAULT 0
);

-- Chat members: one row per (chat, user) membership.
-- The primary key serves "members of a chat"; the secondary index serves
-- "chats of a user" (the inbox list), which would otherwise need a full scan
-- because user_id is not the leading primary-key column.
CREATE TABLE chat_members (
    chat_id BIGINT,
    user_id BIGINT,
    joined_at TIMESTAMP,
    role ENUM('admin', 'member'),
    last_read_message_id BIGINT,  -- presumably this member's read cursor in the chat; confirm
    PRIMARY KEY (chat_id, user_id),
    KEY idx_user_chats (user_id, chat_id)
);

-- Message delivery status: the primary key allows exactly one row per
-- (message, recipient) pair, so only the current status ('sent', 'delivered'
-- or 'read') and the time it was recorded are kept.
-- NOTE(review): row count is messages x recipients (up to 500 per group
-- message); chat_members.last_read_message_id already tracks a per-chat read
-- position, which may make per-message 'read' rows unnecessary -- confirm.
CREATE TABLE message_status (
    message_id BIGINT,
    user_id BIGINT,
    status ENUM('sent', 'delivered', 'read'),
    timestamp TIMESTAMP,
    PRIMARY KEY (message_id, user_id)
);

Real-time Architecture

WebSocket Management

class WebSocketManager:
    """Track the WebSocket connections held by this chat server.

    ``connections`` maps user_id -> live socket for users attached to this
    server; ``user_servers`` maps user_id -> id of the server they are on.
    Messages for local users are written directly, and all others are
    forwarded.
    """

    def __init__(self, server_id=None):
        """Create an empty connection registry.

        Args:
            server_id: Identifier of this server, recorded for every user that
                connects here. When omitted, a random unique id is generated,
                so ``self.server_id`` is always set before
                ``handle_connection`` reads it.
        """
        import uuid

        self.server_id = server_id if server_id is not None else uuid.uuid4().hex
        self.connections = {}  # user_id -> websocket
        self.user_servers = {}  # user_id -> server_id

    async def handle_connection(self, websocket, user_id):
        """Register *websocket* for *user_id* and process its incoming frames.

        Marks the user online and passes every received frame to
        ``handle_message``. When the socket ends, it runs
        ``cleanup_connection``, but only if this socket is still the one
        registered for the user. A newer connection for the same user (e.g. a
        fast reconnect) must not be torn down by the old socket's shutdown.
        """
        self.connections[user_id] = websocket
        self.user_servers[user_id] = self.server_id

        # Update user status to online
        await self.update_user_status(user_id, 'online')

        try:
            async for message in websocket:
                await self.handle_message(user_id, message)
        except websockets.exceptions.ConnectionClosed:
            # A closed socket is the normal end of a session.
            pass
        finally:
            if self.connections.get(user_id) is websocket:
                await self.cleanup_connection(user_id)

    async def send_message(self, recipient_id, message):
        """Send *message* (JSON-serializable) to *recipient_id*.

        Writes directly to the socket when the recipient is connected to this
        server; otherwise hands the message to ``forward_to_server``.
        """
        import json

        websocket = self.connections.get(recipient_id)
        if websocket is not None:
            # User connected to this server
            await websocket.send(json.dumps(message))
        else:
            # User connected to different server
            await self.forward_to_server(recipient_id, message)

Message Delivery

class MessageDelivery:
    """Persist a chat message and deliver it to the chat's other members."""

    def __init__(self):
        self.message_queue = MessageQueue()
        self.websocket_manager = WebSocketManager()

    async def send_message(self, chat_id, sender_id, content):
        """Create a message in *chat_id* and deliver it to every other member.

        Members reported online get the message over WebSocket and are marked
        'delivered'. Members that are offline, or whose WebSocket push fails,
        are queued for push notifications instead. The sender is skipped: they
        wrote the message, so echoing it, push-notifying them, or marking it
        delivered to them would be wrong.

        Returns:
            The message object returned by ``create_message``.
        """
        import logging

        # Create message record
        message = await self.create_message(chat_id, sender_id, content)

        # Get chat members
        members = await self.get_chat_members(chat_id)

        online_members = []
        offline_members = []

        for member_id in members:
            if member_id == sender_id:
                continue

            if not await self.is_user_online(member_id):
                offline_members.append(member_id)
                continue

            # Send to online members via WebSocket
            payload = {
                'type': 'new_message',
                'message': message.to_dict()
            }
            try:
                await self.websocket_manager.send_message(member_id, payload)
            except Exception:
                # Presence said "online" but the push failed (e.g. the socket
                # just closed): fall back to a push notification instead of
                # aborting delivery to the remaining members.
                logging.getLogger(__name__).exception(
                    "WebSocket delivery of message %s to user %s failed",
                    message.id, member_id,
                )
                offline_members.append(member_id)
            else:
                online_members.append(member_id)

        # Queue push notifications for offline users
        if offline_members:
            await self.queue_push_notifications(message, offline_members)

        # Update delivery status
        await self.update_delivery_status(message.id, online_members, 'delivered')

        return message

Message Routing

Server Discovery

class MessageRouter:
    """Route a message to the chat server that holds the recipient's socket."""

    def __init__(self, redis=None, service_discovery=None, load_balancer=None):
        """Set up routing dependencies.

        Args:
            redis: Async client for the presence store that holds
                ``user_server:<user_id>`` keys. Defaults to ``Redis()``. The
                attribute must exist because ``find_user_server`` reads it.
            service_discovery: Defaults to ``ServiceDiscovery()``.
            load_balancer: Defaults to ``LoadBalancer()``.
        """
        self.service_discovery = (
            service_discovery if service_discovery is not None else ServiceDiscovery()
        )
        self.load_balancer = (
            load_balancer if load_balancer is not None else LoadBalancer()
        )
        self.redis = redis if redis is not None else Redis()

    async def route_message(self, recipient_id, message):
        """Forward *message* to the recipient's server, or store it if offline."""
        # Find which server the user is connected to
        server_id = await self.find_user_server(recipient_id)

        if server_id:
            # Forward message to appropriate server
            await self.forward_message(server_id, recipient_id, message)
        else:
            # User is offline, store for later delivery
            await self.store_offline_message(recipient_id, message)

    async def find_user_server(self, user_id):
        """Return the id of the server *user_id* is connected to, or None.

        Redis clients commonly return ``bytes``; the value is decoded so that
        callers always get a ``str`` server id.
        """
        server_id = await self.redis.get(f"user_server:{user_id}")
        if isinstance(server_id, bytes):
            server_id = server_id.decode()
        return server_id

Message Ordering

class MessageOrdering:
    """Hand out per-chat sequence numbers and order messages by them.

    Counters are kept in a plain dict on the instance. Each new instance
    starts every chat from zero, and nothing is shared between instances.
    """

    def __init__(self):
        # chat_id -> last sequence number handed out for that chat
        self.sequence_generators = {}

    def get_next_sequence(self, chat_id):
        """Return the next sequence number for *chat_id*; the first one is 1."""
        next_value = self.sequence_generators.get(chat_id, 0) + 1
        self.sequence_generators[chat_id] = next_value
        return next_value

    def ensure_message_order(self, chat_id, messages):
        """Return a new list of *messages* sorted by ``sequence_number``.

        The sort is stable and the input is left untouched. *chat_id* is
        accepted but not used.
        """
        ordered = list(messages)
        ordered.sort(key=lambda msg: msg.sequence_number)
        return ordered

Group Chat Optimization

Fan-out Strategy

class GroupChatHandler:
    """Fan a group message out to its members, choosing a strategy by size."""

    def __init__(self, max_group_size=500, immediate_fanout_threshold=100,
                 batch_size=50):
        """Configure fan-out limits.

        Args:
            max_group_size: Membership cap for a group (stored, not enforced
                in this class).
            immediate_fanout_threshold: Recipient counts up to and including
                this value are delivered inline; larger groups are handed to
                background workers in batches.
            batch_size: Recipients per background batch.

        Raises:
            ValueError: If ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.max_group_size = max_group_size
        self.immediate_fanout_threshold = immediate_fanout_threshold
        self.batch_size = batch_size

    async def send_group_message(self, chat_id, sender_id, content):
        """Deliver *content* to every active member of *chat_id* except the sender."""
        members = await self.get_active_members(chat_id)
        # The sender already has the message; don't echo it back to them.
        recipients = [member_id for member_id in members if member_id != sender_id]

        if len(recipients) <= self.immediate_fanout_threshold:
            # Small group: immediate fan-out
            await self.immediate_fanout(chat_id, recipients, content)
        else:
            # Large group: async fan-out
            await self.async_fanout(chat_id, recipients, content)

    async def immediate_fanout(self, chat_id, members, content):
        """Send *content* to all *members* concurrently and wait for completion."""
        import asyncio

        await asyncio.gather(
            *(self.send_to_member(member_id, content) for member_id in members)
        )

    async def async_fanout(self, chat_id, members, content):
        """Queue *members* in chunks of ``batch_size`` for background delivery."""
        for start in range(0, len(members), self.batch_size):
            batch = members[start:start + self.batch_size]
            await self.queue_batch_delivery(chat_id, batch, content)

Push Notifications

Notification Service

class PushNotificationService:
    """Send push notifications about new messages to a user's devices."""

    def __init__(self, apns=None, fcm=None):
        """Set up platform clients; defaults are ``APNSClient()`` and ``FCMClient()``."""
        self.apns = apns if apns is not None else APNSClient()  # iOS
        self.fcm = fcm if fcm is not None else FCMClient()  # Android

    async def send_notification(self, user_id, message):
        """Notify every registered device of *user_id* about *message*.

        Devices whose platform is neither 'ios' nor 'android' are skipped.
        """
        user_devices = await self.get_user_devices(user_id)

        for device in user_devices:
            if device.platform == 'ios':
                # Pass the notified user explicitly: one message object is
                # shared by all recipients of a group fan-out, so
                # message.recipient_id cannot identify whose badge to compute.
                await self.send_ios_notification(
                    device.token, message, recipient_id=user_id
                )
            elif device.platform == 'android':
                await self.send_android_notification(device.token, message)

    async def send_ios_notification(self, device_token, message, recipient_id=None):
        """Send an APNs alert for *message* to *device_token*.

        Args:
            device_token: APNs device token.
            message: The chat message; ``content`` may be None (media messages).
            recipient_id: User whose unread count becomes the badge. Defaults
                to ``message.recipient_id`` for backward compatibility.
        """
        if recipient_id is None:
            recipient_id = message.recipient_id
        # Media messages can have no text content; slicing None would raise.
        body = (message.content or '')[:100]  # Truncate long messages
        badge = await self.get_unread_count(recipient_id)

        payload = {
            'aps': {
                'alert': {
                    'title': f"New message from {message.sender_name}",
                    'body': body
                },
                'badge': badge,
                'sound': 'default'
            },
            'chat_id': message.chat_id,
            'message_id': message.id
        }

        await self.apns.send(device_token, payload)

Encryption

End-to-End Encryption

class E2EEncryption:
    """Hybrid encryption sketch: a fresh symmetric key per message, wrapped
    with the recipient's public key.

    NOTE(review): ``AES`` and ``RSA`` are placeholder helpers here. Confirm the
    real implementation uses an authenticated mode (e.g. AES-GCM with a unique
    nonce) and OAEP padding for RSA. ``key_exchange`` is created but used by
    neither method below.
    """

    def __init__(self):
        self.key_exchange = DiffieHellmanKeyExchange()

    def encrypt_message(self, message, recipient_public_key):
        """Encrypt *message* for the holder of *recipient_public_key*.

        Returns:
            A dict with 'encrypted_message' (the message under a newly
            generated symmetric key) and 'encrypted_key' (that symmetric key
            encrypted with the recipient's public key).
        """
        session_key = self.generate_symmetric_key()
        return {
            'encrypted_message': AES.encrypt(message, session_key),
            'encrypted_key': RSA.encrypt(session_key, recipient_public_key),
        }

    def decrypt_message(self, encrypted_data, private_key):
        """Reverse ``encrypt_message``: unwrap the symmetric key, then the message."""
        wrapped_key = encrypted_data['encrypted_key']
        session_key = RSA.decrypt(wrapped_key, private_key)
        return AES.decrypt(encrypted_data['encrypted_message'], session_key)

Caching Strategy

Message Caching

class MessageCache:
    """Redis sorted-set cache of each chat's most recent messages.

    Key ``recent_messages:<chat_id>`` holds serialized messages scored by
    their creation timestamp, capped at ``max_cached`` entries, with a TTL.
    """

    def __init__(self, redis=None, cache_ttl=3600, max_cached=100):
        """Args:
            redis: Async Redis client; defaults to ``Redis()``.
            cache_ttl: Key lifetime in seconds (1 hour by default).
            max_cached: Maximum number of messages kept per chat.
        """
        self.redis = redis if redis is not None else Redis()
        self.cache_ttl = cache_ttl
        self.max_cached = max_cached

    async def cache_recent_messages(self, chat_id, messages):
        """Add *messages* to the chat's cache, trim it and refresh its TTL."""
        cache_key = f"recent_messages:{chat_id}"

        # Score = creation timestamp, so rank order is chronological.
        scores = {m.to_json(): m.created_at.timestamp() for m in messages}
        if scores:
            # One ZADD for the whole batch instead of one round-trip per message.
            await self.redis.zadd(cache_key, scores)

        # Ranks ascend by score: drop everything except the newest max_cached.
        await self.redis.zremrangebyrank(cache_key, 0, -(self.max_cached + 1))
        await self.redis.expire(cache_key, self.cache_ttl)

    async def get_recent_messages(self, chat_id, limit=50):
        """Return up to *limit* recent messages as decoded dicts, newest first.

        A cache hit and a cache miss return the same shape and order. On a
        miss the messages are loaded with ``fetch_from_database`` and written
        to the cache. A non-positive *limit* returns an empty list; a
        ``ZREVRANGE 0 -1`` would otherwise return the whole set.
        """
        import json

        if limit <= 0:
            return []

        cache_key = f"recent_messages:{chat_id}"
        cached_messages = await self.redis.zrevrange(cache_key, 0, limit - 1)

        if cached_messages:
            return [json.loads(raw) for raw in cached_messages]

        # Cache miss: fetch from database
        messages = await self.fetch_from_database(chat_id, limit)
        await self.cache_recent_messages(chat_id, messages)
        newest_first = sorted(messages, key=lambda m: m.created_at, reverse=True)
        return [json.loads(m.to_json()) for m in newest_first[:limit]]

File Sharing

Media Upload

class MediaUploadService:
    """Store chat attachments in S3 and expose them through the CDN."""

    def __init__(self, s3_client=None, cdn=None, max_file_size=100 * 1024 * 1024):
        """Args:
            s3_client: Object-storage client; defaults to ``S3Client()``.
            cdn: CDN client; defaults to ``CDNClient()``.
            max_file_size: Largest accepted upload in bytes (100MB default).
        """
        self.s3_client = s3_client if s3_client is not None else S3Client()
        self.cdn = cdn if cdn is not None else CDNClient()
        self.max_file_size = max_file_size

    async def upload_file(self, file_data, user_id, chat_id):
        """Upload *file_data* for *chat_id* and record it.

        Returns:
            dict with 'file_id' (uuid.UUID), 'cdn_url' and 'file_size' (bytes).

        Raises:
            FileTooLargeError: If the data exceeds ``max_file_size``.
        """
        import uuid

        file_size = len(file_data)
        if file_size > self.max_file_size:
            raise FileTooLargeError("File exceeds maximum size limit")

        # Namespace by chat and use a random id so uploads never collide.
        file_id = uuid.uuid4()
        file_extension = self.get_file_extension(file_data)
        filename = f"{chat_id}/{file_id}.{file_extension}"

        # The S3 URL itself is not returned: clients are served via the CDN URL.
        await self.s3_client.upload(filename, file_data)
        cdn_url = await self.cdn.generate_url(filename)

        # NOTE(review): if this fails, the S3 object above is left orphaned.
        await self.create_file_record(file_id, filename, file_size, user_id, chat_id)

        return {
            'file_id': file_id,
            'cdn_url': cdn_url,
            'file_size': file_size
        }

Monitoring và Analytics

Real-time Metrics

class ChatMetrics:
    """Emit chat-system metrics through a metrics collector."""

    def __init__(self, metrics_collector=None):
        """Use *metrics_collector*, or a new ``MetricsCollector()`` when omitted."""
        self.metrics_collector = (
            metrics_collector if metrics_collector is not None else MetricsCollector()
        )

    def track_message_delivery(self, message_id, latency, message_type='text'):
        """Record one delivery latency sample, tagged by message type.

        Args:
            message_id: Accepted for interface compatibility. It is deliberately
                not used as a tag, because a per-message tag value would give
                every message its own series.
            latency: Observed delivery latency.
            message_type: e.g. 'text', 'image', 'file', 'voice'. Defaults to
                'text', the value previously hard-coded for every message.
        """
        self.metrics_collector.histogram(
            'message_delivery_latency',
            latency,
            tags={'message_type': message_type}
        )

    def track_connection_count(self, server_id, count):
        """Report the current number of open connections on *server_id*."""
        self.metrics_collector.gauge(
            'active_connections',
            count,
            tags={'server_id': server_id}
        )

    def track_message_throughput(self, messages_per_second):
        """Report the current system-wide message throughput."""
        self.metrics_collector.gauge(
            'message_throughput',
            messages_per_second
        )

Scalability Solutions

Database Sharding

class ChatSharding:
    """Map chats/users to database shards and messages to monthly tables."""

    def __init__(self, num_shards=1000):
        """Args:
            num_shards: Number of database shards (positive integer).

        Raises:
            ValueError: If ``num_shards`` is less than 1.

        NOTE(review): plain modulo placement remaps most keys whenever
        ``num_shards`` changes; consider consistent hashing before resharding.
        """
        if num_shards < 1:
            raise ValueError("num_shards must be a positive integer")
        self.num_shards = num_shards

    def _shard_for(self, key):
        """Return the shard index in [0, num_shards) for an integer *key*."""
        return key % self.num_shards

    def get_chat_shard(self, chat_id):
        """Return the shard index that stores *chat_id*."""
        return self._shard_for(chat_id)

    def get_user_shard(self, user_id):
        """Return the shard index that stores *user_id*."""
        return self._shard_for(user_id)

    def shard_messages_by_time(self, timestamp):
        """Return the monthly messages table name, e.g. 'messages_2024_03'."""
        return f"messages_{timestamp.strftime('%Y_%m')}"

Next Steps

Nội dung này sẽ được mở rộng thêm với:
- Voice và video calling architecture
- Advanced encryption protocols
- Multi-device synchronization
- Offline message handling
- Chat backup và restore