00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _Connection_
00022 #define _Connection_
00023
00024 #include <memory>
00025 #include <sstream>
00026 #include <vector>
00027
00028 #include <boost/ptr_container/ptr_map.hpp>
00029
00030 #include "qpid/framing/AMQFrame.h"
00031 #include "qpid/framing/AMQP_ServerOperations.h"
00032 #include "qpid/framing/AMQP_ClientProxy.h"
00033 #include "qpid/sys/AggregateOutput.h"
00034 #include "qpid/sys/ConnectionOutputHandler.h"
00035 #include "qpid/sys/ConnectionInputHandler.h"
00036 #include "qpid/sys/TimeoutHandler.h"
00037 #include "qpid/framing/ProtocolVersion.h"
00038 #include "Broker.h"
00039 #include "qpid/sys/Socket.h"
00040 #include "qpid/Exception.h"
00041 #include "ConnectionHandler.h"
00042 #include "ConnectionState.h"
00043 #include "SessionHandler.h"
00044 #include "qpid/management/Manageable.h"
00045 #include "qpid/management/Client.h"
00046 #include "qpid/management/Link.h"
00047
00048 #include <boost/ptr_container/ptr_map.hpp>
00049
00050 namespace qpid {
00051 namespace broker {
00052
00053 class Connection : public sys::ConnectionInputHandler,
00054 public ConnectionState
00055 {
00056 public:
00057 typedef boost::shared_ptr<Connection> shared_ptr;
00058 Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
00059 ~Connection ();
00060
00062 SessionHandler& getChannel(framing::ChannelId channel);
00063
00065 void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
00066
00067
00068 void received(framing::AMQFrame& frame);
00069 void idleOut();
00070 void idleIn();
00071 void closed();
00072 bool doOutput();
00073
00074 void closeChannel(framing::ChannelId channel);
00075
00076
00077 management::ManagementObject::shared_ptr GetManagementObject (void) const;
00078 management::Manageable::status_t
00079 ManagementMethod (uint32_t methodId, management::Args& args);
00080
00081 void initMgmt(bool asLink = false);
00082 void requestIOProcessing (boost::function0<void>);
00083
00084 private:
00085 typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
00086 typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
00087
00094 class MgmtWrapper
00095 {
00096 public:
00097 virtual ~MgmtWrapper(){}
00098 virtual void received(framing::AMQFrame& frame) = 0;
00099 virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
00100 virtual void closing() = 0;
00101 virtual void processPending(){}
00102 virtual void process(Connection&, const management::Args&){}
00103 };
00104 class MgmtClient;
00105
00106 ChannelMap channels;
00107 framing::AMQP_ClientProxy::Connection* client;
00108 ConnectionHandler adapter;
00109 std::auto_ptr<MgmtWrapper> mgmtWrapper;
00110 bool mgmtClosing;
00111 const std::string mgmtId;
00112 boost::function0<void> ioCallback;
00113 };
00114
00115 }}
00116
00117 #endif