Add multi-Emerge thread support

This commit is contained in:
kwolekr 2013-02-17 01:47:49 -05:00
parent 76217939e0
commit 5ec5b1cbd6
8 changed files with 160 additions and 64 deletions

@ -187,6 +187,7 @@ void set_default_settings(Settings *settings)
settings->setDefault("emergequeue_limit_total", "256"); settings->setDefault("emergequeue_limit_total", "256");
settings->setDefault("emergequeue_limit_diskonly", "5"); settings->setDefault("emergequeue_limit_diskonly", "5");
settings->setDefault("emergequeue_limit_generate", "1"); settings->setDefault("emergequeue_limit_generate", "1");
settings->setDefault("num_emerge_threads", "");
// physics stuff // physics stuff
settings->setDefault("movement_acceleration_default", "3"); settings->setDefault("movement_acceleration_default", "3");

@ -47,49 +47,56 @@ EmergeManager::EmergeManager(IGameDef *gamedef, BiomeDefManager *bdef) {
this->biomedef = bdef ? bdef : new BiomeDefManager(gamedef); this->biomedef = bdef ? bdef : new BiomeDefManager(gamedef);
this->params = NULL; this->params = NULL;
this->mapgen = NULL;
qlimit_total = g_settings->getU16("emergequeue_limit_total"); qlimit_total = g_settings->getU16("emergequeue_limit_total");
qlimit_diskonly = g_settings->getU16("emergequeue_limit_diskonly"); qlimit_diskonly = g_settings->getU16("emergequeue_limit_diskonly");
qlimit_generate = g_settings->getU16("emergequeue_limit_generate"); qlimit_generate = g_settings->getU16("emergequeue_limit_generate");
queuemutex.Init(); queuemutex.Init();
emergethread = new EmergeThread((Server *)gamedef); int nthreads = g_settings->get("num_emerge_threads").empty() ?
porting::getNumberOfProcessors() :
g_settings->getU16("num_emerge_threads");
if (nthreads < 1)
nthreads = 1;
for (int i = 0; i != nthreads; i++)
emergethread.push_back(new EmergeThread((Server *)gamedef, i));
infostream << "EmergeManager: using " << nthreads << " threads" << std::endl;
} }
EmergeManager::~EmergeManager() { EmergeManager::~EmergeManager() {
emergethread->setRun(false); for (int i = 0; i != emergethread.size(); i++) {
emergethread->qevent.signal(); emergethread[i]->setRun(false);
emergethread->stop(); emergethread[i]->qevent.signal();
emergethread[i]->stop();
delete emergethread[i];
delete mapgen[i];
}
delete emergethread;
delete biomedef; delete biomedef;
delete mapgen;
delete params; delete params;
} }
void EmergeManager::initMapgens(MapgenParams *mgparams) { void EmergeManager::initMapgens(MapgenParams *mgparams) {
if (mapgen) Mapgen *mg;
if (mapgen.size())
return; return;
this->params = mgparams; this->params = mgparams;
this->mapgen = getMapgen(); //only one mapgen for now! for (int i = 0; i != emergethread.size(); i++) {
} mg = createMapgen(params->mg_name, 0, params);
if (!mg) {
Mapgen *EmergeManager::getMapgen() {
if (!mapgen) {
mapgen = createMapgen(params->mg_name, 0, params, this);
if (!mapgen) {
infostream << "EmergeManager: falling back to mapgen v6" << std::endl; infostream << "EmergeManager: falling back to mapgen v6" << std::endl;
delete params; delete params;
params = createMapgenParams("v6"); params = createMapgenParams("v6");
mapgen = createMapgen("v6", 0, params, this); mg = createMapgen("v6", 0, params);
} }
mapgen.push_back(mg);
} }
return mapgen;
} }
@ -98,6 +105,7 @@ bool EmergeManager::enqueueBlockEmerge(u16 peer_id, v3s16 p, bool allow_generate
BlockEmergeData *bedata; BlockEmergeData *bedata;
u16 count; u16 count;
u8 flags = 0; u8 flags = 0;
int idx = 0;
if (allow_generate) if (allow_generate)
flags |= BLOCK_EMERGE_ALLOWGEN; flags |= BLOCK_EMERGE_ALLOWGEN;
@ -128,45 +136,58 @@ bool EmergeManager::enqueueBlockEmerge(u16 peer_id, v3s16 p, bool allow_generate
peer_queue_count[peer_id] = count + 1; peer_queue_count[peer_id] = count + 1;
emergethread->blockqueue.push(p); int lowestitems = emergethread[0]->blockqueue.size();
for (int i = 1; i != emergethread.size(); i++) {
int nitems = emergethread[i]->blockqueue.size();
if (nitems < lowestitems) {
idx = i;
lowestitems = nitems;
}
}
emergethread[idx]->blockqueue.push(p);
} }
emergethread->qevent.signal(); emergethread[idx]->qevent.signal();
return true; return true;
} }
bool EmergeManager::popBlockEmerge(v3s16 *pos, u8 *flags) { bool EmergeThread::popBlockEmerge(v3s16 *pos, u8 *flags) {
std::map<v3s16, BlockEmergeData *>::iterator iter; std::map<v3s16, BlockEmergeData *>::iterator iter;
JMutexAutoLock queuelock(queuemutex); JMutexAutoLock queuelock(emerge->queuemutex);
if (emergethread->blockqueue.empty()) if (blockqueue.empty())
return false; return false;
v3s16 p = emergethread->blockqueue.front(); v3s16 p = blockqueue.front();
emergethread->blockqueue.pop(); blockqueue.pop();
*pos = p; *pos = p;
iter = blocks_enqueued.find(p); iter = emerge->blocks_enqueued.find(p);
if (iter == blocks_enqueued.end()) if (iter == emerge->blocks_enqueued.end())
return false; //uh oh, queue and map out of sync!! return false; //uh oh, queue and map out of sync!!
BlockEmergeData *bedata = iter->second; BlockEmergeData *bedata = iter->second;
*flags = bedata->flags; *flags = bedata->flags;
peer_queue_count[bedata->peer_requested]--; emerge->peer_queue_count[bedata->peer_requested]--;
delete bedata; delete bedata;
blocks_enqueued.erase(iter); emerge->blocks_enqueued.erase(iter);
return true; return true;
} }
int EmergeManager::getGroundLevelAtPoint(v2s16 p) { int EmergeManager::getGroundLevelAtPoint(v2s16 p) {
if (!mapgen) if (!mapgen[0]) {
errorstream << "EmergeManager: getGroundLevelAtPoint() called"
" before mapgen initialized" << std::endl;
return 0; return 0;
return mapgen->getGroundLevelAtPoint(p); }
return mapgen[0]->getGroundLevelAtPoint(p);
} }
@ -193,8 +214,9 @@ u32 EmergeManager::getBlockSeed(v3s16 p) {
Mapgen *EmergeManager::createMapgen(std::string mgname, int mgid, Mapgen *EmergeManager::createMapgen(std::string mgname, int mgid,
MapgenParams *mgparams, EmergeManager *emerge) { MapgenParams *mgparams) {
std::map<std::string, MapgenFactory *>::const_iterator iter = mglist.find(mgname); std::map<std::string, MapgenFactory *>::const_iterator iter;
iter = mglist.find(mgname);
if (iter == mglist.end()) { if (iter == mglist.end()) {
errorstream << "EmergeManager; mapgen " << mgname << errorstream << "EmergeManager; mapgen " << mgname <<
" not registered" << std::endl; " not registered" << std::endl;
@ -202,12 +224,13 @@ Mapgen *EmergeManager::createMapgen(std::string mgname, int mgid,
} }
MapgenFactory *mgfactory = iter->second; MapgenFactory *mgfactory = iter->second;
return mgfactory->createMapgen(mgid, mgparams, emerge); return mgfactory->createMapgen(mgid, mgparams, this);
} }
MapgenParams *EmergeManager::createMapgenParams(std::string mgname) { MapgenParams *EmergeManager::createMapgenParams(std::string mgname) {
std::map<std::string, MapgenFactory *>::const_iterator iter = mglist.find(mgname); std::map<std::string, MapgenFactory *>::const_iterator iter;
iter = mglist.find(mgname);
if (iter == mglist.end()) { if (iter == mglist.end()) {
errorstream << "EmergeManager: mapgen " << mgname << errorstream << "EmergeManager: mapgen " << mgname <<
" not registered" << std::endl; " not registered" << std::endl;
@ -227,7 +250,7 @@ MapgenParams *EmergeManager::getParamsFromSettings(Settings *settings) {
mgparams->seed = settings->getU64(settings == g_settings ? "fixed_map_seed" : "seed"); mgparams->seed = settings->getU64(settings == g_settings ? "fixed_map_seed" : "seed");
mgparams->water_level = settings->getS16("water_level"); mgparams->water_level = settings->getS16("water_level");
mgparams->chunksize = settings->getS16("chunksize"); mgparams->chunksize = settings->getS16("chunksize");
mgparams->flags = settings->getS32("mg_flags"); mgparams->flags = settings->getFlagStr("mg_flags", flagdesc_mapgen);
if (!mgparams->readParams(settings)) { if (!mgparams->readParams(settings)) {
delete mgparams; delete mgparams;
@ -354,11 +377,11 @@ void *EmergeThread::Thread() {
map = (ServerMap *)&(m_server->m_env->getMap()); map = (ServerMap *)&(m_server->m_env->getMap());
emerge = m_server->m_emerge; emerge = m_server->m_emerge;
mapgen = emerge->getMapgen(); mapgen = emerge->mapgen[id]; //emerge->getMapgen();
while (getRun()) while (getRun())
try { try {
while (!emerge->popBlockEmerge(&p, &flags)) { while (!popBlockEmerge(&p, &flags)) {
qevent.wait(); qevent.wait();
if (!getRun()) if (!getRun())
goto exit_emerge_loop; goto exit_emerge_loop;

@ -46,8 +46,8 @@ class EmergeManager {
public: public:
std::map<std::string, MapgenFactory *> mglist; std::map<std::string, MapgenFactory *> mglist;
Mapgen *mapgen; std::vector<Mapgen *> mapgen;
EmergeThread *emergethread; std::vector<EmergeThread *> emergethread;
//settings //settings
MapgenParams *params; MapgenParams *params;
@ -68,11 +68,9 @@ public:
void initMapgens(MapgenParams *mgparams); void initMapgens(MapgenParams *mgparams);
Mapgen *createMapgen(std::string mgname, int mgid, Mapgen *createMapgen(std::string mgname, int mgid,
MapgenParams *mgparams, EmergeManager *emerge); MapgenParams *mgparams);
MapgenParams *createMapgenParams(std::string mgname); MapgenParams *createMapgenParams(std::string mgname);
Mapgen *getMapgen();
bool enqueueBlockEmerge(u16 peer_id, v3s16 p, bool allow_generate); bool enqueueBlockEmerge(u16 peer_id, v3s16 p, bool allow_generate);
bool popBlockEmerge(v3s16 *pos, u8 *flags);
bool registerMapgen(std::string name, MapgenFactory *mgfactory); bool registerMapgen(std::string name, MapgenFactory *mgfactory);
MapgenParams *getParamsFromSettings(Settings *settings); MapgenParams *getParamsFromSettings(Settings *settings);
@ -92,17 +90,19 @@ class EmergeThread : public SimpleThread
EmergeManager *emerge; EmergeManager *emerge;
Mapgen *mapgen; Mapgen *mapgen;
bool enable_mapgen_debug_info; bool enable_mapgen_debug_info;
int id;
public: public:
Event qevent; Event qevent;
std::queue<v3s16> blockqueue; std::queue<v3s16> blockqueue;
EmergeThread(Server *server): EmergeThread(Server *server, int ethreadid):
SimpleThread(), SimpleThread(),
m_server(server), m_server(server),
map(NULL), map(NULL),
emerge(NULL), emerge(NULL),
mapgen(NULL) mapgen(NULL),
id(ethreadid)
{ {
enable_mapgen_debug_info = g_settings->getBool("enable_mapgen_debug_info"); enable_mapgen_debug_info = g_settings->getBool("enable_mapgen_debug_info");
} }
@ -118,6 +118,7 @@ public:
} }
} }
bool popBlockEmerge(v3s16 *pos, u8 *flags);
bool getBlockOrStartGen(v3s16 p, MapBlock **b, bool getBlockOrStartGen(v3s16 p, MapBlock **b,
BlockMakeData *data, bool allow_generate); BlockMakeData *data, bool allow_generate);
}; };

@ -2247,6 +2247,21 @@ void ServerMap::initBlockMake(BlockMakeData *data, v3s16 blockpos)
data->vmanip->initialEmerge(bigarea_blocks_min, bigarea_blocks_max); data->vmanip->initialEmerge(bigarea_blocks_min, bigarea_blocks_max);
} }
// Ensure none of the blocks to be generated were marked as containing CONTENT_IGNORE
for (s16 z = blockpos_min.Z; z <= blockpos_max.Z; z++) {
for (s16 y = blockpos_min.Y; y <= blockpos_max.Y; y++) {
for (s16 x = blockpos_min.X; x <= blockpos_max.X; x++) {
core::map<v3s16, u8>::Node *n;
n = data->vmanip->m_loaded_blocks.find(v3s16(x, y, z));
if (n == NULL)
continue;
u8 flags = n->getValue();
flags &= ~VMANIP_BLOCK_CONTAINS_CIGNORE;
n->setValue(flags);
}
}
}
// Data is ready now. // Data is ready now.
} }
@ -3672,8 +3687,10 @@ void MapVoxelManipulator::emerge(VoxelArea a, s32 caller_id)
for(s32 y=p_min.Y; y<=p_max.Y; y++) for(s32 y=p_min.Y; y<=p_max.Y; y++)
for(s32 x=p_min.X; x<=p_max.X; x++) for(s32 x=p_min.X; x<=p_max.X; x++)
{ {
u8 flags = 0;
MapBlock *block;
v3s16 p(x,y,z); v3s16 p(x,y,z);
core::map<v3s16, bool>::Node *n; core::map<v3s16, u8>::Node *n;
n = m_loaded_blocks.find(p); n = m_loaded_blocks.find(p);
if(n != NULL) if(n != NULL)
continue; continue;
@ -3689,7 +3706,7 @@ void MapVoxelManipulator::emerge(VoxelArea a, s32 caller_id)
a.print(infostream); a.print(infostream);
infostream<<std::endl;*/ infostream<<std::endl;*/
MapBlock *block = m_map->getBlockNoCreate(p); block = m_map->getBlockNoCreate(p);
if(block->isDummy()) if(block->isDummy())
block_data_inexistent = true; block_data_inexistent = true;
else else
@ -3702,6 +3719,8 @@ void MapVoxelManipulator::emerge(VoxelArea a, s32 caller_id)
if(block_data_inexistent) if(block_data_inexistent)
{ {
flags |= VMANIP_BLOCK_DATA_INEXIST;
VoxelArea a(p*MAP_BLOCKSIZE, (p+1)*MAP_BLOCKSIZE-v3s16(1,1,1)); VoxelArea a(p*MAP_BLOCKSIZE, (p+1)*MAP_BLOCKSIZE-v3s16(1,1,1));
// Fill with VOXELFLAG_INEXISTENT // Fill with VOXELFLAG_INEXISTENT
for(s32 z=a.MinEdge.Z; z<=a.MaxEdge.Z; z++) for(s32 z=a.MinEdge.Z; z<=a.MaxEdge.Z; z++)
@ -3711,8 +3730,13 @@ void MapVoxelManipulator::emerge(VoxelArea a, s32 caller_id)
memset(&m_flags[i], VOXELFLAG_INEXISTENT, MAP_BLOCKSIZE); memset(&m_flags[i], VOXELFLAG_INEXISTENT, MAP_BLOCKSIZE);
} }
} }
else if (block->getNode(0, 0, 0).getContent() == CONTENT_IGNORE)
{
// Mark that block was loaded as blank
flags |= VMANIP_BLOCK_CONTAINS_CIGNORE;
}
m_loaded_blocks.insert(p, !block_data_inexistent); m_loaded_blocks.insert(p, flags);
} }
//infostream<<"emerge done"<<std::endl; //infostream<<"emerge done"<<std::endl;
@ -3832,8 +3856,10 @@ void ManualMapVoxelManipulator::initialEmerge(
for(s32 y=p_min.Y; y<=p_max.Y; y++) for(s32 y=p_min.Y; y<=p_max.Y; y++)
for(s32 x=p_min.X; x<=p_max.X; x++) for(s32 x=p_min.X; x<=p_max.X; x++)
{ {
u8 flags = 0;
MapBlock *block;
v3s16 p(x,y,z); v3s16 p(x,y,z);
core::map<v3s16, bool>::Node *n; core::map<v3s16, u8>::Node *n;
n = m_loaded_blocks.find(p); n = m_loaded_blocks.find(p);
if(n != NULL) if(n != NULL)
continue; continue;
@ -3843,7 +3869,7 @@ void ManualMapVoxelManipulator::initialEmerge(
{ {
TimeTaker timer1("emerge load", &emerge_load_time); TimeTaker timer1("emerge load", &emerge_load_time);
MapBlock *block = m_map->getBlockNoCreate(p); block = m_map->getBlockNoCreate(p);
if(block->isDummy()) if(block->isDummy())
block_data_inexistent = true; block_data_inexistent = true;
else else
@ -3856,6 +3882,8 @@ void ManualMapVoxelManipulator::initialEmerge(
if(block_data_inexistent) if(block_data_inexistent)
{ {
flags |= VMANIP_BLOCK_DATA_INEXIST;
/* /*
Mark area inexistent Mark area inexistent
*/ */
@ -3868,8 +3896,13 @@ void ManualMapVoxelManipulator::initialEmerge(
memset(&m_flags[i], VOXELFLAG_INEXISTENT, MAP_BLOCKSIZE); memset(&m_flags[i], VOXELFLAG_INEXISTENT, MAP_BLOCKSIZE);
} }
} }
else if (block->getNode(0, 0, 0).getContent() == CONTENT_IGNORE)
{
// Mark that block was loaded as blank
flags |= VMANIP_BLOCK_CONTAINS_CIGNORE;
}
m_loaded_blocks.insert(p, !block_data_inexistent); m_loaded_blocks.insert(p, flags);
} }
} }
@ -3882,12 +3915,14 @@ void ManualMapVoxelManipulator::blitBackAll(
/* /*
Copy data of all blocks Copy data of all blocks
*/ */
for(core::map<v3s16, bool>::Iterator for(core::map<v3s16, u8>::Iterator
i = m_loaded_blocks.getIterator(); i = m_loaded_blocks.getIterator();
i.atEnd() == false; i++) i.atEnd() == false; i++)
{ {
v3s16 p = i.getNode()->getKey(); v3s16 p = i.getNode()->getKey();
bool existed = i.getNode()->getValue(); u8 flags = i.getNode()->getValue();
bool existed = !(flags & VMANIP_BLOCK_DATA_INEXIST);
if(existed == false) if(existed == false)
{ {
// The Great Bug was found using this // The Great Bug was found using this
@ -3896,6 +3931,7 @@ void ManualMapVoxelManipulator::blitBackAll(
<<std::endl;*/ <<std::endl;*/
continue; continue;
} }
MapBlock *block = m_map->getBlockNoCreateNoEx(p); MapBlock *block = m_map->getBlockNoCreateNoEx(p);
if(block == NULL) if(block == NULL)
{ {
@ -3906,10 +3942,13 @@ void ManualMapVoxelManipulator::blitBackAll(
continue; continue;
} }
block->copyFrom(*this); bool no_content_ignore = !(flags & VMANIP_BLOCK_CONTAINS_CIGNORE);
if (no_content_ignore)
if(modified_blocks) {
modified_blocks->insert(p, block); block->copyFrom(*this);
if(modified_blocks)
modified_blocks->insert(p, block);
}
} }
} }

@ -517,6 +517,9 @@ private:
sqlite3_stmt *m_database_list; sqlite3_stmt *m_database_list;
}; };
#define VMANIP_BLOCK_DATA_INEXIST 1
#define VMANIP_BLOCK_CONTAINS_CIGNORE 2
class MapVoxelManipulator : public VoxelManipulator class MapVoxelManipulator : public VoxelManipulator
{ {
public: public:
@ -533,13 +536,13 @@ public:
void blitBack(core::map<v3s16, MapBlock*> & modified_blocks); void blitBack(core::map<v3s16, MapBlock*> & modified_blocks);
protected:
Map *m_map;
/* /*
key = blockpos key = blockpos
value = block existed when loaded value = flags describing the block
*/ */
core::map<v3s16, bool> m_loaded_blocks; core::map<v3s16, u8> m_loaded_blocks;
protected:
Map *m_map;
}; };
class ManualMapVoxelManipulator : public MapVoxelManipulator class ManualMapVoxelManipulator : public MapVoxelManipulator

@ -131,6 +131,29 @@ void signal_handler_init(void)
#endif #endif
/*
Multithreading support
*/
int getNumberOfProcessors() {
#if defined(_SC_NPROCESSORS_ONLN)
return sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(__FreeBSD__) || defined(__APPLE__)
unsigned int len, count;
len = sizeof(count);
return sysctlbyname("hw.ncpu", &count, &len, NULL, 0);
#elif defined(_GNU_SOURCE)
return get_nprocs();
#elif defined(_WIN32)
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
return sysinfo.dwNumberOfProcessors;
#elif defined(PTW32_VERSION) || defined(__hpux)
return pthread_num_processors_np();
#else
return 1;
#endif
}
/* /*
Path mangler Path mangler
*/ */

@ -103,6 +103,11 @@ std::string getDataPath(const char *subpath);
*/ */
void initializePaths(); void initializePaths();
/*
Get number of online processors in the system.
*/
int getNumberOfProcessors();
/* /*
Resolution is 10-20ms. Resolution is 10-20ms.
Remember to check for overflows. Remember to check for overflows.

@ -1649,7 +1649,8 @@ void Server::AsyncRunStep()
{ {
counter = 0.0; counter = 0.0;
m_emerge->emergethread->trigger(); for (int i = 0; i != m_emerge->emergethread.size(); i++)
m_emerge->emergethread[i]->trigger();
// Update m_enable_rollback_recording here too // Update m_enable_rollback_recording here too
m_enable_rollback_recording = m_enable_rollback_recording =