Multithreading trong Java

1. Cơ Bản về Thread

Khái Niệm Cơ Bản

  • Thread là gì?
  • Đơn vị nhỏ nhất của việc thực thi trong chương trình
  • Cho phép thực hiện nhiều tác vụ đồng thời
  • Chia sẻ tài nguyên với các thread khác trong cùng process

  • Process vs Thread

  • Process: Chương trình độc lập, có không gian bộ nhớ riêng
  • Thread: Chia sẻ không gian bộ nhớ trong cùng process
  • Thread nhẹ hơn và tốn ít tài nguyên hơn process

1.1 Tạo Thread

  • Phương pháp tạo thread:
  • Kế thừa từ lớp Thread
  • Implements interface Runnable
  • Sử dụng Lambda expression (Java 8+)
  • Sử dụng Thread Pool
// Cách 1: Kế thừa Thread
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread đang chạy");
    }
}

// Cách 2: Implements Runnable
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable đang chạy");
    }
}

// Sử dụng
Thread thread1 = new MyThread();
thread1.start();

Thread thread2 = new Thread(new MyRunnable());
thread2.start();

// Sử dụng Lambda
Thread thread3 = new Thread(() -> {
    System.out.println("Lambda thread đang chạy");
});
thread3.start();

1.2 Thread Lifecycle

  • Các trạng thái của Thread:
  • NEW: Thread được tạo nhưng chưa start()
  • RUNNABLE: Thread đang chạy hoặc sẵn sàng chạy
  • BLOCKED: Thread đang chờ monitor lock
  • WAITING: Thread đang chờ thread khác
  • TIMED_WAITING: Thread đang chờ với timeout
  • TERMINATED: Thread đã kết thúc

  • Chuyển đổi trạng thái:

  • start(): NEW -> RUNNABLE
  • run(): Thực thi mã trong thread
  • sleep(): RUNNABLE -> TIMED_WAITING
  • wait(): RUNNABLE -> WAITING
  • notify()/notifyAll(): WAITING -> RUNNABLE
  • join(): Chờ thread khác kết thúc

2. Thread Synchronization

Khái Niệm

  • Race Condition:
  • Xảy ra khi nhiều thread cùng truy cập shared resource
  • Có thể dẫn đến kết quả không mong muốn
  • Cần được xử lý bằng synchronization

  • Critical Section:

  • Đoạn code truy cập shared resource
  • Cần được bảo vệ để đảm bảo thread safety
  • Chỉ cho phép một thread truy cập tại một thời điểm

2.1 Synchronized Method

  • Cách hoạt động:
  • Sử dụng từ khóa synchronized
  • Tự động khóa object khi vào method
  • Tự động mở khóa khi thoát method
public class Counter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

2.2 Synchronized Block

  • Ưu điểm:
  • Phạm vi đồng bộ hóa nhỏ hơn
  • Hiệu năng tốt hơn synchronized method
  • Có thể chọn object để synchronize
public class BankAccount {
    private double balance;
    private final Object lock = new Object();

    public void withdraw(double amount) {
        synchronized(lock) {
            if (balance >= amount) {
                balance -= amount;
            }
        }
    }

    public void deposit(double amount) {
        synchronized(lock) {
            balance += amount;
        }
    }
}

3. Thread Communication

Khái Niệm

  • Inter-thread Communication:
  • Cho phép các thread giao tiếp với nhau
  • Đồng bộ hóa hoạt động giữa các thread
  • Tránh busy waiting và tối ưu tài nguyên

3.1 wait(), notify(), notifyAll()

  • Cơ chế hoạt động:
  • wait(): Thread chờ đến khi được notify
  • notify(): Đánh thức một thread đang wait
  • notifyAll(): Đánh thức tất cả thread đang wait
public class MessageQueue {
    private String message;
    private boolean empty = true;

    public synchronized String receive() {
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        empty = true;
        notifyAll();
        return message;
    }

    public synchronized void send(String message) {
        while (!empty) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        empty = false;
        this.message = message;
        notifyAll();
    }
}

3.2 Join

  • Mục đích sử dụng:
  • Chờ thread khác kết thúc
  • Đảm bảo thứ tự thực thi
  • Phối hợp giữa các thread
public class JoinExample {
    public void demonstrateJoin() {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("Thread hoàn thành");
        });

        thread.start();
        try {
            thread.join(); // Chờ thread kết thúc
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Main thread tiếp tục");
    }
}

4. Thread Pool và Executor Framework

Khái Niệm

  • Thread Pool:
  • Quản lý và tái sử dụng thread
  • Giới hạn số lượng thread tối đa
  • Tối ưu hiệu năng và tài nguyên

  • Executor Framework:

  • API cao cấp để quản lý thread
  • Tách biệt task submission và execution
  • Cung cấp nhiều loại thread pool khác nhau

4.1 ExecutorService

  • Các loại Executor:
  • Fixed Thread Pool: Số lượng thread cố định
  • Cached Thread Pool: Tạo thread mới khi cần
  • Single Thread Executor: Chỉ một thread
  • Scheduled Thread Pool: Thực thi theo lịch
public class ExecutorExample {
    public void demonstrateExecutor() {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // Submit Runnable
        executor.submit(() -> {
            System.out.println("Task đang chạy");
        });

        // Submit Callable
        Future<String> future = executor.submit(() -> {
            Thread.sleep(1000);
            return "Kết quả task";
        });

        try {
            String result = future.get(); // Chờ và lấy kết quả
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            Thread.currentThread().interrupt();
        }

        executor.shutdown();
    }
}

