JAVA并发API CompletableFuture组合式异步编程

前言

这章关于如何写异步代码,线程那部分的知识都快忘光了,公司项目上也没人用上(不知道他们有没有实践过)。

刚从学校出来面试时,多线程那块背的挺熟了,基本都是先从线程创建的几种方式,当时得回答在现在看来是打肿脸充胖子

说说我现在公司的项目,我也是比较幸运,能从学校出来就能接触到赶在前沿的技术,用的微服务(SpringCloud)这个'解决方案',(能接触到这个也感谢我的经理,对我很好,打心底的感谢,无奈我这人比较凉)。
在我眼里我们的项目架构算比较大的,就我们项目而言解决了负载的问题,但功能性能上不太乐观,很多功能需要调用远程接口,API也都是同步的。

看看以后找机会能不能用上,线程那部分知识,时间也长了,最近准备温习一下。

自从干了这个行业,如同这只鸡

白俊遥博客

提供或调用异步的APIFuture

自JAVA5就引用了Future,它建模一种异步计算,返回执行运算结果的引用。
举个例子:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好(这就是一个Future事件)。衣服干洗的同时,你可以去做其他的事情。
Future要比底层的Thread更易用,更容易控制。

看一下Future是怎么用的

public Double runCode(){
        Double aDouble1=0D;
        //创建一个线程池
        ExecutorService excutor = Executors.newCachedThreadPool();
        Future<Double> future = excutor.submit(new Callable<Double>() {
            @Override
            public Double call() throws Exception {
                Double aDouble = calcNumber();
                return aDouble;
            }
        });
        Double age = calcAge();
        System.out.println("返回的年龄:"+age);
        try {
            //最长等待时间超过1s就退出
            aDouble1 = future.get(1, TimeUnit.SECONDS);
            System.out.println("futrue返回的:"+aDouble1);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //关闭线程池
        //excutor.shutdown();
        return aDouble1;
    }

执行过程

  • 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

  • 向线程池submit提交一个任务,这里使用一个新的线程进行执行,主线程(main方法线程)可以进行其他操作。

  • get时如果新的线程已经工作完成就正常返回,否则会一直阻塞直到返回结果,这里用了get的另一个重载,超过1S就关闭,会发生超时异常

我们很难使用Future来写一个相互依赖而又不会完全等待所有任务结束才返回结果的代码。接下来CompletableFuture

CompletableFuture

//详细写法
    public Future<Double> calcPrice(String name){
        //创建Future
        CompletableFuture<Double> completablefuture = new CompletableFuture<Double>();
        //创建一个新的执行线程,lambda创建 ()->void 这是哪个函数是接口呢??
        new Thread(()->{
            try {
                Thread.sleep(1000L);
                Double doublePrice = getPrice();
                completablefuture.complete(doublePrice);
            } catch (InterruptedException e) {
                completablefuture.completeExceptionally(e);
                e.printStackTrace();
            }
        }).start();
        return completablefuture;
    }


  • 使用complete方法对getPrice方法的返回结果来进行对future设置值,来进行关联。

  • completeExceptionally方法,如果需要了解它的异常详情,必须添加这个否则异常会被归为new Thread新的线程。会kill掉这个新的线程,导致get方法永远阻塞。

    使用lambda表达式创建CompletableFuture

    毕竟是java8出的接口,一定得体现lambda的好处,现在使用表达式来进行创建,来最大化精简。


//CompletableFuture工厂方法
public Future<Double> calcPen(){
    CompletableFuture<Double> completableFuture = CompletableFuture.supplyAsync(() -> getPrice());
    return completableFuture;
}



  • 使用CompletableFuture工厂方法,来进行函数式接口创建。

  • 这里使用的是Supplier<T>函数式接口,签名白俊遥博客)->T;

为了应对非常大的任务量,需要开辟更多的线程,supplyAsync方法还有重载的版本。
使用线程池的线程来进行管控或处理你的任务。好处可以减少线程创建销毁带来的资源消耗。

《Java并发编程实战》的作者Brian Goetz 提供了非常中肯的建议,怎样创建合适的线程池



