假设我们正在编写一个简单的应用程序,该应用程序从客户端接收一些输入,对其进行一些CPU密集型处理,然后记录输出。我们编写的代码看起来像以下内容:
class ProcessingLibrary {
public void process (Input userInput) {
// 一些CPU密集型逻辑,用于处理用户输入
userInput.process();
// 记录结果
Logger.log(userInput.getResults());
}
}
看起来很简单,一个普通的函数,它接受用户输入,对其进行一些处理,然后返回输出,我们可以将这个库提供给客户端。但是,你的客户如果正在高并发地调用该进程函数,很快他们可能会抱怨他们的请求处理输入的时间太长。原因非常简单,当外部客户端调用你的函数时,调用线程被阻塞,因为实际处理是由调用线程进行的。
添加另一个要求:
为了解决这个问题,我们可以考虑利用客户端的CPU核心并将多线程纳入我们的代码。我们进行一些修改,具体如下:
class ProcessingLibrary {
public void process (Input userInput) {
Runnable runnable = () -> {
userInput.process();
Logger.log(userInput.getResults());
};
Thread thread = new Thread(runnable);
thread.start();
}
}
对于每个请求,最好不要阻塞调用线程,而是生成一个新线程来进行重度处理。调用线程可以从我们的处理方法中提前返回。确实,这比我们早期的版本有很大的改进。我们的客户开始使用新版本,他们比以前更满意了。但现在他们开始抱怨CPU使用率变得过高,并且在他们那边发生了崩溃。发生了什么?如果我们仔细检查代码,我们会发现我们没有限制正在生成的线程数!如果客户以非常高的速率调用进程函数,可能会生成数百甚至数千个线程,这对CPU来说是很大的开销。我们还必须限制生成的线程数。
注意!我们这里又添加了一个要求:
调用线程不应该被阻塞。
我们的逻辑不应生成无限多数量的线程。
我们想要的是,调用线程不应该被阻塞,我们应该生成一定数量的线程,由客户根据他们的CPU资源和他们想要实现的并行度决定。实际数量取决于请求到达的速率和平均请求时间。线程池是解决这个问题的理想解决方案。线程池是一个简单的概念,可以并行执行应用程序代码并利用CPU核心。线程池包含一组固定数量的可重复使用的工作线程,它们执行分配给它们的任务,而不会阻塞调用线程。我们下面看看如何实现一个简单的线程池。
我们可以得出几个简单的观察:
我们的线程应该是可重复使用的,并且应该在请求到达时惰性创建。线程创建是一个昂贵的过程(至少在JAVA中是这样)。
如果请求到达的速率远高于线程池中的线程数,我们可以在其他请求执行完毕时将请求输入保持在等待状态,然后当一个线程完成处理一个请求时,它可以从请求行中获取另一个请求并开始处理它。通过这种方式,我们仍然可以实现相当高的并行性并获得更多的请求吞吐量。
队列可以成为存储我们传入请求的良好数据结构,而我们的线程可以在完成先前的项目后不断地从队列中获取项目。
考虑以上要求,我们为线程池编写一个简单的类。
class ThreadPool {
private BlockingQueue<Runnable> taskQueue;
private Integer poolSize;
private AtomicInteger currentPoolSize;
public ThreadPool(int poolSize) {
this.poolSize = poolSize;
this.taskQueue = new LinkedBlockingQueue<>();
this.currentPoolSize = new AtomicInteger(0);
}
public void submitTask(Runnable runnable) {
this.taskQueue.add(runnable);
if(this.currentPoolSize.get() < this.poolSize) {
// 如果有更多的池大小可用,创建一个新线程
// 这个线程也应该被重新用于未来任务
// 因此,它应该继续从队列中寻找更多的任务
this.currentPoolSize.incrementAndGet();
this.createSingleThreadForPool();
}
}
private void createSingleThreadForPool() {
Runnable poolRunner = () -> {
while(true) {
if(this.taskQueue.size() > 0) {
Runnable taskFromQueue = this.taskQueue.poll();
taskFromQueue.run();
}
}
};
new Thread(poolRunner).start();
}
}
从以上实现中可以得到以下几点:
BlockingQueue
是一个线程安全的队列实现。我们需要确保线程安全,因为多个线程正在访问共享状态。AtomicInteger
也是如此,用于线程安全更新我们当前的池大小。
池运行者中的while
循环是为了确保该线程保持活动状态,以便我们在想要接收更多任务时可以继续运行。
我们可以更改我们对ProcessingLibrary
的实现,如下:
class ProcessingLibrary {
private ThreadPool threadPool;
public ProcessingLibrary(int poolSize) {
this.threadPool = new ThreadPool(poolSize);
}
public void process (Input userInput) {
Runnable runnable = () -> {
userInput.process();
Logger.log(userInput.getResults());
};
this.threadPool.submitTask(runnable);
}
}
现在,我们已经满足了对这个问题的两个要求 :)
在Java中,concurrent库提供了与此类似的内容,称为ExecutorService
。虽然我们讨论的实现有一些注意事项,例如我们生成的线程一直在等待,但这是一个理解线程池内部工作原理的良好起点。