<< ELEG652-pthreads-tutorial >>
1 First PTHREAD Example: hello world
n 个 workerRoutine thread 均 print caller 分配给 它 的 id + "hello world"
返回 分配给 它的 id, 使得 caller (main 线程) 知道 哪个 workerRoutine thread 返回
2 Incrementing Global Counter: mutex
n 个 workerRoutine thread 均 并发 增加 1个 global counter
3 Reacting on Specific Events & selfDefined barrierWithCv
Barrier
思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进
内部结构
// Part-3 barrier: every member points at an object owned by main(), so all workers share them.
typedef struct BarrierTag
{
pthread_mutex_t *pMutex; // guards *pCount
pthread_cond_t *pCond; // threads that are not the last to arrive wait on this
ulong_t *pCount; // threadNumOnBarrier: number of threads that still have to reach the barrier; each arriving thread decrements it
} Barrier;
Barrier 3 函数
/* Set up pBarrier to hold back `count` threads; pMutexCvStruct is reserved, pass NULL. */
void barrier_init(Barrier* pBarrier, void *pMutexCvStruct, ulong_t count);
/* Release the mutex/condition variable set up by barrier_init. */
void barrier_destroy(Barrier* pBarrier);
/* Block the caller until `count` threads have called barrier_wait. */
void barrier_wait(Barrier* pBarrier);
workRoutine() context
// Per-thread argument handed to workerRoutine through pthread_create.
typedef struct context_s
{
Barrier* pBarrier; // the same barrier object for every worker
ulong_t id; // worker index assigned by main
} Context;
// Each worker gets its own Context slot, so no two threads share an argument object.
pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
4 Creating Barriers More Easily
理解了 part3 中 Barrier 的实现, 再 用 pthread 中的 pthread_barrier_t 就很容易
相当于 将 part3 中 Barrier 相关部分封装
4.1 重构 3: 封装 -> 实现出 简化版 barrier 的 数据结构 + 3个函数
4.2 用 pthread 提供的 barrier
只需把 4.1 中 pthreadBarrier.h /pthreadBarrier.c 去掉, 改用 pthread 提供的 barrier
// ===
1 First PTHREAD Example: hello world
n 个 workerRoutine thread 均 print caller 分配给 它 的 id + "hello world"
返回 分配给 它的 id, 使得 caller (main 线程) 知道 哪个 workerRoutine thread 返回
Hello, World! . . . Output
// === 1 hello.c
/* $ gcc hello.c -o a -pthread
$ ./a
*/
#include <stdio.h> // for snprintf(), fprintf(), printf(), puts()
#include <stdlib.h> // exit()
#include <errno.h> // errno (duh!)
#include <pthread.h> // pthread_*
#define MAX_NUM_WORKERS 4UL

/* Per-worker argument: the id main() assigns to the thread.
   A struct is used even for a single member so the argument can grow later. */
typedef struct WorkerIdTag
{
    unsigned long id; /* the semicolon is required by ISO C */
} WorkerId;
/*
 * Thread entry point: print this worker's greeting and hand back its argument.
 *
 * @param arg  pointer to this worker's WorkerId slot (owned by main()).
 * @return     the same pointer, so that the joining thread can tell which
 *             worker has returned. Note that pthread_t values are opaque
 *             descriptors, not printable ids, which is why an explicit id is used.
 */
void* workerRoutine(void* arg)
{
    WorkerId* self = (WorkerId*) arg;

    char msg[100];
    int len = snprintf(msg, sizeof(msg), "[%lu]\t Hello, World!\n", self->id);
    if (len < 0)
    {
        perror("snprintf");
        /* snprintf is not guaranteed to set errno; never exit with status 0 on failure. */
        exit(errno != 0 ? errno : EXIT_FAILURE);
    }

    /* msg already ends with '\n'; fputs (unlike puts) does not append another one. */
    fputs(msg, stdout);

    return arg;
}
/*
 * Report a per-thread failure on stderr as "<prefix> <i> out of <n> threads".
 * `prefix` must be a string literal (it is pasted onto the format string).
 * The two variadic arguments are formatted with %lu, so both must be
 * unsigned long. The expansion evaluates to fprintf's return value.
 */
