Case Study: Video Streaming Platform (như YouTube/Netflix)

Tổng Quan

Video Streaming Platform cung cấp nội dung video cho hàng triệu người dùng trên toàn thế giới với chất lượng cao (high quality) và độ trễ thấp (low latency).

Requirements

Functional Requirements

1. Video upload và encoding
2. Video streaming playback
3. User authentication và profiles
4. Video search và recommendations
5. Comments và social features

Non-Functional Requirements

- Scale: 1B users, 10M concurrent viewers
- Storage: 500PB video content
- Bandwidth: ~25Tbps aggregate lúc cao điểm (10M concurrent viewers × ~2.5Mbps trung bình mỗi stream)
- Latency: < 2 seconds start time
- Availability: 99.9%

Video Processing Pipeline

Upload và Encoding

class VideoProcessor:
    """Accept raw uploads and turn them into per-resolution encodes.

    ``process_upload`` stores the raw file and enqueues an encoding job;
    ``encode_video`` consumes such a job and produces one encoded file per
    requested resolution.
    """

    # Renditions requested for every upload. This covers every value of the
    # ``video_files.quality`` ENUM, including '240p', which
    # ``AdaptiveStreaming.select_quality`` returns as its fallback.
    DEFAULT_RESOLUTIONS = ('240p', '360p', '720p', '1080p', '4K')

    def __init__(self):
        self.encoding_queue = MessageQueue()
        self.storage = S3Storage()

    async def process_upload(self, video_file, user_id):
        """Store the raw upload and enqueue an encoding job for it.

        Args:
            video_file: Raw video payload, passed unchanged to
                ``self.storage.upload``.
            user_id: Identifier of the uploading user, copied into the job.

        Returns:
            The new video's ID as a 36-character UUID string.
        """
        # Upload raw video
        raw_url = await self.storage.upload(video_file)

        # Queue for encoding
        job = {
            # A plain string rather than a uuid.UUID object: the job is
            # handed to a message queue and the ID is stored in a
            # VARCHAR(36) column, so no custom serializer is needed.
            'video_id': str(uuid.uuid4()),
            'raw_url': raw_url,
            'user_id': user_id,
            'resolutions': list(self.DEFAULT_RESOLUTIONS),
        }

        await self.encoding_queue.send(job)
        return job['video_id']

    def encode_video(self, job):
        """Transcode the raw video once per resolution listed in ``job``.

        Args:
            job: Mapping with the keys ``'video_id'``, ``'raw_url'`` and
                ``'resolutions'``, as built by ``process_upload``.
        """
        # NOTE(review): process_upload awaits storage.upload, but this call is
        # not awaited. If upload is a coroutine function, encoded_url will be a
        # coroutine object rather than a URL. Confirm the worker uses a
        # blocking storage client, or make this method async.
        for resolution in job['resolutions']:
            encoded_video = self.transcode(job['raw_url'], resolution)
            encoded_url = self.storage.upload(encoded_video)

            # Update video metadata
            self.update_video_metadata(job['video_id'], resolution, encoded_url)

Content Delivery Network

class CDNManager:
    """Build streaming URLs that point at a CDN edge server."""

    def __init__(self):
        self.edge_servers = self.discover_edge_servers()

    def get_streaming_url(self, video_id, user_location, user_id=None):
        """Return the streaming URL for ``video_id`` on the chosen edge server.

        Args:
            video_id: Identifier placed in the ``/stream/<video_id>`` path.
            user_location: Passed to ``find_closest_edge`` to pick the server.
            user_id: Optional viewer identifier. When given it is appended as
                a ``user_id`` query parameter; when omitted the parameter is
                left out entirely.

        Returns:
            An ``https://`` URL with ``quality=auto`` set.
        """
        # Find closest edge server
        edge_server = self.find_closest_edge(user_location)

        # Generate streaming URL with the adaptive bitrate parameter
        url = f"https://{edge_server}/stream/{video_id}?quality=auto"

        # user_id used to be read from an undefined name (NameError on every
        # call); it is now an explicit, optional argument.
        if user_id is not None:
            url += f"&user_id={user_id}"
        return url

Streaming Architecture

Adaptive Bitrate Streaming

class AdaptiveStreaming:
    """Pick a playback quality level from bandwidth and device support."""

    def __init__(self):
        # Quality label -> encoding specs. 'bitrate' is compared directly
        # against the caller's bandwidth figure, so both must use the same
        # unit (presumably kbps; confirm against the caller).
        self.quality_levels = {
            '240p': {'bitrate': 500, 'resolution': '426x240'},
            '360p': {'bitrate': 800, 'resolution': '640x360'},
            '720p': {'bitrate': 2500, 'resolution': '1280x720'},
            '1080p': {'bitrate': 5000, 'resolution': '1920x1080'}
        }

    def select_quality(self, bandwidth, device_capabilities):
        """Return the highest-bitrate quality the client can play.

        Args:
            bandwidth: Available bandwidth, in the same unit as the
                ``'bitrate'`` values in ``self.quality_levels``.
            device_capabilities: Passed to ``self.device_supports`` together
                with each candidate resolution.

        Returns:
            The label of the suitable level with the largest bitrate, or
            ``'240p'`` when no level is suitable.
        """
        suitable_qualities = [
            quality
            for quality, specs in self.quality_levels.items()
            if specs['bitrate'] <= bandwidth
            and self.device_supports(device_capabilities, specs['resolution'])
        ]

        if not suitable_qualities:
            return '240p'

        # Rank by bitrate, not by label: as plain strings '720p' sorts above
        # '1080p', which would cap every viewer at 720p.
        return max(
            suitable_qualities,
            key=lambda quality: self.quality_levels[quality]['bitrate'],
        )

