Distributed Systems - Large-Scale System Design

Overview

Distributed systems spread computation across multiple machines to achieve scalability, reliability, and performance.

Core Challenges

CAP Theorem

  • Consistency: All nodes see same data
  • Availability: System remains operational
  • Partition Tolerance: System works despite network failures
# CP System (Consistent + Partition Tolerant)
class ConsistentSystem:
    """CP side of the CAP trade-off: refuse writes rather than risk divergence."""

    def write(self, data):
        """Replicate *data* to a majority, or fail fast when quorum is unreachable."""
        if self.can_reach_majority():
            return self.replicate_to_majority(data)
        raise UnavailableException("Cannot ensure consistency")

# AP System (Available + Partition Tolerant)  
class AvailableSystem:
    """AP side of the CAP trade-off: take every write; reconcile divergence later."""

    def write(self, data):
        # Accept the write unconditionally; conflict resolution is deferred.
        result = self.write_locally(data)
        return result

Distributed Computing Fallacies

  1. Network is reliable
  2. Latency is zero
  3. Bandwidth is infinite
  4. Network is secure
  5. Topology doesn't change
  6. There is one administrator
  7. Transport cost is zero
  8. Network is homogeneous

Consensus Algorithms

Raft Consensus

class RaftNode:
    """Minimal Raft participant holding term, vote, and log state.

    Fixes: `voted_for` and `log` were never initialized (AttributeError on
    first use), and `append_entries` returned `list.append()`'s None — a
    falsy value — on the success path.
    """

    def __init__(self, node_id):
        self.node_id = node_id
        self.state = 'follower'  # follower, candidate, leader
        self.current_term = 0
        self.voted_for = None    # candidate granted our vote, if any
        self.log = []            # replicated log entries

    def request_vote(self, term, candidate_id):
        """Grant a vote iff the candidate's term is strictly newer than ours."""
        if term > self.current_term:
            self.current_term = term
            self.voted_for = candidate_id
            return True
        return False

    def append_entries(self, entries, leader_term):
        """Accept *entries* from a leader at a current-or-newer term.

        Returns True on success (entries appended, node demoted to
        follower), False when the leader's term is stale.
        """
        if leader_term >= self.current_term:
            self.current_term = leader_term
            self.state = 'follower'
            self.log.extend(entries)
            return True
        return False

Byzantine Fault Tolerance

class ByzantineNode:
    """BFT-style commit: each phase needs strictly more than 2f matching votes."""

    def three_phase_commit(self, proposal):
        """Drive *proposal* through prepare/commit/execute; False on any failed quorum."""
        quorum = 2 * self.faulty_nodes  # must be exceeded, not merely met

        # Phase 1: Prepare
        if self.send_prepare(proposal) <= quorum:
            return False

        # Phase 2: Commit
        if self.send_commit(proposal) <= quorum:
            return False

        # Phase 3: Execute
        return self.execute(proposal)

Data Distribution

Consistent Hashing

import hashlib

class ConsistentHashRing:
    """Consistent-hash ring mapping keys to nodes via virtual replicas.

    Fix: `get_node` relied on an undefined `get_position` helper; it is now
    implemented as a binary search over the sorted positions with
    wrap-around to the first point.
    """

    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas   # virtual points per physical node
        self.ring = {}             # hash position -> node
        self.sorted_keys = []      # sorted hash positions

        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node):
        """Place `replicas` virtual points for *node* on the ring."""
        for i in range(self.replicas):
            key = self.hash(f"{node}:{i}")
            self.ring[key] = node
            self.sorted_keys.append(key)
        self.sorted_keys.sort()

    def get_node(self, key):
        """Return the node owning *key*, or None when the ring is empty."""
        if not self.ring:
            return None

        hash_key = self.hash(key)
        idx = self.get_position(hash_key)
        return self.ring[self.sorted_keys[idx]]

    def get_position(self, hash_key):
        """Index of the first ring point at or after *hash_key*, wrapping to 0."""
        import bisect  # local import keeps this snippet self-contained
        idx = bisect.bisect_left(self.sorted_keys, hash_key)
        return idx % len(self.sorted_keys)

    def hash(self, key):
        # md5 is used for distribution only, not for security
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

