Miscellaneous networking improvements (#9611)

fixes #2862
This commit is contained in:
sfan5 2020-04-08 20:12:58 +02:00 committed by GitHub
parent 143a37e947
commit 3494475df1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 87 additions and 43 deletions

@ -126,6 +126,16 @@ const ToClientCommandHandler toClientCommandTable[TOCLIENT_NUM_MSG_TYPES] =
const static ServerCommandFactory null_command_factory = { "TOSERVER_NULL", 0, false }; const static ServerCommandFactory null_command_factory = { "TOSERVER_NULL", 0, false };
/*
Channels used for Client -> Server communication
2: Notifications back to the server (e.g. GOTBLOCKS)
1: Init and Authentication
0: everything else
Packet order is only guaranteed inside a channel, so packets that operate on
the same objects are *required* to be in the same channel.
*/
const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] = const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
{ {
null_command_factory, // 0x00 null_command_factory, // 0x00
@ -143,7 +153,7 @@ const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
null_command_factory, // 0x0c null_command_factory, // 0x0c
null_command_factory, // 0x0d null_command_factory, // 0x0d
null_command_factory, // 0x0e null_command_factory, // 0x0e
null_command_factory, // 0x0F null_command_factory, // 0x0f
null_command_factory, // 0x10 null_command_factory, // 0x10
{ "TOSERVER_INIT2", 1, true }, // 0x11 { "TOSERVER_INIT2", 1, true }, // 0x11
null_command_factory, // 0x12 null_command_factory, // 0x12
@ -186,7 +196,7 @@ const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
{ "TOSERVER_PLAYERITEM", 0, true }, // 0x37 { "TOSERVER_PLAYERITEM", 0, true }, // 0x37
{ "TOSERVER_RESPAWN", 0, true }, // 0x38 { "TOSERVER_RESPAWN", 0, true }, // 0x38
{ "TOSERVER_INTERACT", 0, true }, // 0x39 { "TOSERVER_INTERACT", 0, true }, // 0x39
{ "TOSERVER_REMOVED_SOUNDS", 1, true }, // 0x3a { "TOSERVER_REMOVED_SOUNDS", 2, true }, // 0x3a
{ "TOSERVER_NODEMETA_FIELDS", 0, true }, // 0x3b { "TOSERVER_NODEMETA_FIELDS", 0, true }, // 0x3b
{ "TOSERVER_INVENTORY_FIELDS", 0, true }, // 0x3c { "TOSERVER_INVENTORY_FIELDS", 0, true }, // 0x3c
null_command_factory, // 0x3d null_command_factory, // 0x3d
@ -194,8 +204,8 @@ const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
null_command_factory, // 0x3f null_command_factory, // 0x3f
{ "TOSERVER_REQUEST_MEDIA", 1, true }, // 0x40 { "TOSERVER_REQUEST_MEDIA", 1, true }, // 0x40
null_command_factory, // 0x41 null_command_factory, // 0x41
{ "TOSERVER_BREATH", 0, true }, // 0x42 old TOSERVER_BREATH. Ignored by servers null_command_factory, // 0x42
{ "TOSERVER_CLIENT_READY", 0, true }, // 0x43 { "TOSERVER_CLIENT_READY", 1, true }, // 0x43
null_command_factory, // 0x44 null_command_factory, // 0x44
null_command_factory, // 0x45 null_command_factory, // 0x45
null_command_factory, // 0x46 null_command_factory, // 0x46

@ -924,7 +924,7 @@ UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
Peer(a_address,a_id,connection) Peer(a_address,a_id,connection)
{ {
for (Channel &channel : channels) for (Channel &channel : channels)
channel.setWindowSize(g_settings->getU16("max_packets_per_iteration")); channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
} }
bool UDPPeer::getAddress(MTProtocols type,Address& toset) bool UDPPeer::getAddress(MTProtocols type,Address& toset)
@ -975,22 +975,29 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
if (m_pending_disconnect) if (m_pending_disconnect)
return; return;
if ( channels[c.channelnum].queued_commands.empty() && Channel &chan = channels[c.channelnum];
if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */ /* don't queue more packets then window size */
(channels[c.channelnum].queued_reliables.size() (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
< (channels[c.channelnum].getWindowSize()/2))) {
LOG(dout_con<<m_connection->getDesc() LOG(dout_con<<m_connection->getDesc()
<<" processing reliable command for peer id: " << c.peer_id <<" processing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() << std::endl); <<" data size: " << c.data.getSize() << std::endl);
if (!processReliableSendCommand(c,max_packet_size)) { if (!processReliableSendCommand(c,max_packet_size)) {
channels[c.channelnum].queued_commands.push_back(c); chan.queued_commands.push_back(c);
} }
} }
else { else {
LOG(dout_con<<m_connection->getDesc() LOG(dout_con<<m_connection->getDesc()
<<" Queueing reliable command for peer id: " << c.peer_id <<" Queueing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() <<std::endl); <<" data size: " << c.data.getSize() <<std::endl);
channels[c.channelnum].queued_commands.push_back(c); chan.queued_commands.push_back(c);
if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
LOG(derr_con << m_connection->getDesc()
<< "Possible packet stall to peer id: " << c.peer_id
<< " queued_commands=" << chan.queued_commands.size()
<< std::endl);
}
} }
} }
@ -1001,6 +1008,8 @@ bool UDPPeer::processReliableSendCommand(
if (m_pending_disconnect) if (m_pending_disconnect)
return true; return true;
Channel &chan = channels[c.channelnum];
u32 chunksize_max = max_packet_size u32 chunksize_max = max_packet_size
- BASE_HEADER_SIZE - BASE_HEADER_SIZE
- RELIABLE_HEADER_SIZE; - RELIABLE_HEADER_SIZE;
@ -1008,13 +1017,13 @@ bool UDPPeer::processReliableSendCommand(
sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512); sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
std::list<SharedBuffer<u8>> originals; std::list<SharedBuffer<u8>> originals;
u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum(); u16 split_sequence_number = chan.readNextSplitSeqNum();
if (c.raw) { if (c.raw) {
originals.emplace_back(c.data); originals.emplace_back(c.data);
} else { } else {
makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals); makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
channels[c.channelnum].setNextSplitSeqNum(split_sequence_number); chan.setNextSplitSeqNum(split_sequence_number);
} }
bool have_sequence_number = true; bool have_sequence_number = true;
@ -1023,7 +1032,7 @@ bool UDPPeer::processReliableSendCommand(
volatile u16 initial_sequence_number = 0; volatile u16 initial_sequence_number = 0;
for (SharedBuffer<u8> &original : originals) { for (SharedBuffer<u8> &original : originals) {
u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number); u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
/* oops, we don't have enough sequence numbers to send this packet */ /* oops, we don't have enough sequence numbers to send this packet */
if (!have_sequence_number) if (!have_sequence_number)
@ -1055,10 +1064,10 @@ bool UDPPeer::processReliableSendCommand(
// << " channel: " << (c.channelnum&0xFF) // << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1]) // << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl) // << std::endl)
channels[c.channelnum].queued_reliables.push(p); chan.queued_reliables.push(p);
pcount++; pcount++;
} }
sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF); sanity_check(chan.queued_reliables.size() < 0xFFFF);
return true; return true;
} }
@ -1073,7 +1082,7 @@ bool UDPPeer::processReliableSendCommand(
toadd.pop(); toadd.pop();
bool successfully_put_back_sequence_number bool successfully_put_back_sequence_number
= channels[c.channelnum].putBackSequenceNumber( = chan.putBackSequenceNumber(
(initial_sequence_number+toadd.size() % (SEQNUM_MAX+1))); (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error"); FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
@ -1081,7 +1090,7 @@ bool UDPPeer::processReliableSendCommand(
// DO NOT REMOVE n_queued! It avoids a deadlock of async locked // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
// 'log_message_mutex' and 'm_list_mutex'. // 'log_message_mutex' and 'm_list_mutex'.
u32 n_queued = channels[c.channelnum].outgoing_reliables_sent.size(); u32 n_queued = chan.outgoing_reliables_sent.size();
LOG(dout_con<<m_connection->getDesc() LOG(dout_con<<m_connection->getDesc()
<< " Windowsize exceeded on reliable sending " << " Windowsize exceeded on reliable sending "

@ -141,7 +141,6 @@ private:
=== NOTES === === NOTES ===
A packet is sent through a channel to a peer with a basic header: A packet is sent through a channel to a peer with a basic header:
TODO: Should we have a receiver_peer_id also?
Header (7 bytes): Header (7 bytes):
[0] u32 protocol_id [0] u32 protocol_id
[4] session_t sender_peer_id [4] session_t sender_peer_id
@ -152,8 +151,7 @@ sender_peer_id:
value 1 (PEER_ID_SERVER) is reserved for server value 1 (PEER_ID_SERVER) is reserved for server
these constants are defined in constants.h these constants are defined in constants.h
channel: channel:
The lower the number, the higher the priority is. Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
Only channels 0, 1 and 2 exist.
*/ */
#define BASE_HEADER_SIZE 7 #define BASE_HEADER_SIZE 7
#define CHANNEL_COUNT 3 #define CHANNEL_COUNT 3
@ -386,12 +384,14 @@ struct ConnectionCommand
} }
}; };
/* maximum window size to use, 0xFFFF is theoretical maximum don't think about /* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
* touching it, the less you're away from it the more likely data corruption * touching it, the less you're away from it the more likely data corruption
* will occur * will occur
*/ */
#define MAX_RELIABLE_WINDOW_SIZE 0x8000 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
/* starting value for window size */ /* starting value for window size */
#define START_RELIABLE_WINDOW_SIZE 0x400
/* minimum value for window size */
#define MIN_RELIABLE_WINDOW_SIZE 0x40 #define MIN_RELIABLE_WINDOW_SIZE 0x40
class Channel class Channel
@ -555,15 +555,15 @@ class Peer {
bool isTimedOut(float timeout); bool isTimedOut(float timeout);
unsigned int m_increment_packets_remaining = 9; unsigned int m_increment_packets_remaining = 0;
unsigned int m_increment_bytes_remaining = 0;
virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; }; virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {}; virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd, virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
bool reliable) bool reliable)
{ {
fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n"); errorstream << "Peer::addSplitPacket called,"
<< " this is supposed to be never called!" << std::endl;
return SharedBuffer<u8>(0); return SharedBuffer<u8>(0);
}; };

@ -73,6 +73,7 @@ ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
m_timeout(timeout), m_timeout(timeout),
m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")) m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
{ {
SANITY_CHECK(m_max_data_packets_per_iteration > 1);
} }
void *ConnectionSendThread::run() void *ConnectionSendThread::run()
@ -107,8 +108,13 @@ void *ConnectionSendThread::run()
curtime = porting::getTimeMs(); curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime, curtime); float dtime = CALC_DTIME(lasttime, curtime);
/* first do all the reliable stuff */ /* first resend timed-out packets */
runTimeouts(dtime); runTimeouts(dtime);
if (m_iteration_packets_avaialble == 0) {
LOG(warningstream << m_connection->getDesc()
<< " Packet quota used up after re-sending packets, "
<< "max=" << m_max_data_packets_per_iteration << std::endl);
}
/* translate commands to packets */ /* translate commands to packets */
ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0); ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
@ -121,7 +127,7 @@ void *ConnectionSendThread::run()
c = m_connection->m_command_queue.pop_frontNoEx(0); c = m_connection->m_command_queue.pop_frontNoEx(0);
} }
/* send non reliable packets */ /* send queued packets */
sendPackets(dtime); sendPackets(dtime);
END_DEBUG_EXCEPTION_HANDLER END_DEBUG_EXCEPTION_HANDLER
@ -644,6 +650,9 @@ void ConnectionSendThread::sendPackets(float dtime)
std::list<session_t> pendingDisconnect; std::list<session_t> pendingDisconnect;
std::map<session_t, bool> pending_unreliable; std::map<session_t, bool> pending_unreliable;
const unsigned int peer_packet_quota = m_iteration_packets_avaialble
/ MYMAX(peerIds.size(), 1);
for (session_t peerId : peerIds) { for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId); PeerHelper peer = m_connection->getPeerNoEx(peerId);
//peer may have been removed //peer may have been removed
@ -653,8 +662,7 @@ void ConnectionSendThread::sendPackets(float dtime)
<< std::endl); << std::endl);
continue; continue;
} }
peer->m_increment_packets_remaining = peer->m_increment_packets_remaining = peer_packet_quota;
m_iteration_packets_avaialble / m_connection->m_peers.size();
UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer); UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
@ -751,16 +759,10 @@ void ConnectionSendThread::sendPackets(float dtime)
} }
/* send acks immediately */ /* send acks immediately */
if (packet.ack) { if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
peer->m_increment_packets_remaining =
MYMIN(0, peer->m_increment_packets_remaining--);
} else if (
(peer->m_increment_packets_remaining > 0) ||
(stopRequested())) {
rawSendAsPacket(packet.peer_id, packet.channelnum, rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable); packet.data, packet.reliable);
if (peer->m_increment_packets_remaining > 0)
peer->m_increment_packets_remaining--; peer->m_increment_packets_remaining--;
} else { } else {
m_outgoing_queue.push(packet); m_outgoing_queue.push(packet);
@ -768,6 +770,19 @@ void ConnectionSendThread::sendPackets(float dtime)
} }
} }
if (peer_packet_quota > 0) {
for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
if (!peer)
continue;
if (peer->m_increment_packets_remaining == 0) {
LOG(warningstream << m_connection->getDesc()
<< " Packet quota used up for peer_id=" << peerId
<< ", was " << peer_packet_quota << " pkts" << std::endl);
}
}
}
for (session_t peerId : pendingDisconnect) { for (session_t peerId : pendingDisconnect) {
if (!pending_unreliable[peerId]) { if (!pending_unreliable[peerId]) {
m_connection->deletePeer(peerId, false); m_connection->deletePeer(peerId, false);

@ -111,6 +111,16 @@ const ToServerCommandHandler toServerCommandTable[TOSERVER_NUM_MSG_TYPES] =
const static ClientCommandFactory null_command_factory = { "TOCLIENT_NULL", 0, false }; const static ClientCommandFactory null_command_factory = { "TOCLIENT_NULL", 0, false };
/*
Channels used for Server -> Client communication
2: Bulk data (mapblocks, media, ...)
1: HUD packets
0: everything else
Packet order is only guaranteed inside a channel, so packets that operate on
the same objects are *required* to be in the same channel.
*/
const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] = const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
{ {
null_command_factory, // 0x00 null_command_factory, // 0x00
@ -163,7 +173,7 @@ const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
{ "TOCLIENT_CHAT_MESSAGE", 0, true }, // 0x2F { "TOCLIENT_CHAT_MESSAGE", 0, true }, // 0x2F
null_command_factory, // 0x30 null_command_factory, // 0x30
{ "TOCLIENT_ACTIVE_OBJECT_REMOVE_ADD", 0, true }, // 0x31 { "TOCLIENT_ACTIVE_OBJECT_REMOVE_ADD", 0, true }, // 0x31
{ "TOCLIENT_ACTIVE_OBJECT_MESSAGES", 0, true }, // 0x32 Special packet, sent by 0 (rel) and 1 (unrel) channel { "TOCLIENT_ACTIVE_OBJECT_MESSAGES", 0, true }, // 0x32 (may be sent as unrel over channel 1 too)
{ "TOCLIENT_HP", 0, true }, // 0x33 { "TOCLIENT_HP", 0, true }, // 0x33
{ "TOCLIENT_MOVE_PLAYER", 0, true }, // 0x34 { "TOCLIENT_MOVE_PLAYER", 0, true }, // 0x34
{ "TOCLIENT_ACCESS_DENIED_LEGACY", 0, true }, // 0x35 { "TOCLIENT_ACCESS_DENIED_LEGACY", 0, true }, // 0x35
@ -176,7 +186,7 @@ const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
{ "TOCLIENT_ANNOUNCE_MEDIA", 0, true }, // 0x3C { "TOCLIENT_ANNOUNCE_MEDIA", 0, true }, // 0x3C
{ "TOCLIENT_ITEMDEF", 0, true }, // 0x3D { "TOCLIENT_ITEMDEF", 0, true }, // 0x3D
null_command_factory, // 0x3E null_command_factory, // 0x3E
{ "TOCLIENT_PLAY_SOUND", 0, true }, // 0x3f { "TOCLIENT_PLAY_SOUND", 0, true }, // 0x3f (may be sent as unrel too)
{ "TOCLIENT_STOP_SOUND", 0, true }, // 0x40 { "TOCLIENT_STOP_SOUND", 0, true }, // 0x40
{ "TOCLIENT_PRIVILEGES", 0, true }, // 0x41 { "TOCLIENT_PRIVILEGES", 0, true }, // 0x41
{ "TOCLIENT_INVENTORY_FORMSPEC", 0, true }, // 0x42 { "TOCLIENT_INVENTORY_FORMSPEC", 0, true }, // 0x42
@ -188,9 +198,9 @@ const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
null_command_factory, // 0x48 null_command_factory, // 0x48
{ "TOCLIENT_HUDADD", 1, true }, // 0x49 { "TOCLIENT_HUDADD", 1, true }, // 0x49
{ "TOCLIENT_HUDRM", 1, true }, // 0x4a { "TOCLIENT_HUDRM", 1, true }, // 0x4a
{ "TOCLIENT_HUDCHANGE", 0, true }, // 0x4b { "TOCLIENT_HUDCHANGE", 1, true }, // 0x4b
{ "TOCLIENT_HUD_SET_FLAGS", 0, true }, // 0x4c { "TOCLIENT_HUD_SET_FLAGS", 1, true }, // 0x4c
{ "TOCLIENT_HUD_SET_PARAM", 0, true }, // 0x4d { "TOCLIENT_HUD_SET_PARAM", 1, true }, // 0x4d
{ "TOCLIENT_BREATH", 0, true }, // 0x4e { "TOCLIENT_BREATH", 0, true }, // 0x4e
{ "TOCLIENT_SET_SKY", 0, true }, // 0x4f { "TOCLIENT_SET_SKY", 0, true }, // 0x4f
{ "TOCLIENT_OVERRIDE_DAY_NIGHT_RATIO", 0, true }, // 0x50 { "TOCLIENT_OVERRIDE_DAY_NIGHT_RATIO", 0, true }, // 0x50