Microservices Architecture - Distributed System Design

Tổng quan

Microservices architecture decomposes applications into independent, loosely coupled services. Each service owns its data và business logic.

Core Principles

Single Responsibility

# Good: Focused service
class UserService:
    def create_user(self, user_data): pass
    def authenticate_user(self, credentials): pass
    def update_profile(self, user_id, data): pass

class PaymentService:
    def process_payment(self, payment_data): pass
    def refund_payment(self, payment_id): pass
    def get_payment_status(self, payment_id): pass

Database per Service

# Each service owns its data
user-service:
  database: user_db
  tables: [users, profiles, preferences]

payment-service:
  database: payment_db
  tables: [payments, transactions, refunds]

order-service:
  database: order_db
  tables: [orders, order_items, shipping]

Service Communication

Synchronous Communication

# REST API communication
import requests

class OrderService:
    def create_order(self, order_data):
        # Validate user
        user_response = requests.get(f"http://user-service/users/{order_data['user_id']}")
        if user_response.status_code != 200:
            raise UserNotFoundError()

        # Check inventory
        inventory_response = requests.post(
            "http://inventory-service/check",
            json=order_data['items']
        )
        if not inventory_response.json()['available']:
            raise InsufficientInventoryError()

        # Create order
        order = self.create_order_record(order_data)

        # Process payment
        payment_response = requests.post(
            "http://payment-service/charge",
            json={'amount': order.total, 'user_id': order_data['user_id']}
        )

        return order

Asynchronous Communication

# Event-driven communication
import pika

class OrderService:
    def create_order(self, order_data):
        order = self.create_order_record(order_data)

        # Publish event
        self.event_bus.publish('order.created', {
            'order_id': order.id,
            'user_id': order.user_id,
            'items': order.items,
            'total': order.total
        })

        return order

class PaymentService:
    def handle_order_created(self, event_data):
        try:
            payment = self.process_payment(event_data)
            self.event_bus.publish('payment.processed', {
                'order_id': event_data['order_id'],
                'payment_id': payment.id,
                'status': 'success'
            })
        except PaymentError as e:
            self.event_bus.publish('payment.failed', {
                'order_id': event_data['order_id'],
                'error': str(e)
            })

Service Discovery

Client-Side Discovery

class ServiceRegistry:
    def __init__(self):
        self.services = {}

    def register(self, service_name, instances):
        self.services[service_name] = instances

    def discover(self, service_name):
        instances = self.services.get(service_name, [])
        if not instances:
            raise ServiceNotFoundError(service_name)
        return random.choice(instances)  # Simple load balancing

class OrderService:
    def __init__(self, service_registry):
        self.registry = service_registry

    def call_user_service(self, user_id):
        user_service_url = self.registry.discover('user-service')
        return requests.get(f"{user_service_url}/users/{user_id}")

Server-Side Discovery

# Load balancer configuration
upstream user-service {
    server user-service-1:8080;
    server user-service-2:8080;
    server user-service-3:8080;
}

upstream payment-service {
    server payment-service-1:8080;
    server payment-service-2:8080;
}

server {
    location /api/users/ {
        proxy_pass http://user-service;
    }

    location /api/payments/ {
        proxy_pass http://payment-service;
    }
}

Data Management

Saga Pattern

class OrderSaga:
    def __init__(self):
        self.steps = [
            ('reserve_inventory', 'release_inventory'),
            ('process_payment', 'refund_payment'),
            ('create_order', 'cancel_order'),
            ('send_confirmation', 'send_cancellation')
        ]

    def execute(self, order_data):
        completed_steps = []

        try:
            for step, compensation in self.steps:
                result = self.execute_step(step, order_data)
                completed_steps.append((step, compensation, result))

        except Exception as e:
            # Compensating actions
            for step, compensation, result in reversed(completed_steps):
                try:
                    self.execute_step(compensation, result)
                except:
                    self.log_compensation_failure(step, compensation)
            raise e

        return result

CQRS (Command Query Responsibility Segregation)

# Write side (Commands)
class OrderCommandHandler:
    def handle_create_order(self, command):
        order = Order(
            user_id=command.user_id,
            items=command.items
        )
        self.order_repository.save(order)

        # Publish event
        self.event_bus.publish('order.created', order.to_event())

# Read side (Queries)
class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db  # Optimized for queries

    def get_user_orders(self, user_id):
        return self.read_db.query("""
            SELECT o.*, u.name, p.status 
            FROM orders_view o
            JOIN users u ON o.user_id = u.id
            LEFT JOIN payments p ON o.id = p.order_id
            WHERE o.user_id = %s
        """, [user_id])

# Event handler to update read model
class OrderProjectionHandler:
    def handle_order_created(self, event):
        self.read_db.insert_order_view(event.data)

    def handle_payment_processed(self, event):
        self.read_db.update_order_payment_status(
            event.order_id, 
            event.status
        )

