|
IBR-DTNSuite 0.6
|
00001 /* 00002 * Thread.cpp 00003 * 00004 * Created on: 30.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrcommon/config.h" 00009 #include "ibrcommon/thread/Thread.h" 00010 #include "ibrcommon/thread/MutexLock.h" 00011 #include "ibrcommon/Logger.h" 00012 #include <stdexcept> 00013 #include <pthread.h> 00014 #include <sys/times.h> 00015 #include <stdio.h> 00016 #include <unistd.h> 00017 #include <signal.h> 00018 00019 #ifdef __DEVELOPMENT_ASSERTIONS__ 00020 #include <cassert> 00021 #endif 00022 00023 namespace ibrcommon 00024 { 00025 void Thread::finalize_thread(void *obj) 00026 { 00027 #ifdef __DEVELOPMENT_ASSERTIONS__ 00028 // the given object should never be null 00029 assert(obj != NULL); 00030 #endif 00031 // cast the thread object 00032 Thread *th = static_cast<Thread *>(obj); 00033 00034 // set the state to finalizing, blocking all threads until this is done 00035 th->_state = THREAD_FINALIZING; 00036 00037 // call the finally method 00038 th->finally(); 00039 00040 // set the state to DONE 00041 th->_state = THREAD_FINALIZED; 00042 00043 // delete the thread-object is requested 00044 if (th->__delete_on_exit) 00045 { 00046 delete th; 00047 } 00048 } 00049 00050 void* Thread::exec_thread(void *obj) 00051 { 00052 #ifdef __DEVELOPMENT_ASSERTIONS__ 00053 // the given object should never be null 00054 assert(obj != NULL); 00055 #endif 00056 // cast the thread object 00057 Thread *th = static_cast<Thread *>(obj); 00058 00059 // add cleanup-handler to finalize threads 00060 pthread_cleanup_push(Thread::finalize_thread, (void *)th); 00061 00062 // set the state to RUNNING 00063 try { 00064 // enter the setup stage, if this thread is still in preparation state 00065 { 00066 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = th->_state.lock(); 00067 if (ls != THREAD_PREPARE) 00068 { 00069 throw ibrcommon::Exception("Thread has been canceled."); 00070 } 00071 00072 ls = THREAD_SETUP; 00073 } 00074 00075 // call the setup method 00076 th->setup(); 00077 } catch (const std::exception &ex) { 00078 // set the state to error 00079 th->_state = THREAD_ERROR; 00080 00081 IBRCOMMON_LOGGER_DEBUG(40) << "Thread setup aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00082 00083 // exit this thread 00084 Thread::exit(); 00085 } 00086 00087 try { 00088 // set the state to running 00089 th->_state = THREAD_RUNNING; 00090 00091 // run threads run routine 00092 th->run(); 00093 } catch (const std::exception &ex) { 00094 // set the state to error 00095 th->_state = THREAD_ERROR; 00096 00097 // an exception occured in run() or ready(), but this is allowed 00098 IBRCOMMON_LOGGER_DEBUG(40) << "Thread execution aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00099 Thread::exit(); 00100 } 00101 00102 // remove cleanup-handler and call it 00103 pthread_cleanup_pop(1); 00104 00105 // exit the thread 00106 Thread::exit(); 00107 00108 return NULL; 00109 } 00110 00111 Thread::Thread(size_t size, bool delete_on_exit) 00112 : _state(THREAD_INITIALIZED, THREAD_JOINED), tid(0), stack(size), priority(0), __delete_on_exit(delete_on_exit) 00113 { 00114 pthread_attr_init(&attr); 00115 } 00116 00117 void Thread::yield(void) 00118 { 00119 #if defined(HAVE_PTHREAD_YIELD_NP) 00120 pthread_yield_np(); 00121 #elif defined(HAVE_PTHREAD_YIELD) 00122 pthread_yield(); 00123 #else 00124 sched_yield(); 00125 #endif 00126 } 00127 00128 void Thread::cancellation_point() 00129 { 00130 ::pthread_testcancel(); 00131 } 00132 00133 void Thread::concurrency(int level) 00134 { 00135 #if defined(HAVE_PTHREAD_SETCONCURRENCY) 00136 pthread_setconcurrency(level); 00137 #endif 00138 } 00139 00140 void Thread::sleep(size_t timeout) 00141 { 00142 timespec ts; 00143 ts.tv_sec = timeout / 1000l; 00144 ts.tv_nsec = (timeout % 1000l) * 1000000l; 00145 #if defined(HAVE_PTHREAD_DELAY) 00146 pthread_delay(&ts); 00147 #elif defined(HAVE_PTHREAD_DELAY_NP) 00148 pthread_delay_np(&ts); 00149 #else 00150 usleep(timeout * 1000); 00151 #endif 00152 } 00153 00154 Thread::~Thread() 00155 { 00156 pthread_attr_destroy(&attr); 00157 } 00158 00159 void Thread::detach(void) 00160 { 00161 pthread_detach(tid); 00162 } 00163 00164 void Thread::exit(void) 00165 { 00166 pthread_exit(NULL); 00167 } 00168 00169 int Thread::kill(int sig) 00170 { 00171 if (tid == 0) return -1; 00172 return pthread_kill(tid, sig); 00173 } 00174 00175 void Thread::cancel() throw (ThreadException) 00176 { 00177 // block multiple cancel calls 00178 { 00179 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00180 00181 // is this thread running? 00182 if (ls == THREAD_INITIALIZED) 00183 { 00184 // deny any futher startup of this thread 00185 ls == THREAD_JOINED; 00186 return; 00187 } 00188 00189 if ( (ls == THREAD_PREPARE) || (ls == THREAD_SETUP) ) 00190 { 00191 // wait until RUNNING, ERROR, JOINED or FINALIZED is reached 00192 ls.wait(THREAD_RUNNING | THREAD_JOINED | THREAD_ERROR | THREAD_FINALIZED | THREAD_CANCELLED); 00193 } 00194 00195 // exit if the thread is not running 00196 if (ls != THREAD_RUNNING) return; 00197 00198 // set state to cancelled 00199 ls = THREAD_CANCELLED; 00200 } 00201 00202 // run custom cancellation 00203 if (!__cancellation()) 00204 { 00205 // check if the thread is still alive 00206 if (Thread::kill(0) == 0) 00207 { 00208 int ret = pthread_cancel(tid); 00209 if ( 0 != ret) throw ThreadException(ret, "pthread_cancel"); 00210 } 00211 } 00212 } 00213 00214 bool Thread::equal(pthread_t t1, pthread_t t2) 00215 { 00216 return (pthread_equal(t1, t2) != 0); 00217 } 00218 00219 Thread::CancelProtector::CancelProtector(bool enable) : _oldstate(0) 00220 { 00221 if (enable) 00222 { 00223 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &_oldstate); 00224 } 00225 else 00226 { 00227 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &_oldstate); 00228 } 00229 } 00230 00231 Thread::CancelProtector::~CancelProtector() 00232 { 00233 pthread_setcancelstate(_oldstate, NULL); 00234 } 00235 00236 JoinableThread::JoinableThread(size_t size) 00237 : Thread(size, false) 00238 { 00239 } 00240 00241 JoinableThread::~JoinableThread() 00242 { 00243 #ifdef __DEVELOPMENT_ASSERTIONS__ 00244 // every thread should be joined, when the destructor is called. 00245 if ( (_state != THREAD_JOINED) && (_state != THREAD_ERROR) ) 00246 { 00247 std::cerr << "FAILURE: Thread not joined! Current state:" << _state.get() << std::endl; 00248 assert( (_state == THREAD_JOINED) || (_state == THREAD_ERROR) ); 00249 } 00250 #endif 00251 join(); 00252 } 00253 00254 void JoinableThread::start(int adj) throw (ThreadException) 00255 { 00256 int ret; 00257 00258 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00259 if (ls != THREAD_INITIALIZED) return; 00260 00261 // set the thread state to PREPARE 00262 ls = THREAD_PREPARE; 00263 00264 // set priority 00265 priority = adj; 00266 00267 #ifndef __PTH__ 00268 // modify the threads attributes - set as joinable thread 00269 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 00270 00271 // ignore scheduling policy and use the same as the parent 00272 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); 00273 #endif 00274 // we typically use "stack 1" for min stack... 00275 #ifdef PTHREAD_STACK_MIN 00276 // if the given stack size is too small... 00277 if(stack && stack < PTHREAD_STACK_MIN) 00278 { 00279 // set it to the min stack size 00280 stack = PTHREAD_STACK_MIN; 00281 } 00282 #else 00283 // if stack size if larger than zero and smaller than two... 00284 if (stack && stack < 2) 00285 { 00286 // set it to zero (we will not set the stack size) 00287 stack = 0; 00288 } 00289 #endif 00290 00291 #ifdef __PTH__ 00292 // spawn a new thread 00293 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this); 00294 #else 00295 // if the stack size is specified (> 0) 00296 if (stack) 00297 { 00298 // set the stack size attribute 00299 pthread_attr_setstacksize(&attr, stack); 00300 } 00301 00302 // spawn a new thread 00303 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this); 00304 00305 00306 switch (ret) 00307 { 00308 case EAGAIN: 00309 ls = THREAD_ERROR; 00310 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded."); 00311 case EINVAL: 00312 ls = THREAD_ERROR; 00313 throw ThreadException(ret, "The value specified by attr is invalid."); 00314 case EPERM: 00315 ls = THREAD_ERROR; 00316 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy."); 00317 } 00318 #endif 00319 } 00320 00321 void JoinableThread::stop() throw (ThreadException) 00322 { 00323 // Cancel throws the exception of the terminated thread 00324 // so we have to catch any possible exception here 00325 try { 00326 Thread::cancel(); 00327 } catch (const ThreadException&) { 00328 throw; 00329 } catch (const std::exception&) { 00330 } 00331 } 00332 00333 void JoinableThread::join(void) 00334 { 00335 // first to some state checking 00336 { 00337 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00338 00339 // if the thread never has been started: exit and deny any further startup 00340 if (ls == THREAD_INITIALIZED) 00341 { 00342 ls = THREAD_JOINED; 00343 return; 00344 } 00345 00346 // wait until the finalized state is reached 00347 ls.wait(THREAD_FINALIZED | THREAD_JOINED | THREAD_ERROR); 00348 00349 // do not join if an error occured 00350 if (ls == THREAD_ERROR) return; 00351 00352 // if the thread has been joined already: exit 00353 if (ls == THREAD_JOINED) return; 00354 00355 // get the thread-id of the calling thread 00356 pthread_t self = pthread_self(); 00357 00358 // if we try to join our own thread, just call Thread::exit() 00359 if (equal(tid, self)) 00360 { 00361 Thread::exit(); 00362 return; 00363 } 00364 } 00365 00366 // if the thread has been started, do join 00367 if (pthread_join(tid, NULL) == 0) 00368 { 00369 // set the state to joined 00370 _state = THREAD_JOINED; 00371 } 00372 else 00373 { 00374 IBRCOMMON_LOGGER(error) << "Join on a thread failed[" << tid << "]" << IBRCOMMON_LOGGER_ENDL; 00375 _state = THREAD_ERROR; 00376 } 00377 } 00378 00379 DetachedThread::DetachedThread(size_t size) : Thread(size, true) 00380 { 00381 } 00382 00383 DetachedThread::~DetachedThread() 00384 { 00385 } 00386 00387 void DetachedThread::start(int adj) throw (ThreadException) 00388 { 00389 int ret = 0; 00390 00391 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00392 if (ls != THREAD_INITIALIZED) return; 00393 00394 // set the thread state to PREPARE 00395 ls = THREAD_PREPARE; 00396 00397 // set the priority 00398 priority = adj; 00399 00400 #ifndef __PTH__ 00401 // modify the threads attributes - set as detached thread 00402 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 00403 00404 // ignore scheduling policy and use the same as the parent 00405 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); 00406 #endif 00407 // we typically use "stack 1" for min stack... 00408 #ifdef PTHREAD_STACK_MIN 00409 // if the given stack size is too small... 00410 if(stack && stack < PTHREAD_STACK_MIN) 00411 { 00412 // set it to the min stack size 00413 stack = PTHREAD_STACK_MIN; 00414 } 00415 #else 00416 // if stack size if larger than zero and smaller than two... 00417 if (stack && stack < 2) 00418 { 00419 // set it to zero (we will not set the stack size) 00420 stack = 0; 00421 } 00422 #endif 00423 00424 #ifdef __PTH__ 00425 // spawn a new thread 00426 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this); 00427 #else 00428 // if the stack size is specified (> 0) 00429 if (stack) 00430 { 00431 // set the stack size attribute 00432 pthread_attr_setstacksize(&attr, stack); 00433 } 00434 00435 // spawn a new thread 00436 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this); 00437 00438 // check for errors 00439 switch (ret) 00440 { 00441 case EAGAIN: 00442 ls = THREAD_ERROR; 00443 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded."); 00444 case EINVAL: 00445 ls = THREAD_ERROR; 00446 throw ThreadException(ret, "The value specified by attr is invalid."); 00447 case EPERM: 00448 ls = THREAD_ERROR; 00449 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy."); 00450 } 00451 #endif 00452 } 00453 00454 void DetachedThread::stop() throw (ThreadException) 00455 { 00456 // Cancel throws the exception of the terminated thread 00457 // so we have to catch any possible exception here 00458 try { 00459 Thread::cancel(); 00460 } catch (const ThreadException&) { 00461 throw; 00462 } catch (const std::exception&) { 00463 } 00464 } 00465 }