您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

利用DUCC配置平台实现一个动态化线程池

时间:2023-03-08 11:52:49  来源:  作者:京东云开发者

作者:京东零售 张宾

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。

setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

 

setMaximumPoolSize方法:首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。

 

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类

/**
 * 动态线程池
 *
 */
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)动态线程池配置定时刷新类

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
    /**
     * MAIntain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
     */
    private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();

    /**
     * @param threadPoolBeanName
     * @param threadPoolTaskExecutor
     */
    public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
        log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
        DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.refresh();
        //创建定时任务线程池
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
        //延迟1秒执行,每个1分钟check一次
        executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        String dynamicThreadPool = "";
        try {
            if (DTP_REGISTRY.isEmpty()) {
                log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                return;
            }
            dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
            if (StringUtils.isBlank(dynamicThreadPool)) {
                log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                return;
            }
            log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
            List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
            });
            if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
                log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                return;
            }
            for (ThreadPoolProperties properties : threadPoolPropertiesList) {
                doRefresh(properties);
            }
        } catch (Exception e) {
            log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
        }
    }

    /**
     * @param properties
     */
    private void doRefresh(ThreadPoolProperties properties) {
        if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                || properties.getCorePoolSize() < 1
                || properties.getMaxPoolSize() < 1
                || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
            log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
        if (Objects.isNull(threadPoolTaskExecutor)) {
            log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
            return;
        }
        ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
            return;
        }
        if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
            threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
            log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
        }
        if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
            log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
        }
       
        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
    }

    private class RefreshThreadPoolConfig extends TimerTask {
        private RefreshThreadPoolConfig() {
        }

        @Override
        public void run() {
            DynamicThreadPoolRefresh.this.refresh();
        }
    }

}

线程池配置类

@Data
public class ThreadPoolProperties {
    /**
     * 线程池名称
     */
    private String threadPoolBeanName;
    /**
     * 线程池核心线程数量
     */
    private int corePoolSize;
    /**
     * 线程池最大线程池数量
     */
    private int maxPoolSize;
}

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

配置value:

[
  {
    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
    "corePoolSize": 32,
    "maxPoolSize": 128
  }
]

(4) 应用启动刷新应用本地动态线程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DynamicThreadPoolTaskExecutor) {
            DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
        }
        return bean;
    }
}

3.动态线程池应用

动态线程池Bean声明

    <!-- 普通线程池 -->
    <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrApper">
        <!-- 核心线程数,默认为 -->
        <property name="corePoolSize" value="128"/>
        <!-- 最大线程数,默认为Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="512"/>
        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接抛出JAVA.util.concurrent.RejectedExecutionException异常 -->
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动态线程池 -->
    <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
        <!-- 核心线程数,默认为 -->
        <property name="corePoolSize" value="32"/>
        <!-- 最大线程数,默认为Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="128"/>
        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动态线程池刷新配置 -->
    <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
    <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

业务类注入Spring Bean后,直接使用即可

 @Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;

 
 Runnable asyncTask = ()->{...};
 CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。



Tags:线程池   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
深入掌握Java线程池调度策略,优化任务执行
在Java开发中,线程池是一种重要的并发处理机制。合理地使用线程池可以提高系统性能、响应速度和资源利用率。下面将深入掌握Java线程池的调度策略,介绍线程池的原理和常用的调...【详细内容】
2023-12-29  Search: 线程池  点击:(77)  评论:(0)  加入收藏
优雅的关闭Java线程池,这样做才是yyds
1 背景某年某月某日,和我的卧龙同事聊一个需求,说是有个数据查询的功能,因为涉及到多个第三方接口调用,想用线程池并行来做。很正常的一个方案,但是上线后发现,每次服务发布的时候...【详细内容】
2023-12-20  Search: 线程池  点击:(128)  评论:(0)  加入收藏
一文带你彻底弄懂线程池
一、前言虽然 Java 对线程的创建、中断、等待、通知、销毁、同步等功能提供了很多的支持,但是从操作系统角度来说,频繁的创建线程和销毁线程,其实是需要大量的时间和资源的。例...【详细内容】
2023-12-12  Search: 线程池  点击:(140)  评论:(0)  加入收藏
特殊线程池ForkJoinPool 要合理运用,不是什么样的任务都拿来用
背景Java 8 后一般稍微有点经验的程序员都在工作中更习惯于用流式API: Stream,他可以实现惰性计算(输出的元素可能并没有预先存储在内存中,而是实时计算出来的),一言以蔽之:省内存...【详细内容】
2023-12-06  Search: 线程池  点击:(221)  评论:(0)  加入收藏
打造定制线程池:Java多线程的艺术
当谈到多线程编程和并发控制时,Java中的线程池是一个不可或缺的工具。线程池允许更有效地管理和控 制线程的创建和执行,从而提高应用程序的性能和可维护性。我们来探讨Java线...【详细内容】
2023-11-23  Search: 线程池  点击:(206)  评论:(0)  加入收藏
揭秘Java性能调优的奥秘:垃圾回收调优与线程池优化
Java性能调优是提高应用程序性能和效率的重要一环,其中GC调优和线程池优化是两个关键方面。下面将揭秘Java性能调优的奥秘,并详细介绍GC调优和线程池优化的方法。一、GC调优垃...【详细内容】
2023-11-23  Search: 线程池  点击:(239)  评论:(0)  加入收藏
Java中线程池的优点和使用方法
线程池是一个存放线程的池子,它的存在有着重要的意义。在介绍线程池之前,我们先来了解一下什么是线程。线程是计算机中执行代码的最小单位,它可以在程序中独立运行,执行特定的任...【详细内容】
2023-11-01  Search: 线程池  点击:(366)  评论:(0)  加入收藏
四种常见线程池的原理,你学会了吗?
newFixedThreadPool (固定数目线程的线程池) newCachedThreadPool (可缓存线程的线程池) newSingleThreadExecutor (单线程的线程池) newScheduledThreadPool (定时及周期执...【详细内容】
2023-10-30  Search: 线程池  点击:(272)  评论:(0)  加入收藏
你真的了解线程池的七个参数是做什么的吗?
问:可以说一下线程池吗?关于线程池的问题,大多数面试官会问线程池的几个参数的含义,今天就直接聊一聊线程池ThreadPoolExecutor。先说下线程池中几个参数的含义:ThreadPoolExecut...【详细内容】
2023-10-18  Search: 线程池  点击:(187)  评论:(0)  加入收藏
解密SpringBoot线程池
我们在日常开发中,经常跟多线程打交道,Spring 为我们提供了一个线程池方便我们开发,它就是 ThreadPoolTaskExecutor ,接下来我们就来聊聊 Spring 的线程池吧。使用@Async声明多...【详细内容】
2023-10-13  Search: 线程池  点击:(332)  评论:(0)  加入收藏
▌简易百科推荐
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(0)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(6)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(13)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(9)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(11)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(9)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
站内最新
站内热门
站内头条