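1. Background:
Use the JDK thread pool ThreadPoolExecutor to run batch insert, update, and similar operations asynchronously on multiple threads, improving the efficiency of inserting millions of rows.
2. Details:
2.1. Create a thread pool sized to the machine's available processors
// Create a thread pool sized to the machine's available processors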
int processNum = Runtime.getRuntime().availableProcessors();
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
2L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
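To make the sizing arithmetic and the rejection policy above concrete, here is a minimal sketch. The class name PoolSizingSketch and both helper methods are hypothetical and not part of the original project; the tiny one-thread pool is chosen only so that saturation is easy to reproduce. It shows that processNum / (1 - 0.5) doubles the processor count, and that CallerRunsPolicy runs a rejected task on the submitting thread once both the pool and the queue are full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    // Same arithmetic as the article: processors / (1 - factor), truncated to int.
    static int poolSize(int processors, double factor) {
        return (int) (processors / (1 - factor));
    }

    // Saturates a deliberately tiny pool and reports which thread ran the rejected task.
    static String threadThatRanRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 2L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the single queue slot
            // Worker and queue are both full, so CallerRunsPolicy runs this task
            // on the thread that called execute().
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("core=" + poolSize(processors, 0.2)
                + ", max=" + poolSize(processors, 0.5));
        System.out.println("rejected task ran on: " + threadThatRanRejectedTask());
    }
}
```

With a queue capacity of only 3, as in the pool above, this caller-runs fallback is what slows down a submitter that produces tasks faster than the pool can drain them.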
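@Override
public boolean batchInsert(List<Student> list) throws Exception {
try {
/**
* Both submit and execute hand a task to the thread pool.
* submit accepts a Callable (its call() method runs) or a Runnable; execute accepts only a Runnable (its run() method runs).
* A Runnable task returns no value, whereas a Callable task does.
* A Callable cannot be passed to execute; it goes through ExecutorService methods such as submit(Callable<T> task).
* When several callers submit at the same time, each call becomes its own task in the shared pool.
*/
Future<Boolean> a = executorService.submit(new BatchWay(list, studentService));
// get() blocks until the task finishes and returns the value produced by call()
return a.get();
} catch (Exception e) {
// Calling a.get() again here would only rethrow the same failure (or hit a null Future), so report failure directly
e.printStackTrace();
return false;
}
}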
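The difference between submit and execute described in the comment above can be sketched in isolation. The class SubmitVsExecuteSketch and its methods are hypothetical names used only for illustration; the sketch assumes a small fixed pool instead of the article's pool. It also shows that an exception thrown inside call() reaches the caller as an ExecutionException from get().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVsExecuteSketch {

    // Same shape as batchInsert: submit a Callable, block on get(), map any failure to false.
    static boolean runAndWait(ExecutorService pool, Callable<Boolean> task) {
        Future<Boolean> future = pool.submit(task);
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } catch (ExecutionException e) {
            // The exception thrown inside call() is available as e.getCause().
            return false;
        }
    }

    // Returns {result of a successful task, result of a failing task}.
    static boolean[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // execute() takes a Runnable: fire-and-forget, nothing comes back.
            pool.execute(() ->
                    System.out.println("Runnable ran on " + Thread.currentThread().getName()));
            boolean ok = runAndWait(pool, () -> true);
            boolean failed = runAndWait(pool, () -> {
                throw new IllegalStateException("simulated insert failure");
            });
            return new boolean[] {ok, failed};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("success case: " + results[0] + ", failure case: " + results[1]);
    }
}
```

Because get() blocks, a caller written this way waits for the whole task, so the call is asynchronous only with respect to other callers of the pool.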
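2.2. Core business processing class:
@Slf4j
public class BatchWay implements Callable<Boolean> {
private int batch100 = 100; // batch size: insert in chunks of 100 rows
private List<Student> list; // the large data set to insert
private StudentService studentService;
// Parameterized constructor, so the class can be initialized conveniently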
public BatchWay(List<Student> list, StudentService studentService) {
this.list = list;
this.studentService = studentService;
}
/** Thread pool */
// private ThreadPoolExecutor threadPoolExecutor =
// new ThreadPoolExecutor(
// 10, // corePoolSize: number of core threads in the pool
// Runtime.getRuntime().availableProcessors(), // maximumPoolSize: the most threads the pool may hold (all available processors)
// 5L, // keepAliveTime: how long an idle thread stays alive (5 seconds)
// TimeUnit.SECONDS, // unit of keepAliveTime: seconds
// new LinkedBlockingQueue<>(100), // blocking queue that buffers pending tasks
//// Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.CallerRunsPolicy()
// );
/**
* Description: implements the call method of Callable
* @MethodName: call
* @MethodParam: []
* @Return: java.lang.Boolean
* @Author: yyalin
* @CreateDate: 2022/5/6 15:46
*/
public Boolean call(){
try {
batchOp(list);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* Description: save the data in batches
* @MethodName: batchOp
* @MethodParam: [list]
* @Return: void
* @Author: yyalin
* @CreateDate: 2022/5/6 15:40
*/
private void batchOp(List<Student> list) {
if(!list.isEmpty()){
int size = list.size();
if(size<=batch100){
// At or below the batch size: insert directly
studentService.saveBatch(list);
}else{
// Larger than the batch size: split first, then save each chunk
batchOpSpilit(list,batch100);
}
}
}
/**
* Description: split the list into chunks
* @MethodName: batchOpSpilit
* @MethodParam: [list, batch100]
* @Return: void
* @Author: yyalin
* @CreateDate: 2022/5/6 15:43
*/
private void batchOpSpilit(List<Student> list, int batch100) {
log.info("Starting to split the list...");
List<List<Student>> list1 = SplitListUtils.pagingList(list, batch100);
try {
for (List<Student> list2 : list1) {
batchOp(list2);
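// threadPoolExecutor.allowCoreThreadTimeOut(true);
// // Call batchOp again; here the multithreading is several small lists being inserted into the database
// threadPoolExecutor.execute(() -> {
//// log.info("Thread starting to save data...: " + Thread.currentThread().getName());
// batchOp(list2);
// });
}
// log.info("Threads currently in the pool: "+threadPoolExecutor.getPoolSize());
} catch (Exception e) {
// Log and rethrow so that call() returns false instead of silently reporting success
log.error("Batch insert failed", e);
throw e;
} finally {
// Finally shut the pool down: no new tasks are accepted, but tasks already submitted still complete
// threadPoolExecutor.shutdown();
}
}
}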
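The article does not show SplitListUtils.pagingList. The following is a hypothetical stand-in, assuming it simply cuts the list into consecutive chunks of at most pageSize elements; the real utility may behave differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitListSketch {

    // Hypothetical equivalent of SplitListUtils.pagingList: consecutive chunks of at most pageSize items.
    static <T> List<List<T>> pagingList(List<T> source, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<List<T>> pages = new ArrayList<>();
        for (int from = 0; from < source.size(); from += pageSize) {
            int to = Math.min(from + pageSize, source.size());
            // Copy the view so each page is independent of the source list.
            pages.add(new ArrayList<>(source.subList(from, to)));
        }
        return pages;
    }

    public static void main(String[] args) {
        List<List<Integer>> pages = pagingList(Arrays.asList(1, 2, 3, 4, 5), 2);
        System.out.println(pages.size() + " pages, last page has "
                + pages.get(pages.size() - 1).size() + " item(s)");
    }
}
```

Since every chunk produced this way is no larger than the batch size, the recursive batchOp call on each chunk takes the direct saveBatch branch.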
2.3. Generate test data and insert it asynchronously on multiple threads:
public String batchWay() throws Exception {
log.info("Starting batch operation.........");
Random rand = new Random();
List<Student> list = new ArrayList<>();
for (int i = 0; i < 1000003; i++) {
Student student=new Student();
student.setStudentName("小李"+i);
student.setAddr("上海"+rand.nextInt(9) * 1000);
student.setAge(rand.nextInt(1000));
student.setPhone("134"+rand.nextInt(9) * 1000);
list.add(student);
}
long startTime = System.currentTimeMillis(); // start time
boolean a=studentService.batchInsert(list);
long endTime = System.currentTimeMillis(); // end time
return "Finished, total time: " + (endTime - startTime) / 1000 + " s";
}
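In the code as written, batchInsert submits the whole list as a single Callable, and the chunks are then saved one after another inside that task; the commented-out lines in BatchWay hint at inserting the chunks in parallel instead. One possible variation, which is not the author's code, is to turn every chunk into its own task and wait for all of them with invokeAll. The class ParallelChunksSketch, its methods, and the counting saver that stands in for studentService.saveBatch are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ParallelChunksSketch {

    // Submits one task per chunk and returns true only if every chunk was saved.
    static <T> boolean insertInParallel(List<T> rows, int batchSize,
                                        ExecutorService pool, Consumer<List<T>> saver) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            // subList is a read-only view here; rows is not modified while tasks run.
            List<T> chunk = rows.subList(from, Math.min(from + batchSize, rows.size()));
            tasks.add(() -> {
                saver.accept(chunk);
                return true;
            });
        }
        try {
            // invokeAll blocks until every task has completed (normally or with an exception).
            for (Future<Boolean> future : pool.invokeAll(tasks)) {
                future.get(); // rethrows a chunk failure as ExecutionException
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    // Saves 250 fake rows in chunks of 100 and returns how many rows the saver saw (-1 on failure).
    static int demoSavedRowCount() {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            rows.add(i);
        }
        AtomicInteger saved = new AtomicInteger(); // stand-in for a real database insert
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            boolean ok = insertInParallel(rows, 100, pool, chunk -> saved.addAndGet(chunk.size()));
            return ok ? saved.get() : -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rows saved: " + demoSavedRowCount());
    }
}
```

With a real database, each chunk saved this way would typically run on its own connection and commit independently, so a failure in one chunk would not undo the others unless the application handles that explicitly.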
2.4. Test results
Summary:
No. | Core threads (core_pool_size) | Rows inserted | Time (s)
1 | 10 | 1,000,000 | 38
2 | 15 | 1,000,000 | 32
3 | 50 | 1,000,000 | 31
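Personal recommendation: in Spring Boot, use the ThreadPoolTaskExecutor thread pool to process millions of rows asynchronously.
Summary: ThreadPoolTaskExecutor and ThreadPoolExecutor are more flexible than the thread pools created through Executors, because their parameters (pool sizes, queue capacity, rejection policy) can be set explicitly, so both are preferable. ThreadPoolTaskExecutor is Spring's wrapper around ThreadPoolExecutor: it runs on the same underlying pool, so its advantage is integration with Spring (bean configuration and lifecycle management) rather than raw speed, which makes it the recommended choice in a Spring Boot application.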