11.7 线程间协作

正如读者所看到的,当使用线程在同一时刻运行多个任务时,可以使用互斥锁来同步两个任务的行为的方法,来阻止一个任务干扰另一个任务的资源。也就是说,如果两个任务对一个共享资源(通常是内存)相互争夺,就要使用互斥锁来保证在同一时刻只允许一个任务访问那个资源。

在这个问题解决之后,可以继续考虑线程间协作的问题,以便多个线程能一起工作来共同解决某个问题。现在问题不在于线程之间的彼此干扰,而在于其和谐工作,由于问题的某一部分必须在另外一部分能被解决之前解决完毕。这更像是一个工程进度表:必须先挖房屋的地基,但是钢结构构件的铺设和混凝土构件可以并行建造,这些任务都必须在混凝土基础浇注之前完成。管道设备必须在混凝土平板浇注好之前放置好,而混凝土平板要在开始搭建框架结构之前安置,等等。这些任务中有些可以并行进行,但是某些步骤则要求在完成其他所有任务之后才能继续进行。

这些任务协作时的关键问题是这些任务间的“握手”。为完成这个握手过程,使用相同的基础:互斥机制,互斥机制在这种情况下可以保证只有一个任务响应信号。这就消除了任何可能的竞争条件。要熟练掌握互斥锁,这里为任务增加了一个方法,让它把自己挂起来,直到某些外部状态发生改变(例如“管道设备现在已经就位”),表明此时任务可以向前进行。在本节中,读者会看到任务间的握手问题,在握手期间会出现的问题,以及这些问题相应的解决方法。

11.7.1 等待和信号

在Zthread库中,使用互斥锁并允许任务挂起的基类是Condition,可以通过在条件Condition上调用wait()挂起一个任务。当外部状态发生改变时,这种改变也许意味着某个任务应该继续进行处理。调用信号函数signal()可以通知该任务而唤醒它,或者调用broadcast(),而唤醒所有在那个Condition对象上被挂起的任务。

wait()有两种形式。第1种形式接受一个毫秒数作为参数,这个参数与sleep()函数中的参数有相同的含义:“在这段时间暂停”。wait()的第2种形式不要参数;这种形式更常见。这两种形式的wait()都会释放被Condition对象所控制的互斥锁Mutex,并且会挂起线程直到Condition对象收到一个signal()或者broadcast()。如果超时,第1种形式在接收到signal()或broadcast()之前也可以结束。

因为wait()会释放Mutex,这意味着该Mutex可以被其他线程获得。因此,当调用wait()时,就相当于说:“现在已经做完了该做的所有事情,我将在此等待,但是我希望如果其他同步操作可以执行的允许它们执行。”

典型的情况是,当在等待某个条件的改变时就使用wait(),而这个条件的改变在当前函数之外的因素控制之下进行。(这些条件常常会被另一个线程改变。)在线程内检测条件时,你不希望进行空循环等待;这就是所谓的“忙等待”,而“忙等待”通常会大量占用CPU周期。因此,wait()在等待外部条件变化时挂起线程,只在signal()或broadcast()被调用时(暗示某些相关事件已经发生),唤醒线程并检测发生的变化。因此,wait()为同步线程之间的活动提供了一种方法。

下面看一个简单的例子。WaxOMatic.cpp有两个进程:一个进程给Car上蜡,另一个进程给Car抛光。抛光进程在上蜡进程完成前不能进行其工作,并且上蜡进程在汽车可以再穿上另一个蜡外套之前必须等待直到抛光进程完成。WaxOn和WaxOff都使用了Car对象,这个Car对象包含了一个用于挂起一个在waitForWaxing()或waitForBuffing()内的线程的Condition。

11.7 线程间协作 - 图1

11.7 线程间协作 - 图2

在Car的构造函数中,一个互斥锁Mutex被封装于Condition对象中,以便Mutex可以用于管理任务间的通信。然而,Condition对象不包含有关进程状态的信息,所以还要管理另外的信息用来指出进程的状态。在这里,Car有一个bool waxOn,这个布尔变量指出上蜡、抛光进程的状态。

在waitForWaxing()中检查waxOn标志,如果它是false则调用中的线程通过调用Condition对象上的wait()被挂起。重要的是,这发生在一个被保护的子句中,在这个子句中该线程获得了互斥锁(在这里,是通过创建一个Guard对象获得的)。当调用wait()时,该线程被挂起并释放互斥锁。释放互斥锁是必要的,因为为了安全地改变对象的状态(比如,把waxOn的值变为true,为了使被挂起的线程继续进行下去,这是必须做的),互斥锁必须能够被一些其他任务获得。在本例中,当其他线程调用waxed()来告知被挂起的线程该去做某个工作的时候,互斥锁必须能获得以便把waxOn的值变为true。然后,waxed()向Condition对象发送一个信号signal(),由它来唤醒那个在调用wait()中被挂起的线程。虽然可以在一个被保护的子句中调用信号signal()—就像这里一样—但并不要求这样做。[1]

