chapter5 高级线程编程
[1] 一次性初始化
data 初始化
[2] 属性对象
创建 thread / mutex / cv 时, 控制它们的特征
[3] 线程特定数据
数据库机制
允许 a library
1] 关联 data 与其遇到的 各 threadBlocks
2] retrieve data later
Pthreads 实时调度 工具(facilities)
可预测 交互
5.1 一次性(One-time) 初始化
pthread_once_t onceControl = PTHREAD_ONCE_INIT; // 控制变量
int pthread_once (pthread_once_t *onceControl,
void (*init_routine) (void) );
(1) 需要 且只能做一次 的 事情, 如
1] 初始化 mutexes
2] 创建 thread-specific data keys
然后 才能真正 创建 关联的 thread
(2) 一次性初始化
顺序编程
bool 变量 管理
1] 静态初始化为 0
2] 依赖 code test bool
若为 0, 可初始化 -> 置为 1
3] 后续 check -> 1 -> 跳过初始化
多线程
初始化 state 是 shared invariant, 必须用 mutex 保护
[1] 自定义 一次性初始化 code
bool 变量 + statically initialized mutex
[2] pthread_once
vs
1) 大多 case, [1] 更方便 且 高效
2) 用 pthread_once 的 主要原因
最初, pthreads `不允许 静态初始化 mutex`
=> 要想用 mutex
oneTimeInitCode <- - -
| |
必须先调 | | 为了能 1次性初始化, 又 必须调
|/ |
pthread_mutex_init --
初始化 mutex
=> 死循环
解决: pthread_once
静态初始化 mutex 被 pthreads 支持 后,
pthread_once 保留为 方便的函数
Note
若 方便, 就 用 pthread_once, 但不必须用 pthread_once
(3) pthread_once 思想
双检测 + memory barrier ( compiler + cpu )
第1次 check 控制变量( 标志 初始化 是否已完成)
1] 已完成, 直接 return
2] else
lll_lock: gcc 内嵌宏指令 -> memory barrier => 避免了 双检测 问题
第2次检测, 若 已完成, 直接 lll_unlock
else
调 oneTimeInitCode (不带参数)
modify 控制变量( record 初始化 已完成)
=> 只有 第1个 获得 once_lock 的 线程 真正执行 oneTimeInitCode
其他 线程 要么直接 return, 要么 wait 直到 第1个线程 unlock 后 再 unlock -> 完成
// gcc 内嵌宏指令
#define lll_lock(futex, private) \
__asm __volatile (__lll_lock_asm_start \
... // 省略了其他指令
: "memory");
=> 本质是: compiler memory barrier
而 CPU memory barrier 已由 x86/x64 硬件体系 保证
=> 不会存在
1] 编译器 重排问题: var 放 register / 内嵌汇编 与 之前指令 重排
2] CPU 重排问题: new 3 条语句 重排 (等)
int
pthread_once (pthread_once_t *once_control;
void (*init_routine) (void) )
{
if (*once_control == PTHREAD_ONCE_INIT) //
{
lll_lock (once_lock, LLL_PRIVATE);
if (*once_control == PTHREAD_ONCE_INIT) // double check
{
init_routine ();
*once_control = !PTHREAD_ONCE_INIT;
}
lll_unlock (once_lock, LLL_PRIVATE);
}
return 0;
}
(4) 例
/* ====== 1. pthread_once.c
只能执行1次的 func/once_init_routine: 如 mutex init
pthread_mutex_init(&mutex, NULL)
calling thread 与 newly created thread 均
pthread_once(&once_block, once_init_routine)
*/
#include <pthread.h>
#include "errors.h"
// (0) control var: must statically init
pthread_once_t once_block = PTHREAD_ONCE_INIT;
// global mutex
pthread_mutex_t mutex;
void once_init_routine(void) // no arg
{
printf("\n === mutex init: only once \n");
pthread_mutex_init(&mutex, NULL);
}
void *thread_routine(void *arg)
{
// (2) subThread pthread_once
pthread_once(&once_block, once_init_routine);
pthread_mutex_lock(&mutex); // === lock
pthread_mutex_unlock(&mutex); // === unlock
return NULL;
}
int main()
{
pthread_t threadId;
pthread_create(&threadId, NULL,
thread_routine, NULL);
// (1) callingThread pthread_once
pthread_once(&once_block, once_init_routine);
pthread_mutex_lock(&mutex); // === lock
pthread_mutex_unlock(&mutex); // === unlock
pthread_join(threadId, NULL); // join
}
// print
=== mutex init: only once
Main ThreadBlock has locked the mutex.
thread_toutine has locked the mutex.
// ====== 2. myCallOnce.c
// 自定义 一次性初始化 code
#include <pthread.h>
#include <stdio.h>
int CallOnceFinished = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void once_init_routine(void) // no arg
{
printf("\n === mutex init: only once \n");
// ...
}
// Note: 适用于 编译器 重排 和 CPU 重排 问题 不存在 或 已被 pthread_mutex_lock 解决
int myCallOnce( void (*init_routine) (void) )
{
if (CallOnceFinished == 0)
{
pthread_mutex_lock(&mutex);
if (CallOnceFinished == 0) // double check
{
init_routine();
CallOnceFinished = 1;
}
pthread_mutex_unlock(&mutex);
}
return 0;
}
void *thread_routine(void *arg)
{
// (2) subThread myCallOnce
myCallOnce(once_init_routine);
pthread_mutex_lock(&mutex); // === lock
pthread_mutex_unlock(&mutex); // === unlock
return NULL;
}
int main()
{
pthread_t threadId;
pthread_create(&threadId, NULL,
thread_routine, NULL);
// (1) callingThread myCallOnce
myCallOnce(once_init_routine);
pthread_mutex_lock(&mutex); // === lock
pthread_mutex_unlock(&mutex); // === unlock
pthread_join(threadId, NULL); // join
}
// print
=== mutex init: only once
5.2 属性 对象
目标
[1] 保持 interface 简单
[2] 同时为 "expert" 功能 提供 `复用性`
无需 更改现有代码, 即可 添加选项
无需创建 非标准参数, 即可提供 专门选项(Pthreads 标准属性之外)
可视为私有结构
成员读/写
调 特殊函数
// 例
读/写 线程属性对象 中 stacksize 属性
pthread_attr_getstacksize
pthread_attr_setstacksize
(1) Mutex 属性
pthread_mutexattr_t attr;
int pthread_mutexattr_init (pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy ( pthread_mutexattr_t *attr);
#ifdef _POSIX_THREAD_PROCESS_SHARED // 进程共享 => 扩展到适用于 多进程中 各 线程
int pthread_mutexattr_getpshared (pthread_mutexattr_t *attr, int *pshared);
int pthread_mutexattr_setpshared (pthread_mutexattr_t *attr, int pshared);
#endif
(2) Cv 属性
pthread_condattr_t attr;
int pthread_condattr_init (pthread_condattr_t *attr) ;
in t pthread_condattr_destroy (pthread_condattr_t *attr);
#ifdef _POSIX_THREAD_PROCESS_SHARED
int pthread_condattr_getpshared (pthread_condattr_t *attr, int *pshared);
int pthread_condattr_setpshared (pthread_condattr_t *attr, int pshared);
#endif
(3) ThreadBlock 属性
pthread_attr_t attr;
int pthread_attr_init (pthread_attr_t *attr);
int pthread_attr_destroy (pthread_attr_t *attr);
int pthread_attr_getdetachstate (pthread_attr_t *attr, int *detachState);
int pthread_attr_setdetachstate (pthread_attr_t *attr, int detachState)
# ifdef _POSIX_THREAD_ATTR_STACKSI2E
int pthread_attr_getstacksize (pthread_attr_t *attr, size_t *stackSize );
int pthread_attr_setstacksize (pthread_attr_t *attr, size_t stackSize);
#endif
# ifdef _posix_thread_atpr_stackaddr
int pthreadLattr_getstackaddr (pthread_attr_t *attr, void *stackAddr);
int pthread_attr_setstackaddr (pthread_attr_t *attr, void **stackAddr);
Set stack size is not very portable.
Set stack address is less portable
(4) 例
// ====== 1. threadAttr.c
#include <limits.h>
#include <pthread.h>
#include "errors.h"
void *thread_routine(void *arg)
{
printf("The thread is here\n");
return NULL;
}
int main()
{
pthread_attr_t threadAttr;
size_t stack_size;
pthread_t threadId;
pthread_attr_init(&threadAttr);
pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_DETACHED);
pthread_attr_getstacksize(&threadAttr, &stack_size);
printf("Default stack size is %u; minimum is %u\n",
stack_size, PTHREAD_STACK_MIN);
pthread_attr_setstacksize(&threadAttr, PTHREAD_STACK_MIN*2);
pthread_create(&threadId, &threadAttr, thread_routine, NULL);
pthread_exit(NULL);
return 0;
}
5.3 线程特定数据
(0) summary
1) 跨 函数调用 之 private data & shared data
——————————————————————————————————————
单线程 下
——————————————————————————————————————
目标 | private data 跨 函数调用
——————————————————————————————————————
方法 | 静态(statically) 内存分配
——————————————————————————————————————
name’s | function/file: static
scope | global : extern
——————————————————————————————————————
——————————————————————————————————————————————————
多线程
——————————————————————————————————————————————————
进程内 所有线程 共享 进程地址空间 =>
1] static
2] extern
3] 进程 heap
vaule 是 被任意线程 最后写入的值
the pTSD last written by any thread
——————————————————————————————————————————————————
[1] 线程 唯一的真正 "private" storage(存储) 是 处理器 registers
stack addresses 可 共享
(thread) "owner" 将 stack address 暴露给 another thread 时
[2] register 和 "private" stack 不能替代 非线程代码中 持久静态存储
[3] 当需要 private variable 时, 必须先决定
1] 是 多线程 share
1> static
2> extern
与 单线程 一样, 但必须 synchronize 对 shared data 的 访问
2] 还是 各线程 private: have its own pTSD
store 到 somewhere -> 能被 each thread 找到(locate)
1> some cases 可用
static data
table
itemValue(如 thread’s pthread_t) 是
unique(唯一) to each thread
2> many 有趣的 cases
无法预测 有多少个线程 调用该 function
|
| 解决
2) 通用方案 - - - -
[1] 在 每个线程 中 分配 some heap & 存 pTSD
难点: 如何 find 任一 thread 的 private data
解决:
struct PrivateData
{
private pTSD
thread’s identifier (pthread_t)
};
list<PrivateData>
问题
1] search list -> 慢
2] 很难恢复 由 terminated threadBlocks 分配的 storage
threadFunc 无法知道 thread 何时终止
[2] 新接口不应依赖 implicit 持久存储
应该要求 caller 分配必要的 persistent state, 告诉你 state location
优点
1] 许多情况下
可 避免 内部同步
极少数情况下
caller 希望在 线程间 share 持久状态 时, caller 可提供 同步
2] caller 可选择 在 单个线程中 分配 多个 state buf
=> 多个 独立 函数调用 序列 -> 无冲突
[3] 问题是我们 经常需要支持 implicit persistent states
making 现有 interface thread-safe, 但
不能
1] 增加 arg
2] 要求 caller 为了我们方便而 维持 a new data structure
=> 引入 thread-specific data(TSD)
3) TSD 机制
shared key
/ \ 关联
/ \
线程 1 private pTSD ... 线程 n private pTSD
(1) 创建 ...
pthread_key_t key;
int pthread_key_create(pthread_key_t *key, void (*engineeThreadDtor)(void * ) );
int pthread_key_delete(pthread_key_t key );
1) pthread_key_t 变量
不透明
create
只创建1次: 调 pthread_key_create
最简方法 pthread_once
delete
当 no thread has a pTSD for that key
引用计数
2) dtor
= NULL 的 含义: key has no pTSD
调用 时机
线程终止时
3) key 关联的 TSD's pTSD
[1] 初始: 在所有线程中均为 null
[2] 线程终止 & 调 dtor 前: 被置 null
那 dtor 中 如何获取 key 关联的 previous TSD's pTSD ?
通过 dtor 的 参数 pass
(2) 使用 ...
int pthread_setspecific (pthread_key_t key, const void *pTSD);
void *pthread_getspecific (pthread_key_t key);
(3) 用 dtor
(4) 例
// ====== 1. thraedSpecificData_Once.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> // malloc
#include <unistd.h> // sleep
// (1) TSD struct
typedef struct TSD
{
pthread_t threadId;
char *str;
} TSD;
// (2) key
pthread_key_t key;
pthread_once_t onceFlag = PTHREAD_ONCE_INIT;
void onceRoutine(void)
{
printf("init key\n");
// (5)
pthread_key_create(&key, NULL);
}
void *thread_routine(void *arg)
{
TSD *pTSD;
// (4)
pthread_once(&onceFlag, onceRoutine);
pTSD = (TSD*)malloc(sizeof(TSD));
// (6) setspecific
pthread_setspecific(key, pTSD);
pTSD->threadId = pthread_self();
pTSD->str = (char*)arg;
// (7) getspecific
pTSD = (TSD*)pthread_getspecific(key);
printf("%s starting...\n", pTSD->str);
sleep(2);
pTSD = (TSD*)pthread_getspecific(key);
printf("%s done...\n", pTSD->str);
return NULL;
}
int main()
{
pthread_t thread1, thread2;
// (3) Note: 字符串 字面值: 作 arg 传递, 传的是 其 ptr
pthread_create(&thread1, NULL, thread_routine, "thread 1");
pthread_create(&thread2, NULL, thread_routine, "thread 2");
pthread_exit(NULL);
}
// print
init key
thread 1 starting...
thread 2 starting...
thread 1 done...
thread 2 done...
// ====== 2. thraedSpecificData_Destory.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> // malloc
#include <unistd.h> // sleep
typedef struct PrivateTag
{
pthread_t threadId;
char *str;
} ThreadPrivateData;
pthread_key_t key;
pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
long keyRefCount = 0;
void keyDtor(void *pArg)
{
// (1) Note: use previous TSD value: from keyDtor's para
ThreadPrivateData *pTPD = (ThreadPrivateData*)pArg;
printf("thread \"%s\" exiting...\n", pTPD->str);
// (2) free TSD memory
free(pArg);
pthread_mutex_lock(&mut); // === lock
// (3) Note: delete key under refCount & mutex
keyRefCount--;
if (keyRefCount <= 0)
{
pthread_key_delete(key);
printf("key delete...\n");
}
pthread_mutex_unlock(&mut); // === unlock
}
void *getPrivateDataFromKey(void)
{
void *pTPD;
pTPD = pthread_getspecific(key);
if (pTPD == NULL)
{
pTPD = malloc(sizeof(ThreadPrivateData) );
pthread_setspecific(key, pTPD);
}
return pTPD;
}
void *threadRoutine(void *arg)
{
ThreadPrivateData *pTPD;
pTPD = (ThreadPrivateData*) getPrivateDataFromKey();
pTPD->threadId = pthread_self();
pTPD->str = (char*)arg;
printf("thread \"%s\" starting...\n", pTPD->str);
sleep(2);
return NULL;
}
int main()
{
pthread_t thread_1, thread_2;
ThreadPrivateData *pTPD;
// (4)
pthread_key_create(&key, keyDtor);
keyRefCount = 3; // 3 个线程 用 key
// main thread
pTPD = (ThreadPrivateData*)getPrivateDataFromKey();
pTPD->threadId = pthread_self();
pTPD->str = "Main thread";
pthread_create(&thread_1, NULL, threadRoutine, "thread 1");
pthread_create(&thread_2, NULL, threadRoutine, "thread 2");
pthread_exit(NULL);
}
// print
thread "thread 1" starting...
thread "thread 2" starting...
thread "Main thread" exiting...
thread "thread 1" exiting...
thread "thread 2" exiting...
key delete...
5.4 取消
1 延迟 取消
2 异步 取消
3 Cleaning up
5.5 实时调度
(1) posix 实时选择
(2) 调度策略 和 优先性
(3) 竞争范围 和 分配域
(4) 实时调度 的 问题
(5) 有优先意识 的 mutexes
1) 优先 ceiling mutexs
2) 优先 继承 mutexes
5.6 线程 和 内核 实体
(1) 多 对 1 (用户级)
(2) 1 对 1 (内核级)
(3) 多 对 少 (2 个级别)
网友评论