背景

本文是《Java 后端从小白到大神》修仙系列第十一篇,正式进入Java后端世界,本篇文章主要聊Java基础。若想详细学习请点击首篇博文,我们开始把。

文章概览

  1. 核心工具类(Java JUC)

核心工具类(续)

一、锁(Lock)

锁的类型

  1. 显式锁:需要手动获取和释放锁(如 ReentrantLock)。
  2. 隐式锁:通过 synchronized 关键字隐式管理锁。
  3. 读写锁:区分读锁和写锁(如 ReentrantReadWriteLock)。
  4. 乐观锁:通过版本号或戳记(Stamp)实现无锁读(如 StampedLock)。

1. ReentrantLock

核心原理(可重入独占锁)

  • 基于 AQS(AbstractQueuedSynchronizer):通过 state 变量(int 类型)表示锁的状态:
    • state = 0:锁未被占用。
    • state > 0:锁被占用,数值表示重入次数(同一线程多次获取锁)。
  • 公平性策略
    • 非公平锁(默认):线程直接尝试通过 CAS 抢占锁,失败后进入等待队列。
    • 公平锁:严格按照队列顺序获取锁,避免线程饥饿。
  • 重入机制:记录当前持有锁的线程(exclusiveOwnerThread),每次重入时 state 自增。
  • 锁释放:释放锁时 state 自减,直到 state = 0 时完全释放。

关键流程

  1. 加锁

    1
    2
    3
    4
    5
    6
    
    final void lock() {
        if (compareAndSetState(0, 1)) // 非公平锁尝试直接抢占
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1); // 进入 AQS 队列等待
    }
    
  2. 解锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = (c == 0);
        if (free) setExclusiveOwnerThread(null);
        setState(c);
        return free;
    }
    

常用方法:

  • lock():获取锁,如果锁不可用则阻塞当前线程。
  • tryLock():尝试获取锁,如果锁可用则立即返回 true,否则返回 false。
  • tryLock(long timeout, TimeUnit unit):尝试在指定时间内获取锁,超时则返回 false。
  • unlock():释放锁。
  • isHeldByCurrentThread():检查当前线程是否持有该锁。
  • getHoldCount():获取当前线程持有该锁的次数。

示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
ReentrantLock lock = new ReentrantLock();

public void doWork() {
    lock.lock();
    try {
        // 临界区代码
    } finally {
        lock.unlock();
    }
}

// 尝试获取锁(超时)
public void tryDoWork() throws InterruptedException {
    if (lock.tryLock(1, TimeUnit.SECONDS)) {
        try {
            // 临界区代码
        } finally {
            lock.unlock();
        }
    }
}

使用场景

  • 需要手动控制锁的获取和释放。
  • 需要可中断或超时获取锁(如避免死锁)。

2. ReentrantReadWriteLock

核心原理(可重入读写锁)

  • 读写分离:将锁分为读锁(共享)和写锁(独占)。
  • AQS 状态拆分
    • 高 16 位:读锁的持有次数(共享模式)。
    • 低 16 位:写锁的重入次数(独占模式)。
  • 互斥规则
    • 写锁获取时,禁止其他线程获取读锁或写锁。
    • 读锁获取时,允许其他线程获取读锁,但禁止写锁。

