Java异步编程利器:CompletableFuture详解

一、CompletableFuture简介

‌CompletableFuture‌是Java 8中引入的一个类,它实现了Future和CompletionStage接口,用于支持异步编程和非阻塞操作。CompletableFuture提供了丰富的API,使得并发任务的处理变得简单而高效,能够轻松创建、组合和链式调用异步操作,无需关心底层线程管理,从而提升了程序的响应速度和资源利用率‌。

基本概念和特性

CompletableFuture的主要特性包括:

  • ‌异步执行‌:可以在新的线程上异步执行计算或操作,不会阻塞主线程,提高程序的响应速度。

  • ‌可组合性‌:CompletableFuture的操作可以组合成一个或多个CompletableFuture对象,构成复杂的异步计算链。

  • ‌异常处理‌:通过exceptionally()方法可以捕获计算中的异常并返回默认值。

  • ‌取消与超时‌:支持取消异步任务和设置超时时间,避免任务的无限等待。

  • ‌非阻塞式等待‌:提供了非阻塞式的等待方法,如join()和getNow()方法‌。

CompletableFuture常用于以下场景:

  • ‌并行处理‌:在处理多个耗时操作时,如I/O操作、数据库访问或网络请求,CompletableFuture可以并行执行这些任务,提高系统吞吐量和响应能力。

  • ‌流水线处理‌:在流水线处理场景中,一个任务的输出可以作为下一个任务的输入, CompletableFuture提供了多种方法来实现这种链式调用和结果聚合处理‌。

代码示例

以下是一个简单的使用CompletableFuture的示例:


public void testCreateFuture(String product) {
    // 使用supplyAsync创建异步任务并传入处理逻辑
    CompletableFuture<PriceResult> supplyAsyncResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product));
    // 使用runAsync创建异步任务但不返回结果
    CompletableFuture<Void> runAsyncResult = CompletableFuture.runAsync(() -> System.out.println(product));
}


在这个示例中,supplyAsync用于创建返回结果的异步任务,而runAsync用于创建不返回结果的异步任务‌。

二、Future回顾

CompletableFuture功能的实现基于Future接口,首先让我们来回顾一下Future吧!

Java在JDK1.5之后引入了JUC包,里面包含了一个接口:Future,对比普通的多线程编程,他可以有返回值,这算是Java中实现异步编程的开山鼻祖, 然而Future 的局限性在于它的功能相对简单,无法很好地处理复杂的异步任务链。这里就不重点介绍Future,感兴趣的小伙伴可以去了解一下,下面我们通过一个例子来简单的了解下Future:

下面的例子是两个有返回值的方法,通过Future的方式异步调用:

public class FutureStudy {

    public static String method1() throws InterruptedException {
        Thread.sleep(500);//模拟调用方法耗时
        return "方法1返回";
    }

    public static String method2() throws InterruptedException {
        Thread.sleep(1000);//模拟调用方法耗时
        return "方法2返回";
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //调用方法1
        FutureTask<String> method1 = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return method1();
            }
        });
        executorService.submit(method1);
        Thread.sleep(500); //模拟主线程其它操作耗时
        //调用方法2
        FutureTask<String> method2 = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return method2();
            }
        });
        executorService.submit(method2);
        System.out.println(method1.get());
        System.out.println(method2.get());
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
        executorService.shutdown();
    }


运行结果如下:


方法1返回
方法2返回
总共用时1525ms

通过上面的例子可以看到如果我们正常调用的话,那么用时就是500+1000+500就是2000,这里我们通过Future异步调用缩短了调用时间。

但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

  • 阻塞获取结果:future.get() 方法是阻塞的,无法在不阻塞的情况下处理结果。

  • 缺乏组合功能:无法轻松地组合多个异步任务,例如任务链、并行任务等。

  • 异常处理复杂:异常处理不够简洁,需要手动捕获并处理异常。

  • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

三、使用示例,一个CompletableFuture的简单例子

