Microservices Architecture - Distributed System Design
Overview
Microservices architecture decomposes applications into independent, loosely coupled services. Each service owns its data and business logic.
Core Principles
Single Responsibility
# Good: a focused service that owns only user-related behaviour.
class UserService:
    """User management: account creation, authentication and profile edits.

    Every method is a placeholder stub that returns ``None``.
    """

    def create_user(self, user_data):
        """Create a user from *user_data* (stub)."""
        return None

    def authenticate_user(self, credentials):
        """Authenticate with *credentials* (stub)."""
        return None

    def update_profile(self, user_id, data):
        """Apply *data* to the profile of *user_id* (stub)."""
        return None
class PaymentService:
    """Payment handling: charges, refunds and status lookups (all stubs)."""

    def process_payment(self, payment_data):
        """Charge using *payment_data*; placeholder that returns ``None``."""

    def refund_payment(self, payment_id):
        """Refund payment *payment_id*; placeholder that returns ``None``."""

    def get_payment_status(self, payment_id):
        """Look up the status of *payment_id*; placeholder returning ``None``."""
Database per Service
# Each service owns its data: one database per service, holding only the
# tables that service is responsible for. Other services are meant to reach
# this data through the owning service, not by querying its tables.
user-service:
  database: user_db
  tables: [users, profiles, preferences]
payment-service:
  database: payment_db
  tables: [payments, transactions, refunds]
order-service:
  database: order_db
  tables: [orders, order_items, shipping]
Service Communication
Synchronous Communication
# REST API communication
import requests
class OrderService:
    """Order creation through synchronous REST calls to other services."""

    # Seconds to wait for any downstream service. Without a timeout,
    # ``requests`` can block indefinitely on an unresponsive service.
    REQUEST_TIMEOUT = 5

    def create_order(self, order_data):
        """Validate the user, check inventory, create the order and charge it.

        Args:
            order_data: dict with at least ``'user_id'`` and ``'items'``.

        Returns:
            The order record returned by ``create_order_record``.

        Raises:
            UserNotFoundError: the user service did not answer 200.
            InsufficientInventoryError: the inventory service reports the
                items as unavailable.
            requests.HTTPError: the inventory check or the payment charge
                returned an error status.
            requests.Timeout: a call exceeded ``REQUEST_TIMEOUT``.
        """
        user_id = order_data['user_id']

        # Validate user
        user_response = requests.get(
            f"http://user-service/users/{user_id}",
            timeout=self.REQUEST_TIMEOUT,
        )
        if user_response.status_code != 200:
            raise UserNotFoundError()

        # Check inventory. Fail on an error status rather than trying to read
        # an error body as an availability answer.
        inventory_response = requests.post(
            "http://inventory-service/check",
            json=order_data['items'],
            timeout=self.REQUEST_TIMEOUT,
        )
        inventory_response.raise_for_status()
        if not inventory_response.json()['available']:
            raise InsufficientInventoryError()

        # Create order
        order = self.create_order_record(order_data)

        # Process payment, and surface a failed charge instead of returning
        # the order as if it had been paid.
        payment_response = requests.post(
            "http://payment-service/charge",
            json={'amount': order.total, 'user_id': user_id},
            timeout=self.REQUEST_TIMEOUT,
        )
        # NOTE(review): the order record already exists here, so a failed
        # charge leaves it behind; consider a compensating cancel (see the
        # Saga pattern) or charging before persisting.
        payment_response.raise_for_status()
        return order
Asynchronous Communication
# Event-driven communication
import pika
class OrderService:
    """Order creation that announces each new order on the event bus."""

    def create_order(self, order_data):
        """Persist the order, publish ``order.created`` and return the order.

        Subscribers (such as a payment handler) react to the event instead of
        being called inline.
        """
        order = self.create_order_record(order_data)
        # NOTE(review): the DB write and the publish are two separate
        # operations; if publish fails the order exists without its event.
        # A transactional outbox would make them atomic.
        created_event = {
            'order_id': order.id,
            'user_id': order.user_id,
            'items': order.items,
            'total': order.total,
        }
        self.event_bus.publish('order.created', created_event)
        return order
