Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "ibrcommon/config.h"
00009 #include "ibrcommon/thread/Thread.h"
00010 #include "ibrcommon/thread/MutexLock.h"
00011 #include "ibrcommon/Logger.h"
00012 #include <stdexcept>
00013 #include <pthread.h>
00014 #include <sys/times.h>
00015 #include <stdio.h>
00016 #include <unistd.h>
00017 #include <signal.h>
00018 #include <cassert>
00019
00020
00021 namespace ibrcommon
00022 {
00023 ThreadFinalizer::ThreadFinalizer(Thread *t) : _thread(t) { };
00024 ThreadFinalizer::~ThreadFinalizer()
00025 {
00026
00027 _thread->finally();
00028
00029
00030 if (_thread->__delete_on_exit)
00031 {
00032 delete _thread;
00033 }
00034 };
00035
00036 void* Thread::exec_thread(void *obj)
00037 {
00038 assert(obj != NULL);
00039
00040 Thread *th = static_cast<Thread *>(obj);
00041
00042
00043 try {
00044
00045 ThreadFinalizer finalizer(th);
00046
00047
00048 th->ready();
00049
00050
00051 th->run();
00052 } catch (const std::exception &ex) {
00053
00054 IBRCOMMON_LOGGER_DEBUG(40) << "Exception during thread execution: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00055 }
00056
00057
00058 Thread::exit();
00059
00060 return NULL;
00061 }
00062
00063 Thread::Thread(size_t size, bool delete_on_exit)
00064 : _cancel(false), tid(0), stack(size), priority(0), _ready(false), __delete_on_exit(delete_on_exit)
00065 {
00066 pthread_attr_init(&attr);
00067 }
00068
00069 void Thread::yield(void)
00070 {
00071 #if defined(HAVE_PTHREAD_YIELD_NP)
00072 pthread_yield_np();
00073 #elif defined(HAVE_PTHREAD_YIELD)
00074 pthread_yield();
00075 #else
00076 sched_yield();
00077 #endif
00078 }
00079
00080 void Thread::testcancel()
00081 {
00082 ::pthread_testcancel();
00083 }
00084
00085 void Thread::concurrency(int level)
00086 {
00087 #if defined(HAVE_PTHREAD_SETCONCURRENCY)
00088 pthread_setconcurrency(level);
00089 #endif
00090 }
00091
00092 void Thread::sleep(size_t timeout)
00093 {
00094 timespec ts;
00095 ts.tv_sec = timeout / 1000l;
00096 ts.tv_nsec = (timeout % 1000l) * 1000000l;
00097 #if defined(HAVE_PTHREAD_DELAY)
00098 pthread_delay(&ts);
00099 #elif defined(HAVE_PTHREAD_DELAY_NP)
00100 pthread_delay_np(&ts);
00101 #else
00102 usleep(timeout * 1000);
00103 #endif
00104 }
00105
00106 Thread::~Thread()
00107 {
00108 pthread_attr_destroy(&attr);
00109 }
00110
00111 void Thread::detach(void)
00112 {
00113 pthread_detach(tid);
00114 }
00115
00116 void Thread::exit(void)
00117 {
00118 pthread_exit(NULL);
00119 }
00120
00121 int Thread::kill(int sig)
00122 {
00123 if (tid == 0) return -1;
00124 return pthread_kill(tid, sig);
00125 }
00126
00127 void Thread::cancel() throw (ThreadException)
00128 {
00129 ibrcommon::MutexLock l(_cancelLock);
00130 if (_cancel)
00131 return;
00132 if (!__cancellation())
00133 {
00134
00135 if (Thread::kill(0) == 0)
00136 {
00137 int ret = pthread_cancel(tid);
00138 if ( 0 != ret) throw ThreadException(ret, "pthread_cancel");
00139 }
00140 }
00141 _cancel=true;
00142 }
00143
00144 int Thread::enableCancel(int &state)
00145 {
00146 return pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &state);
00147 }
00148
00149 int Thread::disableCancel(int &state)
00150 {
00151 return pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &state);
00152 }
00153
00154 int Thread::restoreCancel(const int &state)
00155 {
00156 return pthread_setcancelstate(state, NULL);
00157 }
00158
00159 int Thread::setCancel(bool val)
00160 {
00161 int old = 0;
00162
00163 if (val)
00164 {
00165 return pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &old);
00166 }
00167 else
00168 {
00169 return pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &old);
00170 }
00171 }
00172
00173 bool Thread::equal(pthread_t t1, pthread_t t2)
00174 {
00175 return (pthread_equal(t1, t2) != 0);
00176 }
00177
00178 void Thread::ready()
00179 {
00180 ibrcommon::MutexLock l(_readycond);
00181 _ready = true;
00182 _readycond.signal(true);
00183 }
00184
00185 void Thread::waitready()
00186 {
00187 ibrcommon::MutexLock l(_readycond);
00188 while (_ready == false)
00189 {
00190 _readycond.wait();
00191 }
00192 }
00193
00194 Thread::CancelProtector::CancelProtector(bool enable) : _oldstate(0)
00195 {
00196 if (enable)
00197 {
00198 Thread::enableCancel(_oldstate);
00199 }
00200 else
00201 {
00202 Thread::disableCancel(_oldstate);
00203 }
00204 }
00205
00206 Thread::CancelProtector::~CancelProtector()
00207 {
00208 Thread::restoreCancel(_oldstate);
00209 }
00210
00211 JoinableThread::JoinableThread(size_t size)
00212 : Thread(size, false), running_mutex(ibrcommon::Mutex::MUTEX_ERRORCHECK), running(false), joined(false)
00213 {
00214 }
00215
00216 JoinableThread::~JoinableThread()
00217 {
00218 join();
00219 }
00220
00221 void JoinableThread::start(int adj) throw (ThreadException)
00222 {
00223 int result;
00224
00225 ibrcommon::MutexLock l(running_mutex);
00226 if(running)
00227 return;
00228
00229 priority = adj;
00230
00231 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
00232 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00233
00234
00235 #ifdef PTHREAD_STACK_MIN
00236 if(stack && stack < PTHREAD_STACK_MIN)
00237 stack = PTHREAD_STACK_MIN;
00238 #else
00239 if(stack && stack < 2)
00240 stack = 0;
00241 #endif
00242
00243 if(stack)
00244 pthread_attr_setstacksize(&attr, stack);
00245 result = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00246
00247 switch (result)
00248 {
00249 case EAGAIN:
00250 throw ThreadException(result, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00251 case EINVAL:
00252 throw ThreadException(result, "The value specified by attr is invalid.");
00253 case EPERM:
00254 throw ThreadException(result, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00255 }
00256
00257
00258 waitready();
00259
00260 running = true;
00261 }
00262
00263 void JoinableThread::stop() throw (ThreadException)
00264 {
00265
00266
00267 try {
00268 ibrcommon::MutexLock l(running_mutex);
00269 Thread::cancel();
00270 } catch (ThreadException) {
00271 throw;
00272 } catch (std::exception) {
00273 }
00274 }
00275
00276 void JoinableThread::join(void)
00277 {
00278 try {
00279 pthread_t self = pthread_self();
00280
00281 {
00282 ibrcommon::MutexLock l(running_mutex);
00283 if(running && equal(tid, self))
00284 Thread::exit();
00285
00286
00287 if (!running) return;
00288 }
00289
00290
00291 ibrcommon::MutexLock l(join_mutex);
00292 if (joined || (tid == 0)) return;
00293
00294 if(!pthread_join(tid, NULL))
00295 {
00296 ibrcommon::MutexLock l(running_mutex);
00297 running = false;
00298 joined = true;
00299 }
00300 } catch (const std::exception &ex) {
00301 IBRCOMMON_LOGGER(error) << "Exception on Thread[" << tid << "]::join(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00302 }
00303 }
00304
00305 DetachedThread::DetachedThread(size_t size) : Thread(size, true)
00306 {
00307 stack = size;
00308 }
00309
00310 void DetachedThread::exit(void)
00311 {
00312 pthread_exit(NULL);
00313 }
00314
00315 DetachedThread::~DetachedThread()
00316 {
00317 }
00318
00319 void DetachedThread::start(int adj) throw (ThreadException)
00320 {
00321 priority = adj;
00322 #ifndef __PTH__
00323 pthread_attr_t attr;
00324 pthread_attr_init(&attr);
00325 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00326 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00327 #endif
00328
00329 #ifdef PTHREAD_STACK_MIN
00330 if(stack && stack < PTHREAD_STACK_MIN)
00331 stack = PTHREAD_STACK_MIN;
00332 #else
00333 if(stack && stack < 2)
00334 stack = 0;
00335 #endif
00336 #ifdef __PTH__
00337 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00338 #else
00339 if(stack)
00340 pthread_attr_setstacksize(&attr, stack);
00341 int ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00342 pthread_attr_destroy(&attr);
00343
00344 switch (ret)
00345 {
00346 case EAGAIN:
00347 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00348 case EINVAL:
00349 throw ThreadException(ret, "The value specified by attr is invalid.");
00350 case EPERM:
00351 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00352 }
00353 #endif
00354 }
00355
00356 void DetachedThread::stop() throw (ThreadException)
00357 {
00358
00359
00360 try {
00361 Thread::cancel();
00362 } catch (ThreadException) {
00363 throw;
00364 } catch (std::exception) {
00365 }
00366 }
00367 }