Network: Delete copy constructor and use std::move instead (#11642)

This is a follow-up change which disables class copies where possible to avoid unnecessary memory movements.
This commit is contained in:
SmallJoker 2021-12-01 20:22:33 +01:00 committed by GitHub
parent 1dc1305ada
commit 57a59ae92d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 600 additions and 552 deletions

@ -877,7 +877,7 @@ void Client::ProcessData(NetworkPacket *pkt)
*/
if(sender_peer_id != PEER_ID_SERVER) {
infostream << "Client::ProcessData(): Discarding data not "
"coming from server: peer_id=" << sender_peer_id
"coming from server: peer_id=" << sender_peer_id << " command=" << pkt->getCommand()
<< std::endl;
return;
}

@ -62,18 +62,27 @@ namespace con
#define PING_TIMEOUT 5.0
BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
u16 BufferedPacket::getSeqnum() const
{
if (size() < BASE_HEADER_SIZE + 3)
return 0; // should never happen
return readU16(&data[BASE_HEADER_SIZE + 1]);
}
BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel)
{
u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
BufferedPacket p(packet_size);
p.address = address;
writeU32(&p.data[0], protocol_id);
writeU16(&p.data[4], sender_peer_id);
writeU8(&p.data[6], channel);
BufferedPacketPtr p(new BufferedPacket(packet_size));
p->address = address;
memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
writeU32(&p->data[0], protocol_id);
writeU16(&p->data[4], sender_peer_id);
writeU8(&p->data[6], channel);
memcpy(&p->data[BASE_HEADER_SIZE], *data, data.getSize());
return p;
}
@ -169,9 +178,8 @@ void ReliablePacketBuffer::print()
MutexAutoLock listlock(m_list_mutex);
LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
unsigned int index = 0;
for (BufferedPacket &bufferedPacket : m_list) {
u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
LOG(dout_con<<index<< ":" << s << std::endl);
for (BufferedPacketPtr &packet : m_list) {
LOG(dout_con<<index<< ":" << packet->getSeqnum() << std::endl);
index++;
}
}
@ -188,16 +196,13 @@ u32 ReliablePacketBuffer::size()
return m_list.size();
}
RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum)
{
std::list<BufferedPacket>::iterator i = m_list.begin();
for(; i != m_list.end(); ++i)
{
u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
if (s == seqnum)
break;
for (auto it = m_list.begin(); it != m_list.end(); ++it) {
if ((*it)->getSeqnum() == seqnum)
return it;
}
return i;
return m_list.end();
}
bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
@ -205,54 +210,54 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
const BufferedPacket &p = m_list.front();
result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
result = m_list.front()->getSeqnum();
return true;
}
BufferedPacket ReliablePacketBuffer::popFirst()
BufferedPacketPtr ReliablePacketBuffer::popFirst()
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
throw NotFoundException("Buffer is empty");
BufferedPacket p = std::move(m_list.front());
BufferedPacketPtr p(m_list.front());
m_list.pop_front();
if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
m_oldest_non_answered_ack =
readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
return p;
}
BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum)
{
MutexAutoLock listlock(m_list_mutex);
RPBSearchResult r = findPacket(seqnum);
if (r == notFound()) {
RPBSearchResult r = findPacketNoLock(seqnum);
if (r == m_list.end()) {
LOG(dout_con<<"Sequence number: " << seqnum
<< " not found in reliable buffer"<<std::endl);
throw NotFoundException("seqnum not found in buffer");
}
BufferedPacket p = std::move(*r);
BufferedPacketPtr p(*r);
m_list.erase(r);
if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
m_oldest_non_answered_ack =
readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
return p;
}
void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
void ReliablePacketBuffer::insert(BufferedPacketPtr &p_ptr, u16 next_expected)
{
MutexAutoLock listlock(m_list_mutex);
if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
const BufferedPacket &p = *p_ptr;
if (p.size() < BASE_HEADER_SIZE + 3) {
errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
"reliable packet" << std::endl;
return;
@ -263,7 +268,7 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
<< std::endl;
return;
}
u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
const u16 seqnum = p.getSeqnum();
if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
@ -280,44 +285,44 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
// Find the right place for the packet and insert it there
// If list is empty, just add it
if (m_list.empty())
{
m_list.push_back(p);
if (m_list.empty()) {
m_list.push_back(p_ptr);
m_oldest_non_answered_ack = seqnum;
// Done.
return;
}
// Otherwise find the right place
std::list<BufferedPacket>::iterator i = m_list.begin();
auto it = m_list.begin();
// Find the first packet in the list which has a higher seqnum
u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
u16 s = (*it)->getSeqnum();
/* case seqnum is smaller then next_expected seqnum */
/* this is true e.g. on wrap around */
if (seqnum < next_expected) {
while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
++i;
if (i != m_list.end())
s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
while(((s < seqnum) || (s >= next_expected)) && (it != m_list.end())) {
++it;
if (it != m_list.end())
s = (*it)->getSeqnum();
}
}
/* non wrap around case (at least for incoming and next_expected */
else
{
while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
++i;
if (i != m_list.end())
s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
while(((s < seqnum) && (s >= next_expected)) && (it != m_list.end())) {
++it;
if (it != m_list.end())
s = (*it)->getSeqnum();
}
}
if (s == seqnum) {
/* nothing to do this seems to be a resent packet */
/* for paranoia reason data should be compared */
auto &i = *it;
if (
(readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
(i->data.getSize() != p.data.getSize()) ||
(i->getSeqnum() != seqnum) ||
(i->size() != p.size()) ||
(i->address != p.address)
)
{
@ -325,51 +330,52 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
fprintf(stderr,
"Duplicated seqnum %d non matching packet detected:\n",
seqnum);
fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
fprintf(stderr, "Old: seqnum: %05d size: %04zu, address: %s\n",
i->getSeqnum(), i->size(),
i->address.serializeString().c_str());
fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n",
p.getSeqnum(), p.size(),
p.address.serializeString().c_str());
throw IncomingDataCorruption("duplicated packet isn't same as original one");
}
}
/* insert or push back */
else if (i != m_list.end()) {
m_list.insert(i, p);
else if (it != m_list.end()) {
m_list.insert(it, p_ptr);
} else {
m_list.push_back(p);
m_list.push_back(p_ptr);
}
/* update last packet number */
m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]);
m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
void ReliablePacketBuffer::incrementTimeouts(float dtime)
{
MutexAutoLock listlock(m_list_mutex);
for (BufferedPacket &bufferedPacket : m_list) {
bufferedPacket.time += dtime;
bufferedPacket.totaltime += dtime;
for (auto &packet : m_list) {
packet->time += dtime;
packet->totaltime += dtime;
}
}
std::list<BufferedPacket>
std::list<ConstSharedPtr<BufferedPacket>>
ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
{
MutexAutoLock listlock(m_list_mutex);
std::list<BufferedPacket> timed_outs;
for (BufferedPacket &bufferedPacket : m_list) {
if (bufferedPacket.time >= timeout) {
// caller will resend packet so reset time and increase counter
bufferedPacket.time = 0.0f;
bufferedPacket.resend_count++;
std::list<ConstSharedPtr<BufferedPacket>> timed_outs;
for (auto &packet : m_list) {
if (packet->time < timeout)
continue;
timed_outs.push_back(bufferedPacket);
// caller will resend packet so reset time and increase counter
packet->time = 0.0f;
packet->resend_count++;
if (timed_outs.size() >= max_packets)
break;
}
timed_outs.emplace_back(packet);
if (timed_outs.size() >= max_packets)
break;
}
return timed_outs;
}
@ -428,11 +434,13 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
}
}
SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reliable)
{
MutexAutoLock listlock(m_map_mutex);
const BufferedPacket &p = *p_ptr;
u32 headersize = BASE_HEADER_SIZE + 7;
if (p.data.getSize() < headersize) {
if (p.size() < headersize) {
errorstream << "Invalid data size for split packet" << std::endl;
return SharedBuffer<u8>();
}
@ -473,7 +481,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
<<std::endl);
// Cut chunk data out of packet
u32 chunkdatasize = p.data.getSize() - headersize;
u32 chunkdatasize = p.size() - headersize;
SharedBuffer<u8> chunkdata(chunkdatasize);
memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
@ -520,14 +528,67 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
ConnectionCommand
*/
void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
bool reliable_)
ConnectionCommandPtr ConnectionCommand::create(ConnectionCommandType type)
{
type = CONNCMD_SEND;
peer_id = peer_id_;
channelnum = channelnum_;
data = pkt->oldForgePacket();
reliable = reliable_;
return ConnectionCommandPtr(new ConnectionCommand(type));
}
ConnectionCommandPtr ConnectionCommand::serve(Address address)
{
auto c = create(CONNCMD_SERVE);
c->address = address;
return c;
}
ConnectionCommandPtr ConnectionCommand::connect(Address address)
{
auto c = create(CONNCMD_CONNECT);
c->address = address;
return c;
}
ConnectionCommandPtr ConnectionCommand::disconnect()
{
return create(CONNCMD_DISCONNECT);
}
ConnectionCommandPtr ConnectionCommand::disconnect_peer(session_t peer_id)
{
auto c = create(CONNCMD_DISCONNECT_PEER);
c->peer_id = peer_id;
return c;
}
ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum,
NetworkPacket *pkt, bool reliable)
{
auto c = create(CONNCMD_SEND);
c->peer_id = peer_id;
c->channelnum = channelnum;
c->reliable = reliable;
c->data = pkt->oldForgePacket();
return c;
}
ConnectionCommandPtr ConnectionCommand::ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data)
{
auto c = create(CONCMD_ACK);
c->peer_id = peer_id;
c->channelnum = channelnum;
c->reliable = false;
data.copyTo(c->data);
return c;
}
ConnectionCommandPtr ConnectionCommand::createPeer(session_t peer_id, const Buffer<u8> &data)
{
auto c = create(CONCMD_CREATE_PEER);
c->peer_id = peer_id;
c->channelnum = 0;
c->reliable = true;
c->raw = true;
data.copyTo(c->data);
return c;
}
/*
@ -562,39 +623,38 @@ void Channel::setNextSplitSeqNum(u16 seqnum)
u16 Channel::getOutgoingSequenceNumber(bool& successful)
{
MutexAutoLock internal(m_internal_mutex);
u16 retval = next_outgoing_seqnum;
u16 lowest_unacked_seqnumber;
successful = false;
/* shortcut if there ain't any packet in outgoing list */
if (outgoing_reliables_sent.empty())
{
if (outgoing_reliables_sent.empty()) {
successful = true;
next_outgoing_seqnum++;
return retval;
}
if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
{
u16 lowest_unacked_seqnumber;
if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) {
if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > m_window_size) {
successful = false;
return 0;
}
}
else {
} else {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
m_window_size) {
successful = false;
return 0;
}
}
}
successful = true;
next_outgoing_seqnum++;
return retval;
}
@ -946,45 +1006,45 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
return false;
}
void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
void UDPPeer::PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return;
Channel &chan = channels[c.channelnum];
Channel &chan = channels[c->channelnum];
if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */
(chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
(chan.queued_reliables.size() + 1 < chan.getWindowSize() / 2)) {
LOG(dout_con<<m_connection->getDesc()
<<" processing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() << std::endl);
if (!processReliableSendCommand(c,max_packet_size)) {
chan.queued_commands.push_back(c);
}
}
else {
<<" processing reliable command for peer id: " << c->peer_id
<<" data size: " << c->data.getSize() << std::endl);
if (processReliableSendCommand(c, max_packet_size))
return;
} else {
LOG(dout_con<<m_connection->getDesc()
<<" Queueing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() <<std::endl);
chan.queued_commands.push_back(c);
if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
<<" Queueing reliable command for peer id: " << c->peer_id
<<" data size: " << c->data.getSize() <<std::endl);
if (chan.queued_commands.size() + 1 >= chan.getWindowSize() / 2) {
LOG(derr_con << m_connection->getDesc()
<< "Possible packet stall to peer id: " << c.peer_id
<< "Possible packet stall to peer id: " << c->peer_id
<< " queued_commands=" << chan.queued_commands.size()
<< std::endl);
}
}
chan.queued_commands.push_back(c);
}
bool UDPPeer::processReliableSendCommand(
ConnectionCommand &c,
ConnectionCommandPtr &c_ptr,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return true;
const auto &c = *c_ptr;
Channel &chan = channels[c.channelnum];
u32 chunksize_max = max_packet_size
@ -1003,9 +1063,9 @@ bool UDPPeer::processReliableSendCommand(
chan.setNextSplitSeqNum(split_sequence_number);
}
bool have_sequence_number = true;
bool have_sequence_number = false;
bool have_initial_sequence_number = false;
std::queue<BufferedPacket> toadd;
std::queue<BufferedPacketPtr> toadd;
volatile u16 initial_sequence_number = 0;
for (SharedBuffer<u8> &original : originals) {
@ -1024,25 +1084,23 @@ bool UDPPeer::processReliableSendCommand(
SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
// Add base headers and make a packet
BufferedPacket p = con::makePacket(address, reliable,
BufferedPacketPtr p = con::makePacket(address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
c.channelnum);
toadd.push(std::move(p));
toadd.push(p);
}
if (have_sequence_number) {
volatile u16 pcount = 0;
while (!toadd.empty()) {
BufferedPacket p = std::move(toadd.front());
BufferedPacketPtr p = toadd.front();
toadd.pop();
// LOG(dout_con<<connection->getDesc()
// << " queuing reliable packet for peer_id: " << c.peer_id
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
chan.queued_reliables.push(std::move(p));
pcount++;
chan.queued_reliables.push(p);
}
sanity_check(chan.queued_reliables.size() < 0xFFFF);
return true;
@ -1051,6 +1109,7 @@ bool UDPPeer::processReliableSendCommand(
volatile u16 packets_available = toadd.size();
/* we didn't get a single sequence number no need to fill queue */
if (!have_initial_sequence_number) {
LOG(derr_con << m_connection->getDesc() << "Ran out of sequence numbers!" << std::endl);
return false;
}
@ -1096,18 +1155,18 @@ void UDPPeer::RunCommandQueues(
(channel.queued_reliables.size() < maxtransfer) &&
(commands_processed < maxcommands)) {
try {
ConnectionCommand c = channel.queued_commands.front();
ConnectionCommandPtr c = channel.queued_commands.front();
LOG(dout_con << m_connection->getDesc()
<< " processing queued reliable command " << std::endl);
// Packet is processed, remove it from queue
if (processReliableSendCommand(c,max_packet_size)) {
if (processReliableSendCommand(c, max_packet_size)) {
channel.queued_commands.pop_front();
} else {
LOG(dout_con << m_connection->getDesc()
<< " Failed to queue packets for peer_id: " << c.peer_id
<< ", delaying sending of " << c.data.getSize()
<< " Failed to queue packets for peer_id: " << c->peer_id
<< ", delaying sending of " << c->data.getSize()
<< " bytes" << std::endl);
}
}
@ -1130,13 +1189,70 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
channels[channel].setNextSplitSeqNum(seqnum);
}
SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable)
{
assert(channel < CHANNEL_COUNT); // Pre-condition
return channels[channel].incoming_splits.insert(toadd, reliable);
}
/*
ConnectionEvent
*/
const char *ConnectionEvent::describe() const
{
switch(type) {
case CONNEVENT_NONE:
return "CONNEVENT_NONE";
case CONNEVENT_DATA_RECEIVED:
return "CONNEVENT_DATA_RECEIVED";
case CONNEVENT_PEER_ADDED:
return "CONNEVENT_PEER_ADDED";
case CONNEVENT_PEER_REMOVED:
return "CONNEVENT_PEER_REMOVED";
case CONNEVENT_BIND_FAILED:
return "CONNEVENT_BIND_FAILED";
}
return "Invalid ConnectionEvent";
}
ConnectionEventPtr ConnectionEvent::create(ConnectionEventType type)
{
return std::shared_ptr<ConnectionEvent>(new ConnectionEvent(type));
}
ConnectionEventPtr ConnectionEvent::dataReceived(session_t peer_id, const Buffer<u8> &data)
{
auto e = create(CONNEVENT_DATA_RECEIVED);
e->peer_id = peer_id;
data.copyTo(e->data);
return e;
}
ConnectionEventPtr ConnectionEvent::peerAdded(session_t peer_id, Address address)
{
auto e = create(CONNEVENT_PEER_ADDED);
e->peer_id = peer_id;
e->address = address;
return e;
}
ConnectionEventPtr ConnectionEvent::peerRemoved(session_t peer_id, bool is_timeout, Address address)
{
auto e = create(CONNEVENT_PEER_REMOVED);
e->peer_id = peer_id;
e->timeout = is_timeout;
e->address = address;
return e;
}
ConnectionEventPtr ConnectionEvent::bindFailed()
{
return create(CONNEVENT_BIND_FAILED);
}
/*
Connection
*/
@ -1186,18 +1302,12 @@ Connection::~Connection()
/* Internal stuff */
void Connection::putEvent(const ConnectionEvent &e)
void Connection::putEvent(ConnectionEventPtr e)
{
assert(e.type != CONNEVENT_NONE); // Pre-condition
assert(e->type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(e);
}
void Connection::putEvent(ConnectionEvent &&e)
{
assert(e.type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(std::move(e));
}
void Connection::TriggerSend()
{
m_sendThread->Trigger();
@ -1260,11 +1370,9 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
Address peer_address;
//any peer has a primary address this never fails!
peer->getAddress(MTP_PRIMARY, peer_address);
// Create event
ConnectionEvent e;
e.peerRemoved(peer_id, timeout, peer_address);
putEvent(e);
// Create event
putEvent(ConnectionEvent::peerRemoved(peer_id, timeout, peer_address));
peer->Drop();
return true;
@ -1272,18 +1380,16 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
/* Interface */
ConnectionEvent Connection::waitEvent(u32 timeout_ms)
ConnectionEventPtr Connection::waitEvent(u32 timeout_ms)
{
try {
return m_event_queue.pop_front(timeout_ms);
} catch(ItemNotFoundException &ex) {
ConnectionEvent e;
e.type = CONNEVENT_NONE;
return e;
return ConnectionEvent::create(CONNEVENT_NONE);
}
}
void Connection::putCommand(const ConnectionCommand &c)
void Connection::putCommand(ConnectionCommandPtr c)
{
if (!m_shutting_down) {
m_command_queue.push_back(c);
@ -1291,26 +1397,14 @@ void Connection::putCommand(const ConnectionCommand &c)
}
}
void Connection::putCommand(ConnectionCommand &&c)
{
if (!m_shutting_down) {
m_command_queue.push_back(std::move(c));
m_sendThread->Trigger();
}
}
void Connection::Serve(Address bind_addr)
{
ConnectionCommand c;
c.serve(bind_addr);
putCommand(c);
putCommand(ConnectionCommand::serve(bind_addr));
}
void Connection::Connect(Address address)
{
ConnectionCommand c;
c.connect(address);
putCommand(c);
putCommand(ConnectionCommand::connect(address));
}
bool Connection::Connected()
@ -1332,9 +1426,7 @@ bool Connection::Connected()
void Connection::Disconnect()
{
ConnectionCommand c;
c.disconnect();
putCommand(c);
putCommand(ConnectionCommand::disconnect());
}
bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
@ -1345,11 +1437,15 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
This is not considered to be a problem (is it?)
*/
for(;;) {
ConnectionEvent e = waitEvent(timeout);
if (e.type != CONNEVENT_NONE)
ConnectionEventPtr e_ptr = waitEvent(timeout);
const ConnectionEvent &e = *e_ptr;
if (e.type != CONNEVENT_NONE) {
LOG(dout_con << getDesc() << ": Receive: got event: "
<< e.describe() << std::endl);
switch(e.type) {
}
switch (e.type) {
case CONNEVENT_NONE:
return false;
case CONNEVENT_DATA_RECEIVED:
@ -1397,10 +1493,7 @@ void Connection::Send(session_t peer_id, u8 channelnum,
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
ConnectionCommand c;
c.send(peer_id, channelnum, pkt, reliable);
putCommand(std::move(c));
putCommand(ConnectionCommand::send(peer_id, channelnum, pkt, reliable));
}
Address Connection::GetPeerAddress(session_t peer_id)
@ -1499,41 +1592,31 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
LOG(dout_con << getDesc()
<< "createPeer(): giving peer_id=" << peer_id_new << std::endl);
ConnectionCommand cmd;
Buffer<u8> reply(4);
writeU8(&reply[0], PACKET_TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
writeU16(&reply[2], peer_id_new);
cmd.createPeer(peer_id_new,reply);
putCommand(std::move(cmd));
{
Buffer<u8> reply(4);
writeU8(&reply[0], PACKET_TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
writeU16(&reply[2], peer_id_new);
putCommand(ConnectionCommand::createPeer(peer_id_new, reply));
}
// Create peer addition event
ConnectionEvent e;
e.peerAdded(peer_id_new, sender);
putEvent(e);
putEvent(ConnectionEvent::peerAdded(peer_id_new, sender));
// We're now talking to a valid peer_id
return peer_id_new;
}
void Connection::PrintInfo(std::ostream &out)
{
m_info_mutex.lock();
out<<getDesc()<<": ";
m_info_mutex.unlock();
}
const std::string Connection::getDesc()
{
MutexAutoLock _(m_info_mutex);
return std::string("con(")+
itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
}
void Connection::DisconnectPeer(session_t peer_id)
{
ConnectionCommand discon;
discon.disconnect_peer(peer_id);
putCommand(discon);
putCommand(ConnectionCommand::disconnect_peer(peer_id));
}
void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
@ -1545,14 +1628,12 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
" channel: " << (channelnum & 0xFF) <<
" seqnum: " << seqnum << std::endl);
ConnectionCommand c;
SharedBuffer<u8> ack(4);
writeU8(&ack[0], PACKET_TYPE_CONTROL);
writeU8(&ack[1], CONTROLTYPE_ACK);
writeU16(&ack[2], seqnum);
c.ack(peer_id, channelnum, ack);
putCommand(std::move(c));
putCommand(ConnectionCommand::ack(peer_id, channelnum, ack));
m_sendThread->Trigger();
}

@ -32,6 +32,95 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <vector>
#include <map>
#define MAX_UDP_PEERS 65535
/*
=== NOTES ===
A packet is sent through a channel to a peer with a basic header:
Header (7 bytes):
[0] u32 protocol_id
[4] session_t sender_peer_id
[6] u8 channel
sender_peer_id:
Unique to each peer.
value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
value 1 (PEER_ID_SERVER) is reserved for server
these constants are defined in constants.h
channel:
Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
*/
#define BASE_HEADER_SIZE 7
#define CHANNEL_COUNT 3
/*
Packet types:
CONTROL: This is a packet used by the protocol.
- When this is processed, nothing is handed to the user.
Header (2 byte):
[0] u8 type
[1] u8 controltype
controltype and data description:
CONTROLTYPE_ACK
[2] u16 seqnum
CONTROLTYPE_SET_PEER_ID
[2] session_t peer_id_new
CONTROLTYPE_PING
- There is no actual reply, but this can be sent in a reliable
packet to get a reply
CONTROLTYPE_DISCO
*/
enum ControlType : u8 {
CONTROLTYPE_ACK = 0,
CONTROLTYPE_SET_PEER_ID = 1,
CONTROLTYPE_PING = 2,
CONTROLTYPE_DISCO = 3,
};
/*
ORIGINAL: This is a plain packet with no control and no error
checking at all.
- When this is processed, it is directly handed to the user.
Header (1 byte):
[0] u8 type
*/
//#define TYPE_ORIGINAL 1
#define ORIGINAL_HEADER_SIZE 1
/*
SPLIT: These are sequences of packets forming one bigger piece of
data.
- When processed and all the packet_nums 0...packet_count-1 are
present (this should be buffered), the resulting data shall be
directly handed to the user.
- If the data fails to come up in a reasonable time, the buffer shall
be silently discarded.
- These can be sent as-is or atop of a RELIABLE packet stream.
Header (7 bytes):
[0] u8 type
[1] u16 seqnum
[3] u16 chunk_count
[5] u16 chunk_num
*/
//#define TYPE_SPLIT 2
/*
RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
and they shall be delivered in the same order as sent. This is done
with a buffer in the receiving and transmitting end.
- When this is processed, the contents of each packet is recursively
processed as packets.
Header (3 bytes):
[0] u8 type
[1] u16 seqnum
*/
//#define TYPE_RELIABLE 3
#define RELIABLE_HEADER_SIZE 3
#define SEQNUM_INITIAL 65500
#define SEQNUM_MAX 65535
class NetworkPacket;
namespace con
@ -46,9 +135,13 @@ typedef enum MTProtocols {
MTP_MINETEST_RELIABLE_UDP
} MTProtocols;
#define MAX_UDP_PEERS 65535
#define SEQNUM_MAX 65535
enum PacketType : u8 {
PACKET_TYPE_CONTROL = 0,
PACKET_TYPE_ORIGINAL = 1,
PACKET_TYPE_SPLIT = 2,
PACKET_TYPE_RELIABLE = 3,
PACKET_TYPE_MAX
};
inline bool seqnum_higher(u16 totest, u16 base)
{
@ -85,24 +178,40 @@ static inline float CALC_DTIME(u64 lasttime, u64 curtime)
return MYMAX(MYMIN(value,0.1),0.0);
}
struct BufferedPacket
{
BufferedPacket(u8 *a_data, u32 a_size):
data(a_data, a_size)
{}
BufferedPacket(u32 a_size):
data(a_size)
{}
Buffer<u8> data; // Data of the packet, including headers
/*
Struct for all kinds of packets. Includes following data:
BASE_HEADER
u8[] packet data (usually copied from SharedBuffer<u8>)
*/
struct BufferedPacket {
BufferedPacket(u32 a_size)
{
m_data.resize(a_size);
data = &m_data[0];
}
DISABLE_CLASS_COPY(BufferedPacket)
u16 getSeqnum() const;
inline const size_t size() const { return m_data.size(); }
u8 *data; // Direct memory access
float time = 0.0f; // Seconds from buffering the packet or re-sending
float totaltime = 0.0f; // Seconds from buffering the packet
u64 absolute_send_time = -1;
Address address; // Sender or destination
unsigned int resend_count = 0;
private:
std::vector<u8> m_data; // Data of the packet, including headers
};
typedef std::shared_ptr<BufferedPacket> BufferedPacketPtr;
// This adds the base headers to the data and makes a packet out of it
BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel);
// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
@ -136,101 +245,12 @@ private:
std::map<u16, SharedBuffer<u8>> chunks;
};
/*
=== NOTES ===
A packet is sent through a channel to a peer with a basic header:
Header (7 bytes):
[0] u32 protocol_id
[4] session_t sender_peer_id
[6] u8 channel
sender_peer_id:
Unique to each peer.
value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
value 1 (PEER_ID_SERVER) is reserved for server
these constants are defined in constants.h
channel:
Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
*/
#define BASE_HEADER_SIZE 7
#define CHANNEL_COUNT 3
/*
Packet types:
CONTROL: This is a packet used by the protocol.
- When this is processed, nothing is handed to the user.
Header (2 byte):
[0] u8 type
[1] u8 controltype
controltype and data description:
CONTROLTYPE_ACK
[2] u16 seqnum
CONTROLTYPE_SET_PEER_ID
[2] session_t peer_id_new
CONTROLTYPE_PING
- There is no actual reply, but this can be sent in a reliable
packet to get a reply
CONTROLTYPE_DISCO
*/
//#define TYPE_CONTROL 0
#define CONTROLTYPE_ACK 0
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
#define CONTROLTYPE_DISCO 3
/*
ORIGINAL: This is a plain packet with no control and no error
checking at all.
- When this is processed, it is directly handed to the user.
Header (1 byte):
[0] u8 type
*/
//#define TYPE_ORIGINAL 1
#define ORIGINAL_HEADER_SIZE 1
/*
SPLIT: These are sequences of packets forming one bigger piece of
data.
- When processed and all the packet_nums 0...packet_count-1 are
present (this should be buffered), the resulting data shall be
directly handed to the user.
- If the data fails to come up in a reasonable time, the buffer shall
be silently discarded.
- These can be sent as-is or atop of a RELIABLE packet stream.
Header (7 bytes):
[0] u8 type
[1] u16 seqnum
[3] u16 chunk_count
[5] u16 chunk_num
*/
//#define TYPE_SPLIT 2
/*
RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
and they shall be delivered in the same order as sent. This is done
with a buffer in the receiving and transmitting end.
- When this is processed, the contents of each packet is recursively
processed as packets.
Header (3 bytes):
[0] u8 type
[1] u16 seqnum
*/
//#define TYPE_RELIABLE 3
#define RELIABLE_HEADER_SIZE 3
#define SEQNUM_INITIAL 65500
enum PacketType: u8 {
PACKET_TYPE_CONTROL = 0,
PACKET_TYPE_ORIGINAL = 1,
PACKET_TYPE_SPLIT = 2,
PACKET_TYPE_RELIABLE = 3,
PACKET_TYPE_MAX
};
/*
A buffer which stores reliable packets and sorts them internally
for fast access to the smallest one.
*/
typedef std::list<BufferedPacket>::iterator RPBSearchResult;
typedef std::list<BufferedPacketPtr>::iterator RPBSearchResult;
class ReliablePacketBuffer
{
@ -239,12 +259,12 @@ public:
bool getFirstSeqnum(u16& result);
BufferedPacket popFirst();
BufferedPacket popSeqnum(u16 seqnum);
void insert(const BufferedPacket &p, u16 next_expected);
BufferedPacketPtr popFirst();
BufferedPacketPtr popSeqnum(u16 seqnum);
void insert(BufferedPacketPtr &p_ptr, u16 next_expected);
void incrementTimeouts(float dtime);
std::list<BufferedPacket> getTimedOuts(float timeout, u32 max_packets);
std::list<ConstSharedPtr<BufferedPacket>> getTimedOuts(float timeout, u32 max_packets);
void print();
bool empty();
@ -252,10 +272,9 @@ public:
private:
RPBSearchResult findPacket(u16 seqnum); // does not perform locking
inline RPBSearchResult notFound() { return m_list.end(); }
RPBSearchResult findPacketNoLock(u16 seqnum);
std::list<BufferedPacket> m_list;
std::list<BufferedPacketPtr> m_list;
u16 m_oldest_non_answered_ack;
@ -274,7 +293,7 @@ public:
Returns a reference counted buffer of length != 0 when a full split
packet is constructed. If not, returns one of length 0.
*/
SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
SharedBuffer<u8> insert(BufferedPacketPtr &p_ptr, bool reliable);
void removeUnreliableTimedOuts(float dtime, float timeout);
@ -285,25 +304,6 @@ private:
std::mutex m_map_mutex;
};
struct OutgoingPacket
{
session_t peer_id;
u8 channelnum;
SharedBuffer<u8> data;
bool reliable;
bool ack;
OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
bool reliable_,bool ack_=false):
peer_id(peer_id_),
channelnum(channelnum_),
data(data_),
reliable(reliable_),
ack(ack_)
{
}
};
enum ConnectionCommandType{
CONNCMD_NONE,
CONNCMD_SERVE,
@ -316,9 +316,13 @@ enum ConnectionCommandType{
CONCMD_CREATE_PEER
};
struct ConnectionCommand;
typedef std::shared_ptr<ConnectionCommand> ConnectionCommandPtr;
// This is very similar to ConnectionEvent
struct ConnectionCommand
{
enum ConnectionCommandType type = CONNCMD_NONE;
const ConnectionCommandType type;
Address address;
session_t peer_id = PEER_ID_INEXISTENT;
u8 channelnum = 0;
@ -326,48 +330,21 @@ struct ConnectionCommand
bool reliable = false;
bool raw = false;
ConnectionCommand() = default;
DISABLE_CLASS_COPY(ConnectionCommand);
void serve(Address address_)
{
type = CONNCMD_SERVE;
address = address_;
}
void connect(Address address_)
{
type = CONNCMD_CONNECT;
address = address_;
}
void disconnect()
{
type = CONNCMD_DISCONNECT;
}
void disconnect_peer(session_t peer_id_)
{
type = CONNCMD_DISCONNECT_PEER;
peer_id = peer_id_;
}
static ConnectionCommandPtr serve(Address address);
static ConnectionCommandPtr connect(Address address);
static ConnectionCommandPtr disconnect();
static ConnectionCommandPtr disconnect_peer(session_t peer_id);
static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data);
static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer<u8> &data);
void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
private:
ConnectionCommand(ConnectionCommandType type_) :
type(type_) {}
void ack(session_t peer_id_, u8 channelnum_, const Buffer<u8> &data_)
{
type = CONCMD_ACK;
peer_id = peer_id_;
channelnum = channelnum_;
data = data_;
reliable = false;
}
void createPeer(session_t peer_id_, const Buffer<u8> &data_)
{
type = CONCMD_CREATE_PEER;
peer_id = peer_id_;
data = data_;
channelnum = 0;
reliable = true;
raw = true;
}
static ConnectionCommandPtr create(ConnectionCommandType type);
};
/* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
@ -402,10 +379,10 @@ public:
ReliablePacketBuffer outgoing_reliables_sent;
//queued reliable packets
std::queue<BufferedPacket> queued_reliables;
std::queue<BufferedPacketPtr> queued_reliables;
//queue commands prior splitting to packets
std::deque<ConnectionCommand> queued_commands;
std::deque<ConnectionCommandPtr> queued_commands;
IncomingSplitBuffer incoming_splits;
@ -514,7 +491,7 @@ class Peer {
public:
friend class PeerHelper;
Peer(Address address_,u16 id_,Connection* connection) :
Peer(Address address_,session_t id_,Connection* connection) :
id(id_),
m_connection(connection),
address(address_),
@ -528,11 +505,11 @@ class Peer {
};
// Unique id of the peer
u16 id;
const session_t id;
void Drop();
virtual void PutReliableSendCommand(ConnectionCommand &c,
virtual void PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size) {};
virtual bool getAddress(MTProtocols type, Address& toset) = 0;
@ -549,7 +526,7 @@ class Peer {
virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
virtual SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable)
{
errorstream << "Peer::addSplitPacket called,"
@ -586,7 +563,7 @@ class Peer {
bool IncUseCount();
void DecUseCount();
std::mutex m_exclusive_access_mutex;
mutable std::mutex m_exclusive_access_mutex;
bool m_pending_deletion = false;
@ -634,7 +611,7 @@ public:
UDPPeer(u16 a_id, Address a_address, Connection* connection);
virtual ~UDPPeer() = default;
void PutReliableSendCommand(ConnectionCommand &c,
void PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size);
bool getAddress(MTProtocols type, Address& toset);
@ -642,7 +619,7 @@ public:
u16 getNextSplitSequenceNumber(u8 channel);
void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable);
protected:
@ -671,7 +648,7 @@ private:
float resend_timeout = 0.5;
bool processReliableSendCommand(
ConnectionCommand &c,
ConnectionCommandPtr &c_ptr,
unsigned int max_packet_size);
};
@ -679,7 +656,7 @@ private:
Connection
*/
enum ConnectionEventType{
enum ConnectionEventType {
CONNEVENT_NONE,
CONNEVENT_DATA_RECEIVED,
CONNEVENT_PEER_ADDED,
@ -687,56 +664,32 @@ enum ConnectionEventType{
CONNEVENT_BIND_FAILED,
};
struct ConnectionEvent;
typedef std::shared_ptr<ConnectionEvent> ConnectionEventPtr;
// This is very similar to ConnectionCommand
struct ConnectionEvent
{
enum ConnectionEventType type = CONNEVENT_NONE;
const ConnectionEventType type;
session_t peer_id = 0;
Buffer<u8> data;
bool timeout = false;
Address address;
ConnectionEvent() = default;
// We don't want to copy "data"
DISABLE_CLASS_COPY(ConnectionEvent);
const char *describe() const
{
switch(type) {
case CONNEVENT_NONE:
return "CONNEVENT_NONE";
case CONNEVENT_DATA_RECEIVED:
return "CONNEVENT_DATA_RECEIVED";
case CONNEVENT_PEER_ADDED:
return "CONNEVENT_PEER_ADDED";
case CONNEVENT_PEER_REMOVED:
return "CONNEVENT_PEER_REMOVED";
case CONNEVENT_BIND_FAILED:
return "CONNEVENT_BIND_FAILED";
}
return "Invalid ConnectionEvent";
}
static ConnectionEventPtr create(ConnectionEventType type);
static ConnectionEventPtr dataReceived(session_t peer_id, const Buffer<u8> &data);
static ConnectionEventPtr peerAdded(session_t peer_id, Address address);
static ConnectionEventPtr peerRemoved(session_t peer_id, bool is_timeout, Address address);
static ConnectionEventPtr bindFailed();
void dataReceived(session_t peer_id_, const Buffer<u8> &data_)
{
type = CONNEVENT_DATA_RECEIVED;
peer_id = peer_id_;
data = data_;
}
void peerAdded(session_t peer_id_, Address address_)
{
type = CONNEVENT_PEER_ADDED;
peer_id = peer_id_;
address = address_;
}
void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
{
type = CONNEVENT_PEER_REMOVED;
peer_id = peer_id_;
timeout = timeout_;
address = address_;
}
void bindFailed()
{
type = CONNEVENT_BIND_FAILED;
}
const char *describe() const;
private:
ConnectionEvent(ConnectionEventType type_) :
type(type_) {}
};
class PeerHandler;
@ -752,10 +705,9 @@ public:
~Connection();
/* Interface */
ConnectionEvent waitEvent(u32 timeout_ms);
// Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible
void putCommand(const ConnectionCommand &c);
void putCommand(ConnectionCommand &&c);
ConnectionEventPtr waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommandPtr c);
void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
void Serve(Address bind_addr);
@ -785,8 +737,6 @@ protected:
void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
void PrintInfo(std::ostream &out);
std::vector<session_t> getPeerIDs()
{
MutexAutoLock peerlock(m_peers_mutex);
@ -795,13 +745,11 @@ protected:
UDPSocket m_udpSocket;
// Command queue: user -> SendThread
MutexedQueue<ConnectionCommand> m_command_queue;
MutexedQueue<ConnectionCommandPtr> m_command_queue;
bool Receive(NetworkPacket *pkt, u32 timeout);
// Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible
void putEvent(const ConnectionEvent &e);
void putEvent(ConnectionEvent &&e);
void putEvent(ConnectionEventPtr e);
void TriggerSend();
@ -811,7 +759,7 @@ protected:
}
private:
// Event queue: ReceiveThread -> user
MutexedQueue<ConnectionEvent> m_event_queue;
MutexedQueue<ConnectionEventPtr> m_event_queue;
session_t m_peer_id = 0;
u32 m_protocol_id;
@ -823,7 +771,7 @@ private:
std::unique_ptr<ConnectionSendThread> m_sendThread;
std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
std::mutex m_info_mutex;
mutable std::mutex m_info_mutex;
// Backwards compatibility
PeerHandler *m_bc_peerhandler;

@ -50,11 +50,11 @@ std::mutex log_conthread_mutex;
#define WINDOW_SIZE 5
static session_t readPeerId(u8 *packetdata)
static session_t readPeerId(const u8 *packetdata)
{
return readU16(&packetdata[4]);
}
static u8 readChannel(u8 *packetdata)
static u8 readChannel(const u8 *packetdata)
{
return readU8(&packetdata[6]);
}
@ -114,9 +114,9 @@ void *ConnectionSendThread::run()
}
/* translate commands to packets */
ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
while (c.type != CONNCMD_NONE) {
if (c.reliable)
auto c = m_connection->m_command_queue.pop_frontNoEx(0);
while (c && c->type != CONNCMD_NONE) {
if (c->reliable)
processReliableCommand(c);
else
processNonReliableCommand(c);
@ -227,21 +227,21 @@ void ConnectionSendThread::runTimeouts(float dtime)
m_iteration_packets_avaialble -= timed_outs.size();
for (const auto &k : timed_outs) {
u8 channelnum = readChannel(*k.data);
u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1]));
u8 channelnum = readChannel(k->data);
u16 seqnum = k->getSeqnum();
channel.UpdateBytesLost(k.data.getSize());
channel.UpdateBytesLost(k->size());
LOG(derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to "
<< k.address.serializeString()
<< k->address.serializeString()
<< "(t/o=" << resend_timeout << "): "
<< "count=" << k.resend_count
<< "count=" << k->resend_count
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl);
rawSend(k);
rawSend(k.get());
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
@ -274,25 +274,24 @@ void ConnectionSendThread::runTimeouts(float dtime)
}
}
void ConnectionSendThread::rawSend(const BufferedPacket &packet)
void ConnectionSendThread::rawSend(const BufferedPacket *p)
{
try {
m_connection->m_udpSocket.Send(packet.address, *packet.data,
packet.data.getSize());
m_connection->m_udpSocket.Send(p->address, p->data, p->size());
LOG(dout_con << m_connection->getDesc()
<< " rawSend: " << packet.data.getSize()
<< " rawSend: " << p->size()
<< " bytes sent" << std::endl);
} catch (SendFailedException &e) {
LOG(derr_con << m_connection->getDesc()
<< "Connection::rawSend(): SendFailedException: "
<< packet.address.serializeString() << std::endl);
<< p->address.serializeString() << std::endl);
}
}
void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
{
try {
p.absolute_send_time = porting::getTimeMs();
p->absolute_send_time = porting::getTimeMs();
// Buffer the packet
channel->outgoing_reliables_sent.insert(p,
(channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
@ -305,7 +304,7 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan
}
// Send the packet
rawSend(p);
rawSend(p.get());
}
bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
@ -321,11 +320,10 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
if (reliable) {
bool have_sequence_number_for_raw_packet = true;
u16 seqnum =
channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
bool have_seqnum = false;
const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
if (!have_sequence_number_for_raw_packet)
if (!have_seqnum)
return false;
SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
@ -333,13 +331,12 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, reliable,
BufferedPacketPtr p = con::makePacket(peer_address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// first check if our send window is already maxed out
if (channel->outgoing_reliables_sent.size()
< channel->getWindowSize()) {
if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a reliable packet to peer_id " << peer_id
<< " channel: " << (u32)channelnum
@ -352,19 +349,19 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
<< " INFO: queueing reliable packet for peer_id: " << peer_id
<< " channel: " << (u32)channelnum
<< " seqnum: " << seqnum << std::endl);
channel->queued_reliables.push(std::move(p));
channel->queued_reliables.push(p);
return false;
}
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, data,
BufferedPacketPtr p = con::makePacket(peer_address, data,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// Send the packet
rawSend(p);
rawSend(p.get());
return true;
}
@ -374,11 +371,11 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
return false;
}
void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
{
assert(c.reliable); // Pre-condition
assert(c->reliable); // Pre-condition
switch (c.type) {
switch (c->type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_NONE" << std::endl);
@ -399,7 +396,7 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
case CONCMD_CREATE_PEER:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
/* put to queue if we couldn't send it immediately */
sendReliable(c);
}
@ -412,13 +409,14 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
default:
LOG(dout_con << m_connection->getDesc()
<< " Invalid reliable command type: " << c.type << std::endl);
<< " Invalid reliable command type: " << c->type << std::endl);
}
}
void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
{
const ConnectionCommand &c = *c_ptr;
assert(!c.reliable); // Pre-condition
switch (c.type) {
@ -480,9 +478,7 @@ void ConnectionSendThread::serve(Address bind_address)
}
catch (SocketException &e) {
// Create event
ConnectionEvent ce;
ce.bindFailed();
m_connection->putEvent(ce);
m_connection->putEvent(ConnectionEvent::bindFailed());
}
}
@ -495,9 +491,7 @@ void ConnectionSendThread::connect(Address address)
UDPPeer *peer = m_connection->createServerPeer(address);
// Create event
ConnectionEvent e;
e.peerAdded(peer->id, peer->address);
m_connection->putEvent(e);
m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
Address bind_addr;
@ -586,9 +580,9 @@ void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
}
}
void ConnectionSendThread::sendReliable(ConnectionCommand &c)
void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
{
PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
if (!peer)
return;
@ -604,7 +598,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data
}
}
void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
{
std::vector<session_t> peerids = m_connection->getPeerIDs();
@ -663,8 +657,12 @@ void ConnectionSendThread::sendPackets(float dtime)
// first send queued reliable packets for all peers (if possible)
for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
Channel &channel = udpPeer->channels[i];
u16 next_to_ack = 0;
// Reduces logging verbosity
if (channel.queued_reliables.empty())
continue;
u16 next_to_ack = 0;
channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
u16 next_to_receive = 0;
channel.incoming_reliables.getFirstSeqnum(next_to_receive);
@ -694,13 +692,13 @@ void ConnectionSendThread::sendPackets(float dtime)
channel.outgoing_reliables_sent.size()
< channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) {
BufferedPacket p = std::move(channel.queued_reliables.front());
BufferedPacketPtr p = channel.queued_reliables.front();
channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a queued reliable packet "
<< " channel: " << i
<< ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
<< ", seqnum: " << p->getSeqnum()
<< std::endl);
sendAsPacketReliable(p, &channel);
@ -881,17 +879,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
try {
// First, see if there any buffered packets we can process now
if (packet_queued) {
bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
while (data_left) {
while (true) {
try {
data_left = getFromBuffers(peer_id, resultdata);
if (data_left) {
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(std::move(e));
}
if (!getFromBuffers(peer_id, resultdata))
break;
m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
@ -908,7 +903,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
return;
if ((received_size < BASE_HEADER_SIZE) ||
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
@ -999,9 +994,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(std::move(e));
m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
}
catch (ProcessedSilentlyException &e) {
}
@ -1026,10 +1019,11 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8
if (!peer)
continue;
if (dynamic_cast<UDPPeer *>(&peer) == 0)
UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
if (!p)
continue;
for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
for (Channel &channel : p->channels) {
if (checkIncomingBuffers(&channel, peer_id, dst)) {
return true;
}
@ -1042,32 +1036,34 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
session_t &peer_id, SharedBuffer<u8> &dst)
{
u16 firstseqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
if (firstseqnum == channel->readNextIncomingSeqNum()) {
BufferedPacket p = channel->incoming_reliables.popFirst();
peer_id = readPeerId(*p.data);
u8 channelnum = readChannel(*p.data);
u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
return false;
LOG(dout_con << m_connection->getDesc()
<< "UNBUFFERING TYPE_RELIABLE"
<< " seqnum=" << seqnum
<< " peer_id=" << peer_id
<< " channel=" << ((int) channelnum & 0xff)
<< std::endl);
if (firstseqnum != channel->readNextIncomingSeqNum())
return false;
channel->incNextIncomingSeqNum();
BufferedPacketPtr p = channel->incoming_reliables.popFirst();
u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
// Get out the inside packet and re-process it
SharedBuffer<u8> payload(p.data.getSize() - headers_size);
memcpy(*payload, &p.data[headers_size], payload.getSize());
peer_id = readPeerId(p->data); // Carried over to caller function
u8 channelnum = readChannel(p->data);
u16 seqnum = p->getSeqnum();
dst = processPacket(channel, payload, peer_id, channelnum, true);
return true;
}
}
return false;
LOG(dout_con << m_connection->getDesc()
<< "UNBUFFERING TYPE_RELIABLE"
<< " seqnum=" << seqnum
<< " peer_id=" << peer_id
<< " channel=" << ((int) channelnum & 0xff)
<< std::endl);
channel->incNextIncomingSeqNum();
u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
// Get out the inside packet and re-process it
SharedBuffer<u8> payload(p->size() - headers_size);
memcpy(*payload, &p->data[headers_size], payload.getSize());
dst = processPacket(channel, payload, peer_id, channelnum, true);
return true;
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
@ -1115,7 +1111,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
u8 controltype = readU8(&(packetdata[1]));
ControlType controltype = (ControlType)readU8(&(packetdata[1]));
if (controltype == CONTROLTYPE_ACK) {
assert(channel != NULL);
@ -1131,7 +1127,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
<< seqnum << " ]" << std::endl);
try {
BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
// the rtt calculation will be a bit off for re-sent packets but that's okay
{
@ -1140,14 +1136,14 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
if (current_time > p.absolute_send_time) {
float rtt = (current_time - p.absolute_send_time) / 1000.0;
if (current_time > p->absolute_send_time) {
float rtt = (current_time - p->absolute_send_time) / 1000.0;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
} else if (p.totaltime > 0) {
float rtt = p.totaltime;
} else if (p->totaltime > 0) {
float rtt = p->totaltime;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
@ -1156,7 +1152,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
}
// put bytes for max bandwidth calculation
channel->UpdateBytesSent(p.data.getSize(), 1);
channel->UpdateBytesSent(p->size(), 1);
if (channel->outgoing_reliables_sent.size() == 0)
m_connection->TriggerSend();
} catch (NotFoundException &e) {
@ -1204,7 +1200,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
throw ProcessedSilentlyException("Got a DISCO");
} else {
LOG(derr_con << m_connection->getDesc()
<< "INVALID TYPE_CONTROL: invalid controltype="
<< "INVALID controltype="
<< ((int) controltype & 0xff) << std::endl);
throw InvalidIncomingDataException("Invalid control type");
}
@ -1232,7 +1228,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe
if (peer->getAddress(MTP_UDP, peer_address)) {
// We have to create a packet again for buffering
// This isn't actually too bad an idea.
BufferedPacket packet = makePacket(peer_address,
BufferedPacketPtr packet = con::makePacket(peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
@ -1267,7 +1263,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
u16 seqnum = readU16(&packetdata[1]);
const u16 seqnum = readU16(&packetdata[1]);
bool is_future_packet = false;
bool is_old_packet = false;
@ -1311,7 +1307,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
// This one comes later, buffer it.
// Actually we have to make a packet to buffer one.
// Well, we have all the ingredients, so just do it.
BufferedPacket packet = con::makePacket(
BufferedPacketPtr packet = con::makePacket(
peer_address,
packetdata,
m_connection->GetProtocolID(),
@ -1328,9 +1324,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
throw ProcessedQueued("Buffered future reliable packet");
} catch (AlreadyExistsException &e) {
} catch (IncomingDataCorruption &e) {
ConnectionCommand discon;
discon.disconnect_peer(peer->id);
m_connection->putCommand(discon);
m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
LOG(derr_con << m_connection->getDesc()
<< "INVALID, TYPE_RELIABLE peer_id: " << peer->id
@ -1351,7 +1345,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
u16 queued_seqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
if (queued_seqnum == seqnum) {
BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
/** TODO find a way to verify the new against the old packet */
}
}

@ -29,6 +29,25 @@ namespace con
class Connection;
struct OutgoingPacket
{
session_t peer_id;
u8 channelnum;
SharedBuffer<u8> data;
bool reliable;
bool ack;
OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
bool reliable_,bool ack_=false):
peer_id(peer_id_),
channelnum(channelnum_),
data(data_),
reliable(reliable_),
ack(ack_)
{
}
};
class ConnectionSendThread : public Thread
{
@ -51,27 +70,27 @@ public:
private:
void runTimeouts(float dtime);
void rawSend(const BufferedPacket &packet);
void rawSend(const BufferedPacket *p);
bool rawSendAsPacket(session_t peer_id, u8 channelnum,
const SharedBuffer<u8> &data, bool reliable);
void processReliableCommand(ConnectionCommand &c);
void processNonReliableCommand(ConnectionCommand &c);
void processReliableCommand(ConnectionCommandPtr &c);
void processNonReliableCommand(ConnectionCommandPtr &c);
void serve(Address bind_address);
void connect(Address address);
void disconnect();
void disconnect_peer(session_t peer_id);
void send(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data);
void sendReliable(ConnectionCommand &c);
void sendReliable(ConnectionCommandPtr &c);
void sendToAll(u8 channelnum, const SharedBuffer<u8> &data);
void sendToAllReliable(ConnectionCommand &c);
void sendToAllReliable(ConnectionCommandPtr &c);
void sendPackets(float dtime);
void sendAsPacket(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data,
bool ack = false);
void sendAsPacketReliable(BufferedPacket &p, Channel *channel);
void sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel);
bool packetsQueued();

@ -124,7 +124,7 @@ void TestConnection::testHelpers()
Address a(127,0,0,1, 10);
const u16 seqnum = 34352;
con::BufferedPacket p1 = con::makePacket(a, data1,
con::BufferedPacketPtr p1 = con::makePacket(a, data1,
proto_id, peer_id, channel);
/*
We should now have a packet with this data:
@ -135,10 +135,10 @@ void TestConnection::testHelpers()
Data:
[7] u8 data1[0]
*/
UASSERT(readU32(&p1.data[0]) == proto_id);
UASSERT(readU16(&p1.data[4]) == peer_id);
UASSERT(readU8(&p1.data[6]) == channel);
UASSERT(readU8(&p1.data[7]) == data1[0]);
UASSERT(readU32(&p1->data[0]) == proto_id);
UASSERT(readU16(&p1->data[4]) == peer_id);
UASSERT(readU8(&p1->data[6]) == channel);
UASSERT(readU8(&p1->data[7]) == data1[0]);
//infostream<<"initial data1[0]="<<((u32)data1[0]&0xff)<<std::endl;

@ -22,6 +22,21 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "irrlichttypes.h"
#include "debug.h" // For assert()
#include <cstring>
#include <memory> // std::shared_ptr
template<typename T> class ConstSharedPtr {
public:
ConstSharedPtr(T *ptr) : ptr(ptr) {}
ConstSharedPtr(const std::shared_ptr<T> &ptr) : ptr(ptr) {}
const T* get() const noexcept { return ptr.get(); }
const T& operator*() const noexcept { return *ptr.get(); }
const T* operator->() const noexcept { return ptr.get(); }
private:
std::shared_ptr<T> ptr;
};
template <typename T>
class Buffer
@ -40,17 +55,11 @@ public:
else
data = NULL;
}
Buffer(const Buffer &buffer)
{
m_size = buffer.m_size;
if(m_size != 0)
{
data = new T[buffer.m_size];
memcpy(data, buffer.data, buffer.m_size);
}
else
data = NULL;
}
// Disable class copy
Buffer(const Buffer &) = delete;
Buffer &operator=(const Buffer &) = delete;
Buffer(Buffer &&buffer)
{
m_size = buffer.m_size;
@ -81,21 +90,6 @@ public:
drop();
}
Buffer& operator=(const Buffer &buffer)
{
if(this == &buffer)
return *this;
drop();
m_size = buffer.m_size;
if(m_size != 0)
{
data = new T[buffer.m_size];
memcpy(data, buffer.data, buffer.m_size);
}
else
data = NULL;
return *this;
}
Buffer& operator=(Buffer &&buffer)
{
if(this == &buffer)
@ -113,6 +107,18 @@ public:
return *this;
}
void copyTo(Buffer &buffer) const
{
buffer.drop();
buffer.m_size = m_size;
if (m_size != 0) {
buffer.data = new T[m_size];
memcpy(buffer.data, data, m_size);
} else {
buffer.data = nullptr;
}
}
T & operator[](unsigned int i) const
{
return data[i];