Divorce map database locking from env lock (#15151)

This commit is contained in:
sfan5 2024-09-11 19:17:08 +02:00
parent 526a2f7b8c
commit 588a0f83e9
6 changed files with 197 additions and 90 deletions

@ -31,6 +31,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "filesys.h" #include "filesys.h"
#include "log.h" #include "log.h"
#include "servermap.h" #include "servermap.h"
#include "database/database.h"
#include "mapblock.h" #include "mapblock.h"
#include "mapgen/mg_biome.h" #include "mapgen/mg_biome.h"
#include "mapgen/mg_ore.h" #include "mapgen/mg_ore.h"
@ -185,10 +186,22 @@ SchematicManager *EmergeManager::getWritableSchematicManager()
return schemmgr; return schemmgr;
} }
void EmergeManager::initMap(MapDatabaseAccessor *holder)
{
FATAL_ERROR_IF(m_db, "Map database already initialized.");
assert(holder->dbase);
m_db = holder;
}
void EmergeManager::resetMap()
{
FATAL_ERROR_IF(m_threads_active, "Threads are still active.");
m_db = nullptr;
}
void EmergeManager::initMapgens(MapgenParams *params) void EmergeManager::initMapgens(MapgenParams *params)
{ {
FATAL_ERROR_IF(!m_mapgens.empty(), "Mapgen already initialised."); FATAL_ERROR_IF(!m_mapgens.empty(), "Mapgen already initialized.");
mgparams = params; mgparams = params;
@ -466,7 +479,7 @@ void EmergeThread::signal()
} }
bool EmergeThread::pushBlock(const v3s16 &pos) bool EmergeThread::pushBlock(v3s16 pos)
{ {
m_block_queue.push(pos); m_block_queue.push(pos);
return true; return true;
@ -491,7 +504,7 @@ void EmergeThread::cancelPendingItems()
} }
void EmergeThread::runCompletionCallbacks(const v3s16 &pos, EmergeAction action, void EmergeThread::runCompletionCallbacks(v3s16 pos, EmergeAction action,
const EmergeCallbackList &callbacks) const EmergeCallbackList &callbacks)
{ {
m_emerge->reportCompletedEmerge(action); m_emerge->reportCompletedEmerge(action);
@ -524,22 +537,37 @@ bool EmergeThread::popBlockEmerge(v3s16 *pos, BlockEmergeData *bedata)
} }
EmergeAction EmergeThread::getBlockOrStartGen( EmergeAction EmergeThread::getBlockOrStartGen(const v3s16 pos, bool allow_gen,
const v3s16 &pos, bool allow_gen, MapBlock **block, BlockMakeData *bmdata) const std::string *from_db, MapBlock **block, BlockMakeData *bmdata)
{ {
MutexAutoLock envlock(m_server->m_env_mutex); MutexAutoLock envlock(m_server->m_env_mutex);
auto block_ok = [] (MapBlock *b) {
return b && b->isGenerated();
};
// 1). Attempt to fetch block from memory // 1). Attempt to fetch block from memory
*block = m_map->getBlockNoCreateNoEx(pos); *block = m_map->getBlockNoCreateNoEx(pos);
if (*block) { if (*block) {
if ((*block)->isGenerated()) if (block_ok(*block)) {
// if we just read it from the db but the block exists that means
// someone else was faster. don't touch it to prevent data loss.
if (from_db)
verbosestream << "getBlockOrStartGen: block loading raced" << std::endl;
return EMERGE_FROM_MEMORY; return EMERGE_FROM_MEMORY;
}
} else { } else {
// 2). Attempt to load block from disk if it was not in the memory if (!from_db) {
*block = m_map->loadBlock(pos); // 2). We should attempt loading it
if (*block && (*block)->isGenerated())
return EMERGE_FROM_DISK; return EMERGE_FROM_DISK;
} }
// 2). Second invocation, we have the data
if (!from_db->empty()) {
*block = m_map->loadBlock(*from_db, pos);
if (block_ok(*block))
return EMERGE_FROM_DISK;
}
}
// 3). Attempt to start generation // 3). Attempt to start generation
if (allow_gen && m_map->initBlockMake(pos, bmdata)) if (allow_gen && m_map->initBlockMake(pos, bmdata))
@ -643,7 +671,8 @@ void *EmergeThread::run()
BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER
v3s16 pos; v3s16 pos;
std::map<v3s16, MapBlock *> modified_blocks; std::map<v3s16, MapBlock*> modified_blocks;
std::string databuf;
m_map = &m_server->m_env->getServerMap(); m_map = &m_server->m_env->getServerMap();
m_emerge = m_server->getEmergeManager(); m_emerge = m_server->getEmergeManager();
@ -669,13 +698,30 @@ void *EmergeThread::run()
continue; continue;
} }
g_profiler->add(m_name + ": processed [#]", 1);
if (blockpos_over_max_limit(pos)) if (blockpos_over_max_limit(pos))
continue; continue;
bool allow_gen = bedata.flags & BLOCK_EMERGE_ALLOW_GEN; bool allow_gen = bedata.flags & BLOCK_EMERGE_ALLOW_GEN;
EMERGE_DBG_OUT("pos=" << pos << " allow_gen=" << allow_gen); EMERGE_DBG_OUT("pos=" << pos << " allow_gen=" << allow_gen);
action = getBlockOrStartGen(pos, allow_gen, &block, &bmdata); action = getBlockOrStartGen(pos, allow_gen, nullptr, &block, &bmdata);
/* Try to load it */
if (action == EMERGE_FROM_DISK) {
auto &m_db = *m_emerge->m_db;
{
ScopeProfiler sp(g_profiler, "EmergeThread: load block - async (sum)");
MutexAutoLock dblock(m_db.mutex);
m_db.loadBlock(pos, databuf);
}
// actually load it, then decide again
action = getBlockOrStartGen(pos, allow_gen, &databuf, &block, &bmdata);
databuf.clear();
}
/* Generate it */
if (action == EMERGE_GENERATED) { if (action == EMERGE_GENERATED) {
bool error = false; bool error = false;
m_trans_liquid = &bmdata.transforming_liquid; m_trans_liquid = &bmdata.transforming_liquid;

@ -46,6 +46,7 @@ class DecorationManager;
class SchematicManager; class SchematicManager;
class Server; class Server;
class ModApiMapgen; class ModApiMapgen;
struct MapDatabaseAccessor;
// Structure containing inputs/outputs for chunk generation // Structure containing inputs/outputs for chunk generation
struct BlockMakeData { struct BlockMakeData {
@ -173,6 +174,10 @@ public:
SchematicManager *getWritableSchematicManager(); SchematicManager *getWritableSchematicManager();
void initMapgens(MapgenParams *mgparams); void initMapgens(MapgenParams *mgparams);
/// @param holder non-owned reference that must stay alive
void initMap(MapDatabaseAccessor *holder);
/// resets the reference
void resetMap();
void startThreads(); void startThreads();
void stopThreads(); void stopThreads();
@ -206,6 +211,9 @@ private:
std::vector<EmergeThread *> m_threads; std::vector<EmergeThread *> m_threads;
bool m_threads_active = false; bool m_threads_active = false;
// The map database
MapDatabaseAccessor *m_db = nullptr;
std::mutex m_queue_mutex; std::mutex m_queue_mutex;
std::map<v3s16, BlockEmergeData> m_blocks_enqueued; std::map<v3s16, BlockEmergeData> m_blocks_enqueued;
std::unordered_map<u16, u32> m_peer_queue_count; std::unordered_map<u16, u32> m_peer_queue_count;

@ -40,7 +40,7 @@ class EmergeScripting;
class EmergeThread : public Thread { class EmergeThread : public Thread {
public: public:
bool enable_mapgen_debug_info; bool enable_mapgen_debug_info;
int id; const int id; // Index of this thread
EmergeThread(Server *server, int ethreadid); EmergeThread(Server *server, int ethreadid);
~EmergeThread() = default; ~EmergeThread() = default;
@ -49,7 +49,7 @@ public:
void signal(); void signal();
// Requires queue mutex held // Requires queue mutex held
bool pushBlock(const v3s16 &pos); bool pushBlock(v3s16 pos);
void cancelPendingItems(); void cancelPendingItems();
@ -59,7 +59,7 @@ public:
protected: protected:
void runCompletionCallbacks( void runCompletionCallbacks(
const v3s16 &pos, EmergeAction action, v3s16 pos, EmergeAction action,
const EmergeCallbackList &callbacks); const EmergeCallbackList &callbacks);
private: private:
@ -79,8 +79,20 @@ private:
bool popBlockEmerge(v3s16 *pos, BlockEmergeData *bedata); bool popBlockEmerge(v3s16 *pos, BlockEmergeData *bedata);
EmergeAction getBlockOrStartGen( /**
const v3s16 &pos, bool allow_gen, MapBlock **block, BlockMakeData *data); * Try to get a block from memory and decide what to do.
*
* @param pos block position
* @param from_db serialized block data, optional
* (for second call after EMERGE_FROM_DISK was returned)
* @param allow_gen allow invoking mapgen?
* @param block output pointer for block
* @param data info for mapgen
* @return what to do for this block
*/
EmergeAction getBlockOrStartGen(v3s16 pos, bool allow_gen,
const std::string *from_db, MapBlock **block, BlockMakeData *data);
MapBlock *finishGen(v3s16 pos, BlockMakeData *bmdata, MapBlock *finishGen(v3s16 pos, BlockMakeData *bmdata,
std::map<v3s16, MapBlock *> *modified_blocks); std::map<v3s16, MapBlock *> *modified_blocks);

@ -1278,8 +1278,7 @@ static bool recompress_map_database(const GameParams &game_params, const Setting
{ {
MapBlock mb(v3s16(0,0,0), &server); MapBlock mb(v3s16(0,0,0), &server);
u8 ver = readU8(iss); ServerMap::deSerializeBlock(&mb, iss);
mb.deSerialize(iss, ver, true);
oss.str(""); oss.str("");
oss.clear(); oss.clear();

@ -51,6 +51,18 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "database/database-postgresql.h" #include "database/database-postgresql.h"
#endif #endif
/*
Helpers
*/
void MapDatabaseAccessor::loadBlock(v3s16 blockpos, std::string &ret)
{
ret.clear();
dbase->loadBlock(blockpos, &ret);
if (ret.empty() && dbase_ro)
dbase_ro->loadBlock(blockpos, &ret);
}
/* /*
ServerMap ServerMap
*/ */
@ -67,7 +79,7 @@ ServerMap::ServerMap(const std::string &savedir, IGameDef *gamedef,
emerge->map_settings_mgr = &settings_mgr; emerge->map_settings_mgr = &settings_mgr;
/* /*
Try to load map; if not found, create a new one. Try to open map; if not found, create a new one.
*/ */
// Determine which database backend to use // Determine which database backend to use
@ -79,10 +91,10 @@ ServerMap::ServerMap(const std::string &savedir, IGameDef *gamedef,
conf.set("backend", "sqlite3"); conf.set("backend", "sqlite3");
} }
std::string backend = conf.get("backend"); std::string backend = conf.get("backend");
dbase = createDatabase(backend, savedir, conf); m_db.dbase = createDatabase(backend, savedir, conf);
if (conf.exists("readonly_backend")) { if (conf.exists("readonly_backend")) {
std::string readonly_dir = savedir + DIR_DELIM + "readonly"; std::string readonly_dir = savedir + DIR_DELIM + "readonly";
dbase_ro = createDatabase(conf.get("readonly_backend"), readonly_dir, conf); m_db.dbase_ro = createDatabase(conf.get("readonly_backend"), readonly_dir, conf);
} }
if (!conf.updateConfigFile(conf_path.c_str())) if (!conf.updateConfigFile(conf_path.c_str()))
errorstream << "ServerMap::ServerMap(): Failed to update world.mt!" << std::endl; errorstream << "ServerMap::ServerMap(): Failed to update world.mt!" << std::endl;
@ -90,6 +102,9 @@ ServerMap::ServerMap(const std::string &savedir, IGameDef *gamedef,
m_savedir = savedir; m_savedir = savedir;
m_map_saving_enabled = false; m_map_saving_enabled = false;
// Inform EmergeManager of db handles
m_emerge->initMap(&m_db);
m_save_time_counter = mb->addCounter( m_save_time_counter = mb->addCounter(
"minetest_map_save_time", "Time spent saving blocks (in microseconds)"); "minetest_map_save_time", "Time spent saving blocks (in microseconds)");
m_save_count_counter = mb->addCounter( m_save_count_counter = mb->addCounter(
@ -159,11 +174,15 @@ ServerMap::~ServerMap()
<< ", exception: " << e.what() << std::endl; << ", exception: " << e.what() << std::endl;
} }
/* m_emerge->resetMap();
Close database if it was opened
*/ {
delete dbase; MutexAutoLock dblock(m_db.mutex);
delete dbase_ro; delete m_db.dbase;
m_db.dbase = nullptr;
delete m_db.dbase_ro;
m_db.dbase_ro = nullptr;
}
deleteDetachedBlocks(); deleteDetachedBlocks();
} }
@ -547,9 +566,10 @@ void ServerMap::save(ModifiedState save_level)
void ServerMap::listAllLoadableBlocks(std::vector<v3s16> &dst) void ServerMap::listAllLoadableBlocks(std::vector<v3s16> &dst)
{ {
dbase->listAllLoadableBlocks(dst); MutexAutoLock dblock(m_db.mutex);
if (dbase_ro) m_db.dbase->listAllLoadableBlocks(dst);
dbase_ro->listAllLoadableBlocks(dst); if (m_db.dbase_ro)
m_db.dbase_ro->listAllLoadableBlocks(dst);
} }
void ServerMap::listAllLoadedBlocks(std::vector<v3s16> &dst) void ServerMap::listAllLoadedBlocks(std::vector<v3s16> &dst)
@ -597,17 +617,21 @@ MapDatabase *ServerMap::createDatabase(
void ServerMap::beginSave() void ServerMap::beginSave()
{ {
dbase->beginSave(); MutexAutoLock dblock(m_db.mutex);
m_db.dbase->beginSave();
} }
void ServerMap::endSave() void ServerMap::endSave()
{ {
dbase->endSave(); MutexAutoLock dblock(m_db.mutex);
m_db.dbase->endSave();
} }
bool ServerMap::saveBlock(MapBlock *block) bool ServerMap::saveBlock(MapBlock *block)
{ {
return saveBlock(block, dbase, m_map_compression_level); // FIXME: serialization happens under mutex
MutexAutoLock dblock(m_db.mutex);
return saveBlock(block, m_db.dbase, m_map_compression_level);
} }
bool ServerMap::saveBlock(MapBlock *block, MapDatabase *db, int compression_level) bool ServerMap::saveBlock(MapBlock *block, MapDatabase *db, int compression_level)
@ -634,18 +658,27 @@ bool ServerMap::saveBlock(MapBlock *block, MapDatabase *db, int compression_leve
return ret; return ret;
} }
void ServerMap::loadBlock(std::string *blob, v3s16 p3d, MapSector *sector, bool save_after_load) void ServerMap::deSerializeBlock(MapBlock *block, std::istream &is)
{ {
try { ScopeProfiler sp(g_profiler, "ServerMap: deSer block", SPT_AVG, PRECISION_MICRO);
std::istringstream is(*blob, std::ios_base::binary);
u8 version = readU8(is); u8 version = readU8(is);
if (is.fail())
throw SerializationError("Failed to read MapBlock version");
if(is.fail()) block->deSerialize(is, version, true);
throw SerializationError("ServerMap::loadBlock(): Failed" }
" to read MapBlock version");
MapBlock *ServerMap::loadBlock(const std::string &blob, v3s16 p3d, bool save_after_load)
{
ScopeProfiler sp(g_profiler, "ServerMap: load block", SPT_AVG, PRECISION_MICRO);
MapBlock *block = nullptr; MapBlock *block = nullptr;
bool created_new = false;
try {
v2s16 p2d(p3d.X, p3d.Z);
MapSector *sector = createSector(p2d);
std::unique_ptr<MapBlock> block_created_new; std::unique_ptr<MapBlock> block_created_new;
block = sector->getBlockNoCreateNoEx(p3d.Y); block = sector->getBlockNoCreateNoEx(p3d.Y);
if (!block) { if (!block) {
@ -654,31 +687,16 @@ void ServerMap::loadBlock(std::string *blob, v3s16 p3d, MapSector *sector, bool
} }
{ {
ScopeProfiler sp(g_profiler, "ServerMap: deSer block", SPT_AVG, PRECISION_MICRO); std::istringstream iss(blob, std::ios_base::binary);
block->deSerialize(is, version, true); deSerializeBlock(block, iss);
} }
// If it's a new block, insert it to the map // If it's a new block, insert it to the map
if (block_created_new) { if (block_created_new) {
sector->insertBlock(std::move(block_created_new)); sector->insertBlock(std::move(block_created_new));
ReflowScan scanner(this, m_emerge->ndef); created_new = true;
scanner.scan(block, &m_transforming_liquid);
} }
} catch (SerializationError &e) {
/*
Save blocks loaded in old format in new format
*/
//if(version < SER_FMT_VER_HIGHEST_READ || save_after_load)
// Only save if asked to; no need to update version
if(save_after_load)
saveBlock(block);
// We just loaded it from, so it's up-to-date.
block->resetModified();
}
catch(SerializationError &e)
{
errorstream<<"Invalid block data in database" errorstream<<"Invalid block data in database"
<<" ("<<p3d.X<<","<<p3d.Y<<","<<p3d.Z<<")" <<" ("<<p3d.X<<","<<p3d.Y<<","<<p3d.Z<<")"
<<" (SerializationError): "<<e.what()<<std::endl; <<" (SerializationError): "<<e.what()<<std::endl;
@ -693,47 +711,51 @@ void ServerMap::loadBlock(std::string *blob, v3s16 p3d, MapSector *sector, bool
throw SerializationError("Invalid block data in database"); throw SerializationError("Invalid block data in database");
} }
} }
}
MapBlock* ServerMap::loadBlock(v3s16 blockpos) assert(block);
{
ScopeProfiler sp(g_profiler, "ServerMap: load block", SPT_AVG, PRECISION_MICRO);
bool created_new = (getBlockNoCreateNoEx(blockpos) == NULL);
v2s16 p2d(blockpos.X, blockpos.Z); if (created_new) {
ReflowScan scanner(this, m_emerge->ndef);
scanner.scan(block, &m_transforming_liquid);
std::string ret;
dbase->loadBlock(blockpos, &ret);
if (!ret.empty()) {
loadBlock(&ret, blockpos, createSector(p2d), false);
} else if (dbase_ro) {
dbase_ro->loadBlock(blockpos, &ret);
if (!ret.empty()) {
loadBlock(&ret, blockpos, createSector(p2d), false);
}
} else {
return NULL;
}
MapBlock *block = getBlockNoCreateNoEx(blockpos);
if (created_new && (block != NULL)) {
std::map<v3s16, MapBlock*> modified_blocks; std::map<v3s16, MapBlock*> modified_blocks;
// Fix lighting if necessary // Fix lighting if necessary
voxalgo::update_block_border_lighting(this, block, modified_blocks); voxalgo::update_block_border_lighting(this, block, modified_blocks);
if (!modified_blocks.empty()) { if (!modified_blocks.empty()) {
//Modified lighting, send event
MapEditEvent event; MapEditEvent event;
event.type = MEET_OTHER; event.type = MEET_OTHER;
event.setModifiedBlocks(modified_blocks); event.setModifiedBlocks(modified_blocks);
dispatchEvent(event); dispatchEvent(event);
} }
} }
if (save_after_load)
saveBlock(block);
// We just loaded it, so it's up-to-date.
block->resetModified();
return block; return block;
} }
MapBlock* ServerMap::loadBlock(v3s16 blockpos)
{
std::string data;
{
ScopeProfiler sp(g_profiler, "ServerMap: load block - sync (sum)");
MutexAutoLock dblock(m_db.mutex);
m_db.loadBlock(blockpos, data);
}
if (!data.empty())
return loadBlock(data, blockpos);
return getBlockNoCreateNoEx(blockpos);
}
bool ServerMap::deleteBlock(v3s16 blockpos) bool ServerMap::deleteBlock(v3s16 blockpos)
{ {
if (!dbase->deleteBlock(blockpos)) MutexAutoLock dblock(m_db.mutex);
if (!m_db.dbase->deleteBlock(blockpos))
return false; return false;
MapBlock *block = getBlockNoCreateNoEx(blockpos); MapBlock *block = getBlockNoCreateNoEx(blockpos);

@ -33,9 +33,22 @@ class IRollbackManager;
class EmergeManager; class EmergeManager;
class ServerEnvironment; class ServerEnvironment;
struct BlockMakeData; struct BlockMakeData;
class MetricsBackend; class MetricsBackend;
// TODO: this could wrap all calls to MapDatabase, including locking
struct MapDatabaseAccessor {
/// Lock, to be taken for any operation
std::mutex mutex;
/// Main database
MapDatabase *dbase = nullptr;
/// Fallback database for read operations
MapDatabase *dbase_ro = nullptr;
/// Load a block, taking dbase_ro into account.
/// @note call locked
void loadBlock(v3s16 blockpos, std::string &ret);
};
/* /*
ServerMap ServerMap
@ -75,7 +88,7 @@ public:
MapBlock *createBlock(v3s16 p); MapBlock *createBlock(v3s16 p);
/* /*
Forcefully get a block from somewhere. Forcefully get a block from somewhere (blocking!).
- Memory - Memory
- Load from disk - Load from disk
- Create blank filled with CONTENT_IGNORE - Create blank filled with CONTENT_IGNORE
@ -114,9 +127,16 @@ public:
bool saveBlock(MapBlock *block) override; bool saveBlock(MapBlock *block) override;
static bool saveBlock(MapBlock *block, MapDatabase *db, int compression_level = -1); static bool saveBlock(MapBlock *block, MapDatabase *db, int compression_level = -1);
MapBlock* loadBlock(v3s16 p);
// Database version // Load block in a synchronous fashion
void loadBlock(std::string *blob, v3s16 p3d, MapSector *sector, bool save_after_load=false); MapBlock *loadBlock(v3s16 p);
/// Load a block that was already read from disk. Used by EmergeManager.
/// @return non-null block (but can be blank)
MapBlock *loadBlock(const std::string &blob, v3s16 p, bool save_after_load=false);
// Helper for deserializing blocks from disk
// @throws SerializationError
static void deSerializeBlock(MapBlock *block, std::istream &is);
// Blocks are removed from the map but not deleted from memory until // Blocks are removed from the map but not deleted from memory until
// deleteDetachedBlocks() is called, since pointers to them may still exist // deleteDetachedBlocks() is called, since pointers to them may still exist
@ -185,8 +205,8 @@ private:
This is reset to false when written on disk. This is reset to false when written on disk.
*/ */
bool m_map_metadata_changed = true; bool m_map_metadata_changed = true;
MapDatabase *dbase = nullptr;
MapDatabase *dbase_ro = nullptr; MapDatabaseAccessor m_db;
// Map metrics // Map metrics
MetricGaugePtr m_loaded_blocks_gauge; MetricGaugePtr m_loaded_blocks_gauge;