Case Study: E-commerce Platform (như Amazon/Shopify)

Tổng Quan

E-commerce Platform là hệ thống phức tạp quản lý products, orders, payments, inventory và shipping cho millions users và merchants.

Requirements

Functional Requirements

1. Product catalog và search
2. Shopping cart và checkout
3. Payment processing
4. Order management
5. Inventory management
6. User accounts và seller accounts

Non-Functional Requirements

- Scale: 100M products, 10M daily orders
- Peak traffic: 1M concurrent users
- Availability: 99.99% (especially during sales)
- Consistency: Strong consistency for payments/inventory
- Global presence: Multi-region deployment

System Architecture

Microservices Design

User Service → Authentication, profiles
Product Service → Catalog, search, reviews
Cart Service → Shopping cart management
Order Service → Order processing, tracking
Payment Service → Payment processing
Inventory Service → Stock management
Notification Service → Email, SMS alerts

Database Design

-- Products
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY,
    seller_id BIGINT,
    name VARCHAR(255),
    description TEXT,
    price DECIMAL(10,2),
    category_id INT,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

-- Orders
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    total_amount DECIMAL(10,2),
    status ENUM('pending', 'confirmed', 'shipped', 'delivered', 'cancelled'),
    created_at TIMESTAMP,
    shipping_address JSON
);

-- Order Items
CREATE TABLE order_items (
    order_id BIGINT,
    product_id BIGINT,
    quantity INT,
    unit_price DECIMAL(10,2),
    PRIMARY KEY (order_id, product_id)
);

-- Inventory
CREATE TABLE inventory (
    product_id BIGINT PRIMARY KEY,
    available_quantity INT,
    reserved_quantity INT,
    warehouse_id INT,
    last_updated TIMESTAMP
);

Shopping Cart System

class ShoppingCart:
    def __init__(self, user_id):
        self.user_id = user_id
        self.redis = Redis()
        self.cart_key = f"cart:{user_id}"

    def add_item(self, product_id, quantity):
        # Check inventory availability
        if not self.check_availability(product_id, quantity):
            raise InsufficientInventoryError()

        # Add to cart
        self.redis.hset(self.cart_key, product_id, quantity)
        self.redis.expire(self.cart_key, 86400)  # 24 hours

    def get_cart_items(self):
        cart_data = self.redis.hgetall(self.cart_key)
        items = []

        for product_id, quantity in cart_data.items():
            product = self.get_product_details(product_id)
            items.append({
                'product': product,
                'quantity': int(quantity),
                'subtotal': product.price * int(quantity)
            })

        return items

Order Processing

class OrderProcessor:
    def __init__(self):
        self.payment_service = PaymentService()
        self.inventory_service = InventoryService()
        self.notification_service = NotificationService()

    async def process_order(self, user_id, cart_items, payment_method, shipping_address):
        try:
            # Start distributed transaction
            transaction_id = self.start_transaction()

            # Reserve inventory
            await self.inventory_service.reserve_items(cart_items, transaction_id)

            # Process payment
            payment_result = await self.payment_service.charge(
                payment_method, 
                self.calculate_total(cart_items),
                transaction_id
            )

            if payment_result.success:
                # Create order
                order = await self.create_order(user_id, cart_items, shipping_address)

                # Commit inventory changes
                await self.inventory_service.commit_reservation(transaction_id)

                # Send notifications
                await self.notification_service.send_order_confirmation(order)

                # Clear cart
                await self.clear_cart(user_id)

                return order
            else:
                # Rollback on payment failure
                await self.inventory_service.rollback_reservation(transaction_id)
                raise PaymentFailedException()

        except Exception as e:
            # Rollback all changes
            await self.rollback_transaction(transaction_id)
            raise e

Inventory Management

