背景
本文是《Java 后端从小白到大神》修仙系列第十篇
,正式进入Java后端
世界,本篇文章主要聊Java基础
。若想详细学习请点击首篇博文,我们开始把。
文章概览
- 核心工具类(Java JUC)
核心工具类(续)
一、Java JUC 介绍
JUC(Java Util Concurrent) 是 Java 平台提供的一套用于高效处理多线程和并发编程的工具包,位于 java.util.concurrent 包中(Java 5 引入)。它的核心目标是简化复杂并发场景的开发,提升程序性能和可靠性,同时降低线程安全问题的风险。
二、Java 中的线程
1. 线程的概念
线程是程序执行的最小单元,是进程中的一个独立执行路径。一个进程可以包含多个线程,共享进程的内存和资源,但每个线程有自己的程序计数器、栈和局部变量。Java 中线程通过 java.lang.Thread
类实现。
2. 线程的原理
- 线程与进程的关系:
- 进程是资源分配的基本单位,线程是 CPU 调度的基本单位。
- 同一进程的线程共享堆内存,但每个线程有独立的栈内存。
- 线程调度:
- 由操作系统调度器分配 CPU 时间片,通过时间片轮转或优先级抢占实现并发。
- JVM 线程模型:
- Java 线程与操作系统原生线程一一对应(1:1 模型),由操作系统内核直接管理。
3. 线程的使用场景
- 异步任务:后台执行耗时操作(如文件下载、网络请求)。
- 高并发处理:Web 服务器处理多个客户端请求。
- 并行计算:利用多核 CPU 加速计算(如大数据处理)。
- GUI 应用:保持界面响应(如进度条更新)。
4. 创建线程的方法
1. 继承 Thread
类
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running: " + Thread.currentThread().getName());
}
}
public class Main {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程
}
}
|
2. 实现 Runnable
接口(推荐)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable running: " + Thread.currentThread().getName());
}
}
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
|
3. 使用 ExecutorService
(线程池)
1
2
3
4
5
|
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
System.out.println("Task in thread pool: " + Thread.currentThread().getName());
});
executor.shutdown();
|
5. 线程运行的原理
-
线程生命周期:
- 新建(New):线程对象创建但未启动。
- 就绪(Runnable):调用
start()
后等待 CPU 时间片。
- 运行(Running):获取 CPU 时间片执行
run()
方法。
- 阻塞(Blocked):等待锁、I/O 操作或休眠(如
sleep()
)。
- 终止(Terminated):
run()
执行完毕或发生异常。
-
内存分配:
- 栈内存:每个线程有独立的栈,存储局部变量和方法调用。
- 堆内存:所有线程共享堆,存储对象实例。
- ThreadLocal:线程私有变量,避免共享冲突。
6. 代码示例与内存模型
1. 示例:线程共享堆内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class SharedMemoryDemo {
private static int sharedValue = 0; // 共享变量(堆内存)
public static void main(String[] args) {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
sharedValue++; // 线程不安全!
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final value: " + sharedValue); // 可能小于 2000
}
}
|
2. 示例:ThreadLocal 实现线程私有变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class ThreadLocalDemo {
private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
threadLocal.set(10);
System.out.println("Thread 1: " + threadLocal.get()); // 输出 10
});
executor.submit(() -> {
threadLocal.set(20);
System.out.println("Thread 2: " + threadLocal.get()); // 输出 20
});
executor.shutdown();
}
}
|
总结:
-
创建线程的三种方式:
- 继承
Thread
(不推荐,Java 单继承限制)。
- 实现
Runnable
(解耦任务与线程)。
- 使用线程池(高效管理资源,避免频繁创建/销毁线程)。
-
线程安全的核心问题:
- 共享数据的原子性、可见性、有序性(需结合锁、
volatile
等机制)。
-
内存管理:
- 栈内存私有,堆内存共享。
- 使用
ThreadLocal
避免线程间数据竞争。
通过合理使用线程,可以显著提升程序性能,但需注意线程安全和资源管理问题。
三、线程间通信
1. 对象监视器锁
- 对象监视器锁(Monitor Lock) 是 Java 中用于实现线程同步的一种机制。每个 Java 对象都有一个与之关联的监视器锁。
- 当多个线程需要访问共享资源时,通过 synchronized 关键字可以确保同一时间只有一个线程能够执行被保护的代码块或方法。
- 线程在进入 synchronized 代码块或方法时会尝试获取对象的监视器锁,如果锁已被其他线程持有,则当前线程会被阻塞,直到锁被释放。
2. 等待/通知机制(wait
/notify
)
- 原理:基于对象监视器锁(
synchronized
关键字),线程通过 wait()
释放锁并等待,其他线程通过 notify()
或 notifyAll()
唤醒等待线程。
- 适用场景:生产者-消费者模型、任务协调。
- 调用 lock.wait() 的含义:
- 当前线程释放它持有的对象监视器锁。
- 当前线程进入等待状态,直到另一个线程调用同一个对象上的 notify() 或 notifyAll() 方法将其唤醒。
- 被唤醒的线程重新竞争锁,并继续执行后续代码。
- 具体流程:
- 线程 A 持有对象 lock 的监视器锁。
- 线程 A 调用 lock.wait();:
- 线程 A 释放 lock 的锁。
- 线程 A 进入等待队列。
- 其他线程(如线程 B)可以获取 lock 的锁并执行相关操作。
- 当线程 B 调用 lock.notify() 或 lock.notifyAll():
- 如果调用的是 notify(),随机唤醒一个等待的线程(如线程 A)。
- 如果调用的是 notifyAll(),所有等待的线程都被唤醒。
- 被唤醒的线程重新竞争 lock 的锁,成功获取后继续执行 wait() 后的代码。
- 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
class SharedResource {
private int data = -1;
private boolean available = false;
public synchronized void produce(int value) throws InterruptedException {
while (available) { // available == true,说明资源已被生产,尚未被消费,生产者需要等待
wait(); // 如果资源可用,生产者等待
}
data = value;
available = true;
System.out.println("Produced: " + data);
notify(); // 唤醒消费者
}
public synchronized int consume() throws InterruptedException {
// while (available) { wait(); } 的作用是让生产者在资源已被生产但尚未被消费时进入等待状态,避免覆盖现有数据。
while (!available) {
wait(); // 如果资源不可用,消费者等待
}
available = false;
System.out.println("Consumed: " + data);
notify(); // 唤醒生产者
return data;
}
}
public class ProducerConsumerExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
resource.produce(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
resource.consume();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
|
- 注意事项:
- 必须在
synchronized
块内调用 wait()
和 notify()
。
- 使用
while
循环检查条件,避免虚假唤醒。
3. 使用 synchronized
修饰符
1. synchronized解析
在 Java 中,synchronized
关键字用于实现线程同步,它可以修饰方法或代码块。当 synchronized
修饰实例方法时,锁的对象是该方法所属的实例对象(即 this
)。
2. 详细解释
synchronized
修饰实例方法的锁机制
- 当一个线程进入被
synchronized
修饰的实例方法时,它会尝试获取该实例对象的监视器锁。
- 如果锁已被其他线程持有,则当前线程会被阻塞,直到锁被释放。
- 在代码中,
produce
和 consume
方法的锁对象是 SharedResource
的实例对象。
代码中的锁对象
假设我们创建了一个 SharedResource
对象:
1
|
SharedResource resource = new SharedResource();
|
- 当生产者线程调用
resource.produce(...)
时,它会尝试获取 resource
对象的监视器锁。
- 当消费者线程调用
resource.consume()
时,它也会尝试获取同一个 resource
对象的监视器锁。
- 因此,
produce
和 consume
方法共享同一个锁对象,即 resource
。
3. 验证锁对象
可以通过以下代码验证锁对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
class SharedResource {
private int data = -1;
private boolean available = false;
public synchronized void produce(int value) throws InterruptedException {
System.out.println("Producer thread holds lock: " + Thread.holdsLock(this));
// 其他逻辑...
}
public synchronized int consume() throws InterruptedException {
System.out.println("Consumer thread holds lock: " + Thread.holdsLock(this));
// 其他逻辑...
return -1;
}
}
|
Thread.holdsLock(this)
方法可以检查当前线程是否持有指定对象的锁。
- 输出结果将显示生产者和消费者线程持有的锁对象是同一个
SharedResource
实例。
总结
produce
和 consume
方法对应的对象监视器锁是 SharedResource
的实例对象。
- 这是因为
synchronized
修饰实例方法时,锁对象是该方法所属的实例对象(即 this
)。
- 在多线程环境下,生产者和消费者通过共享同一个锁对象来实现线程间的互斥访问,确保资源的安全性。
4. 使用 volatile
变量
1. volatile 关键字的作用
在 Java 中,volatile 是一个用于修饰变量的关键字,它确保了多线程环境下对变量的读写操作具有可见性和有序性。具体来说:
- 可见性:当一个线程修改了 volatile 变量的值,其他线程能够立即看到这个修改。
- 有序性:禁止指令重排序优化,确保对 volatile 变量的读写操作不会被编译器或处理器乱序执行。
2. 如何保证变量可见性
volatile 通过以下机制确保变量的可见性:
- 内存屏障(Memory Barrier)
- 当一个线程写入 volatile 变量时,JVM 会在写操作后插入一个 写屏障,确保所有之前的操作都已完成并写入主内存。
- 当另一个线程读取 volatile 变量时,JVM 会在读操作前插入一个 读屏障,确保从主内存中读取最新的值。
- 缓存一致性
- 在多核处理器中,每个核心都有自己的缓存。volatile 确保对变量的修改会立即同步到主内存,并且其他线程读取该变量时会直接从主内存中读取最新值,而不是从本地缓存中读取过期的值。
- 禁止指令重排序
- 编译器和处理器可能会对代码进行优化,重新排列指令以提高性能。然而,对于 volatile 变量,JVM 会确保读写操作不会被重排序,从而保证程序的行为符合预期。
- 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class VolatileExample {
private static volatile boolean flag = false;
public static void main(String[] args) throws InterruptedException {
Thread writerThread = new Thread(() -> {
try {
Thread.sleep(2000); // 模拟一些工作
flag = true;
System.out.println("Writer thread set flag to true.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread readerThread = new Thread(() -> {
while (!flag) {
// 等待 flag 被设置为 true
}
System.out.println("Reader thread detected flag as true.");
});
writerThread.start();
readerThread.start();
// 主线程和子线程。当主线程调用 subThread.join() 时,主线程会暂停执行,直到 subThread 完成
writerThread.join();
readerThread.join();
}
}
|
3. BlockingQueue
阻塞队列
BlockingQueue 是 Java 中用于实现线程间通信的一种机制。它提供了一种线程安全的队列,允许一个或多个生产者线程将元素插入队列,同时允许一个或多个消费者线程从队列中移除元素。
- 原理:
- 当队列为空时,尝试从队列中获取元素的操作会阻塞,直到有元素可用;同样,当队列为满时,尝试插入元素的操作也会阻塞,直到有空间可用。
- 线程安全:BlockingQueue 实现了所有必要的同步机制,确保多线程环境下对队列的操作是安全的。
- 多种实现:
- Java 提供了多种 BlockingQueue 的实现类,如 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue 等,每种实现适用于不同的场景。
- 典型应用场景:生产者线程:负责生成数据并将其放入队列。消费者线程:负责从队列中取出数据并进行处理。生产者和消费者可以高效地协作,而无需显式地管理锁和条件变量。
- BlockingQueue 的主要方法:
- put(E e):将元素插入队列,如果队列已满,则阻塞直到有空间可用。
- take():从队列中移除并返回元素,如果队列为空,则阻塞直到有元素可用。
- offer(E e, long timeout, TimeUnit unit):尝试在指定时间内将元素插入队列,如果超时则返回 false。
- poll(long timeout, TimeUnit unit):尝试在指定时间内从队列中移除元素,如果超时则返回 null。
- peek():查看队列头部的元素而不移除它,如果队列为空则返回 null。
- 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
private static final int CAPACITY = 5;
private static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
public static void main(String[] args) {
Thread producerThread = new Thread(new Producer());
Thread consumerThread = new Thread(new Consumer());
producerThread.start();
consumerThread.start();
}
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Producing: " + i);
queue.put(i); // 如果队列已满,put() 会阻塞
Thread.sleep(1000); // 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer value = queue.take(); // 如果队列为空,take() 会阻塞
System.out.println("Consuming: " + value);
Thread.sleep(2000); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
|
4. Lock
与 Condition
通过 Lock
替代 synchronized
,Condition
替代 wait
/notify
,提供更灵活的线程通信。
Lock 的作用:Lock 接口及其实现类(如 ReentrantLock)提供了显式的锁操作,允许更细粒度的锁控制。相比 synchronized,Lock 提供了以下优势:
- 可中断的锁获取:可以在等待锁时响应中断。
- 超时锁获取:可以指定等待锁的最大时间。
- 公平锁:可以选择是否按照请求顺序分配锁。
- 多个条件变量:通过 Condition 对象实现更复杂的线程间协作。
Condition 的作用:Condition 接口通常与 Lock 一起使用,提供了一种线程间的条件等待机制。它类似于 Object 类中的 wait()、notify() 和 notifyAll() 方法,但更加灵活和安全。Condition 主要用于以下场景:
- 线程等待特定条件:当某个条件不满足时,线程可以调用 await() 进入等待状态。
- 线程唤醒其他线程:当条件满足时,可以通过 signal() 或 signalAll() 唤醒等待的线程。
主要方法:
- await():使当前线程进入等待状态,直到被其他线程唤醒或超时。
- signal():唤醒一个等待的线程。
- signalAll():唤醒所有等待的线程。
Lock 和 Condition 的典型应用场景:Lock 和 Condition 经常用于生产者-消费者模式或其他需要复杂线程间协作的场景。以下是一个使用 Lock 和 Condition 实现生产者-消费者模式的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockConditionExample {
private static final int CAPACITY = 5;
private static final Queue<Integer> queue = new LinkedList<>();
private static final Lock lock = new ReentrantLock();
private static final Condition notFull = lock.newCondition();
private static final Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
Thread producerThread = new Thread(new Producer());
Thread consumerThread = new Thread(new Consumer());
producerThread.start();
consumerThread.start();
}
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
produce(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void produce(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == CAPACITY) {
System.out.println("Queue is full, waiting...");
notFull.await(); // 如果队列已满,等待
}
queue.offer(value);
System.out.println("Produced: " + value);
notEmpty.signal(); // 唤醒消费者
} finally {
lock.unlock();
}
Thread.sleep(1000); // 模拟生产时间
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer value = consume();
if (value == null) break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private Integer consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Queue is empty, waiting...");
notEmpty.await(); // 如果队列为空,等待
}
Integer value = queue.poll();
System.out.println("Consumed: " + value);
notFull.signal(); // 唤醒生产者
return value;
} finally {
lock.unlock();
}
Thread.sleep(2000); // 模拟消费时间
}
}
}
|
5. CountDownLatch
CountDownLatch 是 Java 中用于实现线程间通信和同步的一种机制。它允许一个或多个线程等待其他线程完成一组操作,直到计数器达到零。
原理:CountDownLatch 使用一个计数器来控制线程的执行顺序。其主要功能如下:
- 初始化计数器:在创建 CountDownLatch 对象时,需要指定一个初始计数值。
- 等待线程:调用 await() 方法的线程会阻塞,直到计数器为零。
- 减少计数器:调用 countDown() 方法会将计数器减一,当计数器变为零时,所有等待的线程会被唤醒并继续执行。
graph LR
A[主线程<br>(3个计数器)] --等待--> B[子线程1] --> D[完成任务计数减1]
A --等待--> C[子线程2] --> E[完成任务计数减1]
A --等待--> F[子线程3] --> G[完成任务计数减1]
D --> H[主线程继续执行]
E --> H
G --> H
关键方法:
- await():使当前线程等待,直到计数器为零,或者线程被中断。
- countDown():将计数器减一。
- getCount():返回当前计数器的值。
CountDownLatch 与 join() 的对比:
- CountDownLatch:适用于多个线程的同步,尤其是当主线程需要等待多个子线程完成时。它可以更灵活地控制线程间的同步关系。
- join():适用于单个线程的同步,即主线程等待某个特定的子线程完成。它的使用场景相对简单,但不够灵活。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// 使用 CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(new Worker(latch)).start();
}
latch.await(); // 等待所有子线程完成
// 使用 join()
Thread t1 = new Thread(new Worker());
Thread t2 = new Thread(new Worker());
Thread t3 = new Thread(new Worker());
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join(); // 依次等待每个子线程完成
|
CountDownLatch 的典型应用场景:
- 多线程任务的同步:例如,主线程等待多个子线程完成后再继续执行。
- 启动信号:确保多个线程同时开始执行。
- 关闭信号:确保多个线程在某个条件满足后同时结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
private static final int THREAD_COUNT = 3;
private static final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Worker(latch, "Worker-" + i)).start();
}
System.out.println("Main thread waiting for workers to finish...");
latch.await(); // 主线程等待所有子线程完成
System.out.println("All workers have finished.");
}
static class Worker implements Runnable {
private final CountDownLatch latch;
private final String name;
public Worker(CountDownLatch latch, String name) {
this.latch = latch;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " is working...");
Thread.sleep(1000); // 模拟工作时间
System.out.println(name + " has finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 工作完成后减少计数器,latch.countDown() 是在每个 Worker 线程完成其任务后才被调用的
}
}
}
}
|
6. CyclicBarrier
CyclicBarrier 是 Java 中用于实现线程间通信和同步的一种机制。它允许一组线程相互等待,直到所有线程都到达一个公共屏障点(即屏障),然后一起继续执行。
原理:CyclicBarrier 使用一个计数器来跟踪到达屏障的线程数量。其主要功能如下:
- 初始化屏障:在创建 CyclicBarrier 对象时,需要指定参与的线程数量。
- 等待线程:调用 await() 方法的线程会阻塞,直到所有线程都调用了 await()。
- 重用屏障:与 CountDownLatch 不同,CyclicBarrier 可以重用,即一旦所有线程通过了屏障,计数器会被重置,可以再次使用。
graph LR
A[主线程] --> B[子线程1] --> D[子线程1任务完成到达屏障等待]
A --> C[子线程2] --> E[子线程2任务完成到达屏障等待]
A --> F[子线程3] --> G[子线程3任务完成到达屏障等待]
D --> H[全部到达屏障触发]
E --> H
G --> H
H --> I[主线程再执行屏障统一动作]
关键方法
- await():使当前线程等待,直到所有线程都到达屏障,或者线程被中断或超时。
- reset():将屏障重置为初始状态,使得计数器重新开始计数。
- getParties():返回参与屏障的线程数量。
- getNumberWaiting():返回当前正在等待的线程数量。
CyclicBarrier 与 CountDownLatch 的对比
- CyclicBarrier:适用于需要多次同步的场景,可以重用,所有线程必须同时到达屏障才能继续。
- CountDownLatch:适用于一次性任务的同步,计数器只能递减到零,不能重置,适用于主线程等待多个子线程完成。
1
2
3
4
5
6
7
8
9
10
11
12
|
// 使用 CyclicBarrier
CyclicBarrier barrier = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
new Thread(new Worker(barrier)).start();
}
// 使用 CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(new Worker(latch)).start();
}
latch.await(); // 主线程等待所有子线程完成
|
CyclicBarrier 的典型应用场景:
- 多线程协作任务:多个线程需要在某个阶段同步,确保它们同时开始下一个阶段。
- 并行计算:多个线程并行处理数据,完成后需要同步结果。
- 分阶段任务:任务分为多个阶段,每个阶段的所有线程必须完成才能进入下一阶段。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
private static final int THREAD_COUNT = 3;
private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
System.out.println("All threads have reached the barrier, continuing...");
});
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Worker(barrier, "Worker-" + i)).start(); // 主线程继续执行,不会等待 Worker 线程。
}
}
static class Worker implements Runnable {
private final CyclicBarrier barrier;
private final String name;
public Worker(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " is working...");
Thread.sleep((long) (Math.random() * 1000)); // 模拟不同时间的工作
System.out.println(name + " has finished.");
barrier.await(); // 等待其他线程到达屏障,阻塞自己直到所有线程都到达屏障
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
System.out.println(name + " was interrupted or barrier was broken.");
}
}
}
}
|
7. Exchanger
数据交换
Exchanger 是 Java 中用于实现线程间通信的一种机制。它允许两个线程在某个特定点交换对象。每个线程调用 exchange() 方法时会阻塞,直到另一个线程也调用 exchange(),然后两个线程可以交换它们传递的对象。
原理:Exchanger 提供了一种同步机制,确保两个线程可以在指定的屏障点交换数据。其主要功能如下:
- 初始化:创建一个 Exchanger 对象。
- 交换对象:两个线程分别调用 exchange(V x) 方法,传入要交换的对象。
- 阻塞等待:调用 exchange() 的线程会阻塞,直到另一个线程也调用 exchange()。
- 完成交换:当两个线程都到达交换点后,它们会交换各自传递的对象,并继续执行。
关键方法
- exchange(V x):尝试与其他线程交换对象 x,如果另一个线程也在等待交换,则立即交换并返回对方的对象;否则,当前线程会阻塞,直到另一个线程也调用 exchange()。
- exchange(V x, long timeout, TimeUnit unit):带有超时时间的交换操作,如果在指定时间内没有完成交换,则抛出 TimeoutException。
Exchanger 与 CyclicBarrier 和 CountDownLatch 的对比
- Exchanger:专门用于两个线程之间的数据交换,确保两个线程在指定的屏障点同步交换对象。
- CyclicBarrier:允许多个线程相互等待,直到所有线程都到达屏障点,适用于多线程同步任务。
- CountDownLatch:允许一个或多个线程等待其他线程完成一组操作,适用于主线程等待多个子线程完成的场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
// 使用 Exchanger
Exchanger<String> exchanger = new Exchanger<>();
Thread threadA = new Thread(() -> {
try {
String data = "Data from A";
String exchangedData = exchanger.exchange(data);
System.out.println("Thread A received: " + exchangedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread threadB = new Thread(() -> {
try {
String data = "Data from B";
String exchangedData = exchanger.exchange(data);
System.out.println("Thread B received: " + exchangedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
threadA.start();
threadB.start();
|
Exchanger 的典型应用场景:
- 双线程协作任务:两个线程需要在某个阶段交换数据,例如生产者和消费者模式中的数据交换。
- 分阶段任务:任务分为多个阶段,每个阶段由不同线程处理,完成后需要交换中间结果。
- 对称任务:两个线程执行对称的任务,需要在某个点交换状态或数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class ExchangerExample {
private static final Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
Thread threadA = new Thread(new ExchangerWorker("Thread A", "Data from A"));
Thread threadB = new Thread(new ExchangerWorker("Thread B", "Data from B"));
threadA.start();
threadB.start();
try {
threadA.join();
threadB.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
static class ExchangerWorker implements Runnable {
private final Exchanger<String> exchanger;
private final String name;
private final String data;
public ExchangerWorker(Exchanger<String> exchanger, String name, String data) {
this.exchanger = exchanger;
this.name = name;
this.data = data;
}
@Override
public void run() {
try {
System.out.println(name + " is working...");
Thread.sleep((long) (Math.random() * 1000)); // 模拟不同时间的工作
System.out.println(name + " has finished.");
// 尝试与其他线程交换数据
String exchangedData = exchanger.exchange(data);
System.out.println(name + " received: " + exchangedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(name + " was interrupted.");
}
}
}
}
|
总结:线程间通信方式对比:
机制 |
特点 |
适用场景 |
wait /notify |
基于监视器锁,需同步块 |
简单协作,如生产者-消费者 |
volatile |
保证可见性,不保证原子性 |
状态标记(如停止线程) |
BlockingQueue |
线程安全,支持阻塞操作 |
生产者-消费者模型(解耦) |
Lock + Condition |
更灵活的条件控制 |
多条件等待 |
CountDownLatch |
一次性屏障,等待多个线程完成 |
主线程等待子线程初始化 |
CyclicBarrier |
可重复使用的屏障 |
多阶段并行任务 |
Exchanger |
两个线程交换数据 |
双向数据传递 |
最佳实践:
- 优先选择高层工具:如
BlockingQueue
、CountDownLatch
,避免直接使用底层 wait
/notify
。
- 避免忙等待:使用阻塞机制(如
take()
、await()
)替代循环检查条件。
- 注意线程安全:共享变量需通过锁或原子类保护。
- 资源释放:确保锁、连接等资源在
finally
块中释放。
四、线程池(Executor框架)
1. 核心接口与类
Executor
:顶层接口,定义执行任务的 execute()
方法。
ExecutorService
:扩展Executor
,支持任务提交、终止和管理。
ThreadPoolExecutor
:线程池核心实现类。
Executors
:工厂类,提供创建常见线程池的静态方法,提供常见线程池配置(需谨慎使用,避免资源耗尽)。
2. 线程池参数
- 核心参数:
corePoolSize
:核心线程数。
maximumPoolSize
:最大线程数。
keepAliveTime
:非核心线程空闲存活时间。
workQueue
:任务队列(如LinkedBlockingQueue
、ArrayBlockingQueue
)。
RejectedExecutionHandler
:拒绝策略(如AbortPolicy
、CallerRunsPolicy
)。
1
2
3
4
5
6
7
8
9
|
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(长期保留的线程)
int maximumPoolSize, // 最大线程数(线程池允许的最大线程数)
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 存活时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂(自定义线程创建)
RejectedExecutionHandler handler // 拒绝策略
)
|
3. 使用方式
1. 通过 Executors
工厂类创建
1
2
3
4
5
6
7
8
9
10
11
|
// 固定大小线程池(无界队列)
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 可缓存线程池(自动扩容/缩容)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 单线程池(顺序执行任务)
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
// 支持定时任务的线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
|
2. 直接使用 ThreadPoolExecutor
(推荐)
1
2
3
4
5
6
7
8
9
|
ThreadPoolExecutor customPool = new ThreadPoolExecutor(
2, // corePoolSize
5, // maximumPoolSize
60, // keepAliveTime
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10), // 有界队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
|
4. 工作流程
- 提交任务时,若核心线程未满,创建新线程执行。
- 核心线程已满,任务进入工作队列等待。
- 队列满后,若线程数未达最大限制,创建非核心线程执行。
- 线程数已达最大且队列已满,触发拒绝策略。
5. 底层原理
- 线程管理:通过
Worker
内部类封装线程,循环从队列获取任务。
- 状态控制:使用
AtomicInteger
的高3位表示状态(RUNNING、SHUTDOWN、STOP等)。
- 任务调度:核心线程通过
getTask()
从队列获取任务,支持超时回收非核心线程。
6. 自定义线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
import java.util.concurrent.*;
public class CustomThreadPoolDemo {
public static void main(String[] args) {
// 自定义参数
int corePoolSize = 2;
int maxPoolSize = 5;
long keepAliveTime = 30;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
ThreadFactory threadFactory = new CustomThreadFactory();
RejectedExecutionHandler handler = new CustomRejectionPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler
);
// 提交任务
for (int i = 0; i < 15; i++) {
executor.execute(new Task(i));
}
// 关闭线程池
executor.shutdown();
}
static class Task implements Runnable {
private int taskId;
public Task(int id) {
this.taskId = id;
}
@Override
public void run() {
System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 自定义线程工厂(设置线程名称)
static class CustomThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory() {
namePrefix = "CustomPool-" + poolNumber.getAndIncrement() + "-Thread-";
}
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
}
}
// 自定义拒绝策略(记录日志并重试)
static class CustomRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("Task rejected: " + r.toString());
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r); // 阻塞直到队列有空位
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
|
7. 拒绝策略
- AbortPolicy(默认):抛出
RejectedExecutionException
。
- CallerRunsPolicy:由提交任务的线程直接执行。
- DiscardPolicy:静默丢弃被拒绝的任务。
- DiscardOldestPolicy:丢弃队列中最旧的任务,重新提交当前任务。
在生产环境中,线程池的核心线程数(corePoolSize
)和最大线程数(maximumPoolSize
)的合理设置,需要结合 任务类型、硬件资源、业务场景 和 系统容错能力 综合考虑。以下是详细的分析和实践建议:
8. 生成环境线程池设置
1. 任务类型分析
- CPU密集型任务(如复杂计算、图像处理):
- 线程数过多会导致频繁的上下文切换,降低性能。
- 建议:
corePoolSize = CPU核心数 + 1
。
- 例如:4核CPU →
corePoolSize = 5
。
- IO密集型任务(如网络请求、文件读写、数据库操作):
- 线程在等待IO时不会占用CPU,可以适当增加线程数。
- 建议:
corePoolSize = 2 * CPU核心数
。
- 例如:4核CPU →
corePoolSize = 8
。
2. 硬件资源
- CPU核心数:通过
Runtime.getRuntime().availableProcessors()
获取。
- 内存限制:每个线程需要约
1MB
栈内存,线程数过多可能导致 OutOfMemoryError
。
3. 业务场景
- 低延迟任务:需快速响应,队列容量不宜过大。
- 批量任务:允许一定延迟,可增大队列容量。
- 混合任务:区分优先级,使用多个线程池隔离。
4. 容错能力
- 队列满后的拒绝策略(如降级、重试、日志记录)需与业务容错逻辑匹配。
5. 通用公式(参考)
-
CPU密集型:
1
2
|
corePoolSize = N + 1 // N为CPU核心数
maxPoolSize = N + 1 // 或等于corePoolSize
|
-
IO密集型:
1
2
|
corePoolSize = 2 * N
maxPoolSize = 2 * N 或更高(需结合队列容量和任务等待时间)
|
6. 精确公式(基于任务等待时间)
7. 典型场景配置
场景 |
corePoolSize |
maxPoolSize |
队列类型 |
拒绝策略 |
HTTP请求处理(Web服务) |
2 * N |
4 * N |
LinkedBlockingQueue |
CallerRunsPolicy |
数据库批量写入 |
N + 1 |
N + 1 |
ArrayBlockingQueue |
AbortPolicy |
异步日志记录 |
1 |
1 |
SynchronousQueue |
DiscardPolicy |
8. 队列容量选择
9. 动态调整
-
监控指标:线程池活跃度、队列堆积、拒绝任务数。
-
动态修改参数(Spring Boot示例):
1
2
3
4
5
6
7
8
9
|
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(50);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
|
10. 完整代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import java.util.concurrent.*;
public class ProductionThreadPoolConfig {
public static void main(String[] args) {
// 1. 获取CPU核心数
int cpuCores = Runtime.getRuntime().availableProcessors();
// 2. 配置线程池参数(以IO密集型为例)
int corePoolSize = 2 * cpuCores;
int maxPoolSize = 4 * cpuCores;
int queueCapacity = 100;
long keepAliveTime = 30;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity),
new CustomThreadFactory(), // 自定义线程名称
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满后由提交任务的线程执行
);
// 3. 提交任务
for (int i = 0; i < 200; i++) {
executor.execute(() -> {
try {
// 模拟IO操作(如HTTP请求)
Thread.sleep(100);
System.out.println("Task executed by: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 4. 关闭线程池
executor.shutdown();
}
// 自定义线程工厂(便于监控)
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PROD-THREAD-" + counter.getAndIncrement());
}
}
}
|
11. 注意事项
- 避免无界队列:使用
ArrayBlockingQueue
或 LinkedBlockingQueue
时需指定容量。
- 拒绝策略选择:
CallerRunsPolicy
:适合不允许丢失任务的场景。
AbortPolicy
:适合需要快速失败降级的场景。
- 监控与调优:
- 使用JMX或监控工具(如Prometheus + Grafana)观察线程池状态。
- 关注指标:
ActiveThreads
、QueueSize
、CompletedTaskCount
、RejectedTaskCount
。
12. 总结
- CPU密集型:核心线程数 ≈ CPU核心数,避免过多线程竞争。
- IO密集型:核心线程数可扩大,充分利用等待时间。
- 队列容量:需平衡吞吐量和内存占用。
- 动态调整:根据实际负载和监控数据持续优化。
生产环境中没有“万能配置”,需结合 压力测试 和 业务特性 最终确定参数。
13. 注意事项
- 避免使用无界队列:可能导致内存溢出(如
LinkedBlockingQueue
未指定容量)。
- 合理配置线程数:根据任务类型(CPU密集型、IO密集型)调整核心线程数。
- 显式关闭线程池:调用
shutdown()
或 shutdownNow()
避免资源泄漏。
总结
JUC提供了丰富的并发工具,合理使用可以显著提升多线程程序的性能和可靠性。掌握其核心组件(线程池、锁、原子类、并发集合)的原理和使用场景,结合实践加深理解,是Java并发编程的关键。