IBR-DTNSuite 0.6

ibrcommon/ibrcommon/thread/Thread.cpp

Go to the documentation of this file.
00001 /*
00002  * Thread.cpp
00003  *
00004  *  Created on: 30.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrcommon/config.h"
00009 #include "ibrcommon/thread/Thread.h"
00010 #include "ibrcommon/thread/MutexLock.h"
00011 #include "ibrcommon/Logger.h"
00012 #include <stdexcept>
00013 #include <pthread.h>
00014 #include <sys/times.h>
00015 #include <stdio.h>
00016 #include <unistd.h>
00017 #include <signal.h>
00018 
00019 #ifdef __DEVELOPMENT_ASSERTIONS__
00020 #include <cassert>
00021 #endif
00022 
00023 namespace ibrcommon
00024 {
00025         void Thread::finalize_thread(void *obj)
00026         {
00027 #ifdef __DEVELOPMENT_ASSERTIONS__
00028                 // the given object should never be null
00029                 assert(obj != NULL);
00030 #endif
00031                 // cast the thread object
00032                 Thread *th = static_cast<Thread *>(obj);
00033 
00034                 // set the state to finalizing, blocking all threads until this is done
00035                 th->_state = THREAD_FINALIZING;
00036 
00037                 // call the finally method
00038                 th->finally();
00039 
00040                 // set the state to DONE
00041                 th->_state = THREAD_FINALIZED;
00042 
00043                 // delete the thread-object is requested
00044                 if (th->__delete_on_exit)
00045                 {
00046                         delete th;
00047                 }
00048         }
00049 
00050         void* Thread::exec_thread(void *obj)
00051         {
00052 #ifdef __DEVELOPMENT_ASSERTIONS__
00053                 // the given object should never be null
00054                 assert(obj != NULL);
00055 #endif
00056                 // cast the thread object
00057                 Thread *th = static_cast<Thread *>(obj);
00058 
00059                 // add cleanup-handler to finalize threads
00060                 pthread_cleanup_push(Thread::finalize_thread, (void *)th);
00061 
00062                 // set the state to RUNNING
00063                 try {
00064                         // enter the setup stage, if this thread is still in preparation state
00065                         {
00066                                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = th->_state.lock();
00067                                 if (ls != THREAD_PREPARE)
00068                                 {
00069                                         throw ibrcommon::Exception("Thread has been canceled.");
00070                                 }
00071 
00072                                 ls = THREAD_SETUP;
00073                         }
00074 
00075                         // call the setup method
00076                         th->setup();
00077                 } catch (const std::exception &ex) {
00078                         IBRCOMMON_LOGGER_DEBUG(40) << "Thread setup aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00079 
00080                         // exit this thread
00081                         Thread::exit();
00082                 }
00083 
00084                 try {
00085                         // set the state to running
00086                         th->_state = THREAD_RUNNING;
00087 
00088                         // run threads run routine
00089                         th->run();
00090                 } catch (const std::exception &ex) {
00091                         // an exception occured in run() or ready(), but this is allowed
00092                         IBRCOMMON_LOGGER_DEBUG(40) << "Thread execution aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00093                         Thread::exit();
00094                 }
00095 
00096                 // remove cleanup-handler and call it
00097                 pthread_cleanup_pop(1);
00098 
00099                 // exit the thread
00100                 Thread::exit();
00101 
00102                 return NULL;
00103         }
00104 
00105         Thread::Thread(size_t size, bool delete_on_exit)
00106          : _state(THREAD_INITIALIZED, THREAD_JOINED), tid(0), stack(size), priority(0), __delete_on_exit(delete_on_exit)
00107         {
00108                 pthread_attr_init(&attr);
00109         }
00110 
00111         void Thread::yield(void)
00112         {
00113 #if defined(HAVE_PTHREAD_YIELD_NP)
00114                 pthread_yield_np();
00115 #elif defined(HAVE_PTHREAD_YIELD)
00116                 pthread_yield();
00117 #else
00118                 sched_yield();
00119 #endif
00120         }
00121 
00122         void Thread::cancellation_point()
00123         {
00124                 ::pthread_testcancel();
00125         }
00126 
00127         void Thread::concurrency(int level)
00128         {
00129         #if defined(HAVE_PTHREAD_SETCONCURRENCY)
00130                 pthread_setconcurrency(level);
00131         #endif
00132         }
00133 
00134         void Thread::sleep(size_t timeout)
00135         {
00136                 timespec ts;
00137                 ts.tv_sec = timeout / 1000l;
00138                 ts.tv_nsec = (timeout % 1000l) * 1000000l;
00139         #if defined(HAVE_PTHREAD_DELAY)
00140                 pthread_delay(&ts);
00141         #elif defined(HAVE_PTHREAD_DELAY_NP)
00142                 pthread_delay_np(&ts);
00143         #else
00144                 usleep(timeout * 1000);
00145         #endif
00146         }
00147 
00148         Thread::~Thread()
00149         {
00150                 pthread_attr_destroy(&attr);
00151         }
00152 
00153         void Thread::detach(void)
00154         {
00155                 pthread_detach(tid);
00156         }
00157 
00158         void Thread::exit(void)
00159         {
00160                 pthread_exit(NULL);
00161         }
00162 
00163         int Thread::kill(int sig)
00164         {
00165                 if (tid == 0) return -1;
00166                 return pthread_kill(tid, sig);
00167         }
00168 
00169         void Thread::cancel() throw (ThreadException)
00170         {
00171                 // block multiple cancel calls
00172                 {
00173                         ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00174 
00175                         // is this thread running?
00176                         if (ls == THREAD_INITIALIZED)
00177                         {
00178                                 // deny any futher startup of this thread
00179                                 ls == THREAD_JOINED;
00180                                 return;
00181                         }
00182 
00183                         if ( (ls == THREAD_PREPARE) || (ls == THREAD_SETUP) )
00184                         {
00185                                 // wait until RUNNING, ERROR, JOINED or FINALIZED is reached
00186                                 ls.wait(THREAD_RUNNING | THREAD_JOINED | THREAD_ERROR | THREAD_FINALIZED | THREAD_CANCELLED);
00187                         }
00188 
00189                         // exit if the thread is not running
00190                         if (ls != THREAD_RUNNING) return;
00191 
00192                         // set state to cancelled
00193                         ls = THREAD_CANCELLED;
00194                 }
00195 
00196                 // run custom cancellation
00197                 if (!__cancellation())
00198                 {
00199                         // check if the thread is still alive
00200                         if (Thread::kill(0) == 0)
00201                         {
00202                                 int ret = pthread_cancel(tid);
00203                                 if ( 0 != ret) throw ThreadException(ret, "pthread_cancel");
00204                         }
00205                 }
00206         }
00207 
00208         bool Thread::equal(pthread_t t1, pthread_t t2)
00209         {
00210                 return (pthread_equal(t1, t2) != 0);
00211         }
00212 
00213         Thread::CancelProtector::CancelProtector(bool enable) : _oldstate(0)
00214         {
00215                 if (enable)
00216                 {
00217                         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &_oldstate);
00218                 }
00219                 else
00220                 {
00221                         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &_oldstate);
00222                 }
00223         }
00224 
00225         Thread::CancelProtector::~CancelProtector()
00226         {
00227                 pthread_setcancelstate(_oldstate, NULL);
00228         }
00229 
00230         JoinableThread::JoinableThread(size_t size)
00231          : Thread(size, false)
00232         {
00233         }
00234 
00235         JoinableThread::~JoinableThread()
00236         {
00237 #ifdef __DEVELOPMENT_ASSERTIONS__
00238                 // every thread should be joined, when the destructor is called.
00239                 if ( (_state != THREAD_JOINED) && (_state != THREAD_ERROR) )
00240                 {
00241                         std::cerr << "FAILURE: Thread not joined! Current state:" << _state.get() << std::endl;
00242                         assert( (_state == THREAD_JOINED) || (_state == THREAD_ERROR) );
00243                 }
00244 #endif
00245                 join();
00246         }
00247 
00248         void JoinableThread::start(int adj) throw (ThreadException)
00249         {
00250                 int ret;
00251 
00252                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00253                 if (ls != THREAD_INITIALIZED) return;
00254 
00255                 // set the thread state to PREPARE
00256                 ls = THREAD_PREPARE;
00257 
00258                 // set priority
00259                 priority = adj;
00260 
00261 #ifndef __PTH__
00262                 // modify the threads attributes - set as joinable thread
00263                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
00264 
00265                 // ignore scheduling policy and use the same as the parent
00266                 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00267 #endif
00268                 // we typically use "stack 1" for min stack...
00269 #ifdef  PTHREAD_STACK_MIN
00270                 // if the given stack size is too small...
00271                 if(stack && stack < PTHREAD_STACK_MIN)
00272                 {
00273                         // set it to the min stack size
00274                         stack = PTHREAD_STACK_MIN;
00275                 }
00276 #else
00277                 // if stack size if larger than zero and smaller than two...
00278                 if (stack && stack < 2)
00279                 {
00280                         // set it to zero (we will not set the stack size)
00281                         stack = 0;
00282                 }
00283 #endif
00284 
00285 #ifdef  __PTH__
00286                 // spawn a new thread
00287                 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00288 #else
00289                 // if the stack size is specified (> 0)
00290                 if (stack)
00291                 {
00292                         // set the stack size attribute
00293                         pthread_attr_setstacksize(&attr, stack);
00294                 }
00295 
00296                 // spawn a new thread
00297                 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00298 
00299                 
00300                 switch (ret)
00301                 {
00302                 case EAGAIN:
00303                         ls = THREAD_ERROR;
00304                         throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00305                 case EINVAL:
00306                         ls = THREAD_ERROR;
00307                         throw ThreadException(ret, "The value specified by attr is invalid.");
00308                 case EPERM:
00309                         ls = THREAD_ERROR;
00310                         throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00311                 }
00312 #endif
00313         }
00314 
00315         void JoinableThread::stop() throw (ThreadException)
00316         {
00317                 // Cancel throws the exception of the terminated thread
00318                 // so we have to catch any possible exception here
00319                 try {
00320                         Thread::cancel();
00321                 } catch (const ThreadException&) {
00322                         throw;
00323                 } catch (const std::exception&) {
00324                 }
00325         }
00326 
00327         void JoinableThread::join(void)
00328         {
00329                 // first to some state checking
00330                 {
00331                         ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00332 
00333                         // if the thread never has been started: exit and deny any further startup
00334                         if (ls == THREAD_INITIALIZED)
00335                         {
00336                                 ls = THREAD_JOINED;
00337                                 return;
00338                         }
00339 
00340                         // wait until the finalized state is reached
00341                         ls.wait(THREAD_FINALIZED | THREAD_JOINED | THREAD_ERROR);
00342 
00343                         // do not join if an error occured
00344                         if (ls == THREAD_ERROR) return;
00345 
00346                         // if the thread has been joined already: exit
00347                         if (ls == THREAD_JOINED) return;
00348 
00349                         // get the thread-id of the calling thread
00350                         pthread_t self = pthread_self();
00351 
00352                         // if we try to join our own thread, just call Thread::exit()
00353                         if (equal(tid, self))
00354                         {
00355                                 Thread::exit();
00356                                 return;
00357                         }
00358                 }
00359 
00360                 // if the thread has been started, do join
00361                 if (pthread_join(tid, NULL) == 0)
00362                 {
00363                         // set the state to joined
00364                         _state = THREAD_JOINED;
00365                 }
00366                 else
00367                 {
00368                         IBRCOMMON_LOGGER(error) << "Join on a thread failed[" << tid << "]" << IBRCOMMON_LOGGER_ENDL;
00369                         _state = THREAD_ERROR;
00370                 }
00371         }
00372 
00373         DetachedThread::DetachedThread(size_t size) : Thread(size, true)
00374         {
00375         }
00376 
00377         DetachedThread::~DetachedThread()
00378         {
00379         }
00380 
00381         void DetachedThread::start(int adj) throw (ThreadException)
00382         {
00383                 int ret = 0;
00384 
00385                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00386                 if (ls != THREAD_INITIALIZED) return;
00387 
00388                 // set the thread state to PREPARE
00389                 ls = THREAD_PREPARE;
00390 
00391                 // set the priority
00392                 priority = adj;
00393 
00394 #ifndef __PTH__
00395                 // modify the threads attributes - set as detached thread
00396                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00397 
00398                 // ignore scheduling policy and use the same as the parent
00399                 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00400 #endif
00401                 // we typically use "stack 1" for min stack...
00402 #ifdef  PTHREAD_STACK_MIN
00403                 // if the given stack size is too small...
00404                 if(stack && stack < PTHREAD_STACK_MIN)
00405                 {
00406                         // set it to the min stack size
00407                         stack = PTHREAD_STACK_MIN;
00408                 }
00409 #else
00410                 // if stack size if larger than zero and smaller than two...
00411                 if (stack && stack < 2)
00412                 {
00413                         // set it to zero (we will not set the stack size)
00414                         stack = 0;
00415                 }
00416 #endif
00417 
00418 #ifdef  __PTH__
00419                 // spawn a new thread
00420                 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00421 #else
00422                 // if the stack size is specified (> 0)
00423                 if (stack)
00424                 {
00425                         // set the stack size attribute
00426                         pthread_attr_setstacksize(&attr, stack);
00427                 }
00428 
00429                 // spawn a new thread
00430                 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00431 
00432                 // check for errors
00433                 switch (ret)
00434                 {
00435                 case EAGAIN:
00436                         ls = THREAD_ERROR;
00437                         throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00438                 case EINVAL:
00439                         ls = THREAD_ERROR;
00440                         throw ThreadException(ret, "The value specified by attr is invalid.");
00441                 case EPERM:
00442                         ls = THREAD_ERROR;
00443                         throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00444                 }
00445 #endif
00446         }
00447 
00448         void DetachedThread::stop() throw (ThreadException)
00449         {
00450                 // Cancel throws the exception of the terminated thread
00451                 // so we have to catch any possible exception here
00452                 try {
00453                         Thread::cancel();
00454                 } catch (const ThreadException&) {
00455                         throw;
00456                 } catch (const std::exception&) {
00457                 }
00458         }
00459 }