#define ERR_MSG(prefix, ...) \
    (fprintf(stderr, prefix " %lu out of %lu threads\n", __VA_ARGS__))
/*
 * Start MAX_NUM_WORKERS greeting threads, then join them in creation order
 * and report which worker each join returned.
 */
int main(void)
{
    pthread_t workers[MAX_NUM_WORKERS];
    WorkerId workersIds[MAX_NUM_WORKERS];

    /* (1) Create the workers; each receives a pointer to its own WorkerId slot. */
    puts("[main]\tCreating workers...");
    for (unsigned long i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        workersIds[i].id = i;
        /* pthread_create reports failure through its return value, not errno. */
        int rc = pthread_create(&workers[i], NULL, workerRoutine, &workersIds[i]);
        if (rc != 0)
        {
            ERR_MSG("Could not create thread", i, MAX_NUM_WORKERS);
            exit(rc);
        }
    }

    /* (2) Join the workers one by one, in creation order. */
    puts("[main]\tJoining the workers...");
    for (unsigned long i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        void *pRetVal = NULL;
        if (pthread_join(workers[i], &pRetVal) != 0)
        {
            ERR_MSG("Could not join thread", i, MAX_NUM_WORKERS);
            continue;
        }
        /* (3) The worker returned its own argument, i.e. a WorkerId*. */
        const WorkerId *wid = pRetVal;
        printf("[main]\tWorker N.%lu has returned!\n", wid->id);
    }
    return 0;
}
// one possible print
[main] Creating workers...
[main] Joining the workers...
[1] Hello, World!
[0] Hello, World!
[3] Hello, World!
[main] Worker N.0 has returned!
[main] Worker N.1 has returned!
[2] Hello, World!
[main] Worker N.2 has returned!
[main] Worker N.3 has returned!
2 Incrementing Global Counter: mutex
n 个 workerRoutine thread 均 并发 增加 1个 global counter
// ======1 globalSum.h
#ifndef GLOBAL_SUM_H
#define GLOBAL_SUM_H
/* Shared state handed to every `sum` worker in the global-counter example. */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

/* Both members point at objects owned by main(); every worker receives the
   same two addresses, so the counter behaves like a global variable. */
typedef struct globalSumTag
{
    unsigned long *pValue;   /* the shared counter */
    pthread_mutex_t *pMutex; /* guards *pValue */
} globalSum;

#endif // GLOBAL_SUM_H
// ======2 IncreaseGlobalCounter.c
#include "globalSum.h"
#define MAX_NUM_WORKERS 20UL
typedef unsigned long ulong_t;

/*
 * Worker entry point: add one to the shared counter.
 * `arg` points at this worker's globalSum; its mutex serialises the
 * read-modify-write so concurrent increments are not lost.
 */
