Fix broken thread stop handling

This commit is contained in:
sapier 2013-11-30 01:51:54 +01:00
parent d02ce1cf4d
commit d19a69cd0d
5 changed files with 35 additions and 6 deletions

@ -47,6 +47,7 @@ public:
int Kill(); int Kill();
virtual void *Thread() = 0; virtual void *Thread() = 0;
bool IsRunning(); bool IsRunning();
bool StopRequested();
void *GetReturnValue(); void *GetReturnValue();
bool IsSameThread(); bool IsSameThread();
protected: protected:
@ -69,6 +70,7 @@ private:
#endif // WIN32 #endif // WIN32
void *retval; void *retval;
bool running; bool running;
bool requeststop;
JMutex runningmutex; JMutex runningmutex;
JMutex continuemutex,continuemutex2; JMutex continuemutex,continuemutex2;

@ -34,6 +34,7 @@ JThread::JThread()
{ {
retval = NULL; retval = NULL;
mutexinit = false; mutexinit = false;
requeststop = false;
running = false; running = false;
} }
@ -44,7 +45,7 @@ JThread::~JThread()
void JThread::Stop() { void JThread::Stop() {
runningmutex.Lock(); runningmutex.Lock();
running = false; requeststop = true;
runningmutex.Unlock(); runningmutex.Unlock();
} }
@ -78,6 +79,7 @@ int JThread::Start()
runningmutex.Unlock(); runningmutex.Unlock();
return ERR_JTHREAD_ALREADYRUNNING; return ERR_JTHREAD_ALREADYRUNNING;
} }
requeststop = false;
runningmutex.Unlock(); runningmutex.Unlock();
pthread_attr_t attr; pthread_attr_t attr;
@ -141,6 +143,15 @@ bool JThread::IsRunning()
return r; return r;
} }
bool JThread::StopRequested() {
bool r;
runningmutex.Lock();
r = requeststop;
runningmutex.Unlock();
return r;
}
void *JThread::GetReturnValue() void *JThread::GetReturnValue()
{ {
void *val; void *val;

@ -35,6 +35,7 @@ JThread::JThread()
{ {
retval = NULL; retval = NULL;
mutexinit = false; mutexinit = false;
requeststop = false;
running = false; running = false;
} }
@ -45,7 +46,7 @@ JThread::~JThread()
void JThread::Stop() { void JThread::Stop() {
runningmutex.Lock(); runningmutex.Lock();
running = false; requeststop = false;
runningmutex.Unlock(); runningmutex.Unlock();
} }
@ -76,6 +77,7 @@ int JThread::Start()
runningmutex.Unlock(); runningmutex.Unlock();
return ERR_JTHREAD_ALREADYRUNNING; return ERR_JTHREAD_ALREADYRUNNING;
} }
requeststop = false;e
runningmutex.Unlock(); runningmutex.Unlock();
continuemutex.Lock(); continuemutex.Lock();
@ -134,6 +136,15 @@ bool JThread::IsRunning()
return r; return r;
} }
bool JThread::StopRequested() {
bool r;
runningmutex.Lock();
r = requeststop;
runningmutex.Unlock();
return r;
}
void *JThread::GetReturnValue() void *JThread::GetReturnValue()
{ {
void *val; void *val;

@ -149,9 +149,11 @@ LuaJobInfo AsyncEngine::getJob() {
m_JobQueueMutex.Lock(); m_JobQueueMutex.Lock();
LuaJobInfo retval; LuaJobInfo retval;
retval.valid = false;
if (m_JobQueue.size() != 0) { if (m_JobQueue.size() != 0) {
retval = m_JobQueue.front(); retval = m_JobQueue.front();
retval.valid = true;
m_JobQueue.erase((m_JobQueue.begin())); m_JobQueue.erase((m_JobQueue.begin()));
} }
m_JobQueueMutex.Unlock(); m_JobQueueMutex.Unlock();
@ -322,11 +324,12 @@ void* AsyncWorkerThread::worker_thread_main() {
assert("no future with broken builtin async environment scripts" == 0); assert("no future with broken builtin async environment scripts" == 0);
} }
/** main loop **/ /** main loop **/
while(IsRunning()) { while(!StopRequested()) {
//wait for job //wait for job
LuaJobInfo toprocess = m_JobDispatcher->getJob(); LuaJobInfo toprocess = m_JobDispatcher->getJob();
if (!IsRunning()) { continue; } if (toprocess.valid == false) { continue; }
if (StopRequested()) { continue; }
//first push error handler //first push error handler
lua_pushcfunction(m_LuaStack, script_error_handler); lua_pushcfunction(m_LuaStack, script_error_handler);
@ -350,7 +353,7 @@ void* AsyncWorkerThread::worker_thread_main() {
toprocess.serializedParams.c_str(), toprocess.serializedParams.c_str(),
toprocess.serializedParams.length()); toprocess.serializedParams.length());
if (!IsRunning()) { continue; } if (StopRequested()) { continue; }
if(lua_pcall(m_LuaStack, 2, 2, errorhandler)) { if(lua_pcall(m_LuaStack, 2, 2, errorhandler)) {
scriptError("Async WORKER thread: %s\n", lua_tostring(m_LuaStack, -1)); scriptError("Async WORKER thread: %s\n", lua_tostring(m_LuaStack, -1));
toprocess.serializedResult="ERROR"; toprocess.serializedResult="ERROR";
@ -362,7 +365,7 @@ void* AsyncWorkerThread::worker_thread_main() {
toprocess.serializedResult = std::string(retval,lenght); toprocess.serializedResult = std::string(retval,lenght);
} }
if (!IsRunning()) { continue; } if (StopRequested()) { continue; }
//put job result //put job result
m_JobDispatcher->putJobResult(toprocess); m_JobDispatcher->putJobResult(toprocess);
} }

@ -57,6 +57,8 @@ struct LuaJobInfo {
std::string serializedResult; std::string serializedResult;
/** jobid used to identify a job and match it to callback **/ /** jobid used to identify a job and match it to callback **/
unsigned int JobId; unsigned int JobId;
/** valid marker **/
bool valid;
}; };
/** class encapsulating a asynchronous working environment **/ /** class encapsulating a asynchronous working environment **/