Future 接口理论知识复习
Future 接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
package java.util.concurrent;
/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
*
* <p>
* <b>Sample Usage</b> (Note that the following classes are all
* made-up.)
* <pre> {@code
* interface ArchiveSearcher { String search(String target); }
* class App {
* ExecutorService executor = ...
* ArchiveSearcher searcher = ...
* void showSearch(final String target)
* throws InterruptedException {
* Future<String> future
* = executor.submit(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* displayOtherThings(); // do other things while searching
* try {
* displayText(future.get()); // use future
* } catch (ExecutionException ex) { cleanup(); return; }
* }
* }}</pre>
*
* The {@link FutureTask} class is an implementation of {@code Future} that
* implements {@code Runnable}, and so may be executed by an {@code Executor}.
* For example, the above construction with {@code submit} could be replaced by:
* <pre> {@code
* FutureTask<String> future =
* new FutureTask<String>(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* executor.execute(future);}</pre>
*
* <p>Memory consistency effects: Actions taken by the asynchronous computation
* <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
* actions following the corresponding {@code Future.get()} in another thread.
*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> {
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙其他事情或者先执行完,过会才去获取子任务的执行结果或变更的任务状态。
总结: Future 接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
Future 接口常用实现类 FutureTask 异步任务
Future 接口作用
Future 是 Java5 新加的一个接口,它提供了一种异步并行计算的功能。
如果主线程需要执行一个很耗时的计算任务,我们就可以通过 future 把这个任务放到异步线程中的执行。
主线程继续处理其他任务或者先行结束,再通过 Future 获取计算结果。
-
Runnable 接口(多线程)
-
Callable 接口(有返回)
-
Future 接口(异步)和 FutureTask 实现类(异步,该类实现 RunnableFuture 接口)
目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务。
本源的 Future 接口相关架构
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyThread());
Thread t1 = new Thread(futureTask,"t1");
t1.start();
System.out.println(futureTask.get());
}
}
class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("-----come in call() " );
return "hello Callable";
}
}
Future 编码实战和优缺点分析
优点
future 和线程池异步多线程任务配合,能显著提高程序的执行效率。
public class FutureThreadPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
long startTime = System.currentTimeMillis();
FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
return "task1 over";
});
threadPool.submit(futureTask1);
FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
return "task2 over";
});
threadPool.submit(futureTask2);
FutureTask<String> futureTask3 = new FutureTask<String>(() -> {
try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }
return "task3 over";
});
threadPool.submit(futureTask3);
System.out.println(futureTask1.get());
System.out.println(futureTask2.get());
System.out.println(futureTask3.get());
long endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
System.out.println(Thread.currentThread().getName()+"\t -----end");
threadPool.shutdown();
}
}
缺点
-
get()
阻塞FutureAPIDemo.javapublic class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { FutureTask<String> futureTask = new FutureTask<String>( () -> { System.out.println(Thread.currentThread().getName()+"\t -----start"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask, "t1"); t1.start(); // 位置1 // System.out.println(futureTask.get()); System.out.println(Thread.currentThread().getName()+"\t ----end"); // 位置2 System.out.println(futureTask.get()); System.out.println(futureTask.get(3,TimeUnit.SECONDS)); } }
一旦调用
get()
方法求结果,如果计算没有完成容易导致程序阻塞。 -
isDone()
轮询FutureAPIDemo.javapublic class FutureAPIDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { FutureTask<String> futureTask = new FutureTask<String>( () -> { System.out.println(Thread.currentThread().getName()+"\t -----start"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return "task over"; }); Thread t1 = new Thread(futureTask, "t1"); t1.start(); System.out.println(Thread.currentThread().getName()+"\t ----end"); while(true) { if(futureTask.isDone()) { System.out.println(futureTask.get()); break; } else { //暂停毫秒 try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("处理中..."); } } } }
轮询的方式会耗费 CPU 资源,并且无法及时得到计算;如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。
-
结论:
Future
对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
复杂业务场景需求
对于简单的业务场景使用 Future 完成足够,但复杂业务场景还是囊总羞涩。
- 回调通知
- 应对 Future 的完成时间,当任务完成后返回结果
- 通过轮询的方式去判断任务是否完成
- 创建异步任务
- Future 结合线程池使用
- 多个任务前后依赖可以组合处理
- 将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值;
- 将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个有依赖前一个处理的结果。
- 返回 Future 任务集合中最先处理完的任务结果。
CompletableFuture 对 Future 的改进
CompletableFuture 和 CompletionStage 源码介绍
类架构说明
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
接口 CompletionStage
- CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
- 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(()-> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
类 CompletableFuture
- 在 Java8 中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法;
- 它可能代表一个明确完成的 Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作;
- 实现了 Future 和 CompletionStage 接口。
CompletableFuture 使用方法
四个静态方法
- runAsync 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
- supplyAsync 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
上述 Executor executor
参数说明:
- 没有指定 Executor 的方法,直接使用默认的
ForkJoinPool.commonPool()
作为它的线程池执行异步代码。 - 如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码
代码示例
runAsync 使用默认线程池
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync start");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync end");
});
System.out.println(completableFuture.get());
}
}
runAsync 使用自定义线程池
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync start");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t completableFuture.runAsync end");
}, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
}
}
supplyAsync 使用默认线程池
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync end");
return "hello supplyAsync";
});
System.out.println(completableFuture.get());
}
}
supplyAsync 使用自定义线程池
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync end");
return "hello supplyAsync";
}, threadPool);
System.out.println(completableFuture.get());
threadPool.shutdown();
}
}
代码示例 - 减少阻塞和轮询
从Java8开始引入了CompletableFuture,它是Future的功能增强版, 可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
public class CompletableFutureUseDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
int result = ThreadLocalRandom.current().nextInt(10);
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync result: " + result);
// 触发异常情况
if (result > 5) {
int i= 10/0;
}
return result;
}, threadPool).whenComplete((v,e) -> {
if (e == null) {
System.out.println(Thread.currentThread().getName() + "\t Calculation complete, updated value: " + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println("Exception: " + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "\t do other things");
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
// 主线程不要立刻结束,否则 CompletableFuture 默认使用的线程池会立刻关闭:暂停3秒钟线程
// try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
}
private static void futureDemo() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync start");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t completableFuture.supplyAsync result" + result);
return result;
});
System.out.println(Thread.currentThread().getName()+"\t do other things");
System.out.println(completableFuture.get());
}
}
CompletableFuture 优点
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
案例 — 电商网站的比价
函数式编程
-
Runnable(无参数,无返回值)
Runnable.java@FunctionalInterface public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }
-
Function(接受一个参数,有返回值)
Function.java@FunctionalInterface public interface Function<T, R> { /** * Applies this function to the given argument. * * @param t the function argument * @return the function result */ R apply(T t); }
-
Consumer(接受一个参数,没有返回值)
Consumer.java@FunctionalInterface public interface Consumer<T> { /** * Performs this operation on the given argument. * * @param t the input argument */ void accept(T t); }
-
Supplier(没有参数,有一个返回值)
Supplier.java@FunctionalInterface public interface Supplier<T> { /** * Gets a result. * * @return a result */ T get(); }
-
BiConsumer(接受两个参数,没有返回值)
BiConsumer.java@FunctionalInterface public interface BiConsumer<T, U> { /** * Performs this operation on the given arguments. * * @param t the first input argument * @param u the second input argument */ void accept(T t, U u); }
-
总结
函数式接口名称 方法名称 参数 返回值 Runnable run 无参数 无返回值 Function apply 1个参数 有返回值 Consumer accept 1个参数 无返回值 Supplier get 没有参数 有返回值 BiConsumer accept 2个参数 无返回值
代码示例
/**
*
* 案例说明:电商比价需求,模拟如下情况:
*
* 1需求:
* 1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
* 1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
*
* 2输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
* 《mysql》 in jd price is 88.05
* 《mysql》 in dangdang price is 86.11
* 《mysql》 in taobao price is 90.43
*
* 3 技术要求
* 3.1 函数式编程
* 3.2 链式编程
* 3.3 Stream流式计算
*/
public class CompletableFutureMallDemo {
static List<NetMall> list = Arrays.asList(
new NetMall("jd"),
new NetMall("dangdang"),
new NetMall("taobao"),
new NetMall("pdd"),
new NetMall("tmall")
);
/**
* step by step 一家家搜查
* List<NetMall> ----->map------> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPrice(List<NetMall> list,String productName) {
//《mysql》 in taobao price is 90.43
return list
.stream()
.map(netMall ->
String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
/**
* List<NetMall> ----->List<CompletableFuture<String>>------> List<String>
* @param list
* @param productName
* @return
*/
public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName) {
return list.stream().map(netMall ->
CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f",
netMall.getNetMallName(),
netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join())
.collect(Collectors.toList());
}
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
List<String> list1 = getPrice(list, "mysql");
for (String element : list1) {
System.out.println(element);
}
long endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");
System.out.println("--------------------");
long startTime2 = System.currentTimeMillis();
List<String> list2 = getPriceByCompletableFuture(list, "mysql");
for (String element : list2) {
System.out.println(element);
}
long endTime2 = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
}
}
class NetMall {
@Getter
private String netMallName;
public NetMall(String netMallName)
{
this.netMallName = netMallName;
}
public double calcPrice(String productName) {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
CompletableFuture 常用方法
获取结果和触发计算
获取结果
public T get()
会阻塞并且中断线程。
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
return "abc";
});
System.out.println(completableFuture.get());
}
}
public T get(long timeout, TimeUnit unit)
超时后继续向下执行。
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
return "abc";
});
System.out.println(completableFuture.get(2L,TimeUnit.SECONDS));
}
}
public T getNow(T valueIfAbsent)
立即获取结果不阻塞。如果计算完成,则返回计算完成后的结果;没有计算完成,则返回设置的 valueIfAbsent
值。
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
return "abc";
});
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(completableFuture.getNow("xxx"));
}
}
public T join()
和 get()
方法相比,不会响应线程中断。
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
return "abc";
});
System.out.println(completableFuture.join());
}
}
主动触发计算
public boolean complete(T value)
是否打断 get()
方法立即返回括号值
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}
return "abc";
});
System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.get());
}
}
对计算结果进行处理
thenApply
- 计算结果存在依赖关系,这两个线程串行化。
- 由于存在依赖关系,当前步骤存在异常即终止。
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
CompletableFuture.supplyAsync(() -> {
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1024;
}, threadPool).thenApply(f -> {
System.out.println("222");
return f + 1;
}).thenApply(f -> {
//int age = 10/0; // 异常情况:那步出错就停在那步。
System.out.println("333");
return f + 1;
}).whenCompleteAsync((v,e) -> {
System.out.println("value: " + v);
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
System.out.println(Thread.currentThread().getName() + "\t do other things...");
threadPool.shutdown();
}
}
handle
有异常也可以往下一步走,根据带的异常参数可以进一步处理。
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() ->{
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("111");
return 1;
},threadPool).handle((f,e) -> {
int i = 10/0;
System.out.println("222");
return f + 2;
}).handle((f,e) -> {
System.out.println("333");
return f + 3;
}).whenComplete((v,e) -> {
if (e == null) {
System.out.println("----计算结果: "+v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "\t do other things...");
threadPool.shutdown();
}
}
总结
方法 | 类比 |
---|---|
exceptionally | try/catch |
whenComplete, handle | try/finally |
whenComplete 和 whenCompleteAsync 的区别:
- whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务;
- whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
对计算结果进行消费
thenAccept
接收任务的处理结果,并消费处理,无返回结果
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f ->{
return f + 2;
}).thenApply(f ->{
return f + 3;
}).thenAccept(System.out::println);
}
}
补充:任务之前的顺序执行
- thenRun(Runable runnable):任务 A 执行完执行 B,并且 B 不需要 A 的结果
- thenAccept(Consumer action):任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值
- thenApply(Function fn):任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join());
}
}
// null
// resultA
// null
// resultAresultB
CompletableFuture 和线程池说明
thenRun 和 thenRunAsync 为例,有什么区别
public class CompletableFutureWithThreadPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("1号任务\t" + Thread.currentThread().getName());
return "abcd";
}, threadPool).thenRunAsync(() -> {
try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("2号任务\t" + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("3号任务\t" + Thread.currentThread().getName());
}).thenRun(() -> {
try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println("4号任务\t" + Thread.currentThread().getName());
});
System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
System.out.println(Thread.currentThread().getName() + "\t do other things...");
}
}
总结
-
没有传入自定义线程池,都使用默认线程池
ForkJoinPool
。 -
传入一个自定义线程池,如果执行第一个任务的时候,传入自定义线程池;
- 调用
thenRun
方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池; - 调用
thenRunAsync
执行第二个任务时,则第一个任务使用的是自定义线程池,第二个任务使用的是ForkJoin
线程池。
- 调用
-
当处理速度太快时,系统优化切换原则,直接使用
main
线程处理。CompletableFutureWithThreadPoolDemo.javapublic class CompletableFutureWithThreadPoolDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3); try { CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> { // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("1号任务\t" + Thread.currentThread().getName()); return "abcd"; }, threadPool).thenRun(() -> { try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("2号任务\t" + Thread.currentThread().getName()); }).thenRun(() -> { try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("3号任务\t" + Thread.currentThread().getName()); }).thenRun(() -> { try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} System.out.println("4号任务\t" + Thread.currentThread().getName()); }); System.out.println(completableFuture.get(2L, TimeUnit.SECONDS)); } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } System.out.println(Thread.currentThread().getName() + "\t do other things..."); } }
其他如:thenAccept
和 thenAcceptAsync
,thenApply
和 thenApplyAsync
等,它们之间的区别同理。
源码分析
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
......
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
对计算速度进行选用
applyToEither
public class CompletableFutureFastDemo {
public static void main(String[] args) {
CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
System.out.println("A come in");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
return "playA";
});
CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
System.out.println("B come in");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return "playB";
});
CompletableFuture<String> result = playA.applyToEither(playB, f -> {
return f + " is winer";
});
System.out.println(Thread.currentThread().getName()+"\t result: "+result.join());
}
}
对计算结果进行合并
thenCombine
两个 CompletionStage
任务都完成后,最终能把两个任务的结果一起交给 thenCombine
来处理(先完成的任务需要等待其他分支任务完成)。
拆分版
public class CompletableFutureCombineDemo {
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t ---启动");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t ---启动");
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
System.out.println("-----开始两个结果合并");
return x + y;
});
System.out.println(result.join());
}
}
合并版
public class CompletableFutureCombineDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
return 20;
}), (x,y) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
return x + y;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
return 30;
}),(a,b) -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
return a + b;
});
System.out.println("-----主线程结束,END");
System.out.println(thenCombineResult.get());
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
}
}