11线程
每个线程都包含有表示执行环境所必须的信息:线程ID、一组寄存器值、栈、调度优先级和策略、信号屏蔽字、errno变量、线程私有数据。
11.1 线程ID
进程ID在整个系统中是唯一的,但线程ID只在其所属的进程上下文中才有意义。
线程识别/比较:
进程ID用pid_t数据类型表示,pid_t是个非负整数;
线程ID用pthread_t数据类型表示,实现时用一个结构表示pthread_t,
故可移植性操作的系统实现不能将其作为整数处理,
故须用一个函数来比较两个线程ID:
#include <pthread.h>
int pthread_equal(pthread_t tid1, pthread_t tid2);//相等返回非0值,否则返回0
线程可调用pthread_self 函数获得自身线程ID:
#include <pthread.h>
pthread_t pthread_self(void); // 返回线程的线程ID
当线程要识别线程ID时,pthread_self函数可与pthread_equal函数一起用,eg:用线程ID控制每个线程所处理的作业

如上图,
主线程可把做作业放在队列中,由3个线程组成的线程池从队列中取出作业。
主线程不允许每个线程任意处理从队列顶端取出的作业,而是由主线程控制作业的分配,
主线程在每个待处理作业的结构中放置处理该作业的线程ID,
每个线程只能取出标有自己线程ID的作业。
11.2线程创建
传统Unix进程模型中,每个进程只有一个控制线程。
概念上讲,与基于线程的模型中每个进程只包含一个线程是相同的。
POSIX线程(pthread)中,程序开始运行时,它也是以单进程中控制线程启动的;
即创建多个控制线程前,程序的行为与传统进程没什么区别。
可用pthread_create函数创建新线程:
#include <pthread.h>
int pthread_create(pthread_t* restrict tidp, // 成功则返回0,否则返回错误编号
const pthread_addr_t* restrict attr,
void* (*start_rtn)(void* ), void* restrict arg);
当pthread_create成功返回时,新创建的线程ID会被设置为tidp指向的内存单元。
attr参数用于定制不同的线程属性。
新创建的线程从start_rtn函数地址开始运行,该函数只有一个无类型指针参数arg;
若要想start_rtn函数传递多个参数,则需将参数放置结构中,将该结构的地址作为arg参数传入。
eg:
// 创建一个线程,打印进程ID、新线程的线程ID即初始线程的线程ID
#include "apue.h"
#include <pthread.h>
pthread_t ntid;
void printids(const char* s)
{
pid_t pid;
pthread_t tid;
pid = getpid();
tid = pthread_self();
printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid,
(unsigned long)tid, (unsigned long)tid);
}
void* thr_fn(void* arg)
{
printids("new thread: ");
return ((void*)0);
}
int main(void)
{
int err;
err = pthread_create(&ntid, NULL, thr_fn, NULL);
if(err != 0)
{
err_exit(err, "can't create thread");
}
printids("main thread");
sleep(1);
exit(0);
}
该例有两个特别之处,要处理主线程和新线程之间的竞争:
1. 主线程要休眠,若主线程不休眠则它可能退出,
如此新线程还没机会运行整个进程就可能终止了,
这种行为依赖于OS中的线程实现和调度算法。
2. 新线程是通过pthread_self()获取自己的线程ID的,
而不是从共享内存中读取,或从线程的启动例程中以参数形式接受。
此例中,主线程把新线程ID放在ntid中,但新线程不能安全地使用它,
若新线程在主线程调用pthread_create函数返回之前运行了,
那新线程看到的是未经初始化的ntid的内容,此内容并不是正确的线程ID。
如所料,结果中两个线程的进程ID相同而线程ID不同。
output:

