Consistency Patterns - Data Consistency in Distributed Systems
Tổng quan
Consistency patterns define how data remains consistent across distributed systems. Understanding the various consistency models is critical cho designing reliable systems.
CAP Theorem
Trade-offs
- Consistency: All nodes see same data simultaneously
- Availability: System remains operational
- Partition Tolerance: System continues despite network failures
# CAP theorem: while a network partition is in effect, a system can keep at
# most ONE of consistency and availability (partition tolerance itself is
# non-negotiable in a distributed system).
if network_partition_occurs():
    choose_one_of([
        "consistency",   # CP: consistent, partition-tolerant
        "availability",  # AP: available, partition-tolerant
    ])
    # Cannot have both while the partition lasts
Consistency Models
Strong Consistency
# Example: Distributed lock
class DistributedLock:
    """Strong-consistency example: a lock that every node agrees on via Raft."""

    def acquire_lock(self, resource_id):
        # Every node must agree (through consensus) before any node proceeds.
        proposal = {
            'action': 'acquire_lock',
            'resource': resource_id,
            'node': self.node_id
        }
        decision = self.raft_consensus.propose(proposal)
        return decision.committed

    def read_with_lock(self, key):
        # Holding the lock guarantees we observe the latest committed data.
        # NOTE(review): acquire_lock returns the commit flag, not a context
        # manager — confirm the intended locking protocol for `with` here.
        with self.acquire_lock(key):
            return self.database.read(key)
Eventual Consistency
# Example: DNS propagation
class DNSRecord:
    """Eventual-consistency example: DNS updates propagate asynchronously."""

    def update_record(self, domain, ip):
        # The primary server is updated synchronously; secondaries catch up
        # later through queued async jobs, so reads may be stale for a while.
        self.primary_dns.update(domain, ip)
        for server in self.secondary_dns_servers:
            self.async_queue.push({
                'action': 'update',
                'domain': domain,
                'ip': ip,
                'server': server
            })
        # Tell the caller the change is still converging across replicas.
        return {'status': 'propagating', 'ttl': '300s'}
Weak Consistency
# Example: Real-time chat
class ChatService:
    """Weak-consistency example: best-effort fan-out, no delivery guarantee."""

    def send_message(self, message):
        # Push to every connected user. A dead connection just logs a
        # warning and the message is dropped for that user — no retry,
        # no queueing, no acknowledgement.
        for user in self.online_users:
            try:
                self.websocket_send(user, message)
            except ConnectionError:
                self.logger.warn(f"Failed to deliver to {user}")
Consensus Algorithms
Raft Consensus
class RaftNode:
    """Minimal Raft participant: leader election and log replication.

    The cluster consists of this node plus ``len(self.peers)`` peers, so the
    cluster size is ``len(self.peers) + 1`` and a majority is
    ``cluster_size // 2 + 1`` votes/acks.
    """

    def __init__(self):
        self.state = 'follower'  # one of: follower, candidate, leader
        self.current_term = 0
        self.voted_for = None

    def start_election(self):
        """Run a candidate election; become leader on a cluster majority."""
        self.state = 'candidate'
        self.current_term += 1
        self.voted_for = self.node_id
        cluster_size = len(self.peers) + 1  # peers plus this node
        majority = cluster_size // 2 + 1
        votes = 1  # vote for self
        for peer in self.peers:
            response = peer.request_vote(self.current_term, self.node_id)
            if response.vote_granted:
                votes += 1
            if votes >= majority:
                # Fix: the original compared against len(peers) // 2, which
                # under-counts the majority for even cluster sizes, and kept
                # calling become_leader() on every later vote (no break).
                self.become_leader()
                break

    def replicate_log(self, entry):
        """Leader-side replication: commit once a cluster majority holds `entry`.

        Returns True when the entry was committed, False otherwise.
        """
        cluster_size = len(self.peers) + 1
        majority = cluster_size // 2 + 1
        # Fix: the leader's own log counts toward the majority; the original
        # started at 0 and compared against len(peers) // 2, which commits
        # below a true majority on even cluster sizes.
        confirmations = 1
        for peer in self.peers:
            if peer.append_entries(entry):
                confirmations += 1
            if confirmations >= majority:
                self.commit_entry(entry)
                return True
        return False
