IBR-DTNSuite 0.6

ibrcommon/ibrcommon/thread/Queue.h

Go to the documentation of this file.
00001 /*
00002  * Queue.h
00003  *
00004  *  Created on: 10.06.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef IBRCOMMON_QUEUE_H_
00009 #define IBRCOMMON_QUEUE_H_
00010 
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include "ibrcommon/thread/Conditional.h"
00013 #include "ibrcommon/Exceptions.h"
00014 #include "ibrcommon/thread/Semaphore.h"
00015 #include "ibrcommon/thread/Thread.h"
00016 #include <queue>
00017 #include <iostream>
00018 
00019 namespace ibrcommon
00020 {
00021         class QueueUnblockedException : public ibrcommon::Exception
00022         {
00023         public:
00024                 enum type_t
00025                 {
00026                         QUEUE_ABORT = 0,
00027                         QUEUE_ERROR = 1,
00028                         QUEUE_TIMEOUT = 2
00029                 };
00030 
00031                 QueueUnblockedException(const type_t r, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what), reason(r)
00032                 {
00033                 };
00034 
00035                 QueueUnblockedException(const ibrcommon::Conditional::ConditionalAbortException &ex, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what)
00036                 {
00037                         switch (ex.reason)
00038                         {
00039                         case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00040                                 reason = QUEUE_ABORT;
00041                                 _what = "queue function aborted in " + what;
00042                                 break;
00043 
00044                         case ibrcommon::Conditional::ConditionalAbortException::COND_ERROR:
00045                                 reason = QUEUE_ERROR;
00046                                 _what = "queue function error in " + what;
00047                                 break;
00048 
00049                         case ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT:
00050                                 reason = QUEUE_TIMEOUT;
00051                                 _what = "queue function timeout in " + what;
00052                                 break;
00053                         }
00054                 };
00055 
00056                 type_t reason;
00057         };
00058 
00059         template <class T>
00060         class Queue
00061         {
00062                 ibrcommon::Conditional _cond;
00063                 std::queue<T> _queue;
00064                 ibrcommon::Semaphore _sem;
00065                 bool _limit;
00066 
00067         public:
00068                 Queue(size_t max = 0) : _sem(max), _limit(max > 0)
00069                 {};
00070 
00071                 virtual ~Queue()
00072                 {
00073                         abort();
00074                 };
00075 
00076                 /* Test whether container is empty (public member function) */
00077                 bool empty ( )
00078                 {
00079                         ibrcommon::MutexLock l(_cond);
00080                         return _queue.empty();
00081                 }
00082 
00083                 /* Return size (public member function) */
00084                 size_t size ( ) const
00085                 {
00086                         return _queue.size();
00087                 }
00088 
00089                 /* Access next element (public member function) */
00090                 T& front ( )
00091                 {
00092                         ibrcommon::MutexLock l(_cond);
00093                         return _queue.front();
00094                 }
00095 
00096                 const T& front ( ) const
00097                 {
00098                         ibrcommon::MutexLock l(_cond);
00099                         return _queue.front();
00100                 }
00101 
00102                 /* Access last element (public member function) */
00103                 T& back ( )
00104                 {
00105                         ibrcommon::MutexLock l(_cond);
00106                         return _queue.back();
00107                 }
00108 
00109                 const T& back ( ) const
00110                 {
00111                         ibrcommon::MutexLock l(_cond);
00112                         return _queue.back();
00113                 }
00114 
00115                 /* Insert element (public member function) */
00116                 void push ( const T& x )
00117                 {
00118                         if (_limit) _sem.wait();
00119 
00120                         ibrcommon::MutexLock l(_cond);
00121                         _queue.push(x);
00122                         _cond.signal(true);
00123                 }
00124 
00125                 /* Delete next element (public member function) */
00126                 void pop ()
00127                 {
00128                         ibrcommon::MutexLock l(_cond);
00129                         __pop();
00130                 }
00131 
00132                 T getnpop(bool blocking = false, size_t timeout = 0) throw (QueueUnblockedException)
00133                 {
00134                         try {
00135                                 ibrcommon::MutexLock l(_cond);
00136                                 if (_queue.empty())
00137                                 {
00138                                         if (blocking)
00139                                         {
00140                                                 if (timeout == 0)
00141                                                 {
00142                                                         __wait(QUEUE_NOT_EMPTY);
00143                                                 }
00144                                                 else
00145                                                 {
00146                                                         __wait(QUEUE_NOT_EMPTY, timeout);
00147                                                 }
00148                                         }
00149                                         else
00150                                         {
00151                                                 throw QueueUnblockedException(QueueUnblockedException::QUEUE_ABORT, "getnpop(): queue is empty!");
00152                                         }
00153                                 }
00154 
00155                                 T ret = _queue.front();
00156                                 __pop();
00157                                 return ret;
00158                         } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00159                                 throw QueueUnblockedException(ex, "getnpop()");
00160                         }
00161                 }
00162 
00163                 void abort() throw ()
00164                 {
00165                         ibrcommon::MutexLock l(_cond);
00166                         _cond.abort();
00167                 }
00168 
00169                 enum WAIT_MODES
00170                 {
00171                         QUEUE_NOT_EMPTY = 0,
00172                         QUEUE_EMPTY = 1
00173                 };
00174 
00175                 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
00176                 {
00177                         ibrcommon::MutexLock l(_cond);
00178                         if (timeout == 0)
00179                         {
00180                                 __wait(mode);
00181                         }
00182                         else
00183                         {
00184                                 __wait(mode, timeout);
00185                         }
00186                 }
00187 
00188                 class Locked
00189                 {
00190                 public:
00191                         Locked(Queue<T> &queue)
00192                          : _queue(queue), _lock(queue._cond)
00193                         {
00194                         };
00195 
00196                         virtual ~Locked()
00197                         {
00198                         };
00199 
00200                         void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
00201                         {
00202                                 if (timeout == 0)
00203                                 {
00204                                         __wait(mode);
00205                                 }
00206                                 else
00207                                 {
00208                                         __wait(mode, timeout);
00209                                 }
00210                         }
00211 
00212                         void pop()
00213                         {
00214                                 _queue.__pop();
00215                         }
00216 
00217                         const T& front() const
00218                         {
00219                                 return _queue._queue.front();
00220                         }
00221 
00222                         T& front()
00223                         {
00224                                 return _queue._queue.front();
00225                         }
00226 
00227                         bool empty()
00228                         {
00229                                 return _queue._queue.empty();
00230                         }
00231 
00232                         size_t size()
00233                         {
00234                                 return _queue._queue.size();
00235                         }
00236 
00237                 private:
00238                         Queue<T> &_queue;
00239                         ibrcommon::MutexLock _lock;
00240                 };
00241 
00242                 typename Queue<T>::Locked exclusive()
00243                 {
00244                         return typename Queue<T>::Locked(*this);
00245                 }
00246 
00247         protected:
00248                 void __push( const T& x )
00249                 {
00250                         _queue.push(x);
00251                         _cond.signal(true);
00252                 }
00253 
00254                 void __pop()
00255                 {
00256                         if (!_queue.empty())
00257                         {
00258                                 _queue.pop();
00259                                 if (_limit) _sem.post();
00260                                 _cond.signal(true);
00261                         }
00262                 }
00263 
00264                 void __wait(const WAIT_MODES mode) throw (QueueUnblockedException)
00265                 {
00266                         try {
00267                                 switch (mode)
00268                                 {
00269                                         case QUEUE_NOT_EMPTY:
00270                                         {
00271                                                 while (_queue.empty())
00272                                                 {
00273                                                         _cond.wait();
00274                                                 }
00275                                                 break;
00276                                         }
00277 
00278                                         case QUEUE_EMPTY:
00279                                         {
00280                                                 while (!_queue.empty())
00281                                                 {
00282                                                         _cond.wait();
00283                                                 }
00284                                                 break;
00285                                         }
00286                                 }
00287                         } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00288                                 switch (ex.reason)
00289                                 {
00290                                 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00291                                         _cond.reset();
00292                                         break;
00293 
00294                                 default:
00295                                         break;
00296                                 }
00297 
00298                                 throw QueueUnblockedException(ex, "__wait(): Queue is not empty!");
00299                         }
00300                 }
00301 
00302                 void __wait(const WAIT_MODES mode, const size_t timeout) throw (QueueUnblockedException)
00303                 {
00304                         try {
00305                                 struct timespec ts;
00306                                 Conditional::gettimeout(timeout, &ts);
00307 
00308                                 switch (mode)
00309                                 {
00310                                         case QUEUE_NOT_EMPTY:
00311                                         {
00312                                                 while (_queue.empty())
00313                                                 {
00314                                                         _cond.wait(&ts);
00315                                                 }
00316                                                 break;
00317                                         }
00318 
00319                                         case QUEUE_EMPTY:
00320                                         {
00321                                                 while (!_queue.empty())
00322                                                 {
00323                                                         _cond.wait(&ts);
00324                                                 }
00325                                                 break;
00326                                         }
00327                                 }
00328                         } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00329                                 switch (ex.reason)
00330                                 {
00331                                 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00332                                         _cond.reset();
00333                                         break;
00334 
00335                                 default:
00336                                         break;
00337                                 }
00338 
00339                                 throw QueueUnblockedException(ex, "__wait(): Queue is not empty!");
00340                         }
00341                 }
00342         };
00343 }
00344 
00345 #endif /* IBRCOMMON_QUEUE_H_ */