配置线程池

创建线程池的方式:

1、使用 Executors 创建,不推荐, 默认创建的工作队列,使用的是 LinkedBlockingDeque 队列,且默认容量为 Integer 的最大值,工作队列的容量过大,会导致核心线程工作过载,对垒中任务数过多,且非核心线程无法参与处理,最终导致内存溢出

2、直接new ThreadPoolExecutor 对象,推荐

@Configuration
public class AppConfig {

@Bean
public ThreadPoolExecutor bizThreadPoolExecutor() {
return new ThreadPoolExecutor(10, 50, 10, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100));
}
}

参数说明:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}

corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;

maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;

keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;

unit:keepAliveTime的单位

workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;

threadFactory:线程工厂,用于创建线程,一般用默认即可;

handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务;

多线程执行任务

使用一个 Mybatis Plus 项目举例:使用多线程发起查询请求,获取当前区域的子区域,例如广东省的子区域广州、深圳等

@Service
public class DestinationServiceImpl extends ServiceImpl<DestinationMapper, Destination> implements DestinationService {

private final ThreadPoolExecutor bizThreadPoolExecutor;

public DestinationServiceImpl(ThreadPoolExecutor bizThreadPoolExecutor) {
this.bizThreadPoolExecutor = bizThreadPoolExecutor;
}

/**
* 使用代码循环方式,有N+1问题
* 使用多线程,同时发查询请求
*
* @param rid 区域ID
* @return 热门目的地
*/
public List<Destination> findHostListThread(Long rid) {
List<Destination> destinations = null;
LambdaQueryWrapper<Destination> wrapper = new LambdaQueryWrapper<>();
destinations = list(wrapper.eq(Destination::getParentId, 1));
// 如何等待所有异步线程结束,主线程再执行
CountDownLatch latch = new CountDownLatch(destinations.size());
for (Destination destination : destinations) {
// submit有返回值,且支持Callable,execute没有返回值,只支持Runnable
bizThreadPoolExecutor.execute(() -> {
List<Destination> children = list(Wrappers.<Destination>lambdaQuery().eq(Destination::getParentId, destination.getId()).last("limit 10"));
destination.setChildren(children);
// 倒计时数量-1
latch.countDown();
});
}
// 返回结果前阻塞等待
try {
latch.await(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
return destinations;
}
}