Fixed minetest reliable udp implementation (compatible to old clients)

This commit is contained in:
sapier 2014-01-06 20:05:28 +01:00
parent 8b0b857eaa
commit 9edb91da57
8 changed files with 2844 additions and 1156 deletions

@ -345,7 +345,10 @@
# Number of emerge threads to use. Make this field blank, or increase this number, to use multiple threads. # Number of emerge threads to use. Make this field blank, or increase this number, to use multiple threads.
# On multiprocessor systems, this will improve mapgen speed greatly, at the cost of slightly buggy caves. # On multiprocessor systems, this will improve mapgen speed greatly, at the cost of slightly buggy caves.
#num_emerge_threads = 1 #num_emerge_threads = 1
# maximum number of packets sent per send step, if you have a slow connection
# try reducing it, but don't reduce it to a number below double of targeted
# client number
#max_packets_per_iteration = 1024
# #
# Physics stuff # Physics stuff
# #

@ -538,7 +538,7 @@ void Client::step(float dtime)
writeU16(&data[53], CLIENT_PROTOCOL_VERSION_MAX); writeU16(&data[53], CLIENT_PROTOCOL_VERSION_MAX);
// Send as unreliable // Send as unreliable
Send(0, data, false); Send(1, data, false);
} }
// Not connected, return // Not connected, return
@ -597,7 +597,7 @@ void Client::step(float dtime)
writeV3S16(&reply[2+1+6*k], *j); writeV3S16(&reply[2+1+6*k], *j);
k++; k++;
} }
m_con.Send(PEER_ID_SERVER, 1, reply, true); m_con.Send(PEER_ID_SERVER, 2, reply, true);
if(i == deleted_blocks.end()) if(i == deleted_blocks.end())
break; break;
@ -745,7 +745,7 @@ void Client::step(float dtime)
reply[2] = 1; reply[2] = 1;
writeV3S16(&reply[3], r.p); writeV3S16(&reply[3], r.p);
// Send as reliable // Send as reliable
m_con.Send(PEER_ID_SERVER, 1, reply, true); m_con.Send(PEER_ID_SERVER, 2, reply, true);
} }
} }
if(num_processed_meshes > 0) if(num_processed_meshes > 0)
@ -840,7 +840,7 @@ void Client::step(float dtime)
std::string s = os.str(); std::string s = os.str();
SharedBuffer<u8> data((u8*)s.c_str(), s.size()); SharedBuffer<u8> data((u8*)s.c_str(), s.size());
// Send as reliable // Send as reliable
Send(0, data, true); Send(1, data, true);
} }
} }
} }
@ -957,7 +957,7 @@ void Client::request_media(const std::list<std::string> &file_requests)
std::string s = os.str(); std::string s = os.str();
SharedBuffer<u8> data((u8*)s.c_str(), s.size()); SharedBuffer<u8> data((u8*)s.c_str(), s.size());
// Send as reliable // Send as reliable
Send(0, data, true); Send(1, data, true);
infostream<<"Client: Sending media request list to server (" infostream<<"Client: Sending media request list to server ("
<<file_requests.size()<<" files)"<<std::endl; <<file_requests.size()<<" files)"<<std::endl;
} }
@ -970,7 +970,7 @@ void Client::received_media()
std::string s = os.str(); std::string s = os.str();
SharedBuffer<u8> data((u8*)s.c_str(), s.size()); SharedBuffer<u8> data((u8*)s.c_str(), s.size());
// Send as reliable // Send as reliable
Send(0, data, true); Send(1, data, true);
infostream<<"Client: Notifying server that we received all media" infostream<<"Client: Notifying server that we received all media"
<<std::endl; <<std::endl;
} }

File diff suppressed because it is too large Load Diff

