美文网首页LinuxLinux学习之路
APUE读书笔记-11线程(4)

APUE读书笔记-11线程(4)

作者: QuietHeart | 来源:发表于2020-06-20 15:41 被阅读0次

6、线程同步

当多个线程共享同一片内存的时候,我们需要保证每个线程看到的数据是一致的。如果线程使用的变量没有被其他线程使用,那么不会存在一致性的问题。类似,如果一个变量是只读,那么多个线程同时访问也不会出现一致性的问题。然而当有一个线程可以修改这个变量,而这个变量同时也可以被其他的线程修改和读取的时候,我们需要在线程之间进行同步,来保证它们访问变量内存的内容的时候的数据是合法的。

当一个线程修改变量的时候,别的读取这个变量的线程会潜在地遭遇不一致的情况。在修改操作占用多于一个存储周期的处理器架构上面,这个情况在两次写周期之间进行读内存的时候很容易发生。虽然这个取决于处理器架构,但是一个可移植的程序不能对使用的处理器的架构做任何的假设。

文中先给出了一个简单的情况:

线程A:读-写-写
线程B:-----读----

当B的读发生在A的两个写周期之间的时候,A,B就存在不一致性的问题了。

图中先给了一个解决方案:规定在访问变量之前,先对变量进行加锁。这样当一进程持有锁的时候,其它申请锁将被阻塞。

然后又给出了一些其它导致不一致的情况的例子,具体参见参考资料以及其中的图示。

互斥信号量 (mutex)

通过使用 pthreads 中的互斥信号量接口,我们可以保护我们的数据,保证同一个时间,只有一个线程访问我们的数据。实际, mutex 就是我们访问共享资源设置的以及使用完共享资源时释放的锁。如果我们解锁 mutex 的时候有多余一个线程处于阻塞状态,那么所有在这个锁上面阻塞的线程都变成可执行,然后第一个运行的将会设置锁,其他的看到锁被设置了就继续返回阻塞等待锁的下一回释放了。这样,在一个时间里面,只有一个线程在执行。

要想使用互斥机制,我们需要自己设计数据访问规则。操作系统不会将我们的数据访问串行化。如果我们的一个线程访问数据的时候没有获取锁那么即使其他的线程加锁,也会出现不一致的情况。

mutex 变量用数据类型 pthread_mutex_t 数据类型替代,在我们使用 mutex 变量之前,我们必须首先将它用常量 PTHREAD_MUTEX_INITIALIZER 初始化(只用于静态分配的 mutex )或者用 pthread_mutex_init 初始化。如果我们动态分配 mutex (例如通过 malloc ),我们 需要在释放互斥量内存之前调用 pthread_mutex_destroy 函数

#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

返回:如果成功,两者返回0;如果失败,返回错误码。

我们可以把参数 attr 设置为 NULL 这样,就会使用默认的初始值。以后讨论非默认的 mutex 属性。

下面的函数用来对 mutex 进行加锁或者解锁。

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

如果一个线程无法接受被阻塞,那么可以使用 pthread_mutex_trylock 有条件地添加锁。这样,如果调用 pthread_mutex_trylock 的时候 mutex 没有被上锁,那么将会正常一样上锁并且返回0;如果之前 mutex 被上了锁,那么 pthread_mutex_trylock 将会失败并且立即返回 EBUSY

举例:

#include <stdlib.h>
#include <pthread.h>
struct foo {
    int             f_count;
    pthread_mutex_t f_lock;
    /* ... more stuff here ... */
};

struct foo *foo_alloc(void) /* allocate the object */
{
    struct foo *fp;
    if ((fp = malloc(sizeof(struct foo))) != NULL) {
        fp->f_count = 1;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
            free(fp);
            return(NULL);
        }
        /* ... continue initialization ... */
    }
    return(fp);
}
void foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}
void foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    if (--fp->f_count == 0) { /* last reference */
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } else {
        pthread_mutex_unlock(&fp->f_lock);
    }
}

这个例子使用 mutex 来保护一个数据结构,当有多个线程访问一个动态分配的对象的时候,我们可以给这个对象内嵌一个引用计数保护对象不会在线程被访问的时候被释放。

