Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef IBRCOMMON_QUEUE_H_
00009 #define IBRCOMMON_QUEUE_H_
00010
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include "ibrcommon/thread/Conditional.h"
00013 #include "ibrcommon/Exceptions.h"
00014 #include "ibrcommon/thread/Semaphore.h"
00015 #include "ibrcommon/thread/Thread.h"
00016 #include <queue>
00017 #include <iostream>
00018
00019 namespace ibrcommon
00020 {
00021 class QueueUnblockedException : public ibrcommon::Exception
00022 {
00023 public:
00024 enum type_t
00025 {
00026 QUEUE_ABORT = 0,
00027 QUEUE_ERROR = 1,
00028 QUEUE_TIMEOUT = 2
00029 };
00030
00031 QueueUnblockedException(const type_t r, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what), reason(r)
00032 {
00033 };
00034
00035 QueueUnblockedException(const ibrcommon::Conditional::ConditionalAbortException &ex, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what)
00036 {
00037 switch (ex.reason)
00038 {
00039 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00040 reason = QUEUE_ABORT;
00041 _what = "queue function aborted in " + what;
00042 break;
00043
00044 case ibrcommon::Conditional::ConditionalAbortException::COND_ERROR:
00045 reason = QUEUE_ERROR;
00046 _what = "queue function error in " + what;
00047 break;
00048
00049 case ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT:
00050 reason = QUEUE_TIMEOUT;
00051 _what = "queue function timeout in " + what;
00052 break;
00053 }
00054 };
00055
00056 type_t reason;
00057 };
00058
00059 template <class T>
00060 class Queue
00061 {
00062 ibrcommon::Conditional _cond;
00063 std::queue<T> _queue;
00064 ibrcommon::Semaphore _sem;
00065 bool _limit;
00066
00067 public:
00068 Queue(size_t max = 0) : _sem(max), _limit(max > 0)
00069 {};
00070
00071 virtual ~Queue()
00072 {
00073 abort();
00074 };
00075
00076
00077 bool empty ( )
00078 {
00079 ibrcommon::MutexLock l(_cond);
00080 return _queue.empty();
00081 }
00082
00083
00084 size_t size ( ) const
00085 {
00086 return _queue.size();
00087 }
00088
00089
00090 T& front ( )
00091 {
00092 ibrcommon::MutexLock l(_cond);
00093 return _queue.front();
00094 }
00095
00096 const T& front ( ) const
00097 {
00098 ibrcommon::MutexLock l(_cond);
00099 return _queue.front();
00100 }
00101
00102
00103 T& back ( )
00104 {
00105 ibrcommon::MutexLock l(_cond);
00106 return _queue.back();
00107 }
00108
00109 const T& back ( ) const
00110 {
00111 ibrcommon::MutexLock l(_cond);
00112 return _queue.back();
00113 }
00114
00115
00116 void push ( const T& x )
00117 {
00118 if (_limit) _sem.wait();
00119
00120 ibrcommon::MutexLock l(_cond);
00121 _queue.push(x);
00122 _cond.signal(true);
00123 }
00124
00125
00126 void pop ()
00127 {
00128 ibrcommon::MutexLock l(_cond);
00129 __pop();
00130 }
00131
00132 T getnpop(bool blocking = false, size_t timeout = 0) throw (QueueUnblockedException)
00133 {
00134 try {
00135 ibrcommon::MutexLock l(_cond);
00136 if (_queue.empty())
00137 {
00138 if (blocking)
00139 {
00140 if (timeout == 0)
00141 {
00142 __wait(QUEUE_NOT_EMPTY);
00143 }
00144 else
00145 {
00146 __wait(QUEUE_NOT_EMPTY, timeout);
00147 }
00148 }
00149 else
00150 {
00151 throw QueueUnblockedException(QueueUnblockedException::QUEUE_ABORT, "getnpop(): queue is empty!");
00152 }
00153 }
00154
00155 T ret = _queue.front();
00156 __pop();
00157 return ret;
00158 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00159 throw QueueUnblockedException(ex, "getnpop()");
00160 }
00161 }
00162
00163 void abort() throw ()
00164 {
00165 ibrcommon::MutexLock l(_cond);
00166 _cond.abort();
00167 }
00168
00169 enum WAIT_MODES
00170 {
00171 QUEUE_NOT_EMPTY = 0,
00172 QUEUE_EMPTY = 1
00173 };
00174
00175 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
00176 {
00177 ibrcommon::MutexLock l(_cond);
00178 if (timeout == 0)
00179 {
00180 __wait(mode);
00181 }
00182 else
00183 {
00184 __wait(mode, timeout);
00185 }
00186 }
00187
00188 class Locked
00189 {
00190 public:
00191 Locked(Queue<T> &queue)
00192 : _queue(queue), _lock(queue._cond)
00193 {
00194 };
00195
00196 virtual ~Locked()
00197 {
00198 };
00199
00200 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
00201 {
00202 if (timeout == 0)
00203 {
00204 __wait(mode);
00205 }
00206 else
00207 {
00208 __wait(mode, timeout);
00209 }
00210 }
00211
00212 void pop()
00213 {
00214 _queue.__pop();
00215 }
00216
00217 const T& front() const
00218 {
00219 return _queue._queue.front();
00220 }
00221
00222 T& front()
00223 {
00224 return _queue._queue.front();
00225 }
00226
00227 bool empty()
00228 {
00229 return _queue._queue.empty();
00230 }
00231
00232 size_t size()
00233 {
00234 return _queue._queue.size();
00235 }
00236
00237 private:
00238 Queue<T> &_queue;
00239 ibrcommon::MutexLock _lock;
00240 };
00241
00242 typename Queue<T>::Locked exclusive()
00243 {
00244 return typename Queue<T>::Locked(*this);
00245 }
00246
00247 protected:
00248 void __push( const T& x )
00249 {
00250 _queue.push(x);
00251 _cond.signal(true);
00252 }
00253
00254 void __pop()
00255 {
00256 if (!_queue.empty())
00257 {
00258 _queue.pop();
00259 if (_limit) _sem.post();
00260 _cond.signal(true);
00261 }
00262 }
00263
00264 void __wait(const WAIT_MODES mode) throw (QueueUnblockedException)
00265 {
00266 try {
00267 switch (mode)
00268 {
00269 case QUEUE_NOT_EMPTY:
00270 {
00271 while (_queue.empty())
00272 {
00273 _cond.wait();
00274 }
00275 break;
00276 }
00277
00278 case QUEUE_EMPTY:
00279 {
00280 while (!_queue.empty())
00281 {
00282 _cond.wait();
00283 }
00284 break;
00285 }
00286 }
00287 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00288 switch (ex.reason)
00289 {
00290 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00291 _cond.reset();
00292 break;
00293
00294 default:
00295 break;
00296 }
00297
00298 throw QueueUnblockedException(ex, "__wait(): Queue is not empty!");
00299 }
00300 }
00301
00302 void __wait(const WAIT_MODES mode, const size_t timeout) throw (QueueUnblockedException)
00303 {
00304 try {
00305 struct timespec ts;
00306 Conditional::gettimeout(timeout, &ts);
00307
00308 switch (mode)
00309 {
00310 case QUEUE_NOT_EMPTY:
00311 {
00312 while (_queue.empty())
00313 {
00314 _cond.wait(&ts);
00315 }
00316 break;
00317 }
00318
00319 case QUEUE_EMPTY:
00320 {
00321 while (!_queue.empty())
00322 {
00323 _cond.wait(&ts);
00324 }
00325 break;
00326 }
00327 }
00328 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00329 switch (ex.reason)
00330 {
00331 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00332 _cond.reset();
00333 break;
00334
00335 default:
00336 break;
00337 }
00338
00339 throw QueueUnblockedException(ex, "__wait(): Queue is not empty!");
00340 }
00341 }
00342 };
00343 }
00344
00345 #endif