关键实现

  • 读锁(ReadLock)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    protected final int tryAcquireShared(int unused) {
        // 检查是否有写锁被其他线程持有
        if (exclusiveCount(getState()) != 0 && getExclusiveOwnerThread() != current)
            return -1; // 获取失败
        int r = sharedCount(getState()); // 当前读锁数量
        if (!readerShouldBlock() && r < MAX_COUNT) {
            if (compareAndSetState(c, c + SHARED_UNIT)) // CAS 更新读锁数量
                return 1; // 成功获取读锁
        }
        return fullTryAcquireShared(current); // 完整尝试(处理竞争)
    }
    
  • 写锁(WriteLock)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    protected final boolean tryAcquire(int acquires) {
        // 检查是否有其他线程持有锁(读或写)
        if (getState() != 0) {
            if (current != getExclusiveOwnerThread())
                return false; // 被其他线程占用
        }
        // CAS 更新写锁状态
        if (compareAndSetState(c, c + acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
        return false;
    }
    

常用方法

  • readLock():获取读锁。
  • writeLock():获取写锁。

示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

public void readData() {
    rwLock.readLock().lock();
    try {
        // 读操作
    } finally {
        rwLock.readLock().unlock();
    }
}

public void writeData() {
    rwLock.writeLock().lock();
    try {
        // 写操作
    } finally {
        rwLock.writeLock().unlock();
    }
}

使用场景

  • 读多写少的场景(如缓存系统)。
  • 允许并发读但互斥写。

3. StampedLock

核心原理(基于戳记的高性能锁)

  • 乐观读(Optimistic Read):无锁读取数据,通过戳记(long stamp)验证数据是否被修改。
  • 锁模式
    • 读锁(Read Lock):共享锁,允许多线程同时读。
    • 写锁(Write Lock):独占锁,互斥其他所有锁。
  • 戳记(Stamp):一个版本号,用于检测锁状态变化。
  • 锁转换:支持从读锁升级为写锁(需释放读锁后重新获取)。

关键机制

  • 乐观读流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }
    
    public boolean validate(long stamp) {
        // 检查戳记是否有效(期间是否有写操作)
        return (stamp & SBITS) == (state & SBITS);
    }
    
  • 写锁获取

    1
    2
    3
    4
    5
    6
    7
    8
    
    public long writeLock() {
        long s, next;
        while (((s = state) & ABITS) == 0L) { // 无锁时尝试获取
            if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                return next;
        }
        return 0L;
    }
    

常用方法

  • readLock():获取读锁。
  • writeLock():获取写锁。
  • tryOptimisticRead():尝试乐观读。
  • validate(stamp):验证乐观读是否有效。

示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
StampedLock stampedLock = new StampedLock();

public void optimisticRead() {
    long stamp = stampedLock.tryOptimisticRead();
    // 读操作
    if (!stampedLock.validate(stamp)) {
        // 乐观读失败,转为悲观读
        stamp = stampedLock.readLock();
        try {
            // 重新读数据
        } finally {
            stampedLock.unlockRead(stamp);
        }
    }
}

public void write() {
    long stamp = stampedLock.writeLock();
    try {
        // 写操作
    } finally {
        stampedLock.unlockWrite(stamp);
    }
}

使用场景

  • 读操作远多于写操作。
  • 需要高性能的无锁读(通过乐观锁)。

4. Condition

常用方法

  • await():线程使当前线程进入等待状态,直到被其他线程唤醒。
  • signal():唤醒一个等待线程。
  • signalAll():唤醒所有等待线程。

示例(生产者-消费者模型)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ReentrantLock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
Queue<Integer> queue = new LinkedList<>();
int capacity = 10;

public void produce(int item) throws InterruptedException {
    lock.lock();
    try {
        while (queue.size() == capacity) {
            notFull.await(); // 等待队列不满
        }
        queue.add(item);
        notEmpty.signal(); // 通知消费者
    } finally {
        lock.unlock();
    }
}

public int consume() throws InterruptedException {
    lock.lock();
    try {
        while (queue.isEmpty()) {
            notEmpty.await(); // 等待队列不空
        }
        int item = queue.remove();
        notFull.signal(); // 通知生产者
        return item;
    } finally {
        lock.unlock();
    }
}

使用场景

  • 线程间基于条件的协调(如生产者-消费者模型)。

对比与总结

特性 ReentrantLock ReentrantReadWriteLock StampedLock
锁类型 独占锁 读写分离 读锁、写锁、乐观读
重入性 支持 支持 写锁不可重入
公平性 支持公平/非公平 支持公平/非公平 仅非公平
性能 中等 读多写少时较高 极高(尤其乐观读场景)
AQS 依赖
适用场景 精确控制锁的获取/释放 读多写少 读远多于写的高并发场景

注意事项

  • StampedLock 的乐观读需要手动验证戳记,使用不当可能导致数据不一致。
  • ReentrantReadWriteLock 在持续高并发读时可能导致写锁饥饿。
  • 避免在 StampedLock 中嵌套锁操作(容易导致死锁)。

通过合理选择锁类型,可以显著提升并发程序的性能和可靠性。

二、原子类(Atomic Classes)

1. 原子类原理

原子类的核心原理是 CAS(Compare-And-Swap),一种无锁并发算法。CAS 包含三个操作:

  1. 内存地址(V):要修改的共享变量。
  2. 预期原值(A):线程认为当前的值。
  3. 新值(B):希望更新的值。

操作流程

  1. 读取内存中的当前值。
  2. 检查当前值是否等于预期原值 A
  3. 如果相等,将内存值更新为新值 B;否则,放弃操作或重试。

CAS 通过硬件指令(如 x86 的 CMPXCHG)保证原子性,避免了锁的开销,适合高并发场景。

2. 常见原子类及方法

以下是 Java 中常用的原子类及其核心方法:

原子类 用途 核心方法
AtomicInteger 原子整型操作 incrementAndGet(), getAndAdd(int delta), compareAndSet(int expect, int update)
AtomicLong 原子长整型操作 类似 AtomicInteger,支持 long 类型
AtomicBoolean 原子布尔操作 compareAndSet(boolean expect, boolean update)
AtomicReference<V> 原子对象引用操作 getAndSet(V newValue), compareAndSet(V expect, V update)
AtomicIntegerArray 原子整型数组操作 getAndAdd(int index, int delta)
AtomicStampedReference<V> 解决 CAS 的 ABA 问题 getStamp(), compareAndSet(V expected, V newValue, int expectedStamp, int newStamp)

3. 代码示例

1. AtomicInteger 示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
    private static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.incrementAndGet(); // 原子递增
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.addAndGet(2); // 原子加2
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("Final counter value: " + counter.get()); // 输出 3000
    }
}
2. AtomicReference 示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceExample {
    private static AtomicReference<String> message = new AtomicReference<>("Hello");

    public static void main(String[] args) {
        // 原子更新引用
        message.compareAndSet("Hello", "World");
        System.out.println(message.get()); // 输出 "World"
    }
}
3. AtomicStampedReference 示例(解决 ABA 问题)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.atomic.AtomicStampedReference;