void* sum(void* arg)
{
    globalSum *shared = arg;
    pthread_mutex_t *guard = shared->pMutex;

    pthread_mutex_lock(guard);
    *shared->pValue += 1;
    pthread_mutex_unlock(guard);

    return NULL;
}
int main(void)
{
pthread_t threads [ MAX_NUM_WORKERS ];
globalSum gSs [ MAX_NUM_WORKERS ];
// (1) Note: "global" counter: calling thread stack var's ptr pass to newly created thread's start routine
// => stack var pass by ref => simulate global var
ulong_t counter = 0;
pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
// (2) create n threads
for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
{
// 1) every thread's internal ptr point to global counter
gSs[i] = (globalSum){ .pValue = &counter, .pMutex = &mut };
pthread_create(&threads[i], NULL, sum, &gSs[i]);
}
// (3) join
for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
pthread_join(threads[i], NULL);
// (4) output global counter's value
printf("%lu threads were running. Sum final value: %lu\n", MAX_NUM_WORKERS, counter);
return 0;
}
$ ./a
20 threads were running. Sum final value: 20
3 Reacting on Specific Events & selfDefined barrierWithCv
Barrier
思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进
内部结构
// Part-3 barrier: every member points at an object owned by main(), so all workers share them.
typedef struct BarrierTag
{
pthread_mutex_t *pMutex; // guards *pCount
pthread_cond_t *pCond; // threads that are not the last to arrive wait on this
ulong_t *pCount; // threadNumOnBarrier: number of threads that still have to reach the barrier; each arriving thread decrements it
} Barrier;
// ======1 barrier.h
#ifndef BARRIER_H
#define BARRIER_H
// Shared declarations for the hand-rolled condition-variable barrier (part 3).
#include <pthread.h>
// SET_BARRIER_MSG formats into a char array that must be named `buffer` and be
// in scope at the call site. It uses sizeof(buffer), so `buffer` must be an
// array, not a pointer. The caller must also include <stdio.h> for snprintf.
#define SET_BARRIER_MSG(...) \
snprintf(buffer, sizeof(buffer), __VA_ARGS__)
// Message formats for SET_BARRIER_MSG; each expects one unsigned long (the worker id).
#define NOT_LAST_TO_REACH \
"[%lu]\tI’m NOT the last one to reach the barrier!"
#define LAST_TO_REACH \
"[%lu]\tI am the last to reach the barrier! Waking up the others."
// Number of worker threads main creates; also the barrier's initial count.
#define MAX_NUM_WORKERS 3
typedef unsigned long ulong_t;
// Barrier idea: no thread continues until the last thread reaches the barrier, which then wakes all of them.
typedef struct BarrierTag
{
pthread_mutex_t *pMutex; // guards *pCount
pthread_cond_t *pCond; // threads that are not the last to arrive wait on this
ulong_t *pCount; // threadNumOnBarrier: number of threads that still have to reach the barrier
} Barrier;
// Per-thread argument handed to workerRoutine through pthread_create.
typedef struct context_s
{
Barrier* pBarrier; // the same barrier object for every worker
ulong_t id; // worker index assigned by main
} Context;
#endif // BARRIER_H
// ======2 barrier.c
#include "barrier.h"
#include <stdio.h>
void *workerRoutine(void *pContext)
{
char buffer[81];
// (1) 取 arg
Context *pC = (Context*) pContext;
printf("[%lu]\tReaching the barrier...\n", pC->id);
// ======= barrier start
pthread_mutex_lock ( pC->pBarrier->pMutex ); // === (2) lock
// (3) Barrier 上 阻塞的 线程数 --
--*pC->pBarrier->pCount;
// (4) current thread is not the last to reach Barrier
// print + cv.wait
printf("*pC->pBarrier->pCount = %lu\n", *pC->pBarrier->pCount);
if (*pC->pBarrier->pCount > 0)
{
SET_BARRIER_MSG(NOT_LAST_TO_REACH, pC->id);
pthread_cond_wait ( pC->pBarrier->pCond, pC->pBarrier->pMutex ); // === unlock & relock
}
else // ... the last: print
{
SET_BARRIER_MSG(LAST_TO_REACH, pC->id);
}
puts(buffer); // Note: buffer 在 SET_BARRIER_MSG 中被 fill
pthread_mutex_unlock ( pC->pBarrier->pMutex ); // === (5) unlock
// (6) wake up others
pthread_cond_broadcast( pC->pBarrier->pCond );
// ====== barrier end
printf("[%lu]\tAfter the barriern\n", pC->id);
return NULL;
}
// ======3 barrierWithCv.c
#include "barrier.h"
void *workerRoutine(void *pContext);
/*
 * Build one barrier for MAX_NUM_WORKERS threads, start the workers, and wait
 * for all of them. On a pthread failure the exit status is the pthread error code.
 */
