您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

从根儿上认识线程池

时间:2020-05-18 13:24:35  来源:  作者:

前言

大家好,我是jack xu,本篇是并发编程的第二篇,今天跟大家聊一聊线程池的那点事。本篇文章有点长,小伙们静下心、耐下心来把他看完。。

为什么要使用线程池

1)降低创建线程和销毁线程的性能开销

2)提高响应速度,当有新任务需要执行是不需要等待线程创建就可以立马执行

3)合理的设置线程池大小可以避免因为线程数超过硬件资源瓶颈带来的问题

 

从根儿上认识线程池

 

我们来看阿里巴巴的代码规范,在项目中创建线程必须要使用线程池创建,原因就是我说的以上三点

 

线程池的使用

首先我们来看下UML类图

 

从根儿上认识线程池

 

 

  • Executor:可以看到最顶层是 Executor 的接口。这个接口很简单,只有一个 execute 方法。此接口的目的是为了把任务提交和任务执行解耦。
  • ExecutorService:这还是一个接口,继承自 Executor,它扩展了 Executor 接口,定义了更多线程池相关的操作。
  • AbstractExecutorService:提供了 ExecutorService 的部分默认实现。
  • ThreadPoolExecutor:实际上我们使用的线程池的实现是 ThreadPoolExecutor。它实现了线程池工作的完整机制。也是我们接下来分析的重点对象。
  • ForkJoinPool:和ThreadPoolExecutor都继承自AbstractExecutorService,适合用于分而治之,递归计算的算法
  • ScheduledExecutorService:这个接口扩展了ExecutorService,定义个延迟执行和周期性执行任务的方法。
  • ScheduledThreadPoolExecutor:此接口则是在继承 ThreadPoolExecutor 的基础上实现 ScheduledExecutorService 接口,提供定时和周期执行任务的特性。

搞清楚上面的结构很重要,Executors是一个工具类,然后看创建线程的两种方式,第一种是通过Executors提供的工厂方法来实现,有下面四种方式

        Executor executor1 = Executors.newFixedThreadPool(10);
        Executor executor2 = Executors.newSingleThreadExecutor();
        Executor executor3 = Executors.newCachedThreadPool();
        Executor executor4 = Executors.newScheduledThreadPool(10);
复制代码

