美文网首页
ffplay.c 源码分析- 队列操作

ffplay.c 源码分析- 队列操作

作者: deep_sadness | 来源:发表于2018-11-26 17:27 被阅读46次

    前两遍文章,我们分析了视频部分和音频播放。其中包含的队列操作,还是让人迷惑。
    这边文章,就主要来梳理一下队列操作。

    PacketQueue

    定义

    在FFmpeg当中已经定义了一个AVPackList
    avformat.h中。

    typedef struct AVPacketList {
        AVPacket pkt;
        struct AVPacketList *next;
    } AVPacketList;
    

    但是这个AVPacketList,需要的serial,所以就自己定义。

    typedef struct MyAVPacketList {
        AVPacket pkt;
        struct MyAVPacketList *next;
        //操作数
        int serial;
    } MyAVPacketList;
    

    再次包装的PacketQueue,维持一些锁和统计变量。

    1. 因为多个线程(读取线程和解码线程)都需要对PacketQueue进行操作。所以需要有锁。
    2. 还需要统计当前的packet的数量。后面配合锁,做生产者和消费者的经典多线程模式。
    typedef struct PacketQueue {
        MyAVPacketList *first_pkt, *last_pkt;
        int nb_packets;
        int size;
        int64_t duration;
        int abort_request;
        int serial;
        SDL_mutex *mutex;
        SDL_cond *cond;
    } PacketQueue;
    

    使用

    初始化

    • 初始化
      初始化需要对锁进行初始化的操作。
      并且对队列的状态abort_request,进行修改。初始化之后的队列默认是不可用的。
    /* packet queue handling */
    static int packet_queue_init(PacketQueue *q)
    {
        // 重置整个队列对象
        memset(q, 0, sizeof(PacketQueue));
        //创建锁
        q->mutex = SDL_CreateMutex();
        if (!q->mutex) {
            av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
            return AVERROR(ENOMEM);
        }
        q->cond = SDL_CreateCond();
        if (!q->cond) {
            av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
            return AVERROR(ENOMEM);
        }
        q->abort_request = 1;
        return 0;
    }
    
    • 启动队列
      修改abort_request ,队列就可以开始工作了。
    static void packet_queue_start(PacketQueue *q)
    {
        SDL_LockMutex(q->mutex);
        q->abort_request = 0;
        packet_queue_put_private(q, &flush_pkt);
        SDL_UnlockMutex(q->mutex);
    }
    

    反初始化

    • flush方法
      flush方法基本就是clear方法。会将当前PacketQueue当中的数据全部清空。
      在操作数修改,或者结束的时候,会进行改操作。
    static void packet_queue_flush(PacketQueue *q)
    {
        MyAVPacketList *pkt, *pkt1;
    
        SDL_LockMutex(q->mutex);
        for (pkt = q->first_pkt; pkt; pkt = pkt1) {
            pkt1 = pkt->next;
            av_packet_unref(&pkt->pkt);
            av_freep(&pkt);
        }
        q->last_pkt = NULL;
        q->first_pkt = NULL;
        q->nb_packets = 0;
        q->size = 0;
        q->duration = 0;
        SDL_UnlockMutex(q->mutex);
    }
    
    • destroy方法
      释放的方法,先把队列flush ,同时释放我们的锁资源
    static void packet_queue_destroy(PacketQueue *q)
    {
        packet_queue_flush(q);
        SDL_DestroyMutex(q->mutex);
        SDL_DestroyCond(q->cond);
    }
    
    • 抛弃队列
      就是将队列的状态修改成不可用。
    static void packet_queue_abort(PacketQueue *q)
    {
        SDL_LockMutex(q->mutex);
    
        q->abort_request = 1;
    
        SDL_CondSignal(q->cond);
    
        SDL_UnlockMutex(q->mutex);
    }
    

    基本操作

    入列
    1. packet_queue_put就是简单的提供了一个线程同步的功能。
      可以看到,这里的入列的操作,是整体都加锁的。
    2. 入列的主体是在packet_queue_put_private方法当中。
      主要完成的任务是将传入的AVPacket包装成MyAVPacket,并放入队列当中。修改队列的头指针和尾指针,和队列的统计参数。然后通知条件锁cond解锁。(因为队列为空,出列时会用条件锁锁住)
    3. 这里值得注意的是,关于操作数的修改。可以看到如果放入的是flush_pkt的话,就会提高操作数。(在快进,或者开始播放时,会先放入一个flush_pkt,记录增加当前的操作数)
    static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
    {
        int ret;
        //为了保证线程安全。锁住
        SDL_LockMutex(q->mutex);
        //真正的操作,在packet_queue_put_private方法中
        ret = packet_queue_put_private(q, pkt);
        SDL_UnlockMutex(q->mutex);
    
        if (pkt != &flush_pkt && ret < 0)
            av_packet_unref(pkt);
    
        return ret;
    }
    
    static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
    {
        //声明一个新的MyAVPacketList
        MyAVPacketList *pkt1;
    
        if (q->abort_request)
           return -1;
    
        pkt1 = av_malloc(sizeof(MyAVPacketList));
        if (!pkt1)
            return -1;
        //确定其变量
        pkt1->pkt = *pkt;
        pkt1->next = NULL;
        if (pkt == &flush_pkt)
            q->serial++;
        pkt1->serial = q->serial;
        
        //放置指针,并记录first_pkt 和 last_pkt
        if (!q->last_pkt)
            q->first_pkt = pkt1;
        else
            q->last_pkt->next = pkt1;
        q->last_pkt = pkt1;
        q->nb_packets++;
        q->size += pkt1->pkt.size + sizeof(*pkt1);
        q->duration += pkt1->pkt.duration;
        /* XXX: should duplicate packet data in DV case */
        SDL_CondSignal(q->cond);
        return 0;
    }
    

    这里提供了一个便捷的方法,把nullpacket放入队列当中。这是在EOF(就是读取文件流结束)之后,才会进行的。

    static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index)
    {
        AVPacket pkt1, *pkt = &pkt1;
        av_init_packet(pkt);
        pkt->data = NULL;
        pkt->size = 0;
        pkt->stream_index = stream_index;
        return packet_queue_put(q, pkt);
    }
    
    出列
    1. 出列提供了是否阻塞的选择,如果是阻塞的话,就会等待队列中有数据时,继续读取
    2. 出列的操作同样是全部锁住的。
    /* return < 0 if aborted, 0 if no packet and > 0 if packet.  */
    static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
    {
        MyAVPacketList *pkt1;
        int ret;
    
        SDL_LockMutex(q->mutex);
        
        //取不到数据时,如果是阻塞的,则会继续等待循环,如果不是阻塞的,就会直接跳出循环
        for (;;) {
            if (q->abort_request) {
                ret = -1;
                break;
            }
    
            pkt1 = q->first_pkt;
            if (pkt1) {
                q->first_pkt = pkt1->next;
                if (!q->first_pkt)
                    q->last_pkt = NULL;
                q->nb_packets--;
                q->size -= pkt1->pkt.size + sizeof(*pkt1);
                q->duration -= pkt1->pkt.duration;
                *pkt = pkt1->pkt;
                if (serial)
                    *serial = pkt1->serial;
                av_free(pkt1);
                ret = 1;
                break;
            } else if (!block) {
                ret = 0;
                break;
            } else {
                SDL_CondWait(q->cond, q->mutex);
            }
        }
        SDL_UnlockMutex(q->mutex);
        return ret;
    }
    
    

    小结

    1. 两种特殊的packet(flush_pktnullpacket)。flush_pkt对应serial操作数,记录前后的数据是否是连续的(开始和快进时会丢入flush_pkt);nullpacket对应文件流的末尾。

    2. 多线程同步。在PacketList的每一个操作,都是线程同步的,都用互斥锁给锁住了。
      读取数据出列时,如果队列为空,而且需要阻塞,则会继续等待入列后才会取出。

    3. 基本操作packet_queue_put把数据放入尾部。packet_queue_get取出头部数据。

    4. 方法汇总
      packet_queue_init:初始化
      packet_queue_destroy:销毁
      packet_queue_start:启用
      packet_queue_abort:中止
      packet_queue_get:获取一个节点
      packet_queue_put:存入一个节点
      packet_queue_put_nullpacket:存入一个空节点
      packet_queue_flush:清除队列内所有的节点

    FrameQueue

    FrameQueue 对比PacketList相对复杂一些

    1. 他和PacketList不同,用的是数组的方式。所以他的长度是固定的。
    2. 它是通过两个角标来控制,进行循环入列和循环出列。一个是rindex负责读。一个是windex负责写。通过size来判断,当前数组中的剩余。
      当需要写的时候,先通过windex判断是否还有可以写的部分,有的话,就会返回一个可写的数据。使用完,需要将windex+1. 当Size满了。就不会返回数据了。
      当需要读的时候,会通过rindex返回当前的数据。

    定义

    重新自定义一个Frame结构体。除了包含有AVFrame外,它还包含了一些其他的字段。

    1. 视频相关的长宽格式,翻转参数flip_v,是否显示uploadedpts,duration等。
    2. 因为有考虑字幕,包括 AVSubtitle 等参数。
    3. 同样会记录操作数serial,保证数据是最新需要的。
    typedef struct Frame {
        AVFrame *frame;
        AVSubtitle sub;
        int serial;
        double pts;           /* presentation timestamp for the frame */
        double duration;      /* estimated duration of the frame */
        int64_t pos;          /* byte position of the frame in the input file */
        int width;
        int height;
        int format;
        AVRational sar;
        int uploaded;
        int flip_v;
    } Frame;
    

    定义一个FrameQueue结构体。

    1. 如上概述。使用了数组的方式,通过两个winde,rindex,进行循环出列和入列。
    2. 还会通过keep_last来判断,队列中是否要保留一帧的数据。通过rindex_shown来辅助进行读取(因为keep_last时,rindex_shown为1)。
    3. 持有对应的PacketQueue,对队列的状态进行判断,如果队列进入abort状态了,就不会继续去解码入列了。
    4. 因为同样有多线程操作队列,需要锁。
    typedef struct FrameQueue {
        Frame queue[FRAME_QUEUE_SIZE];
        //我觉得最关键的三个变量。来帮助读写的。 
        int rindex;
        int windex;
        int size;
        //最大的size
        int max_size;
    
        //下面这两个变量来判断,队列中是否需要保留最后一个
        int keep_last;
        int rindex_shown;
    
        //当前的队列的锁
        SDL_mutex *mutex;
        SDL_cond *cond;
        
        //如果PacketQueue的状态不可用的话,它也无法取出数据
        PacketQueue *pktq;
    } FrameQueue;
    

    使用

    初始化

    1. 初始化锁
    2. 对队列中的AVFrame初始化。和PacketQueue不同的是,这里使用的AVFrame都是一开始准备好的,也是有限的。
    3. 记录keep_last的状态
    static int frame_queue_init(FrameQueue *f, PacketQueue *pktq, int max_size, int keep_last)
    {
        int i;
        //初始化的时候,都是用memset的方式,将变量重置
        memset(f, 0, sizeof(FrameQueue));
        //创建锁资源
        if (!(f->mutex = SDL_CreateMutex())) {
            av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
            return AVERROR(ENOMEM);
        }
        if (!(f->cond = SDL_CreateCond())) {
            av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
            return AVERROR(ENOMEM);
        }
        //保存对应的packet_queue
        f->pktq = pktq;
        f->max_size = FFMIN(max_size, FRAME_QUEUE_SIZE);
        //判断是否要保留最后一个。ffplay中的音频和视频,都需要保留最后一个。
        //其实这里的判断是keep_last != 0
        f->keep_last = !!keep_last;
        //初始化数组中的frame,因为这些frame最后,都是我们提供来用的,所以要实现初始化好。
        for (i = 0; i < f->max_size; i++)
            if (!(f->queue[i].frame = av_frame_alloc()))
                return AVERROR(ENOMEM);
        return 0;
    }
    

    反初始化

    • 销毁单个
    static void frame_queue_unref_item(Frame *vp)
    {
        av_frame_unref(vp->frame);
        avsubtitle_free(&vp->sub);
    }
    
    
    • 全部销毁
      将内部的frame和锁释放
    static void frame_queue_destory(FrameQueue *f)
    {
        int i;
        for (i = 0; i < f->max_size; i++) {
            Frame *vp = &f->queue[i];
            frame_queue_unref_item(vp);
            av_frame_free(&vp->frame);
        }
        SDL_DestroyMutex(f->mutex);
        SDL_DestroyCond(f->cond);
    }
    

    基本操作

    入列
    完整流程
    1. 先通过frame_queue_peek_writable 得到一个可以写的Frame
    2. 对这个Frame进行操作
    3. 通过frame_queue_push,让windex偏移到下一位。后续可得到下一个Frame。同时会记录当前的size 。当size到达max_size之后,下次读取frame_queue_peek_writable就需要等待size再次小于max_size
    static Frame *frame_queue_peek_writable(FrameQueue *f)
    {
        /* wait until we have space to put a new frame */
        SDL_LockMutex(f->mutex);
        //等待。直到可以取
        while (f->size >= f->max_size &&
               !f->pktq->abort_request) {
            SDL_CondWait(f->cond, f->mutex);
        }
        SDL_UnlockMutex(f->mutex);
        //可以取到了。这个时候,会接触这个锁?这里的锁,只有当index发生改变的时候,才会锁。  
    
        if (f->pktq->abort_request)
            return NULL;
        
        //直接返回f->windex
        return &f->queue[f->windex];
    }
    

    得到一个可以操作的AVFrame 等我们操作完。之后要调用 frame_queue_push将,角标进行移动。注意这个时候移动的是windex.

    static void frame_queue_push(FrameQueue *f)
    {
        //windex 的移动没有加锁。
        if (++f->windex == f->max_size)
            f->windex = 0;
        //对size进行加锁了。因为size 会影响取的时候的阻塞。
        SDL_LockMutex(f->mutex);
        f->size++;
        SDL_CondSignal(f->cond);
        SDL_UnlockMutex(f->mutex);
    }
    
    其他注意
    1. 锁的操作。
      这里只有对size的写入操作才会进行加锁。因为windex的写入操作都是在解码线程当中。不涉及到多线程的操作,所以不用锁。而size的写入操作在解码线程和视频播放线程都设计。所以需要做线程同步。
    2. PacketQueue队列状态的判断
      PacketQueue处于abort_request状态时,也不能取出可写的Frame
    出列

    音频和视频出列的方式有所不同。
    音频采用的方式是阻塞的方式进行读取,如果当前队列中没有数据,会进行等待到队列数据写入。
    而视频采取的方式是不阻塞的读取,如果没有数据,则继续会显示上一次的数据。等待下一次循环进入读取。

    阻塞的读取(音频)
    • 完整流程
      整体流程和入列基本一样。
    1. 先通过frame_queue_peek_readable 得到一个可以读的Frame。对这个Frame进行读取显示
    2. 通过frame_queue_next,让rindex偏移到下一位。后续可得到下一个Frame。同时会记录当前的size 。当size到达1之后,下次读取frame_queue_peek_readable就需要等待size再次小于1时。
    static Frame *frame_queue_peek_readable(FrameQueue *f)
    {
        /* wait until we have a readable a new frame */
        //使用f->size - f->rindex_shown ,来判断当前还持有的量
        //第一次调用这个方法时,rindex_shown还为0
        SDL_LockMutex(f->mutex);
        while (f->size - f->rindex_shown <= 0 &&
               !f->pktq->abort_request) {
            SDL_CondWait(f->cond, f->mutex);
        }
        SDL_UnlockMutex(f->mutex);
    
        if (f->pktq->abort_request)
            return NULL;
      
        //得到的是
        return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
    }
    
    static void frame_queue_next(FrameQueue *f)
    {
        if (f->keep_last && !f->rindex_shown) {
            f->rindex_shown = 1;
            return;
        }
        //每次不用,都需要通过这个方法来减少frame 的引用次数。这个不代表释放
        frame_queue_unref_item(&f->queue[f->rindex]);
        if (++f->rindex == f->max_size)
            f->rindex = 0;
        SDL_LockMutex(f->mutex);
        f->size--;
        SDL_CondSignal(f->cond);
        SDL_UnlockMutex(f->mutex);
    }
    
    • 其他注意
    1. 锁的操作。
      如上所诉,只有设计到多线程操作的size的写入,才会加锁。
    2. frame状态的控制
      偏移rindex时,是通过frame_queue_unref_item来减少一次引用计数。
    3. frame_queue_next的使用
      因为队列还要对rindex_shown进行初始化,所以需要先跑一次frame_queue_next用于初始化。
    非阻塞的读取(视频)
    1. 先判断是否还有充足的的可读
    /* return the number of undisplayed frames in the queue */
    static int frame_queue_nb_remaining(FrameQueue *f)
    {
        return f->size - f->rindex_shown;
    }
    
    1. 之后,就会直接通过frame_queue_peekframe_queue_peek_last取出对应的数据。
    //peek 出当前的。因为f->rindex + f->rindex_shown可能会超过max_size,所以用了取余
    static Frame *frame_queue_peek(FrameQueue *f)
    {
        return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
    }
    
    
    //取出最后一个。这个是在keep_last的时候才能用吗?这个表示的是真正的最后一个。因为通常我们会在视频和音频的队列中默认保留一帧的数据
    static Frame *frame_queue_peek_last(FrameQueue *f)
    {
        return &f->queue[f->rindex];
    }
    
    1. 最后也是通过frame_queue_next进行rindex的角标偏移
    • 其他注意
      rindex_shown的取值
      这两个方法的区别就在于rindex_shown。因为视频和音频都会在队列中保留一帧的数据。它会在
      在第一次调用frame_queue_next时,会将rindex_shown 进行初始化。视频和音频的线程,rindex_shown都会被刷新成1。
      在视频显示之前,会调用一次。
      音频播放之前,也会先调用一次。
    其他提供的辅助方法
    • frame_queue_last_pos
      seek 的时候使用
    /* return last shown position */
    static int64_t frame_queue_last_pos(FrameQueue *f)
    {
        Frame *fp = &f->queue[f->rindex];
        if (f->rindex_shown && fp->serial == f->pktq->serial)
            return fp->pos;
        else
            return -1;
    }
    
    • frame_queue_signal
      通知锁释放。
      在停止和释放线程时,会用到。因为PacketQueue队列的状态改变了。原来因为队列可用,而阻塞的部分,会因为队列状态改变,而结束。
    static void frame_queue_signal(FrameQueue *f)
    {
        SDL_LockMutex(f->mutex);
        SDL_CondSignal(f->cond);
        SDL_UnlockMutex(f->mutex);
    }
    
    小结
    1. FrameQueue中音频使用阻塞的读取,和写入基本一样。而视频使用的是非阻塞的读取方式。

    2. 锁的部分

    • 在写入时
      获取可用的Frame用来写入,会完全锁住。
      在push时,进行size增加的时候,会锁住,但是windex的增加,是不会锁住的。

    • 读取时
      同样,在获取可读的数据时,会完全锁住。
      同样的,在偏移角标的过程中,只锁住了size的变化。

    总结

    对两个特殊的变量进行额外的说明
    对多线程锁的理解。
    单线程读和单线程写的优化设定。

    相关文章

      网友评论

          本文标题:ffplay.c 源码分析- 队列操作

          本文链接:https://www.haomeiwen.com/subject/sechqqtx.html