Sharding Strategies

# Range-based sharding
class RangeSharding:
    def get_shard(self, key):
        if key < 1000:
            return 'shard_1'
        elif key < 2000:
            return 'shard_2'
        else:
            return 'shard_3'

# Hash-based sharding
class HashSharding:
    def __init__(self, num_shards):
        self.num_shards = num_shards

    def get_shard(self, key):
        return f"shard_{hash(key) % self.num_shards}"

Replication Patterns

Master-Slave Replication

class MasterSlaveDB:
    """Single writable master fronting a pool of read-only replicas."""

    def __init__(self):
        self.master = MasterNode()
        self.slaves = [SlaveNode() for _ in range(3)]

    def write(self, data):
        """Write through the master, then fan out to replicas asynchronously."""
        result = self.master.write(data)

        # Replication is fire-and-forget; replicas may briefly lag.
        for replica in self.slaves:
            self.async_replicate(replica, data)

        return result

    def read(self):
        """Serve the read from a randomly chosen replica to spread load."""
        return random.choice(self.slaves).read()

Multi-Master Replication

class MultiMasterDB:
    """Every node accepts writes; updates are gossiped to the remaining peers."""

    def __init__(self, nodes):
        self.nodes = nodes

    def write(self, data):
        """Write to one selected node, then asynchronously propagate to peers."""
        target = self.select_node()
        outcome = target.write(data)

        # Fire-and-forget propagation to everyone except the write target.
        for peer in self.nodes:
            if peer != target:
                peer.async_update(data)

        return outcome

    def resolve_conflicts(self, data_versions):
        """Last-write-wins: keep the version carrying the newest timestamp."""
        return max(data_versions, key=lambda version: version.timestamp)

Load Balancing

Load Balancer Types

class LoadBalancer:
    """Server-selection strategies over a fixed server list."""

    def __init__(self, servers):
        self.servers = servers
        self.current = 0   # next index handed out by round_robin

    def round_robin(self):
        """Return servers in strict rotation."""
        server = self.servers[self.current]
        self.current = (self.current + 1) % len(self.servers)
        return server

    def weighted_round_robin(self, weights):
        """Pick a server with probability proportional to its weight.

        Fix: the original per-index coin flips could fall through every
        iteration and return None, and did not produce weight-proportional
        selection; random.choices does both correctly.
        """
        return random.choices(self.servers, weights=weights, k=1)[0]

    def least_connections(self):
        """Return the server currently holding the fewest active connections."""
        return min(self.servers, key=lambda s: s.active_connections)

Fault Tolerance

Circuit Breaker

