本篇介绍
本篇介绍下AudioRecord的线程运行,以及startRecording,stop,pause等流程。
源码介绍
线程运行流程
先看下RecordThread的创建:
// Constructs a RecordThread for the HAL input stream `input`.
// Visible steps: formats the thread name as "AudioIn_<id in hex>", calls
// readInputParameters_l(), wraps input->stream in an AudioStreamInSource and negotiates its
// NBAIO format, then decides from kUseFastCapture whether to set up fast capture.
// When fast capture is selected, a zeroed pipe is allocated from the read-only heap, a
// Pipe/PipeReader pair is created over it, and a FastCapture thread is configured and run.
// If the pipe buffer cannot be allocated, the remaining setup is skipped via `goto failed`.
AudioFlinger::RecordThread::RecordThread(const sp<AudioFlinger>& audioFlinger,
AudioStreamIn *input,
audio_io_handle_t id,
bool systemReady
) :
ThreadBase(audioFlinger, id, RECORD, systemReady, false /* isOut */),
mInput(input),
mSource(mInput),
mActiveTracks(&this->mLocalLog),
mRsmpInBuffer(NULL),
// mRsmpInFrames, mRsmpInFramesP2, and mRsmpInFramesOA are set by readInputParameters_l()
mRsmpInRear(0)
, mReadOnlyHeap(new MemoryDealer(kRecordThreadReadOnlyHeapSize,
"RecordThreadRO", MemoryHeapBase::READ_ONLY))
// mFastCapture below
, mFastCaptureFutex(0)
// mInputSource
// mPipeSink
// mPipeSource
, mPipeFramesP2(0)
// mPipeMemory
// mFastCaptureNBLogWriter
, mFastTrackAvail(false)
, mBtNrecSuspended(false)
{
snprintf(mThreadName, kThreadNameLength, "AudioIn_%X", id);
mNBLogWriter = audioFlinger->newWriter_l(kLogSize, mThreadName);
// Flag the thread as an MSD device when the HAL module name matches the MSD module id.
if (mInput->audioHwDev != nullptr) {
mIsMsdDevice = strcmp(
mInput->audioHwDev->moduleName(), AUDIO_HARDWARE_MODULE_ID_MSD) == 0;
}
readInputParameters_l();
// TODO: We may also match on address as well as device type for
// AUDIO_DEVICE_IN_BUS, AUDIO_DEVICE_IN_BLUETOOTH_A2DP, AUDIO_DEVICE_IN_REMOTE_SUBMIX
// TODO: This property should be ensure that only contains one single device type.
mTimestampCorrectedDevice = (audio_devices_t)property_get_int64(
"audio.timestamp.corrected_input_device",
(int64_t)(mIsMsdDevice ? AUDIO_DEVICE_IN_BUS // turn on by default for MSD
: AUDIO_DEVICE_NONE));
// create an NBAIO source for the HAL input stream, and negotiate
mInputSource = new AudioStreamInSource(input->stream); // source that captured data is read from
size_t numCounterOffers = 0;
const NBAIO_Format offers[1] = {Format_from_SR_C(mSampleRate, mChannelCount, mFormat)};
// `index` is only declared when LOG_NDEBUG is 0; otherwise the negotiate() result is discarded.
// NOTE(review): the ALOG_ASSERT below presumably compiles away in that configuration -- confirm.
#if !LOG_NDEBUG
ssize_t index =
#else
(void)
#endif
mInputSource->negotiate(offers, 1, NULL, numCounterOffers);
ALOG_ASSERT(index == 0);
// initialize fast capture depending on configuration
bool initFastCapture;
switch (kUseFastCapture) {
case FastCapture_Never:
initFastCapture = false;
ALOGV("%p kUseFastCapture = Never, initFastCapture = false", this);
break;
case FastCapture_Always:
initFastCapture = true;
ALOGV("%p kUseFastCapture = Always, initFastCapture = true", this);
break;
case FastCapture_Static:
// Enable fast capture when the buffer duration in ms is below kMinNormalCaptureBufferSizeMs.
initFastCapture = (mFrameCount * 1000) / mSampleRate < kMinNormalCaptureBufferSizeMs;
ALOGV("%p kUseFastCapture = Static, (%lld * 1000) / %u vs %u, initFastCapture = %d",
this, (long long)mFrameCount, mSampleRate, kMinNormalCaptureBufferSizeMs,
initFastCapture);
break;
// case FastCapture_Dynamic:
}
if (initFastCapture) {
// create a Pipe for FastCapture to write to, and for us and fast tracks to read from
NBAIO_Format format = mInputSource->format();
// quadruple-buffering of 20 ms each; this ensures we can sleep for 20ms in RecordThread
size_t pipeFramesP2 = roundup(4 * FMS_20 * mSampleRate / 1000);
size_t pipeSize = pipeFramesP2 * Format_frameSize(format);
void *pipeBuffer = nullptr;
const sp<MemoryDealer> roHeap(readOnlyHeap());
sp<IMemory> pipeMemory;
// Any failure along the heap -> allocation -> pointer chain abandons fast capture setup.
if ((roHeap == 0) ||
(pipeMemory = roHeap->allocate(pipeSize)) == 0 ||
(pipeBuffer = pipeMemory->unsecurePointer()) == nullptr) {
ALOGE("not enough memory for pipe buffer size=%zu; "
"roHeap=%p, pipeMemory=%p, pipeBuffer=%p; roHeapSize: %lld",
pipeSize, roHeap.get(), pipeMemory.get(), pipeBuffer,
(long long)kRecordThreadReadOnlyHeapSize);
goto failed;
}
// pipe will be shared directly with fast clients, so clear to avoid leaking old information
memset(pipeBuffer, 0, pipeSize);
Pipe *pipe = new Pipe(pipeFramesP2, format, pipeBuffer);
const NBAIO_Format offers[1] = {format};
size_t numCounterOffers = 0;
ssize_t index = pipe->negotiate(offers, 1, NULL, numCounterOffers);
ALOG_ASSERT(index == 0);
mPipeSink = pipe;
PipeReader *pipeReader = new PipeReader(*pipe);
numCounterOffers = 0;
index = pipeReader->negotiate(offers, 1, NULL, numCounterOffers);
ALOG_ASSERT(index == 0);
mPipeSource = pipeReader;
mPipeFramesP2 = pipeFramesP2;
mPipeMemory = pipeMemory;
// create fast capture
mFastCapture = new FastCapture();
FastCaptureStateQueue *sq = mFastCapture->sq();
#ifdef STATE_QUEUE_DUMP
// FIXME
#endif
// Fill in the initial FastCapture state between sq->begin() and sq->end(), then push it.
FastCaptureState *state = sq->begin();
state->mCblk = NULL;
state->mInputSource = mInputSource.get();
state->mInputSourceGen++;
state->mPipeSink = pipe;
state->mPipeSinkGen++;
state->mFrameCount = mFrameCount;
state->mCommand = FastCaptureState::COLD_IDLE;
// already done in constructor initialization list
//mFastCaptureFutex = 0;
state->mColdFutexAddr = &mFastCaptureFutex;
state->mColdGen++;
state->mDumpState = &mFastCaptureDumpState;
#ifdef TEE_SINK
// FIXME
#endif
mFastCaptureNBLogWriter = audioFlinger->newWriter_l(kFastCaptureLogSize, "FastCapture");
state->mNBLogWriter = mFastCaptureNBLogWriter.get();
sq->end();
sq->push(FastCaptureStateQueue::BLOCK_UNTIL_PUSHED);
// start the fast capture
mFastCapture->run("FastCapture", ANDROID_PRIORITY_URGENT_AUDIO);
pid_t tid = mFastCapture->getTid();
sendPrioConfigEvent(getpid(), tid, kPriorityFastCapture, false /*forApp*/);
stream()->setHalThreadPriority(kPriorityFastCapture);
#ifdef AUDIO_WATCHDOG
// FIXME
#endif
mFastTrackAvail = true;
}
#ifdef TEE_SINK
mTee.set(mInputSource->format(), NBAIO_Tee::TEE_FLAG_INPUT_THREAD);
mTee.setId(std::string("_") + std::to_string(mId) + "_C");
#endif
// Target of the allocation-failure jump above; also reached on the normal path.
failed: ;
// FIXME mNormalSource
}
构造的时候初始化了一些和采集相关的东西,这儿的采集源头是mInputSource。
还记得采集线程是被智能指针管理的,首次引用时会自动调用onFirstRef
// Starts this thread running under the name stored in mThreadName, at PRIORITY_URGENT_AUDIO.
// NOTE(review): presumably invoked when the first strong reference is taken -- confirm
// against RefBase.
void AudioFlinger::RecordThread::onFirstRef()
{
run(mThreadName, PRIORITY_URGENT_AUDIO);
}
对于Android的Thread,调用run后,Thread基类会调用子类的threadLoop,因此接下来就直接看threadLoop:
bool AudioFlinger::RecordThread::threadLoop()
{
...
processConfigEvents_l(); // 这个和播放一样,先处理配置命令
size_t size = mActiveTracks.size(); // 遍历所有的track
if (size == 0) {
standbyIfNotAlreadyInStandby();
// exitPending() can't become true here
releaseWakeLock_l();
ALOGV("RecordThread: loop stopping");
// go to sleep
mWaitWorkCV.wait(mLock);
ALOGV("RecordThread: loop starting");
goto reacquire_wakelock;
}
bool doBroadcast = false;
bool allStopped = true;
for (size_t i = 0; i < size; ) {
activeTrack = mActiveTracks[i];
if (activeTrack->isTerminated()) {
if (activeTrack->isFastTrack()) {
ALOG_ASSERT(fastTrackToRemove == 0);
fastTrackToRemove = activeTrack;
}
removeTrack_l(activeTrack);
mActiveTracks.remove(activeTrack);// 已经结束的,直接移除
size--;
continue;
}
TrackBase::track_state activeTrackState = activeTrack->mState;
switch (activeTrackState) {
case TrackBase::PAUSING:// 暂停的也移除
mActiveTracks.remove(activeTrack);
activeTrack->mState = TrackBase::PAUSED;
doBroadcast = true;
size--;
continue;
case TrackBase::STARTING_1:
sleepUs = 10000;
i++;
allStopped = false;
continue;
case TrackBase::STARTING_2:
doBroadcast = true;
if (mStandby) {
mThreadMetrics.logBeginInterval();
mStandby = false;
}
activeTrack->mState = TrackBase::ACTIVE;
allStopped = false;
break;
case TrackBase::ACTIVE:
allStopped = false;
break;
case TrackBase::IDLE: // cannot be on ActiveTracks if idle
case TrackBase::PAUSED: // cannot be on ActiveTracks if paused
case TrackBase::STOPPED: // cannot be on ActiveTracks if destroyed/terminated
default:
LOG_ALWAYS_FATAL("%s: Unexpected active track state:%d, id:%d, tracks:%zu",
__func__, activeTrackState, activeTrack->id(), size);
}
if (activeTrack->isFastTrack()) {
ALOG_ASSERT(!mFastTrackAvail);
ALOG_ASSERT(fastTrack == 0);
// if the active fast track is silenced either:
// 1) silence the whole capture from fast capture buffer if this is
// the only active track
// 2) invalidate this track: this will cause the client to reconnect and possibly
// be invalidated again until unsilenced
if (activeTrack->isSilenced()) {
if (size > 1) {
activeTrack->invalidate();
ALOG_ASSERT(fastTrackToRemove == 0);
fastTrackToRemove = activeTrack;
removeTrack_l(activeTrack);
mActiveTracks.remove(activeTrack);
size--;
continue;
} else {
silenceFastCapture = true;
}
}
fastTrack = activeTrack;
}
activeTracks.add(activeTrack);
i++;
}
mActiveTracks.updatePowerState(this);
updateMetadata_l();
if (allStopped) {
standbyIfNotAlreadyInStandby();
}
if (doBroadcast) {
mStartStopCond.broadcast();
}
// sleep if there are no active tracks to process
if (activeTracks.isEmpty()) {
if (sleepUs == 0) {
sleepUs = kRecordThreadSleepUs;
}
continue;
}
sleepUs = 0;
lockEffectChains_l(effectChains);
}
接下来就是开始采集
if (mPipeSource != 0) {
size_t framesToRead = min(mRsmpInFramesOA - rear, mRsmpInFramesP2 / 2);
// The audio fifo read() returns OVERRUN on overflow, and advances the read pointer
// to the full buffer point (clearing the overflow condition). Upon OVERRUN error,
// we immediately retry the read() to get data and prevent another overflow.
for (int retries = 0; retries <= 2; ++retries) {
ALOGW_IF(retries > 0, "overrun on read from pipe, retry #%d", retries);
framesRead = mPipeSource->read((uint8_t*)mRsmpInBuffer + rear * mFrameSize, // 从FastCapture写入的pipe中读取
framesToRead);
if (framesRead != OVERRUN) break;
}
const ssize_t availableToRead = mPipeSource->availableToRead();
if (availableToRead >= 0) {
// PipeSource is the primary clock. It is up to the AudioRecord client to keep up.
LOG_ALWAYS_FATAL_IF((size_t)availableToRead > mPipeFramesP2,
"more frames to read than fifo size, %zd > %zu",
availableToRead, mPipeFramesP2);
const size_t pipeFramesFree = mPipeFramesP2 - availableToRead;
const size_t sleepFrames = min(pipeFramesFree, mRsmpInFramesP2) / 2;
ALOGVV("mPipeFramesP2:%zu mRsmpInFramesP2:%zu sleepFrames:%zu availableToRead:%zd",
mPipeFramesP2, mRsmpInFramesP2, sleepFrames, availableToRead);
sleepUs = (sleepFrames * 1000000LL) / mSampleRate;
}
if (framesRead < 0) {
status_t status = (status_t) framesRead;
switch (status) {
case OVERRUN:
ALOGW("overrun on read from pipe");
framesRead = 0;
break;
case NEGOTIATE:
ALOGE("re-negotiation is needed");
framesRead = -1; // Will cause an attempt to recover.
break;
default:
ALOGE("unknown error %d on read from pipe", status);
break;
}
}
// otherwise use the HAL / AudioStreamIn directly
} else {
ATRACE_BEGIN("read");
size_t bytesRead;
status_t result = mSource->read(
(uint8_t*)mRsmpInBuffer + rear * mFrameSize, mBufferSize, &bytesRead); // 通过hal 采集
ATRACE_END();
if (result < 0) {
framesRead = result;
} else {
framesRead = bytesRead / mFrameSize;
}
}
// 接下来就是copy 采集音频给mActiveTracks
size = activeTracks.size();
// loop over each active track
for (size_t i = 0; i < size; i++) {
activeTrack = activeTracks[i];
// skip fast tracks, as those are handled directly by FastCapture
if (activeTrack->isFastTrack()) {
continue;
}
// TODO: This code probably should be moved to RecordTrack.
// TODO: Update the activeTrack buffer converter in case of reconfigure.
enum {
OVERRUN_UNKNOWN,
OVERRUN_TRUE,
OVERRUN_FALSE
} overrun = OVERRUN_UNKNOWN;
// loop over getNextBuffer to handle circular sink
for (;;) {
activeTrack->mSink.frameCount = ~0;
status_t status = activeTrack->getNextBuffer(&activeTrack->mSink);
size_t framesOut = activeTrack->mSink.frameCount;
LOG_ALWAYS_FATAL_IF((status == OK) != (framesOut > 0));
// check available frames and handle overrun conditions
// if the record track isn't draining fast enough.
bool hasOverrun;
size_t framesIn;
activeTrack->mResamplerBufferProvider->sync(&framesIn, &hasOverrun);
if (hasOverrun) {
overrun = OVERRUN_TRUE;
}
if (framesOut == 0 || framesIn == 0) {
break;
}
// Don't allow framesOut to be larger than what is possible with resampling
// from framesIn.
// This isn't strictly necessary but helps limit buffer resizing in
// RecordBufferConverter. TODO: remove when no longer needed.
framesOut = min(framesOut,
destinationFramesPossible(
framesIn, mSampleRate, activeTrack->mSampleRate));
if (activeTrack->isDirect()) {
// No RecordBufferConverter used for direct streams. Pass
// straight from RecordThread buffer to RecordTrack buffer.
AudioBufferProvider::Buffer buffer;
buffer.frameCount = framesOut;
status_t status = activeTrack->mResamplerBufferProvider->getNextBuffer(&buffer);
if (status == OK && buffer.frameCount != 0) {
ALOGV_IF(buffer.frameCount != framesOut,
"%s() read less than expected (%zu vs %zu)",
__func__, buffer.frameCount, framesOut);
framesOut = buffer.frameCount;
memcpy(activeTrack->mSink.raw, buffer.raw, buffer.frameCount * mFrameSize);
activeTrack->mResamplerBufferProvider->releaseBuffer(&buffer);
} else {
framesOut = 0;
ALOGE("%s() cannot fill request, status: %d, frameCount: %zu",
__func__, status, buffer.frameCount);
}
} else {
// process frames from the RecordThread buffer provider to the RecordTrack
// buffer
framesOut = activeTrack->mRecordBufferConverter->convert(
activeTrack->mSink.raw,
activeTrack->mResamplerBufferProvider,
framesOut);
}
if (framesOut > 0 && (overrun == OVERRUN_UNKNOWN)) {
overrun = OVERRUN_FALSE;
}
if (activeTrack->mFramesToDrop == 0) {
if (framesOut > 0) {
activeTrack->mSink.frameCount = framesOut;
// Sanitize before releasing if the track has no access to the source data
// An idle UID receives silence from non virtual devices until active
if (activeTrack->isSilenced()) {
memset(activeTrack->mSink.raw, 0, framesOut * activeTrack->frameSize());
}
activeTrack->releaseBuffer(&activeTrack->mSink);
}
} else {
// FIXME could do a partial drop of framesOut
if (activeTrack->mFramesToDrop > 0) {
activeTrack->mFramesToDrop -= (ssize_t)framesOut;
if (activeTrack->mFramesToDrop <= 0) {
activeTrack->clearSyncStartEvent();
}
} else {
activeTrack->mFramesToDrop += framesOut;
if (activeTrack->mFramesToDrop >= 0 || activeTrack->mSyncStartEvent == 0 ||
activeTrack->mSyncStartEvent->isCancelled()) {
ALOGW("Synced record %s, session %d, trigger session %d",
(activeTrack->mFramesToDrop >= 0) ? "timed out" : "cancelled",
activeTrack->sessionId(),
(activeTrack->mSyncStartEvent != 0) ?
activeTrack->mSyncStartEvent->triggerSession() :
AUDIO_SESSION_NONE);
activeTrack->clearSyncStartEvent();
}
}
}
if (framesOut == 0) {
break;
}
}
switch (overrun) {
case OVERRUN_TRUE:
// client isn't retrieving buffers fast enough
if (!activeTrack->setOverflow()) {
nsecs_t now = systemTime();
// FIXME should lastWarning per track?
if ((now - lastWarning) > kWarningThrottleNs) {
ALOGW("RecordThread: buffer overflow");
lastWarning = now;
}
}
break;
case OVERRUN_FALSE:
activeTrack->clearOverflow();
break;
case OVERRUN_UNKNOWN:
break;
}
// update frame information and push timestamp out
activeTrack->updateTrackFrameInfo(
activeTrack->mServerProxy->framesReleased(),
mTimestamp.mPosition[ExtendedTimestamp::LOCATION_SERVER],
mSampleRate, mTimestamp);
}
// 接下来就是一些回收操作了,执行下面操作的时候就已经退出了loop了
standbyIfNotAlreadyInStandby();
{
Mutex::Autolock _l(mLock);
for (size_t i = 0; i < mTracks.size(); i++) {
sp<RecordTrack> track = mTracks[i];
track->invalidate();
}
mActiveTracks.clear();
mStartStopCond.broadcast();
}
releaseWakeLock();
ALOGV("RecordThread %p exiting", this);..
}
开始采集介绍
调用startRecording可以开启操作:
/**
 * Starts recording, passing the given sync event's type and audio session id to the
 * native layer.
 *
 * @param syncEvent supplies the event type and audio session id for native_start()
 * @throws IllegalStateException if this AudioRecord is not in STATE_INITIALIZED
 */
