美文网首页
线程(三):锁的应用

线程(三):锁的应用

作者: 404Not_Found | 来源:发表于2021-04-25 00:17 被阅读0次

    作者: 雪山肥鱼
    时间:20210425 23:28
    目的:锁的应用

    Concurrent Counter
      simple but not scalable
    Scalable Counting - Sloppy Counting
    Concurrent Linked List
    Concurrent Queue
    Concurrent Hash Table
    

    围绕这锁是如何用在常见的数据结构中的,从而保证线程安全。这些锁是如何应用的决定了数据之间的正确性和性能。

    Concurrent Counter

    Simple But Not Scalable

    typedef struct __counter_t {
      int value;
      pthread_mutex_t lock;
    } counter_t;
    
    void init(counter_t *c) {
     c->value = 0;
     Pthread_mutex_init(&c->lock, NULL);
    }
    
    void increment(counter_t *c) {
     Pthread_mutex_lock(&c->lock);
     c->value++;
     Pthread_mutex_unlock(&c->lock);
    }
    
    void decrement(counter_t *c) {
     Pthread_mutex_lock(&c->lock);
     c->value--;
     Pthread_mutex_unlock(&c->lock);
    }
    
    int get(counter_t *c) {
     Pthread_mutex_lock(&c->lock);
     int rc = c->value;
     Pthread_mutex_unlock(&c->lock);
     return rc;
    }
    

    实现手法简单粗暴,正确性有所保证。但是效率太低了。


    速度对比.png

    实验环境:iMac with four Intel 2.7 GHz i5 CPUs; with more CPUs active
    即使多核情况下,假设将共享变量counter 跟新100w次。图中单线程更新速度大概在0.03s,而2个线程则超过5s. 性能相差巨大。

    Scalable Counting - Sloppy Counting

    个人认为上述,多CPU 多线程 同步慢,除了调度线程时的上下文切换,另一主要原因是 cache 的同步问题。多核多线程的cache MESI 同步势必会对速度造成影响。那么每个线程,本地一个变量,然后一个全局变量,不走MESI(或者其他同类)。定时向内存中的全局变量刷新,local 变量向global全局变量刷新后,会清零,这样速度会有明显提升。
    刷新频率速度会由阈值threashold设定。刷新频率高(S值较小),则性能下降,和原来的方法并无差异,刷新频率适当会提高速度。如图所示:

    刷新频率S = 5.png
    对于imac的测试环境来说,我们设定4个本地变量,1个全局变量, 想过如速度对比这张图,效果相当明显
    coding (rough version):
    typedef struct __counter_t {
      int global // global count
      pthread_mutex_t glock; //global lock;
      int lock[NUMCPUS]; //local count per cpu
      pthread_mutex_t llock[NUNMCPUS];// locks
      int  threashold
    }counter_t;
    
    void init(counter_t *c, int threshold) {
      c->threshold = threshold;
      c->global = 0;
      pthread_mutex_init(&c->glock, NULL);
      int i;
      for(i = 0; i< NUMCPUS; i++) {
        c->lock[i] = 0;
        pthread_mutex_init(&c->llock[i], NULL);
      }
    }
    
    void update(counter_t *c, int threadID, int amt) {
      int cpu = threadID % NUMCPUS;
      pthread_mutex_lock(&c->llock[cpu]);
      c->local[cpu] += amt;
      if(c->local[cpu] >= c->threashold) {
        pthread_mutex_lock(&c->glock);
        c->global += c->local[cpu];
        pthread_mutex_unlock(&c->glock);
        c->local[cpu] = 0;
      }
      pthread_mutex_unlock(&c->llock[cpu];
    }
    
    int get(counter_t *c) {
      pthread_mutex_lock(&c->glock);
      int val = c->global;
      pthread_mutex_unlock(&c->glock);
      return val;
    }
    
    

    上述代码有个疑问,线程ID%NUMCPU 就会得到 线程所在的CPU吗?回头要实验以下。

    Concurrent Linked List

    typedef struct __node_t {
      int key;
      struct __node_t *next;
    }node_t;
    
    typdef struct __list_t {
      node_t * head;
      pthread_mutex_t lock;
    }list_t;
    
    void list_init(list*t L) {
      L->head = NULL;
      pthread_mutex_init(&L->lock, NULL);
    }
    /*
    int list_insert(list_t *L, int key) {
      pthread_mutex_lock(&L->lock);
      node_t * new = malloc(sizeof(node_t));
      if (new == NULL) {
        perror("malloc");
        pthread_mutex_unlock(&L->lock);
        return -1; //fail
      }
      new->key = key;
      new->next = L->head;
      L->head = new;
      pthread_mutex_unlock(&L->lock);
      return 0;
    }
    */
    
    int list_insert(list_t *L, int key) {
      //sychronization not needed
      node_t * new = malloc(sizeof(node_t));
      if (new == NULL) {
        perror("malloc");
        return -1; //fail
      }
      new->key = key;
      pthread_mutex_lock(&L->lock);
      new->next = L->head;
      L->head = new;
      pthread_mutex_unlock(&L->lock);
      return 0;
    }
    
    /*
    int list_lookup(list_t *L, int key) {
      pthread_mutex_lock(&L->lock);
      node_t * curr = L->head;
      while(curr) {
        if(curr->key == key) {
          pthread_mutex_unlock(&L->lock);
          return 0;
        }
        curr = curr->next;
      }
      pthread_mutex_unlock(&L->lock);
      return -1; //failure
    }
    */
    
    int list_lookup(list_t *L, int key) {
      int rv = -1;
      pthread_mutex_lock(&L->lock);
      node_t * curr = L->head;
      while(curr) {
        if(curr->key == key) {
          rv = 0;
          break;
        }
        curr = curr->next;
      }
      pthread_mutex_unlock(&L->lock);
      return rv; //failure
    }
    

    这里在malloc之后上锁的原因,书中给出的原因时,malloc如果申请内存失败,代码也需要及时释放锁。索性在malloc之后上锁。
    不必担心malloc的线程安全性。

    同时无论对于寻找还是插入函数,释放锁及返回的路径单一。保证安全性。

    更多并发,并不代表着速度更快
    多线程设计之中,流程设计的不合理会导致程序夯住,其中一个线程挂了,整个进程会挂掉,多线程中产生的内存泄漏问题也值得注意

    Concurrent Queue

    typedef struct __node_t {
      int value;
      struct __node_t * next;
    }node_t;
    
    typedef struct __queue_t {
      node_t *head;
      node_t * tail;
      pthread_mutex_t headLock;
      pthread_mutex_t tailLock;
    }queue_t;
    
    void Queue_Init(queue_t *q) {
      node_t * tmp = malloc(sizeof(node_t));
      tmp->next = NULL;
      q->head = q->tail = tmp;
      pthread_mutex_init(&q->headLock, NULL);
      pthread_mutex_init(&q->tailLock, NULL);
    }
    
    void Queue_Enqueue(queue_t *q, int value) {
      node_t * tmp = malloc(sizeof(node_t);
      assert(tmp!=NULL);
      tmp->value = value;
      tmp->next = NULL;
    
      pthread_mutex_lock(&q->taiLock);
      q->tail->next = tmp;
      q->tai =tmp;
      pthread_mutex_unlock(&q->taiLock);
    }
    
    int Queue_Dequeue(queue_t *q, int value) {
      pthread_mutex_lock(&q->headLock);
      node_t * tmp = q->head;
      node_t * newHead = tmp->next;
      if(newHead == NULL) {
        pthread_mutex_unlock(&q->headLock);
      return -1;//queue was empty
      }
      *value = newHead->value;
      q->head = newHead;
      pthread_mutex_unlock(&q->headLock);
      free(tmp);
      return 0
    }
    

    Concurrent Hash Table

    #define BUCKETS(101)
    
    typedef struct __hash_t {  
      list_t lists[BUCKETS];
    }hash_t;
    
    void Hash_Init(hash_t * H) {
      int i;
      for(i =0;i<BUCKETS;i++) {
        List_Init(&H->lists[i]);
      }
    }
    
    int Hash_Insert(hash_t *H, int key) {
       int bucket = key % BUCKETS;
      return List_Insert(&H->lists[buket], key);
    }
    
    int Hash_Lookup(hash_t *H, int key) {
      int bucket = key %% BUCKETS;
      return List_lookup(&H->lists[bucket], key);
    }
    

    效率高 每个 bucket 管理的list,有一把锁。 比list的例子中的全局一把锁的效果要好很多


    效率对比.png

    相关文章

      网友评论

          本文标题:线程(三):锁的应用

          本文链接:https://www.haomeiwen.com/subject/zaxsrltx.html