CompletableFuture
标签:Java并发

CompletableFuture

CompletableFuture和Future一样,可以作为函数调用的契约,如果你想CompletableFuture请求一个数据,如果数据还没有准备好的话,请求线程会等待。

public class AskThread implements Runnable {
    CompletableFuture<Integer> re=null;

    public AskThread(CompletableFuture<Integer> re) {
        this.re = re;
    }

    @Override
    public void run() {
        int myRe=0;
        try {
            myRe=re.get()*re.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(myRe);
    }

    public static void main(String[] args) throws InterruptedException {
        final CompletableFuture<Integer> future=new CompletableFuture<>();
//        启动线程
        new Thread(new AskThread(future)).start();
//        模拟主线程长时间计算
        Thread.sleep(1000);
//        告知完成结果
        future.complete(60);
    }
}

定义一个AskThread线程作为计算平方,并将其打印出来,主线程创建一个AskThread线程,但是CompletableFuture中没有数据,这个时候处于未完成状态,当主线程Sleep完后,调用complete方法将值放入到CompletableFuture,这个时候AskThread就可以继续执行。

异步执行

public class CompletableFutureSync {
    public static Integer calc(Integer para){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return para*para;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->calc(50));
        System.out.println("main");
        System.out.println(future.get());
    }
}

通过CompletableFuture.supplyAsync()构造一个CompletableFuture实例,在supplyAsync()函数中,它会在一个新的线程中,执行传入的参数。上面的是执行calc方法,该方法会可能会执行的比较慢,但是supplyAsync会立即返回,返回的结果可以作为获取结果的契约,在后面调用get方法时获得。

类似的方法有:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

Supplier:

@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

上面的supplyAsync()方法用于那些需要有返回值的场景,比如计算数据等,runAsync()方法没有返回值的场景,仅仅是简单异步执行某个动作。

上面的方法中可以看到我们可以指定一个线程池,也可以不指定线程池,如果我们没有指定线程池的话,那么就会在默认的ForkJoinPool.common线程池中执行。Java8中,通过ForkJoinPool.common()方法,我们可以获得一个公共的ForkJoin线程池,这个公共线程池中的所有线程都是Daemon线程,这意味着如果主线程退出的话,那么无论这些线程是否执行完,都将会退出,故要上面示例用get方法等待。

流式调用

public class CompletableFutureSync {
    public static Integer calc(Integer para){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return para*para;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future=CompletableFuture.supplyAsync(()->calc(50))
                .thenApply(i->Integer.toString(i))
                .thenApply(str->"\""+str+"\"")
                .thenAccept(System.out::println);
        future.get();
    }
}

上面通过lambda表达式,最后会等到结果 "2500"

异常处理

public class CompletableFutureSync {
    public static Integer calc(Integer para) {
        return para / 0;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> calc(50))
                .exceptionally(ex -> {
                    System.out.println(ex.toString());
                    return 0;
                })
                .thenApply(i -> Integer.toString(i))
                .thenApply(str -> "\"" + str + "\"")
                .thenAccept(System.out::println);
        future.get();
    }
}

通过exceptionally()处理异常,上面执行会抛出异常,并且会输出 "0"

组合多个CompletableFuture

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> calc(50))
.thenCompose(i->CompletableFuture.supplyAsync(()->calc(i)))
.thenApply(i -> Integer.toString(i))
.thenApply(str -> "\"" + str + "\"")
.thenAccept(System.out::println);
future.get();

thenCompose上面是一个CompletableFuture执行完传递给下一个CompletableFuture继续执行。

CompletableFuture<Integer> intFuture1=CompletableFuture.supplyAsync(()->calc(50));
CompletableFuture<Integer> intFuture2=CompletableFuture.supplyAsync(()->calc(20));
CompletableFuture<Void> future=intFuture1.thenCombine(intFuture2,(i,j)->i+j).thenApply(str -> "\"" + str + "\"")
    .thenAccept(System.out::println);
future.get();

thenCombine是先完成intFuture1和intFuture2的执行,然后再进行后续步骤。

更多信息:CompletableFuture 详解

  • 4 min read

CONTRIBUTORS


  • 4 min read