Make MutexQueue use jsemaphore for signaling

This commit is contained in:
sapier 2014-01-06 12:45:42 +01:00
parent 10fdbf7375
commit 8b0b857eaa
13 changed files with 248 additions and 99 deletions

@ -286,6 +286,20 @@ Client::Client(
} }
} }
void Client::Stop()
{
//request all client managed threads to stop
m_mesh_update_thread.Stop();
}
bool Client::isShutdown()
{
if (!m_mesh_update_thread.IsRunning()) return true;
return false;
}
Client::~Client() Client::~Client()
{ {
{ {
@ -296,7 +310,7 @@ Client::~Client()
m_mesh_update_thread.Stop(); m_mesh_update_thread.Stop();
m_mesh_update_thread.Wait(); m_mesh_update_thread.Wait();
while(!m_mesh_update_thread.m_queue_out.empty()) { while(!m_mesh_update_thread.m_queue_out.empty()) {
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front(); MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
delete r.mesh; delete r.mesh;
} }
@ -692,7 +706,7 @@ void Client::step(float dtime)
while(!m_mesh_update_thread.m_queue_out.empty()) while(!m_mesh_update_thread.m_queue_out.empty())
{ {
num_processed_meshes++; num_processed_meshes++;
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front(); MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
MapBlock *block = m_env.getMap().getBlockNoCreateNoEx(r.p); MapBlock *block = m_env.getMap().getBlockNoCreateNoEx(r.p);
if(block) if(block)
{ {

@ -289,6 +289,14 @@ public:
); );
~Client(); ~Client();
/*
request all threads managed by client to be stopped
*/
void Stop();
bool isShutdown();
/* /*
The name of the local player should already be set when The name of the local player should already be set when
calling this, as it is sent in the initialization. calling this, as it is sent in the initialization.

@ -592,8 +592,9 @@ void * Connection::Thread()
runTimeouts(dtime); runTimeouts(dtime);
//NOTE this is only thread safe for ONE consumer thread!
while(!m_command_queue.empty()){ while(!m_command_queue.empty()){
ConnectionCommand c = m_command_queue.pop_front(); ConnectionCommand c = m_command_queue.pop_frontNoEx();
processCommand(c); processCommand(c);
} }
@ -1556,7 +1557,7 @@ ConnectionEvent Connection::getEvent()
e.type = CONNEVENT_NONE; e.type = CONNEVENT_NONE;
return e; return e;
} }
return m_event_queue.pop_front(); return m_event_queue.pop_frontNoEx();
} }
ConnectionEvent Connection::waitEvent(u32 timeout_ms) ConnectionEvent Connection::waitEvent(u32 timeout_ms)

@ -3433,6 +3433,16 @@ void the_game(
chat_backend.addMessage(L"", L"# Disconnected."); chat_backend.addMessage(L"", L"# Disconnected.");
chat_backend.addMessage(L"", L""); chat_backend.addMessage(L"", L"");
client.Stop();
//force answer all texture and shader jobs (TODO return empty values)
while(!client.isShutdown()) {
tsrc->processQueue();
shsrc->processQueue();
sleep_ms(100);
}
// Client scope (client is destructed before destructing *def and tsrc) // Client scope (client is destructed before destructing *def and tsrc)
}while(0); }while(0);
} // try-catch } // try-catch

@ -594,7 +594,7 @@ protected:
*/ */
while (!m_requests.empty()) { while (!m_requests.empty()) {
Request req = m_requests.pop_front(); Request req = m_requests.pop_frontNoEx();
processRequest(req); processRequest(req);
} }
processQueued(&pool); processQueued(&pool);

@ -642,6 +642,7 @@ public:
void processQueue(IGameDef *gamedef) void processQueue(IGameDef *gamedef)
{ {
#ifndef SERVER #ifndef SERVER
//NOTE this is only thread safe for ONE consumer thread!
while(!m_get_clientcached_queue.empty()) while(!m_get_clientcached_queue.empty())
{ {
GetRequest<std::string, ClientCached*, u8, u8> GetRequest<std::string, ClientCached*, u8, u8>

@ -36,6 +36,7 @@ public:
void Post(); void Post();
void Wait(); void Wait();
bool Wait(unsigned int time_ms);
int GetValue(); int GetValue();

@ -17,8 +17,12 @@ with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/ */
#include <assert.h> #include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include "jthread/jsemaphore.h" #include "jthread/jsemaphore.h"
#define UNUSED(expr) do { (void)(expr); } while (0) #define UNUSED(expr) do { (void)(expr); } while (0)
JSemaphore::JSemaphore() { JSemaphore::JSemaphore() {
int sem_init_retval = sem_init(&m_semaphore,0,0); int sem_init_retval = sem_init(&m_semaphore,0,0);
assert(sem_init_retval == 0); assert(sem_init_retval == 0);
@ -49,6 +53,33 @@ void JSemaphore::Wait() {
UNUSED(sem_wait_retval); UNUSED(sem_wait_retval);
} }
bool JSemaphore::Wait(unsigned int time_ms) {
struct timespec waittime;
struct timeval now;
if (gettimeofday(&now, NULL) == -1) {
assert("Unable to get time by clock_gettime!" == 0);
return false;
}
waittime.tv_nsec = ((time_ms % 1000) * 1000 * 1000) + (now.tv_usec * 1000);
waittime.tv_sec = (time_ms / 1000) + (waittime.tv_nsec / (1000*1000*1000)) + now.tv_sec;
waittime.tv_nsec %= 1000*1000*1000;
errno = 0;
int sem_wait_retval = sem_timedwait(&m_semaphore,&waittime);
if (sem_wait_retval == 0)
{
return true;
}
else {
assert((errno == ETIMEDOUT) || (errno == EINTR));
return false;
}
return sem_wait_retval == 0 ? true : false;
}
int JSemaphore::GetValue() { int JSemaphore::GetValue() {
int retval = 0; int retval = 0;

@ -51,6 +51,21 @@ void JSemaphore::Wait() {
INFINITE); INFINITE);
} }
bool JSemaphore::Wait(unsigned int time_ms) {
unsigned int retval = WaitForSingleObject(
m_hSemaphore,
time_ms);
if (retval == WAIT_OBJECT_0)
{
return true;
}
else {
assert(retval == WAIT_TIMEOUT);
return false;
}
}
int JSemaphore::GetValue() { int JSemaphore::GetValue() {
long int retval = 0; long int retval = 0;

@ -427,21 +427,18 @@ u32 ShaderSource::getShaderId(const std::string &name)
/* infostream<<"Waiting for shader from main thread, name=\"" /* infostream<<"Waiting for shader from main thread, name=\""
<<name<<"\""<<std::endl;*/ <<name<<"\""<<std::endl;*/
try{ while(true) {
while(true) { GetResult<std::string, u32, u8, u8>
// Wait result for a second result = result_queue.pop_frontNoEx();
GetResult<std::string, u32, u8, u8>
result = result_queue.pop_front(1000);
if (result.key == name) { if (result.key == name) {
return result.item; return result.item;
} }
else {
errorstream << "Got shader with invalid name: " << result.key << std::endl;
} }
} }
catch(ItemNotFoundException &e){
errorstream<<"Waiting for shader " << name << " timed out."<<std::endl;
return 0;
}
} }
infostream<<"getShaderId(): Failed"<<std::endl; infostream<<"getShaderId(): Failed"<<std::endl;
@ -537,6 +534,7 @@ void ShaderSource::processQueue()
/* /*
Fetch shaders Fetch shaders
*/ */
//NOTE this is only thread safe for ONE consumer thread!
if(!m_get_shader_queue.empty()){ if(!m_get_shader_queue.empty()){
GetRequest<std::string, u32, u8, u8> GetRequest<std::string, u32, u8, u8>
request = m_get_shader_queue.pop(); request = m_get_shader_queue.pop();

@ -775,6 +775,7 @@ void TextureSource::processQueue()
/* /*
Fetch textures Fetch textures
*/ */
//NOTE this is only thread safe for ONE consumer thread!
if(!m_get_texture_queue.empty()) if(!m_get_texture_queue.empty())
{ {
GetRequest<std::string, u32, u8, u8> GetRequest<std::string, u32, u8, u8>

@ -24,7 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "../exceptions.h" #include "../exceptions.h"
#include "../jthread/jmutex.h" #include "../jthread/jmutex.h"
#include "../jthread/jmutexautolock.h" #include "../jthread/jmutexautolock.h"
#include "../porting.h" // For sleep_ms #include "../jthread/jsemaphore.h"
#include <list> #include <list>
#include <vector> #include <vector>
#include <map> #include <map>
@ -201,6 +201,12 @@ public:
++m_list_size; ++m_list_size;
} }
void push_front(T t)
{
m_list.push_front(t);
++m_list_size;
}
T pop_front() T pop_front()
{ {
if(m_list.empty()) if(m_list.empty())
@ -247,86 +253,141 @@ template<typename T>
class MutexedQueue class MutexedQueue
{ {
public: public:
template<typename Key, typename U, typename Caller, typename CallerData>
friend class RequestQueue;
MutexedQueue() MutexedQueue()
{ {
} }
bool empty() bool empty()
{ {
JMutexAutoLock lock(m_mutex); JMutexAutoLock lock(m_mutex);
return m_list.empty(); return (m_size.GetValue() == 0);
} }
void push_back(T t) void push_back(T t)
{ {
JMutexAutoLock lock(m_mutex); JMutexAutoLock lock(m_mutex);
m_list.push_back(t); m_list.push_back(t);
m_size.Post();
} }
T pop_front(u32 wait_time_max_ms=0)
/* this version of pop_front returns a empty element of T on timeout.
* Make sure default constructor of T creates a recognizable "empty" element
*/
T pop_frontNoEx(u32 wait_time_max_ms)
{ {
u32 wait_time_ms = 0; if (m_size.Wait(wait_time_max_ms))
for(;;)
{ {
{ JMutexAutoLock lock(m_mutex);
JMutexAutoLock lock(m_mutex);
if(!m_list.empty()) typename std::list<T>::iterator begin = m_list.begin();
{ T t = *begin;
typename std::list<T>::iterator begin = m_list.begin(); m_list.erase(begin);
T t = *begin; return t;
m_list.erase(begin); }
return t; else
} {
return T();
if(wait_time_ms >= wait_time_max_ms)
throw ItemNotFoundException("MutexedQueue: queue is empty");
}
// Wait a while before trying again
sleep_ms(10);
wait_time_ms += 10;
} }
} }
T pop_front(u32 wait_time_max_ms)
{
if (m_size.Wait(wait_time_max_ms))
{
JMutexAutoLock lock(m_mutex);
typename std::list<T>::iterator begin = m_list.begin();
T t = *begin;
m_list.erase(begin);
return t;
}
else
{
throw ItemNotFoundException("MutexedQueue: queue is empty");
}
}
T pop_frontNoEx()
{
m_size.Wait();
JMutexAutoLock lock(m_mutex);
typename std::list<T>::iterator begin = m_list.begin();
T t = *begin;
m_list.erase(begin);
return t;
}
T pop_back(u32 wait_time_max_ms=0) T pop_back(u32 wait_time_max_ms=0)
{ {
u32 wait_time_ms = 0; if (m_size.Wait(wait_time_max_ms))
for(;;)
{ {
{ JMutexAutoLock lock(m_mutex);
JMutexAutoLock lock(m_mutex);
if(!m_list.empty()) typename std::list<T>::iterator last = m_list.end();
{ last--;
typename std::list<T>::iterator last = m_list.end(); T t = *last;
last--; m_list.erase(last);
T t = *last; return t;
m_list.erase(last); }
return t; else
} {
throw ItemNotFoundException("MutexedQueue: queue is empty");
if(wait_time_ms >= wait_time_max_ms)
throw ItemNotFoundException("MutexedQueue: queue is empty");
}
// Wait a while before trying again
sleep_ms(10);
wait_time_ms += 10;
} }
} }
/* this version of pop_back returns a empty element of T on timeout.
* Make sure default constructor of T creates a recognizable "empty" element
*/
T pop_backNoEx(u32 wait_time_max_ms=0)
{
if (m_size.Wait(wait_time_max_ms))
{
JMutexAutoLock lock(m_mutex);
typename std::list<T>::iterator last = m_list.end();
last--;
T t = *last;
m_list.erase(last);
return t;
}
else
{
return T();
}
}
T pop_backNoEx()
{
m_size.Wait();
JMutexAutoLock lock(m_mutex);
typename std::list<T>::iterator last = m_list.end();
last--;
T t = *last;
m_list.erase(last);
return t;
}
protected:
JMutex & getMutex() JMutex & getMutex()
{ {
return m_mutex; return m_mutex;
} }
// NEVER EVER modify the >>list<< you got by using this function!
// You may only modify it's content
std::list<T> & getList() std::list<T> & getList()
{ {
return m_list; return m_list;
} }
protected:
JMutex m_mutex; JMutex m_mutex;
std::list<T> m_list; std::list<T> m_list;
JSemaphore m_size;
}; };
#endif #endif

@ -24,6 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "../jthread/jthread.h" #include "../jthread/jthread.h"
#include "../jthread/jmutex.h" #include "../jthread/jmutex.h"
#include "../jthread/jmutexautolock.h" #include "../jthread/jmutexautolock.h"
#include "porting.h"
template<typename T> template<typename T>
class MutexedVariable class MutexedVariable
@ -123,36 +124,38 @@ public:
void add(Key key, Caller caller, CallerData callerdata, void add(Key key, Caller caller, CallerData callerdata,
ResultQueue<Key, T, Caller, CallerData> *dest) ResultQueue<Key, T, Caller, CallerData> *dest)
{ {
JMutexAutoLock lock(m_queue.getMutex());
/*
If the caller is already on the list, only update CallerData
*/
for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
i = m_queue.getList().begin();
i != m_queue.getList().end(); ++i)
{ {
GetRequest<Key, T, Caller, CallerData> &request = *i; JMutexAutoLock lock(m_queue.getMutex());
if(request.key == key) /*
If the caller is already on the list, only update CallerData
*/
for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
i = m_queue.getList().begin();
i != m_queue.getList().end(); ++i)
{ {
for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator GetRequest<Key, T, Caller, CallerData> &request = *i;
i = request.callers.begin();
i != request.callers.end(); ++i) if(request.key == key)
{ {
CallerInfo<Caller, CallerData, Key, T> &ca = *i; for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
if(ca.caller == caller) i = request.callers.begin();
i != request.callers.end(); ++i)
{ {
ca.data = callerdata; CallerInfo<Caller, CallerData, Key, T> &ca = *i;
return; if(ca.caller == caller)
{
ca.data = callerdata;
return;
}
} }
CallerInfo<Caller, CallerData, Key, T> ca;
ca.caller = caller;
ca.data = callerdata;
ca.dest = dest;
request.callers.push_back(ca);
return;
} }
CallerInfo<Caller, CallerData, Key, T> ca;
ca.caller = caller;
ca.data = callerdata;
ca.dest = dest;
request.callers.push_back(ca);
return;
} }
} }
@ -168,12 +171,17 @@ public:
ca.dest = dest; ca.dest = dest;
request.callers.push_back(ca); request.callers.push_back(ca);
m_queue.getList().push_back(request); m_queue.push_back(request);
} }
GetRequest<Key, T, Caller, CallerData> pop(bool wait_if_empty=false) GetRequest<Key, T, Caller, CallerData> pop(unsigned int timeout_ms)
{ {
return m_queue.pop_front(wait_if_empty); return m_queue.pop_front(timeout_ms);
}
GetRequest<Key, T, Caller, CallerData> pop()
{
return m_queue.pop_frontNoEx();
} }
void pushResult(GetRequest<Key, T, Caller, CallerData> req, void pushResult(GetRequest<Key, T, Caller, CallerData> req,