diff --git a/src/emerge.cpp b/src/emerge.cpp index 425e294b8..f66b78909 100644 --- a/src/emerge.cpp +++ b/src/emerge.cpp @@ -31,6 +31,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "filesys.h" #include "log.h" #include "servermap.h" +#include "database/database.h" #include "mapblock.h" #include "mapgen/mg_biome.h" #include "mapgen/mg_ore.h" @@ -185,10 +186,22 @@ SchematicManager *EmergeManager::getWritableSchematicManager() return schemmgr; } +void EmergeManager::initMap(MapDatabaseAccessor *holder) +{ + FATAL_ERROR_IF(m_db, "Map database already initialized."); + assert(holder->dbase); + m_db = holder; +} + +void EmergeManager::resetMap() +{ + FATAL_ERROR_IF(m_threads_active, "Threads are still active."); + m_db = nullptr; +} void EmergeManager::initMapgens(MapgenParams *params) { - FATAL_ERROR_IF(!m_mapgens.empty(), "Mapgen already initialised."); + FATAL_ERROR_IF(!m_mapgens.empty(), "Mapgen already initialized."); mgparams = params; @@ -466,7 +479,7 @@ void EmergeThread::signal() } -bool EmergeThread::pushBlock(const v3s16 &pos) +bool EmergeThread::pushBlock(v3s16 pos) { m_block_queue.push(pos); return true; @@ -491,7 +504,7 @@ void EmergeThread::cancelPendingItems() } -void EmergeThread::runCompletionCallbacks(const v3s16 &pos, EmergeAction action, +void EmergeThread::runCompletionCallbacks(v3s16 pos, EmergeAction action, const EmergeCallbackList &callbacks) { m_emerge->reportCompletedEmerge(action); @@ -524,21 +537,36 @@ bool EmergeThread::popBlockEmerge(v3s16 *pos, BlockEmergeData *bedata) } -EmergeAction EmergeThread::getBlockOrStartGen( - const v3s16 &pos, bool allow_gen, MapBlock **block, BlockMakeData *bmdata) +EmergeAction EmergeThread::getBlockOrStartGen(const v3s16 pos, bool allow_gen, + const std::string *from_db, MapBlock **block, BlockMakeData *bmdata) { MutexAutoLock envlock(m_server->m_env_mutex); + auto block_ok = [] (MapBlock *b) { + return b && b->isGenerated(); + }; + // 1). Attempt to fetch block from memory *block = m_map->getBlockNoCreateNoEx(pos); if (*block) { - if ((*block)->isGenerated()) + if (block_ok(*block)) { + // if we just read it from the db but the block exists that means + // someone else was faster. don't touch it to prevent data loss. + if (from_db) + verbosestream << "getBlockOrStartGen: block loading raced" << std::endl; return EMERGE_FROM_MEMORY; + } } else { - // 2). Attempt to load block from disk if it was not in the memory - *block = m_map->loadBlock(pos); - if (*block && (*block)->isGenerated()) + if (!from_db) { + // 2). We should attempt loading it return EMERGE_FROM_DISK; + } + // 2). Second invocation, we have the data + if (!from_db->empty()) { + *block = m_map->loadBlock(*from_db, pos); + if (block_ok(*block)) + return EMERGE_FROM_DISK; + } } // 3). Attempt to start generation @@ -643,7 +671,8 @@ void *EmergeThread::run() BEGIN_DEBUG_EXCEPTION_HANDLER v3s16 pos; - std::map modified_blocks; + std::map modified_blocks; + std::string databuf; m_map = &m_server->m_env->getServerMap(); m_emerge = m_server->getEmergeManager(); @@ -669,13 +698,30 @@ void *EmergeThread::run() continue; } + g_profiler->add(m_name + ": processed [#]", 1); + if (blockpos_over_max_limit(pos)) continue; bool allow_gen = bedata.flags & BLOCK_EMERGE_ALLOW_GEN; EMERGE_DBG_OUT("pos=" << pos << " allow_gen=" << allow_gen); - action = getBlockOrStartGen(pos, allow_gen, &block, &bmdata); + action = getBlockOrStartGen(pos, allow_gen, nullptr, &block, &bmdata); + + /* Try to load it */ + if (action == EMERGE_FROM_DISK) { + auto &m_db = *m_emerge->m_db; + { + ScopeProfiler sp(g_profiler, "EmergeThread: load block - async (sum)"); + MutexAutoLock dblock(m_db.mutex); + m_db.loadBlock(pos, databuf); + } + // actually load it, then decide again + action = getBlockOrStartGen(pos, allow_gen, &databuf, &block, &bmdata); + databuf.clear(); + } + + /* Generate it */ if (action == EMERGE_GENERATED) { bool error = false; m_trans_liquid = &bmdata.transforming_liquid; diff --git a/src/emerge.h b/src/emerge.h index d7f018feb..4e0f738d8 100644 --- a/src/emerge.h +++ b/src/emerge.h @@ -46,6 +46,7 @@ class DecorationManager; class SchematicManager; class Server; class ModApiMapgen; +struct MapDatabaseAccessor; // Structure containing inputs/outputs for chunk generation struct BlockMakeData { @@ -173,6 +174,10 @@ public: SchematicManager *getWritableSchematicManager(); void initMapgens(MapgenParams *mgparams); + /// @param holder non-owned reference that must stay alive + void initMap(MapDatabaseAccessor *holder); + /// resets the reference + void resetMap(); void startThreads(); void stopThreads(); @@ -206,6 +211,9 @@ private: std::vector m_threads; bool m_threads_active = false; + // The map database + MapDatabaseAccessor *m_db = nullptr; + std::mutex m_queue_mutex; std::map m_blocks_enqueued; std::unordered_map m_peer_queue_count; diff --git a/src/emerge_internal.h b/src/emerge_internal.h index 439c8227b..08e36778d 100644 --- a/src/emerge_internal.h +++ b/src/emerge_internal.h @@ -40,7 +40,7 @@ class EmergeScripting; class EmergeThread : public Thread { public: bool enable_mapgen_debug_info; - int id; + const int id; // Index of this thread EmergeThread(Server *server, int ethreadid); ~EmergeThread() = default; @@ -49,7 +49,7 @@ public: void signal(); // Requires queue mutex held - bool pushBlock(const v3s16 &pos); + bool pushBlock(v3s16 pos); void cancelPendingItems(); @@ -59,7 +59,7 @@ public: protected: void runCompletionCallbacks( - const v3s16 &pos, EmergeAction action, + v3s16 pos, EmergeAction action, const EmergeCallbackList &callbacks); private: @@ -79,8 +79,20 @@ private: bool popBlockEmerge(v3s16 *pos, BlockEmergeData *bedata); - EmergeAction getBlockOrStartGen( - const v3s16 &pos, bool allow_gen, MapBlock **block, BlockMakeData *data); + /** + * Try to get a block from memory and decide what to do. + * + * @param pos block position + * @param from_db serialized block data, optional + * (for second call after EMERGE_FROM_DISK was returned) + * @param allow_gen allow invoking mapgen? + * @param block output pointer for block + * @param data info for mapgen + * @return what to do for this block + */ + EmergeAction getBlockOrStartGen(v3s16 pos, bool allow_gen, + const std::string *from_db, MapBlock **block, BlockMakeData *data); + MapBlock *finishGen(v3s16 pos, BlockMakeData *bmdata, std::map *modified_blocks); diff --git a/src/main.cpp b/src/main.cpp index 30db81aa9..9f737b86d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1278,8 +1278,7 @@ static bool recompress_map_database(const GameParams &game_params, const Setting { MapBlock mb(v3s16(0,0,0), &server); - u8 ver = readU8(iss); - mb.deSerialize(iss, ver, true); + ServerMap::deSerializeBlock(&mb, iss); oss.str(""); oss.clear(); diff --git a/src/servermap.cpp b/src/servermap.cpp index 0248497c1..f57e5b5e4 100644 --- a/src/servermap.cpp +++ b/src/servermap.cpp @@ -51,6 +51,18 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "database/database-postgresql.h" #endif +/* + Helpers +*/ + +void MapDatabaseAccessor::loadBlock(v3s16 blockpos, std::string &ret) +{ + ret.clear(); + dbase->loadBlock(blockpos, &ret); + if (ret.empty() && dbase_ro) + dbase_ro->loadBlock(blockpos, &ret); +} + /* ServerMap */ @@ -67,7 +79,7 @@ ServerMap::ServerMap(const std::string &savedir, IGameDef *gamedef, emerge->map_settings_mgr = &settings_mgr; /* - Try to load map; if not found, create a new one. + Try to open map; if not found, create a new one. */ // Determine which database backend to use @@ -79,10 +91,10 @@ ServerMap::ServerMap(const std::string &savedir, IGameDef *gamedef, conf.set("backend", "sqlite3"); } std::string backend = conf.get("backend"); - dbase = createDatabase(backend, savedir, conf); + m_db.dbase = createDatabase(backend, savedir, conf); if (conf.exists("readonly_backend")) { std::string readonly_dir = savedir + DIR_DELIM + "readonly"; - dbase_ro = createDatabase(conf.get("readonly_backend"), readonly_dir, conf); + m_db.dbase_ro = createDatabase(conf.get("readonly_backend"), readonly_dir, conf); } if (!conf.updateConfigFile(conf_path.c_str())) errorstream << "ServerMap::ServerMap(): Failed to update world.mt!" << std::endl; @@ -90,6 +102,9 @@ ServerMap::ServerMap(const std::string &savedir, IGameDef *gamedef, m_savedir = savedir; m_map_saving_enabled = false; + // Inform EmergeManager of db handles + m_emerge->initMap(&m_db); + m_save_time_counter = mb->addCounter( "minetest_map_save_time", "Time spent saving blocks (in microseconds)"); m_save_count_counter = mb->addCounter( @@ -159,11 +174,15 @@ ServerMap::~ServerMap() << ", exception: " << e.what() << std::endl; } - /* - Close database if it was opened - */ - delete dbase; - delete dbase_ro; + m_emerge->resetMap(); + + { + MutexAutoLock dblock(m_db.mutex); + delete m_db.dbase; + m_db.dbase = nullptr; + delete m_db.dbase_ro; + m_db.dbase_ro = nullptr; + } deleteDetachedBlocks(); } @@ -547,9 +566,10 @@ void ServerMap::save(ModifiedState save_level) void ServerMap::listAllLoadableBlocks(std::vector &dst) { - dbase->listAllLoadableBlocks(dst); - if (dbase_ro) - dbase_ro->listAllLoadableBlocks(dst); + MutexAutoLock dblock(m_db.mutex); + m_db.dbase->listAllLoadableBlocks(dst); + if (m_db.dbase_ro) + m_db.dbase_ro->listAllLoadableBlocks(dst); } void ServerMap::listAllLoadedBlocks(std::vector &dst) @@ -597,17 +617,21 @@ MapDatabase *ServerMap::createDatabase( void ServerMap::beginSave() { - dbase->beginSave(); + MutexAutoLock dblock(m_db.mutex); + m_db.dbase->beginSave(); } void ServerMap::endSave() { - dbase->endSave(); + MutexAutoLock dblock(m_db.mutex); + m_db.dbase->endSave(); } bool ServerMap::saveBlock(MapBlock *block) { - return saveBlock(block, dbase, m_map_compression_level); + // FIXME: serialization happens under mutex + MutexAutoLock dblock(m_db.mutex); + return saveBlock(block, m_db.dbase, m_map_compression_level); } bool ServerMap::saveBlock(MapBlock *block, MapDatabase *db, int compression_level) @@ -634,18 +658,27 @@ bool ServerMap::saveBlock(MapBlock *block, MapDatabase *db, int compression_leve return ret; } -void ServerMap::loadBlock(std::string *blob, v3s16 p3d, MapSector *sector, bool save_after_load) +void ServerMap::deSerializeBlock(MapBlock *block, std::istream &is) { + ScopeProfiler sp(g_profiler, "ServerMap: deSer block", SPT_AVG, PRECISION_MICRO); + + u8 version = readU8(is); + if (is.fail()) + throw SerializationError("Failed to read MapBlock version"); + + block->deSerialize(is, version, true); +} + +MapBlock *ServerMap::loadBlock(const std::string &blob, v3s16 p3d, bool save_after_load) +{ + ScopeProfiler sp(g_profiler, "ServerMap: load block", SPT_AVG, PRECISION_MICRO); + MapBlock *block = nullptr; + bool created_new = false; + try { - std::istringstream is(*blob, std::ios_base::binary); + v2s16 p2d(p3d.X, p3d.Z); + MapSector *sector = createSector(p2d); - u8 version = readU8(is); - - if(is.fail()) - throw SerializationError("ServerMap::loadBlock(): Failed" - " to read MapBlock version"); - - MapBlock *block = nullptr; std::unique_ptr block_created_new; block = sector->getBlockNoCreateNoEx(p3d.Y); if (!block) { @@ -654,31 +687,16 @@ void ServerMap::loadBlock(std::string *blob, v3s16 p3d, MapSector *sector, bool } { - ScopeProfiler sp(g_profiler, "ServerMap: deSer block", SPT_AVG, PRECISION_MICRO); - block->deSerialize(is, version, true); + std::istringstream iss(blob, std::ios_base::binary); + deSerializeBlock(block, iss); } // If it's a new block, insert it to the map if (block_created_new) { sector->insertBlock(std::move(block_created_new)); - ReflowScan scanner(this, m_emerge->ndef); - scanner.scan(block, &m_transforming_liquid); + created_new = true; } - - /* - Save blocks loaded in old format in new format - */ - - //if(version < SER_FMT_VER_HIGHEST_READ || save_after_load) - // Only save if asked to; no need to update version - if(save_after_load) - saveBlock(block); - - // We just loaded it from, so it's up-to-date. - block->resetModified(); - } - catch(SerializationError &e) - { + } catch (SerializationError &e) { errorstream<<"Invalid block data in database" <<" ("<ndef); + scanner.scan(block, &m_transforming_liquid); - std::string ret; - dbase->loadBlock(blockpos, &ret); - if (!ret.empty()) { - loadBlock(&ret, blockpos, createSector(p2d), false); - } else if (dbase_ro) { - dbase_ro->loadBlock(blockpos, &ret); - if (!ret.empty()) { - loadBlock(&ret, blockpos, createSector(p2d), false); - } - } else { - return NULL; - } - - MapBlock *block = getBlockNoCreateNoEx(blockpos); - if (created_new && (block != NULL)) { std::map modified_blocks; // Fix lighting if necessary voxalgo::update_block_border_lighting(this, block, modified_blocks); if (!modified_blocks.empty()) { - //Modified lighting, send event MapEditEvent event; event.type = MEET_OTHER; event.setModifiedBlocks(modified_blocks); dispatchEvent(event); } } + + if (save_after_load) + saveBlock(block); + + // We just loaded it, so it's up-to-date. + block->resetModified(); + return block; } +MapBlock* ServerMap::loadBlock(v3s16 blockpos) +{ + std::string data; + { + ScopeProfiler sp(g_profiler, "ServerMap: load block - sync (sum)"); + MutexAutoLock dblock(m_db.mutex); + m_db.loadBlock(blockpos, data); + } + + if (!data.empty()) + return loadBlock(data, blockpos); + return getBlockNoCreateNoEx(blockpos); +} + bool ServerMap::deleteBlock(v3s16 blockpos) { - if (!dbase->deleteBlock(blockpos)) + MutexAutoLock dblock(m_db.mutex); + if (!m_db.dbase->deleteBlock(blockpos)) return false; MapBlock *block = getBlockNoCreateNoEx(blockpos); diff --git a/src/servermap.h b/src/servermap.h index 7a8a84b9b..3a2102668 100644 --- a/src/servermap.h +++ b/src/servermap.h @@ -33,9 +33,22 @@ class IRollbackManager; class EmergeManager; class ServerEnvironment; struct BlockMakeData; - class MetricsBackend; +// TODO: this could wrap all calls to MapDatabase, including locking +struct MapDatabaseAccessor { + /// Lock, to be taken for any operation + std::mutex mutex; + /// Main database + MapDatabase *dbase = nullptr; + /// Fallback database for read operations + MapDatabase *dbase_ro = nullptr; + + /// Load a block, taking dbase_ro into account. + /// @note call locked + void loadBlock(v3s16 blockpos, std::string &ret); +}; + /* ServerMap @@ -75,7 +88,7 @@ public: MapBlock *createBlock(v3s16 p); /* - Forcefully get a block from somewhere. + Forcefully get a block from somewhere (blocking!). - Memory - Load from disk - Create blank filled with CONTENT_IGNORE @@ -114,9 +127,16 @@ public: bool saveBlock(MapBlock *block) override; static bool saveBlock(MapBlock *block, MapDatabase *db, int compression_level = -1); - MapBlock* loadBlock(v3s16 p); - // Database version - void loadBlock(std::string *blob, v3s16 p3d, MapSector *sector, bool save_after_load=false); + + // Load block in a synchronous fashion + MapBlock *loadBlock(v3s16 p); + /// Load a block that was already read from disk. Used by EmergeManager. + /// @return non-null block (but can be blank) + MapBlock *loadBlock(const std::string &blob, v3s16 p, bool save_after_load=false); + + // Helper for deserializing blocks from disk + // @throws SerializationError + static void deSerializeBlock(MapBlock *block, std::istream &is); // Blocks are removed from the map but not deleted from memory until // deleteDetachedBlocks() is called, since pointers to them may still exist @@ -185,8 +205,8 @@ private: This is reset to false when written on disk. */ bool m_map_metadata_changed = true; - MapDatabase *dbase = nullptr; - MapDatabase *dbase_ro = nullptr; + + MapDatabaseAccessor m_db; // Map metrics MetricGaugePtr m_loaded_blocks_gauge;