Skip to content

Commit

Permalink
fix(cache): add cache result callback.
Browse files Browse the repository at this point in the history
  • Loading branch information
I-m-SuperMan authored and skufly committed Apr 2, 2020
1 parent 29e3857 commit 3ec4d21
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 86 deletions.
40 changes: 16 additions & 24 deletions framework/cacheModule/CacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,21 @@ void CacheManager::sendMediaFrame(const unique_ptr<IAFPacket> &frame, StreamType
AF_LOGE("cacheModule error : code = %d , msg = %s ", code, msg.c_str());
mNeedProcessFrame = false;

if (mCacheFailCallback != nullptr)
{
if (mCacheFailCallback != nullptr) {
mCacheFailCallback(code, msg);
}
});
mCacheModule.setResultCallback([this](bool success) -> void {
if (success) {
if (mCacheSuccessCallback != nullptr) {
mCacheSuccessCallback();
}
} else {
if (mCacheFailCallback != nullptr) {
mCacheFailCallback(-1, mStopReason);
}
}
});
const CacheRet &startRet = mCacheModule.start();

if (startRet.mCode != CACHE_SUCCESS.mCode) {
Expand All @@ -135,36 +145,18 @@ void CacheManager::sendMediaFrame(const unique_ptr<IAFPacket> &frame, StreamType
mCacheModule.addFrame(frame, type);
}



void CacheManager::stop(const string &reason)
{
mCacheModule.stop();
CacheModule::CacheStatus status = mCacheModule.getCacheStatus();
mNeedProcessFrame = false;
mCacheModule.reset();

if (status == CacheModule::CacheStatus::fail) {
if (mCacheFailCallback != nullptr) {
mCacheFailCallback(-1, reason);
}
}
std::unique_lock<mutex> lock(mStopMutex);
mStopReason = reason;
mCacheModule.stop();
}

void CacheManager::complete()
{
if (mCacheConfig.mEnable) {
mCacheModule.streamEnd();
CacheModule::CacheStatus cacheStatus = mCacheModule.getCacheStatus();

if (cacheStatus == CacheModule::success) {
if (mCacheSuccessCallback != nullptr) {
mCacheSuccessCallback();
}
}

AF_LOGD("eventCallback ==== cacheComplete cacheSuccess is %d", cacheStatus);
}
mCacheModule.streamEnd();
}

CacheModule::CacheStatus CacheManager::getCacheStatus()
Expand Down
14 changes: 9 additions & 5 deletions framework/cacheModule/CacheManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CacheManager {

void setCacheConfig(const CacheConfig &config);

void setDescription(const string& description);
void setDescription(const string &description);

void setSourceUrl(const string &url);

Expand All @@ -49,19 +49,23 @@ class CacheManager {

static string getCachePath(const string &url, CacheConfig &config);

void setCacheFailCallback(function<void(int,string)> resultCallback);
void setCacheFailCallback(function<void(int, string)> resultCallback);

void setCacheSuccessCallback(function<void()> resultCallback);

void sendMediaFrame(const std::unique_ptr<IAFPacket>& frame, StreamType type);
void sendMediaFrame(const std::unique_ptr<IAFPacket> &frame, StreamType type);

private:
bool mNeedProcessFrame = false;
std::mutex mStopMutex{};

std::atomic_bool mNeedProcessFrame{false};
string mStopReason{};
CacheModule mCacheModule;
CacheConfig mCacheConfig;
string mSourceUrl;
string mDescription;
ICacheDataSource *mDataSource = nullptr;
function<void(int,string)> mCacheFailCallback = nullptr;
function<void(int, string)> mCacheFailCallback = nullptr;
function<void()> mCacheSuccessCallback = nullptr;

};
Expand Down
55 changes: 29 additions & 26 deletions framework/cacheModule/CacheModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ void CacheModule::setErrorCallback(function<void(int, string)> callback)
mErrorCallback = std::move(callback);
}

void CacheModule::setResultCallback(function<void(bool)> callback)
{
mResultCallback = std::move(callback);
}

CacheRet CacheModule::start()
{
{
Expand Down Expand Up @@ -126,11 +131,32 @@ CacheRet CacheModule::start()
mCacheFileRemuxer = new CacheFileRemuxer(cacheTmpPath, mDescription);
mCacheFileRemuxer->setStreamMeta(mStreamMetas);
mCacheFileRemuxer->setErrorCallback([this](int code, string msg) -> void {
if (mErrorCallback != nullptr)
{
if (mErrorCallback != nullptr) {
mErrorCallback(code, msg);
}
});
mCacheFileRemuxer->setResultCallback([this](bool success) -> void {
const string &cachePath = mCachePath.getCachePath();
string cacheTmpFilePath = cachePath + TMP_SUFFIX;
if (success) {
int ret = FileUtils::Rename(cacheTmpFilePath.c_str(), cachePath.c_str());

if (ret == 0) {
mCacheRet = CacheStatus::success;
} else {
FileUtils::rmrf(cacheTmpFilePath.c_str());
mCacheRet = CacheStatus::fail;
}
} else {
// not completion , need delete cached file
FileUtils::rmrf(cacheTmpFilePath.c_str());
mCacheRet = CacheStatus::fail;
}

if (mResultCallback != nullptr) {
mResultCallback(success);
}
});
bool prepareSucced = mCacheFileRemuxer->prepare();

if (!prepareSucced) {
Expand Down Expand Up @@ -181,8 +207,7 @@ void CacheModule::addFrame(const unique_ptr<IAFPacket> &frame, StreamType type)
void CacheModule::streamEnd()
{
AF_LOGD("---> streamEnd()");
mEos = true;
stop();
addFrame(nullptr, StreamType::ST_TYPE_UNKNOWN);
}

void CacheModule::stop()
Expand All @@ -199,27 +224,6 @@ void CacheModule::stop()
if (mCacheFileRemuxer != nullptr) {
mCacheFileRemuxer->interrupt();
mCacheFileRemuxer->stop();
bool remuxSuc = mCacheFileRemuxer->isRemuxSuccess();
delete mCacheFileRemuxer;
mCacheFileRemuxer = nullptr;
const string &cachePath = mCachePath.getCachePath();
string cacheTmpPath = cachePath + TMP_SUFFIX;

if (mEos && remuxSuc) {
// completion , rename file
int ret = FileUtils::Rename(cacheTmpPath.c_str(), cachePath.c_str());

if (ret == 0) {
mCacheRet = CacheStatus::success;
} else {
FileUtils::rmrf(cacheTmpPath.c_str());
mCacheRet = CacheStatus::fail;
}
} else {
// not completion , need delete cached file
FileUtils::rmrf(cacheTmpPath.c_str());
mCacheRet = CacheStatus::fail;
}
}
}
}
Expand All @@ -228,7 +232,6 @@ void CacheModule::reset()
{
AF_LOGD("---> reset()");
unique_lock<mutex> lock(mStatusMutex);
mEos = false;
mMediaInfoSet = false;
mCacheRet = CacheStatus::idle;;
mCacheChecker.reset();
Expand Down
6 changes: 4 additions & 2 deletions framework/cacheModule/CacheModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class CacheModule {

void setErrorCallback(function<void(int, string)> callback);

void setResultCallback(function<void(bool)> callback);

CacheRet checkCanBeCached(const string &acturalPlayURL);

CacheRet start();
Expand Down Expand Up @@ -96,7 +98,6 @@ class CacheModule {
private:

bool mMediaInfoSet = false;
bool mEos = false;
CacheStatus mCacheRet = CacheStatus::idle;

mutex mStatusMutex;
Expand All @@ -109,7 +110,8 @@ class CacheModule {
CachePath mCachePath;
string mDescription;

function<void(int, string)> mErrorCallback = nullptr;
function<void(int, string)> mErrorCallback = nullptr;
function<void(bool)> mResultCallback = nullptr;

vector<Stream_meta*> mStreamMetas{};

Expand Down
68 changes: 49 additions & 19 deletions framework/cacheModule/cache/CacheFileRemuxer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,19 @@ CacheFileRemuxer::~CacheFileRemuxer()

void CacheFileRemuxer::addFrame(const unique_ptr<IAFPacket> &frame, StreamType type)
{
FrameInfo *info = new FrameInfo();
info->frame = frame->clone();
info->type = type;
{
std::unique_lock<mutex> lock(mQueueMutex);
mFrameInfoQueue.push_back(std::unique_ptr<FrameInfo>(info));
mQueueCondition.notify_one();
if (frame == nullptr) {
mFrameEof = true;
} else {
mFrameEof = false;

FrameInfo *info = new FrameInfo();
info->frame = frame->clone();
info->type = type;
{
std::unique_lock<mutex> lock(mQueueMutex);
mFrameInfoQueue.push_back(std::unique_ptr<FrameInfo>(info));
mQueueCondition.notify_one();
}
}
}

Expand Down Expand Up @@ -128,14 +134,22 @@ int CacheFileRemuxer::muxThreadRun()
return -1;
}

bool hasError = false;

while (true) {
FrameInfo frameInfo;
{
std::unique_lock<mutex> lock(mQueueMutex);

if (mFrameInfoQueue.empty()) {

if (mFrameEof) {
AF_LOGW("muxThreadRun() mFrameEof...");
break;
}

mQueueCondition.wait_for(lock, std::chrono::milliseconds(10),
[this]() { return this->mInterrupt || this->mWantStop; });
[this]() { return this->mInterrupt || this->mWantStop || this->mFrameEof; });

} else {
unique_ptr<FrameInfo> &frameInfo = mFrameInfoQueue.front();
int ret = mMuxer->muxPacket(move(frameInfo->frame));
Expand All @@ -146,27 +160,42 @@ int CacheFileRemuxer::muxThreadRun()

//no space error .
if (ENOSPC == errno) {
hasError = true;
sendError(CACHE_ERROR_NO_SPACE);
break;
}
}
}
}
{
if (mInterrupt || mWantStop) {
AF_LOGW("muxThreadRun() mInterrupt || mWantStop...");
break;
}

if (mInterrupt || mWantStop) {
AF_LOGW("muxThreadRun() mInterrupt || mWantStop...");
break;
}
}

int ret = mMuxer->close();

if (ret < 0) {
AF_LOGW("muxThreadRun() mMuxer close ret = %d ", ret);
hasError = true;
sendError(CACHE_ERROR_MUXER_CLOSE);
}

if (hasError) {
return -1;
}

bool muxSuccess = false;
if (mInterrupt || mWantStop) {
muxSuccess = false;
} else if (mFrameEof) {
muxSuccess = true;
}

if (mResultCallback != nullptr) {
mResultCallback(muxSuccess);
}

AF_LOGD("muxThreadRun() end...");
return -1;
}
Expand Down Expand Up @@ -237,6 +266,11 @@ void CacheFileRemuxer::setErrorCallback(function<void(int, string)> callback)
mErrorCallback = callback;
}

void CacheFileRemuxer::setResultCallback(function<void(bool)> callback)
{
mResultCallback = callback;
}

void CacheFileRemuxer::setStreamMeta(const vector<Stream_meta *> &streamMetas)
{
clearStreamMetas();
Expand Down Expand Up @@ -270,7 +304,3 @@ void CacheFileRemuxer::sendError(const CacheRet &ret)
}
}

bool CacheFileRemuxer::isRemuxSuccess()
{
return mRemuxSuc;
}
6 changes: 4 additions & 2 deletions framework/cacheModule/cache/CacheFileRemuxer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class CacheFileRemuxer {

void interrupt();

bool isRemuxSuccess();

void setErrorCallback(function<void(int, string)> callback);

void setResultCallback(function<void(bool)> callback);

void setStreamMeta(const vector<Stream_meta *> &streamMetas);

void clearStreamMetas();
Expand Down Expand Up @@ -79,6 +79,7 @@ private :

std::atomic_bool mInterrupt{false};
std::atomic_bool mWantStop {false};
std::atomic_bool mFrameEof{false};
bool mRemuxSuc = true;


Expand All @@ -91,6 +92,7 @@ private :
FileCntl *mDestFileCntl = nullptr;

function<void(int, string)> mErrorCallback = nullptr;
function<void(bool)> mResultCallback = nullptr;

vector<Stream_meta *> mStreamMetas{};

Expand Down
Loading

0 comments on commit 3ec4d21

Please sign in to comment.