00001
00002
00003
00004
00005
00006
00007
#include "ibrcommon/config.h"
#include "ibrcommon/thread/Thread.h"
#include "ibrcommon/thread/MutexLock.h"
#include "ibrcommon/Logger.h"
#include <stdexcept>
#include <pthread.h>
#include <sys/times.h>
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
00018
00019 #ifdef __DEVELOPMENT_ASSERTIONS__
00020 #include <cassert>
00021 #endif
00022
00023 namespace ibrcommon
00024 {
00025 void Thread::finalize_thread(void *obj)
00026 {
00027 #ifdef __DEVELOPMENT_ASSERTIONS__
00028
00029 assert(obj != NULL);
00030 #endif
00031
00032 Thread *th = static_cast<Thread *>(obj);
00033
00034
00035 {
00036 ibrcommon::MutexLock l(th->_state_lock);
00037
00038
00039 th->_state = THREAD_FINALIZED;
00040 th->_state_lock.signal(true);
00041 }
00042
00043
00044 th->finally();
00045
00046
00047 if (th->__delete_on_exit)
00048 {
00049 delete th;
00050 }
00051 }
00052
00053 void* Thread::exec_thread(void *obj)
00054 {
00055 #ifdef __DEVELOPMENT_ASSERTIONS__
00056
00057 assert(obj != NULL);
00058 #endif
00059
00060 Thread *th = static_cast<Thread *>(obj);
00061
00062
00063 try {
00064 ibrcommon::MutexLock l(th->_state_lock);
00065 if (th->_state == THREAD_CANCELLED)
00066 {
00067 throw ibrcommon::Exception("Thread has been canceled.");
00068 }
00069
00070
00071 th->setup();
00072
00073
00074 th->_state = THREAD_RUNNING;
00075 th->_state_lock.signal(true);
00076 } catch (const std::exception &ex) {
00077 IBRCOMMON_LOGGER_DEBUG(40) << "Thread setup aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00078 Thread::exit();
00079 }
00080
00081
00082 pthread_cleanup_push(Thread::finalize_thread, (void *)th);
00083
00084 try {
00085
00086 th->run();
00087 } catch (const std::exception &ex) {
00088
00089 IBRCOMMON_LOGGER_DEBUG(40) << "Thread execution aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00090 Thread::exit();
00091 }
00092
00093
00094 pthread_cleanup_pop(1);
00095
00096
00097 Thread::exit();
00098
00099 return NULL;
00100 }
00101
00102 Thread::Thread(size_t size, bool delete_on_exit)
00103 : _state(THREAD_INITIALIZED), tid(0), stack(size), priority(0), __delete_on_exit(delete_on_exit)
00104 {
00105 pthread_attr_init(&attr);
00106 }
00107
00108 void Thread::yield(void)
00109 {
00110 #if defined(HAVE_PTHREAD_YIELD_NP)
00111 pthread_yield_np();
00112 #elif defined(HAVE_PTHREAD_YIELD)
00113 pthread_yield();
00114 #else
00115 sched_yield();
00116 #endif
00117 }
00118
00119 void Thread::cancellation_point()
00120 {
00121 ::pthread_testcancel();
00122 }
00123
00124 void Thread::concurrency(int level)
00125 {
00126 #if defined(HAVE_PTHREAD_SETCONCURRENCY)
00127 pthread_setconcurrency(level);
00128 #endif
00129 }
00130
00131 void Thread::sleep(size_t timeout)
00132 {
00133 timespec ts;
00134 ts.tv_sec = timeout / 1000l;
00135 ts.tv_nsec = (timeout % 1000l) * 1000000l;
00136 #if defined(HAVE_PTHREAD_DELAY)
00137 pthread_delay(&ts);
00138 #elif defined(HAVE_PTHREAD_DELAY_NP)
00139 pthread_delay_np(&ts);
00140 #else
00141 usleep(timeout * 1000);
00142 #endif
00143 }
00144
00145 Thread::~Thread()
00146 {
00147 pthread_attr_destroy(&attr);
00148 }
00149
00150 void Thread::detach(void)
00151 {
00152 pthread_detach(tid);
00153 }
00154
00155 void Thread::exit(void)
00156 {
00157 pthread_exit(NULL);
00158 }
00159
00160 int Thread::kill(int sig)
00161 {
00162 if (tid == 0) return -1;
00163 return pthread_kill(tid, sig);
00164 }
00165
00166 void Thread::interrupt()
00167 {
00168 kill(SIGINT);
00169 }
00170
00171 void Thread::enable_interruption()
00172 {
00173 sigemptyset(&mask);
00174 sigaddset(&mask, SIGINT);
00175 pthread_sigmask(SIG_BLOCK, &mask, &orig_mask);
00176 }
00177
00178 void Thread::disable_interruption()
00179 {
00180 pthread_sigmask(SIG_SETMASK, &orig_mask, NULL);
00181 }
00182
00183 void Thread::cancel() throw (ThreadException)
00184 {
00185
00186 ibrcommon::MutexLock l(_state_lock);
00187 if ((_state != THREAD_PREPARE) && (_state != THREAD_RUNNING)) return;
00188 _state = THREAD_CANCELLED;
00189 _state_lock.signal(true);
00190
00191
00192 if (!__cancellation())
00193 {
00194
00195 if (Thread::kill(0) == 0)
00196 {
00197 int ret = pthread_cancel(tid);
00198 if ( 0 != ret) throw ThreadException(ret, "pthread_cancel");
00199 }
00200 }
00201 }
00202
00203 bool Thread::equal(pthread_t t1, pthread_t t2)
00204 {
00205 return (pthread_equal(t1, t2) != 0);
00206 }
00207
00208 Thread::CancelProtector::CancelProtector(bool enable) : _oldstate(0)
00209 {
00210 if (enable)
00211 {
00212 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &_oldstate);
00213 }
00214 else
00215 {
00216 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &_oldstate);
00217 }
00218 }
00219
00220 Thread::CancelProtector::~CancelProtector()
00221 {
00222 pthread_setcancelstate(_oldstate, NULL);
00223 }
00224
00225 JoinableThread::JoinableThread(size_t size)
00226 : Thread(size, false)
00227 {
00228 }
00229
00230 JoinableThread::~JoinableThread()
00231 {
00232 #ifdef __DEVELOPMENT_ASSERTIONS__
00233
00234 ibrcommon::Mutex l(_state_lock);
00235 assert(_state == THREAD_JOINED);
00236 #endif
00237 join();
00238 }
00239
00240 void JoinableThread::start(int adj) throw (ThreadException)
00241 {
00242 int ret;
00243
00244 ibrcommon::MutexLock l(_state_lock);
00245 if (_state != THREAD_INITIALIZED) return;
00246
00247
00248 _state = THREAD_PREPARE;
00249 _state_lock.signal(true);
00250
00251
00252 priority = adj;
00253
00254 #ifndef __PTH__
00255
00256 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
00257
00258
00259 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00260 #endif
00261
00262 #ifdef PTHREAD_STACK_MIN
00263
00264 if(stack && stack < PTHREAD_STACK_MIN)
00265 {
00266
00267 stack = PTHREAD_STACK_MIN;
00268 }
00269 #else
00270
00271 if (stack && stack < 2)
00272 {
00273
00274 stack = 0;
00275 }
00276 #endif
00277
00278 #ifdef __PTH__
00279
00280 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00281 #else
00282
00283 if (stack)
00284 {
00285
00286 pthread_attr_setstacksize(&attr, stack);
00287 }
00288
00289
00290 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00291
00292
00293 switch (ret)
00294 {
00295 case EAGAIN:
00296 _state = THREAD_ERROR;
00297 _state_lock.signal(true);
00298 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00299 case EINVAL:
00300 _state = THREAD_ERROR;
00301 _state_lock.signal(true);
00302 throw ThreadException(ret, "The value specified by attr is invalid.");
00303 case EPERM:
00304 _state = THREAD_ERROR;
00305 _state_lock.signal(true);
00306 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00307 }
00308 #endif
00309 }
00310
00311 void JoinableThread::stop() throw (ThreadException)
00312 {
00313
00314
00315 try {
00316 Thread::cancel();
00317 } catch (const ThreadException&) {
00318 throw;
00319 } catch (const std::exception&) {
00320 }
00321 }
00322
00323 void JoinableThread::join(void)
00324 {
00325
00326 {
00327 ibrcommon::MutexLock l(_state_lock);
00328
00329
00330 if (_state == THREAD_INITIALIZED)
00331 {
00332 _state = THREAD_JOINED;
00333 _state_lock.signal(true);
00334 return;
00335 }
00336
00337
00338 if (_state == THREAD_JOINED)
00339 {
00340 return;
00341 }
00342
00343
00344 while (_state != THREAD_FINALIZED)
00345 {
00346 _state_lock.wait();
00347 }
00348
00349
00350 pthread_t self = pthread_self();
00351
00352
00353 if (equal(tid, self))
00354 {
00355 _state = THREAD_JOINED;
00356 _state_lock.signal(true);
00357 Thread::exit();
00358 return;
00359 }
00360 }
00361
00362
00363 try {
00364 if (!pthread_join(tid, NULL))
00365 {
00366 ibrcommon::MutexLock l(_state_lock);
00367 _state = THREAD_JOINED;
00368 _state_lock.signal(true);
00369 }
00370 } catch (const std::exception &ex) {
00371 IBRCOMMON_LOGGER(error) << "Exception on thread[" << tid << "]::join(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00372 }
00373 }
00374
00375 DetachedThread::DetachedThread(size_t size) : Thread(size, true)
00376 {
00377 }
00378
00379 DetachedThread::~DetachedThread()
00380 {
00381 }
00382
00383 void DetachedThread::start(int adj) throw (ThreadException)
00384 {
00385 int ret = 0;
00386
00387 ibrcommon::MutexLock l(_state_lock);
00388 if (_state != THREAD_INITIALIZED) return;
00389
00390
00391 _state = THREAD_PREPARE;
00392 _state_lock.signal(true);
00393
00394
00395 priority = adj;
00396
00397 #ifndef __PTH__
00398
00399 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00400
00401
00402 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00403 #endif
00404
00405 #ifdef PTHREAD_STACK_MIN
00406
00407 if(stack && stack < PTHREAD_STACK_MIN)
00408 {
00409
00410 stack = PTHREAD_STACK_MIN;
00411 }
00412 #else
00413
00414 if (stack && stack < 2)
00415 {
00416
00417 stack = 0;
00418 }
00419 #endif
00420
00421 #ifdef __PTH__
00422
00423 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00424 #else
00425
00426 if (stack)
00427 {
00428
00429 pthread_attr_setstacksize(&attr, stack);
00430 }
00431
00432
00433 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00434
00435
00436 switch (ret)
00437 {
00438 case EAGAIN:
00439 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00440 case EINVAL:
00441 throw ThreadException(ret, "The value specified by attr is invalid.");
00442 case EPERM:
00443 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00444 }
00445 #endif
00446 }
00447
00448 void DetachedThread::stop() throw (ThreadException)
00449 {
00450
00451
00452 try {
00453 Thread::cancel();
00454 } catch (const ThreadException&) {
00455 throw;
00456 } catch (const std::exception&) {
00457 }
00458 }
00459 }