Rework packet receiving in ServerThread

Notably it tries to receive all queued packets
between server steps, not just one.
This commit is contained in:
sfan5 2019-11-14 17:38:15 +01:00
parent 97764600e2
commit c10952b574
5 changed files with 86 additions and 24 deletions

@ -1323,16 +1323,21 @@ void Connection::Disconnect()
putCommand(c); putCommand(c);
} }
void Connection::Receive(NetworkPacket* pkt) bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
{ {
/*
Note that this function can potentially wait infinitely if non-data
events keep happening before the timeout expires.
This is not considered to be a problem (is it?)
*/
for(;;) { for(;;) {
ConnectionEvent e = waitEvent(m_bc_receive_timeout); ConnectionEvent e = waitEvent(timeout);
if (e.type != CONNEVENT_NONE) if (e.type != CONNEVENT_NONE)
LOG(dout_con << getDesc() << ": Receive: got event: " LOG(dout_con << getDesc() << ": Receive: got event: "
<< e.describe() << std::endl); << e.describe() << std::endl);
switch(e.type) { switch(e.type) {
case CONNEVENT_NONE: case CONNEVENT_NONE:
throw NoIncomingDataException("No incoming data"); return false;
case CONNEVENT_DATA_RECEIVED: case CONNEVENT_DATA_RECEIVED:
// Data size is lesser than command size, ignoring packet // Data size is lesser than command size, ignoring packet
if (e.data.getSize() < 2) { if (e.data.getSize() < 2) {
@ -1340,7 +1345,7 @@ void Connection::Receive(NetworkPacket* pkt)
} }
pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id); pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
return; return true;
case CONNEVENT_PEER_ADDED: { case CONNEVENT_PEER_ADDED: {
UDPPeer tmp(e.peer_id, e.address, this); UDPPeer tmp(e.peer_id, e.address, this);
if (m_bc_peerhandler) if (m_bc_peerhandler)
@ -1358,7 +1363,19 @@ void Connection::Receive(NetworkPacket* pkt)
"(port already in use?)"); "(port already in use?)");
} }
} }
throw NoIncomingDataException("No incoming data"); return false;
}
void Connection::Receive(NetworkPacket *pkt)
{
bool any = Receive(pkt, m_bc_receive_timeout);
if (!any)
throw NoIncomingDataException("No incoming data");
}
bool Connection::TryReceive(NetworkPacket *pkt)
{
return Receive(pkt, 0);
} }
void Connection::Send(session_t peer_id, u8 channelnum, void Connection::Send(session_t peer_id, u8 channelnum,

@ -771,6 +771,7 @@ public:
bool Connected(); bool Connected();
void Disconnect(); void Disconnect();
void Receive(NetworkPacket* pkt); void Receive(NetworkPacket* pkt);
bool TryReceive(NetworkPacket *pkt);
void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable); void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
session_t GetPeerID() const { return m_peer_id; } session_t GetPeerID() const { return m_peer_id; }
Address GetPeerAddress(session_t peer_id); Address GetPeerAddress(session_t peer_id);
@ -803,6 +804,8 @@ protected:
UDPSocket m_udpSocket; UDPSocket m_udpSocket;
MutexedQueue<ConnectionCommand> m_command_queue; MutexedQueue<ConnectionCommand> m_command_queue;
bool Receive(NetworkPacket *pkt, u32 timeout);
void putEvent(ConnectionEvent &e); void putEvent(ConnectionEvent &e);
void TriggerSend(); void TriggerSend();

@ -66,6 +66,15 @@ void NetworkPacket::putRawPacket(u8 *data, u32 datasize, session_t peer_id)
memcpy(m_data.data(), &data[2], m_datasize); memcpy(m_data.data(), &data[2], m_datasize);
} }
void NetworkPacket::clear()
{
m_data.clear();
m_datasize = 0;
m_read_offset = 0;
m_command = 0;
m_peer_id = 0;
}
const char* NetworkPacket::getString(u32 from_offset) const char* NetworkPacket::getString(u32 from_offset)
{ {
checkReadOffset(from_offset, 0); checkReadOffset(from_offset, 0);

@ -35,6 +35,7 @@ public:
~NetworkPacket(); ~NetworkPacket();
void putRawPacket(u8 *data, u32 datasize, session_t peer_id); void putRawPacket(u8 *data, u32 datasize, session_t peer_id);
void clear();
// Getters // Getters
u32 getSize() const { return m_datasize; } u32 getSize() const { return m_datasize; }

@ -93,6 +93,15 @@ void *ServerThread::run()
{ {
BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER
/*
* The real business of the server happens on the ServerThread.
* How this works:
* AsyncRunStep() runs an actual server step as soon as enough time has
* passed (dedicated_server_loop keeps track of that).
* Receive() blocks at least(!) 30ms waiting for a packet (so this loop
* doesn't busy wait) and will process any remaining packets.
*/
m_server->AsyncRunStep(true); m_server->AsyncRunStep(true);
while (!stopRequested()) { while (!stopRequested()) {
@ -101,7 +110,6 @@ void *ServerThread::run()
m_server->Receive(); m_server->Receive();
} catch (con::NoIncomingDataException &e) {
} catch (con::PeerNotFoundException &e) { } catch (con::PeerNotFoundException &e) {
infostream<<"Server: PeerNotFoundException"<<std::endl; infostream<<"Server: PeerNotFoundException"<<std::endl;
} catch (ClientNotFoundException &e) { } catch (ClientNotFoundException &e) {
@ -911,24 +919,43 @@ void Server::AsyncRunStep(bool initial_step)
void Server::Receive() void Server::Receive()
{ {
session_t peer_id = 0; NetworkPacket pkt;
try { session_t peer_id;
NetworkPacket pkt; bool first = true;
m_con->Receive(&pkt); for (;;) {
peer_id = pkt.getPeerId(); pkt.clear();
ProcessData(&pkt); peer_id = 0;
} catch (const con::InvalidIncomingDataException &e) { try {
infostream << "Server::Receive(): InvalidIncomingDataException: what()=" /*
<< e.what() << std::endl; In the first iteration *wait* for a packet, afterwards process
} catch (const SerializationError &e) { all packets that are immediately available (no waiting).
infostream << "Server::Receive(): SerializationError: what()=" */
<< e.what() << std::endl; if (first) {
} catch (const ClientStateError &e) { m_con->Receive(&pkt);
errorstream << "ProcessData: peer=" << peer_id << e.what() << std::endl; first = false;
DenyAccess_Legacy(peer_id, L"Your client sent something server didn't expect." } else {
L"Try reconnecting or updating your client"); if (!m_con->TryReceive(&pkt))
} catch (const con::PeerNotFoundException &e) { return;
// Do nothing }
peer_id = pkt.getPeerId();
ProcessData(&pkt);
} catch (const con::InvalidIncomingDataException &e) {
infostream << "Server::Receive(): InvalidIncomingDataException: what()="
<< e.what() << std::endl;
} catch (const SerializationError &e) {
infostream << "Server::Receive(): SerializationError: what()="
<< e.what() << std::endl;
} catch (const ClientStateError &e) {
errorstream << "ProcessData: peer=" << peer_id << " what()="
<< e.what() << std::endl;
DenyAccess_Legacy(peer_id, L"Your client sent something server didn't expect."
L"Try reconnecting or updating your client");
} catch (const con::PeerNotFoundException &e) {
// Do nothing
} catch (const con::NoIncomingDataException &e) {
return;
}
} }
} }
@ -3728,6 +3755,11 @@ void dedicated_server_loop(Server &server, bool &kill)
static thread_local const float profiler_print_interval = static thread_local const float profiler_print_interval =
g_settings->getFloat("profiler_print_interval"); g_settings->getFloat("profiler_print_interval");
/*
* The dedicated server loop only does time-keeping (in Server::step) and
* provides a way to main.cpp to kill the server externally (bool &kill).
*/
for(;;) { for(;;) {
// This is kind of a hack but can be done like this // This is kind of a hack but can be done like this
// because server.step() is very light // because server.step() is very light