int main(void)
{
    pthread_t threads[MAX_NUM_WORKERS];
    Context contexts[MAX_NUM_WORKERS];
    pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    ulong_t count = MAX_NUM_WORKERS;

    /* (1) The barrier only holds pointers to main's mutex, cond and counter. */
    Barrier barrier = {.pMutex = &mut, .pCond = &cond, .pCount = &count};

    /* (2) Start the workers; every Context points at the same barrier. */
    for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
        int rc = pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
        if (rc != 0)
        {
            /* The barrier can never open now, and threads[i..] are not valid
               handles to join; leaving main terminates the whole process. */
            return rc;
        }
    }

    /* (3) Wait for every worker. */
    for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        int rc = pthread_join(threads[i], NULL);
        if (rc != 0)
            return rc;
    }

    /* (4) No thread uses the synchronization objects any more. */
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mut);
    return 0;
}
$ gcc barrierWithCv.c barrier.c -o a -pthread
$ ./a
[1] Reaching the barrier...
*pC->pBarrier->pCount = 2
[0] Reaching the barrier...
*pC->pBarrier->pCount = 1
[2] Reaching the barrier...
*pC->pBarrier->pCount = 0
[2] I am the last to reach the barrier! Waking up the others.
[1] I’m NOT the last one to reach the barrier!
[1] After the barriern
[0] I’m NOT the last one to reach the barrier!
[0] After the barriern
[2] After the barriern
4 Creating Barriers More Easily
理解了 part3 中 Barrier 的实现, 再 用 pthread 中的 pthread_barrier_t 就很容易
相当于 将 part3 中 Barrier 相关部分封装
4.1 重构 3: 封装 -> 实现出 简化版 barrier 的 数据结构 + 3个函数
// ====== 1 pthreadBarrier.h
#ifndef PTHREADBARRIER_H
#define PTHREADBARRIER_H
/* Minimal, single-use barrier built from a mutex and a condition variable. */
#include <pthread.h>

typedef unsigned long ulong_t;

/* No thread passes the barrier until the last of `count` threads arrives;
   that arrival wakes all the others. */
typedef struct BarrierTag
{
    pthread_mutex_t *pMutex; /* protects `count`; set up by barrier_init */
    pthread_cond_t *pCond;   /* early arrivals wait here; set up by barrier_init */
    ulong_t count;           /* threads that have not yet reached the barrier */
} Barrier;

/**
 * Prepare pBarrier to hold back `count` threads.
 * @param pMutexCvStruct  reserved for caller-supplied synchronization objects; pass NULL.
 */
void barrier_init(Barrier* pBarrier, void *pMutexCvStruct, ulong_t count);

/** Release what barrier_init set up; call only once no thread is inside barrier_wait. */
void barrier_destroy(Barrier* pBarrier);

/** Block the caller until `count` threads (as given to barrier_init) have called this. */
void barrier_wait(Barrier* pBarrier);

#endif // PTHREADBARRIER_H
// ====== 2 pthreadBarrier.c
#include "pthreadBarrier.h"
#include <stdio.h>
#include <stdlib.h>
/*
 * Prepare pBarrier to hold back `count` threads.
 *
 * The mutex and condition variable are heap-allocated so they outlive this
 * call and can be released later by barrier_destroy.
 *
 * pMutexCvStruct is reserved for caller-supplied synchronization objects,
 * which are not supported: the argument is ignored and the objects are
 * always allocated here, so the barrier is never left half-initialised.
 *
 * The function cannot report errors to its caller, so allocation or
 * initialisation failure prints a diagnostic and exits the process.
 */
