美文网首页音视频
ijkplayer 开源项目分析(三)msg_queue消息机制

ijkplayer 开源项目分析(三)msg_queue消息机制

作者: 码上就说 | 来源:发表于2020-02-09 12:09 被阅读0次

    ijkplayer 开源项目分析(一)编译
    ijkplayer 开源项目分析(二)播放流程概要
    ijkplayer 开源项目分析(三)msg_queue消息机制剖析
    ijkplayer 开源项目分析(四)解决video size识别问题
    ijkplayer 开源项目分析(五)ffmpeg升级到4.X版本

    1.为什么需要一个消息机制

    Android 平台 提供了 Handler、Message、MessageQueue、Looper 机制来实现消息分发和处理机制;


    Handler 消息机制
    • 1、首先Looper.prepare()在本线程中保存一个Looper实例,然后该实例中保存一个MessageQueue对象;因为Looper.prepare()在一个线程中只能调用一次,所以MessageQueue在一个线程中只会存在一个。
    • 2、Looper.loop()会让当前线程进入一个无限循环,不断从MessageQueue的实例中读取消息,然后回调msg.target.dispatchMessage(msg)方法。
    • 3、Handler的构造方法,会首先得到当前线程中保存的Looper实例,进而与Looper实例中的MessageQueue相关联。
    • 4、Handler的sendMessage方法,会给msg的target赋值为handler自身,然后加入MessageQueue中。
    • 5、在构造Handler实例时,我们会重写handleMessage方法,也就是msg.target.dispatchMessage(msg)最终调用的方法。

    在ijkplayer的native层也需要这样的一个消息处理机制,因为播放器底层很多状态需要回调处理,这些转化成msg处理方便一些,当然也可以使用接口,但是要传一个极其庞大的接口,代码效果而言不太好,而且也不优美,正好Android 有一个现成的例子,还是采用Handler Message的机制;

    2.ijkplayer消息处理流程

    ijkplayer 开源项目分析(二)播放流程概要已经简单介绍了 ijkplayer消息处理机制,本章我们详细讲解一下ijkplayer底层是如何处理消息的?
    ijkplayer native层的消息机制是在player setup启动的;

    /*
     * JNI entry point that creates the native player and attaches it to the
     * Java object `thiz`.
     *
     * env        JNI environment of the calling thread
     * thiz       Java-side player object that receives the native player
     * weak_this  Java reference that is turned into a global ref and stored on
     *            the player; the same stored value is reused as the opaque
     *            argument for the inject / ijkio-inject / mediacodec-select hooks
     *
     * `message_loop` is handed to ijkmp_android_create() as the player's
     * message-loop function.
     */
    static void
    IjkMediaPlayer_native_setup(JNIEnv *env, jobject thiz, jobject weak_this)
    {
        MPTRACE("%s\n", __func__);
        IjkMediaPlayer *mp = ijkmp_android_create(message_loop);
        // NOTE(review): JNI_CHECK_GOTO presumably raises the named Java exception
        // and jumps to LABEL_RETURN when `mp` is NULL - confirm against the macro.
        JNI_CHECK_GOTO(mp, env, "java/lang/OutOfMemoryError", "mpjni: native_setup: ijkmp_create() failed", LABEL_RETURN);
    
        jni_set_media_player(env, thiz, mp);
        ijkmp_set_weak_thiz(mp, (*env)->NewGlobalRef(env, weak_this));
        ijkmp_set_inject_opaque(mp, ijkmp_get_weak_thiz(mp));
        ijkmp_set_ijkio_inject_opaque(mp, ijkmp_get_weak_thiz(mp));
        ijkmp_android_set_mediacodec_select_callback(mp, mediacodec_select_callback, ijkmp_get_weak_thiz(mp));
    
    LABEL_RETURN:
        // Reached on both the success path (fall-through) and the failure path.
        // NOTE(review): presumably releases this function's own reference to mp - confirm.
        ijkmp_dec_ref_p(&mp);
    }
    

    通过IjkMediaPlayer *mp = ijkmp_android_create(message_loop); 初始化了message_loop 函数指针;message_loop 调用 message_loop_n 函数;

    /*
     * Native message loop: repeatedly fetches messages from the player with
     * ijkmp_get_msg() and forwards each one to the Java side through
     * post_event() / post_event2(), translating FFP_MSG_xxx codes into
     * MEDIA_xxx event codes.
     *
     * env  JNI environment of the thread running the loop
     * mp   player whose messages are consumed
     *
     * The loop ends when ijkmp_get_msg() returns a negative value. Every fetched
     * message is released with msg_free_res() after it has been dispatched.
     */
    static void message_loop_n(JNIEnv *env, IjkMediaPlayer *mp)
    {
        jobject weak_thiz = (jobject) ijkmp_get_weak_thiz(mp);
        // NOTE(review): JNI_CHECK_GOTO presumably jumps to LABEL_RETURN when
        // weak_thiz is NULL - confirm against the macro definition.
        JNI_CHECK_GOTO(weak_thiz, env, NULL, "mpjni: message_loop_n: null weak_thiz", LABEL_RETURN);
    
        while (1) {
            AVMessage msg;
    
            // Third argument 1 = blocking get.
            int retval = ijkmp_get_msg(mp, &msg, 1);
            if (retval < 0)
                break;
    
            // block-get should never return 0
            assert(retval > 0);
    
            switch (msg.what) {
            case FFP_MSG_FLUSH:
                MPTRACE("FFP_MSG_FLUSH:\n");
                post_event(env, weak_thiz, MEDIA_NOP, 0, 0);
                break;
            case FFP_MSG_ERROR:
                MPTRACE("FFP_MSG_ERROR: %d\n", msg.arg1);
                post_event(env, weak_thiz, MEDIA_ERROR, MEDIA_ERROR_IJK_PLAYER, msg.arg1);
                break;
            case FFP_MSG_PREPARED:
                MPTRACE("FFP_MSG_PREPARED:\n");
                post_event(env, weak_thiz, MEDIA_PREPARED, 0, 0);
                break;
            case FFP_MSG_COMPLETED:
                MPTRACE("FFP_MSG_COMPLETED:\n");
                post_event(env, weak_thiz, MEDIA_PLAYBACK_COMPLETE, 0, 0);
                break;
            case FFP_MSG_VIDEO_SIZE_CHANGED:
                MPTRACE("FFP_MSG_VIDEO_SIZE_CHANGED: %d, %d\n", msg.arg1, msg.arg2);
                post_event(env, weak_thiz, MEDIA_SET_VIDEO_SIZE, msg.arg1, msg.arg2);
                break;
            case FFP_MSG_SAR_CHANGED:
                MPTRACE("FFP_MSG_SAR_CHANGED: %d, %d\n", msg.arg1, msg.arg2);
                post_event(env, weak_thiz, MEDIA_SET_VIDEO_SAR, msg.arg1, msg.arg2);
                break;
            case FFP_MSG_VIDEO_RENDERING_START:
                MPTRACE("FFP_MSG_VIDEO_RENDERING_START:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_VIDEO_RENDERING_START, 0);
                break;
            case FFP_MSG_AUDIO_RENDERING_START:
                MPTRACE("FFP_MSG_AUDIO_RENDERING_START:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_AUDIO_RENDERING_START, 0);
                break;
            case FFP_MSG_VIDEO_ROTATION_CHANGED:
                MPTRACE("FFP_MSG_VIDEO_ROTATION_CHANGED: %d\n", msg.arg1);
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_VIDEO_ROTATION_CHANGED, msg.arg1);
                break;
            case FFP_MSG_AUDIO_DECODED_START:
                MPTRACE("FFP_MSG_AUDIO_DECODED_START:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_AUDIO_DECODED_START, 0);
                break;
            case FFP_MSG_VIDEO_DECODED_START:
                MPTRACE("FFP_MSG_VIDEO_DECODED_START:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_VIDEO_DECODED_START, 0);
                break;
            case FFP_MSG_OPEN_INPUT:
                MPTRACE("FFP_MSG_OPEN_INPUT:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_OPEN_INPUT, 0);
                break;
            case FFP_MSG_FIND_STREAM_INFO:
                MPTRACE("FFP_MSG_FIND_STREAM_INFO:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_FIND_STREAM_INFO, 0);
                break;
            case FFP_MSG_COMPONENT_OPEN:
                MPTRACE("FFP_MSG_COMPONENT_OPEN:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_COMPONENT_OPEN, 0);
                break;
            case FFP_MSG_BUFFERING_START:
                MPTRACE("FFP_MSG_BUFFERING_START:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_BUFFERING_START, msg.arg1);
                break;
            case FFP_MSG_BUFFERING_END:
                MPTRACE("FFP_MSG_BUFFERING_END:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_BUFFERING_END, msg.arg1);
                break;
            case FFP_MSG_BUFFERING_UPDATE:
                // MPTRACE("FFP_MSG_BUFFERING_UPDATE: %d, %d", msg.arg1, msg.arg2);
                post_event(env, weak_thiz, MEDIA_BUFFERING_UPDATE, msg.arg1, msg.arg2);
                break;
            // The next two messages are recognised but not forwarded to Java.
            case FFP_MSG_BUFFERING_BYTES_UPDATE:
                break;
            case FFP_MSG_BUFFERING_TIME_UPDATE:
                break;
            case FFP_MSG_SEEK_COMPLETE:
                MPTRACE("FFP_MSG_SEEK_COMPLETE:\n");
                post_event(env, weak_thiz, MEDIA_SEEK_COMPLETE, 0, 0);
                break;
            case FFP_MSG_ACCURATE_SEEK_COMPLETE:
                MPTRACE("FFP_MSG_ACCURATE_SEEK_COMPLETE:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_MEDIA_ACCURATE_SEEK_COMPLETE, msg.arg1);
                break;
            // Recognised but not forwarded to Java.
            case FFP_MSG_PLAYBACK_STATE_CHANGED:
                break;
            case FFP_MSG_TIMED_TEXT:
                // msg.obj, when present, is treated as a C string and wrapped
                // in a Java string; the local ref is dropped right after posting.
                if (msg.obj) {
                    jstring text = (*env)->NewStringUTF(env, (char *)msg.obj);
                    post_event2(env, weak_thiz, MEDIA_TIMED_TEXT, 0, 0, text);
                    J4A_DeleteLocalRef__p(env, &text);
                }
                else {
                    post_event2(env, weak_thiz, MEDIA_TIMED_TEXT, 0, 0, NULL);
                }
                break;
            case FFP_MSG_GET_IMG_STATE:
                // Same payload handling as FFP_MSG_TIMED_TEXT, with arg1/arg2 forwarded.
                if (msg.obj) {
                    jstring file_name = (*env)->NewStringUTF(env, (char *)msg.obj);
                    post_event2(env, weak_thiz, MEDIA_GET_IMG_STATE, msg.arg1, msg.arg2, file_name);
                    J4A_DeleteLocalRef__p(env, &file_name);
                }
                else {
                    post_event2(env, weak_thiz, MEDIA_GET_IMG_STATE, msg.arg1, msg.arg2, NULL);
                }
                break;
            case FFP_MSG_VIDEO_SEEK_RENDERING_START:
                MPTRACE("FFP_MSG_VIDEO_SEEK_RENDERING_START:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_VIDEO_SEEK_RENDERING_START, msg.arg1);
                break;
            case FFP_MSG_AUDIO_SEEK_RENDERING_START:
                MPTRACE("FFP_MSG_AUDIO_SEEK_RENDERING_START:\n");
                post_event(env, weak_thiz, MEDIA_INFO, MEDIA_INFO_AUDIO_SEEK_RENDERING_START, msg.arg1);
                break;
            default:
                ALOGE("unknown FFP_MSG_xxx(%d)\n", msg.what);
                break;
            }
            // Release whatever resources the fetched message holds.
            msg_free_res(&msg);
        }
    
    LABEL_RETURN:
        ;
    }
    

    从函数执行来看,启动了一个无限循环来处理消息,在循环中,调用ijkmp_get_msg 函数获取最新的msg 信息;直接调用到 ijkplayer.c 中的ijkmp_get_msg;

    /*
     * Fetch the next message that should be delivered to the caller.
     *
     * mp     player whose ffplayer->msg_queue is read
     * msg    output: receives the fetched message
     * block  forwarded unchanged to msg_queue_get()
     *
     * Returns the value of msg_queue_get(): a value <= 0 is returned to the
     * caller immediately; otherwise the positive value is returned together
     * with the message stored in *msg.
     *
     * Before returning, FFP_MSG_PREPARED / FFP_MSG_COMPLETED /
     * FFP_MSG_SEEK_COMPLETE update the player state under mp->mutex.
     * FFP_REQ_START / FFP_REQ_PAUSE / FFP_REQ_SEEK are requests handled here:
     * they are executed, freed, and never returned to the caller - the function
     * goes on to fetch the next message instead.
     *
     * The caller needs to call msg_free_res() to free the resource obtained in
     * the returned msg.
     */
    int ijkmp_get_msg(IjkMediaPlayer *mp, AVMessage *msg, int block)
    {
        assert(mp);
        while (1) {
            // Set to 1 by the FFP_REQ_xxx cases: message is consumed internally.
            int continue_wait_next_msg = 0;
            int retval = msg_queue_get(&mp->ffplayer->msg_queue, msg, block);
            if (retval <= 0)
                return retval;
    
            switch (msg->what) {
            case FFP_MSG_PREPARED:
                MPTRACE("ijkmp_get_msg: FFP_MSG_PREPARED\n");
                pthread_mutex_lock(&mp->mutex);
                if (mp->mp_state == MP_STATE_ASYNC_PREPARING) {
                    ijkmp_change_state_l(mp, MP_STATE_PREPARED);
                } else {
                    // FIXME: 1: onError() ?
                    av_log(mp->ffplayer, AV_LOG_DEBUG, "FFP_MSG_PREPARED: expecting mp_state==MP_STATE_ASYNC_PREPARING\n");
                }
                // Without start_on_prepared the player moves on to PAUSED.
                if (!mp->ffplayer->start_on_prepared) {
                    ijkmp_change_state_l(mp, MP_STATE_PAUSED);
                }
                pthread_mutex_unlock(&mp->mutex);
                break;
    
            case FFP_MSG_COMPLETED:
                MPTRACE("ijkmp_get_msg: FFP_MSG_COMPLETED\n");
    
                pthread_mutex_lock(&mp->mutex);
                // Flags consumed by the FFP_REQ_START case below: the next
                // start restarts playback from position 0.
                mp->restart = 1;
                mp->restart_from_beginning = 1;
                ijkmp_change_state_l(mp, MP_STATE_COMPLETED);
                pthread_mutex_unlock(&mp->mutex);
                break;
    
            case FFP_MSG_SEEK_COMPLETE:
                MPTRACE("ijkmp_get_msg: FFP_MSG_SEEK_COMPLETE\n");
    
                pthread_mutex_lock(&mp->mutex);
                mp->seek_req = 0;
                mp->seek_msec = 0;
                pthread_mutex_unlock(&mp->mutex);
                break;
    
            case FFP_REQ_START:
                MPTRACE("ijkmp_get_msg: FFP_REQ_START\n");
                continue_wait_next_msg = 1;
                pthread_mutex_lock(&mp->mutex);
                if (0 == ikjmp_chkst_start_l(mp->mp_state)) {
                    // FIXME: 8 check seekable
                    // retval is reused for the ffp_start*_l() results here; that
                    // is harmless because this path never returns retval.
                    if (mp->restart) {
                        if (mp->restart_from_beginning) {
                            av_log(mp->ffplayer, AV_LOG_DEBUG, "ijkmp_get_msg: FFP_REQ_START: restart from beginning\n");
                            retval = ffp_start_from_l(mp->ffplayer, 0);
                            if (retval == 0)
                                ijkmp_change_state_l(mp, MP_STATE_STARTED);
                        } else {
                            av_log(mp->ffplayer, AV_LOG_DEBUG, "ijkmp_get_msg: FFP_REQ_START: restart from seek pos\n");
                            retval = ffp_start_l(mp->ffplayer);
                            if (retval == 0)
                                ijkmp_change_state_l(mp, MP_STATE_STARTED);
                        }
                        mp->restart = 0;
                        mp->restart_from_beginning = 0;
                    } else {
                        av_log(mp->ffplayer, AV_LOG_DEBUG, "ijkmp_get_msg: FFP_REQ_START: start on fly\n");
                        retval = ffp_start_l(mp->ffplayer);
                        if (retval == 0)
                            ijkmp_change_state_l(mp, MP_STATE_STARTED);
                    }
                }
                pthread_mutex_unlock(&mp->mutex);
                break;
    
            case FFP_REQ_PAUSE:
                MPTRACE("ijkmp_get_msg: FFP_REQ_PAUSE\n");
                continue_wait_next_msg = 1;
                pthread_mutex_lock(&mp->mutex);
                if (0 == ikjmp_chkst_pause_l(mp->mp_state)) {
                    int pause_ret = ffp_pause_l(mp->ffplayer);
                    if (pause_ret == 0)
                        ijkmp_change_state_l(mp, MP_STATE_PAUSED);
                }
                pthread_mutex_unlock(&mp->mutex);
                break;
    
            case FFP_REQ_SEEK:
                MPTRACE("ijkmp_get_msg: FFP_REQ_SEEK\n");
                continue_wait_next_msg = 1;
    
                pthread_mutex_lock(&mp->mutex);
                if (0 == ikjmp_chkst_seek_l(mp->mp_state)) {
                    // A seek cancels a pending "restart from beginning".
                    mp->restart_from_beginning = 0;
                    if (0 == ffp_seek_to_l(mp->ffplayer, msg->arg1)) {
                        av_log(mp->ffplayer, AV_LOG_DEBUG, "ijkmp_get_msg: FFP_REQ_SEEK: seek to %d\n", (int)msg->arg1);
                    }
                }
                pthread_mutex_unlock(&mp->mutex);
                break;
            }
            // Requests were handled above: free the message and fetch the next one.
            if (continue_wait_next_msg) {
                msg_free_res(msg);
                continue;
            }
    
            return retval;
        }
    
        // Not reachable: the loop above only exits through return.
        return -1;
    }
    

    从消息队列中取msg消息;

    int retval = msg_queue_get(&mp->ffplayer->msg_queue, msg, block);
    

    mp->ffplayer->msg_queue 是什么时候初始化的?从代码调用流程来看,msg_queue 是在 ijkmp_prepare_async_l 函数中通过 msg_queue_start 启动的:

        msg_queue_start(&mp->ffplayer->msg_queue);
    

    msg_queue_get 定义在 ijkplayer/ijkmedia/ijkplayer/ff_ffmsg_queue.h 中:

    inline static int msg_queue_get(MessageQueue *q, AVMessage *msg, int block)
    

    MessageQueue 就是消息队列,AVMessage就是消息队列中的消息;block参数表示获取消息时是否阻塞等待:message_loop_n 中传入的是1,即阻塞获取,所以代码注释中才会说 block-get should never return 0;

    /* One message exchanged through a MessageQueue. */
    typedef struct AVMessage {
        /* Message code; consumers switch on it (FFP_MSG_xxx / FFP_REQ_xxx). */
        int what;
        /* First integer argument; its meaning depends on `what`. */
        int arg1;
        /* Second integer argument; its meaning depends on `what`. */
        int arg2;
        /* Optional payload pointer; consumers check it for NULL before use. */
        void *obj;
        /* Callback associated with `obj`.
         * NOTE(review): presumably called by msg_free_res() to release obj - confirm. */
        void (*free_l)(void *obj);
        /* NOTE(review): presumably links messages inside a queue / recycle list - confirm. */
        struct AVMessage *next;
    } AVMessage;
    
    /*
     * Queue of AVMessage items.
     * NOTE(review): the field descriptions below are inferred from the field
     * names and types only; the functions that use them are not shown here -
     * confirm against ff_ffmsg_queue.h.
     */
    typedef struct MessageQueue {
        /* presumably head and tail of the pending-message list */
        AVMessage *first_msg, *last_msg;
        /* presumably the number of pending messages */
        int nb_messages;
        /* presumably non-zero once the queue has been asked to stop */
        int abort_request;
        /* presumably guards the fields of this struct */
        SDL_mutex *mutex;
        /* presumably signalled when a message is queued or the queue aborts */
        SDL_cond *cond;
    
        /* presumably a list of released AVMessage nodes kept for reuse */
        AVMessage *recycle_msg;
        /* presumably counters for recycled vs. newly allocated nodes */
        int recycle_count;
        int alloc_count;
    } MessageQueue;
    

    可以发现,这个MessageQueue、AVMessage 和 Android的MessageQueue、Message基本上一样;

    msg_queue_get 函数就是从消息队列中取出待处理的消息,这是获取消息的地方;那么消息是如何放入消息队列的?

    ijkplayer/ijkmedia/ijkplayer/ff_ffplay_def.h中定义了几个方法,是添加消息到消息队列中的方法;

    /* Post message `what` to ffp->msg_queue with both integer arguments set to 0. */
    inline static void ffp_notify_msg1(FFPlayer *ffp, int what) {
        msg_queue_put_simple3(&ffp->msg_queue, what, 0, 0);
    }
    
    /* Post message `what` to ffp->msg_queue with `arg1`; arg2 is set to 0. */
    inline static void ffp_notify_msg2(FFPlayer *ffp, int what, int arg1) {
        msg_queue_put_simple3(&ffp->msg_queue, what, arg1, 0);
    }
    
    /* Post message `what` to ffp->msg_queue with both integer arguments. */
    inline static void ffp_notify_msg3(FFPlayer *ffp, int what, int arg1, int arg2) {
        msg_queue_put_simple3(&ffp->msg_queue, what, arg1, arg2);
    }
    
    /*
     * Post message `what` to ffp->msg_queue with both integer arguments and a
     * payload of `obj_len` bytes starting at `obj`; all arguments are forwarded
     * unchanged to msg_queue_put_simple4().
     */
    inline static void ffp_notify_msg4(FFPlayer *ffp, int what, int arg1, int arg2, void *obj, int obj_len) {
        msg_queue_put_simple4(&ffp->msg_queue, what, arg1, arg2, obj, obj_len);
    }
    

    ijkplayer/ijkmedia/ijkplayer/ff_ffmsg_queue.h 中执行 MessageQueue添加消息动作;

    /*
     * Queue a message that carries only a `what` code.
     * The message is built in a local AVMessage, initialised with
     * msg_init_msg(), and handed to msg_queue_put().
     */
    inline static void msg_queue_put_simple1(MessageQueue *q, int what)
    {
        AVMessage msg;
        msg_init_msg(&msg);
        msg.what = what;
        // NOTE(review): msg is a local, so msg_queue_put presumably copies it - confirm.
        msg_queue_put(q, &msg);
    }
    
    /*
     * Queue a message that carries a `what` code and one integer argument.
     * The message is built in a local AVMessage, initialised with
     * msg_init_msg(), and handed to msg_queue_put().
     */
    inline static void msg_queue_put_simple2(MessageQueue *q, int what, int arg1)
    {
        AVMessage msg;
        msg_init_msg(&msg);
        msg.what = what;
        msg.arg1 = arg1;
        // NOTE(review): msg is a local, so msg_queue_put presumably copies it - confirm.
        msg_queue_put(q, &msg);
    }
    
    /*
     * Queue a message that carries a `what` code and two integer arguments.
     * The message is built in a local AVMessage, initialised with
     * msg_init_msg(), and handed to msg_queue_put().
     */
    inline static void msg_queue_put_simple3(MessageQueue *q, int what, int arg1, int arg2)
    {
        AVMessage msg;
        msg_init_msg(&msg);
        msg.what = what;
        msg.arg1 = arg1;
        msg.arg2 = arg2;
        // NOTE(review): msg is a local, so msg_queue_put presumably copies it - confirm.
        msg_queue_put(q, &msg);
    }
    
    /*
     * Queue a message that carries a private copy of a caller-owned buffer.
     *
     * q        queue to post to
     * what     message code
     * arg1     first integer argument
     * arg2     second integer argument
     * obj      buffer to copy; may be NULL
     * obj_len  number of bytes to copy from obj
     *
     * The payload is duplicated with av_malloc() and paired with
     * msg_obj_free_l as its release callback. When there is nothing to copy
     * (obj is NULL or obj_len is not positive) or the allocation fails, the
     * message is still queued, but with a NULL payload, instead of handing a
     * NULL or invalid pointer to memcpy().
     */
    inline static void msg_queue_put_simple4(MessageQueue *q, int what, int arg1, int arg2, void *obj, int obj_len)
    {
        AVMessage msg;
        void *copy = NULL;

        msg_init_msg(&msg);
        msg.what = what;
        msg.arg1 = arg1;
        msg.arg2 = arg2;

        if (obj && obj_len > 0)
            copy = av_malloc(obj_len);

        if (copy) {
            memcpy(copy, obj, obj_len);
            msg.obj = copy;
            msg.free_l = msg_obj_free_l;
        } else {
            // No payload: free_l stays as msg_init_msg() left it, exactly as in
            // the payload-less msg_queue_put_simple1..3 helpers.
            msg.obj = NULL;
        }
        msg_queue_put(q, &msg);
    }
    

    像onPrepared的回调;

        ffp->prepared = true;
        ffp_notify_msg1(ffp, FFP_MSG_PREPARED);
    

    3.小结

    • 熟悉ijkplayer 底层的消息分发机制,便于我们深入理解ijkplayer的基本工作机制;

    感谢关注公众号JeffMony,持续给你带来音视频方面的知识。

    相关文章

      网友评论

        本文标题:ijkplayer 开源项目分析(三)msg_queue消息机制

        本文链接:https://www.haomeiwen.com/subject/txawxhtx.html