Improve protocol-level receiving code (#9617)

This commit is contained in:
sfan5 2020-04-20 23:22:00 +02:00 committed by GitHub
parent c2ac7b1a83
commit 8ef239b448
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 142 additions and 146 deletions

@ -1173,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
m_bc_peerhandler(peerhandler) m_bc_peerhandler(peerhandler)
{ {
m_udpSocket.setTimeoutMs(5); /* Amount of time Receive() will wait for data, this is entirely different
* from the connection timeout */
m_udpSocket.setTimeoutMs(500);
m_sendThread->setParent(this); m_sendThread->setParent(this);
m_receiveThread->setParent(this); m_receiveThread->setParent(this);

@ -812,6 +812,14 @@ void *ConnectionReceiveThread::run()
ThreadIdentifier); ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]"); PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
// use IPv6 minimum allowed MTU as receive buffer size as this is
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
// infrastructure
const unsigned int packet_maxsize = 1500;
SharedBuffer<u8> packetdata(packet_maxsize);
bool packet_queued = true;
#ifdef DEBUG_CONNECTION_KBPS #ifdef DEBUG_CONNECTION_KBPS
u64 curtime = porting::getTimeMs(); u64 curtime = porting::getTimeMs();
u64 lasttime = curtime; u64 lasttime = curtime;
@ -830,7 +838,7 @@ void *ConnectionReceiveThread::run()
#endif #endif
/* receive packets */ /* receive packets */
receive(); receive(packetdata, packet_queued);
#ifdef DEBUG_CONNECTION_KBPS #ifdef DEBUG_CONNECTION_KBPS
debug_print_timer += dtime; debug_print_timer += dtime;
@ -892,157 +900,142 @@ void *ConnectionReceiveThread::run()
} }
// Receive packets from the network and buffers and create ConnectionEvents // Receive packets from the network and buffers and create ConnectionEvents
void ConnectionReceiveThread::receive() void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
bool &packet_queued)
{ {
// use IPv6 minimum allowed MTU as receive buffer size as this is try {
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled // First, see if there any buffered packets we can process now
// infrastructure if (packet_queued) {
unsigned int packet_maxsize = 1500; bool data_left = true;
SharedBuffer<u8> packetdata(packet_maxsize); session_t peer_id;
SharedBuffer<u8> resultdata;
bool packet_queued = true; while (data_left) {
try {
unsigned int loop_count = 0; data_left = getFromBuffers(peer_id, resultdata);
if (data_left) {
/* first of all read packets from socket */ ConnectionEvent e;
/* check for incoming data available */ e.dataReceived(peer_id, resultdata);
while ((loop_count < 10) && m_connection->putEvent(e);
(m_connection->m_udpSocket.WaitData(50))) {
loop_count++;
try {
if (packet_queued) {
bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
while (data_left) {
try {
data_left = getFromBuffers(peer_id, resultdata);
if (data_left) {
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
} }
} }
packet_queued = false; catch (ProcessedSilentlyException &e) {
} /* try reading again */
Address sender;
s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
packet_maxsize);
if ((received_size < BASE_HEADER_SIZE) ||
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
<< ", protocol: "
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
<< std::endl);
continue;
}
session_t peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata);
if (channelnum > CHANNEL_COUNT - 1) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl);
throw InvalidIncomingDataException("Channel doesn't exist");
}
/* Try to identify peer by sender address (may happen on join) */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
// We do not have to remind the peer of its
// peer id as the CONTROLTYPE_SET_PEER_ID
// command was sent reliably.
}
/* The peer was not found in our lists. Add it. */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
}
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " got packet from unknown peer_id: "
<< peer_id << " Ignoring." << std::endl);
continue;
}
// Validate peer address
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
if (peer_address != sender) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " sending from different address."
" Ignoring." << std::endl);
continue;
}
} else {
bool invalid_address = true;
if (invalid_address) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " unknown."
" Ignoring." << std::endl);
continue;
} }
} }
packet_queued = false;
peer->ResetTimeout();
Channel *channel = 0;
if (dynamic_cast<UDPPeer *>(&peer) != 0) {
channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
}
if (channel != 0) {
channel->UpdateBytesReceived(received_size);
}
// Throw the received packet to channel->processPacket()
// Make a new SharedBuffer from the data without the base headers
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
strippeddata.getSize());
try {
// Process it (the result is some data with no headers made by us)
SharedBuffer<u8> resultdata = processPacket
(channel, strippeddata, peer_id, channelnum, false);
LOG(dout_con << m_connection->getDesc()
<< " ProcessPacket from peer_id: " << peer_id
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
catch (ProcessedSilentlyException &e) {
}
catch (ProcessedQueued &e) {
packet_queued = true;
}
} }
catch (InvalidIncomingDataException &e) {
// Call Receive() to wait for incoming data
Address sender;
s32 received_size = m_connection->m_udpSocket.Receive(sender,
*packetdata, packetdata.getSize());
if (received_size < 0)
return;
if ((received_size < BASE_HEADER_SIZE) ||
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
<< ", protocol: "
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
<< std::endl);
return;
}
session_t peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata);
if (channelnum > CHANNEL_COUNT - 1) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl);
return;
}
/* Try to identify peer by sender address (may happen on join) */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
// We do not have to remind the peer of its
// peer id as the CONTROLTYPE_SET_PEER_ID
// command was sent reliably.
}
/* The peer was not found in our lists. Add it. */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
}
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " got packet from unknown peer_id: "
<< peer_id << " Ignoring." << std::endl);
return;
}
// Validate peer address
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
if (peer_address != sender) {
LOG(derr_con << m_connection->getDesc()
<< " Peer " << peer_id << " sending from different address."
" Ignoring." << std::endl);
return;
}
} else {
LOG(derr_con << m_connection->getDesc()
<< " Peer " << peer_id << " doesn't have an address?!"
" Ignoring." << std::endl);
return;
}
peer->ResetTimeout();
Channel *channel = nullptr;
if (dynamic_cast<UDPPeer *>(&peer)) {
channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
} else {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
" Ignoring." << std::endl);
return;
}
channel->UpdateBytesReceived(received_size);
// Throw the received packet to channel->processPacket()
// Make a new SharedBuffer from the data without the base headers
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
strippeddata.getSize());
try {
// Process it (the result is some data with no headers made by us)
SharedBuffer<u8> resultdata = processPacket
(channel, strippeddata, peer_id, channelnum, false);
LOG(dout_con << m_connection->getDesc()
<< " ProcessPacket from peer_id: " << peer_id
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
} }
catch (ProcessedSilentlyException &e) { catch (ProcessedSilentlyException &e) {
} }
catch (ProcessedQueued &e) {
// we set it to true anyway (see below)
}
/* Every time we receive a packet it can happen that a previously
* buffered packet is now ready to process. */
packet_queued = true;
}
catch (InvalidIncomingDataException &e) {
} }
} }
@ -1189,7 +1182,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
m_connection->TriggerSend(); m_connection->TriggerSend();
} catch (NotFoundException &e) { } catch (NotFoundException &e) {
LOG(derr_con << m_connection->getDesc() LOG(derr_con << m_connection->getDesc()
<< "WARNING: ACKed packet not in outgoing queue" << std::endl); << "WARNING: ACKed packet not in outgoing queue"
<< " seqnum=" << seqnum << std::endl);
channel->UpdatePacketTooLateCounter(); channel->UpdatePacketTooLateCounter();
} }

@ -101,7 +101,7 @@ public:
} }
private: private:
void receive(); void receive(SharedBuffer<u8> &packetdata, bool &packet_queued);
// Returns next data from a buffer if possible // Returns next data from a buffer if possible
// If found, returns true; if not, false. // If found, returns true; if not, false.