void barrier_init(Barrier* pBarrier, void *pMutexCvStruct, ulong_t count)
{
    (void) pMutexCvStruct;

    pBarrier->pMutex = malloc(sizeof *pBarrier->pMutex);
    pBarrier->pCond = malloc(sizeof *pBarrier->pCond);
    if (pBarrier->pMutex == NULL || pBarrier->pCond == NULL)
    {
        perror("barrier_init: malloc");
        exit(EXIT_FAILURE);
    }

    /* pthread_*_init return an error code rather than setting errno. */
    int rc = pthread_mutex_init(pBarrier->pMutex, NULL);
    if (rc == 0)
        rc = pthread_cond_init(pBarrier->pCond, NULL);
    if (rc != 0)
    {
        fprintf(stderr, "barrier_init: pthread init failed (error %d)\n", rc);
        exit(EXIT_FAILURE);
    }

    pBarrier->count = count;
}
/*
 * Destroy and free the mutex and condition variable that barrier_init
 * allocated with malloc. Call only once no thread is inside barrier_wait.
 * The pointers are reset to NULL so stale use is easier to spot.
 */
void barrier_destroy(Barrier* pBarrier)
{
    pthread_cond_destroy(pBarrier->pCond);
    pthread_mutex_destroy(pBarrier->pMutex);

    free(pBarrier->pCond);
    free(pBarrier->pMutex);
    pBarrier->pCond = NULL;
    pBarrier->pMutex = NULL;
}
/*
 * Block the calling thread until every participant has arrived.
 *
 * Each arrival decrements pBarrier->count under the mutex; the arrival that
 * brings it to zero wakes all the others. The count is never reset, so the
 * barrier is single-use.
 */
void barrier_wait(Barrier* pBarrier)
{
    pthread_mutex_lock(pBarrier->pMutex);

    --pBarrier->count;
    printf("pBarrier->count = %lu\n", pBarrier->count);

    if (pBarrier->count == 0)
    {
        /* Last arrival: release every thread blocked below. */
        pthread_cond_broadcast(pBarrier->pCond);
    }
    else
    {
        /* Loop on the predicate: pthread_cond_wait may return spuriously,
           and a bare `if` would let this thread leave the barrier early. */
        while (pBarrier->count > 0)
            pthread_cond_wait(pBarrier->pCond, pBarrier->pMutex);
    }

    pthread_mutex_unlock(pBarrier->pMutex);
}
// ====== 3 barrier.h
#ifndef BARRIER_H
#define BARRIER_H
// Worker-side declarations for the encapsulated-barrier example (part 4.1).
#include "pthreadBarrier.h"
// Number of worker threads main creates; also the count the barrier is initialised with.
#define MAX_NUM_WORKERS 3
// Per-thread argument handed to workerRoutine through pthread_create.
typedef struct ContextTag
{
Barrier* pBarrier; // the same barrier object for every worker
ulong_t id; // worker index assigned by main
} Context;
#endif // BARRIER_H
// ====== 4 barrier.c
#include "barrier.h"
#include <stdio.h>
/*
 * Thread entry point: announce arrival, block in barrier_wait until every
 * worker has arrived, then continue.
 *
 * @param pContext  pointer to this worker's Context (barrier + id).
 * @return          NULL.
 */
void *workerRoutine(void *pContext)
{
    Context *pC = (Context*) pContext;
    printf("[%lu]\tReaching the barrier...\n", pC->id);

    barrier_wait(pC->pBarrier);

    printf("[%lu]\tAfter the barrier\n", pC->id);
    return NULL;
}
// ====== 5 barrierWithCv.c
#include "barrier.h"
void *workerRoutine(void *pContext);
/*
 * Create one barrier for MAX_NUM_WORKERS threads, start the workers, wait
 * for all of them and release the barrier. On a pthread failure the exit
 * status is the pthread error code.
 */