为了从等待wait()中唤醒一个线程,必须首先重新获得其在进入wait()时释放的互斥锁。直到互斥锁变成可用之前,该线程不会被唤醒。

wait()的调用被置于一个while循环内部,用这个循环来检查相关的条件。这很重要,基于以下两个原因:[2]

·很可能当某个线程得到一个信号signal()时,其他一些条件可能已经改变了,但这些条件在这里与调用wait()的原因无关。如果有这种情况,该线程在其相关的条件改变之前将再一次被挂起。

·在该线程从其wait()函数中醒来之时,可能另外某个任务改变了一些条件,因此这个线程就不能或者没兴趣在此时执行其操作了。再次强调,它应再次调用wait()而被重新挂起。因为这两个原因在调用wait()时总会出现,故总要编写在while循环内调用wait()的一段程序来测试与线程相关的条件。

WaxOn:run()代表给汽车上蜡进程中的第1步,所以它执行其操作(调用sleep()来模拟上蜡所需要的时间)。然后它告知汽车上蜡完毕,并调用waitForBuffing(),该函数使用wait()挂起线程,直到WaxOff进程为汽车调用buffed(),改变状态并调用notify()。另一方面,WaxOff:run()立即迁移到waitForWaxing(),并因此被挂起直到由WaxOn将上蜡工作完成并且waxed()被调用。当运行此程序时可以看到,将控制权在两个线程之间来回传递,从而使这两步进程交替重复执行。当按下回车(<Enter>)键时,interrupt()停止这两个线程的运行—当为一个执行器对象Executor调用interrupt()时,它为其控制下的所有线程调用interrupt()。

11.7.2 生产者-消费者关系

线程处理问题中的一个常见的情形是生产者-消费者(producer-consumer)关系,一个任务创建对象而另一个任务消费这些对象。在这种情况下,要确定(在其他事件中)进行消费的任务不会意外遗漏掉已产生的任何对象。

为了说明该问题,考虑一个有3个任务的机器:一个任务是制作烤面包,一个任务是给烤面包抹黄油,还有一个任务是往抹好黄油的烤面包上抹果酱。

11.7 线程间协作 - 图3

11.7 线程间协作 - 图4

11.7 线程间协作 - 图5

这些类以逆序定义,这样做的目的是简化前向引用(forward-referencing)的操作问题。

Jammer和Butterer都包含一个Mutex对象、一个Condition对象和一些内部状态信息。通过改变这些内部状态信息的状态,来指出进程要被挂起或恢复执行。(Toaster不需要这些,因为它是生产者,无需等待任何事情。)两个run()函数都执行同一个操作,设置一个状态标志,然后调用wait()来挂起任务。moreToastReady()和moreButtered ToastReady()函数改变各自的状态标志,以指示某些事情已经发生了改变,进程现在要考虑恢复执行,然后调用信号signal()唤醒该线程。

本例与前面例子的不同之处在于,至少从概念上讲,这里生产了一些东西:烤面包。烤面包的生产率有点随机化,这就增加了不确定性。读者将会看到,在运行程序时可能会出错,因为会有许多片烤面包掉在地板上—没抹黄油,也没抹果酱。

11.7.3 用队列解决线程处理的问题

线程处理问题常常基于需要对任务进行串行化上—也就是说,要使事情有序地进行处理。ToastOMatic.cpp不仅必要注意让事情有序,还必须能够加工好烤面包片,而且在此期间不用担心它会掉到地板上。使用队列可以采用同步的方式访问其内部元素,这样就可以解决很多线程处理问题:

11.7 线程间协作 - 图6

这是通过在标准C++库的双端队列deque上添加下面的内容建立起来的:

1)加入同步以确保在同一时刻不会有两个线程添加对象。

2)加入wait()和signal()以便在队列为空时让消费者线程自动挂起,并在有多个元素可用时恢复执行。

这些相对量较少的代码能解决为数可观的问题。[3]

这里有个简单的测试程序,将对LiftOff对象的串行化执行进行测试。消费者是LiftOffRunner,它把每个LiftOff对象从TQueue中取出来并直接执行。(也就是说,它通过显式调用run()来使用自己的线程,而不是为每个任务启动一个新线程。)

11.7 线程间协作 - 图7

任务被main()函数置于TQueue队列上,被LiftOffRunner从TQueue队列上取走。注意,LiftOffRunner可以忽略同步问题,因为这些问题由TQueue来解决。

适当地进行烘烤

为解决ToastOMatic.cpp中存在的问题,我们可以在加工进程期间使用TQueue管理烤面包。为了做到这点,需要实际的烤面包对象,它们保持并显示了其状态:

11.7 线程间协作 - 图8

11.7 线程间协作 - 图9

11.7 线程间协作 - 图10

在这个解决方案中,两件事情会马上变得很明显:第一,在每个Runnable类中代码的数量和复杂性通过队列TQueue的使用会显著减少,因为进行保护、通信,以及wait()/signal()操作现在都由TQueue来维护。Runnable类不再拥有任何Mutex或Condition对象。第二,类之间的耦合被消除了,因为每个类只与它的TQueue通信。注意,现在类的定义次序是独立的。较少的代码和较少的耦合总归是一件好事,这暗示着在这里TQueue的使用有积极作用,就像在大多数问题中它所做的那样。