4.2 CompletableFuture

  • Đặc điểm:
  • Hỗ trợ lập trình bất đồng bộ
  • Cho phép kết hợp nhiều task
  • Xử lý kết quả và lỗi linh hoạt
public class CompletableFutureExample {
    public void demonstrateAsync() {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                // Task 1
                return "Hello";
            })
            .thenApplyAsync(result -> {
                // Task 2
                return result + " World";
            })
            .thenApplyAsync(String::toUpperCase);

        future.thenAccept(System.out::println);
    }

    public void demonstrateCombine() {
        CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> "World");

        CompletableFuture<String> combined = future1
            .thenCombine(future2, (s1, s2) -> s1 + " " + s2);

        combined.thenAccept(System.out::println);
    }
}

5. Thread Safety và Concurrent Collections

Khái Niệm

  • Thread Safety:
  • Đảm bảo code hoạt động đúng trong môi trường đa luồng
  • Tránh race condition và data corruption
  • Sử dụng synchronization và atomic operations

5.1 Atomic Classes

  • Đặc điểm:
  • Đảm bảo atomic operations
  • Không cần synchronization
  • Hiệu năng tốt hơn synchronized
public class AtomicExample {
    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicReference<String> reference = new AtomicReference<>("");

    public void increment() {
        counter.incrementAndGet();
    }

    public void updateReference() {
        reference.updateAndGet(current -> current + "Updated");
    }
}

5.2 Concurrent Collections

  • Các loại Collection:
  • ConcurrentHashMap: Map thread-safe
  • CopyOnWriteArrayList: List thread-safe
  • BlockingQueue: Queue cho producer-consumer
  • ConcurrentSkipListMap: Sorted map thread-safe
public class ConcurrentCollectionsExample {
    // Thread-safe list
    private List<String> list = Collections
        .synchronizedList(new ArrayList<>());
    private CopyOnWriteArrayList<String> copyOnWriteList = 
        new CopyOnWriteArrayList<>();

    // Thread-safe map
    private ConcurrentHashMap<String, Integer> map = 
        new ConcurrentHashMap<>();

    // Thread-safe queue
    private BlockingQueue<String> queue = 
        new LinkedBlockingQueue<>();

    public void demonstrate() {
        // ConcurrentHashMap operations
        map.put("key", 1);
        map.putIfAbsent("key", 2);
        map.computeIfAbsent("key", k -> k.length());

        // BlockingQueue operations
        try {
            queue.put("element");
            String element = queue.take();

            // Với timeout
            String poll = queue.poll(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

6. Lock Framework

Khái Niệm

  • Lock Interface:
  • Thay thế linh hoạt cho synchronized
  • Hỗ trợ nhiều tính năng hơn
  • Kiểm soát tốt hơn về lock/unlock

6.1 ReentrantLock

  • Đặc điểm:
  • Hỗ trợ lock công bằng
  • Có thể interrupt thread đang đợi
  • Có thể thử lock không chờ đợi
public class ReentrantLockExample {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public boolean tryIncrement() {
        if (lock.tryLock()) {
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;
    }
}

6.2 ReadWriteLock

  • Đặc điểm:
  • Tách biệt lock đọc và ghi
  • Nhiều thread có thể đọc cùng lúc
  • Chỉ một thread được ghi tại một thời điểm
public class ReadWriteLockExample {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    private Map<String, String> map = new HashMap<>();

    public String read(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public void write(String key, String value) {
        writeLock.lock();
        try {
            map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}

7. Best Practices và Anti-patterns

7.1 Best Practices

  • Nguyên tắc cơ bản:
  • Sử dụng Executor Framework thay vì tạo thread trực tiếp
  • Xử lý InterruptedException đúng cách
  • Sử dụng concurrent collections
  • Tránh synchronized cho các operation dài
public class ThreadingBestPractices {
    // 1. Sử dụng Executor Framework thay vì tạo Thread trực tiếp
    private final ExecutorService executor = 
        Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );

    // 2. Xử lý InterruptedException đúng cách
    public void handleInterruption() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Đặt lại trạng thái interrupted
            Thread.currentThread().interrupt();
            // Cleanup và return
            return;
        }
    }

    // 3. Sử dụng concurrent collections
    private final ConcurrentHashMap<String, String> cache = 
        new ConcurrentHashMap<>();

    // 4. Tránh synchronized cho các operation dài
    public void longOperation() {
        // Chỉ synchronized phần cần thiết
        String data;
        synchronized(this) {
            data = getData();
        }
        processData(data); // Không cần synchronized
    }
}

7.2 Anti-patterns

  • Các lỗi cần tránh:
  • Double-checked locking không an toàn
  • Synchronized trên String literal
  • Không xử lý InterruptedException
  • Deadlock do lock không đúng thứ tự
public class ThreadingAntiPatterns {
    // BAD: Double-checked locking không an toàn
    private static volatile ThreadingAntiPatterns instance;
    public static ThreadingAntiPatterns getInstance() {
        if (instance == null) {
            synchronized(ThreadingAntiPatterns.class) {
                if (instance == null) {
                    instance = new ThreadingAntiPatterns();
                }
            }
        }
        return instance;
    }

    // BAD: Synchronized trên String literal
    public void synchronizeOnString() {
        synchronized("lock") { // Không nên làm thế này
            // code
        }
    }

    // BAD: Không xử lý InterruptedException
    public void ignoreInterruption() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Không nên bỏ qua
            e.printStackTrace();
        }
    }
}