Consistency Patterns - Data Consistency in Distributed Systems

Tổng quan

Consistency patterns define how data remains consistent across distributed systems. Understanding the various consistency models is critical for designing reliable systems.

CAP Theorem

Trade-offs

  • Consistency: All nodes see same data simultaneously
  • Availability: System remains operational
  • Partition Tolerance: System continues despite network failures
# CAP Theorem implications
if network_partition_occurs():
    choose_one_of([
        "consistency",     # CP: Consistent, Partition-tolerant
        "availability"     # AP: Available, Partition-tolerant
    ])
    # Cannot have both during partition

Consistency Models

Strong Consistency

# Example: Distributed lock
class DistributedLock:
    def acquire_lock(self, resource_id):
        # All nodes must agree before proceeding
        consensus = self.raft_consensus.propose({
            'action': 'acquire_lock',
            'resource': resource_id,
            'node': self.node_id
        })
        return consensus.committed

    def read_with_lock(self, key):
        with self.acquire_lock(key):
            # Guaranteed latest data
            return self.database.read(key)

Eventual Consistency

# Example: DNS propagation
class DNSRecord:
    def update_record(self, domain, ip):
        # Update primary server
        self.primary_dns.update(domain, ip)

        # Async propagation to secondaries
        for secondary in self.secondary_dns_servers:
            self.async_queue.push({
                'action': 'update',
                'domain': domain,
                'ip': ip,
                'server': secondary
            })

        # Eventually consistent across all servers
        return {'status': 'propagating', 'ttl': '300s'}

Weak Consistency

# Example: Real-time chat
class ChatService:
    def send_message(self, message):
        # Best effort delivery
        for user in self.online_users:
            try:
                self.websocket_send(user, message)
            except ConnectionError:
                # Drop message, don't guarantee delivery
                self.logger.warn(f"Failed to deliver to {user}")
                continue

Consensus Algorithms

Raft Consensus

class RaftNode:
    def __init__(self):
        self.state = 'follower'  # follower, candidate, leader
        self.current_term = 0
        self.voted_for = None

    def start_election(self):
        self.state = 'candidate'
        self.current_term += 1
        self.voted_for = self.node_id

        votes = 1  # Vote for self
        for peer in self.peers:
            response = peer.request_vote(self.current_term, self.node_id)
            if response.vote_granted:
                votes += 1

        if votes > len(self.peers) // 2:
            self.become_leader()

    def replicate_log(self, entry):
        # Leader replicates to majority
        confirmations = 0
        for peer in self.peers:
            if peer.append_entries(entry):
                confirmations += 1

        if confirmations >= len(self.peers) // 2:
            self.commit_entry(entry)
            return True
        return False

PBFT (Practical Byzantine Fault Tolerance)

class PBFTNode:
    def three_phase_commit(self, request):
        # Phase 1: Pre-prepare
        if self.is_primary():
            self.broadcast_preprepare(request)

        # Phase 2: Prepare
        prepare_votes = self.collect_prepare_votes()
        if prepare_votes >= 2 * self.faulty_nodes + 1:
            self.broadcast_commit(request)

        # Phase 3: Commit
        commit_votes = self.collect_commit_votes()
        if commit_votes >= 2 * self.faulty_nodes + 1:
            self.execute_request(request)
            return True
        return False

ACID Properties

Implementation Example

class DatabaseTransaction:
    def __init__(self):
        self.operations = []
        self.locks = []

    def begin(self):
        self.transaction_id = generate_uuid()
        self.start_time = time.now()

    def read(self, key):
        # Isolation: Read committed
        lock = self.acquire_read_lock(key)
        self.locks.append(lock)
        value = self.database.read_committed(key)
        return value

    def write(self, key, value):
        # Atomicity: Log operation
        lock = self.acquire_write_lock(key)
        self.locks.append(lock)

        operation = {
            'type': 'write',
            'key': key,
            'old_value': self.database.read(key),
            'new_value': value,
            'timestamp': time.now()
        }
        self.operations.append(operation)

    def commit(self):
        try:
            # Durability: Write to WAL first
            self.wal.append_log(self.operations)

            # Apply all operations atomically
            for op in self.operations:
                self.database.apply_operation(op)

            self.wal.mark_committed(self.transaction_id)

        except Exception as e:
            self.rollback()
            raise e
        finally:
            self.release_locks()

    def rollback(self):
        # Atomicity: Undo all operations
        for op in reversed(self.operations):
            self.database.undo_operation(op)
        self.release_locks()

