背景
本文是《Java 后端从小白到大神》修仙系列第十一篇,正式进入Java后端世界,本篇文章主要聊Java并发编程。若想详细学习请点击首篇博文,我们开始吧。
文章概览
本文是 JUC 系列的续篇,主要涵盖以下内容:
- 锁机制:ReentrantLock、ReentrantReadWriteLock、StampedLock 和 Condition
- 原子类:CAS 原理与常见原子类的使用
- 并发集合:ConcurrentHashMap 和 CopyOnWriteArrayList
- 同步辅助类:CountDownLatch、CyclicBarrier 和 Semaphore
- 异步编程:Future 与 CompletableFuture
- AQS 原理:AbstractQueuedSynchronizer 的核心实现
核心工具类(JUC 续篇)
一、锁(Lock)
锁的类型:
- 显式锁:需要手动获取和释放锁(如
ReentrantLock),提供更灵活的锁操作。
- 隐式锁:通过
synchronized 关键字隐式管理锁,使用简单但功能有限。
- 读写锁:区分读锁和写锁(如
ReentrantReadWriteLock),适用于读多写少场景。
- 乐观锁:通过版本号或戳记(Stamp)实现无锁读(如
StampedLock),提供更高的并发性能。
锁的核心概念:
- 可重入性:同一线程可以多次获取同一把锁,避免死锁。
- 公平性:是否按照线程请求锁的顺序分配锁,公平锁可以避免线程饥饿,但性能略低。
- 锁粒度:锁保护的资源范围,粒度越小,并发性能越高。
- 死锁:两个或多个线程互相等待对方释放锁,导致所有线程阻塞。
1. ReentrantLock
核心原理(可重入独占锁):
- 基于 AQS(AbstractQueuedSynchronizer):通过
state 变量(int 类型)表示锁的状态:
state = 0:锁未被占用。
state > 0:锁被占用,数值表示重入次数(同一线程多次获取锁)。
- 公平性策略:
- 非公平锁(默认):线程直接尝试通过 CAS 抢占锁,失败后进入等待队列。优点是性能高,缺点是可能导致线程饥饿。
- 公平锁:严格按照队列顺序获取锁,避免线程饥饿。优点是公平,缺点是性能略低。
- 重入机制:记录当前持有锁的线程(
exclusiveOwnerThread),每次重入时 state 自增,确保同一线程可以多次获取锁。
- 锁释放:释放锁时
state 自减,直到 state = 0 时完全释放,此时会唤醒等待队列中的下一个线程。
ReentrantLock 与 synchronized 的对比:
- 灵活性:ReentrantLock 提供更灵活的锁操作,如可中断锁、超时锁、条件变量等。
- 公平性:ReentrantLock 可以选择公平或非公平模式,而 synchronized 只能是非公平的。
- 性能:在低竞争场景下,两者性能相近;在高竞争场景下,ReentrantLock 性能更优。
- 使用方式:ReentrantLock 需要手动获取和释放锁,而 synchronized 是隐式的。
关键流程:
-
加锁:
1
2
3
4
5
6
|
final void lock() {
if (compareAndSetState(0, 1)) // 非公平锁尝试直接抢占
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 进入 AQS 队列等待
}
|
-
解锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
protected final boolean tryRelease(int releases) {
// state - 1
int c = getState() - releases;
// 不是当前持有锁的线程 → 抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// c == 0 表示完全释放
boolean free = (c == 0);
// 完全释放 → 清空持有线程
if (free) setExclusiveOwnerThread(null);
// 保存新 state
setState(c);
// 返回:是否真正释放了锁
return free;
}
|
常用方法:
lock():获取锁,如果锁不可用则阻塞当前线程。
tryLock():尝试获取锁,如果锁可用则立即返回 true,否则返回 false。
tryLock(long timeout, TimeUnit unit):尝试在指定时间内获取锁,超时则返回 false。
unlock():释放锁。
isHeldByCurrentThread():检查当前线程是否持有该锁。
getHoldCount():获取当前线程持有该锁的次数。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
// 创建【一把】可重入锁对象(全局唯一)
ReentrantLock lock = new ReentrantLock();
/**
* 标准加锁执行业务方法
* 作用:获取不到锁就【一直等待】,直到拿到锁才执行
*/
public void doWork() {
// 1. 尝试获取锁
// - 如果锁空闲 → 立即获取,state = 1
// - 如果锁被其他线程持有 → 当前线程【阻塞等待】
lock.lock();
try {
// 2. 临界区(安全区)
// 只有成功获取锁的线程,才能执行这里的代码
// 多线程下不会出现并发安全问题
} finally {
// 3. 【必须】在 finally 里释放锁
// 保证无论程序是否异常,锁一定能被释放,避免死锁
// 释放逻辑:state - 1,减到 0 则完全释放锁
lock.unlock();
}
}
/**
* 带【超时时间】的尝试获取锁
* 作用:1秒内获取不到锁,就【放弃等待】,直接返回 false,不阻塞
*/
public void tryDoWork() throws InterruptedException {
// 尝试获取锁,最多等待 1 秒
// - 1秒内拿到锁 → 返回 true,继续执行
// - 1秒内没拿到锁 → 返回 false,跳过执行
// 不会像 lock() 那样无限等待
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// 临界区代码:只有成功获取锁才会执行
} finally {
// 只要拿到了锁,就必须释放
lock.unlock();
}
}
}
|
使用场景:
- 需要手动控制锁的获取和释放。
- 需要可中断或超时获取锁(如避免死锁)。
2. ReentrantReadWriteLock
核心原理(可重入读写锁):
- 读写分离:将锁分为读锁(共享)和写锁(独占),允许多个线程同时读取,但写入时需要独占。
- AQS 状态拆分:高 16 位 = 读计数,低 16 位 = 写计数,一个 int 存两个数。
- 高 16 位:读锁的持有次数(共享模式),每个读线程持有一次。
- 低 16 位:写锁的重入次数(独占模式),同一线程多次获取写锁时递增。
- 互斥规则:
- 写锁获取时,禁止其他线程获取读锁或写锁,保证写入操作的原子性。
- 读锁获取时,允许其他线程获取读锁,但禁止写锁,保证读取操作的一致性。
- 重入机制:
- 写锁可以降级为读锁(先获取写锁,再获取读锁,最后释放写锁)。
- 读锁不能升级为写锁(会导致死锁)。
ReentrantReadWriteLock 的适用场景:
- 读多写少的场景,如缓存系统、配置中心等。
- 需要保证数据一致性的同时提高并发性能的场景。
关键实现:
-
读锁(ReadLock):
1
2
3
4
5
6
7
8
9
10
11
|
protected final int tryAcquireShared(int unused) {
// 检查是否有写锁被其他线程持有
if (exclusiveCount(getState()) != 0 && getExclusiveOwnerThread() != current)
return -1; // 获取失败
int r = sharedCount(getState()); // 当前读锁数量
if (!readerShouldBlock() && r < MAX_COUNT) {
if (compareAndSetState(c, c + SHARED_UNIT)) // CAS 更新读锁数量
return 1; // 成功获取读锁
}
return fullTryAcquireShared(current); // 完整尝试(处理竞争)
}
|
-
写锁(WriteLock):
1
2
3
4
5
6
7
8
9
10
11
12
13
|
protected final boolean tryAcquire(int acquires) {
// 检查是否有其他线程持有锁(读或写)
if (getState() != 0) {
if (current != getExclusiveOwnerThread())
return false; // 被其他线程占用
}
// CAS 更新写锁状态
if (compareAndSetState(c, c + acquires)) {
setExclusiveOwnerThread(current);
return true;
}
return false;
}
|
常用方法:
readLock():获取读锁。
writeLock():获取写锁。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void readData() {
rwLock.readLock().lock();
try {
// 读操作
} finally {
rwLock.readLock().unlock();
}
}
public void writeData() {
rwLock.writeLock().lock();
try {
// 写操作
} finally {
rwLock.writeLock().unlock();
}
}
|
使用场景:
- 读多写少的场景(如缓存系统)。
- 允许并发读但互斥写。
3. StampedLock
核心原理(基于戳记的高性能锁):
- 乐观读(Optimistic Read):无锁读取数据,通过戳记(
long stamp)验证数据是否被修改,适用于读操作远多于写操作的场景。
- 锁模式:
- 读锁(Read Lock):共享锁,允许多线程同时读,性能略低于乐观读。
- 写锁(Write Lock):独占锁,互斥其他所有锁,保证写入操作的原子性。
- 乐观读模式:无锁读取,通过戳记验证数据一致性。
- 戳记(Stamp):一个 64 位的版本号,用于检测锁状态变化,不同模式的戳记格式不同。
- 锁转换:
- 支持从乐观读升级为读锁或写锁。
- 支持从读锁降级为乐观读。
- 写锁可以降级为读锁。
StampedLock 的优势:
- 在读多写少的场景下,性能远高于 ReentrantReadWriteLock。
- 乐观读模式避免了读锁的获取和释放开销。
- 提供了灵活的锁转换机制,适应不同的并发场景。
StampedLock 的注意事项:
- 写锁不可重入,同一线程多次获取写锁会导致死锁。
- 乐观读需要手动验证戳记,使用不当可能导致数据不一致。
- 不支持条件变量(Condition)。
关键机制:
-
乐观读流程:
1
2
3
4
5
6
7
8
9
|
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
public boolean validate(long stamp) {
// 检查戳记是否有效(期间是否有写操作)
return (stamp & SBITS) == (state & SBITS);
}
|
-
写锁获取:
1
2
3
4
5
6
7
8
|
public long writeLock() {
long s, next;
while (((s = state) & ABITS) == 0L) { // 无锁时尝试获取
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
return 0L;
}
|
常用方法:
readLock():获取读锁。
writeLock():获取写锁。
tryOptimisticRead():尝试乐观读。
validate(stamp):验证乐观读是否有效。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import java.util.concurrent.locks.StampedLock;
// 创建 StampedLock 实例(支持 乐观读、悲观读锁、写锁)
StampedLock stampedLock = new StampedLock();
/**
* 【乐观读】核心方法
* 特点:不加锁、无阻塞、极高性能
* 适用:读多写少,大概率不会被写入打断的场景
*/
public void optimisticRead() {
// 1. 尝试【乐观读】—— 完全不加锁,只获取一个版本戳(stamp)
// 乐观假设:这段时间内不会有线程来修改数据
long stamp = stampedLock.tryOptimisticRead();
// ======================
// 这里直接执行【读操作】
// 不加锁,所以速度极快
// ======================
// 2. 校验:从乐观读到现在,数据是否被【写操作】修改过
// stamp 没变 = 没被修改 = 读的数据有效
// stamp 变了 = 被写过 = 数据无效,需要重新读
if (!stampedLock.validate(stamp)) {
// 3. 乐观读失败(数据被修改)
// 【降级为悲观读锁】—— 加读锁,保证安全读取
stamp = stampedLock.readLock();
try {
// ======================
// 加锁后【重新读取数据】
// 此时绝对安全,不会被写操作打断
// ======================
} finally {
// 4. 释放悲观读锁
stampedLock.unlockRead(stamp);
}
}
}
/**
* 【写锁】方法
* 特点:独占锁、互斥、安全写入
* 写的时候,任何读、其他写都不能执行
*/
public void write() {
// 1. 获取【独占写锁】
// 加锁成功前,线程会阻塞等待
long stamp = stampedLock.writeLock();
try {
// ======================
// 【写操作】
// 独占执行,线程安全
// ======================
} finally {
// 2. 释放写锁
// 必须在 finally 里保证释放
stampedLock.unlockWrite(stamp);
}
}
|
使用场景:
- 读操作远多于写操作。
- 需要高性能的无锁读(通过乐观锁)。
4. Condition
核心原理:
- Condition 是与 Lock 配合使用的条件变量,用于线程间的通信。
- 每个 Condition 实例都绑定到一个 Lock 对象上。
- 提供了比 Object 的 wait()/notify() 更灵活的线程等待和唤醒机制。
常用方法:
await():线程释放锁并进入等待状态,直到被其他线程唤醒。
await(long time, TimeUnit unit):线程释放锁并进入等待状态,直到被唤醒或超时。
signal():唤醒一个等待在该条件上的线程。
signalAll():唤醒所有等待在该条件上的线程。
Condition 与 Object 的 wait()/notify() 的对比:
- 灵活性:Condition 可以创建多个条件变量,而 Object 只有一个。
- 精确性:Condition 可以精确唤醒特定条件的线程,而 Object 只能随机唤醒一个线程。
- 可控性:Condition 的 await() 可以响应中断,而 Object 的 wait() 也可以响应中断。
示例(生产者-消费者模型):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
Queue<Integer> queue = new LinkedList<>();
int capacity = 10;
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待队列不满
}
queue.add(item);
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待队列不空
}
int item = queue.remove();
notFull.signal(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
|
使用场景:
对比与总结:
| 特性 |
ReentrantLock |
ReentrantReadWriteLock |
StampedLock |
| 锁类型 |
独占锁 |
读写分离 |
读锁、写锁、乐观读 |
| 重入性 |
支持 |
支持 |
写锁不可重入 |
| 公平性 |
支持公平/非公平 |
支持公平/非公平 |
仅非公平 |
| 性能 |
中等 |
读多写少时较高 |
极高(尤其乐观读场景) |
| AQS 依赖 |
是 |
是 |
否 |
| 适用场景 |
精确控制锁的获取/释放 |
读多写少 |
读远多于写的高并发场景 |
注意事项:
StampedLock 的乐观读需要手动验证戳记,使用不当可能导致数据不一致。
ReentrantReadWriteLock 在持续高并发读时可能导致写锁饥饿。
- 避免在
StampedLock 中嵌套锁操作(容易导致死锁)。
通过合理选择锁类型,可以显著提升并发程序的性能和可靠性。
二、原子类(Atomic Classes)
1. 原子类原理
原子类的核心原理是 CAS(Compare-And-Swap),一种无锁并发算法,它是实现线程安全的基础。
CAS 操作的三个要素:
- 内存地址(V):要修改的共享变量在内存中的地址。
- 预期原值(A):线程认为当前共享变量的值。
- 新值(B):线程希望将共享变量更新为的值。
CAS 操作流程:
- 读取内存地址 V 处的当前值。
- 检查当前值是否等于预期原值 A。
- 如果相等,将内存地址 V 处的值更新为新值 B;否则,放弃操作或重试。
CAS 的实现:
- CAS 通过硬件指令(如 x86 的
CMPXCHG)保证原子性,避免了锁的开销。
- Java 中的
Unsafe 类提供了 CAS 操作的底层实现。
CAS 的优点:
- 无锁操作,减少了线程上下文切换的开销。
- 高并发场景下性能优于锁。
- 可以实现乐观锁机制。
CAS 的缺点:
- ABA 问题:当变量从 A 变为 B 再变回 A 时,CAS 会认为变量没有变化。
- 自旋开销:高竞争场景下,CAS 可能会导致线程不断重试,增加 CPU 开销。
- 只能保证单个变量的原子性:无法保证复合操作的原子性。
2. 常见原子类及方法
Java 提供了丰富的原子类,用于实现无锁的线程安全操作。以下是常用的原子类及其核心方法:
| 原子类 |
用途 |
核心方法 |
应用场景 |
AtomicInteger |
原子整型操作 |
incrementAndGet(), getAndAdd(int delta), compareAndSet(int expect, int update) |
计数器、序号生成器 |
AtomicLong |
原子长整型操作 |
类似 AtomicInteger,支持 long 类型 |
大计数器、时间戳 |
AtomicBoolean |
原子布尔操作 |
compareAndSet(boolean expect, boolean update) |
状态标记、开关控制 |
AtomicReference<V> |
原子对象引用操作 |
getAndSet(V newValue), compareAndSet(V expect, V update) |
无锁数据结构、原子更新对象 |
AtomicIntegerArray |
原子整型数组操作 |
getAndAdd(int index, int delta) |
原子数组操作 |
AtomicStampedReference<V> |
解决 CAS 的 ABA 问题 |
getStamp(), compareAndSet(V expected, V newValue, int expectedStamp, int newStamp) |
避免 ABA 问题的场景 |
AtomicMarkableReference<V> |
带标记的原子引用 |
compareAndSet(V expected, V newValue, boolean expectedMark, boolean newMark) |
只需知道值是否被修改,不需要版本号的场景 |
AtomicLongArray |
原子长整型数组操作 |
类似 AtomicIntegerArray,支持 long 类型 |
原子长整型数组操作 |
AtomicReferenceArray<E> |
原子对象引用数组操作 |
类似 AtomicReference,支持数组操作 |
原子对象引用数组操作 |
3. 代码示例
1. AtomicInteger 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
import java.util.concurrent.atomic.AtomicInteger;
/**
* AtomicInteger 示例:实现线程安全的计数器
*/
public class AtomicIntegerExample {
// 原子整型计数器,初始值为 0
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 创建两个线程,分别执行不同的原子操作
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet(); // 原子递增,相当于 counter++
}
}, "IncrementThread");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.addAndGet(2); // 原子加 2,相当于 counter += 2
}
}, "AddThread");
// 启动线程
t1.start();
t2.start();
// 等待线程执行完成
t1.join();
t2.join();
// 输出最终结果:1000 * 1 + 1000 * 2 = 3000
System.out.println("Final counter value: " + counter.get()); // 输出 3000
}
}
|
2. AtomicReference 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import java.util.concurrent.atomic.AtomicReference;
/**
* AtomicReference 示例
* 作用:对【对象引用】进行原子操作(线程安全,无锁)
* 相当于:原子版的对象引用,保证多线程下赋值、替换安全
*/
public class AtomicReferenceExample {
// 创建一个【原子引用】对象
// 作用:包裹一个 String 类型的引用,让它的所有操作都是原子的
// 初始值:"Hello"
private static AtomicReference<String> message = new AtomicReference<>("Hello");
public static void main(String[] args) {
// 获取原子引用里的【当前值】
System.out.println("初始值: " + message.get()); // 输出 Hello
// ============================
// 核心方法:compareAndSet(预期值, 新值)
// 原理:
// 1. 先判断当前值 是否等于 预期值
// 2. 如果相等 → 原子更新为新值,返回 true
// 3. 如果不相等 → 不更新,返回 false
// ============================
// 预期值是 Hello,当前值确实是 Hello → 更新成功
boolean updated = message.compareAndSet("Hello", "World");
System.out.println("更新成功? " + updated); // true
System.out.println("更新后的值: " + message.get()); // World
// ============================
// 再次尝试更新
// 预期值:Hello
// 但当前值已经是 World → 不匹配
// ============================
updated = message.compareAndSet("Hello", "Java");
System.out.println("再次更新成功? " + updated); // false
System.out.println("最终值: " + message.get()); // World
}
}
|
3. AtomicStampedReference 示例(解决 ABA 问题)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* AtomicStampedReference 示例
* 作用:给原子引用加【版本号】,彻底解决 CAS 的 ABA 问题
*/
public class ABAExample {
/**
* 创建带【版本号】的原子引用
* 参数1:初始值 = 100
* 参数2:初始版本号 = 0
*
* 普通 AtomicInteger 只记值,会产生 ABA;
* 这个类 既记值,又记版本,能识别 ABA
*/
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(100, 0);
public static void main(String[] args) {
// --------------------------
// 第一步:获取【初始值】和【初始版本号】
// --------------------------
// 获取当前版本号
int stamp = atomicStampedRef.getStamp();
// 获取当前值
Integer value = atomicStampedRef.getReference();
// 输出:初始值: 100, 版本号: 0
System.out.println("初始值: " + value + ", 版本号: " + stamp);
// --------------------------
// 第二步:模拟 ABA 场景
// 100 → 200 → 100
// --------------------------
System.out.println("=== 模拟 ABA 场景(值变回去了,但版本号变了)===");
// 1. 将值 100 → 200,版本号 0 → 1
// compareAndSet(预期值, 新值, 预期版本, 新版本)
atomicStampedRef.compareAndSet(value, 200, stamp, stamp + 1);
System.out.println("100 → 200 后:值=" + atomicStampedRef.getReference()
+ ",版本号=" + atomicStampedRef.getStamp());
// 2. 将值 200 → 100,版本号 1 → 2
atomicStampedRef.compareAndSet(200, 100, stamp + 1, stamp + 2);
System.out.println("200 → 100 后:值=" + atomicStampedRef.getReference()
+ ",版本号=" + atomicStampedRef.getStamp());
// --------------------------
// 第三步:当前线程尝试更新
// 它还以为值没变(还是100),但版本号已经变了!
// --------------------------
System.out.println("=== 当前线程尝试更新 ===");
// 预期值:100
// 预期版本:0(最开始的版本)
// 想改成:300,版本变成 1
// 但!现在版本已经是 2 了!
boolean success = atomicStampedRef.compareAndSet(100, 300, stamp, stamp + 1);
// 输出:更新成功? false
System.out.println("更新成功? " + success);
// 最终值还是 100,版本号 2
System.out.println("最终值: " + atomicStampedRef.getReference()
+ ", 版本号: " + atomicStampedRef.getStamp());
}
}
|
4. 适用场景
原子类适用于以下场景:
- 计数器:如
AtomicInteger 实现线程安全的计数器,用于统计请求数、操作次数等。
- 状态标志:如
AtomicBoolean 控制开关状态,用于实现线程安全的状态切换。
- 无锁数据结构:如基于
AtomicReference 实现的无锁队列、栈等,提高并发性能。
- 避免 ABA 问题:使用
AtomicStampedReference 或 AtomicMarkableReference,确保操作的原子性和一致性。
- 序号生成器:如
AtomicLong 生成唯一的序列号,用于分布式系统中的 ID 生成。
- 原子更新对象:如
AtomicReference 用于原子更新对象引用,实现无锁的对象替换。
原子类的优缺点:
-
优势:
- 无锁操作,减少了线程上下文切换的开销。
- 高并发场景下性能优于
synchronized 或显式锁。
- 提供了细粒度的原子操作,满足各种并发需求。
- 易于使用,API 简洁明了。
-
局限性:
- 高竞争场景可能导致频繁 CAS 重试,增加 CPU 开销。
- 无法保证复合操作的原子性(如多个变量同时更新)。
- ABA 问题需要特殊处理(使用
AtomicStampedReference 或 AtomicMarkableReference)。
使用建议:
- 简单原子操作:优先使用原子类,如计数、状态标记等。
- 复杂场景:需结合锁或
java.util.concurrent 包中的高级工具(如 ConcurrentHashMap)。
- 高竞争场景:考虑使用
LongAdder 等更适合高竞争场景的类。
- 避免 ABA 问题:使用
AtomicStampedReference 或 AtomicMarkableReference。
三、并发集合
1. ConcurrentHashMap
ConcurrentHashMap 是专为高并发场景设计的线程安全哈希表实现,核心优势是:读操作几乎无锁、高效,写操作通过细粒度锁实现,避免全局阻塞,兼顾线程安全与高性能,远超 Hashtable(全局锁)和加锁的 HashMap。
数据结构:
底层固定为「数组 + 链表/红黑树」,可直观理解为:
- 数组:一排独立的「桶(Bucket)」,默认初始容量为16个桶
- 链表:当单个桶中元素数量 ≤ 7 时,用链表存储(查询效率 O(n))
- 红黑树:当单个桶中元素数量 ≥ 8 时,自动转为红黑树(查询效率优化为 O(log n))
关键参数:
- 默认初始容量:16(初始16个桶)
- 负载因子:0.75(核心参数)—— 当哈希表现有元素数量达到「容量 × 0.75」时,触发扩容
- 扩容规则:容量翻倍(16→32→64→128…),最大容量为 2^30
核心原理:
Java 7 及之前:分段锁(Segment Locking)
核心是「分段管理」,可想象为:将整个 ConcurrentHashMap 拆分为 16 个独立的「小 HashMap(Segment)」,每个 Segment 对应一个独立的锁。
- 锁的类型:每个 Segment 继承 ReentrantLock(可重入锁),是独立的互斥锁
- 并发逻辑:不同线程可同时访问不同的 Segment,写操作只锁定当前操作的 Segment,读操作无需加锁(靠 volatile 保证可见性)
- 局限:并发度被固定为16(最多16个线程同时写),结构复杂、内存占用高
Java 8 及之后:CAS + synchronized(桶级锁)
彻底移除 Segment,简化结构,将锁粒度从「段」缩小到「单个桶」,性能大幅提升。
动态扩容:
当哈希表现有元素数量达到「容量 × 0.75」(负载因子阈值)时,触发扩容,全程不阻塞大量线程:
- 新建一个容量为原数组2倍的新数组;
- 支持多线程协助迁移数据(避免单线程迁移效率低);
- 迁移期间:读操作可继续访问旧数组,写操作优先写入新数组;
- 迁移完成后,废弃旧数组,新数组成为主数组,扩容结束。
并发控制:
-
细粒度锁:
- 每个桶(Bucket)独立加锁,减少了锁的竞争。
- 不同线程可以同时操作不同的桶,提高了并发性能。
-
CAS 操作:
- 写操作优先使用 CAS 操作尝试更新数据,避免了锁的开销。
- 如果 CAS 失败,则回退并重试,或者使用 synchronized 锁定单个桶。
-
无锁读操作:
- 读操作通常是无锁的,因为底层数据结构是不可变的(Immutable)。
- 即使写操作正在进行,读操作仍然可以安全地访问旧数据。
- 通过 volatile 关键字保证数据的可见性。
常用方法:
- put(K key, V value):插入键值对。
- get(Object key):获取指定键的值。
- remove(Object key):删除指定键的键值对。
- computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction):如果键不存在,则根据提供的函数计算并插入值。
- forEach(BiConsumer<? super K, ? super V> action):遍历所有键值对。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
import java.util.concurrent.ConcurrentHashMap;
/**
* ConcurrentHashMap 示例:演示多线程下的安全读写操作
* 核心:多线程同时写不会出现并发安全问题(无需手动加锁),读操作无锁高效
*/
public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
// 1. 创建 ConcurrentHashMap 实例(线程安全,高并发适配)
// 泛型指定:key为String类型,value为Integer类型
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 2. 启动第一个写线程(Writer1):往map中添加 Key0~Key4,对应值 0~4
Thread writer1 = new Thread(() -> {
// 循环5次,生成5个键值对
for (int i = 0; i < 5; i++) {
// ConcurrentHashMap 的 put 方法是线程安全的,无需手动加锁
map.put("Key" + i, i);
// 打印当前线程添加的键值对,便于观察执行顺序
System.out.println("Writer1 added: Key" + i + " -> " + i);
}
});
// 3. 启动第二个写线程(Writer2):往map中添加 Key5~Key9,对应值 5~9
Thread writer2 = new Thread(() -> {
// 循环5次,生成另外5个键值对
for (int i = 5; i < 10; i++) {
// 两个写线程同时执行put,ConcurrentHashMap 内部会通过细粒度锁保证安全
map.put("Key" + i, i);
System.out.println("Writer2 added: Key" + i + " -> " + i);
}
});
// 4. 启动两个写线程,开始执行写操作(两个线程会并发执行)
writer1.start();
writer2.start();
// 5. 主线程等待两个写线程执行完毕(join():主线程阻塞,直到对应线程执行结束)
// 避免主线程提前执行读操作,导致读取不到完整数据
writer1.join();
writer2.join();
// 6. 启动读线程(Reader):遍历map,读取所有键值对
Thread reader = new Thread(() -> {
// forEach 遍历map,ConcurrentHashMap 的读操作无锁,高效且线程安全
// 即使此时有写操作(本示例无),也能保证读取到的是最新可见的值
map.forEach((key, value) -> {
System.out.println("Reader found: " + key + " -> " + value);
});
});
// 7. 启动读线程,执行读操作
reader.start();
reader.join();
// 主线程等待读线程执行完毕,再打印最终map大小
// 8. 打印map最终大小,理论值为10(Key0~Key9,共10个键值对)
// 由于ConcurrentHashMap线程安全,不会出现键值对丢失,大小一定是10
System.out.println("Final map size: " + map.size());
}
}
|
2. CopyOnWriteArrayList
CopyOnWriteArrayList 是一个线程安全的动态数组实现,写操作(如添加、删除、修改)会复制整个底层数组,因此写操作较慢,但读操作非常快且无锁,适用于读多写少的场景。
核心原理:
-
写时复制(Copy-On-Write):
- 每次写操作(如添加、删除、修改)都会创建底层数组的一个副本,并在副本上进行修改。
- 修改完成后,将引用指向新的数组。
- 由于写操作会复制整个数组,因此写操作较慢,但读操作非常快且无锁。
-
无锁读操作:
- 读操作直接访问底层数组,无需加锁。
- 即使写操作正在进行,读操作仍然可以安全地访问旧数组。
数据结构:
-
底层数组:
- 数据存储在一个数组中。
- 写操作会创建数组的副本,并在副本上进行修改。
-
迭代器快照:
- 迭代器返回的是数组的一个快照(Snapshot),不会反映后续的修改。
- 这种设计保证了迭代器的一致性,避免了并发修改异常(ConcurrentModificationException)。
并发控制:
-
写操作加锁:
- 写操作使用 ReentrantLock 加锁,确保同一时间只有一个线程可以修改数组。
- 写操作完成后,将引用指向新的数组。
-
读操作无锁:
- 读操作直接访问底层数组,无需加锁。
- 由于写操作会创建数组的副本,读操作始终访问的是旧数组,因此不会受到写操作的影响。
常用方法:
- add(E e):添加元素。
- remove(Object o):删除指定元素。
- get(int index):获取指定索引处的元素。
- size():返回列表的大小。
- iterator():返回一个迭代器(快照形式,不会反映后续修改)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 CopyOnWriteArrayList
List<String> list = new CopyOnWriteArrayList<>();
// 启动多个线程进行写操作
Thread writer1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add("Item" + i);
System.out.println("Writer1 added: Item" + i);
}
});
Thread writer2 = new Thread(() -> {
for (int i = 5; i < 10; i++) {
list.add("Item" + i);
System.out.println("Writer2 added: Item" + i);
}
});
writer1.start();
writer2.start();
// 等待写线程完成
writer1.join();
writer2.join();
// 启动一个线程进行读操作
Thread reader = new Thread(() -> {
for (String item : list) {
System.out.println("Reader found: " + item);
}
});
reader.start();
reader.join();
System.out.println("Final list size: " + list.size());
}
}
|
| 特性 |
ConcurrentHashMap |
CopyOnWriteArrayList |
| 底层结构 |
数组 + 链表/红黑树 |
动态数组 |
| 线程安全性 |
分段锁/CAS 操作 |
写时复制(Copy-On-Write) |
| 读操作性能 |
高效,无锁 |
高效,无锁 |
| 写操作性能 |
较高,细粒度锁/CAS 操作 |
较低,每次写操作复制整个数组 |
| 适用场景 |
频繁读取和少量更新 |
读多写少 |
四、同步辅助类
1. CountDownLatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import java.util.concurrent.CountDownLatch;
// 演示:主线程等待 3 个工作线程全部执行完
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 计数器初始值 = 3
CountDownLatch latch = new CountDownLatch(3);
// 创建 3 个线程
for (int i = 1; i <= 3; i++) {
int finalI = i;
new Thread(() -> {
System.out.println("线程" + finalI + " 开始执行");
try {
Thread.sleep(1000); // 模拟业务
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + finalI + " 执行完毕");
// 计数器 -1
latch.countDown();
}).start();
}
// 主线程阻塞,直到计数器变为 0
System.out.println("主线程等待所有线程执行完毕...");
latch.await();
// 所有线程执行完才会走到这里
System.out.println("所有线程执行完毕,主线程继续运行");
}
}
|
2. CyclicBarrier
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import java.util.concurrent.CyclicBarrier;
// 演示:3 个线程互相等待,全部到位后一起执行
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 等待 3 个线程,全部到达后执行回调
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// ========== 【人齐了,先执行这里!】 ==========
System.out.println("=== 所有线程到达屏障,开始继续执行 ===");
});
// 创建 3 个线程
for (int i = 1; i <= 3; i++) {
int finalI = i;
new Thread(() -> {
System.out.println("线程" + finalI + " 到达屏障,等待其他线程");
try {
// ========== 【大家都在这里排队】 ==========
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
// ========== 【人齐后,大家一起从这里继续跑】 ==========
System.out.println("线程" + finalI + " 继续执行后续任务");
}).start();
}
}
}
|
3. Semaphore
作用
控制同一时间,最多允许多少个线程同时访问某个资源 / 某段代码。
相当于 “许可证”机制:线程必须先拿到许可证才能执行,拿不到就阻塞等待。
核心原理
- 初始化时指定许可证数量(permits)。
- 线程执行前调用
acquire():申请许可证,有则拿走,没有就阻塞等待。
- 线程执行完调用
release():归还许可证,让别的线程可以继续用。
- 本质就是限流、控制并发度。
适用场景
- 接口限流、连接池控制最大连接数
- 资源有限的场景(如最多 5 个线程同时上传、下载)
- 控制某段代码的并发线程数
生活场景
- Semaphore = 停车场
- 许可证数量 = 车位数量
- acquire = 进场停车
- release = 离场
- 车位满了,外面的车就只能等
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import java.util.concurrent.Semaphore;
/**
* 模拟:最多允许 3 个线程同时执行
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 最多 3 个许可证 → 同一时间最多 3 个线程并发
Semaphore semaphore = new Semaphore(3);
// 启动 10 个线程
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
try {
// 获取许可证(拿不到就阻塞)
semaphore.acquire();
System.out.println("线程" + finalI + " 开始执行,当前正在执行:" + (3 - semaphore.availablePermits()));
// 模拟业务执行
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可证(必须放,否则许可证耗尽)
semaphore.release();
System.out.println("线程" + finalI + " 执行完毕,释放许可证");
}
}).start();
}
}
}
|
Semaphore 的常用方法:
| 方法 |
描述 |
| Semaphore(int permits) |
创建一个具有指定许可数量的 Semaphore,默认非公平。 |
| Semaphore(int permits, boolean fair) |
创建一个具有指定许可数量的 Semaphore,并指定是否公平。 |
| void acquire() |
获取一个许可,如果没有可用许可,线程将被阻塞。 |
| void acquireUninterruptibly() |
获取一个许可,忽略中断。 |
| int availablePermits() |
返回当前可用的许可数量。 |
| int drainPermits() |
获取并返回所有可用的许可,将许可数量置为 0。 |
| boolean tryAcquire() |
尝试获取一个许可,如果没有可用许可立即返回 false,不会阻塞。 |
| boolean tryAcquire(long timeout, TimeUnit unit) |
尝试在指定时间内获取一个许可,如果获取不到返回 false。 |
| void release() |
释放一个许可,增加可用许可数量。 |
| void release(int permits) |
释放指定数量的许可,增加可用许可数量。 |
| int getQueueLength() |
返回等待获取许可的线程数量。 |
| boolean hasQueuedThreads() |
检查是否有线程在等待获取许可。 |
| boolean isFair() |
检查 Semaphore 是否是公平的。 |
五、Future与CompletableFuture
1. Future
Future 就是:异步任务的“未来结果小票”。
你把任务交给线程池去后台执行,不用死等,先拿到一张小票(Future),等你需要结果的时候,再用这张小票去领取结果。
主要作用:
- 让任务后台异步执行,不卡住当前线程
- 提供获取结果的方法
- 提供取消任务的方法
- 提供查询任务是否完成的方法
简单说:
提交任务 → 不用等 → 过会儿再来拿结果。
生活场景:
- 你 = 主线程
- 餐厅厨房 = 线程池
- 点菜 = 提交任务
- 小票 = Future
- 拿小票取餐 =
future.get()
流程:
- 你点菜(提交任务)
- 厨房后台做饭(异步执行)
- 给你一张小票(返回 Future)
- 你可以先玩手机(不阻塞)
- 饿了拿小票取餐(get() 获取结果)
常用方法:
future.get():阻塞等待,直到任务完成,返回结果
future.isDone():判断任务是否执行完
future.cancel():取消任务
future.isCancelled():判断任务是否被取消
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureDemo {
public static void main(String[] args) throws Exception {
// 创建线程池
ExecutorService pool = Executors.newSingleThreadExecutor();
// 提交异步任务 → 立刻返回 Future 小票(不阻塞)
Future<Integer> future = pool.submit(() -> {
// 异步执行的任务
Thread.sleep(2000);
return 100; // 最终结果
});
System.out.println("任务已提交,我先去干别的事...");
// 需要结果时,再获取(会阻塞到任务完成)
Integer result = future.get();
System.out.println("异步任务结果:" + result);
pool.shutdown();
}
}
|
2. CompletableFuture
CompletableFuture = 加强版 Future,支持链式异步编排,不用阻塞、不用手动轮询,可以把多个异步任务像流水线一样串起来。
主要作用:
- 异步任务可以链式回调:A 执行完自动执行 B
- 非阻塞:不用一直等着
- 多任务组合:并行跑、等全部完成、等任意一个完成
- 优雅异常处理
- 手动完成任务(自己控制什么时候结束)
Future缺点:
- 只能
get() 阻塞等结果
- 不能自动触发下一步
- 多个异步任务很难优雅串联
- 异常处理麻烦
核心能力:
- 任务串行(一个接一个执行)
thenApply:接收上一步结果,处理并返回新结果,上一步结果 → 处理 → 返回新结果 (有入参、有出参)
thenAccept:接收上一步结果,只消费不返回,上一步结果 → 处理 → 不返回东西 (有入参、无出参)
- 任务并行(多个任务同时执行)
allOf:等待所有任务执行完成
anyOf:等待任意一个任务执行完成
- 异常处理
exceptionally:任务抛出异常时执行,返回默认值
handle:无论正常/异常都进入,统一处理结果或异常
- 异步创建:
runAsync:执行无返回值的任务(Runnable)
supplyAsync:执行有返回值的任务(Supplier)
生活场景:
- Future = 取餐小票
- CompletableFuture = 外卖送到家,还能自动加热、自动装盘
代码示例:
CompletableFuture 代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureAllMethodsDemo {
public static void main(String[] args) throws InterruptedException {
// 自定义线程池(用于演示 supplyAsync 带线程池的版本)
ExecutorService threadPool = Executors.newFixedThreadPool(3);
System.out.println("======================= 1. runAsync / supplyAsync =======================");
// 1. runAsync:无返回值
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> {
System.out.println("runAsync 执行(无返回值)");
});
// 2. supplyAsync:有返回值(默认线程池)
CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 执行(有返回值)");
return 10;
});
// 3. supplyAsync:有返回值(自定义线程池)
CompletableFuture<Integer> supplyPoolFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 自定义线程池执行");
return 20;
}, threadPool);
System.out.println("======================= 2. 串行方法 thenApply / thenAccept / thenRun / thenCombine =======================");
// 4. thenApply:接收结果 → 处理 → 返回新结果
CompletableFuture<String> applyFuture = supplyFuture.thenApply(num -> {
System.out.println("thenApply 接收:" + num + " → 处理成字符串");
return "结果:" + num;
});
// 5. thenAccept:接收结果 → 只消费,不返回
CompletableFuture<Void> acceptFuture = supplyFuture.thenAccept(num -> {
System.out.println("thenAccept 消费结果:" + num);
});
// 6. thenRun:任务完成后执行,不接收结果
CompletableFuture<Void> runAfterFuture = supplyFuture.thenRun(() -> {
System.out.println("thenRun:上一步完成,我执行了");
});
// 7. thenCombine:等两个任务都完成,合并结果
System.out.println("\n===== 多任务组合:thenCombine =====");
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 3);
// thenCombine:等两个任务都完成,合并结果
CompletableFuture<Integer> combineFuture = task1.thenCombine(task2, (a, b) -> {
System.out.println("thenCombine 合并:" + a + " + " + b);
return a + b;
});
combineFuture.thenAccept(result -> System.out.println("合并结果:" + result));
System.out.println("======================= 3. 异常处理 exceptionally / handle / whenComplete =======================");
// 8. exceptionally:异常时返回默认值
// 捕获异常 + 屏蔽异常 + 返回默认值 -1
CompletableFuture<Integer> exceptionFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("异常任务执行");
throw new RuntimeException("出错啦!");
}).exceptionally(ex -> {
System.out.println("exceptionally 捕获异常:" + ex.getMessage());
return -1;
});
// 9. whenComplete:无论成功/失败都执行(不修改返回值)
// whenComplete = 监听结果 + 不修改结果 + 不能返回新值
// whenComplete:成功 / 异常都跑,不能改结果
CompletableFuture<Integer> whenCompleteFuture = CompletableFuture.supplyAsync(() -> 100)
.whenComplete((res, ex) -> {
if (ex != null) System.out.println("whenComplete 异常");
else System.out.println("whenComplete 成功,结果:" + res);
});
// 10. handle:无论成功/异常,都处理并返回新结果
CompletableFuture<Integer> handleFuture = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("handle 异常");
}).handle((res, ex) -> {
if (ex != null) {
System.out.println("handle 处理异常");
return -99;
}
return res;
});
System.out.println("======================= 4. 并行组合 allOf / anyOf =======================");
CompletableFuture<Integer> taskA = CompletableFuture.supplyAsync(() -> {
sleep(1000); return 1;
});
CompletableFuture<Integer> taskB = CompletableFuture.supplyAsync(() -> {
sleep(500); return 2;
});
CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {
sleep(1500); return 3;
});
// 11. allOf:等待所有任务完成
CompletableFuture<Void> all = CompletableFuture.allOf(taskA, taskB, taskC);
all.thenRun(() -> System.out.println("allOf:所有任务完成"));
// 12. anyOf:任意一个完成就继续
CompletableFuture<Object> any = CompletableFuture.anyOf(taskA, taskB, taskC);
any.thenAccept(res -> System.out.println("anyOf:最先完成的结果:" + res));
// 主线程等待
Thread.sleep(3000);
threadPool.shutdown();
System.out.println("\n======================= 全部演示完成 =======================");
}
private static void sleep(long ms) {
try { Thread.sleep(ms); } catch (Exception ignored) {}
}
}
|
3. FutureTask
- FutureTask 是
java.util.concurrent 下的工具类,实现了 RunnableFuture 接口。
- RunnableFuture 又同时继承 Runnable + Future,所以 FutureTask 一身两用:
- 可以当作 Runnable 交给线程执行
- 可以当作 Future 获取异步执行结果、判断是否完成、取消任务等
一句话概括:
FutureTask = 一个既能被线程执行,又能拿到执行结果的任务包装类。
主要作用:
- 包装 Callable(有返回值、可抛异常)或 Runnable(无返回值)任务
- 让普通线程(Thread)也能执行有返回值的异步任务
- 提供任务状态管理:是否完成、是否取消、获取结果、异常处理
- 是 ThreadPoolExecutor、Future 等机制的基础实现类
生活场景:
- 你去餐厅吃饭,点了一道菜(创建任务)
- 厨师开始做菜(线程执行 FutureTask)
- 你不用干等,可以先玩手机(异步执行)
- 菜做好后,你可以端过来吃(get() 获取结果)
- 如果菜烧糊了,你会收到问题反馈(异常抛出)
- 你也可以选择取消这道菜(cancel() 取消任务)
FutureTask 就像这道菜的“订单+制作+取餐”一体化凭证。
核心特点
- 结果只计算一次:任务执行完成后,再次调用 get() 会直接返回已缓存结果,不会重复执行。
- 可被 Thread / 线程池 执行:因为是 Runnable。
- 可阻塞获取结果:像 Future 一样。
- 异常会被包装:任务内部抛异常,会在调用 get() 时抛出
ExecutionException。
常用方法:
| 方法 |
描述 |
| FutureTask(Callable callable) |
使用 Callable 构造 FutureTask |
| FutureTask(Runnable runnable, V result) |
使用 Runnable 和结果构造 FutureTask |
| void run() |
执行任务(一般由线程自动调用) |
| boolean cancel(boolean mayInterruptIfRunning) |
尝试取消任务。如果任务已完成或已取消,返回 false。如果任务正在运行且 mayInterruptIfRunning 为 true,则中断任务 |
| boolean isCancelled() |
检查任务是否被取消 |
| boolean isDone() |
检查任务是否已完成 |
| V get() throws InterruptedException, ExecutionException |
阻塞等待任务执行完成,并返回结果;任务异常会在这里抛出。 |
| V get(long timeout, TimeUnit unit) |
获取任务的结果,带超时时间的获取,避免一直死等,则抛出 TimeoutException |
| void set(V v) |
设置任务的结果 |
| void setException(Throwable t) |
设置任务的异常 |
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
* FutureTask 完整使用示例
* 包含:基本用法、线程池用法、异常处理、set手动设结果、setException手动设异常
*/
public class FutureTaskCompleteDemo {
public static void main(String[] args) {
System.out.println("============= 1. FutureTask 基本用法 =============");
basicUsage();
System.out.println("\n============= 2. FutureTask + 线程池使用 =============");
withExecutor();
System.out.println("\n============= 3. FutureTask 异常处理 =============");
exceptionHandle();
System.out.println("\n============= 4. 使用 set() 手动设置任务结果 =============");
useSetMethod();
System.out.println("\n============= 5. 使用 setException() 手动设置任务异常 =============");
useSetExceptionMethod();
}
/**
* 1. 基本用法:FutureTask + Thread
* 适用于简单单线程异步执行,获取返回值
*/
public static void basicUsage() {
// 定义一个有返回值的异步任务
Callable<Integer> callableTask = () -> {
System.out.println("Callable 任务开始执行");
Thread.sleep(2000); // 模拟业务耗时
System.out.println("Callable 任务执行完成");
return 42; // 返回结果
};
// 使用 FutureTask 包装 Callable
FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
// 创建线程并启动任务
Thread thread = new Thread(futureTask);
thread.start();
try {
System.out.println("主线程等待任务结果...");
// 阻塞获取结果,直到任务执行完毕
Integer result = futureTask.get();
System.out.println("任务执行结果:" + result);
} catch (InterruptedException e) {
System.out.println("获取结果被中断");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("任务执行异常:" + e.getCause());
}
}
/**
* 2. 配合线程池 ExecutorService 使用
* 实际开发中更常用,统一管理线程
*/
public static void withExecutor() {
Callable<Integer> callableTask = () -> {
System.out.println("线程池中 Callable 任务开始");
Thread.sleep(2000);
System.out.println("线程池中 Callable 任务完成");
return 42;
};
FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
// 创建单线程线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交 FutureTask 到线程池执行
executor.submit(futureTask);
try {
System.out.println("主线程等待线程池任务结果...");
Integer result = futureTask.get();
System.out.println("线程池任务结果:" + result);
} catch (InterruptedException e) {
System.out.println("获取结果被中断");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("线程池任务执行异常:" + e.getCause());
} finally {
// 关闭线程池
executor.shutdown();
}
}
/**
* 3. 任务抛出异常 + 异常捕获
* 任务内部抛出异常,会在 get() 时抛出 ExecutionException
*/
public static void exceptionHandle() {
Callable<Integer> callableTask = () -> {
System.out.println("任务开始执行");
Thread.sleep(2000);
// 模拟业务异常
throw new RuntimeException("业务执行出现错误!");
};
FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
new Thread(futureTask).start();
try {
System.out.println("主线程等待任务结果...");
futureTask.get();
} catch (InterruptedException e) {
System.out.println("等待被中断");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// 真正的异常在 e.getCause() 里
System.out.println("捕获到任务异常:" + e.getCause().getMessage());
}
}
/**
* 4. 使用 set() 手动设置任务结果
* 可以强制让任务提前完成,并返回指定结果
*/
public static void useSetMethod() {
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(2000);
return "任务正常执行结果";
});
new Thread(futureTask).start();
try {
Thread.sleep(1000);
// 手动强制设置结果,任务会立即完成,不再继续执行
futureTask.set("手动强制设置的结果");
System.out.println("已手动设置任务结果");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
String result = futureTask.get();
System.out.println("最终获取结果:" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 5. 使用 setException() 手动设置任务异常
* 强制让任务以异常方式完成
*/
public static void useSetExceptionMethod() {
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(2000);
return "任务正常执行结果";
});
new Thread(futureTask).start();
try {
Thread.sleep(1000);
// 手动设置异常,任务立即异常结束
futureTask.setException(new RuntimeException("手动触发任务异常"));
System.out.println("已手动设置任务异常");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
futureTask.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("通过 get 捕获手动异常:" + e.getCause().getMessage());
}
}
}
|
4. Runnable/Callable/Future/FutureTask/CompletableFuture区别
一句话总览
- Runnable:无返回值、无异常
- Callable:有返回值、可抛异常
- Future:获取异步结果的接口
- FutureTask:Future 的实现类,可执行、可拿结果
- CompletableFuture:Future 的增强版,支持链式、多任务组合、异常处理
关系图
1
2
3
4
5
6
7
8
9
|
Runnable ←←← 基础无返回值任务
↑
Callable ←←← 有返回值任务
↑
Future ←←← 获取异步结果的接口
↑
FutureTask ←←← 实现了 Runnable + Future
↑
CompletableFuture ←←← 最强、最常用
|
五 个内容区别
Runnable
- 函数:
run()
- 无返回值
- 不能抛检查异常
- 最基础的任务
Callable
- 函数:
call()
- 有返回值
- 能抛异常
- 想拿异步结果必须用它
Future(接口)
- 定义获取异步结果的规范
- 方法:
get()、cancel()、isDone()
- 只是接口,不能直接用
FutureTask(实现类)
- 实现了
Runnable + Future
- 既能执行,又能拿结果
- 包装 Callable/Runnable
- 功能基础,不支持链式、不支持组合
CompletableFuture(增强版)
- 实现了
Future + CompletionStage
- 支持链式调用
- 支持多任务组合(allOf、anyOf)
- 支持异常处理
- 实际开发中最常用
核心区别
| 类型 |
是否有返回值 |
是否可抛异常 |
是否支持链式 |
是否支持多任务组合 |
地位 |
| Runnable |
无 |
不能 |
不支持 |
不支持 |
基础任务 |
| Callable |
有 |
能 |
不支持 |
不支持 |
带结果任务 |
| Future |
有 |
能 |
不支持 |
不支持 |
结果接口 |
| FutureTask |
有 |
能 |
不支持 |
不支持 |
基础实现 |
| CompletableFuture |
有 |
能 |
支持 |
支持 |
高级增强版 |
六、AQS(AbstractQueuedSynchronizer)
AbstractQueuedSynchronizer(简称AQS),是 Java 并发包(JUC)的「核心底层工具」—— 可以把它理解成一个「现成的同步框架」,就像一个“预制好的模板”,开发者不用自己写复杂的线程排队、唤醒逻辑,只需要关注“锁的状态管理”,就能快速实现各种同步器(比如锁、信号量、读写锁等)。
核心原理:
AQS 的核心结构,就像「一个收费站+一条排队车道」,用生活化场景类比,再对应到AQS的实际组件,瞬间就能理解:
核心组件(对应“收费站模型”)
-
状态变量 state(收费站的“通行指示灯”):这是一个整型变量,专门表示「同步状态」(也就是“锁的占用情况”)。
- state = 0:表示“锁空闲”(指示灯绿色,可通行);
- state ≥ 1:表示“锁被占用”(指示灯红色,不可通行),如果是可重入锁,state会随重入次数递增;
- 线程抢锁,本质就是“尝试把state从0改成1”;线程释放锁,就是“把state改回0”。
-
FIFO等待队列(收费站的“排队车道”):这是一个「双向链表」,每个链表节点就代表一个“没抢到锁、正在排队等待的线程”。
- 队列是“先进先出”的(先到先排队,先排队先通行),避免线程争抢混乱;
- 线程没抢到锁,不会乱跑,会自动“站进队列”,然后进入阻塞状态(相当于“停车等待”);
- 队列里的线程,只有等前面的线程释放锁,才会被唤醒,继续抢锁。
-
CAS操作(收费站的“自动栏杆”):是一种原子操作,用来“安全修改state的值”(避免多个线程同时改state,导致混乱)。
- 线程抢锁时,会用CAS尝试把state从0改成1:改成功了,说明抢到锁;改失败了,说明锁被占用,进入队列等待;
- CAS保证了“同一时刻,只有一个线程能抢到锁”,是AQS线程安全的核心。
核心流程(对应“车辆过收费站”)
- 第一步:线程抢锁(车辆到收费站)线程过来尝试获取锁,本质就是通过CAS操作,尝试修改state的值(从0改成1)。
- 第二步:判断抢锁结果(栏杆是否抬起)
- 抢锁成功(CAS修改state成功):state变成1,线程直接进入“临界区”(相当于车辆通过收费站,去执行自己的业务代码);
- 抢锁失败(CAS修改state失败):说明锁已经被其他线程占用,当前线程会被“包装成一个节点”,加入到FIFO等待队列的末尾(相当于车辆开进排队车道,停车等待),然后线程进入阻塞状态(不占用CPU资源,等待被唤醒)。
- 第三步:线程释放锁(车辆通过收费站,栏杆落下)持有锁的线程执行完临界区代码后,会释放锁:通过操作把state改回0(相当于收费站指示灯变回绿色)。
- 第四步:唤醒等待线程(通知下一辆车通行)锁释放后,AQS会自动唤醒等待队列中“最前面的线程”(FIFO原则,先排队的先唤醒),被唤醒的线程会再次尝试通过CAS修改state,抢锁成功后,进入临界区执行代码;如果抢锁失败(极端情况,比如被其他线程抢先),会再次进入队列等待。
AQS 工作原理:
graph TD
%% 主线流程(按真实执行顺序编号)
A1["1. 线程启动"] --> A["2. 线程尝试获取同步状态"]
A --> B{3. 抢占锁成功?}
B -->|成功| C["4. 执行临界区代码"]
B -->|失败| D["5. 进入 AQS 等待队列"]
D --> E["6. 线程阻塞(park)"]
C --> F["7. 执行完毕,释放同步状态"]
F --> G["8. 唤醒等待队列中的后继线程"]
G --> H["9. 线程被唤醒(unpark)"]
H --> A["2. 再次尝试获取同步状态"]
%% AQS 内部结构
subgraph AQS 内部核心组件
I["状态变量 state"]
J["双向等待队列"]
K["CAS 原子操作"]
end
%% 关联关系
A --> I
A --> K
D --> J
F --> I
G --> J
序号流程说明
- 线程启动
- 尝试获取锁(AQS state + CAS)
- 判断是否抢占成功
- 成功 → 执行业务代码
- 失败 → 进入等待队列
- 线程阻塞
- 锁使用完毕 → 释放锁
- 唤醒队列里的等待线程
- 被唤醒线程重新去抢锁
JUC 中使用 AQS 的工具:
| 工具 |
描述 |
| ReentrantLock |
可重入锁,支持公平和非公平模式。 |
| ReentrantReadWriteLock |
读写锁,允许多个读线程同时访问,但写线程独占访问。 |
| Semaphore |
许可证控制,限制同时访问某一资源的线程数量。 |
| CountDownLatch |
计数器倒计时门闩,当计数器归零时允许所有等待线程继续执行。 |
| CyclicBarrier |
循环屏障,多个线程在某个屏障点等待,直到所有线程都到达屏障点才继续执行。 |
| FutureTask |
可取消的任务,用于异步计算结果。 |
| SynchronousQueue |
同步队列,每次插入操作必须等待另一个线程的移除操作,反之亦然。 |
| Exchanger |
交换器,两个线程可以在某个点交换数据。 |
示例代码:一个简单的自定义锁实现,基于 AQS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 基于 AQS 实现的【自定义独占锁】
* 作用:理解 AQS 最核心的 3 个方法:tryAcquire / tryRelease / isHeldExclusively
*/
class SimpleLock {
/**
* 自定义同步器:继承 AQS
* 所有锁的核心逻辑都写在这里
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 尝试【获取锁】(独占模式)
* 底层 AQS 会自动调用这个方法
* @param acquires 一般固定传 1
* @return true=获取成功 / false=获取失败
*/
@Override
protected boolean tryAcquire(int acquires) {
// CAS 尝试把 state 从 0 → 1
// state=0 表示锁空闲
if (compareAndSetState(0, 1)) {
// 获取成功:标记当前线程持有锁
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 获取失败(锁被其他线程占用)
return false;
}
/**
* 尝试【释放锁】
* @param releases 一般固定传 1
* @return 释放成功
*/
@Override
protected boolean tryRelease(int releases) {
// 如果当前锁已经是空闲状态,直接抛异常
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
// 清空持有锁的线程
setExclusiveOwnerThread(null);
// 设置 state = 0,表示锁释放
setState(0);
return true;
}
/**
* 判断锁是否【当前线程持有】
*/
@Override
protected boolean isHeldExclusively() {
// state == 1 表示锁被占用
return getState() == 1;
}
}
// 真正的同步器实例
private final Sync sync = new Sync();
/**
* 加锁:对外暴露的 lock() 方法
* 内部调用 AQS 的 acquire()
*/
public void lock() {
sync.acquire(1);
}
/**
* 解锁:对外暴露的 unlock() 方法
* 内部调用 AQS 的 release()
*/
public void unlock() {
sync.release(1);
}
/**
* 判断锁是否正在被占用
*/
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
// ====================== 测试类 ======================
public class SimpleLockTest {
public static void main(String[] args) {
// 创建自定义锁
SimpleLock lock = new SimpleLock();
// 线程 1:先启动,先抢占锁
Thread t1 = new Thread(() -> {
lock.lock(); // 加锁
System.out.println("线程1:成功获取到锁");
try {
Thread.sleep(2000); // 模拟业务执行 2 秒
System.out.println("线程1:业务执行中...");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1:释放锁");
lock.unlock(); // 解锁
});
// 线程 2:后启动,会等待线程1释放锁
Thread t2 = new Thread(() -> {
System.out.println("线程2:尝试获取锁...");
lock.lock(); // 阻塞等待锁
System.out.println("线程2:成功获取到锁");
lock.unlock(); // 用完释放
System.out.println("线程2:释放锁");
});
t1.start();
t2.start();
}
}
|
总结
JUC提供了丰富的并发工具,合理使用可以显著提升多线程程序的性能和可靠性。掌握其核心组件(线程池、锁、原子类、并发集合)的原理和使用场景,结合实践加深理解,是Java并发编程的关键。