public class ABAExample {
    private static AtomicStampedReference<Integer> atomicStampedRef = 
        new AtomicStampedReference<>(100, 0); // 初始值 100,版本号 0

    public static void main(String[] args) {
        int stamp = atomicStampedRef.getStamp();
        Integer value = atomicStampedRef.getReference();

        // 模拟其他线程修改值并恢复(ABA 场景)
        atomicStampedRef.compareAndSet(value, 200, stamp, stamp + 1); // 修改为 200,版本号+1
        atomicStampedRef.compareAndSet(200, 100, stamp + 1, stamp + 2); // 修改回 100,版本号+1

        // 当前线程尝试更新(预期版本号为初始值 0,但实际版本号已变为 2,更新失败)
        boolean success = atomicStampedRef.compareAndSet(100, 300, stamp, stamp + 1);
        System.out.println("Update success? " + success); // 输出 false
    }
}

4. 适用场景

  1. 计数器:如 AtomicInteger 实现线程安全的计数器。
  2. 状态标志:如 AtomicBoolean 控制开关状态。
  3. 无锁数据结构:如无锁队列、栈(基于 AtomicReference)。
  4. 避免 ABA 问题:使用 AtomicStampedReferenceAtomicMarkableReference

总结

  • 优势:原子类通过 CAS 实现无锁并发,性能优于 synchronized 或显式锁。
  • 局限性
    • 高竞争场景可能导致频繁 CAS 重试,降低性能。
    • 无法保证复合操作的原子性(如多个变量同时更新)。
  • 选择建议
    • 简单原子操作(如计数)优先使用原子类。
    • 复杂场景需结合锁或 java.util.concurrent 包中的高级工具(如 ConcurrentHashMap)。

三、并发集合

