远程调用优化之多线程

发布时间 2023-09-15 12:17:11作者: 摆烂ing

1. 通过feign进行远程调用是一种同步调用,只有当一个远程调用执行完毕以后,才会进行下一个远程调用,效率较低。
2. 可以考虑业务的执行逻辑,如果各个远程调用之间互不影响的话,可以考虑使用多线程来进行优化,提高效率。

1. 配置线程池

1.1 在公共的微服务中编写ThreadPoolConfiguration配置类,并自定义线程工厂

默认线程工厂的弊端:不利于对线程池输出的日志进行分析,无法确定日志是哪个微服务产生的。
使用自定义线程工厂,可以控制每一个线程的名称。

@Configuration
@EnableConfigurationProperties(value = ThreadPoolProperties.class)
public class ThreadPoolConfiguration {

    @Autowired
    private ThreadPoolProperties threadPoolProperties;

    @Value("${spring.application.name}")
    private String applicationName;

    /**
     * 配置一个线程池
     * int corePoolSize:核心线程数
     * int maximumPoolSize:最大线程数
     * long keepAliveTime:临时线程最大空闲时间
     * TimeUnit unit:时间单位
     * BlockingQueue<Runnable> workQueue:任务队列
     * ThreadFactory threadFactory:线程工厂
     * RejectedExecutionHandler handler:任务的拒绝策略
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                threadPoolProperties.getCorePoolSize(),
                threadPoolProperties.getMaximumPoolSize(),
                threadPoolProperties.getKeepAliveTime(),
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(threadPoolProperties.getWorkQueueSize()),
                new ThreadFactory() {
                    int num = 1;
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("thread-【" + applicationName + "】-" + num++);
                        return thread;
                    }
                },
                new ThreadPoolExecutor.AbortPolicy());
        return threadPoolExecutor;
    }
}

1.2 在公共微服务中编写ThreadPoolProperties实体类进行线程池参数配置

@Data
@ConfigurationProperties(prefix = "app.threadpool")
public class ThreadPoolProperties {
    private int corePoolSize;
    private int maximumPoolSize;
    private long keepAliveTime;
    private int workQueueSize;
}

1.3 使用@Import注解对线程池配置类进行封装,要使用线程池的微服务只需要在启动类上添加@EnableThreadPool注解即可

@Target(value = ElementType.TYPE)
@Retention(value = RetentionPolicy.RUNTIME)
@Import(value = ThreadPoolConfiguration.class)
public @interface EnableThreadPool {
}

1.4 在要使用线程池的微服务中的application.yml中配置线程池的参数信息

# 线程池参数配置
app:
  threadpool:
    corePoolSize: 5
    maximumPoolSize: 10
    keepAliveTime: 2
    workQueueSize: 60

2. 使用线程池对远程调用进行改造,每一次远程调用就向线程池中提交一个任务,配合CountDownLatch进行使用

代码示例:

// 等待其它四个线程执行完毕后,再执行当前线程
CountDownLatch countDownLatch = new CountDownLatch(4);

//远程调用product微服务的接口查询三级分类的数据
threadPoolExecutor.submit(() -> {
	Result<CategoryView> categoryViewResult = skuDetailFeignClient.findCategoryBySkuId(skuId);
	skuDetailVo.setCategoryView(categoryViewResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询三级分类的数据");
	countDownLatch.countDown();  // 让CountDownLatch中的计数器减1
});

//远程调用product微服务的接口查询价格数据
threadPoolExecutor.submit(() -> {
	Result<SkuInfo> infoResult = skuDetailFeignClient.findPriceBySkuId(skuId);
	skuDetailVo.setPrice(infoResult.getData().getPrice());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询价格数据");
	countDownLatch.countDown();
});

//远程调用product微服务的接口查询spu的销售属性和销售属性值
threadPoolExecutor.submit(() -> {
	Result<List<SpuSaleAttr>> spuSaleAttrListResult = skuDetailFeignClient.findSpuSaleAttrAndValueBySkuId(skuId);
	skuDetailVo.setSpuSaleAttrList(spuSaleAttrListResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询spu的销售属性和销售属性值");
	countDownLatch.countDown();
});

//远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合
threadPoolExecutor.submit(() -> {
	Result<List<AttrValueConcatVo>> brotherSkuSaleAttrValueConcatResult = skuDetailFeignClient.findBrotherSkuSaleAttrValueConcatBySkuId(skuId);
	List<AttrValueConcatVo> attrValueConcatVoList = brotherSkuSaleAttrValueConcatResult.getData();
	//Collectors.toMap将流中的元素转换成Map,方法的第一个参数用来构建map的键,方法的第二个参数用来构建map的值
	Map<String, Long> map = attrValueConcatVoList.stream().collect(Collectors.toMap(attrValueConcatVo -> attrValueConcatVo.getAttrValueConcat(), attrValueConcatVo -> attrValueConcatVo.getSkuId()));
	String valuesSkuJson = JSON.toJSONString(map);
	skuDetailVo.setValuesSkuJson(valuesSkuJson);
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合");
	countDownLatch.countDown();
});

try {
	countDownLatch.await();   // 让执行当前方法的线程阻塞,等其它线程执行完毕以后再执行当前线程
} catch (InterruptedException e) {
	e.printStackTrace();
}

3. 线程池的弊端:无法直接对多个任务进行链式、组合处理。解决方案:使用juc中的CompletableFuture实现对任务编排的能力,可以轻松组织不同任务的运行顺序、规则以及方式。

3.1 异步执行任务

3.1.1 无返回值的方法:

runAsync(runnable):  CompletableFuture<Void> 以异步方式启动一个任务并在默认的线程池(ForkJoinPool)执行
runAsync(runnable,executor):CompletableFuture<Void>    以异步方式启动一个任务并在指定的线程池(executor)执行

3.1.2 有返回值的方法

supplyAsync(supplier): CompletableFuture<U>   以异步方式启动一个任务并在默认的线程池(ForkJoinPool)执行。
supplyAsync(supplier,executor):CompletableFuture<U>  以异步方式启动一个任务并在指定的线程池(executor)执行。

代码演示:

public static void supplyAsyncTest02() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    },service);
    Integer count = supplyAsync.get();
    System.out.println(count);
}

public static void supplyAsyncTest01() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    });
    Integer count = supplyAsync.get();  // 获取异步线程执行结果
    System.out.println(count);
}

3.2 whenComplete方法

在任务执行完毕以后(不论是正常执行完毕还是出现异常)执行某一个操作。

  1. 正常完成:whenComplete返回结果和上级任务一致,异常为null
  2. 出现异常:whenComplete返回结果为null,异常为上级任务的异常
    相关方法:
whenComplete(action) 		使用当前线程执行一个动作,不开启额外的线程
whenCompleteAsync(action)   在默认的线程池中开启一个线程执行该动作
whenCompleteAsync(action, executor) 在指定的线程池中开启一个线程执行该动作

注:上一次任务执行完毕以后产生了异常,此时再调用get方法获取结果就会抛出异常

public static void whenCompleteTest01() throws ExecutionException, InterruptedException {
    /**
         * result参数表示的是上一次任务执行完成以后的结果
         * e:表示的是异常对象
         */
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    },service).whenComplete((result , e) -> {  // 使用main线程执行当前任务
        if(e == null) {
            System.out.println(Thread.currentThread() + "上一次任务正常执行完成了,任务的返回结果为:" + result);
        }else {
            System.out.println(Thread.currentThread() + "上一次任务执行时产生了异常,任务的返回结果为:" + result);
        }
    });

    Integer integer = supplyAsync.get(); // 获取异步任务的结果,如果whenComplete上一次执行的产生异常了,那么在调用该方法的时候就会报错
    System.out.println(integer);
}

