00001 #ifndef SOCKETPROXY_H
00002 #define SOCKETPROXY_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "qpid/sys/Socket.h"
00025 #include "qpid/sys/Poller.h"
00026 #include "qpid/sys/Runnable.h"
00027 #include "qpid/sys/Thread.h"
00028 #include "qpid/sys/Mutex.h"
00029 #include "qpid/client/Connection.h"
00030 #include "qpid/log/Statement.h"
00031
00032 #include <algorithm>
00033
00038 class SocketProxy : private qpid::sys::Runnable
00039 {
00040 public:
00044 SocketProxy(int connectPort, const std::string host="localhost")
00045 : closed(false), port(listener.listen()), dropClient(), dropServer()
00046 {
00047 client.connect(host, connectPort);
00048 thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
00049 }
00050
00051 ~SocketProxy() { close(); }
00052
00054 void close() {
00055 {
00056 qpid::sys::Mutex::ScopedLock l(lock);
00057 if (closed) return;
00058 closed=true;
00059 }
00060 poller.shutdown();
00061 if (thread.id() != qpid::sys::Thread::current().id())
00062 thread.join();
00063 client.close();
00064 }
00065
00067 void dropClientData(bool drop=true) { dropClient=drop; }
00068
00070 void dropServerData(bool drop=true) { dropServer=drop; }
00071
00072 bool isClosed() const {
00073 qpid::sys::Mutex::ScopedLock l(lock);
00074 return closed;
00075 }
00076
00077 uint16_t getPort() const { return port; }
00078
00079 private:
00080 static void throwErrno(const std::string& msg) {
00081 throw qpid::Exception(msg+":"+qpid::strError(errno));
00082 }
00083 static void throwIf(bool condition, const std::string& msg) {
00084 if (condition) throw qpid::Exception(msg);
00085 }
00086
00087 void run() {
00088 std::auto_ptr<qpid::sys::Socket> server;
00089 try {
00090 qpid::sys::PollerHandle listenerHandle(listener);
00091 poller.addFd(listenerHandle, qpid::sys::Poller::IN);
00092 qpid::sys::Poller::Event event = poller.wait();
00093 throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
00094 throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed");
00095
00096 poller.delFd(listenerHandle);
00097 server.reset(listener.accept(0, 0));
00098
00099
00100 qpid::sys::PollerHandle clientHandle(client);
00101 qpid::sys::PollerHandle serverHandle(*server);
00102 poller.addFd(clientHandle, qpid::sys::Poller::IN);
00103 poller.addFd(serverHandle, qpid::sys::Poller::IN);
00104 char buffer[1024];
00105 for (;;) {
00106 qpid::sys::Poller::Event event = poller.wait();
00107 throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
00108 throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected");
00109 if (event.handle == &serverHandle) {
00110 ssize_t n = server->read(buffer, sizeof(buffer));
00111 if (!dropServer) client.write(buffer, n);
00112 poller.rearmFd(serverHandle);
00113 } else if (event.handle == &clientHandle) {
00114 ssize_t n = client.read(buffer, sizeof(buffer));
00115 if (!dropClient) server->write(buffer, n);
00116 poller.rearmFd(clientHandle);
00117 } else {
00118 throwIf(true, "SocketProxy: No handle ready");
00119 }
00120 }
00121 }
00122 catch (const std::exception& e) {
00123 QPID_LOG(debug, "SocketProxy::run exception: " << e.what());
00124 }
00125 try {
00126 if (server.get()) server->close();
00127 close();
00128 }
00129 catch (const std::exception& e) {
00130 QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what());
00131 }
00132 }
00133
00134 mutable qpid::sys::Mutex lock;
00135 bool closed;
00136 qpid::sys::Poller poller;
00137 qpid::sys::Socket client, listener;
00138 uint16_t port;
00139 qpid::sys::Thread thread;
00140 bool dropClient, dropServer;
00141 };
00142
00143 #endif