1. ConcurrentHashMap

ConcurrentHashMap 是一个线程安全的哈希表实现,JDK 1.8优化使用CAS + synchronized替代分段锁,支持高并发读写操作,读操作通常不需要加锁,写操作则通过细粒度锁实现。

核心原理

  • 分段锁(Segment Locking):

    • 在早期版本(Java 7 及之前),ConcurrentHashMap 使用了分段锁机制。
    • 哈希表被划分为多个段(Segment),每个段独立加锁。
    • 不同线程可以同时访问不同的段,从而提高并发性能。
    • 写操作只锁定涉及的段,而读操作通常不需要加锁。
  • CAS 操作(Compare-And-Swap):

    • 在 Java 8 中,ConcurrentHashMap 改进了实现,去掉了分段锁,改为基于 CAS 操作和 synchronized 锁。
    • 写操作使用 CAS 操作尝试更新数据,如果失败则回退并重试。
    • 对于高竞争的情况,会使用 synchronized 锁定单个桶(Bucket)以确保线程安全。
  • 无锁读操作:

    • 读操作通常是无锁的,因为底层数据结构是不可变的(Immutable)。
    • 即使写操作正在进行,读操作仍然可以安全地访问旧数据。

数据结构

  • 数组 + 链表/红黑树:

    • 底层是一个数组,每个数组元素称为一个桶(Bucket)。
    • 如果某个桶中的元素数量较少,则使用链表存储;如果元素数量较多,则转换为红黑树以提高查询效率。
  • 动态扩容:

    • 当哈希表的容量不足时,会触发扩容操作。
    • 扩容过程中,新旧数组并存,读操作可以继续访问旧数组,写操作逐步迁移到新数组。

并发控制

  • 细粒度锁:

    • 每个桶(Bucket)独立加锁,减少了锁的竞争。
    • 不同线程可以同时操作不同的桶,提高了并发性能。
  • CAS 操作:

    • 写操作优先使用 CAS 操作尝试更新数据。
    • 如果 CAS 失败,则回退并重试,或者使用 synchronized 锁定单个桶。

常用方法

  • put(K key, V value):插入键值对。
  • get(Object key):获取指定键的值。
  • remove(Object key):删除指定键的键值对。
  • computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction):如果键不存在,则根据提供的函数计算并插入值。
  • forEach(BiConsumer<? super K, ? super V> action):遍历所有键值对。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个 ConcurrentHashMap
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // 启动多个线程进行写操作
        Thread writer1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                map.put("Key" + i, i);
                System.out.println("Writer1 added: Key" + i + " -> " + i);
            }
        });

        Thread writer2 = new Thread(() -> {
            for (int i = 5; i < 10; i++) {
                map.put("Key" + i, i);
                System.out.println("Writer2 added: Key" + i + " -> " + i);
            }
        });

        writer1.start();
        writer2.start();

        // 等待写线程完成
        writer1.join();
        writer2.join();

        // 启动一个线程进行读操作
        Thread reader = new Thread(() -> {
            map.forEach((key, value) -> {
                System.out.println("Reader found: " + key + " -> " + value);
            });
        });

        reader.start();
        reader.join();

        System.out.println("Final map size: " + map.size());
    }
}

2. CopyOnWriteArrayList

CopyOnWriteArrayList 是一个线程安全的动态数组实现,写操作(如添加、删除、修改)会复制整个底层数组,因此写操作较慢,但读操作非常快且无锁,适用于读多写少的场景。

核心原理

  • 写时复制(Copy-On-Write):

    • 每次写操作(如添加、删除、修改)都会创建底层数组的一个副本,并在副本上进行修改。
    • 修改完成后,将引用指向新的数组。
    • 由于写操作会复制整个数组,因此写操作较慢,但读操作非常快且无锁。
  • 无锁读操作:

    • 读操作直接访问底层数组,无需加锁。
    • 即使写操作正在进行,读操作仍然可以安全地访问旧数组。

