Case Study: E-commerce Platform (như Amazon/Shopify)
Tổng Quan
E-commerce Platform là một hệ thống phức tạp, quản lý products, orders, payments, inventory và shipping cho hàng triệu users và merchants.
Requirements
Functional Requirements
1. Product catalog và search
2. Shopping cart và checkout
3. Payment processing
4. Order management
5. Inventory management
6. User accounts và seller accounts
Non-Functional Requirements
- Scale: 100M products, 10M daily orders
- Peak traffic: 1M concurrent users
- Availability: 99.99% (especially during sales)
- Consistency: Strong consistency for payments/inventory
- Global presence: Multi-region deployment
System Architecture
Microservices Design
User Service → Authentication, profiles
Product Service → Catalog, search, reviews
Cart Service → Shopping cart management
Order Service → Order processing, tracking
Payment Service → Payment processing
Inventory Service → Stock management
Notification Service → Email, SMS alerts
Database Design
-- Products: one row per catalog entry, keyed by product_id.
CREATE TABLE products (
    product_id  BIGINT,
    seller_id   BIGINT,
    name        VARCHAR(255),
    description TEXT,
    price       DECIMAL(10,2),  -- fixed-point: 10 digits total, 2 after the decimal point
    category_id INT,
    created_at  TIMESTAMP,
    updated_at  TIMESTAMP,
    PRIMARY KEY (product_id)
);
-- Orders: one row per order, keyed by order_id.
CREATE TABLE orders (
    order_id         BIGINT,
    user_id          BIGINT,
    total_amount     DECIMAL(10,2),  -- same fixed-point format as products.price
    -- Closed set of status values an order can hold.
    status           ENUM('pending', 'confirmed', 'shipped', 'delivered', 'cancelled'),
    created_at       TIMESTAMP,
    shipping_address JSON,
    PRIMARY KEY (order_id)
);
-- Order Items: line items of an order.
-- The composite key allows at most one row per (order, product) pair.
CREATE TABLE order_items (
    order_id   BIGINT,
    product_id BIGINT,
    quantity   INT,
    unit_price DECIMAL(10,2),  -- stored per line item, separately from products.price
    PRIMARY KEY (order_id, product_id)
);
-- Inventory: stock counters per product, keyed by product_id.
-- NOTE(review): with product_id as the sole key, a product can have only one
-- row (and therefore one warehouse_id) -- confirm this is intended.
CREATE TABLE inventory (
    product_id         BIGINT,
    available_quantity INT,
    reserved_quantity  INT,
    warehouse_id       INT,
    last_updated       TIMESTAMP,
    PRIMARY KEY (product_id)
);
Shopping Cart System
class ShoppingCart:
    """Redis-backed shopping cart for a single user.

    The cart lives in one Redis hash stored under ``cart:<user_id>`` whose
    fields are product ids and whose values are quantities.

    ``check_availability`` and ``get_product_details`` are called on ``self``
    but are not defined in this excerpt; they must be provided elsewhere.
    """

    # Lifetime of the cart hash in seconds; reset on every add_item call.
    CART_TTL_SECONDS = 86400  # 24 hours

    def __init__(self, user_id):
        """Bind the cart to *user_id* and create the Redis client."""
        self.user_id = user_id
        self.redis = Redis()
        self.cart_key = f"cart:{user_id}"

    @staticmethod
    def _validate_quantity(quantity):
        """Return *quantity* as a positive int or raise ValueError."""
        try:
            whole = int(quantity)
        except (TypeError, ValueError):
            raise ValueError(
                f"quantity must be a positive integer, got {quantity!r}"
            ) from None
        # Reject zero/negative values and floats that int() would truncate.
        if whole <= 0 or (isinstance(quantity, float) and whole != quantity):
            raise ValueError(
                f"quantity must be a positive integer, got {quantity!r}"
            )
        return whole

    def add_item(self, product_id, quantity):
        """Store *quantity* for *product_id* in the cart.

        The hash field is set, not incremented, so a second call for the same
        product replaces the previous quantity.

        Raises:
            ValueError: if *quantity* is not a positive whole number. Without
                this guard a negative quantity would later yield a negative
                subtotal in ``get_cart_items``.
            InsufficientInventoryError: if ``check_availability`` reports that
                the requested quantity cannot be served.
        """
        quantity = self._validate_quantity(quantity)

        # Check inventory availability
        if not self.check_availability(product_id, quantity):
            raise InsufficientInventoryError()

        # Add to cart and refresh the cart's expiry
        self.redis.hset(self.cart_key, product_id, quantity)
        self.redis.expire(self.cart_key, self.CART_TTL_SECONDS)

    def get_cart_items(self):
        """Return the cart as a list of dicts.

        Each dict has the keys ``'product'`` (whatever
        ``get_product_details`` returns), ``'quantity'`` (int) and
        ``'subtotal'`` (``product.price * quantity``).
        """
        # NOTE(review): depending on how the Redis client is configured, the
        # hash keys may come back as bytes -- confirm get_product_details
        # accepts them.
        items = []
        for product_id, raw_quantity in self.redis.hgetall(self.cart_key).items():
            product = self.get_product_details(product_id)
            quantity = int(raw_quantity)
            items.append({
                'product': product,
                'quantity': quantity,
                'subtotal': product.price * quantity,
            })
        return items