@ -27,6 +27,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "util/pointer.h" #include "util/pointer.h"
#include "util/container.h" #include "util/container.h"
#include "util/thread.h" #include "util/thread.h"
#include "util/numeric.h"
#include <iostream> #include <iostream>
#include <fstream> #include <fstream>
#include <list> #include <list>
@ -110,26 +111,74 @@ public:
{} {}
}; };
#define SEQNUM_MAX 65535 class ProcessedQueued : public BaseException
inline bool seqnum_higher(u16 higher, u16 lower)
{ {
if(lower > higher && lower - higher > SEQNUM_MAX/2){ public:
return true; ProcessedQueued(const char *s):
BaseException(s)
{}
};
class IncomingDataCorruption : public BaseException
{
public:
IncomingDataCorruption(const char *s):
BaseException(s)
{}
};
typedef enum MTProtocols {
PRIMARY,
UDP,
MINETEST_RELIABLE_UDP
} MTProtocols;
#define SEQNUM_MAX 65535
inline bool seqnum_higher(u16 totest, u16 base)
{
if (totest > base)
{
if((totest - base) > (SEQNUM_MAX/2))
return false;
else
return true;
}
else
{
if((base - totest) > (SEQNUM_MAX/2))
return true;
else
return false;
}
}
inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
{
u16 window_start = next;
u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
if (window_start < window_end)
{
return ((seqnum >= window_start) && (seqnum < window_end));
}
else
{
return ((seqnum < window_end) || (seqnum >= window_start));
} }
return (higher > lower);
} }
struct BufferedPacket struct BufferedPacket
{ {
BufferedPacket(u8 *a_data, u32 a_size): BufferedPacket(u8 *a_data, u32 a_size):
data(a_data, a_size), time(0.0), totaltime(0.0) data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
{} {}
BufferedPacket(u32 a_size): BufferedPacket(u32 a_size):
data(a_size), time(0.0), totaltime(0.0) data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
{} {}
SharedBuffer<u8> data; // Data of the packet, including headers SharedBuffer<u8> data; // Data of the packet, including headers
float time; // Seconds from buffering the packet or re-sending float time; // Seconds from buffering the packet or re-sending
float totaltime; // Seconds from buffering the packet float totaltime; // Seconds from buffering the packet
unsigned int absolute_send_time;
Address address; // Sender or destination Address address; // Sender or destination
}; };
@ -223,6 +272,8 @@ controltype and data description:
#define CONTROLTYPE_SET_PEER_ID 1 #define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2 #define CONTROLTYPE_PING 2
#define CONTROLTYPE_DISCO 3 #define CONTROLTYPE_DISCO 3
#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
/* /*
ORIGINAL: This is a plain packet with no control and no error ORIGINAL: This is a plain packet with no control and no error
checking at all. checking at all.
@ -261,7 +312,6 @@ with a buffer in the receiving and transmitting end.
*/ */
#define TYPE_RELIABLE 3 #define TYPE_RELIABLE 3
#define RELIABLE_HEADER_SIZE 3 #define RELIABLE_HEADER_SIZE 3
//#define SEQNUM_INITIAL 0x10
#define SEQNUM_INITIAL 65500 #define SEQNUM_INITIAL 65500
/* /*
@ -275,23 +325,35 @@ class ReliablePacketBuffer
{ {
public: public:
ReliablePacketBuffer(); ReliablePacketBuffer();
void print();
bool empty(); bool getFirstSeqnum(u16& result);
u32 size();
RPBSearchResult findPacket(u16 seqnum);
RPBSearchResult notFound();
bool getFirstSeqnum(u16 *result);
BufferedPacket popFirst(); BufferedPacket popFirst();
BufferedPacket popSeqnum(u16 seqnum); BufferedPacket popSeqnum(u16 seqnum);
void insert(BufferedPacket &p); void insert(BufferedPacket &p,u16 next_expected);
void incrementTimeouts(float dtime); void incrementTimeouts(float dtime);
void resetTimedOuts(float timeout); std::list<BufferedPacket> getTimedOuts(float timeout,
bool anyTotaltimeReached(float timeout); unsigned int max_packets);
std::list<BufferedPacket> getTimedOuts(float timeout);
void print();
bool empty();
bool containsPacket(u16 seqnum);
RPBSearchResult notFound();
u32 size();
private: private:
RPBSearchResult findPacket(u16 seqnum);
std::list<BufferedPacket> m_list; std::list<BufferedPacket> m_list;
u16 m_list_size; u16 m_list_size;
u16 m_oldest_non_answered_ack;
JMutex m_list_mutex;
unsigned int writeptr;
}; };
/* /*
@ -313,27 +375,207 @@ public:
private: private:
// Key is seqnum // Key is seqnum
std::map<u16, IncomingSplitPacket*> m_buf; std::map<u16, IncomingSplitPacket*> m_buf;
JMutex m_map_mutex;
}; };
class Connection; struct OutgoingPacket
struct Channel
{ {
Channel(); u16 peer_id;
~Channel(); u8 channelnum;
SharedBuffer<u8> data;
bool reliable;
bool ack;
u16 next_outgoing_seqnum; OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
u16 next_incoming_seqnum; bool reliable_,bool ack_=false):
u16 next_outgoing_split_seqnum; peer_id(peer_id_),
channelnum(channelnum_),
data(data_),
reliable(reliable_),
ack(ack_)
{
}
};
enum ConnectionCommandType{
CONNCMD_NONE,
CONNCMD_SERVE,
CONNCMD_CONNECT,
CONNCMD_DISCONNECT,
CONNCMD_DISCONNECT_PEER,
CONNCMD_SEND,
CONNCMD_SEND_TO_ALL,
CONNCMD_DELETE_PEER,
CONCMD_ACK,
CONCMD_CREATE_PEER,
CONCMD_DISABLE_LEGACY
};
struct ConnectionCommand
{
enum ConnectionCommandType type;
u16 port;
Address address;
u16 peer_id;
u8 channelnum;
Buffer<u8> data;
bool reliable;
bool raw;
ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
void serve(u16 port_)
{
type = CONNCMD_SERVE;
port = port_;
}
void connect(Address address_)
{
type = CONNCMD_CONNECT;
address = address_;
}
void disconnect()
{
type = CONNCMD_DISCONNECT;
}
void disconnect_peer(u16 peer_id_)
{
type = CONNCMD_DISCONNECT_PEER;
peer_id = peer_id_;
}
void send(u16 peer_id_, u8 channelnum_,
SharedBuffer<u8> data_, bool reliable_)
{
type = CONNCMD_SEND;
peer_id = peer_id_;
channelnum = channelnum_;
data = data_;
reliable = reliable_;
}
void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
{
type = CONNCMD_SEND_TO_ALL;
channelnum = channelnum_;
data = data_;
reliable = reliable_;
}
void deletePeer(u16 peer_id_)
{
type = CONNCMD_DELETE_PEER;
peer_id = peer_id_;
}
void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
{
type = CONCMD_ACK;
peer_id = peer_id_;
channelnum = channelnum_;
data = data_;
reliable = false;
}
void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
{
type = CONCMD_CREATE_PEER;
peer_id = peer_id_;
data = data_;
channelnum = 0;
reliable = true;
raw = true;
}
void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
{
type = CONCMD_DISABLE_LEGACY;
peer_id = peer_id_;
data = data_;
channelnum = 0;
reliable = true;
raw = true;
}
};
class Channel
{
public:
u16 readNextIncomingSeqNum();
u16 incNextIncomingSeqNum();
u16 getOutgoingSequenceNumber(bool& successfull);
u16 readOutgoingSequenceNumber();
bool putBackSequenceNumber(u16);
u16 readNextSplitSeqNum();
void setNextSplitSeqNum(u16 seqnum);
// This is for buffering the incoming packets that are coming in // This is for buffering the incoming packets that are coming in
// the wrong order // the wrong order
ReliablePacketBuffer incoming_reliables; ReliablePacketBuffer incoming_reliables;
// This is for buffering the sent packets so that the sender can // This is for buffering the sent packets so that the sender can
// re-send them if no ACK is received // re-send them if no ACK is received
ReliablePacketBuffer outgoing_reliables; ReliablePacketBuffer outgoing_reliables_sent;
//queued reliable packets
Queue<BufferedPacket> queued_reliables;
//queue commands prior splitting to packets
Queue<ConnectionCommand> queued_commands;
IncomingSplitBuffer incoming_splits; IncomingSplitBuffer incoming_splits;
Channel();
~Channel();
void UpdatePacketLossCounter(unsigned int count);
void UpdatePacketTooLateCounter();
void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
void UpdateBytesLost(unsigned int bytes);
void UpdateTimers(float dtime);
const float getCurrentDownloadRateKB()
{ JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
const float getMaxDownloadRateKB()
{ JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
const float getCurrentLossRateKB()
{ JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
const float getMaxLossRateKB()
{ JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
const float getAvgDownloadRateKB()
{ JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
const float getAvgLossRateKB()
{ JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
const unsigned int getWindowSize() const { return window_size; };
void setWindowSize(unsigned int size) { window_size = size; };
private:
JMutex m_internal_mutex;
unsigned int window_size;
u16 next_incoming_seqnum;
u16 next_outgoing_seqnum;
u16 next_outgoing_split_seqnum;
unsigned int current_packet_loss;
unsigned int current_packet_too_late;
unsigned int current_packet_successfull;
float packet_loss_counter;
unsigned int current_bytes_transfered;
unsigned int current_bytes_lost;
float max_kbps;
float cur_kbps;
float avg_kbps;
float max_kbps_lost;
float cur_kbps_lost;
float avg_kbps_lost;
float bpm_counter;
}; };
class Peer; class Peer;
@ -360,71 +602,232 @@ public:
virtual void deletingPeer(Peer *peer, bool timeout) = 0; virtual void deletingPeer(Peer *peer, bool timeout) = 0;
}; };
class Peer class PeerHelper
{
public:
PeerHelper();
PeerHelper(Peer* peer);
~PeerHelper();
PeerHelper& operator=(Peer* peer);
Peer* operator->() const;
bool operator!();
Peer* operator&() const;
bool operator!=(void* ptr);
private:
Peer* m_peer;
};
class Connection;
typedef enum rtt_stat_type {
MIN_RTT,
MAX_RTT,
AVG_RTT,
MIN_JITTER,
MAX_JITTER,
AVG_JITTER
} rtt_stat_type;
class Peer {
public:
friend class PeerHelper;
Peer(Address address_,u16 id_,Connection* connection) :
id(id_),
m_increment_packets_remaining(9),
m_increment_bytes_remaining(0),
m_pending_deletion(false),
m_connection(connection),
address(address_),
m_ping_timer(0.0),
m_last_rtt(-1.0),
m_usage(0),
m_timeout_counter(0.0),
m_last_timeout_check(porting::getTimeMs()),
m_has_sent_with_id(false)
{
m_rtt.avg_rtt = -1.0;
m_rtt.jitter_avg = -1.0;
m_rtt.jitter_max = 0.0;
m_rtt.max_rtt = 0.0;
m_rtt.jitter_min = FLT_MAX;
m_rtt.min_rtt = FLT_MAX;
};
virtual ~Peer() {
JMutexAutoLock usage_lock(m_exclusive_access_mutex);
assert(m_usage == 0);
};
// Unique id of the peer
u16 id;
void Drop();
virtual void PutReliableSendCommand(ConnectionCommand &c,
unsigned int max_packet_size) {};
virtual bool isActive() { return false; };
virtual bool getAddress(MTProtocols type, Address& toset) = 0;
void ResetTimeout()
{JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
bool isTimedOut(float timeout);
void setSentWithID()
{ JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
bool hasSentWithID()
{ JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
unsigned int m_increment_packets_remaining;
unsigned int m_increment_bytes_remaining;
virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
BufferedPacket toadd,
bool reliable)
{
fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
return SharedBuffer<u8>(0);
};
virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
virtual float getStat(rtt_stat_type type) const {
switch (type) {
case MIN_RTT:
return m_rtt.min_rtt;
case MAX_RTT:
return m_rtt.max_rtt;
case AVG_RTT:
return m_rtt.avg_rtt;
case MIN_JITTER:
return m_rtt.jitter_min;
case MAX_JITTER:
return m_rtt.jitter_max;
case AVG_JITTER:
return m_rtt.jitter_avg;
}
return -1;
}
protected:
virtual void reportRTT(float rtt) {};
void RTTStatistics(float rtt,
std::string profiler_id="",
unsigned int num_samples=1000);
bool IncUseCount();
void DecUseCount();
JMutex m_exclusive_access_mutex;
bool m_pending_deletion;
Connection* m_connection;
// Address of the peer
Address address;
// Ping timer
float m_ping_timer;
private:
struct rttstats {
float jitter_min;
float jitter_max;
float jitter_avg;
float min_rtt;
float max_rtt;
float avg_rtt;
};
rttstats m_rtt;
float m_last_rtt;
// current usage count
unsigned int m_usage;
// Seconds from last receive
float m_timeout_counter;
u32 m_last_timeout_check;
bool m_has_sent_with_id;
};
class UDPPeer : public Peer
{ {
public: public:
Peer(u16 a_id, Address a_address); friend class PeerHelper;
virtual ~Peer(); friend class ConnectionReceiveThread;
friend class ConnectionSendThread;
UDPPeer(u16 a_id, Address a_address, Connection* connection);
virtual ~UDPPeer();
void PutReliableSendCommand(ConnectionCommand &c,
unsigned int max_packet_size);
bool isActive()
{ return ((hasSentWithID()) && (!m_pending_deletion)); };
bool getAddress(MTProtocols type, Address& toset);
void setNonLegacyPeer()
{ m_legacy_peer = false; }
bool getLegacyPeer()
{ return m_legacy_peer; }
u16 getNextSplitSequenceNumber(u8 channel);
void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
SharedBuffer<u8> addSpiltPacket(u8 channel,
BufferedPacket toadd,
bool reliable);
protected:
/* /*
Calculates avg_rtt and resend_timeout. Calculates avg_rtt and resend_timeout.
rtt=-1 only recalculates resend_timeout rtt=-1 only recalculates resend_timeout
*/ */
void reportRTT(float rtt); void reportRTT(float rtt);
Channel channels[CHANNEL_COUNT]; void RunCommandQueues(
unsigned int max_packet_size,
unsigned int maxcommands,
unsigned int maxtransfer);
// Address of the peer float getResendTimeout()
Address address; { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
// Unique id of the peer
u16 id; void setResendTimeout(float timeout)
// Seconds from last receive { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
float timeout_counter; bool Ping(float dtime,SharedBuffer<u8>& data);
// Ping timer
float ping_timer; Channel channels[CHANNEL_COUNT];
private:
// This is changed dynamically // This is changed dynamically
float resend_timeout; float resend_timeout;
// Updated when an ACK is received
float avg_rtt;
// This is set to true when the peer has actually sent something
// with the id we have given to it
bool has_sent_with_id;
float m_sendtime_accu;
float m_max_packets_per_second;
int m_num_sent;
int m_max_num_sent;
// Updated from configuration by Connection bool processReliableSendCommand(
float congestion_control_aim_rtt; ConnectionCommand &c,
float congestion_control_max_rate; unsigned int max_packet_size);
float congestion_control_min_rate;
private: bool m_legacy_peer;
}; };
/* /*
Connection Connection
*/ */
struct OutgoingPacket
{
u16 peer_id;
u8 channelnum;
SharedBuffer<u8> data;
bool reliable;
OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
bool reliable_):
peer_id(peer_id_),
channelnum(channelnum_),
data(data_),
reliable(reliable_)
{
}
};
enum ConnectionEventType{ enum ConnectionEventType{
CONNEVENT_NONE, CONNEVENT_NONE,
CONNEVENT_DATA_RECEIVED, CONNEVENT_DATA_RECEIVED,
@ -485,76 +888,108 @@ struct ConnectionEvent
} }
}; };
enum ConnectionCommandType{ class ConnectionSendThread : public JThread {
CONNCMD_NONE,
CONNCMD_SERVE, public:
CONNCMD_CONNECT, friend class UDPPeer;
CONNCMD_DISCONNECT,
CONNCMD_SEND, ConnectionSendThread(Connection* parent,
CONNCMD_SEND_TO_ALL, unsigned int max_packet_size, float timeout);
CONNCMD_DELETE_PEER,
void * Thread ();
void Trigger();
void setPeerTimeout(float peer_timeout)
{ m_timeout = peer_timeout; }
private:
void runTimeouts (float dtime);
void rawSend (const BufferedPacket &packet);
bool rawSendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
void processReliableCommand (ConnectionCommand &c);
void processNonReliableCommand (ConnectionCommand &c);
void serve (u16 port);
void connect (Address address);
void disconnect ();
void disconnect_peer(u16 peer_id);
void send (u16 peer_id, u8 channelnum,
SharedBuffer<u8> data);
void sendReliable (ConnectionCommand &c);
void sendToAll (u8 channelnum,
SharedBuffer<u8> data);
void sendToAllReliable(ConnectionCommand &c);
void sendPackets (float dtime);
void sendAsPacket (u16 peer_id, u8 channelnum,
SharedBuffer<u8> data,bool ack=false);
void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
bool packetsQueued();
Connection* m_connection;
unsigned int m_max_packet_size;
float m_timeout;
Queue<OutgoingPacket> m_outgoing_queue;
JSemaphore m_send_sleep_semaphore;
unsigned int m_iteration_packets_avaialble;
unsigned int m_max_commands_per_iteration;
unsigned int m_max_data_packets_per_iteration;
unsigned int m_max_packets_requeued;
}; };
struct ConnectionCommand class ConnectionReceiveThread : public JThread {
{ public:
enum ConnectionCommandType type; ConnectionReceiveThread(Connection* parent,
u16 port; unsigned int max_packet_size);
Address address;
u16 peer_id;
u8 channelnum;
Buffer<u8> data;
bool reliable;
ConnectionCommand(): type(CONNCMD_NONE) {}
void serve(u16 port_) void * Thread ();
{
type = CONNCMD_SERVE; private:
port = port_; void receive ();
}
void connect(Address address_) // Returns next data from a buffer if possible
{ // If found, returns true; if not, false.
type = CONNCMD_CONNECT; // If found, sets peer_id and dst
address = address_; bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
}
void disconnect() bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
{ SharedBuffer<u8> &dst);
type = CONNCMD_DISCONNECT;
} /*
void send(u16 peer_id_, u8 channelnum_, Processes a packet with the basic header stripped out.
SharedBuffer<u8> data_, bool reliable_) Parameters:
{ packetdata: Data in packet (with no base headers)
type = CONNCMD_SEND; peer_id: peer id of the sender of the packet in question
peer_id = peer_id_; channelnum: channel on which the packet was sent
channelnum = channelnum_; reliable: true if recursing into a reliable packet
data = data_; */
reliable = reliable_; SharedBuffer<u8> processPacket(Channel *channel,
} SharedBuffer<u8> packetdata, u16 peer_id,
void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_) u8 channelnum, bool reliable);
{
type = CONNCMD_SEND_TO_ALL;
channelnum = channelnum_; Connection* m_connection;
data = data_; unsigned int m_max_packet_size;
reliable = reliable_;
}
void deletePeer(u16 peer_id_)
{
type = CONNCMD_DELETE_PEER;
peer_id = peer_id_;
}
}; };
class Connection: public JThread class Connection
{ {
public: public:
friend class ConnectionSendThread;
friend class ConnectionReceiveThread;
Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6); Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
PeerHandler *peerhandler); PeerHandler *peerhandler);
~Connection(); ~Connection();
void * Thread();
/* Interface */ /* Interface */
ConnectionEvent getEvent(); ConnectionEvent getEvent();
ConnectionEvent waitEvent(u32 timeout_ms); ConnectionEvent waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommand &c); void putCommand(ConnectionCommand &c);
@ -572,68 +1007,53 @@ public:
Address GetPeerAddress(u16 peer_id); Address GetPeerAddress(u16 peer_id);
float GetPeerAvgRTT(u16 peer_id); float GetPeerAvgRTT(u16 peer_id);
void DeletePeer(u16 peer_id); void DeletePeer(u16 peer_id);
const u32 GetProtocolID() const { return m_protocol_id; };
private: const std::string getDesc();
void putEvent(ConnectionEvent &e);
void processCommand(ConnectionCommand &c); protected:
void send(float dtime); PeerHelper getPeer(u16 peer_id);
void receive(); PeerHelper getPeerNoEx(u16 peer_id);
void runTimeouts(float dtime); u16 lookupPeer(Address& sender);
void serve(u16 port);
void connect(Address address); u16 createPeer(Address& sender, MTProtocols protocol, int fd);
void disconnect(); UDPPeer* createServerPeer(Address& sender);
void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
void sendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
void rawSendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
void rawSend(const BufferedPacket &packet);
Peer* getPeer(u16 peer_id);
Peer* getPeerNoEx(u16 peer_id);
std::list<Peer*> getPeers();
bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
// Returns next data from a buffer if possible
// If found, returns true; if not, false.
// If found, sets peer_id and dst
bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
SharedBuffer<u8> &dst);
/*
Processes a packet with the basic header stripped out.
Parameters:
packetdata: Data in packet (with no base headers)
peer_id: peer id of the sender of the packet in question
channelnum: channel on which the packet was sent
reliable: true if recursing into a reliable packet
*/
SharedBuffer<u8> processPacket(Channel *channel,
SharedBuffer<u8> packetdata, u16 peer_id,
u8 channelnum, bool reliable);
bool deletePeer(u16 peer_id, bool timeout); bool deletePeer(u16 peer_id, bool timeout);
Queue<OutgoingPacket> m_outgoing_queue; void SetPeerID(u16 id){ m_peer_id = id; }
MutexedQueue<ConnectionEvent> m_event_queue;
void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
void PrintInfo(std::ostream &out);
void PrintInfo();
std::list<u16> getPeerIDs();
UDPSocket m_udpSocket;
MutexedQueue<ConnectionCommand> m_command_queue; MutexedQueue<ConnectionCommand> m_command_queue;
u32 m_protocol_id; void putEvent(ConnectionEvent &e);
u32 m_max_packet_size;
float m_timeout; private:
UDPSocket m_socket; std::list<Peer*> getPeers();
MutexedQueue<ConnectionEvent> m_event_queue;
u16 m_peer_id; u16 m_peer_id;
u32 m_protocol_id;
std::map<u16, Peer*> m_peers; std::map<u16, Peer*> m_peers;
JMutex m_peers_mutex; JMutex m_peers_mutex;
ConnectionSendThread m_sendThread;
ConnectionReceiveThread m_receiveThread;
JMutex m_info_mutex;
// Backwards compatibility // Backwards compatibility
PeerHandler *m_bc_peerhandler; PeerHandler *m_bc_peerhandler;
int m_bc_receive_timeout; int m_bc_receive_timeout;
void SetPeerID(u16 id){ m_peer_id = id; } bool m_shutting_down;
u32 GetProtocolID(){ return m_protocol_id; }
void PrintInfo(std::ostream &out);
void PrintInfo();
std::string getDesc();
u16 m_indentation;
}; };
} // namespace } // namespace

