环境:Java8
Future是编程从JDK1.5开始有的,目的中新增新之是获取异步任务执行的结果,通常情况会结合ExecutorService及Callable一起使用。特性
单任务执行
private static class Task implements Callable<String> { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(3) ; return "success"; } }public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3,异步 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ; Future<String> future = executor.submit(new Task()) ; String result = future.get() ; System.out.println("执行结果:" + result) ;}
当执行到future.get()方法的时候会阻塞,等待3s后继续执行。
多个任务同时执行
private static class Task implements Callable<String> { private int sleep ; public Task(int sleep) { this.sleep = sleep ; } @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(this.sleep) ; return "success"; }}public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ; Future<String> future1 = executor.submit(new Task(3)) ; Future<String> future2 = executor.submit(new Task(2)) ; Future<String> future3 = executor.submit(new Task(1)) ; String result1 = future1.get() ; String result2 = future2.get() ; String result3 = future3.get() ; System.out.println("result1:" + result1 + "\t" + "result2:" + result2 + "\t" + "result3:" + result3) ;}
以上代码执行的3个任务分别用时3,2,1s。future1用时最长。
从运行的结果看到即便future2, future3执行时间短也必须等待future1执行完后才会继续,虽然你可以倒过来获取结果,但是在实际项目中的应用你应该是不能确认每个任务执行需要多长时间,谁先执行完就先获取谁。
虽然这种同步阻塞的方式在有些场景下还是很有必要的。但由于它的同步阻塞导致了当前线程不能干其它的事必须一致等待。
CompletionService是一边生产新的任务,一边处理已经完成的任务。简单地说就是CompletionService不管任务执行先后顺序,谁先执行完就处理谁。
private static class Task implements Callable<String> { private int time; private String name ; public Task(int time, String name) { this.time = time ; this.name = name ; } @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(this.time) ; return name ; } }public static void main(String[] args) throws Exception { ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ; CompletionService<String> cs = new ExecutorCompletionService<>(pool) ; cs.submit(new Task(3, "name" + 3)) ; cs.submit(new Task(1, "name" + 1)) ; cs.submit(new Task(2, "name" + 2)) ; for (int i = 0; i < 3; i++) { System.out.println(cs.take().get()) ; }}
通过执行结果发现,任务的结果获取是以谁先执行完处理谁与任务的执行先后没有关系。
CompletableFuture通过如下4个静态方法来执行异步任务
图片
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(3) ; System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); }}, executor).thenRun(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); }}) ;System.out.println("主线程:" + Thread.currentThread().getName()) ;executor.shutdown() ;
执行结果:
图片
CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3) ; System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return "1" ;}, executor).thenApply(res -> { System.out.println("获取到上一步任务执行结果:" + res) ; try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return "2" ;}).whenComplete((res, tx) -> { System.out.println("获取到结果:" + res) ; if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()) ; } executor.shutdown();}) ;System.out.println("主线程:" + Thread.currentThread().getName()) ;
执行结果:
图片
这里如果任务执行的时候发生了异常那么在whenComplete方法中的res 会为空,tx为发生异常的对象。没有异常时res有执行的机构,tx异常对象为空。
CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3) ; System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return "1" ;}, executor).thenApply(res -> { System.out.println("获取到上一步任务执行结果:" + res) ; try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 2 任务执行完成") ; System.out.println(1 / 0) ; } catch (InterruptedException e) { e.printStackTrace(); } return "2" ;}).exceptionally(tx -> { System.out.println(Thread.currentThread().getName() + ", 任务执行发生了异常") ; return "error" ;}).whenComplete((res, tx) -> { System.out.println("获取到结果:" + res) ; if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()) ; } executor.shutdown();}) ;System.out.println("主线程:" + Thread.currentThread().getName()) ;
这里我们人为的制造异常 1 / 0 。
执行结果:
图片
根据执行结果当发生异常时进入exceptionally方法,最终进入whenComplete方法此时 tx异常对象是发生异常的异常对象。
CompletableFuture.allOf
CompletableFuture<Double> calc1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", calc1任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 10D ;}, executor) ; CompletableFuture<Double> calc2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(5) ; System.out.println(Thread.currentThread().getName() + ", calc2任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 20D ;}, executor) ;// 当任何一个任务发生异常,这里的tx都不会为nullCompletableFuture.allOf(calc1, calc2).whenComplete((res, tx) -> { System.out.println("获取到结果:" + res + ", " + tx) ; try { System.out.println(calc1.get()) ; System.out.println(calc2.get()) ; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } executor.shutdown();}) ;
执行结果:
在这里whenComplete中的res是没有结果的,要获取数据我们的分别调用get方法获取。
CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 1 任务执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return "0" ;}, executor).handle((res, tx) -> { // 处理结果数据 return res + "1" ;}).whenComplete((res, tx) -> { System.out.println("获取到结果:" + res) ; if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()) ; } executor.shutdown();}) ;
执行结果:
正确
图片
发生异常时:
图片
当发生异常时handle方法中的res是没有值的,tx异常对象为发生异常的异常对象。
将两个异步任务完成后合并处理
CompletableFuture.thenCombine
CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 10d ;}, executor) ;CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 20d ;}, executor) ;task1.thenCombine(task2, (t1, t2) -> { System.out.println(Thread.currentThread().getName() + ", 合并任务完成") ; return t1 + "," + t2 ;}).whenComplete((res, tx) -> { System.out.println("获取到结果:" + res) ; if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()) ; } executor.shutdown();}) ;
执行结果:
图片
CompletableFuture.applyToEither
两个异步任务谁先执行完谁就继续执行后续的操作。
CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 10d ;}, executor) ;CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 20d ;}, executor) ;task1.applyToEither(task2, res -> { return res ;}).whenComplete((res, tx) -> { System.out.println("获取到结果:" + res) ; if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()) ; } executor.shutdown();}) ;
执行结果:
图片
只有两个任务都执行完成了后才会继续。
CompletableFuture.runAfterBoth
CompletableFuture<Double> task1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 任务1执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 10d ;}, executor) ;CompletableFuture<Double> task2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2) ; System.out.println(Thread.currentThread().getName() + ", 任务2执行完成") ; } catch (InterruptedException e) { e.printStackTrace(); } return 20d ;}, executor) ;task1.runAfterBoth(task2, () -> { System.out.println("任务都执行完成了...") ;}).whenComplete((res, tx) -> { System.out.println("获取到结果:" + res) ; if (tx != null) { System.err.println("发生错误了:" + tx.getMessage()) ; } executor.shutdown();}) ;
执行结果:
图片
CompletableFuture.anyOf
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { sleep(1000) ; System.out.println("我是任务1") ; return "Task1" ;}, executor) ;CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> { sleep(3000) ; System.out.println("我是任务2") ; System.out.println(1 / 0) ; return "Task2" ;}, executor) ;// 任意一个任务执行完成就算完成// 当任务执行发生异常后,th才不会为nullCompletableFuture.anyOf(task1, task2).whenCompleteAsync((v, th) -> { System.out.println("v = " + v) ; System.out.println("th = " + th) ;}, executor) ;
执行结果:
图片
CompletableFuture.supplyAsync(() -> { sleep(2000) ; System.out.println("第一个任务执行完成...") ; // System.out.println(1 / 0) ; return new Random().nextInt(10000) ;}, executor).thenAcceptAsync(res -> { // 接收上一个任务的执行结果 System.out.println("任务执行结果:" + res) ;}, executor) ;
执行结果:
图片
责任编辑:武晓燕 来源: 实战案例锦集 Future异步JDK(责任编辑:时尚)
兴达国际(01899.HK)发布公告:预期2020年纯利同比减少50%
京东方、维信诺去年净利下降或亏损 OLED业务扭亏是改善关键