Track connection half-open state

This commit is contained in:
sfan5 2024-01-05 00:31:14 +01:00
parent 2ea8d9ca11
commit db88d24ff8
4 changed files with 119 additions and 40 deletions

@ -49,7 +49,7 @@ namespace con
// TODO: Clean this up. // TODO: Clean this up.
#define LOG(a) a #define LOG(a) a
#define PING_TIMEOUT 5.0 #define PING_TIMEOUT 5.0f
u16 BufferedPacket::getSeqnum() const u16 BufferedPacket::getSeqnum() const
{ {
@ -548,6 +548,15 @@ ConnectionCommandPtr ConnectionCommand::disconnect_peer(session_t peer_id)
return c; return c;
} }
ConnectionCommandPtr ConnectionCommand::resend_one(session_t peer_id)
{
auto c = create(CONNCMD_RESEND_ONE);
c->peer_id = peer_id;
c->channelnum = 0; // must be same as createPeer
c->reliable = true;
return c;
}
ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum, ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum,
NetworkPacket *pkt, bool reliable) NetworkPacket *pkt, bool reliable)
{ {
@ -984,12 +993,12 @@ void UDPPeer::reportRTT(float rtt)
bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data) bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
{ {
m_ping_timer += dtime; m_ping_timer += dtime;
if (m_ping_timer >= PING_TIMEOUT) if (!isHalfOpen() && m_ping_timer >= PING_TIMEOUT)
{ {
// Create and send PING packet // Create and send PING packet
writeU8(&data[0], PACKET_TYPE_CONTROL); writeU8(&data[0], PACKET_TYPE_CONTROL);
writeU8(&data[1], CONTROLTYPE_PING); writeU8(&data[1], CONTROLTYPE_PING);
m_ping_timer = 0.0; m_ping_timer = 0.0f;
return true; return true;
} }
return false; return false;
@ -1608,6 +1617,12 @@ void Connection::DisconnectPeer(session_t peer_id)
putCommand(ConnectionCommand::disconnect_peer(peer_id)); putCommand(ConnectionCommand::disconnect_peer(peer_id));
} }
void Connection::doResendOne(session_t peer_id)
{
assert(peer_id != PEER_ID_INEXISTENT);
putCommand(ConnectionCommand::resend_one(peer_id));
}
void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
{ {
assert(channelnum < CHANNEL_COUNT); // Pre-condition assert(channelnum < CHANNEL_COUNT); // Pre-condition
@ -1634,6 +1649,7 @@ UDPPeer* Connection::createServerPeer(Address& address)
} }
UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this); UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
peer->SetFullyOpen();
{ {
MutexAutoLock lock(m_peers_mutex); MutexAutoLock lock(m_peers_mutex);

@ -313,7 +313,8 @@ enum ConnectionCommandType{
CONNCMD_SEND, CONNCMD_SEND,
CONNCMD_SEND_TO_ALL, CONNCMD_SEND_TO_ALL,
CONCMD_ACK, CONCMD_ACK,
CONCMD_CREATE_PEER CONCMD_CREATE_PEER,
CONNCMD_RESEND_ONE
}; };
struct ConnectionCommand; struct ConnectionCommand;
@ -336,6 +337,7 @@ struct ConnectionCommand
static ConnectionCommandPtr connect(Address address); static ConnectionCommandPtr connect(Address address);
static ConnectionCommandPtr disconnect(); static ConnectionCommandPtr disconnect();
static ConnectionCommandPtr disconnect_peer(session_t peer_id); static ConnectionCommandPtr disconnect_peer(session_t peer_id);
static ConnectionCommandPtr resend_one(session_t peer_id);
static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable); static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data); static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data);
static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer<u8> &data); static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer<u8> &data);
@ -520,6 +522,9 @@ class Peer {
void ResetTimeout() void ResetTimeout()
{MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; }; {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
bool isHalfOpen() const { return m_half_open; }
void SetFullyOpen() { m_half_open = false; }
bool isTimedOut(float timeout); bool isTimedOut(float timeout);
unsigned int m_increment_packets_remaining = 0; unsigned int m_increment_packets_remaining = 0;
@ -590,6 +595,15 @@ class Peer {
rttstats m_rtt; rttstats m_rtt;
float m_last_rtt = -1.0f; float m_last_rtt = -1.0f;
/*
Until the peer has communicated with us using their assigned peer id
the connection is considered half-open.
During this time we inhibit re-sending any reliables or pings. This
is to avoid spending too many resources on a potential DoS attack
and to make sure Minetest servers are not useful for UDP amplificiation.
*/
bool m_half_open = true;
// current usage count // current usage count
unsigned int m_usage = 0; unsigned int m_usage = 0;
@ -736,6 +750,8 @@ protected:
void SetPeerID(session_t id) { m_peer_id = id; } void SetPeerID(session_t id) { m_peer_id = id; }
void doResendOne(session_t peer_id);
void sendAck(session_t peer_id, u8 channelnum, u16 seqnum); void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
std::vector<session_t> getPeerIDs() std::vector<session_t> getPeerIDs()

@ -46,11 +46,11 @@ namespace con
#define WINDOW_SIZE 5 #define WINDOW_SIZE 5
static session_t readPeerId(const u8 *packetdata) static inline session_t readPeerId(const u8 *packetdata)
{ {
return readU16(&packetdata[4]); return readU16(&packetdata[4]);
} }
static u8 readChannel(const u8 *packetdata) static inline u8 readChannel(const u8 *packetdata)
{ {
return readU8(&packetdata[6]); return readU8(&packetdata[6]);
} }
@ -220,29 +220,22 @@ void ConnectionSendThread::runTimeouts(float dtime)
channel.UpdatePacketLossCounter(timed_outs.size()); channel.UpdatePacketLossCounter(timed_outs.size());
g_profiler->graphAdd("packets_lost", timed_outs.size()); g_profiler->graphAdd("packets_lost", timed_outs.size());
m_iteration_packets_avaialble -= timed_outs.size(); // Note that this only happens during connection setup, it would
// break badly otherwise.
for (const auto &k : timed_outs) { if (peer->isHalfOpen()) {
u8 channelnum = readChannel(k->data); if (!timed_outs.empty()) {
u16 seqnum = k->getSeqnum(); dout_con << m_connection->getDesc() <<
"Skipping re-send of " << timed_outs.size() <<
channel.UpdateBytesLost(k->size()); " timed-out reliables to peer_id " << udpPeer->id
<< " (half-open)." << std::endl;
LOG(derr_con << m_connection->getDesc() }
<< "RE-SENDING timed-out RELIABLE to " continue;
<< k->address.serializeString()
<< "(t/o=" << resend_timeout << "): "
<< "count=" << k->resend_count
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl);
rawSend(k.get());
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
} }
m_iteration_packets_avaialble -= timed_outs.size();
for (const auto &k : timed_outs)
resendReliable(channel, k.get(), resend_timeout);
channel.UpdateTimers(dtime); channel.UpdateTimers(dtime);
} }
@ -270,8 +263,36 @@ void ConnectionSendThread::runTimeouts(float dtime)
} }
} }
void ConnectionSendThread::resendReliable(Channel &channel, const BufferedPacket *k, float resend_timeout)
{
assert(k);
u8 channelnum = readChannel(k->data);
u16 seqnum = k->getSeqnum();
channel.UpdateBytesLost(k->size());
derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to "
<< k->address.serializeString();
if (resend_timeout >= 0)
derr_con << "(t/o=" << resend_timeout << "): ";
else
derr_con << "(force): ";
derr_con
<< "count=" << k->resend_count
<< ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum
<< std::endl;
rawSend(k);
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
}
void ConnectionSendThread::rawSend(const BufferedPacket *p) void ConnectionSendThread::rawSend(const BufferedPacket *p)
{ {
assert(p);
try { try {
m_connection->m_udpSocket.Send(p->address, p->data, p->size()); m_connection->m_udpSocket.Send(p->address, p->data, p->size());
LOG(dout_con << m_connection->getDesc() LOG(dout_con << m_connection->getDesc()
@ -398,6 +419,23 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
} }
return; return;
case CONNCMD_RESEND_ONE: {
LOG(dout_con << m_connection->getDesc()
<< "UDP processing reliable CONNCMD_RESEND_ONE" << std::endl);
PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
if (!peer)
return;
Channel &channel = dynamic_cast<UDPPeer *>(&peer)->channels[c->channelnum];
auto timed_outs = channel.outgoing_reliables_sent.getTimedOuts(0, 1);
if (!timed_outs.empty())
resendReliable(channel, timed_outs.front().get(), -1);
return;
}
case CONNCMD_SERVE: case CONNCMD_SERVE:
case CONNCMD_CONNECT: case CONNCMD_CONNECT:
case CONNCMD_DISCONNECT: case CONNCMD_DISCONNECT:
@ -457,6 +495,7 @@ void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr
sendAsPacket(c.peer_id, c.channelnum, c.data, true); sendAsPacket(c.peer_id, c.channelnum, c.data, true);
return; return;
case CONCMD_CREATE_PEER: case CONCMD_CREATE_PEER:
case CONNCMD_RESEND_ONE:
FATAL_ERROR("Got command that should be reliable as unreliable command"); FATAL_ERROR("Got command that should be reliable as unreliable command");
default: default:
LOG(dout_con << m_connection->getDesc() LOG(dout_con << m_connection->getDesc()
@ -918,20 +957,24 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
return; return;
} }
/* Try to identify peer by sender address (may happen on join) */ const bool knew_peer_id = peer_id != PEER_ID_INEXISTENT;
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
// We do not have to remind the peer of its
// peer id as the CONTROLTYPE_SET_PEER_ID
// command was sent reliably.
}
if (peer_id == PEER_ID_INEXISTENT) { if (!m_connection->ConnectedToServer()) {
/* Ignore it if we are a client */ // Try to identify peer by sender address
if (m_connection->ConnectedToServer()) if (peer_id == PEER_ID_INEXISTENT) {
return; peer_id = m_connection->lookupPeer(sender);
/* The peer was not found in our lists. Add it. */ if (peer_id != PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0); /* During join it can happen that the CONTROLTYPE_SET_PEER_ID
* packet is lost. Since resends are not active at this stage
* we need to remind the peer manually. */
m_connection->doResendOne(peer_id);
}
}
// Someone new is trying to talk to us. Add them.
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
}
} }
PeerHelper peer = m_connection->getPeerNoEx(peer_id); PeerHelper peer = m_connection->getPeerNoEx(peer_id);
@ -959,6 +1002,9 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
return; return;
} }
if (knew_peer_id)
peer->SetFullyOpen();
peer->ResetTimeout(); peer->ResetTimeout();
Channel *channel = nullptr; Channel *channel = nullptr;

@ -70,6 +70,7 @@ public:
private: private:
void runTimeouts(float dtime); void runTimeouts(float dtime);
void resendReliable(Channel &channel, const BufferedPacket *k, float resend_timeout);
void rawSend(const BufferedPacket *p); void rawSend(const BufferedPacket *p);
bool rawSendAsPacket(session_t peer_id, u8 channelnum, bool rawSendAsPacket(session_t peer_id, u8 channelnum,
const SharedBuffer<u8> &data, bool reliable); const SharedBuffer<u8> &data, bool reliable);