前两遍文章,我们分析了视频部分和音频播放。其中包含的队列操作,还是让人迷惑。
这边文章,就主要来梳理一下队列操作。
PacketQueue
定义
在FFmpeg当中已经定义了一个AVPackList
。
在avformat.h
中。
typedef struct AVPacketList {
AVPacket pkt;
struct AVPacketList *next;
} AVPacketList;
但是这个AVPacketList
,需要的serial
,所以就自己定义。
typedef struct MyAVPacketList {
AVPacket pkt;
struct MyAVPacketList *next;
//操作数
int serial;
} MyAVPacketList;
再次包装的PacketQueue,维持一些锁和统计变量。
- 因为多个线程(读取线程和解码线程)都需要对PacketQueue进行操作。所以需要有锁。
- 还需要统计当前的
packet
的数量。后面配合锁,做生产者和消费者的经典多线程模式。
typedef struct PacketQueue {
MyAVPacketList *first_pkt, *last_pkt;
int nb_packets;
int size;
int64_t duration;
int abort_request;
int serial;
SDL_mutex *mutex;
SDL_cond *cond;
} PacketQueue;
使用
初始化
- 初始化
初始化需要对锁进行初始化的操作。
并且对队列的状态abort_request
,进行修改。初始化之后的队列默认是不可用的。
/* packet queue handling */
static int packet_queue_init(PacketQueue *q)
{
// 重置整个队列对象
memset(q, 0, sizeof(PacketQueue));
//创建锁
q->mutex = SDL_CreateMutex();
if (!q->mutex) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->cond = SDL_CreateCond();
if (!q->cond) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->abort_request = 1;
return 0;
}
- 启动队列
修改abort_request
,队列就可以开始工作了。
static void packet_queue_start(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 0;
packet_queue_put_private(q, &flush_pkt);
SDL_UnlockMutex(q->mutex);
}
反初始化
-
flush
方法
flush方法基本就是clear方法。会将当前PacketQueue
当中的数据全部清空。
在操作数修改,或者结束的时候,会进行改操作。
static void packet_queue_flush(PacketQueue *q)
{
MyAVPacketList *pkt, *pkt1;
SDL_LockMutex(q->mutex);
for (pkt = q->first_pkt; pkt; pkt = pkt1) {
pkt1 = pkt->next;
av_packet_unref(&pkt->pkt);
av_freep(&pkt);
}
q->last_pkt = NULL;
q->first_pkt = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
SDL_UnlockMutex(q->mutex);
}
-
destroy
方法
释放的方法,先把队列flush ,同时释放我们的锁资源
static void packet_queue_destroy(PacketQueue *q)
{
packet_queue_flush(q);
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}
- 抛弃队列
就是将队列的状态修改成不可用。
static void packet_queue_abort(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 1;
SDL_CondSignal(q->cond);
SDL_UnlockMutex(q->mutex);
}
基本操作
入列
-
packet_queue_put
就是简单的提供了一个线程同步的功能。
可以看到,这里的入列的操作,是整体都加锁的。 - 入列的主体是在
packet_queue_put_private
方法当中。
主要完成的任务是将传入的AVPacket
包装成MyAVPacket
,并放入队列当中。修改队列的头指针和尾指针,和队列的统计参数。然后通知条件锁cond
解锁。(因为队列为空,出列时会用条件锁锁住) - 这里值得注意的是,关于操作数的修改。可以看到如果放入的是flush_pkt的话,就会提高操作数。(在快进,或者开始播放时,会先放入一个flush_pkt,记录增加当前的操作数)
static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
int ret;
//为了保证线程安全。锁住
SDL_LockMutex(q->mutex);
//真正的操作,在packet_queue_put_private方法中
ret = packet_queue_put_private(q, pkt);
SDL_UnlockMutex(q->mutex);
if (pkt != &flush_pkt && ret < 0)
av_packet_unref(pkt);
return ret;
}
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
//声明一个新的MyAVPacketList
MyAVPacketList *pkt1;
if (q->abort_request)
return -1;
pkt1 = av_malloc(sizeof(MyAVPacketList));
if (!pkt1)
return -1;
//确定其变量
pkt1->pkt = *pkt;
pkt1->next = NULL;
if (pkt == &flush_pkt)
q->serial++;
pkt1->serial = q->serial;
//放置指针,并记录first_pkt 和 last_pkt
if (!q->last_pkt)
q->first_pkt = pkt1;
else
q->last_pkt->next = pkt1;
q->last_pkt = pkt1;
q->nb_packets++;
q->size += pkt1->pkt.size + sizeof(*pkt1);
q->duration += pkt1->pkt.duration;
/* XXX: should duplicate packet data in DV case */
SDL_CondSignal(q->cond);
return 0;
}
这里提供了一个便捷的方法,把nullpacket放入队列当中。这是在EOF(就是读取文件流结束)之后,才会进行的。
static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index)
{
AVPacket pkt1, *pkt = &pkt1;
av_init_packet(pkt);
pkt->data = NULL;
pkt->size = 0;
pkt->stream_index = stream_index;
return packet_queue_put(q, pkt);
}
出列
- 出列提供了是否阻塞的选择,如果是阻塞的话,就会等待队列中有数据时,继续读取
- 出列的操作同样是全部锁住的。
/* return < 0 if aborted, 0 if no packet and > 0 if packet. */
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
MyAVPacketList *pkt1;
int ret;
SDL_LockMutex(q->mutex);
//取不到数据时,如果是阻塞的,则会继续等待循环,如果不是阻塞的,就会直接跳出循环
for (;;) {
if (q->abort_request) {
ret = -1;
break;
}
pkt1 = q->first_pkt;
if (pkt1) {
q->first_pkt = pkt1->next;
if (!q->first_pkt)
q->last_pkt = NULL;
q->nb_packets--;
q->size -= pkt1->pkt.size + sizeof(*pkt1);
q->duration -= pkt1->pkt.duration;
*pkt = pkt1->pkt;
if (serial)
*serial = pkt1->serial;
av_free(pkt1);
ret = 1;
break;
} else if (!block) {
ret = 0;
break;
} else {
SDL_CondWait(q->cond, q->mutex);
}
}
SDL_UnlockMutex(q->mutex);
return ret;
}
小结
-
两种特殊的
packet
(flush_pkt
和nullpacket
)。flush_pkt
对应serial
操作数,记录前后的数据是否是连续的(开始和快进时会丢入flush_pkt
);nullpacket
对应文件流的末尾。 -
多线程同步。在
PacketList
的每一个操作,都是线程同步的,都用互斥锁给锁住了。
读取数据出列时,如果队列为空,而且需要阻塞,则会继续等待入列后才会取出。 -
基本操作
packet_queue_put
把数据放入尾部。packet_queue_get
取出头部数据。 -
方法汇总
packet_queue_init
:初始化
packet_queue_destroy
:销毁
packet_queue_start
:启用
packet_queue_abort
:中止
packet_queue_get
:获取一个节点
packet_queue_put
:存入一个节点
packet_queue_put_nullpacket
:存入一个空节点
packet_queue_flush
:清除队列内所有的节点
FrameQueue
FrameQueue
对比PacketList
相对复杂一些
- 他和
PacketList
不同,用的是数组的方式。所以他的长度是固定的。 - 它是通过两个角标来控制,进行循环入列和循环出列。一个是
rindex
负责读。一个是windex
负责写。通过size来判断,当前数组中的剩余。
当需要写的时候,先通过windex
判断是否还有可以写的部分,有的话,就会返回一个可写的数据。使用完,需要将windex+1
. 当Size
满了。就不会返回数据了。
当需要读的时候,会通过rindex
返回当前的数据。
定义
重新自定义一个Frame结构体。除了包含有AVFrame
外,它还包含了一些其他的字段。
- 视频相关的长宽格式,翻转参数
flip_v
,是否显示uploaded
,pts
,duration
等。 - 因为有考虑字幕,包括 AVSubtitle 等参数。
- 同样会记录操作数
serial
,保证数据是最新需要的。
typedef struct Frame {
AVFrame *frame;
AVSubtitle sub;
int serial;
double pts; /* presentation timestamp for the frame */
double duration; /* estimated duration of the frame */
int64_t pos; /* byte position of the frame in the input file */
int width;
int height;
int format;
AVRational sar;
int uploaded;
int flip_v;
} Frame;
定义一个FrameQueue结构体。
- 如上概述。使用了数组的方式,通过两个
winde
,rindex
,进行循环出列和入列。 - 还会通过
keep_last
来判断,队列中是否要保留一帧的数据。通过rindex_shown
来辅助进行读取(因为keep_last
时,rindex_shown
为1)。 - 持有对应的
PacketQueue
,对队列的状态进行判断,如果队列进入abort
状态了,就不会继续去解码入列了。 - 因为同样有多线程操作队列,需要锁。
typedef struct FrameQueue {
Frame queue[FRAME_QUEUE_SIZE];
//我觉得最关键的三个变量。来帮助读写的。
int rindex;
int windex;
int size;
//最大的size
int max_size;
//下面这两个变量来判断,队列中是否需要保留最后一个
int keep_last;
int rindex_shown;
//当前的队列的锁
SDL_mutex *mutex;
SDL_cond *cond;
//如果PacketQueue的状态不可用的话,它也无法取出数据
PacketQueue *pktq;
} FrameQueue;
使用
初始化
- 初始化锁
- 对队列中的AVFrame初始化。和
PacketQueue
不同的是,这里使用的AVFrame
都是一开始准备好的,也是有限的。 - 记录
keep_last
的状态
static int frame_queue_init(FrameQueue *f, PacketQueue *pktq, int max_size, int keep_last)
{
int i;
//初始化的时候,都是用memset的方式,将变量重置
memset(f, 0, sizeof(FrameQueue));
//创建锁资源
if (!(f->mutex = SDL_CreateMutex())) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
if (!(f->cond = SDL_CreateCond())) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
//保存对应的packet_queue
f->pktq = pktq;
f->max_size = FFMIN(max_size, FRAME_QUEUE_SIZE);
//判断是否要保留最后一个。ffplay中的音频和视频,都需要保留最后一个。
//其实这里的判断是keep_last != 0
f->keep_last = !!keep_last;
//初始化数组中的frame,因为这些frame最后,都是我们提供来用的,所以要实现初始化好。
for (i = 0; i < f->max_size; i++)
if (!(f->queue[i].frame = av_frame_alloc()))
return AVERROR(ENOMEM);
return 0;
}
反初始化
- 销毁单个
static void frame_queue_unref_item(Frame *vp)
{
av_frame_unref(vp->frame);
avsubtitle_free(&vp->sub);
}
- 全部销毁
将内部的frame和锁释放
static void frame_queue_destory(FrameQueue *f)
{
int i;
for (i = 0; i < f->max_size; i++) {
Frame *vp = &f->queue[i];
frame_queue_unref_item(vp);
av_frame_free(&vp->frame);
}
SDL_DestroyMutex(f->mutex);
SDL_DestroyCond(f->cond);
}
基本操作
入列
完整流程
- 先通过
frame_queue_peek_writable
得到一个可以写的Frame
。 - 对这个
Frame
进行操作 - 通过
frame_queue_push
,让windex
偏移到下一位。后续可得到下一个Frame
。同时会记录当前的size
。当size
到达max_size
之后,下次读取frame_queue_peek_writable
就需要等待size
再次小于max_size
static Frame *frame_queue_peek_writable(FrameQueue *f)
{
/* wait until we have space to put a new frame */
SDL_LockMutex(f->mutex);
//等待。直到可以取
while (f->size >= f->max_size &&
!f->pktq->abort_request) {
SDL_CondWait(f->cond, f->mutex);
}
SDL_UnlockMutex(f->mutex);
//可以取到了。这个时候,会接触这个锁?这里的锁,只有当index发生改变的时候,才会锁。
if (f->pktq->abort_request)
return NULL;
//直接返回f->windex
return &f->queue[f->windex];
}
得到一个可以操作的AVFrame 等我们操作完。之后要调用 frame_queue_push
将,角标进行移动。注意这个时候移动的是windex.
static void frame_queue_push(FrameQueue *f)
{
//windex 的移动没有加锁。
if (++f->windex == f->max_size)
f->windex = 0;
//对size进行加锁了。因为size 会影响取的时候的阻塞。
SDL_LockMutex(f->mutex);
f->size++;
SDL_CondSignal(f->cond);
SDL_UnlockMutex(f->mutex);
}
其他注意
- 锁的操作。
这里只有对size
的写入操作才会进行加锁。因为windex
的写入操作都是在解码线程当中。不涉及到多线程的操作,所以不用锁。而size
的写入操作在解码线程和视频播放线程都设计。所以需要做线程同步。 - 对
PacketQueue
队列状态的判断
当PacketQueue
处于abort_request
状态时,也不能取出可写的Frame
出列
音频和视频出列的方式有所不同。
音频采用的方式是阻塞的方式进行读取,如果当前队列中没有数据,会进行等待到队列数据写入。
而视频采取的方式是不阻塞的读取,如果没有数据,则继续会显示上一次的数据。等待下一次循环进入读取。
阻塞的读取(音频)
- 完整流程
整体流程和入列基本一样。
- 先通过
frame_queue_peek_readable
得到一个可以读的Frame
。对这个Frame
进行读取显示 - 通过
frame_queue_next
,让rindex
偏移到下一位。后续可得到下一个Frame
。同时会记录当前的size
。当size
到达1之后,下次读取frame_queue_peek_readable
就需要等待size
再次小于1时。
static Frame *frame_queue_peek_readable(FrameQueue *f)
{
/* wait until we have a readable a new frame */
//使用f->size - f->rindex_shown ,来判断当前还持有的量
//第一次调用这个方法时,rindex_shown还为0
SDL_LockMutex(f->mutex);
while (f->size - f->rindex_shown <= 0 &&
!f->pktq->abort_request) {
SDL_CondWait(f->cond, f->mutex);
}
SDL_UnlockMutex(f->mutex);
if (f->pktq->abort_request)
return NULL;
//得到的是
return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
}
static void frame_queue_next(FrameQueue *f)
{
if (f->keep_last && !f->rindex_shown) {
f->rindex_shown = 1;
return;
}
//每次不用,都需要通过这个方法来减少frame 的引用次数。这个不代表释放
frame_queue_unref_item(&f->queue[f->rindex]);
if (++f->rindex == f->max_size)
f->rindex = 0;
SDL_LockMutex(f->mutex);
f->size--;
SDL_CondSignal(f->cond);
SDL_UnlockMutex(f->mutex);
}
- 其他注意
- 锁的操作。
如上所诉,只有设计到多线程操作的size
的写入,才会加锁。 -
frame
状态的控制
偏移rindex
时,是通过frame_queue_unref_item
来减少一次引用计数。 -
frame_queue_next
的使用
因为队列还要对rindex_shown
进行初始化,所以需要先跑一次frame_queue_next
用于初始化。
非阻塞的读取(视频)
- 先判断是否还有充足的的可读
/* return the number of undisplayed frames in the queue */
static int frame_queue_nb_remaining(FrameQueue *f)
{
return f->size - f->rindex_shown;
}
- 之后,就会直接通过
frame_queue_peek
和frame_queue_peek_last
取出对应的数据。
//peek 出当前的。因为f->rindex + f->rindex_shown可能会超过max_size,所以用了取余
static Frame *frame_queue_peek(FrameQueue *f)
{
return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
}
//取出最后一个。这个是在keep_last的时候才能用吗?这个表示的是真正的最后一个。因为通常我们会在视频和音频的队列中默认保留一帧的数据
static Frame *frame_queue_peek_last(FrameQueue *f)
{
return &f->queue[f->rindex];
}
- 最后也是通过
frame_queue_next
进行rindex
的角标偏移
-
其他注意
rindex_shown
的取值
这两个方法的区别就在于rindex_shown。因为视频和音频都会在队列中保留一帧的数据。它会在
在第一次调用frame_queue_next
时,会将rindex_shown
进行初始化。视频和音频的线程,rindex_shown
都会被刷新成1。
在视频显示之前,会调用一次。
音频播放之前,也会先调用一次。
其他提供的辅助方法
- frame_queue_last_pos
seek 的时候使用
/* return last shown position */
static int64_t frame_queue_last_pos(FrameQueue *f)
{
Frame *fp = &f->queue[f->rindex];
if (f->rindex_shown && fp->serial == f->pktq->serial)
return fp->pos;
else
return -1;
}
- frame_queue_signal
通知锁释放。
在停止和释放线程时,会用到。因为PacketQueue
队列的状态改变了。原来因为队列可用,而阻塞的部分,会因为队列状态改变,而结束。
static void frame_queue_signal(FrameQueue *f)
{
SDL_LockMutex(f->mutex);
SDL_CondSignal(f->cond);
SDL_UnlockMutex(f->mutex);
}
小结
-
FrameQueue
中音频使用阻塞的读取,和写入基本一样。而视频使用的是非阻塞的读取方式。 -
锁的部分
-
在写入时
获取可用的Frame用来写入,会完全锁住。
在push时,进行size增加的时候,会锁住,但是windex
的增加,是不会锁住的。 -
读取时
同样,在获取可读的数据时,会完全锁住。
同样的,在偏移角标的过程中,只锁住了size的变化。
总结
对两个特殊的变量进行额外的说明
对多线程锁的理解。
单线程读和单线程写的优化设定。
网友评论