@ -172,6 +172,8 @@ void set_default_settings(Settings *settings)
// Server stuff // Server stuff
// "map-dir" doesn't exist by default. // "map-dir" doesn't exist by default.
settings->setDefault("workaround_window_size","5");
settings->setDefault("max_packets_per_iteration","1024");
settings->setDefault("port", "30000"); settings->setDefault("port", "30000");
settings->setDefault("default_game", "minetest"); settings->setDefault("default_game", "minetest");
settings->setDefault("motd", ""); settings->setDefault("motd", "");

@ -93,6 +93,7 @@ static std::string get_lev_string(enum LogMessageLevel lev)
void log_printline(enum LogMessageLevel lev, const std::string &text) void log_printline(enum LogMessageLevel lev, const std::string &text)
{ {
log_threadnamemutex.Lock();
std::string threadname = "(unknown thread)"; std::string threadname = "(unknown thread)";
std::map<threadid_t, std::string>::const_iterator i; std::map<threadid_t, std::string>::const_iterator i;
i = log_threadnames.find(get_current_thread_id()); i = log_threadnames.find(get_current_thread_id());
@ -108,6 +109,7 @@ void log_printline(enum LogMessageLevel lev, const std::string &text)
out->printLog(os.str(), lev); out->printLog(os.str(), lev);
out->printLog(lev, text); out->printLog(lev, text);
} }
log_threadnamemutex.Unlock();
} }
class Logbuf : public std::streambuf class Logbuf : public std::streambuf

