Chapter27 多线程和并发

C++17还引入了一些多线程和并发领域的扩展和改进。

27.1 补充的互斥量和锁

27.1.1 std::scoped_lock

C++11引入了一个简单的std::lock_guard来实现简单的RAII风格的互斥量上锁:

  • 构造函数上锁
  • 析构函数解锁(也可能因异常而调用析构函数解锁)

不幸的是,没有标准化的可变参数模板可以用来在一条语句中同时锁住多个互斥量。

std::scoped_lock<>解决了这个问题。它允许我们同时锁住一个或多个互斥量。 互斥量的类型也可以不同。

例如:

  1. #include <mutex>
  2. ...
  3. std::vector<std::string> allIssues;
  4. std::mutex allIssuesMx;
  5. std::vector<std::string> openIssues;
  6. std::timed_mutex openIssuesMx;
  7. // 同时锁住两个issue列表:
  8. {
  9. std::scoped_lock lg(allIssuesMx, openIssuesMx);
  10. ... // 操作allIssues和openIssues
  11. }

注意根据类模板参数推导,声明lg时你不需要指明互斥量的类型。

这个示例等价于下面的C++11代码:

  1. // 锁住两个issue列表:
  2. {
  3. std::lock(allIssuesMx, openIssuesMx); // 以避免死锁的方式上锁
  4. std::lock_guard<std::mutex> lg1(allIssuesMx, std::adopt_lock);
  5. std::lock_guard<std::mutex> lg2(openIssuesMx, std::adopt_lock);
  6. ... // 操作allIssues和openIssues
  7. }

因此,当传入的互斥量超过一个时,scoped_lock的构造函数会使用可变参数的 快捷函数lock(...),这个函数会保证不会导致死锁 (标准中提到: “必须使用一个避免死锁的算法,例如try-and-back-off算法,但具体 使用哪种算法并没有明确指定,这是为了避免过度约束实现” )。

如果只向scoped_lock的构造函数传递了一个互斥量,那么它只简单的锁住互斥量。 因此,如果用单个参数构造scoped_lock,它的行为类似于lock_guard。 它还定义了成员mutex_type,而多个互斥量构造的对象没有这个成员。

因此,你可以把所有的lock_guard都替换为scoped_lock

如果没有传递互斥量,那么将不会有任何效果。

注意你也可以传递已经被锁住的互斥量:

  1. // 锁住两个issue列表:
  2. {
  3. std::lock(allIssuesMx, openIssuesMx); // 注意:使用了避免死锁的算法
  4. std::scoped_lock lg{std::adopt_lock, allIssuesMx, openIssuesMx};
  5. ... // 操作allIssues和openIssues
  6. }

然而,注意现在传递已经被锁住的互斥量时要在前边加上adopt_lock参数。

27.1.2 std::shared_mutex

C++14添加了一个std::shared_timed_mutex来支持读/写锁, 它支持多个线程同时读一个值,偶尔会有一个线程更改值。 然而,在某些平台上不支持超时的锁可以被实现的更有效率。 因此,现在引入了类型std::shared_mutex (就像C++11引入的std::mutexstd::timed_mutex的关系一样)。

std::shared_mutex定义在头文件shared_mutex中,支持以下操作:

  • 对于独占锁:locktry_lock()unlock()
  • 对于共享的读访问:lock_shared()try_lock_shared()unlock_shared()
  • native_handle()

也就是说,和类型std::shared_timed_mutex不同的地方在于, std::shared_mutex不保证支持try_ lock_for()try_lock_until()try_lock_shared_for()try_lock_shared_until等操作。

注意std::shared_timed_mutex是唯一一个不提供native_handle() API的互斥量类型。

使用shared_mutex

我们可以像这样使用shared_mutex:假设你有一个共享的vector,它被多个线程读取, 但偶尔会被修改:

  1. #include <shared_mutex>
  2. #include <mutex>
  3. ...
  4. std::vector<double> v; // 共享的资源
  5. std::shared_mutex vMutex; // 控制对v的访问(在C++14中要使用shared_timed_mutex)

为了获取共享的读权限(多个读者不会互相阻塞),你可以使用std::shared_lock, 它是为共享读权限设计的lock guard(C++14引入)。例如:

  1. if (std::shared_lock sl(vMutex); v.size() > 0) {
  2. ... // vector v的(共享)读权限
  3. }

