Do not allocate packet quota to half-open connections

This commit is contained in:
sfan5 2024-01-05 13:54:50 +01:00
parent 3987318f09
commit 050152eb90
4 changed files with 39 additions and 20 deletions

@ -1397,6 +1397,21 @@ session_t Connection::lookupPeer(const Address& sender)
return PEER_ID_INEXISTENT; return PEER_ID_INEXISTENT;
} }
u32 Connection::getActiveCount()
{
MutexAutoLock peerlock(m_peers_mutex);
u32 count = 0;
for (auto &it : m_peers) {
Peer *peer = it.second;
if (peer->isPendingDeletion())
continue;
if (peer->isHalfOpen())
continue;
count++;
}
return count;
}
bool Connection::deletePeer(session_t peer_id, bool timeout) bool Connection::deletePeer(session_t peer_id, bool timeout)
{ {
Peer *peer = 0; Peer *peer = 0;

@ -764,6 +764,8 @@ protected:
return m_peer_ids; return m_peer_ids;
} }
u32 getActiveCount();
UDPSocket m_udpSocket; UDPSocket m_udpSocket;
// Command queue: user -> SendThread // Command queue: user -> SendThread
MutexedQueue<ConnectionCommandPtr> m_command_queue; MutexedQueue<ConnectionCommandPtr> m_command_queue;

@ -86,8 +86,6 @@ void *ConnectionSendThread::run()
BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
/* wait for trigger or timeout */ /* wait for trigger or timeout */
m_send_sleep_semaphore.wait(50); m_send_sleep_semaphore.wait(50);
@ -99,8 +97,16 @@ void *ConnectionSendThread::run()
curtime = porting::getTimeMs(); curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime, curtime); float dtime = CALC_DTIME(lasttime, curtime);
m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
const auto &calculate_quota = [&] () -> u32 {
u32 numpeers = m_connection->getActiveCount();
if (numpeers > 0)
return MYMAX(1, m_iteration_packets_avaialble / numpeers);
return m_iteration_packets_avaialble;
};
/* first resend timed-out packets */ /* first resend timed-out packets */
runTimeouts(dtime); runTimeouts(dtime, calculate_quota());
if (m_iteration_packets_avaialble == 0) { if (m_iteration_packets_avaialble == 0) {
LOG(warningstream << m_connection->getDesc() LOG(warningstream << m_connection->getDesc()
<< " Packet quota used up after re-sending packets, " << " Packet quota used up after re-sending packets, "
@ -119,7 +125,7 @@ void *ConnectionSendThread::run()
} }
/* send queued packets */ /* send queued packets */
sendPackets(dtime); sendPackets(dtime, calculate_quota());
END_DEBUG_EXCEPTION_HANDLER END_DEBUG_EXCEPTION_HANDLER
} }
@ -160,17 +166,12 @@ bool ConnectionSendThread::packetsQueued()
return false; return false;
} }
void ConnectionSendThread::runTimeouts(float dtime) void ConnectionSendThread::runTimeouts(float dtime, u32 peer_packet_quota)
{ {
std::vector<session_t> timeouted_peers; std::vector<session_t> timeouted_peers;
std::vector<session_t> peerIds = m_connection->getPeerIDs(); std::vector<session_t> peerIds = m_connection->getPeerIDs();
const u32 numpeers = m_connection->m_peers.size(); for (const session_t peerId : peerIds) {
if (numpeers == 0)
return;
for (session_t &peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId); PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer) if (!peer)
@ -217,8 +218,8 @@ void ConnectionSendThread::runTimeouts(float dtime)
channel.outgoing_reliables_sent.incrementTimeouts(dtime); channel.outgoing_reliables_sent.incrementTimeouts(dtime);
// Re-send timed out outgoing reliables // Re-send timed out outgoing reliables
auto timed_outs = channel.outgoing_reliables_sent.getResend(resend_timeout, auto timed_outs = channel.outgoing_reliables_sent.getResend(
(m_max_data_packets_per_iteration / numpeers)); resend_timeout, peer_packet_quota);
channel.UpdatePacketLossCounter(timed_outs.size()); channel.UpdatePacketLossCounter(timed_outs.size());
g_profiler->graphAdd("packets_lost", timed_outs.size()); g_profiler->graphAdd("packets_lost", timed_outs.size());
@ -235,7 +236,11 @@ void ConnectionSendThread::runTimeouts(float dtime)
continue; continue;
} }
if (m_iteration_packets_avaialble > timed_outs.size())
m_iteration_packets_avaialble -= timed_outs.size(); m_iteration_packets_avaialble -= timed_outs.size();
else
m_iteration_packets_avaialble = 0;
for (const auto &k : timed_outs) for (const auto &k : timed_outs)
resendReliable(channel, k.get(), resend_timeout); resendReliable(channel, k.get(), resend_timeout);
@ -643,15 +648,12 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
} }
} }
void ConnectionSendThread::sendPackets(float dtime) void ConnectionSendThread::sendPackets(float dtime, u32 peer_packet_quota)
{ {
std::vector<session_t> peerIds = m_connection->getPeerIDs(); std::vector<session_t> peerIds = m_connection->getPeerIDs();
std::vector<session_t> pendingDisconnect; std::vector<session_t> pendingDisconnect;
std::map<session_t, bool> pending_unreliable; std::map<session_t, bool> pending_unreliable;
const unsigned int peer_packet_quota = m_iteration_packets_avaialble
/ MYMAX(peerIds.size(), 1);
for (session_t peerId : peerIds) { for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId); PeerHelper peer = m_connection->getPeerNoEx(peerId);
//peer may have been removed //peer may have been removed

@ -69,7 +69,7 @@ public:
void setPeerTimeout(float peer_timeout) { m_timeout = peer_timeout; } void setPeerTimeout(float peer_timeout) { m_timeout = peer_timeout; }
private: private:
void runTimeouts(float dtime); void runTimeouts(float dtime, u32 peer_packet_quota);
void resendReliable(Channel &channel, const BufferedPacket *k, float resend_timeout); void resendReliable(Channel &channel, const BufferedPacket *k, float resend_timeout);
void rawSend(const BufferedPacket *p); void rawSend(const BufferedPacket *p);
bool rawSendAsPacket(session_t peer_id, u8 channelnum, bool rawSendAsPacket(session_t peer_id, u8 channelnum,
@ -86,7 +86,7 @@ private:
void sendToAll(u8 channelnum, const SharedBuffer<u8> &data); void sendToAll(u8 channelnum, const SharedBuffer<u8> &data);
void sendToAllReliable(ConnectionCommandPtr &c); void sendToAllReliable(ConnectionCommandPtr &c);
void sendPackets(float dtime); void sendPackets(float dtime, u32 peer_packet_quota);
void sendAsPacket(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data, void sendAsPacket(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data,
bool ack = false); bool ack = false);