Distributed Systems - Large-Scale System Design
Tổng quan
Distributed systems spread computation across multiple machines to achieve scalability, reliability, và performance.
Core Challenges
CAP Theorem
- Consistency: All nodes see same data
- Availability: System remains operational
- Partition Tolerance: System works despite network failures
# CP System (Consistent + Partition Tolerant)
class ConsistentSystem:
    """CP behavior: refuse writes rather than risk returning inconsistent data."""

    def write(self, data):
        """Replicate `data` to a quorum, or fail when a quorum is unreachable."""
        if self.can_reach_majority():
            return self.replicate_to_majority(data)
        # Minority side of a partition: reject the write instead of diverging.
        raise UnavailableException("Cannot ensure consistency")
# AP System (Available + Partition Tolerant)
class AvailableSystem:
    """AP behavior: never refuse a write; reconcile divergence afterwards."""

    def write(self, data):
        # Always accept writes, resolve conflicts later
        return self.write_locally(data)
Distributed Computing Fallacies
- Network is reliable
- Latency is zero
- Bandwidth is infinite
- Network is secure
- Topology doesn't change
- There is one administrator
- Transport cost is zero
- Network is homogeneous
Consensus Algorithms
Raft Consensus
class RaftNode:
    """Simplified Raft peer: vote granting for elections and log replication."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.state = 'follower'  # follower, candidate, leader
        self.current_term = 0
        self.voted_for = None    # candidate we granted a vote to (fix: was never initialized)
        self.log = []            # replicated log (fix: append_entries used it uninitialized)

    def request_vote(self, term, candidate_id):
        """Grant a vote iff the candidate's term is newer than ours.

        NOTE(review): full Raft also grants a vote at an *equal* term when this
        node has not voted yet; this simplified model only votes on newer terms.
        """
        if term > self.current_term:
            self.current_term = term
            self.voted_for = candidate_id
            return True
        return False

    def append_entries(self, entries, leader_term):
        """Append a batch of `entries` from a current leader.

        Returns True on success; a stale leader (older term) is rejected
        with False.
        """
        if leader_term >= self.current_term:
            self.current_term = leader_term
            self.state = 'follower'   # hearing from a valid leader demotes us
            self.log.extend(entries)  # batch of entries: extend, don't nest a list
            return True               # fix: original returned list.append's None (falsy)
        return False
Byzantine Fault Tolerance
class ByzantineNode:
    """Node running a simplified BFT agreement round.

    Assumes `self.faulty_nodes` (f) is set elsewhere; each phase must gather
    strictly more than 2f votes before the protocol advances.
    """

    def three_phase_commit(self, proposal):
        """Run prepare -> commit -> execute; False as soon as a quorum fails."""
        quorum = 2 * self.faulty_nodes  # need strictly more than 2f matching votes

        # Phase 1: Prepare
        if self.send_prepare(proposal) <= quorum:
            return False
        # Phase 2: Commit
        if self.send_commit(proposal) <= quorum:
            return False
        # Phase 3: Execute
        return self.execute(proposal)
Data Distribution
Consistent Hashing
import bisect
import hashlib
class ConsistentHashRing:
    """Consistent-hash ring mapping keys to nodes via virtual replicas.

    Each physical node is inserted `replicas` times under distinct hash
    positions ("virtual nodes") to smooth the key distribution across nodes.
    """

    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas  # virtual nodes per physical node
        self.ring = {}            # hash position -> physical node
        self.sorted_keys = []     # sorted hash positions for binary search
        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node):
        """Insert `node` at `replicas` virtual positions on the ring."""
        for i in range(self.replicas):
            key = self.hash(f"{node}:{i}")
            self.ring[key] = node
            self.sorted_keys.append(key)
        self.sorted_keys.sort()

    def get_position(self, hash_key):
        """Index of the first ring position clockwise of `hash_key`.

        Fix: this method was called by get_node but never defined. Wraps to
        index 0 when `hash_key` is past the last position on the ring.
        """
        return bisect.bisect(self.sorted_keys, hash_key) % len(self.sorted_keys)

    def get_node(self, key):
        """Return the node responsible for `key`, or None if the ring is empty."""
        if not self.ring:
            return None
        hash_key = self.hash(key)
        idx = self.get_position(hash_key)
        return self.ring[self.sorted_keys[idx]]

    def hash(self, key):
        # MD5 is used for its uniform spread, not for security.
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
Sharding Strategies
# Range-based sharding
class RangeSharding:
    """Route keys to shards by contiguous key ranges."""

    # Exclusive upper bounds paired with their shard; keys beyond the last
    # bound fall into the overflow shard.
    _RANGES = ((1000, 'shard_1'), (2000, 'shard_2'))

    def get_shard(self, key):
        """Return the name of the shard that owns `key`."""
        for upper_bound, shard in self._RANGES:
            if key < upper_bound:
                return shard
        return 'shard_3'
# Hash-based sharding
class HashSharding:
    """Route keys to shards by a stable hash of the key.

    Fix: the original used the builtin hash(), whose string hashing is
    randomized per process (PYTHONHASHSEED) — every restart would reshuffle
    which shard owns each key. MD5 gives a placement that is stable across
    processes and machines (used for distribution, not security).
    """

    def __init__(self, num_shards):
        self.num_shards = num_shards  # total number of shards

    def get_shard(self, key):
        """Return a stable shard name for `key` (key is stringified first)."""
        digest = hashlib.md5(str(key).encode('utf-8')).hexdigest()
        return f"shard_{int(digest, 16) % self.num_shards}"
Replication Patterns
Master-Slave Replication
class MasterSlaveDB:
    """Primary/replica database: writes go to the master, reads to replicas."""

    def __init__(self):
        self.master = MasterNode()
        self.slaves = [SlaveNode() for _ in range(3)]

    def write(self, data):
        """Write through the master, then fan out asynchronously to replicas."""
        outcome = self.master.write(data)
        for replica in self.slaves:
            self.async_replicate(replica, data)
        return outcome

    def read(self):
        """Read from a randomly chosen replica to spread load."""
        replica = random.choice(self.slaves)
        return replica.read()
Multi-Master Replication
class MultiMasterDB:
    """Multi-master replication: any node accepts writes; peers converge later."""

    def __init__(self, nodes):
        self.nodes = nodes

    def write(self, data):
        """Write to one node, then asynchronously propagate to every peer."""
        target = self.select_node()
        outcome = target.write(data)
        for peer in self.nodes:
            if peer != target:
                peer.async_update(data)
        return outcome

    def resolve_conflicts(self, data_versions):
        """Last-write-wins: keep the version carrying the newest timestamp."""
        return max(data_versions, key=lambda version: version.timestamp)
Load Balancing
Load Balancer Types
class LoadBalancer:
    """Distribute requests across `servers` using several strategies."""

    def __init__(self, servers):
        self.servers = servers
        self.current = 0  # next index for round-robin

    def round_robin(self):
        """Return servers in rotation, wrapping at the end of the list."""
        server = self.servers[self.current]
        self.current = (self.current + 1) % len(self.servers)
        return server

    def weighted_round_robin(self, weights):
        """Pick a server with probability proportional to its weight.

        Fix: the original tested `random.random() < w/total` per server,
        which is not the weighted distribution and could fall off the end
        of the loop and return None. Cumulative selection always returns
        a server and honors the weights exactly.
        """
        total_weight = sum(weights)
        pick = random.random() * total_weight
        cumulative = 0.0
        for server, weight in zip(self.servers, weights):
            cumulative += weight
            if pick < cumulative:
                return server
        return self.servers[-1]  # guard against float rounding at the edge

    def least_connections(self):
        """Return the server currently holding the fewest active connections."""
        return min(self.servers, key=lambda s: s.active_connections)
Fault Tolerance
Circuit Breaker
class CircuitOpenException(Exception):
    """Raised when a call is rejected because the circuit is open."""
    # Fix: this exception was raised by CircuitBreaker but never defined.


class CircuitBreaker:
    """Stop calling a failing dependency until a cool-down period elapses.

    States: CLOSED (normal), OPEN (rejecting calls), HALF_OPEN (probing
    whether the dependency has recovered).
    """

    def __init__(self, threshold=5, timeout=60):
        self.threshold = threshold      # consecutive failures before opening
        self.timeout = timeout          # seconds to stay open before probing
        self.failure_count = 0
        self.last_failure = 0.0         # fix: was read before ever being set
        self.state = 'CLOSED'           # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args):
        """Invoke `func(*args)` through the breaker.

        Raises CircuitOpenException while the circuit is open; otherwise
        re-raises whatever `func` raises after recording the failure.
        """
        if self.state == 'OPEN':
            if time.time() - self.last_failure > self.timeout:
                self.state = 'HALF_OPEN'  # cool-down over: allow one probe
            else:
                raise CircuitOpenException()
        try:
            result = func(*args)
        except Exception:
            self.on_failure()
            raise
        else:
            self.on_success()
            return result

    def on_success(self):
        """Fix: was called but undefined. A success fully closes the circuit."""
        self.failure_count = 0
        self.state = 'CLOSED'

    def on_failure(self):
        """Fix: was called but undefined. Count the failure; open at threshold."""
        self.failure_count += 1
        self.last_failure = time.time()
        if self.failure_count >= self.threshold:
            self.state = 'OPEN'
Bulkhead Pattern
class BulkheadThreadPool:
    """Bulkhead isolation: separate worker pools per workload class so that
    a flood of non-critical tasks can never starve the critical ones."""

    def __init__(self):
        # Independent pools: exhausting one leaves the other unaffected.
        self.critical_pool = ThreadPoolExecutor(max_workers=10)
        self.non_critical_pool = ThreadPoolExecutor(max_workers=5)

    def execute_critical(self, task):
        """Schedule `task` on the critical pool; returns a Future."""
        return self.critical_pool.submit(task)

    def execute_non_critical(self, task):
        """Schedule `task` on the non-critical pool; returns a Future."""
        return self.non_critical_pool.submit(task)
Message Passing
Actor Model
class Actor:
    """Minimal actor: a mailbox drained sequentially by one receive loop."""

    def __init__(self):
        self.mailbox = queue.Queue()
        self.running = True  # cleared externally to stop the receive loop

    def send(self, message):
        """Enqueue a message for this actor; never blocks the sender."""
        self.mailbox.put(message)

    def receive(self):
        """Process mailbox messages until `running` is cleared.

        The 1-second get timeout keeps the loop responsive to shutdown
        instead of blocking forever on an empty mailbox.
        """
        while self.running:
            try:
                self.handle_message(self.mailbox.get(timeout=1))
            except queue.Empty:
                continue

    def handle_message(self, message):
        """Hook for subclasses; the base actor ignores all messages."""
        pass
class OrderActor(Actor):
    """Actor handling order lifecycle messages via a dispatch table."""

    def handle_message(self, message):
        """Route a message to its handler by type; unknown types are ignored."""
        handlers = {
            'CREATE_ORDER': self.create_order,
            'UPDATE_ORDER': self.update_order,
        }
        handler = handlers.get(message.type)
        if handler is not None:
            handler(message.data)
Time Synchronization
Vector Clocks
class VectorClock:
    """Vector clock for tracking causal ordering across `num_processes`."""

    def __init__(self, process_id, num_processes):
        self.process_id = process_id          # index of this process's slot
        self.clock = [0] * num_processes      # one logical counter per process

    def tick(self):
        """Advance this process's own counter (a local event occurred)."""
        self.clock[self.process_id] += 1

    def update(self, other_clock):
        """Merge a received clock (element-wise max), then tick for the receive."""
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.tick()

    def happens_before(self, other):
        """True iff this clock causally precedes `other`.

        Fix: the original used `self.clock <= other.clock`, which compares
        Python lists lexicographically (e.g. [0,2] <= [1,0] is True), not
        element-wise as vector-clock ordering requires. Happens-before holds
        when every component is <= and at least one is strictly <.
        """
        return (all(a <= b for a, b in zip(self.clock, other.clock))
                and self.clock != other.clock)
