编程学习
🗒️C++多线程编程(8):细粒度操作
23836 分钟
2023-11-24
2023-11-26
70
type
status
date
slug
summary
tags
category
icon
password
Email
🏡
我的个人主页:https://www.helloylh.com/
📕
文章首发于我的个人博客:https://blog.helloylh.com/
💖
欢迎大佬们来逛逛,有任何问题欢迎给我留言或者加我的联系方式。

细粒度锁

细粒度锁是一种并发编程中的优化技术,其核心思想是将锁的粒度降低到最小,以减小锁竞争的概率,提高并发性能。在队列的实现中,细粒度锁通常指对队列的不同部分使用不同的锁,以减小锁的粒度。
什么是细粒度锁,用我自己的话来描述就是,当对某一数据结构操作的时候,对一个地方的改变不会影响另一个地方的变化,因此可以把这两个操作看作是互不干扰的,使用两个线程可以进行绝对的并行操作
如果我们只定义一个mutex,即不使用细粒度锁技术。则最先执行的线程进入locked状态,第二个线程就因为第一个线程已经拥有了锁而陷入阻塞状态,这实际上变成了并发操作。但是实际上它们两个执行的是对不同位置的数据元素进行操作的,根本不会发生任何由于访问同一临界资源而导致数据异常的情况。

非细粒度测试

下面进行测试,示例代码:
  • 首先是非细粒度锁,只有一个std::mutex,因此在执行modifyBlock的时候,由于前面一个线程已经占用了此临界资源,locked住了,因此其他的四个线程全部进入wait unlocked状态。
  • 因此这个程序表面上看是多线程的并行操作,或者说我们希望它是并行操作的,但是实际上却退化为了多线程的并发操作
并发与并行的概念:
  1. 并行操作(Parallelism): 并行操作指的是多个任务同时执行,通常在多核处理器上实现。这些任务可以独立执行,彼此之间没有依赖关系,因此可以同时进行,提高整体系统的吞吐量。在并行操作中,任务之间真正同时执行,各自独立运行,没有先后顺序。
  1. 并发操作(Concurrency): 并发操作指的是多个任务在同一时间段内交替执行,这并不意味着它们必须同时执行。在并发操作中,任务之间可能存在某种依赖关系,它们共享系统资源,通过时间片轮转等机制交替执行,从宏观上看就像是同时进行的。
  • 观察其输出可以发现:确实变成了并发操作。因为它们的操作都是1完成了,然后转交给2 … 这样的类似于串行的操作。但是每个线程的执行顺序是不同的。
//细粒度数组测试 class CoarseGrainedLockArray { public: CoarseGrainedLockArray(int num_blocks) : data(num_blocks, 0) { } void modifyBlock(int blockIndex, int value) { std::lock_guard<std::mutex> lock(mutex); data[blockIndex] += value; std::this_thread::sleep_for(std::chrono::microseconds(1000)); printf("%u: %d: %d\n",std::this_thread::get_id(), blockIndex,value); } std::vector<int> getData() const { return data; } private: std::vector<int> data; std::mutex mutex; }; void test() { const int numBlocks = 5; CoarseGrainedLockArray queue(numBlocks); // 多个线程并发地修改数组的不同部分 auto worker = [&](int blockIndex) { for (int i = 0; i < 10; ++i) { queue.modifyBlock(blockIndex, 1); } }; std::vector<std::thread> threads; for (int i = 0; i < numBlocks; ++i) { threads.emplace_back(worker, i); } for (auto& thread : threads) { thread.join(); } // 打印最终结果 auto result = queue.getData(); for (int value : result) { std::cout << value << " "; } }
点击查看输出结果
72524: 3: 1 72524: 3: 1 72524: 3: 1 72524: 3: 1 72524: 3: 1 72524: 3: 1 72524: 3: 1 72524: 3: 1 72524: 3: 1 72524: 3: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 70084: 1: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 68568: 2: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 49296: 4: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1 64316: 0: 1

细粒度测试