@ -90,14 +90,15 @@ public:
void * ServerThread::Thread() void * ServerThread::Thread()
{ {
ThreadStarted();
log_register_thread("ServerThread"); log_register_thread("ServerThread");
DSTACK(__FUNCTION_NAME); DSTACK(__FUNCTION_NAME);
BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER
m_server->AsyncRunStep(true);
ThreadStarted();
while(!StopRequested()) while(!StopRequested())
{ {
try{ try{
@ -1018,7 +1019,7 @@ void Server::step(float dtime)
} }
} }
void Server::AsyncRunStep() void Server::AsyncRunStep(bool initial_step)
{ {
DSTACK(__FUNCTION_NAME); DSTACK(__FUNCTION_NAME);
@ -1035,7 +1036,7 @@ void Server::AsyncRunStep()
SendBlocks(dtime); SendBlocks(dtime);
} }
if(dtime < 0.001) if((dtime < 0.001) && (initial_step == false))
return; return;
g_profiler->add("Server::AsyncRunStep with dtime (num)", 1); g_profiler->add("Server::AsyncRunStep with dtime (num)", 1);
@ -1528,7 +1529,7 @@ void Server::AsyncRunStep()
memcpy((char*)&reply[2], unreliable_data.c_str(), memcpy((char*)&reply[2], unreliable_data.c_str(),
unreliable_data.size()); unreliable_data.size());
// Send as unreliable // Send as unreliable
m_con.Send(client->peer_id, 0, reply, false); m_con.Send(client->peer_id, 1, reply, false);
} }
/*if(reliable_data.size() > 0 || unreliable_data.size() > 0) /*if(reliable_data.size() > 0 || unreliable_data.size() > 0)
@ -3721,7 +3722,7 @@ void Server::SendHUDAdd(u16 peer_id, u32 id, HudElement *form)
std::string s = os.str(); std::string s = os.str();
SharedBuffer<u8> data((u8*)s.c_str(), s.size()); SharedBuffer<u8> data((u8*)s.c_str(), s.size());
// Send as reliable // Send as reliable
m_con.Send(peer_id, 0, data, true); m_con.Send(peer_id, 1, data, true);
} }
void Server::SendHUDRemove(u16 peer_id, u32 id) void Server::SendHUDRemove(u16 peer_id, u32 id)
@ -3736,7 +3737,8 @@ void Server::SendHUDRemove(u16 peer_id, u32 id)
std::string s = os.str(); std::string s = os.str();
SharedBuffer<u8> data((u8*)s.c_str(), s.size()); SharedBuffer<u8> data((u8*)s.c_str(), s.size());
// Send as reliable // Send as reliable
m_con.Send(peer_id, 0, data, true);
m_con.Send(peer_id, 1, data, true);
} }
void Server::SendHUDChange(u16 peer_id, u32 id, HudElementStat stat, void *value) void Server::SendHUDChange(u16 peer_id, u32 id, HudElementStat stat, void *value)
@ -4204,7 +4206,7 @@ void Server::SendBlockNoLock(u16 peer_id, MapBlock *block, u8 ver, u16 net_proto
/* /*
Send packet Send packet
*/ */
m_con.Send(peer_id, 1, reply, true); m_con.Send(peer_id, 2, reply, true);
} }
void Server::SendBlocks(float dtime) void Server::SendBlocks(float dtime)
@ -4566,7 +4568,7 @@ void Server::sendRequestedMedia(u16 peer_id,
<<" size=" <<s.size()<<std::endl; <<" size=" <<s.size()<<std::endl;
SharedBuffer<u8> data((u8*)s.c_str(), s.size()); SharedBuffer<u8> data((u8*)s.c_str(), s.size());
// Send as reliable // Send as reliable
m_con.Send(peer_id, 0, data, true); m_con.Send(peer_id, 2, data, true);
} }
} }

@ -330,7 +330,7 @@ public:
// Actual processing is done in an another thread. // Actual processing is done in an another thread.
void step(float dtime); void step(float dtime);
// This is run by ServerThread and does the actual processing // This is run by ServerThread and does the actual processing
void AsyncRunStep(); void AsyncRunStep(bool initial_step=false);
void Receive(); void Receive();
void ProcessData(u8 *data, u32 datasize, u16 peer_id); void ProcessData(u8 *data, u32 datasize, u16 peer_id);