Monitoring
Distributed Tracing
import opentelemetry.trace as trace
tracer = trace.get_tracer(__name__)


def distributed_operation():
    """Trace an order operation together with its downstream payment call."""
    with tracer.start_as_current_span("operation") as op_span:
        op_span.set_attribute("service.name", "order-service")
        # Child span captures the cross-service payment call so the trace
        # shows the parent/child latency breakdown.
        with tracer.start_as_current_span("payment-call") as pay_span:
            result = call_payment_service()
            pay_span.set_attribute("payment.amount", result.amount)
            return result
Best Practices
Design Principles
- Design for failure
- Embrace eventual consistency
- Use idempotent operations
- Implement proper monitoring
- Plan for partitions
Common Patterns
# Retry with exponential backoff
def retry_with_backoff(func, max_retries=3, base_delay=1.0):
    """Call `func` with up to `max_retries` attempts and exponential backoff.

    Sleeps base_delay * 2**attempt seconds between attempts (1s, 2s, 4s...
    with the default base). Re-raises the last exception, with its original
    traceback, once every attempt has failed.

    `base_delay` generalizes the previously hard-coded delay base; the
    default preserves the original timing.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # fix: bare raise preserves the original traceback ('raise e' re-anchors it)
            time.sleep(base_delay * 2 ** attempt)
Next Steps
- 📚 Học về Load Balancers
- 🎯 Practice consensus algorithms
- 🏗️ Explore distributed databases
- 💻 Setup distributed monitoring
Content sẽ được expand với detailed distributed system patterns.