NuPlayer Decoder: Reading Data from the Source

Author: Nothing_655f | Published 2022-04-01 15:55

NuPlayer registers an asynchronous callback with MediaCodec. Buffer input/output state changes are reported by OMX, which calls back into ACodec, then MediaCodec, and finally NuPlayer::Decoder. For more on codec state initialization and transitions, see this article:

The overall flow and state transitions from NuPlayer to MediaCodec to ACodec to OMX

This post focuses on how the player wires Source data into the Decoder.
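
As background, the Decoder installs kWhatCodecNotify as MediaCodec's asynchronous callback when the codec is configured. A condensed sketch of that registration (from NuPlayer::Decoder::onConfigure; error handling and format setup are omitted):

// NuPlayer::Decoder::onConfigure (condensed): register kWhatCodecNotify as the
// async callback target, then configure and start the codec. From this point on,
// MediaCodec reports buffer availability through this message.
sp<AMessage> reply = new AMessage(kWhatCodecNotify, this);
mCodec->setCallback(reply);
// ...
err = mCodec->configure(format, mSurface, crypto, 0 /* flags */);
// ...
err = mCodec->start();

Those callbacks then arrive in onMessageReceived: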

void NuPlayer::Decoder::onMessageReceived(const sp<AMessage> &msg) {
    ALOGV("[%s] onMessage: %s", mComponentName.c_str(), msg->debugString().c_str());

    switch (msg->what()) {
        case kWhatCodecNotify:
        {
            int32_t cbID;
            CHECK(msg->findInt32("callbackID", &cbID));

            ALOGV("[%s] kWhatCodecNotify: cbID = %d, paused = %d",
                    mIsAudio ? "audio" : "video", cbID, mPaused);

            if (mPaused) {
                break;
            }

            switch (cbID) {
                case MediaCodec::CB_INPUT_AVAILABLE:
                {
                    int32_t index;
                    CHECK(msg->findInt32("index", &index));

                    handleAnInputBuffer(index);
                    break;
                }

                case MediaCodec::CB_OUTPUT_AVAILABLE:
                {
                    int32_t index;
                    size_t offset;
                    size_t size;
                    int64_t timeUs;
                    int32_t flags;

                    CHECK(msg->findInt32("index", &index));
                    CHECK(msg->findSize("offset", &offset));
                    CHECK(msg->findSize("size", &size));
                    CHECK(msg->findInt64("timeUs", &timeUs));
                    CHECK(msg->findInt32("flags", &flags));

                    handleAnOutputBuffer(index, offset, size, timeUs, flags);
                    break;
                }

                case MediaCodec::CB_OUTPUT_FORMAT_CHANGED:
                {
                    sp<AMessage> format;
                    CHECK(msg->findMessage("format", &format));

                    handleOutputFormatChange(format);
                    break;
                }

                case MediaCodec::CB_ERROR:
                {
                    status_t err;
                    CHECK(msg->findInt32("err", &err));
                    ALOGE("Decoder (%s) reported error : 0x%x",
                            mIsAudio ? "audio" : "video", err);

                    handleError(err);
                    break;
                }

                default:
                {
                    TRESPASS();
                    break;
                }
            }

            break;
        }

        case kWhatRenderBuffer:
        {
            if (!isStaleReply(msg)) {
                onRenderBuffer(msg);
            }
            break;
        }

        case kWhatAudioOutputFormatChanged:
        {
            if (!isStaleReply(msg)) {
                status_t err;
                if (msg->findInt32("err", &err) && err != OK) {
                    ALOGE("Renderer reported 0x%x when changing audio output format", err);
                    handleError(err);
                }
            }
            break;
        }

        case kWhatSetVideoSurface:
        {
            sp<AReplyToken> replyID;
            CHECK(msg->senderAwaitsResponse(&replyID));

            sp<RefBase> obj;
            CHECK(msg->findObject("surface", &obj));
            sp<Surface> surface = static_cast<Surface *>(obj.get()); // non-null
            int32_t err = INVALID_OPERATION;
            // NOTE: in practice mSurface is always non-null, but checking here for completeness
            if (mCodec != NULL && mSurface != NULL) {
                // TODO: once AwesomePlayer is removed, remove this automatic connecting
                // to the surface by MediaPlayerService.
                //
                // at this point MediaPlayerService::client has already connected to the
                // surface, which MediaCodec does not expect
                err = nativeWindowDisconnect(surface.get(), "kWhatSetVideoSurface(surface)");
                if (err == OK) {
                    err = mCodec->setSurface(surface);
                    ALOGI_IF(err, "codec setSurface returned: %d", err);
                    if (err == OK) {
                        // reconnect to the old surface as MPS::Client will expect to
                        // be able to disconnect from it.
                        (void)nativeWindowConnect(mSurface.get(), "kWhatSetVideoSurface(mSurface)");
                        mSurface = surface;
                    }
                }
                if (err != OK) {
                    // reconnect to the new surface on error as MPS::Client will expect to
                    // be able to disconnect from it.
                    (void)nativeWindowConnect(surface.get(), "kWhatSetVideoSurface(err)");
                }
            }

            sp<AMessage> response = new AMessage;
            response->setInt32("err", err);
            response->postReply(replyID);
            break;
        }

        case kWhatDrmReleaseCrypto:
        {
            ALOGV("kWhatDrmReleaseCrypto");
            onReleaseCrypto(msg);
            break;
        }

        default:
            DecoderBase::onMessageReceived(msg);
            break;
    }
}
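
On the MediaCodec side, each of these callbackIDs is posted on a copy of the message the Decoder registered. For CB_INPUT_AVAILABLE it looks roughly like the following (a simplified sketch of MediaCodec::onInputBufferAvailable; the exact internals differ between Android versions):

// MediaCodec (sketch): whenever the codec releases an input buffer back to the
// client, post the registered callback message with CB_INPUT_AVAILABLE and the
// buffer index; this is what arrives as kWhatCodecNotify above.
void MediaCodec::onInputBufferAvailable() {
    int32_t index;
    while ((index = mPortBuffers[kPortIndexInput].dequeueAvailableBuffer(nullptr)) >= 0) {
        sp<AMessage> msg = mCallback->dup();
        msg->setInt32("callbackID", CB_INPUT_AVAILABLE);
        msg->setInt32("index", index);
        msg->post();
    }
}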

For the decoder, requesting an input buffer really means asking for a buffer to be filled with demuxed data.

bool NuPlayer::Decoder::handleAnInputBuffer(size_t index) {
    if (isDiscontinuityPending()) {
        return false;
    }

    if (mCodec == NULL) {
        ALOGE("[%s] handleAnInputBuffer without a valid codec", mComponentName.c_str());
        handleError(NO_INIT);
        return false;
    }

    sp<MediaCodecBuffer> buffer;
    mCodec->getInputBuffer(index, &buffer);

    if (buffer == NULL) {
        ALOGE("[%s] handleAnInputBuffer, failed to get input buffer", mComponentName.c_str());
        handleError(UNKNOWN_ERROR);
        return false;
    }

    if (index >= mInputBuffers.size()) {
        for (size_t i = mInputBuffers.size(); i <= index; ++i) {
            mInputBuffers.add();
            mMediaBuffers.add();
            mInputBufferIsDequeued.add();
            mMediaBuffers.editItemAt(i) = NULL;
            mInputBufferIsDequeued.editItemAt(i) = false;
        }
    }
    mInputBuffers.editItemAt(index) = buffer;

    //CHECK_LT(bufferIx, mInputBuffers.size());

    if (mMediaBuffers[index] != NULL) {
        mMediaBuffers[index]->release();
        mMediaBuffers.editItemAt(index) = NULL;
    }
    mInputBufferIsDequeued.editItemAt(index) = true;

    if (!mCSDsToSubmit.isEmpty()) {
        sp<AMessage> msg = new AMessage();
        msg->setSize("buffer-ix", index);

        sp<ABuffer> buffer = mCSDsToSubmit.itemAt(0);
        ALOGI("[%s] resubmitting CSD", mComponentName.c_str());
        msg->setBuffer("buffer", buffer);
        mCSDsToSubmit.removeAt(0);
        if (!onInputBufferFetched(msg)) { // 1
            handleError(UNKNOWN_ERROR);
            return false;
        }
        return true;
    }

    while (!mPendingInputMessages.empty()) {
        sp<AMessage> msg = *mPendingInputMessages.begin();
        if (!onInputBufferFetched(msg)) {  // 2
            break;
        }
        mPendingInputMessages.erase(mPendingInputMessages.begin());
    }

    if (!mInputBufferIsDequeued.editItemAt(index)) {
        return true;
    }

    mDequeuedInputBuffers.push_back(index);

    onRequestInputBuffers();
    return true;
}

The function above mainly calls onInputBufferFetched and onRequestInputBuffers.

onInputBufferFetched handles the logic after an input buffer has been filled; it is also called from onRequestInputBuffers -> doRequestBuffers, and that flow is covered towards the end of this post.

If doRequestBuffers finds that more data should be requested, a kWhatRequestInputBuffers message is posted to the looper so that doRequestBuffers runs again and reads more data (see the excerpt after the code below).

void NuPlayer::DecoderBase::onRequestInputBuffers() {
    if (mRequestInputBuffersPending) {
        return;
    }

    // doRequestBuffers() return true if we should request more data
    if (doRequestBuffers()) {
        mRequestInputBuffersPending = true;

        sp<AMessage> msg = new AMessage(kWhatRequestInputBuffers, this);
        msg->post(10 * 1000ll);
    }
}
bool NuPlayer::Decoder::doRequestBuffers() {
    if (isDiscontinuityPending()) {
        return false;
    }
    status_t err = OK;
    while (err == OK && !mDequeuedInputBuffers.empty()) {
        size_t bufferIx = *mDequeuedInputBuffers.begin();
        sp<AMessage> msg = new AMessage();
        msg->setSize("buffer-ix", bufferIx);
        err = fetchInputData(msg);
        if (err != OK && err != ERROR_END_OF_STREAM) {
            // if EOS, need to queue EOS buffer
            break;
        }
        mDequeuedInputBuffers.erase(mDequeuedInputBuffers.begin());

        if (!mPendingInputMessages.empty()
                || !onInputBufferFetched(msg)) {
            mPendingInputMessages.push_back(msg);
        }
    }

    return err == -EWOULDBLOCK
            && mSource->feedMoreTSData() == OK;
}
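
The kWhatRequestInputBuffers message posted above (with a 10 ms delay) comes back to DecoderBase, which clears the pending flag and retries, so the decoder keeps polling the source until it no longer needs data. The relevant excerpt from NuPlayer::DecoderBase::onMessageReceived:

// DecoderBase::onMessageReceived (excerpt): the delayed retry simply clears the
// pending flag and calls onRequestInputBuffers() again.
case kWhatRequestInputBuffers:
{
    mRequestInputBuffersPending = false;
    onRequestInputBuffers();
    break;
}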

Here is the core of fetchInputData:

status_t NuPlayer::Decoder::fetchInputData(sp<AMessage> &reply) {
    sp<ABuffer> accessUnit;
    bool dropAccessUnit = true;
    do {
        status_t err = mSource->dequeueAccessUnit(mIsAudio, &accessUnit);

        if (err == -EWOULDBLOCK) {
            return err;
        } else if (err != OK) {
            if (err == INFO_DISCONTINUITY) {
                // ...
            }

            // reply should only be returned without a buffer set
            // when there is an error (including EOS)
            CHECK(err != OK);

            reply->setInt32("err", err);
            return ERROR_END_OF_STREAM;
        }

        dropAccessUnit = false;
        if (!mIsAudio && !mIsEncrypted) {
            // Extra safeguard if higher-level behavior changes. Otherwise, not required now.
            // Preventing the buffer from being processed (and sent to codec) if this is a later
            // round of playback but this time without prepareDrm. Or if there is a race between
            // stop (which is not blocking) and releaseDrm allowing buffers being processed after
            // Crypto has been released (GenericSource currently prevents this race though).
            // Particularly doing this check before IsAVCReferenceFrame call to prevent parsing
            // of encrypted data.
            if (mIsEncryptedObservedEarlier) {
                ALOGE("fetchInputData: mismatched mIsEncrypted/mIsEncryptedObservedEarlier (0/1)");

                return INVALID_OPERATION;
            }

            int32_t layerId = 0;
            bool haveLayerId = accessUnit->meta()->findInt32("temporal-layer-id", &layerId);
            if (mRenderer->getVideoLateByUs() > 100000ll
                    && mIsVideoAVC
                    && !IsAVCReferenceFrame(accessUnit)) {
                dropAccessUnit = true;
            } else if (haveLayerId && mNumVideoTemporalLayerTotal > 1) {
                // Add only one layer each time.
                if (layerId > mCurrentMaxVideoTemporalLayerId + 1
                        || layerId >= mNumVideoTemporalLayerAllowed) {
                    dropAccessUnit = true;
                    ALOGV("dropping layer(%d), speed=%g, allowed layer count=%d, max layerId=%d",
                            layerId, mPlaybackSpeed, mNumVideoTemporalLayerAllowed,
                            mCurrentMaxVideoTemporalLayerId);
                } else if (layerId > mCurrentMaxVideoTemporalLayerId) {
                    mCurrentMaxVideoTemporalLayerId = layerId;
                } else if (layerId == 0 && mNumVideoTemporalLayerTotal > 1
                        && IsIDR(accessUnit->data(), accessUnit->size())) {
                    mCurrentMaxVideoTemporalLayerId = mNumVideoTemporalLayerTotal - 1;
                }
            }
            if (dropAccessUnit) {
                if (layerId <= mCurrentMaxVideoTemporalLayerId && layerId > 0) {
                    mCurrentMaxVideoTemporalLayerId = layerId - 1;
                }
                ++mNumInputFramesDropped;
            }
        }
    } while (dropAccessUnit);

#if 1
    int64_t mediaTimeUs = -1;
    accessUnit->meta()->findInt64("timeUs", &mediaTimeUs);
    ALOGV("[%s] feeding input buffer at media time %.3f",
         mIsAudio ? "audio" : "video",
         mediaTimeUs / 1E6);
#endif

    if (mCCDecoder != NULL) {
        mCCDecoder->decode(accessUnit);
    }

    reply->setBuffer("buffer", accessUnit);

    return OK;
}

The key call here is status_t err = mSource->dequeueAccessUnit(mIsAudio, &accessUnit); the demuxed data is returned in accessUnit.

Which class mSource actually is depends on the concrete source type; for a local file it is GenericSource.

status_t NuPlayer::GenericSource::dequeueAccessUnit(
        bool audio, sp<ABuffer> *accessUnit) {
    Mutex::Autolock _l(mLock);
    // If has gone through stop/releaseDrm sequence, we no longer send down any buffer b/c
    // the codec's crypto object has gone away (b/37960096).
    // Note: This will be unnecessary when stop() changes behavior and releases codec (b/35248283).
    if (!mStarted && mIsDrmReleased) {
        return -EWOULDBLOCK;
    }

    Track *track = audio ? &mAudioTrack : &mVideoTrack;

    if (track->mSource == NULL) {
        ALOGD("%s %d EWOULDBLOCK", __FUNCTION__, __LINE__);
        return -EWOULDBLOCK;
    }

    status_t finalResult;
    ALOGD("%s %d go to check hasBufferAvailable of %s", __FUNCTION__, __LINE__, audio ? "audio" : "video");
    if (!track->mPackets->hasBufferAvailable(&finalResult)) {
        if (finalResult == OK) {
            postReadBuffer(
                    audio ? MEDIA_TRACK_TYPE_AUDIO : MEDIA_TRACK_TYPE_VIDEO);
            ALOGD("%s %d EWOULDBLOCK", __FUNCTION__, __LINE__);
            return -EWOULDBLOCK;
        }
        if (finalResult == -EWOULDBLOCK) {
            ALOGD("%s %d EWOULDBLOCK of %s", __FUNCTION__, __LINE__, audio ? "audio" : "video");
        }
        return finalResult;
    }

    status_t result = track->mPackets->dequeueAccessUnit(accessUnit);
    // ...
}

track->mPackets holds the track's elementary-stream (ES) data demuxed by the extractor; it is created in the following function:

status_t NuPlayer::GenericSource::initFromDataSource() {
    // ...
        if (!strncasecmp(mime, "audio/", 6)) {
            if (mAudioTrack.mSource == NULL) {
                mAudioTrack.mIndex = i;
                mAudioTrack.mSource = track;
                mAudioTrack.mPackets =
                    new AnotherPacketSource(mAudioTrack.mSource->getFormat());
                // ...
                mMimes.add(String8(mime));
            }
        } else if (!strncasecmp(mime, "video/", 6)) {
            if (mVideoTrack.mSource == NULL) {
                mVideoTrack.mIndex = i;
                mVideoTrack.mSource = track;
                mVideoTrack.mPackets =
                    new AnotherPacketSource(mVideoTrack.mSource->getFormat());

                // video always at the beginning
                mMimes.insertAt(String8(mime), 0);
            }
        }
    // ...
}
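
The packet queue is filled on the other side by GenericSource::readBuffer(), which is what the postReadBuffer() call seen in dequeueAccessUnit() above eventually triggers. A heavily simplified sketch (seek handling, buffer batching, format changes, and track types other than audio/video are omitted; details vary across Android versions):

// GenericSource::readBuffer (simplified sketch): read one encoded access unit from
// the extractor-backed MediaSource and queue it into the track's AnotherPacketSource,
// where dequeueAccessUnit() will later pick it up.
void NuPlayer::GenericSource::readBuffer(
        media_track_type trackType, int64_t seekTimeUs, MediaPlayerSeekMode mode,
        int64_t *actualTimeUs, bool formatChange) {
    Track *track = (trackType == MEDIA_TRACK_TYPE_AUDIO) ? &mAudioTrack : &mVideoTrack;
    MediaSource::ReadOptions options;
    // ... (seek position and read-option setup omitted)
    MediaBufferBase *mbuf;
    status_t err = track->mSource->read(&mbuf, &options);
    if (err == OK) {
        sp<ABuffer> buffer = mediaBufferToABuffer(mbuf, trackType);
        track->mPackets->queueAccessUnit(buffer);
    } else {
        track->mPackets->signalEOS(err);
    }
}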

After fetchInputData completes, onInputBufferFetched is what actually hands the buffer to the codec; its key call is:

bool NuPlayer::Decoder::onInputBufferFetched(const sp<AMessage> &msg) {
    // ... (buffer lookup, EOS and secure-buffer handling omitted)
    status_t err = mCodec->queueInputBuffer(
            bufferIx, codecBuffer->offset(), codecBuffer->size(),
            timeUs, flags, &errorDetailMsg);
    // ...
}

Following that call path, we end up in:

status_t MediaCodec::onQueueInputBuffer(const sp<AMessage> &msg) {
    // ... (index, offset, size, timestamp and flag extraction omitted)
    if (hasCryptoOrDescrambler()) {
        AString *errorDetailMsg;
        CHECK(msg->findPointer("errorDetailMsg", (void **)&errorDetailMsg));
        // ...
        err = mBufferChannel->queueSecureInputBuffer(
                buffer,
                (mFlags & kFlagIsSecure),
                key,
                iv,
                mode,
                pattern,
                subSamples,
                numSubSamples,
                errorDetailMsg);
    } else {
        err = mBufferChannel->queueInputBuffer(buffer);
    }
    // ...
}

So the next question is what kind of object mBufferChannel is.

mBufferChannel is assigned in MediaCodec::init via mBufferChannel = mCodec->getBufferChannel();, and in the OMX case it comes from ACodec:

std::shared_ptr<BufferChannelBase> ACodec::getBufferChannel() {
    if (!mBufferChannel) {
        mBufferChannel = std::make_shared<ACodecBufferChannel>(
                new AMessage(kWhatInputBufferFilled, this),
                new AMessage(kWhatOutputBufferDrained, this));
    }
    return mBufferChannel;
}
status_t ACodecBufferChannel::queueInputBuffer(const sp<MediaCodecBuffer> &buffer) {
    if (mDealer != nullptr) {
        return -ENOSYS;
    }
    std::shared_ptr<const std::vector<const BufferInfo>> array(
            std::atomic_load(&mInputBuffers));
    BufferInfoIterator it = findClientBuffer(array, buffer);
    if (it == array->end()) {
        return -ENOENT;
    }
    ALOGV("queueInputBuffer #%d", it->mBufferId);
    sp<AMessage> msg = mInputBufferFilled->dup();
    msg->setObject("buffer", it->mCodecBuffer);
    msg->setInt32("buffer-id", it->mBufferId);
    msg->post();
    return OK;
}
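
At this point the access unit dequeued from GenericSource has been handed over to the codec: the kWhatInputBufferFilled message is handled inside ACodec (BaseState::onInputBufferFilled), which submits the filled buffer to the OMX component for decoding. That completes the chain sketched at the beginning: OMX/ACodec/MediaCodec report an empty input buffer upward, and NuPlayer::Decoder fills it with data pulled from the Source.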
