Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Refactor CRcvQueue::storePkt(..) for better resource management. #2775

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4817,7 +4817,7 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
// XXX Problem around CONN_CONFUSED!
// If some too-eager packets were received from a listener
// that thinks it's connected, but his last handshake was missed,
// they are collected by CRcvQueue::storePkt. The removeConnector
// they are collected by CRcvQueue::storePktClone. The removeConnector
// function will want to delete them all, so it would be nice
// if these packets can be re-delivered. Of course the listener
// should be prepared to resend them (as every packet can be lost
Expand Down
8 changes: 3 additions & 5 deletions srtcore/packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ void CPacket::deallocate()
if (m_data_owned)
delete[](char*) m_PacketVector[PV_DATA].data();
m_PacketVector[PV_DATA].set(NULL, 0);
m_data_owned = false;
}

char* CPacket::release()
Expand All @@ -241,8 +242,7 @@ CPacket::~CPacket()
{
// PV_HEADER is always owned, PV_DATA may use a "borrowed" buffer.
// Delete the internal buffer only if it was declared as owned.
if (m_data_owned)
delete[](char*) m_PacketVector[PV_DATA].data();
deallocate();
}

size_t CPacket::getLength() const
Expand Down Expand Up @@ -561,10 +561,8 @@ CPacket* CPacket::clone() const
{
CPacket* pkt = new CPacket;
memcpy((pkt->m_nHeader), m_nHeader, HDR_SIZE);
pkt->m_pcData = new char[m_PacketVector[PV_DATA].size()];
pkt->allocate(m_PacketVector[PV_DATA].size());
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
memcpy((pkt->m_pcData), m_pcData, m_PacketVector[PV_DATA].size());
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
pkt->m_PacketVector[PV_DATA].setLength(m_PacketVector[PV_DATA].size());

pkt->m_DestAddr = m_DestAddr;

return pkt;
Expand Down
25 changes: 8 additions & 17 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,6 @@ srt::CRcvQueue::~CRcvQueue()
while (!i->second.empty())
{
CPacket* pkt = i->second.front();
delete[] pkt->m_pcData;
delete pkt;
i->second.pop();
}
Expand Down Expand Up @@ -1365,14 +1364,12 @@ srt::EReadStatus srt::CRcvQueue::worker_RetrieveUnit(int32_t& w_id, CUnit*& w_un
{
// no space, skip this packet
CPacket temp;
temp.m_pcData = new char[m_szPayloadSize];
temp.setLength(m_szPayloadSize);
temp.allocate(m_szPayloadSize);
THREAD_PAUSED();
EReadStatus rst = m_pChannel->recvfrom((w_addr), (temp));
THREAD_RESUMED();
// Note: this will print nothing about the packet details unless heavy logging is on.
LOGC(qrlog.Error, log << CONID() << "LOCAL STORAGE DEPLETED. Dropping 1 packet: " << temp.Info());
delete[] temp.m_pcData;

// Be transparent for RST_ERROR, but ignore the correct
// data read and fake that the packet was dropped.
Expand Down Expand Up @@ -1541,7 +1538,7 @@ srt::EConnectStatus srt::CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUni
if (cst == CONN_CONFUSED)
{
LOGC(cnlog.Warn, log << "AsyncOrRND: PACKET NOT HANDSHAKE - re-requesting handshake from peer");
storePkt(id, unit->m_Packet.clone());
storePktClone(id, unit->m_Packet);
if (!u->processAsyncConnectRequest(RST_AGAIN, CONN_CONTINUE, &unit->m_Packet, u->m_PeerAddr))
{
// Reuse previous behavior to reject a packet
Expand Down Expand Up @@ -1616,7 +1613,7 @@ srt::EConnectStatus srt::CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUni
log << "AsyncOrRND: packet RESOLVED TO ID=" << id << " -- continuing through CENTRAL PACKET QUEUE");
// This is where also the packets for rendezvous connection will be landing,
// in case of a synchronous connection.
storePkt(id, unit->m_Packet.clone());
storePktClone(id, unit->m_Packet);

return CONN_CONTINUE;
}
Expand Down Expand Up @@ -1680,7 +1677,6 @@ int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
w_packet.setLength(newpkt->getLength());
w_packet.m_DestAddr = newpkt->m_DestAddr;

delete[] newpkt->m_pcData;
delete newpkt;

// remove this message from queue,
Expand Down Expand Up @@ -1735,7 +1731,6 @@ void srt::CRcvQueue::removeConnector(const SRTSOCKET& id)
log << "removeConnector: ... and its packet queue with " << i->second.size() << " packets collected");
while (!i->second.empty())
{
delete[] i->second.front()->m_pcData;
delete i->second.front();
i->second.pop();
}
Expand Down Expand Up @@ -1768,34 +1763,30 @@ srt::CUDT* srt::CRcvQueue::getNewEntry()
return u;
}

void srt::CRcvQueue::storePkt(int32_t id, CPacket* pkt)
void srt::CRcvQueue::storePktClone(int32_t id, const CPacket& pkt)
{
CUniqueSync passcond(m_BufferLock, m_BufferCond);

map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

if (i == m_mBuffer.end())
{
m_mBuffer[id].push(pkt);
m_mBuffer[id].push(pkt.clone());
passcond.notify_one();
}
else
{
// avoid storing too many packets, in case of malfunction or attack
// Avoid storing too many packets, in case of malfunction or attack.
if (i->second.size() > 16)
{
delete[] pkt->m_pcData;
delete pkt;
return;
}

i->second.push(pkt);
i->second.push(pkt.clone());
}
}

void srt::CMultiplexer::destroy()
{
// Reverse order of the assigned
// Reverse order of the assigned.
delete m_pRcvQueue;
delete m_pSndQueue;
delete m_pTimer;
Expand Down
2 changes: 1 addition & 1 deletion srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ class CRcvQueue
bool ifNewEntry();
CUDT* getNewEntry();

void storePkt(int32_t id, CPacket* pkt);
void storePktClone(int32_t id, const CPacket& pkt);

private:
sync::Mutex m_LSLock;
Expand Down