class PaymentService:
    """Consumes ``order.created`` events and publishes the payment outcome."""

    def handle_order_created(self, event_data):
        """Charge for the order in *event_data* and report the result.

        Publishes ``payment.processed`` when the charge succeeds, or
        ``payment.failed`` when ``process_payment`` (or the success publish)
        raises ``PaymentError``. Any other exception propagates.
        """
        # NOTE(review): nothing here guards against the same event being
        # delivered twice; confirm the bus or process_payment is idempotent.
        try:
            receipt = self.process_payment(event_data)
            success = {
                'order_id': event_data['order_id'],
                'payment_id': receipt.id,
                'status': 'success',
            }
            self.event_bus.publish('payment.processed', success)
        except PaymentError as err:
            failure = {'order_id': event_data['order_id'], 'error': str(err)}
            self.event_bus.publish('payment.failed', failure)
Service Discovery
Client-Side Discovery
import random


class ServiceRegistry:
    """Minimal in-memory service registry for client-side discovery."""

    def __init__(self):
        # service name -> sequence of instance addresses
        self.services = {}

    def register(self, service_name, instances):
        """Set the instances of *service_name*, replacing any previous ones.

        *instances* is stored as given (not copied) and should be a sequence,
        because ``discover`` passes it to ``random.choice``.
        """
        self.services[service_name] = instances

    def discover(self, service_name):
        """Return one instance of *service_name*, picked at random.

        The random pick acts as simple client-side load balancing: calls
        spread across the registered instances.

        Raises:
            ServiceNotFoundError: nothing (or an empty sequence) is
                registered for *service_name*.
        """
        instances = self.services.get(service_name, [])
        if not instances:
            raise ServiceNotFoundError(service_name)
        return random.choice(instances)  # Simple load balancing
class OrderService:
    """Calls the user service at an address supplied by a service registry."""

    def __init__(self, service_registry):
        # Used via discover(service_name) -> base URL of one instance.
        self.registry = service_registry

    def call_user_service(self, user_id, timeout=5):
        """Fetch *user_id* from whichever user-service instance is discovered.

        Args:
            user_id: id appended to the ``/users/`` path.
            timeout: seconds to wait for the instance. Without a timeout, a
                hung instance would block this call indefinitely.

        Returns:
            The ``requests`` response, unmodified.
        """
        base_url = self.registry.discover('user-service')
        return requests.get(f"{base_url}/users/{user_id}", timeout=timeout)
Server-Side Discovery
# Load balancer configuration (server-side discovery): clients call nginx,
# and nginx chooses a backend instance, so callers need no registry lookup.
upstream user-service {
    # Instances of the user service. With no balancing method configured,
    # nginx distributes requests round-robin across these servers.
    server user-service-1:8080;
    server user-service-2:8080;
    server user-service-3:8080;
}

upstream payment-service {
    server payment-service-1:8080;
    server payment-service-2:8080;
}

server {
    # proxy_pass is given without a URI part, so the client's request path
    # (e.g. /api/users/42) is forwarded to the upstream unchanged.
    location /api/users/ {
        proxy_pass http://user-service;
    }
    location /api/payments/ {
        proxy_pass http://payment-service;
    }
}
Data Management
Saga Pattern
class OrderSaga:
    """Orchestrated saga for placing an order.

    Each step is paired with a compensating action. Steps run in order. If a
    step raises, the steps that already completed are compensated newest-first
    and the original exception is re-raised.
    """

    def __init__(self):
        # (action, compensation) pairs, executed top to bottom.
        self.steps = [
            ('reserve_inventory', 'release_inventory'),
            ('process_payment', 'refund_payment'),
            ('create_order', 'cancel_order'),
            ('send_confirmation', 'send_cancellation'),
        ]

    def execute(self, order_data):
        """Run every step with *order_data*.

        Returns:
            The result of the last step, or ``None`` when there are no steps.

        Raises:
            The exception raised by the failing step, after compensation.
        """
        completed_steps = []
        result = None  # defined even when self.steps is empty
        try:
            for step, compensation in self.steps:
                result = self.execute_step(step, order_data)
                completed_steps.append((step, compensation, result))
        except Exception:
            self._compensate(completed_steps)
            raise  # bare raise keeps the original exception and traceback
        return result

    def _compensate(self, completed_steps):
        """Undo *completed_steps* newest-first, logging compensations that fail.

        Each compensation receives the result of the step it undoes. A failing
        compensation is logged and the remaining ones still run.
        """
        for step, compensation, step_result in reversed(completed_steps):
            try:
                self.execute_step(compensation, step_result)
            except Exception:
                # Exception, not a bare except: KeyboardInterrupt and
                # SystemExit must still be able to stop the process.
                self.log_compensation_failure(step, compensation)
