00001 #ifndef _broker_PersistableMessage_h
00002 #define _broker_PersistableMessage_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <string>
00026 #include <list>
00027 #include <boost/shared_ptr.hpp>
00028 #include <boost/weak_ptr.hpp>
00029 #include "Persistable.h"
00030 #include "qpid/framing/amqp_types.h"
00031 #include "qpid/sys/Monitor.h"
00032 #include "PersistableQueue.h"
00033
00034 namespace qpid {
00035 namespace broker {
00036
00037 class MessageStore;
00038
00043 class PersistableMessage : public Persistable
00044 {
00045 sys::Monitor asyncEnqueueLock;
00046 sys::Monitor asyncDequeueLock;
00047 sys::Mutex storeLock;
00048
00056 int asyncEnqueueCounter;
00057
00065 int asyncDequeueCounter;
00066 protected:
00067 typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
00068 syncList synclist;
00069 MessageStore* store;
00070 bool contentReleased;
00071
00072 inline void setContentReleased() {contentReleased = true; }
00073
00074 public:
00075 typedef boost::shared_ptr<PersistableMessage> shared_ptr;
00076
00080 virtual uint32_t encodedHeaderSize() const = 0;
00081
00082 virtual ~PersistableMessage() {};
00083
00084 PersistableMessage():
00085 asyncEnqueueCounter(0),
00086 asyncDequeueCounter(0),
00087 store(0),
00088 contentReleased(false)
00089 {}
00090
00091 void flush();
00092
00093 inline bool isContentReleased()const {return contentReleased; }
00094
00095 inline void waitForEnqueueComplete() {
00096 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00097 while (asyncEnqueueCounter > 0) {
00098 asyncEnqueueLock.wait();
00099 }
00100 }
00101
00102 inline bool isEnqueueComplete() {
00103 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00104 return asyncEnqueueCounter == 0;
00105 }
00106
00107 inline void enqueueComplete() {
00108 bool notify = false;
00109 {
00110 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00111 if (asyncEnqueueCounter > 0) {
00112 if (--asyncEnqueueCounter == 0) {
00113 asyncEnqueueLock.notify();
00114 notify = true;
00115 }
00116 }
00117 }
00118 if (notify) {
00119 sys::ScopedLock<sys::Mutex> l(storeLock);
00120 if (store) {
00121 for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) {
00122 PersistableQueue::shared_ptr q(i->lock());
00123 if (q) q->notifyDurableIOComplete();
00124 }
00125
00126 }
00127 }
00128 }
00129
00130 inline void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
00131 if (_store){
00132 sys::ScopedLock<sys::Mutex> l(storeLock);
00133 store = _store;
00134 boost::weak_ptr<PersistableQueue> q(queue);
00135 synclist.push_back(q);
00136 }
00137 enqueueAsync();
00138 }
00139
00140 inline void enqueueAsync() {
00141 sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
00142 asyncEnqueueCounter++;
00143 }
00144
00145 inline bool isDequeueComplete() {
00146 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00147 return asyncDequeueCounter == 0;
00148 }
00149
00150 inline void dequeueComplete() {
00151
00152 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00153 if (asyncDequeueCounter > 0) {
00154 if (--asyncDequeueCounter == 0) {
00155 asyncDequeueLock.notify();
00156 }
00157 }
00158 }
00159
00160 inline void waitForDequeueComplete() {
00161 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00162 while (asyncDequeueCounter > 0) {
00163 asyncDequeueLock.wait();
00164 }
00165 }
00166
00167 inline void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) {
00168 if (_store){
00169 sys::ScopedLock<sys::Mutex> l(storeLock);
00170 store = _store;
00171 boost::weak_ptr<PersistableQueue> q(queue);
00172 synclist.push_back(q);
00173 }
00174 dequeueAsync();
00175 }
00176
00177 inline void dequeueAsync() {
00178 sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
00179 asyncDequeueCounter++;
00180 }
00181
00182
00183 };
00184
00185 }}
00186
00187
00188 #endif