Skip to content

Commit

Permalink
feat(curl_data_source): supprot muti connect
Browse files Browse the repository at this point in the history
ringBuffer  support seek back

Signed-off-by: pingkai <pingkai010@gmail.com>
  • Loading branch information
pingkai committed Feb 24, 2020
1 parent 49d7694 commit 0b2496a
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 51 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.15)
project(CicadaMedia)

#set(USEASAN ON)
set(USEASAN ON)
#set(USETSAN ON)
#set(USEUBSAN ON)
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
Expand Down
1 change: 1 addition & 0 deletions cmdline/cicadaPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ int main(int argc, const char **argv)
}

log_enable_color(1);
log_set_level(AF_LOG_LEVEL_TRACE, 1);
cicadaCont cicada{};
unique_ptr<MediaPlayer> player = unique_ptr<MediaPlayer>(new MediaPlayer());
cicada.player = player.get();
Expand Down
2 changes: 1 addition & 1 deletion framework/codec/Apple/AppleVideoToolBox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ namespace Cicada {
assert(mParser != nullptr);
mParser->parser(packet->getData(), packet->getSize());
int poc = mParser->getPOC();
assert(poc >= 0);
// assert(poc >= 0);
// AF_LOGD("poc is %d\n", poc);

if (poc == 0) {
Expand Down
50 changes: 33 additions & 17 deletions framework/data_source/curl/CURLConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ using namespace std;
using namespace Cicada;
#define MAX_RESPONSE_SIZE 1024

#define RINGBUFFER_SIZE 1024*16
#define RINGBUFFER_SIZE 1024*256
#define RINGBUFFER_BACK_SIZE 1024*512

#define MIN_SO_RCVBUF_SIZE 1024*64

Expand Down Expand Up @@ -46,7 +47,8 @@ static int getErrorCode(const CURLcode &CURLResult)
Cicada::CURLConnection::CURLConnection(Cicada::IDataSource::SourceConfig *pConfig)
{
mHttp_handle = curl_easy_init();
pRbuf = RingBufferCreate(RINGBUFFER_SIZE);
pRbuf = RingBufferCreate(RINGBUFFER_SIZE + RINGBUFFER_BACK_SIZE);
RingBufferSetBackSize(pRbuf, RINGBUFFER_BACK_SIZE);
m_bFirstLoop = 1;
mPConfig = pConfig;

Expand Down Expand Up @@ -287,8 +289,8 @@ int CURLConnection::my_trace(CURL *handle, curl_infotype type,

switch (type) {
case CURLINFO_TEXT:
AF_LOGD("== Info: %s", data);

// AF_LOGD("== Info: %s", data);
default: /* in case a new one is introduced to shock us */
return 0;

Expand Down Expand Up @@ -317,7 +319,7 @@ int CURLConnection::my_trace(CURL *handle, curl_infotype type,
break;
}

AF_LOGD("%s\n", text);
// AF_LOGD("%s\n", text);
return 0;
}

Expand Down Expand Up @@ -399,8 +401,8 @@ int CURLConnection::FillBuffer(uint32_t want)
overflowSize));

if (p == nullptr) {
AF_LOGE("av_realloc error\n");
return -ENOMEM;
// AF_LOGE("av_realloc error\n");
// return -ENOMEM;
}

pOverflowBuffer = p;
Expand Down Expand Up @@ -621,22 +623,34 @@ int CURLConnection::FillBuffer(uint32_t want)

int CURLConnection::short_seek(int64_t off)
{
uint32_t m_bufferSize = 1024 * 512;
uint32_t m_bufferSize = 1024 * 64;
int ret;
int64_t delta = off - mFilePos;

if (FITS_INT(off - mFilePos) &&
RingBufferSkipBytes(pRbuf, (int) (off - mFilePos))) {
if (delta < 0) {
if (RingBufferSkipBytes(pRbuf, (int) delta)) {
mFilePos = off;
return 0;
} else {
return -1;
}
}

if (RingBufferSkipBytes(pRbuf, (int) delta)) {
mFilePos = off;
return 0;
}

if (off > mFilePos && off < mFilePos + m_bufferSize) {
if (off < mFilePos + m_bufferSize) {
int len = RingBuffergetMaxReadSize(pRbuf);
mFilePos += len;
RingBufferSkipBytes(pRbuf, len);

if (len > 0) {
mFilePos += len;
RingBufferSkipBytes(pRbuf, len);
}

if ((ret = FillBuffer(m_bufferSize)) < 0) {
if (!RingBufferSkipBytes(pRbuf, -len)) {
if (len && !RingBufferSkipBytes(pRbuf, -len)) {
AF_LOGE("%s - Failed to restore position after failed fill", __FUNCTION__);
} else {
mFilePos -= len;
Expand All @@ -645,11 +659,12 @@ int CURLConnection::short_seek(int64_t off)
return ret;
}

if (!FITS_INT(off - mFilePos) ||
!RingBufferSkipBytes(pRbuf, (int) (off - mFilePos))) {
AF_LOGE("%s - Failed to skip to position after having filled buffer", __FUNCTION__);
AF_LOGI("read buffer size %d need is %d\n", RingBuffergetMaxReadSize(pRbuf), delta - len);

if (!RingBufferSkipBytes(pRbuf, (int) (delta - len))) {
AF_LOGI("%s - Failed to skip to position after having filled buffer", __FUNCTION__);

if (!RingBufferSkipBytes(pRbuf, -len)) {
if (len && !RingBufferSkipBytes(pRbuf, -len)) {
AF_LOGE("%s - Failed to restore position after failed seek", __FUNCTION__);
} else {
mFilePos -= len;
Expand All @@ -672,6 +687,7 @@ int CURLConnection::readBuffer(void *buf, size_t size)
/*if (re == CURLE_OK)
av_log(mCurlhttpContext.hd,AV_LOG_DEBUG,"download speed is %f\n",downloadSpeed);*/
uint32_t want = std::min(RingBuffergetMaxReadSize(pRbuf), (uint32_t) size);
assert(want > 0);

if (RingBufferReadData(pRbuf, (char *) buf, want) == want) {
mFilePos += want;
Expand Down
84 changes: 75 additions & 9 deletions framework/data_source/curl/curl_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ int CurlDataSource::curl_connect(CURLConnection *pConnection, int64_t filePos)
char *ipstr = nullptr;
double length;
long response;
AF_LOGD("start connect\n");
AF_LOGD("start connect %lld\n", filePos);
pConnection->SetResume(filePos);
pConnection->start();

Expand Down Expand Up @@ -136,6 +136,7 @@ static void clean_curl()
CurlDataSource::CurlDataSource(const string &url) : IDataSource(url)
{
mFileSize = -1;
mConnections = new std::vector<CURLConnection *>();
}

CurlDataSource::~CurlDataSource()
Expand All @@ -161,7 +162,7 @@ int CurlDataSource::Open(int flags)

if (headerList) {
curl_slist_free_all(headerList);
headerList = NULL;
headerList = nullptr;
}

std::vector<std::string> &customHeaders = mConfig.customHeaders;
Expand Down Expand Up @@ -204,9 +205,6 @@ int CurlDataSource::Open(const string &url)
return Open(0);
}

// if (curlContext.still_running){
// return -1;
// }
mOpenTimeMS = af_gettime_relative() / 1000;
mPConnection->disconnect();
bool isRTMP = url.compare(0, 7, "rtmp://") == 0;
Expand All @@ -220,25 +218,50 @@ int CurlDataSource::Open(const string &url)
fillConnectInfo();
}

closeConnections(false);
mConnections = new std::vector<CURLConnection *>();
return ret;
}

void CurlDataSource::Close()
{
std::lock_guard<std::mutex> lock(mMutex);
CURLConnection *deleteConnection = mPConnection;
closeConnections(true);
}

void CurlDataSource::closeConnections(bool current)
{
lock_guard<mutex> lock(mMutex);
CURLConnection *deleteConnection = nullptr;
vector<CURLConnection *> *pConnections = mConnections;
mPConnection = nullptr;
mConnections = nullptr;

if (current) {
deleteConnection = mPConnection;
mPConnection = nullptr;
}

if (deleteConnection) {
AsyncJob::Instance()->addJob([deleteConnection] {
delete deleteConnection;
});
}

if (pConnections) {
AsyncJob::Instance()->addJob([pConnections] {
for (auto item = pConnections->begin(); item != pConnections->end();)
{
delete *item;
item = pConnections->erase(item);
}
delete pConnections;
});
}
}

int64_t CurlDataSource::Seek(int64_t offset, int whence)
{
//AF_LOGD("CurlDataSource::Seek position is %lld,when is %d",offset,whence);
// AF_LOGD("CurlDataSource::Seek position is %lld,when is %d", offset, whence);
if (whence == SEEK_SIZE) {
return mFileSize;
} else if ((whence == SEEK_CUR && offset == 0) ||
Expand All @@ -264,9 +287,46 @@ int64_t CurlDataSource::Seek(int64_t offset, int whence)
return offset;
}

if (offset > mFileSize) {
return -1;
}

if (offset == mFileSize) {
}

//first seek in cache
if (mPConnection->short_seek(offset) >= 0) {
AF_LOGI("short seek ok\n");
return offset;
} else {
AF_LOGI("short seek filed\n");
}

CURLConnection *con = nullptr;

for (auto item = mConnections->begin(); item != mConnections->end();) {
if ((*(item))->short_seek(offset) >= 0) {
con = *item;
item = mConnections->erase(item);
break;
} else {
++item;
}
}

if (con) {
mConnections->push_back(mPConnection);

if (mConnections->size() > max_connection) {
delete mConnections->front();
mConnections->erase(mConnections->begin());
}

mPConnection = con;
AF_LOGW("short seek ok\n");
return offset;
} else {
AF_LOGW("short seek filed\n");
}

int64_t ret = TrySeekByNewConnection(offset);
Expand All @@ -283,7 +343,13 @@ int64_t CurlDataSource::TrySeekByNewConnection(int64_t offset)
if (ret >= 0) {
std::lock_guard<std::mutex> lock(mMutex);
// try seek ok, use the new connection
delete mPConnection;
mConnections->push_back(mPConnection);

if (mConnections->size() > max_connection) {
delete mConnections->front();
mConnections->erase(mConnections->begin());
}

mPConnection = pConnection_s;
return offset;
}
Expand Down
4 changes: 4 additions & 0 deletions framework/data_source/curl/curl_data_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ namespace Cicada {
static CurlDataSource se;

private:
const static int max_connection = 1;
std::string mLocation;
int64_t mFileSize = -1;
CURLConnection *mPConnection = nullptr;
Expand All @@ -80,6 +81,9 @@ namespace Cicada {
std::mutex mMutex;
std::string mConnectInfo;
bool mBDummy = false;
std::vector<CURLConnection *>* mConnections {nullptr};

void closeConnections(bool current);
};
}

Expand Down
2 changes: 1 addition & 1 deletion framework/demuxer/avFormatDemuxer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ namespace Cicada {
int64_t startTime = af_getsteady_ms();
bool use_filename = false;

if (mReadCb != nullptr) {
if (mReadCb != nullptr ) {
uint8_t *read_buffer = static_cast<uint8_t *>(av_malloc(INITIAL_BUFFER_SIZE));
mPInPutPb = avio_alloc_context(read_buffer, INITIAL_BUFFER_SIZE, 0, mUserArg, mReadCb, nullptr, mSeekCb);

Expand Down
2 changes: 1 addition & 1 deletion framework/utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ set(SOURCE_FILES
af_string.cpp
errors/framework_error.c
af_clock.cpp
ringBuffer.c
ringBuffer.cpp
cJSON.c
globalSettings.cpp
property.cpp
Expand Down
Loading

0 comments on commit 0b2496a

Please sign in to comment.