虽然 JAVA 对线程的创建、中断、等待、通知、销毁、同步等功能提供了很多的支持,但是从操作系统角度来说,频繁的创建线程和销毁线程,其实是需要大量的时间和资源的。
例如,当有多个任务同时需要处理的时候,一个任务对应一个线程来执行,以此来提升任务的执行效率,模型图如下:
图片
如果任务数非常少,这种模式倒问题不大,但是如果任务数非常的多,可能就会存在很大的问题:
假如把很多任务让一组线程来执行,而不是一个任务对应一个新线程,这种通过接受任务并进行分发处理的就是线程池。
图片
线程池内部维护了若干个线程,当没有任务的时候,这些线程都处于等待状态;当有新的任务进来时,就分配一个空闲线程执行;当所有线程都处于忙碌状态时,新任务要么放入队列中等待,要么增加一个新线程进行处理,要么直接拒绝。
很显然,这种通过线程池来执行多任务的思路,优势明显:
关于这一点,我们可以看一个简单的对比示例。
/**
* 使用一个任务对应一个线程来执行
* @param args
*/
public static void mAIn(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 一个任务对应一个线程,使用20000个线程执行任务
for (int i = 0; i < 20000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
}
}).start();
}
// 等待任务执行完毕
while (true){
if(list.size() >= 20000){
break;
}
}
System.out.println("一个任务对应一个线程,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
}
/**
* 使用线程池进行执行任务
* @param args
*/
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 使用线程池进行执行任务,默认4个线程
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000));
for (int i = 0; i < 20000; i++) {
// 提交任务
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
}
});
}
// 等待任务执行完毕
while (true){
if(list.size() >= 20000){
break;
}
}
System.out.println("使用线程池,执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
// 关闭线程池
executor.shutdown();
}
两者执行耗时情况对比,如下:
一个任务对应一个线程,执行耗时:3073ms
---------------------------
使用线程池,执行耗时:578ms
从结果上可以看出,同样的任务数,采用线程池和不采用线程池,执行耗时差距非常明显,一个任务对应一个新的线程来执行,反而效率不如采用 4 个线程的线程池执行的快。
为什么会产生这种现象,下面我们就一起来聊聊线程池。
站在专业的角度讲,线程池其实是一种利用池化思想来实现线程管理的技术,它将线程的创建和任务的执行进行解耦,同时复用已经创建的线程来降低频繁创建和销毁线程所带来的资源消耗。通过合理的参数设置,可以实现更低的系统资源使用率、更高的任务并发执行效率。
在 Java 中,线程池最顶级的接口是Executor,名下的实现类关系图如下:
图片
关键接口和实现类,相关的描述如下:
整个关系图中,其中ThreadPoolExecutor是线程池最核心的实现类,开发者可以使用它来创建线程池。
ThreadPoolExecutor类的完整构造方法一共有七个参数,理解这些参数的配置对使用好线程池至关重要,完整的构造方法核心源码如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
各个参数的解读如下:
创建完线程池之后就可以提交任务了,当有新的任务进来时,线程池就会工作并分配线程去执行任务。
ThreadPoolExecutor的典型用法如下:
// 创建固定大小的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100));
// 提交任务
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
...
针对任务的提交方式,ThreadPoolExecutor还提供了两种方法。
ThreadPoolExecutor执行提交的任务流程虽然比较复杂,但是通过对源码的分析,大致的任务执行流程,可以用如下图来概括。
整个执行流程,大体步骤如下:
我们再回头来看上文提到的ThreadPoolExecutor构造方法中的七个参数,这些参数会直接影响线程的执行情况,各个参数的变化情况,可以用如下几点来概括:
ThreadPoolExecutor执行任务的部分核心源码如下!
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程数量 < corePoolSize,直接创建线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 工作线程数量 >= corePoolSize,将任务添加至阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 往阻塞队列中添加任务的时候,如果线程池非运行状态,将任务remove,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 阻塞队列已满,尝试添加新的线程去执行,如果工作线程数量 >= maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池状态处于非 RUNNING 状态,添加worker失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 判断线程池中线程数量大于等于该线程池允许的最大线程数量,如果大于则worker失败,反之cas更新线程池中的线程数
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程池处于 RUNNING 状态并且线程已经启动,则抛出线程异常启动
if (t.isAlive())
throw new IllegalThreadStateException();
// 将线程加入已创建的工作线程集合,更新用于追踪线程池中线程数量 largestPoolSize 字段
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
final void runWorker(Worker w) {
// 获取执行任务线程
Thread wt = Thread.currentThread();
// 获取执行任务
Runnable task = w.firstTask;
// 将worker中的任务置空
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 从当前工作线程种获取任务,或者循环从阻塞任务队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 双重检查线程池是否正在停止,如果线程池停止,并且当前线程能够中断,则中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置执行任务钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行当前任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置执行任务钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 回收线程
processWorkerExit(w, completedAbruptly);
}
}
final void reject(Runnable command) {
// 执行拒绝策略
handler.rejectedExecution(command, this);
}
当线程池中的线程数大于等于 maximumPoolSize,并且 workQueue 已满,新任务会被拒绝,使用RejectedExecutionHandler接口的rejectedExecution()方法来处理被拒绝的任务。
线程池提供了四种拒绝策略实现类来拒绝任务,具体如下:
类 |
描述 |
AbortPolicy |
直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略 |
DiscardPolicy |
什么也不做,直接丢弃任务 |
DiscardOldestPolicy |
将阻塞队列中的任务移除出来,然后执行当前任务 |
CallerRunsPolicy |
尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了 |
我们知道 Java 种的线程一共 6 种状态,其实线程池也有状态。
因为线程池也是异步执行的,有的任务正在执行,有的任务存储在任务队列中,有的线程处于工作状态,有的线程处于空闲状态等待回收,为了更加精细化的管理线程池,线程池也设计了 5 中状态,部分核心源码如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
// 线程池线程数的bit数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
}
其中的状态流程,可以用如下图来描述!
这几个状态的转化关系,可以用如下几个步骤来概括:
正如文章的开头所介绍的,使用线程池的方式,通常可以用如下几个步骤来概括:
// 1.创建固定大小为4的线程数、空闲线程的存活时间为15秒、阻塞任务队列的上限为1000的线程池完整示例
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 15, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
// 2.提交任务
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
...
// 3.使用完毕之后,可以手动关闭线程池
executor.shutdown();
正如上文所说,其中execute()和submit()方法都可以用来提交任务,稍有不同的是:submit()方法同时还支持获取任务执行完毕的返回结果。
针对线程池的使用,Java 还提供了Executors工具类,开发者可以通过此工具,快速创建不同类型的线程池。
下面我们一起来看下Executors为用户提供的几种创建线程池的方法。
newSingleThreadExecutor()方法表示创建一个单线程的线程池,核心源码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
从构造参数上可以很清晰的看到,线程池中的线程数为 1,不会被线程池自动回收,workQueue 选择的是无界的LinkedBlockingQueue阻塞队列,不管来多少任务存入阻塞队列中,前面一个任务执行完毕,再执行队列中的剩余任务。
简单应用示例如下:
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 创建一个单线程线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
System.out.println("thread name:" + Thread.currentThread().getName());
}
});
}
while (true){
if(list.size() >= 10){
break;
}
}
System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
// 关闭线程池
executor.shutdown();
}
运行结果如下:
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
thread name:pool-1-thread-1
执行耗时:13ms
newFixedThreadPool()方法表示创建一个固定大小线程数的线程池,核心源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定大小的线程池和单线程的线程池有异曲同工之处,无非是让线程池中能运行的线程数量支持手动指定。
简单应用示例如下:
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 创建固定大小线程数为3的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
System.out.println("thread name:" + Thread.currentThread().getName());
}
});
}
while (true){
if(list.size() >= 10){
break;
}
}
System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
// 关闭线程池
executor.shutdown();
}
运行结果如下:
thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-3
thread name:pool-1-thread-1
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-2
thread name:pool-1-thread-1
执行耗时:10ms
newCachedThreadPool()方法表示创建一个可缓存的无界线程池,核心源码如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从构造参数上可以看出,线程池中的最大线程数为Integer.MAX_VALUE,也就是Integer的最大值,workQueue 选择的是SynchronousQueue阻塞队列,这个阻塞队列不像LinkedBlockingQueue,它没有容量,只负责做临时任务缓存,如果有任务进来立刻会被执行。
也就是说,只要添加进去了任务,线程就会立刻去执行,当任务超过线程池的线程数则创建新的线程去执行,线程数量的最大上线为Integer.MAX_VALUE,当线程池中的线程空闲时间超过 60s,则会自动回收该线程。
简单应用示例如下:
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final Random random = new Random();
List<Integer> list = new CopyOnWriteArrayList<>();
// 创建可缓存的无界线程池
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
list.add(random.nextInt(100));
System.out.println("thread name:" + Thread.currentThread().getName());
}
});
}
while (true){
if(list.size() >= 10){
break;
}
}
System.out.println("执行耗时:" + (System.currentTimeMillis() - startTime) + "ms");
// 关闭线程池
executor.shutdown();
}
运行结果如下:
thread name:pool-1-thread-1
thread name:pool-1-thread-2
thread name:pool-1-thread-3
thread name:pool-1-thread-4
thread name:pool-1-thread-3
thread name:pool-1-thread-2
thread name:pool-1-thread-1
thread name:pool-1-thread-4
thread name:pool-1-thread-4
thread name:pool-1-thread-4
执行耗时:13ms
newScheduledThreadPool()方法表示创建周期性的线程池,可以指定线程池中的核心线程数,支持定时及周期性任务的执行,核心源码如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
从构造参数上可以看出,线程池支持指定核心线程数,最大线程数为Integer.MAX_VALUE,workQueue 选择的是DelayedWorkQueue延迟阻塞队列,这个阻塞队列支持任务延迟消费,新加入的任务不会立刻被执行,只有时间到期之后才会被取出;当非核心线程处于空闲状态时,会立刻进行收回。
ScheduledExecutorService支持三种类型的定时调度方法,分别如下:
下面我们一起来看看它们的应用方式。
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) + " 准备启动");
// 定时执行一次的任务,延迟1s后执行
executor.schedule(new Runnable() {
@Override
public void run() {
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + ", schedule");
}
}, 1, TimeUnit.SECONDS);
输出结果:
2023-11-17 01:41:12 准备启动
2023-11-17 01:41:13 thread name:pool-1-thread-1, schedule
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) + " 准备启动");
// 周期性地执行任务,第一个任务延迟1s后执行,之后每隔2s周期性执行任务,需要等待上一次的任务执行完毕才执行下一个
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " begin");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " end");
}
}, 1, 2, TimeUnit.SECONDS);
输出结果:
2023-11-17 02:00:44 准备启动
2023-11-17 02:00:45 thread name:pool-1-thread-1 begin
2023-11-17 02:00:48 thread name:pool-1-thread-1 end
2023-11-17 02:00:48 thread name:pool-1-thread-1 begin
2023-11-17 02:00:51 thread name:pool-1-thread-1 end
2023-11-17 02:00:51 thread name:pool-1-thread-1 begin
2023-11-17 02:00:54 thread name:pool-1-thread-1 end
SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 创建线程数量为2的定时调度线程池
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println(sdf.format(new Date()) + " 准备启动");
// 周期性地执行任务,第一个任务延迟1s后执行,之后上一个任务执行完毕之后,延迟2秒再执行下一个任务
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " begin");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sdf.format(new Date()) + " thread name:" + Thread.currentThread().getName() + " end");
}
}, 1, 2, TimeUnit.SECONDS);
输出结果:
2023-11-17 01:53:26 准备启动
2023-11-17 01:53:27 thread name:pool-1-thread-1 begin
2023-11-17 01:53:30 thread name:pool-1-thread-1 end
2023-11-17 01:53:32 thread name:pool-1-thread-1 begin
2023-11-17 01:53:35 thread name:pool-1-thread-1 end
2023-11-17 01:53:37 thread name:pool-1-thread-1 begin
2023-11-17 01:53:40 thread name:pool-1-thread-1 end
从以上的介绍中,我们可以对这四种线程池的参数做一个汇总,内容如下表:
工厂方法 |
corePoolSize |
maximumPoolSize |
keepAliveTime |
workQueue |
newSingleThreadExecutor |
1 |
1 |
0 |
LinkedBlockingQueue |
newFixedThreadPool |
nThreads |
nThreads |
0 |
LinkedBlockingQueue |
newCachedThreadPool |
0 |
Integer.MAX_VALUE |
60s |
SynchronousQueue |
newScheduledThreadPool |
corePoolSize |
Integer.MAX_VALUE |
0 |
DelayedWorkQueue |
这四个线程池,主要的区别在于:corePoolSize、maximumPoolSize、keepAliveTime、workQueue 这四个参数,其中线程工厂为默认类DefaultThreadFactory,线程饱和的拒绝策略为默认类AbortPolicy。
结合以上的分析,最后我们再来总结一下。
对于线程池的使用,不太建议采用Executors工具去创建,尽量通过ThreadPoolExecutor的构造方法来创建,原因在于:有利于规避资源耗尽的风险;同时建议开发者手动设定任务队列的上限,防止服务出现 OOM。
虽然Executors工具提供了四种创建线程池的方法,能帮助开发者省去繁琐的参数配置,但是newSingleThreadExecutor和newFixedThreadPool方法创建的线程池,任务队列上限为Integer.MAX_VALUE,这意味着可以无限提交任务,这在高并发的环境下,系统可能会出现 OOM,导致整个线程池不可用;其次newCachedThreadPool方法也存在同样的问题,无限的创建线程可能会给系统带来更多的资源消耗。
其次,创建线程池的时候应该尽量给线程定义一个具体的业务名字前缀,方便定位问题,不同类型的业务尽量使用不同的线程池来实现。
例如可以使用guava包,创建自定义的线程工厂。
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
当然,你也可以自行实现一个线程工厂,需要继承ThreadFactory接口,案例如下:
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程工厂,它设置线程名称,有利于我们定位问题。
*/
public final class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
/**
* 创建一个带名字的线程池生产工厂
*/
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + "-" + threadNum.incrementAndGet());
return t;
}
}
创建一个线程名称以order开头的线程工厂。
NamingThreadFactory threadFactory = new NamingThreadFactory(Executors.defaultThreadFactory(), "order");
最后,再来说说关于线程池中线程数,如何合理设定的问题?
那如何判断当前是 CPU 密集型任务还是 I/O 密集型任务呢?
最简单的方法就是:如果当前任务涉及到网络读取,文件读取等,这类都是 IO 密集型任务,除此之外,可以看成是 CPU 密集型任务。
本文篇幅比较长,难免有描述不对的地方,欢迎大家留言指出!