数据结构

  • 底层数组:

    • 数据存储在一个数组中。
    • 写操作会创建数组的副本,并在副本上进行修改。
  • 迭代器快照:

    • 迭代器返回的是数组的一个快照(Snapshot),不会反映后续的修改。
    • 这种设计保证了迭代器的一致性,避免了并发修改异常(ConcurrentModificationException)。

并发控制

  • 写操作加锁:

    • 写操作使用 ReentrantLock 加锁,确保同一时间只有一个线程可以修改数组。
    • 写操作完成后,将引用指向新的数组。
  • 读操作无锁:

    • 读操作直接访问底层数组,无需加锁。
    • 由于写操作会创建数组的副本,读操作始终访问的是旧数组,因此不会受到写操作的影响。

常用方法

  • add(E e):添加元素。
  • remove(Object o):删除指定元素。
  • get(int index):获取指定索引处的元素。
  • size():返回列表的大小。
  • iterator():返回一个迭代器(快照形式,不会反映后续修改)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个 CopyOnWriteArrayList
        List<String> list = new CopyOnWriteArrayList<>();

        // 启动多个线程进行写操作
        Thread writer1 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                list.add("Item" + i);
                System.out.println("Writer1 added: Item" + i);
            }
        });

        Thread writer2 = new Thread(() -> {
            for (int i = 5; i < 10; i++) {
                list.add("Item" + i);
                System.out.println("Writer2 added: Item" + i);
            }
        });

        writer1.start();
        writer2.start();

        // 等待写线程完成
        writer1.join();
        writer2.join();

        // 启动一个线程进行读操作
        Thread reader = new Thread(() -> {
            for (String item : list) {
                System.out.println("Reader found: " + item);
            }
        });

        reader.start();
        reader.join();

        System.out.println("Final list size: " + list.size());
    }
}
特性 ConcurrentHashMap CopyOnWriteArrayList
底层结构 数组 + 链表/红黑树 动态数组
线程安全性 分段锁/CAS 操作 写时复制(Copy-On-Write)
读操作性能 高效,无锁 高效,无锁
写操作性能 较高,细粒度锁/CAS 操作 较低,每次写操作复制整个数组
适用场景 频繁读取和少量更新 读多写少

四、同步辅助类

1. CountDownLatch

  • 作用:等待多个线程完成。
1
2
3
4
CountDownLatch latch = new CountDownLatch(3);

// 线程完成时调用 latch.countDown()
latch.await(); // 主线程等待所有任务完成

2. CyclicBarrier

  • 作用:多个线程相互等待,到达屏障后继续执行。
1
2
3
CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("All threads reached barrier"));

// 线程中调用 barrier.await()

3. Semaphore

作用:控制同时访问某一资源的线程数量。

Semaphore 的常用方法

