11.5.2 底层同步器

在一个多线程程序中,线程之间可能存在多种不同的同步方式。除了Java标准库提供的同步方式之外,程序中特有的同步方式需要由开发人员自己来实现。在这些同步方式中,一类比较常见的需求是对有限个共享资源的同步访问,多线程程序中的很多场景都可以抽象成这类同步方式。比如对某个监视器对象的互斥访问,实际上是多个线程在竞争唯一的一个资源。如果系统中安装了两个打印机,那么需要进行打印操作的多个线程相互竞争这两个资源。当多个线程在等待同一个资源时,从公平的角度出发,这些线程会被放入到一个先入先出(FIFO)的队列中。当资源变成可用时,处于队首的线程会获取该资源。

如果程序中的同步方式可以抽象成对有限个资源的同步访问,那么可以使用java.util.concurrent.locks包中的AbstractQueuedSynchronizer类和AbstractQueued-LongSynchronizer类作为实现的基础。这两个类的作用是相同的,只不过Abstract-QueuedSynchronizer类在内部使用一个int类型的变量来维护内部状态,而AbstractQueuedLongSynchronizer类使用long类型的变量。可以将这个内部变量理解成共享资源的个数。通过getState、setState和compareAndSetState这3个方法来更新这个内部变量的值。在下面的介绍中使用AbstractQueuedSynchronizer类来说明。

AbstractQueuedSynchronizer类是声明为abstract的,因此需要继承并覆写其中包含的部分方法后才能使用。通常的做法是把AbstractQueuedSynchronizer类的子类作为一个Java类的内部类。外部的Java类提供具体的同步方式,而Abstract-QueuedSynchronizer类的对象则作为实现的基础。实际使用AbstractQueuedSynchronizer类需要经过几个步骤。首先确定如何把需要同步访问的资源映射到Abstract-QueuedSynchronizer类的内部变量上。一般用内部变量的值表示当前可用资源的数量。接着选择使用排他模式或共享模式。共享模式允许多个线程同时获取所管理的资源,而排他模式在同一时刻只允许一个线程获取资源。选择好模式之后,继承自AbstractQueuedSynchronizer类的子类需要实现对应的资源获取和释放方法。使用排他模式时需要实现的方法是tryAcquire和tryRelease,而使用共享模式时对应的方法是tryAcquireShared和tryReleaseShared。在这些方法的实现中,使用getState、setState和compareAndSetState这3个方法来修改内部变量的值,以此来反映资源的状态。在对资源进行同步访问的Java类中需要创建一个AbstractQueuedSynchronizer类的子类的对象,并通过该对象来完成实际的资源获取和释放的同步操作。

代码清单11-14给出了一个基于AbstractQueuedSynchronizer类实现的对资源进行管理的SimpleResourceManager类。内部类InnerSynchronizer继承自AbstractQueued-Synchronizer类并覆写了tryAcquireShared和tryReleaseShared方法,说明对资源的访问使用的是共享模式,直接用内部变量的值表示资源的当前可用数量。在这两个方法的实现中,通过getState方法获取当前的状态值,并根据需要使用compareAndSetState方法来更新获取和释放操作之后的新状态值。在外部类中对资源进行访问的方法只是简单地调用InnerSynchronizer类的对象上的适当方法来完成的。

代码清单11-14 基于AbstractQueuedSynchronizer类的资源管理实现


public class SimpleResourceManager{

private final InnerSynchronizer synchronizer;

private static class InnerSynchronizer extends AbstractQueuedSynchronizer{

InnerSynchronizer(int numOfResources){

setState(numOfResources);

}

protected int tryAcquireShared(int acquires){

for(;){

int available=getState();

int remaining=available-acquires;

if(remaining<0||

compareAndSetState(available, remaining)){

return remaining;

}

}

}

protected boolean tryReleaseShared(int releases){

for(;){

int available=getState();

int next=available+releases;

if(compareAndSetState(available, next)){

return true;

}

}

}

}

public SimpleResourceManager(int numOfResources){

synchronizer=new InnerSynchronizer(numOfResources);

}

public void acquire()throws InterruptedException{

synchronizer.acquireSharedInterruptibly(1);

}

public void release(){

synchronizer.releaseShared(1);

}

}