Improve Connection with threading and some kind of congestion control

This commit is contained in:
Perttu Ahola 2011-10-20 23:04:09 +03:00
parent b6fcbc5fba
commit 4b6138e69b
12 changed files with 1163 additions and 716 deletions

@ -142,7 +142,7 @@
#active_object_send_range_blocks = 3
#active_block_range = 2
#max_simultaneous_block_sends_per_client = 2
#max_simultaneous_block_sends_server_total = 2
#max_simultaneous_block_sends_server_total = 8
#max_block_send_distance = 7
#max_block_generate_distance = 5
#time_send_interval = 20

@ -246,7 +246,7 @@ void Client::connect(Address address)
{
DSTACK(__FUNCTION_NAME);
//JMutexAutoLock lock(m_con_mutex); //bulk comment-out
m_con.setTimeoutMs(0);
m_con.SetTimeoutMs(0);
m_con.Connect(address);
}
@ -563,8 +563,8 @@ void Client::step(float dtime)
counter = 0.0;
//JMutexAutoLock lock(m_con_mutex); //bulk comment-out
// connectedAndInitialized() is true, peer exists.
con::Peer *peer = m_con.GetPeer(PEER_ID_SERVER);
infostream<<"Client: avg_rtt="<<peer->avg_rtt<<std::endl;
float avg_rtt = m_con.GetPeerAvgRTT(PEER_ID_SERVER);
infostream<<"Client: avg_rtt="<<avg_rtt<<std::endl;
}
}
@ -709,14 +709,6 @@ void Client::ProcessData(u8 *data, u32 datasize, u16 sender_peer_id)
return;
}
con::Peer *peer;
{
//JMutexAutoLock lock(m_con_mutex); //bulk comment-out
// All data is coming from the server
// PeerNotFoundException is handled by caller.
peer = m_con.GetPeer(PEER_ID_SERVER);
}
u8 ser_version = m_server_ser_ver;
//infostream<<"Client received command="<<(int)command<<std::endl;
@ -2168,9 +2160,10 @@ ClientEvent Client::getClientEvent()
float Client::getRTT(void)
{
con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER);
if(!peer)
return 0.0;
return peer->avg_rtt;
try{
return m_con.GetPeerAvgRTT(PEER_ID_SERVER);
} catch(con::PeerNotFoundException &e){
return 1337;
}
}

@ -251,11 +251,11 @@ public:
float getAvgRtt()
{
//JMutexAutoLock lock(m_con_mutex); //bulk comment-out
con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER);
if(peer == NULL)
return 0.0;
return peer->avg_rtt;
try{
return m_con.GetPeerAvgRTT(PEER_ID_SERVER);
} catch(con::PeerNotFoundException){
return 1337;
}
}
bool getChatMessage(std::wstring &message)

File diff suppressed because it is too large Load Diff

