00001 #ifndef QPID_BROKER_SESSION_H
00002 #define QPID_BROKER_SESSION_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "qpid/SessionState.h"
00026 #include "qpid/framing/FrameHandler.h"
00027 #include "qpid/framing/SequenceSet.h"
00028 #include "qpid/sys/Time.h"
00029 #include "qpid/management/Manageable.h"
00030 #include "qmf/org/apache/qpid/broker/Session.h"
00031 #include "SessionAdapter.h"
00032 #include "DeliveryAdapter.h"
00033 #include "IncompleteMessageList.h"
00034 #include "MessageBuilder.h"
00035 #include "SessionContext.h"
00036 #include "SemanticState.h"
00037
00038 #include <boost/noncopyable.hpp>
00039 #include <boost/scoped_ptr.hpp>
00040
00041 #include <set>
00042 #include <vector>
00043 #include <ostream>
00044
00045 namespace qpid {
00046
// Forward declaration only: this header uses framing::AMQP_ClientProxy solely
// as a reference return type (SessionState::getProxy), so the full definition
// is not needed here.
00047 namespace framing {
00048 class AMQP_ClientProxy;
00049 }
00050
00051 namespace broker {
00052
// Forward declarations of broker types that this header only names through
// references, pointers, template arguments or a friend declaration.
00053 class Broker;
00054 class ConnectionState;
00055 class Message;
00056 class SessionHandler;
00057 class SessionManager;
00058
/**
 * Broker-side session state.
 *
 * Extends the generic qpid::SessionState and additionally plays the roles of
 * SessionContext, DeliveryAdapter, management::Manageable and an in/out frame
 * handler (framing::FrameHandler::InOutHandler).
 *
 * Only isAttached() and eachConsumer() are defined in this header; every other
 * member is defined elsewhere, so the notes on them below are deliberately
 * hedged and should be confirmed against the implementation file.
 */
00063 class SessionState : public qpid::SessionState,
00064 public SessionContext,
00065 public DeliveryAdapter,
00066 public management::Manageable,
00067 public framing::FrameHandler::InOutHandler
00068 {
00069 public:
// Builds a session from a broker, an initial handler, a session id and a
// configuration. NOTE(review): SessionManager is declared a friend at the end
// of this class; presumably it is the normal creator of sessions - confirm.
00070 SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&);
00071 ~SessionState();
// True while a SessionHandler is set, i.e. the `handler` pointer is non-null
// (the pointer is implicitly converted to bool).
00072 bool isAttached() const { return handler; }
00073
// Detach from / attach to a SessionHandler.
// NOTE(review): presumably these clear / set the `handler` pointer that
// isAttached() tests - confirm in the implementation.
00074 void detach();
00075 void attach(SessionHandler& handler);
00076
// Client proxy for this session.
// NOTE(review): likely only valid while isAttached() - confirm.
00078 framing::AMQP_ClientProxy& getProxy();
00079
// Connection state associated with this session.
// NOTE(review): likely only valid while isAttached() - confirm.
00081 ConnectionState& getConnection();
// NOTE(review): presumably reports whether token `t` identifies this session's
// own connection - confirm.
00082 bool isLocal(const ConnectionToken* t) const;
00083
// Accessor returning a Broker reference (presumably the `broker` member below).
00084 Broker& getBroker();
00085
// NOTE(review): looks like an output-control hook requesting that output be
// scheduled for this session - confirm.
00087 void activateOutput();
00088
// Takes a set of sequence ranges; NOTE(review): presumably the commands the
// sending peer reports as completed - confirm.
00089 void senderCompleted(const framing::SequenceSet& ranges);
00090
00091 void sendCompletion();
00092
00093
// Delivers `msg` with the given delivery token and returns a DeliveryId.
// NOTE(review): presumably the implementation of the DeliveryAdapter base.
00094 DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
00095
00096
// Management entry points. NOTE(review): presumably these implement the
// management::Manageable base - confirm.
00097 management::ManagementObject* GetManagementObject (void) const;
00098 management::Manageable::status_t
00099 ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
00100
00101 void readyToSend();
00102
// Forwards the functor `f` (taken by value) to semanticState.eachConsumer(f).
00103 template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
00104
00105 private:
00106
// Internal handlers for an incoming command / content frame and for a message
// whose enqueue has finished; each handler takes the sequence number `id`.
// NOTE(review): the exact call sites are not visible in this header.
00107 void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
00108 void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
00109 void enqueued(boost::intrusive_ptr<Message> msg);
00110
// Frame entry points for the inbound and outbound directions.
// NOTE(review): presumably the framing::FrameHandler::InOutHandler hooks.
00111 void handleIn(framing::AMQFrame& frame);
00112 void handleOut(framing::AMQFrame& frame);
00113
00114
// NOTE(review): by their names, the final stages of the inbound / outbound
// handler chains - confirm.
00115 void handleInLast(framing::AMQFrame& frame);
00116 void handleOutLast(framing::AMQFrame& frame);
00117
// Data members. C++ constructs non-static members in declaration order, so
// keep this order unless the constructor has been checked not to depend on it.
00118 Broker& broker;
// Null when detached (see isAttached()). Raw pointer; ownership is not visible
// in this header.
00119 SessionHandler* handler;
00120 sys::AbsTime expiry;
00121 SemanticState semanticState;
00122 SessionAdapter adapter;
00123 MessageBuilder msgBuilder;
00124 IncompleteMessageList incomplete;
00125 IncompleteMessageList::CompletionListener enqueuedOp;
// QMF management object for this session. Raw pointer; lifetime is managed
// outside this header.
00126 qmf::org::apache::qpid::broker::Session* mgmtObject;
00127
// SessionManager is granted access to the private members above.
00128 friend class SessionManager;
00129 };
00130
00131
// Stream insertion for a broker session: writes the value returned by
// session.getId() to `out` and hands the same stream back for chaining.
00132 inline std::ostream& operator<<(std::ostream& out, const SessionState& session) {
00133 out << session.getId(); return out;
00134 }
00135
00136 }}
00137
00138
00139
00140 #endif