minetest/src/network/connectionthreads.cpp
Jozef Behran 007ce24a11 Various network performance improvements (#8125)
* Optimize packet construction functions

Some of the functions that construct packets in
connection.cpp are using a const reference to get the raw
packet data to package and others use a value passed
parameter to do that. The ones that use the value passed
parameter suffer from performance hit as the rather bulky
packet data gets a temporary copy when the parameter is
passed before it lands at its final destination inside the
newly constructed packet. The unnecessary temporary copy
hurts quite badly as the underlying class (SharedBuffer)
actually allocates the space for the data in the heap.

Fix the performance hit by converting all of these value
passed parameters to const references. I believe that this
is what the author of the relevant code actually intended
to do as there is a couple of packet construction helper
functions that already use a const reference to get the
raw data.

* Optimize packet sender thread class

Most of the data sending methods of the packet sender thread
class use a value passed parameter for the packet data to be
sent. This causes the rather bulky data to be allocated on
the heap and copied, slowing the packet sending down. Convert
these parameters to const references to avoid the performance
hit.

* Optimize packet receiver thread class

The packet receiver and processor thread class has many
methods (mostly packet handlers) that receive the packed data
by value. This causes a performance hit that is actually
worse than the one caused by the packet sender methods
because the packet is first handed to the processPacket
method which looks at the packet type stored in the header
and then delegates the actual handling to one of the
handlers. Both, processPacket and all the handlers get the
packet data by value, leading to at least two unnecessary
copies of the data (with malloc and all the slow bells and
whistles of bulky classes).

As there already is a few methods that use a const reference
parameter for the packet data, convert all this value passed
packets to const references.
2019-04-14 21:56:38 +01:00

1382 lines
41 KiB
C++

/*
Minetest
Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "connectionthreads.h"
#include "log.h"
#include "profiler.h"
#include "settings.h"
#include "network/networkpacket.h"
#include "util/serialize.h"
namespace con
{
/******************************************************************************/
/* defines used for debugging and profiling */
/******************************************************************************/
#ifdef NDEBUG
#define LOG(a) a
#define PROFILE(a)
#undef DEBUG_CONNECTION_KBPS
#else
/* this mutex is used to achieve log message consistency */
std::mutex log_conthread_mutex;
#define LOG(a) \
{ \
MutexAutoLock loglock(log_conthread_mutex); \
a; \
}
#define PROFILE(a) a
//#define DEBUG_CONNECTION_KBPS
#undef DEBUG_CONNECTION_KBPS
#endif
/* maximum number of retries for reliable packets */
#define MAX_RELIABLE_RETRY 5
#define WINDOW_SIZE 5
static session_t readPeerId(u8 *packetdata)
{
return readU16(&packetdata[4]);
}
static u8 readChannel(u8 *packetdata)
{
return readU8(&packetdata[6]);
}
/******************************************************************************/
/* Connection Threads */
/******************************************************************************/
ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
float timeout) :
Thread("ConnectionSend"),
m_max_packet_size(max_packet_size),
m_timeout(timeout),
m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
{
}
void *ConnectionSendThread::run()
{
assert(m_connection);
LOG(dout_con << m_connection->getDesc()
<< "ConnectionSend thread started" << std::endl);
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
PROFILE(std::stringstream ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
/* if stop is requested don't stop immediately but try to send all */
/* packets first */
while (!stopRequested() || packetsQueued()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
/* wait for trigger or timeout */
m_send_sleep_semaphore.wait(50);
/* remove all triggers */
while (m_send_sleep_semaphore.wait(0)) {
}
lasttime = curtime;
curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime, curtime);
/* first do all the reliable stuff */
runTimeouts(dtime);
/* translate commands to packets */
ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
while (c.type != CONNCMD_NONE) {
if (c.reliable)
processReliableCommand(c);
else
processNonReliableCommand(c);
c = m_connection->m_command_queue.pop_frontNoEx(0);
}
/* send non reliable packets */
sendPackets(dtime);
END_DEBUG_EXCEPTION_HANDLER
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
}
void ConnectionSendThread::Trigger()
{
m_send_sleep_semaphore.post();
}
bool ConnectionSendThread::packetsQueued()
{
std::list<session_t> peerIds = m_connection->getPeerIDs();
if (!m_outgoing_queue.empty() && !peerIds.empty())
return true;
for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer)
continue;
if (dynamic_cast<UDPPeer *>(&peer) == 0)
continue;
for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
if (!channel.queued_commands.empty()) {
return true;
}
}
}
return false;
}
void ConnectionSendThread::runTimeouts(float dtime)
{
std::list<session_t> timeouted_peers;
std::list<session_t> peerIds = m_connection->getPeerIDs();
for (session_t &peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer)
continue;
UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
if (!udpPeer)
continue;
PROFILE(std::stringstream peerIdentifier);
PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
<< ";" << peerId << ";RELIABLE]");
PROFILE(ScopeProfiler
peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
/*
Check peer timeout
*/
if (peer->isTimedOut(m_timeout)) {
infostream << m_connection->getDesc()
<< "RunTimeouts(): Peer " << peer->id
<< " has timed out."
<< " (source=peer->timeout_counter)"
<< std::endl;
// Add peer to the list
timeouted_peers.push_back(peer->id);
// Don't bother going through the buffers of this one
continue;
}
float resend_timeout = udpPeer->getResendTimeout();
bool retry_count_exceeded = false;
for (Channel &channel : udpPeer->channels) {
std::list<BufferedPacket> timed_outs;
// Remove timed out incomplete unreliable split packets
channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
// Increment reliable packet times
channel.outgoing_reliables_sent.incrementTimeouts(dtime);
unsigned int numpeers = m_connection->m_peers.size();
if (numpeers == 0)
return;
// Re-send timed out outgoing reliables
timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
(m_max_data_packets_per_iteration / numpeers));
channel.UpdatePacketLossCounter(timed_outs.size());
g_profiler->graphAdd("packets_lost", timed_outs.size());
m_iteration_packets_avaialble -= timed_outs.size();
for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
k != timed_outs.end(); ++k) {
session_t peer_id = readPeerId(*(k->data));
u8 channelnum = readChannel(*(k->data));
u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
channel.UpdateBytesLost(k->data.getSize());
k->resend_count++;
if (k->resend_count > MAX_RELIABLE_RETRY) {
retry_count_exceeded = true;
timeouted_peers.push_back(peer->id);
/* no need to check additional packets if a single one did timeout*/
break;
}
LOG(derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to "
<< k->address.serializeString()
<< "(t/o=" << resend_timeout << "): "
<< "from_peer_id=" << peer_id
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl);
rawSend(*k);
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
}
if (retry_count_exceeded) {
break; /* no need to check other channels if we already did timeout */
}
channel.UpdateTimers(dtime);
}
/* skip to next peer if we did timeout */
if (retry_count_exceeded)
continue;
/* send ping if necessary */
if (udpPeer->Ping(dtime, data)) {
LOG(dout_con << m_connection->getDesc()
<< "Sending ping for peer_id: " << udpPeer->id << std::endl);
/* this may fail if there ain't a sequence number left */
if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
//retrigger with reduced ping interval
udpPeer->Ping(4.0, data);
}
}
udpPeer->RunCommandQueues(m_max_packet_size,
m_max_commands_per_iteration,
m_max_packets_requeued);
}
// Remove timed out peers
for (u16 timeouted_peer : timeouted_peers) {
LOG(derr_con << m_connection->getDesc()
<< "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
m_connection->deletePeer(timeouted_peer, true);
}
}
void ConnectionSendThread::rawSend(const BufferedPacket &packet)
{
try {
m_connection->m_udpSocket.Send(packet.address, *packet.data,
packet.data.getSize());
LOG(dout_con << m_connection->getDesc()
<< " rawSend: " << packet.data.getSize()
<< " bytes sent" << std::endl);
} catch (SendFailedException &e) {
LOG(derr_con << m_connection->getDesc()
<< "Connection::rawSend(): SendFailedException: "
<< packet.address.serializeString() << std::endl);
}
}
void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
{
try {
p.absolute_send_time = porting::getTimeMs();
// Buffer the packet
channel->outgoing_reliables_sent.insert(p,
(channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
% (MAX_RELIABLE_WINDOW_SIZE + 1));
}
catch (AlreadyExistsException &e) {
LOG(derr_con << m_connection->getDesc()
<< "WARNING: Going to send a reliable packet"
<< " in outgoing buffer" << std::endl);
}
// Send the packet
rawSend(p);
}
bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
const SharedBuffer<u8> &data, bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: dropped packet for non existent peer_id: "
<< peer_id << std::endl);
FATAL_ERROR_IF(!reliable,
"Trying to send raw packet reliable but no peer found!");
return false;
}
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
if (reliable) {
bool have_sequence_number_for_raw_packet = true;
u16 seqnum =
channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
if (!have_sequence_number_for_raw_packet)
return false;
SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
Address peer_address;
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// first check if our send window is already maxed out
if (channel->outgoing_reliables_sent.size()
< channel->getWindowSize()) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a reliable packet to peer_id " << peer_id
<< " channel: " << (u32)channelnum
<< " seqnum: " << seqnum << std::endl);
sendAsPacketReliable(p, channel);
return true;
}
LOG(dout_con << m_connection->getDesc()
<< " INFO: queueing reliable packet for peer_id: " << peer_id
<< " channel: " << (u32)channelnum
<< " seqnum: " << seqnum << std::endl);
channel->queued_reliables.push(p);
return false;
}
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, data,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// Send the packet
rawSend(p);
return true;
}
LOG(dout_con << m_connection->getDesc()
<< " INFO: dropped unreliable packet for peer_id: " << peer_id
<< " because of (yet) missing udp address" << std::endl);
return false;
}
void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
{
assert(c.reliable); // Pre-condition
switch (c.type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_NONE" << std::endl);
return;
case CONNCMD_SEND:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_SEND" << std::endl);
sendReliable(c);
return;
case CONNCMD_SEND_TO_ALL:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
sendToAllReliable(c);
return;
case CONCMD_CREATE_PEER:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
/* put to queue if we couldn't send it immediately */
sendReliable(c);
}
return;
case CONNCMD_SERVE:
case CONNCMD_CONNECT:
case CONNCMD_DISCONNECT:
case CONCMD_ACK:
FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
default:
LOG(dout_con << m_connection->getDesc()
<< " Invalid reliable command type: " << c.type << std::endl);
}
}
void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
{
assert(!c.reliable); // Pre-condition
switch (c.type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_NONE" << std::endl);
return;
case CONNCMD_SERVE:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_SERVE port="
<< c.address.serializeString() << std::endl);
serve(c.address);
return;
case CONNCMD_CONNECT:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_CONNECT" << std::endl);
connect(c.address);
return;
case CONNCMD_DISCONNECT:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_DISCONNECT" << std::endl);
disconnect();
return;
case CONNCMD_DISCONNECT_PEER:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
disconnect_peer(c.peer_id);
return;
case CONNCMD_SEND:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_SEND" << std::endl);
send(c.peer_id, c.channelnum, c.data);
return;
case CONNCMD_SEND_TO_ALL:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
sendToAll(c.channelnum, c.data);
return;
case CONCMD_ACK:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONCMD_ACK" << std::endl);
sendAsPacket(c.peer_id, c.channelnum, c.data, true);
return;
case CONCMD_CREATE_PEER:
FATAL_ERROR("Got command that should be reliable as unreliable command");
default:
LOG(dout_con << m_connection->getDesc()
<< " Invalid command type: " << c.type << std::endl);
}
}
void ConnectionSendThread::serve(Address bind_address)
{
LOG(dout_con << m_connection->getDesc()
<< "UDP serving at port " << bind_address.serializeString() << std::endl);
try {
m_connection->m_udpSocket.Bind(bind_address);
m_connection->SetPeerID(PEER_ID_SERVER);
}
catch (SocketException &e) {
// Create event
ConnectionEvent ce;
ce.bindFailed();
m_connection->putEvent(ce);
}
}
void ConnectionSendThread::connect(Address address)
{
LOG(dout_con << m_connection->getDesc() << " connecting to "
<< address.serializeString()
<< ":" << address.getPort() << std::endl);
UDPPeer *peer = m_connection->createServerPeer(address);
// Create event
ConnectionEvent e;
e.peerAdded(peer->id, peer->address);
m_connection->putEvent(e);
Address bind_addr;
if (address.isIPv6())
bind_addr.setAddress((IPv6AddressBytes *) NULL);
else
bind_addr.setAddress(0, 0, 0, 0);
m_connection->m_udpSocket.Bind(bind_addr);
// Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
m_connection->SetPeerID(PEER_ID_INEXISTENT);
NetworkPacket pkt(0, 0);
m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
}
void ConnectionSendThread::disconnect()
{
LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
// Create and send DISCO packet
SharedBuffer<u8> data(2);
writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_DISCO);
// Send to all
std::list<session_t> peerids = m_connection->getPeerIDs();
for (session_t peerid : peerids) {
sendAsPacket(peerid, 0, data, false);
}
}
void ConnectionSendThread::disconnect_peer(session_t peer_id)
{
LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
// Create and send DISCO packet
SharedBuffer<u8> data(2);
writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_DISCO);
sendAsPacket(peer_id, 0, data, false);
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer)
return;
if (dynamic_cast<UDPPeer *>(&peer) == 0) {
return;
}
dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
}
void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
const SharedBuffer<u8> &data)
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
<< ">>>NOT<<< found on sending packet"
<< ", channel " << (channelnum % 0xFF)
<< ", size: " << data.getSize() << std::endl);
return;
}
LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
<< ", channel " << (channelnum % 0xFF)
<< ", size: " << data.getSize() << std::endl);
u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
std::list<SharedBuffer<u8>> originals;
makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
for (const SharedBuffer<u8> &original : originals) {
sendAsPacket(peer_id, channelnum, original);
}
}
void ConnectionSendThread::sendReliable(ConnectionCommand &c)
{
PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
if (!peer)
return;
peer->PutReliableSendCommand(c, m_max_packet_size);
}
void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
{
std::list<session_t> peerids = m_connection->getPeerIDs();
for (session_t peerid : peerids) {
send(peerid, channelnum, data);
}
}
void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
{
std::list<session_t> peerids = m_connection->getPeerIDs();
for (session_t peerid : peerids) {
PeerHelper peer = m_connection->getPeerNoEx(peerid);
if (!peer)
continue;
peer->PutReliableSendCommand(c, m_max_packet_size);
}
}
void ConnectionSendThread::sendPackets(float dtime)
{
std::list<session_t> peerIds = m_connection->getPeerIDs();
std::list<session_t> pendingDisconnect;
std::map<session_t, bool> pending_unreliable;
for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
//peer may have been removed
if (!peer) {
LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
<< peerId
<< std::endl);
continue;
}
peer->m_increment_packets_remaining =
m_iteration_packets_avaialble / m_connection->m_peers.size();
UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
if (!udpPeer) {
continue;
}
if (udpPeer->m_pending_disconnect) {
pendingDisconnect.push_back(peerId);
}
PROFILE(std::stringstream
peerIdentifier);
PROFILE(
peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
<< ";RELIABLE]");
PROFILE(ScopeProfiler
peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
LOG(dout_con << m_connection->getDesc()
<< " Handle per peer queues: peer_id=" << peerId
<< " packet quota: " << peer->m_increment_packets_remaining << std::endl);
// first send queued reliable packets for all peers (if possible)
for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
Channel &channel = udpPeer->channels[i];
u16 next_to_ack = 0;
channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
u16 next_to_receive = 0;
channel.incoming_reliables.getFirstSeqnum(next_to_receive);
LOG(dout_con << m_connection->getDesc() << "\t channel: "
<< i << ", peer quota:"
<< peer->m_increment_packets_remaining
<< std::endl
<< "\t\t\treliables on wire: "
<< channel.outgoing_reliables_sent.size()
<< ", waiting for ack for " << next_to_ack
<< std::endl
<< "\t\t\tincoming_reliables: "
<< channel.incoming_reliables.size()
<< ", next reliable packet: "
<< channel.readNextIncomingSeqNum()
<< ", next queued: " << next_to_receive
<< std::endl
<< "\t\t\treliables queued : "
<< channel.queued_reliables.size()
<< std::endl
<< "\t\t\tqueued commands : "
<< channel.queued_commands.size()
<< std::endl);
while (!channel.queued_reliables.empty() &&
channel.outgoing_reliables_sent.size()
< channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) {
BufferedPacket p = channel.queued_reliables.front();
channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a queued reliable packet "
<< " channel: " << i
<< ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
<< std::endl);
sendAsPacketReliable(p, &channel);
peer->m_increment_packets_remaining--;
}
}
}
if (!m_outgoing_queue.empty()) {
LOG(dout_con << m_connection->getDesc()
<< " Handle non reliable queue ("
<< m_outgoing_queue.size() << " pkts)" << std::endl);
}
unsigned int initial_queuesize = m_outgoing_queue.size();
/* send non reliable packets*/
for (unsigned int i = 0; i < initial_queuesize; i++) {
OutgoingPacket packet = m_outgoing_queue.front();
m_outgoing_queue.pop();
if (packet.reliable)
continue;
PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " Outgoing queue: peer_id=" << packet.peer_id
<< ">>>NOT<<< found on sending packet"
<< ", channel " << (packet.channelnum % 0xFF)
<< ", size: " << packet.data.getSize() << std::endl);
continue;
}
/* send acks immediately */
if (packet.ack) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
peer->m_increment_packets_remaining =
MYMIN(0, peer->m_increment_packets_remaining--);
} else if (
(peer->m_increment_packets_remaining > 0) ||
(stopRequested())) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
peer->m_increment_packets_remaining--;
} else {
m_outgoing_queue.push(packet);
pending_unreliable[packet.peer_id] = true;
}
}
for (session_t peerId : pendingDisconnect) {
if (!pending_unreliable[peerId]) {
m_connection->deletePeer(peerId, false);
}
}
}
void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
const SharedBuffer<u8> &data, bool ack)
{
OutgoingPacket packet(peer_id, channelnum, data, false, ack);
m_outgoing_queue.push(packet);
}
ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
Thread("ConnectionReceive")
{
}
void *ConnectionReceiveThread::run()
{
assert(m_connection);
LOG(dout_con << m_connection->getDesc()
<< "ConnectionReceive thread started" << std::endl);
PROFILE(std::stringstream
ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
#ifdef DEBUG_CONNECTION_KBPS
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
float debug_print_timer = 0.0;
#endif
while (!stopRequested()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler
sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
#ifdef DEBUG_CONNECTION_KBPS
lasttime = curtime;
curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime,curtime);
#endif
/* receive packets */
receive();
#ifdef DEBUG_CONNECTION_KBPS
debug_print_timer += dtime;
if (debug_print_timer > 20.0) {
debug_print_timer -= 20.0;
std::list<session_t> peerids = m_connection->getPeerIDs();
for (std::list<session_t>::iterator i = peerids.begin();
i != peerids.end();
i++)
{
PeerHelper peer = m_connection->getPeerNoEx(*i);
if (!peer)
continue;
float peer_current = 0.0;
float peer_loss = 0.0;
float avg_rate = 0.0;
float avg_loss = 0.0;
for(u16 j=0; j<CHANNEL_COUNT; j++)
{
peer_current +=peer->channels[j].getCurrentDownloadRateKB();
peer_loss += peer->channels[j].getCurrentLossRateKB();
avg_rate += peer->channels[j].getAvgDownloadRateKB();
avg_loss += peer->channels[j].getAvgLossRateKB();
}
std::stringstream output;
output << std::fixed << std::setprecision(1);
output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
output << std::setfill(' ');
for(u16 j=0; j<CHANNEL_COUNT; j++)
{
output << "\tcha " << j << ":"
<< " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
<< " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
<< " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
<< " /"
<< " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
<< " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
<< " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
<< " / WS: " << peer->channels[j].getWindowSize()
<< std::endl;
}
fprintf(stderr,"%s\n",output.str().c_str());
}
}
#endif
END_DEBUG_EXCEPTION_HANDLER
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
}
// Receive packets from the network and buffers and create ConnectionEvents
void ConnectionReceiveThread::receive()
{
// use IPv6 minimum allowed MTU as receive buffer size as this is
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
// infrastructure
unsigned int packet_maxsize = 1500;
SharedBuffer<u8> packetdata(packet_maxsize);
bool packet_queued = true;
unsigned int loop_count = 0;
/* first of all read packets from socket */
/* check for incoming data available */
while ((loop_count < 10) &&
(m_connection->m_udpSocket.WaitData(50))) {
loop_count++;
try {
if (packet_queued) {
bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
while (data_left) {
try {
data_left = getFromBuffers(peer_id, resultdata);
if (data_left) {
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
}
}
packet_queued = false;
}
Address sender;
s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
packet_maxsize);
if ((received_size < BASE_HEADER_SIZE) ||
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
<< ", protocol: "
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
<< std::endl);
continue;
}
session_t peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata);
if (channelnum > CHANNEL_COUNT - 1) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl);
throw InvalidIncomingDataException("Channel doesn't exist");
}
/* Try to identify peer by sender address (may happen on join) */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
// We do not have to remind the peer of its
// peer id as the CONTROLTYPE_SET_PEER_ID
// command was sent reliably.
}
/* The peer was not found in our lists. Add it. */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
}
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " got packet from unknown peer_id: "
<< peer_id << " Ignoring." << std::endl);
continue;
}
// Validate peer address
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
if (peer_address != sender) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " sending from different address."
" Ignoring." << std::endl);
continue;
}
} else {
bool invalid_address = true;
if (invalid_address) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " unknown."
" Ignoring." << std::endl);
continue;
}
}
peer->ResetTimeout();
Channel *channel = 0;
if (dynamic_cast<UDPPeer *>(&peer) != 0) {
channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
}
if (channel != 0) {
channel->UpdateBytesReceived(received_size);
}
// Throw the received packet to channel->processPacket()
// Make a new SharedBuffer from the data without the base headers
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
strippeddata.getSize());
try {
// Process it (the result is some data with no headers made by us)
SharedBuffer<u8> resultdata = processPacket
(channel, strippeddata, peer_id, channelnum, false);
LOG(dout_con << m_connection->getDesc()
<< " ProcessPacket from peer_id: " << peer_id
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
catch (ProcessedSilentlyException &e) {
}
catch (ProcessedQueued &e) {
packet_queued = true;
}
}
catch (InvalidIncomingDataException &e) {
}
catch (ProcessedSilentlyException &e) {
}
}
}
bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
{
std::list<session_t> peerids = m_connection->getPeerIDs();
for (session_t peerid : peerids) {
PeerHelper peer = m_connection->getPeerNoEx(peerid);
if (!peer)
continue;
if (dynamic_cast<UDPPeer *>(&peer) == 0)
continue;
for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
if (checkIncomingBuffers(&channel, peer_id, dst)) {
return true;
}
}
}
return false;
}
bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
session_t &peer_id, SharedBuffer<u8> &dst)
{
u16 firstseqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
if (firstseqnum == channel->readNextIncomingSeqNum()) {
BufferedPacket p = channel->incoming_reliables.popFirst();
peer_id = readPeerId(*p.data);
u8 channelnum = readChannel(*p.data);
u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
LOG(dout_con << m_connection->getDesc()
<< "UNBUFFERING TYPE_RELIABLE"
<< " seqnum=" << seqnum
<< " peer_id=" << peer_id
<< " channel=" << ((int) channelnum & 0xff)
<< std::endl);
channel->incNextIncomingSeqNum();
u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
// Get out the inside packet and re-process it
SharedBuffer<u8> payload(p.data.getSize() - headers_size);
memcpy(*payload, &p.data[headers_size], payload.getSize());
dst = processPacket(channel, payload, peer_id, channelnum, true);
return true;
}
}
return false;
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
errorstream << "Peer not found (possible timeout)" << std::endl;
throw ProcessedSilentlyException("Peer not found (possible timeout)");
}
if (packetdata.getSize() < 1)
throw InvalidIncomingDataException("packetdata.getSize() < 1");
u8 type = readU8(&(packetdata[0]));
if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
std::string errmsg = "Invalid peer_id=" + itos(peer_id);
errorstream << errmsg << std::endl;
throw InvalidIncomingDataException(errmsg.c_str());
}
if (type >= PACKET_TYPE_MAX) {
derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
<< std::endl;
throw InvalidIncomingDataException("Invalid packet type");
}
const PacketTypeHandler &pHandle = packetTypeRouter[type];
return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
}
const ConnectionReceiveThread::PacketTypeHandler
ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
{&ConnectionReceiveThread::handlePacketType_Control},
{&ConnectionReceiveThread::handlePacketType_Original},
{&ConnectionReceiveThread::handlePacketType_Split},
{&ConnectionReceiveThread::handlePacketType_Reliable},
};
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
u8 controltype = readU8(&(packetdata[1]));
if (controltype == CONTROLTYPE_ACK) {
assert(channel != NULL);
if (packetdata.getSize() < 4) {
throw InvalidIncomingDataException(
"packetdata.getSize() < 4 (ACK header size)");
}
u16 seqnum = readU16(&packetdata[2]);
LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
<< ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
<< seqnum << " ]" << std::endl);
try {
BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
// only calculate rtt from straight sent packets
if (p.resend_count == 0) {
// Get round trip time
u64 current_time = porting::getTimeMs();
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
if (current_time > p.absolute_send_time) {
float rtt = (current_time - p.absolute_send_time) / 1000.0;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
} else if (p.totaltime > 0) {
float rtt = p.totaltime;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
}
}
// put bytes for max bandwidth calculation
channel->UpdateBytesSent(p.data.getSize(), 1);
if (channel->outgoing_reliables_sent.size() == 0)
m_connection->TriggerSend();
} catch (NotFoundException &e) {
LOG(derr_con << m_connection->getDesc()
<< "WARNING: ACKed packet not in outgoing queue" << std::endl);
channel->UpdatePacketTooLateCounter();
}
throw ProcessedSilentlyException("Got an ACK");
} else if (controltype == CONTROLTYPE_SET_PEER_ID) {
// Got a packet to set our peer id
if (packetdata.getSize() < 4)
throw InvalidIncomingDataException
("packetdata.getSize() < 4 (SET_PEER_ID header size)");
session_t peer_id_new = readU16(&packetdata[2]);
LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
<< "... " << std::endl);
if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
LOG(derr_con << m_connection->getDesc()
<< "WARNING: Not changing existing peer id." << std::endl);
} else {
LOG(dout_con << m_connection->getDesc() << "changing own peer id"
<< std::endl);
m_connection->SetPeerID(peer_id_new);
}
throw ProcessedSilentlyException("Got a SET_PEER_ID");
} else if (controltype == CONTROLTYPE_PING) {
// Just ignore it, the incoming data already reset
// the timeout counter
LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
throw ProcessedSilentlyException("Got a PING");
} else if (controltype == CONTROLTYPE_DISCO) {
// Just ignore it, the incoming data already reset
// the timeout counter
LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
<< peer->id << std::endl);
if (!m_connection->deletePeer(peer->id, false)) {
derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
}
throw ProcessedSilentlyException("Got a DISCO");
} else {
LOG(derr_con << m_connection->getDesc()
<< "INVALID TYPE_CONTROL: invalid controltype="
<< ((int) controltype & 0xff) << std::endl);
throw InvalidIncomingDataException("Invalid control type");
}
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
throw InvalidIncomingDataException
("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
<< std::endl);
// Get the inside packet out and return it
SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
return payload;
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// We have to create a packet again for buffering
// This isn't actually too bad an idea.
BufferedPacket packet = makePacket(peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
channelnum);
// Buffer the packet
SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
if (data.getSize() != 0) {
LOG(dout_con << m_connection->getDesc()
<< "RETURNING TYPE_SPLIT: Constructed full data, "
<< "size=" << data.getSize() << std::endl);
return data;
}
LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
throw ProcessedSilentlyException("Buffered a split packet chunk");
}
// We should never get here.
FATAL_ERROR("Invalid execution point");
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
assert(channel != NULL);
// Recursive reliable packets not allowed
if (reliable)
throw InvalidIncomingDataException("Found nested reliable packets");
if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
u16 seqnum = readU16(&packetdata[1]);
bool is_future_packet = false;
bool is_old_packet = false;
/* packet is within our receive window send ack */
if (seqnum_in_window(seqnum,
channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
m_connection->sendAck(peer->id, channelnum, seqnum);
} else {
is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
/* packet is not within receive window, don't send ack. *
* if this was a valid packet it's gonna be retransmitted */
if (is_future_packet)
throw ProcessedSilentlyException(
"Received packet newer then expected, not sending ack");
/* seems like our ack was lost, send another one for a old packet */
if (is_old_packet) {
LOG(dout_con << m_connection->getDesc()
<< "RE-SENDING ACK: peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum << std::endl;)
m_connection->sendAck(peer->id, channelnum, seqnum);
// we already have this packet so this one was on wire at least
// the current timeout
// we don't know how long this packet was on wire don't do silly guessing
// dynamic_cast<UDPPeer*>(&peer)->
// reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
throw ProcessedSilentlyException("Retransmitting ack for old packet");
}
}
if (seqnum != channel->readNextIncomingSeqNum()) {
Address peer_address;
// this is a reliable packet so we have a udp address for sure
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// This one comes later, buffer it.
// Actually we have to make a packet to buffer one.
// Well, we have all the ingredients, so just do it.
BufferedPacket packet = con::makePacket(
peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
channelnum);
try {
channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
LOG(dout_con << m_connection->getDesc()
<< "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum << std::endl;)
throw ProcessedQueued("Buffered future reliable packet");
} catch (AlreadyExistsException &e) {
} catch (IncomingDataCorruption &e) {
ConnectionCommand discon;
discon.disconnect_peer(peer->id);
m_connection->putCommand(discon);
LOG(derr_con << m_connection->getDesc()
<< "INVALID, TYPE_RELIABLE peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum
<< "DROPPING CLIENT!" << std::endl;)
}
}
/* we got a packet to process right now */
LOG(dout_con << m_connection->getDesc()
<< "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum << std::endl;)
/* check for resend case */
u16 queued_seqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
if (queued_seqnum == seqnum) {
BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
/** TODO find a way to verify the new against the old packet */
}
}
channel->incNextIncomingSeqNum();
// Get out the inside packet and re-process it
SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
return processPacket(channel, payload, peer->id, channelnum, true);
}
}