PBFT (Practical Byzantine Fault Tolerance)
class PBFTNode:
    """PBFT replica: three-phase agreement tolerating f Byzantine nodes.

    Each phase requires 2f + 1 matching votes, where f = self.faulty_nodes.
    """

    def three_phase_commit(self, request):
        quorum = 2 * self.faulty_nodes + 1
        # Phase 1 (pre-prepare): only the primary broadcasts the request.
        if self.is_primary():
            self.broadcast_preprepare(request)
        # Phase 2 (prepare): a quorum of prepare votes lets us broadcast commit.
        if self.collect_prepare_votes() >= quorum:
            self.broadcast_commit(request)
            # Phase 3 (commit): a quorum of commit votes executes the request.
            if self.collect_commit_votes() >= quorum:
                self.execute_request(request)
                return True
        return False
ACID Properties
Implementation Example
class DatabaseTransaction:
    """ACID transaction sketch: WAL-backed commit with undo-based rollback."""

    def __init__(self):
        self.operations = []  # ordered write operations recorded for commit
        self.locks = []       # every lock acquired, released when we finish

    def begin(self):
        self.transaction_id = generate_uuid()
        self.start_time = time.now()

    def read(self, key):
        # Isolation (read committed): take a read lock, then read only
        # data that other transactions have already committed.
        self.locks.append(self.acquire_read_lock(key))
        return self.database.read_committed(key)

    def write(self, key, value):
        # Atomicity: record the operation (capturing the old value so it
        # can be undone) instead of applying it immediately.
        self.locks.append(self.acquire_write_lock(key))
        self.operations.append({
            'type': 'write',
            'key': key,
            'old_value': self.database.read(key),
            'new_value': value,
            'timestamp': time.now()
        })

    def commit(self):
        try:
            # Durability: persist the write-ahead log before touching data.
            self.wal.append_log(self.operations)
            # Apply every recorded operation, then mark the WAL committed.
            for op in self.operations:
                self.database.apply_operation(op)
            self.wal.mark_committed(self.transaction_id)
        except Exception as e:
            # NOTE(review): rollback() also releases locks, so on failure
            # release_locks() runs twice — confirm it is idempotent.
            self.rollback()
            raise e
        finally:
            self.release_locks()

    def rollback(self):
        # Atomicity: undo in reverse order so earlier state is restored last.
        for op in reversed(self.operations):
            self.database.undo_operation(op)
        self.release_locks()
BASE Properties
Example: E-commerce Order
class OrderService:
    """BASE example: accept orders immediately, finish the work asynchronously."""

    def place_order(self, order_data):
        # Basic availability: hand back an order id right away.
        order_id = self.generate_order_id()
        # Soft state: the cached record is provisional and may still change.
        self.cache.set(f"order:{order_id}", {
            'status': 'processing',
            'data': order_data,
            'timestamp': time.now()
        })
        # Eventual consistency: this pipeline converges in the background.
        self.async_processor.enqueue([
            ('validate_inventory', order_data),
            ('process_payment', order_data),
            ('update_inventory', order_data),
            ('send_confirmation', order_id)
        ])
        return {'order_id': order_id, 'status': 'accepted'}

    def check_order_status(self, order_id):
        # May serve a stale snapshot until background processing settles.
        return self.cache.get(f"order:{order_id}")
Vector Clocks
Distributed Event Ordering
class VectorClock:
    """Vector clock for causally ordering events across a fixed set of nodes.

    ``clock[i]`` counts the events observed from node ``i``.  Comparison
    methods take the *raw counter list* of the other clock, matching the
    original interface.
    """

    def __init__(self, node_id, num_nodes):
        self.node_id = node_id
        self.clock = [0] * num_nodes

    def tick(self):
        """Record a local event by advancing this node's own counter."""
        self.clock[self.node_id] += 1

    def update(self, other_clock):
        """Merge a received clock (element-wise max), then tick for the receive."""
        self.clock = [max(mine, theirs)
                      for mine, theirs in zip(self.clock, other_clock)]
        self.tick()

    def happens_before(self, other_clock):
        """True iff this clock causally precedes `other_clock` (a counter list)."""
        pairs = list(zip(self.clock, other_clock))
        return (all(a <= b for a, b in pairs)
                and any(a < b for a, b in pairs))

    def copy(self):
        """Return an independent snapshot of this clock.

        Added (fix): callers that stamp events with a clock snapshot call
        ``.copy()``, which this class previously did not provide.
        """
        snapshot = VectorClock(self.node_id, len(self.clock))
        snapshot.clock = list(self.clock)
        return snapshot