方法 描述
Semaphore(int permits) 创建一个具有指定许可数量的 Semaphore,默认非公平。
Semaphore(int permits, boolean fair) 创建一个具有指定许可数量的 Semaphore,并指定是否公平。
void acquire() 获取一个许可,如果没有可用许可,线程将被阻塞。
void acquireUninterruptibly() 获取一个许可,忽略中断。
int availablePermits() 返回当前可用的许可数量。
int drainPermits() 获取并返回所有可用的许可,将许可数量置为 0。
boolean tryAcquire() 尝试获取一个许可,如果没有可用许可立即返回 false,不会阻塞。
boolean tryAcquire(long timeout, TimeUnit unit) 尝试在指定时间内获取一个许可,如果获取不到返回 false。
void release() 释放一个许可,增加可用许可数量。
void release(int permits) 释放指定数量的许可,增加可用许可数量。
int getQueueLength() 返回等待获取许可的线程数量。
boolean hasQueuedThreads() 检查是否有线程在等待获取许可。
boolean isFair() 检查 Semaphore 是否是公平的。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private static final int MAX_PERMITS = 3;

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(MAX_PERMITS);

        for (int i = 0; i < 5; i++) {
            new Thread(new Worker(semaphore, "Worker-" + i)).start();
        }
    }

    static class Worker implements Runnable {
        private final Semaphore semaphore;
        private final String name;

        public Worker(Semaphore semaphore, String name) {
            this.semaphore = semaphore;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " is trying to acquire a permit.");
                semaphore.acquire();
                System.out.println(name + " acquired a permit.");

                // 模拟工作时间
                Thread.sleep(2000);

                System.out.println(name + " is releasing the permit.");
                semaphore.release();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

五、Future与CompletableFuture

1. Future

Future 是 Java 并发包(java.util.concurrent)中的一个接口,用于表示异步计算的结果。它提供了一种机制来获取异步任务的执行结果,而不需要阻塞调用线程。

主要作用

  • 异步计算结果:表示异步任务的结果。
  • 获取结果:提供方法来获取异步任务的结果。
  • 取消任务:提供方法来取消异步任务。

常用方法

方法 描述
boolean cancel(boolean mayInterruptIfRunning) 尝试取消任务。如果任务已完成或已取消,返回 false。如果任务正在运行且 mayInterruptIfRunning 为 true,则中断任务。
boolean isCancelled() 检查任务是否被取消。
boolean isDone() 检查任务是否已完成。
V get() 获取任务的结果,如果任务尚未完成,则阻塞调用线程。
V get(long timeout, TimeUnit unit) 获取任务的结果,如果任务在指定时间内未完成,则抛出 TimeoutException。

代码示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) {
        // 创建一个单线程的 ExecutorService
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 提交一个异步任务,任务执行 2 秒后返回结果 42
        Future<Integer> future = executor.submit(() -> {
            System.out.println("Task started");
            Thread.sleep(2000);
            System.out.println("Task completed");
            return 42;
        });

        try {
            System.out.println("Waiting for result...");
            Integer result = future.get(); // 使用 future.get() 阻塞等待任务结果
            System.out.println("Result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

2. CompletableFuture

CompletableFuture 是 Java 8 引入的一个类,它是 Future 的增强版本,提供了更强大的异步编程功能。CompletableFuture 允许你以声明式的方式组合多个异步任务,并处理它们的结果。

主要作用:

  • 异步计算结果:表示异步任务的结果。
  • 组合任务:提供方法来组合多个异步任务。
  • 处理结果:提供方法来处理任务的结果。
  • 异常处理:提供方法来处理任务执行过程中发生的异常。

常用方法:

方法 描述
static CompletableFuture supplyAsync(Supplier supplier) 异步执行 Supplier 并返回 CompletableFuture。
static CompletableFuture supplyAsync(Supplier supplier, Executor executor) 使用指定的 Executor 异步执行 Supplier 并返回 CompletableFuture。
static CompletableFuture runAsync(Runnable runnable) 异步执行 Runnable 并返回 CompletableFuture。
static CompletableFuture runAsync(Runnable runnable, Executor executor) 使用指定的 Executor 异步执行 Runnable 并返回 CompletableFuture。
CompletableFuture thenApply(Function<? super T, ? extends U> fn) 在任务完成后应用一个函数并返回一个新的 CompletableFuture。
CompletableFuture thenAccept(Consumer<? super T> action) 在任务完成后执行一个操作并返回一个新的 CompletableFuture。
CompletableFuture thenRun(Runnable action) 在任务完成后执行一个操作并返回一个新的 CompletableFuture。
CompletableFuture exceptionally(Function<Throwable, ? extends T> fn) 在任务抛出异常时应用一个函数并返回一个新的 CompletableFuture。
CompletableFuture whenComplete(BiConsumer<? super T, ? super Throwable> action) 在任务完成后执行一个操作,无论任务是否成功。
CompletableFuture handle(BiFunction<? super T, Throwable, ? extends U> fn) 在任务完成后应用一个函数,无论任务是否成功。

代码示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 异步执行任务并返回 CompletableFuture
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task started");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task completed");
            return 42;
        });

        // 处理任务结果
        future.thenAccept(result -> {
            System.out.println("Result: " + result);
        });

        // 异步执行另一个任务
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 2 started");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Task 2 completed");
            return "Hello";
        });

        // 组合两个任务
        CompletableFuture<String> combinedFuture = future.thenCombine(future2, (result1, result2) -> {
            return result2 + " " + result1;
        });

        // 处理组合任务的结果
        combinedFuture.thenAccept(result -> {
            System.out.println("Combined Result: " + result);
        });

        // 异常处理
        CompletableFuture<Integer> futureWithException = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task with Exception started");
            throw new RuntimeException("Something went wrong");
        }).exceptionally(ex -> {
            System.out.println("Exception: " + ex.getMessage());
            return 0;
        });

        // 处理异常任务的结果
        futureWithException.thenAccept(result -> {
            System.out.println("Exception Task Result: " + result);
        });

        // 防止主线程立即退出
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3. FutureTask