BASE Properties

Example: E-commerce Order

class OrderService:
    def place_order(self, order_data):
        # Basic Availability: Accept order immediately
        order_id = self.generate_order_id()

        # Soft state: Mark as processing
        self.cache.set(f"order:{order_id}", {
            'status': 'processing',
            'data': order_data,
            'timestamp': time.now()
        })

        # Eventually consistent: Async processing
        self.async_processor.enqueue([
            ('validate_inventory', order_data),
            ('process_payment', order_data),
            ('update_inventory', order_data),
            ('send_confirmation', order_id)
        ])

        return {'order_id': order_id, 'status': 'accepted'}

    def check_order_status(self, order_id):
        # May return stale data initially
        return self.cache.get(f"order:{order_id}")

Vector Clocks

Distributed Event Ordering

class VectorClock:
    def __init__(self, node_id, num_nodes):
        self.node_id = node_id
        self.clock = [0] * num_nodes

    def tick(self):
        self.clock[self.node_id] += 1

    def update(self, other_clock):
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], other_clock[i])
        self.tick()

    def happens_before(self, other_clock):
        return (all(self.clock[i] <= other_clock[i] for i in range(len(self.clock))) 
                and any(self.clock[i] < other_clock[i] for i in range(len(self.clock))))

class DistributedEvent:
    def __init__(self, data, vector_clock):
        self.data = data
        self.timestamp = vector_clock.copy()

    def compare(self, other):
        if self.timestamp.happens_before(other.timestamp):
            return -1
        elif other.timestamp.happens_before(self.timestamp):
            return 1
        return 0  # Concurrent events

Conflict Resolution

Last Write Wins (LWW)

class LWWRegister:
    def __init__(self):
        self.value = None
        self.timestamp = 0

    def write(self, value, timestamp):
        if timestamp > self.timestamp:
            self.value = value
            self.timestamp = timestamp

    def merge(self, other_register):
        if other_register.timestamp > self.timestamp:
            self.value = other_register.value
            self.timestamp = other_register.timestamp

Multi-Value Register

class MVRegister:
    def __init__(self):
        self.values = {}  # {vector_clock: value}

    def write(self, value, vector_clock):
        # Remove dominated values
        self.values = {
            vc: v for vc, v in self.values.items()
            if not vector_clock.dominates(vc)
        }
        self.values[vector_clock] = value

    def read(self):
        # Return all concurrent values
        return list(self.values.values())

Database Consistency Levels

Cassandra Example

# Consistency level configuration
class CassandraClient:
    def read_with_consistency(self, key, consistency_level):
        if consistency_level == 'ONE':
            # Read from any one replica
            return self.read_from_any_replica(key)

        elif consistency_level == 'QUORUM':
            # Read from majority of replicas
            replicas = self.get_replicas(key)
            responses = []
            for replica in replicas[:len(replicas)//2 + 1]:
                responses.append(replica.read(key))
            return self.resolve_conflicts(responses)

        elif consistency_level == 'ALL':
            # Read from all replicas
            replicas = self.get_replicas(key)
            responses = [r.read(key) for r in replicas]
            return self.resolve_conflicts(responses)

Best Practices

Choosing Consistency Model

def choose_consistency_model(use_case):
    if use_case == 'financial_transactions':
        return 'strong_consistency'  # ACID required

    elif use_case == 'social_media_feed':
        return 'eventual_consistency'  # Performance > consistency

    elif use_case == 'real_time_gaming':
        return 'weak_consistency'  # Latency critical

    elif use_case == 'inventory_management':
        return 'strong_consistency'  # Prevent overselling

    elif use_case == 'user_preferences':
        return 'eventual_consistency'  # Personal data, eventual sync OK

Handling Network Partitions

class PartitionTolerantService:
    def handle_partition(self):
        if self.partition_detected():
            if self.prefer_consistency():
                # CP: Reject writes, maintain consistency
                self.reject_writes()
                self.serve_reads_from_majority()
            else:
                # AP: Continue operations, resolve conflicts later
                self.continue_operations()
                self.queue_conflict_resolution()

Next Steps

  1. 📚 Học về Microservices Architecture
  2. 🎯 Practice consistency implementations
  3. 🏗️ Explore consensus algorithms
  4. 💻 Setup conflict resolution

This content will be expanded with detailed consistency implementation patterns.