ForkJoin框架
标签:Java并发

ForkJoin框架

forkjoin框架就是将一个大任务分解成一系列小任务,fork表示分解成多个子进程,等待子进程执行完毕后,得到最后的结果,join表示等待。

如果毫无顾忌的使用fork开启子线程,将会导致系统开启大量的线程而严重影响性能,故在JDK中,是通过ForkJoinPool线程池,对于fork方法先不着急开启新的线程,而是提交该线程池。所以,实际提交的任务和线程数量并不是一对一的关系,实际上,一个物理线程实际上需要处理多个逻辑任务的。

任务图

如果线程A已经执行完了自己的任务,线程A就会帮助帮助还有很多任务要执行的线程B执行任务,线程A从队列尾部拿去任务,线程B从队列头部获取任务。

1. 相关的类

先来看一下线程池:

可以看到ForkJoinPool也是继承我的ExecutorService,我们来看ForkJoinPool的submit方法:

可以看到除了线程池常用的Callable和Runnable以外,还有一个ForkJoinTask,下面来看一下ForkJoinTask:

可见它有两个实现的抽象类:对于RecursiveAction是没有返回值的,而RecursiveTask是可以带返回值的。

2. 例子

public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
//            分成100个小任务
            long step = (start + end) / 100;
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                subTasks.add(subTask);
                subTask.fork();
            }
            for (CountTask t : subTasks) {
                sum += t.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask countTask = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(countTask);
        try {
            long res = result.get();
            System.out.println("sum=" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

当end-start小于我们指定的Threshold,就执行,否则就继续分解,最后使用一个foreach进行join和获取返回值。

3. 注意

使用ForkJoin注意,如果任务划分的很深的话,一直得不到返回的话,原因可能有:一是系统内线程数量堆积越来越多,导致性能严重下降,二是函数调用层次变得很深,导致栈溢出了。

  • 3 min read

CONTRIBUTORS


  • 3 min read