00001 #ifndef CPG_H
00002 #define CPG_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "qpid/Exception.h"
00023 #include "qpid/cluster/Dispatchable.h"
00024
00025 #include <cassert>
00026 #include <string.h>
00027
00028 extern "C" {
00029 #include <openais/cpg.h>
00030 }
00031
00032 namespace qpid {
00033 namespace cluster {
00034
00040 class Cpg : public Dispatchable {
00041 public:
00042 struct Exception : public ::qpid::Exception {
00043 Exception(const std::string& msg) : ::qpid::Exception(msg) {}
00044 };
00045
00046 struct Name : public cpg_name {
00047 Name(const char* s) { copy(s, strlen(s)); }
00048 Name(const char* s, size_t n) { copy(s,n); }
00049 Name(const std::string& s) { copy(s.data(), s.size()); }
00050 void copy(const char* s, size_t n) {
00051 assert(n < CPG_MAX_NAME_LENGTH);
00052 memcpy(value, s, n);
00053 length=n;
00054 }
00055
00056 std::string str() const { return std::string(value, length); }
00057 };
00058
00059 struct Id {
00060 uint64_t id;
00061 Id(uint64_t n=0) : id(n) {}
00062 Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
00063 Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
00064
00065 operator uint64_t() const { return id; }
00066 uint32_t nodeId() const { return id >> 32; }
00067 pid_t pid() const { return id & 0xFFFF; }
00068 };
00069
00070 static std::string str(const cpg_name& n) {
00071 return std::string(n.value, n.length);
00072 }
00073
00074 struct Handler {
00075 virtual ~Handler() {};
00076 virtual void deliver(
00077 cpg_handle_t ,
00078 struct cpg_name *group,
00079 uint32_t ,
00080 uint32_t ,
00081 void* ,
00082 int ) = 0;
00083
00084 virtual void configChange(
00085 cpg_handle_t ,
00086 struct cpg_name *,
00087 struct cpg_address *, int ,
00088 struct cpg_address *, int ,
00089 struct cpg_address *, int
00090 ) = 0;
00091 };
00092
00096 Cpg(Handler&);
00097
00099 ~Cpg();
00100
00102 void shutdown();
00103
00110 void dispatch(cpg_dispatch_t type) {
00111 check(cpg_dispatch(handle,type), "Error in CPG dispatch");
00112 }
00113
00114 void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
00115 void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
00116 void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
00117
00118 void join(const Name& group) {
00119 check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
00120 };
00121
00122 void leave(const Name& group) {
00123 check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
00124 }
00125
00126 void mcast(const Name& group, const iovec* iov, int iovLen) {
00127 check(cpg_mcast_joined(
00128 handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen),
00129 cantMcastMsg(group));
00130 }
00131
00132 cpg_handle_t getHandle() const { return handle; }
00133
00134 private:
00135 class Handles;
00136 struct ClearHandleOnExit;
00137 friend class Handles;
00138 friend struct ClearHandleOnExit;
00139
00140 static std::string errorStr(cpg_error_t err, const std::string& msg);
00141 static std::string cantJoinMsg(const Name&);
00142 static std::string cantLeaveMsg(const Name&);
00143 static std::string cantMcastMsg(const Name&);
00144
00145 static void check(cpg_error_t result, const std::string& msg) {
00146
00147 if (result != CPG_OK)
00148 throw Exception(errorStr(result, msg));
00149 }
00150
00151 static void globalDeliver(
00152 cpg_handle_t ,
00153 struct cpg_name *group,
00154 uint32_t ,
00155 uint32_t ,
00156 void* ,
00157 int );
00158
00159 static void globalConfigChange(
00160 cpg_handle_t ,
00161 struct cpg_name *,
00162 struct cpg_address *, int ,
00163 struct cpg_address *, int ,
00164 struct cpg_address *, int
00165 );
00166
00167 static Handles handles;
00168 cpg_handle_t handle;
00169 Handler& handler;
00170 };
00171
00172 std::ostream& operator <<(std::ostream& out, const cpg_name& name);
00173 std::ostream& operator <<(std::ostream& out, const Cpg::Id& id);
00174 std::ostream& operator <<(std::ostream& out, const std::pair<cpg_address*,int> addresses);
00175
00176 inline bool operator==(const cpg_name& a, const cpg_name& b) {
00177 return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
00178 }
00179 inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
00180
00181 }}
00182
00183
00184
00185 #endif