Multithreading trong Java
1. Cơ Bản về Thread
Khái Niệm Cơ Bản
- Thread là gì?
- Đơn vị nhỏ nhất của việc thực thi trong chương trình
- Cho phép thực hiện nhiều tác vụ đồng thời
-
Chia sẻ tài nguyên với các thread khác trong cùng process
-
Process vs Thread
- Process: Chương trình độc lập, có không gian bộ nhớ riêng
- Thread: Chia sẻ không gian bộ nhớ trong cùng process
- Thread nhẹ hơn và tốn ít tài nguyên hơn process
1.1 Tạo Thread
- Phương pháp tạo thread:
- Kế thừa từ lớp Thread
- Implements interface Runnable
- Sử dụng Lambda expression (Java 8+)
- Sử dụng Thread Pool
// Cách 1: Kế thừa Thread
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread đang chạy");
}
}
// Cách 2: Implements Runnable
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable đang chạy");
}
}
// Sử dụng
Thread thread1 = new MyThread();
thread1.start();
Thread thread2 = new Thread(new MyRunnable());
thread2.start();
// Sử dụng Lambda
Thread thread3 = new Thread(() -> {
System.out.println("Lambda thread đang chạy");
});
thread3.start();
1.2 Thread Lifecycle
- Các trạng thái của Thread:
- NEW: Thread được tạo nhưng chưa start()
- RUNNABLE: Thread đang chạy hoặc sẵn sàng chạy
- BLOCKED: Thread đang chờ monitor lock
- WAITING: Thread đang chờ thread khác
- TIMED_WAITING: Thread đang chờ với timeout
-
TERMINATED: Thread đã kết thúc
-
Chuyển đổi trạng thái:
- start(): NEW -> RUNNABLE
- run(): Thực thi mã trong thread
- sleep(): RUNNABLE -> TIMED_WAITING
- wait(): RUNNABLE -> WAITING
- notify()/notifyAll(): WAITING -> RUNNABLE
- join(): Chờ thread khác kết thúc
2. Thread Synchronization
Khái Niệm
- Race Condition:
- Xảy ra khi nhiều thread cùng truy cập shared resource
- Có thể dẫn đến kết quả không mong muốn
-
Cần được xử lý bằng synchronization
-
Critical Section:
- Đoạn code truy cập shared resource
- Cần được bảo vệ để đảm bảo thread safety
- Chỉ cho phép một thread truy cập tại một thời điểm
2.1 Synchronized Method
- Cách hoạt động:
- Sử dụng từ khóa synchronized
- Tự động khóa object khi vào method
- Tự động mở khóa khi thoát method
public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
2.2 Synchronized Block
- Ưu điểm:
- Phạm vi đồng bộ hóa nhỏ hơn
- Hiệu năng tốt hơn synchronized method
- Có thể chọn object để synchronize
public class BankAccount {
private double balance;
private final Object lock = new Object();
public void withdraw(double amount) {
synchronized(lock) {
if (balance >= amount) {
balance -= amount;
}
}
}
public void deposit(double amount) {
synchronized(lock) {
balance += amount;
}
}
}
3. Thread Communication
Khái Niệm
- Inter-thread Communication:
- Cho phép các thread giao tiếp với nhau
- Đồng bộ hóa hoạt động giữa các thread
- Tránh busy waiting và tối ưu tài nguyên
3.1 wait(), notify(), notifyAll()
- Cơ chế hoạt động:
- wait(): Thread chờ đến khi được notify
- notify(): Đánh thức một thread đang wait
- notifyAll(): Đánh thức tất cả thread đang wait
public class MessageQueue {
private String message;
private boolean empty = true;
public synchronized String receive() {
while (empty) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
empty = true;
notifyAll();
return message;
}
public synchronized void send(String message) {
while (!empty) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
empty = false;
this.message = message;
notifyAll();
}
}
3.2 Join
- Mục đích sử dụng:
- Chờ thread khác kết thúc
- Đảm bảo thứ tự thực thi
- Phối hợp giữa các thread
public class JoinExample {
public void demonstrateJoin() {
Thread thread = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Thread hoàn thành");
});
thread.start();
try {
thread.join(); // Chờ thread kết thúc
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Main thread tiếp tục");
}
}
4. Thread Pool và Executor Framework
Khái Niệm
- Thread Pool:
- Quản lý và tái sử dụng thread
- Giới hạn số lượng thread tối đa
-
Tối ưu hiệu năng và tài nguyên
-
Executor Framework:
- API cao cấp để quản lý thread
- Tách biệt task submission và execution
- Cung cấp nhiều loại thread pool khác nhau
4.1 ExecutorService
- Các loại Executor:
- Fixed Thread Pool: Số lượng thread cố định
- Cached Thread Pool: Tạo thread mới khi cần
- Single Thread Executor: Chỉ một thread
- Scheduled Thread Pool: Thực thi theo lịch
public class ExecutorExample {
public void demonstrateExecutor() {
ExecutorService executor = Executors.newFixedThreadPool(5);
// Submit Runnable
executor.submit(() -> {
System.out.println("Task đang chạy");
});
// Submit Callable
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Kết quả task";
});
try {
String result = future.get(); // Chờ và lấy kết quả
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
}
4.2 CompletableFuture
- Đặc điểm:
- Hỗ trợ lập trình bất đồng bộ
- Cho phép kết hợp nhiều task
- Xử lý kết quả và lỗi linh hoạt
public class CompletableFutureExample {
public void demonstrateAsync() {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// Task 1
return "Hello";
})
.thenApplyAsync(result -> {
// Task 2
return result + " World";
})
.thenApplyAsync(String::toUpperCase);
future.thenAccept(System.out::println);
}
public void demonstrateCombine() {
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1
.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
combined.thenAccept(System.out::println);
}
}
5. Thread Safety và Concurrent Collections
Khái Niệm
- Thread Safety:
- Đảm bảo code hoạt động đúng trong môi trường đa luồng
- Tránh race condition và data corruption
- Sử dụng synchronization và atomic operations
5.1 Atomic Classes
- Đặc điểm:
- Đảm bảo atomic operations
- Không cần synchronization
- Hiệu năng tốt hơn synchronized
public class AtomicExample {
private AtomicInteger counter = new AtomicInteger(0);
private AtomicReference<String> reference = new AtomicReference<>("");
public void increment() {
counter.incrementAndGet();
}
public void updateReference() {
reference.updateAndGet(current -> current + "Updated");
}
}
5.2 Concurrent Collections
- Các loại Collection:
- ConcurrentHashMap: Map thread-safe
- CopyOnWriteArrayList: List thread-safe
- BlockingQueue: Queue cho producer-consumer
- ConcurrentSkipListMap: Sorted map thread-safe
public class ConcurrentCollectionsExample {
// Thread-safe list
private List<String> list = Collections
.synchronizedList(new ArrayList<>());
private CopyOnWriteArrayList<String> copyOnWriteList =
new CopyOnWriteArrayList<>();
// Thread-safe map
private ConcurrentHashMap<String, Integer> map =
new ConcurrentHashMap<>();
// Thread-safe queue
private BlockingQueue<String> queue =
new LinkedBlockingQueue<>();
public void demonstrate() {
// ConcurrentHashMap operations
map.put("key", 1);
map.putIfAbsent("key", 2);
map.computeIfAbsent("key", k -> k.length());
// BlockingQueue operations
try {
queue.put("element");
String element = queue.take();
// Với timeout
String poll = queue.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
6. Lock Framework
Khái Niệm
- Lock Interface:
- Thay thế linh hoạt cho synchronized
- Hỗ trợ nhiều tính năng hơn
- Kiểm soát tốt hơn về lock/unlock
6.1 ReentrantLock
- Đặc điểm:
- Hỗ trợ lock công bằng
- Có thể interrupt thread đang đợi
- Có thể thử lock không chờ đợi
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public boolean tryIncrement() {
if (lock.tryLock()) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
}
6.2 ReadWriteLock
- Đặc điểm:
- Tách biệt lock đọc và ghi
- Nhiều thread có thể đọc cùng lúc
- Chỉ một thread được ghi tại một thời điểm
public class ReadWriteLockExample {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private Map<String, String> map = new HashMap<>();
public String read(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public void write(String key, String value) {
writeLock.lock();
try {
map.put(key, value);
} finally {
writeLock.unlock();
}
}
}
7. Best Practices và Anti-patterns
7.1 Best Practices
- Nguyên tắc cơ bản:
- Sử dụng Executor Framework thay vì tạo thread trực tiếp
- Xử lý InterruptedException đúng cách
- Sử dụng concurrent collections
- Tránh synchronized cho các operation dài
public class ThreadingBestPractices {
// 1. Sử dụng Executor Framework thay vì tạo Thread trực tiếp
private final ExecutorService executor =
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// 2. Xử lý InterruptedException đúng cách
public void handleInterruption() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Đặt lại trạng thái interrupted
Thread.currentThread().interrupt();
// Cleanup và return
return;
}
}
// 3. Sử dụng concurrent collections
private final ConcurrentHashMap<String, String> cache =
new ConcurrentHashMap<>();
// 4. Tránh synchronized cho các operation dài
public void longOperation() {
// Chỉ synchronized phần cần thiết
String data;
synchronized(this) {
data = getData();
}
processData(data); // Không cần synchronized
}
}
7.2 Anti-patterns
- Các lỗi cần tránh:
- Double-checked locking không an toàn
- Synchronized trên String literal
- Không xử lý InterruptedException
- Deadlock do lock không đúng thứ tự
public class ThreadingAntiPatterns {
// BAD: Double-checked locking không an toàn
private static volatile ThreadingAntiPatterns instance;
public static ThreadingAntiPatterns getInstance() {
if (instance == null) {
synchronized(ThreadingAntiPatterns.class) {
if (instance == null) {
instance = new ThreadingAntiPatterns();
}
}
}
return instance;
}
// BAD: Synchronized trên String literal
public void synchronizeOnString() {
synchronized("lock") { // Không nên làm thế này
// code
}
}
// BAD: Không xử lý InterruptedException
public void ignoreInterruption() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Không nên bỏ qua
e.printStackTrace();
}
}
}