11.5.3 高级同步对象
使用java.util.concurrent.atomic包和java.util.concurrent.locks包提供的Java类可以满足基本的互斥和同步访问的需求,但是这些Java类的抽象层次较低,使用起来比较复杂。对于一般的程序来说,更简单的做法是使用java.util.concurrent包中的高级同步对象。
本节主要介绍的是java.util.concurrent包提供的满足常见需求的高级同步对象,包括进行资源管理的信号量、协调不同线程运行顺序的倒数闸门和循环屏障,以及进行数据交换的对象交换器等。如果程序中的多个线程之间的交互方式满足这些高级同步对象所适用的场景,那么使用这些同步对象可以大大提高开发效率。
1.信号量
对于信号量的概念,开发人员可能并不陌生。信号量在操作系统中一般用来管理数量有限的资源。每类资源有一个对应的信号量。信号量的值表示资源的可用数量。在使用资源时,要先从该信号量上获取一个使用许可。成功获取许可之后,资源的可用数减1。在持有许可期间,使用者可以对获取的资源进行操作。完成对资源的使用之后,需要在信号量上释放一个使用许可,资源的可用数加1,允许其他使用者获取资源。当资源可用数为0的时候,需要获取资源的线程以阻塞的方式来等待资源变为可用,或者过段时间之后再检查资源是否变为可用。代码清单11-14中的SimpleResourceManager类实际上是信号量的一个简单实现。如果需要在程序中使用信号量,更好的方式是直接使用java.util.concurrent.Semaphore类。在创建Semaphore类的对象时指定资源的可用数,通过acquire方法以阻塞式的方式获取许可,而tryAcquire方法以非阻塞式的方式来获取许可。当需要释放许可时,使用release方法。Semaphore类也支持同时获取和释放多个资源的许可。通过acquire方法获取许可的过程是可以被中断的。如果不希望被中断,那么可以使用acquireUninterruptibly方法。
代码清单11-15给出了使用Semaphore类管理程序中可用的打印机的示例。在创建PrinterManager类的对象时,需要指定程序中所有可用Printer类的对象的集合。根据可用的Printer类的对象的数目创建出Semaphore类的对象。Semaphore类也支持在分配许可时使用公平模式,通过把构造方法的第二个参数值设为true来使用该模式。在公平模式下,当资源可用时,等待线程按照调用acquire方法申请资源的顺序依次获取许可。在进行资源管理时,一般使用公平模式,以避免造成线程饥饿问题。需要注意的是访问内部变量printers的方法都通过synchronized关键词声明为同步的。这是因为Semaphore类只是一个资源数量的抽象表示,并不负责管理资源对象本身,可能有多个线程同时获取到资源使用许可,因此需要使用同步机制避免数据竞争。
代码清单11-15 使用信号量管理资源的示例
public class PrinterManager{
private final Semaphore semaphore;
private final List<Printer>printers=new ArrayList<>();
public PrinterManager(Collection<?extends Printer>printers){
this.printers.addAll(printers);
this.semaphore=new Semaphore(this.printers.size(),true);
}
public Printer acquirePrinter()throws InterruptedException{
semaphore.acquire();
return getAvailablePrinter();
}
public void releasePrinter(Printer printer){
putBackPrinter(printer);
semaphore.release();
}
private synchronized Printer getAvailablePrinter(){
Printer result=printers.get(0);
printers.remove(0);
return result;
}
private synchronized void putBackPrinter(Printer printer){
printers.add(printer);
}
}
2.倒数闸门
在多个线程进行协作时,一个常见的情景是一个线程需要等待另外的线程完成某些任务之后才能继续进行。在这种情况下,可以使用java.util.concurrent.CountDownLatch类。CountDownLatch类相当于多个线程等待开启的一个闸门。只有在其他线程完成任务之后,闸门才会打开,等待的线程才能运行。在创建CountDownLatch类的对象时需要指定等待完成的任务数目。一个CountDownLatch类的对象被执行任务的线程和等待任务完成的线程所共享。当执行任务的线程完成其任务时,调用countDown方法来使待完成的任务数量减1。等待任务完成的线程通过调用await方法进入阻塞状态直到待完成的任务数量变为0。当所有任务都完成时,等待任务完成的线程会从await方法返回,可以继续执行后续的代码。CountDownLatch类的对象的使用是一次性的。一旦待完成的任务数量变为0,再调用await方法就不再阻塞当前线程,而是立即返回。
CountDownLatch类在很多场景下都可以得到应用。比如在使用多线程方式下载一个文件时,可以有一个主线程和若干个下载线程。在某个下载线程完成部分内容的下载任务时,调用countDown方法来进行声明,主线程则调用await方法等待所有部分下载完成。代码清单11-16给出了使用CountDownLatch类的一个示例。每个线程负责获取一个网页的内容并计算其内容的大小。当所有网页的大小都计算完成之后,由主线程对网页内容大小进行排序。在GetSizeWorker类的run方法中,当任务完成之后,通过countDown方法发出通知;在主线程中通过await方法来等待所有使用GetSizeWorker类的线程完成运行。
代码清单11-16 倒数闸门的使用示例
public class PageSizeSorter{
private static final ConcurrentHashMap<String, Integer>sizeMap=new ConcurrentHashMap<>();
private static class GetSizeWorker implements Runnable{
private final String urlString;
private final CountDownLatch signal;
public GetSizeWorker(String urlString, CountDownLatch signal){
this.urlString=urlString;
this.signal=signal;
}
public void run(){
try{
InputStream is=new URL(urlString).openStream();
int size=IOUtils.toByteArray(is).length;
sizeMap.put(urlString, size);
}catch(IOException e){
sizeMap.put(urlString,-1);
}finally{
signal.countDown();
}
}
}
private void sort(){
List<Entry<String, Integer>>list=new ArrayList<>(sizeMap.entrySet());
Collections.sort(list, new Comparator<Entry<String, Integer>>(){
public int compare(Entry<String, Integer>o1,
Entry<String, Integer>o2){
return Integer.compare(o2.getValue(),o1.getValue());
}
});
System.out.println(Arrays.deepToString(list.toArray()));
}
public void sortPageSize(Collection<String>urls)throws InterruptedException{
CountDownLatch sortSignal=new CountDownLatch(urls.size());
for(String url:urls){
new Thread(new GetSizeWorker(url, sortSignal)).start();
}
sortSignal.await();
sort();
}
}
3.循环屏障
循环屏障在作用上类似于倒数闸门,不过有几个显著的不同点。首先循环屏障不像倒数闸门一样是一次性的,可以循环使用。其次使用循环屏障的线程之间是互相平等的,彼此都需要等待对方完成。当一个线程完成自己的任务之后,等待其他线程完成。当所有线程都完成任务之后,所有线程才可以继续运行。当线程之间需要再次进行互相等待时,可以复用同一个循环屏障。
类java.util.concurrent.CyclicBarrier用来表示循环屏障。CyclicBarrier类的对象在创建时需要指定使用该对象的线程数目,还可以指定一个Runnable接口的实现对象作为每次所有线程之间完成相互等待之后执行的动作。当最后一个线程完成任务之后,在所有的线程继续执行之前,这个Runnable接口的实现对象会被运行。如果这些线程之间需要更新一些共享的内部状态,可以利用这个Runnable接口的实现对象来进行处理。每个线程在完成任务之后,通过调用await方法进入等待状态。等所有线程都调用了await方法之后,处于等待状态的线程都可以继续执行。在所有参与线程中,只要有一个在等待过程中被中断、出现超时或是其他错误,整个循环屏障会失效。所有处于等待状态的其他线程会抛出java.util.concurrent.BrokenBarrierException异常而结束。
代码清单11-17中给出了使用多个线程来查找质数的示例。每个线程负责查找给定数字区间范围内的质数。线程之间使用CyclicBarrier类的对象进行同步。当所有线程都完成一轮计算之后,会调用创建CyclicBarrier类的对象时提供的Runnable接口的实现。在这个Runnable接口实现中,检查当前已经找到的质数数目是否足够。如果已经足够,则设置标记变量done的值为true;否则,所有线程继续运行,在下一个区间中进行查找。这里需要注意的是,如果在调用await方法中出现异常,所有的线程都会结束。因此需要正确设置done的值,使得主线程在计算线程出错时也能正确处理。
代码清单11-17 使用多个线程来查找质数的示例
public class PrimeNumber{
private static final int TOTAL_COUNT=5000;
private static final int RANGE_LENGTH=200;
private static final int WORKER_NUMBER=5;
private static volatile boolean done=false;
private static int rangeCount=0;
private static final List<Long>results=new ArrayList<Long>();
private static final CyclicBarrier barrier=new CyclicBarrier(WORKER_NUMBER, new Runnable()){
public void run(){
if(results.size()>=TOTAL_COUNT){
done=true;
}
}
});
private static class PrimeFinder implements Runnable{
public void run(){
while(!done){
int range=getNextRange();
long start=range*RANGE_LENGTH;
long end=(range+1)*RANGE_LENGTH;
for(long i=start;i<end;i++){
if(isPrime(i)){
updateResult(i);
}
}
try{
barrier.await();
}catch(InterruptedException|BrokenBarrierException e){
done=true;
}
}
}
}
private synchronized static void updateResult(long value){
results.add(value);
}
private synchronized static int getNextRange(){
return rangeCount++;
}
private static boolean isPrime(long number){
//省略判断是否为质数的代码
}
public void calculate(){
for(int i=0;i<WORKER_NUMBER;i++){
new Thread(new PrimeFinder()).start();
}
while(!done){
}
//计算完成
}
}
4.对象交换器
对象交换器适合于两个线程需要进行数据交换的场景。在某些情况下,两个线程对于共享的对象有不同的处理逻辑。在两个线程都完成处理之后,处理的结果对象被交换给另外一个线程,由另外一个线程继续进行处理。类java.util.concurrent.Exchanger的作用是提供对这种对象交换能力的支持。两个线程共享一个Exchanger类的对象。一个线程完成对数据的处理之后,调用Exchanger类的exchange方法把处理之后的数据作为参数发送给另外一个线程。而exchange方法的返回结果是另外一个线程所提供的相同类型的对象。如果另外一个线程尚未完成对数据的处理,那么exchange方法会使当前线程进入等待状态,直到另外一个线程也调用了exchange方法来进行数据交换。
代码清单11-18给出了Exchanger类的使用示例。Sender和Receiver类分别负责准备各自的数据。任何一方完成准备之后,调用exchange方法来启动交换。等两者都完成任务之后,exchange方法会返回,两个线程得到了对方提供的对象。
代码清单11-18 对象交换器的使用示例
public class SendAndReceiver{
private final Exchanger<StringBuilder>exchanger=new Exchanger<String-Builder>();
private class Sender implements Runnable{
public void run(){
try{
StringBuilder content=new StringBuilder("Hello");
content=exchanger.exchange(content);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
}
private class Receiver implements Runnable{
public void run(){
try{
StringBuilder content=new StringBuilder("World");
content=exchanger.exchange(content);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
}
public void exchange(){
new Thread(new Sender()).start();
new Thread(new Receiver()).start();
}
}