00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _ConnectionHandler_
00022 #define _ConnectionHandler_
00023
00024 #include "ChainableFrameHandler.h"
00025 #include "ConnectionSettings.h"
00026 #include "StateManager.h"
00027 #include "qpid/framing/AMQMethodBody.h"
00028 #include "qpid/framing/AMQP_HighestVersion.h"
00029 #include "qpid/framing/AMQP_ClientOperations.h"
00030 #include "qpid/framing/AMQP_ServerProxy.h"
00031 #include "qpid/framing/Array.h"
00032 #include "qpid/framing/FieldTable.h"
00033 #include "qpid/framing/FrameHandler.h"
00034 #include "qpid/framing/InputHandler.h"
00035 #include "qpid/Url.h"
00036
00037 namespace qpid {
00038 namespace client {
00039
/**
 * Client-side connection handler (namespace qpid::client).
 *
 * Combines several roles through inheritance:
 *  - StateManager (private): presumably tracks which of the STATES values
 *    the connection is currently in -- TODO confirm against StateManager.h.
 *  - ConnectionSettings (public): the settings are part of this object; the
 *    constructor takes a const ConnectionSettings&, presumably to
 *    initialise this base -- confirm in the implementation file.
 *  - ChainableFrameHandler and framing::InputHandler (public): frame
 *    handling interfaces; see incoming(), outgoing() and received().
 *  - framing::AMQP_ClientOperations::ConnectionHandler (private): the
 *    private operations declared below (start, secure, tune, openOk,
 *    redirect, close, closeOk) appear to implement this interface.
 *
 * Only Adapter's members and received() are defined inline; every other
 * member function is defined outside this header.
 */
00040 class ConnectionHandler : private StateManager,
00041 public ConnectionSettings,
00042 public ChainableFrameHandler,
00043 public framing::InputHandler,
00044 private framing::AMQP_ClientOperations::ConnectionHandler
00045 {
// Shorthand for the AMQP client-operations base interface.
00046 typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
// States this handler distinguishes.
// NOTE(review): presumably used with the StateManager base; the
// transitions between them are not visible in this header.
00047 enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
// Sets of int, presumably groups of STATES values (the states that count
// as "established" and as "finished"). They are non-static, non-const
// members populated elsewhere -- confirm in the implementation file.
00048 std::set<int> ESTABLISHED, FINISHED;
00049
// FrameHandler that forwards every frame it is given to handler.out().
// out() is not declared in this header; presumably it is inherited from
// ChainableFrameHandler -- TODO confirm.
00050 class Adapter : public framing::FrameHandler
00051 {
// Held by reference, so an Adapter must not outlive the ConnectionHandler
// it was constructed with.
00052 ConnectionHandler& handler;
00053 public:
00054 Adapter(ConnectionHandler& h) : handler(h) {}
00055 void handle(framing::AMQFrame& f) { handler.out(f); }
00056 };
00057
// NOTE(review): outHandler is declared before proxy; presumably proxy is
// constructed on top of outHandler, in which case this declaration order
// (which determines construction order) must be kept -- confirm in the
// constructor.
00058 Adapter outHandler;
00059 framing::AMQP_ServerProxy::Connection proxy;
// Presumably the code and text of the most recent error/close -- where
// they are set and read is not visible here.
00060 uint16_t errorCode;
00061 std::string errorText;
00062 bool insist;
00063 framing::ProtocolVersion version;
00064 framing::Array capabilities;
00065 framing::FieldTable properties;
00066
// Takes an expected state and a message; presumably raises an error
// carrying msg when the current state differs from s -- TODO confirm.
00067 void checkState(STATES s, const std::string& msg);
00068
00069
// Connection operations, kept private.
// NOTE(review): these match the privately inherited
// AMQP_ClientOperations::ConnectionHandler base and are presumably its
// virtual overrides, invoked when the corresponding connection method
// arrives from the broker -- confirm against the generated interface.
00070 void start(const framing::FieldTable& serverProperties,
00071 const framing::Array& mechanisms,
00072 const framing::Array& locales);
00073 void secure(const std::string& challenge);
00074 void tune(uint16_t channelMax,
00075 uint16_t frameMax,
00076 uint16_t heartbeatMin,
00077 uint16_t heartbeatMax);
00078 void openOk(const framing::Array& knownHosts);
00079 void redirect(const std::string& host,
00080 const framing::Array& knownHosts);
// Two-argument close: distinct from the public no-argument close() below.
00081 void close(uint16_t replyCode, const std::string& replyText);
00082 void closeOk();
00083
00084 public:
// Make the handle overload(s) of the InputHandler base visible on this
// class.
00085 using InputHandler::handle;
// Callback types: CloseListener takes no arguments; ErrorListener takes a
// uint16_t and a string (presumably an error code and its text).
00086 typedef boost::function<void()> CloseListener;
00087 typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
00088
// Note that the ProtocolVersion is taken by non-const reference.
00089 ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&);
00090
// Forwards the frame unchanged to incoming().
00091 void received(framing::AMQFrame& f) { incoming(f); }
00092
// Entry points for frames travelling in each direction; defined outside
// this header.
00093 void incoming(framing::AMQFrame& frame);
00094 void outgoing(framing::AMQFrame& frame);
00095
// NOTE(review): waitForOpen() presumably blocks until the connection is
// open or has failed -- blocking behaviour is not visible here; confirm.
00096 void waitForOpen();
00097 void close();
00098 void fail(const std::string& message);
00099
00100
// Read-only state queries.
00101 bool isOpen() const;
00102 bool isClosed() const;
00103 bool isClosing() const;
00104
// Public callback slots that callers may assign. When (and on which
// thread) they are invoked is decided by the implementation, which is not
// shown in this header.
00105 CloseListener onClose;
00106 ErrorListener onError;
00107
// URLs of known brokers.
// NOTE(review): presumably filled from the knownHosts argument of
// openOk()/redirect() -- TODO confirm.
00108 std::vector<Url> knownBrokersUrls;
00109 };
00110
00111 }}
00112
00113 #endif