Lambda从入门到精通之三十三 CompletableFuture异步编程性能分析和提升

我们已经初步了解了CompletableFuture的功能,以及在使用CompletableFuture时如何处理异常,我们前面的章节除了提到CompletableFuture可以实现多个异步计算的关联计算,我们本节来看看CompletableFuture的性能。

demo场景:双十一马上要到了,我们要买某个商品,买这个商品前,为了买到最优惠的商品,我们货比多家,我们去多个电商平台查看这个商品的价格,找到最便宜的那一个。
看代码:

/**
 * 使用CompletableFuture异步编程 性能
 * @author www.itzhimei.com
 */
public class FutureTest_4 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Runtime.getRuntime().availableProcessors());

        List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
                new FutureShop("TIANMAO"),
                new FutureShop("PDD"),
                new FutureShop("TAOBAO"));

        long start = System.currentTimeMillis();
        List<String> collect = shops.stream().map(x -> x.getProductPrice()).collect(Collectors.toList());
        long end = System.currentTimeMillis();
        System.out.println("商品价格计算结果[" + collect.stream().collect(Collectors.joining(",")) + "]");
        System.out.println("Stream商品计算时间:" + (end-start));

		
        long start2 = System.currentTimeMillis();
        List<String> collect2 = shops.stream().parallel().map(x -> x.getProductPrice()).collect(Collectors.toList());
        long end2 = System.currentTimeMillis();
        System.out.println("商品价格计算结果[" + collect2.stream().collect(Collectors.joining(",")) + "]");
        System.out.println("Stream parallel商品计算时间:" + (end2-start2));

		
        long start3 = System.currentTimeMillis();
        List<CompletableFuture<String>> CFcollect3 = shops.stream().map(x -> CompletableFuture.supplyAsync(
                () -> x.getProductPrice()
                )
        ).collect(Collectors.toList());
        List<String> collect3 = CFcollect3.stream().map(CompletableFuture::join).collect(Collectors.toList());
        long end3 = System.currentTimeMillis();
        System.out.println("商品价格计算结果[" + collect3.stream().collect(Collectors.joining(",")) + "]");
        System.out.println("CompletableFuture商品计算时间:" + (end3-start3));

		
        long start4 = System.currentTimeMillis();
        List<String> collect4 = shops.stream().map(x -> CompletableFuture.supplyAsync(
                () -> x.getProductPrice()
                )
        ).map(CompletableFuture::join).collect(Collectors.toList());
        long end4 = System.currentTimeMillis();
        System.out.println("商品价格计算结果[" + collect4.stream().collect(Collectors.joining(",")) + "]");
        System.out.println("CompletableFuture串行商品计算时间:" + (end4-start4));
    }

    public static class FutureShop {

        public FutureShop(String product) {
            this.product = product;
        }

        private String product;

        @SneakyThrows
        public String getProductPrice() {
            //模拟计算复杂逻辑
            Thread.sleep(1000);
            return product+"获取商品价格:"  + new Random().nextInt(1000);
        }

        public String getProduct() {
            return product;
        }

        public void setProduct(String product) {
            this.product = product;
        }
    }


}
/* 输出
商品价格计算结果[JD获取商品价格:333,TIANMAO获取商品价格:98,PDD获取商品价格:898,TAOBAO获取商品价格:789]
Stream商品计算时间:4085

商品价格计算结果[JD获取商品价格:605,TIANMAO获取商品价格:978,PDD获取商品价格:967,TAOBAO获取商品价格:952]
Stream parallel商品计算时间:1236

商品价格计算结果[JD获取商品价格:368,TIANMAO获取商品价格:586,PDD获取商品价格:809,TAOBAO获取商品价格:730]
CompletableFuture商品计算时间:2013

商品价格计算结果[JD获取商品价格:318,TIANMAO获取商品价格:939,PDD获取商品价格:403,TAOBAO获取商品价格:144]
CompletableFuture串行商品计算时间:4003

 */

代码中对比了4种写法:
第一种是stream,用时:4085
第二种是stream.parallel,也就是stream的并发模式,用时:1236
第三种是CompletableFutureb并发模式:2013
第四种是CompletableFuture的串行模式,用时:4003

首先要分析的是:为什么第三和第四的时间不一样,第三种模式是分成两个lambda表达式来计算的,第四种是一个lambda来计算的。
因为lambda的计算是流式执行,也就是第四种模式执行的时候是每一个数据都从前到后执行完,才执行下一个数据,也就是shops.stream().map-> CompletableFuture.supplyAsync->map(CompletableFuture::join),一个执行完,再执行下一个,所以就没有实现并发计算。
而第三种方式是几条数据并行获取价格,获取一个新的结果,然后新结果再计算。

