IBR-DTNSuite 0.6

ibrcommon/ibrcommon/thread/Thread.cpp

Go to the documentation of this file.
00001 /*
00002  * Thread.cpp
00003  *
00004  *  Created on: 30.07.2009
00005  *      Author: morgenro
00006  */
00007 
#include "ibrcommon/config.h"
#include "ibrcommon/thread/Thread.h"
#include "ibrcommon/thread/MutexLock.h"
#include "ibrcommon/Logger.h"
#include <stdexcept>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/times.h>
#include <time.h>
#include <unistd.h>
00018 
00019 #ifdef __DEVELOPMENT_ASSERTIONS__
00020 #include <cassert>
00021 #endif
00022 
00023 namespace ibrcommon
00024 {
00025         void Thread::finalize_thread(void *obj)
00026         {
00027 #ifdef __DEVELOPMENT_ASSERTIONS__
00028                 // the given object should never be null
00029                 assert(obj != NULL);
00030 #endif
00031                 // cast the thread object
00032                 Thread *th = static_cast<Thread *>(obj);
00033 
00034                 // set the state to finalizing, blocking all threads until this is done
00035                 th->_state = THREAD_FINALIZING;
00036 
00037                 // call the finally method
00038                 th->finally();
00039 
00040                 // set the state to DONE
00041                 th->_state = THREAD_FINALIZED;
00042 
00043                 // delete the thread-object is requested
00044                 if (th->__delete_on_exit)
00045                 {
00046                         delete th;
00047                 }
00048         }
00049 
00050         void* Thread::exec_thread(void *obj)
00051         {
00052 #ifdef __DEVELOPMENT_ASSERTIONS__
00053                 // the given object should never be null
00054                 assert(obj != NULL);
00055 #endif
00056                 // cast the thread object
00057                 Thread *th = static_cast<Thread *>(obj);
00058 
00059                 // add cleanup-handler to finalize threads
00060                 pthread_cleanup_push(Thread::finalize_thread, (void *)th);
00061 
00062                 // set the state to RUNNING
00063                 try {
00064                         // enter the setup stage, if this thread is still in preparation state
00065                         {
00066                                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = th->_state.lock();
00067                                 if (ls != THREAD_PREPARE)
00068                                 {
00069                                         throw ibrcommon::Exception("Thread has been canceled.");
00070                                 }
00071 
00072                                 ls = THREAD_SETUP;
00073                         }
00074 
00075                         // call the setup method
00076                         th->setup();
00077                 } catch (const std::exception &ex) {
00078                         // set the state to error
00079                         th->_state = THREAD_ERROR;
00080 
00081                         IBRCOMMON_LOGGER_DEBUG(40) << "Thread setup aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00082 
00083                         // exit this thread
00084                         Thread::exit();
00085                 }
00086 
00087                 try {
00088                         // set the state to running
00089                         th->_state = THREAD_RUNNING;
00090 
00091                         // run threads run routine
00092                         th->run();
00093                 } catch (const std::exception &ex) {
00094                         // set the state to error
00095                         th->_state = THREAD_ERROR;
00096 
00097                         // an exception occured in run() or ready(), but this is allowed
00098                         IBRCOMMON_LOGGER_DEBUG(40) << "Thread execution aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00099                         Thread::exit();
00100                 }
00101 
00102                 // remove cleanup-handler and call it
00103                 pthread_cleanup_pop(1);
00104 
00105                 // exit the thread
00106                 Thread::exit();
00107 
00108                 return NULL;
00109         }
00110 
00111         Thread::Thread(size_t size, bool delete_on_exit)
00112          : _state(THREAD_INITIALIZED, THREAD_JOINED), tid(0), stack(size), priority(0), __delete_on_exit(delete_on_exit)
00113         {
00114                 pthread_attr_init(&attr);
00115         }
00116 
00117         void Thread::yield(void)
00118         {
00119 #if defined(HAVE_PTHREAD_YIELD_NP)
00120                 pthread_yield_np();
00121 #elif defined(HAVE_PTHREAD_YIELD)
00122                 pthread_yield();
00123 #else
00124                 sched_yield();
00125 #endif
00126         }
00127 
00128         void Thread::cancellation_point()
00129         {
00130                 ::pthread_testcancel();
00131         }
00132 
00133         void Thread::concurrency(int level)
00134         {
00135         #if defined(HAVE_PTHREAD_SETCONCURRENCY)
00136                 pthread_setconcurrency(level);
00137         #endif
00138         }
00139 
00140         void Thread::sleep(size_t timeout)
00141         {
00142                 timespec ts;
00143                 ts.tv_sec = timeout / 1000l;
00144                 ts.tv_nsec = (timeout % 1000l) * 1000000l;
00145         #if defined(HAVE_PTHREAD_DELAY)
00146                 pthread_delay(&ts);
00147         #elif defined(HAVE_PTHREAD_DELAY_NP)
00148                 pthread_delay_np(&ts);
00149         #else
00150                 usleep(timeout * 1000);
00151         #endif
00152         }
00153 
00154         Thread::~Thread()
00155         {
00156                 pthread_attr_destroy(&attr);
00157         }
00158 
00159         void Thread::detach(void)
00160         {
00161                 pthread_detach(tid);
00162         }
00163 
00164         void Thread::exit(void)
00165         {
00166                 pthread_exit(NULL);
00167         }
00168 
00169         int Thread::kill(int sig)
00170         {
00171                 if (tid == 0) return -1;
00172                 return pthread_kill(tid, sig);
00173         }
00174 
00175         void Thread::cancel() throw (ThreadException)
00176         {
00177                 // block multiple cancel calls
00178                 {
00179                         ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00180 
00181                         // is this thread running?
00182                         if (ls == THREAD_INITIALIZED)
00183                         {
00184                                 // deny any futher startup of this thread
00185                                 ls == THREAD_JOINED;
00186                                 return;
00187                         }
00188 
00189                         if ( (ls == THREAD_PREPARE) || (ls == THREAD_SETUP) )
00190                         {
00191                                 // wait until RUNNING, ERROR, JOINED or FINALIZED is reached
00192                                 ls.wait(THREAD_RUNNING | THREAD_JOINED | THREAD_ERROR | THREAD_FINALIZED | THREAD_CANCELLED);
00193                         }
00194 
00195                         // exit if the thread is not running
00196                         if (ls != THREAD_RUNNING) return;
00197 
00198                         // set state to cancelled
00199                         ls = THREAD_CANCELLED;
00200                 }
00201 
00202                 // run custom cancellation
00203                 if (!__cancellation())
00204                 {
00205                         // check if the thread is still alive
00206                         if (Thread::kill(0) == 0)
00207                         {
00208                                 int ret = pthread_cancel(tid);
00209                                 if ( 0 != ret) throw ThreadException(ret, "pthread_cancel");
00210                         }
00211                 }
00212         }
00213 
00214         bool Thread::equal(pthread_t t1, pthread_t t2)
00215         {
00216                 return (pthread_equal(t1, t2) != 0);
00217         }
00218 
00219         Thread::CancelProtector::CancelProtector(bool enable) : _oldstate(0)
00220         {
00221                 if (enable)
00222                 {
00223                         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &_oldstate);
00224                 }
00225                 else
00226                 {
00227                         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &_oldstate);
00228                 }
00229         }
00230 
00231         Thread::CancelProtector::~CancelProtector()
00232         {
00233                 pthread_setcancelstate(_oldstate, NULL);
00234         }
00235 
00236         JoinableThread::JoinableThread(size_t size)
00237          : Thread(size, false)
00238         {
00239         }
00240 
00241         JoinableThread::~JoinableThread()
00242         {
00243 #ifdef __DEVELOPMENT_ASSERTIONS__
00244                 // every thread should be joined, when the destructor is called.
00245                 if ( (_state != THREAD_JOINED) && (_state != THREAD_ERROR) )
00246                 {
00247                         std::cerr << "FAILURE: Thread not joined! Current state:" << _state.get() << std::endl;
00248                         assert( (_state == THREAD_JOINED) || (_state == THREAD_ERROR) );
00249                 }
00250 #endif
00251                 join();
00252         }
00253 
00254         void JoinableThread::start(int adj) throw (ThreadException)
00255         {
00256                 int ret;
00257 
00258                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00259                 if (ls != THREAD_INITIALIZED) return;
00260 
00261                 // set the thread state to PREPARE
00262                 ls = THREAD_PREPARE;
00263 
00264                 // set priority
00265                 priority = adj;
00266 
00267 #ifndef __PTH__
00268                 // modify the threads attributes - set as joinable thread
00269                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
00270 
00271                 // ignore scheduling policy and use the same as the parent
00272                 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00273 #endif
00274                 // we typically use "stack 1" for min stack...
00275 #ifdef  PTHREAD_STACK_MIN
00276                 // if the given stack size is too small...
00277                 if(stack && stack < PTHREAD_STACK_MIN)
00278                 {
00279                         // set it to the min stack size
00280                         stack = PTHREAD_STACK_MIN;
00281                 }
00282 #else
00283                 // if stack size if larger than zero and smaller than two...
00284                 if (stack && stack < 2)
00285                 {
00286                         // set it to zero (we will not set the stack size)
00287                         stack = 0;
00288                 }
00289 #endif
00290 
00291 #ifdef  __PTH__
00292                 // spawn a new thread
00293                 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00294 #else
00295                 // if the stack size is specified (> 0)
00296                 if (stack)
00297                 {
00298                         // set the stack size attribute
00299                         pthread_attr_setstacksize(&attr, stack);
00300                 }
00301 
00302                 // spawn a new thread
00303                 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00304 
00305                 
00306                 switch (ret)
00307                 {
00308                 case EAGAIN:
00309                         ls = THREAD_ERROR;
00310                         throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00311                 case EINVAL:
00312                         ls = THREAD_ERROR;
00313                         throw ThreadException(ret, "The value specified by attr is invalid.");
00314                 case EPERM:
00315                         ls = THREAD_ERROR;
00316                         throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00317                 }
00318 #endif
00319         }
00320 
00321         void JoinableThread::stop() throw (ThreadException)
00322         {
00323                 // Cancel throws the exception of the terminated thread
00324                 // so we have to catch any possible exception here
00325                 try {
00326                         Thread::cancel();
00327                 } catch (const ThreadException&) {
00328                         throw;
00329                 } catch (const std::exception&) {
00330                 }
00331         }
00332 
00333         void JoinableThread::join(void)
00334         {
00335                 // first to some state checking
00336                 {
00337                         ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00338 
00339                         // if the thread never has been started: exit and deny any further startup
00340                         if (ls == THREAD_INITIALIZED)
00341                         {
00342                                 ls = THREAD_JOINED;
00343                                 return;
00344                         }
00345 
00346                         // wait until the finalized state is reached
00347                         ls.wait(THREAD_FINALIZED | THREAD_JOINED | THREAD_ERROR);
00348 
00349                         // do not join if an error occured
00350                         if (ls == THREAD_ERROR) return;
00351 
00352                         // if the thread has been joined already: exit
00353                         if (ls == THREAD_JOINED) return;
00354 
00355                         // get the thread-id of the calling thread
00356                         pthread_t self = pthread_self();
00357 
00358                         // if we try to join our own thread, just call Thread::exit()
00359                         if (equal(tid, self))
00360                         {
00361                                 Thread::exit();
00362                                 return;
00363                         }
00364                 }
00365 
00366                 // if the thread has been started, do join
00367                 if (pthread_join(tid, NULL) == 0)
00368                 {
00369                         // set the state to joined
00370                         _state = THREAD_JOINED;
00371                 }
00372                 else
00373                 {
00374                         IBRCOMMON_LOGGER(error) << "Join on a thread failed[" << tid << "]" << IBRCOMMON_LOGGER_ENDL;
00375                         _state = THREAD_ERROR;
00376                 }
00377         }
00378 
00379         DetachedThread::DetachedThread(size_t size) : Thread(size, true)
00380         {
00381         }
00382 
00383         DetachedThread::~DetachedThread()
00384         {
00385         }
00386 
00387         void DetachedThread::start(int adj) throw (ThreadException)
00388         {
00389                 int ret = 0;
00390 
00391                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00392                 if (ls != THREAD_INITIALIZED) return;
00393 
00394                 // set the thread state to PREPARE
00395                 ls = THREAD_PREPARE;
00396 
00397                 // set the priority
00398                 priority = adj;
00399 
00400 #ifndef __PTH__
00401                 // modify the threads attributes - set as detached thread
00402                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00403 
00404                 // ignore scheduling policy and use the same as the parent
00405                 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00406 #endif
00407                 // we typically use "stack 1" for min stack...
00408 #ifdef  PTHREAD_STACK_MIN
00409                 // if the given stack size is too small...
00410                 if(stack && stack < PTHREAD_STACK_MIN)
00411                 {
00412                         // set it to the min stack size
00413                         stack = PTHREAD_STACK_MIN;
00414                 }
00415 #else
00416                 // if stack size if larger than zero and smaller than two...
00417                 if (stack && stack < 2)
00418                 {
00419                         // set it to zero (we will not set the stack size)
00420                         stack = 0;
00421                 }
00422 #endif
00423 
00424 #ifdef  __PTH__
00425                 // spawn a new thread
00426                 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00427 #else
00428                 // if the stack size is specified (> 0)
00429                 if (stack)
00430                 {
00431                         // set the stack size attribute
00432                         pthread_attr_setstacksize(&attr, stack);
00433                 }
00434 
00435                 // spawn a new thread
00436                 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00437 
00438                 // check for errors
00439                 switch (ret)
00440                 {
00441                 case EAGAIN:
00442                         ls = THREAD_ERROR;
00443                         throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00444                 case EINVAL:
00445                         ls = THREAD_ERROR;
00446                         throw ThreadException(ret, "The value specified by attr is invalid.");
00447                 case EPERM:
00448                         ls = THREAD_ERROR;
00449                         throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00450                 }
00451 #endif
00452         }
00453 
00454         void DetachedThread::stop() throw (ThreadException)
00455         {
00456                 // Cancel throws the exception of the terminated thread
00457                 // so we have to catch any possible exception here
00458                 try {
00459                         Thread::cancel();
00460                 } catch (const ThreadException&) {
00461                         throw;
00462                 } catch (const std::exception&) {
00463                 }
00464         }
00465 }