public void startRecording(MediaSyncEvent syncEvent)
        throws IllegalStateException {
    final boolean initialized = (mState == STATE_INITIALIZED);
    if (!initialized) {
        throw new IllegalStateException("startRecording() called on an "
                + "uninitialized AudioRecord.");
    }

    synchronized (mRecordingStateLock) {
        final int startStatus =
                native_start(syncEvent.getType(), syncEvent.getAudioSessionId());
        if (startStatus != SUCCESS) {
            // Native start failed: leave the recording state untouched.
            return;
        }
        handleFullVolumeRec(true);
        mRecordingState = RECORDSTATE_RECORDING;
    }
}
实现是在native_start里面,这时候调用就会进入jni:
// JNI entry point behind AudioRecord.native_start().
// Looks up the native AudioRecord attached to `thiz`; throws IllegalStateException into Java
// and returns AUDIO_JAVA_ERROR when there is none, otherwise forwards to AudioRecord::start()
// and converts the native status with nativeToJavaStatus().
static jint
android_media_AudioRecord_start(JNIEnv *env, jobject thiz, jint event, jint triggerSession)
{
    const sp<AudioRecord> recorder = getAudioRecord(env, thiz);
    if (recorder == NULL) {
        jniThrowException(env, "java/lang/IllegalStateException", NULL);
        return (jint) AUDIO_JAVA_ERROR;
    }

    const AudioSystem::sync_event_t syncEvent = (AudioSystem::sync_event_t)event;
    const audio_session_t session = (audio_session_t) triggerSession;
    const status_t startStatus = recorder->start(syncEvent, session);
    return nativeToJavaStatus(startStatus);
}
看下AudioRecord的实现,可以猜想到肯定是通过record的binder直接操纵AudioFlinger中的Record对象:
// Starts capture from the client side.
// Returns NO_ERROR right away if the record is already active. Otherwise it flushes the
// client proxy, resets the client-visible position to 0, marks the record active and calls
// start() on mAudioRecord. If the control block is flagged CBLK_INVALID, or that call
// reports DEAD_OBJECT, restoreRecord_l() is used instead. On success the callback thread
// (when one exists) is resumed; on failure mActive is cleared and the error is logged.
// The `defer` object holds a lambda that records a mediametrics item including the final
// `status`. NOTE(review): presumably run when `defer` is destroyed at scope exit -- confirm.
status_t AudioRecord::start(AudioSystem::sync_event_t event, audio_session_t triggerSession)
{
const int64_t beginNs = systemTime();
ALOGV("%s(%d): sync event %d trigger session %d", __func__, mPortId, event, triggerSession);
AutoMutex lock(mLock);
status_t status = NO_ERROR;
mediametrics::Defer defer([&] {
mediametrics::LogItem(mMetricsId)
.set(AMEDIAMETRICS_PROP_CALLERNAME,
mCallerName.empty()
? AMEDIAMETRICS_PROP_CALLERNAME_VALUE_UNKNOWN
: mCallerName.c_str())
.set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_START)
.set(AMEDIAMETRICS_PROP_EXECUTIONTIMENS, (int64_t)(systemTime() - beginNs))
.set(AMEDIAMETRICS_PROP_STATE, stateToString(mActive))
.set(AMEDIAMETRICS_PROP_STATUS, (int32_t)status)
.record(); });
// Already started: nothing to do.
if (mActive) {
return status;
}
// discard data in buffer
const uint32_t framesFlushed = mProxy->flush();
mFramesReadServerOffset -= mFramesRead + framesFlushed;
mFramesRead = 0;
mProxy->clearTimestamp(); // timestamp is invalid until next server push
mPreviousTimestamp.clear();
mTimestampRetrogradePositionReported = false;
mTimestampRetrogradeTimeReported = false;
// reset current position as seen by client to 0
mProxy->setEpoch(mProxy->getEpoch() - mProxy->getPosition());
// force refresh of remaining frames by processAudioBuffer() as last
// read before stop could be partial.
mRefreshRemaining = true;
mNewPosition = mProxy->getPosition() + mUpdatePeriod;
int32_t flags = android_atomic_acquire_load(&mCblk->mFlags);
// we reactivate markers (mMarkerPosition != 0) as the position is reset to 0.
// This is legacy behavior. This is not done in stop() to avoid a race condition
// where the last marker event is issued twice.
mMarkerReached = false;
// mActive is checked by restoreRecord_l
mActive = true;
if (!(flags & CBLK_INVALID)) {
status = mAudioRecord->start(event, triggerSession).transactionError(); // call start() on the server-side record through the binder interface
if (status == DEAD_OBJECT) {
flags |= CBLK_INVALID;
}
}
// Invalid control block (or dead remote object): try to re-create the record.
if (flags & CBLK_INVALID) {
status = restoreRecord_l("start");
}
// Call these directly because we are already holding the lock.
mAudioRecord->setPreferredMicrophoneDirection(mSelectedMicDirection);
mAudioRecord->setPreferredMicrophoneFieldDimension(mSelectedMicFieldDimension);
if (status != NO_ERROR) {
mActive = false;
ALOGE("%s(%d): status %d", __func__, mPortId, status);
mMediaMetrics.markError(status, __FUNCTION__);
} else {
mTracker->recordingStarted(); // report the start event (NOTE(review): presumably lets observers learn that capture began -- confirm)
sp<AudioRecordThread> t = mAudioRecordThread; // the capture callback thread, if any
if (t != 0) {
t->resume();
} else {
// No callback thread: remember the caller's priority and scheduling group, then raise
// the calling thread to ANDROID_PRIORITY_AUDIO.
mPreviousPriority = getpriority(PRIO_PROCESS, 0);
get_sched_policy(0, &mPreviousSchedulingGroup);
androidSetThreadPriority(0, ANDROID_PRIORITY_AUDIO);
}
// we've successfully started, log that time
mMediaMetrics.logStart(systemTime());
}
return status;
}
在继续往下看之前,先看下在回调驱动模式下,采集回调是如何启动并通知到Java的,关键就在于mAudioRecordThread:
bool AudioRecord::AudioRecordThread::threadLoop()
{
{
AutoMutex _l(mMyLock);
if (mPaused) {
// TODO check return value and handle or log
mMyCond.wait(mMyLock);
// caller will check for exitPending()
return true;
}
if (mIgnoreNextPausedInt) {
mIgnoreNextPausedInt = false;
mPausedInt = false;
}
if (mPausedInt) {
if (mPausedNs > 0) {
// TODO check return value and handle or log
(void) mMyCond.waitRelative(mMyLock, mPausedNs);
} else {
// TODO check return value and handle or log
mMyCond.wait(mMyLock);
}
mPausedInt = false;
return true;
}
}
if (exitPending()) {
return false;
}
nsecs_t ns = mReceiver.processAudioBuffer();
switch (ns) {
case 0:
return true;
case NS_INACTIVE:
pauseInternal();
return true;
case NS_NEVER:
return false;
case NS_WHENEVER:
// Event driven: call wake() when callback notifications conditions change.
ns = INT64_MAX;
FALLTHROUGH_INTENDED;
default:
LOG_ALWAYS_FATAL_IF(ns < 0, "%s() returned %lld", __func__, (long long)ns);
pauseInternal(ns);
return true;
}
}
这儿就是调用AudioRecord的processAudioBuffer,接下来看下processAudioBuffer:
// One unit of callback-thread work for this AudioRecord.
// While holding mLock it snapshots state and works out which events are due (overrun,
// marker, new position, new IAudioRecord); it then invokes mCbf for them with the lock
// released. In TRANSFER_CALLBACK mode it additionally obtains captured buffers and hands
// them to mCbf as EVENT_MORE_DATA until mRemainingFrames is consumed or no more data can be
// obtained without blocking.
// Returns 0 to be run again immediately, a positive delay in nanoseconds, or one of
// NS_INACTIVE / NS_NEVER / NS_WHENEVER.
nsecs_t AudioRecord::processAudioBuffer()
{
mLock.lock();
if (mAwaitBoost) {
mAwaitBoost = false;
mLock.unlock();
// Poll with a doubling sleep until the scheduling policy is SCHED_FIFO or SCHED_RR,
// or the tries are exhausted.
static const int32_t kMaxTries = 5;
int32_t tryCounter = kMaxTries;
uint32_t pollUs = 10000;
do {
int policy = sched_getscheduler(0) & ~SCHED_RESET_ON_FORK;
if (policy == SCHED_FIFO || policy == SCHED_RR) {
break;
}
usleep(pollUs);
pollUs <<= 1;
} while (tryCounter-- > 0);
if (tryCounter < 0) {
ALOGE("%s(%d): did not receive expected priority boost on time", __func__, mPortId);
}
// Run again immediately
return 0;
}
// Can only reference mCblk while locked
int32_t flags = android_atomic_and(~CBLK_OVERRUN, &mCblk->mFlags);
// Check for track invalidation
if (flags & CBLK_INVALID) {
(void) restoreRecord_l("processAudioBuffer");
mLock.unlock();
// Run again immediately, but with a new IAudioRecord
return 0;
}
bool active = mActive;
// Manage overrun callback, must be done under lock to avoid race with releaseBuffer()
bool newOverrun = false;
if (flags & CBLK_OVERRUN) {
if (!mInOverrun) {
mInOverrun = true;
newOverrun = true;
}
}
// Get current position of server
Modulo<uint32_t> position(mProxy->getPosition());
// Manage marker callback
bool markerReached = false;
Modulo<uint32_t> markerPosition(mMarkerPosition);
// FIXME fails for wraparound, need 64 bits
if (!mMarkerReached && markerPosition.value() > 0 && position >= markerPosition) {
mMarkerReached = markerReached = true;
}
// Determine the number of new position callback(s) that will be needed, while locked
size_t newPosCount = 0;
Modulo<uint32_t> newPosition(mNewPosition);
uint32_t updatePeriod = mUpdatePeriod;
// FIXME fails for wraparound, need 64 bits
if (updatePeriod > 0 && position >= newPosition) {
newPosCount = ((position - newPosition).value() / updatePeriod) + 1;
mNewPosition += updatePeriod * newPosCount;
}
// Cache other fields that will be needed soon
uint32_t notificationFrames = mNotificationFramesAct;
if (mRefreshRemaining) {
mRefreshRemaining = false;
mRemainingFrames = notificationFrames;
mRetryOnPartialBuffer = false;
}
size_t misalignment = mProxy->getMisalignment();
uint32_t sequence = mSequence;
// These fields don't need to be cached, because they are assigned only by set():
// mTransfer, mCbf, mUserData, mSampleRate, mFrameSize
mLock.unlock();
// perform callbacks while unlocked
if (newOverrun) {
mCbf(EVENT_OVERRUN, mUserData, NULL);
}
if (markerReached) {
mCbf(EVENT_MARKER, mUserData, &markerPosition); // the preset marker position has been reached
}
while (newPosCount > 0) {
size_t temp = newPosition.value(); // FIXME size_t != uint32_t
mCbf(EVENT_NEW_POS, mUserData, &temp); // another update period worth of frames has been captured
newPosition += updatePeriod;
newPosCount--;
}
if (mObservedSequence != sequence) {
mObservedSequence = sequence;
mCbf(EVENT_NEW_IAUDIORECORD, mUserData, NULL);
}
// if inactive, then don't run me again until re-started
if (!active) {
return NS_INACTIVE;
}
// Compute the estimated time until the next timed event (position, markers)
uint32_t minFrames = ~0;
if (!markerReached && position < markerPosition) {
minFrames = (markerPosition - position).value();
}
if (updatePeriod > 0) {
uint32_t remaining = (newPosition - position).value();
if (remaining < minFrames) {
minFrames = remaining;
}
}
// If > 0, poll periodically to recover from a stuck server. A good value is 2.
static const uint32_t kPoll = 0;
if (kPoll > 0 && mTransfer == TRANSFER_CALLBACK && kPoll * notificationFrames < minFrames) {
minFrames = kPoll * notificationFrames;
}
// Convert frame units to time units
nsecs_t ns = NS_WHENEVER;
if (minFrames != (uint32_t) ~0) {
// This "fudge factor" avoids soaking CPU, and compensates for late progress by server
static const nsecs_t kFudgeNs = 10000000LL; // 10 ms
ns = ((minFrames * 1000000000LL) / mSampleRate) + kFudgeNs;
}
// If not supplying data by EVENT_MORE_DATA, then we're done
if (mTransfer != TRANSFER_CALLBACK) {
return ns;
}
// The first obtainBuffer() waits up to `ns` (or forever when no timed event is pending);
// later iterations switch to non-blocking.
struct timespec timeout;
const struct timespec *requested = &ClientProxy::kForever;
if (ns != NS_WHENEVER) {
timeout.tv_sec = ns / 1000000000LL;
timeout.tv_nsec = ns % 1000000000LL;
ALOGV("%s(%d): timeout %ld.%03d",
__func__, mPortId, timeout.tv_sec, (int) timeout.tv_nsec / 1000000);
requested = &timeout;
}
size_t readFrames = 0;
while (mRemainingFrames > 0) {
Buffer audioBuffer;
audioBuffer.frameCount = mRemainingFrames;
size_t nonContig;
status_t err = obtainBuffer(&audioBuffer, requested, NULL, &nonContig); // obtain captured data
LOG_ALWAYS_FATAL_IF((err != NO_ERROR) != (audioBuffer.frameCount == 0),
"%s(%d): obtainBuffer() err=%d frameCount=%zu",
__func__, mPortId, err, audioBuffer.frameCount);
requested = &ClientProxy::kNonBlocking;
size_t avail = audioBuffer.frameCount + nonContig;
ALOGV("%s(%d): obtainBuffer(%u) returned %zu = %zu + %zu err %d",
__func__, mPortId, mRemainingFrames, avail, audioBuffer.frameCount, nonContig, err);
if (err != NO_ERROR) {
// Timeouts, would-block and interruptions end this pass; other errors stop the thread.
if (err == TIMED_OUT || err == WOULD_BLOCK || err == -EINTR) {
break;
}
ALOGE("%s(%d): Error %d obtaining an audio buffer, giving up.",
__func__, mPortId, err);
return NS_NEVER;
}
if (mRetryOnPartialBuffer) {
mRetryOnPartialBuffer = false;
if (avail < mRemainingFrames) {
// Not enough frames yet: return a delay derived from the number of missing frames.
int64_t myns = ((mRemainingFrames - avail) *
1100000000LL) / mSampleRate;
if (ns < 0 || myns < ns) {
ns = myns;
}
return ns;
}
}
size_t reqSize = audioBuffer.size;
mCbf(EVENT_MORE_DATA, mUserData, &audioBuffer); // notify the client that data is available
size_t readSize = audioBuffer.size;
// Validate on returned size
if (ssize_t(readSize) < 0 || readSize > reqSize) {
ALOGE("%s(%d): EVENT_MORE_DATA requested %zu bytes but callback returned %zd bytes",
__func__, mPortId, reqSize, ssize_t(readSize));
return NS_NEVER;
}
if (readSize == 0) {
// The callback is done consuming buffers
// Keep this thread going to handle timed events and
// still try to provide more data in intervals of WAIT_PERIOD_MS
// but don't just loop and block the CPU, so wait
return WAIT_PERIOD_MS * 1000000LL;
}
size_t releasedFrames = readSize / mFrameSize;
audioBuffer.frameCount = releasedFrames;
mRemainingFrames -= releasedFrames;
if (misalignment >= releasedFrames) {
misalignment -= releasedFrames;
} else {
misalignment = 0;
}
releaseBuffer(&audioBuffer);
readFrames += releasedFrames;
// FIXME here is where we would repeat EVENT_MORE_DATA again on same advanced buffer
// if callback doesn't like to accept the full chunk
if (readSize < reqSize) {
continue;
}
// There could be enough non-contiguous frames available to satisfy the remaining request
if (mRemainingFrames <= nonContig) {
continue;
}
}
if (readFrames > 0) {
AutoMutex lock(mLock);
mFramesRead += readFrames;
// mFramesReadTime = systemTime(SYSTEM_TIME_MONOTONIC); // not provided at this time.
}
mRemainingFrames = notificationFrames;
mRetryOnPartialBuffer = true;
// A lot has transpired since ns was calculated, so run again immediately and re-calculate
return 0;
}
这儿的mCbf就是jni的一个回调 recorderCallback:
static void recorderCallback(int event, void* user, void *info) {
audiorecord_callback_cookie *callbackInfo = (audiorecord_callback_cookie *)user;
{
Mutex::Autolock l(sLock);
if (sAudioRecordCallBackCookies.indexOf(callbackInfo) < 0) {
return;
}
callbackInfo->busy = true;
}
switch (event) {
case AudioRecord::EVENT_MARKER: {
JNIEnv *env = AndroidRuntime::getJNIEnv();
if (user != NULL && env != NULL) {
env->CallStaticVoidMethod(
callbackInfo->audioRecord_class,
javaAudioRecordFields.postNativeEventInJava, // 回调java
callbackInfo->audioRecord_ref, event, 0,0, NULL);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
}
}
} break;
case AudioRecord::EVENT_NEW_POS: {
JNIEnv *env = AndroidRuntime::getJNIEnv();
if (user != NULL && env != NULL) {
env->CallStaticVoidMethod(
callbackInfo->audioRecord_class,
javaAudioRecordFields.postNativeEventInJava,
callbackInfo->audioRecord_ref, event, 0,0, NULL);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
}
}
} break;
}
{
Mutex::Autolock l(sLock);
callbackInfo->busy = false;
callbackInfo->cond.broadcast();
}
}
对应的Java方法是
/**
 * Receives an event posted from the native side.
 * Resolves the AudioRecord behind the weak reference; routing-change events are broadcast
 * directly, every other event is sent as a message to the recorder's event handler, if any.
 */