11.3线程终止
若进程中任意线程调用了exit、_Exit或_exit,那整个进程将终止。
在不终止整个进程情况下,单个线程有3中方式退出以停止其控制流:
1. 线程可简单的从启动线程返回,返回值是 线程的退出码。
2. 线程可被同一进程中的其它线程取消。
3. 线程调用pthread_exit函数。
#include <pthread.h>
void pthread_exit(void* rval_ptr);
rval_ptr是一个无类型指针,类似与传给启动线程的单个参数;
进程中的其它线程可通过pthread_join函数访问该指针。
#include <pthread.h>
int pthread_join(pthread_t thread,
void** rval_ptr);// 若成功则返回0,否则返回错误编号
调用线程将一直阻塞直到指定的线程调用pthread_exit从启动线程中返回或被取消:
若线程从启动线程中返回,rval_ptr就包含返回码;
若线程被取消,则rval_ptr指定的内存单元被设置为PTHREAD_CANCELD;
可调用pthread_join自动将线程置于分离状态,如此资源得以恢复:
若线程已经处于分离状态,pthread_join调用会失败,返回EINVAL;
若对线程返回值不在意,可将rval_ptr设置为NULL,
如此调用pthread_join可等待指定的线程终止,但并不获取线程的终止状态。
即线程调用pthread_exit,进程中其它线程可用pthread_join获得该线程的退出状态:
// 展示如何获取已终止的线程的退出码
#include "apue.h"
#include <pthread.h>
void* thr_fn1(void* arg)
{
printf("thread 1 returning\n");
return ((void*)1);
}
void * thr_fn2(void* arg)
{
printf("thread 2 exiting\n");
pthread_exit((void*)2);
}
int main(void)
{
int err;
pthread_t tid1, tid2;
void* tret;
err = pthread_create(&tid1, NULL, thr_fn1, NULL);
if (err != 0)
{
err_exit(err, "can't create thread 1");
}
err = pthread_create(&tid2, NULL, thr_fn2, NULL);
if (err != 0)
{
err_exit(err, "can't create thread 2");
}
err = pthread_join(tid1, &tret);
if (err != 0)
{
err_exit(err, "can't create join with thread 1");
}
printf("thread 1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if(err != 0)
{
err_exit(err, "can't join with thread 2");
}
printf("thread 2 exit code %ld\n", (long)tret);
exit(0);
}
output:

note:pthread_create和pthread_exit的无类型指针参数可传递包含复杂信息的结构的地址,但该结构所用的内存在调用这完成调用后必须仍然有效:
eg:调用线程的栈上分配了该结构,则其它线程在使用该结构时内存内容可能变了,
或
线程在自己栈上分配了结构,让后把指向该结构的指针传给pthread_exit,
则调动pthread_join的线程使用该结构时,该栈可能被撤销了。
eg:
// 展示 用自动变量(分配在栈上)作为pthread_exit的参数时出现的问题
#include "apue.h"
#include <pthread.h>
struct foo
{
int a, b, c, d;
};
void printfoo(const char* s, const struct foo* fp)
{
printf("%s", s);
printf(" structure at 0x%lx\n", (unsigned long)fp);
printf(" foo.a = %d\n", fp->a);
printf(" foo.b = %d\n", fp->b);
printf(" foo.c = %d\n", fp->c);
printf(" foo.d = %d\n", fp->d);
}
void* thr_fn1(void* arg)
{
struct foo foo = {1, 2, 3, 4};
printfoo("thread 1:\n", &foo);
pthread_exit((void*)&foo);
}
void* thr_fn2(void* arg)
{
printf("thread 2: ID is %lu\n", (unsigned long)pthread_self());
pthread_exit((void*)0);
}
int main(void)
{
int err;
pthread_t tid1, tid2;
struct foo *fp;
err = pthread_create(&tid1, NULL, thr_fn1, NULL);
if (err != 0)
{
err_exit(err, "can't create thread 1");
}
err = pthread_join(tid1, (void*)&fp);
if (err != 0)
{
err_exit(err, "can't join with thread 1");
}
sleep(1);
printf("parent starting second thread\n");
err = pthread_create(&tid2, NULL, thr_fn2, NULL);
if (err != 0)
{
err_exit(err, "can't create thread 2");
}
sleep(1);
printfoo("parent: \n",fp);
exit(0);
}
output:结构据内存体系结构、编译器及线程库的实现而不同。

由结果知,主线程访问该结构时,结构的内容已变,解决此问题,可用全局结构或malloc分配结构。
线程可用pthread_cancel函数 请求 取消同一进程中的其它线程:
#include <pthread.h>
int pthread_cancel(pthread_t tid); //若成功返回0,否则返回错误编号
默认情况,pthread_cancel函数会使得由tid标识的线程的行为表现为
调用了PTHREAD_CANCELED的pthread_exit函数,
但线程可选择忽略取消或控制被取消。
即pthread_cancel并不等待线程终止,仅仅是提出请求。
线程清理处理程序:
线程可安排在其退出时需要调用的函数,这与进程退出时可用atexit函数安排退出类似。
这样的函数称为线程清理处理程序。
一个线程可建立多个清理处理程序,处理程序记录在栈中,即执行顺序与注册时相反。
#include <pthread.h>
void pthread_cleanup_push(void (*rtn)(void*), void* arg);
void pthread_cleanup_pop(int execute);
当线程执行以下动作时,
清理函数rtn由pthread_cleanup_push调度,调度时只有一个参数arg:
1. 调用pthread_exit时
2. 响应取消请求时
3. 用非零excute参数调用pthread_cleanup_pop时。
若execute设置为为0 ,清理函数将不被调用,
pthread_cleanup_pop会将删除上次pthread_cleanup_push建立的清理处理程序。
eg:使用线程清理处理程序
// 展示 如何使用线程清理处理程序
#include "apue.h"
#include <pthread.h>
void cleanup(void* arg)
{
printf("cleanup: %s\n", (char*)arg);
}
void* thr_fn1(void* arg)
{
printf("thread 1 start\n");
pthread_cleanup_push(cleanup, "thread 1 first handler");
pthread_cleanup_push(cleanup, "thread 1 second handler");
printf("thread 1 push complete\n");
if (arg)
{
return ((void*)1);
}
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
return ((void*)1);
}
void* thr_fn2(void* arg)
{
printf("thread 2 start\n");
pthread_cleanup_push(cleanup, "thread 2 first handler");
pthread_cleanup_push(cleanup, "thread 2 second handler");
printf("thread 2 push complete\n");
if (arg)
{
pthread_exit((void*)2);
}
pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
pthread_exit((void*)2);
}
int main(void)
{
int err;
pthread_t tid1, tid2;
void* tret;
err = pthread_create(&tid1, NULL, thr_fn1, (void*)1);
if (err != 0)
{
err_exit(err, "can't create thread 2");
}
err = pthread_create(&tid2, NULL, thr_fn2, (void*)1);
if (err != 0)
{
err_exit(err, "can't create thread 2");
}
err = pthread_join(tid1, &tret);
if (err != 0)
{
err_exit(err, "can't join with thread 1");
}
printf("thread 1 exit code %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
if (err != 0)
{
err_exit(err, "can't join with thread 2");
}
printf("thread 2 exit code %ld\n", (long)tret);
exit(0);
}
output:

输出结果中各行前后顺序可能不同,但确定的一点是:只有thread 2的清理程序被调用了。
因此,若线程通过从它的启动线程中返回或退出的话,它的清理处理程序就不会被调用。
线程函数和进程函数之间的相似:

默认情况线程终止状态会保存直到对该线程调用pthread_join:
但若线程已被分离,线程底层存储资源可在线程终止时被收回。
对分离状态的线程调用pthread_join会产生未定义行为,但可调用pthread_detach分离线程。
#include <pthread.h>
int pthread_detach(pthread_t tid); // 成功则返回0,否则返回错误编号
11.4 线程同步
增量操作通常分解为3步即3个原语:
1. 从内存单元读入寄存器
2. 在寄存器中对变量做增量操作
3. 把新的值写入内存单元
若两个或多个线程试图几乎在同一时间内对同一变量做增量操作而不进行同步的话,由增量操作的3个原语可知获取的结果可能不一致。
11.4.1 互斥量(mutex)
互斥量本质上是一把锁,在访问共享资源前对互斥量进行设置加锁,访问完成后释放互斥量解锁。
互斥变量用pthread_mutex_t数据类型表示的,使用互斥变量之前要初始化:
互斥变量初始化,可将其设置为常量
PTHREAD_MUTEX_INITIALIZER(只适用于静态分配的互斥量),
也可通过pthread_mutex_init函数进行初始化。
若动态分配互斥量(如malloc),在释放内存前须调用pthread_mutex_destory。
#include <pthread.h>
int pthread_mutex_init(pthread_mutex* restrict mutex,
const pthread_mutexattr_t* restrict sttr);
int pthread_mutex_destory(pthread_mutex_t* mutex);
// 两个函数,若成功返回0,否则返回错误编号
若用默认属性初始化互斥量,只需把attr设为NULL。
对互斥量加锁/解锁:
对互斥量加锁要调用pthread_mutex_lock;
若互斥量已加锁,调用线程将阻塞知道互斥量解锁。
对互斥量解锁要调用pthread_mutex_unlock。
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_trylock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
// 三个函数:若成功返回0,否则返回错误编号
若不希望线程被阻塞,可用pthread_mutex_trylock尝试对互斥量加锁,若互斥量是未加锁状态,则pthread_mutex_trylock将对互斥量加锁,不阻塞并返回0,否则pthread_mutex_trylock失败,不能锁住互斥量并返回EBUSY。
eg:
// 展示用于保护某数据结构的互斥量:
// 当两个及以上的线程要访问动态分配的对象,
// 可在对象中嵌入引用计数,
// 以确保所有使用该对象的线程完成数据访问之前,该对象内存空间不会被释放
#include <stdlib.h>
#include <pthread.h>
strcut foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
/* ... more stuff here ... */
};
struct foo* foo_alloc(int id) // allocate the object
{
struct foo* fp;
if ((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(p);
return (NULL);
}
/* ... continue initialization ...*/
}
return (fp);
}
void foo_hold(struct foo* fp) // add a reference to the object
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->lock);
}
void foo_rele(struct foo* fp) // release a reference to the object
{
pthread_mutex_lock(&fp->f_lock);
if(--fp->f_count == 0) // last reference
{
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destory(&fp->f_lock);
free(fp);
}
else
{
pthread_mutex_unlock(&fp->f_lock);
}
}
程序说明:
在对引用计数增减及检查是否为0时要加锁。
在将引用计数初始化为1时没必要加锁,因为该操作之前分配线程是唯一引用该对象的线程。
但在此之后若将该对象放到一个列表中,则它可能被其它线程发现,故须加锁。
使用该对象前须对引用计数加1,使用完毕后释放引用;
当最后一个引用释放时,对象所占内存空间被释放。
11.4.2避免死锁
若线程试图对同一互斥量加锁两次,那它自身会陷入死锁状态。除此还有其它方式可产生死锁,eg一个线程制图锁住另一线程以相反顺序锁住的互斥量:
程序使用多个互斥量时,
若允许一个线程一直占有第一个互斥量且在试图对第二个互斥量加锁时处于阻塞状态,
但拥有第二个互斥量的线程也在试图对第一个互斥量加锁,
如此这两个线程产生死锁。
此种情况下可用pthread_mutex_trylock避免死锁:
若已占有某些锁且pthread_mutex_trylock返回成功,则可继续前进,
但若返回失败即不能获取锁,则可先释放已占有的锁,做好清理工作,过段时间再试。
eg:
// 展示两个互斥量的使用方法:
// 在使用两个互斥量时,总是让它们以相同顺序加锁,以避免死锁。
#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)
struct foo* fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo
{
int f_count;
pthread_mutex_t f_lock;
int f_id;
struct foo* f_next; /* protected by hashlock */
/* ... more stuff here ... */
};
struct foo* foo_alloc(int id) /* allocate the object */
{
struct foo* fp;
int idx;
if ((fp = malloc(sizeof(struct foo))) != NULL)
{
fp->f_count = 1;
fp->f_id = id;
if (pthread_mutex_init(&fp->f_lock, NULL) != 0)
{
free(fp);
return (NULL);
}
idx = HASH(id);
pthread_mutex_lock(&hashlock);
fp->f_next = fh[idx];
fh[idx] = fp;
pthread_mutex_unlock(&hashlock);
/* ... continue initialization ...*/
pthread_mutex_unlock(&fp->f_lock);
}
return(fp);
}
void* foo_hold(struct foo* fp) /* add a reference to the object */
{
pthread_mutex_lock(&fp->f_lock);
fp->f_count++;
pthread_mutex_unlock(&fp->f_lock);
}
struct foo* foo_find(int id) /* find an existing object */
{
struct foo* fp;
pthread_mutex_lock(&hashlock);
for(fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next)
{
if(fp->f_id == id)
{
foo_hold(fp);
break;
}
}
pthread_mutex_unlock(&hashlock);
return (fp);
}
void foo_rele(struct foo* fp) /* release a reference to the object */
{
struct foo* tfp;
int idx;
pthread_mutex_lock(&fp->f_lock);
if (fp->f_count == 1) /* last reference */
{
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_lock(&hashlock);
pthread_mutex_lock(&fp->f_lock);
/* need to recheck the condition */
if (fp->f_count != 1)
{
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_unlock(&hashlock);
return;
}
/* remove from list */
idx = HASH(fp->f_id);
tfp = fh[idx];
if (tfp == fp)
{
fh[idx] = fp->f_next;
}
else
{
while (tfp->f_next != fp)
{
tfp = tfp->f_next;
}
tfp->f_next = fp->f_next;
}
pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock);
pthread_mutex_destory(&fp->f_lock);
free(fp);
}
else
{
fp->f_count--;
pthread_mutex_unlock(&fp->f_lock);
}
}
程序说明:
第二个互斥量维护者一个用于跟踪foo数据结构的散列表,
如此hashlock可同时保护foo结构中散列表fh和f_next,
foo结构中f_lock保护对foo结构中其它字段的访问。
11.4.3pthread_mutex_timedlock
当线程试图获取一个已加锁的互斥量时,pthread_mutex_timelock互斥量原语允许绑定线程阻塞时间。
pthread_mutex_timedlock和pthread_mutex_lock基本等价,但
在达到超时时间值时,pthread_mutex_timedlock不会对互斥量加锁,而返回错误码ETIMEDOUT。
#include <pthread.h>
#include <time.h>
int pthread_mutex_timedlock(pthread_mutex_t* restrict mutex,
const struct timespec* restrict tsptr); //成功返回0,否则返回错误编号
超时指定等待的绝对时间。超时时间用timespec结构表示,其用秒和纳秒描述时间。
eg:
// 用pthread_mutex_timelock避免永久阻塞
#include "apue.h"
#include <time.h>
#include <pthread.h>
int main(void)
{
int err;
struct timespec tout;
struct tm* tmp;
char buf[64];
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock);
printf("mutex is locked\n");
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("current time is %s\n", buf);
tout.tv_sec += 10; /* 10 seconds from now */
/* caution: this could lead to deadlock */
err = pthread_mutex_timedlock(&lock, &tout);
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("the time is now %s\n", buf);
if(err == 0)
{
printf("mutex locked again!\n");
}
else
{
printf("can't lcok mutex again:%s\n", strerror(err));
}
exit(0);
}
程序说明:
该程序对已有互斥量进行再次加锁,目的为演示pthread_mutex_timedlock工作。
output:

