生产者和消费者
# 生产者和消费者
在 Java 中,生产者-消费者模式是一种典型的多线程同步问题,用于解决生产者线程和消费者线程之间的通信和同步问题。这种模式的核心思想是通过一个共享的数据结构(通常是队列)来协调生产者和消费者之间的操作。
在面试中经常会问到这个问题,并要求手写代码,为了让我们更好的理解并发和锁,以及更好的应对面试,下面就一起来看看对应的代码是如何的。
注:以下代码仅供参考,毕竟正确的代码不止一个。
# synchronized版
public class ConsumerAndCreater {
public static int num = 0;
public synchronized void consumer() throws InterruptedException {
while (num==0){
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName()+"此时 num "+ num);
this.notify();
}
public synchronized void create( ) throws InterruptedException {
while (num==1){
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+"此时 num "+ num);
this.notify();
}
public static void main(String[] args) throws InterruptedException{
ConsumerAndCreater cc = new ConsumerAndCreater();
Thread a1 = new Thread(()-> {
try {
for (int i = 0; i < 3; i++) {
cc.consumer();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"消费者");
Thread a2 = new Thread(()-> {
try {
for (int i = 0; i < 3; i++) {
cc.create();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"生产者");
a1.start();
a2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
运行效果:
生产者此时 num 1
消费者此时 num 0
生产者此时 num 1
消费者此时 num 0
生产者此时 num 1
消费者此时 num 0
2
3
4
5
6
这段代码正确实现了生产者-消费者模式。生产者和消费者通过 wait()
和 notify()
方法进行同步,确保在合适的情况下进行生产和消费。通过 synchronized
关键字确保对共享资源的访问是线程安全的。
# 虚假唤醒
一个问题,在上述代码中,为什么不用if 判断,而是用while判断?
答案:因为这可能会导致虚假唤醒。
# 虚假唤醒的例子
假设一个队列最多容纳 10 个元素,两个消费者线程同时等待队列非空:
- 初始队列为空,消费者 A 和 B 均进入
wait()
状态。 - 生产者向队列添加一个元素,并调用
notifyAll()
唤醒所有消费者。 - 消费者 A 被唤醒,消费元素后队列再次变空。
- 消费者 B 被虚假唤醒,若使用
if
判断队列是否为空,B 会直接执行remove()
操作,引发NoSuchElementException
;若使用while
,B 会重新检查条件并继续等待
- 总结
虚假唤醒(Spurious Wakeup) 是指线程在等待某个条件变量时(如调用 Object.wait()
或 Condition.await()
),未收到明确通知(如 notify()
或 signal()
)却被意外唤醒,而此时条件实际上并未满足。这种现象可能导致线程执行逻辑错误或数据不一致。
- 解决
那如何解决虚假唤醒呢?上述例子已经给出来了,使用while
替代if
。
# wait()的效果
当你在同步上下文中调用 this.wait();
时,当前线程将会释放对象的锁并进入等待状态,直到另一个线程通过调用 notify()
或 notifyAll()
方法来唤醒它。一旦 wait()
被调用,当前线程会暂停执行,并且不会继续执行 wait()
后面的任何代码,直到它被其他线程唤醒并且重新获取到该对象的锁。
# 注意事项
wait()
方法:使当前线程释放锁并进入等待状态,不会继续执行wait()
方法后面的代码,直到被唤醒。notify()
方法:唤醒一个在当前对象上等待的线程,但不会立即释放锁。当前线程会继续持有锁,直到synchronized
块或方法的执行完成。- 锁的释放:当前线程在
synchronized
块或方法的执行完成后释放锁,被唤醒的线程有机会重新竞争锁并继续执行。
在生产者和消费者模型,关于wait()
和notify()
的使用上,可以记住以下口诀:
不符合条件时,应该使用wait(),而符合条件时,应该使用notify().
# Lock版
然而在现实开发中,lock锁用得更为频繁,因为它更灵活,Lock
接口提供了更丰富的锁操作,如 tryLock()
、lockInterruptibly()
等,可以实现更复杂的锁机制。
public class LockCandC {
public static int num = 0;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private void consumer(){
lock.lock();
try {
while (num!=0){
//System.out.println(Thread.currentThread().getName()+"等待");
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName()+num);
condition.signal();
}catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
lock.unlock();
}
}
private void producer(){
lock.lock();
try {
while (num==0){
condition.await();
}
num++;
condition.signal();
System.out.println(Thread.currentThread().getName()+num);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
LockCandC lockCandC = new LockCandC();
Thread a1 = new Thread(()-> {
for (int i = 0; i < 5; i++) {
lockCandC.consumer();
}
},"消费者");
Thread a2 = new Thread(()-> {
for (int i = 0; i < 5; i++) {
lockCandC.producer();
}
},"生产者");
a1.start();
a2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
上面就是一个使用ReentrantLock(重入锁)实现的一个生产消费者,其中的Condition你可能会很陌生,不过看代码结构,你应该能猜到它的作用跟notify()和wait()类似。
# Condition
Java并发编程中,Condition
是 java.util.concurrent.locks
包中的一个重要接口,它提供了比 synchronized
块中的 wait()
和 notify()
方法更强大和灵活的线程同步机制。Condition
允许一个或多个线程等待某个特定条件的发生,从而实现更细粒度的线程同步。
# Condition
的主要特点
- 多条件支持:
- 一个
Lock
对象可以关联多个Condition
实例,每个Condition
实例可以有自己的等待队列,从而实现更细粒度的线程同步。
- 一个
- 灵活的等待和通知:
Condition
提供了多种等待和通知方法,如await()
、awaitUninterruptibly()
、awaitNanos(long nanosTimeout)
、awaitUntil(Date deadline)
等,允许线程在等待时处理中断、超时等情况。
- 可选的公平性:
- 与
ReentrantLock
一样,Condition
也可以选择是否使用公平锁,从而影响线程的等待和唤醒顺序。
- 与
总结一下就是——通过使用 Condition
,可以实现更细粒度的线程同步,适用于复杂的多线程同步场景。下面就用它来实现一个3个线程,T1,T2,T3轮流打印字符串的例子。
# 3个线程轮流顺序打印
public class OneByOne {
private static int num = 1;
private ReentrantLock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private void printA(){
lock.lock();
try {
while (num!=1){
condition1.await();
}
System.out.println("AAAAA");
System.out.println(Thread.currentThread().getName()+"执行,启动线程B");
num =2;
condition2.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
private void printB(){
lock.lock();
try {
while (num!=2){
condition2.await();
}
System.out.println("BBBBB");
System.out.println(Thread.currentThread().getName()+"执行,启动线程C");
num=3;
condition3.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
private void printC(){
lock.lock();
try {
while (num!=3){
condition3.await();
}
System.out.println("CCCCC");
System.out.println(Thread.currentThread().getName()+"执行,启动线程A");
num=1;
condition1.signal();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
OneByOne oneByOne = new OneByOne();
Thread a1 = new Thread(()-> {
for (int i = 0; i < 3; i++) {
oneByOne.printA();
}
},"线程A");
Thread a2 = new Thread(()-> {
for (int i = 0; i < 3; i++) {
oneByOne.printB();
}
},"线程B");
Thread a3 = new Thread(()-> {
for (int i = 0; i < 3; i++) {
oneByOne.printC();
}
},"线程C");
a1.start();
a2.start();
a3.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
运行结果:
AAAAA
线程A执行,启动线程B
BBBBB
线程B执行,启动线程C
CCCCC
线程C执行,启动线程A
AAAAA
线程A执行,启动线程B
BBBBB
线程B执行,启动线程C
CCCCC
线程C执行,启动线程A
AAAAA
线程A执行,启动线程B
BBBBB
线程B执行,启动线程C
CCCCC
线程C执行,启动线程A
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
解析:
它能实现准确通知的关键在于,condition。如果你在一个 Condition
上调用了 await()
方法,那么必须在同一 Condition
上调用 signal()
方法来唤醒等待的线程。这是因为 Condition
的等待队列是与特定的 Condition
实例绑定的。
在代码上来看就是,如果用实例condition1
执行了await()方法,那么唤醒,也需要用condition1
执行signal()。
而其中的num共享变量,则是为了实现线程的精准执行。因为线程的调度是由操作系统决定
的,你唤醒了线程,它并非一定要执行,如果有其他线程也处于唤醒状态,可能是其他线程先执行。当然这里只有三个子线程,不存在该问题。该问题这里就不展开了。
因此这里通过对num的值的判断实现了,精确控制。
扩展:
如果没有num的值判断,或者对共享变量的判断-选择。它们可能出现死锁的情况。
condition1.await();
condition2.signal();
2
因为没有具体的控制条件,尽管condition1让出了锁,接下来尝试唤醒condition2,但实际上锁可能被condition3拿到了,但它没被唤醒,因此程序就会进入死锁