From 3ec4d216a1fa7bf1d7629360f1bb8a06448e388b Mon Sep 17 00:00:00 2001 From: lifujun <814084764@qq.com> Date: Thu, 2 Apr 2020 10:42:49 +0800 Subject: [PATCH] fix(cache): add cache result callback. --- framework/cacheModule/CacheManager.cpp | 40 +++++------ framework/cacheModule/CacheManager.h | 14 ++-- framework/cacheModule/CacheModule.cpp | 55 ++++++++------- framework/cacheModule/CacheModule.h | 6 +- .../cacheModule/cache/CacheFileRemuxer.cpp | 68 +++++++++++++------ .../cacheModule/cache/CacheFileRemuxer.h | 6 +- mediaPlayer/MediaPlayer.cpp | 15 ++-- 7 files changed, 118 insertions(+), 86 deletions(-) diff --git a/framework/cacheModule/CacheManager.cpp b/framework/cacheModule/CacheManager.cpp index cc940b51c..2392b2a23 100644 --- a/framework/cacheModule/CacheManager.cpp +++ b/framework/cacheModule/CacheManager.cpp @@ -114,11 +114,21 @@ void CacheManager::sendMediaFrame(const unique_ptr &frame, StreamType AF_LOGE("cacheModule error : code = %d , msg = %s ", code, msg.c_str()); mNeedProcessFrame = false; - if (mCacheFailCallback != nullptr) - { + if (mCacheFailCallback != nullptr) { mCacheFailCallback(code, msg); } }); + mCacheModule.setResultCallback([this](bool success) -> void { + if (success) { + if (mCacheSuccessCallback != nullptr) { + mCacheSuccessCallback(); + } + } else { + if (mCacheFailCallback != nullptr) { + mCacheFailCallback(-1, mStopReason); + } + } + }); const CacheRet &startRet = mCacheModule.start(); if (startRet.mCode != CACHE_SUCCESS.mCode) { @@ -135,36 +145,18 @@ void CacheManager::sendMediaFrame(const unique_ptr &frame, StreamType mCacheModule.addFrame(frame, type); } - - void CacheManager::stop(const string &reason) { - mCacheModule.stop(); - CacheModule::CacheStatus status = mCacheModule.getCacheStatus(); mNeedProcessFrame = false; - mCacheModule.reset(); - if (status == CacheModule::CacheStatus::fail) { - if (mCacheFailCallback != nullptr) { - mCacheFailCallback(-1, reason); - } - } + std::unique_lock lock(mStopMutex); + mStopReason = reason; + mCacheModule.stop(); } void CacheManager::complete() { - if (mCacheConfig.mEnable) { - mCacheModule.streamEnd(); - CacheModule::CacheStatus cacheStatus = mCacheModule.getCacheStatus(); - - if (cacheStatus == CacheModule::success) { - if (mCacheSuccessCallback != nullptr) { - mCacheSuccessCallback(); - } - } - - AF_LOGD("eventCallback ==== cacheComplete cacheSuccess is %d", cacheStatus); - } + mCacheModule.streamEnd(); } CacheModule::CacheStatus CacheManager::getCacheStatus() diff --git a/framework/cacheModule/CacheManager.h b/framework/cacheModule/CacheManager.h index d5b9b3064..3c23b0495 100644 --- a/framework/cacheModule/CacheManager.h +++ b/framework/cacheModule/CacheManager.h @@ -31,7 +31,7 @@ class CacheManager { void setCacheConfig(const CacheConfig &config); - void setDescription(const string& description); + void setDescription(const string &description); void setSourceUrl(const string &url); @@ -49,19 +49,23 @@ class CacheManager { static string getCachePath(const string &url, CacheConfig &config); - void setCacheFailCallback(function resultCallback); + void setCacheFailCallback(function resultCallback); + void setCacheSuccessCallback(function resultCallback); - void sendMediaFrame(const std::unique_ptr& frame, StreamType type); + void sendMediaFrame(const std::unique_ptr &frame, StreamType type); private: - bool mNeedProcessFrame = false; + std::mutex mStopMutex{}; + + std::atomic_bool mNeedProcessFrame{false}; + string mStopReason{}; CacheModule mCacheModule; CacheConfig mCacheConfig; string mSourceUrl; string mDescription; ICacheDataSource *mDataSource = nullptr; - function mCacheFailCallback = nullptr; + function mCacheFailCallback = nullptr; function mCacheSuccessCallback = nullptr; }; diff --git a/framework/cacheModule/CacheModule.cpp b/framework/cacheModule/CacheModule.cpp index 1adecb8be..7791ff8a5 100644 --- a/framework/cacheModule/CacheModule.cpp +++ b/framework/cacheModule/CacheModule.cpp @@ -94,6 +94,11 @@ void CacheModule::setErrorCallback(function callback) mErrorCallback = std::move(callback); } +void CacheModule::setResultCallback(function callback) +{ + mResultCallback = std::move(callback); +} + CacheRet CacheModule::start() { { @@ -126,11 +131,32 @@ CacheRet CacheModule::start() mCacheFileRemuxer = new CacheFileRemuxer(cacheTmpPath, mDescription); mCacheFileRemuxer->setStreamMeta(mStreamMetas); mCacheFileRemuxer->setErrorCallback([this](int code, string msg) -> void { - if (mErrorCallback != nullptr) - { + if (mErrorCallback != nullptr) { mErrorCallback(code, msg); } }); + mCacheFileRemuxer->setResultCallback([this](bool success) -> void { + const string &cachePath = mCachePath.getCachePath(); + string cacheTmpFilePath = cachePath + TMP_SUFFIX; + if (success) { + int ret = FileUtils::Rename(cacheTmpFilePath.c_str(), cachePath.c_str()); + + if (ret == 0) { + mCacheRet = CacheStatus::success; + } else { + FileUtils::rmrf(cacheTmpFilePath.c_str()); + mCacheRet = CacheStatus::fail; + } + } else { + // not completion , need delete cached file + FileUtils::rmrf(cacheTmpFilePath.c_str()); + mCacheRet = CacheStatus::fail; + } + + if (mResultCallback != nullptr) { + mResultCallback(success); + } + }); bool prepareSucced = mCacheFileRemuxer->prepare(); if (!prepareSucced) { @@ -181,8 +207,7 @@ void CacheModule::addFrame(const unique_ptr &frame, StreamType type) void CacheModule::streamEnd() { AF_LOGD("---> streamEnd()"); - mEos = true; - stop(); + addFrame(nullptr, StreamType::ST_TYPE_UNKNOWN); } void CacheModule::stop() @@ -199,27 +224,6 @@ void CacheModule::stop() if (mCacheFileRemuxer != nullptr) { mCacheFileRemuxer->interrupt(); mCacheFileRemuxer->stop(); - bool remuxSuc = mCacheFileRemuxer->isRemuxSuccess(); - delete mCacheFileRemuxer; - mCacheFileRemuxer = nullptr; - const string &cachePath = mCachePath.getCachePath(); - string cacheTmpPath = cachePath + TMP_SUFFIX; - - if (mEos && remuxSuc) { - // completion , rename file - int ret = FileUtils::Rename(cacheTmpPath.c_str(), cachePath.c_str()); - - if (ret == 0) { - mCacheRet = CacheStatus::success; - } else { - FileUtils::rmrf(cacheTmpPath.c_str()); - mCacheRet = CacheStatus::fail; - } - } else { - // not completion , need delete cached file - FileUtils::rmrf(cacheTmpPath.c_str()); - mCacheRet = CacheStatus::fail; - } } } } @@ -228,7 +232,6 @@ void CacheModule::reset() { AF_LOGD("---> reset()"); unique_lock lock(mStatusMutex); - mEos = false; mMediaInfoSet = false; mCacheRet = CacheStatus::idle;; mCacheChecker.reset(); diff --git a/framework/cacheModule/CacheModule.h b/framework/cacheModule/CacheModule.h index 99b7c5def..3e0601abc 100644 --- a/framework/cacheModule/CacheModule.h +++ b/framework/cacheModule/CacheModule.h @@ -64,6 +64,8 @@ class CacheModule { void setErrorCallback(function callback); + void setResultCallback(function callback); + CacheRet checkCanBeCached(const string &acturalPlayURL); CacheRet start(); @@ -96,7 +98,6 @@ class CacheModule { private: bool mMediaInfoSet = false; - bool mEos = false; CacheStatus mCacheRet = CacheStatus::idle; mutex mStatusMutex; @@ -109,7 +110,8 @@ class CacheModule { CachePath mCachePath; string mDescription; - function mErrorCallback = nullptr; + function mErrorCallback = nullptr; + function mResultCallback = nullptr; vector mStreamMetas{}; diff --git a/framework/cacheModule/cache/CacheFileRemuxer.cpp b/framework/cacheModule/cache/CacheFileRemuxer.cpp index 69754d192..740f86787 100644 --- a/framework/cacheModule/cache/CacheFileRemuxer.cpp +++ b/framework/cacheModule/cache/CacheFileRemuxer.cpp @@ -44,13 +44,19 @@ CacheFileRemuxer::~CacheFileRemuxer() void CacheFileRemuxer::addFrame(const unique_ptr &frame, StreamType type) { - FrameInfo *info = new FrameInfo(); - info->frame = frame->clone(); - info->type = type; - { - std::unique_lock lock(mQueueMutex); - mFrameInfoQueue.push_back(std::unique_ptr(info)); - mQueueCondition.notify_one(); + if (frame == nullptr) { + mFrameEof = true; + } else { + mFrameEof = false; + + FrameInfo *info = new FrameInfo(); + info->frame = frame->clone(); + info->type = type; + { + std::unique_lock lock(mQueueMutex); + mFrameInfoQueue.push_back(std::unique_ptr(info)); + mQueueCondition.notify_one(); + } } } @@ -128,14 +134,22 @@ int CacheFileRemuxer::muxThreadRun() return -1; } + bool hasError = false; + while (true) { - FrameInfo frameInfo; { std::unique_lock lock(mQueueMutex); if (mFrameInfoQueue.empty()) { + + if (mFrameEof) { + AF_LOGW("muxThreadRun() mFrameEof..."); + break; + } + mQueueCondition.wait_for(lock, std::chrono::milliseconds(10), - [this]() { return this->mInterrupt || this->mWantStop; }); + [this]() { return this->mInterrupt || this->mWantStop || this->mFrameEof; }); + } else { unique_ptr &frameInfo = mFrameInfoQueue.front(); int ret = mMuxer->muxPacket(move(frameInfo->frame)); @@ -146,27 +160,42 @@ int CacheFileRemuxer::muxThreadRun() //no space error . if (ENOSPC == errno) { + hasError = true; sendError(CACHE_ERROR_NO_SPACE); break; } } } } - { - if (mInterrupt || mWantStop) { - AF_LOGW("muxThreadRun() mInterrupt || mWantStop..."); - break; - } + + if (mInterrupt || mWantStop) { + AF_LOGW("muxThreadRun() mInterrupt || mWantStop..."); + break; } } int ret = mMuxer->close(); - if (ret < 0) { AF_LOGW("muxThreadRun() mMuxer close ret = %d ", ret); + hasError = true; sendError(CACHE_ERROR_MUXER_CLOSE); } + if (hasError) { + return -1; + } + + bool muxSuccess = false; + if (mInterrupt || mWantStop) { + muxSuccess = false; + } else if (mFrameEof) { + muxSuccess = true; + } + + if (mResultCallback != nullptr) { + mResultCallback(muxSuccess); + } + AF_LOGD("muxThreadRun() end..."); return -1; } @@ -237,6 +266,11 @@ void CacheFileRemuxer::setErrorCallback(function callback) mErrorCallback = callback; } +void CacheFileRemuxer::setResultCallback(function callback) +{ + mResultCallback = callback; +} + void CacheFileRemuxer::setStreamMeta(const vector &streamMetas) { clearStreamMetas(); @@ -270,7 +304,3 @@ void CacheFileRemuxer::sendError(const CacheRet &ret) } } -bool CacheFileRemuxer::isRemuxSuccess() -{ - return mRemuxSuc; -} diff --git a/framework/cacheModule/cache/CacheFileRemuxer.h b/framework/cacheModule/cache/CacheFileRemuxer.h index 27a171c75..318e7cb20 100644 --- a/framework/cacheModule/cache/CacheFileRemuxer.h +++ b/framework/cacheModule/cache/CacheFileRemuxer.h @@ -46,10 +46,10 @@ class CacheFileRemuxer { void interrupt(); - bool isRemuxSuccess(); - void setErrorCallback(function callback); + void setResultCallback(function callback); + void setStreamMeta(const vector &streamMetas); void clearStreamMetas(); @@ -79,6 +79,7 @@ private : std::atomic_bool mInterrupt{false}; std::atomic_bool mWantStop {false}; + std::atomic_bool mFrameEof{false}; bool mRemuxSuc = true; @@ -91,6 +92,7 @@ private : FileCntl *mDestFileCntl = nullptr; function mErrorCallback = nullptr; + function mResultCallback = nullptr; vector mStreamMetas{}; diff --git a/mediaPlayer/MediaPlayer.cpp b/mediaPlayer/MediaPlayer.cpp index ef95419b3..efe0497d9 100644 --- a/mediaPlayer/MediaPlayer.cpp +++ b/mediaPlayer/MediaPlayer.cpp @@ -225,7 +225,13 @@ namespace Cicada { AF_LOGE("Cache fail : code = %d , msg = %s", code, msg.c_str()); this->eventCallback(MEDIA_PLAYER_EVENT_CACHE_ERROR, msg.c_str(), this); }); - mCacheManager->setCacheSuccessCallback([this]()->void{ + mCacheManager->setCacheSuccessCallback([this]() -> void { + if (IsLoop()) { + //if cache success and want play loop, + // we set loop false to let onCompletion callback deal loop. + CicadaSetLoop(static_cast(mPlayerHandle), false); + } + this->eventCallback(MEDIA_PLAYER_EVENT_CACHE_SUCCESS, nullptr, this); }); ICacheDataSource *cacheDataSource = new PlayerCacheDataSource(mPlayerHandle); @@ -746,13 +752,6 @@ namespace Cicada { #ifdef ENABLE_CACHE_MODULE if (player->mCacheManager != nullptr) { player->mCacheManager->complete(); - CacheModule::CacheStatus cacheStatus = player->mCacheManager->getCacheStatus(); - - if (cacheStatus == CacheModule::CacheStatus::success && player->IsLoop()) { - //if cache success and want play loop, - // we set loop false to let onCompletion callback deal loop. - CicadaSetLoop(static_cast(player->mPlayerHandle), false); - } } #endif }