美文网首页
AVThreadMessageQueue

AVThreadMessageQueue

作者: NHFX | 来源:发表于2020-11-13 15:36 被阅读0次
    #include <QCoreApplication>
    #include <QDebug>
    extern "C" {
    
        #include "/usr/local/Cellar/ffmpeg/4.3.1_2/include/libavutil/avassert.h"
        #include "ffmpeg/include/libavutil/avstring.h"
        #include "ffmpeg/include/libavutil/frame.h"
        #include "ffmpeg/include/libavutil/threadmessage.h"
        #include "ffmpeg/include/libavcodec/avcodec.h"
        #include "ffmpeg/include/libavutil/log.h"
    }
    
    /* NOTE(review): this macro is defined after every #include above and is not
     * tested anywhere in this file, so it appears to have no effect here —
     * confirm whether it is still needed. */
    #define HAVE_THREADS
    
    /* Per-thread state passed to sender_thread() as its pthread argument. */
    struct sender_data {
        /* Index of this sender; set from the spawn loop counter and used in logs. */
        int id;
        /* Thread handle written by pthread_create() and later passed to pthread_join(). */
        pthread_t tid;
        /* Number of loop iterations sender_thread() runs; chosen by get_workload(). */
        int workload;
        /* Message queue shared with the other threads; not owned by this struct. */
        AVThreadMessageQueue *queue;
    };
    
    /* Per-thread state passed to receiver_thread() as its pthread argument.
     * Holds the same fields as sender_data, but in a deliberately different
     * order (for testing purposes), so the two layouts are not interchangeable. */
    struct receiver_data {
        /* Thread handle written by pthread_create() and later passed to pthread_join(). */
        pthread_t tid;
        /* Number of loop iterations receiver_thread() runs; chosen by get_workload(). */
        int workload;
        /* Index of this receiver; set from the spawn loop counter and used in logs. */
        int id;
        /* Message queue shared with the other threads; not owned by this struct. */
        AVThreadMessageQueue *queue;
    };
    
    /* Element type stored in the thread message queue (the queue is allocated
     * with sizeof(struct message) as its element size). */
    struct message {
        /* Frame allocated by a sender; freed by the receiver that dequeues it,
         * or by free_frame() when the message is discarded. */
        AVFrame *frame;
        // we add some junk in the message to make sure the message size is >
        // sizeof(void*)
        // (senders store MAGIC here; receivers and free_frame() assert on it)
        int magic;
    };
    
    /* Sentinel written into message.magic by senders and checked with
     * av_assert0() when a message is received or freed. */
    const int MAGIC = 0x0001;
    
    /*
     * Free callback registered on the message queue.
     *
     * arg points to a struct message. The message's magic value is asserted
     * to equal MAGIC, then the frame it carries is released.
     */
    static void free_frame(void *arg)
    {
        struct message *discarded = (struct message *)arg;

        av_assert0(MAGIC == discarded->magic);
        av_frame_free(&discarded->frame);
    }
    
    static void *sender_thread(void *arg)
    {
        int i, ret = 0;
        struct sender_data *wd = (struct sender_data *)arg;
    
        av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
        for (i = 0; i < wd->workload; i++) {
            if (rand() % wd->workload < wd->workload / 10) {
                av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
                // 刷新队列
                av_thread_message_flush(wd->queue);
            } else {
                char *val;
                AVDictionary *meta = NULL;
                // 初始化消息数据
                struct message msg = {
                    .frame = (AVFrame *)av_frame_alloc(),
                    .magic = MAGIC,
                };
    
                if (!msg.frame) {
                    ret = AVERROR(ENOMEM);
                    break;
                }
    
                /* 制造 frame 的数据 */
                /* we add some metadata to identify the frames */
                val = av_asprintf("frame %d/%d from sender %d",
                                  i + 1, wd->workload, wd->id);
                if (!val) {
                    av_frame_free(&msg.frame);
                    ret = AVERROR(ENOMEM);
                    break;
                }
                ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
                if (ret < 0) {
                    av_frame_free(&msg.frame);
                    break;
                }
                msg.frame->metadata = meta;
    
                /* allocate a real frame in order to simulate "real" work */
                msg.frame->format = AV_PIX_FMT_RGBA;
                msg.frame->width  = 320;
                msg.frame->height = 240;
                /*如果宽和高大于0,认为是视频数据, 返回get_video_buffer*/
                ret = av_frame_get_buffer(msg.frame, 0);
                if (ret < 0) {
                    av_frame_free(&msg.frame);
                    break;
                }
    
                /* push the frame in the common queue */
                av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
                       wd->id, i + 1, wd->workload, msg.frame);
                /*添加到队列*/
                ret = av_thread_message_queue_send(wd->queue, &msg, 0);
                if (ret < 0) {
                    av_frame_free(&msg.frame);
                    break;
                }
            }
        }
        av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
               wd->id, av_err2str(ret));
        av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
        return NULL;
    }
    
    /*
     * Consumer thread entry point: receives messages from the shared queue.
     *
     * arg points to a struct receiver_data. The thread runs up to `workload`
     * iterations; each one either flushes the queue (picked at random, logging
     * how many messages are discarded) or dequeues one message, logs its "sig"
     * metadata and frees the frame. The loop stops at the first receive error.
     *
     * Before returning, the queue's send-side error code is set to the
     * failure code, or to AVERROR_EOF when the loop ran to completion.
     * Always returns NULL.
     */
    static void *receiver_thread(void *arg)
    {
        int i, ret = 0;
        struct receiver_data *rd = (struct receiver_data*) arg;

        for (i = 0; i < rd->workload; i++) {
            if (rand() % rd->workload < rd->workload / 10) {
                /* Report the current number of queued messages, then drop them. */
                av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
                       "discarding %d message(s)\n", rd->id,
                       av_thread_message_queue_nb_elems(rd->queue));
                av_thread_message_flush(rd->queue);
            } else {
                struct message msg;
                AVDictionary *meta;
                AVDictionaryEntry *e;

                /* Take one message out of the queue. */
                ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
                if (ret < 0)
                    break;
                av_assert0(msg.magic == MAGIC);
                meta = msg.frame->metadata;
                e = av_dict_get(meta, "sig", NULL, 0);
                /* av_dict_get() yields NULL when the key is absent; do not
                 * dereference the entry blindly. */
                av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n",
                       e ? e->value : "(no sig)", msg.frame);
                av_frame_free(&msg.frame);
            }
        }

        av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
        /* Publish the final status as the queue's send-side error code. */
        av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);

        return NULL;
    }
    
    /*
     * Pick a workload size.
     *
     * Returns minv when minv == maxv (without consuming a random number);
     * otherwise returns minv plus rand() modulo (maxv - minv).
     */
    static int get_workload(int minv, int maxv)
    {
        if (minv == maxv)
            return minv;

        const int span = maxv - minv;
        return minv + rand() % span;
    }
    
    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
    
    //    qDebug() << avcodec_configuration();
    //    unsigned version = avcodec_version();
    //    QString ch = QString::number(version, 10);
    //    qDebug() << "version:" << version;
        int i, ret = 0;
        int max_queue_size = 10000;
        int nb_senders = 5, sender_min_load = 1, sender_max_load = 50;
        int nb_receivers = 1, receiver_min_load = 1, receiver_max_load = 50;
        struct sender_data *senders;
        struct receiver_data *receivers;
        AVThreadMessageQueue *queue = NULL;
        /** av_mallocz_array 申请内存的长度 nb_senders * sizeof(*senders)
          */
        senders = (struct sender_data *)av_mallocz_array(nb_senders, sizeof(*senders));
        receivers = (struct receiver_data *)av_mallocz_array(nb_receivers, sizeof(*receivers));
        if (!senders || !receivers) {
                ret = AVERROR(ENOMEM);
                goto end;
            }
        // 为消息队列申请内存 公共资源
        ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
           if (ret < 0)
               goto end;
        //设置消息释放回调函数
        av_thread_message_queue_set_free_func(queue, free_frame);
    
    //在普通的宏定义中,预处理器一般把空格解释成分段标志,对于每一段和前面比较,相同的就被替换。
    //但是这样做的结果是,被替换段之间存在一些空格。如果我们不希望出现这些空格,就可以通过添加一些##来替代空格。
    #define SPAWN_THREADS(type) do {                                                \
        for (i = 0; i < nb_##type##s; i++) {                                        \
            struct type##_data *td = &type##s[i];                                   \
                                                                                    \
            td->id = i;                                                             \
            td->queue = queue;                                                      \
            td->workload = get_workload(type##_min_load, type##_max_load);          \
                                                                                    \
            ret = pthread_create(&td->tid, NULL, type##_thread, td);                \
            if (ret) {                                                              \
                const int err = AVERROR(ret);                                       \
                av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type)    \
                       " thread: %s\n", av_err2str(err));                           \
                goto end;                                                           \
            }                                                                       \
        }                                                                           \
    } while (0)
    
    #define WAIT_THREADS(type) do {                                                 \
        for (i = 0; i < nb_##type##s; i++) {                                        \
            struct type##_data *td = &type##s[i];                                   \
                                                                                    \
            ret = pthread_join(td->tid, NULL);                                      \
            if (ret) {                                                              \
                const int err = AVERROR(ret);                                       \
                av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type)     \
                       " thread: %s\n", av_err2str(err));                           \
                goto end;                                                           \
            }                                                                       \
        }                                                                           \
    } while (0)
           SPAWN_THREADS(receiver);
           SPAWN_THREADS(sender);
           WAIT_THREADS(sender);
           WAIT_THREADS(receiver);
    
    end:
        av_thread_message_queue_free(&queue);
        av_freep(&senders);
        av_freep(&receivers);
        if (ret < 0 && ret != AVERROR_EOF) {
            av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
            return 1;
        }
        return a.exec();
    }
    

    相关文章

      网友评论

          本文标题:AVThreadMessageQueue

          本文链接:https://www.haomeiwen.com/subject/zlzvbktx.html