作者: 雪山肥鱼
时间:20210425 23:28
目的:锁的应用
Concurrent Counter
simple but not scalable
Scalable Counting - Sloppy Counting
Concurrent Linked List
Concurrent Queue
Concurrent Hash Table
围绕着锁是如何用在常见的数据结构中的,从而保证线程安全。这些锁是如何应用的,决定了数据的正确性和程序的性能。
Concurrent Counter
Simple But Not Scalable
/*
 * Concurrent counter -- simple but not scalable.
 * One mutex serializes every operation: correctness is trivial, but
 * throughput collapses as more threads contend for the single lock.
 * (The notes' original used OSTEP's capital-P Pthread_* wrappers,
 * which are never declared here; standard pthread calls are used
 * instead so the snippet is self-contained.)
 */
typedef struct __counter_t {
    int value;              /* the shared count, protected by lock */
    pthread_mutex_t lock;   /* guards every access to value */
} counter_t;

/* Initialize the counter to zero and set up its mutex. */
void init(counter_t *c) {
    c->value = 0;
    pthread_mutex_init(&c->lock, NULL);
}

/* Atomically add one to the counter. */
void increment(counter_t *c) {
    pthread_mutex_lock(&c->lock);
    c->value++;
    pthread_mutex_unlock(&c->lock);
}

/* Atomically subtract one from the counter. */
void decrement(counter_t *c) {
    pthread_mutex_lock(&c->lock);
    c->value--;
    pthread_mutex_unlock(&c->lock);
}

/* Return a consistent snapshot of the current value. */
int get(counter_t *c) {
    pthread_mutex_lock(&c->lock);
    int rc = c->value;
    pthread_mutex_unlock(&c->lock);
    return rc;
}
实现手法简单粗暴,正确性有所保证。但是效率太低了。
速度对比.png
实验环境:iMac with four Intel 2.7 GHz i5 CPUs; with more CPUs active
即使多核情况下,假设将共享变量counter更新100万次。图中单线程更新速度大概在0.03s,而2个线程则超过5s,性能相差巨大。
Scalable Counting - Sloppy Counting
个人认为上述,多CPU 多线程 同步慢,除了调度线程时的上下文切换,另一主要原因是 cache 的同步问题。多核多线程的cache MESI 同步势必会对速度造成影响。那么每个线程,本地一个变量,然后一个全局变量,不走MESI(或者其他同类)。定时向内存中的全局变量刷新,local 变量向global全局变量刷新后,会清零,这样速度会有明显提升。
刷新频率会由阈值threshold(即图中的S值)设定。刷新频率高(S值较小)时性能下降,和原来的方法并无差异;刷新频率适当则会提高速度。如图所示:
对于iMac的测试环境来说,我们设定4个本地变量、1个全局变量,效果如"速度对比"这张图所示,相当明显。
coding (rough version):
/*
 * Sloppy (approximate) counter -- scalable counting.
 * Each CPU gets a local counter with its own lock; once a local count
 * reaches `threshold`, it is flushed into the global counter under the
 * global lock and reset to zero. get() reads only the global value, so
 * it may lag the true total by up to NUMCPUS * (threshold - 1).
 */
#ifndef NUMCPUS
#define NUMCPUS 4                    /* number of per-CPU local counters */
#endif

typedef struct __counter_t {
    int global;                      /* global count */
    pthread_mutex_t glock;           /* global lock */
    int local[NUMCPUS];              /* local count per CPU */
    pthread_mutex_t llock[NUMCPUS];  /* one lock per local count */
    int threshold;                   /* flush local -> global at this value */
} counter_t;

/* Record the flush threshold and zero all counters. */
void init(counter_t *c, int threshold) {
    c->threshold = threshold;
    c->global = 0;
    pthread_mutex_init(&c->glock, NULL);
    int i;
    for (i = 0; i < NUMCPUS; i++) {
        c->local[i] = 0;
        pthread_mutex_init(&c->llock[i], NULL);
    }
}

/*
 * Add amt to the calling thread's local counter; when that local count
 * reaches the threshold, transfer it into the global counter.
 * NOTE(review): threadID % NUMCPUS only approximates "the CPU this
 * thread runs on" -- it spreads threads across the local counters,
 * which is all the algorithm actually needs.
 */
void update(counter_t *c, int threadID, int amt) {
    int cpu = threadID % NUMCPUS;
    pthread_mutex_lock(&c->llock[cpu]);
    c->local[cpu] += amt;
    if (c->local[cpu] >= c->threshold) {
        pthread_mutex_lock(&c->glock);
        c->global += c->local[cpu];
        pthread_mutex_unlock(&c->glock);
        c->local[cpu] = 0;
    }
    pthread_mutex_unlock(&c->llock[cpu]);
}

/* Return the global count (approximate: unflushed local counts are
 * not included). */
int get(counter_t *c) {
    pthread_mutex_lock(&c->glock);
    int val = c->global;
    pthread_mutex_unlock(&c->glock);
    return val;
}
上述代码有个疑问:线程ID % NUMCPUS 就会得到线程所在的CPU吗?回头要实验一下。
Concurrent Linked List
/* Concurrent linked list: a single mutex protects the whole list. */
typedef struct __node_t {
    int key;
    struct __node_t *next;
} node_t;

typedef struct __list_t {
    node_t *head;           /* newest insertion sits at the head */
    pthread_mutex_t lock;   /* guards head and all node links */
} list_t;

/* Initialize an empty list. */
void list_init(list_t *L) {
    L->head = NULL;
    pthread_mutex_init(&L->lock, NULL);
}

