minetest/src/network/connectionthreads.cpp
Loic Blot eabf04bd34
Network part requires SharedBuffers to be pass as value
This can trigger unreproductible crashes due to concurrency problem on SharedBuffers

This fixes #6354
2017-09-03 19:01:53 +02:00

1405 lines
42 KiB
C++

/*
Minetest
Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "connectionthreads.h"
#include "log.h"
#include "profiler.h"
#include "settings.h"
#include "network/networkpacket.h"
#include "util/serialize.h"
namespace con
{
/******************************************************************************/
/* defines used for debugging and profiling */
/******************************************************************************/
#ifdef NDEBUG
#define LOG(a) a
#define PROFILE(a)
#undef DEBUG_CONNECTION_KBPS
#else
/* this mutex is used to achieve log message consistency */
std::mutex log_conthread_mutex;
#define LOG(a) \
{ \
MutexAutoLock loglock(log_conthread_mutex); \
a; \
}
#define PROFILE(a) a
//#define DEBUG_CONNECTION_KBPS
#undef DEBUG_CONNECTION_KBPS
#endif
/* maximum number of retries for reliable packets */
#define MAX_RELIABLE_RETRY 5
#define WINDOW_SIZE 5
static u16 readPeerId(u8 *packetdata)
{
return readU16(&packetdata[4]);
}
static u8 readChannel(u8 *packetdata)
{
return readU8(&packetdata[6]);
}
/******************************************************************************/
/* Connection Threads */
/******************************************************************************/
ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
float timeout) :
Thread("ConnectionSend"),
m_max_packet_size(max_packet_size),
m_timeout(timeout),
m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
{
}
void *ConnectionSendThread::run()
{
assert(m_connection);
LOG(dout_con << m_connection->getDesc()
<< "ConnectionSend thread started" << std::endl);
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
PROFILE(std::stringstream ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
/* if stop is requested don't stop immediately but try to send all */
/* packets first */
while (!stopRequested() || packetsQueued()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
/* wait for trigger or timeout */
m_send_sleep_semaphore.wait(50);
/* remove all triggers */
while (m_send_sleep_semaphore.wait(0)) {
}
lasttime = curtime;
curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime, curtime);
/* first do all the reliable stuff */
runTimeouts(dtime);
/* translate commands to packets */
ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
while (c.type != CONNCMD_NONE) {
if (c.reliable)
processReliableCommand(c);
else
processNonReliableCommand(c);
c = m_connection->m_command_queue.pop_frontNoEx(0);
}
/* send non reliable packets */
sendPackets(dtime);
END_DEBUG_EXCEPTION_HANDLER
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
}
void ConnectionSendThread::Trigger()
{
m_send_sleep_semaphore.post();
}
bool ConnectionSendThread::packetsQueued()
{
std::list<u16> peerIds = m_connection->getPeerIDs();
if (!m_outgoing_queue.empty() && !peerIds.empty())
return true;
for (u16 peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer)
continue;
if (dynamic_cast<UDPPeer *>(&peer) == 0)
continue;
for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
if (!channel.queued_commands.empty()) {
return true;
}
}
}
return false;
}
void ConnectionSendThread::runTimeouts(float dtime)
{
std::list<u16> timeouted_peers;
std::list<u16> peerIds = m_connection->getPeerIDs();
for (u16 &peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer)
continue;
UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
if (!udpPeer)
continue;
PROFILE(std::stringstream peerIdentifier);
PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
<< ";" << peerId << ";RELIABLE]");
PROFILE(ScopeProfiler
peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
/*
Check peer timeout
*/
if (peer->isTimedOut(m_timeout)) {
infostream << m_connection->getDesc()
<< "RunTimeouts(): Peer " << peer->id
<< " has timed out."
<< " (source=peer->timeout_counter)"
<< std::endl;
// Add peer to the list
timeouted_peers.push_back(peer->id);
// Don't bother going through the buffers of this one
continue;
}
float resend_timeout = udpPeer->getResendTimeout();
bool retry_count_exceeded = false;
for (Channel &channel : udpPeer->channels) {
std::list<BufferedPacket> timed_outs;
if (udpPeer->getLegacyPeer())
channel.setWindowSize(WINDOW_SIZE);
// Remove timed out incomplete unreliable split packets
channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
// Increment reliable packet times
channel.outgoing_reliables_sent.incrementTimeouts(dtime);
unsigned int numpeers = m_connection->m_peers.size();
if (numpeers == 0)
return;
// Re-send timed out outgoing reliables
timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
(m_max_data_packets_per_iteration / numpeers));
channel.UpdatePacketLossCounter(timed_outs.size());
g_profiler->graphAdd("packets_lost", timed_outs.size());
m_iteration_packets_avaialble -= timed_outs.size();
for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
k != timed_outs.end(); ++k) {
u16 peer_id = readPeerId(*(k->data));
u8 channelnum = readChannel(*(k->data));
u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
channel.UpdateBytesLost(k->data.getSize());
k->resend_count++;
if (k->resend_count > MAX_RELIABLE_RETRY) {
retry_count_exceeded = true;
timeouted_peers.push_back(peer->id);
/* no need to check additional packets if a single one did timeout*/
break;
}
LOG(derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to "
<< k->address.serializeString()
<< "(t/o=" << resend_timeout << "): "
<< "from_peer_id=" << peer_id
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl);
rawSend(*k);
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
}
if (retry_count_exceeded) {
break; /* no need to check other channels if we already did timeout */
}
channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
}
/* skip to next peer if we did timeout */
if (retry_count_exceeded)
continue;
/* send ping if necessary */
if (udpPeer->Ping(dtime, data)) {
LOG(dout_con << m_connection->getDesc()
<< "Sending ping for peer_id: " << udpPeer->id << std::endl);
/* this may fail if there ain't a sequence number left */
if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
//retrigger with reduced ping interval
udpPeer->Ping(4.0, data);
}
}
udpPeer->RunCommandQueues(m_max_packet_size,
m_max_commands_per_iteration,
m_max_packets_requeued);
}
// Remove timed out peers
for (u16 timeouted_peer : timeouted_peers) {
LOG(derr_con << m_connection->getDesc()
<< "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
m_connection->deletePeer(timeouted_peer, true);
}
}
void ConnectionSendThread::rawSend(const BufferedPacket &packet)
{
try {
m_connection->m_udpSocket.Send(packet.address, *packet.data,
packet.data.getSize());
LOG(dout_con << m_connection->getDesc()
<< " rawSend: " << packet.data.getSize()
<< " bytes sent" << std::endl);
} catch (SendFailedException &e) {
LOG(derr_con << m_connection->getDesc()
<< "Connection::rawSend(): SendFailedException: "
<< packet.address.serializeString() << std::endl);
}
}
void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
{
try {
p.absolute_send_time = porting::getTimeMs();
// Buffer the packet
channel->outgoing_reliables_sent.insert(p,
(channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
% (MAX_RELIABLE_WINDOW_SIZE + 1));
}
catch (AlreadyExistsException &e) {
LOG(derr_con << m_connection->getDesc()
<< "WARNING: Going to send a reliable packet"
<< " in outgoing buffer" << std::endl);
}
// Send the packet
rawSend(p);
}
bool ConnectionSendThread::rawSendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: dropped packet for non existent peer_id: "
<< peer_id << std::endl);
FATAL_ERROR_IF(!reliable,
"Trying to send raw packet reliable but no peer found!");
return false;
}
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
if (reliable) {
bool have_sequence_number_for_raw_packet = true;
u16 seqnum =
channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
if (!have_sequence_number_for_raw_packet)
return false;
SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
Address peer_address;
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// first check if our send window is already maxed out
if (channel->outgoing_reliables_sent.size()
< channel->getWindowSize()) {
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a reliable packet to peer_id " << peer_id
<< " channel: " << channelnum
<< " seqnum: " << seqnum << std::endl);
sendAsPacketReliable(p, channel);
return true;
}
LOG(dout_con << m_connection->getDesc()
<< " INFO: queueing reliable packet for peer_id: " << peer_id
<< " channel: " << channelnum
<< " seqnum: " << seqnum << std::endl);
channel->queued_reliables.push(p);
return false;
}
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// Add base headers and make a packet
BufferedPacket p = con::makePacket(peer_address, data,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
channelnum);
// Send the packet
rawSend(p);
return true;
}
LOG(dout_con << m_connection->getDesc()
<< " INFO: dropped unreliable packet for peer_id: " << peer_id
<< " because of (yet) missing udp address" << std::endl);
return false;
}
void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
{
assert(c.reliable); // Pre-condition
switch (c.type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_NONE" << std::endl);
return;
case CONNCMD_SEND:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_SEND" << std::endl);
sendReliable(c);
return;
case CONNCMD_SEND_TO_ALL:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
sendToAllReliable(c);
return;
case CONCMD_CREATE_PEER:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
/* put to queue if we couldn't send it immediately */
sendReliable(c);
}
return;
case CONCMD_DISABLE_LEGACY:
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
/* put to queue if we couldn't send it immediately */
sendReliable(c);
}
return;
case CONNCMD_SERVE:
case CONNCMD_CONNECT:
case CONNCMD_DISCONNECT:
case CONCMD_ACK:
FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
default:
LOG(dout_con << m_connection->getDesc()
<< " Invalid reliable command type: " << c.type << std::endl);
}
}
void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
{
assert(!c.reliable); // Pre-condition
switch (c.type) {
case CONNCMD_NONE:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_NONE" << std::endl);
return;
case CONNCMD_SERVE:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_SERVE port="
<< c.address.serializeString() << std::endl);
serve(c.address);
return;
case CONNCMD_CONNECT:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_CONNECT" << std::endl);
connect(c.address);
return;
case CONNCMD_DISCONNECT:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_DISCONNECT" << std::endl);
disconnect();
return;
case CONNCMD_DISCONNECT_PEER:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
disconnect_peer(c.peer_id);
return;
case CONNCMD_SEND:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_SEND" << std::endl);
send(c.peer_id, c.channelnum, c.data);
return;
case CONNCMD_SEND_TO_ALL:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
sendToAll(c.channelnum, c.data);
return;
case CONCMD_ACK:
LOG(dout_con << m_connection->getDesc()
<< " UDP processing CONCMD_ACK" << std::endl);
sendAsPacket(c.peer_id, c.channelnum, c.data, true);
return;
case CONCMD_CREATE_PEER:
FATAL_ERROR("Got command that should be reliable as unreliable command");
default:
LOG(dout_con << m_connection->getDesc()
<< " Invalid command type: " << c.type << std::endl);
}
}
void ConnectionSendThread::serve(Address bind_address)
{
LOG(dout_con << m_connection->getDesc()
<< "UDP serving at port " << bind_address.serializeString() << std::endl);
try {
m_connection->m_udpSocket.Bind(bind_address);
m_connection->SetPeerID(PEER_ID_SERVER);
}
catch (SocketException &e) {
// Create event
ConnectionEvent ce;
ce.bindFailed();
m_connection->putEvent(ce);
}
}
void ConnectionSendThread::connect(Address address)
{
LOG(dout_con << m_connection->getDesc() << " connecting to "
<< address.serializeString()
<< ":" << address.getPort() << std::endl);
UDPPeer *peer = m_connection->createServerPeer(address);
// Create event
ConnectionEvent e;
e.peerAdded(peer->id, peer->address);
m_connection->putEvent(e);
Address bind_addr;
if (address.isIPv6())
bind_addr.setAddress((IPv6AddressBytes *) NULL);
else
bind_addr.setAddress(0, 0, 0, 0);
m_connection->m_udpSocket.Bind(bind_addr);
// Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
m_connection->SetPeerID(PEER_ID_INEXISTENT);
NetworkPacket pkt(0, 0);
m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
}
void ConnectionSendThread::disconnect()
{
LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
// Create and send DISCO packet
SharedBuffer<u8> data(2);
writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_DISCO);
// Send to all
std::list<u16> peerids = m_connection->getPeerIDs();
for (u16 peerid : peerids) {
sendAsPacket(peerid, 0, data, false);
}
}
void ConnectionSendThread::disconnect_peer(u16 peer_id)
{
LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
// Create and send DISCO packet
SharedBuffer<u8> data(2);
writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_DISCO);
sendAsPacket(peer_id, 0, data, false);
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer)
return;
if (dynamic_cast<UDPPeer *>(&peer) == 0) {
return;
}
dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
}
void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data)
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
<< ">>>NOT<<< found on sending packet"
<< ", channel " << (channelnum % 0xFF)
<< ", size: " << data.getSize() << std::endl);
return;
}
LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
<< ", channel " << (channelnum % 0xFF)
<< ", size: " << data.getSize() << std::endl);
u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
std::list<SharedBuffer<u8>> originals;
makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
for (const SharedBuffer<u8> &original : originals) {
sendAsPacket(peer_id, channelnum, original);
}
}
void ConnectionSendThread::sendReliable(ConnectionCommand &c)
{
PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
if (!peer)
return;
peer->PutReliableSendCommand(c, m_max_packet_size);
}
void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
{
std::list<u16> peerids = m_connection->getPeerIDs();
for (u16 peerid : peerids) {
send(peerid, channelnum, data);
}
}
void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
{
std::list<u16> peerids = m_connection->getPeerIDs();
for (u16 peerid : peerids) {
PeerHelper peer = m_connection->getPeerNoEx(peerid);
if (!peer)
continue;
peer->PutReliableSendCommand(c, m_max_packet_size);
}
}
void ConnectionSendThread::sendPackets(float dtime)
{
std::list<u16> peerIds = m_connection->getPeerIDs();
std::list<u16> pendingDisconnect;
std::map<u16, bool> pending_unreliable;
for (u16 peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
//peer may have been removed
if (!peer) {
LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
<< peerId
<< std::endl);
continue;
}
peer->m_increment_packets_remaining =
m_iteration_packets_avaialble / m_connection->m_peers.size();
UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
if (!udpPeer) {
continue;
}
if (udpPeer->m_pending_disconnect) {
pendingDisconnect.push_back(peerId);
}
PROFILE(std::stringstream
peerIdentifier);
PROFILE(
peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
<< ";RELIABLE]");
PROFILE(ScopeProfiler
peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
LOG(dout_con << m_connection->getDesc()
<< " Handle per peer queues: peer_id=" << peerId
<< " packet quota: " << peer->m_increment_packets_remaining << std::endl);
// first send queued reliable packets for all peers (if possible)
for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
Channel &channel = udpPeer->channels[i];
u16 next_to_ack = 0;
channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
u16 next_to_receive = 0;
channel.incoming_reliables.getFirstSeqnum(next_to_receive);
LOG(dout_con << m_connection->getDesc() << "\t channel: "
<< i << ", peer quota:"
<< peer->m_increment_packets_remaining
<< std::endl
<< "\t\t\treliables on wire: "
<< channel.outgoing_reliables_sent.size()
<< ", waiting for ack for " << next_to_ack
<< std::endl
<< "\t\t\tincoming_reliables: "
<< channel.incoming_reliables.size()
<< ", next reliable packet: "
<< channel.readNextIncomingSeqNum()
<< ", next queued: " << next_to_receive
<< std::endl
<< "\t\t\treliables queued : "
<< channel.queued_reliables.size()
<< std::endl
<< "\t\t\tqueued commands : "
<< channel.queued_commands.size()
<< std::endl);
while (!channel.queued_reliables.empty() &&
channel.outgoing_reliables_sent.size()
< channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) {
BufferedPacket p = channel.queued_reliables.front();
channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a queued reliable packet "
<< " channel: " << i
<< ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
<< std::endl);
sendAsPacketReliable(p, &channel);
peer->m_increment_packets_remaining--;
}
}
}
if (!m_outgoing_queue.empty()) {
LOG(dout_con << m_connection->getDesc()
<< " Handle non reliable queue ("
<< m_outgoing_queue.size() << " pkts)" << std::endl);
}
unsigned int initial_queuesize = m_outgoing_queue.size();
/* send non reliable packets*/
for (unsigned int i = 0; i < initial_queuesize; i++) {
OutgoingPacket packet = m_outgoing_queue.front();
m_outgoing_queue.pop();
if (packet.reliable)
continue;
PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " Outgoing queue: peer_id=" << packet.peer_id
<< ">>>NOT<<< found on sending packet"
<< ", channel " << (packet.channelnum % 0xFF)
<< ", size: " << packet.data.getSize() << std::endl);
continue;
}
/* send acks immediately */
if (packet.ack) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
peer->m_increment_packets_remaining =
MYMIN(0, peer->m_increment_packets_remaining--);
} else if (
(peer->m_increment_packets_remaining > 0) ||
(stopRequested())) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
peer->m_increment_packets_remaining--;
} else {
m_outgoing_queue.push(packet);
pending_unreliable[packet.peer_id] = true;
}
}
for (u16 peerId : pendingDisconnect) {
if (!pending_unreliable[peerId]) {
m_connection->deletePeer(peerId, false);
}
}
}
void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool ack)
{
OutgoingPacket packet(peer_id, channelnum, data, false, ack);
m_outgoing_queue.push(packet);
}
ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
Thread("ConnectionReceive")
{
}
void *ConnectionReceiveThread::run()
{
assert(m_connection);
LOG(dout_con << m_connection->getDesc()
<< "ConnectionReceive thread started" << std::endl);
PROFILE(std::stringstream
ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
#ifdef DEBUG_CONNECTION_KBPS
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
float debug_print_timer = 0.0;
#endif
while (!stopRequested()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler
sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
#ifdef DEBUG_CONNECTION_KBPS
lasttime = curtime;
curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime,curtime);
#endif
/* receive packets */
receive();
#ifdef DEBUG_CONNECTION_KBPS
debug_print_timer += dtime;
if (debug_print_timer > 20.0) {
debug_print_timer -= 20.0;
std::list<u16> peerids = m_connection->getPeerIDs();
for (std::list<u16>::iterator i = peerids.begin();
i != peerids.end();
i++)
{
PeerHelper peer = m_connection->getPeerNoEx(*i);
if (!peer)
continue;
float peer_current = 0.0;
float peer_loss = 0.0;
float avg_rate = 0.0;
float avg_loss = 0.0;
for(u16 j=0; j<CHANNEL_COUNT; j++)
{
peer_current +=peer->channels[j].getCurrentDownloadRateKB();
peer_loss += peer->channels[j].getCurrentLossRateKB();
avg_rate += peer->channels[j].getAvgDownloadRateKB();
avg_loss += peer->channels[j].getAvgLossRateKB();
}
std::stringstream output;
output << std::fixed << std::setprecision(1);
output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
output << std::setfill(' ');
for(u16 j=0; j<CHANNEL_COUNT; j++)
{
output << "\tcha " << j << ":"
<< " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
<< " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
<< " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
<< " /"
<< " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
<< " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
<< " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
<< " / WS: " << peer->channels[j].getWindowSize()
<< std::endl;
}
fprintf(stderr,"%s\n",output.str().c_str());
}
}
#endif
END_DEBUG_EXCEPTION_HANDLER
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
}
// Receive packets from the network and buffers and create ConnectionEvents
void ConnectionReceiveThread::receive()
{
// use IPv6 minimum allowed MTU as receive buffer size as this is
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
// infrastructure
unsigned int packet_maxsize = 1500;
SharedBuffer<u8> packetdata(packet_maxsize);
bool packet_queued = true;
unsigned int loop_count = 0;
/* first of all read packets from socket */
/* check for incoming data available */
while ((loop_count < 10) &&
(m_connection->m_udpSocket.WaitData(50))) {
loop_count++;
try {
if (packet_queued) {
bool data_left = true;
u16 peer_id;
SharedBuffer<u8> resultdata;
while (data_left) {
try {
data_left = getFromBuffers(peer_id, resultdata);
if (data_left) {
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
}
}
packet_queued = false;
}
Address sender;
s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
packet_maxsize);
if ((received_size < BASE_HEADER_SIZE) ||
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
<< ", protocol: "
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
<< std::endl);
continue;
}
u16 peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata);
if (channelnum > CHANNEL_COUNT - 1) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid channel " << channelnum << std::endl);
throw InvalidIncomingDataException("Channel doesn't exist");
}
/* Try to identify peer by sender address (may happen on join) */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
// We do not have to remind the peer of its
// peer id as the CONTROLTYPE_SET_PEER_ID
// command was sent reliably.
}
/* The peer was not found in our lists. Add it. */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
}
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " got packet from unknown peer_id: "
<< peer_id << " Ignoring." << std::endl);
continue;
}
// Validate peer address
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
if (peer_address != sender) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " sending from different address."
" Ignoring." << std::endl);
continue;
}
} else {
bool invalid_address = true;
if (invalid_address) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " unknown."
" Ignoring." << std::endl);
continue;
}
}
peer->ResetTimeout();
Channel *channel = 0;
if (dynamic_cast<UDPPeer *>(&peer) != 0) {
channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
}
if (channel != 0) {
channel->UpdateBytesReceived(received_size);
}
// Throw the received packet to channel->processPacket()
// Make a new SharedBuffer from the data without the base headers
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
strippeddata.getSize());
try {
// Process it (the result is some data with no headers made by us)
SharedBuffer<u8> resultdata = processPacket
(channel, strippeddata, peer_id, channelnum, false);
LOG(dout_con << m_connection->getDesc()
<< " ProcessPacket from peer_id: " << peer_id
<< ",channel: " << (channelnum & 0xFF) << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
catch (ProcessedSilentlyException &e) {
}
catch (ProcessedQueued &e) {
packet_queued = true;
}
}
catch (InvalidIncomingDataException &e) {
}
catch (ProcessedSilentlyException &e) {
}
}
}
bool ConnectionReceiveThread::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
{
std::list<u16> peerids = m_connection->getPeerIDs();
for (u16 peerid : peerids) {
PeerHelper peer = m_connection->getPeerNoEx(peerid);
if (!peer)
continue;
if (dynamic_cast<UDPPeer *>(&peer) == 0)
continue;
for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
if (checkIncomingBuffers(&channel, peer_id, dst)) {
return true;
}
}
}
return false;
}
bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
u16 &peer_id, SharedBuffer<u8> &dst)
{
u16 firstseqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
if (firstseqnum == channel->readNextIncomingSeqNum()) {
BufferedPacket p = channel->incoming_reliables.popFirst();
peer_id = readPeerId(*p.data);
u8 channelnum = readChannel(*p.data);
u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
LOG(dout_con << m_connection->getDesc()
<< "UNBUFFERING TYPE_RELIABLE"
<< " seqnum=" << seqnum
<< " peer_id=" << peer_id
<< " channel=" << ((int) channelnum & 0xff)
<< std::endl);
channel->incNextIncomingSeqNum();
u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
// Get out the inside packet and re-process it
SharedBuffer<u8> payload(p.data.getSize() - headers_size);
memcpy(*payload, &p.data[headers_size], payload.getSize());
dst = processPacket(channel, payload, peer_id, channelnum, true);
return true;
}
}
return false;
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
SharedBuffer<u8> packetdata, u16 peer_id, u8 channelnum, bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
errorstream << "Peer not found (possible timeout)" << std::endl;
throw ProcessedSilentlyException("Peer not found (possible timeout)");
}
if (packetdata.getSize() < 1)
throw InvalidIncomingDataException("packetdata.getSize() < 1");
u8 type = readU8(&(packetdata[0]));
if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
std::string errmsg = "Invalid peer_id=" + itos(peer_id);
errorstream << errmsg << std::endl;
throw InvalidIncomingDataException(errmsg.c_str());
}
if (type >= PACKET_TYPE_MAX) {
derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
<< std::endl;
throw InvalidIncomingDataException("Invalid packet type");
}
const PacketTypeHandler &pHandle = packetTypeRouter[type];
return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
}
const ConnectionReceiveThread::PacketTypeHandler
ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
{&ConnectionReceiveThread::handlePacketType_Control},
{&ConnectionReceiveThread::handlePacketType_Original},
{&ConnectionReceiveThread::handlePacketType_Split},
{&ConnectionReceiveThread::handlePacketType_Reliable},
};
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
{
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
u8 controltype = readU8(&(packetdata[1]));
if (controltype == CONTROLTYPE_ACK) {
assert(channel != NULL);
if (packetdata.getSize() < 4) {
throw InvalidIncomingDataException(
"packetdata.getSize() < 4 (ACK header size)");
}
u16 seqnum = readU16(&packetdata[2]);
LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
<< ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
<< seqnum << " ]" << std::endl);
try {
BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
// only calculate rtt from straight sent packets
if (p.resend_count == 0) {
// Get round trip time
u64 current_time = porting::getTimeMs();
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
if (current_time > p.absolute_send_time) {
float rtt = (current_time - p.absolute_send_time) / 1000.0;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
} else if (p.totaltime > 0) {
float rtt = p.totaltime;
// Let peer calculate stuff according to it
// (avg_rtt and resend_timeout)
dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
}
}
// put bytes for max bandwidth calculation
channel->UpdateBytesSent(p.data.getSize(), 1);
if (channel->outgoing_reliables_sent.size() == 0)
m_connection->TriggerSend();
} catch (NotFoundException &e) {
LOG(derr_con << m_connection->getDesc()
<< "WARNING: ACKed packet not in outgoing queue" << std::endl);
channel->UpdatePacketTooLateCounter();
}
throw ProcessedSilentlyException("Got an ACK");
} else if (controltype == CONTROLTYPE_SET_PEER_ID) {
// Got a packet to set our peer id
if (packetdata.getSize() < 4)
throw InvalidIncomingDataException
("packetdata.getSize() < 4 (SET_PEER_ID header size)");
u16 peer_id_new = readU16(&packetdata[2]);
LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
<< "... " << std::endl);
if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
LOG(derr_con << m_connection->getDesc()
<< "WARNING: Not changing existing peer id." << std::endl);
} else {
LOG(dout_con << m_connection->getDesc() << "changing own peer id"
<< std::endl);
m_connection->SetPeerID(peer_id_new);
}
ConnectionCommand cmd;
SharedBuffer<u8> reply(2);
writeU8(&reply[0], PACKET_TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
cmd.disableLegacy(PEER_ID_SERVER, reply);
m_connection->putCommand(cmd);
throw ProcessedSilentlyException("Got a SET_PEER_ID");
} else if (controltype == CONTROLTYPE_PING) {
// Just ignore it, the incoming data already reset
// the timeout counter
LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
throw ProcessedSilentlyException("Got a PING");
} else if (controltype == CONTROLTYPE_DISCO) {
// Just ignore it, the incoming data already reset
// the timeout counter
LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
<< peer->id << std::endl);
if (!m_connection->deletePeer(peer->id, false)) {
derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
}
throw ProcessedSilentlyException("Got a DISCO");
} else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
throw ProcessedSilentlyException("Got non legacy control");
} else {
LOG(derr_con << m_connection->getDesc()
<< "INVALID TYPE_CONTROL: invalid controltype="
<< ((int) controltype & 0xff) << std::endl);
throw InvalidIncomingDataException("Invalid control type");
}
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
{
if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
throw InvalidIncomingDataException
("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
<< std::endl);
// Get the inside packet out and return it
SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
return payload;
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
{
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
// We have to create a packet again for buffering
// This isn't actually too bad an idea.
BufferedPacket packet = makePacket(peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
channelnum);
// Buffer the packet
SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
if (data.getSize() != 0) {
LOG(dout_con << m_connection->getDesc()
<< "RETURNING TYPE_SPLIT: Constructed full data, "
<< "size=" << data.getSize() << std::endl);
return data;
}
LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
throw ProcessedSilentlyException("Buffered a split packet chunk");
}
// We should never get here.
FATAL_ERROR("Invalid execution point");
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
{
assert(channel != NULL);
// Recursive reliable packets not allowed
if (reliable)
throw InvalidIncomingDataException("Found nested reliable packets");
if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
u16 seqnum = readU16(&packetdata[1]);
bool is_future_packet = false;
bool is_old_packet = false;
/* packet is within our receive window send ack */
if (seqnum_in_window(seqnum,
channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
m_connection->sendAck(peer->id, channelnum, seqnum);
} else {
is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
/* packet is not within receive window, don't send ack. *
* if this was a valid packet it's gonna be retransmitted */
if (is_future_packet)
throw ProcessedSilentlyException(
"Received packet newer then expected, not sending ack");
/* seems like our ack was lost, send another one for a old packet */
if (is_old_packet) {
LOG(dout_con << m_connection->getDesc()
<< "RE-SENDING ACK: peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum << std::endl;)
m_connection->sendAck(peer->id, channelnum, seqnum);
// we already have this packet so this one was on wire at least
// the current timeout
// we don't know how long this packet was on wire don't do silly guessing
// dynamic_cast<UDPPeer*>(&peer)->
// reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
throw ProcessedSilentlyException("Retransmitting ack for old packet");
}
}
if (seqnum != channel->readNextIncomingSeqNum()) {
Address peer_address;
// this is a reliable packet so we have a udp address for sure
peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
// This one comes later, buffer it.
// Actually we have to make a packet to buffer one.
// Well, we have all the ingredients, so just do it.
BufferedPacket packet = con::makePacket(
peer_address,
packetdata,
m_connection->GetProtocolID(),
peer->id,
channelnum);
try {
channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
LOG(dout_con << m_connection->getDesc()
<< "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum << std::endl;)
throw ProcessedQueued("Buffered future reliable packet");
} catch (AlreadyExistsException &e) {
} catch (IncomingDataCorruption &e) {
ConnectionCommand discon;
discon.disconnect_peer(peer->id);
m_connection->putCommand(discon);
LOG(derr_con << m_connection->getDesc()
<< "INVALID, TYPE_RELIABLE peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum
<< "DROPPING CLIENT!" << std::endl;)
}
}
/* we got a packet to process right now */
LOG(dout_con << m_connection->getDesc()
<< "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
<< ", channel: " << (channelnum & 0xFF)
<< ", seqnum: " << seqnum << std::endl;)
/* check for resend case */
u16 queued_seqnum = 0;
if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
if (queued_seqnum == seqnum) {
BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
/** TODO find a way to verify the new against the old packet */
}
}
channel->incNextIncomingSeqNum();
// Get out the inside packet and re-process it
SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
return processPacket(channel, payload, peer->id, channelnum, true);
}
}