美文网首页
并发控制

并发控制

作者: 小跑001 | 来源:发表于2020-06-06 14:04 被阅读0次

1 c++中的锁和条件变量实现状态同步

#include <queue>
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>

using std::vector;
using std::string;
using std::queue;
using std::mutex;
using std::thread;

class Message {
 public:

     void Put(const string& msg) {
         std::lock_guard<std::mutex> guard(msgs_lock_);
         msgs_.push(msg);
     }

     bool Get(string& out_msg) {
         std::lock_guard<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return true;
         }
         return false;
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
};

    
Message g_message_mgr;

void produce() {
    for(;;) {
        string msg = "some rand msg";
        g_message_mgr.Put(msg);
        std::this_thread::sleep_for(std::chrono::seconds(2));
    }
}

void cosumer() {
    for(;;) {
        string msg;
        if (g_message_mgr.Get(msg)) {
            // do sth with msg
            std::cout << msg << std::endl;
        }
    }
}

int main() {
    thread t1(produce);
    thread t2(cosumer);
    t1.join();
    t2.join();

    return 0;
}

  • 以上程序是一个生产者和一个消费者典型例子, 运行之后发现程序cpu高达99%.
  • 改善Message类
class Message {
 public:

     void Put(const string& msg) {
         std::unique_lock<std::mutex> guard(msgs_lock_);

         bool need_notify = false;
         if (msgs_.empty()) {
             need_notify = true;
         }

         msgs_.push(msg);

         if (need_notify) {
             cv_.notify_all();
         }
     }

     bool Get(string& out_msg) {
          std::unique_lock<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return true;
         } else {
             cv_.wait(guard, [this]{return msgs_.empty() == false;});
         }
         return false;
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
     condition_variable cv_;
};
  • 改善之后cpu使用率降低为0%

  • 由上可以看出:

    • mutex只是起到了互斥作用, 并不知道资源实际使用情况, 造成了在没消息的时候去死循环检查, 浪费cpu
    • condition_variable条件变量可以根据条件来选择挂起和触发, 没消息的时候选择休眠, 有消息的时候被唤醒或者不会被挂起, 避免消费者自己死循环的去查找而浪费cpu.

    在实际编程中, 生产者可能想关闭队列并且希望消费者都能捕获这个消息. 那么就要加一个close方法来做处理. close之后需要注意几点:

    • close之后生产者不能再放消息进去
    • close之后消费者需要消费掉所有的消息再捕获到close的事件
    • 另外, Get获取不到需要阻塞, 如果没消息了就返回false.
      改善后的代码如下:
class Message {
 public:

     enum {
         GET_NORMAL,
         GET_NONE,
         GET_OVER,
     };

     bool Put(const string& msg) {
         std::unique_lock<std::mutex> guard(msgs_lock_);
         if (is_over_) {
             return false;
         }

         bool need_notify = false;
         if (msgs_.empty()) {
             need_notify = true;
         }

         msgs_.push(msg);

         if (need_notify) {
             cv_.notify_all();
         }

         return true;
     }

   bool Get(string& out_msg) {
         for(;;) {
             int ret = get(out_msg);
             if (ret == GET_NORMAL) {
                 return true;
             } else if (ret == GET_NONE) {
                 continue;
             } else if (ret == GET_OVER) {
                 return false;
             } else {
                 assert(false);
             }
         }
     }

     int get(string& out_msg) {
         std::unique_lock<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return GET_NORMAL;
         } else if (is_over_) {
             return GET_OVER;
         } else {
             cv_.wait(guard, [this]{return msgs_.empty() == false || is_over_;});  // 这里is_over 注意要捕获, 不然可能会阻塞在这里.
             return GET_NONE;
         }
     }

     void Close() {
          std::unique_lock<std::mutex> guard(msgs_lock_);
          is_over_ = true;
          cv_.notify_all();
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
     condition_variable cv_;
     bool is_over_ = false;
};

2 信号量的同步

从第一节看出是由锁和条件变量来实现状态同步的, 而信号量也可以实现同样的功能, 参考伪代码:

produce:
    P(emptyCount)
    P(useQueue)
    putItemIntoQueue(item)
    V(useQueue)
    V(fullCount)
consume:
    P(fullCount)
    P(useQueue)
    item ← getItemFromQueue()
    V(useQueue)
    V(emptyCount)
  • emptyCount 初始资源大小为 N, fullCount 初始化大小是 0, 并且useQueue is 初始化大小是1.

  • P代表减1, V代表加1

  • 通过P(emptyCount)来检查共享资源, 当资源不够时候线程被挂起, 直到V(emptyCount)释放了资源. fullCount同理.

    以上P和V都是原子操作, 提供了检查资源, 挂起, 事件注册与通知等功能. 简而言之是一种共享锁(相对互斥锁来说). 当资源总共为1的时候, 同互斥锁.
    信号量也可以实现类似wait功能, 但是参考golang里面的waitgroup, 还是有一些不足, 例如在通知的时候信号量如何做到广播通知所有wait的线程, 相对来说信号量更偏向对资源的同步控制.

3 锁与信号量

锁和信号量都能实现类似的功能, 在一些复杂的场景, 例如多种相互依赖的状态需要同步的时候通过锁和条件变量更方便实现, 但是要注意锁的串行特性, 不要锁住耗时多的代码.

相关文章

  • 数据库并发控制——悲观锁、乐观锁、MVCC

    三种并发控制:悲观并发控制、乐观并发控制、多版本并发控制。 悲观并发控制(又名“悲观锁”,Pessimistic ...

  • MySQL系列之三 -- -并发(MVCC)

    MySQL 并发控制如何实现 MySQL 如何实现高并发? 一 并发控制 抛开MySQL,通过技术上来讨论并发控制...

  • 乐观锁与悲观锁

    乐观并发控制(乐观锁)和悲观并发控制(悲观锁)是并发控制主要采用的技术手段 悲观并发控制(悲观锁) 它可以阻止一个...

  • iOS多线程随笔

    1. 多线程的并发控制 1.1 在CGD中快速实现多线程的并发控制 NSOperationQueue来处理并发控制...

  • 你应该了解的MySQL锁分类

    MySQL中的锁 锁是为了解决并发环境下资源竞争的手段,其中乐观并发控制,悲观并发控制和多版本并发控制是数据库并发...

  • Linux 驱动开发之并发控制

    1 设备驱动的并发控制概览 2 参考 Linux并发控制

  • MySQL多版本并发控制 - MVCC

    并发控制 实现事务隔离的机制,称之为并发控制 所谓并发控制,就是保证并发执行的事务在某一隔离级别上的正确执行的机制...

  • Dubbo剖析-并发控制

    一、前言 前面讲解了Dubbo的服务降级,本节我们来讲解dubbo中的并发控制,并发控制分为客户端并发控制和服务端...

  • 并发控制

    并发操作带来的数据不一致性有 丢失修改两个事务读入同一数据 不可重复读2次读取未用数据,前后不一致(为了检验) 读...

  • 并发控制

    并发操作带来的问题 丢失修改,事务T1和T2同时对同一个记录进行修改,会出现结果覆盖的现象 读脏数...

网友评论

      本文标题:并发控制

      本文链接:https://www.haomeiwen.com/subject/eyattktx.html