|
前言
线程池是服务端开发中的一个常用组件,Java中自己带有线程池的API,但是C++标准库是没有的,需要开发人员自己实现。本文首先会分析一个线程池对象的基本组件,然后一步一步实现一个通用任务线程池。最后本文会演示一下日常开发中线程池使用不当的一个经典问题,即线程池hang住导致服务不可用。完整代码在文章末尾,可以点赞+收藏哦~
线程池类基本组件
- 一组线程
- 一个并发安全的任务队列
- 一个 任务入队接口
线程池实现的基本原理
线程池在启动后,会创建若干数量的线程,每个线程内部都是一个死循环,开始时处于wait()状态。
- 线程等待外部接口往任务队列添加任务,然后将自己唤醒,执行任务后再次处于wait()状态。
- 外部接口往任务队列添加任务,然后等待任务返回结果。
线程池实现
现在我们采用C++11来一步一步实现一个线程池。
首先定义一个类,以及其中必要的成员变量和接口。
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
class ThreadPool{
public:
explicit ThreadPool(int num) : thread_nums_(num){}
ThreadPool(const ThreadPool& rhs) = delete;
ThreadPool& operator=(const ThreadPool& rhs) = delete;
~ThreadPool(){}
void enqueue(std::function<void>() cb);
private:
int thread_nums_;
std::vector<std::thread> workers_;
std::queue<std::function<void()>> job_queue_;
std::mutex mtx_;
std::condition_variable cv_;
};
上述代码基本涵盖了一个线程池的所有组件,包含一组固定数量线程,一个任务队列,以及同步工具互锁和条件变量。enqueue接口支持将任务函数添加到队列中,但这样的接口设计,我们能够处理的任务类型不是通用的,我们不能获取异步执行返回的结果,同时不能够传递参数,因此我们需要利用模板来让我们的任务类型更加通用。
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
class ThreadPool{
public:
explicit ThreadPool(int num) : thread_nums_(num){}
ThreadPool(const ThreadPool& rhs) = delete;
ThreadPool& operator=(const ThreadPool& rhs) = delete;
~ThreadPool(){}
template<class F, class... Args> // ++
void enqueue(F&& f, Args&& ...args);// ++
private:
int thread_nums_;
std::vector<std::thread> workers_;
std::queue<std::function<void()>> job_queue_;
std::mutex mtx_;
std::condition_variable cv_;
};
这样的接口,我们的任务函数可以支持任意参数了,但是仍然不支持返回值,想要得到返回值,这样我们又需要对enqueue接口做一些改变。我们会用到std::future 作为我们的返回值,返回值的类型可以在编译器确定,通过type_traits技术, 即std::result_of 来达到我们的目的。我们对 enqueue接口做一下改变:
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <future>
class ThreadPool{
public:
explicit ThreadPool(int num) : thread_nums_(num){}
ThreadPool(const ThreadPool& rhs) = delete;
ThreadPool& operator=(const ThreadPool& rhs) = delete;
~ThreadPool(){}
template<class F, class... Args>
auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>; // ++--
private:
int thread_nums_;
std::vector<std::thread> workers_;
std::queue<std::function<void()>> job_queue_;
std::mutex mtx_;
std::condition_variable cv_;
};
接下来就可以实现各个接口啦。
我们首先看一下enqueue接口:
auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>{
using return_type = typename std::result_of<F(Args...)>::type; // alias
// package
auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::lock_guard<std::mutex> lock(mtx_);
job_queue_.emplace([task](){ (*task)();});
}
cv_.notify_one();
return res;
}
这里的核心逻辑在于并发安全的将任务添加到任务队列中,同时通过条件变量唤醒一个工作线程。但是我们这里为了获取返回值,我们将通过完美转发将任务先封装成 std::packaged_task的共享指针,然后为这个task封装一个lambda放入队列中,这里用共享指针是为了延长task的生命周期,是C++异步编程中的常见手法。这里还有一点值得注意的是,我们把临界区设置的尽可能小,从而减少数据竞争,提升性能。
接下来我们看一下线程池的启动函数的具体实现,我们把启动函数直接放在构造函数里,代码如下:
explicit ThreadPool(int num) : thread_nums_(num){
for(int i = 0; i < thread_nums_; i++){
workers_.emplace_back([this](){
while(true){
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this](){ return !job_queue_.empty();});
task = std::move(job_queue_.front());
job_queue_.pop();
}
task();
}
});
}
}
这里线程的执行体就是一个死循环,然后当有任务加到队列中后,线程会被条件变量唤醒,执行任务。
写完这两个接口,我们的线程池就可以正常工作了,但是有一个问题在于,假如我们的线程池被析构了,那么它所管理的线程该如何被回收回来呢,因为线程内部是死循环,我们在析构时是不能直接join()的,我们需要加一个stop标志,当stop设为true时,线程池中的线程要主动退出死循环,方面线程池回收线程,正确析构。因为我们要完成析构函数。代码如下:
~ThreadPool(){
{
std::lock_guard<std::mutex> lock(mtx_);
stop = true;
}
cv_.notify_all();
for(int i = 0; i < workers_.size(); i++){
workers_.join();
}
}
同时在线程的条件变量中加一个判断条件,以及线程退出语句。如下所示:
explicit ThreadPool(int num) : thread_nums_(num), stop(false){
for(int i = 0; i < thread_nums_; i++){
workers_.emplace_back([this](){
while(true){
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this](){ return this->stop || !job_queue_.empty();}); // ++--
if(this->stop && job_queue_.empty()){ return;} // ++
task = std::move(job_queue_.front());
job_queue_.pop();
}
task();
}
});
}
}
这样我们的线程池就实现完成了。
完整代码
下面贴一下完整代码:
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <future>
#include <memory>
class ThreadPool{
public:
explicit ThreadPool(int num) : thread_nums_(num), stop(false){
for(int i = 0; i < thread_nums_; i++){
workers_.emplace_back([this](){
while(true){
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this](){ return this->stop || !job_queue_.empty();});
if(this->stop && job_queue_.empty()){
return;
}
task = std::move(job_queue_.front());
job_queue_.pop();
}
task();
}
});
}
}
ThreadPool(const ThreadPool& rhs) = delete;
ThreadPool& operator=(const ThreadPool& rhs) = delete;
~ThreadPool(){
{
std::lock_guard<std::mutex> lock(mtx_);
stop = true;
}
cv_.notify_all();
for(int i = 0; i < workers_.size(); i++){
workers_.join();
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>{
using return_type = typename std::result_of<F(Args...)>::type; // alias
// package
auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::lock_guard<std::mutex> lock(mtx_);
job_queue_.emplace([task](){ (*task)();});
}
cv_.notify_one();
return res;
}
private:
int thread_nums_;
std::vector<std::thread> workers_;
std::queue<std::function<void()>> job_queue_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop;
};
线程池打满导致服务hang住问题分析
我们先来看一段代码:
ThreadPool g_pool(4);
int worker2(){
std::cout << &#34;worker2&#34; << std::endl;
return 0;
}
int worker(int x, float y){
std::cout << &#34;worker1&#34; << std::endl;
sleep(1);
auto res = g_pool.enqueue(worker2);
std::cout << res.get() << std::endl;
return x+y;
}
int main(){
auto res = g_pool.enqueue(worker, 123, 34.5);
auto res2 = g_pool.enqueue(worker, 123, 34.5);
std::cout << res.get() << std::endl;
std::cout << res2.get() << std::endl;
return 0;
}
这是利用刚才实现的线程池 写的一个简单测试程序,线程池有4个线程,然后向其加入两个worker任务,然后worker任务内部的子任务也会被塞进线程池中完成。
编译运行,得到结果如下:
worker1
worker1
worker2
worker2
0
157
0
157 可以正常运行。
然后我们将线程池的任务数量调为2,
发现,得到的结果如下:
worker1
worker1 程序不再继续往下执行了。这是因为worker2在等待线程池中有空闲线程,而线程池中的所有线程又都在等待其子任务的结果,因此陷入了互相等待的场景,从而hang住。
在c++服务端开发中,这是一个经典问题。主要出现在线程池饱和度过高,同时业务处理线程中的所有异步操作都放入同一个线程池中导致的。这种问题的解法主要还是对线程池进行分类,在不同阶段使用不同的线程池,避免出现相互等待的情况。
小结
线程池是服务端开发的常用组件,本文由浅入深,一步一步实现一个可以执行通用任务的线程池类,能够让大家更好的理解线程池实现的要点,同时为大家演示了线程池使用不规范导致服务hang住的一个经典案例,能够帮助大家更好更加规范的使用线程池完成日常开发。由于笔者水平有限,如果文章中有错误或不当之处,还请指出,我会及时纠正,以免误导他人。
参考文献
- GitHub - progschj/ThreadPool: A simple C++11 Thread Pool implementation
|
|