对于独占的写操作你应该使用独占的lock guard。可以使用简单的lock_guard, 或者scoped_lock(刚刚介绍的)、或者复杂的unique_lock。例如:

  1. {
  2. std::scoped_lock sl(vMutex);
  3. ... // vector v的独占的写权限
  4. }

27.2 原子类型的is_always_lock_free

你现在可以使用一个C++库的特性来检查一个特定的原子类型是否总是可以在无锁的情况下使用。例如:

  1. if constexpr(std::atomic<int>::is_always_lock_free) {
  2. ...
  3. }
  4. else {
  5. ...
  6. }

如果一个原子类型的is_always_lock_free返回true, 那么该类型的对象的is_lock_free()成员一定会返回true

  1. if constexpr(atomic<T>::is_always_lock_free) {
  2. assert(atomic<T>{}.is_lock_free()); // 绝不会失败
  3. }

在C++17之前只能使用相应的宏的值来判断。例如, 当且仅当ATOMIC_INT_LOCK_FREE返回2时(这个值代表“总是”) std::atomic<int>::is_always_lock_freetrue

  1. if constexpr(std::atomic<int>::is_always_lock_free) {
  2. // ATOMIC_INT_LOCK_FREE == 2
  3. ...
  4. }
  5. else {
  6. // ATOMIC_INT_LOCK_FREE == 0 || ATOMIC_INT_LOCK_FREE == 1
  7. ...
  8. }

用静态成员替换宏是为了确保类型更加安全和在复杂的泛型代码中使用这些检查(例如,使用SFINAE时)。

记住std::atomic<>也可以用于平凡的可拷贝类型。因此, 你可以检查如果把你自定义的结构体用作原子类型时是否需要锁。例如:

  1. template<auto SZ>
  2. struct Data {
  3. bool set;
  4. int values[SZ];
  5. double average;
  6. };
  7. if constexpr(std::atomic<Data<4>>::is_always_lock_free) {
  8. ...
  9. }
  10. else {
  11. ...
  12. }

27.3 cache行大小

有时有的程序很需要处理cache行大小的能力:

  • 一方面,不同线程访问的不同对象不属于同一个cache行是很重要的。 否则,不同线程并发访问对象时cache行缓存的内存可能需要同步。

  • 另一方面,你可能会想把多个对象放在同一个cache行中,这样访问了第一个对象之后, 访问接下来的对象时就可以直接在cache中访问它们,不用再把它们调入cache。

为了实现这一点,C++标准库在头文件<new>引入了两个内联变量:

  1. namespace std {
  2. inline constexpr size_t hardware_destructive_interference_size;
  3. inline constexpr size_t hardware_constructive_interference_size;
  4. }

这些对象有下列实现定义的值:

  • hardware_destructive_interference_size是推荐的 可能被不同线程并发访问的两个对象之间的最小偏移量,再小的话就可能有性能损失, 因为共用的L1缓存会被影响。
  • hardware_constructive_interference_size是推荐的 两个想被放在同一个L1缓存行的对象合起来的最大大小。

这两个值都只是建议因为实际的值依赖于具体的架构。 这两个值只是编译器在生成支持的不同平台的代码时可以提供的最佳的值。 因此,如果你知道更好更准确的值,那就使用你知道的值。 不过使用这两个值要比使用假设的不同平台的固定大小更好。

这两个值都至少是alignof(std::max_align_t)。并且这两个值通常是相等的。 然而,从语义上讲,它们代表了不同的目的,所以你应该像下面这样根据情况使用它们:

  • 如果你想在 不同的线程里 访问两个不同的(原子)对象:
    1. struct Data {
    2. alignas(std::hardware_destructive_interference_size) int valueForThreadA;
    3. alignas(std::hardware_destructive_interference_size) int valueForThreadB;
    4. };
  • 如果你想在 同一个线程里 访问两个不同的(原子)对象: ```cpp struct Data { int valueForThraedA; int otherValueForTheThreadA; };

// 再检查一次我们能通过共享的cache行获得最佳性能 static_assert(sizeof(Data) <= std::hardware_constructive_interference_size);

// 确保对象被恰当的对齐: alignas(sizeof(Data)) Data myDataForAThread; ```