Lambda从入门到精通之三十一 CompletableFuture异步编程 并行计算

CompletableFuture异步编程的并行计算?一听到这个你或许会比较疑惑,CompletableFuture本来不就是异步并行计算吗?是的没错,但我们这里所说的并行,是指多个CompletableFuture计算结果有关联关系,并且可以同时进行的场景。

我们上一节讲解的CompletableFuture串联计算,是多个CompletableFuture计算时后一个依赖于前一个计算的结果,方法:thenApply和thenCompose。
而本节所说的CompletableFuture并行计算,则是多个CompletableFuture计算任务同时进行,等多个任务都计算结束后,然后把结果组合到一起返回,方法:thenCombine。

我们本节demo,对上一节演示thenApply和thenCompose方法的demo进行改进,我们先回顾一下上一节demo的业务逻辑。
我们要实现一个电商比价系统,从多个电商平台获取商品价格和折扣信息,依次进行了以下三个步骤的计算:
1、从远程电商获取商品价格
2、基于获取的价格创建新的VO,目的是构建符合折扣获取接口的入参对象
3、远程获取折扣,兵基于商品折扣计算最终价格

而我们本节,就进行一些小小的改造,第一步和第三步的商品价格获取和商品折扣获取其实就可以并行,各自获取到数据后,然后进行第二步的最终结果计算。
看代码:

/**
 * 使用CompletableFuture异步编程 并行计算
 * @author www.itzhimei.com
 */
public class FutureTest_6 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Runtime.getRuntime().availableProcessors());

        List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
                new FutureShop("TIANMAO"),
                new FutureShop("PDD"),
                new FutureShop("TAOBAO"),
                new FutureShop("JD2"),
                new FutureShop("TIANMAO2"),
                new FutureShop("PDD2"),
                new FutureShop("TAOBAO2"));


        Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                        new ThreadFactory() {
                            public Thread newThread(Runnable r) {
                                Thread t = new Thread(r);
                                t.setDaemon(true);
                                return t;
                            }
                        });
        long start5 = System.currentTimeMillis();

        List<CompletableFuture<String>> CFcollect6 = shops.stream()
                .map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor)
                        .thenCombine(CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscount(), executor), (price, dis) -> {
                            String[] split = price.split(":");
                            return split[0] + "获取商品价格:" + (Integer.parseInt(split[1]) * dis);
                        })
                ).collect(Collectors.toList());

        List<String> collect5 = CFcollect6.stream().map(CompletableFuture::join).collect(Collectors.toList());
        long end5 = System.currentTimeMillis();
        System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");
        System.out.println("CompletableFuture商品计算时间:" + (end5-start5));


    }

    /**
     * 从远程商店获取价格
     */
    public static class FutureShop {

        public FutureShop(String product) {
            this.product = product;
        }

        private String product;

        @SneakyThrows
        public String getProductPrice() {
            //模拟计算复杂逻辑
            Thread.sleep(1000);
            return product+"获取商品价格:"  + new Random().nextInt(1000);
        }

        public String getProduct() {
            return product;
        }

        public void setProduct(String product) {
            this.product = product;
        }
    }

    /**
     * 按照远程接口要求定义接口请求VO
     */
    public static class FutureDiscountPriceVO {

        private String shopName;
        private Integer price;

        public FutureDiscountPriceVO(String shopName, Integer price) {
            this.shopName = shopName;
            this.price = price;
        }

        public static FutureDiscountPriceVO getRemoteVO(String price) {
            String[] split = price.split(":");
            return new FutureDiscountPriceVO(split[0], Integer.parseInt(split[1]));
        }

        public Integer getPrice() {
            return price;
        }

        public void setPrice(Integer price) {
            this.price = price;
        }

        public String getShopName() {
            return shopName;
        }

        public void setShopName(String shopName) {
            this.shopName = shopName;
        }
    }


    /**
     * 远程商店折扣获取服务
     */
    public static class FutureDiscountService {

        private static double[] disArr = {0.9,0.8,0.7,0.6,0.5};

        @SneakyThrows
        public static String getDiscountPrice(FutureDiscountPriceVO vo) {
            //模拟远程请求耗时
            Thread.sleep(1000);

            int index = new Random().nextInt(4);
            Double price = vo.getPrice() * disArr[index];
            return  vo.getShopName() +"获取商品价格:"  +price.toString();
        }

        @SneakyThrows
        public static Double getDiscount() {
            //模拟远程请求耗时
            Thread.sleep(1000);

            int index = new Random().nextInt(4);
            return  disArr[index];
        }
    }

}

/* 输出
商品价格计算结果[
JD获取商品价格获取商品价格:413.4,
TIANMAO获取商品价格获取商品价格:409.2,
PDD获取商品价格获取商品价格:357.0,
TAOBAO获取商品价格获取商品价格:721.8000000000001,
JD2获取商品价格获取商品价格:588.6,
TIANMAO2获取商品价格获取商品价格:189.0,
PDD2获取商品价格获取商品价格:452.4,
TAOBAO2获取商品价格获取商品价格:287.7
]
CompletableFuture商品计算时间:2131
 */

核心代码:

List<CompletableFuture<String>> CFcollect6 = shops.stream()
                .map(x -> CompletableFuture.supplyAsync(() -> x.getProductPrice(), executor)
                        .thenCombine(CompletableFuture.supplyAsync(() -> FutureDiscountService.getDiscount(), executor), (price, dis) -> {
                            String[] split = price.split(":");
                            return split[0] + "获取商品价格:" + (Integer.parseInt(split[1]) * dis);
                        })
                ).collect(Collectors.toList());

这里使用了thenCombine方法,将价格获取和折扣获取进行了组合,组合方法是thenCombine方法的第二个参数,我们用价格*折扣=最终价格。
这种使用方式也可以叫做CompletableFuture的组合计算,实现效果就是并行计算多个。