美文网首页
线程的同步

线程的同步

作者: 3e1094b2ef7b | 来源:发表于2017-05-23 21:46 被阅读0次

    一、互斥量

    1、为什么要使用互斥量

    当多个线程共享相同的内存时,需要每一个线程看到相同的视图。当一个线程修改变量时,而其他线程也可以读取或者修改这个变量,就需要对这些线程同步,确保他们不会访问到无效的变量。

    在变量修改时间多于一个存储器访问周期的处理器结构中,当存储器的读和写这两个周期交叉时,这种潜在的不一致性就会出现。当然这与处理器相关,但是在可移植的程序中并不能对处理器做出任何假设。

    2、互斥锁的初始化和销毁

    为了让线程访问数据不产生冲突,这要就需要对变量加锁,使得同一时刻只有一个线程可以访问变量。互斥量本质就是锁,访问共享资源前对互斥量加锁,访问完成后解锁
    当互斥量加锁以后,其他所有需要访问该互斥量的线程都将阻塞
    当互斥量解锁以后,所有因为这个互斥量阻塞的线程都将变为就绪态,第一个获得cpu的线程会获得互斥量,变为运行态,而其他线程会继续变为阻塞,在这种方式下访问互斥量每次只有一个线程能向前执行

    互斥量用pthread_mutex_t类型的数据表示,在使用之前需要对互斥量初始化
    1、如果是动态分配的互斥量,可以调用pthread_mutex_init()函数初始化
    2、如果是静态分配的互斥量,还可以把它置为常量PTHREAD_MUTEX_INITIALIZER
    3、动态分配的互斥量在释放内存之前需要调用pthread_mutex_destroy()

    int pthread_mutex_init(pthread_mutex_t *restrict mutex,
          const pthread_mutexattr_t *restrict attr);
    
    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    

    3、加锁和解锁

    加锁:

    int pthread_mutex_lock(pthread_mutex_t *mutex); 
    成功返回0,失败返回错误码。如果互斥量已经被锁住,那么会导致该线程阻塞
    int pthread_mutex_trylock(pthread_mutex_t *mutex);
    成功返回0,失败返回错误码。如果互斥量已经被锁住,不会导致线程阻塞
    

    解锁:

    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    成功返回0,失败返回错误码。如果一个互斥量没有被锁住,那么解锁就会出错
    

    二、读写锁

    1、什么是读写锁?

    读写锁与互斥量类似,不过读写锁有更高的并行性。互斥量要么加锁要么不加锁,而且同一时刻只允许一个线程对其加锁。对于一个变量的读取,完全可以让多个线程同时进行操作。

    pthread_rwlock_t     rwlock
    

    读写锁有三种状态,读模式下加锁,写模式下加锁,不加锁。一次只有一个线程可以占有写模式下的读写锁,但是多个线程可以同时占有读模式的读写锁

    读写锁在写加锁状态时,在它被解锁之前,所有试图对这个锁加锁的线程都会阻塞。读写锁在读加锁状态时,所有试图以读模式对其加锁的线程都会获得访问权,但是如果线程希望以写模式对其加锁,它必须阻塞直到所有的线程释放锁。

    当读写锁以读模式加锁时,如果有线程试图以写模式对其加锁,那么读写锁会阻塞随后的读模式锁请求。这样可以避免读锁长期占用,而写锁达不到请求。

    读写锁非常适合对数据结构读次数大于写次数的程序,当它以读模式锁住时,是以共享的方式锁住的;当它以写模式锁住时,是以独占的模式锁住的。

    2、读写锁的初始化和销毁

    读写锁在使用之前必须初始化:

    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
             const pthread_rwlockattr_t *restrict attr);
    

    使用完需要销毁:

    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); 
    成功返回0 ,失败返回错误码
    

    3、加锁和解锁

    读模式加锁:

    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
    

    写模式加锁:

    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
    

    解锁:

    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
    

    成功返回0,错误返回错误码。


    三、条件变量

    1、条件变量的引入

    一个典型的实例:
    在一条生产先线上有一个仓库,当生产者生产的时候需要锁住仓库独占,而消费者取产品的时候也要锁住仓库独占。如果生产者发现仓库满了,那么他就不能生产了,变成了阻塞状态。但是此时由于生产者独占仓库,消费者又无法进入仓库去消耗产品,这样就造成了一个僵死状态。

    我们需要一种机制,当互斥量被锁住以后发现当前线程还是无法完成自己的操作,那么它应该释放互斥量,让其他线程工作。

    1. 可以采用轮询的方式,不停的查询你需要的条件
    2. 让系统来帮你查询条件,使用条件变量pthread_cond_t cond

    2、条件变量的初始化和销毁

    静态初始化:
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    
    动态初始化:
    int pthread_cond_init(pthread_cond_t *restrict cond,
             const pthread_condattr_t *restrict attr);
    默认属性为空NULL
    
    条件变量使用完成之后需要销毁
    int pthread_cond_destroy(pthread_cond_t *cond);
    

    3、条件变量的使用

    条件变量使用需要配合互斥量使用:

    int pthread_cond_wait(pthread_cond_t *restrict cond,
         pthread_mutex_t *restrict mutex);
    

    1、使用pthread_cond_wait等待条件变为真。传递给pthread_cond_wait的互斥量对条件进行保护,调用者把锁住的互斥量传递给函数。
    2、这个函数将线程放到等待条件的线程列表上,然后对互斥量进行解锁,这是个原子操作。
    当条件满足时这个函数返回,返回以后继续对互斥量加锁。

    int pthread_cond_timedwait(pthread_cond_t *restrict cond,
         pthread_mutex_t *restrict mutex,
         const struct timespec *restrict abstime);
    

    这个函数与pthread_cond_wait类似,只是多一个timeout,如果到了指定的时间条件还不满足,那么就返回。
    时间用下面的结构体表示:

    struct timespec{
        time_t tv_sec;
        long tv_nsec;
     };
    

    注意,这个时间是绝对时间。例如你要等待3分钟,就要把当前时间加上3分钟然后转换到timespec,而不是直接将3分钟转换到timespec

    当条件满足的时候,需要唤醒等待条件的线程:

    int pthread_cond_broadcast(pthread_cond_t *cond);
    // pthread_cond_broadcast唤醒等待条件的所有线程
    int pthread_cond_signal(pthread_cond_t *cond);
    // pthread_cond_signal至少唤醒等待条件的某一个线程
    

    注意,一定要在条件改变以后再唤醒线程。

    典型例子:生产者与消费者。

    /*
     *DESCRIPTION: 生产者与消费者问题
     */
    
    #include <pthread.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <string.h>
    #include <signal.h>
    #include <errno.h>
    
    #define BUFFER_SIZE         5       //产品库存大小  
    #define PRODUCT_CNT         30      //产品生产总数   
    
    struct product_cons  
    {  
        int buffer[BUFFER_SIZE];  //生产产品值  
        pthread_mutex_t lock;     //互斥锁 volatile int  
        int readpos, writepos;    //读写位置  
        pthread_cond_t notempty;  //条件变量,非空  
        pthread_cond_t notfull;   //非满  
    }buffer;  
    
    void init(struct product_cons *p)  
    {  
        pthread_mutex_init(&p->lock, NULL);     //互斥锁  
        pthread_cond_init(&p->notempty, NULL);  //条件变量  
        pthread_cond_init(&p->notfull, NULL);   //条件变量  
        p->readpos = 0;                         //读写位置  
        p->writepos = 0;  
    }  
    
    void finish(struct product_cons *p)  
    {  
        pthread_mutex_destroy(&p->lock);     //互斥锁  
        pthread_cond_destroy(&p->notempty);  //条件变量  
        pthread_cond_destroy(&p->notfull);   //条件变量  
        p->readpos = 0;                      //读写位置  
        p->writepos = 0;  
    }  
    
    
    //存储 一个数据 到 bufferr  
    void put(struct product_cons *p, int data) //输入产品子函数  
    {
        pthread_mutex_lock(&p->lock);
        if((p->writepos+1)%BUFFER_SIZE == p->readpos)
        {
            printf("producer wait for not full\n");
            pthread_cond_wait(&p->notfull, &p->lock);
        }
    
        p->buffer[p->writepos] = data;
        p->writepos ++;
    
        if(p->writepos >= BUFFER_SIZE)
            p->writepos = 0;
    
        pthread_cond_signal(&p->notempty);
        pthread_mutex_unlock(&p->lock);
    }  
    
    //读,移除 一个数据 从 buffer  
    int get(struct product_cons *p)  
    {
        int data;
    
        pthread_mutex_lock(&p->lock);
    
        if(p->readpos == p->writepos)
        {
            printf("consumer wait for not empty\n");
            pthread_cond_wait(&p->notempty, &p->lock);
        }
    
        data = p->buffer[p->readpos];
        p->readpos++;
    
        if(p->readpos >= BUFFER_SIZE)
            p->readpos = 0;
    
        pthread_cond_signal(&p->notfull);
    
        pthread_mutex_unlock(&p->lock);
    
        return data;
    }  
    
    void *producer(void *data) //子线程 ,生产
    {
        int n;  
        for(n = 1; n <= 50; ++n) //生产 50 个产品  
        {  
            sleep(1);  
            printf("put the %d product ...\n", n);
            put(&buffer,n);  
            printf("put the %d product success\n", n);
        }  
    
        printf("producer stopped\n");  
    
        return NULL;  
    }  
    
    void *consumer(void *data)  
    {  
        static int cnt = 0;  
        int num;
        while(1)  
        {  
            sleep(2);  
            printf("get  product ...\n");  
            num = get(&buffer);
            printf("get the %d product success\n", num);  
            if(++cnt == PRODUCT_CNT)  
                break;  
        }  
    
        printf("consumer stopped\n");  
        return NULL;  
    }  
    
    int main(int argc, char *argv[])  
    {  
        pthread_t th_a,th_b;
        void *retval;  
    
        init(&buffer);  
    
        pthread_create(&th_a, NULL, producer, 0);  
        pthread_create(&th_b, NULL, consumer, 0);  
    
        pthread_join(th_a, &retval);   // 等待新线程的结束
        pthread_join(th_b, &retval);  
    
        finish(&buffer);  
    
        return 0;  
    }

    相关文章

      网友评论

          本文标题:线程的同步

          本文链接:https://www.haomeiwen.com/subject/dfigxxtx.html