美文网首页
pthreads-tutorial

pthreads-tutorial

作者: my_passion | 来源:发表于2022-04-24 21:44 被阅读0次
    << ELEG652-pthreads-tutorial >>
    
    1   First PTHREAD Example: hello world
    
        n 个 workerRoutine thread 均 print caller 分配给 它 的 id + "hello word"
        返回 分配给 它的 id, 使得 caller (main 线程) 知道 哪个 workerRoutine thread 返回
        
    2   Incrementing Global Counter: mutex
    
        n 个 workerRoutine thread 均 并发 增加 1个 global couter
        
    3   Reacting on Specific Events & selfDefined barrierWithCv 
        
        Barrier 
            思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进
            
            内部结构 
                typedef struct BarrierTag 
                {
                    pthread_mutex_t *pMutex;
                    pthread_cond_t  *pCond;
                    ulong_t         *pCount; // threadNumOnBarrier: Barrier 上要挡住的 线程数, 
                } Barrier;
            
            Barrier 3 函数 
                void barrier_init(Barrier* pBarrier, void *, ulong_t count);
                void barrier_destroy(Barrier* pBarrier);
                void barrier_wait(Barrier* pBarrier);
            
            workRoutine() context 
                typedef struct context_s 
                {
                    Barrier* pBarrier;  
                    ulong_t id;         
                } Context;
                    
                pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
            
    4   Creating Barriers More Easily 
    
        理解了 part3 中 Barrier 的实现, 再 用 pthread 中的 pthread_barrier_t 就很容易 
            
            相当于 将 part3 中 Barrier 相关部分封装
    

    4.1 重构 3: 封装 -> 实现出 简化版 barrier 的 数据结构 + 3个函数

        4.2 用 pthread 提供的 barrier 
            
            只需把 4.1 中 pthreadBarrier.h /pthreadBarrier.c 去掉, 改用 pthread 提供的 barrier 
    // ===
    

    1 First PTHREAD Example: hello world

        n 个 workerRoutine thread 均 print caller 分配给 它 的 id + "hello word"
        返回 分配给 它的 id, 使得 caller (main 线程) 知道 哪个 workerRoutine thread 返回
        
        Hello, World! . . . Output
        
        // === 1 hello.c
        /* $ gcc hello.c -o a -pthread
           $ ./a
        */
        #include <stdio.h>      // for snprintf(), fprintf(), printf(), puts()
        #include <stdlib.h>     // exit()
        #include <errno.h>      // errno (duh!)
        #include <pthread.h>    // pthread_* 
    
        #define MAX_NUM_WORKERS 4UL
    
        typedef struct WorkerIdTag
        { 
            unsigned long id 
        }WorkerId; // only one mem, use struct too
    
        void* workerRoutine(void* arg)
        {
            // Remember, pthread_t objects are descriptors, not just IDs!
            
            // (1) Retrieving allocated current thread's ID
            WorkerId* self = (WorkerId*) arg; 
            
            // (2) fill msg 
            char msg[100]; 
            int err = snprintf(msg, sizeof(msg),
                                "[%lu]\t Hello, World!\n", self->id);
            if (err < 0) 
            { 
                perror("snprintf"); 
                exit(errno); 
            }   
            
            // (3) output msg 
            puts(msg);
            
            // (4) return allocated current thread's ID 
            //      so that "main" thread knows which thread has returned
            return arg; 
        }
    
        // 宏 + 可变参数 __VA_ARGS__: 第2/3参数 按格式 %lu %lu 格式化
        #define ERR_MSG(prefix, ...) \
            fprintf(stderr, prefix " %lu out of %lu threads", __VA_ARGS__)
    
        int main(void) 
        {
            pthread_t workers [ MAX_NUM_WORKERS ];
            WorkerId workersIds [ MAX_NUM_WORKERS ];
            void *pRetVal = NULL;
            
            // (1) create n workerRoutine threads
            puts("[main]\tCreating workers...\n");
            for (unsigned long i = 0; i < MAX_NUM_WORKERS; ++i) 
            {
                workersIds[i].id = i;
                if (0 != pthread_create(&workers[i], NULL, workerRoutine, &workersIds[i]) )
                { 
                    ERR_MSG("Could not create thread", i, MAX_NUM_WORKERS);
                    exit(errno); 
                }
            }
            
            // (2) join 0~(n-1) threads one by one
            puts("[main]\tJoining the workers...\n");
            for (unsigned long i = 0; i < MAX_NUM_WORKERS; ++i) 
            {       
                if (0 != pthread_join(workers[i], (void**) &pRetVal) ) 
                    ERR_MSG("Could not join thread", i, MAX_NUM_WORKERS);
                else
                {
                    // (3) convert pRetVal to target type
                    WorkerId* wid = (WorkerId*) pRetVal;
                    printf("[main]\tWorker N.%lu has returned!\n", wid->id);
                }
            }
            return 0;
        }
    
        // one possible print
        [main]  Creating workers...
        [main]  Joining the workers...
        [1]  Hello, World!
        [0]  Hello, World!
        [3]  Hello, World!
        [main]  Worker N.0 has returned!
        [main]  Worker N.1 has returned!
        [2]  Hello, World!
        [main]  Worker N.2 has returned!
        [main]  Worker N.3 has returned!
    

    2 Incrementing Global Counter: mutex

        n 个 workerRoutine thread 均 并发 增加 1个 global couter
    
        // ======1 globalSum.h
        #ifndef GLOBAL_SUM_H
        #define GLOBAL_SUM_H
    
        #include <stdio.h>
        #include <stdlib.h>
        #include <pthread.h>
        #include <pthread.h>
        // #include "utils.h"
        typedef struct globalSumTag
        {
            unsigned long *pValue;
            pthread_mutex_t *pMutex;
        } globalSum;
    
        #endif // GLOBAL_SUM_H
        
        // ======2 IncreaseGlobalCounter.char
        #include "globalSum.h"
        #define MAX_NUM_WORKERS 20UL
    
        typedef unsigned long ulong_t;
    
        void* sum(void* arg) 
        {
            globalSum* gs = (globalSum*) arg;
            
            pthread_mutex_lock ( gs->pMutex );   /* Critical section starts here */
            ++*gs->pValue; // ++*(gs->pValue)
            pthread_mutex_unlock ( gs->pMutex ); /* Critical section ends here */
            
            return NULL;
        }
        int main(void) 
        {
            pthread_t threads [ MAX_NUM_WORKERS ];
            globalSum gSs [ MAX_NUM_WORKERS ];
            
            // (1) Note: "global" counter: calling thread stack var's ptr pass to newly created thread's start routine
            //      => stack var pass by ref => simulate global var 
            ulong_t counter = 0;
            pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
            
            // (2) create n threads 
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i) 
            {
                // 1) every thread's internal ptr point to global counter 
                gSs[i] = (globalSum){ .pValue = &counter, .pMutex = &mut };
                
                pthread_create(&threads[i], NULL, sum, &gSs[i]);
            }
            
            // (3) join
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
                pthread_join(threads[i], NULL);
            
            // (4) output global counter's value
            printf("%lu threads were running. Sum final value: %lu\n", MAX_NUM_WORKERS, counter);
            
            return 0;
        }
        
        $ ./a
        20 threads were running. Sum final value: 20
    

    3 Reacting on Specific Events & selfDefined barrierWithCv

        Barrier 
            思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进
            
            内部结构 
                typedef struct BarrierTag 
                {
                    pthread_mutex_t *pMutex;
                    pthread_cond_t  *pCond;
                    ulong_t         *pCount; // threadNumOnBarrier: Barrier 上要挡住的 线程数, 
                } Barrier;
    
        // ======1 barrier.h
        #ifndef BARRIER_H
        #define BARRIER_H
    
        #include <pthread.h>
    
        #define SET_BARRIER_MSG(...) \
            snprintf(buffer, sizeof(buffer), __VA_ARGS__)
    
        #define NOT_LAST_TO_REACH \
            "[%lu]\tI’m NOT the last one to reach the barrier!"
            
        #define LAST_TO_REACH \
            "[%lu]\tI am the last to reach the barrier! Waking up the others."
            
        #define MAX_NUM_WORKERS 3
    
        typedef unsigned long ulong_t;
    
        // Note Barrier 思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进 
        typedef struct BarrierTag 
        {
            pthread_mutex_t *pMutex;
            pthread_cond_t  *pCond;
            ulong_t         *pCount; // threadNumOnBarrier: Barrier 上要挡住的 线程数, 
        } Barrier;
    
        typedef struct context_s 
        {
            Barrier* pBarrier;
            ulong_t id;
        } Context;
            
        #endif // BARRIER_H
        
        // ======2 barrier.c
        #include "barrier.h"
        #include <stdio.h>
    
        void *workerRoutine(void *pContext) 
        {
            char buffer[81];
            
            // (1) 取 arg
            Context *pC = (Context*) pContext;
            
            printf("[%lu]\tReaching the barrier...\n", pC->id);
            
            // ======= barrier start
            pthread_mutex_lock ( pC->pBarrier->pMutex ); // === (2) lock 
            
            // (3) Barrier 上 阻塞的 线程数 --
            --*pC->pBarrier->pCount;
            
            // (4) current thread is not the last to reach Barrier
            // print + cv.wait
            printf("*pC->pBarrier->pCount = %lu\n", *pC->pBarrier->pCount);
            if (*pC->pBarrier->pCount > 0) 
            {
                SET_BARRIER_MSG(NOT_LAST_TO_REACH, pC->id);
                pthread_cond_wait ( pC->pBarrier->pCond, pC->pBarrier->pMutex ); // === unlock & relock
            } 
            else // ... the last: print
            {
                SET_BARRIER_MSG(LAST_TO_REACH, pC->id);
            }
            
            puts(buffer); // Note: buffer 在 SET_BARRIER_MSG 中被 fill
            
            pthread_mutex_unlock ( pC->pBarrier->pMutex ); // === (5) unlock
            
            // (6) wake up others
            pthread_cond_broadcast( pC->pBarrier->pCond ); 
            // ====== barrier end 
            
            printf("[%lu]\tAfter the barriern\n", pC->id);
            
            return NULL;
        }
            
        
        // ======3 barrierWithCv.c
        #include "barrier.h"
        void *workerRoutine(void *pContext);
    
        int main(void) 
        {
            pthread_t threads [ MAX_NUM_WORKERS ];
            Context contexts [ MAX_NUM_WORKERS ];
            
            pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
            pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
            
            ulong_t count = MAX_NUM_WORKERS;
            
            // (1) 建 + init a Barrier
            Barrier barrier = {.pMutex = &mut, .pCond = &cond, .pCount = &count};
                
            // (2) create n threads: 所有 thread's arg 的 pBarrier 均指向 该 barrier
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
            {
                contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
                
                pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
            }
            
            // (2) join
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
                pthread_join(threads[i], NULL);
            
            return 0;
        }
    
        $ gcc barrierWithCv.c barrier.c -o a -pthread
        $ ./a
        [1] Reaching the barrier...
        *pC->pBarrier->pCount = 2
        [0] Reaching the barrier...
        *pC->pBarrier->pCount = 1
        [2] Reaching the barrier...
        *pC->pBarrier->pCount = 0
        [2] I am the last to reach the barrier! Waking up the others.
        [1] I’m NOT the last one to reach the barrier!
        [1] After the barriern
        [0] I’m NOT the last one to reach the barrier!
        [0] After the barriern
        [2] After the barriern
    

    4 Creating Barriers More Easily

    理解了 part3 中 Barrier 的实现, 再 用 pthread 中的 pthread_barrier_t 就很容易 
        
        相当于 将 part3 中 Barrier 相关部分封装
    

    4.1 重构 3: 封装 -> 实现出 简化版 barrier 的 数据结构 + 3个函数

        // ====== 1 pthreadBarrier.h 
        #ifndef PTHREADBARRIER_H
        #define PTHREADBARRIER_H
    
        #include <pthread.h>
    
        /*
        #define SET_BARRIER_MSG(...) \
            snprintf(buffer, sizeof(buffer), __VA_ARGS__)
    
        #define NOT_LAST_TO_REACH \
            "[%lu]\tI’m NOT the last one to reach the barrier!"
            
        #define LAST_TO_REACH \
            "[%lu]\tI am the last to reach the barrier! Waking up the others."
        */
    
        typedef unsigned long ulong_t;
    
        // Note Barrier 思想: 直到 the last thread 到达 Barrier, 才 唤醒 所有 thread 继续行进 
        typedef struct BarrierTag 
        {
            pthread_mutex_t *pMutex;
            pthread_cond_t  *pCond;
            ulong_t         count; // threadNumOnBarrier: Barrier 上要挡住的 线程数, 
        } Barrier;
    
        void barrier_init(Barrier* pBarrier, void *, ulong_t count);
        void barrier_destroy(Barrier* pBarrier);
        void barrier_wait(Barrier* pBarrier);
            
        #endif // BARRIER_H
    
        // ====== 2 pthreadBarrier.c 
        #include "pthreadBarrier.h"
        #include <stdio.h>
        #include <stdlib.h>
    
        void barrier_init(Barrier* pBarrier, void *pMutexCvStruct, ulong_t count)
        {
            if(pMutexCvStruct == NULL)
            {
                // heap 内存: 因为 heap 在 other func 也要用
                pthread_mutex_t* pMutex = malloc( sizeof(pthread_mutex_t) );
                pBarrier->pMutex = pMutex;
                pthread_mutex_init(pBarrier->pMutex, NULL); // (1) 动态初始化 动态 mutex
                
                pthread_cond_t* pCond = malloc( sizeof(pthread_cond_t) );
                pBarrier->pCond = pCond;
                pthread_cond_init(pBarrier->pCond, NULL);  // (2) 动态初始化 动态 mutex
            }
            else
            {
                /* */
            }
            
            pBarrier->count = count;
        }
    
        void barrier_destroy(Barrier* pBarrier)
        {
            pthread_cond_destroy(pBarrier->pCond);
            pthread_mutex_destroy(pBarrier->pMutex);
        }
    
        void barrier_wait(Barrier* pBarrier)
        {
            // === (1) lock 
            pthread_mutex_lock ( pBarrier->pMutex ); 
            
            // (2) Barrier 上 阻塞的 线程数 --
            --pBarrier->count;
            
            // (3) current thread is not the last to reach Barrier
            // print + cv.wait
            printf("pBarrier->count = %lu\n", pBarrier->count);
            if (pBarrier->count > 0) 
            {
                //SET_BARRIER_MSG(NOT_LAST_TO_REACH, id);
                pthread_cond_wait ( pBarrier->pCond, pBarrier->pMutex ); // === unlock & relock
            } 
            else // ... the last: print
            {
                // SET_BARRIER_MSG(LAST_TO_REACH, id);
            }
            
            // puts(buffer); // Note: buffer 在 SET_BARRIER_MSG 中被 fill
            
            // === (4) unlock
            pthread_mutex_unlock ( pBarrier->pMutex ); 
            
            // (5) wake up others (thread)
            pthread_cond_broadcast( pBarrier->pCond ); 
        }
    
        // ====== 3 barrier.h
        #ifndef BARRIER_H
        #define BARRIER_H
    
        #include "pthreadBarrier.h"
    
        #define MAX_NUM_WORKERS 3
    
        typedef struct ContextTag 
        {
            Barrier* pBarrier;
            ulong_t id;
        } Context;
            
        #endif // BARRIER_H
    
        // ====== 4 barrier.c
        #include "barrier.h"
        #include <stdio.h>
    
        void *workerRoutine(void *pContext) 
        {
            // char buffer[81];
            
            // (1) 取 arg
            Context *pC = (Context*) pContext;
            
            printf("[%lu]\tReaching the barrier...\n", pC->id);
            
            // ======= barrier start
            barrier_wait(pC->pBarrier);
            // ====== barrier end 
            
            printf("[%lu]\tAfter the barriern\n", pC->id);
            
            return NULL;
        }
    
        // ====== 5 barrierWithCv.c
        #include "barrier.h"
    
        void *workerRoutine(void *pContext);
    
        int main(void) 
        {
            pthread_t threads [ MAX_NUM_WORKERS ];
            Context contexts [ MAX_NUM_WORKERS ];
            ulong_t count = MAX_NUM_WORKERS;
            
            // (0) 建 Barrier 
            Barrier barrier;
    
            // (1) init Barrier
            barrier_init(&barrier, NULL, count);
            
            // (2) create n threads: 所有 thread's arg 的 pBarrier 均指向 该 barrier
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
            {
                contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
                
                pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
            }
            
            // (3) join
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
                pthread_join(threads[i], NULL);
            
            // (4)
            barrier_destroy(&barrier);
            
            return 0;
        }
                
    
        $ gcc barrierWithCv.c barrier.c pthreadBarrier.c -o a -pthread
        $ ./a
        [2] Reaching the barrier...
        pBarrier->count = 2
        [1] Reaching the barrier...
        pBarrier->count = 1
        [0] Reaching the barrier...
        pBarrier->count = 0
        [0] After the barriern
        [2] After the barriern
        [1] After the barriern
    
    

    4.2 用 pthread 提供的 barrier

        只需把 4.1 中 pthreadBarrier.h /pthreadBarrier.c 去掉, 改用 pthread 提供的 barrier
            效果 与 4.1 相同 
            
        int pthread barrier init( pthread barrier t restrict* barrier, const
                                    pthread barrierattr t *restrict attr, unsigned count )  
        int pthread barrier destroy( pthread barrier t restrict* barrier )
        int pthread barrier wait( pthread barrier t restrict* barrier )
        
        // ====== 1  barrier.h
        #ifndef BARRIER_H
        #define BARRIER_H
    
        #include <pthread.h> // #include "pthreadBarrier.h"
    
        #define MAX_NUM_WORKERS 3
    
        typedef unsigned long ulong_t;
    
        typedef struct ContextTag 
        {
            pthread_barrier_t *pBarrier; // Barrier* pBarrier;
            ulong_t id;
        } Context;
            
        #endif // BARRIER_H
    
        // ====== 2  barrier.c
        #include "barrier.h"
        #include <stdio.h>
    
        void *workerRoutine(void *pContext) 
        {
            // char buffer[81];
            
            // (1) 取 arg
            Context *pC = (Context*) pContext;
            
            printf("[%lu]\tReaching the barrier...\n", pC->id);
            
            // ======= barrier start
            pthread_barrier_wait(pC->pBarrier);
            // ====== barrier end 
            
            printf("[%lu]\tAfter the barriern\n", pC->id);
            
            return NULL;
        }
    
        // ====== 3  barrierWithCv.c
        #include "barrier.h"
    
        void *workerRoutine(void *pContext);
    
        int main(void) 
        {
            pthread_t threads [ MAX_NUM_WORKERS ];
            Context contexts [ MAX_NUM_WORKERS ];
            ulong_t count = MAX_NUM_WORKERS;
            
            // (0) 建 Barrier 
            pthread_barrier_t barrier; // Barrier barrier;
    
            // (1) init Barrier
            pthread_barrier_init(&barrier, NULL, count); // barrier_init(&barrier, NULL, count);
            
            // (2) create n threads: 所有 thread's arg 的 pBarrier 均指向 该 barrier
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
            {
                contexts[i] = (Context){ .pBarrier = &barrier, .id = i };
                
                pthread_create(&threads[i], NULL, workerRoutine, &contexts[i]);
            }
            
            // (3) join
            for (ulong_t i = 0; i < MAX_NUM_WORKERS; ++i)
                pthread_join(threads[i], NULL);
            
            // (4)
            pthread_barrier_destroy(&barrier); // barrier_destroy(&barrier);
            
            return 0;
        }
    

    相关文章

      网友评论

          本文标题:pthreads-tutorial

          本文链接:https://www.haomeiwen.com/subject/quztyrtx.html