private static void postEventFromNative(Object audiorecord_ref,
        int what, int arg1, int arg2, Object obj) {
    final AudioRecord recorder = (AudioRecord) ((WeakReference) audiorecord_ref).get();
    if (recorder == null) {
        // The referent is gone: nothing to notify.
        return;
    }

    if (what == AudioSystem.NATIVE_EVENT_ROUTING_CHANGE) {
        recorder.broadcastRoutingChange();
        return;
    }

    if (recorder.mEventHandler == null) {
        return;
    }
    recorder.mEventHandler.sendMessage(
            recorder.mEventHandler.obtainMessage(what, arg1, arg2, obj));
}
消息处理的地方是:
/**
 * Dispatches a native event message to the registered position listener.
 * NATIVE_EVENT_MARKER maps to onMarkerReached(), NATIVE_EVENT_NEW_POS maps to
 * onPeriodicNotification(); any other message type is logged as unknown.
 */
public void handleMessage(Message msg) {
    final OnRecordPositionUpdateListener listener;
    synchronized (mPositionListenerLock) {
        // Take the listener reference under the lock, call it outside.
        listener = mAudioRecord.mPositionListener;
    }

    final int what = msg.what;
    if (what == NATIVE_EVENT_MARKER) {
        if (listener != null) {
            listener.onMarkerReached(mAudioRecord);
        }
    } else if (what == NATIVE_EVENT_NEW_POS) {
        if (listener != null) {
            listener.onPeriodicNotification(mAudioRecord);
        }
    } else {
        loge("Unknown native event type: " + msg.what);
    }
}
当前只处理了NATIVE_EVENT_MARKER和NATIVE_EVENT_NEW_POS。也就是只支持提前设置好需要提醒的位置点(setNotificationMarkerPosition)和采集到预先设置的数据量(setPositionNotificationPeriod)。也就是OnRecordPositionUpdateListener 接口定义的回调。
接下来继续看下Record的start:
// Forwards the start request to the RecordThread that owns this track.
// Returns DEAD_OBJECT (after logging a warning) when the owning thread can no longer be
// promoted; otherwise returns whatever RecordThread::start() returns.
status_t AudioFlinger::RecordThread::RecordTrack::start(AudioSystem::sync_event_t event,
                                                        audio_session_t triggerSession)
{
    const sp<ThreadBase> owner = mThread.promote();
    if (owner == 0) {
        ALOGW("%s track %d: thread was destroyed", __func__, portId());
        return DEAD_OBJECT;
    }

    RecordThread *recordThread = (RecordThread *)owner.get();
    return recordThread->start(this, event, triggerSession);
}
这儿没做什么操作,直接调用了RecordThread的start:
// Starts capture for `recordTrack` on this thread.
//
// Sets up (or clears) the track's sync start event according to `event`, then, under mLock,
// adds the track to mActiveTracks in state STARTING_1. For external tracks it calls
// AudioSystem::startInput() with mLock released, and finally moves the track to STARTING_2
// and signals mWaitWorkCV.
//
// Returns:
//   NO_ERROR           on success, or when the track is already in mActiveTracks
//   DEAD_OBJECT        when the track is found invalidated before or after startInput()
//   INVALID_OPERATION  when the track state changed while mLock was released
//   otherwise          the error returned by AudioSystem::startInput()
status_t AudioFlinger::RecordThread::start(RecordThread::RecordTrack* recordTrack,
                                           AudioSystem::sync_event_t event,
                                           audio_session_t triggerSession)
{
    ALOGV("RecordThread::start event %d, triggerSession %d", event, triggerSession);
    sp<ThreadBase> strongMe = this;
    status_t status = NO_ERROR;

    if (event == AudioSystem::SYNC_EVENT_NONE) {
        recordTrack->clearSyncStartEvent();
    } else if (event != AudioSystem::SYNC_EVENT_SAME) {
        recordTrack->mSyncStartEvent = mAudioFlinger->createSyncEvent(event,
                                                                      triggerSession,
                                                                      recordTrack->sessionId(),
                                                                      syncStartEventCallback,
                                                                      recordTrack);
        // Sync event can be cancelled by the trigger session if the track is not in a
        // compatible state in which case we start record immediately
        if (recordTrack->mSyncStartEvent->isCancelled()) {
            recordTrack->clearSyncStartEvent();
        } else {
            // do not wait for the event for more than AudioSystem::kSyncRecordStartTimeOutMs
            recordTrack->mFramesToDrop = -(ssize_t)
                    ((AudioSystem::kSyncRecordStartTimeOutMs * recordTrack->mSampleRate) / 1000);
        }
    }

    {
        // This section is a rendezvous between binder thread executing start() and RecordThread
        AutoMutex lock(mLock);
        if (recordTrack->isInvalid()) {
            recordTrack->clearSyncStartEvent();
            ALOGW("%s track %d: invalidated before startInput", __func__, recordTrack->portId());
            return DEAD_OBJECT;
        }

        // Update the state of a track that is already active.
        if (mActiveTracks.indexOf(recordTrack) >= 0) {
            if (recordTrack->mState == TrackBase::PAUSING) {
                // We haven't stopped yet (moved to PAUSED and not in mActiveTracks)
                // so no need to startInput().
                ALOGV("active record track PAUSING -> ACTIVE");
                recordTrack->mState = TrackBase::ACTIVE;
            } else {
                ALOGV("active record track state %d", recordTrack->mState);
            }
            return status;
        }

        // TODO consider other ways of handling this, such as changing the state to :STARTING and
        //      adding the track to mActiveTracks after returning from AudioSystem::startInput(),
        //      or using a separate command thread
        recordTrack->mState = TrackBase::STARTING_1;
        // Add the track to mActiveTracks, the list of tracks this thread treats as active.
        mActiveTracks.add(recordTrack);

        // The function-level `status` is reused here instead of being redeclared (which
        // would shadow it); it is still NO_ERROR at this point.
        if (recordTrack->isExternalTrack()) {
            // startInput() is called with mLock released, so the track state has to be
            // re-validated after the lock is taken again.
            mLock.unlock();
            status = AudioSystem::startInput(recordTrack->portId());
            mLock.lock();
            if (recordTrack->isInvalid()) {
                recordTrack->clearSyncStartEvent();
                if (status == NO_ERROR && recordTrack->mState == TrackBase::STARTING_1) {
                    recordTrack->mState = TrackBase::STARTING_2;
                    // STARTING_2 forces destroy to call stopInput.
                }
                ALOGW("%s track %d: invalidated after startInput", __func__, recordTrack->portId());
                return DEAD_OBJECT;
            }
            if (recordTrack->mState != TrackBase::STARTING_1) {
                ALOGW("%s(%d): unsynchronized mState:%d change",
                    __func__, recordTrack->id(), recordTrack->mState);
                // Someone else has changed state, let them take over,
                // leave mState in the new state.
                recordTrack->clearSyncStartEvent();
                return INVALID_OPERATION;
            }
            // we're ok, but perhaps startInput has failed
            if (status != NO_ERROR) {
                ALOGW("%s(%d): startInput failed, status %d",
                    __func__, recordTrack->id(), status);
                // We are in ActiveTracks if STARTING_1 and valid, so remove from ActiveTracks,
                // leave in STARTING_1, so destroy() will not call stopInput.
                mActiveTracks.remove(recordTrack);
                recordTrack->clearSyncStartEvent();
                return status;
            }
            sendIoConfigEvent_l(
                AUDIO_CLIENT_STARTED, recordTrack->creatorPid(), recordTrack->portId());
        }

        recordTrack->logBeginInterval(patchSourcesToString(&mPatch)); // log to MediaMetrics

        // Catch up with current buffer indices if thread is already running.
        // This is what makes a new client discard all buffered data.  If the track's mRsmpInFront
        // was initialized to some value closer to the thread's mRsmpInFront, then the track could
        // see previously buffered data before it called start(), but with greater risk of overrun.
        recordTrack->mResamplerBufferProvider->reset();
        if (!recordTrack->isDirect()) {
            // clear any converter state as new data will be discontinuous
            recordTrack->mRecordBufferConverter->reset();
        }
        recordTrack->mState = TrackBase::STARTING_2;
        // signal thread to start
        mWaitWorkCV.broadcast();
        return status;
    }
}
这时候就完成了start操作。总之,start就是把record结构加到了一个列表(mActiveTracks)中,而该列表中的record会接收到采集的音频数据。
停止采集分析
/**
 * Stops recording.
 *
 * @throws IllegalStateException if this AudioRecord is not in the initialized state.
 */
