From 4e6f79ddc811aa6457c399b1b22038750e2a1420 Mon Sep 17 00:00:00 2001 From: pingkai Date: Mon, 20 Jan 2020 13:09:41 +0800 Subject: [PATCH] feat: fmp4 Signed-off-by: pingkai --- cmdline/cicadaPlayer.cpp | 2 +- framework/demuxer/avFormatDemuxer.cpp | 4 + framework/demuxer/play_list/HLSStream.cpp | 409 ++++++++++++---------- framework/demuxer/play_list/HLSStream.h | 19 +- framework/demuxer/play_list/HlsParser.cpp | 44 +-- framework/demuxer/play_list/SegmentList.h | 11 +- framework/demuxer/play_list/segment.h | 2 + 7 files changed, 289 insertions(+), 202 deletions(-) diff --git a/cmdline/cicadaPlayer.cpp b/cmdline/cicadaPlayer.cpp index 87dfd4e0f..65ceed812 100644 --- a/cmdline/cicadaPlayer.cpp +++ b/cmdline/cicadaPlayer.cpp @@ -108,7 +108,7 @@ int main(int argc, const char **argv) #endif NetWorkEventReceiver netWorkEventReceiver(eListener); player->SetListener(pListener); - player->SetDefaultBandWidth(100000000); + player->SetDefaultBandWidth(1000*1000); player->SetDataSource(url.c_str()); player->SetAutoPlay(true); player->SetLoop(true); diff --git a/framework/demuxer/avFormatDemuxer.cpp b/framework/demuxer/avFormatDemuxer.cpp index 0fdba960a..fc880734c 100644 --- a/framework/demuxer/avFormatDemuxer.cpp +++ b/framework/demuxer/avFormatDemuxer.cpp @@ -96,6 +96,10 @@ namespace Cicada { av_dict_set_int(&mInputOpts, "safe", 0, 0); av_dict_set(&mInputOpts, "protocol_whitelist", "file,http,https,tcp,tls", 0); + + + + /*If a url with mp4 ext name, but is not a mp4 file, the mp4 demuxer will be matched * by ext name , mp4 demuxer will try to find moov box, it will ignore the return value * of the avio_*, and don't check interrupt flag, if the url is a network file, here will diff --git a/framework/demuxer/play_list/HLSStream.cpp b/framework/demuxer/play_list/HLSStream.cpp index 4d3286785..3d1965dc3 100644 --- a/framework/demuxer/play_list/HLSStream.cpp +++ b/framework/demuxer/play_list/HLSStream.cpp @@ -21,9 +21,11 @@ namespace Cicada { + static const int defaultInitSegSize = 1024 * 1024; + HLSStream::HLSStream(SegmentTracker *pTracker, int id) - : mPTracker(pTracker), - mId(id) + : mPTracker(pTracker), + mId(id) { } @@ -33,6 +35,8 @@ namespace Cicada { delete mThreadPtr; delete mPTracker; mStreamStartTimeMap.clear(); + if (mInitSegBuffer) + free(mInitSegBuffer); } int HLSStream::getStreamType() @@ -60,7 +64,6 @@ namespace Cicada { { auto *pHandle = static_cast(arg); int ret; - if (pHandle->mInterrupted) { return -EIO; } @@ -71,38 +74,50 @@ namespace Cicada { return 0; } - { - if (pHandle->mDataSourceStatus == dataSource_status_invalid) { - // AF_LOGD("%s : %d timeout ", __func__, __LINE__); - return -EAGAIN; - } else if (pHandle->mDataSourceStatus == dataSource_status_error) { - return pHandle->mDataSourceError; + if (pHandle->mInitSegBuffer) { + + int initSegSize = pHandle->mInitSegSize - pHandle->mInitSegPtr; + if (initSegSize > 0) { + int readSize = std::min(initSegSize, size); + + memcpy(buffer, pHandle->mInitSegBuffer + pHandle->mInitSegPtr, readSize); + pHandle->mInitSegPtr += readSize; + return readSize; } - if (pHandle->mSegDecrypter == nullptr) { - if (pHandle->mExtDataSource) { - ret = pHandle->mExtDataSource->Read(buffer, size); - } else { - ret = pHandle->mPdataSource->Read(buffer, (size_t) size); - } + } + ret = pHandle->readSegment(buffer, size); + return ret; + } + + int HLSStream::readSegment(const uint8_t *buffer, int size) + { + int ret; + if (mSegDecrypter == nullptr) { + if (mExtDataSource) { + ret = mExtDataSource->Read((void *) buffer, size); } else { - ret = pHandle->mSegDecrypter->Read(buffer, size); + ret = mPdataSource->Read((void *) buffer, (size_t) size); } + } else { + ret = mSegDecrypter->Read(const_cast(buffer), size); + } + return ret; + } - if (ret == 0) {//eos - pHandle->resetSource(); - pHandle->mDataSourceStatus = dataSource_status_invalid; - AF_LOGI("%s : %d send SEGEND to ffmpeg stream(%d)", __func__, __LINE__, - pHandle->mPTracker->getStreamType()); - return SEGEND; - } else if (ret < 0) { - //TODO: deal with error - if (ret == FRAMEWORK_ERR_EXIT) { - return -EAGAIN; - } + int64_t HLSStream::seekSegment(off_t offset, int whence) + { + int ret; + if (mSegDecrypter == nullptr) { + if (mExtDataSource) { + ret = mExtDataSource->Seek(offset, whence); + } else { + ret = mPdataSource->Seek(offset, whence); } + } else { + // ret = mSegDecrypter->Read(const_cast(buffer), size); + return -EINVAL; } - return ret; } @@ -126,6 +141,48 @@ namespace Cicada { return 0; } + int HLSStream::upDateInitSection() + { + string uri; + int ret; + + if (!mCurSeg || !mCurSeg->init_section || mCurInitSeg == mCurSeg->init_section) + return 0; + + uri = Helper::combinePaths(mPTracker->getBaseUri(), mCurSeg->init_section->mUri); + + ret = tryOpenSegment(uri); + + if (ret < 0) { + return ret; + } + mCurInitSeg = mCurSeg->init_section; + mInitSegSize = defaultInitSegSize; + mInitSegSize = seekSegment(0, SEEK_SIZE); + if (mInitSegSize < 0) + mInitSegSize = defaultInitSegSize; + + if (mInitSegBuffer) + free(mInitSegBuffer); + + mInitSegBuffer = static_cast(malloc(mInitSegSize)); + + int size = 0; + + do { +// TODO: realloc mInitSegBuffer + ret = readSegment(mInitSegBuffer + size, mInitSegSize - size); + if (ret > 0) + size += ret; + else + break; + } while (size < mInitSegSize); + + mInitSegSize = size; + mInitSegPtr = 0; + return 0; + } + int HLSStream::open_internal() { int ret; @@ -189,23 +246,15 @@ namespace Cicada { return -1; } } - - string uri = Helper::combinePaths(mPTracker->getBaseUri(), - mCurSeg->mUri); + ret = upDateInitSection(); + if (ret < 0) { + return ret; + } + string uri; + uri = Helper::combinePaths(mPTracker->getBaseUri(), + mCurSeg->mUri); AF_LOGD("open uri is %s seq is %llu\n", uri.c_str(), mCurSeg->sequence); - int retryTimes = 0; - - do { - resetSource(); - ret = openSegment(uri); - retryTimes++; - - if (retryTimes > 2) { - break; - } - - af_msleep(20); - } while (isHttpError(ret) && !mInterrupted); + ret = tryOpenSegment(uri); if (isHttpError(ret)) { resetSource(); @@ -219,7 +268,6 @@ namespace Cicada { if (ret < 0) { AF_TRACE; - mDataSourceStatus = dataSource_status_error; mDataSourceError = ret; resetSource(); return ret; @@ -230,29 +278,7 @@ namespace Cicada { return FRAMEWORK_ERR_EXIT; } - mError = 0; - mDataSourceError = 0; - ret = updateDecrypter(); - - if (ret < 0) { - return ret; - } - - mDataSourceStatus = dataSource_status_valid; - mPDemuxer = new demuxer_service(nullptr); - mPDemuxer->setOptions(mOpts); - unique_ptr demuxerMeta = unique_ptr(new DemuxerMeta()); - demuxerMeta->ownerUrl = mPTracker->getPlayListUri(); - mPDemuxer->setDemuxerMeta(demuxerMeta); - mPDemuxer->SetDataCallBack(read_callback, this, nullptr, nullptr, nullptr); - mPDemuxer->setSampleDecryptor(mSampeAesDecrypter.get()); - ret = mPDemuxer->createDemuxer(demuxer_type_unknown); - - if (mPDemuxer->getDemuxerHandle()) { - mPDemuxer->getDemuxerHandle()->setBitStreamFormat(mMergeVideoHeader, mMergerAudioHeader); - } - - ret = mPDemuxer->initOpen(); + ret = createDemuxer(); if (ret >= 0) { int nbStream = mPDemuxer->GetNbStreams(); @@ -274,6 +300,7 @@ namespace Cicada { mIsOpened_internal = true; } else { AF_LOGE("open demuxer error %d\n", ret); + return ret; } // mStatus = status_inited; @@ -286,6 +313,58 @@ namespace Cicada { return ret; } + int HLSStream::createDemuxer() + { + + int ret; + mError = 0; + mDataSourceError = 0; + ret = updateDecrypter(); + + if (ret < 0) { + return ret; + } + + { + std::lock_guard lock(mHLSMutex); + mPDemuxer = std::unique_ptr(new demuxer_service(nullptr)); + } + + mPDemuxer->setOptions(this->mOpts); + unique_ptr demuxerMeta = unique_ptr(new DemuxerMeta()); + demuxerMeta->ownerUrl = mPTracker->getPlayListUri(); + mPDemuxer->setDemuxerMeta(demuxerMeta); + mPDemuxer->SetDataCallBack(read_callback, this, nullptr, nullptr, nullptr); + mPDemuxer->setSampleDecryptor(this->mSampeAesDecrypter.get()); + ret = mPDemuxer->createDemuxer(demuxer_type_unknown); + + if (mPDemuxer->getDemuxerHandle()) { + mPDemuxer->getDemuxerHandle()->setBitStreamFormat(this->mMergeVideoHeader, this->mMergerAudioHeader); + } + + ret = mPDemuxer->initOpen(); + return ret; + } + + int HLSStream::tryOpenSegment(const string &uri) + { + int retryTimes = 0; + int ret; + + do { + resetSource(); + ret = openSegment(uri); + retryTimes++; + + if (retryTimes > 2) { + break; + } + + af_msleep(20); + } while (isHttpError(ret) && !mInterrupted); + return ret; + } + int HLSStream::openSegment(const string &uri) { int ret; @@ -478,7 +557,6 @@ namespace Cicada { if (mPDemuxer) { mPDemuxer->close(); std::lock_guard lock(mHLSMutex); - delete mPDemuxer; mPDemuxer = nullptr; } @@ -629,6 +707,65 @@ namespace Cicada { FILE *file = NULL; + + int HLSStream::updateSegment() + { + shared_ptr seg = nullptr; + AF_LOGD("getCurSegNum is %lld\n", mPTracker->getCurSegNum()); + seg = mPTracker->getNextSegment(); + int ret; + mCurSeg = nullptr; + if (seg) { + do { + mCurSeg = seg; + string uri = Helper::combinePaths(mPTracker->getBaseUri(), + seg->mUri); + ret = tryOpenSegment(uri); + + if (isHttpError(ret)) { + resetSource(); + seg = mPTracker->getNextSegment(); + + if (seg) { + af_msleep(20); + continue; + } else if (mPTracker->isLive()) { + return -EAGAIN; + } else { + //no more seg + break; + } + } + } while (isHttpError(ret)); + + if (ret < 0) { + mDataSourceError = ret; + + if (ret != FRAMEWORK_ERR_EXIT) { + mError = ret; + } + + resetSource(); + return ret; + } + + AF_LOGD("stream(%p) read seg %s seqno is %llu\n", this, seg->mUri.c_str(), + seg->getSequenceNumber()); + ret = updateDecrypter(); + + if (ret < 0) { + return ret; + } + + return 0; + } else if (mPTracker->getDuration() > 0) { + AF_LOGE("EOS"); + mIsDataEOS = true; + return 0; + } + return -EAGAIN; + } + int HLSStream::read_internal(std::unique_ptr &packet) { //TODO: move read synMsgRst to class member @@ -650,128 +787,51 @@ namespace Cicada { AF_LOGW("mPDemuxer->readPacket FRAMEWORK_ERR_EXIT\n"); } - if (ret == SEGEND) { - AF_LOGI("%s : %d receive SEGEND ffmpeg stream (%d)", __func__, __LINE__, - mPTracker->getStreamType()); - } else if (mReopen) { - // AF_LOGE("reopen"); - } - if (ret == SEGEND && mStopOnSegEnd) { + if (mStopOnSegEnd) { mIsEOS = true; AF_LOGE("mStopOnSegEnd"); return -EAGAIN; } - if (ret == SEGEND || mReopen) { + if (ret == 0 || mReopen) { if (mReopen) { AF_LOGD("reopen"); mReopen = false; } - shared_ptr seg = nullptr; - AF_LOGD("getCurSegNum is %lld\n", mPTracker->getCurSegNum()); - seg = mPTracker->getNextSegment(); - - if (seg) { - do { - mCurSeg = seg; - // TODO: use the same dataSource - //TODO: interrupt dataSource and return error - string uri = Helper::combinePaths(mPTracker->getBaseUri(), - seg->mUri); - int retryTimes = 0; - - do { - resetSource(); - ret = openSegment(uri); - retryTimes++; - - if (retryTimes > 2) { - break; - } - - af_msleep(20); - } while (isHttpError(ret)); - - if (isHttpError(ret)) { - resetSource(); - seg = mPTracker->getNextSegment(); - - if (seg) { - af_msleep(20); - continue; - } else if (mPTracker->isLive()) { - return -EAGAIN; - } else { - //no more seg - break; + ret = updateSegment(); + if (ret >= 0) { + upDateInitSection(); + mInitSegPtr = 0; + ret = createDemuxer(); + if (ret >= 0) { + int nbStream = mPDemuxer->GetNbStreams(); + AF_LOGI("file have %d streams\n", nbStream); + // open all stream in demuxer + Stream_meta meta{}; + + for (int i = 0; i < nbStream; ++i) { + mPDemuxer->GetStreamMeta(&meta, i, false); + + if (meta.type == mPTracker->getStreamType() + || mPTracker->getStreamType() == STREAM_TYPE_MIXED) { + mPDemuxer->OpenStream(i); } - } - } while (isHttpError(ret)); - - if (ret < 0) { - mDataSourceStatus = dataSource_status_error; - mDataSourceError = ret; - if (ret != FRAMEWORK_ERR_EXIT) { - mError = ret; + releaseMeta(&meta); } - - resetSource(); - return ret; } - - AF_LOGD("stream(%p) read seg %s seqno is %llu\n", this, seg->mUri.c_str(), - seg->getSequenceNumber()); - ret = updateDecrypter(); - - if (ret < 0) { - return ret; - } - - mDataSourceStatus = dataSource_status_valid; - } else if (mPTracker->getDuration() > 0) { - AF_LOGE("EOS"); - mIsDataEOS = true; - // return 0; } - packet = nullptr; return -EAGAIN; + } if (ret == -EAGAIN && mPTracker->getDuration() == 0) { - shared_ptr seg = nullptr; - seg = mPTracker->getNextSegment(); - - if (seg) { - mCurSeg = seg; - string uri = Helper::combinePaths(mPTracker->getBaseUri(), seg->mUri); - ret = openSegment(uri); - - if (ret < 0) { - mDataSourceStatus = dataSource_status_error; - mDataSourceError = ret; - - if (ret != FRAMEWORK_ERR_EXIT) { - mError = ret; - } - - resetSource(); - return ret; - } - - AF_LOGD("stream (%d) read seg %s\n", mPTracker->getStreamType(), seg->mUri.c_str()); - ret = updateDecrypter(); - - if (ret < 0) { - return ret; - } - - mDataSourceStatus = dataSource_status_valid; - } - + ret = updateSegment(); + if (ret < 0) + return ret; return -EAGAIN; } @@ -816,8 +876,8 @@ namespace Cicada { } if (packet->getInfo().pts == INT64_MIN - && mStreamStartTimeMap[streamIndex].lastFramePts != INT64_MIN - && mStreamStartTimeMap[streamIndex].frameDuration != INT64_MIN) { + && mStreamStartTimeMap[streamIndex].lastFramePts != INT64_MIN + && mStreamStartTimeMap[streamIndex].frameDuration != INT64_MIN) { packet->getInfo().pts = mStreamStartTimeMap[streamIndex].lastFramePts + mStreamStartTimeMap[streamIndex].frameDuration; } @@ -934,7 +994,6 @@ namespace Cicada { } resetSource(); - mDataSourceStatus = dataSource_status_invalid; { std::lock_guard lock(mHLSMutex); @@ -1011,7 +1070,6 @@ namespace Cicada { clearDataFrames(); if (reqReOpen) { - mDataSourceStatus = dataSource_status_invalid; resetSource(); if (mIsOpened_internal) { @@ -1022,7 +1080,7 @@ namespace Cicada { mPTracker->setCurSegNum(num); } - seek_internal(num, us); + // seek_internal(num, us); mIsEOS = false; mIsDataEOS = false; mError = 0; @@ -1084,7 +1142,6 @@ namespace Cicada { clearDataFrames(); if (reqReOpen) { - mDataSourceStatus = dataSource_status_invalid; resetSource(); if (mIsOpened_internal) { diff --git a/framework/demuxer/play_list/HLSStream.h b/framework/demuxer/play_list/HLSStream.h index c7dcc9c3c..6bae885bd 100644 --- a/framework/demuxer/play_list/HLSStream.h +++ b/framework/demuxer/play_list/HLSStream.h @@ -110,6 +110,18 @@ namespace Cicada { int openSegment(const string &uri); + int tryOpenSegment(const string &uri); + + int createDemuxer(); + + int readSegment(const uint8_t *buffer, int size); + + int upDateInitSection(); + + int64_t seekSegment(off_t offset, int whence); + + int updateSegment(); + bool updateIV() const; enum OpenType { @@ -126,7 +138,7 @@ namespace Cicada { }; SegmentTracker *mPTracker = nullptr; int mId = -1; - demuxer_service *mPDemuxer = nullptr; + std::unique_ptr mPDemuxer = nullptr; IDataSource *mPdataSource = nullptr; atomic_bool mIsOpened{false}; bool mIsEOS = false; //demuxer eos @@ -134,6 +146,10 @@ namespace Cicada { bool mReopen = false; bool mSwitchNeedBreak = false; std::shared_ptr mCurSeg = nullptr; + std::shared_ptr mCurInitSeg{nullptr}; + uint8_t *mInitSegBuffer{nullptr}; + int64_t mInitSegSize{0}; + int mInitSegPtr{0}; std::atomic_bool mStopOnSegEnd{false}; bool mLastReadSuccess{false}; std::mutex mDataMutex; @@ -145,7 +161,6 @@ namespace Cicada { int read_thread(); std::atomic_int mError{0}; - dataSourceStatus mDataSourceStatus = dataSource_status_invalid; int mDataSourceError = 0; int64_t mSeekPendingUs = -1; bool mIsOpened_internal = false; diff --git a/framework/demuxer/play_list/HlsParser.cpp b/framework/demuxer/play_list/HlsParser.cpp index 979ce689a..79cad7a38 100644 --- a/framework/demuxer/play_list/HlsParser.cpp +++ b/framework/demuxer/play_list/HlsParser.cpp @@ -169,6 +169,8 @@ namespace Cicada { const ValuesListTag *ctx_extinf = nullptr; std::list::const_iterator it; + std::shared_ptr curInitSegment = nullptr; + for (it = tagslist.begin(); it != tagslist.end(); ++it) { const Tag *tag = *it; @@ -177,7 +179,7 @@ namespace Cicada { case SingleValueTag::EXTXMEDIASEQUENCE: { sequenceNumber = (static_cast(tag))->getValue().decimal(); } - break; + break; case ValuesListTag::EXTINF: { ctx_extinf = static_cast(tag); @@ -219,6 +221,7 @@ namespace Cicada { // segment->utcTime = absReferenceTime; // absReferenceTime += nzDuration; // } + pSegment->init_section = curInitSegment; segmentList->addSegment(pSegment); if (ctx_byterange) { @@ -311,27 +314,26 @@ namespace Cicada { encryption.iv.clear(); } } - break; + break; -// case AttributesTag::EXTXMAP: { -// const AttributesTag *keytag = static_cast(tag); -// const Attribute *uriAttr; -// if (keytag && (uriAttr = keytag->getAttributeByName("URI")) && -// !segmentList->initialisationSegment.Get()) /* FIXME: handle discontinuities */ -// { -// InitSegment *initSegment = new(std::nothrow) InitSegment(rep); -// if (initSegment) { -// initSegment->setSourceUrl(uriAttr->quotedString()); -// const Attribute *byterangeAttr = keytag->getAttributeByName("BYTERANGE"); -// if (byterangeAttr) { -// const std::pair range = byterangeAttr->unescapeQuotes().getByteRange(); -// initSegment->setByteRange(range.first, range.first + range.second - 1); -// } -// segmentList->initialisationSegment.Set(initSegment); -// } -// } -// } -// break; + case AttributesTag::EXTXMAP: { + const AttributesTag *keytag = static_cast(tag); + const Attribute *uriAttr; + if (keytag && (uriAttr = keytag->getAttributeByName("URI"))) { + + curInitSegment = std::make_shared(sequenceNumber++); + if (curInitSegment) { + curInitSegment->setSourceUrl(uriAttr->quotedString()); + const Attribute *byterangeAttr = keytag->getAttributeByName("BYTERANGE"); + if (byterangeAttr) { + const std::pair range = byterangeAttr->unescapeQuotes().getByteRange(); + // initSegment->setByteRange(range.first, range.first + range.second - 1); + } + segmentList->addInitSegment(curInitSegment); + } + } + } + break; case Tag::EXTXDISCONTINUITY: discontinuity = true; diff --git a/framework/demuxer/play_list/SegmentList.h b/framework/demuxer/play_list/SegmentList.h index d92c92d34..1c8dd4ae1 100644 --- a/framework/demuxer/play_list/SegmentList.h +++ b/framework/demuxer/play_list/SegmentList.h @@ -20,7 +20,7 @@ namespace Cicada{ ~SegmentList(); - std::list> & getSegments(); + std::list> &getSegments(); std::shared_ptr getSegmentByNumber(uint64_t number); @@ -28,7 +28,12 @@ namespace Cicada{ int getRemainSegmentAfterNumber(uint64_t number); - void addSegment(const std::shared_ptr& seg); + void addSegment(const std::shared_ptr &seg); + + void addInitSegment(const std::shared_ptr &seg) + { + initSegment.push_back(seg); + } int merge(SegmentList *pSList); @@ -47,6 +52,8 @@ namespace Cicada{ uint64_t mNextStartTime = 0; + std::vector> initSegment; + }; } diff --git a/framework/demuxer/play_list/segment.h b/framework/demuxer/play_list/segment.h index 316a22c39..d08e1e0f6 100644 --- a/framework/demuxer/play_list/segment.h +++ b/framework/demuxer/play_list/segment.h @@ -37,6 +37,8 @@ namespace Cicada{ static const int SEQUENCE_FIRST; SegmentEncryption encryption; + std::shared_ptr init_section{nullptr}; + }; }