CQRS (Command Query Responsibility Segregation)
# Write side (Commands)
class OrderCommandHandler:
    """Handles order-changing commands (the write side of CQRS)."""

    def handle_create_order(self, command):
        """Build an ``Order`` from *command*, save it, then announce it.

        The ``order.created`` event is what the read-side projection consumes
        to update its view.
        """
        user_id, items = command.user_id, command.items
        new_order = Order(user_id=user_id, items=items)
        self.order_repository.save(new_order)
        # NOTE(review): save and publish are not atomic; a crash between them
        # leaves a stored order whose event was never published.
        self.event_bus.publish('order.created', new_order.to_event())
# Read side (Queries)
class OrderQueryHandler:
    """Answers order queries from a read-side database (CQRS query side)."""

    # Written as adjacent string literals so the class can be indented
    # normally while the query text stays exactly the same.
    # NOTE(review): this joins `users` and `payments`, which belong to other
    # services under database-per-service; a CQRS read model would usually
    # denormalize those columns into orders_view instead.
    _USER_ORDERS_SQL = (
        "\n"
        "SELECT o.*, u.name, p.status\n"
        "FROM orders_view o\n"
        "JOIN users u ON o.user_id = u.id\n"
        "LEFT JOIN payments p ON o.id = p.order_id\n"
        "WHERE o.user_id = %s\n"
    )

    def __init__(self, read_db):
        self.read_db = read_db  # Optimized for queries

    def get_user_orders(self, user_id):
        """Return the read DB's result for all orders of *user_id*.

        *user_id* is sent as a bound parameter (``%s``), not interpolated
        into the SQL text.
        """
        params = [user_id]
        return self.read_db.query(self._USER_ORDERS_SQL, params)
# Event handler to update read model
class OrderProjectionHandler:
    """Applies domain events to the order read model.

    NOTE(review): ``self.read_db`` is never assigned in this class; it must
    be set by whoever creates the handler.
    """

    def handle_order_created(self, event):
        """Insert a read-model row built from ``event.data``."""
        payload = event.data
        self.read_db.insert_order_view(payload)

    def handle_payment_processed(self, event):
        """Record ``event.status`` as the payment status of ``event.order_id``."""
        # NOTE(review): this reads attributes directly on the event, while
        # handle_order_created reads event.data; confirm the two event shapes.
        order_id, status = event.order_id, event.status
        self.read_db.update_order_payment_status(order_id, status)
