00001 #ifndef QPID_BROKER_SESSION_H
00002 #define QPID_BROKER_SESSION_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "qpid/framing/Uuid.h"
00026 #include "qpid/framing/FrameHandler.h"
00027 #include "qpid/framing/SessionState.h"
00028 #include "qpid/framing/SequenceSet.h"
00029 #include "qpid/framing/ProtocolVersion.h"
00030 #include "qpid/sys/Mutex.h"
00031 #include "qpid/sys/Time.h"
00032 #include "qpid/management/Manageable.h"
00033 #include "qpid/management/Session.h"
00034 #include "SessionAdapter.h"
00035 #include "DeliveryAdapter.h"
00036 #include "IncompleteMessageList.h"
00037 #include "MessageBuilder.h"
00038 #include "SessionContext.h"
00039 #include "SemanticState.h"
00040
00041 #include <boost/noncopyable.hpp>
00042 #include <boost/scoped_ptr.hpp>
00043
00044 #include <set>
00045 #include <vector>
00046 #include <ostream>
00047
00048 namespace qpid {
00049
// Forward declaration only: this header refers to the proxy class purely by
// reference, so its full definition is not needed here.
namespace framing { class AMQP_ClientProxy; }
00053
00054 namespace broker {
00055
// Forward declarations for broker types that this header only names through
// pointers, references or smart-pointer template arguments.
    class Broker;
    class ConnectionState;
    class Message;
    class SessionHandler;
    class SessionManager;
00061
00066 class SessionState : public framing::SessionState,
00067 public SessionContext,
00068 public DeliveryAdapter,
00069 public management::Manageable
00070 {
00071 public:
00072 ~SessionState();
00073 bool isAttached() const { return handler; }
00074
00075 void detach();
00076 void attach(SessionHandler& handler);
00077
00078
00079 SessionHandler* getHandler();
00080
00082 framing::AMQP_ClientProxy& getProxy();
00083
00085 ConnectionState& getConnection();
00086 bool isLocal(const ConnectionToken* t) const;
00087
00088 uint32_t getTimeout() const { return timeout; }
00089 void setTimeout(uint32_t t) { timeout = t; }
00090
00091 Broker& getBroker() { return broker; }
00092 framing::ProtocolVersion getVersion() const { return version; }
00093
00095 void activateOutput();
00096
00097 void handle(framing::AMQFrame& frame);
00098
00099 void complete(const framing::SequenceSet& ranges);
00100 void sendCompletion();
00101
00102
00103 DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
00104
00105
00106 management::ManagementObject::shared_ptr GetManagementObject (void) const;
00107 management::Manageable::status_t
00108 ManagementMethod (uint32_t methodId, management::Args& args);
00109
00110
00111 SessionState(SessionManager*,
00112 SessionHandler* out,
00113 uint32_t timeout,
00114 uint32_t ackInterval,
00115 std::string& name);
00116
00117
00118 framing::SequenceSet completed;
00119 framing::SequenceSet knownCompleted;
00120 framing::SequenceNumber nextIn;
00121 framing::SequenceNumber nextOut;
00122
00123 private:
00124 typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
00125
00126 SessionManager* factory;
00127 SessionHandler* handler;
00128 framing::Uuid id;
00129 uint32_t timeout;
00130 sys::AbsTime expiry;
00131 Broker& broker;
00132 framing::ProtocolVersion version;
00133 sys::Mutex lock;
00134 bool ignoring;
00135 std::string name;
00136
00137 SemanticState semanticState;
00138 SessionAdapter adapter;
00139 MessageBuilder msgBuilder;
00140 IncompleteMessageList incomplete;
00141
00142 RangedOperation ackOp;
00143 IncompleteMessageList::CompletionListener enqueuedOp;
00144
00145 management::Session::shared_ptr mgmtObject;
00146 void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id);
00147 void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
00148 void enqueued(boost::intrusive_ptr<Message> msg);
00149
00150 friend class SessionManager;
00151 };
00152
00153
00154 inline std::ostream& operator<<(std::ostream& out, const SessionState& session) {
00155 return out << session.getId();
00156 }
00157
00158 }}
00159
00160
00161
00162 #endif