前面我们通过一篇文章讲过多线程应用开发过程中,我们遇到的如何对线程进行设计和应用的问题,其中我们提到了一个分离和联合线程模型。
在JAVA的1.7版以后,它提供了一个Fork/Join框架。用来实现一种常见的多线程处理模型设计。
Fork/Join框架的基类是
java.util.concurrent.ForkJoinPool。
这个类实现了Executor和ExecutorService两个接口以及AbstractExecutorService抽象类。
所以ForkJoin是一个特殊的线程池实现。
因此,ForkJoin线程池基本上是一个执行特殊任务的线程池,即ForkJoinTask。
这个类实现了已知的接口Future,并使用了get()、cancel()和isDone()等方法。
除此之外,这个类还提供了两个方法,它们为整个框架提供了名称:fork()和join()。
调用fork()将启动任务的异步执行,而调用join()将等待任务完成并检索其结果。
因此,我们可以将一个给定的任务分解成多个较小的任务,派生每个任务,最后等待所有任务完成。这使得复杂问题的实现更加容易。
在计算机科学中,这种方法也称为分治法。
每当一个问题太复杂而不能一次解决时,它就被分解成多个更小、更容易解决的问题。这可以用伪代码写成:
首先,我们检查问题的当前大小是否大于给定的阈值。
如果是这种情况,我们将问题分成更小的问题,fork()每个新任务,然后通过调用join()等待结果。
当join()返回每个子任务的结果时,我们必须找到较小问题的最佳解决方案,并将其作为最佳解决方案返回。
重复这些步骤,直到给定的阈值太低,并且问题太小,我们可以直接计算其解决方案,而无需进一步的除法。
为了更好地理解这个过程,我们实现了一个算法,它可以找到整数值数组中最小的数。
这个问题不是我们在日常工作中使用ForkJoinPool可以解决的问题,但是下面的实现非常清楚地展示了基本原则。
在main()方法中,我们设置一个带有随机值的整数数组,并创建一个新的ForkJoinPool。
传递给其构造函数的第一个参数是指示所需并行度的级别。在这里,我们查询运行时中可用的CPU内核的数量。
然后调用invoke()方法并传递一个GetMinNumb实例。
GetMinNumb扩展了类RecursiveTask,它本身是前面提到的ForkJoinTask的子类。
ForkJoinTask类实际上有两个子类:一个是为返回值的任务设计的(RecursiveTask),另一个是为没有返回值的任务设计的(RecursiveAction)。
超类要求我们必须实现compute()方法。在这里,我们查看整数数组的给定部分,并决定当前的问题是否太大而不能立即解决,需要分解处理。
在寻找数组中最小的数字时,需要直接解决的最小问题大小是比较两个元素并返回它们的最小值。
如果当前有两个以上的元素,则将数组分成两部分,并再次找到这两部分中最小的数。这是通过创建GetMinNumb的两个新实例来实现的。
构造函数由数组和开始和结束索引组成。然后,通过调用fork()来异步地启动这两个任务的执行。这个调用提交线程池队列中的两个任务。
线程池实现了一种称为“工作窃取”的策略,即如果所有其他线程都有足够的工作要做,则当前线程从其他任务之一窃取其工作。这确保任务尽可能快地执行。
如上所述,在RecursiveTask旁边还有RecursiveAction类。
与RecursiveTask不同的是,它不必返回值,因此它可以用于可以直接在给定数据结构上执行的异步计算。
这样一个例子是计算灰度图像的彩色图像。我们所要做的就是对图像的每个像素进行迭代,并使用以下公式从RGB值中计算出灰度值:
gray = 0.2126 * red + 0.7152 * green + 0.0722 * blue
浮点数表示特定颜色对人类对灰色感知的影响程度。由于绿色使用的是最大值,我们可以得出一个灰度图像的计算结果接近绿色部分的3/4。
因此,基本实现应该是这样的,假设image是我们表示实际像素数据的对象,setRGB()和getRGB()方法用于检索实际的RGB值:
上面的实现在单CPU机器上运行良好。但是如果我们有多个可用的CPU,我们可能希望将这些工作分配给可用的核心。
因此,不需要在两个嵌套的for循环中遍历所有像素,我们可以用ForkJoinPool并为图像的每一行(或每一列)提交一个新任务。
一旦将一行转换为灰度,当前线程就可以处理下一行。
这个原则在下面的例子中实现:
在main()方法中,我们使用Java的ImageIO类读取图像。返回的BufferedImage实例具有我们需要的所有方法。
我们可以查询行数和列数,并检索和设置每个像素的RGB值。所以我们要做的就是遍历所有行,并向我们的ForkJoinPool提交一个新的GrayscaleImageAction。后者收到了关于可用处理器的提示,作为其构造函数的参数。
现在,通过调用它们的compute()方法,ForkJoinPool可以异步启动任务。在这个方法中,我们遍历每一行并根据其灰度值更新相应的RGB值。
将所有任务提交到池后,我们在主线程中等待关闭整个池,然后使用ImageIO.write()方法将更新后的BufferedImage写回磁盘。
令人惊讶的是,如果不使用可用的处理器,我们只需要几行代码。这里我们再次通过使用java.util.concurrent包来完成实现。
ForkJoinPool为提交任务提供了三种不同的方法:
有了这些知识,我们就很清楚为什么要使用execute()方法提交上面的GrayscaleImageAction。
如果我们使用invoke(),主线程就会等待任务完成,我们就不会利用可用的并行度。
当我们仔细研究ForkJoinTask-API时,我们发现了同样的区别:
既然我们已经知道了ExecutorService和ForkJoinPool,您可能会问自己为什么应该使用ForkJoinPool而不是ExecutorService。
两者之间的差别并不大。两者都有execute()和submit()方法,并使用一些公共接口的实例,如Runnable、Callable、RecursiveAction或RecursiveTask。
为了更好地理解这种差异,让我们尝试使用ExecutorService从上面实现GetMinNumb类:
代码看起来非常相似,除了我们将任务提交给ExecutorService,然后使用返回的Future实例来等待结果之外。
这两个实现之间的主要区别可以在线程池构建的地方找到。
在上面的例子中,我们创建了一个包含64个线程的固定线程池。
为什么选择这么大的数字? 这里的原因是,为每个返回的Future调用get()会阻塞当前线程,直到结果可用为止。
如果我们只向池提供可用cpu数量的线程,那么程序将耗尽资源并无限期挂起。
ForkJoinPool实现了前面提到的工作窃取策略,即每次运行的线程都必须等待某个结果;
该线程从工作队列中删除当前任务,并执行一些准备运行的其他任务。
这样,当前线程就不会被阻塞,可以用来执行其他任务。一旦计算了最初挂起的任务的结果,任务将再次执行,join()方法将返回结果。
这是与常规ExecutorService的一个重要区别,在常规ExecutorService中,我们必须在等待结果时阻塞当前线程。