JDK并发包
# JDK并发包
# 重入锁-ReentrantLock
重入锁(Reentrant Lock)是在Java并发编程中一种重要的锁机制,它具有“可重入”的特性,顾名思义,就是指同一线程在获取到锁之后,可以再次获取该锁而不会被自身阻塞。这种特性使得在使用重入锁的代码块或方法中可以安全地调用另一个也需要同样锁的代码块或方法,而不会发生死锁。
ReentrantLock
是一种可重入锁,它是Java并发API中提供的一个非常重要的工具类,相较于Java原生的synchronized
关键字,它提供了更为灵活和强大的锁定机制。以下是ReentrantLock
的主要特点和功能:
- 可重入性(Reentrancy): 和
synchronized
一样,ReentrantLock
允许同一线程在获取锁之后再次获取锁,这意味着在递归调用或循环内嵌套调用获取锁的方法时,线程不会阻塞自己。 - 公平性选择:
ReentrantLock
支持两种锁模式:公平锁和非公平锁。默认情况下,新创建的ReentrantLock
是非公平锁,它不保证线程获取锁的顺序完全按照它们请求锁的顺序;而公平锁遵循FIFO原则,等待时间最长的线程将会优先得到锁。 - 可中断性: 与
synchronized
不同,ReentrantLock
支持线程在等待锁的过程中被中断。线程可以通过调用lockInterruptibly()
方法来获取锁,若在等待过程中收到中断请求,该方法会抛出InterruptedException
。 - 超时获取锁: 提供了尝试获取锁并设定超时时间的方法
tryLock(long timeout, TimeUnit unit)
,在给定时间内无法获取锁时,线程将返回而不阻塞。 - 条件变量(Condition):
ReentrantLock
允许创建多个条件队列,每个条件队列关联一种线程等待通知的条件。相比于synchronized
中的wait()
和notify()
,ReentrantLock
配合newCondition()
方法创建的Condition
对象提供了更细粒度的线程唤醒和挂起功能,可以避免“条件虚假唤醒”的问题。
以下是一个简单的示例:
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
lock.lock();
try{
i++;
}finally {
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
可以明显的发现,与synchronized相比,重入锁有着显示的操作过程。开发人员必须手动指定何时加锁,何时释放锁。也正因为这样,重入锁对逻辑控制的灵活性要远远好于synchronized。但值得注意的是,在退出临界区时,必须记得释放锁
# 反复进入
它之所叫重入锁时因为该锁可以反复进入.上述简单例子中的加锁部分也可以改成下方,也能起到同样的效果
lock.lock;
lock.lock;
try{
i++;
}finally {
lock.unlock();
lock.unlock();
}
2
3
4
5
6
7
8
它允许一个线程连续两次获得同一把锁。但要注意的是,如果同一个线程多次获得锁,那么在释放的时候,也要释放对应次数。
# 中断相应
对于synchronized来说,如果一个线程在等待锁,那么结果只有两种情况,要么它获得这把锁继续执行,要么它就保持等待。而使用重入锁,则提供另外一种可能,那就是线程可以被中断。也就是在等待锁的过程中,程序可以根据需要取消对锁的请求。有些时候,这么做是非常有必要的。比如,如果你和朋友约好一起去打球,如果你等了半小时,朋友还没有到,突然接到一个电话,说由于突发情况,不能如约了。那么你一定就扫兴地打道回府了。中断正式提供了一套类似的机制。如果一个线程正在等待锁,那么它依然可以收到一个通知,被告知无须再等待,可以停止工作了。这种情况对于处理死锁是有一定帮助的。
public class LockDemo1 implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int num ;
public LockDemo1(int num){
this.num = num;
}
@Override
public void run() {
try {
if(num == 1){
//可以对中断进行响应的获取锁的方式
lock1.lockInterruptibly();
try{
Thread.sleep(1000);
}catch (InterruptedException e){
}lock2.lockInterruptibly();
}else {
lock2.lockInterruptibly();
try{
Thread.sleep(1000);
}catch (InterruptedException e){
}lock1.lockInterruptibly();
}
}catch (InterruptedException e){
e.printStackTrace();
if(lock1.isHeldByCurrentThread()){
lock1.unlock();
}
if(lock2.isHeldByCurrentThread()){
lock2.unlock();
}
}finally {
//检测当前线程是否持有着这把锁
if(lock1.isHeldByCurrentThread()){
lock1.unlock();
}
//检测当前线程是否持有着这把锁
if(lock2.isHeldByCurrentThread()){
lock2.unlock();
}
System.out.println(Thread.currentThread().getName()+"Thread is exit" );
}
}
public static void main(String[] args) throws InterruptedException {
LockDemo1 lockDemo1 = new LockDemo1(1);
LockDemo1 lockDemo2 = new LockDemo1(2);
Thread t1 = new Thread(lockDemo1);
Thread t2 = new Thread(lockDemo2);
t1.start();
t2.start();
Thread.sleep(1000);
t2.interrupt();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
线程t1和t2启动后,t1先占用lock1,再占用lock2;t2先占用lock2,再请求lock1。因此,很容易形成t1和t2之间的相互等待。在这里,对锁的请求,统一使用lockInterruptibly()
方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,可以响应中断。
但请注意,相应中断并非意味着可以自行解锁。而是说而是会立即抛出一个InterruptedException
异常。
在捕获到这个异常后,线程仍然保持着之前获取的所有锁(如果有),并且需要程序员在适当的处理逻辑中手动释放这些锁。这是通过调用ReentrantLock
的unlock()
方法来实现的。
因为上述原理的存在,所以在后来,由于t2线程被中断,故t2会放弃对lock1的申请,同时释放已获得lock2。这个操作导致t1线程可以顺利得到lock2而继续执行下去。
# 申请等待限时
除了等待外部通知之外,要避免死锁还有另外一种方法,那就是限时等待。依然以约朋友打球为例,如果朋友迟迟不来,又无法联系到他。那么,在等待1~2个小时后,我想大部分人都会扫兴离去。对线程来说也是这样。通常,我们无法判断为什么一个线程迟迟拿不到锁。也许是因为死锁了,也许是因为产生了饥饿。但如果给定一个等待时间,让线程自动放弃,那么对系统来说是有意义的。我们可以使用tryLock()
方法进行一次限时的等待。
# 指定时间
以下是指定时间的等待的例子:
public class LockTime implements Runnable{
static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
try {
if(lock.tryLock(5,TimeUnit.SECONDS)){
System.out.println(Thread.currentThread().getName()+" get lock");
Thread.sleep(6000);
}else {
System.out.println(Thread.currentThread().getName()+" get lock failed");
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
public static void main(String[] args) {
LockTime lockTime = new LockTime();
Thread t1 = new Thread(lockTime);
Thread t2 = new Thread(lockTime);
t1.start();
t2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
上述例子中,由于占用锁的线程会持有锁长达6秒,故另一个线程无法在5秒的等待时间内获得锁,因此,请求锁会失败。ReentrantLock.tryLock()
方法也可以不带参数直接运行。在这种情况下,当前线程会尝试获得锁,如果锁并未被其他线程占用,则申请锁会成功,并立即返回true。如果锁被其他线程占用,则当前线程不会进行等待,而是立即返回false。这种模式不会引起线程等待,因此也不会产生死锁。
# 不指定时间
不指定时间则调用方法时,不用选择对应参数
while (!lock.tryLock()) {
// 锁不可用,可以在这里做一些其他事情,如短暂休眠然后再尝试
Thread.sleep(100); // 避免过于频繁的尝试
}
try {
// 执行临界区代码
} finally {
lock.unlock(); // 不论如何都要记得在最后释放锁
}
2
3
4
5
6
7
8
9
10
注意:ReentrantLock.tryLock()
方法在返回false
后,它并不会自动再次尝试获取锁。这是一个单一的尝试获取锁的操作,如果锁不可用,则立即返回false
。如果你需要在锁不可用时不断尝试获取锁,需要在应用程序代码层面自行实现这一逻辑,例如通过循环或者其他定时机制来反复调用tryLock()
直至成功为止。
# 公平锁
在大多数情况下,锁的申请都是非公平的。也就是说,线程1首先请求了锁A,接着线程2也请求了锁A。那么当锁A可用时,是线程1可以获得锁还是线程2可以获得锁呢?这是不一定的。系统只是会从这个锁的等待队列中随机挑选一个。因此不能保证其公平性。
这就好比买票不排队,大家都乱哄哄得围在售票窗口前,售票员忙得焦头烂额,也顾不及谁先谁后,随便找个人出票就完事了。而公平的锁,则不是这样,它会按照时间的先后顺序,保证先到者先得,后到者后得。
如果我们使用synchronized
关键字进行锁控制,那么产生的锁就是非公平的。而重入锁允许我们对其公平性进行设置。它有一个如下的构造函数,它的默认参数是false,或者说它的默认创建的是非公平锁
public ReentrantLock(boolean fair) {}
当参数fair为true时,表示锁是公平的。公平锁看起来很优美,但是要实现公平锁必然要求系统维护一个有序队列,因此公平锁的实现成本比较高,性能相对也非常低下
public class LockTimeDemo1 implements Runnable{
static ReentrantLock lock = new ReentrantLock(true);
@Override
public void run() {
while (true){
try {
lock.lockInterruptibly();
System.out.println(Thread.currentThread().getName()+"get lock");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
}
public static void main(String[] args) {
LockTimeDemo1 lockTimeDemo1 = new LockTimeDemo1();
Thread t1 = new Thread(lockTimeDemo1);
Thread t2 = new Thread(lockTimeDemo1);
t1.start();
t2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
上述例子中,使用的便是公平锁,可以发现,它的输出始终是交替进行的,因为两个线程t1和t2请求这把锁是公平锁,假如t2这次没得到锁的线程,当他t1线程释放后,必定是它获得该锁。
而非公平锁则不会,使用它则可能发生同一个线程多次获得锁的情况。
# Condition条件
如果大家理解了Object.wait()和Object.notify()方法的话,那么就能很容易地理解Condition对象了。它和wait()和notify()方法的作用是大致相同的。但是wait()和notify()方法是和synchronized关键字合作使用的。
而Condtion是与重入锁相关联的。通过Lock接口(重入锁就实现了这一接口)的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。
以下是一个简单的例子:
public class LockConditionD implements Runnable{
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
@Override
public void run() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+"get lock");
condition.await();
System.out.println("Thread is going on");
}catch(InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
LockConditionD lcd = new LockConditionD();
Thread t1 = new Thread(lcd);
t1.start();
Thread.sleep(1000);
System.out.println("Thread is waiting");
lock.lock();
condition.signal();
//唤醒后,如果不释放锁,尽管t1已经被唤醒 但仍无法继续执行
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
和Object.wait()
和notify()
方法一样,当线程使用Condition.await()
时,要求线程持有相关的重入锁,在Condition.await()调用后,这个线程会释放这把锁。同理,
在Condition.signal()
方法调用时,也要求线程先获得相关的锁。在signal()方法调用后,系统会从当前Condition对象的等待队列中,唤醒一个线程。一旦线程被唤醒,它会重新尝试获得与之绑定的重入锁,一旦成功获取,就可以继续执行了。因此,在signal()方法调用之后,一般需要释放相关的锁,谦让给被唤醒的线程,让它可以继续执行。
# 升级-更准确的唤醒
以下是通过Condition实现的3个线程,按顺序轮流打印的demo。
public class OneByOne {
private static int num = 1;
private ReentrantLock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private void printA(){
lock.lock();
try {
while (num!=1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"执行 ---- > 启动线程B");
num =2;
condition2.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
private void printB(){
lock.lock();
try {
while (num!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"执行---- >启动线程C");
num=3;
condition3.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
private void printC(){
lock.lock();
try {
while (num!=3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"执行---- >启动线程A");
num=1;
condition1.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
OneByOne oneByOne = new OneByOne();
Thread a1 = new Thread(()-> {
for (int i = 0; i < 3; i++) {
oneByOne.printA();
}
},"线程A");
Thread a2 = new Thread(()-> {
for (int i = 0; i < 3; i++) {
oneByOne.printB();
}
},"线程B");
Thread a3 = new Thread(()-> {
for (int i = 0; i < 3; i++) {
oneByOne.printC();
}
},"线程C");
a1.start();
a2.start();
a3.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
一个lock可以有多个Condition对象。
# 注意
调用signal()
方法时,它需要确保唤醒的线程能够顺利地从等待状态转为可运行状态,并最终重新获得锁以继续执行。如果不对调用signal()
方法的线程做锁的要求,那么:
- 无法保证线程之间的同步关系,因为唤醒线程的过程涉及到对等待队列的修改,这是一项关键性的并发操作,没有锁,意味着对于等待队列的修改可能是并发的,是不安全的。这将导致队列的状态变得不确定,甚至可能出现数据竞争或丢失信号的情况。
- 若没有持有锁就调用
signal()
,被唤醒的线程可能会在没有锁保护的情况下尝试重新获取锁,这可能导致竞态条件,甚至数据不一致。 - Java的并发包
java.util.concurrent.locks
的设计规定,只有当线程持有相关联的锁时,才能调用Condition
对象的方法。这也是为了避免产生不符合预期的并发行为。
因此,调用signal()
方法前,必须确保调用线程已获取了与Condition
对象关联的锁。这样才能保证整个等待-通知机制的正确性和一致性。调用后,也需要释放对应的锁。
# synchronize和lock
这里简单的总结下,synchronized 和 lock的区别。这里的lock指的是 java.util.concurrent.locks
包下的锁,该包下的锁不仅有上文提到的ReentrantLock-可重入锁,还有其他的一些锁,比如读写锁
、偏向锁
等等,这些后文再介绍。
- 锁的释放
synchronized:自动释放,锁在同步代码块或方法执行完毕后自动释放,即使发生异常也会自动释放。
Lock:需要手动释放(如ReentrantLock),需要显式地调用 unlock()
方法来释放锁,通常在 finally
块中进行,以确保即使发生异常也能释放锁。
- 性能区别
synchronized是托管给JVM执行的,而lock是java写的控制锁的代码。在Java1.5中,synchronized是性能很低效的。因为这是一个重量级操作。需要调用操作接口,导致有可能加锁消耗的系统时间比加锁以外的操作还多。相比之下使用java提供的lock对象,性能更高一些。但是到了JDK1.6,发生了变化。synchronize在语义上很清晰,可以进行很多优化,有适应自旋,锁消除,锁粗化,轻量级锁,偏向锁等等。导致在Java1.6上synchronize的性能并不比Lock差。官方也表示,他们也更支持synchronize,在未来的版本中还有优化余地。
- 用途区别
synchronized原语和ReentrantLock在一般情况下没有什么区别,但是在非常复杂的同步应用中,请考虑使用ReentrantLock,特别是遇到下面2种需求的时候。
- 某个线程在等待一个锁的控制权的这段时间需要中断。
- 需要分开处理一些wait(等待)-notify(唤醒),ReentrantLock里面的Condition应用(Condition定义了等待await/通知signal、signalAll两种类型的方法),能够控制notify哪个线程。
- 具有公平锁功能,每个到来的线程都将排队等候。
简单总结下,就是Synchronized 适合锁少量的代码同步问题,Lock适合锁大量同步代码问题。
最后,对于ReentrantLock,它默认是非公平锁,即创建lock时不加参数
- 公平锁:十分公平,必须先来后到;
- 非公平锁:十分不公平,可以插队; (默认为非公平锁)
# 探究-Lock
Lock,这里代指java.util.concurrent.locks
包下所有锁。lock锁通常来说它所的就是对象
。
更详细一些就是,调用该lock的对象。
那lock也可以锁类吗?(其实类也可以说是Class对象,因此我们常说锁——锁的是对象)。
- 当使用
synchronized
关键字对静态方法或代码块加锁时,这实际上是对类的锁,也即对类的 Class 对象加锁。在ReentrantLock
中,可以通过创建一个静态的ReentrantLock
实例来达到类似的效果。
public class MyClass {
private static final ReentrantLock lock = new ReentrantLock();
public static void myStaticMethod() {
lock.lock();
try {
// 同步代码块
} finally {
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
# 信号量(Semaphore)
信号量为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。
信号量主要提供了以下构造函数:
public Semaphore(int permits)
public Semaphore(int permits, boolean fair) //第二个参数可以指定是否公平
2
在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。
当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。
信号量的主要逻辑方法有:
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
2
3
4
5
acquire()
方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
acquireUninterruptibly()
方法和acquire()方法类似,但是不响应中断。tryAcquire()
尝试获得一个许可,如果成功返回true,失败则返回false,它不会进行等待,立即返回。
release()
用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
以下是一个简单的例子:
public class SemaphoreLock implements Runnable{
final Semaphore semaphore = new Semaphore(5);
@Override
public void run() {
try {
semaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"has done");
semaphore.release();
}catch (InterruptedException e){
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
final SemaphoreLock semaphoreLock = new SemaphoreLock();
for (int i = 0; i < 20; i++) {
executorService.submit(semaphoreLock);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上述,同时开启20个线程。观察这段程序的输出,你就会发现系统以5个线程一组为单位,依次输出带有线程名称的提示文本。
pool-1-thread-4has done
pool-1-thread-5has done
pool-1-thread-1has done
pool-1-thread-2has done
pool-1-thread-3has done
pool-1-thread-7has done
pool-1-thread-9has done
pool-1-thread-6has done
pool-1-thread-8has done
pool-1-thread-10has done
pool-1-thread-13has done
pool-1-thread-11has done
pool-1-thread-14has done
pool-1-thread-12has done
pool-1-thread-15has done
pool-1-thread-19has done
pool-1-thread-18has done
pool-1-thread-20has done
pool-1-thread-16has done
pool-1-thread-17has done
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
总结:信号量常用于限制资源池的大小、控制并发用户数量、或是分布式系统中控制对共享资源的访问等场景。
# 读写锁
读写锁(Read-Write Lock)是一种特殊的锁机制,它允许在同一时刻有多个读取者共享访问资源,而在同一时间仅允许一个写入者访问资源。这种锁的设计之初是为了提高读取密集型场景的并发性能,因为它允许多个线程同时读取共享资源,而不需要相互阻塞。
虽然它减少了读与读之间的阻塞,但是,考虑到数据完整性,写写操作和读写操作间依然是需要相互等待和持有锁的,或者说阻塞的。
如果在系统中,读操作次数远远大于写操作,则读写锁就可以发挥最大的功效,提升系统的性能。
它们的创建如下:
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
2
3
4
以下是一个简单的例子,可以更换注释的部分来查看读写锁和重入锁的各自耗时:
public class ReadWriteLock {
private final Lock lock = new ReentrantLock();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
int value;
public Object handleRead(Lock lock){
try{
lock.lock();
Thread.sleep(1000);
return value;
} catch (InterruptedException e) {
e.printStackTrace();
return value;
} finally {
lock.unlock();
}
}
public void handleWrite(Lock lock,int index) throws InterruptedException {
try{
lock.lock();
Thread.sleep(1000);
value = index;
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
final ReadWriteLock demo = new ReadWriteLock();
Runnable readRunnable = new Runnable() {
@Override
public void run() {
try {
demo.handleRead(demo.readLock);
//demo.handleRead(demo.lock);
}finally {
}
}
};
Runnable writeRunnabel = new Runnable() {
@Override
public void run() {
try {
demo.handleWrite(demo.writeLock,new Random().nextInt());
//demo.handleWrite(demo.lock,new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10; i++) {
new Thread(readRunnable).start();
}
for (int i = 0; i < 2; i++) {
new Thread(writeRunnabel).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 倒计时器-CountDownlatch
CountDownLatch是Java并发库(java.util.concurrent包)中的一种同步工具类,它允许一个或多个线程等待其他线程完成它们的工作后再继续执行。CountDownLatch的核心功能体现在一个计数器上,该计数器只能递减且不可重置。
基本原理和构造: 当你创建一个CountDownLatch实例时,需要传入一个int类型的初始计数值。每当一个线程完成自己的工作后,它会调用countDown()
方法将计数器减1。当计数器的值变为0时,那些调用了await()
方法并正在等待的线程会被唤醒并继续执行。
使用场景:
- 主线程等待多个子线程完成各自任务后才继续执行后续操作。
- 在多阶段任务中,某个阶段等待前面所有阶段完成后才能启动。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimpleCountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个CountDownLatch,计数值为3,表示我们需要等待3个线程完成
CountDownLatch latch = new CountDownLatch(3);
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 提交三个任务到线程池
for (int i = 0; i < 3; i++) {
executorService.submit(new Task(latch));
}
// 主线程等待所有子线程完成
latch.await();
// 所有子线程完成后,关闭线程池
executorService.shutdown();
System.out.println("All tasks have completed!");
}
static class Task implements Runnable {
private final CountDownLatch latch;
Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
// 模拟一些耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 操作完成后,通知主线程
latch.countDown();
System.out.println(Thread.currentThread().getName() + " has finished its task.");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
在这个示例中,主线程创建了一个CountDownLatch实例,计数值为3,表示要等待3个子线程完成。然后,向线程池提交了3个Task
实例作为任务,每个任务完成后调用latch.countDown()
。主线程通过调用latch.await()
阻塞等待所有子线程完成。当所有子线程执行完各自的run()
方法并通过countDown()
方法将计数器减至0时,主线程才会从await()
方法返回并继续执行后续操作。
# 循环栅栏:CyclicBarrier
CyclicBarrier(循环屏障)也是Java并发编程中的一个同步工具类,位于java.util.concurrent
包中。与CountDownLatch不同,CyclicBarrier主要用于让一组线程在完成各自任务后,聚集在一起等待,直到所有线程都达到某个约定的屏障点(也称为同步点),然后一起继续执行下一步操作。
CyclicBarrier的特点:
- 循环使用:CyclicBarrier的名字来源于它可以重置并循环使用,一旦所有等待线程都到达了屏障点,屏障就会重置以便下次使用。
- 计数机制:CyclicBarrier在创建时需要设定一个parties参数,表示需要等待的线程数量。每有一个线程调用
await()
方法,计数器就会减一。当计数器减至0时,所有等待的线程将被释放并继续执行。 - 预定义动作:CyclicBarrier允许在所有线程到达屏障点后,执行一个预定义的Runnable任务(可选)。这个动作只会被执行一次,并且在所有线程都被释放之前执行。
- 中断和超时:CyclicBarrier的
await()
方法支持中断和超时机制,如果线程在等待期间被中断或者等待超时,将抛出InterruptedException或TimeoutException。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
static class Worker implements Runnable {
private final CyclicBarrier barrier;
Worker(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
// 模拟线程执行任务
System.out.println(Thread.currentThread().getName() + " is working...");
Thread.sleep(1000); // 假设这是耗时的任务
// 到达屏障点,等待其他线程
barrier.await();
System.out.println(Thread.currentThread().getName() + " passed the barrier and continues execution.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int parties = 5; // 需要等待的线程数量
CyclicBarrier barrier = new CyclicBarrier(parties);
ExecutorService executor = Executors.newFixedThreadPool(parties);
for (int i = 0; i < parties; i++) {
executor.submit(new Worker(barrier));
}
// 关闭线程池
executor.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
在这个示例中,我们创建了一个CyclicBarrier实例,初始化它的parties为5,表示需要等待5个线程都调用await()
方法。当这5个Worker线程都执行到barrier.await()
时,它们都会被阻塞住,直到最后一个线程也到达await()
,然后所有线程一起恢复执行。
# 线程阻塞工具类:LockSupport
LockSupport
是Java并发包(java.util.concurrent.locks)中的一个工具类,它提供了底层的线程阻塞和唤醒操作。LockSupport主要用于构建锁和其他同步组件,而不是直接供应用程序使用。它通过底层的Unsafe类以及JNI调用操作系统原语来实现线程间的阻塞和唤醒,这比直接使用synchronized
关键字或者Thread.sleep()
等方法更为灵活和低级。
主要方法如下:
park()
: 阻塞当前线程。如果调用此方法的线程已经获得了许可(通过unpark()
方法给予),则立即返回;否则,线程将会被禁足(无法参与CPU调度),直到其他线程通过unpark()
方法给它发放许可。unpark(Thread thread)
: 给定线程发放许可。如果目标线程正处于parked状态,那么它将被唤醒。即使目标线程当前并未处于park状态,该许可仍会被记录下来,因此目标线程在其后的某次park调用中会立即返回。parkNanos(Object blocker, long nanos)
: 类似于park()
,但带有超时时间,以纳秒为单位。如果在指定时间内未获得许可,则线程会自动解除阻塞。parkUntil(Object blocker, long deadline)
: 类似于parkNanos()
,但超时时间是以系统绝对时间(从1970年1月1日 UTC 开始的毫秒数)为准。
这些方法允许程序员更细粒度地控制线程的阻塞和唤醒,这对于实现自定义的同步工具非常有用。例如,在实现Semaphore、Condition或者其他复杂的并发控制结构时,常常会用到LockSupport。同时,传递给park和unpark方法的blocker对象主要用于监控和诊断目的,它可以帮助分析工具识别哪个线程因为什么原因被阻塞。
待补充