在增加,减少,以及检查引用计数是否为0的时候,我们都会锁住 mutex 来保护它,最开始 foo_alloc 初始化的时设置引用计数为1的时候,不用设置这个锁保护,因为此时只有分配空间的那个线程引用它。如果这时候我们把这个结构放到一个链表中,那么它可以被其他线程找到,我们需要先为它加锁。

在使用这个对象之前,线程要增加这个结构对象的引用计数;用完之后要减少引用计数;当引用计数为0的时候,要释放结构对象的内存空间。

死锁的避免

当线程将要尝试对同一个信号两次加锁的时候,会产生死锁。但是实际上,由于mutex而产生死锁这个现象发生的很不明显。例如:我们在程序中使用了一个以上的互斥信号量,如果 第一个线程在持有第一个互斥信号量的时候再申请第二个互斥信号量,而第二个互斥信号量被第二个线程持有并且第二个线程想要加锁第一个互斥信号量; 这样两个线程都无法继续了,它们都互相等待对方持有的资源,这时发生的现象就叫做死锁。

死锁可以通过仔细控制信号量加锁的次序来避免。例如:假设你有两个互斥信号量A和B。如果所有的线程都首先给A加锁然后才给B加锁,那么对于这两个互斥信号量之间将不会发生死锁的现象(当然你有可能在其它的信号量上面发生死锁),只有当存在其它的线程对A,B加锁的次序相反的时候,才有可能会产生死锁。

有时一个应用程序的体系使得很难将一个特定顺序的加锁应用在它的身上。如果包含了足够的锁和数据结构,而你的函数还是无法用一个简单的方法来实现,那么应该换一个思路。这个时候,你兴许可以把你的锁释放,然后在稍后的一个时间尝试。你可以使用 pthread_mutex_trylock 来避免死锁。如果你已经成功的持有了 pthread_mutex_trylock ,那么你可以继续。如果没有,你可以释放你已经持有的资源,并且清理其它的工作,一会再尝试。

举例
具体的例子不多说了,参见参考资料的源代码。这里主要是给了两个例子,都使用两个信号量。为了避免死锁,在添加信号量的时候都按照相同的次序加锁。第一个例子锁的粒度比较细,导致程序代码结构有点复杂,但是性能应该更好;第二个例子锁的粒度比较粗,性能相对差一些,但是代码结构很简单。

读写锁

读写锁和互斥信号量类似,但是读写锁允许更高程度的并行。使用互斥信号量的状态只能是锁和非锁两种状态,并且在一个时间只有一个线程可以拥有锁。读写锁有三种可能的状态:读锁,写锁,和解锁。同一时刻只能有一个线程可以有写锁的状态,但是可以有多个线程处于读锁的状态。

当读写锁被处于写锁的时候,所有尝试加锁(无论是写锁还是读锁)的线程都会阻塞直到写锁释放;当处于读锁状态的时候,所有尝试加读锁的线程都会允许加锁,但任何尝试加写锁的线程都会被阻塞直到所有线程的读锁被释放。有一句不太确定的原句,如下:

Although implementations vary, readerwriter locks usually block additional readers if a lock is already held in read mode and a thread is blocked trying to acquire the lock in write mode. This prevents a constant stream of readers from starving waiting writers.

翻译不是很确定,大致是说:尽管实现不同,读写锁经常会在如下情况阻塞额外的读取者:当一个线程持有读锁,另外一个线程阻塞在获取写锁的时候。这样做的原因是,它可以防止大量读取操作导致一个写者无限等待。

读写锁适合读取操作比修改操作频繁的情况。读写锁也叫共享互斥锁。当一个读写所处于读锁状态的时候,它处于共享模式;当处于写锁状态的时候,它处于互斥模式。
和互斥信号量类似,读写锁也需要初始化之后才能使用。

#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

函数成功返回0,失败返回错误号码。

读写锁通过调用 pthread_rwlock_init 来进行初始化,如果使用默认的属性,我们可以给 attr 传递一个空指针,我们后面会讨论读写锁的属性。

在释放读写锁占用的内存之前,我们需要调用 pthread_rwlock_destroy 来清除它。 如果 pthread_rwlock_init 为读写锁分配了任何的内存,那么 pthread_rwlock_destroy 就会释放这些资源。如果我们 没有调用 pthread_rwlock_destroy 就直接释放读写锁的内存,那那么读写锁之前占用的那些额外的资源就会丢失。