Circuit Breaker Pattern

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise CircuitBreakerOpenError()

        try:
            result = func(*args, **kwargs)
            self.reset()
            return result

        except Exception as e:
            self.record_failure()
            raise e

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

    def reset(self):
        self.failure_count = 0
        self.state = 'CLOSED'

# Usage
class PaymentService:
    def __init__(self):
        self.circuit_breaker = CircuitBreaker()

    def process_payment(self, payment_data):
        return self.circuit_breaker.call(
            self._call_payment_gateway,
            payment_data
        )

API Gateway

class APIGateway:
    def __init__(self):
        self.routes = {
            '/api/users/*': 'user-service',
            '/api/orders/*': 'order-service',
            '/api/payments/*': 'payment-service'
        }
        self.auth_service = AuthService()

    def handle_request(self, request):
        # Authentication
        if not self.auth_service.validate_token(request.headers.get('Authorization')):
            return Response(status=401)

        # Rate limiting
        if not self.rate_limiter.allow(request.client_ip):
            return Response(status=429)

        # Route to appropriate service
        service = self.find_service(request.path)

        # Load balancing
        instance = self.load_balancer.get_instance(service)

        # Request transformation
        transformed_request = self.transform_request(request)

        # Forward request
        response = self.forward_request(instance, transformed_request)

        # Response transformation
        return self.transform_response(response)

Deployment Patterns

Blue-Green Deployment

# Blue environment (current production)
blue-user-service:
  replicas: 3
  image: user-service:v1.0

blue-order-service:
  replicas: 3
  image: order-service:v1.0

# Green environment (new version)
green-user-service:
  replicas: 3
  image: user-service:v1.1

green-order-service:
  replicas: 3
  image: order-service:v1.1

# Switch traffic
load-balancer:
  target: green  # Switch from blue to green

Canary Deployment

# Production version (90% traffic)
user-service-stable:
  replicas: 9
  image: user-service:v1.0
  weight: 90

# Canary version (10% traffic)
user-service-canary:
  replicas: 1
  image: user-service:v1.1
  weight: 10

Monitoring & Observability

Distributed Tracing

import opentracing

class OrderService:
    def create_order(self, order_data, span_context=None):
        span = opentracing.tracer.start_span(
            'create_order',
            child_of=span_context
        )
        span.set_tag('user_id', order_data['user_id'])

        try:
            # Call user service
            user_span = opentracing.tracer.start_span(
                'validate_user',
                child_of=span
            )
            user = self.user_service.get_user(order_data['user_id'])
            user_span.finish()

            # Call payment service
            payment_span = opentracing.tracer.start_span(
                'process_payment',
                child_of=span
            )
            payment = self.payment_service.charge(order_data)
            payment_span.finish()

            order = self.create_order_record(order_data)
            span.set_tag('order_id', order.id)

            return order

        except Exception as e:
            span.set_tag('error', True)
            span.log_kv({'error.kind': type(e).__name__, 'message': str(e)})
            raise
        finally:
            span.finish()

Best Practices

Service Design

  • Keep services focused và cohesive
  • Design for failure
  • Make services stateless
  • Use async communication when possible
  • Implement proper monitoring

Data Management

  • Each service owns its data
  • Use event sourcing for audit trails
  • Implement compensation transactions
  • Cache frequently accessed data
  • Plan for data consistency

Security

  • Implement service-to-service authentication
  • Use API gateway for external access
  • Encrypt inter-service communication
  • Implement proper authorization
  • Regular security audits

Common Anti-Patterns

Distributed Monolith

# Anti-pattern: Tight coupling
class OrderService:
    def create_order(self, order_data):
        # Synchronous calls to multiple services
        user = self.user_service.get_user_sync(order_data['user_id'])
        inventory = self.inventory_service.reserve_sync(order_data['items'])
        payment = self.payment_service.charge_sync(order_data)

        # All services must be available for order to succeed
        return self.create_order_record(order_data)

Chatty Communication

# Anti-pattern: Too many service calls
class OrderService:
    def get_order_details(self, order_id):
        order = self.get_order(order_id)
        user = self.user_service.get_user(order.user_id)  # Call 1

        items = []
        for item_id in order.item_ids:
            item = self.catalog_service.get_item(item_id)  # Call per item
            items.append(item)

        payment = self.payment_service.get_payment(order.payment_id)  # Call N+2

        return {
            'order': order,
            'user': user,
            'items': items,
            'payment': payment
        }

Next Steps

  1. 📚 Học về Event-Driven Architecture
  2. 🎯 Practice service decomposition
  3. 🏗️ Explore container orchestration
  4. 💻 Setup distributed monitoring

Content sẽ được expand với detailed microservices implementation patterns.