00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef _SessionImpl_
00023 #define _SessionImpl_
00024
00025 #include "Demux.h"
00026 #include "Execution.h"
00027 #include "Results.h"
00028
00029 #include "qpid/shared_ptr.h"
00030 #include "qpid/framing/FrameHandler.h"
00031 #include "qpid/framing/ChannelHandler.h"
00032 #include "qpid/framing/SessionState.h"
00033 #include "qpid/framing/SequenceNumber.h"
00034 #include "qpid/framing/AMQP_ClientOperations.h"
00035 #include "qpid/framing/AMQP_ServerProxy.h"
00036 #include "qpid/sys/Semaphore.h"
00037 #include "qpid/sys/StateMonitor.h"
00038
00039 #include <boost/optional.hpp>
00040
00041 namespace qpid {
00042
// Forward declarations of qpid::framing types that SessionImpl refers to below.
// NOTE(review): SessionImpl uses the nested name FrameSet::shared_ptr and holds
// SequenceSet members by value, both of which require the complete types;
// presumably the headers included above already provide those definitions,
// which would make these forward declarations redundant -- confirm.
00043 namespace framing {
00044
00045 class FrameSet;
00046 class MethodContent;
00047 class SequenceSet;
00048
00049 }
00050
00051 namespace client {
00052
// Forward declarations of qpid::client types. In this header Future appears
// only as the return type of member function declarations, and ConnectionImpl
// only as the template argument of shared_ptr<ConnectionImpl>.
00053 class Future;
00054 class ConnectionImpl;
00055
// SessionImpl: session implementation class declared in namespace qpid::client.
//
// Publicly derives from framing::FrameHandler::InOutHandler and Execution, and
// privately from the AMQP_ClientOperations SessionHandler and ExecutionHandler
// interfaces. No member function is defined in this header; only declarations
// are visible here, so behavioural notes below are hedged where they rely on
// names rather than on visible code.
00056 class SessionImpl : public framing::FrameHandler::InOutHandler,
00057 public Execution,
00058 private framing::AMQP_ClientOperations::SessionHandler,
00059 private framing::AMQP_ClientOperations::ExecutionHandler
00060 {
00061 public:
// Constructed from a shared connection pointer, a channel number and a
// maximum frame size (the class stores a const uint64_t maxFrameSize member).
00062 SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
00063 ~SessionImpl();
00064
00065
00066
// Returns a shared pointer to a framing::FrameSet.
// Presumably the next frame set received on this session -- TODO confirm
// against the definition.
00067 framing::FrameSet::shared_ptr get();
00068
// Returns a framing::Uuid by value (the class holds a const Uuid member `id`).
00069 const framing::Uuid getId() const;
00070
// Channel number accessors.
00071 uint16_t getChannel() const;
00072 void setChannel(uint16_t channel);
00073
// Session lifecycle entry points. open() takes a detached lifetime value and
// resume() takes a connection pointer; units and exact semantics are not
// visible in this header.
00074 void open(uint32_t detachedLifetime);
00075 void close();
00076 void resume(shared_ptr<ConnectionImpl>);
00077 void suspend();
00078
// Sync flag accessors -- presumably backed by the `syncMode` member; confirm.
00079 void setSync(bool s);
00080 bool isSync();
// Const check; by its name presumably fails if the session is not open.
// How failure is reported is not visible here.
00081 void assertOpen() const;
00082
// Send a command body, optionally together with method content. Both
// overloads return a Future.
00083 Future send(const framing::AMQBody& command);
00084 Future send(const framing::AMQBody& command, const framing::MethodContent& content);
00085
// Access to the Demux member and command-completion bookkeeping keyed by
// framing::SequenceNumber. NOTE(review): several of these look like the
// Execution interface inherited above -- confirm in Execution.h.
00086 Demux& getDemux();
00087 void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
00088 bool isComplete(const framing::SequenceNumber& id);
00089 bool isCompleteUpTo(const framing::SequenceNumber& id);
00090 void waitForCompletion(const framing::SequenceNumber& id);
00091 void sendCompletion();
00092 void sendFlush();
00093
00094
// Notifications carrying an error code and text. Presumably invoked by the
// owning connection when it is closed or fails -- confirm callers.
00095 void connectionClosed(uint16_t code, const std::string& text);
00096 void connectionBroke(uint16_t code, const std::string& text);
00097
00098 private:
// Category of the value held in the `error` member below.
00099 enum ErrorType {
00100 OK,
00101 CONNECTION_CLOSE,
00102 SESSION_DETACH,
00103 EXCEPTION
00104 };
// Session states tracked by the `state` member. DETACHED is also passed as
// the second template argument of sys::StateMonitor in the typedef below;
// what that argument means is defined by StateMonitor (not visible here).
00105 enum State {
00106 INACTIVE,
00107 ATTACHING,
00108 ATTACHED,
00109 DETACHING,
00110 DETACHED
00111 };
// Short aliases for the privately inherited handler interfaces and for the
// state monitor instantiation used by this class.
00112 typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
00113 typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
00114 typedef sys::StateMonitor<State, DETACHED> StateMonitor;
00115 typedef StateMonitor::Set States;
00116
// Declared inline but not defined in this header, so the definitions must be
// visible in whichever translation unit calls them (presumably only the
// implementation file -- confirm).
00117 inline void setState(State s);
00118 inline void waitFor(State);
00119
// No-argument overload; a detach(const std::string&) overload is declared
// further down.
00120 void detach();
00121
// Internal checks and closed-state handling; check() and checkOpen() are
// const, so they can only touch the `mutable` members.
00122 void check() const;
00123 void checkOpen() const;
00124 void handleClosed();
00125
// Frame handling. handleIn/handleOut presumably implement the InOutHandler
// base -- confirm. proxyOut is the member function bound into `ioHandler`
// through MemFunRef below.
00126 void handleIn(framing::AMQFrame& frame);
00127 void handleOut(framing::AMQFrame& frame);
00128 void proxyOut(framing::AMQFrame& frame);
00129 void deliver(framing::AMQFrame& frame);
00130
// Internal send helpers. sendCommand's content pointer defaults to 0 (null),
// which lets one declaration serve callers with and without content.
00131 Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
00132 void sendContent(const framing::MethodContent&);
00133 void waitForCompletionImpl(const framing::SequenceNumber& id);
00134
00135 void sendCompletionImpl();
00136
00137
00138
// Session-level operations. Presumably these implement the privately
// inherited SessionHandler interface -- confirm against
// AMQP_ClientOperations.h.
00139 void attach(const std::string& name, bool force);
00140 void attached(const std::string& name);
00141 void detach(const std::string& name);
00142 void detached(const std::string& name, uint8_t detachCode);
00143 void requestTimeout(uint32_t timeout);
00144 void timeout(uint32_t timeout);
00145 void commandPoint(const framing::SequenceNumber& commandId, uint64_t commandOffset);
00146 void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
00147 void confirmed(const framing::SequenceSet& commands, const framing::Array& fragments);
00148 void completed(const framing::SequenceSet& commands, bool timelyReply);
00149 void knownCompleted(const framing::SequenceSet& commands);
00150 void flush(bool expected, bool confirmed, bool completed);
00151 void gap(const framing::SequenceSet& commands);
00152
00153
00154
// Execution-level operations. Presumably these implement the privately
// inherited ExecutionHandler interface -- confirm against
// AMQP_ClientOperations.h.
00155 void sync();
00156 void result(const framing::SequenceNumber& commandId, const std::string& value);
00157 void exception(uint16_t errorCode,
00158 const framing::SequenceNumber& commandId,
00159 uint8_t classCode,
00160 uint8_t commandCode,
00161 uint8_t fieldIndex,
00162 const std::string& description,
00163 const framing::FieldTable& errorInfo);
00164
// Data members. C++ initializes non-static members in declaration order, so
// keep this order unless the constructor's initializer list (not visible
// here) is reviewed at the same time.
//
// Error record: an ErrorType plus an integer code and text.
00165 ErrorType error;
00166 int code;
00167 std::string text;
// Declared mutable so that const member functions can use them.
00168 mutable StateMonitor state;
00169 mutable sys::Semaphore sendLock;
// NOTE(review): volatile does not provide inter-thread synchronization in
// C++; if syncMode is shared between threads, confirm it is protected by
// other means.
00170 volatile bool syncMode;
00171 uint32_t detachedLifetime;
// Const members: they can only be set in the constructor's initializer list.
00172 const uint64_t maxFrameSize;
00173 const framing::Uuid id;
00174 const std::string name;
00175
// Connection and outbound plumbing. ioHandler wraps SessionImpl::proxyOut as
// a frame handler; how channel and proxy are wired to it is not visible here.
00176 shared_ptr<ConnectionImpl> connection;
00177 framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
00178 framing::ChannelHandler channel;
00179 framing::AMQP_ServerProxy::Session proxy;
00180
// Result store, demultiplexer (exposed through getDemux()) and a FrameSet
// pointer that, by its name, presumably holds the frame set currently being
// assembled -- confirm.
00181 Results results;
00182 Demux demux;
00183 framing::FrameSet::shared_ptr arriving;
00184
// Sequence bookkeeping. By their names, the *In members presumably track
// incoming commands and the *Out members outgoing commands -- confirm.
00185 framing::SequenceSet incompleteIn;
00186 framing::SequenceSet completedIn;
00187 framing::SequenceSet incompleteOut;
00188 framing::SequenceSet completedOut;
00189 framing::SequenceNumber nextIn;
00190 framing::SequenceNumber nextOut;
00191
00192 };
00193
00194 }}
00195
00196 #endif