JUC in Depth: CompletableFuture

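Review of the Future Interface

The Future interface defines methods for working with an asynchronous task: retrieving its result, cancelling it, checking whether it was cancelled, and checking whether it has finished.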

Future.java
package java.util.concurrent;
 
/**
 * A {@code Future} represents the result of an asynchronous
 * computation.  Methods are provided to check if the computation is
 * complete, to wait for its completion, and to retrieve the result of
 * the computation.  The result can only be retrieved using method
 * {@code get} when the computation has completed, blocking if
 * necessary until it is ready.  Cancellation is performed by the
 * {@code cancel} method.  Additional methods are provided to
 * determine if the task completed normally or was cancelled. Once a
 * computation has completed, the computation cannot be cancelled.
 * If you would like to use a {@code Future} for the sake
 * of cancellability but not provide a usable result, you can
 * declare types of the form {@code Future<?>} and
 * return {@code null} as a result of the underlying task.
 *
 * <p>
 * <b>Sample Usage</b> (Note that the following classes are all
 * made-up.)
 * <pre> {@code
 * interface ArchiveSearcher { String search(String target); }
 * class App {
 *   ExecutorService executor = ...
 *   ArchiveSearcher searcher = ...
 *   void showSearch(final String target)
 *       throws InterruptedException {
 *     Future<String> future
 *       = executor.submit(new Callable<String>() {
 *         public String call() {
 *             return searcher.search(target);
 *         }});
 *     displayOtherThings(); // do other things while searching
 *     try {
 *       displayText(future.get()); // use future
 *     } catch (ExecutionException ex) { cleanup(); return; }
 *   }
 * }}</pre>
 *
 * The {@link FutureTask} class is an implementation of {@code Future} that
 * implements {@code Runnable}, and so may be executed by an {@code Executor}.
 * For example, the above construction with {@code submit} could be replaced by:
 *  <pre> {@code
 * FutureTask<String> future =
 *   new FutureTask<String>(new Callable<String>() {
 *     public String call() {
 *       return searcher.search(target);
 *   }});
 * executor.execute(future);}</pre>
 *
 * <p>Memory consistency effects: Actions taken by the asynchronous computation
 * <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
 * actions following the corresponding {@code Future.get()} in another thread.
 *
 * @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface Future<V> {
 
    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.  If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}.  Subsequent calls to {@link #isCancelled}
     * will always return {@code true} if this method returned {@code true}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return {@code false} if the task could not be cancelled,
     * typically because it has already completed normally;
     * {@code true} otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);
 
    /**
     * Returns {@code true} if this task was cancelled before it completed
     * normally.
     *
     * @return {@code true} if this task was cancelled before it completed
     */
    boolean isCancelled();
 
    /**
     * Returns {@code true} if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * {@code true}.
     *
     * @return {@code true} if this task completed
     */
    boolean isDone();
 
    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;
 
    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

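For example, the main thread hands a potentially slow task to a worker thread. Once the worker has started, the main thread goes on with other work (or finishes its own work first) and only later retrieves the task's result or checks its updated status.

Summary: the Future interface lets the main thread branch off a side task that handles time-consuming, heavy business logic on its behalf.

FutureTask: the common implementation of the Future interface for asynchronous tasks

What the Future interface is for

Future is an interface added in Java 5. It provides a way to run computations asynchronously and in parallel.

If the main thread needs to run a time-consuming computation, it can hand the task to an asynchronous thread through a Future.

The main thread then continues with other work (or finishes its own work first) and later retrieves the result through the Future.

  • Runnable interface (multithreading)

  • Callable interface (returns a value)

  • Future interface (asynchronous) and its implementation FutureTask (asynchronous; the class implements the RunnableFuture interface)

Goal: run a task on another thread, asynchronously, and get a result back. Three characteristics: multithreaded, returns a value, asynchronous.

The architecture underlying the Future interface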

CompletableFutureDemo.java
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());
 
        Thread t1 = new Thread(futureTask,"t1");
        t1.start();
 
        System.out.println(futureTask.get());
    }
}
 
class MyThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("-----come in call() " );
        return "hello Callable";
    }
}

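Future in practice: pros and cons

Pros

Combining Future with a thread pool to run tasks asynchronously on several threads can significantly reduce total running time. In the demo below the three tasks run in parallel, so the total is roughly the duration of the slowest task (about 500 ms) rather than the sum of all three (about 1100 ms).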

FutureThreadPoolDemo.java
public class FutureThreadPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
 
        long startTime = System.currentTimeMillis();
 
        FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
            return "task1 over";
        });
        threadPool.submit(futureTask1);
 
        FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
            try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
            return "task2 over";
        });
        threadPool.submit(futureTask2);
 
        FutureTask<String> futureTask3 = new FutureTask<String>(() -> {
            try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
            return "task3 over";
        });
        threadPool.submit(futureTask3);
        System.out.println(futureTask1.get());
        System.out.println(futureTask2.get());
        System.out.println(futureTask3.get());
        
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" ms");
 
        System.out.println(Thread.currentThread().getName()+"\t -----end");
        threadPool.shutdown();
    }
}

Cons

  • get() blocks

    FutureAPIDemo.java
    public class FutureAPIDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            FutureTask<String> futureTask = new FutureTask<String>( () -> {
                System.out.println(Thread.currentThread().getName()+"\t -----start");
                try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
                return "task over";
            });
            Thread t1 = new Thread(futureTask, "t1");
            t1.start();
          
            // Position 1: calling get() here blocks main until the task finishes
            // System.out.println(futureTask.get());
     
            System.out.println(Thread.currentThread().getName()+"\t ----end");
     
            // Position 2: call get() as late as possible, after the other work is done
            System.out.println(futureTask.get());
            System.out.println(futureTask.get(3,TimeUnit.SECONDS));
        }
    }

    Once get() is called, the calling thread blocks until the computation has finished.

  • isDone() polling

    FutureAPIDemo.java
    public class FutureAPIDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            FutureTask<String> futureTask = new FutureTask<String>( () -> {
                System.out.println(Thread.currentThread().getName()+"\t -----start");
                try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
                return "task over";
            });
            Thread t1 = new Thread(futureTask, "t1");
            t1.start();
     
            System.out.println(Thread.currentThread().getName()+"\t ----end");
     
            while(true) {
                if(futureTask.isDone()) {
                    System.out.println(futureTask.get());
                    break;
                } else {
                    // pause briefly before polling again
                    try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("processing...");
                }
            }
        }
    }

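    Polling wastes CPU time and still does not deliver the result the moment it is ready. With a plain Future, the usual way to fetch a result without blocking is to poll, so that blocking is avoided as far as possible.

  • Conclusion: Future is not friendly when it comes to retrieving results. The result can only be obtained by blocking or by polling.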
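
One way to limit the cost of a blocking get() is to bound the wait and fall back when the deadline passes. The following is a minimal sketch of that idea, not part of the original demos; the class name FutureTimeoutSketch, the helper getOrFallback, and the fallback value are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {
    // Hypothetical helper: waits at most timeoutMillis for the task,
    // and returns the fallback value if the task is not done in time.
    static String getOrFallback(Callable<String> task, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(task);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // ask the worker to stop; we no longer need the result
                return fallback;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return fallback;
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getOrFallback(() -> "fast", 1000, "fallback"));
        System.out.println(getOrFallback(() -> { Thread.sleep(2000); return "slow"; }, 100, "fallback"));
    }
}
```

The caller still blocks, only for a bounded time. Being notified when the result is ready requires a callback, which Future alone does not offer.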

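Requirements of complex business scenarios

Future is good enough for simple scenarios, but it falls short in complex ones.

  • Callback notification
    • React to the completion of a Future: once the task is done, its result is delivered
    • With Future alone, completion can only be detected by polling
  • Creating asynchronous tasks
    • Future combined with a thread pool
  • Combining multiple tasks that depend on each other
    • Combine the results of several asynchronous tasks, where a later task needs the value computed by an earlier one;
    • Merge two or more asynchronous computations into one, where the computations are independent of each other while a later step depends on the results of the earlier ones.
  • Returning the result of whichever task in a set of Futures finishes first.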
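
These requirements map onto CompletableFuture methods roughly as sketched below: thenCompose for a dependent task, thenCombine for merging independent tasks, and anyOf for the first finished result. This is an illustrative sketch only; the class ComposeSketch, its method names, and the numbers are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    static void sleep(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    // Dependent tasks: the second task needs the value produced by the first.
    static int dependent() {
        return CompletableFuture.supplyAsync(() -> 20)
                .thenCompose(x -> CompletableFuture.supplyAsync(() -> x + 1))
                .join();
    }

    // Independent tasks: both run on their own, then the results are merged.
    static int combined() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 32);
        return a.thenCombine(b, Integer::sum).join();
    }

    // First finished: anyOf completes as soon as any of the given futures completes.
    static Object fastest() {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> { sleep(500); return "slow"; });
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        return CompletableFuture.anyOf(slow, fast).join();
    }

    public static void main(String[] args) {
        System.out.println(dependent());
        System.out.println(combined());
        System.out.println(fastest());
    }
}
```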

CompletableFuture's improvements over Future

CompletableFuture and CompletionStage: a look at the source

Class architecture

CompletableFuture.java
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

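The CompletionStage interface

  • A CompletionStage represents one stage of an asynchronous computation; when a stage completes, it may trigger another stage
  • The computation of a stage can be a Function, a Consumer, or a Runnable. For example: stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(()-> System.out.println())
  • A stage may be triggered by the completion of a single stage, or by several stages together

The CompletableFuture class

  • Introduced in Java 8, CompletableFuture is a powerful extension of Future. It simplifies asynchronous programming, supports a functional style in which results are handled through callbacks, and provides methods for transforming and combining CompletableFutures;
  • It can represent a Future that is completed explicitly, or a stage (CompletionStage) that triggers functions or actions once the computation completes;
  • It implements both the Future and CompletionStage interfaces.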
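
The chain quoted in the list above can be made runnable as follows. This is a small sketch: square is a hypothetical helper standing in for the square(x) call in the text, and the log list exists only so the effect of each stage can be inspected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class StageChainSketch {
    // Hypothetical helper matching the square(x) call in the text above.
    static int square(int x) { return x * x; }

    static List<String> runChain(int input) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture.supplyAsync(() -> input)
                .thenApply(StageChainSketch::square)        // Function: value in, value out
                .thenAccept(x -> log.add("accepted " + x))  // Consumer: value in, nothing out
                .thenRun(() -> log.add("done"))             // Runnable: nothing in, nothing out
                .join();                                    // wait for the last stage
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runChain(4)); // [accepted 16, done]
    }
}
```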

Using CompletableFuture

Four static methods

  • runAsync: no return value
    • public static CompletableFuture<Void> runAsync(Runnable runnable)
    • public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  • supplyAsync: returns a value
    • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

About the Executor executor parameter:

  • The overloads without an Executor run the asynchronous code on the default pool, ForkJoinPool.commonPool().
  • If an Executor is given, the asynchronous code runs on that custom or explicitly chosen thread pool

Code examples

runAsync with the default thread pool
CompletableFutureDemo.java
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync start");
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync end");
        });
        System.out.println(completableFuture.get());
    }
}
runAsync with a custom thread pool
CompletableFutureDemo.java
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
 
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
 
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync start");
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync end");
        }, threadPool);
        System.out.println(completableFuture.get());
        threadPool.shutdown();
    }
}
supplyAsync with the default thread pool
CompletableFutureDemo.java
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
 
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync end");
            return "hello supplyAsync";
        });
        System.out.println(completableFuture.get());
    }
}
supplyAsync with a custom thread pool
CompletableFutureDemo.java
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
 
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
 
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync end");
            return "hello supplyAsync";
        }, threadPool);
        System.out.println(completableFuture.get());
        threadPool.shutdown();
    }
}

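Code example: reducing blocking and polling

CompletableFuture, introduced in Java 8, is an enhanced version of Future. You can pass it a callback, and when the asynchronous task completes or fails, the callback method is invoked automatically.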

CompletableFutureUseDemo.java
public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
                int result = ThreadLocalRandom.current().nextInt(10);
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync result: " + result);
                // trigger an exception for some results
                if (result > 5) {
                    int i= 10/0;
                }
                return result;
            }, threadPool).whenComplete((v,e) -> {
                if (e == null) {
                    System.out.println(Thread.currentThread().getName() + "\t Calculation complete, updated value: " + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("Exception: " + e.getCause() + "\t" + e.getMessage());
                return null;
            });
 
            System.out.println(Thread.currentThread().getName() + "\t do other things");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            threadPool.shutdown();
        }
 
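        // With the default pool, do not let the main thread exit right away: the pool's worker threads
        // are daemon threads, so the JVM may exit before the callbacks run. Pause for 3 seconds:
        // try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }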
 
    }
 
    private static void futureDemo() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
            int result = ThreadLocalRandom.current().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync result" + result);
            return result;
        });
        System.out.println(Thread.currentThread().getName()+"\t do other things");
 
        System.out.println(completableFuture.get());
    }
}

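Advantages of CompletableFuture

  • When an asynchronous task finishes, a callback method is invoked automatically;
  • When an asynchronous task fails, a callback method is invoked automatically;
  • Once the main thread has registered the callbacks, it no longer needs to watch the task, and asynchronous tasks can be chained to run in sequence.

Case study: price comparison for an e-commerce site

Functional programming

  • Runnable (no parameters, no return value)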

    Runnable.java
    @FunctionalInterface
    public interface Runnable {
        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see     java.lang.Thread#run()
         */
        public abstract void run();
    }
  • Function (takes one parameter, returns a value)

    Function.java
    @FunctionalInterface
    public interface Function<T, R> {
     
        /**
         * Applies this function to the given argument.
         *
         * @param t the function argument
         * @return the function result
         */
        R apply(T t);
    }
  • Consumer (takes one parameter, no return value)

    Consumer.java
    @FunctionalInterface
    public interface Consumer<T> {
     
        /**
         * Performs this operation on the given argument.
         *
         * @param t the input argument
         */
        void accept(T t);
    }
  • Supplier (no parameters, returns a value)

    Supplier.java
    @FunctionalInterface
    public interface Supplier<T> {
     
        /**
         * Gets a result.
         *
         * @return a result
         */
        T get();
    }
     
  • BiConsumer (takes two parameters, no return value)

    BiConsumer.java
    @FunctionalInterface
    public interface BiConsumer<T, U> {
     
        /**
         * Performs this operation on the given arguments.
         *
         * @param t the first input argument
         * @param u the second input argument
         */
        void accept(T t, U u);
    }
  • Summary

    Functional interface   Method   Parameters   Return value
    Runnable               run      none         none
    Function               apply    1            yes
    Consumer               accept   1            none
    Supplier               get      none         yes
    BiConsumer             accept   2            none
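
As a quick illustration of the table, each of the five interfaces can be written as a lambda. This is a sketch only; the class name and the values are arbitrary.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class FunctionalInterfaceSketch {
    static String demo() {
        StringBuilder out = new StringBuilder();

        Runnable runnable = () -> out.append("run;");                     // no parameter, no result
        Function<Integer, Integer> doubler = x -> x * 2;                  // one parameter, one result
        Consumer<String> consumer = s -> out.append(s).append(';');       // one parameter, no result
        Supplier<String> supplier = () -> "supplied";                     // no parameter, one result
        BiConsumer<String, Integer> biConsumer =
                (k, v) -> out.append(k).append('=').append(v).append(';'); // two parameters, no result

        runnable.run();
        consumer.accept(supplier.get());
        biConsumer.accept("doubled", doubler.apply(21));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // run;supplied;doubled=42;
    }
}
```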

Code example

CompletableFutureMallDemo.java
/**
 *
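 * Case description: an e-commerce price comparison, simulating the following:
 *
 * 1 Requirements:
 *  1.1 For one product, look up its price on all major e-commerce platforms at the same time;
 *  1.2 For one product, look up the price offered by each seller on a single platform at the same time
 *
 * 2 Output: a price list for the same product in different places, returned as a List<String>
 * 《mysql》 in jd price is 88.05
 * 《mysql》 in dangdang price is 86.11
 * 《mysql》 in taobao price is 90.43
 *
 * 3 Technical requirements
 *   3.1 Functional programming
 *   3.2 Method chaining
 *   3.3 Stream processing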
 */
public class CompletableFutureMallDemo {
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"),
            new NetMall("pdd"),
            new NetMall("tmall")
    );
 
    /**
     * Step by step: query the malls one after another
     * List<NetMall> ----->map------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPrice(List<NetMall> list,String productName) {
        //《mysql》 in taobao price is 90.43
        return list
                .stream()
                .map(netMall ->
                        // pass productName as an argument so a '%' in the name cannot break the format string
                        String.format("%s in %s price is %.2f",
                                productName,
                                netMall.getNetMallName(),
                                netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }
 
    /**
     * List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName) {
        return list.stream().map(netMall ->
                CompletableFuture.supplyAsync(() -> String.format("%s in %s price is %.2f",
                productName,
                netMall.getNetMallName(),
                netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(s -> s.join())
                .collect(Collectors.toList());
    }
 
 
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" ms");
 
        System.out.println("--------------------");
 
        long startTime2 = System.currentTimeMillis();
        List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime2 - startTime2) +" ms");
    }
}
 
class NetMall {
    @Getter
    private String netMallName;
 
    public NetMall(String netMallName)
    {
        this.netMallName = netMallName;
    }
 
    public double calcPrice(String productName) {
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
 
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

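Common CompletableFuture methods

Getting the result and triggering computation

Getting the result

public T get()

Blocks until the result is available. The wait responds to interruption: if the waiting thread is interrupted, get() throws InterruptedException.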

CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
            return "abc";
        });
 
        System.out.println(completableFuture.get());
    }
}
public T get(long timeout, TimeUnit unit)

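Waits at most the given time. If the result is still not available when the timeout expires, it throws TimeoutException instead of blocking any longer.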

CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
            return "abc";
        });
        System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
    }
}
public T getNow(T valueIfAbsent)

Returns immediately without blocking. If the computation has completed, its result is returned; otherwise the given valueIfAbsent is returned.

CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
            return "abc";
        });
        // pause the main thread for a moment
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println(completableFuture.getNow("xxx"));
    }
}
public T join()

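Compared with get(), join() does not respond to thread interruption while waiting, and it declares no checked exceptions: a failure surfaces as an unchecked CompletionException.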

CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
            return "abc";
        });
 
        System.out.println(completableFuture.join());
    }
}
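
The difference in exception handling between get() and join() can be sketched as follows. The class JoinVsGetSketch and the deliberately failing task are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGetSketch {
    // A task that always fails, used only for this illustration.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> { throw new IllegalStateException("boom"); });
    }

    // get() declares checked exceptions, so the caller must catch or rethrow them.
    static String viaGet() {
        try {
            failing().get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() throws only unchecked exceptions, which makes it convenient inside lambdas and streams.
    static String viaJoin() {
        try {
            failing().join();
            return "no exception";
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
    }
}
```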

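Actively triggering completion

public boolean complete(T value)

If the future has not completed yet, complete(value) completes it with the given value, so get() returns that value immediately. The boolean return value tells whether this call caused the completion.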

CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
            return "abc";
        });
        System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.get());
    }
}
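
The return value of complete can be observed deterministically with futures that are not tied to a running task. A minimal sketch (class name and values are arbitrary):

```java
import java.util.concurrent.CompletableFuture;

class CompleteSketch {
    static String demo() {
        // A future that nobody is computing yet: the first complete() wins.
        CompletableFuture<String> pending = new CompletableFuture<>();
        boolean first = pending.complete("manual");   // true: this call completed the future
        boolean second = pending.complete("ignored"); // false: already completed, value unchanged

        // A future that is already done: complete() has no effect.
        CompletableFuture<String> done = CompletableFuture.completedFuture("abc");
        boolean third = done.complete("late");

        return first + "," + second + "," + pending.join() + "," + third + "," + done.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true,false,manual,false,abc
    }
}
```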

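Processing the computation result

thenApply

  • The stages depend on each other's results, so they run one after another
  • Because of this dependency, an exception in any step stops the chain at that step.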
CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
 
        // when one step depends on another, thenApply runs the two steps one after the other
        CompletableFuture.supplyAsync(() -> {
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }, threadPool).thenApply(f -> {
            System.out.println("222");
            return f + 1;
        }).thenApply(f -> {
            //int age = 10/0; // failure case: the chain stops at the step that throws
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("value: " + v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        System.out.println(Thread.currentThread().getName() + "\t do other things...");
        threadPool.shutdown();
    }
}

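handle

With handle the chain continues even when an exception occurred; the exception is passed in as a parameter so it can be processed further.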

CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
 
        CompletableFuture.supplyAsync(() ->{
            // pause the thread for a moment
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1;
        },threadPool).handle((f,e) -> {
            int i = 10/0;
            System.out.println("222");
            return f + 2;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 3;
        }).whenComplete((v,e) -> {
            if (e == null) {
                System.out.println("----result: "+v);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });
        System.out.println(Thread.currentThread().getName() + "\t do other things...");
        threadPool.shutdown();
    }
}

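Summary

Method                   Analogy
exceptionally            try/catch
whenComplete, handle     try/finally

Difference between whenComplete and whenCompleteAsync:

  • whenComplete: the callback runs on the thread that completed the current task (or on the calling thread if the task has already completed);
  • whenCompleteAsync: the callback is submitted to a thread pool for execution.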
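
The analogy can be checked with a future that has already failed. The sketch below is illustrative: the class ErrorHandlingSketch and the recovery values -1 and -2 are arbitrary choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrorHandlingSketch {
    // A future that has already failed, so the example is deterministic.
    static CompletableFuture<Integer> failed() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(new ArithmeticException("/ by zero"));
        return f;
    }

    // exceptionally: like catch, it replaces the failure with a recovery value.
    static int recoverWithExceptionally() {
        return failed().exceptionally(e -> -1).join();
    }

    // handle: sees both value and exception and always produces a new result.
    static int recoverWithHandle() {
        return failed().handle((v, e) -> e == null ? v : -2).join();
    }

    // whenComplete: like finally, it observes the outcome but the failure still propagates.
    static String observeWithWhenComplete() {
        StringBuilder seen = new StringBuilder();
        try {
            failed().whenComplete((v, e) -> seen.append("saw ").append(e.getClass().getSimpleName())).join();
        } catch (CompletionException e) {
            seen.append(", still failed");
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(recoverWithExceptionally());
        System.out.println(recoverWithHandle());
        System.out.println(observeWithWhenComplete());
    }
}
```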

Consuming the computation result

thenAccept

Receives the task's result and consumes it; it returns no result.

CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            return 1;
        }).thenApply(f ->{
            return f + 2;
        }).thenApply(f ->{
            return f + 3;
        }).thenAccept(System.out::println);
    }
}

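Supplement: running tasks in sequence

  • thenRun(Runnable runnable): B runs after task A finishes, and B does not need A's result
  • thenAccept(Consumer action): B runs after task A finishes and needs A's result, but B returns no value
  • thenApply(Function fn): B runs after task A finishes and needs A's result, and B returns a value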
CompletableFutureAPIDemo.java
public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
    }
}
 
// null
// resultA
// null
// resultAresultB

CompletableFuture and thread pools

Taking thenRun and thenRunAsync as an example: what is the difference?
CompletableFutureWithThreadPoolDemo.java
public class CompletableFutureWithThreadPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
 
        try {
            CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println("task 1\t" + Thread.currentThread().getName());
                return "abcd";
            }, threadPool).thenRunAsync(() -> {
                try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println("task 2\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println("task 3\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
                System.out.println("task 4\t" + Thread.currentThread().getName());
            });
            System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
        System.out.println(Thread.currentThread().getName() + "\t do other things...");
    }
}
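Summary
  • If no custom thread pool is passed in, every stage uses the default pool, ForkJoinPool.commonPool()

  • If a custom thread pool is passed in when the first task is submitted:

    • when the second task is attached with thenRun, it shares the thread pool of the first task;
    • when the second task is attached with thenRunAsync (without an executor argument), the first task uses the custom pool and the second task uses the ForkJoin pool.
  • When a stage finishes very quickly, the dependent action may run directly on the main thread: if the previous stage has already completed by the time thenRun is called, the action runs on the calling thread.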

    CompletableFutureWithThreadPoolDemo.java
    public class CompletableFutureWithThreadPoolDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
     
            try {
                CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
    //                try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
                    System.out.println("task 1\t" + Thread.currentThread().getName());
                    return "abcd";
                }, threadPool).thenRun(() -> {
                    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
                    System.out.println("task 2\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
                    System.out.println("3号任务\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
                    System.out.println("4号任务\t" + Thread.currentThread().getName());
                });
                System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
            System.out.println(Thread.currentThread().getName() + "\t do other things...");
        }
    }
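
The last point in the summary can be reproduced without relying on timing. The following is a minimal sketch, not part of the original demo: the class and method names are made up for illustration. It starts from a future that is already complete, so the thenRun action has nothing to wait for and runs in the thread that calls thenRun.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedStageCallerThreadSketch {

    // Returns the name of the thread that executed the thenRun action.
    static String dependentThreadName() {
        String[] ranOn = new String[1];
        // completedFuture(...) is already done, so thenRun executes its action
        // immediately in the thread that calls thenRun.
        CompletableFuture.completedFuture("abcd")
                .thenRun(() -> ranOn[0] = Thread.currentThread().getName())
                .join();
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println("caller:    " + Thread.currentThread().getName());
        System.out.println("dependent: " + dependentThreadName());
    }
}
```

Both lines should show the same thread name, which matches what the demo above tends to print once the first sleep is commented out.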

The same distinction applies to the other pairs, such as thenAccept / thenAcceptAsync and thenApply / thenApplyAsync.
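
As an illustration of the thenApply / thenApplyAsync pair, here is a small sketch under stated assumptions: the pool, its thread name "custom-pool-1", the latch and the class name are all hypothetical. The latch keeps the first stage open until both dependents are attached, so the result does not depend on how fast the first task happens to run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyPoolSketch {

    // Returns { thread that ran thenApply, thread that ran thenApplyAsync }.
    static String[] stageThreadNames() {
        // Hypothetical single-thread pool with a recognisable thread name.
        ExecutorService pool =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "custom-pool-1"));
        CountDownLatch gate = new CountDownLatch(1);
        try {
            CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
                try {
                    gate.await(); // wait until both dependents are attached
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "abcd";
            }, pool);

            // Non-async variant: runs in the thread that completes `first`.
            CompletableFuture<String> sync =
                    first.thenApply(v -> Thread.currentThread().getName());
            // Async variant without an executor: runs on the default async pool.
            CompletableFuture<String> async =
                    first.thenApplyAsync(v -> Thread.currentThread().getName());

            gate.countDown();
            return new String[] { sync.join(), async.join() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = stageThreadNames();
        System.out.println("thenApply      ran on " + names[0]);
        System.out.println("thenApplyAsync ran on " + names[1]);
    }
}
```

The first name is the custom pool's thread; the second is typically a ForkJoinPool.commonPool worker.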

Source code analysis
CompletableFuture.java
public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}
 
public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}
......
  
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
 
/**
 * Default executor -- ForkJoinPool.commonPool() unless it cannot
 * support parallelism.
 */
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
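
Since the single-argument thenRunAsync falls back to the default pool, a chain that should stay on a custom pool can use the overload that takes an Executor. The sketch below assumes a hypothetical pool whose threads are all named "custom-pool"; the class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class ExplicitExecutorSketch {

    // Returns the name of the thread that ran the thenRunAsync action.
    static String asyncStageThreadName() {
        // Hypothetical pool; every thread gets the same recognisable name.
        ExecutorService pool =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "custom-pool"));
        try {
            String[] ranOn = new String[1];
            CompletableFuture.supplyAsync(() -> "abcd", pool)
                    // Two-argument overload: the action is submitted to `pool`
                    // instead of the default async pool.
                    .thenRunAsync(() -> ranOn[0] = Thread.currentThread().getName(), pool)
                    .join();
            return ranOn[0];
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The default pool is the common pool only when this value is greater than 1.
        System.out.println("common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("async stage ran on: " + asyncStageThreadName());
    }
}
```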

Choosing the faster result

applyToEither

CompletableFutureFastDemo
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureFastDemo {
    public static void main(String[] args) {
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A come in");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playA";
        });

        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B come in");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return "playB";
        });

        // Whichever of playA / playB completes first supplies the value f.
        CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + " is winner";
        });

        System.out.println(Thread.currentThread().getName()+"\t result: "+result.join());
    }
}
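
The same behaviour can be shown without sleeps. This is a minimal sketch with made-up names: one input is already complete and the other is never completed, so applyToEither can only take its value from the finished one.

```java
import java.util.concurrent.CompletableFuture;

public class ApplyToEitherSketch {

    static String pickFirst() {
        // `slow` is never completed in this sketch.
        CompletableFuture<String> slow = new CompletableFuture<>();
        // `fast` is complete from the start.
        CompletableFuture<String> fast = CompletableFuture.completedFuture("playB");
        // The function is applied to whichever input finishes first.
        return slow.applyToEither(fast, f -> f + " is winner").join();
    }

    public static void main(String[] args) {
        System.out.println(pickFirst()); // prints "playB is winner"
    }
}
```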

Merging computation results

thenCombine

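Once both CompletionStage tasks have completed, their two results are handed to thenCombine together for processing (the task that finishes first has to wait for the other branch to finish).

Split version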

CompletableFutureCombineDemo
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureCombineDemo {
    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---started");
            // Pause the thread for a moment
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t ---started");
            // Pause the thread for a moment
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 20;
        });

        CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("-----start merging the two results");
            return x + y;
        });

        System.out.println(result.join());

    }
}

Chained version

CompletableFutureCombineDemo2
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureCombineDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
            return 20;
        }), (x,y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
            return 30;
        }),(a,b) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
            return a + b;
        });
        System.out.println("-----main thread done, END");
        System.out.println(thenCombineResult.get());

        // Keep the main thread alive for a while: the default pool used by CompletableFuture
        // runs on daemon threads, which stop as soon as the JVM exits.
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}
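
The point that thenCombine waits for both branches can be checked directly. The following sketch, with hypothetical names, completes two futures by hand instead of using sleeps and verifies that the combined stage is not done until the second input arrives.

```java
import java.util.concurrent.CompletableFuture;

public class ThenCombineWaitSketch {

    static int combineAfterBoth() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> sum = a.thenCombine(b, (x, y) -> x + y);

        a.complete(10);
        // Only one input is available, so the combined stage must still be pending.
        if (sum.isDone()) {
            throw new IllegalStateException("combined before both inputs finished");
        }

        b.complete(20);
        // Both inputs are now available, so the combiner has run.
        return sum.join();
    }

    public static void main(String[] args) {
        System.out.println(combineAfterBoth()); // prints 30
    }
}
```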