int main(void)
{
    pthread_t threads[MAX_NUM_WORKERS];
    Context contexts[MAX_NUM_WORKERS];
    ulong_t count = MAX_NUM_WORKERS;

    /* (1) One barrier shared by all workers, sized to the worker count. */
    Barrier barrier;
    barrier_init(&barrier, NULL, count);

    /* (2) Start the workers; each gets its own Context pointing at the shared barrier. */
    for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
        int rc = pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
        if (rc != 0)
        {
            /* The barrier can never open now, and threads[i..] are not valid
               handles to join; leaving main terminates the whole process. */
            return rc;
        }
    }

    /* (3) Wait for every worker. */
    for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        int rc = pthread_join(threads[i], NULL);
        if (rc != 0)
            return rc;
    }

    /* (4) No thread is inside barrier_wait any more, so teardown is safe. */
    barrier_destroy(&barrier);
    return 0;
}
$ gcc barrierWithCv.c barrier.c pthreadBarrier.c -o a -pthread
$ ./a
[2] Reaching the barrier...
pBarrier->count = 2
[1] Reaching the barrier...
pBarrier->count = 1
[0] Reaching the barrier...
pBarrier->count = 0
[0] After the barriern
[2] After the barriern
[1] After the barriern
4.2 用 pthread 提供的 barrier
只需把 4.1 中 pthreadBarrier.h /pthreadBarrier.c 去掉, 改用 pthread 提供的 barrier
效果 与 4.1 相同
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
                         const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_wait(pthread_barrier_t *barrier);
// ====== 1 barrier.h
#ifndef BARRIER_H
#define BARRIER_H
// Worker-side declarations for the pthread_barrier_t example (part 4.2).
#include <pthread.h> // replaces the hand-written "pthreadBarrier.h" from part 4.1
// Number of worker threads main creates; also the count passed to pthread_barrier_init.
#define MAX_NUM_WORKERS 3
typedef unsigned long ulong_t;
// Per-thread argument handed to workerRoutine through pthread_create.
typedef struct ContextTag
{
pthread_barrier_t *pBarrier; // shared POSIX barrier (was Barrier* in part 4.1)
ulong_t id; // worker index assigned by main
} Context;
#endif // BARRIER_H
// ====== 2 barrier.c
#include "barrier.h"
#include <stdio.h>
void *workerRoutine(void *pContext)
{
// char buffer[81];
// (1) 取 arg
Context *pC = (Context*) pContext;
printf("[%lu]\tReaching the barrier...\n", pC->id);
// ======= barrier start
pthread_barrier_wait(pC->pBarrier);
// ====== barrier end
printf("[%lu]\tAfter the barriern\n", pC->id);
return NULL;
}
// ====== 3 barrierWithCv.c
#include "barrier.h"
void *workerRoutine(void *pContext);
/*
 * Create one POSIX barrier for MAX_NUM_WORKERS threads, start the workers,
 * wait for all of them and destroy the barrier. On a pthread failure the
 * exit status is the pthread error code.
 */
int main(void)
{
    pthread_t threads[MAX_NUM_WORKERS];
    Context contexts[MAX_NUM_WORKERS];
    ulong_t count = MAX_NUM_WORKERS;

    /* (1) One barrier shared by all workers; the count parameter is `unsigned`. */
    pthread_barrier_t barrier;
    int rc = pthread_barrier_init(&barrier, NULL, (unsigned) count);
    if (rc != 0)
        return rc;

    /* (2) Start the workers; each gets its own Context pointing at the shared barrier. */
    for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
        rc = pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
        if (rc != 0)
        {
            /* The barrier can never open now, and threads[i..] are not valid
               handles to join; leaving main terminates the whole process. */
            return rc;
        }
    }

    /* (3) Wait for every worker. */
    for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
    {
        rc = pthread_join(threads[i], NULL);
        if (rc != 0)
            return rc;
    }

    /* (4) No thread is waiting on the barrier any more. */
    pthread_barrier_destroy(&barrier);
    return 0;
}
网友评论