From 0b2496ac5060d6319ae9d1b1c7e5f9a6719c4c19 Mon Sep 17 00:00:00 2001 From: pingkai Date: Thu, 13 Feb 2020 13:35:52 +0800 Subject: [PATCH] feat(curl_data_source): supprot muti connect ringBuffer support seek back Signed-off-by: pingkai --- CMakeLists.txt | 2 +- cmdline/cicadaPlayer.cpp | 1 + framework/codec/Apple/AppleVideoToolBox.cpp | 2 +- framework/data_source/curl/CURLConnection.cpp | 50 +++++--- .../data_source/curl/curl_data_source.cpp | 84 ++++++++++++-- framework/data_source/curl/curl_data_source.h | 4 + framework/demuxer/avFormatDemuxer.cpp | 2 +- framework/utils/CMakeLists.txt | 2 +- .../utils/{ringBuffer.c => ringBuffer.cpp} | 109 ++++++++++++++---- framework/utils/ringBuffer.h | 3 + 10 files changed, 208 insertions(+), 51 deletions(-) rename framework/utils/{ringBuffer.c => ringBuffer.cpp} (55%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b06df4a1..d7efebece 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.15) project(CicadaMedia) -#set(USEASAN ON) +set(USEASAN ON) #set(USETSAN ON) #set(USEUBSAN ON) if (${CMAKE_SYSTEM_NAME} MATCHES "Linux") diff --git a/cmdline/cicadaPlayer.cpp b/cmdline/cicadaPlayer.cpp index 63e252db2..8158ca14e 100644 --- a/cmdline/cicadaPlayer.cpp +++ b/cmdline/cicadaPlayer.cpp @@ -90,6 +90,7 @@ int main(int argc, const char **argv) } log_enable_color(1); + log_set_level(AF_LOG_LEVEL_TRACE, 1); cicadaCont cicada{}; unique_ptr player = unique_ptr(new MediaPlayer()); cicada.player = player.get(); diff --git a/framework/codec/Apple/AppleVideoToolBox.cpp b/framework/codec/Apple/AppleVideoToolBox.cpp index 2dc02022b..219ab0a8c 100644 --- a/framework/codec/Apple/AppleVideoToolBox.cpp +++ b/framework/codec/Apple/AppleVideoToolBox.cpp @@ -563,7 +563,7 @@ namespace Cicada { assert(mParser != nullptr); mParser->parser(packet->getData(), packet->getSize()); int poc = mParser->getPOC(); - assert(poc >= 0); + // assert(poc >= 0); // AF_LOGD("poc is %d\n", poc); if (poc == 0) { diff --git a/framework/data_source/curl/CURLConnection.cpp b/framework/data_source/curl/CURLConnection.cpp index e4822fb59..85af27a13 100644 --- a/framework/data_source/curl/CURLConnection.cpp +++ b/framework/data_source/curl/CURLConnection.cpp @@ -14,7 +14,8 @@ using namespace std; using namespace Cicada; #define MAX_RESPONSE_SIZE 1024 -#define RINGBUFFER_SIZE 1024*16 +#define RINGBUFFER_SIZE 1024*256 +#define RINGBUFFER_BACK_SIZE 1024*512 #define MIN_SO_RCVBUF_SIZE 1024*64 @@ -46,7 +47,8 @@ static int getErrorCode(const CURLcode &CURLResult) Cicada::CURLConnection::CURLConnection(Cicada::IDataSource::SourceConfig *pConfig) { mHttp_handle = curl_easy_init(); - pRbuf = RingBufferCreate(RINGBUFFER_SIZE); + pRbuf = RingBufferCreate(RINGBUFFER_SIZE + RINGBUFFER_BACK_SIZE); + RingBufferSetBackSize(pRbuf, RINGBUFFER_BACK_SIZE); m_bFirstLoop = 1; mPConfig = pConfig; @@ -287,8 +289,8 @@ int CURLConnection::my_trace(CURL *handle, curl_infotype type, switch (type) { case CURLINFO_TEXT: - AF_LOGD("== Info: %s", data); + // AF_LOGD("== Info: %s", data); default: /* in case a new one is introduced to shock us */ return 0; @@ -317,7 +319,7 @@ int CURLConnection::my_trace(CURL *handle, curl_infotype type, break; } - AF_LOGD("%s\n", text); + // AF_LOGD("%s\n", text); return 0; } @@ -399,8 +401,8 @@ int CURLConnection::FillBuffer(uint32_t want) overflowSize)); if (p == nullptr) { - AF_LOGE("av_realloc error\n"); - return -ENOMEM; + // AF_LOGE("av_realloc error\n"); + // return -ENOMEM; } pOverflowBuffer = p; @@ -621,22 +623,34 @@ int CURLConnection::FillBuffer(uint32_t want) int CURLConnection::short_seek(int64_t off) { - uint32_t m_bufferSize = 1024 * 512; + uint32_t m_bufferSize = 1024 * 64; int ret; + int64_t delta = off - mFilePos; - if (FITS_INT(off - mFilePos) && - RingBufferSkipBytes(pRbuf, (int) (off - mFilePos))) { + if (delta < 0) { + if (RingBufferSkipBytes(pRbuf, (int) delta)) { + mFilePos = off; + return 0; + } else { + return -1; + } + } + + if (RingBufferSkipBytes(pRbuf, (int) delta)) { mFilePos = off; return 0; } - if (off > mFilePos && off < mFilePos + m_bufferSize) { + if (off < mFilePos + m_bufferSize) { int len = RingBuffergetMaxReadSize(pRbuf); - mFilePos += len; - RingBufferSkipBytes(pRbuf, len); + + if (len > 0) { + mFilePos += len; + RingBufferSkipBytes(pRbuf, len); + } if ((ret = FillBuffer(m_bufferSize)) < 0) { - if (!RingBufferSkipBytes(pRbuf, -len)) { + if (len && !RingBufferSkipBytes(pRbuf, -len)) { AF_LOGE("%s - Failed to restore position after failed fill", __FUNCTION__); } else { mFilePos -= len; @@ -645,11 +659,12 @@ int CURLConnection::short_seek(int64_t off) return ret; } - if (!FITS_INT(off - mFilePos) || - !RingBufferSkipBytes(pRbuf, (int) (off - mFilePos))) { - AF_LOGE("%s - Failed to skip to position after having filled buffer", __FUNCTION__); + AF_LOGI("read buffer size %d need is %d\n", RingBuffergetMaxReadSize(pRbuf), delta - len); + + if (!RingBufferSkipBytes(pRbuf, (int) (delta - len))) { + AF_LOGI("%s - Failed to skip to position after having filled buffer", __FUNCTION__); - if (!RingBufferSkipBytes(pRbuf, -len)) { + if (len && !RingBufferSkipBytes(pRbuf, -len)) { AF_LOGE("%s - Failed to restore position after failed seek", __FUNCTION__); } else { mFilePos -= len; @@ -672,6 +687,7 @@ int CURLConnection::readBuffer(void *buf, size_t size) /*if (re == CURLE_OK) av_log(mCurlhttpContext.hd,AV_LOG_DEBUG,"download speed is %f\n",downloadSpeed);*/ uint32_t want = std::min(RingBuffergetMaxReadSize(pRbuf), (uint32_t) size); + assert(want > 0); if (RingBufferReadData(pRbuf, (char *) buf, want) == want) { mFilePos += want; diff --git a/framework/data_source/curl/curl_data_source.cpp b/framework/data_source/curl/curl_data_source.cpp index 1534af3cc..6dc0f9ee0 100644 --- a/framework/data_source/curl/curl_data_source.cpp +++ b/framework/data_source/curl/curl_data_source.cpp @@ -61,7 +61,7 @@ int CurlDataSource::curl_connect(CURLConnection *pConnection, int64_t filePos) char *ipstr = nullptr; double length; long response; - AF_LOGD("start connect\n"); + AF_LOGD("start connect %lld\n", filePos); pConnection->SetResume(filePos); pConnection->start(); @@ -136,6 +136,7 @@ static void clean_curl() CurlDataSource::CurlDataSource(const string &url) : IDataSource(url) { mFileSize = -1; + mConnections = new std::vector(); } CurlDataSource::~CurlDataSource() @@ -161,7 +162,7 @@ int CurlDataSource::Open(int flags) if (headerList) { curl_slist_free_all(headerList); - headerList = NULL; + headerList = nullptr; } std::vector &customHeaders = mConfig.customHeaders; @@ -204,9 +205,6 @@ int CurlDataSource::Open(const string &url) return Open(0); } -// if (curlContext.still_running){ -// return -1; -// } mOpenTimeMS = af_gettime_relative() / 1000; mPConnection->disconnect(); bool isRTMP = url.compare(0, 7, "rtmp://") == 0; @@ -220,25 +218,50 @@ int CurlDataSource::Open(const string &url) fillConnectInfo(); } + closeConnections(false); + mConnections = new std::vector(); return ret; } void CurlDataSource::Close() { - std::lock_guard lock(mMutex); - CURLConnection *deleteConnection = mPConnection; + closeConnections(true); +} + +void CurlDataSource::closeConnections(bool current) +{ + lock_guard lock(mMutex); + CURLConnection *deleteConnection = nullptr; + vector *pConnections = mConnections; mPConnection = nullptr; + mConnections = nullptr; + + if (current) { + deleteConnection = mPConnection; + mPConnection = nullptr; + } if (deleteConnection) { AsyncJob::Instance()->addJob([deleteConnection] { delete deleteConnection; }); } + + if (pConnections) { + AsyncJob::Instance()->addJob([pConnections] { + for (auto item = pConnections->begin(); item != pConnections->end();) + { + delete *item; + item = pConnections->erase(item); + } + delete pConnections; + }); + } } int64_t CurlDataSource::Seek(int64_t offset, int whence) { - //AF_LOGD("CurlDataSource::Seek position is %lld,when is %d",offset,whence); +// AF_LOGD("CurlDataSource::Seek position is %lld,when is %d", offset, whence); if (whence == SEEK_SIZE) { return mFileSize; } else if ((whence == SEEK_CUR && offset == 0) || @@ -264,9 +287,46 @@ int64_t CurlDataSource::Seek(int64_t offset, int whence) return offset; } + if (offset > mFileSize) { + return -1; + } + + if (offset == mFileSize) { + } + //first seek in cache if (mPConnection->short_seek(offset) >= 0) { + AF_LOGI("short seek ok\n"); + return offset; + } else { + AF_LOGI("short seek filed\n"); + } + + CURLConnection *con = nullptr; + + for (auto item = mConnections->begin(); item != mConnections->end();) { + if ((*(item))->short_seek(offset) >= 0) { + con = *item; + item = mConnections->erase(item); + break; + } else { + ++item; + } + } + + if (con) { + mConnections->push_back(mPConnection); + + if (mConnections->size() > max_connection) { + delete mConnections->front(); + mConnections->erase(mConnections->begin()); + } + + mPConnection = con; + AF_LOGW("short seek ok\n"); return offset; + } else { + AF_LOGW("short seek filed\n"); } int64_t ret = TrySeekByNewConnection(offset); @@ -283,7 +343,13 @@ int64_t CurlDataSource::TrySeekByNewConnection(int64_t offset) if (ret >= 0) { std::lock_guard lock(mMutex); // try seek ok, use the new connection - delete mPConnection; + mConnections->push_back(mPConnection); + + if (mConnections->size() > max_connection) { + delete mConnections->front(); + mConnections->erase(mConnections->begin()); + } + mPConnection = pConnection_s; return offset; } diff --git a/framework/data_source/curl/curl_data_source.h b/framework/data_source/curl/curl_data_source.h index 2620bbf0f..8afff17de 100644 --- a/framework/data_source/curl/curl_data_source.h +++ b/framework/data_source/curl/curl_data_source.h @@ -63,6 +63,7 @@ namespace Cicada { static CurlDataSource se; private: + const static int max_connection = 1; std::string mLocation; int64_t mFileSize = -1; CURLConnection *mPConnection = nullptr; @@ -80,6 +81,9 @@ namespace Cicada { std::mutex mMutex; std::string mConnectInfo; bool mBDummy = false; + std::vector* mConnections {nullptr}; + + void closeConnections(bool current); }; } diff --git a/framework/demuxer/avFormatDemuxer.cpp b/framework/demuxer/avFormatDemuxer.cpp index 3cf025e93..4219422e3 100644 --- a/framework/demuxer/avFormatDemuxer.cpp +++ b/framework/demuxer/avFormatDemuxer.cpp @@ -80,7 +80,7 @@ namespace Cicada { int64_t startTime = af_getsteady_ms(); bool use_filename = false; - if (mReadCb != nullptr) { + if (mReadCb != nullptr ) { uint8_t *read_buffer = static_cast(av_malloc(INITIAL_BUFFER_SIZE)); mPInPutPb = avio_alloc_context(read_buffer, INITIAL_BUFFER_SIZE, 0, mUserArg, mReadCb, nullptr, mSeekCb); diff --git a/framework/utils/CMakeLists.txt b/framework/utils/CMakeLists.txt index f32c059bf..743717f91 100644 --- a/framework/utils/CMakeLists.txt +++ b/framework/utils/CMakeLists.txt @@ -20,7 +20,7 @@ set(SOURCE_FILES af_string.cpp errors/framework_error.c af_clock.cpp - ringBuffer.c + ringBuffer.cpp cJSON.c globalSettings.cpp property.cpp diff --git a/framework/utils/ringBuffer.c b/framework/utils/ringBuffer.cpp similarity index 55% rename from framework/utils/ringBuffer.c rename to framework/utils/ringBuffer.cpp index 79d75f302..1fff6bcbc 100644 --- a/framework/utils/ringBuffer.c +++ b/framework/utils/ringBuffer.cpp @@ -6,38 +6,55 @@ // add lock #include "ringBuffer.h" -#include -#include +#include "frame_work_log.h" +#include +#include +#include +#include + +using namespace std; typedef struct RingBuffer_t { char *m_buffer; unsigned int m_size; unsigned int m_readPtr; unsigned int m_writePtr; - unsigned int m_fillCount; + atomic_uint32_t m_fillCount; + unsigned int m_back_size; + atomic_uint32_t m_backCount; } RingBuffer; RingBuffer *RingBufferCreate(uint32_t size) { RingBuffer *rBuf; - char *buf = malloc(size); - if (!buf) - return NULL; - rBuf = malloc(sizeof(RingBuffer)); + char *buf = (char *) malloc(size); + + if (!buf) { + return nullptr; + } + + rBuf = (RingBuffer *) malloc(sizeof(RingBuffer)); memset(rBuf, 0, sizeof(RingBuffer)); rBuf->m_size = size; rBuf->m_buffer = buf; + rBuf->m_back_size = 0; + rBuf->m_backCount = 0; return rBuf; } +void RingBufferSetBackSize(RingBuffer *rBuf, uint32_t size) +{ + rBuf->m_back_size = size; +} + void RingBufferDestroy(RingBuffer *rBuf) { - if (rBuf->m_buffer != NULL) { + if (rBuf->m_buffer != nullptr) { free(rBuf->m_buffer); - rBuf->m_buffer = NULL; + rBuf->m_buffer = nullptr; } + free(rBuf); - return; } void RingBufferClear(RingBuffer *rBuf) @@ -45,67 +62,112 @@ void RingBufferClear(RingBuffer *rBuf) rBuf->m_readPtr = 0; rBuf->m_writePtr = 0; rBuf->m_fillCount = 0; - return; + rBuf->m_backCount = 0; } uint32_t RingBufferReadData(RingBuffer *rBuf, char *buf, uint32_t size) { - if (size > rBuf->m_fillCount) { + if (size > rBuf->m_fillCount.load()) { return 0; } + if (size + rBuf->m_readPtr > rBuf->m_size) { unsigned int chunk = rBuf->m_size - rBuf->m_readPtr; memcpy(buf, rBuf->m_buffer + rBuf->m_readPtr, chunk); memcpy(buf + chunk, rBuf->m_buffer, size - chunk); rBuf->m_readPtr = size - chunk; + assert(rBuf->m_readPtr <= rBuf->m_size); } else { memcpy(buf, rBuf->m_buffer + rBuf->m_readPtr, size); rBuf->m_readPtr += size; + assert(rBuf->m_readPtr <= rBuf->m_size); } - if (rBuf->m_readPtr == rBuf->m_size) + + if (rBuf->m_readPtr == rBuf->m_size) { rBuf->m_readPtr = 0; + } + rBuf->m_fillCount -= size; + rBuf->m_backCount = std::min(rBuf->m_back_size, rBuf->m_backCount + size); return size; } uint32_t RingBufferWriteData(RingBuffer *rBuf, const char *buf, uint32_t size) { - if (size > rBuf->m_size - rBuf->m_fillCount) { + if (size > rBuf->m_size - rBuf->m_fillCount.load() - rBuf->m_backCount.load()) { return 0; } + if (size + rBuf->m_writePtr > rBuf->m_size) { unsigned int chunk = rBuf->m_size - rBuf->m_writePtr; memcpy(rBuf->m_buffer + rBuf->m_writePtr, buf, chunk); memcpy(rBuf->m_buffer, buf + chunk, size - chunk); rBuf->m_writePtr = size - chunk; } else { - if ((rBuf->m_buffer + rBuf->m_writePtr) != buf) + if ((rBuf->m_buffer + rBuf->m_writePtr) != buf) { memcpy(rBuf->m_buffer + rBuf->m_writePtr, buf, size); + } + rBuf->m_writePtr += size; } - if (rBuf->m_writePtr == rBuf->m_size) + + if (rBuf->m_writePtr == rBuf->m_size) { rBuf->m_writePtr = 0; + } + rBuf->m_fillCount += size; return size; } int32_t RingBufferSkipBytes(RingBuffer *rBuf, int skipSize) { + int size; + if (skipSize < 0) { - return 0; // skipping backwards is not supported + size = -skipSize; + // AF_LOGI("skip size is %d\n", skipSize); + + if (rBuf->m_backCount.load() >= size) { + rBuf->m_backCount -= size; + + if (rBuf->m_readPtr < size) { + rBuf->m_readPtr = rBuf->m_size - (size - rBuf->m_readPtr); + assert(rBuf->m_readPtr <= rBuf->m_size); + } else { + rBuf->m_readPtr -= size; + assert(rBuf->m_readPtr <= rBuf->m_size); + } + + if (rBuf->m_readPtr == rBuf->m_size) { + rBuf->m_readPtr = 0; + } + + rBuf->m_fillCount += size; + return skipSize; + } + + return 0; } - int size = skipSize; + + size = skipSize; + if (size > rBuf->m_fillCount) { return 0; } + if (size + rBuf->m_readPtr > rBuf->m_size) { unsigned int chunk = rBuf->m_size - rBuf->m_readPtr; rBuf->m_readPtr = size - chunk; + assert(rBuf->m_readPtr <= rBuf->m_size); } else { rBuf->m_readPtr += size; + assert(rBuf->m_readPtr <= rBuf->m_size); } - if (rBuf->m_readPtr == rBuf->m_size) + + if (rBuf->m_readPtr == rBuf->m_size) { rBuf->m_readPtr = 0; + } + rBuf->m_fillCount -= size; return skipSize; } @@ -132,10 +194,15 @@ unsigned int RingBuffergetWritePtr(RingBuffer *rBuf) uint32_t RingBuffergetMaxReadSize(RingBuffer *rBuf) { - return rBuf->m_fillCount; + return rBuf->m_fillCount.load(); } uint32_t RingBuffergetMaxWriteSize(RingBuffer *rBuf) { - return rBuf->m_size - rBuf->m_fillCount; + return rBuf->m_size - rBuf->m_fillCount.load() - rBuf->m_backCount.load(); +} + +uint32_t RingBuffergetMaxBackSize(RingBuffer *rBuf) +{ + return rBuf->m_backCount; } \ No newline at end of file diff --git a/framework/utils/ringBuffer.h b/framework/utils/ringBuffer.h index 7ab6c46de..d445b09c6 100644 --- a/framework/utils/ringBuffer.h +++ b/framework/utils/ringBuffer.h @@ -15,6 +15,8 @@ typedef struct RingBuffer_t RingBuffer; RingBuffer *RingBufferCreate(uint32_t size); +void RingBufferSetBackSize(RingBuffer *rBuf,uint32_t size); + void RingBufferDestroy(RingBuffer *rBuf); void RingBufferClear(RingBuffer *rBuf); @@ -36,6 +38,7 @@ unsigned int RingBuffergetWritePtr(RingBuffer *rBuf); uint32_t RingBuffergetMaxReadSize(RingBuffer *rBuf); uint32_t RingBuffergetMaxWriteSize(RingBuffer *rBuf); +uint32_t RingBuffergetMaxBackSize(RingBuffer *rBuf); #ifdef __cplusplus }