note:阻塞时间可能不同,造成原因很多:
开始时间可能在某秒的中间位置;
系统时钟的精度可能不足以精确到 支持指定的超时时间值;
在程序继续运行前,调度延迟可能增加时间值。
11.4.4读写锁(reader-writer lock)
读写锁与互斥量类似,但读写锁允许更高的并行性。
互斥锁或是 锁住状态 或是 不加锁状态,且一次只能有一个线程对其加锁。
读写锁有3种状态:
读模式加锁状态;
写模式加锁状态;
不加锁状态。
一次只能有一个线程占有写模式的读写锁,但是多个线程可同时占有读模式的读写锁:
当读写锁是写模式加锁时,在被解锁之前,所有试图对该锁加锁的线程都将被阻塞;
当读写锁是读模式加锁时,
所有试图以 读模式 对该锁加锁的线程都可得到访问权;
但任何以 写模式 对该锁加锁的线程将被阻塞,知道所有 线程释放读锁。
虽然各OS对读写锁的实现不尽相同,但当读写锁处于读模式加锁状态,
而此时有个线程试图以写模式获取锁,读模式通常会阻塞后续的读模式请求,
以避免读模式长期占用读写锁,而等待的写模式请求一直得不到满足。
读写锁又名 共享互斥锁(shared-execlusive lock):
当读写锁是读模式加锁时,可说成是以共享模式加锁的;
当读写锁写模式加锁时,可说是以 互斥模式加锁的。
使用读写锁前须初始化,释放其底层的内存前须销毁。
#include <pthread.h>
int pthread_rwlock_init(pthread_rwlcok_t* restrict rwlock,
const pthread_rwlockattr_t* restrict attr);
int pthread_rwlock_destory(pthread_rwlock_t* rwlock);
// 两函数:成功返回0,否则返回错误编号
读写锁的加锁解锁:
1. 读模式加锁,须调用pthread_rwlock_rdlock;
2. 写模式加锁,须调用pthread_rwlock_wrlock;
3.解锁,可调用pthread_rwlock_unlock。
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t* rwlock);
// 所有函数:成功则返回0.否则返回错误编号
不同的实现可能会对共享模式下可获取的读写锁的次数有限制,故需检查pthread_rwlock_rdlock的返回值。
而且从技术而言,调用函数时应该总是检查错误返回。
Single UNIX Specification还定义了读写锁原语的条件版本:
#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock);
// 两函数:成功则返回0,否则返回错误编号
eg:
// 展示读写锁的使用:作业请求队列由单个读写锁保护
// 多个工作线程获取单个主线程分配的作业
#include <stdlib.h>
#include <pthread.h>
struct job
{
struct job* j_next;
struct job* j_prev;
pthread_t j_id; /* tells which thread handles this job */
/* ... more stuff here ... */
};
struct queue
{
struct job* q_head;
struct job* q_tail;
pthread_rwlock_t q_lock;
};
/* Initailize a queue */
int queue_init(struct queue* qp)
{
int err;
qp->q_head = NULL;
qp->q_tail = NULL;
err = pthread_rwlock_init(&qp->q_lock, NULL);
if(err != 0)
{
return err;
}
/* ... continue initialization ... */
return 0;
}
/* Insert a job at the head of the queue */
void job_insert(struct queue* qp, struct job* jp)
{
pthread_rwlock_wrlock(&qp->qlock);
jp->j_next = qp->head;
jp->j_prev = NULL;
if (qp->q_head != NULL)
{
qp->q_head->j_prev = jp;
}
else
{
qp->q_tail = jp; /* list was empty */
}
qp->q_head = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
/* Append a job on the tail of the queue */
void job_append(struct queue* qp, struct job* jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
jp->j_next = NULL;
jp->j_prev = qp->q_tail;
if(qp->q_tail != NULL)
{
qp->q_tail->j_next = jp;
}
else
{
qp->q_head = jp; /* list was empty */
}
qp->q_tail = jp;
pthread_rwlock_unlock(&qp->q_lock);
}
/* Remove the given job from a queue */
void job_remove(struct queue* qp, struct job* jp)
{
pthread_rwlock_wrlock(&qp->q_lock);
if(jp == qp->q_head)
{
qp->q_head = jp->j_next;
if(qp->q_tail == jp)
{
qp->q_tail = NULL;
}
else
{
jp->j_next->j_prev = jp->j_prev;
}
}
else if(jp == qp->q_tail)
{
qp->q_tail = jp->q_tail;
jp->j_prev->j_next = jp->next;
}
else
{
jp->j_prev->j_next = jp->next;
jp->j_next->j_prev = jp->j_prev;
}
pthread_rwlock_unlock(&qp->q_lock);
}
/* Find a job for the given thread ID */
struct job* job_find(struct queue* qp, pthread_t id)
{
struct job* jp;
if(pthread_rwlock_rdlock(&qp->q_lock) != 0 )
{
return NULL;
}
for (jp = qp->q_head; jp != NULL; jp = jp->j_next)
{
if(pthread_equal(jp->j_id, id))
{
break;
}
}
pthread_rwlock_unlock(&qp->q_lock);
return jp;
}
程序说明:
此例中,向队列中增加作业或删除作业时,都采用了写模式加锁;
而搜索队列时,采用读模式加锁,且允许所有线程并发搜索。
超时的读写锁:
与互斥量类似,single Unix specification提供了带有超时的读写锁加锁函数,以避免永久阻塞。
#include <pthread.h>
#include <time.h>
int pthread_rwlock_timedrdlock(pthread_rwlock_t* restrict rwlock,
const struct timespec* restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t* restrict rwlock,
const struct timespec* restrict tsptr);
// 两函数:成功则返回0,否则返回错误编号(ETIMEDOUT)
11.4.5 条件变量
条件变量是线程可用的另一种同步机制,但对互斥量加锁后才能改变条件变量:
使用条件变量前,须先初始化,
由pthread_cond_t数据类型表示的条件变量可用两种方式初始化:
可用PTHREAD_COND_INITIALIZER赋值给静态分配的条件变量;
若是动态分配的条件变量,需用函数pthread_cond_init初始化。
释放条件变量底层的内存空间之前,
可用pthread_cond_destory函数对条件变量进行反初始化(deinitialize)。
#include <pthread.h>
int pthread_cond_init(pthread_cond_t* restrict cond,
const pthread_condattr_t* restrict attr);
int pthread_cond_destory(pthread_cond_t* cond);
//两函数:成功则返回0,否则返回错误编号。
可用pthread_cond_wait等待条件变量变为真,若在给定时间内条件不能满足则返回一个错误码:
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t* restrict cond,
pthread_mutex_t* restrict mutex);
int pthread_cond_timedwait(pthread_cond_t* restrict cond,
pthread_mutex_t* mutex,
const struct timespec* restrict tsptr);
//两函数:成功则返回0,否则返回错误编号
传递给pthread_cond_wait的互斥量对条件进行保护。
调用者把锁住的互斥量传给函数,
函数然后自动把调用线程放到等待条件的线程列表上,对互斥量解锁。
pthread_cond_wait返回时互斥量会再次被加锁。
pthread_cond_timedwait类似。
并非所有平台都支持clock_gettime函数以获取timespec结构表示当前时间,
可用另一函数gettimeofday获取timeval结构表示当前时间,然后将该时间转换成timespec结构,
eg得到超时值的绝对时间,可用如下函数:
#include <sys/time.h>
#include <stdlib.h>
void maketimeout(struct timespec* tsp, long minutes)
{
strcut timeval now;
/* get the current time */
gettimeofday(&now, NULL);
tsp->tv_sec = now.tv_sec;
tsp->tv_nsec = now.tv_usec*1000; /* usec to nsec */
/* add the offset to get timeout value */
tsp->tv_sec += minutes*60;
}
有两个函数可通知线程条件已满足:
pthread_cond_signal至少唤醒一个等待该条件的线程;
而pthread_cond_broadcast唤醒等待该条件的额所有线程。
#include <pthread.h>
int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);
//两函数:成功返回0.否则返回错误编号
note:须在改变条件状态以后再给线程发信号。
eg:
// 结合使用条件变量和互斥量对线程进行同步
#include <pthread.h>
struct msg
{
struct msg *m_next;
/* ... more stuff here ... */
};
struct msg *workg;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void process_msg (void)
{
struct msg *mp;
for(;;)
{
pthread_mutex_lock(&qlock);
while( workq == NULL)
{
pthread_cond_wait(&qready, &qlock);
}
mp = workq;
workq = mp->m_next;
pthread_mutex_unlock(&qlock);
/* now process the message mp */
}
}
void enqueue_msg (struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->m_next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
pthread_cond_signal(&qready);
}
程序说明:
条件是工作队列的状态,用互斥量保护条件,在while中判断条件。
把消息放到工作队列时需占有互斥量,但给等待线程发信号时不需占有互斥量。
11.4.6自旋锁
与互斥量类似,但不是通过休眠使进程阻塞,而是在获取锁之前一直忙等(自旋)阻塞状态。
自旋锁使用情况:锁被占有的持续时间短,且线程不希望在重新调度上花费太多成本。
自旋锁常作为底层原语用于实现其它类型的锁。
自旋锁用在非抢占式内核中很有用:除了提供互斥机制还会阻塞终端。
自旋锁的接口和互斥量的接口类似
可用pthread_spin_init多自旋锁初始化;用pthread_spin_destory反初始化。
#include <pthread.h>
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destory(pthread_spinlock_t *lock);
// 两函数,成功返回0,否则返回错误编号
pshared参数表进程共享属性,表明自旋锁是符合获取的:
若设为PTHREAD_PROCESS_SHARED,则自旋锁能被可以访问锁底层内存的线程所获取,
即使线程属于不同的进程;
若设为PTHREAD_PROCESS_PRIVATE,则自旋锁只能被初始化该锁的进程内的线程访问。
自旋锁加锁/解锁:
可用pthread_spin_lock或pthread_spin_trylock对自旋锁加锁;
前者在获取锁前一直自选,后者若不能获取锁立即返回EBUSY错误。
二者都可用pthread_spin_unlock解锁。
#include <pthread.h>
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
//所有函数,成功返回0,否则返回错误编号
note:若自旋锁为解锁状态,无需自旋就可对其加锁,
若线程已对其加锁,结果是未定义的,
调用pthread_spin_lock会返回EDEADLK或其它错误或可能永久自选。
具体行为依赖于具体实现。
对没加锁的自旋锁解锁,结果也是未定义的。
11.4.7屏障
屏障(barrier)是用户协调多个线程并行工作的同步机制。
屏障允许每个线程等待,直到所有相关线程都达到某点,然后从该点继续执行。
如pthread_join就是一种屏障,允许一个线程等待,直到另一个线程退出。
但屏障对象的概念更广,允许任意数量的线程等待,直到所有线程完成处理工作,
而线程不需退出,所有线程达到屏障后可继续工作。
屏障初始化
用pthread_barrier_init对屏障初始化分配资源,用pthread_barrier_destory释放资源。
#include <pthread.h>
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr,
unsigned int count);
int pthread_barrier_destory(pthread_barrier_t *barrier);
//两函数,成功则返回0,否则返回错误编号
初始化屏障时,可用count指定,
在允许所有线程继续运行前,必须达到屏障的线程数目。
可用pthread_barrier_wait函数表明,线程已完成工作,准备等待其它线程:
#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier);
//成功返回0或PTHREAD_BARRIER_SERIAL_THREAD,否则返回错误编号
调用pthread_barrier_wait的线程在屏障计数未满足条件时,会进入休眠状态。
若该线程是最后一个调用pthread_barrier_wait的线程,就满足了屏障计数,所有线程都被唤醒。
对任意一个线程,pthread_barrier_wait返回了PTHREAD_BARRIER_SERIALTHREAD,
剩下的线程的返回值是0。
这使得一个线程可作为主线程,它可工作在其它线程已完成工作结果上。
屏障重用
一旦达到了屏障计数,且线程处于非阻塞状态,屏障就可被重用。
但除非在调用pthread_barrier_destory之后有调用了pthread_barrier_init对计数另赋它值,
否则屏障计数不会变。
eg:
// 一个任务上合作的多个线程间 如何用屏障进行同步
#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>
#define NTHR 8 /* number of threads */
#define NUMNUM 8000000L /* number of numbers to sort */
#define TNUM (NUMNUM/NTHR) /* number to sort per thread */
long nums[NUMNUM];
long snums[NUMNUM];
pthread_barrier_t b;
#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t ,
int (*)(const void *, const void *));
#endif
/*
* Compare two long integers (helper function for heapsort)
*/
int complong (const void *arg1, const void *arg2)
{
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;
if (l1 == l2)
{
return 0;
}
else if(l1 < l2)
{
return -1;
}
else
{
return 1;
}
}
/*
* Worker thread to sort a portion of the set of numbers.
*/
void *thr_fn(void *arg)
{
long idx = (long)arg;
//原书是heapsort,但配置原因链接不到heapsort(在/usr/include/bsd/stdlib.h中)
qsort(&nums[idx], TNUM, sizeof(long), complong);
pthread_barrier_wait(&b);
/*
* Go off and perforn more work ...
*/
return ((void*)0);
}
/*
* Merge the results of the individual sorted ranges.
*/
void merge()
{
long idx[NTHR];
long i, minidx, sidx, num;
for (i = 0; i < NTHR; i++)
{
idx[i] = i*TNUM;
}
for (sidx = 0; sidx < NUMNUM; sidx++)
{
num = LONG_MAX;
for (i = 0; i < NTHR; i++)
{
if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num))
{
num = nums[idx[i]];
minidx = i;
}
}
snums[sidx] = nums[idx[minidx]];
idx[minidx]++;
}
}
int main()
{
unsigned long i;
struct timeval start, end;
long long startusec, endusec;
double elapsed;
int err;
pthread_t tid;
/*
* Create the initial set of numbers to sort.
*/
srandom(1);
for(i = 0; i < NUMNUM; i++)
{
nums[i] = random();
}
/*
* Create 0 threads to sort the numbers.
*/
gettimeofday(&start, NULL);
pthread_barrier_init(&b, NULL, NTHR+1);
for ( i = 0; i < NTHR; i++)
{
err = pthread_create(&tid, NULL, thr_fn, (void*)(i*TNUM));
if (err != 0)
{
err_exit(err, "can't create thread.");
}
}
pthread_barrier_wait(&b);
merge();
gettimeofday(&end, NULL);
/*
* Print the sorted list
*/
startusec = start.tv_sec*1000000 + start.tv_usec;
endusec = end.tv_sec*100000 + end.tv_usec;
elapsed = (double)(endusec - startusec)/1000000.0;
printf("sort took %.4f seconds\n", elapsed);
for (i = 0; i < NUMNUM; i++)
{
printf("%ld\n", snums[i]);
}
exit(0);
}
程序说明:
使用8个线程分解了800w个数的排序工作。
每个线程用快排对100w个数排序,然后主线程调用一个函数将这些结果合并。
并不需要用pthread_barrier_wait的返回值PTHREAD_BARRIER_SERIAL_THREAD决定
哪个线程执行合并操作。
因为使用了主线程完成该操作,即把屏障计数设为线程数加1的原因。
网友评论