IE盒子

搜索
查看: 89|回复: 1

十分钟带你实现C++线程池

[复制链接]

2

主题

8

帖子

15

积分

新手上路

Rank: 1

积分
15
发表于 2023-1-7 19:24:20 | 显示全部楼层 |阅读模式
前言

线程池是服务端开发中的一个常用组件,Java中自己带有线程池的API,但是C++标准库是没有的,需要开发人员自己实现。本文首先会分析一个线程池对象的基本组件,然后一步一步实现一个通用任务线程池。最后本文会演示一下日常开发中线程池使用不当的一个经典问题,即线程池hang住导致服务不可用。完整代码在文章末尾,可以点赞+收藏哦~
线程池类基本组件


  • 一组线程
  • 一个并发安全的任务队列
  • 一个 任务入队接口
线程池实现的基本原理

线程池在启动后,会创建若干数量的线程,每个线程内部都是一个死循环,开始时处于wait()状态。

  • 线程等待外部接口往任务队列添加任务,然后将自己唤醒,执行任务后再次处于wait()状态。
  • 外部接口往任务队列添加任务,然后等待任务返回结果。
线程池实现

现在我们采用C++11来一步一步实现一个线程池。
首先定义一个类,以及其中必要的成员变量和接口。
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>

class ThreadPool{
public:
    explicit ThreadPool(int num) : thread_nums_(num){}
    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ~ThreadPool(){}
    void enqueue(std::function<void>() cb);

private:
    int thread_nums_;

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> job_queue_;

    std::mutex mtx_;
    std::condition_variable cv_;
};
上述代码基本涵盖了一个线程池的所有组件,包含一组固定数量线程,一个任务队列,以及同步工具互锁和条件变量。enqueue接口支持将任务函数添加到队列中,但这样的接口设计,我们能够处理的任务类型不是通用的,我们不能获取异步执行返回的结果,同时不能够传递参数,因此我们需要利用模板来让我们的任务类型更加通用。
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>

class ThreadPool{
public:
    explicit ThreadPool(int num) : thread_nums_(num){}
    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ~ThreadPool(){}
    template<class F, class... Args>    // ++
    void enqueue(F&& f, Args&& ...args);// ++

private:
    int thread_nums_;

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> job_queue_;

    std::mutex mtx_;
    std::condition_variable cv_;
};
这样的接口,我们的任务函数可以支持任意参数了,但是仍然不支持返回值,想要得到返回值,这样我们又需要对enqueue接口做一些改变。我们会用到std::future 作为我们的返回值,返回值的类型可以在编译器确定,通过type_traits技术, 即std::result_of 来达到我们的目的。我们对 enqueue接口做一下改变:
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <future>

class ThreadPool{
public:
    explicit ThreadPool(int num) : thread_nums_(num){}
    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ~ThreadPool(){}
    template<class F, class... Args>
    auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>; // ++--

private:
    int thread_nums_;

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> job_queue_;

    std::mutex mtx_;
    std::condition_variable cv_;
};
接下来就可以实现各个接口啦。
我们首先看一下enqueue接口:
    auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>{
        using return_type = typename std::result_of<F(Args...)>::type; // alias

        // package
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mtx_);
            job_queue_.emplace([task](){ (*task)();});
        }
        cv_.notify_one();

        return res;
    }
这里的核心逻辑在于并发安全的将任务添加到任务队列中,同时通过条件变量唤醒一个工作线程。但是我们这里为了获取返回值,我们将通过完美转发将任务先封装成 std::packaged_task的共享指针,然后为这个task封装一个lambda放入队列中,这里用共享指针是为了延长task的生命周期,是C++异步编程中的常见手法。这里还有一点值得注意的是,我们把临界区设置的尽可能小,从而减少数据竞争,提升性能。
接下来我们看一下线程池的启动函数的具体实现,我们把启动函数直接放在构造函数里,代码如下:
    explicit ThreadPool(int num) : thread_nums_(num){
        for(int i = 0; i < thread_nums_; i++){
            workers_.emplace_back([this](){
                while(true){
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mtx_);
                        cv_.wait(lock, [this](){ return !job_queue_.empty();});
                        task = std::move(job_queue_.front());
                        job_queue_.pop();

                    }
                    task();
                }
            });
        }
    }
