在每篇文章后面加2个换行,方便使用cat命令合并成一个文件的时候不会影响目录的生成
13 KiB
81. 并发工具优于 wait 和 notify
本书第 1 版中专门用了一个条目来说明如何正确地使用 wait
和 notify
( Bloch01,详见第 50 条) 。它提出的建议仍然有效,并且在本条目的最后也对此做了概述,但是这条建议现在远远没有之前那么重要了。这是因为几乎没有理由再使用 wait
和 notify
了。自从 Java 5 发行版本开始, Java 平台就提供了更高级的并发工具,它们可以完成以前必须在 wait
和 notify
上手写代码来完成的各项工作。 既然正确地使用 wait 和 notify 比较困难,就应该用更高级的并发工具来代替。
java.util.concurrent
中更高级的工具分成三类: Executor Framework 、并发集合(Concurrent Collection)以及同步器(Synchronizer),Executor Framework 只在第 80 条中简单地提到过,并发集合和同步器将在本条目中进行简单的阐述。
并发集合为标准的集合接口(如 List
、Queue
和 Map
)提供了高性能的并发实现。为了提供高并发性,这些实现在内部自己管理同步(详见第 79 条) 。因此, 并发集合中不可能排除并发活动;将它锁定没有什么作用,只会使程序的速度变慢。
因为无法排除并发集合中的并发活动,这意味着也无法自动地在并发集合中组成方法调用。因此,有些并发集合接口已经通过依赖状态的修改操作(state-dependent modify operation)进行了扩展,它将几个基本操作合并到了单个原子操作中。事实证明,这些操作在并发集合中已经够用,它们通过缺省方法(详见第 21 条)被加到了 Java 8 对应的集合接口中。
例如, Map
的 putIfAbsent(key, value)
方法,当键没有映射时会替它插入一个映射,并返回与键关联的前一个值,如果没有这样的值,则返回 null 。这样就能很容易地实现线程安全的标准 Map
了。例如,下面这个方法模拟了 String.intern
的行为:
// Concurrent canonicalizing map atop ConcurrentMap - not optimal
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
public static String intern(String s) {
String previousValue = map.putIfAbsent(s, s);
return previousValue == null ? s : previousValue;
}
事实上,你还可以做得更好。ConcurrentHashMap
对获取操作(如 get)进行了优化。因此,只有当 get
表明有必要的时候,才值得先调用 get
,再调用 putIfAbsent
:
// Concurrent canonicalizing map atop ConcurrentMap - faster!
public static String intern(String s) {
String result = map.get(s);
if (result == null) {
result = map.putIfAbsent(s, s);
if (result == null)
result = s;
}
return result;
}
ConcurrentHashMap
除了提供卓越的并发性之外,速度也非常快。在我的机器上,上面这个优化过的 intern
方法比 String.intern
快了不止 6 倍(但是记住, String.intern
必须使用某种弱引用,避免随着时间的推移而发生内存泄漏)。并发集合导致同步的集合大多被废弃了。比如, 应该优先使用 ConcurrentHashMap
,而不是使用 Collections.synchronizedMap
。 只要用并发 Map 替换同步 Map ,就可以极大地提升并发应用程序的性能。
有些集合接口已经通过阻塞操作(blocking operation )进行了扩展,它们会一直等待(或者阻塞)到可以成功执行为止。例如, BlockingQueue
扩展了 Queue
接口,并添加了包括 take
在内的几个方法,它从队列中删除并返回了头元素,如果队列为空,就等待。这样就允许将阻塞队列用于工作队列(work queue),也称作生产者一消费者队列 (producer-consumer queue),一个或者多个生产者线程(producer thread)在工作队列中添加工作项目,并且当工作项目可用时,一个或者多个消费者线程(consumer thread )则从工作队列中取出队列并处理工作项目。不出所料,大多数 ExecutorService
实现(包括 ThreadPoolExecutor
)都使用了一个 BlockingQueue
(详见第 80 条) 。
同步器(Synchronizer)是使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是 CountDownLatch
和 Semaphore
。较不常用的是 CyclicBarrier
和 Exchanger
。功能最强大的同步器是 Phaser
。
倒计数锁存器(Countdown Latch)是一次性的障碍,允许一个或者多个线程等待一个或者多个其他线程来做某些事情。CountDownLatch
的唯一构造器带有一个 int 类型的参数,这个 int 参数是指允许所有在等待的线程被处理之前,必须在锁存器上调用 countDown
方法的次数。
要在这个简单的基本类型之上构建一些有用的东西,做起来是相当容易。例如,假设想要构建一个简单的框架,用来给一个动作的并发执行定时。这个框架中只包含单个方法,该方法带有一个执行该动作的 executor ,一个并发级别(表示要并发执行该动作的次数),以及表示该动作的 runnable 。所有的工作线程( worker thread )自身都准备好,要在 timer 线程启动时钟之前运行该动作。当最后一个工作线程准备好运行该动作时, timer 线程就「发射发令枪(fires the starting gun)」,同时允许工作线程执行该动作。一旦最后一个工作线程执行完该动作,timer 线程就立即停止计时。直接在 wait 和 notify 之上实现这个逻辑会很棍乱,而在 CountDownLatch
之上实现则相当简单:
// Simple framework for timing concurrent execution
public static long time(Executor executor, int concurrency,
Runnable action) throws InterruptedException {
CountDownLatch ready = new CountDownLatch(concurrency);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.execute(() -> {
ready.countDown();
// Tell timer we're ready
try {
start.await();
// Wait till peers are ready
action.run();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
finally {
done.countDown();
// Tell timer we're done
}
});
}
ready.await();
// Wait for all workers to be ready
long startNanos = System.nanoTime();
start.countDown();
// And they're off!
done.await();
// Wait for all workers to finish
return System.nanoTime() - startNanos;
}
注意这个方法使用了三个倒计数锁存器。第一个是 ready
,工作线程用它来告诉 timer
线程它们已经准备好了。然后工作线程在第二个锁存器 start
上等待。当最后一个工作线程调用 ready.countDown
时, timer
线程记录下起始时间,并调用 start.countDown
允许所有的工作线程继续进行。然后 timer 线程在第三个锁存器 done
上等待,直到最后一个工作线程运行完该动作,并调用 done.countDown
。一旦调用这个, timer
线程就会苏醒过来,并记录下结束的时间。
还有一些细节值得注意。传递给 time 方法的 executor 必须允许创建至少与指定并发级别一样多的线程,否则这个测试就永远不会结束。这就是线程饥饿死锁(thread starvationdeadlock) [Goetz06, 8.1.1] 。如果工作线程捕捉到 InterruptedException
,就会利用习惯用法 Thread.currentThread().interrupt()
重新断言中断,并从它的 run
方法中返回。这样就允许 executor 在必要的时候处理中断,事实上也理应如此。注意,我们利用了 System.nanoTime
来给活动定时。对于间歇式的定时,始终应该优先使用 System.nanoTime
,而不是使用 System.currentTimeMillis
。因为 System.nanoTime
更准确,也更精确,它不受系统的实时时钟的调整所影响。最后,注意本例中的代码并不能进行准确的定时,除非 action 能完成一定量的工作,比如一秒或者一秒以上。众所周知,准确的微基准测试十分困难,最好在专门的框架如 jmh 的协助下进行[JMH] 。
本条目仅仅触及了并发工具的一些皮毛。例如,前一个例子中的那三个倒计数锁存器其实可以用一个 CyclicBarrier
或者 Phaser
实例代替。这样得到的代码更加简洁,但是理解起来比较困难。虽然你始终应该优先使用并发工具,而不是使用 wait 方法和 notify 方法,但可能必须维护使用了 wait 方法和 notify 方法的遗留代码。wait 方法被用来使线程等待某个条件。它必须在同步区域内部被调用,这个同步区域将对象锁定在了调用 wait 方法的对象上。下面是使用 wait 方法的标准模式:
// The standard idiom for using the wait method
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
// (Releases lock, and reacquires on wakeup)
... // Perform action appropriate to condition
}
始终应该使用 wait 循环模式来调用 wait 方法;永远不要在循环之外调用 wait 方法。循环会在等待之前和之后对条件进行测试。
在等待之前测试条件,当条件已经成立时就跳过等待,这对于确保活性是必要的。如果条件已经成立,并且在线程等待之前, notify
(或者 notifyAll
)方法已经被调用, 则无法保证该线程总会从等待中苏醒过来。
在等待之前测试条件,如果条件不成立的话继续等待,这对于确保安全性是必要的。当条件不成立的时候,如果线程继续执行,则可能会破坏被锁保护的约束关系。当条件不成立时,有以下理由可使一个线程苏醒过来:
- 另一个线程可能已经得到了锁,并且从一个线程调用
notify
方法那一刻起,到等待线程苏醒过来的这段时间中,得到锁的线程已经改变了受保护的状态。 - 条件并不成立,但是另一个线程可能意外地或恶意地调用了
notify
方法。在公有可访问的对象上等待,这些类实际上把自己暴露在了这种危险的境地中。公有可访问对象的同步方法中包含的wait
方法都会出现这样的问题。 - 通知线程( notifying thread )在唤醒等待线程时可能会过于「慷慨」 。例如,即使只有某些等待线程的条件已经被满足,但是通知线程可能仍然调用
notifyAll
方法。 - 在没有通知的情况下,等待线程也可能(但很少)会苏醒过来。这被称为“伪唤醒”(spurious wakeup) [POSIX, 11.4.3.6.1; Java9-api] 。
一个相关的话题是,为了唤醒正在等待的线程,你应该使用 notify
方法还是 notifyAll
方法(回忆一下, notify 方法唤醒的是单个正在等待的线程,假设有这样的线程存在,而 notifyAll
方法唤醒的则是所有正在等待的线程) 。一种常见的说法是,应该始终使用 notifyAll
方法。这是合理而保守的建议。它总会产生正确的结果,因为它可以保证你将会唤醒所有需要被唤醒的线程。你可能也会唤醒其他一些线程,但是这不会影响程序的正确性。这些线程醒来之后,会检查它们正在等待的条件,如果发现条件并不满足,就会继续等待。
从优化的角度来看,如果处于等待状态的所有线程都在等待同一个条件,而每次只有一个线程可以从这个条件中被唤醒,那么你就应该选择调用 notify
方法,而不是 notifyAll
方法。
即使这些前提条件都满足,也许还是有理由使用 notifyAll
方法而不是 notify
方法。就好像把 wait
方法调用放在一个循环中,以避免在公有可访问对象上的意外或恶意的通知一样,与此类似,使用 notifyAll
方法代替 notify
方法可以避免来自不相关线程的意外或恶意的等待。否则,这样的等待会“吞掉”一个关键的通知,使真正的接收线程无限地等待下去。
简而言之,直接使用 wait
方法和 notify
方法就像用“并发汇编语言”进行编程一样,而 java.util.concurrent
则提供了更高级的语言。 没有理由在新代码中使用 wait
方法和 notify
方法,即使有,也是极少的。 如果你在维护使用 wait
方法和 notify
方法的代码,务必确保始终是利用标准的模式从 while
循环内部调用 wait
方法。一般情况下,应该优先使用 notifyAll
方法,而不是使用 notify
方法。如果使用 notify
方法,请一定要小心,以确保程序的活性。