@ -319,28 +319,6 @@ struct Channel
{
Channel();
~Channel();
/*
Processes a packet with the basic header stripped out.
Parameters:
packetdata: Data in packet (with no base headers)
con: The connection to which the channel is associated
(used for sending back stuff (ACKs))
peer_id: peer id of the sender of the packet in question
channelnum: channel on which the packet was sent
reliable: true if recursing into a reliable packet
*/
SharedBuffer<u8> ProcessPacket(
SharedBuffer<u8> packetdata,
Connection *con,
u16 peer_id,
u8 channelnum,
bool reliable=false);
// Returns next data from a buffer if possible
// throws a NoIncomingDataException if no data is available
// If found, sets peer_id
SharedBuffer<u8> CheckIncomingBuffers(Connection *con,
u16 &peer_id);
u16 next_outgoing_seqnum;
u16 next_incoming_seqnum;
@ -412,78 +390,237 @@ public:
// with the id we have given to it
bool has_sent_with_id;
float m_sendtime_accu;
float m_max_packets_per_second;
int m_num_sent;
int m_max_num_sent;
private:
};
class Connection
/*
Connection
*/
struct OutgoingPacket
{
u16 peer_id;
u8 channelnum;
SharedBuffer<u8> data;
bool reliable;
OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
bool reliable_):
peer_id(peer_id_),
channelnum(channelnum_),
data(data_),
reliable(reliable_)
{
}
};
enum ConnectionEventType{
CONNEVENT_NONE,
CONNEVENT_DATA_RECEIVED,
CONNEVENT_PEER_ADDED,
CONNEVENT_PEER_REMOVED,
};
struct ConnectionEvent
{
enum ConnectionEventType type;
u16 peer_id;
SharedBuffer<u8> data;
bool timeout;
Address address;
ConnectionEvent(): type(CONNEVENT_NONE) {}
std::string describe()
{
switch(type){
case CONNEVENT_NONE:
return "CONNEVENT_NONE";
case CONNEVENT_DATA_RECEIVED:
return "CONNEVENT_DATA_RECEIVED";
case CONNEVENT_PEER_ADDED:
return "CONNEVENT_PEER_ADDED";
case CONNEVENT_PEER_REMOVED:
return "CONNEVENT_PEER_REMOVED";
}
return "Invalid ConnectionEvent";
}
void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
{
type = CONNEVENT_DATA_RECEIVED;
peer_id = peer_id_;
data = data_;
}
void peerAdded(u16 peer_id_, Address address_)
{
type = CONNEVENT_PEER_ADDED;
peer_id = peer_id_;
address = address_;
}
void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
{
type = CONNEVENT_PEER_REMOVED;
peer_id = peer_id_;
timeout = timeout_;
address = address_;
}
};
enum ConnectionCommandType{
CONNCMD_NONE,
CONNCMD_SERVE,
CONNCMD_CONNECT,
CONNCMD_DISCONNECT,
CONNCMD_SEND,
CONNCMD_SEND_TO_ALL,
CONNCMD_DELETE_PEER,
};
struct ConnectionCommand
{
enum ConnectionCommandType type;
u16 port;
Address address;
u16 peer_id;
u8 channelnum;
SharedBuffer<u8> data;
bool reliable;
ConnectionCommand(): type(CONNCMD_NONE) {}
void serve(u16 port_)
{
type = CONNCMD_SERVE;
port = port_;
}
void connect(Address address_)
{
type = CONNCMD_CONNECT;
address = address_;
}
void disconnect()
{
type = CONNCMD_DISCONNECT;
}
void send(u16 peer_id_, u8 channelnum_,
SharedBuffer<u8> data_, bool reliable_)
{
type = CONNCMD_SEND;
peer_id = peer_id_;
channelnum = channelnum_;
data = data_;
reliable = reliable_;
}
void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
{
type = CONNCMD_SEND_TO_ALL;
channelnum = channelnum_;
data = data_;
reliable = reliable_;
}
void deletePeer(u16 peer_id_)
{
type = CONNCMD_DELETE_PEER;
peer_id = peer_id_;
}
};
class Connection: public SimpleThread
{
public:
Connection(
u32 protocol_id,
u32 max_packet_size,
float timeout,
PeerHandler *peerhandler
);
Connection(u32 protocol_id, u32 max_packet_size, float timeout);
Connection(u32 protocol_id, u32 max_packet_size, float timeout,
PeerHandler *peerhandler);
~Connection();
void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); }
// Start being a server
void * Thread();
/* Interface */
ConnectionEvent getEvent();
ConnectionEvent waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommand &c);
void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
void Serve(unsigned short port);
// Connect to a server
void Connect(Address address);
bool Connected();
void Disconnect();
// Sets peer_id
SharedBuffer<u8> GetFromBuffers(u16 &peer_id);
// The peer_id of sender is stored in peer_id
// Return value: I guess this always throws an exception or
// actually gets data
// May call PeerHandler methods
u32 Receive(u16 &peer_id, u8 *data, u32 datasize);
// These will automatically package the data as an original or split
void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
// Send data as a packet; it will be wrapped in base header and
// optionally to a reliable packet.
void SendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
// Sends a raw packet
void RawSend(const BufferedPacket &packet);
// May call PeerHandler methods
void RunTimeouts(float dtime);
// Can throw a PeerNotFoundException
Peer* GetPeer(u16 peer_id);
// returns NULL if failed
Peer* GetPeerNoEx(u16 peer_id);
core::list<Peer*> GetPeers();
// Calls PeerHandler::deletingPeer
// Returns false if peer was not found
bool deletePeer(u16 peer_id, bool timeout);
void SetPeerID(u16 id){ m_peer_id = id; }
void RunTimeouts(float dtime); // dummy
u16 GetPeerID(){ return m_peer_id; }
u32 GetProtocolID(){ return m_protocol_id; }
// For debug printing
void PrintInfo(std::ostream &out);
void PrintInfo();
u16 m_indentation;
Address GetPeerAddress(u16 peer_id);
float GetPeerAvgRTT(u16 peer_id);
void DeletePeer(u16 peer_id);
private:
void putEvent(ConnectionEvent &e);
void processCommand(ConnectionCommand &c);
void send(float dtime);
void receive();
void runTimeouts(float dtime);
void serve(u16 port);
void connect(Address address);
void disconnect();
void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
void sendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
void rawSendAsPacket(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data, bool reliable);
void rawSend(const BufferedPacket &packet);
Peer* getPeer(u16 peer_id);
Peer* getPeerNoEx(u16 peer_id);
core::list<Peer*> getPeers();
bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
// Returns next data from a buffer if possible
// If found, returns true; if not, false.
// If found, sets peer_id and dst
bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
SharedBuffer<u8> &dst);
/*
Processes a packet with the basic header stripped out.
Parameters:
packetdata: Data in packet (with no base headers)
peer_id: peer id of the sender of the packet in question
channelnum: channel on which the packet was sent
reliable: true if recursing into a reliable packet
*/
SharedBuffer<u8> processPacket(Channel *channel,
SharedBuffer<u8> packetdata, u16 peer_id,
u8 channelnum, bool reliable);
bool deletePeer(u16 peer_id, bool timeout);
Queue<OutgoingPacket> m_outgoing_queue;
MutexedQueue<ConnectionEvent> m_event_queue;
MutexedQueue<ConnectionCommand> m_command_queue;
u32 m_protocol_id;
float m_timeout;
PeerHandler *m_peerhandler;
core::map<u16, Peer*> m_peers;
u16 m_peer_id;
//bool m_waiting_new_peer_id;
u32 m_max_packet_size;
float m_timeout;
UDPSocket m_socket;
u16 m_peer_id;
core::map<u16, Peer*> m_peers;
JMutex m_peers_mutex;
// Backwards compatibility
PeerHandler *m_bc_peerhandler;
int m_bc_receive_timeout;
void SetPeerID(u16 id){ m_peer_id = id; }
u32 GetProtocolID(){ return m_protocol_id; }
void PrintInfo(std::ostream &out);
void PrintInfo();
std::string getDesc();
u16 m_indentation;
};
} // namespace