计算公式:Nthreads = NCPU * UCPU * (1 + W/C)
�7�9NCPU是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
�7�9UCPU是期望的CPU利用率(该值应该介于0和1之间)
�7�9W/C是等待时间与计算时间的比率


创建一个固定大小的线程池

Executor executor = Executors.newFixedThreadPool(10, 400),new ThreadFactory() { 
    public Thread newThread(Runnable r) { 
        Thread t = new Thread(r); 
        t.setDaemon(true); 
        return t; 
    }
}); 
//使用supplyAsync线程池版本来管控任务
CompletableFuture.supplyAsync(()->getPrice(),executor);


我做了测试,我的shops里有200个shop,每次getShopInfo方法执行会耗时1s,每次parse执行耗时1s,使用stream计算总耗时410s,使用并行流parallel计算总耗时52.3s

我的cpu是8个核心
并行流200个任务为什么就不行呢了?原因是根据机器cpu核数,来开辟几个线程,我们200个任务他每次分8个线程执行,其余的任务等待这8个中完成,才会去执行任务。

使用CompletableFuture耗时多少呢?
处理200个任务总耗时2.28s。因为线程池最大400线程,所以可以每一个任务使用一个线程且绰绰有余,28毫秒来自thenApply的任务。

public List<Shop> runCode(){
        List<CompletableFuture<Shop>> collect = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> getShopInfo(shop),executor))
                .map(future -> future.thenApply(str -> str.toString()))
                .map(future -> future.thenCompose(str -> CompletableFuture.supplyAsync(() -> parse(str),executor)))
                .collect(Collectors.toList());
         return collect.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

以下future代表的就是CompletableFuture,

  • thenApply方法会把前一个CompletableFuture同步转换成另一个CompletableFuture

  • thenCompose方法会依赖上一个结果CompletableFuture来创建出一个全新的CompletableFuture。当然他也有另外一个版本thenComposeAsync,这个版本会从线程池中拿到一个新线程,进行操作

其他方法:


  • thenCombine(future,BiFunction)

  • thenAccept(Consumer<T>) 消费一个future,消费完返回CompletableFuture<Void>

响应CompletableFuture的completion事件

  • 一旦thenAccept对CompletableFuture计算完成后就会返回 CompletableFuture<Void>结果

看一下thenAccpet使用例子

//这里重构了findPricesStream,利用CompletableFuture的特性,直接返回future
public static Stream<CompletableFuture<String>> findPricesStream(String product){
        return shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> getPrice(product), executor))
                .map(future -> future.thenApply(Completion_apply::parse))
                .map(future -> future.thenCompose(shop -> CompletableFuture.supplyAsync(()->applyDiscount(shop),executor)));
    }

?


  • 对future使用thenAccept方法进行消费操作,操作完成后返回Stream<CompletableFuture<Void>>

  • public static void atLastRunCode(){
        long start = System.nanoTime();
         CompletableFuture[] futures = (CompletableFuture[]) findPricesStream("小米8")
                .map(future -> future.thenAccept(s -> System.out.println(" (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs")))
                .toArray(size -> new CompletableFuture[size]);
         CompletableFuture.allOf(futures).join();
         System.out.println("All shops have now responded in "
                + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    }
  • 把这个结果转换成CompletableFuture数组,数组里的CompletableFuture计算完成后都会是Void类型

  • allOf方法是等待所有的future都执行完毕进行计算,join相当于get方法,差别是它不用再捕获异常了,省去了try.catch等代码,代码不会臃肿

在一些场景下我们只需要等待其中一个完成就可以返回了,这时候可以使用anyOf方法,它会返回最快完成的那个任务(Future),这样可以让进行其他任务尽快去执行。

结尾

CompletableFuture的好处太多了,相对于Thread,它使用起来也是更易控制,用上面的例子来看120s的任务可以用不到3s的时间来完成。

今年学习了java7的Fork/Join框架和Spilterator,再加上Future和并行流,4大法器护体,写并发的代码,提供异步接口是没问题了。

基于Thread是底层,还是要看的

我的博客:维斯有条河

程序员之家
请先登录后发表评论
  • 最新评论
  • 总共0条评论