00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef _Connector_
00022 #define _Connector_
00023
00024
00025 #include "qpid/framing/InputHandler.h"
00026 #include "qpid/framing/OutputHandler.h"
00027 #include "qpid/framing/InitiationHandler.h"
00028 #include "qpid/framing/ProtocolInitiation.h"
00029 #include "qpid/framing/ProtocolVersion.h"
00030 #include "qpid/sys/ShutdownHandler.h"
00031 #include "qpid/sys/TimeoutHandler.h"
00032 #include "qpid/sys/Thread.h"
00033 #include "qpid/sys/Runnable.h"
00034 #include "qpid/sys/Mutex.h"
00035 #include "qpid/sys/Socket.h"
00036 #include "qpid/sys/Time.h"
00037 #include "qpid/sys/AsynchIO.h"
00038
00039 #include <queue>
00040 #include <boost/shared_ptr.hpp>
00041
00042 namespace qpid {
00043
00044 namespace client {
00045
00046 class Bounds;
00047 class ConnectionSettings;
00048
// Connector: client-side (namespace qpid::client) class that is publicly a
// framing::OutputHandler and privately a sys::Runnable (see the private run()
// declared below). It aggregates a socket, an AsynchIO pointer, a poller, a
// thread object and a nested Writer. Only declarations are visible here; the
// behaviour of every member function is defined in the implementation file.
//
// NOTE(review): this declaration uses std::vector, std::string,
// framing::AMQFrame, framing::FrameHandler, framing::Buffer and sys::Poller,
// none of which has a matching direct #include in this header's include list;
// presumably they arrive transitively -- confirm. <queue> is included but no
// std::queue use is visible in this class.
00049 class Connector : public framing::OutputHandler,
00050 private sys::Runnable
00051 {
// Forward declaration only; Buff is not defined anywhere in this header.
00052 struct Buff;
00053
// Writer: nested framing::FrameHandler holding a vector of frames, an encode
// buffer, an AsynchIO pointer and its own mutex.
// NOTE(review): looks like it queues outgoing frames and encodes them into
// AsynchIO buffers -- confirm against the implementation.
00055 class Writer : public framing::FrameHandler {
00056 typedef sys::AsynchIO::BufferBase BufferBase;
00057 typedef std::vector<framing::AMQFrame> Frames;
00058
// Writer state. Pointer members (aio, buffer, bounds) are raw pointers;
// ownership is not expressed in this header -- TODO confirm who frees them.
00059 const uint16_t maxFrameSize;
00060 sys::Mutex lock;
00061 sys::AsynchIO* aio;
00062 BufferBase* buffer;
00063 Frames frames;
00064 size_t lastEof;
00065 framing::Buffer encode;
00066 size_t framesEncoded;
00067 std::string identifier;
00068 Bounds* bounds;
00069
// Private helpers that take a ScopedLock reference as their only parameter.
// NOTE(review): presumably this forces callers to already hold `lock` --
// confirm in the implementation.
00070 void writeOne(const sys::Mutex::ScopedLock&);
00071 void newBuffer(const sys::Mutex::ScopedLock&);
00072
00073 public:
00074
00075 Writer(uint16_t maxFrameSize, Bounds*);
00076 ~Writer();
// Takes the identifier string by value and an AsynchIO pointer.
00077 void init(std::string id, sys::AsynchIO*);
// Frame-handling entry point taking a mutable AMQFrame reference.
00078 void handle(framing::AMQFrame&);
00079 void write(sys::AsynchIO&);
00080 };
00081
// --- Connector private state -------------------------------------------
// Member order below is also the construction order; do not reorder without
// checking the constructor's initializer list in the implementation file.
00082 const uint16_t maxFrameSize;
00083 framing::ProtocolVersion version;
00084 bool initiated;
00085
// NOTE(review): closedLock presumably guards `closed` (and possibly
// `joined`) -- confirm.
00086 sys::Mutex closedLock;
00087 bool closed;
00088 bool joined;
00089
// Timestamps and durations; presumably used for idle/timeout tracking by
// checkIdle() -- confirm.
00090 sys::AbsTime lastIn;
00091 sys::AbsTime lastOut;
00092 sys::Duration timeout;
00093 sys::Duration idleIn;
00094 sys::Duration idleOut;
00095
// Raw pointers to collaborating handlers; setters for some of them are
// declared in the public section. Ownership is not expressed here.
00096 sys::TimeoutHandler* timeoutHandler;
00097 sys::ShutdownHandler* shutdownHandler;
00098 framing::InputHandler* input;
00099 framing::InitiationHandler* initialiser;
00100 framing::OutputHandler* output;
00101
00102 Writer writer;
00103
// NOTE(review): presumably the thread that executes run() -- confirm.
00104 sys::Thread receiver;
00105
00106 sys::Socket socket;
00107
00108 sys::AsynchIO* aio;
00109 sys::Poller::shared_ptr poller;
00110
00111 void checkIdle(ssize_t status);
00112 void setSocketTimeout();
00113
// Private run(): matches the private sys::Runnable base of this class.
00114 void run();
00115 void handleClosed();
00116 bool closeInternal();
00117
// Member functions whose parameters are AsynchIO references/buffers.
// NOTE(review): these look like the read/write/eof callbacks registered
// with AsynchIO -- confirm in the implementation.
00118 void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*);
00119 void writebuff(qpid::sys::AsynchIO&);
00120 void writeDataBlock(const framing::AMQDataBlock& data);
00121 void eof(qpid::sys::AsynchIO&);
00122
// Returned by const reference from getIdentifier() below.
00123 std::string identifier;
00124
// Grants class Channel access to all private members declared above.
00125 friend class Channel;
00126
00127 public:
// Constructs a connector for the given protocol version and connection
// settings; `bounds` is optional and defaults to a null pointer.
00128 Connector(framing::ProtocolVersion pVersion,
00129 const ConnectionSettings&,
00130 Bounds* bounds = 0);
// Virtual destructor and virtual operations: the class is declared so that
// it can be subclassed and its operations overridden.
00131 virtual ~Connector();
00132 virtual void connect(const std::string& host, int port);
00133 virtual void init();
00134 virtual void close();
00135 virtual void setInputHandler(framing::InputHandler* handler);
00136 virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
00137 virtual void setShutdownHandler(sys::ShutdownHandler* handler);
// Returns the stored shutdownHandler pointer as-is.
00138 virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
00139 virtual framing::OutputHandler* getOutputHandler();
00140 virtual void send(framing::AMQFrame& frame);
// NOTE(review): the unit of `timeout` (seconds vs. milliseconds) is not
// stated in this header -- confirm before relying on it.
00141 virtual void setReadTimeout(uint16_t timeout);
00142 virtual void setWriteTimeout(uint16_t timeout);
// Returns a const reference to the identifier member; the reference is
// valid only while this Connector object exists.
00143 const std::string& getIdentifier() const { return identifier; }
00144 };
00145
00146 }}
00147
00148
00149 #endif