|
IBR-DTNSuite 0.6
|
00001 /* 00002 * Queue.h 00003 * 00004 * Created on: 10.06.2010 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef IBRCOMMON_QUEUE_H_ 00009 #define IBRCOMMON_QUEUE_H_ 00010 00011 #include "ibrcommon/thread/MutexLock.h" 00012 #include "ibrcommon/thread/Conditional.h" 00013 #include "ibrcommon/Exceptions.h" 00014 #include "ibrcommon/thread/Semaphore.h" 00015 #include "ibrcommon/thread/Thread.h" 00016 #include <queue> 00017 #include <iostream> 00018 00019 namespace ibrcommon 00020 { 00021 class QueueUnblockedException : public ibrcommon::Exception 00022 { 00023 public: 00024 enum type_t 00025 { 00026 QUEUE_ABORT = 0, 00027 QUEUE_ERROR = 1, 00028 QUEUE_TIMEOUT = 2 00029 }; 00030 00031 QueueUnblockedException(const type_t r, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what), reason(r) 00032 { 00033 }; 00034 00035 QueueUnblockedException(const ibrcommon::Conditional::ConditionalAbortException &ex, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what) 00036 { 00037 switch (ex.reason) 00038 { 00039 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT: 00040 reason = QUEUE_ABORT; 00041 _what = "queue function aborted in " + what; 00042 break; 00043 00044 case ibrcommon::Conditional::ConditionalAbortException::COND_ERROR: 00045 reason = QUEUE_ERROR; 00046 _what = "queue function error in " + what; 00047 break; 00048 00049 case ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT: 00050 reason = QUEUE_TIMEOUT; 00051 _what = "queue function timeout in " + what; 00052 break; 00053 } 00054 }; 00055 00056 type_t reason; 00057 }; 00058 00059 template <class T> 00060 class Queue 00061 { 00062 ibrcommon::Conditional _cond; 00063 std::queue<T> _queue; 00064 ibrcommon::Semaphore _sem; 00065 bool _limit; 00066 00067 public: 00068 Queue(size_t max = 0) : _sem(max), _limit(max > 0) 00069 {}; 00070 00071 virtual ~Queue() 00072 { 00073 abort(); 00074 }; 00075 00076 /* Test whether container is empty (public member function) */ 00077 bool empty ( ) 00078 { 00079 ibrcommon::MutexLock l(_cond); 00080 return _queue.empty(); 00081 } 00082 00083 /* Return size (public member function) */ 00084 size_t size ( ) const 00085 { 00086 return _queue.size(); 00087 } 00088 00089 /* Access next element (public member function) */ 00090 T& front ( ) 00091 { 00092 ibrcommon::MutexLock l(_cond); 00093 return _queue.front(); 00094 } 00095 00096 const T& front ( ) const 00097 { 00098 ibrcommon::MutexLock l(_cond); 00099 return _queue.front(); 00100 } 00101 00102 /* Access last element (public member function) */ 00103 T& back ( ) 00104 { 00105 ibrcommon::MutexLock l(_cond); 00106 return _queue.back(); 00107 } 00108 00109 const T& back ( ) const 00110 { 00111 ibrcommon::MutexLock l(_cond); 00112 return _queue.back(); 00113 } 00114 00115 /* Insert element (public member function) */ 00116 void push ( const T& x ) 00117 { 00118 if (_limit) _sem.wait(); 00119 00120 ibrcommon::MutexLock l(_cond); 00121 _queue.push(x); 00122 _cond.signal(true); 00123 } 00124 00125 /* Delete next element (public member function) */ 00126 void pop () 00127 { 00128 ibrcommon::MutexLock l(_cond); 00129 __pop(); 00130 } 00131 00132 T getnpop(bool blocking = false, size_t timeout = 0) throw (QueueUnblockedException) 00133 { 00134 try { 00135 ibrcommon::MutexLock l(_cond); 00136 if (_queue.empty()) 00137 { 00138 if (blocking) 00139 { 00140 if (timeout == 0) 00141 { 00142 __wait(QUEUE_NOT_EMPTY); 00143 } 00144 else 00145 { 00146 __wait(QUEUE_NOT_EMPTY, timeout); 00147 } 00148 } 00149 else 00150 { 00151 throw QueueUnblockedException(QueueUnblockedException::QUEUE_ABORT, "getnpop(): queue is empty!"); 00152 } 00153 } 00154 00155 T ret = _queue.front(); 00156 __pop(); 00157 return ret; 00158 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) { 00159 throw QueueUnblockedException(ex, "getnpop()"); 00160 } 00161 } 00162 00163 void abort() throw () 00164 { 00165 ibrcommon::MutexLock l(_cond); 00166 _cond.abort(); 00167 } 00168 00169 enum WAIT_MODES 00170 { 00171 QUEUE_NOT_EMPTY = 0, 00172 QUEUE_EMPTY = 1 00173 }; 00174 00175 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException) 00176 { 00177 ibrcommon::MutexLock l(_cond); 00178 if (timeout == 0) 00179 { 00180 __wait(mode); 00181 } 00182 else 00183 { 00184 __wait(mode, timeout); 00185 } 00186 } 00187 00188 class Locked 00189 { 00190 public: 00191 Locked(Queue<T> &queue) 00192 : _queue(queue), _lock(queue._cond) 00193 { 00194 }; 00195 00196 virtual ~Locked() 00197 { 00198 }; 00199 00200 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException) 00201 { 00202 if (timeout == 0) 00203 { 00204 __wait(mode); 00205 } 00206 else 00207 { 00208 __wait(mode, timeout); 00209 } 00210 } 00211 00212 void pop() 00213 { 00214 _queue.__pop(); 00215 } 00216 00217 const T& front() const 00218 { 00219 return _queue._queue.front(); 00220 } 00221 00222 T& front() 00223 { 00224 return _queue._queue.front(); 00225 } 00226 00227 bool empty() 00228 { 00229 return _queue._queue.empty(); 00230 } 00231 00232 size_t size() 00233 { 00234 return _queue._queue.size(); 00235 } 00236 00237 private: 00238 Queue<T> &_queue; 00239 ibrcommon::MutexLock _lock; 00240 }; 00241 00242 typename Queue<T>::Locked exclusive() 00243 { 00244 return typename Queue<T>::Locked(*this); 00245 } 00246 00247 protected: 00248 void __push( const T& x ) 00249 { 00250 _queue.push(x); 00251 _cond.signal(true); 00252 } 00253 00254 void __pop() 00255 { 00256 if (!_queue.empty()) 00257 { 00258 _queue.pop(); 00259 if (_limit) _sem.post(); 00260 _cond.signal(true); 00261 } 00262 } 00263 00264 void __wait(const WAIT_MODES mode) throw (QueueUnblockedException) 00265 { 00266 try { 00267 switch (mode) 00268 { 00269 case QUEUE_NOT_EMPTY: 00270 { 00271 while (_queue.empty()) 00272 { 00273 _cond.wait(); 00274 } 00275 break; 00276 } 00277 00278 case QUEUE_EMPTY: 00279 { 00280 while (!_queue.empty()) 00281 { 00282 _cond.wait(); 00283 } 00284 break; 00285 } 00286 } 00287 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) { 00288 switch (ex.reason) 00289 { 00290 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT: 00291 _cond.reset(); 00292 break; 00293 00294 default: 00295 break; 00296 } 00297 00298 throw QueueUnblockedException(ex, "__wait(): Queue is not empty!"); 00299 } 00300 } 00301 00302 void __wait(const WAIT_MODES mode, const size_t timeout) throw (QueueUnblockedException) 00303 { 00304 try { 00305 struct timespec ts; 00306 Conditional::gettimeout(timeout, &ts); 00307 00308 switch (mode) 00309 { 00310 case QUEUE_NOT_EMPTY: 00311 { 00312 while (_queue.empty()) 00313 { 00314 _cond.wait(&ts); 00315 } 00316 break; 00317 } 00318 00319 case QUEUE_EMPTY: 00320 { 00321 while (!_queue.empty()) 00322 { 00323 _cond.wait(&ts); 00324 } 00325 break; 00326 } 00327 } 00328 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) { 00329 switch (ex.reason) 00330 { 00331 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT: 00332 _cond.reset(); 00333 break; 00334 00335 default: 00336 break; 00337 } 00338 00339 throw QueueUnblockedException(ex, "__wait(): Queue is not empty!"); 00340 } 00341 } 00342 }; 00343 } 00344 00345 #endif /* IBRCOMMON_QUEUE_H_ */