00001 #ifndef QPID_BROKER_SEMANTICSTATE_H
00002 #define QPID_BROKER_SEMANTICSTATE_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "Consumer.h"
00026 #include "Deliverable.h"
00027 #include "DeliveryAdapter.h"
00028 #include "DeliveryRecord.h"
00029 #include "DeliveryToken.h"
00030 #include "DtxBuffer.h"
00031 #include "DtxManager.h"
00032 #include "NameGenerator.h"
00033 #include "Prefetch.h"
00034 #include "TxBuffer.h"
00035
00036 #include "qpid/framing/FrameHandler.h"
00037 #include "qpid/framing/AccumulatedAck.h"
00038 #include "qpid/framing/Uuid.h"
00039 #include "qpid/sys/AggregateOutput.h"
00040 #include "qpid/shared_ptr.h"
00041
00042 #include <list>
00043 #include <map>
00044 #include <vector>
00045
00046 #include <boost/intrusive_ptr.hpp>
00047
00048 namespace qpid {
00049 namespace broker {
00050
00051 class SessionContext;
00052
/**
 * Per-session state object declared alongside a SessionContext.
 *
 * What this declaration shows:
 *  - it is a framing::FrameHandler::Chains and a sys::OutputTask;
 *  - it is non-copyable (private boost::noncopyable base);
 *  - it holds references to a SessionContext and a DeliveryAdapter, a map
 *    of named consumers, a list of DeliveryRecord entries, local (TxBuffer)
 *    and distributed (DtxBuffer) transaction buffers, and an
 *    AggregateOutput to which doOutput() delegates.
 *
 * Only declarations are visible here; the member functions are defined
 * elsewhere. Comments marked NOTE(review) are inferred from names and
 * signatures and must be confirmed against the implementation.
 *
 * NOTE(review): boost::noncopyable, boost::ptr_map and boost::shared_ptr
 * are used below, but no header naming them appears in this file's include
 * list; presumably they arrive transitively -- confirm.
 */
00057 class SemanticState : public framing::FrameHandler::Chains,
00058 public sys::OutputTask,
00059 private boost::noncopyable
00060 {
/**
 * A single named consumer attached to a queue on behalf of the enclosing
 * SemanticState. It is both a Consumer and a sys::OutputTask.
 * Declared in the private section of SemanticState, so it is an
 * implementation detail of that class.
 */
00061 class ConsumerImpl : public Consumer, public sys::OutputTask
00062 {
// Back-pointer to the owning SemanticState; the pointer itself is const,
// so it cannot be re-seated after construction.
00063 SemanticState* const parent;
// The next six members are const: fixed for the consumer's lifetime.
// The constructor takes correspondingly named arguments
// (token, name, queue, ack, nolocal, acquire).
00064 const DeliveryToken::shared_ptr token;
00065 const string name;
00066 const Queue::shared_ptr queue;       // returned by getQueue()
00067 const bool ackExpected;
00068 const bool nolocal;
00069 const bool acquire;
// Mutable flow-control state.
00070 bool blocked;                        // reported by isBlocked()
// NOTE(review): presumably selected by setWindowMode()/setCreditMode() -- confirm.
00071 bool windowing;
// NOTE(review): presumably adjusted by addMessageCredit()/addByteCredit()
// and consulted by checkCredit()/allocateCredit() -- confirm.
00072 uint32_t msgCredit;
00073 uint32_t byteCredit;
00074
// Private credit helpers; both take the message by non-const reference.
// Exact accounting rules are defined in the implementation file.
00075 bool checkCredit(boost::intrusive_ptr<Message>& msg);
00076 void allocateCredit(boost::intrusive_ptr<Message>& msg);
00077
00078 public:
00079 ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
00080 const string& name, Queue::shared_ptr queue,
00081 bool ack, bool nolocal, bool acquire);
00082 ~ConsumerImpl();
// NOTE(review): the next five look like the Consumer-facing operations
// (Consumer is declared in Consumer.h, not visible here) -- confirm which
// of them override base-class virtuals.
00083 OwnershipToken* getSession();
00084 bool deliver(QueuedMessage& msg);
00085 bool filter(boost::intrusive_ptr<Message> msg);
00086 bool accept(boost::intrusive_ptr<Message> msg);
00087 void notify();
00088
// Per-consumer flow-control operations. SemanticState declares public
// functions with the same names that additionally take a destination string.
00089 void setWindowMode();
00090 void setCreditMode();
00091 void addByteCredit(uint32_t value);
00092 void addMessageCredit(uint32_t value);
00093 void flush();
00094 void stop();
00095 void complete(DeliveryRecord&);
// Returns a copy of the queue shared_ptr held by this consumer.
00096 Queue::shared_ptr getQueue() { return queue; }
// Returns the current value of the blocked flag.
00097 bool isBlocked() const { return blocked; }
00098
// NOTE(review): presumably the sys::OutputTask entry point -- confirm.
00099 bool doOutput();
00100 };
00101
// Consumers keyed by string; boost::ptr_map owns the mapped ConsumerImpl objects.
00102 typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
// DtxBuffer handles keyed by string (see suspendedXids below).
00103 typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
00104
// Collaborators held by reference; both are supplied to the constructor.
00105 SessionContext& session;             // returned by getSession()
00106 DeliveryAdapter& deliveryAdapter;
// NOTE(review): how defaultQueue is chosen and used is not visible here -- confirm.
00107 Queue::shared_ptr defaultQueue;
00108 ConsumerImplMap consumers;
// Prefetch limits, written by setPrefetchSize()/setPrefetchCount().
00109 uint32_t prefetchSize;
00110 uint16_t prefetchCount;
00111 Prefetch outstanding;
00112 NameGenerator tagGenerator;
00113 std::list<DeliveryRecord> unacked;
// Transaction state: local buffer, distributed buffer, and DTX bookkeeping.
00114 TxBuffer::shared_ptr txBuffer;
00115 DtxBuffer::shared_ptr dtxBuffer;
00116 bool dtxSelected;
00117 DtxBufferMap suspendedXids;
00118 framing::AccumulatedAck accumulatedAck;
// NOTE(review): presumably set by flow(bool) -- confirm.
00119 bool flowActive;
00120 boost::shared_ptr<Exchange> cacheExchange;
// Aggregate of output tasks; doOutput() forwards to it.
00121 sys::AggregateOutput outputTasks;
00122
// Private helpers; all are defined in the implementation file.
00123 void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
00124 void record(const DeliveryRecord& delivery);
00125 bool checkPrefetch(boost::intrusive_ptr<Message>& msg);
00126 void checkDtxTimeout();
// Looks up a consumer by destination name and returns it by reference.
// NOTE(review): behaviour for an unknown destination is not visible here -- confirm.
00127 ConsumerImpl& find(const std::string& destination);
00128 void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
00129 void complete(DeliveryRecord&);
00130 AckRange findRange(DeliveryId first, DeliveryId last);
00131 void requestDispatch();
00132 void requestDispatch(ConsumerImpl&);
00133 void cancel(ConsumerImpl&);
00134
00135 public:
00136 SemanticState(DeliveryAdapter&, SessionContext&);
00137 ~SemanticState();
00138
// Returns the SessionContext reference held by this object.
00139 SessionContext& getSession() { return session; }
00140
// Returns a queue handle for the given name.
// NOTE(review): a defaultQueue member exists; whether it serves as a
// fallback for some names is not visible here -- confirm.
00147 Queue::shared_ptr getQueue(const std::string& name) const;
00148
// Both setters store the new limit and return the value just assigned.
00149 uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
00150 uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
00151
00152 bool exists(const string& consumerTag);
00153
// Registers a consumer on the given queue.
// tagInOut is taken by non-const reference.
// NOTE(review): the name suggests the callee may write a tag back
// (a NameGenerator member exists) -- confirm.
// The trailing FieldTable pointer is optional and defaults to 0.
00157 void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
00158 bool nolocal, bool ackRequired, bool acquire, bool exclusive, const framing::FieldTable* = 0);
00159
00160 void cancel(const string& tag);
00161
// Flow-control operations addressed by destination name. ConsumerImpl
// declares operations with the same names and no destination argument.
// NOTE(review): presumably these look the consumer up and forward -- confirm.
00162 void setWindowMode(const std::string& destination);
00163 void setCreditMode(const std::string& destination);
00164 void addByteCredit(const std::string& destination, uint32_t value);
00165 void addMessageCredit(const std::string& destination, uint32_t value);
00166 void flush(const std::string& destination);
00167 void stop(const std::string& destination);
00168
00169 bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
// Local transaction control.
00170 void startTx();
00171 void commit(MessageStore* const store, bool completeOnCommit);
00172 void rollback();
// Distributed transaction control; transactions are identified by an xid string.
00173 void selectDtx();
00174 void startDtx(const std::string& xid, DtxManager& mgr, bool join);
00175 void endDtx(const std::string& xid, bool fail);
00176 void suspendDtx(const std::string& xid);
00177 void resumeDtx(const std::string& xid);
00178 void recover(bool requeue);
00179 void flow(bool active);
00180 DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
// Operations over a delivery-id pair (first, last).
// NOTE(review): whether the range is inclusive is not visible here -- confirm.
// acquire() reports its result through the `acquired` out-parameter.
00181 void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
00182 void release(DeliveryId first, DeliveryId last, bool setRedelivered);
00183 void reject(DeliveryId first, DeliveryId last);
00184 void handle(boost::intrusive_ptr<Message> msg);
// Forwards to the aggregate of output tasks and returns its result.
00185 bool doOutput() { return outputTasks.doOutput(); }
00186
00187
// Acknowledgement entry points: cumulative (single tag) and ranged.
// A private ack(deliveryTag, endTag, cumulative) is also declared above.
// NOTE(review): presumably these forward to it -- confirm.
00188 void ackCumulative(DeliveryId deliveryTag);
00189 void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
00190
00191
// Completion / acceptance notifications for a delivery-id pair.
00192 void completed(DeliveryId deliveryTag, DeliveryId endTag);
00193 void accepted(DeliveryId deliveryTag, DeliveryId endTag);
00194 };
00195
00196 }}
00197
00198
00199
00200
00201 #endif