为了让一个读写锁处于读模式,我们调用 pthread_rwlock_rdlock 函数;使它处于写模式,我们需要调用 pthread_rwlock_wrlock 。无论我们处于什么锁模式,我们都使用 pthread_rwlock_unlock 来释放读写锁。

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

函数成功返回0,失败返回错误号码。
系统实现可能会对读写锁的共享模式数量有所限制,所以我们需要检查 pthread_rwlock_rdlock 的返回。尽管 pthread_rwlock_wrlockpthread_rwlock_unlock 有错误的返回码,如果我们设计妥当,我们就不需要检查其返回,只有我们不正确地使用它们的时候才会返回定义的错误码,例如使用一个没有初始化的锁,或者当我们请求了一个我们已经持有的锁导致死锁的时候。

Single UNIX Specification也定义了有条件的读写锁。

#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

正确返回0,失败返回错误号码。

如果能够获取到锁,这两个函数就会返回0,如果不能获取到锁,这两个函数就会返回错误码 EBUSY 。这些函数使用的情况和前面的类似。

举例:
参见相应的参考资料。这个例子是通过一个读写锁来保护一系列的工作请求队列。当有作业被插入,删除到队列中的时候,加写锁;如果只是查询队列中的作业,那么只需要读锁。

条件变量

条件变量是另外一个用于线程的同步机制。条件变量提供一个线程同步的点,当使用互斥信号量的时候,条件变量允许线程以一种无竞争的方式等待任何条件的发生。

条件本身被互斥信号量保护,线程改变条件状态的时候必须先锁住这个信号。其它线程在请求信号量之前,不会注意到条件的变化,因为锁住互斥信号量才能对条件进行检测。

使用条件变量之前,必须首先对这个条件变量进行初始化。条件变量使用数据结构 pthread_cond_t 来进行表示。我们可以把常量 PTHREAD_COND_INITIALIZER 分配给静态分配的条件变量,但是如果我们采用动态的方式分配条件变量那么我们使用 pthread_cond_init 函数对它进行初始化。

释放条件变量所占用的内存空间的之前 我们可以使用函数 pthread_mutex_destroy 对这个条件变量进行反初始化。

#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);

两者在成功的时候都返回0,如果失败会返回错误码。

这里如果想要创建一个使用默认的属性的条件变量,那么我们就给 pthread_cond_init 函数的 attr 参数传递 NULL 指针。

我们使用 pthread_cond_wait 来等待条件为 true 。另外,还有函数可以如果在一定的时间之内条件没有被满足,那么会返回一个错误号码到一个指定的变量中。

#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex,
                           const struct timespec *restrict timeout);

两个函数如果成功都返回0,如果失败则返回一个错误号码。

传递给函数 pthread_cond_wait 的互斥信号量 mutex 会保护这个条件。调用者把已经锁住的信号量传递给函数,这个函数原子性地把调用线程放到等待这个条件变量的线程等待队列上面,然后解锁这个互斥信号量。这样就把检测条件变量和线程为了等待条件变化而进入睡眠之间的时间窗口关闭了,这样线程不会错过条件的变化(因为检测到条件不行,才会解锁让其它线程有机会修改条件使之满足)。当 pthread_cond_wait 返回的时候, mutex 会再次被锁住(因为条件满足了,所以再次锁住,继续后面的操作)。(这里可能比较难理解,总之是在这个函数的内部先在检查完条件并且等待之后做了一步解锁操作,收到满足条件的通知之后继续执行准备返回但是返回前又加锁了,看后面的例子会比较容易明白)

函数 pthread_cond_timedwaitpthread_cond_wait 的功能类似,但是它设置了一个超时的机制,指定我们等待的时间。这个时间通过 timespec 结构来表示,

struct timespec {
    time_t tv_sec;   /* seconds */
    long   tv_nsec;  /* nanoseconds */
};

使用这个结构,我们需要使用绝对时间值来指定我们将要等待多久,而不是一个相对的时间值。例如,我们想要等待3分钟,我们不是给这个结构赋值为3分钟,而是把 now+3 这个时间赋值给它。

我们可以使用 gettimeofday 来获取使用 timeval 结构表示的当前时间,然后把这个结构转化成 timespec 结构,来获取绝对的时间值。

函数如下:

