00001 #ifndef QPID_SESSIONSTATE_H
00002 #define QPID_SESSIONSTATE_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <qpid/SessionId.h>
00026 #include <qpid/framing/SequenceNumber.h>
00027 #include <qpid/framing/SequenceSet.h>
00028 #include <qpid/framing/AMQFrame.h>
00029 #include <qpid/framing/FrameHandler.h>
00030 #include <boost/operators.hpp>
00031 #include <vector>
00032 #include <iosfwd>
00033
00034 namespace qpid {
00035 using framing::SequenceNumber;
00036 using framing::SequenceSet;
00037
00039 struct SessionPoint : boost::totally_ordered1<SessionPoint> {
00040 SessionPoint(SequenceNumber command_=0, uint64_t offset_ = 0) : command(command_), offset(offset_) {}
00041
00042 SequenceNumber command;
00043 uint64_t offset;
00044
00046 void advance(const framing::AMQFrame& f);
00047
00048 bool operator<(const SessionPoint&) const;
00049 bool operator==(const SessionPoint&) const;
00050 };
00051
00052 std::ostream& operator<<(std::ostream&, const SessionPoint&);
00053
00072 class SessionState {
00073 public:
00074
00078 class SendState {
00079 public:
00080 typedef std::vector<framing::AMQFrame> ReplayList;
00081
00083 void record(const framing::AMQFrame& f);
00084
00086 bool needFlush() const;
00087
00089 void recordFlush();
00090
00092 void confirmed(const SessionPoint& confirmed);
00093
00095 void completed(const SequenceSet& commands);
00096
00098 const SessionPoint& getReplayPoint() const { return replayPoint; }
00099
00101
00102 ReplayList& getReplayList() { return replayList; }
00103
00105 const SessionPoint& getCommandPoint();
00106
00108 const SequenceSet& getIncomplete() const { return incomplete; }
00109
00113 bool expected(const SessionPoint& expected);
00114
00115 private:
00116 SendState(SessionState& s);
00117
00118 SessionState* session;
00119
00120 SessionPoint replayPoint;
00121 SessionPoint flushPoint;
00122 SessionPoint sendPoint;
00123 ReplayList replayList;
00124 size_t unflushedSize;
00125 SequenceSet incomplete;
00126
00127 friend class SessionState;
00128 };
00129
00134 class ReceiveState {
00135 public:
00137 void setCommandPoint(const SessionPoint& point);
00138
00140 bool record(const framing::AMQFrame& f);
00141
00143 void completed(SequenceNumber command, bool cumulative=false);
00144
00146 void knownCompleted(const SequenceSet& commands);
00147
00149 const SessionPoint& getExpected() const { return expected; }
00150
00152 const SessionPoint& getReceived() const { return received; }
00153
00155 const SequenceSet& getUnknownComplete() const { return unknownCompleted; }
00156
00158 SequenceNumber getCurrent() const;
00159
00160 private:
00161 ReceiveState(SessionState&);
00162
00163 SessionState* session;
00164 SessionPoint expected;
00165 SessionPoint received;
00166 SequenceSet unknownCompleted;
00167 SequenceNumber firstIncomplete;
00168
00169 friend class SessionState;
00170 };
00171
00172 struct Configuration {
00173 Configuration();
00174 size_t replaySyncSize;
00175 size_t replayKillSize;
00176 };
00177
00178 SessionState(const SessionId& =SessionId(), const Configuration& =Configuration());
00179
00180 virtual ~SessionState();
00181
00182 const SessionId& getId() const { return id; }
00183 uint32_t getTimeout() const { return timeout; }
00184 void setTimeout(uint32_t seconds) { timeout = seconds; }
00185
00186 bool operator==(const SessionId& other) const { return id == other; }
00187 bool operator==(const SessionState& other) const { return id == other.id; }
00188
00189 SendState sender;
00190 ReceiveState receiver;
00191
00192 bool hasState() const;
00193
00194 private:
00195 SessionId id;
00196 uint32_t timeout;
00197 Configuration config;
00198 bool stateful;
00199
00200 friend class SendState;
00201 friend class ReceiveState;
00202 };
00203
00204 inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; }
00205
00206 }
00207
00208
00209 #endif