11.6 终止任务

在前面的例子中,读者已经看到了使用“退出标志”或Cancelable接口以适当的方式来终止一个任务。这是解决该问题的合理的途径。然而,在某些情形下任务却必须要突然地结束。在本节中,读者将会学到有关这样终止任务所产生的后果和存在的问题。

首先,看一个示例,这个示例不仅示范了终止任务的问题,而且也是资源共享的另一个例子。为了介绍这个例子,首先需要解决输入输出流冲突的问题。

11.6.1 防止输入/输出流冲突

读者也许已经注意到了前面例子中的输出有时候会出现信息混淆的现象。当初创建C++输入/输出流时并没有考虑线程处理的事情,因此没有采取什么措施阻止一个线程的输出与其他线程输出之间的冲突。所以必须编写应用程序来处理输入/输出流同步的问题。

为了解决这个问题,首先需要创建全部的输出数据信息包,然后明确决定什么时候尝试将其发送到控制台。一个简单的解决办法是将信息写入一个ostringstream,然后用一个带有互斥锁的对象作为所有线程的输出点,以防多个线程同时写入数据:

11.6 终止任务 - 图1

11.6 终止任务 - 图2

通过这个办法,预先定义一个标准operator<<()函数,可以使用熟悉的ostream运算符在内存中构建对象。当一个任务需要显示输出时,它创建一个临时的ostringstream对象,用于构建想要的输出消息。当它调用output()时,互斥锁会阻止多个线程同时向该Display对象写入数据。(在程序中必须只使用一个Display对象,正如在下面例子中将看到的。)

这恰恰表现了其基本思想,但是如果必要的话,可以构建一个更精细的框架。例如,可以把它做成一个单件(Singleton)来强迫实现一个程序中仅有一个Display对象的要求。(ZThread库有一个Singleton模板用来支持单件。)

11.6.2 举例观赏植物园

在这个模拟程序中,公园委员会想要了解每天有多少人通过这个公园的多个入口进入。每个入口有一个十字转门或其他种类的计数器,当该十字转门的计数器增1之后,一个用来表示公园中游客总数的共享计数器也增1。

11.6 终止任务 - 图3

11.6 终止任务 - 图4

11.6 终止任务 - 图5

Count是一个类,它是用来保存公园游客数的主计数器。单个Count对象在main()中定义为count,同时count被作为一个CountedPtr实例保存在Entrance中,因此被所有Entrance对象共享。本例中,使用一个叫lock的FastMutex模板实例而非普通的Mutex,因为FastMutex使用本地操作系统的互斥锁并因此产生许多有趣的结果。

在increment()函数中,一个使用lock对象的Guard对象用来同步对count的访问。在大约一半时间,这个函数使用rand()来引发yield(),在这中间取来count的数据放入temp,并且使temp增1,再把temp存回到count之中。因为这个原因,如果注释掉Guard对象的定义,很快就会看到程序崩溃,因为多线程将会同时对count进行访问和修改。

Entrance类也持有一个本地的number,用来记录已通过这个特定入口的游客数。这里提供了对count对象的双重检验,以确保所记录的游客数正确。Entrance:run()仅使number变量和count对象增1,并休眠100毫秒。

在主函数中,一个vector<Entrance*>用于装载已经创建的每个Entrance。用户按下<Enter>键之后,该vector用来迭代所有的个体Entrance值并计算其总和。

这个程序在运行时遇到相当少的额外麻烦时,就会以一种稳定的方式关闭所有的对象。编写这个程序的部分原因是为了说明在结束多线程处理程序的执行时需要多么谨慎,还有部分原因是为了示范interrupt()函数的值,读者不久就会学到这些。

Entrance对象间发生的所有通信都要通过一个Count对象。当用户按下<Enter>键时,main()函数用pause()发送消息给count。由于每个Entrance:run()都在监视着count对象是否暂停下来,这将引发每个Entrance对象迁移到waitingForCancel等待状态,在这种状态下它将不再计数,但仍然处于活动状态。这是必要的,因为main()必须能安全迭代(遍历)在vector<Entrance>中的每个对象。注意,因为在一个Entrance完成计数并迁移至waitingForCancel等待状态之前,发生迭代的可能性很小(可以忽略),所以函数getValue()循环调用sleep()直到对象迁移至waitingForCancel等待状态。(这是被称为忙等待(busy wait)的形式之一,是不受欢迎的。稍后会在本章中看到首选的解决办法,它使用了wait()函数。)一旦main()完成了对vector<Entrance>的一次遍历迭代,cancel()消息就会被送至count对象。再强调一次,所有Entrance对象都会监视这个状态变化。在这点上,它们打印一条终止消息并从run()中退出,这导致每个任务都会被线程处理机制销毁掉。