void maketimeout(struct timespec *tsp, long minutes)
{
    struct timeval now;
    /* 获取当前时间 */
    gettimeofday(&now);

    /*把timeval表示的时间转换成timespec结构表示的时间*/
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000; /* 微秒转换成纳秒 */

    /* 为当前时间增加超时等待时长*/
    tsp->tv_sec += minutes * 60;
}

如果超时了条件也没有满足,那么 pthread_cond_timewait 将会重新请求互斥信号量并且返回 ETIMEDOUT 。当 pthread_cond_waitpthread_cond_timedwait 成功返回的时候,需要一个线程重新估计条件值,因为可能另外有线程已经运行并且改变了条件。

有两个函数用来通知线程一个条件已经被满足了。 pthread_cond_signal 函数将会唤醒一个等待在一个条件上面的线程; pthread_cond_broadcast 函数将会唤醒所有等待一个条件的线程。

POSIX标准允许 pthread_cond_signal 的实现唤醒不止一个线程,这样会使得实现更为简单。

#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

两者如果成功返回0,如果失败返回错误号码。

当我们调用 pthread_cond_signal 或者 pthread_cond_broadcast 的时候,也就是说我们将会给线程或者条件发送信号。我们需要足够地仔细,只能在修改了条件状态的时候才给线程发送信号。

举例:条件变量的使用方法如下:

#include <pthread.h>
struct msg {
    struct msg *m_next;
    /* ... more stuff here ... */
};
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void process_msg(void) {
    struct msg *mp;
    for (;;) {
        pthread_mutex_lock(&qlock);/*这里是互斥相关,因为需要访问工作队列,所以进行操作之前首先上锁,保证其他线程不能再修改了*/
        while (workq == NULL)
            pthread_cond_wait(&qready, &qlock);/*这里是同步相关,发现队列为空,所以在相应的条件变量上面等待。等待函数的内部实际做的操作是检测并且将线程置于等待队列之后再解开锁便于其它线程修改工作队列使条件满足*/

        mp = workq;/*到这里表示刚才解锁等待的时候有线程修改了工作队列并且通知本线程条件满足了,于是从前面的等待函数中返回,并且返回之前再将刚才解开的锁重新加上,防止之后的修改期间又有其他线程干扰*/
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);/*修改之后真正地解开锁*/
        /* now process the message mp */
    }
}

void enqueue_msg(struct msg *mp) {
    pthread_mutex_lock(&qlock);/*这里是互斥相关,准备修改工作队列,所以加锁*/
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    pthread_cond_signal(&qready);/*这里是同步相关,通知队列状态的变化给等待的线程*/
}

上面的例子,展示了如何使用条件变量和互斥信号量一起来实现线程之间的同步。

条件用来表示工作队列(work queue)的状态。我们通过互斥信号量来保护条件并且通过一个 while 循环来对条件进行检测。当我们把一个消息放到工作队列上(work queue)的时候,我们需要持有这个互斥信号量,但是我们再条件满足通知等待线程的时候不需要持有这个互斥信号量。只要线程在我们调用 cond_signal 之前将消息推送至工作队列,我们就可以释放互斥信号量。因为我们是在一个 while 循环中检查这个条件,所以不会导致问题:线程将会醒来,发现队列还是空的,然后又继续进入等待状态了。如果代码无法忍受这个竞争,那么我们将需要在发送信号给线程的时候也持有这个锁。

译者注:这里无法忍受这个竞争的意思是,比如没有那个 while 循环。具体参见后面的: (1)关于使用循环 while 来检测条件变量的条件 (参见第6.5.1.1节)

译者注

评论与思考

关于使用循环 while 来检测条件变量的条件

注意:

while (workq == NULL)
             pthread_cond_wait(&qready, &qlock);

这里用 while 而不用 if 来循环判断是否等待,据我了解,是因为使用 if ,那从 pthread_cond_wait 返回后,可能 workq 不满足条件了,因为可能有多个线程使用这段代码,这段代码后续操作都是基于 "workq==NULL" 这个条件,第一个执行这段代码的线程返回的当然正常操作,后面的就不行了。

具体来说,如果不带 while 进行检测,这个时候,代码的过程就变成:

  1. 等待队列非空的线程1在等待时立即进入等待;
  2. 增加队列元素的线程2先释放锁,再发送信号通知线程1队列非空;
  3. 等待队列非空的线程1收到信号立即加锁并从等待返回,然后不再循环检测条件而直接访问新添加的元素。

