00001 /* 00002 * ThreadSafeQueue.h 00003 * 00004 * Created on: 10.06.2010 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef THREADSAFEQUEUE_H_ 00009 #define THREADSAFEQUEUE_H_ 00010 00011 #include "ibrcommon/thread/MutexLock.h" 00012 #include "ibrcommon/thread/Conditional.h" 00013 #include <queue> 00014 00015 namespace ibrcommon 00016 { 00017 template <class T> 00018 class LockedQueue 00019 { 00020 public: 00021 LockedQueue(ibrcommon::Mutex &mutex, std::queue<T> &queue) : _lock(mutex), _queue(queue) 00022 { 00023 }; 00024 00025 virtual ~LockedQueue() 00026 { 00027 }; 00028 00029 std::queue<T>& operator*() 00030 { 00031 return _queue; 00032 } 00033 00034 private: 00035 ibrcommon::MutexLock _lock; 00036 std::queue<T> &_queue; 00037 }; 00038 00039 template <class T> 00040 class ThreadSafeQueue : public ibrcommon::Conditional 00041 { 00042 public: 00043 ThreadSafeQueue() : _queue(), _unblock(false) 00044 {}; 00045 00046 virtual ~ThreadSafeQueue() 00047 {}; 00048 00049 /* Test whether container is empty (public member function) */ 00050 bool empty ( ) 00051 { 00052 ibrcommon::MutexLock l(*this); 00053 return _queue.empty(); 00054 } 00055 00056 /* Return size (public member function) */ 00057 size_t size ( ) 00058 { 00059 ibrcommon::MutexLock l(*this); 00060 return _queue.size(); 00061 } 00062 00063 /* Access next element (public member function) */ 00064 T front ( ) 00065 { 00066 ibrcommon::MutexLock l(*this); 00067 return _queue.front(); 00068 } 00069 00070 const T front ( ) const 00071 { 00072 ibrcommon::MutexLock l(*this); 00073 return _queue.front(); 00074 } 00075 00076 /* Access last element (public member function) */ 00077 T back ( ) 00078 { 00079 ibrcommon::MutexLock l(*this); 00080 return _queue.back(); 00081 } 00082 00083 const T back ( ) const 00084 { 00085 ibrcommon::MutexLock l(*this); 00086 return _queue.back(); 00087 } 00088 00089 /* Insert element (public member function) */ 00090 void push ( const T& x ) 00091 { 00092 ibrcommon::MutexLock l(*this); 00093 _queue.push(x); 00094 (*this).signal(true); 00095 } 00096 00097 /* Delete next element (public member function) */ 00098 void pop ( ) 00099 { 00100 ibrcommon::MutexLock l(*this); 00101 _queue.pop(); 00102 } 00103 00104 T frontpop() throw (ibrcommon::Exception) 00105 { 00106 ibrcommon::MutexLock l(*this); 00107 if (_queue.empty()) 00108 { 00109 throw ibrcommon::Exception("pop on empty queue"); 00110 } 00111 00112 T ret = _queue.front(); 00113 _queue.pop(); 00114 return ret; 00115 } 00116 00117 T blockingpop(size_t timeout = 0) throw (ibrcommon::Exception) 00118 { 00119 ibrcommon::MutexLock l(*this); 00120 if (_unblock) throw ibrcommon::Exception("queue unblocked"); 00121 00122 if (_queue.empty()) 00123 { 00124 if (timeout > 0) 00125 { 00126 (*this).wait(timeout); 00127 } 00128 else 00129 { 00130 (*this).wait(); 00131 } 00132 00133 if (_queue.empty() || _unblock) throw ibrcommon::Exception("queue unblocked without an element"); 00134 } 00135 00136 T ret = _queue.front(); 00137 _queue.pop(); 00138 return ret; 00139 } 00140 00141 void unblock() throw () 00142 { 00143 ibrcommon::MutexLock l(*this); 00144 _unblock = true; 00145 (*this).signal(true); 00146 } 00147 00148 void reset() throw () 00149 { 00150 ibrcommon::MutexLock l(*this); 00151 _unblock = false; 00152 } 00153 00154 LockedQueue<T> LockedAccess() 00155 { 00156 return LockedQueue<T>(*this, _queue); 00157 } 00158 00159 protected: 00160 std::queue<T> _queue; 00161 bool _unblock; 00162 }; 00163 } 00164 00165 #endif /* THREADSAFEQUEUE_H_ */
1.6.3