背景
本文是《Java 后端从小白到大神》修仙系列第十篇,正式进入Java后端世界,本篇文章主要聊Java基础中的JUC(Java Util Concurrent)并发工具包。JUC是Java并发编程的核心工具,掌握它对于开发高性能、线程安全的应用程序至关重要。若想详细学习请点击首篇博文,我们开始吧。
文章概览
- JUC 框架概述
- 线程基础与原理
- 线程间通信机制
- 线程池(Executor框架)
- 并发工具类详解
- 最佳实践与性能调优
1. JUC 框架概述
JUC(Java Util Concurrent)是 Java 5 引入的一套用于高效处理多线程和并发编程的工具包,位于 java.util.concurrent 包中。它的核心目标是简化复杂并发场景的开发,提升程序性能和可靠性,同时降低线程安全问题的风险。
1.1 JUC 的核心组件
JUC 主要包含以下核心组件:
| 组件类型 |
核心类/接口 |
作用 |
| 线程池 |
ExecutorService、ThreadPoolExecutor |
管理线程生命周期,提高线程利用率 |
| 并发集合 |
ConcurrentHashMap、CopyOnWriteArrayList |
线程安全的集合实现 |
| 同步工具 |
CountDownLatch、CyclicBarrier、Semaphore |
线程间同步与协作 |
| 锁 |
Lock、ReentrantLock、ReadWriteLock |
比 synchronized 更灵活的锁机制 |
| 原子类 |
AtomicInteger、AtomicReference |
无锁的线程安全操作 |
| 线程安全工具 |
ThreadLocal、Future、Callable |
线程本地存储和异步任务 |
1.2 JUC 的设计理念
JUC 的设计遵循以下核心原则:
- 分离关注点:将线程管理与任务执行分离
- 提高并发性能:通过非阻塞算法、CAS 操作等提高并发性能
- 简化编程模型:提供高级抽象,减少手动线程管理
- 安全性优先:内置线程安全机制,降低并发错误
2. 线程基础与原理
2.1 线程的概念
线程是程序执行的最小单元,是进程中的一个独立执行路径。一个进程可以包含多个线程,共享进程的内存和资源,但每个线程有自己的程序计数器、栈和局部变量。Java 中线程通过 java.lang.Thread 类实现。
2.2 线程的原理
-
线程与进程的关系:
- 进程是资源分配的基本单位,线程是 CPU 调度的基本单位。
- 同一进程的线程共享堆内存,但每个线程有独立的栈内存。
-
线程调度:
- 由操作系统调度器分配 CPU 时间片,通过时间片轮转或优先级抢占实现并发。
-
JVM 线程模型:
- Java 线程与操作系统原生线程一一对应(1:1 模型),由操作系统内核直接管理。
用最生活化的比喻
- 操作系统(OS)= 国家政府
- 内核线程(KLT)= 正式公务员
- Java 线程 = 你开的一个任务
Java 1:1 线程模型 = 你每开一个 Java 线程,政府就派一个正式公务员去跑。
- 你启动
new Thread() → OS 立刻创建一个真实内核线程
- 你
start() → OS 调度这个线程跑
- 你
wait() / sleep() → OS 把它挂起
- 你结束 → OS 销毁它
Java 虚拟机只负责调用操作系统 API,自己不调度、不管理,完全交给操作系统。
2.3 线程的使用场景
- 异步任务:后台执行耗时操作(如文件下载、网络请求)。
- 高并发处理:Web 服务器处理多个客户端请求。
- 并行计算:利用多核 CPU 加速计算(如大数据处理)。
- GUI 应用:保持界面响应(如进度条更新)。
2.4 创建线程的方法
2.4.1 继承 Thread 类
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running: " + Thread.currentThread().getName());
}
}
public class ThreadExample {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动线程
}
}
|
2.4.2 实现 Runnable 接口(推荐)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable running: " + Thread.currentThread().getName());
}
}
public class RunnableExample {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
|
2.4.3 使用 Callable 和 Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Callable running: " + Thread.currentThread().getName());
return 42;
}
}
public class CallableExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
Integer result = futureTask.get(); // 等待任务完成并获取结果
System.out.println("Result: " + result); // Result: 42
}
}
|
2.4.4 使用 ExecutorService(线程池)
1
2
3
4
5
6
7
8
9
10
11
12
|
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
System.out.println("Task in thread pool: " + Thread.currentThread().getName());
});
executor.shutdown();
}
}
|
2.5 线程生命周期
- 新建(New):线程对象创建但未启动。
- 就绪(Runnable):调用
start() 后等待 CPU 时间片。
- 运行(Running):获取 CPU 时间片执行
run() 方法。
- 阻塞(Blocked):等待锁、I/O 操作或休眠(如
sleep())。
- 终止(Terminated):
run() 执行完毕或发生异常。
2.6 线程内存模型
-
内存分配:
- 栈内存:每个线程有独立的栈,存储局部变量和方法调用。
- 堆内存:所有线程共享堆,存储对象实例。
- 方法区:存储类信息、常量、静态变量等。
-
可见性问题:
- 由于 CPU 缓存的存在,一个线程对共享变量的修改可能不会立即被其他线程看到。
- 解决方法:使用
volatile 关键字或锁机制。
-
ThreadLocal:
- 线程私有变量,避免共享冲突。
- 注意:使用不当可能导致内存泄漏。
2.6.1 示例:线程共享堆内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class SharedMemoryDemo {
private static int sharedValue = 0; // 共享变量(堆内存)
public static void main(String[] args) {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
sharedValue++; // 线程不安全!
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final value: " + sharedValue); // 可能小于 2000
}
}
|
2.6.2 示例:ThreadLocal 实现线程私有变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public class ThreadLocalDemo {
// 1. ThreadLocal 是静态的(全局只有一个工具)
// 2. withInitial(() -> 0):每个线程第一次get(),默认值是0
// 3. 每个线程存自己的值,互相隔离、互不干扰
private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
// 创建固定2个线程的线程池
// 注意:此时还没有创建任何线程!线程池是空的!
java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(2);
// ==============================================
// 必须执行 executor.submit(任务),才会:
// 1. 把任务交给线程池
// 2. 线程池才会创建线程
// 3. 任务才会开始运行
// 不执行 submit → 没有任务 → 不创建线程 → 什么都不会跑!
// ==============================================
// ====================== 第一次 submit ======================
// 执行 submit → 提交【任务1】→ 线程池创建【线程1】来执行
executor.submit(() -> {
threadLocal.set(10);
System.out.println("Thread 1: " + threadLocal.get());
});
// ====================== 第二次 submit ======================
// 执行 submit → 提交【任务2】→ 线程池创建【线程2】来执行
executor.submit(() -> {
threadLocal.set(20);
System.out.println("Thread 2: " + threadLocal.get());
});
// 关闭线程池,不再接受新任务
executor.shutdown();
}
}
|
3. 线程间通信机制
3.1 对象监视器锁
- 对象监视器锁(Monitor Lock)是 Java 中用于实现线程同步的一种机制。每个 Java 对象都有一个与之关联的监视器锁。
- 当多个线程需要访问共享资源时,通过
synchronized 关键字可以确保同一时间只有一个线程能够执行被保护的代码块或方法。
- 线程在进入
synchronized 代码块或方法时会尝试获取对象的监视器锁,如果锁已被其他线程持有,则当前线程会被阻塞,直到锁被释放。
一句话先讲透
对象监视器锁 = 每个 Java 对象自带的一把“唯一钥匙”
synchronized = 抢这把钥匙
抢到钥匙才能进房间干活,没抢到就在外面等着
3.2 等待/通知机制(wait/notify)
- 原理:基于对象监视器锁(
synchronized 关键字),线程通过 wait() 释放锁并等待,其他线程通过 notify() 或 notifyAll() 唤醒等待线程。
- 适用场景:生产者-消费者模型、任务协调。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
/**
* SharedResource = 一台【自动售货机】
*/
class SharedResource {
private int data = -1; // 商品(饮料)
private boolean available = false; // 售货机里【有没有货】
// false = 空的,true = 有货
/**
* 生产者 = 【补货员】
* 上锁 = 谁都不能同时动这台售货机
*/
public synchronized void produce(int value) throws InterruptedException {
// 售货机【有货】available = true
// 补货员:有货我就不能补,等着别人买走
while (available) {
wait(); // 释放锁 → 睡觉等通知
}
// 走到这说明:货空了,可以补货
data = value; // 把新饮料放进去
available = true; // 标记:现在有货了
System.out.println("Produced: " + data);
notify(); // 喊一声:货补好了!谁等着买就起来吧
}
/**
* 消费者 = 【买饮料的人】
* 上锁 = 同一时间只能一个人买
*/
public synchronized int consume() throws InterruptedException {
// 售货机【没货】available = false
// 顾客:没货我买不了,等着补货
while (!available) {
wait(); // 释放锁 → 睡觉等通知
}
// 走到这说明:有货了,可以买
available = false; // 拿走饮料,售货机变空
System.out.println("Consumed: " + data);
notify(); // 喊一声:饮料买走啦,补货员可以来补货了
return data;
}
}
public class ProducerConsumerExample {
public static void main(String[] args) {
SharedResource vendingMachine = new SharedResource(); // 一台售货机
// 线程1:补货员,循环补5次货
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
vendingMachine.produce(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 线程2:顾客,循环买5次
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
vendingMachine.consume();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
|
3.3 使用 volatile 变量
- 作用:确保多线程环境下对变量的读写操作具有可见性和有序性。
- 可见性:当一个线程修改了
volatile 变量的值,其他线程能够立即看到这个修改。
- 有序性:禁止指令重排序优化,确保对
volatile 变量的读写操作不会被编译器或处理器乱序执行。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class VolatileExample {
private static volatile boolean flag = false;
// 99.9% 写先打印,读后打印,写线程 flag=true 之后紧接着执行打印,是连续指令,速度极快,读线程必须先退出循环,才能打印,多一步动作
public static void main(String[] args) throws InterruptedException {
Thread writerThread = new Thread(() -> {
try {
Thread.sleep(2000); // 模拟一些工作
flag = true;
System.out.println("Writer thread set flag to true.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread readerThread = new Thread(() -> {
while (!flag) {
// 等待 flag 被设置为 true
}
System.out.println("Reader thread detected flag as true.");
});
writerThread.start();
readerThread.start();
writerThread.join();
readerThread.join();
}
}
|
3.4 BlockingQueue 阻塞队列
- 原理:提供了一种线程安全的队列,允许一个或多个生产者线程将元素插入队列,同时允许一个或多个消费者线程从队列中移除元素。
- 特点:当队列为空时,尝试从队列中获取元素的操作会阻塞;当队列为满时,尝试插入元素的操作也会阻塞。
- 实现类:
ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue 等。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
private static final int CAPACITY = 5;
private static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
public static void main(String[] args) {
Thread producerThread = new Thread(new Producer());
Thread consumerThread = new Thread(new Consumer());
producerThread.start();
consumerThread.start();
}
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Producing: " + i);
queue.put(i); // 如果队列已满,put() 会阻塞
Thread.sleep(1000); // 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
Integer value = queue.take(); // 如果队列为空,take() 会阻塞
System.out.println("Consuming: " + value);
Thread.sleep(2000); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
|
3.5 Lock 与 Condition
Lock:提供了比 synchronized 更灵活的锁操作,如可中断锁、超时锁、公平锁等。
Condition:与 Lock 配合使用,提供了更灵活的条件等待机制,类似于 wait()/notify() 但功能更强大。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockConditionExample {
private static final int CAPACITY = 5;
private static final Queue<Integer> queue = new LinkedList<>();
private static final Lock lock = new ReentrantLock();
private static final Condition notFull = lock.newCondition();
private static final Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
Thread producerThread = new Thread(new Producer());
Thread consumerThread = new Thread(new Consumer());
producerThread.start();
consumerThread.start();
}
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
produce(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void produce(int value) throws InterruptedException {
lock.lock();
try {
while (queue.size() == CAPACITY) {
System.out.println("Queue is full, waiting...");
notFull.await(); // 队列满 = 不能生产 = 等待 “不满” 信号 → 所以 await notFull
}
queue.offer(value);
System.out.println("Produced: " + value);
notEmpty.signal(); // 唤醒消费者
} finally {
lock.unlock();
}
Thread.sleep(1000); // 模拟生产时间
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
consume();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Queue is empty, waiting...");
notEmpty.await(); // 队列空了 = 不能消费 = 等待 “不空” 信号 → 所以 await notEmpty
}
Integer value = queue.poll();
System.out.println("Consumed: " + value);
notFull.signal(); // 唤醒生产者
} finally {
lock.unlock();
}
Thread.sleep(2000); // 模拟消费时间
}
}
}
|
3.6 CountDownLatch
- 原理:使用一个计数器来控制线程的执行顺序,等待其他线程完成一组操作,直到计数器达到零。
- 适用场景:主线程等待多个子线程完成初始化、多个线程同步启动等。
生活场景:大巴车发车
- 大巴车 = 主线程
- 要等的乘客 = 子线程
- CountDownLatch 计数器 = 车上要坐满的人数
- latch.await() = 司机在车里等着,不发车
- latch.countDown() = 上来一个乘客,计数器减1
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
private static final int THREAD_COUNT = 3;
private static final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
public static void main(String[] args) throws InterruptedException {
// 你(主线程)站在门口,做了三件事:
// 喊 一号工人去干活
// 喊 二号工人去干活
// 喊 三号工人去干活
// → 你喊完这三句,for 循环就结束了!→ 你不用等他们干完,你自己立刻往前走!
// 启动线程 ≠ 等待线程执行完,线程什么时候真正开始跑,是操作系统调度决定的
// start () 是 “异步” 的,调用完立刻返回,主线程继续跑!
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Worker(latch, "Worker-" + i)).start();
}
System.out.println("Main thread waiting for workers to finish...");
latch.await(); // 主线程等待所有子线程完成
System.out.println("All workers have finished.");
}
static class Worker implements Runnable {
private final CountDownLatch latch;
private final String name;
public Worker(CountDownLatch latch, String name) {
this.latch = latch;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " is working...");
Thread.sleep(1000); // 模拟工作时间
System.out.println(name + " has finished.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 工作完成后减少计数器
}
}
}
}
|
3.7 CyclicBarrier
- 原理:允许一组线程相互等待,直到所有线程都到达一个公共屏障点,然后一起继续执行。
- 特点:可以重用,即一旦所有线程通过了屏障,计数器会被重置,可以再次使用。
- 适用场景:多阶段并行任务、分阶段计算等。
生活场景:旅游集合
- 每个人先自己走一段
- 到了指定地点(屏障点)必须等所有人
- 人到齐了 → 大家一起出发走下一段
这就是 CyclicBarrier(循环屏障)
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
private static final int THREAD_COUNT = 3;
// 定义一个屏障:需要3个线程到达,并且人齐后执行一段动作
private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
System.out.println("===== 所有线程都到齐了,屏障放行,并且自动重置 =====");
});
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Worker(barrier, "Worker-" + i)).start();
}
}
static class Worker implements Runnable {
private final CyclicBarrier barrier;
private final String name;
public Worker(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
// ==============================================
// 第一轮:各自先干活,然后在屏障处等待
// ==============================================
System.out.println(name + " 开始【第一轮工作】...");
Thread.sleep((long) (Math.random() * 1000)); // 模拟快慢不同
System.out.println(name + " 【第一轮干完】,到达屏障,等待其他人");
// 等待所有人到齐 → 到齐后放行 → 屏障自动重置
barrier.await();
// ==============================================
// 第二轮:继续干活,再次到达同一个屏障
// 重点:这里再次使用同一个 barrier,体现【循环】重用
// ==============================================
System.out.println("\n" + name + " 开始【第二轮工作】...");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(name + " 【第二轮干完】,再次到达屏障,等待其他人");
// 同一个屏障,再次等待所有人 → 再次重置
barrier.await();
// 两轮都结束
System.out.println(name + " 全部任务完成!");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
System.out.println(name + " 被中断或屏障损坏");
}
}
}
}
|
3.8 Exchanger 数据交换
- 原理:允许两个线程在某个特定点交换对象,每个线程调用
exchange() 方法时会阻塞,直到另一个线程也调用 exchange()。
- 适用场景:双线程协作、数据交换、生产者-消费者模式等。
生活场景:双人约会
- 把
Exchanger 看作 双人约会点
- 规则只有一条:只能两个人见面交换,多一个都不行!
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
import java.util.concurrent.Exchanger;
public class ExchangerExample {
// 创建一个交换器:专门给两个线程交换【字符串】用
private static final Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
// 创建线程A,带数据:Data from A
Thread threadA = new Thread(new ExchangerWorker("Thread A", "Data from A"));
// 创建线程B,带数据:Data from B
Thread threadB = new Thread(new ExchangerWorker("Thread B", "Data from B"));
threadA.start(); // 启动线程A
threadB.start(); // 启动线程B
try {
// 主线程等待A、B全部执行完
threadA.join();
threadB.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 工作线程:每个线程都会带着自己的数据,去交换
static class ExchangerWorker implements Runnable {
private final String name; // 线程名字
private final String data; // 线程自己要拿出去交换的数据
// 构造方法:给线程设置名字和要交换的数据
public ExchangerWorker(String name, String data) {
this.name = name;
this.data = data;
}
@Override
public void run() {
try {
System.out.println(name + " is working...");
// 随机睡一下:模拟每个线程执行速度不一样
Thread.sleep((long) (Math.random() * 1000));
System.out.println(name + " has finished.");
// ======================== 核心方法 ========================
// exchanger.exchange(我要交出去的数据)
// 作用:
// 1. 线程在这里【阻塞等待】
// 2. 等【另一个线程】也调用 exchange
// 3. 两个线程都到了 → 互相交换数据
// 4. 方法返回值 = 对方线程交过来的数据
// ==========================================================
String exchangedData = exchanger.exchange(data);
// 拿到对方线程的数据
System.out.println(name + " received: " + exchangedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(name + " was interrupted.");
}
}
}
}
|
3.9 线程间通信方式对比
只要不是线程自己玩自己的,而是需要 “配合、等待、传数据、打招呼”,都叫线程间通信。
| 机制 |
特点 |
适用场景 |
wait/notify |
基于监视器锁,需同步块 |
简单协作,如生产者-消费者 |
volatile |
保证可见性,不保证原子性 |
状态标记(如停止线程) |
BlockingQueue |
线程安全,支持阻塞操作 |
生产者-消费者模型(解耦) |
Lock + Condition |
更灵活的条件控制 |
多条件等待 |
CountDownLatch |
一次性屏障,等待多个线程完成 |
主线程等待子线程初始化 |
CyclicBarrier |
可重复使用的屏障 |
多阶段并行任务 |
Exchanger |
两个线程交换数据 |
双向数据传递 |
4. 线程池(Executor框架)
4.1 核心接口与类
Executor:顶层接口,定义执行任务的 execute() 方法。
ExecutorService:扩展 Executor,支持任务提交、终止和管理。
ThreadPoolExecutor:线程池核心实现类。
Executors:工厂类,提供创建常见线程池的静态方法。
ScheduledExecutorService:支持定时和周期性任务执行。
4.2 创建线程池的两种方式
手动创建(推荐,最可控) 直接 new ThreadPoolExecutor(...)
1
2
3
4
5
6
7
|
ThreadPoolExecutor pool = new ThreadPoolExecutor(
5,
10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.AbortPolicy()
);
|
Executors 工具类快速创建(简单常用) Executors 内部其实也是 new ThreadPoolExecutor(...),只是帮你封装好了。
1
2
3
4
5
6
7
8
9
10
11
|
// 固定线程数
ExecutorService pool = Executors.newFixedThreadPool(5);
// 单线程线程池
ExecutorService pool = Executors.newSingleThreadExecutor();
// 可缓存线程池(自动扩容)
ExecutorService pool = Executors.newCachedThreadPool();
// 定时任务线程池
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
|
4.3 线程池参数
1
2
3
4
5
6
7
8
9
|
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(长期保留的线程)
int maximumPoolSize, // 最大线程数(线程池允许的最大线程数)
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 存活时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂(自定义线程创建)
RejectedExecutionHandler handler // 拒绝策略
)
|
4.4 线程池工作流程
- 提交任务时,若核心线程未满,创建新线程执行。
- 核心线程已满,任务进入工作队列等待。
- 队列满后,若线程数未达最大限制,创建非核心线程执行。
- 线程数已达最大且队列已满,触发拒绝策略。
4.5 常见线程池类型
| 线程池类型 |
特点 |
适用场景 |
FixedThreadPool |
固定大小,无界队列 |
稳定的并发需求 |
CachedThreadPool |
可缓存,自动扩容/缩容 |
短期任务,高并发 |
SingleThreadExecutor |
单线程,顺序执行 |
需要顺序执行的任务 |
ScheduledThreadPool |
支持定时任务 |
周期性任务 |
WorkStealingPool |
工作窃取,并行度可配置 |
并行计算 |
4.6 拒绝策略
-
AbortPolicy(默认)
-
CallerRunsPolicy
谁提交任务,谁就自己执行该任务。
- 线程池繁忙无法接收
- 由提交任务的线程(如主线程)直接运行
- 不丢任务、不抛异常,压力回传给提交方
-
DiscardPolicy
直接静默丢弃当前新任务。
- 线程池满员
- 不执行、不通知、不抛异常
- 任务直接消失
-
DiscardOldestPolicy
丢弃队列中最早的旧任务,再尝试提交当前新任务。
- 队列满时,踢出排队最久的任务
- 把最新任务加入队列
- 丢老保新
设置方式(代码示例)
1
2
3
4
5
6
7
8
|
ExecutorService pool = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, TimeUnit.SECONDS, // 空闲线程存活时间
new ArrayBlockingQueue<>(10), // 任务队列
// 在此设置拒绝策略
new ThreadPoolExecutor.AbortPolicy()
);
|
策略替换写法
1
2
3
4
5
6
7
8
9
10
11
|
// 1. 抛异常(默认)
new ThreadPoolExecutor.AbortPolicy()
// 2. 谁提交谁运行
new ThreadPoolExecutor.CallerRunsPolicy()
// 3. 丢弃当前新任务
new ThreadPoolExecutor.DiscardPolicy()
// 4. 丢弃队列最老任务
new ThreadPoolExecutor.DiscardOldestPolicy()
|
4.7 生产环境线程池配置
4.7.1 任务类型分析
4.7.2 队列选择
- 有界队列:如
ArrayBlockingQueue,避免内存溢出。
- 无界队列:如
LinkedBlockingQueue,适合任务量可控的场景。
- 同步队列:如
SynchronousQueue,容量为 0,不存储任务,必须有空闲线程等待接收,否则提交会被拒绝;不属于有界也不属于无界,任务直接在线程间交接,不排队。
4.7.3 完整配置示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ProductionThreadPoolConfig {
public static void main(String[] args) {
// 1. 获取CPU核心数
int cpuCores = Runtime.getRuntime().availableProcessors();
// 2. 配置线程池参数(以IO密集型为例)
int corePoolSize = 2 * cpuCores;
int maxPoolSize = 4 * cpuCores;
int queueCapacity = 100;
long keepAliveTime = 30;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity),
new CustomThreadFactory(), // 自定义线程名称
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满后由提交任务的线程执行
);
// 3. 提交任务
for (int i = 0; i < 200; i++) {
final int taskId = i;
executor.execute(() -> {
try {
// 模拟IO操作(如HTTP请求)
Thread.sleep(100);
System.out.println("Task " + taskId + " executed by: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 4. 关闭线程池
executor.shutdown();
}
// 自定义线程工厂(便于监控)
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "PROD-THREAD-" + counter.getAndIncrement());
thread.setDaemon(false); // 非守护线程
thread.setPriority(Thread.NORM_PRIORITY); // 正常优先级
return thread;
}
}
}
|
4.8 线程池监控
-
监控指标:
activeCount:当前活跃线程数
queueSize:队列中的任务数
completedTaskCount:已完成的任务数
rejectedTaskCount:被拒绝的任务数
-
监控工具:
-
JMX(Java Management Extensions)
Java 内置的管理与监控规范/标准,定义了 JVM、线程池、内存等组件如何暴露监控指标。
常用可视化工具 jconsole、jvisualvm 基于 JMX 协议连接 JVM,实时查看线程池状态、线程数、队列长度等信息,不依赖任何框架,通用且稳定。
-
Prometheus + Grafana
生产级监控系统,用于采集、存储、展示线程池及应用指标,支持图表展示与告警。
-
Spring Boot Actuator
Spring Boot 提供的监控模块,可快速暴露线程池、健康状态等指标,常配合 Prometheus 使用。
5. 并发工具类详解
5.1 原子类
- 原理:基于 CAS(Compare-And-Swap)操作实现无锁的线程安全。
- 优势:避免了锁的开销,适用于高并发场景。
- 常用类:
AtomicInteger、AtomicLong、AtomicBoolean
AtomicReference、AtomicStampedReference(解决ABA问题)
AtomicIntegerArray、AtomicLongArray
CAS = 比较并交换,是一种 CPU 指令级的原子操作。
- 先从主内存读取当前值,记为 A
→ 这是你线程本地拿到的旧值
- 你根据业务计算出要改成的新值 B
- 执行 CPU 指令 CAS(内存地址, 预期值A, 新值B)
- 去主内存里看一眼:现在的值还是 A 吗?
- 是 → 改成 B,成功
- 否 → 说明中间被其他线程改过了,本次失败
- 失败就循环重试(自旋),直到成功
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet(); // 原子操作
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final counter: " + counter.get()); // 一定是 2000
}
}
|
5.2 并发集合
ConcurrentHashMap:线程安全的哈希表,支持高并发读写。
CopyOnWriteArrayList:读写分离的列表,适合读多写少的场景。
CopyOnWriteArraySet:基于 CopyOnWriteArrayList 的集合实现。
ConcurrentLinkedQueue:无界线程安全队列,基于链表实现。
BlockingQueue 实现类:线程安全的阻塞队列。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
map.compute("key", (k, v) -> v == null ? 1 : v + 1);
}
};
Thread t1 = new Thread(task);
Thread t2 = new Thread(task);
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final value: " + map.get("key")); // 一定是 2000
}
}
|
5.3 ThreadLocal
- 作用:为每个线程提供独立的变量副本,避免线程间共享变量的竞争。
- 使用场景:存储线程上下文、事务信息、用户会话等。
- 注意事项:使用不当可能导致内存泄漏,需要及时调用
remove() 方法清理。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public class ThreadLocalExample {
private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
private static ThreadLocal<Long> startTime = ThreadLocal.withInitial(() -> System.currentTimeMillis());
public static void main(String[] args) {
Runnable task = () -> {
try {
threadLocal.set((int) (Math.random() * 100));
System.out.println(Thread.currentThread().getName() + ": " + threadLocal.get());
Thread.sleep(1000);
long duration = System.currentTimeMillis() - startTime.get();
System.out.println(Thread.currentThread().getName() + " took: " + duration + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadLocal.remove(); // 清理资源,避免内存泄漏
startTime.remove();
}
};
Thread t1 = new Thread(task, "Thread-1");
Thread t2 = new Thread(task, "Thread-2");
t1.start();
t2.start();
}
}
|
6. 最佳实践与性能调优
6.1 线程安全最佳实践
-
优先使用高层并发工具:如 BlockingQueue、CountDownLatch,避免直接使用 wait/notify。
-
选择合适的并发集合:
- 读多写少:
CopyOnWriteArrayList
- 高并发读写:
ConcurrentHashMap
- 队列操作:
LinkedBlockingQueue
-
使用原子类替代锁:对于简单的计数器、标志位等,使用 Atomic 类性能更好。
-
合理使用锁:
- 减小锁范围:只锁定必要的代码块
- 选择合适的锁类型:
ReentrantLock 或 synchronized
- 避免死锁:保持锁的获取顺序一致
-
线程池调优:
- 根据任务类型设置合理的线程数
- 使用有界队列避免内存溢出
- 选择合适的拒绝策略
-
避免线程局部变量泄漏:使用 ThreadLocal 后及时调用 remove()。
-
使用 volatile 正确:仅用于状态标记,不用于复合操作。
6.2 性能调优技巧
-
减少上下文切换:
-
优化内存访问:
- 减少伪共享,同变量挤在一个缓存行,多线程互相拖慢,使用@Contended注解,给变量加隔离,让它们各占一行,消除干扰,提升并发性能
- 合理使用缓存行对齐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// 没加 @Contended(伪共享,慢)
class Data {
long x; // 线程A改
long y; // 线程B改
// x 和 y 挤在同一个缓存行 → 互相干扰
}
// 加了 @Contended(无伪共享,快)
import sun.misc.Contended;
class Data {
@Contended
long x; // 独占一个缓存行
@Contended
long y; // 独占一个缓存行
}
|
-
使用并行流:对于大数据处理,使用 parallelStream() 利用多核CPU。
-
异步处理:对于IO密集型任务,使用异步IO或CompletableFuture。
-
监控与分析:
- 使用JFR工具,JFR 是 JVM 内置的低开销、生产级性能与事件记录工具,类似 Java 应用的 “黑匣子”。记录 JVM 与应用的时间戳事件(线程、锁、GC、方法采样、内存分配、I/O 等超 200 种)。生成 .jfr 二进制文件,用 JDK Mission Control (JMC) 可视化分析。
- 使用 VisualVM 监控线程状态
- 使用压测工具(如 JMeter)评估系统性能
6.3 常见并发问题及解决方案
| 问题 |
原因 |
解决方案 |
| 死锁 |
线程循环等待对方释放锁 |
保持锁获取顺序一致,使用 Lock 的超时机制 |
| 活锁 |
线程不断重试失败的操作 |
增加随机退避时间,使用 tryLock() |
| 饥饿 |
某些线程长期无法获取资源 |
使用公平锁,合理设置线程优先级 |
| 内存泄漏 |
ThreadLocal 未清理 |
及时调用 remove() 方法 |
| 竞态条件 |
多线程同时修改共享资源 |
使用锁或原子类保证原子性 |
| 上下文切换开销 |
线程数过多 |
合理设置线程池大小,减少线程创建销毁 |
总结
JUC 是 Java 并发编程的核心工具包,提供了丰富的组件来简化并发编程,提高程序性能和可靠性。本文系统介绍了:
- JUC 框架概述:核心组件和设计理念
- 线程基础与原理:线程创建、生命周期、内存模型
- 线程间通信机制:
wait/notify、volatile、BlockingQueue、Lock/Condition、CountDownLatch、CyclicBarrier、Exchanger
- 线程池:核心参数、工作流程、配置最佳实践
- 并发工具类:原子类、并发集合、
ThreadLocal
- 最佳实践与性能调优:线程安全实践、性能优化技巧、常见问题解决方案
通过掌握这些知识,你将能够:
- 编写线程安全的并发程序
- 合理使用线程池提高系统性能
- 避免常见的并发问题
- 优化并发程序的性能
并发编程是 Java 后端开发的重要组成部分,掌握 JUC 工具包的使用和原理,将为你开发高性能、可靠的应用程序奠定坚实的基础。