block send priority queue

Perttu Ahola 2010-11-27 18:10:11 +02:00
parent 24c4b7c68d
commit e8fd5eb8ee
3 changed files with 345 additions and 8 deletions

@@ -28,14 +28,13 @@
// The absolute working limit is (2^15 - viewing_range).
#define MAP_GENERATION_LIMIT (31000)
//#define MAX_SIMULTANEOUS_BLOCK_SENDS 7
//#define MAX_SIMULTANEOUS_BLOCK_SENDS 3
#define MAX_SIMULTANEOUS_BLOCK_SENDS 2
//#define MAX_SIMULTANEOUS_BLOCK_SENDS 1
#define FULL_BLOCK_SEND_ENABLE_MIN_TIME_FROM_BUILDING 2.0
#define LIMITED_MAX_SIMULTANEOUS_BLOCK_SENDS 1
#define MAX_SIMULTANEOUS_BLOCK_SENDS_SERVER_TOTAL 4
// Viewing range stuff
#define FPS_DEFAULT_WANTED 30
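
Taken together, these constants encode the commit's throttling policy: each client may have MAX_SIMULTANEOUS_BLOCK_SENDS blocks in flight, the window shrinks to LIMITED_MAX_SIMULTANEOUS_BLOCK_SENDS for two seconds after the player places or digs a node, and the new MAX_SIMULTANEOUS_BLOCK_SENDS_SERVER_TOTAL caps the sum over all clients. A minimal standalone sketch of the per-client rule (maxSimultaneousSends is a hypothetical helper, not something this commit defines):

// Minimal sketch of the per-client throttle; not code from this commit.
#include <cstdio>

#define MAX_SIMULTANEOUS_BLOCK_SENDS 2
#define FULL_BLOCK_SEND_ENABLE_MIN_TIME_FROM_BUILDING 2.0
#define LIMITED_MAX_SIMULTANEOUS_BLOCK_SENDS 1

// Hypothetical helper: pick the per-client in-flight limit based on the
// time since the player last placed or dug a node.
static unsigned maxSimultaneousSends(float time_from_building)
{
	if(time_from_building < FULL_BLOCK_SEND_ENABLE_MIN_TIME_FROM_BUILDING)
		return LIMITED_MAX_SIMULTANEOUS_BLOCK_SENDS; // building: stay responsive
	return MAX_SIMULTANEOUS_BLOCK_SENDS;             // idle: full send rate
}

int main()
{
	printf("0.5 s after building: %u block(s) in flight\n",
			maxSimultaneousSends(0.5f));
	printf("10 s after building:  %u block(s) in flight\n",
			maxSimultaneousSends(10.0f));
	return 0;
}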

@@ -246,9 +246,11 @@ void * EmergeThread::Thread()
	return NULL;
}
#if 0
void RemoteClient::SendBlocks(Server *server, float dtime)
{
	DSTACK(__FUNCTION_NAME);
	/*
		Find what blocks to send to the client next, and send them.
@@ -518,6 +520,262 @@ void RemoteClient::SendBlocks(Server *server, float dtime)
	// Don't add anything here. The loop breaks by returning.
}
#endif // backup of SendBlocks
void RemoteClient::GetNextBlocks(Server *server, float dtime,
		core::array<PrioritySortedBlockTransfer> &dest)
{
	DSTACK(__FUNCTION_NAME);
	// Won't send anything if already sending
	{
		JMutexAutoLock lock(m_blocks_sending_mutex);
		if(m_blocks_sending.size() >= MAX_SIMULTANEOUS_BLOCK_SENDS)
		{
			//dstream<<"Not sending any blocks, Queue full."<<std::endl;
			return;
		}
	}
	Player *player = server->m_env.getPlayer(peer_id);
	v3f playerpos = player->getPosition();
	v3f playerspeed = player->getSpeed();
	v3s16 center_nodepos = floatToInt(playerpos);
	v3s16 center = getNodeBlockPos(center_nodepos);
	/*
		Get the starting value of the block finder radius.
	*/
	s16 last_nearest_unsent_d;
	s16 d_start;
	{
		JMutexAutoLock lock(m_blocks_sent_mutex);
		if(m_last_center != center)
		{
			m_nearest_unsent_d = 0;
			m_last_center = center;
		}
		static float reset_counter = 0;
		reset_counter += dtime;
		if(reset_counter > 5.0)
		{
			reset_counter = 0;
			m_nearest_unsent_d = 0;
		}
		last_nearest_unsent_d = m_nearest_unsent_d;
		d_start = m_nearest_unsent_d;
	}
	u16 maximum_simultaneous_block_sends = MAX_SIMULTANEOUS_BLOCK_SENDS;
	{
		SharedPtr<JMutexAutoLock> lock(m_time_from_building.getLock());
		m_time_from_building.m_value += dtime;
		/*
			Check the time from last addNode/removeNode.
			Decrease send rate if player is building stuff.
		*/
		if(m_time_from_building.m_value
				< FULL_BLOCK_SEND_ENABLE_MIN_TIME_FROM_BUILDING)
		{
			maximum_simultaneous_block_sends
					= LIMITED_MAX_SIMULTANEOUS_BLOCK_SENDS;
		}
	}
	// Serialization version used
	//u8 ser_version = serialization_version;
	//bool has_incomplete_blocks = false;
	/*
		TODO: Get this from somewhere
	*/
	//s16 d_max = 7;
	s16 d_max = 8;
	//TODO: Get this from somewhere (probably a bigger value)
	s16 d_max_gen = 5;
	//dstream<<"Starting from "<<d_start<<std::endl;
	for(s16 d = d_start; d <= d_max; d++)
	{
		//dstream<<"RemoteClient::SendBlocks(): d="<<d<<std::endl;
		//if(has_incomplete_blocks == false)
		{
			JMutexAutoLock lock(m_blocks_sent_mutex);
			/*
				If m_nearest_unsent_d was changed by the EmergeThread
				(it can change it to 0 through SetBlockNotSent),
				update our d to it.
				Else update m_nearest_unsent_d
			*/
			if(m_nearest_unsent_d != last_nearest_unsent_d)
			{
				d = m_nearest_unsent_d;
			}
			else
			{
				m_nearest_unsent_d = d;
			}
			last_nearest_unsent_d = m_nearest_unsent_d;
		}
		/*
			Get the border/face dot coordinates of a "d-radiused"
			box
		*/
		core::list<v3s16> list;
		getFacePositions(list, d);
		core::list<v3s16>::Iterator li;
		for(li=list.begin(); li!=list.end(); li++)
		{
			v3s16 p = *li + center;
			/*
				Send throttling
				- Don't allow too many simultaneous transfers
				Also, don't send blocks that are already flying.
			*/
			{
				JMutexAutoLock lock(m_blocks_sending_mutex);
				// Limit is dynamically lowered when building
				if(m_blocks_sending.size()
						>= maximum_simultaneous_block_sends)
				{
					/*dstream<<"Not sending more blocks. Queue full. "
							<<m_blocks_sending.size()
							<<std::endl;*/
					return;
				}
				if(m_blocks_sending.find(p) != NULL)
					continue;
			}
			/*
				Do not go over-limit
			*/
			if(p.X < -MAP_GENERATION_LIMIT / MAP_BLOCKSIZE
					|| p.X > MAP_GENERATION_LIMIT / MAP_BLOCKSIZE
					|| p.Y < -MAP_GENERATION_LIMIT / MAP_BLOCKSIZE
					|| p.Y > MAP_GENERATION_LIMIT / MAP_BLOCKSIZE
					|| p.Z < -MAP_GENERATION_LIMIT / MAP_BLOCKSIZE
					|| p.Z > MAP_GENERATION_LIMIT / MAP_BLOCKSIZE)
				continue;
			bool generate = d <= d_max_gen;
			// Limit the generating area vertically to half
			if(abs(p.Y - center.Y) > d_max_gen / 2)
				generate = false;
			/*
				Don't send already sent blocks
			*/
			{
				JMutexAutoLock lock(m_blocks_sent_mutex);
				if(m_blocks_sent.find(p) != NULL)
					continue;
			}
			/*
				Check if map has this block
			*/
			MapBlock *block = NULL;
			try
			{
				block = server->m_env.getMap().getBlockNoCreate(p);
			}
			catch(InvalidPositionException &e)
			{
			}
			bool surely_not_found_on_disk = false;
			if(block != NULL)
			{
				/*if(block->isIncomplete())
				{
					has_incomplete_blocks = true;
					continue;
				}*/
				if(block->isDummy())
				{
					surely_not_found_on_disk = true;
				}
			}
			/*
				If block has been marked to not exist on disk (dummy)
				and generating new ones is not wanted, skip block. TODO
			*/
			if(generate == false && surely_not_found_on_disk == true)
			{
				// get next one.
				continue;
			}
			/*
				Add inexistent block to emerge queue.
			*/
			if(block == NULL || surely_not_found_on_disk)
			{
				// Block not found.
				SharedPtr<JMutexAutoLock> lock
						(m_num_blocks_in_emerge_queue.getLock());
				//TODO: Get value from somewhere
				//TODO: Balance between clients
				//if(server->m_emerge_queue.size() < 1)
				// Allow only one block in emerge queue
				if(m_num_blocks_in_emerge_queue.m_value == 0)
				{
					// Add it to the emerge queue and trigger the thread
					u8 flags = 0;
					if(generate == false)
						flags |= TOSERVER_GETBLOCK_FLAG_OPTIONAL;
					{
						m_num_blocks_in_emerge_queue.m_value++;
					}
					server->m_emerge_queue.addBlock(peer_id, p, flags);
					server->m_emergethread.trigger();
				}
				// get next one.
				continue;
			}
			/*
				Add block to queue
			*/
			PrioritySortedBlockTransfer q((float)d, p, peer_id);
			dest.push_back(q);
		}
	}
	// Don't add anything here. The loop breaks by returning.
}
void RemoteClient::SendObjectData(
		Server *server,
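
GetNextBlocks above walks outward from the player's map block in box-shaped shells: getFacePositions(list, d) produces the surface of the box of radius d, and the shell index d doubles as the transfer priority, so nearer blocks sort ahead of farther ones. A self-contained sketch of that enumeration, assuming a stand-in getFacePositions that reproduces the real function's output set but not its internal ordering:

// Sketch of the expanding-shell walk; the stand-in getFacePositions
// emits every position whose Chebyshev distance from origin is exactly
// d, i.e. the surface of a (2d+1)^3 box.
#include <cstdio>
#include <cstdlib>
#include <vector>

struct v3s16 { short X, Y, Z; };

static void getFacePositions(std::vector<v3s16> &dest, short d)
{
	if(d == 0){ dest.push_back(v3s16{0, 0, 0}); return; }
	for(short x = -d; x <= d; x++)
	for(short y = -d; y <= d; y++)
	for(short z = -d; z <= d; z++)
	{
		// Keep only shell positions: at least one axis at +-d
		if(abs(x) != d && abs(y) != d && abs(z) != d)
			continue;
		dest.push_back(v3s16{x, y, z});
	}
}

int main()
{
	// Shells are visited nearest first; the shell index d becomes the
	// transfer priority, so closer blocks sort ahead of far ones.
	for(short d = 0; d <= 2; d++)
	{
		std::vector<v3s16> shell;
		getFacePositions(shell, d);
		printf("d=%d: %zu candidate blocks, priority %.1f\n",
				d, shell.size(), (float)d);
	}
	return 0;
}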
@@ -2069,6 +2327,7 @@ void Server::SendInventory(u16 peer_id)
	m_con.Send(peer_id, 0, data, true);
}
#if 0
void Server::SendBlocks(float dtime)
{
	DSTACK(__FUNCTION_NAME);
@@ -2095,6 +2354,68 @@ void Server::SendBlocks(float dtime)
	//dstream<<"Server::SendBlocks(): END"<<std::endl;
}
#endif
void Server::SendBlocks(float dtime)
{
	DSTACK(__FUNCTION_NAME);
	JMutexAutoLock envlock(m_env_mutex);
	core::array<PrioritySortedBlockTransfer> queue;
	s32 total_sending = 0;
	for(core::map<u16, RemoteClient*>::Iterator
			i = m_clients.getIterator();
			i.atEnd() == false; i++)
	{
		RemoteClient *client = i.getNode()->getValue();
		assert(client->peer_id == i.getNode()->getKey());
		total_sending += client->SendingCount();
		if(client->serialization_version == SER_FMT_VER_INVALID)
			continue;
		client->GetNextBlocks(this, dtime, queue);
	}
	// Sort.
	// Lowest priority number comes first.
	// Lowest is most important.
	queue.sort();
	JMutexAutoLock conlock(m_con_mutex);
	for(u32 i=0; i<queue.size(); i++)
	{
		//TODO: Calculate value dynamically
		if(total_sending >= MAX_SIMULTANEOUS_BLOCK_SENDS_SERVER_TOTAL)
			break;
		PrioritySortedBlockTransfer q = queue[i];
		MapBlock *block = NULL;
		try
		{
			block = m_env.getMap().getBlockNoCreate(q.pos);
		}
		catch(InvalidPositionException &e)
		{
			continue;
		}
		RemoteClient *client = getClient(q.peer_id);
		SendBlockNoLock(q.peer_id, block, client->serialization_version);
		client->SentBlock(q.pos);
		total_sending++;
	}
}
RemoteClient* Server::getClient(u16 peer_id)
{
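
The new SendBlocks is a gather-sort-send loop: poll every client for candidate transfers, sort the combined queue so the lowest priority value (the nearest shell) comes first, and send until the server-wide cap is reached. A condensed, runnable model of that loop, using simplified stand-in types rather than the engine's:

// Model of the gather-sort-send loop; Transfer is a stand-in for
// PrioritySortedBlockTransfer.
#include <algorithm>
#include <cstdio>
#include <vector>

#define MAX_SIMULTANEOUS_BLOCK_SENDS_SERVER_TOTAL 4

struct Transfer
{
	float priority;          // shell distance d; lower = more urgent
	unsigned short peer_id;  // destination client
	bool operator<(const Transfer &o) const { return priority < o.priority; }
};

int main()
{
	// Pretend two clients queued these candidates via GetNextBlocks()
	std::vector<Transfer> queue = {
		{3.0f, 1}, {0.0f, 2}, {1.0f, 1}, {2.0f, 2}, {1.0f, 2}, {5.0f, 1},
	};
	int total_sending = 1; // e.g. one block already in flight

	// Lowest priority number comes first; lowest is most important.
	std::sort(queue.begin(), queue.end());

	for(const Transfer &q : queue)
	{
		if(total_sending >= MAX_SIMULTANEOUS_BLOCK_SENDS_SERVER_TOTAL)
			break; // global throttle, shared by all clients
		printf("send block (d=%.0f) to peer %u\n",
				q.priority, (unsigned)q.peer_id);
		total_sending++;
	}
	return 0;
}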

@@ -208,11 +208,11 @@ u32 PIChecksum(core::list<PlayerInfo> &l);
*/
struct PrioritySortedBlockTransfer
{
-	PrioritySortedBlockTransfer(float a_priority, v3s16 a_pos, u16 a_dest_peer)
+	PrioritySortedBlockTransfer(float a_priority, v3s16 a_pos, u16 a_peer_id)
	{
		priority = a_priority;
		pos = a_pos;
-		dest_peer = a_dest_peer;
+		peer_id = a_peer_id;
	}
	bool operator < (PrioritySortedBlockTransfer &other)
	{
@@ -220,7 +220,7 @@ struct PrioritySortedBlockTransfer
	}
	float priority;
	v3s16 pos;
-	u16 a_dest_peer;
+	u16 peer_id;
};
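
The diff omits the body of operator<, but the sort comments in Server::SendBlocks ("lowest priority number comes first") pin down what it must compute. A const-correct sketch of the struct that also works with std::sort (the committed signature takes a non-const reference, which irr::core::array's sort tolerates); the comparison body here is inferred, not quoted from the source:

// Const-correct sketch of the transfer record; comparison inferred
// from the sort comments, not copied from the commit.
#include <algorithm>
#include <vector>

struct v3s16 { short X, Y, Z; };

struct PrioritySortedBlockTransfer
{
	float priority;          // lower value = more important
	v3s16 pos;
	unsigned short peer_id;

	bool operator<(const PrioritySortedBlockTransfer &other) const
	{
		return priority < other.priority;
	}
};

int main()
{
	std::vector<PrioritySortedBlockTransfer> q = {
		{2.0f, {0, 0, 2}, 1},
		{0.0f, {0, 0, 0}, 1},
	};
	std::sort(q.begin(), q.end()); // nearest block ends up first
	return q.front().priority == 0.0f ? 0 : 1;
}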
class RemoteClient class RemoteClient
@@ -252,8 +252,13 @@
	{
	}
-	// Connection and environment should be locked when this is called
-	void SendBlocks(Server *server, float dtime);
+	/*
+		Finds the blocks that should be sent next to the client.
+		Environment should be locked when this is called.
+		dtime is used for resetting the send radius at a slow interval.
+	*/
+	void GetNextBlocks(Server *server, float dtime,
+			core::array<PrioritySortedBlockTransfer> &dest);
	// Connection and environment should be locked when this is called
	// steps() objects of blocks not found in active_blocks, then
@@ -273,6 +278,18 @@
	void BlockEmerged();
	/*bool IsSendingBlock(v3s16 p)
	{
		JMutexAutoLock lock(m_blocks_sending_mutex);
		return (m_blocks_sending.find(p) != NULL);
	}*/
	s32 SendingCount()
	{
		JMutexAutoLock lock(m_blocks_sending_mutex);
		return m_blocks_sending.size();
	}
	// Increments timeouts and removes timed-out blocks from list
	// NOTE: This doesn't fix the server-not-sending-block bug
	// because it is related to emerging, not sending.
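
SendingCount is what Server::SendBlocks calls to count in-flight blocks across all clients; holding m_blocks_sending_mutex for the read keeps it consistent while SentBlock and the timeout sweep mutate the map. A sketch of the same guarded-counter pattern with the standard library (SendTracker is a hypothetical stand-in for the relevant slice of RemoteClient, and std::lock_guard plays the role of jthread's JMutexAutoLock):

// Guarded-counter pattern; not the engine's RemoteClient class.
#include <map>
#include <mutex>

struct v3s16 { short X, Y, Z; bool operator<(const v3s16 &o) const
	{ return X != o.X ? X < o.X : (Y != o.Y ? Y < o.Y : Z < o.Z); } };

class SendTracker
{
public:
	int SendingCount()
	{
		std::lock_guard<std::mutex> lock(m_mutex); // like JMutexAutoLock
		return (int)m_blocks_sending.size();
	}
	void SentBlock(v3s16 p, float timeout)
	{
		std::lock_guard<std::mutex> lock(m_mutex);
		m_blocks_sending[p] = timeout;
	}
private:
	std::mutex m_mutex;                       // guards m_blocks_sending
	std::map<v3s16, float> m_blocks_sending;  // pos -> time in flight
};

int main()
{
	SendTracker t;
	t.SentBlock(v3s16{0, 0, 0}, 0.0f);
	return t.SendingCount() == 1 ? 0 : 1;
}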