#include <QCoreApplication>
#include <QDebug>
extern "C" {
#include "/usr/local/Cellar/ffmpeg/4.3.1_2/include/libavutil/avassert.h"
#include "ffmpeg/include/libavutil/avstring.h"
#include "ffmpeg/include/libavutil/frame.h"
#include "ffmpeg/include/libavutil/threadmessage.h"
#include "ffmpeg/include/libavcodec/avcodec.h"
#include "ffmpeg/include/libavutil/log.h"
}
#define HAVE_THREADS
struct sender_data {
int id;
pthread_t tid;
int workload;
AVThreadMessageQueue *queue;
};
/* same as sender_data but shuffled for testing purpose */
struct receiver_data {
pthread_t tid;
int workload;
int id;
AVThreadMessageQueue *queue;
};
struct message {
AVFrame *frame;
// we add some junk in the message to make sure the message size is >
// sizeof(void*)
int magic;
};
const int MAGIC = 0x0001;
static void free_frame(void *arg)
{
struct message *msg = (struct message *)arg;
av_assert0(msg->magic == MAGIC);
av_frame_free(&msg->frame);
}
static void *sender_thread(void *arg)
{
int i, ret = 0;
struct sender_data *wd = (struct sender_data *)arg;
av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
for (i = 0; i < wd->workload; i++) {
if (rand() % wd->workload < wd->workload / 10) {
av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
// 刷新队列
av_thread_message_flush(wd->queue);
} else {
char *val;
AVDictionary *meta = NULL;
// 初始化消息数据
struct message msg = {
.frame = (AVFrame *)av_frame_alloc(),
.magic = MAGIC,
};
if (!msg.frame) {
ret = AVERROR(ENOMEM);
break;
}
/* 制造 frame 的数据 */
/* we add some metadata to identify the frames */
val = av_asprintf("frame %d/%d from sender %d",
i + 1, wd->workload, wd->id);
if (!val) {
av_frame_free(&msg.frame);
ret = AVERROR(ENOMEM);
break;
}
ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
if (ret < 0) {
av_frame_free(&msg.frame);
break;
}
msg.frame->metadata = meta;
/* allocate a real frame in order to simulate "real" work */
msg.frame->format = AV_PIX_FMT_RGBA;
msg.frame->width = 320;
msg.frame->height = 240;
/*如果宽和高大于0,认为是视频数据, 返回get_video_buffer*/
ret = av_frame_get_buffer(msg.frame, 0);
if (ret < 0) {
av_frame_free(&msg.frame);
break;
}
/* push the frame in the common queue */
av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
wd->id, i + 1, wd->workload, msg.frame);
/*添加到队列*/
ret = av_thread_message_queue_send(wd->queue, &msg, 0);
if (ret < 0) {
av_frame_free(&msg.frame);
break;
}
}
}
av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
wd->id, av_err2str(ret));
av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
return NULL;
}
/*冲队列中接收消息*/
static void *receiver_thread(void *arg)
{
int i, ret = 0;
struct receiver_data *rd = (struct receiver_data*) arg;
for (i = 0; i < rd->workload; i++) {
if (rand() % rd->workload < rd->workload / 10) {
// 获取队列当前的消息数
av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
"discarding %d message(s)\n", rd->id,
av_thread_message_queue_nb_elems(rd->queue));
// 刷新消息队列
av_thread_message_flush(rd->queue);
} else {
struct message msg;
AVDictionary *meta;
AVDictionaryEntry *e;
/*从对队列中取出消息*/
ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
if (ret < 0)
break;
av_assert0(msg.magic == MAGIC);
meta = msg.frame->metadata;
e = av_dict_get(meta, "sig", NULL, 0);
av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
av_frame_free(&msg.frame);
}
}
av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
// 发送错误码
av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
return NULL;
}
static int get_workload(int minv, int maxv)
{
return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
}
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
// qDebug() << avcodec_configuration();
// unsigned version = avcodec_version();
// QString ch = QString::number(version, 10);
// qDebug() << "version:" << version;
int i, ret = 0;
int max_queue_size = 10000;
int nb_senders = 5, sender_min_load = 1, sender_max_load = 50;
int nb_receivers = 1, receiver_min_load = 1, receiver_max_load = 50;
struct sender_data *senders;
struct receiver_data *receivers;
AVThreadMessageQueue *queue = NULL;
/** av_mallocz_array 申请内存的长度 nb_senders * sizeof(*senders)
*/
senders = (struct sender_data *)av_mallocz_array(nb_senders, sizeof(*senders));
receivers = (struct receiver_data *)av_mallocz_array(nb_receivers, sizeof(*receivers));
if (!senders || !receivers) {
ret = AVERROR(ENOMEM);
goto end;
}
// 为消息队列申请内存 公共资源
ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
if (ret < 0)
goto end;
//设置消息释放回调函数
av_thread_message_queue_set_free_func(queue, free_frame);
//在普通的宏定义中,预处理器一般把空格解释成分段标志,对于每一段和前面比较,相同的就被替换。
//但是这样做的结果是,被替换段之间存在一些空格。如果我们不希望出现这些空格,就可以通过添加一些##来替代空格。
#define SPAWN_THREADS(type) do { \
for (i = 0; i < nb_##type##s; i++) { \
struct type##_data *td = &type##s[i]; \
\
td->id = i; \
td->queue = queue; \
td->workload = get_workload(type##_min_load, type##_max_load); \
\
ret = pthread_create(&td->tid, NULL, type##_thread, td); \
if (ret) { \
const int err = AVERROR(ret); \
av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
" thread: %s\n", av_err2str(err)); \
goto end; \
} \
} \
} while (0)
#define WAIT_THREADS(type) do { \
for (i = 0; i < nb_##type##s; i++) { \
struct type##_data *td = &type##s[i]; \
\
ret = pthread_join(td->tid, NULL); \
if (ret) { \
const int err = AVERROR(ret); \
av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
" thread: %s\n", av_err2str(err)); \
goto end; \
} \
} \
} while (0)
SPAWN_THREADS(receiver);
SPAWN_THREADS(sender);
WAIT_THREADS(sender);
WAIT_THREADS(receiver);
end:
av_thread_message_queue_free(&queue);
av_freep(&senders);
av_freep(&receivers);
if (ret < 0 && ret != AVERROR_EOF) {
av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
return 1;
}
return a.exec();
}
网友评论