00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _DeliveryRecord_
00022 #define _DeliveryRecord_
00023
00024 #include <algorithm>
00025 #include <list>
00026 #include <vector>
00027 #include <ostream>
00028 #include "qpid/framing/AccumulatedAck.h"
00029 #include "Queue.h"
00030 #include "Consumer.h"
00031 #include "DeliveryId.h"
00032 #include "DeliveryToken.h"
00033 #include "Message.h"
00034 #include "Prefetch.h"
00035
00036 namespace qpid {
00037 namespace broker {
00038 class SemanticState;
00039
/**
 * Per-delivery bookkeeping record.
 *
 * Bundles a queued message with the queue, tag, delivery token and delivery
 * id supplied at construction, together with a set of status flags. Only the
 * small inline accessors are defined in this header; every other member is
 * defined out of line, so the notes below are limited to what the
 * declarations themselves show and are hedged where they go further.
 */
00043 class DeliveryRecord{
// The queued message this record refers to.
00044 QueuedMessage msg;
// Declared mutable, so it can be modified from the const member functions
// (dequeue() and requeue() are both const).
00045 mutable Queue::shared_ptr queue;
// Immutable tag string; exposed through getTag().
// NOTE(review): presumably the consumer tag of the subscription -- confirm.
00046 const std::string tag;
00047 DeliveryToken::shared_ptr token;
// Identifier of this delivery.
// NOTE(review): presumably what matches()/matchOrAfter()/after() compare against -- confirm.
00048 DeliveryId id;
// Exposed through isAcquired().
00049 bool acquired;
// Fixed at construction; exposed through isPull().
00050 const bool pull;
00051 bool cancelled;
// Fixed at construction.
// NOTE(review): presumably the values behind getCredit()/addTo()/subtractFrom() -- confirm.
00052 const uint32_t credit;
00053 const uint64_t size;
00054
// Exposed through isComplete().
00055 bool completed;
// Combined with 'completed' in isRedundant().
00056 bool ended;
00057
00058 public:
// Constructs a record from a message, its queue, a tag, a delivery token,
// a delivery id and the initial 'acquired' state.
// NOTE(review): no member is named 'confirmed'; check the definition to see
// which state the trailing defaulted argument initialises.
00059 DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
00060 const DeliveryId id, bool acquired, bool confirmed = false);
// Constructs a record from a message, its queue and a delivery id only (no tag or token).
// NOTE(review): since isPull() exists, this is presumably the "pull" form -- confirm.
00061 DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
00062
// Predicates relating this record to a delivery id, or to an accumulated
// ack range. Exact comparison semantics live in the out-of-line definitions.
00063 bool matches(DeliveryId tag) const;
00064 bool matchOrAfter(DeliveryId tag) const;
00065 bool after(DeliveryId tag) const;
00066 bool coveredBy(const framing::AccumulatedAck* const range) const;
00067
// Operations on the delivery; all are defined out of line.
// dequeue() takes an optional transaction context that defaults to null (0).
00068 void dequeue(TransactionContext* ctxt = 0) const;
00069 void requeue() const;
00070 void release(bool setRedelivered);
00071 void reject();
00072 void cancel(const std::string& tag);
00073 void redeliver(SemanticState* const);
// Takes 'results' by non-const reference, so it may modify the caller's collection.
00074 void acquire(DeliveryIds& results);
00075 void complete();
00076 void accept(TransactionContext* ctxt);
00077 void setEnded();
00078
// Returns the current value of the 'acquired' flag.
00079 bool isAcquired() const { return acquired; }
// Returns the current value of the 'completed' flag.
00080 bool isComplete() const { return completed; }
// True only when both 'ended' and 'completed' are set.
00081 bool isRedundant() const { return ended && completed; }
00082
00083 uint32_t getCredit() const;
// Both take the Prefetch by non-const reference while leaving this record const.
00084 void addTo(Prefetch&) const;
00085 void subtractFrom(Prefetch&) const;
// Returns a reference to the stored tag; valid only while this record is alive.
00086 const std::string& getTag() const { return tag; }
// Returns the 'pull' flag fixed at construction.
00087 bool isPull() const { return pull; }
// Friends so they can read the private members above.
// NOTE(review): operator< presumably orders records by delivery id -- confirm.
00088 friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
00089 friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
00090 };
00091
// Linked list of delivery records.
00092 typedef std::list<DeliveryRecord> DeliveryRecords;
// Mutable iterator into a std::list<DeliveryRecord> (the same type as
// DeliveryRecords::iterator); used as the bound type in AckRange.
00093 typedef std::list<DeliveryRecord>::iterator ack_iterator;
00094
/**
 * Pair of iterators delimiting a run of delivery records.
 *
 * NOTE(review): presumably a half-open range [start, end) like standard
 * iterator pairs -- confirm against the code that builds and consumes it.
 */
00095 struct AckRange
00096 {
// First bound of the range.
00097 ack_iterator start;
// Second bound of the range.
00098 ack_iterator end;
// Stores both iterators as given; no validation is performed.
00099 AckRange(ack_iterator _start, ack_iterator _end) : start(_start), end(_end) {}
00100 };
00101
/**
 * Function object that calls DeliveryRecord::acquire() on each record it is
 * applied to, handing every call the same DeliveryIds collection.
 */
00102 struct AcquireFunctor
00103 {
// Held by reference: the referenced DeliveryIds must outlive this functor
// and any copies of it.
00104 DeliveryIds& results;
00105
// Binds the functor to the caller's collection. Not marked explicit, so a
// DeliveryIds lvalue converts implicitly to AcquireFunctor.
00106 AcquireFunctor(DeliveryIds& _results) : results(_results) {}
00107
// Forwards to record.acquire(results); takes the record by non-const
// reference because acquire() is a non-const member.
00108 void operator()(DeliveryRecord& record)
00109 {
00110 record.acquire(results);
00111 }
00112 };
00113
00114 }
00115 }
00116
00117
00118 #endif