public static void whenCompleteTest02() throws ExecutionException, InterruptedException {
    /**
         * result参数表示的是上一次任务执行完成以后的结果
         * e:表示的是异常对象
         */
    CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10 / 0 ;
    },service).whenComplete((result , e) -> {
        if(e == null) {
            System.out.println(Thread.currentThread() + "上一次任务正常执行完成了,任务的返回结果为:" + result);
        }else {
            System.out.println(Thread.currentThread() + "上一次任务执行时产生了异常,任务的返回结果为:" + result);
        }
    }).exceptionally((e) -> {   // 配合exceptionally方法可以在产生异常以后返回一个默认值。
        System.out.println(e);
        return 20 ;
    });

    Integer integer = supplyAsync.get();
    System.out.println(integer);
}

3.3 thenRun方法

相关方法:

// 无法获取到上一次任务的执行结果
thenRun(runnable):                 接下来跑一个任务,以当前线程作为跑任务的线程,不开额外的异步线程
thenRunAsync(runnable):            接下来跑一个任务,用默认线程池新开一个异步线程
thenRunAsync(runnable,executor):   接下来跑一个任务,用指定线程池新开一个异步线程

和whenComplete区别:thenRun无法获取到上一个任务产生的异常。当上一个任务执行完毕以后产生了异常,那么该任务无法执行。

