11.6.2 多阶段线程同步工具

Phaser类是Java SE 7中新增的一个实用同步工具,所提供的功能和灵活性比之前介绍的倒数闸门和循环屏障要强很多。在fork/join框架中的子任务之间要进行同步时,应该优先使用Phaser类的对象。Phaser类的特点是把多个线程协作执行的任务划分成多个阶段(phase),在每个阶段上都可以有任意个参与者参与。线程可以随时注册并参与到某个阶段的执行中来。当一个阶段中所有的线程都成功完成之后,Phaser类的对象会自动进入下一个阶段。如此循环下去,直到Phaser类的对象中不再包含任何参与者。此时,Phaser类的对象的运行自动结束。

在Phaser类的对象被创建出来之后,其初始的阶段编号为0。在Phaser类的构造方法中可以指定初始的参与者的个数,也可以在创建出来之后,使用register方法或bulkRegister方法来动态添加一个或多个参与者。当某个参与者完成其任务之后,调用arrive方法来进行声明。有的参与者完成一次执行之后就不再继续参与,可以调用arriveAndDeregister方法在声明完成之后取消自己的注册。如果参与者在完成之后需要等待其他参与者完成,那么可以使用arriveAndAwaitAdvance方法。调用arriveAndAwaitAdvance方法的线程会阻塞,直到Phaser类的对象成功进入下一阶段。当需要等待Phaser类的对象进入下一个阶段时,可以使用awaitAdvance和awaitAdvanceInterruptibly方法。两个方法的参数是当前的阶段编号。使用awaitAdvanceInterruptibly方法时可以设置超时时间和处理中断请求。

当某个阶段中的所有参与者都完成任务之后,Phaser类的对象的onAdvance方法会被调用,可以通过覆写此方法来添加自定义的处理逻辑。该方法的作用类似于在创建表示循环屏障的CyclicBarrier类的对象时使用的Runnable接口的实现对象。如果onAdvance方法的返回值为true,则该Phaser类的对象会被终止。默认的实现是在参与者的数量为0时终止该Phaser类的对象。可以通过覆写onAdvance方法来使用不同的终止逻辑。要强制终止一个Phaser类的对象,可以使用forceTermination方法。

Phaser类的一个重要特征是多个Phaser类的对象可以组织成树形结构。这种树形结构正好与fork/join框架中子任务可能形成的树形结构相对应。Phaser类提供了一个构造方法来指定当前对象的父对象。当一个子Phaser类的对象中的参与者个数大于0时,它会被自动注册到父对象中;当参与者个数为0时,它会被自动从父对象中解除注册。

Phaser类的功能很强大,在实际中可以替代CountDownLatch类和Cyclic-Barrier类。代码清单11-21给出了Phaser类的使用示例。该示例的Java类WebPageImageDownloader用来下载一个网页中包含的图片文件。整个下载过程由多个线程共同参与,并分成几个阶段。第一个阶段是由主线程负责下载页面的内容,并分析其中包含的图片文件的链接地址。在创建Phaser类的对象时,声明初始的参与者个数为1,即只有主线程参与。对每个图片文件使用单独的线程来下载。在创建图片下载线程时,通过register方法把每个下载线程注册到Phaser类的对象中。在每个图片下载线程的run方法中先通过arriveAndAwaitAdvance方法等待其他线程创建完成。由于主线程也是参与者,同样需要通过调用arriveAndAwaitAdvance方法来进行等待。等所有线程都完成创建之后,Phaser类的对象进入下一个阶段。下载图片文件的线程开始进行下载,而主线程则通过第二个arriveAndAwaitAdvance方法等待所有下载线程完成运行。每个下载线程在完成任务之后,通过arriveAndDeregister方法进行声明,同时解除在Phaser类的对象上的注册。当所有下载线程都完成并解除注册之后,主线程成为Phaser类的对象中唯一的参与者。主线程最后通过arriveAndDeregister方法来解除注册,Phaser类的对象由于不存在任何参与者而自动关闭。

代码清单11-21 Phaser类的使用示例


public class WebPageImageDownloader{

private final Phaser phaser=new Phaser(1);

private final Pattern imageUrlPattern=Pattern.compile("src=['\"]?(.*?(\.jpg|\.gif|\.png))['\"]?[\s>]+",Pattern.CASE_INSENSITIVE);

public void download(URL url, final Path path, Charset charset)throws IOException{

if(charset==null){

charset=StandardCharsets.UTF_8;

}

String content=getContent(url, charset);

List<URL>imageUrls=extractImageUrls(content);

for(final URL imageUrl:imageUrls){

phaser.register();

new Thread(){

public void run(){

phaser.arriveAndAwaitAdvance();

try{

InputStream is=imageUrl.openStream();

Files.copy(is, getSavedPath(path, imageUrl),StandardCopyOption.REPLACE_EXISTING);

}catch(IOException e){

e.printStackTrace();

}finally{

phaser.arriveAndDeregister();

}

}

}.start();

}

phaser.arriveAndAwaitAdvance();

phaser.arriveAndAwaitAdvance();

phaser.arriveAndDeregister();

}

private String getContent(URL url, Charset charset)throws IOException{

InputStream is=url.openStream();

return IOUtils.toString(new InputStreamReader(is, charset.name()));

}

private List<URL>extractImageUrls(String content){

List<URL>result=new ArrayList<URL>();

Matcher matcher=imageUrlPattern.matcher(content);

while(matcher.find()){

try{

result.add(new URL(matcher.group(1)));

}catch(MalformedURLException e){

//忽略

}

}

return result;

}

private Path getSavedPath(Path parent, URL url){

//省略获取图片保存路径的代码

}

}