Minor refactor of IncomingSplitBuffer

This commit is contained in:
sfan5 2019-08-15 20:14:44 +02:00
parent fc2f55d931
commit 428a4c86e3
2 changed files with 60 additions and 35 deletions

@ -386,6 +386,48 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
return timed_outs; return timed_outs;
} }
/*
IncomingSplitPacket
*/
bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
{
sanity_check(chunk_num < chunk_count);
// If chunk already exists, ignore it.
// Sometimes two identical packets may arrive when there is network
// lag and the server re-sends stuff.
if (chunks.find(chunk_num) != chunks.end())
return false;
// Set chunk data in buffer
chunks[chunk_num] = chunkdata;
return true;
}
SharedBuffer<u8> IncomingSplitPacket::reassemble()
{
sanity_check(allReceived());
// Calculate total size
u32 totalsize = 0;
for (const auto &chunk : chunks)
totalsize += chunk.second.getSize();
SharedBuffer<u8> fulldata(totalsize);
// Copy chunks to data buffer
u32 start = 0;
for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
const SharedBuffer<u8> &buf = chunks[chunk_i];
memcpy(&fulldata[start], *buf, buf.getSize());
start += buf.getSize();
}
return fulldata;
}
/* /*
IncomingSplitBuffer IncomingSplitBuffer
*/ */
@ -397,10 +439,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
delete i.second; delete i.second;
} }
} }
/*
This will throw a GotSplitPacketException when a full
split packet is constructed.
*/
SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable) SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
{ {
MutexAutoLock listlock(m_map_mutex); MutexAutoLock listlock(m_map_mutex);
@ -426,12 +465,14 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
} }
// Add if doesn't exist // Add if doesn't exist
IncomingSplitPacket *sp;
if (m_buf.find(seqnum) == m_buf.end()) { if (m_buf.find(seqnum) == m_buf.end()) {
m_buf[seqnum] = new IncomingSplitPacket(chunk_count, reliable); sp = new IncomingSplitPacket(chunk_count, reliable);
m_buf[seqnum] = sp;
} else {
sp = m_buf[seqnum];
} }
IncomingSplitPacket *sp = m_buf[seqnum];
if (chunk_count != sp->chunk_count) { if (chunk_count != sp->chunk_count) {
errorstream << "IncomingSplitBuffer::insert(): chunk_count=" errorstream << "IncomingSplitBuffer::insert(): chunk_count="
<< chunk_count << " != sp->chunk_count=" << sp->chunk_count << chunk_count << " != sp->chunk_count=" << sp->chunk_count
@ -443,40 +484,19 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
<<" != sp->reliable="<<sp->reliable <<" != sp->reliable="<<sp->reliable
<<std::endl); <<std::endl);
// If chunk already exists, ignore it.
// Sometimes two identical packets may arrive when there is network
// lag and the server re-sends stuff.
if (sp->chunks.find(chunk_num) != sp->chunks.end())
return SharedBuffer<u8>();
// Cut chunk data out of packet // Cut chunk data out of packet
u32 chunkdatasize = p.data.getSize() - headersize; u32 chunkdatasize = p.data.getSize() - headersize;
SharedBuffer<u8> chunkdata(chunkdatasize); SharedBuffer<u8> chunkdata(chunkdatasize);
memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize); memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
// Set chunk data in buffer if (!sp->insert(chunk_num, chunkdata))
sp->chunks[chunk_num] = chunkdata; return SharedBuffer<u8>();
// If not all chunks are received, return empty buffer // If not all chunks are received, return empty buffer
if (!sp->allReceived()) if (!sp->allReceived())
return SharedBuffer<u8>(); return SharedBuffer<u8>();
// Calculate total size SharedBuffer<u8> fulldata = sp->reassemble();
u32 totalsize = 0;
for (const auto &chunk : sp->chunks) {
totalsize += chunk.second.getSize();
}
SharedBuffer<u8> fulldata(totalsize);
// Copy chunks to data buffer
u32 start = 0;
for (u32 chunk_i=0; chunk_i<sp->chunk_count; chunk_i++) {
const SharedBuffer<u8> &buf = sp->chunks[chunk_i];
u16 buf_chunkdatasize = buf.getSize();
memcpy(&fulldata[start], *buf, buf_chunkdatasize);
start += buf_chunkdatasize;
}
// Remove sp from buffer // Remove sp from buffer
m_buf.erase(seqnum); m_buf.erase(seqnum);
@ -484,6 +504,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
return fulldata; return fulldata;
} }
void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
{ {
std::deque<u16> remove_queue; std::deque<u16> remove_queue;

@ -121,16 +121,20 @@ struct IncomingSplitPacket
IncomingSplitPacket() = delete; IncomingSplitPacket() = delete;
// Key is chunk number, value is data without headers
std::map<u16, SharedBuffer<u8>> chunks;
u32 chunk_count;
float time = 0.0f; // Seconds from adding float time = 0.0f; // Seconds from adding
bool reliable = false; // If true, isn't deleted on timeout u32 chunk_count;
bool reliable; // If true, isn't deleted on timeout
bool allReceived() const bool allReceived() const
{ {
return (chunks.size() == chunk_count); return (chunks.size() == chunk_count);
} }
bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata);
SharedBuffer<u8> reassemble();
private:
// Key is chunk number, value is data without headers
std::map<u16, SharedBuffer<u8>> chunks;
}; };
/* /*