class InventoryService:
    def __init__(self):
        self.db = Database()
        self.cache = Redis()

    async def reserve_items(self, items, transaction_id):
        for item in items:
            current_stock = await self.get_available_stock(item.product_id)

            if current_stock < item.quantity:
                raise InsufficientStockError(f"Only {current_stock} available")

            # Atomic reservation
            await self.db.execute("""
                UPDATE inventory 
                SET available_quantity = available_quantity - %s,
                    reserved_quantity = reserved_quantity + %s
                WHERE product_id = %s AND available_quantity >= %s
            """, [item.quantity, item.quantity, item.product_id, item.quantity])

            # Cache invalidation
            await self.cache.delete(f"stock:{item.product_id}")

    async def get_available_stock(self, product_id):
        # Check cache first
        cached_stock = await self.cache.get(f"stock:{product_id}")
        if cached_stock:
            return int(cached_stock)

        # Fetch from database
        stock = await self.db.fetchone(
            "SELECT available_quantity FROM inventory WHERE product_id = %s",
            [product_id]
        )

        # Cache for 5 minutes
        await self.cache.setex(f"stock:{product_id}", 300, stock)
        return stock

Payment Processing

class PaymentService:
    def __init__(self):
        self.stripe = StripeClient()
        self.paypal = PayPalClient()
        self.wallet_service = WalletService()

    async def charge(self, payment_method, amount, transaction_id):
        if payment_method.type == 'credit_card':
            return await self.process_card_payment(payment_method, amount)
        elif payment_method.type == 'paypal':
            return await self.process_paypal_payment(payment_method, amount)
        elif payment_method.type == 'wallet':
            return await self.process_wallet_payment(payment_method, amount)

    async def process_card_payment(self, card_info, amount):
        try:
            charge = await self.stripe.charges.create(
                amount=int(amount * 100),  # Convert to cents
                currency='usd',
                source=card_info.token,
                description='E-commerce purchase'
            )

            return PaymentResult(
                success=True,
                transaction_id=charge.id,
                amount=amount
            )
        except stripe.error.CardError as e:
            return PaymentResult(
                success=False,
                error_message=str(e)
            )

Search và Recommendation

class ProductSearch:
    def __init__(self):
        self.elasticsearch = ElasticsearchClient()
        self.recommendation_engine = RecommendationEngine()

    async def search_products(self, query, filters=None, user_id=None):
        # Build search query
        search_body = {
            'query': {
                'multi_match': {
                    'query': query,
                    'fields': ['name^2', 'description', 'category']
                }
            },
            'size': 50
        }

        # Apply filters
        if filters:
            search_body['query'] = {
                'bool': {
                    'must': [search_body['query']],
                    'filter': self.build_filters(filters)
                }
            }

        results = await self.elasticsearch.search(
            index='products',
            body=search_body
        )

        # Personalize results if user is logged in
        if user_id:
            results = await self.personalize_results(results, user_id)

        return results['hits']['hits']

Caching Strategy

class EcommerceCaching:
    def __init__(self):
        self.redis = Redis()
        self.cdn = CDN()

    def cache_product_details(self, product_id, product_data):
        # Cache product details for 1 hour
        self.redis.setex(f"product:{product_id}", 3600, json.dumps(product_data))

    def cache_category_products(self, category_id, products):
        # Cache category listings for 30 minutes
        self.redis.setex(f"category:{category_id}", 1800, json.dumps(products))

    def cache_user_recommendations(self, user_id, recommendations):
        # Cache personal recommendations for 6 hours
        self.redis.setex(f"recommendations:{user_id}", 21600, json.dumps(recommendations))

Performance Optimization

class PerformanceOptimizer:
    def optimize_product_images(self, product_id):
        # Generate multiple image sizes
        sizes = ['thumbnail', 'medium', 'large', 'zoom']

        for size in sizes:
            optimized_image = self.image_processor.resize_and_compress(
                product_id, size
            )

            # Upload to CDN
            self.cdn.upload(f"products/{product_id}/{size}.jpg", optimized_image)

    def preload_popular_products(self):
        # Identify trending products
        popular_products = self.analytics.get_trending_products()

        # Preload to cache
        for product in popular_products:
            self.cache_product_details(product.id, product.to_dict())

Next Steps

Nội dung này sẽ được mở rộng thêm với: - Advanced fraud detection - Multi-tenant architecture cho marketplaces - Real-time analytics dashboard - Supply chain integration - International payment methods