Order Processing
class OrderProcessor:
    """Drives checkout: reserve inventory, charge, create the order, notify.

    ``start_transaction``, ``calculate_total``, ``create_order``,
    ``clear_cart`` and ``rollback_transaction`` are called on ``self`` but are
    not defined in this excerpt; they must be provided elsewhere.
    """

    def __init__(self):
        """Create the payment, inventory and notification collaborators."""
        self.payment_service = PaymentService()
        self.inventory_service = InventoryService()
        self.notification_service = NotificationService()

    async def process_order(self, user_id, cart_items, payment_method, shipping_address):
        """Run the checkout flow for *cart_items* and return the created order.

        Steps, in order: start a transaction, reserve inventory, charge the
        payment method for ``calculate_total(cart_items)``, create the order,
        commit the reservation, send the confirmation, clear the cart.

        Raises:
            PaymentFailedException: if the charge result is not successful.
            Exception: any error from a step is re-raised unchanged after
                ``rollback_transaction`` has been awaited.
        """
        # Started outside the try block on purpose: if this call fails there
        # is nothing to roll back, and the handler below must never run with
        # transaction_id unbound (which would mask the real error).
        transaction_id = self.start_transaction()

        try:
            # Reserve inventory
            await self.inventory_service.reserve_items(cart_items, transaction_id)

            # Process payment
            payment_result = await self.payment_service.charge(
                payment_method,
                self.calculate_total(cart_items),
                transaction_id,
            )

            if not payment_result.success:
                # Release the reservation, then fail; the handler below also
                # rolls back the transaction as a whole.
                # NOTE(review): this means two rollback calls on payment
                # failure -- confirm both are idempotent.
                await self.inventory_service.rollback_reservation(transaction_id)
                raise PaymentFailedException()

            # Create order
            order = await self.create_order(user_id, cart_items, shipping_address)

            # Commit inventory changes
            await self.inventory_service.commit_reservation(transaction_id)

            # NOTE(review): a failure in the two steps below triggers
            # rollback_transaction even though the reservation was already
            # committed -- confirm that is the intended behavior.
            await self.notification_service.send_order_confirmation(order)
            await self.clear_cart(user_id)

            return order
        except Exception:
            # Roll back all changes, then re-raise with the original traceback.
            await self.rollback_transaction(transaction_id)
            raise