public void stop()
        throws IllegalStateException {
    final boolean initialized = (mState == STATE_INITIALIZED);
    if (!initialized) {
        throw new IllegalStateException("stop() called on an uninitialized AudioRecord.");
    }

    // The native stop and the recording-state update happen under the same lock.
    synchronized (mRecordingStateLock) {
        handleFullVolumeRec(false);
        native_stop();
        mRecordingState = RECORDSTATE_STOPPED;
    }
}
流程照样是从Java到jni再到AudioRecord,到AudioFlinger:
// JNI entry point for stop(): forwards the call to the native AudioRecord returned by
// getAudioRecord(), or throws java.lang.IllegalStateException when none is returned.
static void
android_media_AudioRecord_stop(JNIEnv *env, jobject thiz)
{
    const sp<AudioRecord> recorder = getAudioRecord(env, thiz);
    if (recorder == NULL) {
        jniThrowException(env, "java/lang/IllegalStateException", NULL);
    } else {
        recorder->stop();
        //ALOGV("Called lpRecorder->stop()");
    }
}
继续看下Record里面的stop做的事情,可以猜想到,应该是设置一个标记就可以了:
void AudioFlinger::RecordThread::RecordTrack::stop()
{
sp<ThreadBase> thread = mThread.promote();
if (thread != 0) {
RecordThread *recordThread = (RecordThread *)thread.get();
if (recordThread->stop(this) && isExternalTrack()) {
AudioSystem::stopInput(mPortId);
}
}
}
// Synchronously stops |recordTrack| on this thread.
//
// The track is marked PAUSING and the caller then blocks on mStartStopCond (re-signalling
// mWaitWorkCV each time round) until the track leaves the PAUSING state or is invalidated.
//
// Returns true only when the track ended up in the PAUSED state. Returns false when the
// track is not in mActiveTracks, is already PAUSING, or left PAUSING for any other state.
bool AudioFlinger::RecordThread::stop(RecordThread::RecordTrack* recordTrack) {
    ALOGV("RecordThread::stop");
    AutoMutex _l(mLock);
    // if we're invalid, we can't be on the ActiveTracks.
    if (mActiveTracks.indexOf(recordTrack) < 0 || recordTrack->mState == TrackBase::PAUSING) {
        return false;
    }
    // note that threadLoop may still be processing the track at this point [without lock]
    recordTrack->mState = TrackBase::PAUSING; // the stop request is just this state flag

    // NOTE: Waiting here is important to keep stop synchronous.
    // This is needed for proper patchRecord peer release.
    while (recordTrack->mState == TrackBase::PAUSING && !recordTrack->isInvalid()) {
        mWaitWorkCV.broadcast(); // signal thread to stop
        mStartStopCond.wait(mLock);
    }

    if (recordTrack->mState == TrackBase::PAUSED) { // successful stop
        ALOGV("Record stopped OK");
        return true;
    }

    // don't handle anything - we've been invalidated or restarted and in a different state
    // Use ALOGW, not ALOGW_IF: the _IF variant takes a condition as its first argument, so
    // passing the format string there made the condition always true and turned __func__
    // into the format, silently dropping the track id and state from the message.
    ALOGW("%s(%d): unsynchronized stop, state: %d",
            __func__, recordTrack->id(), recordTrack->mState);
    return false;
}
这时候设置PAUSING标记后,采集线程ThreadLoop就会在遍历record的时候把该record从mActiveTracks中去掉,并把状态改为PAUSED,这样就不会再给该record拷贝数据了;stop()则一直等到状态离开PAUSING才返回。
read 分析
调用方需要通过read来读取音频数据,接下来就看下read流程:
/**
 * Reads audio data into a byte array.
 *
 * @param audioData destination array.
 * @param offsetInBytes index in {@code audioData} at which writing starts.
 * @param sizeInBytes number of bytes requested.
 * @param readMode {@code READ_BLOCKING} or {@code READ_NON_BLOCKING}.
 * @return the value returned by the native read, {@code ERROR_INVALID_OPERATION} if the
 *     object is not initialized or uses {@code ENCODING_PCM_FLOAT}, or
 *     {@code ERROR_BAD_VALUE} if an argument is invalid.
 */