当程序运行时,将看到总的计数和当一个游客走过十字转门时每个入口的计数显示。如果注释掉Count:increment()中的Guard对象,读者就会注意到游客总数不再是预期的值了。每个十字转门所统计的游客数都与count中的值不同。只要互斥锁Mutex在那里同步对Counter的访问,一切就会正常进行。切记:在这里,Count:increment()使用temp和yield()函数放大了失败的潜在可能。在实际的线程处理问题中,从统计学上来说失败的可能性很小,所以读者会很容易陷入相信不会有什么问题会发生的陷阱。就像在上面的例子中,可能会有一些隐藏的问题并没有在这个程序里发生,所以在对并发程序的代码进行复审时应格外仔细。

原子操作

注意,Count:value()使用一个Guard对象进行同步并返回count的值。这就提出一个有趣的观点,因为这段代码不用同步大概也可以在大部分编译器和操作系统上良好运行。其理由就是,在一般情况下一个简单的操作比如返回一个int型变量就是一个原子操作(atomic operation),这意味着或许它在一个微处理器指令中完成而不会被中断。(多线程处理机制不能在一个微处理器指令中间停止一个线程。)也就是说,原子操作不能被线程处理机制中断,因此不需要被保护。[1]实际上,如果删除取count的值送到temp的操作,并且删除yield()函数,代之以仅直接count增1操作,这样或许不需要进行互斥加锁处理也会工作得很好,因为增1操作通常也是原子操作。[2]

问题在于C++标准并不能保证任何这类操作的原子性。虽然诸如像返回一个int型值的操作,和对一个int型的值进行增1的操作在大多数计算机上几乎确定地是原子的,但是这并没有保证。正因为没有保证,所以必须考虑最坏的情况。有时可能要调查特定计算机(经常要通过阅读汇编语言)上的原子行为并根据这种假设编写代码。那总归是危险的、欠谨慎的做法。以上相关的信息很容易丢失或者被隐藏,其他人可能会认为这段代码可以被移植到其他机器上,当移植后就会发疯般地追踪由线程冲突而引发的偶然的错误。

所以,虽然从Count:value()上删除保护似乎可以照常工作,但并不是无懈可击的,因此可能会在某些机器上看到偏离常规的行为。

11.6.3 阻塞时终止

前面例子中的Entrance:run()在主循环中包含一个sleep()调用。我们知道在那个例子中sleep()休眠最后会被唤醒,在任务到达循环的顶部时检查isPaused()的状态,便有机会跳出循环。然而,sleep()仅是一个线程在其执行过程中被阻塞的一种情况,有时必须终止一个被阻塞的任务。

1.线程状态

一个线程可以处于以下4种状态之一:

1)新建(New)状态:一个线程只是在被创建的瞬间暂时地保持这个状态。它分配任何必需的系统资源并完成初始化。在这一点它有资格获得CPU时间。线程调度器随后将把该线程转换到可运行或阻塞状态。

2)可运行(Runnable)状态:这个状态意味着当时间分片机制为该线程分配可利用的CPU周期时,线程就可以运行。因此,在任何时刻,某个线程可能运行也可能不运行,但是如果线程调度器安排它,则没什么事情会阻止其运行;这时,它既不处于死亡状态,也不处于阻塞状态。

3)阻塞(Blocked)状态:线程可以运行了,但有某种事件阻止了它的运行。(比如,它也许正在等待I/O操作完成。)当一个线程处于阻塞状态时,线程调度器会忽略该线程并且不分配给它任何CPU时间。直到线程重新进入可运行状态之前,它不执行任何操作。

