00001 #ifndef _ManagementBroker_
00002 #define _ManagementBroker_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "qpid/Options.h"
00026 #include "qpid/broker/Exchange.h"
00027 #include "qpid/broker/Timer.h"
00028 #include "qpid/framing/Uuid.h"
00029 #include "qpid/sys/Mutex.h"
00030 #include "ManagementAgent.h"
00031 #include "ManagementObject.h"
00032 #include "Manageable.h"
00033 #include "qpid/management/Agent.h"
00034 #include <qpid/framing/AMQFrame.h>
00035 #include <boost/shared_ptr.hpp>
00036
00037 namespace qpid {
00038 namespace management {
00039
00040 class ManagementBroker : public ManagementAgent // ManagementAgent subclass that keeps registries of schema packages/classes, management objects and remote agents (see the data members)
00041 {
00042 private:
00043 // The constructor is private, so outside code cannot construct a ManagementBroker directly.
00044 ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker);
00045 // NOTE(review): the static enableManagement() takes the same arguments; presumably it builds the instance held in `agent` - confirm in the implementation.
00046 public:
00047 // Virtual destructor; declared here, defined elsewhere.
00048 virtual ~ManagementBroker ();
00049 // Static entry points. NOTE(review): presumably these manage the class-wide `agent` / `enabled` statics declared further down - confirm.
00050 static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker);
00051 static shared_ptr getAgent (void); // `shared_ptr` is an unqualified type name not declared in this class - presumably inherited from ManagementAgent
00052 static void shutdown (void);
00053 // Instance API. Only setInterval has an inline body; the remaining functions are declarations only.
00054 void setInterval (uint16_t _interval) { interval = _interval; } // overwrites the `interval` member; NOTE(review): units presumably seconds (cf. Periodic) - confirm
00055 void setExchange (broker::Exchange::shared_ptr mgmtExchange,
00056 broker::Exchange::shared_ptr directExchange);
00057 void RegisterClass (std::string packageName,
00058 std::string className,
00059 uint8_t* md5Sum,
00060 ManagementObject::writeSchemaCall_t schemaCall);
00061 void addObject (ManagementObject::shared_ptr object,
00062 uint32_t persistId = 0,
00063 uint32_t persistBank = 4); // NOTE(review): the meaning of the default bank value 4 is not visible in this header
00064 void clientAdded (void);
00065 void dispatchCommand (broker::Deliverable& msg,
00066 const std::string& routingKey,
00067 const framing::FieldTable* args);
00068
00069 private:
00070 friend class ManagementAgent; // the base class is granted access to the private members below
00071
00072 struct Periodic : public broker::TimerTask // timer task bound to one ManagementBroker
00073 {
00074 ManagementBroker& broker; // the associated broker, held by reference (not owned)
00075
00076 Periodic (ManagementBroker& broker, uint32_t seconds); // NOTE(review): `seconds` is presumably the firing period - confirm
00077 virtual ~Periodic ();
00078 void fire (); // NOTE(review): presumably the TimerTask callback that drives PeriodicProcessing() - confirm
00079 };
00080
00081 // RemoteAgent: a Manageable that carries an object-id bank number and an
00082 // Agent management object, which GetManagementObject() hands back.
00083 // No constructor is declared here, so objIdBank has no initializer in this header.
00084 struct RemoteAgent : public Manageable
00085 {
00086 uint32_t objIdBank;
00087 Agent::shared_ptr mgmtObject;
00088 ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } // returns mgmtObject, converted to ManagementObject::shared_ptr
00089 virtual ~RemoteAgent ();
00090 };
00091
00092 // RemoteAgentMap: RemoteAgent records keyed by framing::Uuid. The values are raw
00093 // pointers; who deletes them is not visible in this header.
00094 // ReplyToVector: a plain vector of strings.
00095 typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap;
00096 typedef std::vector<std::string> ReplyToVector;
00097
00098 // SchemaClassKey: key type for ClassMap, made of a class name plus a 16-byte hash.
00099 // Two keys with the same name but different hash bytes are distinct keys
00100 // (see SchemaClassKeyComp).
00101 // NOTE(review): the hash is presumably the md5Sum passed to RegisterClass /
00102 // AddClassLocal - confirm in the implementation.
00103
00104 struct SchemaClassKey
00105 {
00106 std::string name;
00107 uint8_t hash[16];
00108 };
00109 // Ordering functor for SchemaClassKey: orders by name first, then bytewise by hash.
00110 struct SchemaClassKeyComp
00111 {
00112 bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
00113 {
00114 if (lhs.name != rhs.name)
00115 return lhs.name < rhs.name; // different names decide the order on their own
00116 else
00117 for (int i = 0; i < 16; i++) // same name: compare the hashes byte by byte
00118 if (lhs.hash[i] != rhs.hash[i])
00119 return lhs.hash[i] < rhs.hash[i]; // first differing byte decides
00120 return false; // same name and identical hash: neither key is less than the other
00121 }
00122 };
00123 // SchemaClass: per-class record; the schema is available via a callback (writeSchemaCall) or a raw byte buffer (buffer / bufferLen).
00124 struct SchemaClass
00125 {
00126 ManagementObject::writeSchemaCall_t writeSchemaCall;
00127 ReplyToVector remoteAgents; // NOTE(review): presumably reply-to keys of remote agents associated with this class - confirm
00128 size_t bufferLen;
00129 uint8_t* buffer; // raw pointer; ownership/cleanup is not visible in this header
00130
00131 SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {} // callback, length and buffer start at zero; remoteAgents starts empty
00132 bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } // true when either the callback or the buffer is set
00133 void appendSchema (framing::Buffer& buf); // declared only; defined elsewhere
00134 };
00135 // ClassMap: SchemaClassKey -> SchemaClass, ordered by SchemaClassKeyComp. PackageMap: string key (the package name, per FindOrAddPackage) -> ClassMap.
00136 typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
00137 typedef std::map<std::string, ClassMap> PackageMap;
00138 // Registries held by each instance.
00139 RemoteAgentMap remoteAgents;
00140 PackageMap packages;
00141 ManagementObjectMap managementObjects; // ManagementObjectMap is not declared in this header
00142 // Class-wide (static) state.
00143 static shared_ptr agent;
00144 static bool enabled;
00145 // Per-instance state.
00146 framing::Uuid uuid;
00147 sys::Mutex userLock; // NOTE(review): presumably the lock referred to by the *LH helper names - confirm
00148 broker::Timer timer;
00149 broker::Exchange::shared_ptr mExchange;
00150 broker::Exchange::shared_ptr dExchange;
00151 std::string dataDir;
00152 uint16_t interval; // written by setInterval()
00153 Manageable* broker; // raw, non-smart pointer
00154 uint16_t bootSequence;
00155 uint32_t localBank;
00156 uint32_t nextObjectId;
00157 uint32_t nextRemoteBank;
00158 bool clientWasAdded;
00159 // Two fixed-size char buffers of MA_BUFFER_SIZE (65536) bytes each, embedded directly in the object. The macro is not #undef'd in the visible header.
00160 # define MA_BUFFER_SIZE 65536
00161 char inputBuffer[MA_BUFFER_SIZE];
00162 char outputBuffer[MA_BUFFER_SIZE];
00163 // Private helpers, declared only. NOTE(review): the `LH` suffix presumably means "lock held" (caller must already hold a lock) - confirm against the implementation.
00164 void writeData ();
00165 void PeriodicProcessing (void);
00166 void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
00167 bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); // opcode and seq are pointer parameters - presumably outputs; confirm
00168 void SendBuffer (framing::Buffer& buf,
00169 uint32_t length,
00170 broker::Exchange::shared_ptr exchange,
00171 std::string routingKey);
00172
00173 void dispatchMethodLH (broker::Message& msg,
00174 const std::string& routingKey,
00175 size_t first);
00176 void dispatchAgentCommandLH (broker::Message& msg);
00177 // Package / class registry helpers; they work with iterators into `packages` and its ClassMap values.
00178 PackageMap::iterator FindOrAddPackage (std::string name);
00179 void AddClassLocal (PackageMap::iterator pIter,
00180 std::string className,
00181 uint8_t* md5Sum,
00182 ManagementObject::writeSchemaCall_t schemaCall);
00183 void EncodePackageIndication (framing::Buffer& buf,
00184 PackageMap::iterator pIter);
00185 void EncodeClassIndication (framing::Buffer& buf,
00186 PackageMap::iterator pIter,
00187 ClassMap::iterator cIter);
00188 bool bankInUse (uint32_t bank);
00189 uint32_t allocateNewBank ();
00190 uint32_t assignBankLH (uint32_t requestedPrefix);
00191 void sendCommandComplete (std::string replyToKey, uint32_t sequence,
00192 uint32_t code = 0, std::string text = std::string("OK")); // defaults: code 0 with text "OK"
00193 void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); // the handle*LH functions below all share this parameter list
00194 void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00195 void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00196 void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00197 void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00198 void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00199 void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
00200 };
00201
00202 }}
00203
00204 #endif