public int read(@NonNull byte[] audioData, int offsetInBytes, int sizeInBytes,
        @ReadMode int readMode) {
    // Note: we allow reads of extended integers into a byte array.
    if (mState != STATE_INITIALIZED) {
        return ERROR_INVALID_OPERATION;
    }
    if (mAudioFormat == AudioFormat.ENCODING_PCM_FLOAT) {
        return ERROR_INVALID_OPERATION;
    }

    final boolean blocking = (readMode == READ_BLOCKING);
    if (!blocking && (readMode != READ_NON_BLOCKING)) {
        Log.e(TAG, "AudioRecord.read() called with invalid blocking mode");
        return ERROR_BAD_VALUE;
    }

    if ((audioData == null) || (offsetInBytes < 0) || (sizeInBytes < 0)) {
        return ERROR_BAD_VALUE;
    }
    final int endOffset = offsetInBytes + sizeInBytes;
    // A negative sum means the int addition overflowed.
    if ((endOffset < 0) || (endOffset > audioData.length)) {
        return ERROR_BAD_VALUE;
    }

    return native_read_in_byte_array(audioData, offsetInBytes, sizeInBytes, blocking);
}
同样Java侧基本没做什么,还是到了native,接下来就看下native_read_in_byte_array:
// Reads recorded audio from the native AudioRecord into the Java array |javaAudioData|,
// starting at element |offsetInSamples| and requesting |sizeInSamples| elements.
//
// Returns the number of array elements read, AUDIO_JAVA_INVALID_OPERATION when no native
// recorder is available, AUDIO_JAVA_BAD_VALUE when the array is null or its elements
// cannot be obtained, or interpretReadSizeError() of the native error otherwise.
template <typename T>
static jint android_media_AudioRecord_readInArray(JNIEnv *env, jobject thiz,
                                                  T javaAudioData,
                                                  jint offsetInSamples, jint sizeInSamples,
                                                  jboolean isReadBlocking) {
    // Look up the native recorder that will supply the new samples.
    const sp<AudioRecord> recorder = getAudioRecord(env, thiz);
    if (recorder == NULL) {
        ALOGE("Unable to retrieve AudioRecord object");
        return (jint)AUDIO_JAVA_INVALID_OPERATION;
    }

    if (javaAudioData == NULL) {
        ALOGE("Invalid Java array to store recorded audio");
        return (jint)AUDIO_JAVA_BAD_VALUE;
    }

    // NOTE: GetPrimitiveArrayCritical() may become an option once the JNI implementation
    // makes it much more efficient. Using it would require preventing the AudioSystem
    // callback from running inside the critical section (e.g. on media server crash).

    // Obtain the destination storage backing the Java array.
    auto *dest = envGetArrayElements(env, javaAudioData, NULL);
    if (dest == NULL) {
        ALOGE("Error retrieving destination for recorded audio data");
        return (jint)AUDIO_JAVA_BAD_VALUE;
    }

    // Pull the new audio data out of the native AudioRecord object.
    const size_t elementSize = sizeof(*dest);
    const size_t sizeInBytes = sizeInSamples * elementSize;
    const bool blocking = (isReadBlocking == JNI_TRUE);
    const ssize_t readSize = recorder->read(dest + offsetInSamples, sizeInBytes, blocking);

    envReleaseArrayElements(env, javaAudioData, dest, 0);

    if (readSize < 0) {
        return interpretReadSizeError(readSize);
    }
    // Convert the byte count back into a count of array elements.
    return (jint)(readSize / elementSize);
}
接下来看下Native AudioRecord的实现,看看它是如何读取共享内存中的数据的:
// Copies recorded audio into |buffer|, whole frames only, until fewer than mFrameSize
// bytes of |userSize| remain or obtaining more data fails.
//
// Returns the number of bytes copied; INVALID_OPERATION when the transfer mode is not
// TRANSFER_SYNC; BAD_VALUE for a size that is negative when viewed as ssize_t or a NULL
// buffer with a non-zero size; or, when nothing was copied, the obtainBuffer() error
// (with TIMED_OUT and -EINTR reported as WOULD_BLOCK).
ssize_t AudioRecord::read(void* buffer, size_t userSize, bool blocking)
{
    if (mTransfer != TRANSFER_SYNC) {
        return INVALID_OPERATION;
    }

    const bool sizeIsNegative = ssize_t(userSize) < 0;
    const bool bufferMissing = (buffer == NULL && userSize != 0);
    if (sizeIsNegative || bufferMissing) {
        // Validation. user is most-likely passing an error code, and it would
        // make the return value ambiguous (actualSize vs error).
        ALOGE("%s(%d) (buffer=%p, size=%zu (%zu)",
                __func__, mPortId, buffer, userSize, userSize);
        return BAD_VALUE;
    }

    const struct timespec *waitPolicy;
    if (blocking) {
        waitPolicy = &ClientProxy::kForever;
    } else {
        waitPolicy = &ClientProxy::kNonBlocking;
    }

    ssize_t totalBytes = 0;
    char *writeCursor = (char *) buffer;
    Buffer audioBuffer;

    while (userSize >= mFrameSize) {
        audioBuffer.frameCount = userSize / mFrameSize;

        // Obtain the next chunk of recorded data from the shared buffer.
        status_t err = obtainBuffer(&audioBuffer, waitPolicy);
        if (err < 0) {
            if (totalBytes > 0) {
                // Report the partial read rather than the error.
                break;
            }
            if (err == TIMED_OUT || err == -EINTR) {
                err = WOULD_BLOCK;
            }
            return ssize_t(err);
        }

        const size_t chunkBytes = audioBuffer.size;
        memcpy(writeCursor, audioBuffer.i8, chunkBytes);
        writeCursor += chunkBytes;
        userSize -= chunkBytes;
        totalBytes += chunkBytes;

        releaseBuffer(&audioBuffer);
    }

    if (totalBytes > 0) {
        mFramesRead += totalBytes / mFrameSize;
        // mFramesReadTime = systemTime(SYSTEM_TIME_MONOTONIC); // not provided at this time.
    }
    return totalBytes;
}
可以继续看下obtainBuffer如何读取到共享内存的数据:
// Translates |waitCount| into a timeout and forwards to the timespec-based obtainBuffer().
//
//   waitCount == -1 : ClientProxy::kForever
//   waitCount ==  0 : ClientProxy::kNonBlocking
//   waitCount  >  0 : WAIT_PERIOD_MS * waitCount milliseconds
//   anything else   : logged as invalid and forwarded as a NULL timeout
//
// Returns BAD_VALUE for a NULL |audioBuffer| and INVALID_OPERATION when the transfer mode
// is not TRANSFER_OBTAIN; in both cases *nonContig (if provided) is set to 0.
status_t AudioRecord::obtainBuffer(Buffer* audioBuffer, int32_t waitCount, size_t *nonContig)
{
    // Early failures report zero non-contiguous frames when the caller asked for them.
    const auto clearNonContig = [nonContig]() {
        if (nonContig != NULL) {
            *nonContig = 0;
        }
    };

    if (audioBuffer == NULL) {
        clearNonContig();
        return BAD_VALUE;
    }
    if (mTransfer != TRANSFER_OBTAIN) {
        audioBuffer->frameCount = 0;
        audioBuffer->size = 0;
        audioBuffer->raw = NULL;
        clearNonContig();
        return INVALID_OPERATION;
    }

    struct timespec timeout;
    const struct timespec *requested = NULL;
    if (waitCount > 0) {
        const time_t ms = WAIT_PERIOD_MS * (time_t) waitCount;
        timeout.tv_sec = ms / 1000;
        timeout.tv_nsec = (long) (ms % 1000) * 1000000;
        requested = &timeout;
    } else if (waitCount == 0) {
        requested = &ClientProxy::kNonBlocking;
    } else if (waitCount == -1) {
        requested = &ClientProxy::kForever;
    } else {
        ALOGE("%s(%d): invalid waitCount %d", __func__, mPortId, waitCount);
        // requested stays NULL
    }
    return obtainBuffer(audioBuffer, requested, NULL /*elapsed*/, nonContig);
}
这儿没做什么,继续看内部调用的obtainBuffer:
// Obtains a buffer from the client proxy, re-creating the record when the proxy reports
// DEAD_OBJECT. The loop repeats while the proxy keeps returning DEAD_OBJECT and the
// kMaxTries retry budget is not used up.
//
// On return |audioBuffer| is filled from the proxy buffer (frame count, byte size, raw
// pointer, sequence) and *nonContig, if provided, receives the proxy's non-contiguous count.
status_t AudioRecord::obtainBuffer(Buffer* audioBuffer, const struct timespec *requested,
        struct timespec *elapsed, size_t *nonContig)
{
    // previous and new IAudioRecord sequence numbers are used to detect track re-creation
    uint32_t lastSequence = 0;