@ -100,7 +100,7 @@ void set_default_settings(Settings *settings)
//settings->setDefault("max_simultaneous_block_sends_per_client", "1");
// This causes frametime jitter on client side, or does it?
settings->setDefault("max_simultaneous_block_sends_per_client", "2");
settings->setDefault("max_simultaneous_block_sends_server_total", "2");
settings->setDefault("max_simultaneous_block_sends_server_total", "8");
settings->setDefault("max_block_send_distance", "7");
settings->setDefault("max_block_generate_distance", "5");
settings->setDefault("time_send_interval", "20");

@ -481,6 +481,8 @@ MainGameCallback *g_gamecallback = NULL;
// Connection
std::ostream *dout_con_ptr = &dummyout;
std::ostream *derr_con_ptr = &verbosestream;
//std::ostream *dout_con_ptr = &infostream;
//std::ostream *derr_con_ptr = &errorstream;
// Server
std::ostream *dout_server_ptr = &infostream;

@ -21,7 +21,9 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "mapsector.h"
#include "mapblock.h"
#include "main.h"
#ifndef SERVER
#include "client.h"
#endif
#include "filesys.h"
#include "utility.h"
#include "voxel.h"

@ -1074,7 +1074,7 @@ void Server::start(unsigned short port)
m_thread.stop();
// Initialize connection
m_con.setTimeoutMs(30);
m_con.SetTimeoutMs(30);
m_con.Serve(port);
// Start thread
@ -1823,9 +1823,18 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
JMutexAutoLock envlock(m_env_mutex);
JMutexAutoLock conlock(m_con_mutex);
con::Peer *peer;
try{
peer = m_con.GetPeer(peer_id);
Address address = m_con.GetPeerAddress(peer_id);
// drop player if is ip is banned
if(m_banmanager.isIpBanned(address.serializeString())){
SendAccessDenied(m_con, peer_id,
L"Your ip is banned. Banned name was "
+narrow_to_wide(m_banmanager.getBanName(
address.serializeString())));
m_con.DeletePeer(peer_id);
return;
}
}
catch(con::PeerNotFoundException &e)
{
@ -1834,17 +1843,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
return;
}
// drop player if is ip is banned
if(m_banmanager.isIpBanned(peer->address.serializeString())){
SendAccessDenied(m_con, peer_id,
L"Your ip is banned. Banned name was "
+narrow_to_wide(m_banmanager.getBanName(
peer->address.serializeString())));
m_con.deletePeer(peer_id, false);
return;
}
u8 peer_ser_ver = getClient(peer->id)->serialization_version;
u8 peer_ser_ver = getClient(peer_id)->serialization_version;
try
{
@ -1865,7 +1864,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
return;
infostream<<"Server: Got TOSERVER_INIT from "
<<peer->id<<std::endl;
<<peer_id<<std::endl;
// First byte after command is maximum supported
// serialization version
@ -1878,7 +1877,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
deployed = SER_FMT_VER_INVALID;
//peer->serialization_version = deployed;
getClient(peer->id)->pending_serialization_version = deployed;
getClient(peer_id)->pending_serialization_version = deployed;
if(deployed == SER_FMT_VER_INVALID)
{
@ -1900,7 +1899,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
net_proto_version = readU16(&data[2+1+PLAYERNAME_SIZE+PASSWORD_SIZE]);
}
getClient(peer->id)->net_proto_version = net_proto_version;
getClient(peer_id)->net_proto_version = net_proto_version;
if(net_proto_version == 0)
{
@ -2045,11 +2044,11 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
if(command == TOSERVER_INIT2)
{
infostream<<"Server: Got TOSERVER_INIT2 from "
<<peer->id<<std::endl;
<<peer_id<<std::endl;
getClient(peer->id)->serialization_version
= getClient(peer->id)->pending_serialization_version;
getClient(peer_id)->serialization_version
= getClient(peer_id)->pending_serialization_version;
/*
Send some initialization data
@ -2059,8 +2058,8 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
SendPlayerInfos();
// Send inventory to player
UpdateCrafting(peer->id);
SendInventory(peer->id);
UpdateCrafting(peer_id);
SendInventory(peer_id);
// Send player items to all players
SendPlayerItems();
@ -2074,7 +2073,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
{
SharedBuffer<u8> data = makePacket_TOCLIENT_TIME_OF_DAY(
m_env.getTimeOfDay());
m_con.Send(peer->id, 0, data, true);
m_con.Send(peer_id, 0, data, true);
}
// Send information about server to player in chat
@ -2095,7 +2094,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
}
// Warnings about protocol version can be issued here
if(getClient(peer->id)->net_proto_version < PROTOCOL_VERSION)
if(getClient(peer_id)->net_proto_version < PROTOCOL_VERSION)
{
SendChatMessage(peer_id, L"# Server: WARNING: YOUR CLIENT IS OLD AND MAY WORK PROPERLY WITH THIS SERVER");
}
@ -2402,7 +2401,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
else if(action == 2)
{
#if 0
RemoteClient *client = getClient(peer->id);
RemoteClient *client = getClient(peer_id);
JMutexAutoLock digmutex(client->m_dig_mutex);
client->m_dig_tool_item = -1;
#endif
@ -2685,7 +2684,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
}
// Reset build time counter
getClient(peer->id)->m_time_from_building = 0.0;
getClient(peer_id)->m_time_from_building = 0.0;
// Create node data
MaterialItem *mitem = (MaterialItem*)item;
@ -3428,11 +3427,10 @@ core::list<PlayerInfo> Server::getPlayerInfo()
Player *player = *i;
try{
con::Peer *peer = m_con.GetPeer(player->peer_id);
// Copy info from peer to info struct
info.id = peer->id;
info.address = peer->address;
info.avg_rtt = peer->avg_rtt;
// Copy info from connection to info struct
info.id = player->peer_id;
info.address = m_con.GetPeerAddress(player->peer_id);
info.avg_rtt = m_con.GetPeerAvgRTT(player->peer_id);
}
catch(con::PeerNotFoundException &e)
{

@ -468,9 +468,9 @@ public:
return m_banmanager.getBanDescription(ip_or_name);
}
con::Peer* getPeerNoEx(u16 peer_id)
Address getPeerAddress(u16 peer_id)
{
return m_con.GetPeerNoEx(peer_id);
return m_con.GetPeerAddress(peer_id);
}
// Envlock and conlock should be locked when calling this

@ -251,19 +251,18 @@ void cmd_banunban(std::wostringstream &os, ServerCommandContext *ctx)
return;
}
con::Peer *peer = ctx->server->getPeerNoEx(player->peer_id);
if(peer == NULL)
{
dstream<<__FUNCTION_NAME<<": peer was not found"<<std::endl;
return;
}
std::string ip_string = peer->address.serializeString();
ctx->server->setIpBanned(ip_string, player->getName());
os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|"
<<narrow_to_wide(player->getName());
try{
Address address = ctx->server->getPeerAddress(player->peer_id);
std::string ip_string = address.serializeString();
ctx->server->setIpBanned(ip_string, player->getName());
os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|"
<<narrow_to_wide(player->getName());
actionstream<<ctx->player->getName()<<" bans "
<<player->getName()<<" / "<<ip_string<<std::endl;
actionstream<<ctx->player->getName()<<" bans "
<<player->getName()<<" / "<<ip_string<<std::endl;
} catch(con::PeerNotFoundException){
dstream<<__FUNCTION_NAME<<": peer was not found"<<std::endl;
}
}
else
{

@ -819,7 +819,10 @@ struct TestConnection
/*
Test some real connections
NOTE: This mostly tests the legacy interface.
*/
u32 proto_id = 0xad26846a;
Handler hand_server("server");
@ -843,12 +846,31 @@ struct TestConnection
sleep_ms(50);
// Client should not have added client yet
assert(hand_client.count == 0);
try
{
u16 peer_id;
u8 data[100];
infostream<<"** running client.Receive()"<<std::endl;
u32 size = client.Receive(peer_id, data, 100);
infostream<<"** Client received: peer_id="<<peer_id
<<", size="<<size
<<std::endl;
}
catch(con::NoIncomingDataException &e)
{
}
// Client should have added server now
assert(hand_client.count == 1);
assert(hand_client.last_id == 1);
// But server should not have added client
// Server should not have added client yet
assert(hand_server.count == 0);
sleep_ms(50);
try
{
u16 peer_id;
@ -930,7 +952,7 @@ struct TestConnection
}
u16 peer_id_client = 2;
#if 0
{
/*
Send consequent packets in different order
@ -941,13 +963,13 @@ struct TestConnection
SharedBuffer<u8> data2 = SharedBufferFromString("Hello2");
Address client_address =
server.GetPeer(peer_id_client)->address;
server.GetPeerAddress(peer_id_client);
infostream<<"*** Sending packets in wrong order (2,1,2)"
<<std::endl;
u8 chn = 0;
con::Channel *ch = &server.GetPeer(peer_id_client)->channels[chn];
con::Channel *ch = &server.getPeer(peer_id_client)->channels[chn];
u16 sn = ch->next_outgoing_seqnum;
ch->next_outgoing_seqnum = sn+1;
server.Send(peer_id_client, chn, data2, true);
@ -1004,6 +1026,7 @@ struct TestConnection
}
assert(got_exception);
}
#endif
{
const int datasize = 30000;
SharedBuffer<u8> data1(datasize);
@ -1022,12 +1045,25 @@ struct TestConnection
server.Send(peer_id_client, 0, data1, true);
sleep_ms(50);
sleep_ms(3000);
u8 recvdata[datasize + 1000];
infostream<<"** running client.Receive()"<<std::endl;
u16 peer_id = 132;
u16 size = client.Receive(peer_id, recvdata, datasize + 1000);
u16 size = 0;
bool received = false;
u32 timems0 = porting::getTimeMs();
for(;;){
if(porting::getTimeMs() - timems0 > 5000)
break;
try{
size = client.Receive(peer_id, recvdata, datasize + 1000);
received = true;
}catch(con::NoIncomingDataException &e){
}
sleep_ms(10);
}
assert(received);
infostream<<"** Client received: peer_id="<<peer_id
<<", size="<<size
<<std::endl;