4)死亡(Dead)状态:一个处于死亡状态的线程,不能再被调度也不能获得任何CPU时间。它的任务已经完成,不再是可运行的。使一个线程消逝的正常的办法就是让它从run()函数返回。

2.变为阻塞状态

当一个线程不能继续运行时它就处在阻塞状态。一个线程变为阻塞状态的原因如下:

·调用sleep(milliseconds)使线程进入休眠状态,在这种情况下该线程在指定时间内不会运行。

·已经使用wait()挂起了该线程的运行。在得到signal()或broadcast()消息之前它不会再一次变为可执行状态。我们在后面的小节里将检验这些问题。

·线程正在等待某个I/O操作完成。

·线程正在尝试进入一段被一个互斥锁保护的代码块,而那个互斥锁已经被其他线程获得。

现在需要注意的问题是:有时需要在某个线程处于阻塞状态时终止它。线程在执行到代码中的某一点上能自己检查状态值并决定结束运行,如果不能等待线程到达代码中的这一点,那么就必须强迫线程脱离阻塞状态。

11.6.4 中断

正如想象的那样,在一个Runnable:run()函数的中间跳出,会比等待函数到达isCanceled()函数的检查点(或者程序员准备离开函数的其他地方)时跳出显得更加混乱。当从被阻塞的任务中离开时,可能需要销毁与之相关的对象并清理有关的资源。正因为这样,在一个任务的run()中间跳出更像是抛出一个异常,所以在ZThread库中,异常被用于此类退出。(这样处于不适当使用异常的边缘,因为这意味着经常把异常用于控制流。)[3]为了在以此方式结束一个任务时能返回到一个已知的正确状态,要谨慎地考虑代码的执行路径,在catch子句中正确清除所有的东西。在本节,读者会看到就这些问题的介绍。

为了终止一个阻塞的线程,ZThread库提供了Thread:interrupt()函数。这个函数用来为那类线程设置中断状态(interrupted status)。一个使用了中断状态设置的线程,如果已经被阻塞或尝试进行阻塞操作时将会抛出一个Interrupted Exception异常。当异常被抛出或者假如任务调用了Thread:interrupted()时,中断状态将重新设置。正如读者所见,Thread:interrupted()提供了不用抛出异常而离开run()函数中循环的第2条途径。

这里的例子显示了interrupt()的基本功能:

11.6 终止任务 - 图6

11.6 终止任务 - 图7

可以看到,除了将插入数据到cout之外,阻塞还能发生在run()函数中包含的其他两个地点:即对Thread:sleep(1000)和cin.get()的调用。可给程序传递任何命令行参数,可以通知main()休眠足够长的时间,以便任务到时候能结束它的sleep()和调用cin.get()。[4]如果不给程序传递参数,就会跳过main()中的sleep()。在这里,在任务休眠时发生了对函数interrupt()的调用。读者将会看到,这将导致Interrupted Exception异常被抛出。如果给程序一个命令行参数,就会发现如果一个任务被阻塞在IO操作上,它不能被中断。也就是说,除了IO操作,一个任务可以从任何阻塞操作中中断出来。[5]

如果正在创建一个执行IO操作的线程,这还是让人有点困惑,因为这意味着I/O有使多线程处理程序死锁的潜在可能性。问题在于,再次强调,在设计思想上C++没有被设计成使用线程处理;恰恰相反,它假装线程处理并不存在。因此,输入输出流库不是线程友好(thread-friendly)的。如果新的C++标准决定增加对线程的支持,输入输出流库也许需要重新考虑其处理方法。

1.被一个互斥锁阻塞

如果试图调用一个函数,而该函数的互斥锁已经被别的线程获得了,那么这个调用该函数的任务就会被挂起,直到该互斥锁变成可获得时为止。下面的例子测试了这种阻塞是否可被中断。

11.6 终止任务 - 图8

11.6 终止任务 - 图9

BlockedMutex类有一个构造函数,它获得对象自己的互斥锁Mutex并且绝不释放它。由于这个原因,如果试图调用f(),总会被阻塞,因为该互斥锁Mutex不能被获得。在Blocked2中,run()函数将因此停止在对blocked.f()的调用上。当运行程序时就会看到,和IO流的调用不同,interrupt()能够跳出已被一个互斥锁阻塞的调用。[6]