    Proxy::Buffer proxyBuffer;
    status_t result = NO_ERROR;

    static const int32_t kMaxTries = 5;
    int32_t triesLeft = kMaxTries;

    do {
        // obtainBuffer() is called with mutex unlocked, so keep extra references to these
        // fields to keep them from going away if another thread re-creates the track during
        // obtainBuffer()
        sp<AudioRecordClientProxy> proxyRef;
        sp<IMemory> cblkMemoryRef;
        sp<IMemory> bufferMemoryRef;

        {
            // start of lock scope
            AutoMutex lock(mLock);

            const uint32_t currentSequence = mSequence;
            // did previous obtainBuffer() fail due to media server death or voluntary
            // invalidation? If so re-create track, unless someone else has already done so
            if (result == DEAD_OBJECT && currentSequence == lastSequence) {
                result = restoreRecord_l("obtainBuffer");
                if (result != NO_ERROR) {
                    proxyBuffer.mFrameCount = 0;
                    proxyBuffer.mRaw = NULL;
                    proxyBuffer.mNonContig = 0;
                    break;
                }
            }
            lastSequence = currentSequence;

            // Keep the extra references
            proxyRef = mProxy;
            cblkMemoryRef = mCblkMemory;
            bufferMemoryRef = mBufferMemory;

            // Non-blocking if track is stopped
            if (!mActive) {
                requested = &ClientProxy::kNonBlocking;
            }
        } // end of lock scope

        proxyBuffer.mFrameCount = audioBuffer->frameCount;
        // FIXME starts the requested timeout and elapsed over from scratch
        result = proxyRef->obtainBuffer(&proxyBuffer, requested, elapsed); // fetch the data
    } while ((result == DEAD_OBJECT) && (triesLeft-- > 0));