Video Database Design

-- One row per video: uploader reference, descriptive fields and counters.
CREATE TABLE videos (
    video_id         VARCHAR(36),
    user_id          BIGINT,
    title            VARCHAR(255),
    description      TEXT,
    duration_seconds INT,
    upload_date      TIMESTAMP,
    view_count       BIGINT DEFAULT 0,
    like_count       INT DEFAULT 0,
    dislike_count    INT DEFAULT 0,
    PRIMARY KEY (video_id)
);

-- One row per encoded file of a video; at most one file per
-- (video_id, quality) pair.
CREATE TABLE video_files (
    video_id  VARCHAR(36),
    quality   ENUM('240p', '360p', '720p', '1080p', '4K'),
    file_url  TEXT,
    file_size BIGINT,
    bitrate   INT,
    PRIMARY KEY (video_id, quality)
);

Recommendation System

class VideoRecommendation:
    """Rank candidate videos for a user by relevance score."""

    def __init__(self):
        self.ml_model = self.load_recommendation_model()

    def get_recommendations(self, user_id, num_recommendations=20):
        """Return the top-scoring ``(video, score)`` pairs for ``user_id``.

        The user's history is turned into preferences, the preferences into
        candidate videos, and every candidate is scored with
        ``calculate_relevance_score``.

        Args:
            user_id: User to recommend for.
            num_recommendations: Upper bound on the number of pairs returned.

        Returns:
            A list of ``(video, score)`` tuples in descending score order;
            candidates with equal scores keep their candidate order.
        """
        history = self.get_user_history(user_id)
        prefs = self.extract_preferences(history)

        # Score every candidate in the order the candidate source yields them.
        ranked = [
            (candidate, self.calculate_relevance_score(user_id, candidate))
            for candidate in self.get_candidate_videos(prefs)
        ]

        # Stable in-place sort, highest score first.
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return ranked[:num_recommendations]

Caching Strategy

class VideoCache:
    """Warm the metadata cache and the CDN edge cache for trending videos."""

    def __init__(self):
        self.hot_cache = Redis()  # Metadata of popular videos
        self.cdn_cache = CDN()    # Edge caching of video content

    def cache_popular_videos(self):
        """Prefetch and cache every video from ``get_trending_videos``.

        For each video, its streaming URLs are prefetched through the CDN
        cache and then its metadata is stored in the hot cache under the key
        ``video:<id>``. Which videos count as popular (the original note says
        the top 1% most viewed) is decided entirely by
        ``get_trending_videos``.
        """
        for trending in self.get_trending_videos():
            # Push the content to the edge servers first...
            self.cdn_cache.prefetch(trending.streaming_urls)

            # ...then make the metadata available from the hot cache.
            metadata_key = f"video:{trending.id}"
            self.hot_cache.set(metadata_key, trending.metadata)

Analytics và Monitoring

class StreamingAnalytics:
    """Turn raw playback events into one metrics record per viewing session."""

    def track_viewing_session(self, user_id, video_id, events):
        """Build a session metrics record and send it to the analytics pipeline.

        Args:
            user_id: Viewer identifier, copied into the record.
            video_id: Video identifier, copied into the record.
            events: Mapping with the keys ``'play'``, ``'buffer_count'``,
                ``'quality_switches'``, ``'watch_duration'`` and
                ``'video_duration'``.

        Raises:
            KeyError: If ``events`` lacks one of the keys above.
        """
        video_duration = events['video_duration']

        # A zero (or missing-as-None) duration used to raise on the division
        # and drop the whole record; report a completion rate of 0.0 instead.
        if video_duration:
            completion_rate = events['watch_duration'] / video_duration
        else:
            completion_rate = 0.0

        metrics = {
            'user_id': user_id,
            'video_id': video_id,
            'start_time': events['play'],
            'buffer_events': events['buffer_count'],
            'quality_changes': events['quality_switches'],
            'completion_rate': completion_rate,
        }

        # NOTE(review): analytics_pipeline is not assigned anywhere in this
        # class; it must be set on the instance before this method is called.
        self.analytics_pipeline.send(metrics)

Next Steps

Nội dung này sẽ được mở rộng thêm với:

- Live streaming architecture
- Global content distribution
- DRM và content protection
- Real-time analytics dashboard