00001 #ifndef CPG_H
00002 #define CPG_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "qpid/cluster/types.h"
00023 #include "qpid/cluster/Dispatchable.h"
00024
00025 #include "qpid/Exception.h"
00026 #include "qpid/sys/IOHandle.h"
00027
00028 #include <boost/scoped_ptr.hpp>
00029
00030 #include <cassert>
00031
00032 #include <string.h>
00033
00034 extern "C" {
00035 #include <openais/cpg.h>
00036 }
00037
00038 namespace qpid {
00039 namespace cluster {
00040
00041
00049 class Cpg : public sys::IOHandle {
00050 public:
00051 struct Exception : public ::qpid::Exception {
00052 Exception(const std::string& msg) : ::qpid::Exception(msg) {}
00053 };
00054
00055 struct Name : public cpg_name {
00056 Name(const char* s) { copy(s, strlen(s)); }
00057 Name(const char* s, size_t n) { copy(s,n); }
00058 Name(const std::string& s) { copy(s.data(), s.size()); }
00059 void copy(const char* s, size_t n) {
00060 assert(n < CPG_MAX_NAME_LENGTH);
00061 memcpy(value, s, n);
00062 length=n;
00063 }
00064
00065 std::string str() const { return std::string(value, length); }
00066 };
00067
00068 static std::string str(const cpg_name& n) {
00069 return std::string(n.value, n.length);
00070 }
00071
00072 struct Handler {
00073 virtual ~Handler() {};
00074 virtual void deliver(
00075 cpg_handle_t ,
00076 struct cpg_name *group,
00077 uint32_t ,
00078 uint32_t ,
00079 void* ,
00080 int ) = 0;
00081
00082 virtual void configChange(
00083 cpg_handle_t ,
00084 struct cpg_name *,
00085 struct cpg_address *, int ,
00086 struct cpg_address *, int ,
00087 struct cpg_address *, int
00088 ) = 0;
00089 };
00090
00094 Cpg(Handler&);
00095
00097 ~Cpg();
00098
00100 void shutdown();
00101
00108 void dispatch(cpg_dispatch_t type) {
00109 check(cpg_dispatch(handle,type), "Error in CPG dispatch");
00110 }
00111
00112 void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
00113 void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
00114 void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
00115
00116 void join(const Name& group);
00117 void leave(const Name& group);
00118 void mcast(const Name& group, const iovec* iov, int iovLen);
00119
00120 cpg_handle_t getHandle() const { return handle; }
00121
00122 MemberId self() const;
00123
00124 int getFd();
00125
00126 private:
00127 static std::string errorStr(cpg_error_t err, const std::string& msg);
00128 static std::string cantJoinMsg(const Name&);
00129 static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
00130
00131 static void check(cpg_error_t result, const std::string& msg) {
00132 if (result != CPG_OK) throw Exception(errorStr(result, msg));
00133 }
00134
00135 static Cpg* cpgFromHandle(cpg_handle_t);
00136
00137 static void globalDeliver(
00138 cpg_handle_t handle,
00139 struct cpg_name *group,
00140 uint32_t nodeid,
00141 uint32_t pid,
00142 void* msg,
00143 int msg_len);
00144
00145 static void globalConfigChange(
00146 cpg_handle_t handle,
00147 struct cpg_name *group,
00148 struct cpg_address *members, int nMembers,
00149 struct cpg_address *left, int nLeft,
00150 struct cpg_address *joined, int nJoined
00151 );
00152
00153 bool isFlowControlEnabled();
00154 void waitForFlowControl();
00155
00156 cpg_handle_t handle;
00157 Handler& handler;
00158 bool isShutdown;
00159 };
00160
00161 inline bool operator==(const cpg_name& a, const cpg_name& b) {
00162 return a.length==b.length && strncmp(a.value, b.value, a.length) == 0;
00163 }
00164 inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); }
00165
00166 }}
00167
00168 #endif