第二种是通过构造方法来实现

        ExecutorService executor5 = new ThreadPoolExecutor(1,
                1,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
复制代码

其实查看第一种方式创建的源码就会发现:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
复制代码

根本上还是通过调用ThreadPoolExecutor的构造方法,创建时传入不同参数,所以本质上还是只有一种创建线程池的方式,就是用构造方法,这里我不想讲用Executors的工厂方法具体帮我们创建了怎样的线程池,让我们再来看一条阿里巴巴规范。

 

从根儿上认识线程池

 

看到这里大家都明白了吧,正是因为封装性太强了,反而小伙们会不知道怎么用,乱用,滥用,有可能会导致OOM,除非你对创建的这四个线程池了如指掌,所以我介绍了也是白介绍,因为就不在用,接下来我们重点看下ThreadPoolExecutor构造方法里各个参数的含义,构造方法有很多个,我选了一个最完整的。

 

public ThreadPoolExecutor(int corePoolSize, //核心线程数量
                          int maximumPoolSize, //最大线程数
                          long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
                          TimeUnit unit, //存活时间单位
                          BlockingQueue<Runnable> workQueue, //保存执行任务的队列
                          ThreadFactory threadFactory,//创建新线程使用的工厂
                          RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
复制代码
  • corePoolSize:即线程池的核心线程数量,其实也是最小线程数量。不设置allowCoreThreadTimeOut 的情况下,核心线程数量范围内的线程一直存活。线程不会自行销毁,而是以挂起的状态返回到线程池,直到应用程序再次向线程池发出请求时,线程池里挂起的线程就会再度激活执行任务。
  • maximumPoolSize:即线程池的最大线程数量
  • keepAliveTime和unit:超出核心线程数后的存活时间和单位
  • workQueue:是一个阻塞的 queue,用来保存线程池要执行的所有任务。通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;  
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
复制代码
  • ThreadFactory:我们一般用Executors.defaultThreadFactory()默认工厂,为什么要用工厂呢,其实就是规范了生成的Thread。避免调用new Thread创建,导致创建出来的Thread可能存在差异
  • handler:当队列和最大线程池都满了之后的饱和策略。
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录
日志或持久化存储不能处理的任务
复制代码

创建完线程池后使用也很简单,带返回值和不带返回值,传入对应传入Runnable或者Callable接口的实现

        //无返回值
        executor5.execute(() -> System.out.println("jack xushuaige"));
        //带返回值
        String message = executor5.submit(() -> { return "jack xushuaige"; }).get();
复制代码

源码分析

execute方法

基于源码入口进行分析,先看execute方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
复制代码

源代码中有一段关键的注释我没有贴进来,下面我先把这段关键的注释翻译讲解下:

分三步做处理:

1、如果运行的线程数量小于 corePoolSize,那么尝试创建新的线程,并把传入的 command 作为它的第一个 task 来执行。调用 addWorker 会自动检查 runState 和 workCount,以此来防止在不应该添加线程时添加线程的错误警告;

2、即使 task 可以被成功加入队列,我们仍旧需要再次确认我们是否应该添加 thread(因为最后一次检查之后可能有线程已经死掉了)还是线程池在进入此方法后已经停掉了。所以我们会再次检查状态,如果有必要的话,可以回滚队列。或者当没有线程时,开启新的 thread;

3、如果无法将 task 加入 queue,那么可以尝试添加新的 thread。如果添加失败,这是因为线程池被关闭或者已经饱和了,所以拒绝这个 task。

如果你看完以后还是一脸懵逼,那没事,我把这个流程图画下来,你品,你细品,好好理解一下

从根儿上认识线程池

 

 

然后介绍一下源码中ctl是干什么的,点进去查看源码

 

从根儿上认识线程池

 

我们发现它是一个原子类,主要作用是用来保存线程数量和线程池的状态,他用到了位运算, 一个int数值是32个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。

 

我们来计算一下ctlOf(RUNNING, 0)方法,其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位,-1 的二进制是32个1(1111 1111 1111 1111 1111 1111 1111 1111),左移29位后得到(1110 0000 0000 0000 0000 0000 0000 0000),然后111| 0还是111,同理可得其他状态的 bit 位。这个位运算很有意思,hashmap源码中也用到了位运算,小伙们在平时开发中也可以尝试用下,这样运算速度会快,而且能够装b,介绍下这五种线程池的状态

  • RUNNING:接收新任务,并执行队列中的任务
  • SHUTDOWN:不接收新任务,但是执行队列中的任务
  • STOP:不接收新任务,不执行队列中的任务,中断正在执行中的任务
  • TIDYING:所有的任务都已结束, 线程数量为 0,处于该状态的线程池即将调用 terminated()方法
  • TERMINATED:terminated()方法执行完成

他们的转换关系如下:

 

从根儿上认识线程池

 

 

addWorker方法

我们看到execute流程的核心方法为addWorker,我们继续分析,源码看起来比较唬人,其实就做了两件事,拆分一下

第一步:更新worker的数量,代码如下:

retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
复制代码

retry是一个标记,和循环配合使用,continue retry 的时候,会跳到 retry 的地方再次执行。如果 break retry,则跳出整个循环体。源码先获取到 ctl,然后检查状态,然后根据创建线程类型的不同,进行数量的校验。在通过CAS方式更新 ctl状态,成功的话则跳出循环。否则再次取得线程池状态,如果和最初已经不一致,那么从头开始执行。如果状态并未改变则继续更新worker的数量。流程图如下:

 

从根儿上认识线程池

 

第二步:添加 worker 到 workers 的 set 中。并且启动 worker 中持有的线程。代码如下:

 

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
复制代码

可以看到添加 work 时需要先获得锁,这样确保多线程并发安全。如果添加 worker 成功,那么调用 worker 中线程的 start 方法启动线程。如果启动失败则调用 addWorkerFailed 方法进行回滚。看到这里小伙们会发现

1、ThreadPoolExecutor在初始化后并没有启动和创建任何线程,在调用 execute方法时才会调用 addWorker创建线程

2、addWorker方法中会创建新的worker,并启动其持有的线程来执行任务。

上文提到如果线程数量已经达到corePoolSize,则只会把command 加入到 workQueue中,那么加入到 workQueue中的command是如何被执行的呢?下面我们来分析 Worker 的源代码。

Worker类

Worker封装了线程,是executor中的工作单元。worker继承自AbstractQueuedSynchronizer,并实现 Runnable。 Worker 简单理解其实就是一个线程,里面重新了 run 方法,我们来看他的构造方法:

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
复制代码

再来看下这两个重要的属性

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
复制代码

firstTask 用它来保存传入的任务;thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程,这里用的是 ThreadFactory 创建线程,并没有直接 new,原因上文也提到过,这里看下 newThread 传入的是 this,因为 Worker 本身继承了 Runnable 接口,所以 addWork 中调用的 t.start(),实际上运行的是 t 所属 worker 的 run 方法。worker 的 run 方法如下:

public void run() {
    runWorker(this);
}
复制代码

runWorker源码再如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码

简单分析一下

1、先取出 worker 中的 firstTask,并清空;

2、如果没有 firstTask,则调用 getTask 方法,从 workQueue 中获取task;

3、获取锁;

4、执行 beforeExecute。这里是空方法,如有需要在子类实现;

5、执行 task.run;

6、执行 afterExecute。这里是空方法,如有需要在子类实现;

7、清空 task,completedTasks++,释放锁;

8、当有异常或者没有 task 可执行时,会进入外层 finnaly 代码块。调用 processWorkerExit 退出当前 worker。从 works 中移除本 worker 后,如果 worker 数量小于 corePoolSize,则创建新的 worker,以维持 corePoolSize 大小的线程数。

这行代码 while (task != null || (task = getTask()) != null) ,确保了 worker 不停地从 workQueue 中取得 task 执行。getTask 方法会从 BlockingQueue workQueue 中 poll 或者 take 其中的 task 出来。

至此,关于 executor 如何创建并启动线程执行 task 的过程已经分析的清清楚楚,明明白白,后面还有shutdown()、shutdownNow()等其他方法留给小伙们自行去观察研究哈。

如何合理配置线程池的大小

线程池大小不是靠猜,也不是说越多越好。

  • CPU 密集型任务:主要是执行计算任务,响应时间很快,CPU 一直在运行,这种任务 CPU 的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,应当分配较少的线程,比如和 CPU 个数相当的大小。
  • IO 密集型任务:主要是进行 IO 操作,执行 IO 操作的时间较长,由于线程并不是一直在运行,这时 CPU 处于空闲状态, 这种情况下可以增加线程池的大小,比如 CPU 个数 * 2

当然这些都是经验值,最好的方式还是根据实际情况测试得出最佳配置。

线程池的监控

如果在项目中大规模的使用了线程池,那么必须要有一套监控体系,来指导当前线程池的状 态,当出现问题的时候可以快速定位到问题。我们通过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控

 

从根儿上认识线程池

 

看这些名称和定义都知道,这是让子类来实现的,可以在线程执行前、后、终止状态执行自定义逻辑。

 

总结

线程池这东西说简单也简单,说难也难,简单是因为用起来简单,所以小伙们可能觉得这有啥好讲的,难是难在要知道他的底层的源码,他是如何调度线程的,说两点吧,第一是本文中用了大量的流程图,当我们在阅读源码或者做复杂业务开发的时候,一定要静下心来先画个图,否则会被绕晕或者被别人打断后,又得从头到尾的看一边,第二是阅读源码,刚毕业的小伙伴可能只要会用行了,但是如果你工作五年了,还是只会用,而不知道他里面是如何实现的,那你比刚毕业的优势在哪里,凭什么工资比刚毕业的高。如果你觉得写的不错,请点一个赞!



Tags:线程池   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
原文链接: https://mp.weixin.qq.com/s/MTw7z6n_wk4y4CTmGkoRoA一切要从CPU说起你可能会有疑问,讲多线程为什么要从CPU说起呢?原因很简单,在这里没有那些时髦的概念,你可以更加清...【详细内容】
2021-08-13  Tags: 线程池  点击:(97)  评论:(0)  加入收藏
多线程并发是Java语言中非常重要的一块内容,同时,也是Java基础的一个难点。说它重要是因为多线程是日常开发中频繁用到的知识,说它难是因为多线程并发涉及到的知识点非常之多,想...【详细内容】
2021-07-12  Tags: 线程池  点击:(110)  评论:(0)  加入收藏
1. Dubbo简介及线程池策略Apache Dubbo 是一款高性能、轻量级的开源 Java 服务框架。提供了六大核心能力:面向接口代理的高性能RPC调用,智能容错和负载均衡,服务自动注册和发现...【详细内容】
2021-05-18  Tags: 线程池  点击:(202)  评论:(0)  加入收藏
在上一篇文章C++使用socket实现与微信小程序通信(下)中,小懵白就给大家简要地讲解了线程池的原理。 今天呢,小懵白就给大家继续讲解C++如何实现封装线程池类。第一步首先,我们需...【详细内容】
2021-05-14  Tags: 线程池  点击:(204)  评论:(0)  加入收藏
见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为RocketMQ社区优秀布道师、大厂资深架构师,出版《RocketMQ技...【详细内容】
2021-03-31  Tags: 线程池  点击:(277)  评论:(0)  加入收藏
作者公众号:一角钱技术(org_yijiaoqian)前言线程池的具体实现有两种,分别是ThreadPoolExecutor 默认线程池和ScheduledThreadPoolExecutor 定时线程池,上一篇已经分析过ThreadPoo...【详细内容】
2020-12-22  Tags: 线程池  点击:(143)  评论:(0)  加入收藏
之前我们介绍了线程池的四种拒绝策略,了解了线程池参数的含义,那么今天我们来聊聊Java 中常见的几种线程池,以及在jdk7 加入的 ForkJoin 新型线程池 首先我们列出Java 中的...【详细内容】
2020-11-05  Tags: 线程池  点击:(91)  评论:(0)  加入收藏
前面几篇文章分析了线程的主要实现,今天来整体总结以下他们。总览图直接上总结的总览图,如下图: 如果看过前几篇文章应该基本能够看懂这张总结图,可能在单独的一篇文章里弄懂了...【详细内容】
2020-09-08  Tags: 线程池  点击:(63)  评论:(0)  加入收藏
大多数线程池实现都离不开锁的使用,如互斥量pthread_mutex*结合条件变量pthread_cond*。众所周知,锁的使用对于程序性能影响较大,虽然现有的pthread_mutex*在锁的申请与释放方...【详细内容】
2020-08-24  Tags: 线程池  点击:(87)  评论:(0)  加入收藏
作为 Java 程序员,无论是技术面试、项目研发或者是学习框架源码,不彻底掌握 Java 多线程的知识,做不到心中有数,干啥都没底气,尤其是技术深究时往往略显发憷。坐稳扶好,通过今天的...【详细内容】
2020-08-12  Tags: 线程池  点击:(48)  评论:(0)  加入收藏
▌简易百科推荐
本文分为三个等级自顶向下地分析了glibc中内存分配与回收的过程。本文不过度关注细节,因此只是分别从arena层次、bin层次、chunk层次进行图解,而不涉及有关指针的具体操作。前...【详细内容】
2021-12-28  linux技术栈    Tags:glibc   点击:(3)  评论:(0)  加入收藏
摘 要 (OF作品展示)OF之前介绍了用python实现数据可视化、数据分析及一些小项目,但基本都是后端的知识。想要做一个好看的可视化大屏,我们还要学一些前端的知识(vue),网上有很多比...【详细内容】
2021-12-27  项目与数据管理    Tags:Vue   点击:(2)  评论:(0)  加入收藏
程序是如何被执行的&emsp;&emsp;程序是如何被执行的?许多开发者可能也没法回答这个问题,大多数人更注重的是如何编写程序,却不会太注意编写好的程序是如何被运行,这并不是一个好...【详细内容】
2021-12-23  IT学习日记    Tags:程序   点击:(9)  评论:(0)  加入收藏
阅读收获✔️1. 了解单点登录实现原理✔️2. 掌握快速使用xxl-sso接入单点登录功能一、早期的多系统登录解决方案 单系统登录解决方案的核心是cookie,cookie携带会话id在浏览器...【详细内容】
2021-12-23  程序yuan    Tags:单点登录(   点击:(8)  评论:(0)  加入收藏
下载Eclipse RCP IDE如果你电脑上还没有安装Eclipse,那么请到这里下载对应版本的软件进行安装。具体的安装步骤就不在这赘述了。创建第一个标准Eclipse RCP应用(总共分为六步)1...【详细内容】
2021-12-22  阿福ChrisYuan    Tags:RCP应用   点击:(7)  评论:(0)  加入收藏
今天想简单聊一聊 Token 的 Value Capture,就是币的价值问题。首先说明啊,这个话题包含的内容非常之光,Token 的经济学设计也可以包含诸多问题,所以几乎不可能把这个问题说的清...【详细内容】
2021-12-21  唐少华TSH    Tags:Token   点击:(10)  评论:(0)  加入收藏
实现效果:假如有10条数据,分组展示,默认在当前页面展示4个,点击换一批,从第5个开始继续展示,到最后一组,再重新返回到第一组 data() { return { qList: [], //处理后...【详细内容】
2021-12-17  Mason程    Tags:VUE   点击:(14)  评论:(0)  加入收藏
什么是性能调优?(what) 为什么需要性能调优?(why) 什么时候需要性能调优?(when) 什么地方需要性能调优?(where) 什么时候来进行性能调优?(who) 怎么样进行性能调优?(How) 硬件配...【详细内容】
2021-12-16  软件测试小p    Tags:性能调优   点击:(20)  评论:(0)  加入收藏
Tasker 是一款适用于 Android 设备的高级自动化应用,它可以通过脚本让重复性的操作自动运行,提高效率。 不知道从哪里听说的抖音 app 会导致 OLED 屏幕烧屏。于是就现学现卖,自...【详细内容】
2021-12-15  ITBang    Tags:抖音防烧屏   点击:(25)  评论:(0)  加入收藏
11 月 23 日,Rust Moderation Team(审核团队)在 GitHub 上发布了辞职公告,即刻生效。根据公告,审核团队集体辞职是为了抗议 Rust 核心团队(Core team)在执行社区行为准则和标准上...【详细内容】
2021-12-15  InfoQ    Tags:Rust   点击:(25)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条