2.中断检查

注意,当在一个线程上调用interrupt()时,中断仅发生在任务进入一个阻塞操作的那一时刻,或者已经在一个阻塞操作内(正如你所知道的,假如在IO的情况下,就会陷在里面)。但是,编写什么样的代码,才能使是否产生这样的阻塞调用依赖于它的运行条件呢?如果只能通过在一个被阻塞的调用上抛出异常来退出,也许始终不能离开run()循环。因此,假如调用interrupt()来停止一个任务,如果在run()循环没有发生任何阻塞调用,该任务就需要另外的机会来退出。

中断状态(interrupted status)提供了这样的机会,它通过调用interrupt()进行设置。而调用interrupted()来检查中断状态,这不仅能告知interrupt()是否已经被调用,它也会清除中断状态。清除中断状态可以确保整个架构不会两次通知正被中断的任务。它会用一个Interrupted Exception异常或者一个成功的Thread:interrupted()测试来通知使用者。如果想再次检查是否被中断了,在调用Thread:interrupted()时可以把测试结果存储起来。

下面的例子显示了当设置了中断状态时,run()函数中在处理阻塞和非阻塞两种可能性的情况下所要用到的典型的习语:

11.6 终止任务 - 图10

11.6 终止任务 - 图11

如果采用抛出异常来离开循环,NeedsCleanup类强调了对相关资源进行正确清理的必要性。注意,在Blocked3:run()中没有定义指针,那是为了异常的安全,所有的资源必须封装在基于栈的对象中,以便异常处理器可以调用析构函数来自动清理它们。

必须在调用interrupt()之前给程序传递一个命令行参数,此参数为用毫秒表示的延迟时间。使用不同的延迟,能从循环中不同的地点退出Blocked3:run()函数:从正处于阻塞状态的sleep()调用中退出,以及从非阻塞状态的数学计算中退出。可以看到,如果interrupt()在标签point2后被调用(非阻塞操作期间)。首先循环已经完成,其次所有的本地对象被析构,最后循环经由while语句在顶部退出。然而,如果interrupt()在point1和point2之间被调用(在while语句之后,但是在阻塞操作sleep()之前或之中),任务通过Interrupted Exception异常退出。在这种情况下,只有在异常被抛出的位置之前已经被创建完成的栈对象才会被清理,并且有机会在catch子句中完成其他的清理操作。

设计用来响应interrupt()函数的类必须建立一种策略,以便保证其能保持一致的状态。这通常意味着,所有的资源获取都要封装在基于栈的对象中,以便无论run()循环如何退出,对象的析构函数都会被调用。如果正确地做了,像这样的代码一定是优雅的。在没有向对象接口中加入任何特别的函数的情况下,可以创建出完全封装了其同步机制,但仍能对外部激励(通过interrupt())有响应的组件。

[1]这样说过于简单。有时甚至当它看上去好像是一个原子操作且会是安全的时候它却可能不是,所以当决定不使用同步时必须非常小心。删除用于同步的代码通常是过度优化的一个标志—它会导致我们陷入很多麻烦中而且不会得到更多好处,甚至得不到任何东西。

[2]原子性不是惟一的问题。在多处理器系统上,可见性问题比在单处理器上多得多。一个线程所做的改变,即便它们在不能被中断的意义上来说是原子的,对其他线程来说仍然有可能是不可见的(比如,这些改变会被暂时存储在本地处理器缓存中),所以不同的线程会看见应用程序的不同状态。同步机制迫使一个线程做出的改变在多处理器系统上是跨应用程序可见的,然而不使用同步,这些变化何时会变为可见是不确定的。

[3]无论如何,在ZThread中异常绝不会被异步发送。因此退出中间指令或函数调用不会有危险。并且只要我们使用Guard模板获得互斥锁,那么如果抛出异常的话,互斥锁会被自动释放。

[4]实际上,sleep()只提供最小的延迟,不是保证延迟,所以可能(尽管不可思议)sleep(1100)会在sleep(1000)之前被唤醒。

[5]C++标准中没有说明在IO操作期间中断不能出现。然而大多数实现不支持它。

[6]注意,尽管不太可能,对t.interrupt()的调用实际可以发生在对blocked.f()的调用之前。