3.4 thenAccept方法

相关方法:

thenAccept(consumer):              接下来跑一个任务,接受到上次的结果,以当前线程作为跑任务的线程,不开额外的异步线程
thenAcceptAsync(consumer):         接下来跑一个任务,接受到上次的结果,用默认线程池新开一个异步线程
thenAcceptAsync(consumer,executor) 接下来跑一个任务,接受到上次的结果,用指定线程池新开一个异步线程

代码示例:

public static void thenAcceptTest01() {
    CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10 ;
    },service).thenAccept((result) -> System.out.println(result * 10));
}

3.5 thenApply方法

相关方法:

thenApply(function) 接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,以当前线程作为跑任务的线程,不开额外的异步线程
thenApplyAsync(function) 接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,用默认线程池新开一个异步线程
thenApplyAsync(function, executor)接下来跑一个任务,接受到上次的结果,并且返回一个新的结果,用指定线程池新开一个异步线程

代码演示:

public static void thenApplyAsyncTest01() throws ExecutionException, InterruptedException {
    Integer count = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread());
        return 10;
    }, service).thenApplyAsync((result) -> {
        System.out.println(Thread.currentThread() + "---" + result * 10);
        return result * 2;
    }).get();  // 获取最终的执行结果
    System.out.println(count);
}

3.6 组合多任务

CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)   当所有的任务执行完毕以后,线程再向下进行执行
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 当任意一个任务执行完毕以后,线程再向下进行执行
CompletableFuture<Void> runAfterBoth(other,action) 当两个任务执行完毕以后在执行一个新的任务

3.7 代码演示

CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务的接口查询三级分类的数据
	Result<CategoryView> categoryViewResult = skuDetailFeignClient.findCategoryBySkuId(skuId);
	skuDetailVo.setCategoryView(categoryViewResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询三级分类的数据");
}, threadPoolExecutor);

CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务的接口查询价格数据
	Result<SkuInfo> infoResult = skuDetailFeignClient.findPriceBySkuId(skuId);
	skuDetailVo.setPrice(infoResult.getData().getPrice());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询价格数据");
}, threadPoolExecutor);

CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务的接口查询spu的销售属性和销售属性值
	Result<List<SpuSaleAttr>> spuSaleAttrListResult = skuDetailFeignClient.findSpuSaleAttrAndValueBySkuId(skuId);
	skuDetailVo.setSpuSaleAttrList(spuSaleAttrListResult.getData());
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务的接口查询spu的销售属性和销售属性值");
}, threadPoolExecutor);

CompletableFuture<Void> cf4 = CompletableFuture.runAsync(() -> {
	//远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合
	Result<List<AttrValueConcatVo>> brotherSkuSaleAttrValueConcatResult = skuDetailFeignClient.findBrotherSkuSaleAttrValueConcatBySkuId(skuId);
	List<AttrValueConcatVo> attrValueConcatVoList = brotherSkuSaleAttrValueConcatResult.getData();
	//Collectors.toMap将流中的元素转换成Map,方法的第一个参数用来构建map的键,方法的第二个参数用来构建map的值
	Map<String, Long> map = attrValueConcatVoList.stream().collect(Collectors.toMap(attrValueConcatVo -> attrValueConcatVo.getAttrValueConcat(), attrValueConcatVo -> attrValueConcatVo.getSkuId()));
	String valuesSkuJson = JSON.toJSONString(map);
	skuDetailVo.setValuesSkuJson(valuesSkuJson);
	log.info(Thread.currentThread().getName() + "---->远程调用product微服务,根据skuId获取所有兄弟sku销售属性值的组合");
}, threadPoolExecutor);

// 让四个异步任务执行完毕以后,再进行返回
CompletableFuture.allOf(cf1, cf2, cf3, cf4).join();