Nuplayer Decoder从 Source读取数据
NuPlayer注册的是异步的callback,buffer的输入输出状态由OMX回调到ACodec,再到MediaCodec,最后到NuPlayer::Decoder这边。关于更多codec状态初始化和转换可以看这个文章
这里主要是介绍Player 如何把 Source 数据跟Decoder 串起来的
void NuPlayer::Decoder::onMessageReceived(const sp<AMessage> &msg) {
ALOGV("[%s] onMessage: %s", mComponentName.c_str(), msg->debugString().c_str());
switch (msg->what()) {
case kWhatCodecNotify:
{
int32_t cbID;
CHECK(msg->findInt32("callbackID", &cbID));
ALOGV("[%s] kWhatCodecNotify: cbID = %d, paused = %d",
mIsAudio ? "audio" : "video", cbID, mPaused);
if (mPaused) {
break;
}
switch (cbID) {
case MediaCodec::CB_INPUT_AVAILABLE:
{
int32_t index;
CHECK(msg->findInt32("index", &index));
handleAnInputBuffer(index);
break;
}
case MediaCodec::CB_OUTPUT_AVAILABLE:
{
int32_t index;
size_t offset;
size_t size;
int64_t timeUs;
int32_t flags;
CHECK(msg->findInt32("index", &index));
CHECK(msg->findSize("offset", &offset));
CHECK(msg->findSize("size", &size));
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("flags", &flags));
handleAnOutputBuffer(index, offset, size, timeUs, flags);
break;
}
case MediaCodec::CB_OUTPUT_FORMAT_CHANGED:
{
sp<AMessage> format;
CHECK(msg->findMessage("format", &format));
handleOutputFormatChange(format);
break;
}
case MediaCodec::CB_ERROR:
{
status_t err;
CHECK(msg->findInt32("err", &err));
ALOGE("Decoder (%s) reported error : 0x%x",
mIsAudio ? "audio" : "video", err);
handleError(err);
break;
}
default:
{
TRESPASS();
break;
}
}
break;
}
case kWhatRenderBuffer:
{
if (!isStaleReply(msg)) {
onRenderBuffer(msg);
}
break;
}
case kWhatAudioOutputFormatChanged:
{
if (!isStaleReply(msg)) {
status_t err;
if (msg->findInt32("err", &err) && err != OK) {
ALOGE("Renderer reported 0x%x when changing audio output format", err);
handleError(err);
}
}
break;
}
case kWhatSetVideoSurface:
{
sp<AReplyToken> replyID;
CHECK(msg->senderAwaitsResponse(&replyID));
sp<RefBase> obj;
CHECK(msg->findObject("surface", &obj));
sp<Surface> surface = static_cast<Surface *>(obj.get()); // non-null
int32_t err = INVALID_OPERATION;
// NOTE: in practice mSurface is always non-null, but checking here for completeness
if (mCodec != NULL && mSurface != NULL) {
// TODO: once AwesomePlayer is removed, remove this automatic connecting
// to the surface by MediaPlayerService.
//
// at this point MediaPlayerService::client has already connected to the
// surface, which MediaCodec does not expect
err = nativeWindowDisconnect(surface.get(), "kWhatSetVideoSurface(surface)");
if (err == OK) {
err = mCodec->setSurface(surface);
ALOGI_IF(err, "codec setSurface returned: %d", err);
if (err == OK) {
// reconnect to the old surface as MPS::Client will expect to
// be able to disconnect from it.
(void)nativeWindowConnect(mSurface.get(), "kWhatSetVideoSurface(mSurface)");
mSurface = surface;
}
}
if (err != OK) {
// reconnect to the new surface on error as MPS::Client will expect to
// be able to disconnect from it.
(void)nativeWindowConnect(surface.get(), "kWhatSetVideoSurface(err)");
}
}
sp<AMessage> response = new AMessage;
response->setInt32("err", err);
response->postReply(replyID);
break;
}
case kWhatDrmReleaseCrypto:
{
ALOGV("kWhatDrmReleaseCrypto");
onReleaseCrypto(msg);
break;
}
default:
DecoderBase::onMessageReceived(msg);
break;
}
}
对于decoder来说,请求一个buffer,其实就是请求填充demux 数据的buffer
bool NuPlayer::Decoder::handleAnInputBuffer(size_t index) {
if (isDiscontinuityPending()) {
return false;
}
if (mCodec == NULL) {
ALOGE("[%s] handleAnInputBuffer without a valid codec", mComponentName.c_str());
handleError(NO_INIT);
return false;
}
sp<MediaCodecBuffer> buffer;
mCodec->getInputBuffer(index, &buffer);
if (buffer == NULL) {
ALOGE("[%s] handleAnInputBuffer, failed to get input buffer", mComponentName.c_str());
handleError(UNKNOWN_ERROR);
return false;
}
if (index >= mInputBuffers.size()) {
for (size_t i = mInputBuffers.size(); i <= index; ++i) {
mInputBuffers.add();
mMediaBuffers.add();
mInputBufferIsDequeued.add();
mMediaBuffers.editItemAt(i) = NULL;
mInputBufferIsDequeued.editItemAt(i) = false;
}
}
mInputBuffers.editItemAt(index) = buffer;
//CHECK_LT(bufferIx, mInputBuffers.size());
if (mMediaBuffers[index] != NULL) {
mMediaBuffers[index]->release();
mMediaBuffers.editItemAt(index) = NULL;
}
mInputBufferIsDequeued.editItemAt(index) = true;
if (!mCSDsToSubmit.isEmpty()) {
sp<AMessage> msg = new AMessage();
msg->setSize("buffer-ix", index);
sp<ABuffer> buffer = mCSDsToSubmit.itemAt(0);
ALOGI("[%s] resubmitting CSD", mComponentName.c_str());
msg->setBuffer("buffer", buffer);
mCSDsToSubmit.removeAt(0);
if (!onInputBufferFetched(msg)) { // 1
handleError(UNKNOWN_ERROR);
return false;
}
return true;
}
while (!mPendingInputMessages.empty()) {
sp<AMessage> msg = *mPendingInputMessages.begin();
if (!onInputBufferFetched(msg)) { // 2
break;
}
mPendingInputMessages.erase(mPendingInputMessages.begin());
}
if (!mInputBufferIsDequeued.editItemAt(index)) {
return true;
}
mDequeuedInputBuffers.push_back(index);
onRequestInputBuffers();
return true;
}
上述函数主要调用了onInputBufferFetched 和 onRequestInputBuffers方法,
onInputBufferFetched主要是处理buffer输入后的一些逻辑,在 onRequestInputBuffers -> doRequestBuffers 中也会调用onInputBufferFetched 来处理,最后面会过下这块的流程
如果doRequestBuffers返回true(说明数据不足、需要请求更多数据),就会延迟post一个kWhatRequestInputBuffers消息,经由looper稍后再次尝试通过doRequestBuffers读取更多数据
void NuPlayer::DecoderBase::onRequestInputBuffers() {
if (mRequestInputBuffersPending) {
return;
}
// doRequestBuffers() return true if we should request more data
if (doRequestBuffers()) {
mRequestInputBuffersPending = true;
sp<AMessage> msg = new AMessage(kWhatRequestInputBuffers, this);
msg->post(10 * 1000ll);
}
}
bool NuPlayer::Decoder::doRequestBuffers() {
if (isDiscontinuityPending()) {
return false;
}
status_t err = OK;
while (err == OK && !mDequeuedInputBuffers.empty()) {
size_t bufferIx = *mDequeuedInputBuffers.begin();
sp<AMessage> msg = new AMessage();
msg->setSize("buffer-ix", bufferIx);
err = fetchInputData(msg);
if (err != OK && err != ERROR_END_OF_STREAM) {
// if EOS, need to queue EOS buffer
break;
}
mDequeuedInputBuffers.erase(mDequeuedInputBuffers.begin());
if (!mPendingInputMessages.empty()
|| !onInputBufferFetched(msg)) {
mPendingInputMessages.push_back(msg);
}
}
return err == -EWOULDBLOCK
&& mSource->feedMoreTSData() == OK;
}
这里面就是 fetchInputData 主要实现
// Fetches the next access unit that should be fed to the codec and stores it in |reply|.
//
// Access units are dequeued from mSource (audio or video, depending on mIsAudio) in a
// loop until one is found that is not dropped.
//
// Returns:
//   OK                  - |reply| carries the access unit under the "buffer" key.
//   -EWOULDBLOCK        - the source returned -EWOULDBLOCK; |reply| is left untouched.
//   ERROR_END_OF_STREAM - the source returned any other non-OK status; that status is
//                         stored in |reply| under the "err" key and no buffer is set.
//   INVALID_OPERATION   - an unencrypted video unit arrived although
//                         mIsEncryptedObservedEarlier is set.
status_t NuPlayer::Decoder::fetchInputData(sp<AMessage> &reply) {
sp<ABuffer> accessUnit;
bool dropAccessUnit = true;
do {
status_t err = mSource->dequeueAccessUnit(mIsAudio, &accessUnit);
if (err == -EWOULDBLOCK) {
return err;
} else if (err != OK) {
if (err == INFO_DISCONTINUITY) {
// ... (the body of this branch is elided in this excerpt)
}
// reply should only be returned without a buffer set
// when there is an error (including EOS)
CHECK(err != OK);
reply->setInt32("err", err);
return ERROR_END_OF_STREAM;
}
// Assume the unit is kept; the video-only checks below may set the flag again.
dropAccessUnit = false;
if (!mIsAudio && !mIsEncrypted) {
// Extra safeguard if higher-level behavior changes. Otherwise, not required now.
// Preventing the buffer from being processed (and sent to codec) if this is a later
// round of playback but this time without prepareDrm. Or if there is a race between
// stop (which is not blocking) and releaseDrm allowing buffers being processed after
// Crypto has been released (GenericSource currently prevents this race though).
// Particularly doing this check before IsAVCReferenceFrame call to prevent parsing
// of encrypted data.
if (mIsEncryptedObservedEarlier) {
ALOGE("fetchInputData: mismatched mIsEncrypted/mIsEncryptedObservedEarlier (0/1)");
return INVALID_OPERATION;
}
// layerId stays 0 when the unit carries no "temporal-layer-id" meta entry.
int32_t layerId = 0;
bool haveLayerId = accessUnit->meta()->findInt32("temporal-layer-id", &layerId);
// Drop non-reference AVC frames while the renderer reports video as late by more
// than 100000 (per getVideoLateByUs; presumably microseconds, i.e. 100 ms - confirm).
if (mRenderer->getVideoLateByUs() > 100000ll
&& mIsVideoAVC
&& !IsAVCReferenceFrame(accessUnit)) {
dropAccessUnit = true;
} else if (haveLayerId && mNumVideoTemporalLayerTotal > 1) {
// Add only one layer each time.
if (layerId > mCurrentMaxVideoTemporalLayerId + 1
|| layerId >= mNumVideoTemporalLayerAllowed) {
dropAccessUnit = true;
ALOGV("dropping layer(%d), speed=%g, allowed layer count=%d, max layerId=%d",
layerId, mPlaybackSpeed, mNumVideoTemporalLayerAllowed,
mCurrentMaxVideoTemporalLayerId);
} else if (layerId > mCurrentMaxVideoTemporalLayerId) {
// The unit is exactly one layer above the current maximum: raise the maximum.
mCurrentMaxVideoTemporalLayerId = layerId;
} else if (layerId == 0 && mNumVideoTemporalLayerTotal > 1
&& IsIDR(accessUnit->data(), accessUnit->size())) {
// A layer-0 unit for which IsIDR() is true resets the maximum to the top layer.
mCurrentMaxVideoTemporalLayerId = mNumVideoTemporalLayerTotal - 1;
}
}
if (dropAccessUnit) {
// When the dropped unit's layer (> 0) is within the current maximum, lower the
// maximum to the layer just below it; every drop is counted.
if (layerId <= mCurrentMaxVideoTemporalLayerId && layerId > 0) {
mCurrentMaxVideoTemporalLayerId = layerId - 1;
}
++mNumInputFramesDropped;
}
}
} while (dropAccessUnit);
// Always-enabled logging block: mediaTimeUs stays -1 if the unit has no "timeUs" meta.
#if 1
int64_t mediaTimeUs = -1;
accessUnit->meta()->findInt64("timeUs", &mediaTimeUs);
ALOGV("[%s] feeding input buffer at media time %.3f",
mIsAudio ? "audio" : "video",
mediaTimeUs / 1E6);
#endif
// Let mCCDecoder (presumably the closed-caption decoder - confirm) see the unit, if set.
if (mCCDecoder != NULL) {
mCCDecoder->decode(accessUnit);
}
reply->setBuffer("buffer", accessUnit);
return OK;
}
这里就是调用了status_t err = mSource->dequeueAccessUnit(mIsAudio, &accessUnit);方法,数据就在accessUnit指针中
这里的mSource 要看具体对应的类型,以本地文件为例,其source 就是GenericSource
// Hands the next queued access unit of the audio or video track to the caller via
// |accessUnit|.
// NOTE: the function is cut off in this excerpt; only the part up to the dequeue
// from the track's packet queue (mPackets) is shown.
//
// Visible return values: -EWOULDBLOCK when the source is stopped with DRM released,
// when the track has no source, or when nothing is queued and the queue's final
// result is OK; otherwise the queue's final result when nothing is queued.
status_t NuPlayer::GenericSource::dequeueAccessUnit(
bool audio, sp<ABuffer> *accessUnit) {
Mutex::Autolock _l(mLock);
// If has gone through stop/releaseDrm sequence, we no longer send down any buffer b/c
// the codec's crypto object has gone away (b/37960096).
// Note: This will be unnecessary when stop() changes behavior and releases codec (b/35248283).
if (!mStarted && mIsDrmReleased) {
return -EWOULDBLOCK;
}
Track *track = audio ? &mAudioTrack : &mVideoTrack;
// The requested track has no source: report that nothing is available.
if (track->mSource == NULL) {
ALOGD("%s %d EWOULDBLOCK", __FUNCTION__, __LINE__);
return -EWOULDBLOCK;
}
// Written by hasBufferAvailable() below before it is read.
status_t finalResult;
ALOGD("%s %d go to check hasBufferAvailable of %s", __FUNCTION__, __LINE__, audio ? "audio" : "video");
if (!track->mPackets->hasBufferAvailable(&finalResult)) {
// Nothing queued and no final result yet: post a read for this track type and
// tell the caller to try again later.
if (finalResult == OK) {
postReadBuffer(
audio ? MEDIA_TRACK_TYPE_AUDIO : MEDIA_TRACK_TYPE_VIDEO);
ALOGD("%s %d EWOULDBLOCK", __FUNCTION__, __LINE__);
return -EWOULDBLOCK;
}
// Log-only branch; the final result is returned unchanged either way.
if (finalResult == -EWOULDBLOCK) {
ALOGD("%s %d EWOULDBLOCK of %s", __FUNCTION__, __LINE__, audio ? "audio" : "video");
}
return finalResult;
}
// A unit is queued: take it from the track's packet queue.
status_t result = track->mPackets->dequeueAccessUnit(accessUnit);
这个track->mPackets就是负责读取对应extractor中demux出来的track ES data的队列,在如下函数中初始化
// Abridged excerpt of initFromDataSource(): only the per-track audio/video setup is
// shown. |mime|, |i| and |track| are declared in code that is not part of this
// excerpt (presumably a loop over the extractor's tracks - confirm).
//
// For the first audio track and the first video track seen (mSource still NULL), the
// track index and source are recorded and an AnotherPacketSource is created from the
// source's format and stored in mPackets.
status_t NuPlayer::GenericSource::initFromDataSource() {
// Case-insensitive match on the "audio/" MIME prefix.
if (!strncasecmp(mime, "audio/", 6)) {
if (mAudioTrack.mSource == NULL) {
mAudioTrack.mIndex = i;
mAudioTrack.mSource = track;
mAudioTrack.mPackets =
new AnotherPacketSource(mAudioTrack.mSource->getFormat());
// ...
// The audio MIME type is appended to the list.
mMimes.add(String8(mime));
}
// Case-insensitive match on the "video/" MIME prefix.
} else if (!strncasecmp(mime, "video/", 6)) {
if (mVideoTrack.mSource == NULL) {
mVideoTrack.mIndex = i;
mVideoTrack.mSource = track;
mVideoTrack.mPackets =
new AnotherPacketSource(mVideoTrack.mSource->getFormat());
// video always at the beginning
mMimes.insertAt(String8(mime), 0);
}
}
在fetchInputData取到数据之后会调用onInputBufferFetched方法,可以大概过一下
// Abridged excerpt: only the hand-off to the codec is shown. The call's arguments
// and the rest of the body are omitted here.
bool NuPlayer::Decoder::onInputBufferFetched(const sp<AMessage> &msg) {
// The fetched input is passed on to the codec through mCodec->queueInputBuffer.
status_t err = mCodec->queueInputBuffer
}
从这条路走下来是走到
// Abridged excerpt: only the final hand-off to mBufferChannel is shown. The
// declarations of |err|, |buffer|, |key|, |iv|, |mode|, |pattern|, |subSamples| and
// |numSubSamples| are not part of this excerpt.
status_t MediaCodec::onQueueInputBuffer(const sp<AMessage> &msg) {
// With a crypto or descrambler object present the buffer goes through the secure
// path together with the crypto parameters; otherwise it is queued as-is.
if (hasCryptoOrDescrambler()) {
// The message must carry an "errorDetailMsg" pointer, which is passed on to the
// secure queue call.
AString *errorDetailMsg;
CHECK(msg->findPointer("errorDetailMsg", (void **)&errorDetailMsg));
err = mBufferChannel->queueSecureInputBuffer(
buffer,
(mFlags & kFlagIsSecure),
key,
iv,
mode,
pattern,
subSamples,
numSubSamples,
errorDetailMsg);
} else {
err = mBufferChannel->queueInputBuffer(buffer);
}
}
那么需要看下这个 mBufferChannel是什么对象了
在 status_t MediaCodec::init 的时候给 mBufferChannel = mCodec->getBufferChannel(); 赋值,这个是从ACodec来
std::shared_ptr<BufferChannelBase> ACodec::getBufferChannel() {
if (!mBufferChannel) {
mBufferChannel = std::make_shared<ACodecBufferChannel>(
new AMessage(kWhatInputBufferFilled, this),
new AMessage(kWhatOutputBufferDrained, this));
}
return mBufferChannel;
}
// Queues a filled client input buffer by posting a copy of the mInputBufferFilled
// notification that carries the matching codec buffer and its buffer id.
//
// Returns -ENOSYS when mDealer is set, -ENOENT when |buffer| is not found among the
// current input buffers, and OK once the notification has been posted.
status_t ACodecBufferChannel::queueInputBuffer(const sp<MediaCodecBuffer> &buffer) {
    if (mDealer != nullptr) {
        return -ENOSYS;
    }

    // Work on an atomically loaded snapshot of the input buffer list.
    std::shared_ptr<const std::vector<const BufferInfo>> inputs =
            std::atomic_load(&mInputBuffers);

    BufferInfoIterator entry = findClientBuffer(inputs, buffer);
    if (entry == inputs->end()) {
        return -ENOENT;
    }

    ALOGV("queueInputBuffer #%d", entry->mBufferId);

    sp<AMessage> notify = mInputBufferFilled->dup();
    notify->setObject("buffer", entry->mCodecBuffer);
    notify->setInt32("buffer-id", entry->mBufferId);
    notify->post();
    return OK;
}
网友评论