前言:这篇文章硬核了,我们来看看 pthread 的读写锁是怎么实现的
0X00 读写锁的感性认识
最适合用来理解「读写锁」的就是访问银行账户余额了
- 多个账户可以同时读出某个账户的余额
- 但一旦有一个
写线程
要修改账户的余额的时候,该写线程
就必须等待所有读线程
结束 - 然后只允许该
写线程
修改账户余额 - 在
写线程
没有结束之前,任何读线程
都不能读出余额
0X01 与读写锁相关的函数
#include <pthread.h>
int pthread_rwlock_rdlock (pthread_rwlock_t *rwptr);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_unlock (pthread_rwlock_t *rwptr);
上述函数的作用显而易见,其中 pthread_rwlock_rdlock 和 pthread_rwlock_wrlock 都是阻塞
的函数
#include <pthread.h>
int pthread_rwlock_tryrdlock (pthread_rwlock_t *rwptr);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwptr);
这上面的函数作用也是显而易见,但是这是非阻塞的函数,如果没有获取成功,会立即返回一个值,而不是阻塞
同样对读写锁
的属性也有相应的函数
#include <pthread.h>
int pthread_rwlock_init (pthread_rwlock_t *rwprr, const pthread_rwlockattr_t *attr);
int pthread_rwlock_destroy (pthread_rwlock_t *rwptr);
前者我们可以用来初始化一个读写锁
,而当一个线程不再需要某个读写锁的时候,可以用后者摧毁这个读写锁
更多的内容可以参考:《UNIX 网络编程进程间通信》8.3
0X02 读写锁的底层实现
接下来我们将学习如何用「条件变量」和「互斥锁」实现一个读写锁
首先我们得知道读写锁有哪些特性:
- 上读锁的成功的时候,上写锁阻塞,但是
我们来看读写锁的 struct
typedef struct {
pthread_mutex_t rw_mutex; /*最基础的读写互斥锁*/
pthread_cond_t rw_condreaders; /* 读等待的条件变量 */
pthread_cond_t rw_condwriters; /* 写等待的条件变量 */
int rw_magic;
int rw_nwaitreaders;
int rw_nwaitwriters;
int rw_refcount;
} pthread_rwlock_t;
主要说后面几个 int 成员的作用
- rw_magic
这个成员用来标志「读写锁」是不是初始化成功,是不是被摧毁了。
我们看后面的 init 函数,rw_magic 的赋值一定是在最后面。而看 destory 函数 rw_magic 的赋值一定是在最前面。
因为这就是一个标志,init 的时候,rw_magic 被置数 RW_MAGIC,标志这个「读写锁」全部成员初始化成功,而 destory 的时候,一开始就要将 rw_magic 置 0。哪怕这个其他成员还没来得及 destory 就调度到其他代码了,也能知道这个锁被 destory 了
- rw_waitreaders 和 rw_nwaitwriters
这两个成员就显而易见了,这个读写锁的等待的「读」和「写」的数
- rw_refcount
这个成员用来表示本「读写锁」的状态,0 的时候,表示这个锁可用,-1 的时候表示这是一个写锁,大于 0 的值意味着它当前容纳着多少个「读锁」
接下来我们介绍一些相关的函数
:
初始化与摧毁
int
pthread_rwlock_init(pthread_rwlock_t *rw, pthread_rwlockattr_t *attr)
{
int result;
// 不支持初始化的时候申明属性
if (attr != NULL)
return(EINVAL); /* not supported */
// 显而易见初始化一个互斥锁和两个条件变量
if ( (result = pthread_mutex_init(&rw->rw_mutex, NULL)) != 0)
goto err1;
if ( (result = pthread_cond_init(&rw->rw_condreaders, NULL)) != 0)
goto err2;
if ( (result = pthread_cond_init(&rw->rw_condwriters, NULL)) != 0)
goto err3;
rw->rw_nwaitreaders = 0;
rw->rw_nwaitwriters = 0;
rw->rw_refcount = 0;
rw->rw_magic = RW_MAGIC;
return(0);
// 如果初始化失败,则毁掉之间创建好的互斥锁或者条件变量,并报错
err3:
pthread_cond_destroy(&rw->rw_condreaders);
err2:
pthread_mutex_destroy(&rw->rw_mutex);
err1:
return(result); /* an errno value */
}
pthread_rwlock_destroy(pthread_rwlock_t *rw)
{
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
// 确保在没有使用的情况下销毁
if (rw->rw_refcount != 0 ||
rw->rw_nwaitreaders != 0 || rw->rw_nwaitwriters != 0)
return(EBUSY);
pthread_mutex_destroy(&rw->rw_mutex);
pthread_cond_destroy(&rw->rw_condreaders);
pthread_cond_destroy(&rw->rw_condwriters);
// 标记已摧毁
rw->rw_magic = 0;
return(0);
}
开始难一点的函数剖析了:
int
pthread_rwlock_rdlock(pthread_rwlock_t *rw)
{
int result;
// 检查这个锁是不是被摧毁了
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
// 每个相关的函数都必须给这个「读写锁」的互斥锁上锁,用来保护当前「读写锁」中的数据
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
// a 检查当前锁是不是读写锁 b 检查当前是否有写锁在等待
// 如果有写锁在等待,就阻塞自己,等待读的条件变量
while (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0) {
rw->rw_nwaitreaders++;
result = pthread_cond_wait(&rw->rw_condreaders, &rw->rw_mutex);
rw->rw_nwaitreaders--;
if (result != 0)
break;
}
// 取得「读锁」的时候 rw_refcount + 1,表示此时又多容纳了一个新的「读锁」
if (result == 0)
rw->rw_refcount++; /* another reader has a read lock */
pthread_mutex_unlock(&rw->rw_mutex);
return (result);
}
接下来是是读锁的非阻塞版本
pthread_rwlock_tryrdlock,这个非常简单,并没有想象中那么难
int
pthread_rwlock_tryrdlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
if (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0)
result = EBUSY; /* held by a writer or waiting writers */
else
rw->rw_refcount++; /* increment count of reader locks */
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
非阻塞也不是意味着用的都是非阻塞函数,这个非阻塞的版本与之前的阻塞版本也没什么不同,只是不再阻塞等待「写锁」了
我们再来看看「写锁」
int
pthread_rwlock_wrlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
// 有两种情况 写锁需要阻塞
// a 当前锁是一个写锁 也就是 rw->rw_refcount = -1
// b 有读锁拥有当前锁,此时 rw->rw_refcount > 0
// 所以 rw->rw_refcount != 0 的时候必须阻塞
while (rw->rw_refcount != 0) {
rw->rw_nwaitwriters++;
result = pthread_cond_wait(&rw->rw_condwriters, &rw->rw_mutex);
rw->rw_nwaitwriters--;
if (result != 0)
break;
}
if (result == 0)
rw->rw_refcount = -1;
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
而写锁不必等待读锁的条件变量,只需要读锁自己解锁以后 rw->rw_refcount-- 就 ok
我们再来看看上写锁
的非阻塞版本:
int
pthread_rwlock_trywrlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
if (rw->rw_refcount != 0)
result = EBUSY; /* held by either writer or reader(s) */
else
rw->rw_refcount = -1; /* available, indicate a writer has it */
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
这个很简单就不说了
最后我们来看看如何给当前「读写锁」解锁的 pthread_rwlock_unlock:
int
pthread_rwlock_unlock(pthread_rwlock_t *rw)
{
int result;
if (rw->rw_magic != RW_MAGIC)
return(EINVAL);
if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
return(result);
if (rw->rw_refcount > 0)
rw->rw_refcount--; /* 释放一个读 */
else if (rw->rw_refcount == -1)
rw->rw_refcount = 0; /* 释放一个写 */
else
err_dump("rw_refcount = %d", rw->rw_refcount);
if (rw->rw_nwaitwriters > 0) {
// 唤起一个写
if (rw->rw_refcount == 0)
result = pthread_cond_signal(&rw->rw_condwriters);
} else if (rw->rw_nwaitreaders > 0)
// 唤起所有读
result = pthread_cond_broadcast(&rw->rw_condreaders);
pthread_mutex_unlock(&rw->rw_mutex);
return(result);
}
更详细的内容可以参考:《UNIX 网络编程进程间通信》
0X03 线程取消
如果上面的函数在阻塞的时候,调用的线程被取消了,那么计数器就会出错,因为这个线程被取消了,理应不再持有这个「读写锁」,
但是「读写锁」的一些成员记录着等待的线程数,在取消的时候并没有做相应的处理,所以计数器出错
首先我们得知道到线程取消的函数调用是什么:
#include <pthread.h>
// 若成功则返回 0,若出错则为正的 Exxx 值
int pthread_cancel(pthread_t tid);
一个线程可以被同一进程内的任何其他线程取消,唯一参数就是线程的 tid
如果启动了多个线程执行某个任务,如果其中某个线程发现了一个错误,它和其他线程就有必要终止
因此我们就有这样的两个函数,用来在线程被取消、终止之前执行:
#include <pthread.h>
void pthread_cleanup_push(void (*function) (void *), void *arg);
void pthread_cleanup_pop(int execute);
前者被用来处理被取消(pthread_cancel)的情况,后者用来处理线程自愿终止(调用 pthread_exit 或线程自己结束)
更多的内容可以参考:《UNIX 网络编程进程间通信》8.5
完结撒花~
网友评论