Inventory Management
class InventoryService:
    """Stock reservation backed by the ``inventory`` table and a Redis cache."""

    # How long a stock figure read from the database stays cached, in seconds.
    STOCK_CACHE_TTL_SECONDS = 300  # 5 minutes

    def __init__(self):
        """Create the database and cache clients."""
        self.db = Database()
        self.cache = Redis()

    async def reserve_items(self, items, transaction_id):
        """Move ``item.quantity`` units from available to reserved per item.

        Each element of *items* must expose ``product_id`` and ``quantity``.
        *transaction_id* is accepted but not used by this method.

        Raises:
            InsufficientStockError: if the stock figure for an item is lower
                than the requested quantity. Items processed before the
                failing one stay reserved.
        """
        for item in items:
            current_stock = await self.get_available_stock(item.product_id)

            if current_stock < item.quantity:
                raise InsufficientStockError(f"Only {current_stock} available")

            # The WHERE guard keeps available_quantity from going negative even
            # if the (possibly cached) figure checked above was stale.
            # NOTE(review): the result of execute() is not inspected, so an
            # UPDATE that matched no row passes silently -- check the affected
            # row count once the Database API is confirmed.
            await self.db.execute("""
                UPDATE inventory
                SET available_quantity = available_quantity - %s,
                    reserved_quantity = reserved_quantity + %s
                WHERE product_id = %s AND available_quantity >= %s
            """, [item.quantity, item.quantity, item.product_id, item.quantity])

            # Cache invalidation
            await self.cache.delete(f"stock:{item.product_id}")

    async def get_available_stock(self, product_id):
        """Return the available quantity of *product_id* as an int.

        The cache is consulted first. On a miss the database is queried and
        the figure is cached for ``STOCK_CACHE_TTL_SECONDS``. A product with
        no inventory row counts as 0 in stock and is not cached.
        """
        cache_key = f"stock:{product_id}"

        # Check cache first
        cached_stock = await self.cache.get(cache_key)
        if cached_stock not in (None, "", b""):
            return int(cached_stock)

        # Fetch from database
        row = await self.db.fetchone(
            "SELECT available_quantity FROM inventory WHERE product_id = %s",
            [product_id]
        )
        if row is None:
            return 0

        # Normalise the row to a plain int so the cached and uncached paths
        # return the same type and callers can compare it with a quantity.
        stock = self._row_to_quantity(row)
        await self.cache.setex(cache_key, self.STOCK_CACHE_TTL_SECONDS, stock)
        return stock

    @staticmethod
    def _row_to_quantity(row):
        """Extract ``available_quantity`` from a scalar, mapping or sequence row."""
        if isinstance(row, (int, float, str, bytes)):
            return int(row)
        try:
            return int(row["available_quantity"])
        except (KeyError, TypeError, IndexError):
            # Sequence-style rows (e.g. tuples) expose the single selected
            # column by position.
            return int(row[0])
