00001 #ifndef _broker_Message_h
00002 #define _broker_Message_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <string>
00026 #include <vector>
00027 #include <boost/shared_ptr.hpp>
00028 #include <boost/variant.hpp>
00029 #include "PersistableMessage.h"
00030 #include "MessageAdapter.h"
00031 #include "qpid/framing/amqp_types.h"
00032 #include "qpid/sys/Mutex.h"
00033
00034 namespace qpid {
00035
// Forward declarations of framing types named in Message's interface:
// FieldTable appears only as a pointer return type and SequenceNumber as a
// const reference parameter/return type.
// NOTE(review): Message's constructor also value-initializes a SequenceNumber
// as a default argument, which needs the complete type wherever that default
// is used; presumably one of the headers included above provides it - confirm.
00036 namespace framing {
00037 class FieldTable;
00038 class SequenceNumber;
00039 }
00040
00041 namespace broker {
// Forward declarations of broker types that the Message class in this header
// refers to only through pointers, references or boost::shared_ptr, so their
// full definitions are not needed here.
00042 class ConnectionToken;
00043 class Exchange;
00044 class ExchangeRegistry;
00045 class MessageStore;
00046 class Queue;
00047
/// Broker-side message. Derives from PersistableMessage and wraps a
/// framing::FrameSet together with per-message bookkeeping: a persistence id,
/// a redelivered flag and a pointer to the publishing connection's token.
///
/// Only the small accessors and templates have bodies in this header; every
/// other member function is merely declared here and defined elsewhere.
/// NOTE(review): framing::FrameSet, framing::AMQHeaderBody, framing::Buffer,
/// framing::FrameHandler and TransferAdapter are not declared in this file;
/// presumably they arrive via the included headers - confirm.
00048 class Message : public PersistableMessage {
00049 public:
/// Constructs a message for the given sequence number; when omitted, a
/// value-initialized SequenceNumber is used.
/// NOTE(review): presumably `id` becomes the id reported by getCommandId() -
/// confirm in the constructor definition.
00050 Message(const framing::SequenceNumber& id = framing::SequenceNumber());
00051 ~Message();
00052
/// Returns the stored persistence id.
00053 uint64_t getPersistenceId() const { return persistenceId; }
/// Stores a new persistence id. Callable on a const Message because the
/// persistenceId member is declared mutable.
00054 void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
00055
/// Returns the redelivered flag.
00056 bool getRedelivered() const { return redelivered; }
/// Sets the redelivered flag to true; nothing in this header clears it again.
00057 void redeliver() { redelivered = true; }
00058
/// Returns the publisher token pointer as last stored by setPublisher().
00059 const ConnectionToken* getPublisher() const { return publisher; }
/// Stores the raw pointer `p` as the publisher token. Ownership and lifetime
/// of the pointee are not expressed in this header.
00060 void setPublisher(ConnectionToken* p) { publisher = p; }
00061
/// Returns the id of the underlying frame set (frames.getId()).
/// Note that this accessor is not const-qualified.
00062 const framing::SequenceNumber& getCommandId() { return frames.getId(); }
00063
00064 uint64_t contentSize() const;
00065
// Routing and header accessors; all are defined outside this header.
00066 std::string getRoutingKey() const;
// NOTE(review): presumably resolves the exchange through the registry and
// caches it in the mutable `exchange` member - confirm in the definition.
00067 const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
00068 std::string getExchangeName() const;
00069 bool isImmediate() const;
00070 const framing::FieldTable* getApplicationHeaders() const;
// Unlike the accessors above, these two predicates are not const-qualified.
00071 bool isPersistent();
00072 bool requiresAccept();
00073
/// Mutable and read-only access to the underlying frame set.
00074 framing::FrameSet& getFrames() { return frames; }
00075 const framing::FrameSet& getFrames() const { return frames; }
00076
/// Returns the properties struct of type T obtained from the frame set's
/// header body via get<T>(true).
/// NOTE(review): the pointer returned by frames.getHeaders() is dereferenced
/// without a null check, and the meaning of the `true` argument is defined by
/// AMQHeaderBody (presumably "create if absent") - confirm both.
00077 template <class T> T* getProperties() {
00078 qpid::framing::AMQHeaderBody* p = frames.getHeaders();
00079 return p->get<T>(true);
00080 }
00081
/// Const overload of getProperties(); same lookup, returning a pointer to
/// const T.
/// NOTE(review): `frames` is const in this context, yet the result of
/// getHeaders() is bound to a pointer to non-const AMQHeaderBody. This only
/// compiles when instantiated if FrameSet's const getHeaders() yields a
/// non-const pointer - confirm against FrameSet. The unchecked dereference
/// noted for the non-const overload applies here too.
00082 template <class T> const T* getProperties() const {
00083 qpid::framing::AMQHeaderBody* p = frames.getHeaders();
00084 return p->get<T>(true);
00085 }
00086
/// Returns the frame set viewed as method type T (frames.as<T>()).
00087 template <class T> const T* getMethod() const {
00088 return frames.as<T>();
00089 }
00090
/// Reports whether the frame set is of method type T (frames.isA<T>()).
00091 template <class T> bool isA() const {
00092 return frames.isA<T>();
00093 }
00094
00095 uint32_t getRequiredCredit() const;
00096
// Serialization into a framing::Buffer; defined outside this header.
00097 void encode(framing::Buffer& buffer) const;
00098 void encodeContent(framing::Buffer& buffer) const;
00099
// Size queries matching the encode functions; each returns a 32-bit count.
00104 uint32_t encodedSize() const;
00110 uint32_t encodedHeaderSize() const;
00111 uint32_t encodedContentSize() const;
00112
// Deserialization from a framing::Buffer, split into header and content.
00113 void decodeHeader(framing::Buffer& buffer);
00114 void decodeContent(framing::Buffer& buffer);
00115
// NOTE(review): presumably drops in-memory content that `store` can reload
// later (see the `loaded`/`staged` flags) - confirm in the definition.
00121 void releaseContent(MessageStore* store);
00122
// Emit the message through a FrameHandler; maxFrameSize is a 16-bit limit
// passed in by the caller.
00123 void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
00124 void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
00125
00126 bool isContentLoaded() const;
00127
// NOTE(review): presumably isExcluded() checks the ids recorded through
// addTraceId() against `excludes` - confirm in the definitions.
00128 bool isExcluded(const std::vector<std::string>& excludes) const;
00129 void addTraceId(const std::string& id);
00130
00131 private:
// Declared mutable so that const member functions can take the lock.
00132 mutable sys::Mutex lock;
// The frames that make up this message.
00133 framing::FrameSet frames;
// Mutable so that a const member function can assign it.
00134 mutable boost::shared_ptr<Exchange> exchange;
// Mutable because setPersistenceId() is const-qualified.
00135 mutable uint64_t persistenceId;
00136 bool redelivered;
00137 bool loaded;
00138 bool staged;
// Raw pointer set via setPublisher(); ownership is not visible from here.
00139 ConnectionToken* publisher;
// Mutable raw pointer.
// NOTE(review): presumably selected lazily by getAdapter() const, possibly
// pointing at the static TRANSFER instance below - confirm.
00140 mutable MessageAdapter* adapter;
00141
// Single adapter instance shared by all Message objects (static member).
00142 static TransferAdapter TRANSFER;
00143
00144 MessageAdapter& getAdapter() const;
00145 };
00146
00147 }}
00148
00149
00150 #endif