在上一篇文章C++使用socket实现与微信小程序通信(下)中,小懵白就给大家简要地讲解了线程池的原理。
今天呢,小懵白就给大家继续讲解C++如何实现封装线程池类。
第一步
首先,我们需要定义生产者、消费者的存储容器类型。
在这里呢,消费者的容器是通过使用SLT中的vector容器来实现的:std::vector<std::thread> pool;
而任务生产者容器则是由SLT的queue容器来实现:std::queue<Task>task ;
定义完生产者、消费者的容器类型后,接着就是要构建消费者容器:往pool里添加一定数量N的阻塞线程。
简要代码如下:
bool Init_ThreadPool(){
for(int i=0;i<max_thread;i++) //max_thread为初始化线程最大化量
pool.push_back(std::thread(&ThreadPool::runtask,this));
return true;
}
在初始化中,每一个线程都绑定了runtask函数,也就是线程要执行的函数,该函数是用来实现获取task的。
简要代码如下:
void runtask(){
while(!stop){
std::unique_lock<std::mutex>lk(_mutex);
//当任务队列为空或者stop为假时,阻塞当前线程,直到条件变量唤醒
condition.wait(lk,[this]{return (!task.empty()||stop);});
/**********************************************
std::move是将对象的状态或者所有权从一个对象转移到另一个对象,
只是转移,没有内存的搬迁或者内存拷贝。
***********************************************/
Task ta=move(task.front());// 取一个 task
task.pop(); //从队列移除正在执行的任务
lk.unlock();
ta(); //执行task任务
condition.notify_one(); //通知等待一个线程
}
//此处补充销毁线程代码
}
在该代码中,bool型的stop变量是用来判定线程是否终止的,每当一次while循环后进行判断stop,若为假则继续阻塞线程然后获取任务,否则的话结束线程。
消费者实现后,接下来就是实现生产者了。在此之前,还需要做一件事情:同一规划任务变量。
using Task = std::function<void()>;
这是由于每一个task函数的返回类型都不可能是统一的,有的是void类型的,有的是bool类型的,更还有的是指针类型的。
为了能够实现代码的高效性,这里采用了std::function<void()>思想,定义了Task类型。
接着就是实现生产者了:
bool Add_task(const Task&t){
if(task.size()==max_queue)Warn_LOG("添加任务,任务队列已满,准备阻塞");
if(thread_run==max_thread)Warn_LOG("添加任务,但线程已经分配完");
std::unique_lock<std::mutex>lk(_mutex);
while(task.size()==max_queue||stop){ //要是任务数量到了最大,就等待处理完再添加
condition.wait(lk);
}
if(stop)return false;
task.push(t);//给队列中添加任务
task_numble=task.size();
std::cout<<"添加成功。"<<std::endl<<std::endl;
condition.notify_one(); //通知等待一个线程
return true;
}
为此,到这里的时候,生产者和消费者的线程池初步构建完成了
第二步:解说
在第一步中,我们通过使用STL中的vector容器存储了已初始化的线程后,那么接下来是如何实现生产者-消费者模式呢。
在生产者pool容器中,每一个线程都是绑定了runtask函数,也就是说每一个线程都是在执行runtask函数。
而在runtask函数代码块里,首先是判断!stop条件是否满足,若不满足,则结束线程。
否则的话,添加互斥锁,若任务队列为空或者stop为假时,使用条件锁std::condition_variable阻塞当前线程。
直到条件变量唤醒后,才从任务队列中获取一个task,解锁互斥锁并执行task,直至完成并通知等待一个线程,然后重新循环判断。
第三步:封装
实现完上面的功能后,接下来就是对功能进行封装了。代码如下:
class ThreadPool{
using Task = std::function<void()>;
public:
ThreadPool();
~ThreadPool();
bool Init_ThreadPool(); //初始化线程
bool Add_task(const Task&t); //添加任务
void End_threadpool(); //销毁线程
int thread_num; //当前的线程数量
int task_numble; //任务队列
protected:
void runtask();
void Expand_thread();
bool Destroy_thread();
int id;
private:
int max_thread;//初始化线程数量
int max_queue;//初始化 任务队列数量
std::vector<std::thread> pool;// 线程池、任务队列
std::queue<Task>task ;
std::mutex _mutex ;//互斥锁条件变量
std::condition_variable condition;
bool stop; //停止标志位
int key; //目前正在执行任务的线程数量
};
其中,~ThreadPool()的代码实现如下:
ThreadPool::~ThreadPool(){
End_threadpool();
}
void ThreadPool::End_threadpool(){
stop=true; //停止读取任务
while(thread_run){}//等待线程还没有执行完的任务
for(int i =0 ;i<pool.size();i++) //销毁线程
pool[i].join() ;
std::queue<Task> empty;
swap(empty,task);//清空任务队列
pool.clear(); //清空线程池
}
至此,线程池类封装就实现了,其余功能函数在这就不多讲了,有兴趣的同学可自行完成。
好了,今天关于C++实现线程池的话题,就讲到这里了,关于源代码的问题,我整理好后会放在公众号(小懵白生活小趣谈)后台里面。