class CircuitBreaker:
    """Stop calling a failing dependency until a cool-down period elapses.

    States: CLOSED (normal), OPEN (fail fast), HALF_OPEN (single probe).
    Fixes: `last_failure` was never initialized (AttributeError on the OPEN
    check), and the on_success/on_failure transitions were referenced but
    never defined.
    """

    def __init__(self, threshold=5, timeout=60):
        self.threshold = threshold   # consecutive failures before opening
        self.timeout = timeout       # seconds to stay OPEN before probing
        self.failure_count = 0
        self.last_failure = 0.0      # epoch seconds of the most recent failure
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args):
        """Invoke func(*args) through the breaker.

        Raises CircuitOpenException while OPEN and inside the cool-down;
        otherwise propagates whatever *func* raises.
        """
        if self.state == 'OPEN':
            if time.time() - self.last_failure > self.timeout:
                self.state = 'HALF_OPEN'  # allow a single probe call through
            else:
                raise CircuitOpenException()

        try:
            result = func(*args)
        except Exception:
            self.on_failure()
            raise
        else:
            self.on_success()
            return result

    def on_success(self):
        """Any success closes the circuit and clears the failure streak."""
        self.failure_count = 0
        self.state = 'CLOSED'

    def on_failure(self):
        """Record a failure; open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure = time.time()
        if self.failure_count >= self.threshold:
            self.state = 'OPEN'

Bulkhead Pattern

class BulkheadThreadPool:
    """Isolate workloads in separate pools so one class of work cannot starve the other."""

    def __init__(self):
        # Dedicated capacity for latency-sensitive work vs. best-effort work.
        self.critical_pool = ThreadPoolExecutor(max_workers=10)
        self.non_critical_pool = ThreadPoolExecutor(max_workers=5)

    def execute_critical(self, task):
        """Schedule *task* on the isolated critical pool; returns a Future."""
        future = self.critical_pool.submit(task)
        return future

    def execute_non_critical(self, task):
        """Schedule *task* on the best-effort pool; returns a Future."""
        future = self.non_critical_pool.submit(task)
        return future

Message Passing

Actor Model

class Actor:
    """Base actor: a FIFO mailbox drained by a single receive loop."""

    def __init__(self):
        self.mailbox = queue.Queue()  # pending messages, processed in order
        self.running = True           # cleared externally to stop the loop

    def send(self, message):
        """Enqueue *message* for later processing (never blocks the sender)."""
        self.mailbox.put(message)

    def receive(self):
        """Process mailbox messages until `running` is cleared.

        The 1-second get timeout exists so the loop periodically re-checks
        the `running` flag instead of blocking forever on an empty mailbox.
        """
        while self.running:
            try:
                msg = self.mailbox.get(timeout=1)
                self.handle_message(msg)
            except queue.Empty:
                continue

    def handle_message(self, message):
        """Hook for subclasses; the base actor ignores all messages."""
        pass

class OrderActor(Actor):
    """Actor that routes order lifecycle messages to the matching handler."""

    def handle_message(self, message):
        """Dispatch on message.type; unrecognized types are silently ignored."""
        handler_name = {
            'CREATE_ORDER': 'create_order',
            'UPDATE_ORDER': 'update_order',
        }.get(message.type)
        if handler_name:
            # Lazy lookup: only the matched handler is ever resolved.
            getattr(self, handler_name)(message.data)

Time Synchronization

Vector Clocks

class VectorClock:
    """Vector clock for tracking causality across a fixed set of processes."""

    def __init__(self, process_id, num_processes):
        self.process_id = process_id        # this process's slot in the vector
        self.clock = [0] * num_processes    # one counter per process

    def tick(self):
        """Advance this process's own component for a local event."""
        self.clock[self.process_id] += 1

    def update(self, other_clock):
        """Merge a received clock (component-wise max), then tick for the receive event."""
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.tick()

    def happens_before(self, other):
        """Return True iff self causally precedes other.

        Fix: the original compared the lists directly, which is Python's
        lexicographic order, not the component-wise partial order vector
        clocks require — it misreported concurrent clocks as ordered.
        """
        return (all(a <= b for a, b in zip(self.clock, other.clock))
                and self.clock != other.clock)

Monitoring

Distributed Tracing

import opentelemetry.trace as trace

tracer = trace.get_tracer(__name__)

def distributed_operation():
    """Run a traced operation whose downstream payment call gets its own span."""
    with tracer.start_as_current_span("operation") as span:
        span.set_attribute("service.name", "order-service")

        # The nested span covers just the cross-service hop so its latency
        # is visible separately in the trace.
        with tracer.start_as_current_span("payment-call") as payment_span:
            result = call_payment_service()
            payment_span.set_attribute("payment.amount", result.amount)

        return result

Best Practices

Design Principles

  • Design for failure
  • Embrace eventual consistency
  • Use idempotent operations
  • Implement proper monitoring
  • Plan for partitions

Common Patterns

# Retry with exponential backoff
def retry_with_backoff(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            time.sleep(2 ** attempt)

Next Steps

  1. 📚 Learn about Load Balancers
  2. 🎯 Practice consensus algorithms
  3. 🏗️ Explore distributed databases
  4. 💻 Setup distributed monitoring

Content will be expanded with detailed distributed system patterns.