FutureTask 是 Java 并发包(java.util.concurrent,简称 JUC)中的一个类,它实现了 RunnableFuture 接口,该接口继承了 Runnable 和 Future 接口。FutureTask 可以包装一个 Callable 对象,允许你以 Runnable 的形式执行任务,并提供 Future 的功能来获取任务的结果。

主要作用

  • 包装 Callable:允许将 Callable 对象包装成 Runnable 对象,从而可以提交到 ExecutorService 中执行。
  • 获取任务结果:提供 Future 接口的功能,可以获取异步任务的结果、检查任务状态、取消任务等。

使用场景

  • 异步任务执行:需要异步执行任务并获取结果。
  • 线程池管理:通过 ExecutorService 提交任务,管理线程池中的任务。
  • 任务取消:需要取消正在执行的任务。
  • 任务状态检查:需要检查任务是否完成或取消。

常用方法

方法 描述
FutureTask(Callable callable) 使用 Callable 构造 FutureTask
FutureTask(Runnable runnable, V result) 使用 Runnable 和结果构造 FutureTask
void run() 执行任务
boolean cancel(boolean mayInterruptIfRunning) 尝试取消任务。如果任务已完成或已取消,返回 false。如果任务正在运行且 mayInterruptIfRunning 为 true,则中断任务
boolean isCancelled() 检查任务是否被取消
boolean isDone() 检查任务是否已完成
V get() 获取任务的结果,如果任务尚未完成,则阻塞调用线程
V get(long timeout, TimeUnit unit) 获取任务的结果,如果任务在指定时间内未完成,则抛出 TimeoutException
void set(V v) 设置任务的结果
void setException(Throwable t) 设置任务的异常