    audioBuffer->frameCount = proxyBuffer.mFrameCount;
    audioBuffer->size = proxyBuffer.mFrameCount * mFrameSize;
    audioBuffer->raw = proxyBuffer.mRaw;
    audioBuffer->sequence = lastSequence;
    if (nonContig != NULL) {
        *nonContig = proxyBuffer.mNonContig;
    }
    return result;
}
这儿的proxy是AudioRecordClientProxy,在创建AudioFlinger的Record的时候创建的,可以再回顾下相关的位置:
// NOTE(review): excerpt only. The opening of the retry loop, the declarations of the locals
// used here (record, status, input, output, remainingAttempts, cblk, iMemPointer, epoch)
// and the `exit` label are outside the quoted code.
// Flow: create the record through AudioFlinger (retrying on FAILED_TRANSACTION), map the
// returned control block and buffers, then publish them to the AudioRecord members and
// build the client proxy from them.
record = audioFlinger->createRecord(input, output, &status); // create the record
if (status == NO_ERROR) {
break;
}
if (status != FAILED_TRANSACTION || --remainingAttempts <= 0) {
ALOGE("%s(%d): AudioFlinger could not create record track, status: %d",
__func__, mPortId, status);
goto exit;
}
// FAILED_TRANSACTION happens under very specific conditions causing a state mismatch
// between audio policy manager and audio flinger during the input stream open sequence
// and can be recovered by retrying.
// Leave time for race condition to clear before retrying and randomize delay
// to reduce the probability of concurrent retries in locked steps.
usleep((20 + rand() % 30) * 10000);
} while (1);
ALOG_ASSERT(record != 0);
// AudioFlinger now owns the reference to the I/O handle,
// so we are no longer responsible for releasing it.
mAwaitBoost = false;
if (output.flags & AUDIO_INPUT_FLAG_FAST) {
ALOGI("%s(%d): AUDIO_INPUT_FLAG_FAST successful; frameCount %zu -> %zu",
__func__, mPortId,
mReqFrameCount, output.frameCount);
mAwaitBoost = true;
}
mFlags = output.flags;
mRoutedDeviceId = output.selectedDeviceId;
mSessionId = output.sessionId;
mSampleRate = output.sampleRate;
if (output.cblk == 0) {
ALOGE("%s(%d): Could not get control block", __func__, mPortId);
status = NO_INIT;
goto exit;
}
// TODO: Using unsecurePointer() has some associated security pitfalls
// (see declaration for details).
// Either document why it is safe in this case or address the
// issue (e.g. by copying).
iMemPointer = output.cblk ->unsecurePointer(); // pointer to the shared-memory control block
if (iMemPointer == NULL) {
ALOGE("%s(%d): Could not get control block pointer", __func__, mPortId);
status = NO_INIT;
goto exit;
}
cblk = static_cast<audio_track_cblk_t*>(iMemPointer);
// Starting address of buffers in shared memory.
// The buffers are either immediately after the control block,
// or in a separate area at discretion of server.
void *buffers;
if (output.buffers == 0) {
buffers = cblk + 1;
} else {
// TODO: Using unsecurePointer() has some associated security pitfalls
// (see declaration for details).
// Either document why it is safe in this case or address the
// issue (e.g. by copying).
buffers = output.buffers->unsecurePointer();
if (buffers == NULL) {
ALOGE("%s(%d): Could not get buffer pointer", __func__, mPortId);
status = NO_INIT;
goto exit;
}
}
// invariant that mAudioRecord != 0 is true only after set() returns successfully
if (mAudioRecord != 0) {
IInterface::asBinder(mAudioRecord)->unlinkToDeath(mDeathNotifier, this);
mDeathNotifier.clear();
}
mAudioRecord = record;
mCblkMemory = output.cblk;
mBufferMemory = output.buffers;
IPCThreadState::self()->flushCommands();
mCblk = cblk;
// note that output.frameCount is the (possibly revised) value of mReqFrameCount
if (output.frameCount < mReqFrameCount || (mReqFrameCount == 0 && output.frameCount == 0)) {
ALOGW("%s(%d): Requested frameCount %zu but received frameCount %zu",
__func__, output.portId,
mReqFrameCount, output.frameCount);
}
// Make sure that application is notified with sufficient margin before overrun.
// The computation is done on server side.
if (mNotificationFramesReq > 0 && output.notificationFrameCount != mNotificationFramesReq) {
ALOGW("%s(%d): Server adjusted notificationFrames from %u to %zu for frameCount %zu",
__func__, output.portId,
mNotificationFramesReq, output.notificationFrameCount, output.frameCount);
}
mNotificationFramesAct = (uint32_t)output.notificationFrameCount;
//mInput != input includes the case where mInput == AUDIO_IO_HANDLE_NONE for first creation
if (mDeviceCallback != 0) {
if (mInput != AUDIO_IO_HANDLE_NONE) {
AudioSystem::removeAudioDeviceCallback(this, mInput, mPortId);
}
AudioSystem::addAudioDeviceCallback(this, output.inputId, output.portId);
}
mPortId = output.portId;
// We retain a copy of the I/O handle, but don't own the reference
mInput = output.inputId;
mRefreshRemaining = true;
mFrameCount = output.frameCount;
// If IAudioRecord is re-created, don't let the requested frameCount
// decrease. This can confuse clients that cache frameCount().
if (mFrameCount > mReqFrameCount) {
mReqFrameCount = mFrameCount;
}
// update proxy
mProxy = new AudioRecordClientProxy(cblk, buffers, mFrameCount, mFrameSize); // build the client proxy on top of the shared control block and buffers
mProxy->setEpoch(epoch);
mProxy->setMinimum(mNotificationFramesAct);
继续看下proxy的obtainBuffer:
// Obtains the next contiguous chunk of frames from the shared streaming buffer.
//
// For an output proxy (mIsOut) the chunk is space available to write; otherwise it is data
// available to read. On entry buffer->mFrameCount is the maximum number of frames wanted
// (must be non-zero). On success buffer->mFrameCount and buffer->mRaw describe the first
// contiguous chunk and buffer->mNonContig the remaining available frames beyond it.
//
// requested: NULL or a zero timespec => do not wait; tv_sec == INT_MAX => wait with no
//            timeout; anything else => finite timeout.
// elapsed:   if non-NULL, receives the total time spent waiting.
//
// Returns NO_ERROR on success. Otherwise returns NO_INIT, DEAD_OBJECT, NOT_ENOUGH_DATA,
// -EINTR, WOULD_BLOCK, TIMED_OUT, or an unexpected errno from the futex wait, and the
// buffer fields are cleared.
__attribute__((no_sanitize("integer")))
status_t ClientProxy::obtainBuffer(Buffer* buffer, const struct timespec *requested,
        struct timespec *elapsed)
{
    LOG_ALWAYS_FATAL_IF(buffer == NULL || buffer->mFrameCount == 0,
            "%s: null or zero frame buffer, buffer:%p", __func__, buffer);
    struct timespec total;          // total elapsed time spent waiting
    total.tv_sec = 0;
    total.tv_nsec = 0;
    bool measure = elapsed != NULL; // whether to measure total elapsed time spent waiting

