00001 #ifndef QPID_SYS_BLOCKINGQUEUE_H
00002 #define QPID_SYS_BLOCKINGQUEUE_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "Waitable.h"
00026
00027 #include <queue>
00028
00029 namespace qpid {
00030 namespace sys {
00031
00035 template <class T>
00036 class BlockingQueue
00037 {
00038 mutable sys::Waitable lock;
00039 std::queue<T> queue;
00040 bool closed;
00041
00042 public:
00043 BlockingQueue() : closed(false) {}
00044 ~BlockingQueue() { close(); }
00045
00047 T pop()
00048 {
00049 Waitable::ScopedLock l(lock);
00050 if (!queueWait()) throw ClosedException();
00051 return popInternal();
00052 }
00053
00057 bool tryPop(T& outValue) {
00058 Waitable::ScopedLock l(lock);
00059 if (queue.empty()) return false;
00060 outValue = popInternal();
00061 return true;
00062 }
00063
00067 T tryPop(const T& valueIfEmpty=T()) {
00068 T result=valueIfEmpty;
00069 tryPop(result);
00070 return result;
00071 }
00072
00074 void push(const T& t)
00075 {
00076 Waitable::ScopedLock l(lock);
00077 queue.push(t);
00078 queueNotify(0);
00079 }
00080
00085 void close()
00086 {
00087 Waitable::ScopedLock l(lock);
00088 if (!closed) {
00089 closed = true;
00090 lock.notifyAll();
00091 lock.waitWaiters();
00092 }
00093 }
00094
00096 void open() {
00097 Waitable::ScopedLock l(lock);
00098 closed=false;
00099 }
00100
00101 bool isClosed() const {
00102 Waitable::ScopedLock l(lock);
00103 return closed;
00104 }
00105
00106 bool empty() const {
00107 Waitable::ScopedLock l(lock);
00108 return queue.empty();
00109 }
00110 size_t size() const {
00111 Waitable::ScopedLock l(lock);
00112 return queue.size();
00113 }
00114
00115 private:
00116
00117 void queueNotify(size_t ignore) {
00118 if (!queue.empty() && lock.hasWaiters()>ignore)
00119 lock.notify();
00120 }
00121
00122 bool queueWait() {
00123 Waitable::ScopedWait w(lock);
00124 while (!closed && queue.empty())
00125 lock.wait();
00126 return !queue.empty();
00127 }
00128
00129 T popInternal() {
00130 T t=queue.front();
00131 queue.pop();
00132 queueNotify(1);
00133 return t;
00134 }
00135
00136 };
00137
00138 }}
00139
00140
00141
00142 #endif