代码示例

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// 基本用法
public class FutureTaskExample {
    public static void main(String[] args) {
        // 创建一个 Callable 任务
        Callable<Integer> callableTask = () -> {
            System.out.println("Callable task started");
            Thread.sleep(2000);
            System.out.println("Callable task completed");
            return 42;
        };

        // 使用 FutureTask 包装 Callable 任务
        FutureTask<Integer> futureTask = new FutureTask<>(callableTask);

        // 创建一个线程并启动
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            System.out.println("Main thread waiting for result...");
            // 获取任务结果,阻塞直到任务完成
            Integer result = futureTask.get();
            System.out.println("Result: " + result); // Result: 42
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

// 使用ExecutorService
public class FutureTaskWithExecutorExample {
    public static void main(String[] args) {
        // 创建一个 Callable 任务
        Callable<Integer> callableTask = () -> {
            System.out.println("Callable task started");
            Thread.sleep(2000);
            System.out.println("Callable task completed");
            return 42;
        };

        // 使用 FutureTask 包装 Callable 任务
        FutureTask<Integer> futureTask = new FutureTask<>(callableTask);

        // 创建一个 ExecutorService
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 提交 FutureTask 到 ExecutorService
        executor.submit(futureTask);

        try {
            System.out.println("Main thread waiting for result...");
            // 获取任务结果,阻塞直到任务完成
            Integer result = futureTask.get();
            System.out.println("Result: " + result); // Result: 42
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

// 异常处理
public class FutureTaskExceptionExample {
    public static void main(String[] args) {
        // 创建一个 Callable 任务,该任务会抛出异常
        Callable<Integer> callableTask = () -> {
            System.out.println("Callable task started");
            Thread.sleep(2000);
            throw new RuntimeException("Something went wrong");
        };

        // 使用 FutureTask 包装 Callable 任务
        FutureTask<Integer> futureTask = new FutureTask<>(callableTask);

        // 创建一个线程并启动
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            System.out.println("Main thread waiting for result...");
            // 获取任务结果,阻塞直到任务完成
            Integer result = futureTask.get();
            System.out.println("Result: " + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            System.out.println("Exception caught: " + e.getCause().getMessage());
        }
    }
}

// 使用set和setException方法
public class FutureTaskSetExample {
    public static void main(String[] args) {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            // 模拟任务执行
            Thread.sleep(2000);
            return "Task Result";
        });

        // 创建一个线程并启动
        Thread thread = new Thread(futureTask);
        thread.start();

        // 模拟主线程设置结果
        try {
            Thread.sleep(1000);
            futureTask.set("Forced Result");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("Main thread waiting for result...");
            // 获取任务结果,阻塞直到任务完成
            String result = futureTask.get();
            System.out.println("Result: " + result); // Result: Forced Result
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

六、AQS(AbstractQueuedSynchronizer)

AbstractQueuedSynchronizer(AQS)是 Java 并发包(java.util.concurrent,简称 JUC)中的一个核心类。它提供了一种框架来实现依赖于先进先出(FIFO)队列的阻塞锁和相关的同步器(如信号量、读写锁等)。AQS 的设计使得开发者可以专注于同步状态的管理,而不需要关心线程排队和唤醒的复杂逻辑。

原理:AQS 的核心思想是通过一个整型状态变量 state 来表示同步状态,并使用 FIFO 队列来管理等待获取同步状态的线程。主要操作包括:

  • 获取同步状态:线程尝试获取同步状态(如锁),如果获取失败则进入等待队列。
  • 释放同步状态:线程释放同步状态(如解锁),并唤醒等待队列中的下一个线程。
  • 等待队列:AQS 使用一个双向链表来实现 FIFO 等待队列,每个节点代表一个等待中的线程。

AQS 的优势

  • 简化同步器实现:AQS 提供了统一的线程排队和唤醒机制,开发者只需关注同步状态的管理。
  • 高效并发控制:AQS 内部使用了高效的 CAS 操作和自旋锁,减少了线程切换的开销。
  • 灵活扩展:AQS 提供了丰富的钩子方法,方便开发者根据需求定制同步逻辑。

JUC 中使用 AQS 的工具

工具 描述
ReentrantLock 可重入锁,支持公平和非公平模式。
ReentrantReadWriteLock 读写锁,允许多个读线程同时访问,但写线程独占访问。
Semaphore 许可证控制,限制同时访问某一资源的线程数量。
CountDownLatch 计数器倒计时门闩,当计数器归零时允许所有等待线程继续执行。
CyclicBarrier 循环屏障,多个线程在某个屏障点等待,直到所有线程都到达屏障点才继续执行。
FutureTask 可取消的任务,用于异步计算结果。
SynchronousQueue 同步队列,每次插入操作必须等待另一个线程的移除操作,反之亦然。
Exchanger 交换器,两个线程可以在某个点交换数据。

示例代码:一个简单的自定义锁实现,基于 AQS

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleLock {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

// 测试代码
public class SimpleLockTest {
    public static void main(String[] args) {
        SimpleLock lock = new SimpleLock();

        Thread t1 = new Thread(() -> {
            lock.lock();
            System.out.println("Thread 1 acquired the lock.");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread 1 releasing the lock.");
            lock.unlock();
        });

        Thread t2 = new Thread(() -> {
            System.out.println("Thread 2 trying to acquire the lock...");
            lock.lock();
            System.out.println("Thread 2 acquired the lock.");
            lock.unlock();
        });

        t1.start();
        t2.start();
    }
}

总结

JUC提供了丰富的并发工具,合理使用可以显著提升多线程程序的性能和可靠性。掌握其核心组件(线程池、锁、原子类、并发集合)的原理和使用场景,结合实践加深理解,是Java并发编程的关键。