11.7.4 广播

signal()函数唤醒了一个正在等待Condition对象的线程。然而,也许会有多个线程在等待某个相同的条件对象,在这种情况下需要使用broadcast()而不是signal()把这些线程都唤醒。

现在考虑一个假想的制造汽车的机器人流水线,作为一个例子它集中体现了本章中的许多概念。每辆Car将在几个阶段内装配完成,在本例中将看到这样一个阶段:底盘制造好后,在这段时间里安装附属的发动机、驱动传动装置(drive train)和车轮。通过一个CarQueue将Car从一个地方传送到另一个地方,CarQueue是一个TQueue的类型。一个Director从进来的CarQueue队列中取出每辆Car(作为一个未加工的底盘)并放置在一个Cradle中,所有工作都在这里完成。在这个地方,主管Director通知所有正在等待的机器人(使用广播broadcast()),Car已经在Cradle中准备就绪,机器人们可以进行装配工作了。三种类型的机器人开始进行工作,当它们完成任务时给Cradle发送一个消息。Director一直等到所有任务都完成后,把Car放到出去的CarQueue队列上传送到下一个工序。在这里,出去的CarQueue队列的消费者是个Reporter对象,它仅打印该Car,说明那个任务已正确地完成了。

11.7 线程间协作 - 图11

11.7 线程间协作 - 图12

11.7 线程间协作 - 图13

11.7 线程间协作 - 图14

11.7 线程间协作 - 图15

注意,Car走了一个捷径:它假设布尔操作是原子的,就像以前讨论过的那样,有时候这是一个安全的假定,但需要周密考虑。[4]每个Car从一个未加工的底盘开始,不同的机器人给它装配上不同的部分,当它们去做这件事时要调用适当的“add”函数。

ChassisBuilder只是每秒钟创建一个新的Car,把它放进chassisQueue队列中。Director通过把下一个Car从chassisQueue队列中取出,把它放入Cradle,而通知所有机器人去startWork(),并通过调用waitUntilWorkFinished()挂起自己等一系列操作来管理装配进程。当工作完成时,Director把Car从Cradle中取出并放入finishingQueue。

Cradle是发送信号操作的关键。互斥锁Mutex和Condition条件对象控制着两件事情:机器人进行的工作和显示所有的操作是否已经完成。一个特定类型的机器人能够通过调用与其类型相适应的“提供”函数将其服务提供给Cradle。在这个地方,机器人线程被挂起,直到Director调用开始工作函数startWork(),它改变雇佣标志(hiring flag)并调用broadcast()来通知所有机器人出来工作。虽然这个系统允许任意数量的机器人提供服务,但每个机器人为做这些工作需要挂起自己的线程。可以想象一个更复杂的系统,在该系统中各种机器人在不同的Cradle里面注册自己,并没有被注册进程挂起。然后将它们存在一个对象池中,等待第1个需要完成某种任务的Cradle。

每个机器人完成了它的任务(改变进程中Car的状态)后,它就调用taskFinished(),此函数向readyCondition发送一个信号signal(),而这正是Director在waitUntilWorkFinished()函数中所等待的。每次主管(director)线程醒来,都会检查Car的状态。如果它仍然未完成,这个线程会被再次挂起。

当Director将一个Car插入到Cradle中时,可以通过运算符operator->()在Car上执行操作。为了防止多次提取同一辆汽车,用一个标志引发产生一个错误报告。(在ZThread库中异常不能跨线程传播。)

在main()函数中,随着ChassisBuilder开始持续启动进程,所有必需的对象都被创建并且所有的任务都被初始化。(然而,因为TQueue的行为,如果它先启动也没关系。)注意,这个程序遵循了本章给出的所有关于对象和任务生存周期的指南,故停止进程是安全的。

[1]这与Java相反,在Java中必须持有锁才能调用notify()(Java版的signal())。尽管Posix线程不要求必须持有锁才能调用signal()或broadcast(),但是这种做法是推荐的做法。ZThread库松散基于Posix线程。

[2]在一些平台上有第3种办法跳出wait(),即所谓伪唤醒(spurious wakeup)。一个伪唤醒本质上意味着一个线程过早地停止了阻塞(当等待一个条件变量或信号量时)而没有被signal()或broadcast()激活。线程就像是自己醒过来一样。伪唤醒存在的原因是,在某些平台上实现POSIX线程或类似的东西,并不像它在某些平台上那样直截了当。对这些平台来说,允许伪唤醒能够简化建立类似pthread库的工作。ZThread中不存在伪唤醒,因为该库弥补并对用户隐藏了这些问题。

[3]注意,如果读者由于某些原因停止了读,写者将继续写入直到系统内存用完。如果这是用户的程序存在的一个问题,用户可以添加所允许的最大元素计数,队列满时写者线程将被阻塞。

[4]详细说明,请参考本章先前关于多处理器和可见性的注释。