背景
本文是《Java 后端从小白到大神》修仙系列第十一篇
,正式进入Java后端
世界,本篇文章主要聊Java基础
。若想详细学习请点击首篇博文,我们开始把。
文章概览
- 核心工具类(Java JUC)
核心工具类(续)
一、锁(Lock)
锁的类型:
- 显式锁:需要手动获取和释放锁(如
ReentrantLock
)。
- 隐式锁:通过
synchronized
关键字隐式管理锁。
- 读写锁:区分读锁和写锁(如
ReentrantReadWriteLock
)。
- 乐观锁:通过版本号或戳记(Stamp)实现无锁读(如
StampedLock
)。
1. ReentrantLock
核心原理(可重入独占锁):
- 基于 AQS(AbstractQueuedSynchronizer):通过
state
变量(int
类型)表示锁的状态:
state = 0
:锁未被占用。
state > 0
:锁被占用,数值表示重入次数(同一线程多次获取锁)。
- 公平性策略:
- 非公平锁(默认):线程直接尝试通过 CAS 抢占锁,失败后进入等待队列。
- 公平锁:严格按照队列顺序获取锁,避免线程饥饿。
- 重入机制:记录当前持有锁的线程(
exclusiveOwnerThread
),每次重入时 state
自增。
- 锁释放:释放锁时
state
自减,直到 state = 0
时完全释放。
关键流程:
-
加锁:
1
2
3
4
5
6
|
final void lock() {
if (compareAndSetState(0, 1)) // 非公平锁尝试直接抢占
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 进入 AQS 队列等待
}
|
-
解锁:
1
2
3
4
5
6
7
8
9
|
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free) setExclusiveOwnerThread(null);
setState(c);
return free;
}
|
常用方法:
lock()
:获取锁,如果锁不可用则阻塞当前线程。
tryLock()
:尝试获取锁,如果锁可用则立即返回 true,否则返回 false。
tryLock(long timeout, TimeUnit unit)
:尝试在指定时间内获取锁,超时则返回 false。
unlock()
:释放锁。
isHeldByCurrentThread()
:检查当前线程是否持有该锁。
getHoldCount()
:获取当前线程持有该锁的次数。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
ReentrantLock lock = new ReentrantLock();
public void doWork() {
lock.lock();
try {
// 临界区代码
} finally {
lock.unlock();
}
}
// 尝试获取锁(超时)
public void tryDoWork() throws InterruptedException {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// 临界区代码
} finally {
lock.unlock();
}
}
}
|
使用场景:
- 需要手动控制锁的获取和释放。
- 需要可中断或超时获取锁(如避免死锁)。
2. ReentrantReadWriteLock
核心原理(可重入读写锁):
- 读写分离:将锁分为读锁(共享)和写锁(独占)。
- AQS 状态拆分:
- 高 16 位:读锁的持有次数(共享模式)。
- 低 16 位:写锁的重入次数(独占模式)。
- 互斥规则:
- 写锁获取时,禁止其他线程获取读锁或写锁。
- 读锁获取时,允许其他线程获取读锁,但禁止写锁。
关键实现:
-
读锁(ReadLock):
1
2
3
4
5
6
7
8
9
10
11
|
protected final int tryAcquireShared(int unused) {
// 检查是否有写锁被其他线程持有
if (exclusiveCount(getState()) != 0 && getExclusiveOwnerThread() != current)
return -1; // 获取失败
int r = sharedCount(getState()); // 当前读锁数量
if (!readerShouldBlock() && r < MAX_COUNT) {
if (compareAndSetState(c, c + SHARED_UNIT)) // CAS 更新读锁数量
return 1; // 成功获取读锁
}
return fullTryAcquireShared(current); // 完整尝试(处理竞争)
}
|
-
写锁(WriteLock):
1
2
3
4
5
6
7
8
9
10
11
12
13
|
protected final boolean tryAcquire(int acquires) {
// 检查是否有其他线程持有锁(读或写)
if (getState() != 0) {
if (current != getExclusiveOwnerThread())
return false; // 被其他线程占用
}
// CAS 更新写锁状态
if (compareAndSetState(c, c + acquires)) {
setExclusiveOwnerThread(current);
return true;
}
return false;
}
|
常用方法:
readLock()
:获取读锁。
writeLock()
:获取写锁。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void readData() {
rwLock.readLock().lock();
try {
// 读操作
} finally {
rwLock.readLock().unlock();
}
}
public void writeData() {
rwLock.writeLock().lock();
try {
// 写操作
} finally {
rwLock.writeLock().unlock();
}
}
|
使用场景:
- 读多写少的场景(如缓存系统)。
- 允许并发读但互斥写。
3. StampedLock
核心原理(基于戳记的高性能锁):
- 乐观读(Optimistic Read):无锁读取数据,通过戳记(
long stamp
)验证数据是否被修改。
- 锁模式:
- 读锁(Read Lock):共享锁,允许多线程同时读。
- 写锁(Write Lock):独占锁,互斥其他所有锁。
- 戳记(Stamp):一个版本号,用于检测锁状态变化。
- 锁转换:支持从读锁升级为写锁(需释放读锁后重新获取)。
关键机制:
-
乐观读流程:
1
2
3
4
5
6
7
8
9
|
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
public boolean validate(long stamp) {
// 检查戳记是否有效(期间是否有写操作)
return (stamp & SBITS) == (state & SBITS);
}
|
-
写锁获取:
1
2
3
4
5
6
7
8
|
public long writeLock() {
long s, next;
while (((s = state) & ABITS) == 0L) { // 无锁时尝试获取
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
return 0L;
}
|
常用方法:
readLock()
:获取读锁。
writeLock()
:获取写锁。
tryOptimisticRead()
:尝试乐观读。
validate(stamp)
:验证乐观读是否有效。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
StampedLock stampedLock = new StampedLock();
public void optimisticRead() {
long stamp = stampedLock.tryOptimisticRead();
// 读操作
if (!stampedLock.validate(stamp)) {
// 乐观读失败,转为悲观读
stamp = stampedLock.readLock();
try {
// 重新读数据
} finally {
stampedLock.unlockRead(stamp);
}
}
}
public void write() {
long stamp = stampedLock.writeLock();
try {
// 写操作
} finally {
stampedLock.unlockWrite(stamp);
}
}
|
使用场景:
- 读操作远多于写操作。
- 需要高性能的无锁读(通过乐观锁)。
4. Condition
常用方法:
await()
:线程使当前线程进入等待状态,直到被其他线程唤醒。
signal()
:唤醒一个等待线程。
signalAll()
:唤醒所有等待线程。
示例(生产者-消费者模型):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
Queue<Integer> queue = new LinkedList<>();
int capacity = 10;
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
notFull.await(); // 等待队列不满
}
queue.add(item);
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待队列不空
}
int item = queue.remove();
notFull.signal(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
|
使用场景:
对比与总结:
特性 |
ReentrantLock |
ReentrantReadWriteLock |
StampedLock |
锁类型 |
独占锁 |
读写分离 |
读锁、写锁、乐观读 |
重入性 |
支持 |
支持 |
写锁不可重入 |
公平性 |
支持公平/非公平 |
支持公平/非公平 |
仅非公平 |
性能 |
中等 |
读多写少时较高 |
极高(尤其乐观读场景) |
AQS 依赖 |
是 |
是 |
否 |
适用场景 |
精确控制锁的获取/释放 |
读多写少 |
读远多于写的高并发场景 |
注意事项:
StampedLock
的乐观读需要手动验证戳记,使用不当可能导致数据不一致。
ReentrantReadWriteLock
在持续高并发读时可能导致写锁饥饿。
- 避免在
StampedLock
中嵌套锁操作(容易导致死锁)。
通过合理选择锁类型,可以显著提升并发程序的性能和可靠性。
二、原子类(Atomic Classes)
1. 原子类原理
原子类的核心原理是 CAS(Compare-And-Swap),一种无锁并发算法。CAS 包含三个操作:
- 内存地址(V):要修改的共享变量。
- 预期原值(A):线程认为当前的值。
- 新值(B):希望更新的值。
操作流程:
- 读取内存中的当前值。
- 检查当前值是否等于预期原值
A
。
- 如果相等,将内存值更新为新值
B
;否则,放弃操作或重试。
CAS 通过硬件指令(如 x86 的 CMPXCHG
)保证原子性,避免了锁的开销,适合高并发场景。
2. 常见原子类及方法
以下是 Java 中常用的原子类及其核心方法:
原子类 |
用途 |
核心方法 |
AtomicInteger |
原子整型操作 |
incrementAndGet() , getAndAdd(int delta) , compareAndSet(int expect, int update) |
AtomicLong |
原子长整型操作 |
类似 AtomicInteger ,支持 long 类型 |
AtomicBoolean |
原子布尔操作 |
compareAndSet(boolean expect, boolean update) |
AtomicReference<V> |
原子对象引用操作 |
getAndSet(V newValue) , compareAndSet(V expect, V update) |
AtomicIntegerArray |
原子整型数组操作 |
getAndAdd(int index, int delta) |
AtomicStampedReference<V> |
解决 CAS 的 ABA 问题 |
getStamp() , compareAndSet(V expected, V newValue, int expectedStamp, int newStamp) |
3. 代码示例
1. AtomicInteger
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet(); // 原子递增
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.addAndGet(2); // 原子加2
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final counter value: " + counter.get()); // 输出 3000
}
}
|
2. AtomicReference
示例
1
2
3
4
5
6
7
8
9
10
11
|
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceExample {
private static AtomicReference<String> message = new AtomicReference<>("Hello");
public static void main(String[] args) {
// 原子更新引用
message.compareAndSet("Hello", "World");
System.out.println(message.get()); // 输出 "World"
}
}
|
3. AtomicStampedReference
示例(解决 ABA 问题)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import java.util.concurrent.atomic.AtomicStampedReference;
public class ABAExample {
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(100, 0); // 初始值 100,版本号 0
public static void main(String[] args) {
int stamp = atomicStampedRef.getStamp();
Integer value = atomicStampedRef.getReference();
// 模拟其他线程修改值并恢复(ABA 场景)
atomicStampedRef.compareAndSet(value, 200, stamp, stamp + 1); // 修改为 200,版本号+1
atomicStampedRef.compareAndSet(200, 100, stamp + 1, stamp + 2); // 修改回 100,版本号+1
// 当前线程尝试更新(预期版本号为初始值 0,但实际版本号已变为 2,更新失败)
boolean success = atomicStampedRef.compareAndSet(100, 300, stamp, stamp + 1);
System.out.println("Update success? " + success); // 输出 false
}
}
|
4. 适用场景
- 计数器:如
AtomicInteger
实现线程安全的计数器。
- 状态标志:如
AtomicBoolean
控制开关状态。
- 无锁数据结构:如无锁队列、栈(基于
AtomicReference
)。
- 避免 ABA 问题:使用
AtomicStampedReference
或 AtomicMarkableReference
。
总结:
- 优势:原子类通过 CAS 实现无锁并发,性能优于
synchronized
或显式锁。
- 局限性:
- 高竞争场景可能导致频繁 CAS 重试,降低性能。
- 无法保证复合操作的原子性(如多个变量同时更新)。
- 选择建议:
- 简单原子操作(如计数)优先使用原子类。
- 复杂场景需结合锁或
java.util.concurrent
包中的高级工具(如 ConcurrentHashMap
)。
三、并发集合
1. ConcurrentHashMap
ConcurrentHashMap 是一个线程安全的哈希表实现,JDK 1.8优化使用CAS + synchronized
替代分段锁,支持高并发读写操作,读操作通常不需要加锁,写操作则通过细粒度锁实现。
核心原理:
数据结构:
-
数组 + 链表/红黑树:
- 底层是一个数组,每个数组元素称为一个桶(Bucket)。
- 如果某个桶中的元素数量较少,则使用链表存储;如果元素数量较多,则转换为红黑树以提高查询效率。
-
动态扩容:
- 当哈希表的容量不足时,会触发扩容操作。
- 扩容过程中,新旧数组并存,读操作可以继续访问旧数组,写操作逐步迁移到新数组。
并发控制:
-
细粒度锁:
- 每个桶(Bucket)独立加锁,减少了锁的竞争。
- 不同线程可以同时操作不同的桶,提高了并发性能。
-
CAS 操作:
- 写操作优先使用 CAS 操作尝试更新数据。
- 如果 CAS 失败,则回退并重试,或者使用 synchronized 锁定单个桶。
常用方法:
- put(K key, V value):插入键值对。
- get(Object key):获取指定键的值。
- remove(Object key):删除指定键的键值对。
- computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction):如果键不存在,则根据提供的函数计算并插入值。
- forEach(BiConsumer<? super K, ? super V> action):遍历所有键值对。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 ConcurrentHashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 启动多个线程进行写操作
Thread writer1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
map.put("Key" + i, i);
System.out.println("Writer1 added: Key" + i + " -> " + i);
}
});
Thread writer2 = new Thread(() -> {
for (int i = 5; i < 10; i++) {
map.put("Key" + i, i);
System.out.println("Writer2 added: Key" + i + " -> " + i);
}
});
writer1.start();
writer2.start();
// 等待写线程完成
writer1.join();
writer2.join();
// 启动一个线程进行读操作
Thread reader = new Thread(() -> {
map.forEach((key, value) -> {
System.out.println("Reader found: " + key + " -> " + value);
});
});
reader.start();
reader.join();
System.out.println("Final map size: " + map.size());
}
}
|
2. CopyOnWriteArrayList
CopyOnWriteArrayList 是一个线程安全的动态数组实现,写操作(如添加、删除、修改)会复制整个底层数组,因此写操作较慢,但读操作非常快且无锁,适用于读多写少的场景。
核心原理:
-
写时复制(Copy-On-Write):
- 每次写操作(如添加、删除、修改)都会创建底层数组的一个副本,并在副本上进行修改。
- 修改完成后,将引用指向新的数组。
- 由于写操作会复制整个数组,因此写操作较慢,但读操作非常快且无锁。
-
无锁读操作:
- 读操作直接访问底层数组,无需加锁。
- 即使写操作正在进行,读操作仍然可以安全地访问旧数组。
数据结构:
-
底层数组:
- 数据存储在一个数组中。
- 写操作会创建数组的副本,并在副本上进行修改。
-
迭代器快照:
- 迭代器返回的是数组的一个快照(Snapshot),不会反映后续的修改。
- 这种设计保证了迭代器的一致性,避免了并发修改异常(ConcurrentModificationException)。
并发控制:
-
写操作加锁:
- 写操作使用 ReentrantLock 加锁,确保同一时间只有一个线程可以修改数组。
- 写操作完成后,将引用指向新的数组。
-
读操作无锁:
- 读操作直接访问底层数组,无需加锁。
- 由于写操作会创建数组的副本,读操作始终访问的是旧数组,因此不会受到写操作的影响。
常用方法:
- add(E e):添加元素。
- remove(Object o):删除指定元素。
- get(int index):获取指定索引处的元素。
- size():返回列表的大小。
- iterator():返回一个迭代器(快照形式,不会反映后续修改)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 CopyOnWriteArrayList
List<String> list = new CopyOnWriteArrayList<>();
// 启动多个线程进行写操作
Thread writer1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
list.add("Item" + i);
System.out.println("Writer1 added: Item" + i);
}
});
Thread writer2 = new Thread(() -> {
for (int i = 5; i < 10; i++) {
list.add("Item" + i);
System.out.println("Writer2 added: Item" + i);
}
});
writer1.start();
writer2.start();
// 等待写线程完成
writer1.join();
writer2.join();
// 启动一个线程进行读操作
Thread reader = new Thread(() -> {
for (String item : list) {
System.out.println("Reader found: " + item);
}
});
reader.start();
reader.join();
System.out.println("Final list size: " + list.size());
}
}
|
特性 |
ConcurrentHashMap |
CopyOnWriteArrayList |
底层结构 |
数组 + 链表/红黑树 |
动态数组 |
线程安全性 |
分段锁/CAS 操作 |
写时复制(Copy-On-Write) |
读操作性能 |
高效,无锁 |
高效,无锁 |
写操作性能 |
较高,细粒度锁/CAS 操作 |
较低,每次写操作复制整个数组 |
适用场景 |
频繁读取和少量更新 |
读多写少 |
四、同步辅助类
1. CountDownLatch
1
2
3
4
|
CountDownLatch latch = new CountDownLatch(3);
// 线程完成时调用 latch.countDown()
latch.await(); // 主线程等待所有任务完成
|
2. CyclicBarrier
1
2
3
|
CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("All threads reached barrier"));
// 线程中调用 barrier.await()
|
3. Semaphore
作用:控制同时访问某一资源的线程数量。
Semaphore 的常用方法:
方法 |
描述 |
Semaphore(int permits) |
创建一个具有指定许可数量的 Semaphore,默认非公平。 |
Semaphore(int permits, boolean fair) |
创建一个具有指定许可数量的 Semaphore,并指定是否公平。 |
void acquire() |
获取一个许可,如果没有可用许可,线程将被阻塞。 |
void acquireUninterruptibly() |
获取一个许可,忽略中断。 |
int availablePermits() |
返回当前可用的许可数量。 |
int drainPermits() |
获取并返回所有可用的许可,将许可数量置为 0。 |
boolean tryAcquire() |
尝试获取一个许可,如果没有可用许可立即返回 false,不会阻塞。 |
boolean tryAcquire(long timeout, TimeUnit unit) |
尝试在指定时间内获取一个许可,如果获取不到返回 false。 |
void release() |
释放一个许可,增加可用许可数量。 |
void release(int permits) |
释放指定数量的许可,增加可用许可数量。 |
int getQueueLength() |
返回等待获取许可的线程数量。 |
boolean hasQueuedThreads() |
检查是否有线程在等待获取许可。 |
boolean isFair() |
检查 Semaphore 是否是公平的。 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private static final int MAX_PERMITS = 3;
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(MAX_PERMITS);
for (int i = 0; i < 5; i++) {
new Thread(new Worker(semaphore, "Worker-" + i)).start();
}
}
static class Worker implements Runnable {
private final Semaphore semaphore;
private final String name;
public Worker(Semaphore semaphore, String name) {
this.semaphore = semaphore;
this.name = name;
}
@Override
public void run() {
try {
System.out.println(name + " is trying to acquire a permit.");
semaphore.acquire();
System.out.println(name + " acquired a permit.");
// 模拟工作时间
Thread.sleep(2000);
System.out.println(name + " is releasing the permit.");
semaphore.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
|
五、Future与CompletableFuture
1. Future
Future 是 Java 并发包(java.util.concurrent)中的一个接口,用于表示异步计算的结果。它提供了一种机制来获取异步任务的执行结果,而不需要阻塞调用线程。
主要作用:
- 异步计算结果:表示异步任务的结果。
- 获取结果:提供方法来获取异步任务的结果。
- 取消任务:提供方法来取消异步任务。
常用方法:
方法 |
描述 |
boolean cancel(boolean mayInterruptIfRunning) |
尝试取消任务。如果任务已完成或已取消,返回 false。如果任务正在运行且 mayInterruptIfRunning 为 true,则中断任务。 |
boolean isCancelled() |
检查任务是否被取消。 |
boolean isDone() |
检查任务是否已完成。 |
V get() |
获取任务的结果,如果任务尚未完成,则阻塞调用线程。 |
V get(long timeout, TimeUnit unit) |
获取任务的结果,如果任务在指定时间内未完成,则抛出 TimeoutException。 |
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) {
// 创建一个单线程的 ExecutorService
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交一个异步任务,任务执行 2 秒后返回结果 42
Future<Integer> future = executor.submit(() -> {
System.out.println("Task started");
Thread.sleep(2000);
System.out.println("Task completed");
return 42;
});
try {
System.out.println("Waiting for result...");
Integer result = future.get(); // 使用 future.get() 阻塞等待任务结果
System.out.println("Result: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
|
2. CompletableFuture
CompletableFuture 是 Java 8 引入的一个类,它是 Future 的增强版本,提供了更强大的异步编程功能。CompletableFuture 允许你以声明式的方式组合多个异步任务,并处理它们的结果。
主要作用:
- 异步计算结果:表示异步任务的结果。
- 组合任务:提供方法来组合多个异步任务。
- 处理结果:提供方法来处理任务的结果。
- 异常处理:提供方法来处理任务执行过程中发生的异常。
常用方法:
方法 |
描述 |
static CompletableFuture supplyAsync(Supplier supplier) |
异步执行 Supplier 并返回 CompletableFuture。 |
static CompletableFuture supplyAsync(Supplier supplier, Executor executor) |
使用指定的 Executor 异步执行 Supplier 并返回 CompletableFuture。 |
static CompletableFuture runAsync(Runnable runnable) |
异步执行 Runnable 并返回 CompletableFuture。 |
static CompletableFuture runAsync(Runnable runnable, Executor executor) |
使用指定的 Executor 异步执行 Runnable 并返回 CompletableFuture。 |
CompletableFuture thenApply(Function<? super T, ? extends U> fn) |
在任务完成后应用一个函数并返回一个新的 CompletableFuture。 |
CompletableFuture thenAccept(Consumer<? super T> action) |
在任务完成后执行一个操作并返回一个新的 CompletableFuture。 |
CompletableFuture thenRun(Runnable action) |
在任务完成后执行一个操作并返回一个新的 CompletableFuture。 |
CompletableFuture exceptionally(Function<Throwable, ? extends T> fn) |
在任务抛出异常时应用一个函数并返回一个新的 CompletableFuture。 |
CompletableFuture whenComplete(BiConsumer<? super T, ? super Throwable> action) |
在任务完成后执行一个操作,无论任务是否成功。 |
CompletableFuture handle(BiFunction<? super T, Throwable, ? extends U> fn) |
在任务完成后应用一个函数,无论任务是否成功。 |
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) {
// 异步执行任务并返回 CompletableFuture
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Task started");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task completed");
return 42;
});
// 处理任务结果
future.thenAccept(result -> {
System.out.println("Result: " + result);
});
// 异步执行另一个任务
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task 2 started");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 2 completed");
return "Hello";
});
// 组合两个任务
CompletableFuture<String> combinedFuture = future.thenCombine(future2, (result1, result2) -> {
return result2 + " " + result1;
});
// 处理组合任务的结果
combinedFuture.thenAccept(result -> {
System.out.println("Combined Result: " + result);
});
// 异常处理
CompletableFuture<Integer> futureWithException = CompletableFuture.supplyAsync(() -> {
System.out.println("Task with Exception started");
throw new RuntimeException("Something went wrong");
}).exceptionally(ex -> {
System.out.println("Exception: " + ex.getMessage());
return 0;
});
// 处理异常任务的结果
futureWithException.thenAccept(result -> {
System.out.println("Exception Task Result: " + result);
});
// 防止主线程立即退出
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
|
3. FutureTask
FutureTask 是 Java 并发包(java.util.concurrent,简称 JUC)中的一个类,它实现了 RunnableFuture 接口,该接口继承了 Runnable 和 Future 接口。FutureTask 可以包装一个 Callable 对象,允许你以 Runnable 的形式执行任务,并提供 Future 的功能来获取任务的结果。
主要作用:
- 包装 Callable:允许将 Callable 对象包装成 Runnable 对象,从而可以提交到 ExecutorService 中执行。
- 获取任务结果:提供 Future 接口的功能,可以获取异步任务的结果、检查任务状态、取消任务等。
使用场景:
- 异步任务执行:需要异步执行任务并获取结果。
- 线程池管理:通过 ExecutorService 提交任务,管理线程池中的任务。
- 任务取消:需要取消正在执行的任务。
- 任务状态检查:需要检查任务是否完成或取消。
常用方法:
方法 |
描述 |
FutureTask(Callable callable) |
使用 Callable 构造 FutureTask |
FutureTask(Runnable runnable, V result) |
使用 Runnable 和结果构造 FutureTask |
void run() |
执行任务 |
boolean cancel(boolean mayInterruptIfRunning) |
尝试取消任务。如果任务已完成或已取消,返回 false。如果任务正在运行且 mayInterruptIfRunning 为 true,则中断任务 |
boolean isCancelled() |
检查任务是否被取消 |
boolean isDone() |
检查任务是否已完成 |
V get() |
获取任务的结果,如果任务尚未完成,则阻塞调用线程 |
V get(long timeout, TimeUnit unit) |
获取任务的结果,如果任务在指定时间内未完成,则抛出 TimeoutException |
void set(V v) |
设置任务的结果 |
void setException(Throwable t) |
设置任务的异常 |
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
// 基本用法
public class FutureTaskExample {
public static void main(String[] args) {
// 创建一个 Callable 任务
Callable<Integer> callableTask = () -> {
System.out.println("Callable task started");
Thread.sleep(2000);
System.out.println("Callable task completed");
return 42;
};
// 使用 FutureTask 包装 Callable 任务
FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
// 创建一个线程并启动
Thread thread = new Thread(futureTask);
thread.start();
try {
System.out.println("Main thread waiting for result...");
// 获取任务结果,阻塞直到任务完成
Integer result = futureTask.get();
System.out.println("Result: " + result); // Result: 42
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
// 使用ExecutorService
public class FutureTaskWithExecutorExample {
public static void main(String[] args) {
// 创建一个 Callable 任务
Callable<Integer> callableTask = () -> {
System.out.println("Callable task started");
Thread.sleep(2000);
System.out.println("Callable task completed");
return 42;
};
// 使用 FutureTask 包装 Callable 任务
FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
// 创建一个 ExecutorService
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交 FutureTask 到 ExecutorService
executor.submit(futureTask);
try {
System.out.println("Main thread waiting for result...");
// 获取任务结果,阻塞直到任务完成
Integer result = futureTask.get();
System.out.println("Result: " + result); // Result: 42
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
// 异常处理
public class FutureTaskExceptionExample {
public static void main(String[] args) {
// 创建一个 Callable 任务,该任务会抛出异常
Callable<Integer> callableTask = () -> {
System.out.println("Callable task started");
Thread.sleep(2000);
throw new RuntimeException("Something went wrong");
};
// 使用 FutureTask 包装 Callable 任务
FutureTask<Integer> futureTask = new FutureTask<>(callableTask);
// 创建一个线程并启动
Thread thread = new Thread(futureTask);
thread.start();
try {
System.out.println("Main thread waiting for result...");
// 获取任务结果,阻塞直到任务完成
Integer result = futureTask.get();
System.out.println("Result: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("Exception caught: " + e.getCause().getMessage());
}
}
}
// 使用set和setException方法
public class FutureTaskSetExample {
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<>(() -> {
// 模拟任务执行
Thread.sleep(2000);
return "Task Result";
});
// 创建一个线程并启动
Thread thread = new Thread(futureTask);
thread.start();
// 模拟主线程设置结果
try {
Thread.sleep(1000);
futureTask.set("Forced Result");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("Main thread waiting for result...");
// 获取任务结果,阻塞直到任务完成
String result = futureTask.get();
System.out.println("Result: " + result); // Result: Forced Result
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
|
六、AQS(AbstractQueuedSynchronizer)
AbstractQueuedSynchronizer(AQS)是 Java 并发包(java.util.concurrent,简称 JUC)中的一个核心类。它提供了一种框架来实现依赖于先进先出(FIFO)队列的阻塞锁和相关的同步器(如信号量、读写锁等)。AQS 的设计使得开发者可以专注于同步状态的管理,而不需要关心线程排队和唤醒的复杂逻辑。
原理:AQS 的核心思想是通过一个整型状态变量 state 来表示同步状态,并使用 FIFO 队列来管理等待获取同步状态的线程。主要操作包括:
- 获取同步状态:线程尝试获取同步状态(如锁),如果获取失败则进入等待队列。
- 释放同步状态:线程释放同步状态(如解锁),并唤醒等待队列中的下一个线程。
- 等待队列:AQS 使用一个双向链表来实现 FIFO 等待队列,每个节点代表一个等待中的线程。
AQS 的优势:
- 简化同步器实现:AQS 提供了统一的线程排队和唤醒机制,开发者只需关注同步状态的管理。
- 高效并发控制:AQS 内部使用了高效的 CAS 操作和自旋锁,减少了线程切换的开销。
- 灵活扩展:AQS 提供了丰富的钩子方法,方便开发者根据需求定制同步逻辑。
JUC 中使用 AQS 的工具:
工具 |
描述 |
ReentrantLock |
可重入锁,支持公平和非公平模式。 |
ReentrantReadWriteLock |
读写锁,允许多个读线程同时访问,但写线程独占访问。 |
Semaphore |
许可证控制,限制同时访问某一资源的线程数量。 |
CountDownLatch |
计数器倒计时门闩,当计数器归零时允许所有等待线程继续执行。 |
CyclicBarrier |
循环屏障,多个线程在某个屏障点等待,直到所有线程都到达屏障点才继续执行。 |
FutureTask |
可取消的任务,用于异步计算结果。 |
SynchronousQueue |
同步队列,每次插入操作必须等待另一个线程的移除操作,反之亦然。 |
Exchanger |
交换器,两个线程可以在某个点交换数据。 |
示例代码:一个简单的自定义锁实现,基于 AQS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
class SimpleLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
// 测试代码
public class SimpleLockTest {
public static void main(String[] args) {
SimpleLock lock = new SimpleLock();
Thread t1 = new Thread(() -> {
lock.lock();
System.out.println("Thread 1 acquired the lock.");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 1 releasing the lock.");
lock.unlock();
});
Thread t2 = new Thread(() -> {
System.out.println("Thread 2 trying to acquire the lock...");
lock.lock();
System.out.println("Thread 2 acquired the lock.");
lock.unlock();
});
t1.start();
t2.start();
}
}
|
总结
JUC提供了丰富的并发工具,合理使用可以显著提升多线程程序的性能和可靠性。掌握其核心组件(线程池、锁、原子类、并发集合)的原理和使用场景,结合实践加深理解,是Java并发编程的关键。