但是当我们修改为细粒度锁形式时,代码示例:
  • 可以发现,我们为mutex创建了一个vector,并且规定其大小等于data元素的数组大小,这意味着什么?我们对于某个元素的操作不会影响另一个元素的行为,因此我们可以充分利用这种没有相互关联的关系来充分发挥多线程的并行特性。
  • 每个线程对于数组元素的操作索引都是不一致的,并且当前线程不会因为上一个占用了mutex而陷入阻塞,因为它们之间的mutex是不同的,它们占用不同的临界资源。
  • 观察输出可以发现,这确实实现了多线程的并行操作。因为它们的输出是交叉的,否则一定是按照线程的排列而打印的。
//细粒度数组测试 class CoarseGrainedLockArray { public: CoarseGrainedLockArray(int num_blocks) : data(num_blocks, 0) , mutexs(num_blocks){} void modifyBlock(int blockIndex, int value) { std::lock_guard<std::mutex> lock(mutexs[blockIndex]); data[blockIndex] += value; std::this_thread::sleep_for(std::chrono::microseconds(1000)); printf("%u: %d: %d\n",std::this_thread::get_id(), blockIndex,value); } std::vector<int> getData() const { return data; } private: std::vector<int> data; std::vector<std::mutex> mutexs; #endif };
点击查看输出结果
62720: 1: 1 72332: 3: 1 63280: 0: 1 64900: 4: 1 33800: 2: 1 64900: 4: 1 62720: 1: 1 63280: 0: 1 33800: 2: 1 72332: 3: 1 62720: 1: 1 63280: 0: 1 64900: 4: 1 33800: 2: 1 72332: 3: 1 64900: 4: 1 62720: 1: 1 63280: 0: 1 72332: 3: 1 33800: 2: 1 62720: 1: 1 72332: 3: 1 63280: 0: 1 64900: 4: 1 33800: 2: 1 64900: 4: 1 33800: 2: 1 63280: 0: 1 72332: 3: 1 62720: 1: 1 63280: 0: 1 64900: 4: 1 62720: 1: 1 72332: 3: 1 33800: 2: 1 62720: 1: 1 33800: 2: 1 64900: 4: 1 72332: 3: 1 63280: 0: 1 33800: 2: 1 63280: 0: 1 62720: 1: 1 72332: 3: 1 64900: 4: 1 63280: 0: 1 62720: 1: 1 64900: 4: 1 72332: 3: 1 33800: 2: 1
观察其运行时间会明显察觉到第二个细粒度锁的时间会快很多。

简单总结

使用细粒度锁(XILIDU = 1)
  • 在这种情况下,每个数组块都有独立的锁,因此多个线程可以同时修改不同的数组块。在这种情况下,你可以观察到多个线程交替执行,并且打印的结果中各个块的修改是交叉进行的。这符合并行的概念,因为多个任务在同一时间段内并行执行。
不使用细粒度锁(XILIDU = 0)
  • 在这种情况下,所有数组块共享一个锁。这意味着同一时间只有一个线程可以修改数组的任何部分,其他线程必须等待。这种情况下,线程在修改数组块时是串行的,一个完成后另一个才能开始。虽然多个线程仍在执行,但它们在数组修改上的操作是并发的,因为它们可能在任何时间点处于执行状态。这种情况也可以被称为并发,但由于共享锁的存在,效果不如使用细粒度锁时那么明显。
总结:
  • 使用细粒度锁时,多个线程可以并行修改不同的数组块,表现为更好的并行性。
  • 不使用细粒度锁时,虽然线程依然是并发执行,但由于共享锁,各线程在数组块修改上的操作可能是串行的,效果相对较差。
细粒度锁的缺点:
  • 复杂性增加,引入细粒度锁则代表你需要对每一个不同的数据对象执行更加细致的锁的操控。
  • 内存开销增大,锁的数量会明显增多。

示例:细粒度锁队列实现