使用上面的例子,通过CompletableFuture来实现:


    @Test
    public void CompletableFutureTest() throws  Exception{
        long startTime = System.currentTimeMillis();
        CompletableFuture<String> method1 = CompletableFuture.supplyAsync(() ->
                {
                    String s = "";
                    try {
                        s = method1();
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                    return s;
                }
                );
        Thread.sleep(500);
        CompletableFuture<String> method2 = CompletableFuture.supplyAsync(() ->
                {
                    String s = "";
                    try {
                        s = method2();
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                    return s;
                }
        );
        System.out.println(method1.get());
        System.out.println(method2.get());
        System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
    }

可以看到上面通过CompletableFuture实现了同样的效果,CompletableFuture的supplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool,当然,我们也可以使用自定义的线程池,使用方法也是很简单,只需要在创建线程的时候传入就行。

四、CompletableFuture使用

1、异步任务的创建

  • runAsync:runAsync 方法用于启动一个没有返回值的异步任务。该方法通常用于那些不需要返回结果的任务,例如记录日志、发送通知等。

  • supplyAsync :supplyAsync 方法用于启动一个有返回值的异步任务。该方法通常用于需要返回结果的任务,例如计算结果、获取数据等。


public void testCreateFuture(String product) {
    // supplyAsync, 执行逻辑有返回值PriceResult
    CompletableFuture<PriceResult> supplyAsyncResult =
            CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product));
    // runAsync, 执行逻辑没有返回值
    CompletableFuture<Void> runAsyncResult =
            CompletableFuture.runAsync(() -> System.out.println(product));
}


注:supplyAsync或者runAsync创建后便会立即执行,无需手动调用触发,它的get()方法是阻塞的。

2、任务异步回调

1)thenRun与thenRunAsync

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);

thenRun方法就是在CompletableFuture做完第一个任务之后,执行第二个任务使用,某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值;例子如下:


    @Test
    public void thenRunTest() throws ExecutionException, InterruptedException {
        CompletableFuture<String> test = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("执行第一个CompletableFuture任务");
                    return "第一个返回参数";
                }
        );

        CompletableFuture thenRunFuture = test.thenRun(() -> {
            System.out.println("执行第二个任务");
        });
        System.out.println(thenRunFuture.get());

    }


执行第一个CompletableFuture任务
执行第二个任务
null

两者的区别:

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是内置的ForkJoinPool.commonPool线程池。

2)thenAccept与thenAcceptAsync

thenAccept用于处理异步任务的结果,但不返回新的值,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中。适合用于打印日志、执行操作等场景。thenAccept与thenAcceptAsync两者的区别同上面的thenRun

  @Test
    public void acceptStudy() throws ExecutionException, InterruptedException {
        CompletableFuture<String> test = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("原始任务");
                    return "原始任务返回";
                }
        );
        CompletableFuture thenAccept = test.thenAccept((a) -> {
            System.out.println(a);
        });
    }


原始任务
原始任务返回

3)thenApply与thenApplyAsync

thenApply用于处理异步任务的结果,并返回新的值。适合用于转换数据、链式处理等场景。第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,与thenAccept不同的是他有返回值。

thenApply与thenApplyAsync两者的区别同上面的thenRun


    @Test
    public void acceptStudy() throws ExecutionException, InterruptedException {
        CompletableFuture<String> test = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("原始任务");
                    return "原始任务返回";
                }
        );
        CompletableFuture thenApply = test.thenApply((a) -> {
            System.out.println(a);
            return a;
        });
        System.out.println(thenApply.get());
    }


原始任务
原始任务返回
原始任务返回

4)whenComplete 与 exceptionally

  • whenComplete:类似于 Vue 中发起异步请求之后的 then 方法;无论任务是否有异常都会执行;回调方法接收两个参数:任务的结果(如果任务没有返回值则为 Void)和异常(如果没有异常则为 null);它是有返回值的,并且whenComplete方法返回的CompletableFuture的result是上个任务的结果。

  @Test
    public void whenCompleteTest() throws ExecutionException, InterruptedException {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("原始任务");
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "原始任务";
                }
        );

        CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
            System.out.println("回调任务");
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
        });

        System.out.println(rstFuture.get());
    }

运行结果如下:

原始任务
回调任务
上个任务执行完啦,还把原始任务传过来
原始任务

  • exceptionally: 类似于 Vue 中发起异步请求之后的 catch 方法;只有当任务发生异常时才会执行;某个任务执行异常时,执行回调方法;并且有抛出异常作为参数,传递到回调方法exceptionally中。

    CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("原始任务" );
                    return 1/0+"";
                }
        );

        CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
            e.printStackTrace();
            return "程序异常啦";
        });

        System.out.println(exceptionFuture.get());
    }