/* First draft, kept for comparison: it takes the lock before malloc,
 * so the malloc-failure path must also unlock -- easy to get wrong.
int list_insert(list_t *L, int key) {
    pthread_mutex_lock(&L->lock);
    node_t * new = malloc(sizeof(node_t));
    if (new == NULL) {
        perror("malloc");
        pthread_mutex_unlock(&L->lock);
        return -1; //fail
    }
    new->key = key;
    new->next = L->head;
    L->head = new;
    pthread_mutex_unlock(&L->lock);
    return 0;
}
*/

/* Insert key at the head of the list. Returns 0 on success, -1 if
 * malloc fails. malloc happens before the lock is taken: allocation
 * needs no synchronization here, and the failure path then has no
 * lock to release. */
int list_insert(list_t *L, int key) {
    node_t *new = malloc(sizeof(node_t));
    if (new == NULL) {
        perror("malloc");
        return -1; /* fail */
    }
    new->key = key;
    pthread_mutex_lock(&L->lock);
    new->next = L->head;
    L->head = new;
    pthread_mutex_unlock(&L->lock);
    return 0;
}

/* First draft, kept for comparison: unlocks on two separate paths.
int list_lookup(list_t *L, int key) {
    pthread_mutex_lock(&L->lock);
    node_t * curr = L->head;
    while(curr) {
        if(curr->key == key) {
            pthread_mutex_unlock(&L->lock);
            return 0;
        }
        curr = curr->next;
    }
    pthread_mutex_unlock(&L->lock);
    return -1; //failure
}
*/

/* Look up key. Returns 0 if found, -1 otherwise. A single
 * unlock-and-return path makes it harder to forget the unlock. */
int list_lookup(list_t *L, int key) {
    int rv = -1;
    pthread_mutex_lock(&L->lock);
    node_t *curr = L->head;
    while (curr) {
        if (curr->key == key) {
            rv = 0;
            break;
        }
        curr = curr->next;
    }
    pthread_mutex_unlock(&L->lock);
    return rv;
}
这里在malloc之后才上锁,书中给出的原因是:如果先上锁再malloc,申请内存失败时代码还需要记得及时释放锁;索性在malloc之后再上锁,失败路径上就没有锁要释放了。
不必担心malloc的线程安全性。
同时无论对于寻找还是插入函数,释放锁及返回的路径单一。保证安全性。
更多并发,并不代表着速度更快
多线程设计之中,流程设计的不合理会导致程序夯住,其中一个线程挂了,整个进程会挂掉,多线程中产生的内存泄漏问题也值得注意
Concurrent Queue
/* Michael & Scott two-lock concurrent queue: separate head and tail
 * locks let an enqueuer and a dequeuer run in parallel. A dummy node
 * allocated at init keeps head and tail operations from ever touching
 * the same node while the queue is non-empty. */
typedef struct __node_t {
    int value;
    struct __node_t *next;
} node_t;

typedef struct __queue_t {
    node_t *head;               /* dequeue side; points at the dummy */
    node_t *tail;               /* enqueue side */
    pthread_mutex_t headLock;   /* serializes dequeuers */
    pthread_mutex_t tailLock;   /* serializes enqueuers */
} queue_t;

/* Create the empty queue: head and tail both point at a dummy node. */
void Queue_Init(queue_t *q) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);        /* consistent with Queue_Enqueue's policy */
    tmp->next = NULL;
    q->head = q->tail = tmp;
    pthread_mutex_init(&q->headLock, NULL);
    pthread_mutex_init(&q->tailLock, NULL);
}

/* Append value at the tail. Only the tail lock is needed. */
void Queue_Enqueue(queue_t *q, int value) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);
    tmp->value = value;
    tmp->next = NULL;
    pthread_mutex_lock(&q->tailLock);
    q->tail->next = tmp;
    q->tail = tmp;
    pthread_mutex_unlock(&q->tailLock);
}

/* Pop from the head into *value. Returns 0 on success, -1 if the
 * queue is empty. The node that held the value becomes the new dummy;
 * the old dummy is freed after the lock is released. */
int Queue_Dequeue(queue_t *q, int *value) {
    pthread_mutex_lock(&q->headLock);
    node_t *tmp = q->head;
    node_t *newHead = tmp->next;
    if (newHead == NULL) {
        pthread_mutex_unlock(&q->headLock);
        return -1; /* queue was empty */
    }
    *value = newHead->value;
    q->head = newHead;
    pthread_mutex_unlock(&q->headLock);
    free(tmp);
    return 0;
}
Concurrent Hash Table
/* Concurrent hash table: a fixed array of concurrent lists, one lock
 * per bucket (inside each list_t), so operations on different buckets
 * proceed in parallel -- far better than one global list lock. */
#define BUCKETS (101)

typedef struct __hash_t {
    list_t lists[BUCKETS];  /* each bucket is an independently locked list */
} hash_t;

/* Initialize every bucket's list. */
void Hash_Init(hash_t *H) {
    int i;
    for (i = 0; i < BUCKETS; i++) {
        list_init(&H->lists[i]);
    }
}

/* Insert key into its bucket; returns the list's 0/-1 result.
 * NOTE(review): key % BUCKETS is negative for negative keys in C --
 * callers should pass non-negative keys. */
int Hash_Insert(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return list_insert(&H->lists[bucket], key);
}

/* Look up key in its bucket: 0 if present, -1 otherwise. */
int Hash_Lookup(hash_t *H, int key) {
    int bucket = key % BUCKETS;
    return list_lookup(&H->lists[bucket], key);
}
效率高 每个 bucket 管理的list,有一把锁。 比list的例子中的全局一把锁的效果要好很多
效率对比.png
网友评论