    status_t status;
    enum {
        TIMEOUT_ZERO,       // requested == NULL || *requested == 0
        TIMEOUT_INFINITE,   // *requested == infinity
        TIMEOUT_FINITE,     // 0 < *requested < infinity
        TIMEOUT_CONTINUE,   // additional chances after TIMEOUT_FINITE
    } timeout;
    if (requested == NULL) {
        timeout = TIMEOUT_ZERO;
    } else if (requested->tv_sec == 0 && requested->tv_nsec == 0) {
        timeout = TIMEOUT_ZERO;
    } else if (requested->tv_sec == INT_MAX) {
        timeout = TIMEOUT_INFINITE;
    } else {
        timeout = TIMEOUT_FINITE;
        if (requested->tv_sec > 0 || requested->tv_nsec >= MEASURE_NS) {
            measure = true;
        }
    }

    struct timespec before;
    bool beforeIsValid = false;
    audio_track_cblk_t* cblk = mCblk;
    bool ignoreInitialPendingInterrupt = true;
    // check for shared memory corruption
    if (mIsShutdown) {
        status = NO_INIT;
        goto end;
    }
    for (;;) {
        int32_t flags = android_atomic_and(~CBLK_INTERRUPT, &cblk->mFlags);
        // check for track invalidation by server, or server death detection
        if (flags & CBLK_INVALID) {
            ALOGV("Track invalidated");
            status = DEAD_OBJECT;
            goto end;
        }
        if (flags & CBLK_DISABLED) {
            ALOGV("Track disabled");
            status = NOT_ENOUGH_DATA;
            goto end;
        }
        // check for obtainBuffer interrupted by client
        if (!ignoreInitialPendingInterrupt && (flags & CBLK_INTERRUPT)) {
            ALOGV("obtainBuffer() interrupted by client");
            status = -EINTR;
            goto end;
        }
        ignoreInitialPendingInterrupt = false;
        // compute number of frames available to write (AudioTrack) or read (AudioRecord)
        int32_t front;  // head of the ring buffer (read position)
        int32_t rear;   // tail of the ring buffer (write position)
        if (mIsOut) {
            // The barrier following the read of mFront is probably redundant.
            // We're about to perform a conditional branch based on 'filled',
            // which will force the processor to observe the read of mFront
            // prior to allowing data writes starting at mRaw.
            // However, the processor may support speculative execution,
            // and be unable to undo speculative writes into shared memory.
            // The barrier will prevent such speculative execution.
            front = android_atomic_acquire_load(&cblk->u.mStreaming.mFront);
            rear = cblk->u.mStreaming.mRear;
        } else {
            // On the other hand, this barrier is required.
            rear = android_atomic_acquire_load(&cblk->u.mStreaming.mRear);
            front = cblk->u.mStreaming.mFront;
        }
        // write to rear, read from front
        ssize_t filled = audio_utils::safe_sub_overflow(rear, front);
        // pipe should not be overfull
        if (!(0 <= filled && (size_t) filled <= mFrameCount)) {
            if (mIsOut) {
                ALOGE("Shared memory control block is corrupt (filled=%zd, mFrameCount=%zu); "
                        "shutting down", filled, mFrameCount);
                mIsShutdown = true;
                status = NO_INIT;
                goto end;
            }
            // for input, sync up on overrun
            filled = 0;
            cblk->u.mStreaming.mFront = rear;
            (void) android_atomic_or(CBLK_OVERRUN, &cblk->mFlags);
        }
        // Don't allow filling pipe beyond the user settable size.
        // The calculation for avail can go negative if the buffer size
        // is suddenly dropped below the amount already in the buffer.
        // So use a signed calculation to prevent a numeric overflow abort.
        ssize_t adjustableSize = (ssize_t) getBufferSizeInFrames();
        ssize_t avail = (mIsOut) ? adjustableSize - filled : filled;
        if (avail < 0) {
            avail = 0;
        } else if (avail > 0) {
            // 'avail' may be non-contiguous, so return only the first contiguous chunk
            size_t part1;
            if (mIsOut) {
                rear &= mFrameCountP2 - 1;
                part1 = mFrameCountP2 - rear;
            } else {
                front &= mFrameCountP2 - 1;
                part1 = mFrameCountP2 - front;
            }
            if (part1 > (size_t)avail) {
                part1 = avail;
            }
            if (part1 > buffer->mFrameCount) {
                part1 = buffer->mFrameCount;
            }
            buffer->mFrameCount = part1;
            // Hand back an address inside the shared buffer itself; no data is copied here.
            buffer->mRaw = part1 > 0 ?
                    &((char *) mBuffers)[(mIsOut ? rear : front) * mFrameSize] : NULL;
            buffer->mNonContig = avail - part1;
            mUnreleased = part1;
            status = NO_ERROR;
            break;
        }
        struct timespec remaining;
        const struct timespec *ts;
        switch (timeout) {
        case TIMEOUT_ZERO:
            status = WOULD_BLOCK;
            goto end;
        case TIMEOUT_INFINITE:
            ts = NULL;
            break;
        case TIMEOUT_FINITE:
            timeout = TIMEOUT_CONTINUE;
            if (MAX_SEC == 0) {
                ts = requested;
                break;
            }
            FALLTHROUGH_INTENDED;
        case TIMEOUT_CONTINUE:
            // FIXME we do not retry if requested < 10ms? needs documentation on this state machine
            if (!measure || requested->tv_sec < total.tv_sec ||
                    (requested->tv_sec == total.tv_sec && requested->tv_nsec <= total.tv_nsec)) {
                status = TIMED_OUT;
                goto end;
            }
            // remaining = requested - total. The check above guarantees requested > total,
            // so after a nanosecond borrow tv_sec stays non-negative.
            remaining.tv_sec = requested->tv_sec - total.tv_sec;
            if ((remaining.tv_nsec = requested->tv_nsec - total.tv_nsec) < 0) {
                // Borrow one second to make tv_nsec non-negative. The borrow must be
                // subtracted from tv_sec; adding it would lengthen the wait by two seconds.
                remaining.tv_nsec += 1000000000;
                remaining.tv_sec--;
            }
            if (0 < MAX_SEC && MAX_SEC < remaining.tv_sec) {
                remaining.tv_sec = MAX_SEC;
                remaining.tv_nsec = 0;
            }
            ts = &remaining;
            break;
        default:
            LOG_ALWAYS_FATAL("obtainBuffer() timeout=%d", timeout);
            ts = NULL;
            break;
        }
        int32_t old = android_atomic_and(~CBLK_FUTEX_WAKE, &cblk->mFutex);
        if (!(old & CBLK_FUTEX_WAKE)) {
            if (measure && !beforeIsValid) {
                clock_gettime(CLOCK_MONOTONIC, &before);
                beforeIsValid = true;
            }
            errno = 0;
            (void) syscall(__NR_futex, &cblk->mFutex,
                    mClientInServer ? FUTEX_WAIT_PRIVATE : FUTEX_WAIT, old & ~CBLK_FUTEX_WAKE, ts);
            status_t error = errno; // clock_gettime can affect errno
            // update total elapsed time spent waiting
            if (measure) {
                struct timespec after;
                clock_gettime(CLOCK_MONOTONIC, &after);
                total.tv_sec += after.tv_sec - before.tv_sec;
                // Use auto instead of long to avoid the google-runtime-int warning.
                auto deltaNs = after.tv_nsec - before.tv_nsec;
                if (deltaNs < 0) {
                    deltaNs += 1000000000;
                    total.tv_sec--;
                }
                if ((total.tv_nsec += deltaNs) >= 1000000000) {
                    total.tv_nsec -= 1000000000;
                    total.tv_sec++;
                }
                before = after;
                beforeIsValid = true;
            }
            switch (error) {
            case 0:            // normal wakeup by server, or by binderDied()
            case EWOULDBLOCK:  // benign race condition with server
            case EINTR:        // wait was interrupted by signal or other spurious wakeup
            case ETIMEDOUT:    // time-out expired
                // FIXME these error/non-0 status are being dropped
                break;
            default:
                status = error;
                ALOGE("%s unexpected error %s", __func__, strerror(status));
                goto end;
            }
        }
    }

end:
    if (status != NO_ERROR) {
        buffer->mFrameCount = 0;
        buffer->mRaw = NULL;
        buffer->mNonContig = 0;
        mUnreleased = 0;
    }
    if (elapsed != NULL) {
        *elapsed = total;
    }
    if (requested == NULL) {
        requested = &kNonBlocking;
    }
    if (measure) {
        ALOGV("requested %ld.%03ld elapsed %ld.%03ld",
              requested->tv_sec, requested->tv_nsec / 1000000,
              total.tv_sec, total.tv_nsec / 1000000);
    }
    return status;
}
本质上就是共享了一块内存。这块内存分为两部分:一部分记录内存的使用信息,比如环形buffer头和尾的位置等;另一部分存放真正的音频数据。由于整块内存都是共享的,环形buffer的使用情况信息也是共享的,这样客户端就可以直接操作这块内存了。
到了这儿AudioRecord也介绍完了。
网友评论