00001 #ifndef Rdma_Acceptor_h 00002 #define Rdma_Acceptor_h 00003 00004 #include "rdma_wrap.h" 00005 00006 #include "qpid/sys/Dispatcher.h" 00007 #include "qpid/sys/Mutex.h" 00008 00009 #include <netinet/in.h> 00010 00011 #include <boost/function.hpp> 00012 #include <boost/ptr_container/ptr_deque.hpp> 00013 #include <deque> 00014 00015 using qpid::sys::DispatchHandle; 00016 using qpid::sys::Poller; 00017 00018 namespace Rdma { 00019 00020 class Connection; 00021 00022 typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback; 00023 typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback; 00024 typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback; 00025 typedef boost::function1<bool, Rdma::Connection::intrusive_ptr&> ConnectionRequestCallback; 00026 typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> RejectedCallback; 00027 00028 class AsynchIO 00029 { 00030 typedef boost::function1<void, AsynchIO&> ErrorCallback; 00031 typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; 00032 typedef boost::function1<void, AsynchIO&> IdleCallback; 00033 typedef boost::function1<void, AsynchIO&> FullCallback; 00034 00035 QueuePair::intrusive_ptr qp; 00036 DispatchHandle dataHandle; 00037 int bufferSize; 00038 int recvBufferCount; 00039 int xmitBufferCount; 00040 int outstandingWrites; 00041 std::deque<Buffer*> bufferQueue; 00042 qpid::sys::Mutex bufferQueueLock; 00043 boost::ptr_deque<Buffer> buffers; 00044 00045 ReadCallback readCallback; 00046 IdleCallback idleCallback; 00047 FullCallback fullCallback; 00048 ErrorCallback errorCallback; 00049 00050 public: 00051 AsynchIO( 00052 QueuePair::intrusive_ptr q, 00053 int s, 00054 ReadCallback rc, 00055 IdleCallback ic, 00056 FullCallback fc, 00057 ErrorCallback ec 00058 ); 00059 ~AsynchIO(); 00060 00061 void start(Poller::shared_ptr poller); 00062 void queueWrite(Buffer* buff); 00063 void notifyPendingWrite(); 00064 void queueWriteClose(); 00065 Buffer* getBuffer(); 00066 00067 private: 00068 void dataEvent(DispatchHandle& handle); 00069 }; 00070 00071 class Listener 00072 { 00073 sockaddr src_addr; 00074 Connection::intrusive_ptr ci; 00075 DispatchHandle handle; 00076 ConnectedCallback connectedCallback; 00077 ErrorCallback errorCallback; 00078 DisconnectedCallback disconnectedCallback; 00079 ConnectionRequestCallback connectionRequestCallback; 00080 00081 public: 00082 Listener( 00083 const sockaddr& src, 00084 ConnectedCallback cc, 00085 ErrorCallback errc, 00086 DisconnectedCallback dc, 00087 ConnectionRequestCallback crc = 0 00088 ); 00089 void start(Poller::shared_ptr poller); 00090 00091 private: 00092 void connectionEvent(DispatchHandle& handle); 00093 }; 00094 00095 class Connector 00096 { 00097 sockaddr dst_addr; 00098 Connection::intrusive_ptr ci; 00099 DispatchHandle handle; 00100 ConnectedCallback connectedCallback; 00101 ErrorCallback errorCallback; 00102 DisconnectedCallback disconnectedCallback; 00103 RejectedCallback rejectedCallback; 00104 00105 public: 00106 Connector( 00107 const sockaddr& dst, 00108 ConnectedCallback cc, 00109 ErrorCallback errc, 00110 DisconnectedCallback dc, 00111 RejectedCallback rc = 0 00112 ); 00113 void start(Poller::shared_ptr poller); 00114 00115 private: 00116 void connectionEvent(DispatchHandle& handle); 00117 }; 00118 } 00119 00120 #endif // Rdma_Acceptor_h