00001 #ifndef _sys_AsynchIO
00002 #define _sys_AsynchIO
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "Dispatcher.h"
00025 #include "Socket.h"
00026
00027 #include <boost/function.hpp>
00028 #include <deque>
00029
00030 namespace qpid {
00031 namespace sys {
00032
00033
00034
00035
00036
00037 class AsynchAcceptor {
00038 public:
00039 typedef boost::function1<void, const Socket&> Callback;
00040
00041 private:
00042 Callback acceptedCallback;
00043 DispatchHandle handle;
00044 const Socket& socket;
00045
00046 public:
00047 AsynchAcceptor(const Socket& s, Callback callback);
00048 void start(Poller::shared_ptr poller);
00049
00050 private:
00051 void readable(DispatchHandle& handle);
00052 };
00053
00054
00055
00056
00057
00058 class AsynchConnector : private DispatchHandle {
00059 public:
00060 typedef boost::function1<void, const Socket&> ConnectedCallback;
00061 typedef boost::function2<void, int, std::string> FailedCallback;
00062
00063 private:
00064 ConnectedCallback connCallback;
00065 FailedCallback failCallback;
00066 const Socket& socket;
00067
00068 public:
00069 AsynchConnector(const Socket& socket,
00070 Poller::shared_ptr poller,
00071 std::string hostname,
00072 uint16_t port,
00073 ConnectedCallback connCb,
00074 FailedCallback failCb = 0);
00075
00076 private:
00077 void connComplete(DispatchHandle& handle);
00078 void failure(int, std::string);
00079 };
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 class AsynchIO : private DispatchHandle {
00094 public:
00095 struct BufferBase {
00096 char* const bytes;
00097 const int32_t byteCount;
00098 int32_t dataStart;
00099 int32_t dataCount;
00100
00101 BufferBase(char* const b, const int32_t s) :
00102 bytes(b),
00103 byteCount(s),
00104 dataStart(0),
00105 dataCount(0)
00106 {}
00107
00108 virtual ~BufferBase()
00109 {}
00110 };
00111
00112 typedef boost::function2<void, AsynchIO&, BufferBase*> ReadCallback;
00113 typedef boost::function1<void, AsynchIO&> EofCallback;
00114 typedef boost::function1<void, AsynchIO&> DisconnectCallback;
00115 typedef boost::function2<void, AsynchIO&, const Socket&> ClosedCallback;
00116 typedef boost::function1<void, AsynchIO&> BuffersEmptyCallback;
00117 typedef boost::function1<void, AsynchIO&> IdleCallback;
00118
00119 private:
00120 ReadCallback readCallback;
00121 EofCallback eofCallback;
00122 DisconnectCallback disCallback;
00123 ClosedCallback closedCallback;
00124 BuffersEmptyCallback emptyCallback;
00125 IdleCallback idleCallback;
00126 const Socket& socket;
00127 std::deque<BufferBase*> bufferQueue;
00128 std::deque<BufferBase*> writeQueue;
00129 bool queuedClose;
00136 volatile bool writePending;
00137
00138 public:
00139 AsynchIO(const Socket& s,
00140 ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
00141 ClosedCallback cCb = 0, BuffersEmptyCallback eCb = 0, IdleCallback iCb = 0);
00142 void queueForDeletion();
00143
00144 void start(Poller::shared_ptr poller);
00145 void queueReadBuffer(BufferBase* buff);
00146 void unread(BufferBase* buff);
00147 void queueWrite(BufferBase* buff);
00148 void notifyPendingWrite();
00149 void queueWriteClose();
00150 bool writeQueueEmpty() { return writeQueue.empty(); }
00151 BufferBase* getQueuedBuffer();
00152
00153 private:
00154 ~AsynchIO();
00155 void readable(DispatchHandle& handle);
00156 void writeable(DispatchHandle& handle);
00157 void disconnected(DispatchHandle& handle);
00158 void close(DispatchHandle& handle);
00159 };
00160
00161 }}
00162
00163 #endif // _sys_AsynchIO