class DistributedEvent:
    """An event stamped with a vector-clock snapshot for causal comparison."""

    def __init__(self, data, vector_clock):
        self.data = data
        # Fix: snapshot the counters as a plain list.  The original stored
        # ``vector_clock.copy()`` and then used the result both as an object
        # with happens_before() and as an indexable list — which cannot both
        # hold.  Accept either a clock object (with a ``clock`` list) or a
        # raw counter list.
        self.timestamp = list(getattr(vector_clock, 'clock', vector_clock))

    @staticmethod
    def _happens_before(a, b):
        # Standard vector-clock partial order on two counter lists.
        pairs = list(zip(a, b))
        return (all(x <= y for x, y in pairs)
                and any(x < y for x, y in pairs))

    def compare(self, other):
        """Return -1 if self precedes other, 1 if other precedes self, 0 if concurrent."""
        if self._happens_before(self.timestamp, other.timestamp):
            return -1
        if self._happens_before(other.timestamp, self.timestamp):
            return 1
        return 0  # concurrent events
Conflict Resolution
Last Write Wins (LWW)
class LWWRegister:
    """Last-write-wins register: the highest timestamp decides the value."""

    def __init__(self):
        self.value = None
        self.timestamp = 0

    def write(self, value, timestamp):
        # Writes with an older (or equal) timestamp are ignored, so
        # replayed or reordered updates are harmless.
        if timestamp > self.timestamp:
            self.value, self.timestamp = value, timestamp

    def merge(self, other_register):
        # Merging two registers is just a write of the other's state.
        self.write(other_register.value, other_register.timestamp)
Multi-Value Register
class MVRegister:
    """Multi-value register: keeps every causally-concurrent value."""

    def __init__(self):
        # Maps a vector clock to the value written under it.
        self.values = {}

    def write(self, value, vector_clock):
        # Drop entries whose clocks the new clock dominates; entries that
        # are concurrent with the new write survive alongside it.
        surviving = {}
        for vc, existing in self.values.items():
            if not vector_clock.dominates(vc):
                surviving[vc] = existing
        surviving[vector_clock] = value
        self.values = surviving

    def read(self):
        # All surviving values are concurrent; conflict resolution is the
        # caller's job.
        return list(self.values.values())
Database Consistency Levels
Cassandra Example
# Consistency level configuration
class CassandraClient:
def read_with_consistency(self, key, consistency_level):
if consistency_level == 'ONE':
# Read from any one replica
return self.read_from_any_replica(key)
elif consistency_level == 'QUORUM':
# Read from majority of replicas
replicas = self.get_replicas(key)
responses = []
for replica in replicas[:len(replicas)//2 + 1]:
responses.append(replica.read(key))
return self.resolve_conflicts(responses)
elif consistency_level == 'ALL':
# Read from all replicas
replicas = self.get_replicas(key)
responses = [r.read(key) for r in replicas]
return self.resolve_conflicts(responses)
Best Practices
Choosing Consistency Model
# Recommended consistency model per use case; the value trades correctness
# guarantees against latency and availability.
_CONSISTENCY_BY_USE_CASE = {
    'financial_transactions': 'strong_consistency',   # ACID required
    'inventory_management': 'strong_consistency',     # prevent overselling
    'social_media_feed': 'eventual_consistency',      # performance > consistency
    'user_preferences': 'eventual_consistency',       # personal data, eventual sync OK
    'real_time_gaming': 'weak_consistency',           # latency critical
}


def choose_consistency_model(use_case):
    """Return the consistency model recommended for `use_case`.

    Known use cases and their results are unchanged from the original
    if/elif chain.

    Raises:
        ValueError: for an unrecognized use case.  (Fix: the original chain
        had no else branch and silently returned None for unknown inputs.)
    """
    try:
        return _CONSISTENCY_BY_USE_CASE[use_case]
    except KeyError:
        raise ValueError(f"unknown use case: {use_case!r}") from None
Handling Network Partitions
class PartitionTolerantService:
    """Chooses the CP or AP response once a network partition is detected."""

    def handle_partition(self):
        # Guard clause: nothing to do while the network is healthy.
        if not self.partition_detected():
            return
        if self.prefer_consistency():
            # CP: stop accepting writes; only the majority side serves reads.
            self.reject_writes()
            self.serve_reads_from_majority()
        else:
            # AP: keep serving; divergent writes are reconciled afterwards.
            self.continue_operations()
            self.queue_conflict_resolution()
Next Steps
- 📚 Học về Microservices Architecture
- 🎯 Practice consistency implementations
- 🏗️ Explore consensus algorithms
- 💻 Setup conflict resolution
Content sẽ được expand với detailed consistency implementation patterns.