这里,在线程2释放锁之后、发送信号前的时间窗期间,若有其它线程3加锁访问了队列,并且恰巧是删除队列元素使队列为空,这个时候线程2才发送信号通知线程1队列非空,这时候通知的已经是错误的信息,线程1就从等待中返回来,然后在线程3解锁后加锁成功,它误以为队列非空而访问再次变空的队列导致问题了。

一个不错的应用

线程池,实现很多,可自行搜索threadpool相关代码,或参见:http://www.oschina.net/p/threadpool
关键:
1、有一个队列,长度为M,每个队列元素是待执行的任务对应的函数。
2、启动N个线程,这N个线程执行添加的任务(即队列中的函数),每个线程是一个死循环,大致做的事情是:
每当队列中有元素则其中的某个空闲线程读取队列元素,并执行相应的函数;当队列为空则等待队列非空。

关于 pthread_conf_signal 函数应当在 unlock 之前还是之后

有这个文章:http://blog.csdn.net/xjtuse_mal/article/details/5413101

其中主要部分:

1)线程1获取mutex,在进行数据处理的时候,线程2也想获取mutex,但是此时被线程1所占用,线程2进入休眠,等待mutex被释放。
2)线程1做完数据处理后,调用pthread_cond_signal()唤醒等待队列中某个线程,在本例中也就是线程2。线程1在调用 pthread_mutex_unlock()前,因为系统调度的原因,线程2获取使用CPU的权利,那么它就想要开始处理数据,但是在开始处理之 前,mutex必须被获取,很遗憾,线程1正在使用mutex,所以线程2被迫再次进入休眠。
3)然后就是线程1执行pthread_mutex_unlock()后,线程2方能被再次唤醒。
从这里看,使用的效率是比较低的,如果再多线程环境中,这种情况频繁发生的话,是一件比较痛苦的事情。
所以觉得,如果程序不关心线程可预知的调度行为,那么最好在锁定区域以外调用他们吧:-)

原文参考

参考: APUE2/ch11lev1sec6.html

7、总结

这一章里面我们介绍了线程相关的内容,讨论了创建和销毁一个线程的POSIX相关函数。我们也介绍了线程的同步。我们讨论了三个基本的同步机制,互斥信号量,读写锁,以及条件变量,同时我们也看到了我们是如何利用它们来保护共享资源的。

译者注

原文参考

参考: APUE2/ch11lev1sec7.html

相关文章

  • APUE读书笔记-11线程(4)

    6、线程同步 当多个线程共享同一片内存的时候,我们需要保证每个线程看到的数据是一致的。如果线程使用的变量没有被其他...

  • APUE读书笔记-11线程(1)

    1、简介 我们前面讨论了进程,知道了unix的进程环境,进程之间的关系以及控制进程的方法。我们可以看到,进程间可以...

  • APUE读书笔记-11线程(2)

    4、线程创建 传统的 unix 进程模型,只支持每个进程只有一个线程控制。在概念上来说,这和基于线程模型的只有一个...

  • APUE读书笔记-11线程(3)

    5、线程终止 当进程的任何一个线程调用 exit , _exit 或者 _Exit 的时候,整个进程都会被终止。类...

  • APUE第11章 线程

    10.1 引言 在前面的章节中讨论了进程,学习了UNIX进程的环境、进程间的 关系以及控制进程的不同方式。可以看到...

  • 【APUE】线程

  • APUE线程

    十一章 线程 多线程可以实现在某一个时刻能够做不止一件事,每个线程处理各自独立的任务 好处如下通过为每种事件类型分...

  • APUE读书笔记-12线程控制(4)

    6、线程特定数据 这里的线程特定数据也就是线程私有数据,是一种只存取指定线程相关的数据的一种机制。使用线程私有数据...

  • 信号函数编写研究

    APUE读书笔记 ToDoList [ ] sleep初级实现 2017年12月6日 10:41:30 [ ] ...

  • APUE3--chapter11线程

    11线程 每个线程都包含有表示执行环境所必须的信息:线程ID、一组寄存器值、栈、调度优先级和策略、信号屏蔽字、er...

网友评论

    本文标题:APUE读书笔记-11线程(4)

    本文链接:https://www.haomeiwen.com/subject/vtocxktx.html