这里线程的执行体就是一个死循环,然后当有任务加到队列中后,线程会被条件变量唤醒,执行任务。
写完这两个接口,我们的线程池就可以正常工作了,但是有一个问题在于,假如我们的线程池被析构了,那么它所管理的线程该如何被回收回来呢,因为线程内部是死循环,我们在析构时是不能直接join()的,我们需要加一个stop标志,当stop设为true时,线程池中的线程要主动退出死循环,方面线程池回收线程,正确析构。因为我们要完成析构函数。代码如下:
    ~ThreadPool(){
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop = true;
        }
        cv_.notify_all();
        for(int i = 0; i < workers_.size(); i++){
            workers_.join();
        }
    }
同时在线程的条件变量中加一个判断条件,以及线程退出语句。如下所示:
    explicit ThreadPool(int num) : thread_nums_(num), stop(false){
        for(int i = 0; i < thread_nums_; i++){
            workers_.emplace_back([this](){
                while(true){
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mtx_);
                        cv_.wait(lock, [this](){ return this->stop || !job_queue_.empty();});  // ++--
                        if(this->stop && job_queue_.empty()){  return;} // ++
                        task = std::move(job_queue_.front());
                        job_queue_.pop();

                    }
                    task();
                }
            });
        }
    }
这样我们的线程池就实现完成了。
完整代码

下面贴一下完整代码:
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <future>
#include <memory>

class ThreadPool{
public:
    explicit ThreadPool(int num) : thread_nums_(num), stop(false){
        for(int i = 0; i < thread_nums_; i++){
            workers_.emplace_back([this](){
                while(true){
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mtx_);
                        cv_.wait(lock, [this](){ return this->stop || !job_queue_.empty();});
                        if(this->stop && job_queue_.empty()){
                            return;
                        }
                        task = std::move(job_queue_.front());
                        job_queue_.pop();

                    }
                    task();
                }
            });
        }
    }
    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ~ThreadPool(){
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop = true;
        }
        cv_.notify_all();
        for(int i = 0; i < workers_.size(); i++){
            workers_.join();
        }
    }
    template<class F, class... Args>
    auto enqueue(F&& f, Args&& ...args) -> std::future<typename std::result_of<F(Args...)>::type>{
        using return_type = typename std::result_of<F(Args...)>::type; // alias

        // package
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> res = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mtx_);
            job_queue_.emplace([task](){ (*task)();});
        }
        cv_.notify_one();

        return res;
    }

private:
    int thread_nums_;

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> job_queue_;

    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop;
};
线程池打满导致服务hang住问题分析

我们先来看一段代码:
ThreadPool g_pool(4);
int worker2(){
    std::cout << "worker2" << std::endl;
    return 0;
}
int worker(int x, float y){
    std::cout << "worker1" << std::endl;
    sleep(1);
    auto res = g_pool.enqueue(worker2);
    std::cout << res.get() << std::endl;
    return x+y;
}

int main(){
   
    auto res = g_pool.enqueue(worker, 123, 34.5);
    auto res2 = g_pool.enqueue(worker, 123, 34.5);
    std::cout << res.get() << std::endl;
    std::cout << res2.get() << std::endl;
   
    return 0;

}
这是利用刚才实现的线程池 写的一个简单测试程序,线程池有4个线程,然后向其加入两个worker任务,然后worker任务内部的子任务也会被塞进线程池中完成。
编译运行,得到结果如下:
worker1
worker1
worker2
worker2
0
157
0
157
可以正常运行。
然后我们将线程池的任务数量调为2,
发现,得到的结果如下:
worker1
worker1
程序不再继续往下执行了。这是因为worker2在等待线程池中有空闲线程,而线程池中的所有线程又都在等待其子任务的结果,因此陷入了互相等待的场景,从而hang住。
在c++服务端开发中,这是一个经典问题。主要出现在线程池饱和度过高,同时业务处理线程中的所有异步操作都放入同一个线程池中导致的。这种问题的解法主要还是对线程池进行分类,在不同阶段使用不同的线程池,避免出现相互等待的情况。
小结

线程池是服务端开发的常用组件,本文由浅入深,一步一步实现一个可以执行通用任务的线程池类,能够让大家更好的理解线程池实现的要点,同时为大家演示了线程池使用不规范导致服务hang住的一个经典案例,能够帮助大家更好更加规范的使用线程池完成日常开发。由于笔者水平有限,如果文章中有错误或不当之处,还请指出,我会及时纠正,以免误导他人。
参考文献


  • GitHub - progschj/ThreadPool: A simple C++11 Thread Pool implementation
回复

使用道具 举报

1

主题

6

帖子

11

积分

新手上路

Rank: 1

积分
11
发表于 2025-5-18 19:17:22 | 显示全部楼层
路过 帮顶 嘿嘿
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表