其次要分析的是:为什么CompletableFuture不如Stream parallel性能好?
这个问题需要修改一下代码,我们把对比商城平台从4个增加到8个,代码:

List<FutureShop> shops = Arrays.asList(new FutureShop("JD"),
                new FutureShop("TIANMAO"),
                new FutureShop("PDD"),
                new FutureShop("TAOBAO"),
                new FutureShop("JD2"),
                new FutureShop("TIANMAO2"),
                new FutureShop("PDD2"),
                new FutureShop("TAOBAO2"));
				
/* 输出

商品价格计算结果[JD获取商品价格:742,TIANMAO获取商品价格:191,PDD获取商品价格:833,TAOBAO获取商品价格:347,JD2获取商品价格:610,TIANMAO2获取商品价格:952,PDD2获取商品价格:542,TAOBAO2获取商品价格:281]
Stream商品计算时间:8064

商品价格计算结果[JD获取商品价格:66,TIANMAO获取商品价格:97,PDD获取商品价格:77,TAOBAO获取商品价格:881,JD2获取商品价格:74,TIANMAO2获取商品价格:154,PDD2获取商品价格:594,TAOBAO2获取商品价格:160]
Stream parallel商品计算时间:2227

商品价格计算结果[JD获取商品价格:20,TIANMAO获取商品价格:948,PDD获取商品价格:308,TAOBAO获取商品价格:246,JD2获取商品价格:958,TIANMAO2获取商品价格:192,PDD2获取商品价格:194,TAOBAO2获取商品价格:307]
CompletableFuture商品计算时间:3015

商品价格计算结果[JD获取商品价格:755,TIANMAO获取商品价格:868,PDD获取商品价格:437,TAOBAO获取商品价格:29,JD2获取商品价格:612,TIANMAO2获取商品价格:576,PDD2获取商品价格:395,TAOBAO2获取商品价格:902]
CompletableFuture串行商品计算时间:8002

 */			

我们可以看到Stream parallel还是比CompletableFuture快一点,你多执行几次代码,二者的时间或许几乎相等,之所以是这样的结果,因为二者其实都是相同的原理,内部都是使用线程池来进行并发计算,并且使用的线程池的线程数量默认也是一样的,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值,其实就是你CPU的核数。
既然这样,那我们为什么还要使用CompletableFuture呢?因为CompletableFuture更能强大,因为CompletableFuture支持自定义线程池。

我们看源码supplyAsync方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
	if (supplier == null) throw new NullPointerException();
	CompletableFuture<U> f = new CompletableFuture<U>();
	execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
	return f;
}

/**
 * Returns a new CompletableFuture that is asynchronously completed
 * by a task running in the given executor with the value obtained
 * by calling the given Supplier.
 *
 * @param supplier a function returning the value to be used
 * to complete the returned CompletableFuture
 * @param executor the executor to use for asynchronous execution
 * @param <U> the function's return type
 * @return the new CompletableFuture
 */
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
												   Executor executor) {
	if (executor == null || supplier == null)
		throw new NullPointerException();
	CompletableFuture<U> f = new CompletableFuture<U>();
	execAsync(executor, new AsyncSupply<U>(supplier, f));
	return f;
}

那么对上面的代码改造一下:

Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                        new ThreadFactory() {
                            public Thread newThread(Runnable r) {
                                Thread t = new Thread(r);
                                t.setDaemon(true);
                                return t;
                            }
                        });
        long start5 = System.currentTimeMillis();
        List<CompletableFuture<String>> CFcollect5 = shops.stream().map(x -> CompletableFuture.supplyAsync(
                () -> x.getProductPrice(), executor
                )
        ).collect(Collectors.toList());
        List<String> collect5 = CFcollect5.stream().map(CompletableFuture::join).collect(Collectors.toList());
        long end5 = System.currentTimeMillis();
        System.out.println("商品价格计算结果[" + collect5.stream().collect(Collectors.joining(",")) + "]");
        System.out.println("CompletableFuture商品计算时间:" + (end5-start5));

/* 输出

商品价格计算结果[JD获取商品价格:979,TIANMAO获取商品价格:951,PDD获取商品价格:432,TAOBAO获取商品价格:705,JD2获取商品价格:996,TIANMAO2获取商品价格:926,PDD2获取商品价格:734,TAOBAO2获取商品价格:303]
CompletableFuture商品计算时间:1006

*/

从结果看出性能提升明显,这就是CompletableFuture的又一个强大特点,能够自定义线程池来并发计算。

那么并行计算我们是选择CompletableFuture还是Stream parallel呢?
如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream,如果除了计算还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture会更好。