|
IBR-DTNSuite 0.6
|
00001 /* 00002 * Thread.cpp 00003 * 00004 * Created on: 30.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrcommon/config.h" 00009 #include "ibrcommon/thread/Thread.h" 00010 #include "ibrcommon/thread/MutexLock.h" 00011 #include "ibrcommon/Logger.h" 00012 #include <stdexcept> 00013 #include <pthread.h> 00014 #include <sys/times.h> 00015 #include <stdio.h> 00016 #include <unistd.h> 00017 #include <signal.h> 00018 00019 #ifdef __DEVELOPMENT_ASSERTIONS__ 00020 #include <cassert> 00021 #endif 00022 00023 namespace ibrcommon 00024 { 00025 void Thread::finalize_thread(void *obj) 00026 { 00027 #ifdef __DEVELOPMENT_ASSERTIONS__ 00028 // the given object should never be null 00029 assert(obj != NULL); 00030 #endif 00031 // cast the thread object 00032 Thread *th = static_cast<Thread *>(obj); 00033 00034 // set the state to finalizing, blocking all threads until this is done 00035 th->_state = THREAD_FINALIZING; 00036 00037 // call the finally method 00038 th->finally(); 00039 00040 // set the state to DONE 00041 th->_state = THREAD_FINALIZED; 00042 00043 // delete the thread-object is requested 00044 if (th->__delete_on_exit) 00045 { 00046 delete th; 00047 } 00048 } 00049 00050 void* Thread::exec_thread(void *obj) 00051 { 00052 #ifdef __DEVELOPMENT_ASSERTIONS__ 00053 // the given object should never be null 00054 assert(obj != NULL); 00055 #endif 00056 // cast the thread object 00057 Thread *th = static_cast<Thread *>(obj); 00058 00059 // add cleanup-handler to finalize threads 00060 pthread_cleanup_push(Thread::finalize_thread, (void *)th); 00061 00062 // set the state to RUNNING 00063 try { 00064 // enter the setup stage, if this thread is still in preparation state 00065 { 00066 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = th->_state.lock(); 00067 if (ls != THREAD_PREPARE) 00068 { 00069 throw ibrcommon::Exception("Thread has been canceled."); 00070 } 00071 00072 ls = THREAD_SETUP; 00073 } 00074 00075 // call the setup method 00076 th->setup(); 00077 } catch (const std::exception &ex) { 00078 IBRCOMMON_LOGGER_DEBUG(40) << "Thread setup aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00079 00080 // exit this thread 00081 Thread::exit(); 00082 } 00083 00084 try { 00085 // set the state to running 00086 th->_state = THREAD_RUNNING; 00087 00088 // run threads run routine 00089 th->run(); 00090 } catch (const std::exception &ex) { 00091 // an exception occured in run() or ready(), but this is allowed 00092 IBRCOMMON_LOGGER_DEBUG(40) << "Thread execution aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00093 Thread::exit(); 00094 } 00095 00096 // remove cleanup-handler and call it 00097 pthread_cleanup_pop(1); 00098 00099 // exit the thread 00100 Thread::exit(); 00101 00102 return NULL; 00103 } 00104 00105 Thread::Thread(size_t size, bool delete_on_exit) 00106 : _state(THREAD_INITIALIZED, THREAD_JOINED), tid(0), stack(size), priority(0), __delete_on_exit(delete_on_exit) 00107 { 00108 pthread_attr_init(&attr); 00109 } 00110 00111 void Thread::yield(void) 00112 { 00113 #if defined(HAVE_PTHREAD_YIELD_NP) 00114 pthread_yield_np(); 00115 #elif defined(HAVE_PTHREAD_YIELD) 00116 pthread_yield(); 00117 #else 00118 sched_yield(); 00119 #endif 00120 } 00121 00122 void Thread::cancellation_point() 00123 { 00124 ::pthread_testcancel(); 00125 } 00126 00127 void Thread::concurrency(int level) 00128 { 00129 #if defined(HAVE_PTHREAD_SETCONCURRENCY) 00130 pthread_setconcurrency(level); 00131 #endif 00132 } 00133 00134 void Thread::sleep(size_t timeout) 00135 { 00136 timespec ts; 00137 ts.tv_sec = timeout / 1000l; 00138 ts.tv_nsec = (timeout % 1000l) * 1000000l; 00139 #if defined(HAVE_PTHREAD_DELAY) 00140 pthread_delay(&ts); 00141 #elif defined(HAVE_PTHREAD_DELAY_NP) 00142 pthread_delay_np(&ts); 00143 #else 00144 usleep(timeout * 1000); 00145 #endif 00146 } 00147 00148 Thread::~Thread() 00149 { 00150 pthread_attr_destroy(&attr); 00151 } 00152 00153 void Thread::detach(void) 00154 { 00155 pthread_detach(tid); 00156 } 00157 00158 void Thread::exit(void) 00159 { 00160 pthread_exit(NULL); 00161 } 00162 00163 int Thread::kill(int sig) 00164 { 00165 if (tid == 0) return -1; 00166 return pthread_kill(tid, sig); 00167 } 00168 00169 void Thread::cancel() throw (ThreadException) 00170 { 00171 // block multiple cancel calls 00172 { 00173 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00174 00175 // is this thread running? 00176 if (ls == THREAD_INITIALIZED) 00177 { 00178 // deny any futher startup of this thread 00179 ls == THREAD_JOINED; 00180 return; 00181 } 00182 00183 if ( (ls == THREAD_PREPARE) || (ls == THREAD_SETUP) ) 00184 { 00185 // wait until RUNNING, ERROR, JOINED or FINALIZED is reached 00186 ls.wait(THREAD_RUNNING | THREAD_JOINED | THREAD_ERROR | THREAD_FINALIZED | THREAD_CANCELLED); 00187 } 00188 00189 // exit if the thread is not running 00190 if (ls != THREAD_RUNNING) return; 00191 00192 // set state to cancelled 00193 ls = THREAD_CANCELLED; 00194 } 00195 00196 // run custom cancellation 00197 if (!__cancellation()) 00198 { 00199 // check if the thread is still alive 00200 if (Thread::kill(0) == 0) 00201 { 00202 int ret = pthread_cancel(tid); 00203 if ( 0 != ret) throw ThreadException(ret, "pthread_cancel"); 00204 } 00205 } 00206 } 00207 00208 bool Thread::equal(pthread_t t1, pthread_t t2) 00209 { 00210 return (pthread_equal(t1, t2) != 0); 00211 } 00212 00213 Thread::CancelProtector::CancelProtector(bool enable) : _oldstate(0) 00214 { 00215 if (enable) 00216 { 00217 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &_oldstate); 00218 } 00219 else 00220 { 00221 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &_oldstate); 00222 } 00223 } 00224 00225 Thread::CancelProtector::~CancelProtector() 00226 { 00227 pthread_setcancelstate(_oldstate, NULL); 00228 } 00229 00230 JoinableThread::JoinableThread(size_t size) 00231 : Thread(size, false) 00232 { 00233 } 00234 00235 JoinableThread::~JoinableThread() 00236 { 00237 #ifdef __DEVELOPMENT_ASSERTIONS__ 00238 // every thread should be joined, when the destructor is called. 00239 if ( (_state != THREAD_JOINED) && (_state != THREAD_ERROR) ) 00240 { 00241 std::cerr << "FAILURE: Thread not joined! Current state:" << _state.get() << std::endl; 00242 assert( (_state == THREAD_JOINED) || (_state == THREAD_ERROR) ); 00243 } 00244 #endif 00245 join(); 00246 } 00247 00248 void JoinableThread::start(int adj) throw (ThreadException) 00249 { 00250 int ret; 00251 00252 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00253 if (ls != THREAD_INITIALIZED) return; 00254 00255 // set the thread state to PREPARE 00256 ls = THREAD_PREPARE; 00257 00258 // set priority 00259 priority = adj; 00260 00261 #ifndef __PTH__ 00262 // modify the threads attributes - set as joinable thread 00263 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 00264 00265 // ignore scheduling policy and use the same as the parent 00266 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); 00267 #endif 00268 // we typically use "stack 1" for min stack... 00269 #ifdef PTHREAD_STACK_MIN 00270 // if the given stack size is too small... 00271 if(stack && stack < PTHREAD_STACK_MIN) 00272 { 00273 // set it to the min stack size 00274 stack = PTHREAD_STACK_MIN; 00275 } 00276 #else 00277 // if stack size if larger than zero and smaller than two... 00278 if (stack && stack < 2) 00279 { 00280 // set it to zero (we will not set the stack size) 00281 stack = 0; 00282 } 00283 #endif 00284 00285 #ifdef __PTH__ 00286 // spawn a new thread 00287 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this); 00288 #else 00289 // if the stack size is specified (> 0) 00290 if (stack) 00291 { 00292 // set the stack size attribute 00293 pthread_attr_setstacksize(&attr, stack); 00294 } 00295 00296 // spawn a new thread 00297 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this); 00298 00299 00300 switch (ret) 00301 { 00302 case EAGAIN: 00303 ls = THREAD_ERROR; 00304 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded."); 00305 case EINVAL: 00306 ls = THREAD_ERROR; 00307 throw ThreadException(ret, "The value specified by attr is invalid."); 00308 case EPERM: 00309 ls = THREAD_ERROR; 00310 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy."); 00311 } 00312 #endif 00313 } 00314 00315 void JoinableThread::stop() throw (ThreadException) 00316 { 00317 // Cancel throws the exception of the terminated thread 00318 // so we have to catch any possible exception here 00319 try { 00320 Thread::cancel(); 00321 } catch (const ThreadException&) { 00322 throw; 00323 } catch (const std::exception&) { 00324 } 00325 } 00326 00327 void JoinableThread::join(void) 00328 { 00329 // first to some state checking 00330 { 00331 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00332 00333 // if the thread never has been started: exit and deny any further startup 00334 if (ls == THREAD_INITIALIZED) 00335 { 00336 ls = THREAD_JOINED; 00337 return; 00338 } 00339 00340 // wait until the finalized state is reached 00341 ls.wait(THREAD_FINALIZED | THREAD_JOINED | THREAD_ERROR); 00342 00343 // do not join if an error occured 00344 if (ls == THREAD_ERROR) return; 00345 00346 // if the thread has been joined already: exit 00347 if (ls == THREAD_JOINED) return; 00348 00349 // get the thread-id of the calling thread 00350 pthread_t self = pthread_self(); 00351 00352 // if we try to join our own thread, just call Thread::exit() 00353 if (equal(tid, self)) 00354 { 00355 Thread::exit(); 00356 return; 00357 } 00358 } 00359 00360 // if the thread has been started, do join 00361 if (pthread_join(tid, NULL) == 0) 00362 { 00363 // set the state to joined 00364 _state = THREAD_JOINED; 00365 } 00366 else 00367 { 00368 IBRCOMMON_LOGGER(error) << "Join on a thread failed[" << tid << "]" << IBRCOMMON_LOGGER_ENDL; 00369 _state = THREAD_ERROR; 00370 } 00371 } 00372 00373 DetachedThread::DetachedThread(size_t size) : Thread(size, true) 00374 { 00375 } 00376 00377 DetachedThread::~DetachedThread() 00378 { 00379 } 00380 00381 void DetachedThread::start(int adj) throw (ThreadException) 00382 { 00383 int ret = 0; 00384 00385 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00386 if (ls != THREAD_INITIALIZED) return; 00387 00388 // set the thread state to PREPARE 00389 ls = THREAD_PREPARE; 00390 00391 // set the priority 00392 priority = adj; 00393 00394 #ifndef __PTH__ 00395 // modify the threads attributes - set as detached thread 00396 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 00397 00398 // ignore scheduling policy and use the same as the parent 00399 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); 00400 #endif 00401 // we typically use "stack 1" for min stack... 00402 #ifdef PTHREAD_STACK_MIN 00403 // if the given stack size is too small... 00404 if(stack && stack < PTHREAD_STACK_MIN) 00405 { 00406 // set it to the min stack size 00407 stack = PTHREAD_STACK_MIN; 00408 } 00409 #else 00410 // if stack size if larger than zero and smaller than two... 00411 if (stack && stack < 2) 00412 { 00413 // set it to zero (we will not set the stack size) 00414 stack = 0; 00415 } 00416 #endif 00417 00418 #ifdef __PTH__ 00419 // spawn a new thread 00420 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this); 00421 #else 00422 // if the stack size is specified (> 0) 00423 if (stack) 00424 { 00425 // set the stack size attribute 00426 pthread_attr_setstacksize(&attr, stack); 00427 } 00428 00429 // spawn a new thread 00430 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this); 00431 00432 // check for errors 00433 switch (ret) 00434 { 00435 case EAGAIN: 00436 ls = THREAD_ERROR; 00437 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded."); 00438 case EINVAL: 00439 ls = THREAD_ERROR; 00440 throw ThreadException(ret, "The value specified by attr is invalid."); 00441 case EPERM: 00442 ls = THREAD_ERROR; 00443 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy."); 00444 } 00445 #endif 00446 } 00447 00448 void DetachedThread::stop() throw (ThreadException) 00449 { 00450 // Cancel throws the exception of the terminated thread 00451 // so we have to catch any possible exception here 00452 try { 00453 Thread::cancel(); 00454 } catch (const ThreadException&) { 00455 throw; 00456 } catch (const std::exception&) { 00457 } 00458 } 00459 }