Circuit Breaker Pattern
import time


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    States:
        CLOSED    -- calls pass through; consecutive failures are counted
                     (any success resets the count).
        OPEN      -- calls are rejected with CircuitBreakerOpenError until
                     more than ``timeout`` seconds have passed since the last
                     failure.
        HALF_OPEN -- calls are let through again; a success closes the
                     circuit and a failure re-opens it.
    """

    def __init__(self, failure_threshold=5, timeout=60):
        """
        Args:
            failure_threshold: consecutive failures that open the circuit.
            timeout: seconds to stay OPEN before letting a call through.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # time.monotonic() reading of the last failure (not an epoch
        # timestamp); a monotonic clock is unaffected by wall-clock changes.
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever *func* returns.

        Raises:
            CircuitBreakerOpenError: the circuit is OPEN and the timeout has
                not yet elapsed.
            Any exception raised by *func*, re-raised unchanged after being
            recorded as a failure.
        """
        if self.state == 'OPEN':
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise CircuitBreakerOpenError()
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.record_failure()
            raise  # bare raise preserves the original traceback
        self.reset()
        return result

    def record_failure(self):
        """Count a failure and open the circuit once the threshold is reached.

        failure_count is not cleared when entering HALF_OPEN, so a failed
        call in that state immediately re-opens the circuit.
        """
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

    def reset(self):
        """Close the circuit and clear the failure count after a success."""
        self.failure_count = 0
        self.state = 'CLOSED'
# Usage
class PaymentService:
    """Payment service whose gateway calls go through a circuit breaker."""

    def __init__(self):
        # Breaker with its default threshold and timeout.
        self.circuit_breaker = CircuitBreaker()

    def process_payment(self, payment_data):
        """Send *payment_data* to the payment gateway via the breaker.

        While the breaker is open, the call fails fast instead of waiting on
        a gateway that has been failing.
        """
        gateway_call = self._call_payment_gateway
        return self.circuit_breaker.call(gateway_call, payment_data)
API Gateway
class APIGateway:
    """Single external entry point: authenticates, throttles and routes requests.

    ``handle_request`` needs a rate limiter (``allow(client_ip)``) and a load
    balancer (``get_instance(service)``). They have no default implementation
    here, so pass them to the constructor. Authentication defaults to
    ``AuthService()`` when none is given.
    """

    def __init__(self, auth_service=None, rate_limiter=None, load_balancer=None):
        # Path pattern -> backend service name.
        self.routes = {
            '/api/users/*': 'user-service',
            '/api/orders/*': 'order-service',
            '/api/payments/*': 'payment-service',
        }
        self.auth_service = AuthService() if auth_service is None else auth_service
        self.rate_limiter = rate_limiter
        self.load_balancer = load_balancer

    def handle_request(self, request):
        """Authenticate, rate-limit, route and forward *request*.

        Returns:
            ``Response(status=401)`` if the Authorization token is rejected,
            ``Response(status=429)`` if the client is over its rate limit,
            ``Response(status=404)`` if no service matches the path,
            otherwise the transformed backend response.
        """
        # Authentication
        # NOTE(review): auth runs before rate limiting, so floods of requests
        # with invalid tokens are never throttled; consider checking the
        # limiter first.
        token = request.headers.get('Authorization')
        if not self.auth_service.validate_token(token):
            return Response(status=401)

        # Rate limiting
        if not self.rate_limiter.allow(request.client_ip):
            return Response(status=429)

        # Route to appropriate service
        service = self.find_service(request.path)
        # NOTE(review): assumes find_service returns None for unmatched paths.
        # Without this guard, None would be handed to the load balancer.
        if service is None:
            return Response(status=404)

        # Load balancing
        instance = self.load_balancer.get_instance(service)
        # Request transformation, forwarding, response transformation
        transformed_request = self.transform_request(request)
        response = self.forward_request(instance, transformed_request)
        return self.transform_response(response)
Deployment Patterns
Blue-Green Deployment
# Blue environment (current production)
blue-user-service:
  replicas: 3
  image: user-service:v1.0
blue-order-service:
  replicas: 3
  image: order-service:v1.0

# Green environment (new version), deployed alongside blue with the same
# replica count so it can take all traffic at once.
green-user-service:
  replicas: 3
  image: user-service:v1.1
green-order-service:
  replicas: 3
  image: order-service:v1.1

# Switch traffic: pointing the load balancer at green makes v1.1 live.
# While blue stays deployed, rolling back means switching the target back.
load-balancer:
  target: green # Switch from blue to green
Canary Deployment
# Production version (90% traffic)
user-service-stable:
  replicas: 9
  image: user-service:v1.0
  weight: 90
# Canary version (10% traffic): a small share of requests exercises v1.1
# before it is promoted. The replica split (9:1) mirrors the weights (90:10).
user-service-canary:
  replicas: 1
  image: user-service:v1.1
  weight: 10
Monitoring & Observability
Distributed Tracing
import opentracing
class OrderService:
    """Order creation instrumented with OpenTracing spans."""

    def create_order(self, order_data, span_context=None):
        """Create an order, tracing each downstream call as a child span.

        Args:
            order_data: dict with at least ``'user_id'``.
            span_context: optional parent span or span context from the caller.

        Returns:
            The order record from ``create_order_record``.

        Raises:
            Any exception from the downstream calls, after it has been tagged
            on the root span.
        """
        span = opentracing.tracer.start_span(
            'create_order',
            child_of=span_context
        )
        try:
            # Inside the try so a missing user_id still finishes the span.
            span.set_tag('user_id', order_data['user_id'])

            # Child spans are used as context managers so they are finished
            # even when the downstream call raises. Otherwise a failing call
            # leaves its span unfinished, and with typical tracers unreported.
            # NOTE(review): get_user's result is not checked; presumably it
            # raises for unknown users — confirm.
            with opentracing.tracer.start_span('validate_user', child_of=span):
                self.user_service.get_user(order_data['user_id'])

            # NOTE(review): payment is charged before the order record exists;
            # a failure in create_order_record leaves a charge with no order.
            with opentracing.tracer.start_span('process_payment', child_of=span):
                self.payment_service.charge(order_data)

            order = self.create_order_record(order_data)
            span.set_tag('order_id', order.id)
            return order
        except Exception as e:
            span.set_tag('error', True)
            span.log_kv({'error.kind': type(e).__name__, 'message': str(e)})
            raise
        finally:
            span.finish()
Best Practices
Service Design
- Keep services focused and cohesive
- Design for failure
- Make services stateless
- Use async communication when possible
- Implement proper monitoring
Data Management
- Each service owns its data
- Use event sourcing for audit trails
- Implement compensation transactions
- Cache frequently accessed data
- Plan for data consistency
Security
- Implement service-to-service authentication
- Use API gateway for external access
- Encrypt inter-service communication
- Implement proper authorization
- Regular security audits
Common Anti-Patterns
Distributed Monolith
# Anti-pattern: Tight coupling
class OrderService:
    """ANTI-PATTERN, kept deliberately: order creation that blocks on three
    synchronous calls made one after another.

    The user, inventory and payment services must all be up and responsive
    for an order to succeed, and their latencies add up. The services fail
    together like one monolith, just spread over the network.
    """
    def create_order(self, order_data):
        """Create an order after three blocking remote calls (anti-pattern)."""
        # Synchronous calls to multiple services
        user = self.user_service.get_user_sync(order_data['user_id'])
        inventory = self.inventory_service.reserve_sync(order_data['items'])
        # If this charge fails, nothing here releases the inventory that was
        # just reserved: no compensation exists on the failure path.
        payment = self.payment_service.charge_sync(order_data)
        # All services must be available for order to succeed
        return self.create_order_record(order_data)
Chatty Communication
# Anti-pattern: Too many service calls
class OrderService:
    """ANTI-PATTERN, kept deliberately: one response assembled from many
    individual service calls."""
    def get_order_details(self, order_id):
        """Return the order together with its user, items and payment.

        For an order with N items this makes N + 2 calls to other services:
        one for the user, one per item, and one for the payment. Each call
        costs a network round trip. A batch endpoint (all items by id in one
        call) or a denormalized read model would make the count constant.
        """
        order = self.get_order(order_id)
        user = self.user_service.get_user(order.user_id) # Call 1
        items = []
        for item_id in order.item_ids:
            item = self.catalog_service.get_item(item_id) # Call per item
            items.append(item)
        payment = self.payment_service.get_payment(order.payment_id) # Call N+2
        return {
            'order': order,
            'user': user,
            'items': items,
            'payment': payment
        }
Next Steps
- 📚 Learn about Event-Driven Architecture
- 🎯 Practice service decomposition
- 🏗️ Explore container orchestration
- 💻 Setup distributed monitoring
This content will be expanded with detailed microservices implementation patterns.