|
其余内容见:
本文是《c++并发编程实战》第二版网址的读书笔记。
c++多线程历史
首先,c++标准目前还是只支持了多线程,多进程需要用操作系统API。
C++98(1998)标准不承认线程的存在,所以需要调用操作系统的C语言 API来实现多线程,比如:POSIX标准中的C标准、Microsoft Windows API。这显然是不跨平台的,每个平台都要编写一份代码。
c++11正式支持了多线程编程,提供标准库,而且标准库借鉴了Boost,会跟Boost的多线程库很像。显然c++11的多线程标准库是跨平台的,程序猿不再需要担心底层。
C++11 多线程相关的头文件:
- <atomic>:该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。
- <thread>:该头文件主要声明了 std::thread 类,另外 std::this_thread 命名空间也在该头文件中。
- <mutex>:该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock,std::try_lock,std::lock_guard, std::unique_lock, 以及其他的类型和函数。
- <condition_variable>:该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。
- <future>:该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。
线程管理
启动线程
std::thread定义在<thread>头文件,是线程类。
当构造std::thread时并且指定了线程函数,就会启动一个新的线程,该线程将执行线程函数。
void do_some_work();
std::thread my_thread(do_some_work); // 启动了一个新线程
note:如果仅仅是调用无参数的构造函数,不会启动一个新的线程,直到指定了线程函数。
如下:
std::vector<std::thread> threads(10); // 此时没有启动线程
for(int i=0;i<10;++i){
threads=std::thread(do_some_work);
// 此时才启动线程
}
std::thread的构造函数的第一个参数是线程函数(callable对象:函数、仿函数、匿名函数、成员函数),其余参数是线程函数的参数。这里要十分注意是如何传递参数的,到后面再详细讲。
这里举一个成员函数的例子,调用成员函数的一个好处是,类可以有数据成员。
class X
{
public:
void do_lengthy_work(int x);
};
X my_x;
std::thread t(&X::do_lengthy_work,&my_x,1);
// 第一个参数是成员函数,第二个是对象指针,第三个是成员函数的第一个参数
note:可以向线程函数传递参数,线程函数也可以访问到全局变量。
join、detach
当构造完参数之后,我们需要确定线程的运行方式,是等待线程结束(阻塞式join), 还是让其自主运行(非阻塞式detach)。
detach其实就是一个守护线程。
守护进程的概念:发起就遗忘,即使当前进程运行到结束,但守护进程依旧继续执行,直到整个程序被关闭了。总而言之,守护进程不需要.join()等待了,拉起来就让它一直伴随左右。
为什么要引入守护进程呢?看linux就知道了:
由于在Linux中,每一个系统与用户进行交流的界面称为终端,每一个从此终端开始运行的进程都会依附于这个终端,这个终端称为这些进程的控制终端,当控制终端被关闭时,相应的进程都会自动关闭。
但是守护进程却能够突破这种限制,它从被执行开始运转,直到接收到某种信号或者整个系统关闭时才会退出。如果想让某个进程不因为用户、终端或者其他的变化而受到影响,那么就必须把这个进程变成一个守护进程。可见,守护进程是非常重要的。
此外,std::thread还有一个joinable()方法用于检测线程是否有效。
- joinable:代表该线程是可执行线程(即刚初始化完成,拥有线程所有权,且没结束运行)
- not-joinable:通常一下几种情况会导致线程成为not-joinable:
- 由thread的缺省构造函数构造而成(thread()没有参数);
- 该thread被move过(包括move构造和move赋值);
- 该线程调用过join或者detach(调用过后就置为non-joinable);
<hr/>线程不安全行为
下面介绍detach和join这两种情况下的不安全行为:
- detach。由于detach不阻塞,很有可能访问到已经析构的对象。如:
struct func
{
int& i;
func(int& i_) : i(i_) {}
void operator() ()
{
for (unsigned j=0 ; j<1000000 ; ++j)
{
do_something(i); // 潜在访问隐患: 悬空引用
}
}
};
void oops()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread my_thread(my_func);
my_thread.detach();
// detach不等待线程结束,所以很有可能变量i已经析构了
}
struct func; // 定义在清单2.1中
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
try{
do_something_in_current_thread();
}
catch(...){
t.join();
throw;
}
t.join();
}
当发生异常这类特殊情况时,join很容易遗漏,发生不安全的行为。
当然上面的代码也是不规范的,应该用RAII技术,这样就可以确保安全。
class thread_guard
{
private:
std::thread& t;
public:
explicit thread_guard(std::thread& t_):t(t_){}
~thread_guard(){
if(t.joinable()){ // 是否是可执行的线程
t.join();
}
}
};
向线程函数传递参数
向线程函数传递参数是第一个小难点。
首先,新拉起的线程肯定是开辟新的参数副本,这样就能避免临界区竞争、访问已经被析构的变量等问题。
一句话来说,std::thread拉起一个新线程,会进行两次copy:
- 第一次在调用std::thread构造函数时,会在新线程保存一份变量。可以是赋值构造、也可以是移动构造;
- 第二次在把保存的变量传递给线程函数时,这里统一是用std::move,即移动构造;
小结一下:第一次copy,会根据传递的参数类型选择是左值还是右值,第二次统一都是右值。
举个例子:
void process_big_object(Test t);
class Test
{
public:
Test(Test&&);
Test(const Test &);
};
Test t;
std::thread t(process_big_object,std::move(t));
// 第一步copy执行右值,第二步copy还是右值
std::thread t(process_big_object,t);
// 第一步copy执行左值,第二步copy执行右值
<hr/>因为新的线程是开辟了新的参数副本,但这样做也有缺点,不能用引用传递参数了,也就不能修改原值。
这时我们需要用c++11提供的std::ref这个模板函数,它会返回一个类,本质上是维护了一个指针,所有操作都会改变指针指向的变量,相当于起到引用的功能。
举个例子:
void update_data_for_widget(widget_id w,widget_data& data); // 引用传参
void oops_again(widget_id w)
{
widget_data data;
std::thread t(update_data_for_widget,w,data); // error 右值不可以匹配到左值引用
display_status();
t.join();
}
因为在第二次传参时,发生了std::move强转为右值,不能匹配widget_data&左值引用,自然报错。如果非要用引用传参,需要用std::ref。
把那一行代码改为std::thread t(update_data_for_widget,w,std::ref(data));就可以了。
我们会发现std::thread和std::bind的参数传递过程很像,先在本地copy一份副本,当真正执行时,再把参数传入到执行函数。本质上是因为thread和bind都不知道用户何时执行,为了避免发生悬挂指针等错误,保存一份副本是值得的行为。
<hr/>在向线程函数传递参数时,我们需要注意以下场景:
void f(int i,std::string const& s);
void oops(int some_param)
{
char buffer[1024];
sprintf(buffer, &#34;%i&#34;,some_param);
std::thread t(f,3,buffer);
t.detach();
}
buffer是数组指针,在构造时采用浅拷贝,只拷贝了buffer指针,所以新的线程还是访问了主线程的buffer[1024]。让事情雪上加霜的是,采用非阻塞式detach,这意味着很有可能新线程在访问数组时,数组已经被析构了!
解决方法是采用深拷贝,在传入前转换为std::string:
void f(int i,std::string const& s);
void oops(int some_param)
{
char buffer[1024];
sprintf(buffer, &#34;%i&#34;,some_param);
std::thread t(f,3,std::string(buffer)); // 避免悬挂指针
t.detach();
}
转移线程所有权
C++标准库中有很多资源占有(resource-owning)类型, 比如std::ifstream,std::unique_ptr还有 std::thread都是可移动,但不可拷贝。 这就说明执行线程的所有权可以在std::thread实例中移动。
支持移动的话,thread就可以作为参数在函数间传递。还有一个好处,可以提供一个包装类来转移线程的所有权。
还记得吗,在开始我们讲了一个RAII来包装线程,现在来完善一下:
class scoped_thread
{
private:
std::thread t; // 内部维护一个thread
public:
explicit scoped_thread(std::thread t_):
t(std::move(t_)) //移动构造转移所有权
{
if(!t.joinable()) // 如果不可执行,error
throw std::logic_error(“No thread”);
}
~scoped_thread()
{
t.join(); // RAII 确保安全
}
};
如果不支持移动构造,scoped_thread没法获得std::thread的所有权。
c++20提供的std::jthread跟scoped_thread很类似,主要是为了join能正常地执行。
决定线程数量
我们知道线程数量很重要,太多反而增大切换线程的开销,太少又不能完全释放cpu的性能。
std::thread::hardware_concurrency()在新版C++标准库中是一个很有用的函数。这个函数将返回能同时并发在一个程序中的线程数量。例如,多核系统中,返回值可以是CPU核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。
举个累加的例子:
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
// accumulate标准库函数
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) // 空 直接退出
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads= // 3
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size=length/num_threads; // 4
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1); // 5
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6
threads=std::thread( // 7
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results));
block_start=block_end; // 8
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join)); // 10
return std::accumulate(results.begin(),results.end(),init); // 11
}
识别线程
线程标识类型是std::thread::id,可以通过两种方式进行检索。
- 第一种,成员函数get_id()来直接获取;
- 第二种,当前线程中调用std::this_thread::get_id();
如果std::thread对象没有与任何执行线程相关联,get_id()将返回std::thread::type默认构造值,这个值表示“没有线程”。
std::thread::id实例常用作检测线程是否需要进行一些操作,比如:当用线程来分割一项工作,主线程可能要做一些与其他线程不同的工作。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work();
}
do_common_work();
}
线程间共享数据
首先要提一个概念:原子操作。无论是lock上锁,还是信号量加减,都必须是原子操作。即该操作要么不执行,一执行就不可中断,直到运行结束。
很好理解,比如lock上锁,如果该过程执行到一半,cpu切换了线程,进而会导致异常。
解决办法:
- 要么你对lock继续上锁,可这样无穷递归无法解决问题 ×
- 干脆操作系统规定有些操作不可以中断,就有了原子操作的概念 √
互斥量
c++11提供了:
- 互斥量类
- 管理mutex的RAII类
- lock、unlock、try_lock函数。
互斥量类
- std::mutex用于互斥锁的互斥量
- std::shared_mutex用于读写锁的互斥量
读写锁是一种特殊的互斥锁。互斥锁不管三七二十一对所有线程的访问都上锁,但往往在很多场景中,读的频率比写的频率高,盲目上互斥锁是不值当的。
正确的做法是,读写锁是一对互斥锁,分为读锁和写锁。读锁和写锁互斥,让一个线程在进行读操作时,不允许其他线程的写操作,但是不影响其他线程的读操作;当一个线程在进行写操作时,不允许任何线程进行读操作或者写操作。
简单来说就是,写锁会排斥读和写,但是读锁只排斥写,这样的好处就很明显,在读多写少的应用场景下,比其他互斥锁性能要好很多。
lock、unlock、try_lock函数
- lock、unlock、try_lock
- lock_shared、unlock_shared、try_lock_shared
顾名思义,末尾有_shared,表示对上读锁、解读锁操作;无_shared表示对上写锁、解写锁操作。
管理mutex的RAII类
- std::lock_guard(C++11)最简单,构造时lock,析构时unlock
- std::unique_lock(C++11)支持移动,加锁解锁灵活
- std::share_lock(C++14)针对读写锁,构造时lock_shared,析构时unlock_shared
- std::scoped_lock(C++17)支持批量加锁,批量解锁
<hr/>下面来看一下应用:
mutex和lock_guard
在实际代码中,非常不推荐直接使用std::mutex,一般都用RAII技巧。
C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard,其会在构造的时候提供已锁的互斥量,并在析构的时候进行解锁。
互斥量自然是作用在临界区,比如全局变量。
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list; // 全局对象 list
std::mutex some_mutex; // 互斥锁
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // 锁
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex); // 锁
return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}
std::lock_guard功能有限,实现起来非常简单,只能在离开作用域时,通过析构去解析,不能中途unlock,而且不支持移动语义。
c++11还提供了一个比std::lock_guard功能更齐全的lock管理类,std::unique_lock。
unique_lock
std::unique_lock具有以下特点:
- 支持移动语义
支持移动语义,但当然不能支持copy。这也是unique名字的由来,可以转移所有权,这样std::unique_lock可以作为函数参数被传递下去,有更长的生命周期。
- 加锁解锁更灵活
std::lock_guard在构造时加锁,在析构时解锁。而unique_lock可以延迟加锁,也可以提取解锁。拥有lock(),unlock(),try_lock()成员函数,因此unique_lock也可以作为全局函数std::lock、std::unlock的参数。其实,unique_lock会维护一个标志来判断是否拥有互斥量,可以通过成员函数owns_lock()查看。
我们主要来看一下unique_lock延迟加锁的例子,需要用到std::defer_lock。
std::unique_lock<std::mutex> lock_a(mutex1,std::defer_lock);
std::unique_lock<std::mutex> lock_b(mutex2,std::defer_lock);
// std::def_lock表示没上锁,后面可以上锁,延迟加锁
std::lock(lock_a,lock_b);
// 互斥量在这里上锁!
// 说明unique_lock可以作为std::lock的参数
std::unique_lock确实更灵活,但是代价是性能上的开销,因为要维护更新内部标识。能用lock_guard就不要用unique_lock
shared_mutex和shared_lock
shared_mutex是用于读写锁的互斥量,std::share_lock与std::lock_guard相对应,前者管理读锁,构造时lock_shared,析构时unlock_shared;后者管理写锁,构造时lock,析构时unlock。
来看一个例子
class Test
{
private:
Resource resource;
mutable std::shared_mutex source_mutex; // mutable一个经典用途,read读操作应该是const的
public:
Element read(int i) const
{
std::shared_lock<std::shared_mutex> lk(source_mutex); // shared_lock上读锁
Element e = resource
return e
}
void write(int i,element e)
{
std::lock_guard<std::shared_mutex> lk(source_mutex); // lock_guard上写锁
resource=e;
}
};
<hr/>加锁策略
std::defer_lock、std::try_to_lock、std::adopt_lock比较常见,统一讲一下。
它们都是空结构体标签类型,用来为std::lock_guard、std::unique_lock指定锁定策略。
- std::defer_lock,不获得互斥的所有权。表示该互斥元在构造时保持未被锁定,这个锁就可以在这之后通过lock来上锁,即延迟上锁。
- try_to_lock_t,尝试获得互斥的所有权而不阻塞。相当于调用try_lock。
- adopt_lock_t,假设调用方线程已拥有互斥的所有权。此时不再上锁,只会转移所有权。
使用互斥量就安全了吗
使用互斥量来保护数据,并不是仅仅在每一个成员函数中都加入一个std::lock_guard对象那么简单;一个指向类内部数据成员的指针或引用,将会让这种保护形同虚设。
所以尽量成员函数不要返回指针或引用,不要对外暴露指针或引用。
死锁
死锁一般在以下两种场景发生:
单线程对同一个锁加锁两次,就会导致死锁。
解决方法是用递归锁,会记录上一次对自己加锁的线程id,如果发现是当前线程与上一次加锁的线程是同一个则不再加锁。
解决方法一:始终要同一顺序加锁!比如两个锁A和B,所有线程都要按照先A后B,或者先B后A的顺序加锁。
解决方法二:批量加锁。c++11提供了std::lock和std::try_lock,可以同时对多个锁同时上锁。 <hr/>下面一一介绍避免死锁的方法:
std::lock是批量锁,可以一次性锁住多个互斥量,并且没有死锁风险。因为死锁发生就是因为加锁顺序不一致,而std::lock压根没有顺序这一说,同时加锁,要么全部加锁成功,要么一个也不加锁!
std::lock(mutex1,mutex2,...)
来看一个例子:
// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m); // 1
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
swap(lhs.some_detail,rhs.some_detail);
}
};
swap操作有两个操作数,要对两个操作数都上锁,如果一个线程加锁顺序是先A后B,一个线程是先B后A,自然发生死锁,而采用std::lock要么全部上锁,要么一个也不上锁。
2和3处的语句中std::adopt_lock是说明已经上过锁了,std::lock_guard只需要转移所有权,不用上锁。发生std::lock最后还是交给std::lock_guard来解锁。
上面的代码有点臃肿,c++17提供了std::scoped_lock,也就是个RAII类型,在构造时起到std::lock的作用,在析构时进行解锁。因此,可以改为:
void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);
// 也可以写成std::scoped_lock guard(lhs.m,rhs.m);
// 用到c++17提供的隐式参数模板类型推导机制,当然这不是本文重点,不介绍了
swap(lhs.some_detail,rhs.some_detail);
}
try_lock跟lock一样也是批量锁,不同的是try_lock是非阻塞的加锁,也就是加锁不成功,不阻塞继续执行,全部上锁才算加锁成功。有返回值true和false。
once_flag和call_once
很多时候我们的代码只需要执行一次,比如初始化时,先判断数据是否已被初始化,若未初始化则调用初始化方法。
这在单线程中是安全的,在多线程中是不安全的,所以我们要用互斥量进行互斥访问,但直接上锁会太浪费,造成不必要的阻塞同步。
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo(){
std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化
if(!resource_ptr){
resource_ptr.reset(new some_resource); // 只有初始化过程需要保护
}
lk.unlock();
resource_ptr->do_something();
}
我们可以用c++提供的std::once_flag和std::call_once来在多线程中正确实现只执行一次的代码。
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;
// 用来指示是否调用过,当call_once调用完就改变
void init_resource()
{
resource_ptr.reset(new some_resource);
}
void foo()
{
std::call_once(resource_flag,init_resource); // 可以完整的进行一次初始化
resource_ptr->do_something();
}
recursive_mutex循环锁
当一个线程已经获取一个std::mutex时(已经上锁),并对其再次上锁,这个操作就是错误的,并且继续尝试这样做 的话,就会产生未定义行为。
然而,在某些情况下,一个线程尝试获取同一个互斥量多次, 而没有对其进行一次释放是可以的。 之所以可以, 是因为C++标准库提供了 std::recursive_mutex类。 互斥量锁住其他线程前, 必须释放拥有的所有锁, 所以当调用lock()三次后, 也必须调用unlock()三次。
正确使用 std::lock_guard<std::recursive_mutex> 和 std::unique_lock<std::recursive_mutex> 可以帮你处理这些问题。
条件变量
c++提供了std::condition_variable和std::condition_variable_any,条件变量要和互斥量一起使用,前者只限于与std::unique_lock<std::mutex> 一起工作,而后者可以和任何正确的互斥量一起工作,从而加上了_any的后缀。 当然后者的通用性就意味着开销大,无特殊情况优先考虑前者。
条件变量其实就搞懂两类操作即可:wait和notify。
当调用wait方法时,当前线程睡眠,push进睡眠队列;当调用notify时,唤醒一个线程,即pop出睡眠队列。
来看一下伪代码:
wait(){
add this process to list;
block();
}
notify_one(){ // 随机唤醒一个
if(list非空){
随机从list选一个process;
wakeup(p);
}
}
notify_all(){ // 唤醒全部
if(list非空){
for(auto p in S->list){
wakeup(p);
}
}
}
这是最基本的实现,但这段代码有一个致命缺点,就是虚假唤醒(spurous wakeup),来看一个案例。
// global
int product=0;
std::condition_variable cv;
// 消费者代码,多个thread
cv.wait();
product--;
// 生产者代码,1个thread
product++;
cv.notify_all();
上面代码执行,生产者调用了notify_all();来唤醒所有消费者,但生产的产品可能不能满足所有消费者。也就是有一部分消费者即使没有产品用,但是也被虚假唤醒了。
对此,我们需要更改三个地方:
- 首先,在消费者加一句代码while(product==0),product==0就是条件。如果不满足条件继续wait;
- 其次,由于访问了临界区product时,自然要对其要上锁;
- 最后,要避免死锁,wait()发生阻塞时要解锁,这样生产者才能拿到锁,唤醒后又拿到锁跟调用wait前保持一致;
// 条件变量的wait方法改为
wait(lock){
add this process to list;
lock.unlock(); // 更改3 阻塞解锁,避免死锁
block();
lock.lock(); // 更改3 唤醒上锁,回到调用wait前状态
}
// 消费者线程代码改为
std::unique_lock<std::mutex> lock(mutex); // 更改2
while(product==0) // 更改1
cv.wait(lock);
product--;
现在我们可以回答为什么条件变量要和互斥量一起使用?
因为wait前要while循环判断条件是否满足,出现了临界区 为什么std::condition_variable只能与std::unique_lock<std::mutex>一起工作?
因为std::condition_variable的wait方法就进行unlock和lock,unique_lock才支持这么灵活的锁管理 <hr/>小结一下:
- 在wait加while循环判断条件
- 要和互斥量一起用
- 调用wait函数时,要传递参数lock
<hr/>理解了上面这些,再看c++std::condition_variable提供的方法就很简单了。
- wait(lock)和wait(lock, callable)
std::condition_variable重载了两个wait方法,后者wait(lock, callable)的第二个参数是函数,就是条件判断,根据返回的true\false决定是否继续阻塞。其实wait(lock, callable)就是等价于:
while(!callable())
wait(lock);
- notify_one()和notify_all()
<hr/>未完待续 |
|