原始任务
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.ArithmeticException: / by zero
	at completableFuture.FutureStudy.lambda$exceptionStudy$8(FutureStudy.java:135)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	... 5 more
程序异常啦

Process finished with exit code 0

4)handle

handle方法表示某个任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。


    @Test
    public void handleStudy() throws ExecutionException, InterruptedException {
        CompletableFuture<String> origin = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("原始任务");
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "返回原始任务";
                }
        );

        CompletableFuture<String> rstFuture = origin.handle((a, throwable) -> {

            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            return "新任务返回";
        });

        System.out.println(rstFuture.get());
    }


原始任务
上个任务执行完啦,还把返回原始任务传过来
新任务返回

3、多个任务组合处理

1)AND关系

thenCombine/thenCombineAsync:会将两个线程的执行结果作为方法入参,传递到指定方法中,它是有返回值的。

thenAcceptBoth/thenAcceptB### 4)AnyOf othAsync: 会将两个任务的执行结果作为方法入参,传递到指定方法中,它是无返回值的。

runAfterBoth/runAfterBothAsync:不会把执行结果当做方法入参,且没有返回值。


  @Test
    public void andStudy() {
        CompletableFuture<String> first = CompletableFuture.completedFuture("这是第一个异步任务");
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletableFuture<String> future = CompletableFuture
                //第二个异步任务
                .supplyAsync(() -> "这是第二个异步任务", executor)
                // (w, s) -> System.out.println(s) 是第三个任务
                .thenCombineAsync(first, (s, w) -> {
                    System.out.println(w);
                    System.out.println(s);
                    return "两个异步任务的组合:"+w +" " +s;
                }, executor);
        System.out.println(future.join());
        executor.shutdown();
    }


这是第一个异步任务
这是第二个异步任务
两个异步任务的组合:这是第一个异步任务 这是第二个异步任务

2)OR关系

applyToEither/applyToEitherAsync:会将已经执行完成的任务,作为方法入参,传递到指定方法中,它是有返回值的。

acceptEither/acceptEitherAsync: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,它是无返回值的。

runAfterEither/runAfterEitherAsync: 不会把执行结果当做方法入参,并且它是没有返回值的。


    @Test
    public void orTest() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
            try{
                Thread.sleep(2000L);
                System.out.println("执行第一个异步任务");}
            catch (Exception e){
                return "第一个任务异常";
            }
            return "第一个异步任务";
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture
                //第二个异步任务
                .supplyAsync(() -> {
                            System.out.println("执行完第二个任务");
                            return "第二个任务";}
                        , executor)
                //第三个任务
                .runAfterEither(first,()->{
                    System.out.println(666);
                });

        executor.shutdown();
    }


执行完第二个任务
666

3)AllOf

只有当所有任务都执行完成后,才执行allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture去执行get方法,那么都会会抛出异常。


 @Test
    public void  allOfStudy() throws ExecutionException, InterruptedException {

        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
            System.out.println("a执行完了");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("b也执行完了");
        });
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{
            System.out.println("都执行完了");
        });
        allOfFuture.get();
    }


a执行完了
b也执行完了
都执行完了

4)AnyOf

只要任意一个任务执行完之后,就会执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture去执行get方法,会抛出异常。


    @Test
    public void  anyOfStudy() throws ExecutionException, InterruptedException {

        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("a执行完了");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("b执行完了");
        });
        CompletableFuture<Object> anyof = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
            System.out.println("都执行完了");
            return;
        });
        anyof.join();
    }


b执行完了
都执行完了

5)thenCompose

thenCompose会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的操作然后返回一个新的CompletableFuture。

 @Test
    public void compostStudy() {
        CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");
        //第二个异步任务
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "第二个任务")
                .thenComposeAsync(data -> {
                    System.out.println(data);
                    return f; //使用第一个任务作为返回
                });
        System.out.println(future.join());
    }


第二个任务
第一个任务


已有 0 条评论

    欢迎您,新朋友,感谢参与互动!