队列:FIFO,先进先出。我们在此使用链表实现队列
  • push操作:往链表的末尾插入数据
  • pop操作:队列头部取出元素
  • 使用head和tail双指针维护链表
先看代码:
#include <iostream> #include <vector> #include <thread> #include <mutex> #include <stdexcept> class EmptyQueueError : public std::exception { public: const char* what() const noexcept override { return "Queue is empty!"; } }; template <typename T> class ThreadSafeQueue { private: struct Node { std::shared_ptr<T> value; std::unique_ptr<Node> next; Node(std::shared_ptr<T> val) : value(val), next(nullptr) {} }; Node* get_tail() { std::lock_guard<std::mutex> lock(tail_mutex); return tail; } std::unique_ptr<Node> pop_head() { std::lock_guard<std::mutex> lock(head_mutex); if (head.get() == get_tail()) { return nullptr; } std::unique_ptr<Node> old_head_node = std::move(head); head = std::move(old_head_node->next); return old_head_node; } public: ThreadSafeQueue() : head(new Node(std::make_shared<T>())), tail(head.get()) {} ~ThreadSafeQueue() { } void push(T value) { std::shared_ptr<T> new_value = std::make_shared<T>(std::move(value)); std::unique_ptr<Node> new_node(new Node(new_value)); Node* new_tail = new_node.get(); //对表尾操作 std::lock_guard<std::mutex> lock(tail_mutex); tail->next = std::move(new_node); tail = new_tail; } std::shared_ptr<T> try_pop() { std::unique_ptr<Node> r_head = pop_head(); if (!r_head) { throw EmptyQueueError(); } return r_head->value; } private: std::unique_ptr<Node> head; Node* tail; std::mutex head_mutex, tail_mutex; }; template <typename T> void insertData(ThreadSafeQueue<T>& safe_queue, T value) { safe_queue.push(value); } template <typename T> void popData(ThreadSafeQueue<T>& safe_queue) { try { std::shared_ptr<T> get_value = safe_queue.try_pop(); printf("id:%u --> %d\n",std::this_thread::get_id(), *get_value); } catch (const EmptyQueueError& err) { printf("id:%u ---> %s\n", std::this_thread::get_id(), err.what()); } } void test() { ThreadSafeQueue<int> t; std::vector<std::thread> threads; for (int i = 0; i < 10; i++) { threads.emplace_back(std::thread(insertData<int>, std::ref(t), i)); threads.emplace_back(std::thread(popData<int>, std::ref(t))); } for (auto& x : threads) { x.join(); } }
程序说明:
  • Node节点类型:
    • 共享指针变量:value表示每一个节点的值。
    • 独享指针变量:next表示当前节点的下一个节点。
      • 便于实现所有权的转移。
      • 进行内存管理。
  • head 是链表头部,是独享的。tail为了便于操作,因此设置为普通指针变量
  • head_mutex:实现细粒度锁的关键:对head进行相关操作时加锁,实现一个锁维护一个变量。
  • tail_mutex:实现细粒度锁的关键:对tail进行相关操作时加锁,实现一个锁维护一个变量。
  • 为什么使用std::move?
    • 便于实现std::unique_ptr的所有权转移,因为其无法实现拷贝构造(被删除),只能通过移动构造来实现转移。
    • std::move 并不会执行实际的资源移动,它只是将右值引用传递给目标对象,告诉编译器可以将资源所有权从一个对象转移到另一个对象。因此,使用 std::move 是确保正确的所有权转移的标准做法。
    • 实际的移动是由std::unique_ptr或者其他类中的成员函数实现的,例如移动构造函数和移动赋值运算符。
点击查看此程序的运行结果
id:73220 --> 0 id:65960 --> 2 id:64876 ---> Queue is empty! id:70584 --> 0 id:61660 --> 3 id:67348 --> 1 id:73736 --> 4 id:73744 --> 5 id:73752 --> 6 id:73760 --> 7
 
细粒度锁实现的队列是绝对线程安全的。

参考资料:

评论
  • Twikoo
  • Valine

浅色模式