00001
00002 #ifndef _MANAGEMENT_CONSUMER_
00003 #define _MANAGEMENT_CONSUMER_
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "qpid/management/ManagementObject.h"
00028 #include "qpid/framing/FieldTable.h"
00029 #include "qpid/framing/Uuid.h"
00030
00031 namespace qpid {
00032 namespace management {
00033
00034 class Consumer : public ManagementObject
00035 {
00036 private:
00037
00038 static std::string packageName;
00039 static std::string className;
00040 static uint8_t md5Sum[16];
00041
00042
00043 uint64_t destinationRef;
00044 uint64_t queueRef;
00045
00046
00047 uint64_t msgsConsumed;
00048 uint64_t bytesConsumed;
00049 uint32_t unackedMessages;
00050 uint32_t unackedMessagesHigh;
00051 uint32_t unackedMessagesLow;
00052
00053
00054 static void writeSchema (qpid::framing::Buffer& buf);
00055 void writeConfig (qpid::framing::Buffer& buf);
00056 void writeInstrumentation (qpid::framing::Buffer& buf,
00057 bool skipHeaders = false);
00058 void doMethod (std::string methodName,
00059 qpid::framing::Buffer& inBuf,
00060 qpid::framing::Buffer& outBuf);
00061 writeSchemaCall_t getWriteSchemaCall (void) { return writeSchema; }
00062
00063
00064 public:
00065
00066 friend class PackageQpid;
00067 typedef boost::shared_ptr<Consumer> shared_ptr;
00068
00069 Consumer (Manageable* coreObject, uint64_t _destinationRef, uint64_t _queueRef);
00070 ~Consumer (void);
00071
00072 std::string getPackageName (void) { return packageName; }
00073 std::string getClassName (void) { return className; }
00074 uint8_t* getMd5Sum (void) { return md5Sum; }
00075
00076
00077 static const uint32_t METHOD_CLOSE = 1;
00078
00079
00080 inline void inc_msgsConsumed (uint64_t by = 1){
00081 sys::RWlock::ScopedWlock writeLock (accessLock);
00082 msgsConsumed += by;
00083 instChanged = true;
00084 }
00085 inline void dec_msgsConsumed (uint64_t by = 1){
00086 sys::RWlock::ScopedWlock writeLock (accessLock);
00087 msgsConsumed -= by;
00088 instChanged = true;
00089 }
00090 inline void set_msgsConsumed (uint64_t val){
00091 sys::RWlock::ScopedWlock writeLock (accessLock);
00092 msgsConsumed = val;
00093 instChanged = true;
00094 }
00095 inline void inc_bytesConsumed (uint64_t by = 1){
00096 sys::RWlock::ScopedWlock writeLock (accessLock);
00097 bytesConsumed += by;
00098 instChanged = true;
00099 }
00100 inline void dec_bytesConsumed (uint64_t by = 1){
00101 sys::RWlock::ScopedWlock writeLock (accessLock);
00102 bytesConsumed -= by;
00103 instChanged = true;
00104 }
00105 inline void set_bytesConsumed (uint64_t val){
00106 sys::RWlock::ScopedWlock writeLock (accessLock);
00107 bytesConsumed = val;
00108 instChanged = true;
00109 }
00110 inline void inc_unackedMessages (uint32_t by = 1){
00111 sys::RWlock::ScopedWlock writeLock (accessLock);
00112 unackedMessages += by;
00113 if (unackedMessagesHigh < unackedMessages)
00114 unackedMessagesHigh = unackedMessages;
00115 instChanged = true;
00116 }
00117 inline void dec_unackedMessages (uint32_t by = 1){
00118 sys::RWlock::ScopedWlock writeLock (accessLock);
00119 unackedMessages -= by;
00120 if (unackedMessagesLow > unackedMessages)
00121 unackedMessagesLow = unackedMessages;
00122 instChanged = true;
00123 }
00124 inline void set_unackedMessages (uint32_t val){
00125 sys::RWlock::ScopedWlock writeLock (accessLock);
00126 unackedMessages = val;
00127 if (unackedMessagesLow > val)
00128 unackedMessagesLow = val;
00129 if (unackedMessagesHigh < val)
00130 unackedMessagesHigh = val;
00131 instChanged = true;
00132 }
00133
00134 };
00135
00136 }}
00137
00138
00139 #endif