00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _ConnectionHandler_
00022 #define _ConnectionHandler_
00023
00024 #include "ChainableFrameHandler.h"
00025 #include "ConnectionSettings.h"
00026 #include "StateManager.h"
00027 #include "qpid/framing/AMQMethodBody.h"
00028 #include "qpid/framing/AMQP_HighestVersion.h"
00029 #include "qpid/framing/AMQP_ClientOperations.h"
00030 #include "qpid/framing/AMQP_ServerProxy.h"
00031 #include "qpid/framing/Array.h"
00032 #include "qpid/framing/FieldTable.h"
00033 #include "qpid/framing/FrameHandler.h"
00034 #include "qpid/framing/InputHandler.h"
00035
00036 namespace qpid {
00037 namespace client {
00038
// ConnectionHandler: client-side handler for the connection-level protocol
// exchange (declaration only; apart from Adapter::handle and received(), no
// member function bodies are defined in this header).
//
// Roles taken on through inheritance:
//  - StateManager (private): presumably tracks which of the STATES values
//    the connection is currently in -- confirm against StateManager.h.
//  - ConnectionSettings (public): the handler is itself a settings object.
//  - ChainableFrameHandler and framing::InputHandler (public): frame-handling
//    entry points exposed to the rest of the client.
//  - framing::AMQP_ClientOperations::ConnectionHandler (private): the private
//    start/secure/tune/openOk/redirect/close/closeOk declarations below match
//    this interface by name; presumably they override it -- confirm against
//    AMQP_ClientOperations.h.
00039 class ConnectionHandler : private StateManager,
00040 public ConnectionSettings,
00041 public ChainableFrameHandler,
00042 public framing::InputHandler,
00043 private framing::AMQP_ClientOperations::ConnectionHandler
00044 {
// Short alias for the private operations base class.
00045 typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
// Lifecycle states of the connection. Transitions are implemented elsewhere.
00046 enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
// Set of int state values; presumably the states treated as "established"
// (populated outside this header -- TODO confirm which states it holds).
00047 std::set<int> ESTABLISHED;
00048
// Adapter: presents the enclosing ConnectionHandler as a framing::FrameHandler
// by forwarding every frame it is given to handler.out(). out() is not
// declared in this class; presumably it comes from ChainableFrameHandler.
00049 class Adapter : public framing::FrameHandler
00050 {
// Non-owning reference to the enclosing handler; it must outlive the Adapter.
00051 ConnectionHandler& handler;
00052 public:
00053 Adapter(ConnectionHandler& h) : handler(h) {}
// Forwards the frame unchanged to the enclosing handler's out().
00054 void handle(framing::AMQFrame& f) { handler.out(f); }
00055 };
00056
// NOTE(review): members are initialized in declaration order, so outHandler is
// constructed before proxy. If the constructor (not shown) builds proxy from
// outHandler, this ordering must be preserved.
00057 Adapter outHandler;
// Proxy type for the server-side Connection operations.
00058 framing::AMQP_ServerProxy::Connection proxy;
// Presumably the code and text of the most recent error/close -- confirm in
// the implementation file.
00059 uint16_t errorCode;
00060 std::string errorText;
// NOTE(review): meaning of 'insist' is not visible here; presumably a flag
// passed when opening the connection -- confirm.
00061 bool insist;
00062 framing::ProtocolVersion version;
00063 framing::Array capabilities;
00064 framing::FieldTable properties;
00065
// Defined elsewhere. Presumably verifies that the current state equals s and
// reports msg otherwise -- confirm in the implementation file.
00066 void checkState(STATES s, const std::string& msg);
00067
00068
// Connection-level operations (private). Names and signatures match the
// ConnectionOperations base; bodies are defined elsewhere.
00069 void start(const framing::FieldTable& serverProperties,
00070 const framing::Array& mechanisms,
00071 const framing::Array& locales);
00072 void secure(const std::string& challenge);
00073 void tune(uint16_t channelMax,
00074 uint16_t frameMax,
00075 uint16_t heartbeatMin,
00076 uint16_t heartbeatMax);
00077 void openOk(const framing::Array& knownHosts);
00078 void redirect(const std::string& host,
00079 const framing::Array& knownHosts);
// Private two-argument overload; distinct from the public no-argument close().
00080 void close(uint16_t replyCode, const std::string& replyText);
00081 void closeOk();
00082
00083 public:
// Makes InputHandler::handle visible as a public member of this class.
00084 using InputHandler::handle;
// Callback with no arguments; stored in onClose.
00085 typedef boost::function<void()> CloseListener;
// Callback taking a uint16_t and a string (presumably an error code and its
// text -- confirm at the call site); stored in onError.
00086 typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
00087
// Takes the settings by const reference and the protocol version by non-const
// reference. How they are stored or used is defined elsewhere.
00088 ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&);
00089
// Forwards the frame directly to incoming().
00090 void received(framing::AMQFrame& f) { incoming(f); }
00091
// Entry points for frames travelling in each direction; defined elsewhere.
00092 void incoming(framing::AMQFrame& frame);
00093 void outgoing(framing::AMQFrame& frame);
00094
// Presumably blocks until the connection reaches OPEN or fails -- confirm in
// the implementation file.
00095 void waitForOpen();
// Public no-argument close; see also the private close(replyCode, replyText).
00096 void close();
// Presumably moves the connection to FAILED and reports message -- confirm.
00097 void fail(const std::string& message);
00098
// Public callback slots that callers assign. When they are invoked is decided
// by the implementation, which is not visible in this header.
00099 CloseListener onClose;
00100 ErrorListener onError;
00101 };
00102
00103 }}
00104
00105 #endif