Payment Processing
class PaymentService:
    """Routes a charge to the processor matching the payment method type.

    ``process_paypal_payment`` and ``process_wallet_payment`` are dispatched
    to but are not defined in this excerpt; they must be provided elsewhere.
    """

    # Payment method type -> name of the method that handles it. Handlers are
    # looked up by name at call time so a missing one only fails when used.
    _HANDLER_NAMES = {
        'credit_card': 'process_card_payment',
        'paypal': 'process_paypal_payment',
        'wallet': 'process_wallet_payment',
    }

    def __init__(self):
        """Create the Stripe, PayPal and wallet clients."""
        self.stripe = StripeClient()
        self.paypal = PayPalClient()
        self.wallet_service = WalletService()

    async def charge(self, payment_method, amount, transaction_id):
        """Charge *amount* using the handler for ``payment_method.type``.

        *transaction_id* is accepted but not forwarded to the handlers.

        Raises:
            ValueError: if ``payment_method.type`` has no handler. Previously
                this case returned ``None`` silently, which callers then
                dereferenced as a payment result.
        """
        handler_name = self._HANDLER_NAMES.get(payment_method.type)
        if handler_name is None:
            raise ValueError(
                f"Unsupported payment method type: {payment_method.type!r}"
            )
        return await getattr(self, handler_name)(payment_method, amount)

    @staticmethod
    def _to_cents(amount):
        """Convert a currency amount to an integer number of cents.

        Goes through ``Decimal`` and rounds half up, because
        ``int(amount * 100)`` truncates binary float error
        (``int(19.99 * 100) == 1998``).
        """
        from decimal import ROUND_HALF_UP, Decimal

        exact = amount if isinstance(amount, Decimal) else Decimal(str(amount))
        return int((exact * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

    async def process_card_payment(self, card_info, amount):
        """Charge a card through Stripe and return a ``PaymentResult``.

        A ``stripe.error.CardError`` is turned into a failed result carrying
        the error text; other exceptions propagate to the caller.
        """
        try:
            charge = await self.stripe.charges.create(
                amount=self._to_cents(amount),  # Convert to cents
                currency='usd',
                source=card_info.token,
                description='E-commerce purchase'
            )

            return PaymentResult(
                success=True,
                transaction_id=charge.id,
                amount=amount
            )
        except stripe.error.CardError as e:
            return PaymentResult(
                success=False,
                error_message=str(e)
            )
Search và Recommendation
class ProductSearch:
    """Full-text product search on the ``products`` Elasticsearch index.

    ``build_filters`` and ``personalize_results`` are called on ``self`` but
    are not defined in this excerpt; they must be provided elsewhere.
    """

    def __init__(self):
        """Create the Elasticsearch client and the recommendation engine."""
        self.elasticsearch = ElasticsearchClient()
        self.recommendation_engine = RecommendationEngine()

    async def search_products(self, query, filters=None, user_id=None, size=50):
        """Search products matching *query* and return the list of hits.

        Args:
            query: text matched against ``name`` (boosted x2), ``description``
                and ``category``.
            filters: optional filter spec; when truthy it is translated by
                ``build_filters`` and applied as a ``bool`` filter clause.
            user_id: when truthy, the raw response is passed through
                ``personalize_results`` before the hits are extracted.
            size: maximum number of hits requested (default 50, the value
                that used to be hard-coded).

        Returns:
            ``results['hits']['hits']`` from the (possibly personalized)
            search response.
        """
        text_query = {
            'multi_match': {
                'query': query,
                'fields': ['name^2', 'description', 'category'],
            }
        }

        # Apply filters: wrap the text query so filters narrow the result set
        # without taking part in relevance scoring.
        if filters:
            es_query = {
                'bool': {
                    'must': [text_query],
                    'filter': self.build_filters(filters),
                }
            }
        else:
            es_query = text_query

        search_body = {'query': es_query, 'size': size}

        results = await self.elasticsearch.search(
            index='products',
            body=search_body
        )

        # Personalize results if user is logged in
        if user_id:
            results = await self.personalize_results(results, user_id)

        return results['hits']['hits']
Caching Strategy
class EcommerceCaching:
    """Writes JSON-serialized catalog data to Redis with per-kind expiry."""

    # Expiry times in seconds for each kind of cached entry.
    _PRODUCT_TTL = 60 * 60              # 1 hour
    _CATEGORY_TTL = 30 * 60             # 30 minutes
    _RECOMMENDATIONS_TTL = 6 * 60 * 60  # 6 hours

    def __init__(self):
        """Create the Redis and CDN clients."""
        self.redis = Redis()
        self.cdn = CDN()

    def cache_product_details(self, product_id, product_data):
        """Store *product_data* as JSON under ``product:<product_id>``."""
        key = f"product:{product_id}"
        payload = json.dumps(product_data)
        self.redis.setex(key, self._PRODUCT_TTL, payload)

    def cache_category_products(self, category_id, products):
        """Store the *products* listing as JSON under ``category:<category_id>``."""
        key = f"category:{category_id}"
        payload = json.dumps(products)
        self.redis.setex(key, self._CATEGORY_TTL, payload)

    def cache_user_recommendations(self, user_id, recommendations):
        """Store *recommendations* as JSON under ``recommendations:<user_id>``."""
        key = f"recommendations:{user_id}"
        payload = json.dumps(recommendations)
        self.redis.setex(key, self._RECOMMENDATIONS_TTL, payload)
Performance Optimization
class PerformanceOptimizer:
    """Pre-generates product images and warms the product cache.

    NOTE(review): ``image_processor``, ``cdn``, ``analytics`` and
    ``cache_product_details`` are used on ``self`` but are not defined in this
    class as shown -- confirm where they come from.
    """

    def optimize_product_images(self, product_id):
        """Resize/compress the product image per size and upload each to the CDN.

        Every variant is uploaded to ``products/<product_id>/<size>.jpg``
        right after it is produced.
        """
        for variant in ('thumbnail', 'medium', 'large', 'zoom'):
            image = self.image_processor.resize_and_compress(product_id, variant)
            self.cdn.upload(f"products/{product_id}/{variant}.jpg", image)

    def preload_popular_products(self):
        """Cache the details of every product reported as trending."""
        for trending in self.analytics.get_trending_products():
            self.cache_product_details(trending.id, trending.to_dict())
Next Steps
Nội dung này sẽ được mở rộng thêm với:
- Advanced fraud detection
- Multi-tenant architecture cho marketplaces
- Real-time analytics dashboard
- Supply chain integration
- International payment methods