你肯定不想让共享数据陷入条件竞争,或是出现破坏不变量的情况。将所有访问共享数据的代码标记为互斥是否是一种更好的办法呢?这样,任何一个线程在执行时,其他线程就必须进行等待。除非该线程在修改共享数据,否则任何线程都不可能会看到不变量的中间状态。
访问共享数据前,将数据锁住,在访问结束后,再将数据解锁。线程库需要保证,当线程使用互斥量锁住共享数据时,其他的线程都必须等到之前那个线程对数据进行解锁后,才能进行访问数据。
互斥量是C++保护数据最通用的机制,但也需要编排代码来保护数据的正确性(见3.2.2节),并避免接口间的条件竞争(见3.2.3节)也非常重要。不过,互斥量也会造成死锁(见3.2.4节),或对数据保护的太多(或太少)(见3.2.8节)。
通过实例化std::mutex
创建互斥量实例,成员函数lock()可对互斥量上锁,unlock()为解锁。不过,不推荐直接去调用成员函数,调用成员函数就意味着,必须在每个函数出口都要去调用unlock()(包括异常的情况)。C++标准库为互斥量提供了RAII模板类std::lock_guard
,在构造时就能提供已锁的互斥量,并在析构时进行解锁,从而保证了互斥量能被正确解锁。下面的代码中,展示了如何在多线程应用中,使用std::mutex
构造的std::lock_guard
实例,对列表进行访问保护。(std::mutex
和std::lock_guard
都在<mutex>
头文件中声明。)
代码3.1 使用互斥量保护列表
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list; // 1
std::mutex some_mutex; // 2
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // 3
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex); // 4
return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}
代码3.1中有一个全局变量①,这个全局变量被一个全局的互斥量保护②。add_to_list()③和list_contains()④函数中使用std::lock_guard<std::mutex>
,使得这两个函数中对数据的访问是互斥的:list_contains()不可能看到正在被add_to_list()修改的列表。
C++17中添加了一个新特性,称为模板类参数推导,类似std::lock_guard
这样简单的模板类型,其模板参数列表可以省略。③和④的代码可以简化成:
std::lock_guard guard(some_mutex);
具体的模板参数类型推导则交给C++17的编译器完成。3.2.4节中,会介绍C++17中的一种加强版数据保护机制——std::scoped_lock
,所以在C++17的环境下,上面的这行代码也可以写成:
std::scoped_lock guard(some_mutex);
为了让代码更加清晰,并且兼容只支持C++11标准的编译器,我会继续使用std::lock_guard
,并在代码中写明模板参数的类型。
某些情况下使用全局变量没问题,但大多数情况下,互斥量通常会与需要保护的数据放在同一类中,而不是定义成全局变量。这是面向对象设计的准则:将其放在一个类中,就可让他们联系在一起,也可对类的功能进行封装,并进行数据保护。这种情况下,函数add_to_list和list_contains可以作为这个类的成员函数。互斥量和需要保护的数据,在类中都定义为private成员,这会让代码更清晰,并且方便了解什么时候对互斥量上锁。所有成员函数都会在调用时对数据上锁,结束时对数据解锁,这就保证了访问时数据不变量的状态稳定。
当然,也不是总能那么理想:当其中一个成员函数返回的是保护数据的指针或引用时,也会破坏数据。具有访问能力的指针或引用可以访问(并可能修改)保护数据,而不会被互斥锁限制。这就需要对接口谨慎设计,要确保互斥量能锁住数据访问,并且不留后门。
使用互斥量来保护数据,并不是在每一个成员函数中加入一个std::lock_guard
对象那么简单。一个指针或引用,也会让这种保护形同虚设。不过,检查指针或引用很容易,只要没有成员函数通过返回值或者输出参数的形式,向其调用者返回指向受保护数据的指针或引用,数据就是安全的。确保成员函数不会传出指针或引用的同时,检查成员函数是否通过指针或引用的方式来调用也是很重要的(尤其是这个操作不在你的控制下时)。函数可能没在互斥量保护的区域内存储指针或引用,这样就很危险。更危险的是:将保护数据作为一个运行时参数,如同下面代码中所示。
代码3.2 无意中传递了保护数据的引用
class some_data
{
int a;
std::string b;
public:
void do_something();
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data); // 1 传递“保护”数据给用户函数
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected=&protected_data;
}
data_wrapper x;
void foo()
{
x.process_data(malicious_function); // 2 传递一个恶意函数
unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}
例子中process_data看起来没有问题,std::lock_guard
对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function
传递进去②,可以在没有锁定互斥量的情况下调用do_something()
。
这段代码的问题在于根本没有保护,只是将所有可访问的数据结构代码标记为互斥。函数foo()
中调用unprotected->do_something()
的代码未能被标记为互斥。这种情况下,C++无法提供任何帮助,只能由开发者使用正确的互斥锁来保护数据。从乐观的角度上看,还是有方法的:切勿将受保护数据的指针或引用传递到互斥锁作用域之外。
虽然,这是使用互斥量保护共享数据时常犯的错误,但绝不仅仅是一个潜在的陷阱。下一节中,即便是使用了互斥量对数据进行保护,条件竞争依旧存在。
使用了互斥量或其他机制保护了共享数据,就不必再为条件竞争所担忧吗?并不是,依旧需要确定数据是否受到了保护。回想之前双链表的例子,为了能让线程安全地删除一个节点,需要确保防止对这三个节点(待删除的节点及其前后相邻的节点)的并发访问。如果只对指向每个节点的指针进行访问保护,那就和没有使用互斥量一样,条件竞争仍会发生——除了指针,整个数据结构和整个删除操作需要保护。这种情况下最简单的解决方案就是使用互斥量来保护整个链表,如代码3.1所示。
尽管链表的个别操作是安全的,但依旧可能遇到条件竞争。例如,构建一个类似于std::stack
的栈(代码3.3),除了构造函数和swap()以外,需要对std::stack
提供五个操作:push()一个新元素进栈,pop()一个元素出栈,top()查看栈顶元素,empty()判断栈是否是空栈,size()了解栈中有多少个元素。即使修改了top(),返回一个拷贝而非引用(即遵循了3.2.2节的准则),这个接口仍存在条件竞争。这个问题不仅存在于互斥量实现接口中,在无锁实现接口中,也会产生条件竞争。这是接口的问题,与实现方式无关。
代码3.3 std::stack
容器的实现
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
template <class... Args> void emplace(Args&&... args); // C++14的新特性
};
虽然empty()和size()可能在返回时是正确的,但结果不可靠。当返回后,其他线程就可以自由地访问栈,并且可能push()多个新元素到栈中,也可能pop()一些已在栈中的元素。这样的话,之前从empty()和size()得到的数值就有问题了。
非共享的栈对象,如果栈非空,使用empty()检查再调用top()访问栈顶部的元素是安全的。如下代码所示:
stack<int> s;
if (! s.empty()){ // 1
int const value = s.top(); // 2
s.pop(); // 3
do_something(value);
}
不仅在单线程代码中安全,而且在空堆栈上调用top()是未定义的行为也符合预期。对于共享的栈对象,这样的调用顺序就不再安全,因为在调用empty()①和调用top()②之间,可能有来自另一个线程的pop()调用并删除了最后一个元素。这是一个经典的条件竞争,使用互斥量对栈内部数据进行保护,但依旧不能阻止条件竞争的发生,这就是接口固有的问题。
怎么解决呢?问题发生在接口设计上,所以解决的方法就是变更接口设计。怎么改?这个简单的例子中调用top()时,发现栈已经是空,就抛出异常。这能直接解决这个问题,但这是一个笨拙的解决方案,这样的话,即使empty()返回false的情况下,也需要进行异常捕获。本质上,这会让empty()成为一个多余函数。
仔细的观察之前的代码段,在调用top()②和pop()③之间会发现另一个潜在的条件竞争。假设两个线程运行着前面的代码,并且都引用同一个栈对象。当为性能而使用线程时,多个线程在不同的数据上执行相同的操作很正常,并且共享栈可以将工作进行分摊。假设,一开始栈中只有两个元素,这时任一线程上的empty()和top()都存在竞争,只需要考虑可能的执行顺序即可。
内部互斥量保护栈时,只有一个线程可以调用栈的成员函数,所以调用可以很好地交错,并且do_something()是可以并发运行的。在表3.1中,展示一种可能的执行顺序。
表3.1 一种可能执行顺序
Thread A | Thread B |
---|---|
if (!s.empty); | |
if(!s.empty); | |
int const value = s.top(); | |
int const value = s.top(); | |
s.pop(); | |
do_something(value); | s.pop(); |
do_something(value); |
当线程运行时,调用两次top(),没修改栈,所以每个线程能得到同样的值。不仅是这样,调用top()的过程中(两次),都没有调用pop()函数。这样,在其中一个值再读取的时候,虽然不会出现“写后读”的情况,但其值已处理了两次。这种条件竞争,比未定义的empty()/top()竞争更加严重。虽然结果依赖于do_something()的结果,但因为看起来没有任何错误,就会让这个Bug更难定位。
这就需要接口设计上有较大的改动,提议之一就是使用同一互斥量来保护top()和pop()。Tom Cargill[1]指出当拷贝构造函数在栈中抛出一个异常,这样的处理方式就会有问题。在Herb Sutter[2]看来,这个问题可以从“异常安全”的角度完美解决,不过潜在的条件竞争,可能会组成一些新的组合。
说一些大家没有意识到的问题:假设有一个stack<vector<int>>
,vector是一个动态容器,当拷贝一个vector,标准库会从堆上分配很多内存来完成这次拷贝。当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以vector的拷贝构造函数可能会抛出一个std::bad_alloc
异常。当vector中存有大量元素时,这种情况发生的可能性更大。当pop()函数返回“弹出值”时(也就是从栈中将这个值移除),会有一个潜在的问题:这个值返回到调用函数的时候,栈才被改变。但拷贝数据的时候,调用函数抛出一个异常会怎么样? 如果真的发生了,要弹出的数据将会丢失,它的确从栈上移出了,但是拷贝失败了!std::stack
的设计人员将这个操作分为两部分:先获取顶部元素(top()),然后从栈中移除(pop())。这样,在不能安全的将元素拷贝出去的情况下,栈中的这个数据还依旧存在,没有丢失。当问题是堆空间不足,应用可能会释放一些内存,然后再进行尝试。
不幸的是,这样的分割却制造了本想避免的条件竞争。幸运的是,我们还有的别的选项,但使用每个选项都有相应的代价。
选项1: 传入一个引用
第一个选项是将变量的引用作为参数,传入pop()函数中获取“弹出值”:
std::vector<int> result;
some_stack.pop(result);
这种方式还不错,缺点也很明显:需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的,因为临时构造一个实例,从时间和资源的角度上来看都不划算。对于其他的类型,这样也不总行得通,因为构造函数需要的参数,在这个阶段不一定可用。最后,需要可赋值的存储类型,这是一个重大限制:即使支持移动构造,甚至是拷贝构造(从而允许返回一个值),很多用户自定义类型可能都不支持赋值操作。
选项2:无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的pop()函数来说,只有“异常安全”方面的担忧(当返回值时可以抛出一个异常)。很多类型都有拷贝构造函数,它们不会抛出异常,并且随着新标准中对“右值引用”的支持(详见附录A,A.1节),很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,也不会抛出异常。一个有用的选项可以限制对线程安全栈的使用,并且能让栈安全的返回所需的值,而不抛出异常。
虽然安全,但非可靠。尽管能在编译时可使用std::is_nothrow_copy_constructible
和std::is_nothrow_move_constructible
,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。用户自定义的类型中,会有不抛出异常的拷贝构造函数或移动构造函数的类型, 那些有抛出异常的拷贝构造函数,但没有移动构造函数的类型往往更多(这种情况会随着人们习惯于C++11中的右值引用而有所改变)。如果这些类型不能存储在线程安全的栈中,那将是多么的不幸。
选项3:返回指向弹出值的指针
第三个选择是返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常,这样就能避免Cargill提到的异常问题了。缺点就是返回指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。对于这个方案,使用std::shared_ptr
是个不错的选择,不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,就不需要new和delete操作。这种优化是很重要的:因为堆栈中的每个对象,都需要用new进行独立的内存分配,相较于非线程安全版本,这个方案的开销相当大。
选项4:“选项1 + 选项2”或 “选项1 + 选项3”
对于通用的代码来说,灵活性不应忽视。当已经选择了选项2或3时,再去选择1也是很容易的。这些选项提供给用户,让用户自己选择最合适,最经济的方案。
例:定义线程安全的堆栈
代码3.4中是一个接口没有条件竞争的堆栈类定义,它实现了选项1和选项3:重载了pop(),使用局部引用去存储弹出值,并返回std::shared_ptr<>
对象。它有一个简单的接口,只有两个函数:push()和pop();
代码3.4 线程安全的堆栈类定义(概述)
#include <exception>
#include <memory> // For std::shared_ptr<>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
public:
threadsafe_stack();
threadsafe_stack(const threadsafe_stack&);
threadsafe_stack& operator=(const threadsafe_stack&) = delete; // 1 赋值操作被删除
void push(T new_value);
std::shared_ptr<T> pop();
void pop(T& value);
bool empty() const;
};
削减接口可以获得最大程度的安全,甚至限制对栈的一些操作。栈是不能直接赋值的,因为赋值操作已经删除了①(详见附录A,A.2节),并且这里没有swap()函数。当栈为空时,pop()函数会抛出一个empty_stack异常,所以在empty()函数被调用后,其他部件还能正常工作。如选项3描述的那样,使用std::shared_ptr
可以避免内存分配管理的问题,并避免多次使用new和delete操作。堆栈中的五个操作,现在就剩下三个:push(), pop()和empty()(这里empty()都有些多余)。简化接口更有利于数据控制,可以保证互斥量将操作完全锁住。下面的代码展示了一个简单的实现——封装std::stack<>
的线程安全堆栈。
代码3.5 扩充(线程安全)堆栈
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
const char* what() const throw() {
return "empty stack!";
};
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack()
: data(std::stack<T>()){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data; // 1 在构造函数体中的执行拷贝
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空
std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
堆栈可以拷贝——拷贝构造函数对互斥量上锁,再拷贝堆栈。构造函数体中①的拷贝使用互斥量来确保复制结果的正确性,这样的方式比成员初始化列表好。
之前对top()和pop()函数的讨论中,因为锁的粒度太小,恶性条件竞争已经出现,需要保护的操作并未全覆盖到。不过,锁的颗粒度过大同样会有问题。还有一个问题,一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,线程可以强制运行,甚至可以访问不同位置的数据,抵消了并发带来的性能提升。第一版为多处理器系统设计Linux内核中,就使用了一个全局内核锁。这个锁能正常工作,但在双核处理系统的上的性能要比两个单核系统的性能差很多,四核系统就更不能提了。太多请求去竞争占用内核,使得依赖于处理器运行的线程没有办法很好的工作。随后修正的Linux内核加入了一个细粒度锁方案,因为少了很多内核竞争,这时四核处理系统的性能就和单核处理的四倍差不多了。
使用多个互斥量保护所有的数据,细粒度锁也有问题。如前所述,当增大互斥量覆盖数据的粒度时,只需要锁住一个互斥量。但这种方案并非放之四海皆准,互斥量保护一个独立类的实例,锁的状态的下一个阶段,不是离开锁定区域将锁定区域还给用户,就是有独立的互斥量去保护这个类的全部实例,两种方式都不怎么好。
一个给定操作需要两个或两个以上的互斥量时,另一个潜在的问题将出现:死锁。与条件竞争完全相反——不同的两个线程会互相等待,从而什么都没做。
试想有一个玩具,这个玩具由两部分组成,必须拿到这两个部分,才能够玩。例如玩具鼓,需要鼓锤和鼓才能玩。有两个小孩,他们都很喜欢玩这个玩具。当其中一个孩子拿到了鼓和鼓锤时,那就可以尽情的玩耍了。当另一孩子想要玩,他就得等待另一孩子玩完才行。再试想,鼓和鼓锤被放在不同的玩具箱里,并且两个孩子在同一时间里都想要去敲鼓。之后,他们就去玩具箱里面找这个鼓。其中一个找到了鼓,并且另外一个找到了鼓锤。现在问题就来了,除非其中一个孩子决定让另一个先玩,他可以把自己的那部分给另外一个孩子。但当他们都紧握着自己所有的部分,那么这个鼓谁都没法玩。
现在没有孩子去争抢玩具,但线程有对锁的竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。因为他们都在等待对方释放互斥量,没有线程能工作。这种情况就是死锁,它的问题就是由两个或两个以上的互斥量进行锁定。
避免死锁的一般建议,就是让两个互斥量以相同的顺序上锁:总在互斥量B之前锁住互斥量A,就永远不会死锁。某些情况下是可以这样用,因为不同的互斥量用于不同的地方。不过,当有多个互斥量保护同一个类的独立实例时,一个操作对同一个类的两个不同实例进行数据的交换操作,为了保证数据交换操作的正确性,就要避免并发修改数据,并确保每个实例上的互斥量都能锁住自己要保护的区域。不过,选择一个固定的顺序(例如,实例提供的第一互斥量作为第一个参数,提供的第二个互斥量为第二个参数),可能会适得其反:在参数交换了之后,两个线程试图在相同的两个实例间进行数据交换时,程序又死锁了!
很幸运,C++标准库有办法解决这个问题,std::lock
——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险)。下面的程序代码中,就来看一下怎么在一个简单的交换操作中使用std::lock
。
代码3.6 交换操作中使用std::lock()
和std::lock_guard
// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m); // 1
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
swap(lhs.some_detail,rhs.some_detail);
}
};
首先检查参数,因为操作试图获取std::mutex
对象上的锁,所以结果很难预料。(互斥量可以在同一线程上多次上锁,标准库中std::recursive_mutex
提供这样的功能。详情见3.3.3节)。然后,调用std::lock()
①锁住两个互斥量,并且创建两个std:lock_guard
实例②③。提供std::adopt_lock
参数除了表示std::lock_guard
可获取锁之外,还将锁交由std::lock_guard
管理,就不需要std::lock_guard
再去构建新的锁了。
这样,就能保证在大多数情况下,函数退出时互斥量能解锁(保护操作可能会抛出一个异常),也允许使用一个简单的“return”作为返回。当使用std::lock
去锁lhs.m或rhs.m时,可能会抛出异常,异常会传播到std::lock
之外。当std::lock
获取互斥锁时,并尝试从另一个互斥量上再获取锁时,就会有异常抛出,第一个锁也会随着异常而自动释放,所以std::lock
要么将两个锁都锁住,要不一个都不锁。
C++17对这种情况提供了支持,std::scoped_lock<>
是一种新的RAII模板类型,与 std::lock_guard<>
的功能相同,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。互斥量支持构造时上锁,与std::lock
的用法相同,解锁在析构中进行。代码3.6中swap()操作可以重写如下:
void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m,rhs.m); // 1
swap(lhs.some_detail,rhs.some_detail);
}
这里使用了C++17的另一个特性:自动推导模板参数。如果有支持C++17的编译器(就能使用std::scoped_lock
了,因为其是C++17标准库中的一个工具),C++17可以通过隐式参数模板类型推导机制, 通过传递的对形象类型来构造实例①。这行代码等价于下面全给参数的版本:
std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);
std::scoped_lock
的好处在于,可以将所有std::lock
替换掉,从而减少错误的发生。
虽然std::lock
(和std::scoped_lock<>
)可以在这情况下(获取两个以上的锁)避免死锁,但它没办法帮助你获取其中一个锁。这需要依赖开发者的纪律性(译者:也就是经验),来确保程序不会死锁。
死锁是多线程编程中令人相当头痛的问题,并且死锁经常是不可预见的,因为在大部分时间里,所有工作都能很好的完成。不过,一些相对简单的规则能帮助写出“无死锁”的代码。
死锁通常是对锁的使用不当造成。无锁的情况下,仅需要两个线程std::thread
对象互相调用join()就能产生死锁。这种情况下,没有线程可以继续运行,因为他们正在互相等待。这种情况很常见,一个线程会等待另一个线程,其他线程同时也会等待第一个线程结束,所以三个或更多线程的互相等待也会发生死锁。为了避免死锁,这里意见:不要谦让。以下提供一些个人建议。
避免嵌套锁
第一个建议往往是最简单的:线程获得一个锁时,就别再去获取第二个。每个线程只持有一个锁,就不会产生死锁。当需要获取多个锁,使用std::lock
来做这件事(对获取锁的操作上锁),避免产生死锁。
避免在持有锁时调用外部代码
第二个建议是次简单的:因为代码是外部提供的,所以没有办法确定外部要做什么。外部程序可能做任何事情,包括获取锁。在持有锁的情况下,如果用外部代码要获取一个锁,就会违反第一个指导意见,并造成死锁(有时这是无法避免的)。当写通用代码时(例如3.2.3中的栈),每一个操作的参数类型,都是外部提供的定义,这就需要其他指导意见来帮助你了。
使用固定顺序获取锁
当硬性要求获取两个或两个以上的锁,并且不能使用std::lock
单独操作来获取它们时,最好在每个线程上,用固定的顺序获取它们(锁)。3.2.4节中提到,当需要获取两个互斥量时,需要以一定的顺序获取锁。一些情况下,这种方式相对简单。比如,3.2.3节中的栈——每个栈实例中都内置有互斥量,但是对数据成员存储的操作上,栈就需要调用外部代码。虽然,可以添加一些约束,对栈上存储的数据项不做任何操作,但对数据项的处理仅限于栈自身。这会让使用通用栈的难度有所增加,但是一个容器很少去访问另一个容器中存储的数据,即使发生了也会很显眼,所以这对于通用栈来说并不是一个特别重的负担。
其他情况下,这就没那么简单了(例如:3.2.4节中的交换操作),这时可能同时锁住多个互斥量(有时不会发生)。3.1节中那个链表连接例子中,列表中的每个节点都会有一个互斥量保护。为了访问链表,线程必须获取感兴趣节点上的互斥锁。当一个线程删除一个节点,就必须获取三个节点上的互斥锁:将要删除的节点,两个邻接节点。为了遍历链表,线程必须保证在获取当前节点的互斥锁前提下,获得下一个节点的锁,要保证指向下一个节点的指针不会同时被修改。当下一个节点上的锁被获取,第一个节点的锁就可以释放了。
这种“手递手”的模式允许多个线程访问链表,为每一个访问的线程提供不同的节点。为了避免死锁,节点必须以固定的顺序上锁:如果两个线程试图用互为反向的顺序,在使用“手递手”遍历列表时,执行到链表中间部分时会发生死锁。当节点A和B在列表中相邻,当前线程可能会同时尝试获取A和B上的锁。另一个线程可能已经获取了节点B上的锁,并试图获取节点A上的锁——经典的死锁场景,如图3.2所示。
线程1 | 线程2 |
---|---|
锁住主入口的互斥量 | |
读取头结点指针 | |
锁住头结点互斥量 | |
解锁主入口互斥量 | |
锁住主入口互斥量 | |
读取head->next指针 | 锁住尾结点互斥量 |
锁住next结点的互斥量 | 读取tail->prev指针 |
读取next->next指针 | 解锁尾结点的互斥量 |
... | ... |
锁住A结点的互斥量 | 锁住C结点的互斥量 |
读取A->next指针(也就是B结点) | 读取C->next指针(也就是B结点) |
锁住B结点互斥量 | |
阻塞,尝试锁住B结点的互斥量 | 解锁C结点互斥量 |
读取B->prev指针(也就是A结点) | |
阻塞,尝试锁住A结点的互斥量 | |
死锁! |
图3.2 不同线程以相反顺序访问列表所造成的死锁
当A、C节点中间的B节点删除时,有线程在已获取A和C上的锁后,还要获取B节点上的锁时,就可能发生死锁。线程可能会试图先锁住A节点或C节点(根据遍历的方向),但是发现无法获得B上的锁,因为执行删除任务的线程,已经获取了B上的锁。
这里提供一种避免死锁的方式,定义遍历的顺序,一个线程必须先锁住A才能获取B的锁,在锁住B之后才能获取C的锁。这将消除死锁,不允许反向遍历链表。类似的约定常用于建立其他的数据结构。
使用层次锁结构
虽然,定义锁的顺序是一种特殊情况,但层次锁的意义在于,在运行时会约定是否进行检查。这个建议需要对应用进行分层,并且识别在给定层上所有互斥量。当代码试图对互斥量上锁,而低层已持有该层锁时,不允许锁定。可以通过每个互斥量对应的层数,以及每个线程使用的互斥量,在运行时检查锁定操作是否可以进行。下面的代码列表中,展示两个线程如何使用进行分层互斥的。
代码3.7 使用层次锁来避免死锁
hierarchical_mutex high_level_mutex(10000); // 1
hierarchical_mutex low_level_mutex(5000); // 2
hierarchical_mutex other_mutex(6000); // 3
int do_low_level_stuff();
int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 4
return do_low_level_stuff();
}
void high_level_stuff(int some_param);
void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 6
high_level_stuff(low_level_func()); // 5
}
void thread_a() // 7
{
high_level_func();
}
void do_other_stuff();
void other_stuff()
{
high_level_func(); // 10
do_other_stuff();
}
void thread_b() // 8
{
std::lock_guard<hierarchical_mutex> lk(other_mutex); // 9
other_stuff();
}
这段代码有三个hierarchical_mutex实例(①,②和③),其通过逐渐递减的层级进行构造。根据已经定义好的机制,如将一个hierarchical_mutex实例进行上锁,那么只能获取更低层级实例上的锁,这就会对代码进行一些限制。
假设do_low_level_stuff不会对任何互斥量进行上锁,low_level_func为层级最低的函数,并且会对low_level_mutex④进行上锁。high_level_func调用low_level_func⑤的同时,也持有high_level_mutex⑥上的锁,这也没什么问题,因为high_level_mutex(①:10000)要比low_level_mutex(②:5000)更高级。
thread_a()⑦遵守规则,所以运行没问题。
另一方面,thread_b()⑧无视规则,因此在运行时会失败。
首先,thread_b锁住了other_mutex⑨,这个互斥量的层级值只有6000③。这就意味着,中层级的数据已被保护。当other_stuff()调用high_level_func()⑧时,就违反了层级结构:high_level_func()试图获取high_level_mutex,这个互斥量的层级值是10000,要比当前层级值6000大很多。因此hierarchical_mutex将会产生一个错误,可能会是抛出一个异常或直接终止程序。层级互斥量不可能死锁,因为互斥量本身会严格遵循约定进行上锁。当多个互斥量在是在同一级上时,不能同时持有多个锁,所以“手递手”的方案需要每个互斥量在一条链上,并且每个互斥量都比前一个有更低的层级值,这在某些情况下无法实现。
例子也展示了std::lock_guard<>
模板与用户自定义的互斥量类型如何一起使用。虽然hierarchical_mutex不是C++标准的一部分,但是写起来很容易,代码3.8中有一个简单的实现。尽管它是一个用户定义类型,可用于std::lock_guard<>
模板中,为了满足互斥量操作,其有三个成员函数:lock(), unlock() 和 try_lock()。try_lock()使用起来很简单:当互斥量上的锁被一个线程持有,它将返回false,而不是等待调用的线程,直到能够获取互斥量上的锁为止。std::lock()
的内部实现中,try_lock()作为避免死锁算法的一部分。
代码3.8 简单的层级互斥量实现
class hierarchical_mutex
{
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;
static thread_local unsigned long this_thread_hierarchy_value; // 1
void check_for_hierarchy_violation()
{
if(this_thread_hierarchy_value <= hierarchy_value) // 2
{
throw std::logic_error(“mutex hierarchy violated”);
}
}
void update_hierarchy_value()
{
previous_hierarchy_value=this_thread_hierarchy_value; // 3
this_thread_hierarchy_value=hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value),
previous_hierarchy_value(0)
{}
void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock(); // 4
update_hierarchy_value(); // 5
}
void unlock()
{
if(this_thread_hierarchy_value!=hierarchy_value)
throw std::logic_error(“mutex hierarchy violated”); // 9
this_thread_hierarchy_value=previous_hierarchy_value; // 6
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if(!internal_mutex.try_lock()) // 7
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX); // 8
这里重点是使用了thread_local的值来代表当前线程的层级值:this_thread_hierarchy_value①,初始化为最大值⑧,所以最初所有线程都能被锁住。因为声明中有thread_local,所以每个线程都有其副本,这样线程中变量状态完全独立,当从另一个线程进行读取时,变量的状态也完全独立。
所以,线程第一次锁住一个hierarchical_mutex时,this_thread_hierarchy_value的值是ULONG_MAX。由于其本身的性质,这个值会大于其他任何值,所以通过了check_for_hierarchy_vilation()②的检查。这种检查下,lock()代表内部互斥锁已锁住④。一旦成功锁住,就可以更新层级值了⑤。
当持有第一个锁的同时,还锁住了另一个hierarchical_mutex,this_thread_hierarchy_value的值将会显示第一个互斥量的层级值。第二个互斥量的层级值必须小于已持有互斥量,检查函数②才能通过。
现在,最重要的是为当前线程赋予之前的层级值,可以调用unlock()⑥对层级值进行保存。否则,就锁不住任何互斥量(第二个互斥量的层级数高于第一个互斥量),即使线程没有持有任何锁。因为保存了之前的层级值,只有当持有internal_mutex③,且在解锁内部互斥量⑥之前存储它的层级值时,需要内部互斥量对hierarchical_mutex实例进行保护,才能安全的将hierarchical_mutex存储。为了避免无序解锁造成层次混乱,不是解锁最近上锁的那个互斥量,就需要抛出异常⑨。其他机制也能做到这点,但目前这是最简单的。
try_lock()与lock()的功能相似,除了在调用internal_mutex的try_lock()⑦失败时,不能持有对应锁,所以不必更新层级值,并直接返回false。
虽然是运行时检测,但无时间依赖性——不必去等待构成死锁的条件出现。同时,设计过程需要拆分应用,互斥量在这种情况下可以消除死锁的可能性。这样的练习很有必要去做一下,即使你之后没有去做,代码也会在运行时检查。
超越锁的延伸扩展
死锁不仅仅会发生在锁之间,也会发生在同步构造中(可能会产生一个等待循环),这也需要有指导意见,例如:获取嵌套锁,等待一个持有锁的线程,都是很糟糕的决定(因为线程为了能继续运行可能需要获取对应的锁)。如果去等待一个线程结束,应该确定这个线程的层级,这样一个线程只需要等待比其层级低的线程结束即可。用一个简单的办法便可确定,添加的线程是否在同一函数中启动,如同在3.1.2节和3.3节中描述的那样。
代码已能规避死锁,std::lock()
和std::lock_guard
可组成简单的锁,并覆盖大多数情况,但有时需要更多的灵活性,可以使用标准库提供的std::unique_lock
模板。如 std::lock_guard
,这是一个参数化的互斥量模板类,它提供很多RAII类型锁用来管理std::lock_guard
类型,可以让代码更加灵活。
std::unqiue_lock
使用起来更为自由,std::unique_lock
实例不会总与互斥量的数据类型相关,使用起来要比std:lock_guard
更加灵活。首先,可将std::adopt_lock
作为第二个参数传入构造函数,对互斥量进行管理。也可以将std::defer_lock
作为第二个参数传递进去,表明互斥量应保持解锁状态。这样就可以让std::unique_lock
对象(不是互斥量)的lock()所获取,或传递std::unique_lock
对象到std::lock()
中。代码3.6可以轻易的转换为代码3.9,使用std::unique_lock
和std::defer_lock
①,而非std::lock_guard
和std::adopt_lock
。代码长度相同,几乎等价,唯一不同的就是:std::unique_lock
会占用比较多的空间,并且比std::lock_guard
稍慢一些。保证灵活性要付出代价,这个代价就是允许std::unique_lock
实例不带互斥量:信息已存储,且已更新。
代码3.9 交换操作中std::lock()
和std::unique_lock
的使用
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock); // 1
std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock); // 1 std::defer_lock 留下未上锁的互斥量
std::lock(lock_a,lock_b); // 2 互斥量在这里上锁
swap(lhs.some_detail,rhs.some_detail);
}
};
代码3.9中,因为std::unique_lock
支持lock(), try_lock()和unlock()成员函数,所以能将std::unique_lock
对象传递到std::lock()
②。这些同名成员函数在低层做着实际的工作,并且仅更新std::unique_lock
实例中的标志,来确定该实例是否拥有特定的互斥量,这个标志是为了确保unlock()在析构函数中正确调用。如果实例拥有互斥量,那么析构函数必须调用unlock()。但当实例中没有互斥量时,析构函数就不能去调用unlock(),这个标志可以通过owns_lock()成员变量进行查询。除非想将std::unique_lock
的所有权进行转让,最好使用C++17中提供的std::scoped_lock
(详见3.2.4节)。
如期望的那样,这个标志存储在了某个地方。因此,std::unique_lock
实例的体积通常要比std::lock_guard
实例大,当使用std::unique_lock
替代std::lock_guard
,会对标志进行更新或检查,就会有一些轻微的性能惩罚。当std::lock_guard
已经能够满足需求时,建议继续使用。当需要更加灵活的锁时,最好选择std::unique_lock
,因为它更适合于你的任务。我们已经看到一个递延锁的例子,另外一种情况是锁的所有权从一个域转到另一个域。
std::unique_lock
实例没有与自身相关的互斥量,互斥量的所有权可以通过移动操作,在不同的实例中进行传递。某些情况下,这种转移是自动发生的,例如:当函数返回一个实例。另一种情况下,需要显式的调用std::move()
来执行移动操作。本质上来说,需要依赖于源值是否是左值——一个实际的值或是引用——或一个右值——一个临时类型。当源值是一个右值,为了避免转移所有权过程出错,就必须显式移动成左值。std::unique_lock
是可移动,但不可赋值的类型。
一种使用可能是允许函数去锁住一个互斥量,并且将所有权移到调用者上,所以调用者可以在这个锁保护的范围内执行额外的动作。
下面的程序片段展示了:函数get_lock()锁住了互斥量,然后准备数据,返回锁的调用函数。
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk; // 1
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock()); // 2
do_something();
}
lk在函数中被声明为自动变量,它不需要调用std::move()
,可以直接返回①(编译器负责调用移动构造函数)。process_data()函数直接转移std::unique_lock
实例的所有权②,调用do_something()可使用的正确数据(数据没有受到其他线程的修改)。
通常这种模式会用于已锁的互斥量,其依赖于当前程序的状态,或依赖于传入返回类型为std::unique_lock
的函数(或以参数返回)。这样不会直接返回锁,不过网关类的数据成员可用来确认,是否已经对保护数据的访问权限进行上锁。这种情况下,所有的访问都必须通过网关类:当你想要访问数据,需要获取网关类的实例(如同前面的例子,通过调用get_lock()之类函数)来获取锁。之后就可以通过网关类的成员函数对数据进行访问,完成访问时可以销毁这个网关类对象,将锁进行释放,让别的线程来访问保护数据。这样的一个网关类可能是可移动的(所以可以从函数进行返回),这种情况下锁对象的数据必须可移动。
std::unique_lock
的灵活性同样也允许实例在销毁之前放弃拥有的锁。可以使用unlock()来做这件事,如同一个互斥量:std::unique_lock
的成员函数提供类似于锁定和解锁的功能。std::unique_lock
实例有在销毁前释放锁的能力,当没有必要在持有锁的时候,可以在特定的代码分支对锁进行选择性释放。这对于应用的性能来说非常重要,因为持有锁的时间增加会导致性能下降,其他线程会等待这个锁的释放,避免超越操作。
3.2.3节中,已经对锁的粒度有所了解:锁的粒度是一个华而不实的术语(hand-waving term),用来描述通过一个锁保护着的数据量大小。一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse-grained lock)能够保护较多的数据量。粒度对于锁来说很重要,为了保护对应的数据,保证锁有能力保护这些数据也很重要。
在超市等待结账的时候,正在结账的顾客突然意识到忘了拿蔓越莓酱,然后离开柜台去拿,并让其他的人都等待他回来。或者当收银员,准备收钱时,顾客才去翻钱包拿钱,这样的情况都会让等待的顾客很无奈。当每个人都检查了自己要拿的东西,且能随时为拿到的商品进行支付时,每件事都会进行得很顺利。
道理同样适用于线程:如果很多线程正在等待同一个资源(等待收银员对自己拿到的商品进行清点),当有线程持有锁的时间过长,这就会增加等待的时间(别等到结账的时候,才想起来蔓越莓酱没拿)。可能的情况下,锁住互斥量的同时只能对共享数据进行访问,试图对锁外数据进行处理。特别是做一些费时的动作,比如:对文件的输入/输出操作进行上锁。文件输入/输出通常要比从内存中读或写同样长度的数据慢成百上千倍,所以除非锁已经打算去保护对文件的访问,要么执行输入/输出操作将会将延迟其他线程执行的时间,这没有必要(因为文件锁阻塞住了很多操作),这样多线程带来的性能效益会被抵消。
std::unique_lock
在这种情况下工作正常,调用unlock()时,代码不需要再访问共享数据。当再次需要对共享数据进行访问时,再调用lock()就可以了。
void get_and_process_data()
{
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process=get_next_data_chunk();
my_lock.unlock(); // 1 不要让锁住的互斥量越过process()函数的调用
result_type result=process(data_to_process);
my_lock.lock(); // 2 为了写入数据,对互斥量再次上锁
write_result(data_to_process,result);
}
不需要让锁住的互斥量越过对process()函数的调用,所以可以在函数调用①前对互斥量进行手动解锁,之后对其再次上锁②。
这表示只有一个互斥量保护整个数据结构时的情况,不仅会有更多对锁的竞争,也会增加持锁的时长。较多的操作步骤需要获取同一个互斥量上的锁,所以持有锁的时间会更长。成本上的双重打击也算是为了向细粒度锁转移提供了激励和可能。
如同上面的例子,锁不仅是能锁住合适粒度的数据,还要控制锁的持有时间,以及哪些操作在执行的同时能够拥有锁。一般情况下,尽可能将持有锁的时间缩减到最小。
代码3.6和3.9中,交换操作需要锁住两个互斥量,其明确要求并发访问两个对象。假设用来做比较的是一个简单的数据类型(比如:int类型),将会有什么不同么?int的拷贝很廉价,所以可以进行数据复制,并且每个比较的对象都持有该对象的锁,在比较之后进行数据拷贝。在最短时间内持有每个互斥量,并且不会在持有一个锁的同时再去获取另一个。下面的代码中展示了这样情景中的Y类,并且展示了一个相等比较运算符的等价实现。
代码3.10 比较操作符中一次锁住一个互斥量
class Y
{
private:
int some_detail;
mutable std::mutex m;
int get_detail() const
{
std::lock_guard<std::mutex> lock_a(m); // 1
return some_detail;
}
public:
Y(int sd):some_detail(sd){}
friend bool operator==(Y const& lhs, Y const& rhs)
{
if(&lhs==&rhs)
return true;
int const lhs_value=lhs.get_detail(); // 2
int const rhs_value=rhs.get_detail(); // 3
return lhs_value==rhs_value; // 4
}
};
例子中,比较操作符首先通过调用get_detail()成员函数检索要比较的值②③,函数在索引时被锁保护着①。比较操作符会在之后比较索引出来的值④。注意:虽然锁只持有一次的操作能减少锁持有的时间(这样能消除死锁的可能性),但这里有一个微妙的语义操作同时对两个锁住的值进行比较。
代码3.10中,当操作符返回true时,就意味着在这个时间点上的lhs.some_detail与另一个时间点的rhs.some_detail相同。这两个值在读取之后,可能会以任意方式修改。两个值会在②和③处进行交换,这样就会失去了比较的意义。比较可能会返回true,表明这两个值是相等的,实际上这两个值相等的情况可能就发生在一瞬间。这样的变化必须要小心,语义操作是无法改变比较方式的:当持有锁的时间没有达到整个操作时间,就会让自己处于条件竞争的状态。
有时可能找不到一个合适的粒度级别,因为并不是所有对数据结构的访问都需要同一级的保护。这个例子中,就需要寻找一个合适的机制,去替换std::mutex
。
[1] Tom Cargill, “Exception Handling: A False Sense of Security,” in C++ Report 6, no. 9 (November–December 1994). Also available at http://www.informit.com/content/images/020163371x/supplements/Exception_Handling_Article.html.
[2] Herb Sutter, Exceptional C++: 47 Engineering Puzzles, Programming Problems, and Solutions (Addison Wesley Pro-fessional, 1999).