00001 #ifndef QPID_CLUSTER_CLUSTER_H
00002 #define QPID_CLUSTER_CLUSTER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "Cpg.h"
00023
00024 #include "qpid/broker/Broker.h"
00025 #include "qpid/sys/Monitor.h"
00026 #include "qpid/sys/Runnable.h"
00027 #include "qpid/sys/Thread.h"
00028 #include "qpid/log/Logger.h"
00029 #include "qpid/Url.h"
00030
00031
00032 #include <boost/optional.hpp>
00033 #include <boost/function.hpp>
00034 #include <boost/intrusive_ptr.hpp>
00035
00036 #include <map>
00037 #include <vector>
00038
00039 namespace qpid { namespace cluster {
00040
/**
 * Cluster state and messaging object built on top of Cpg.
 *
 * Holds a map of cluster members keyed by Cpg::Id, a Cpg instance, a
 * dispatcher thread and a session-manager observer, and exposes a small
 * public API for querying membership, waiting on a predicate and sending
 * frames.
 *
 * Both base classes are inherited privately, so a Cluster is not usable as
 * a sys::Runnable or Cpg::Handler by outside code.
 * NOTE(review): run() is presumably the sys::Runnable entry point executed
 * by `dispatcher`, and deliver()/configChange() presumably implement
 * Cpg::Handler -- confirm against those base classes.
 */
00045 class Cluster : private sys::Runnable, private Cpg::Handler
00046 {
00047 public:
    /// Per-member information; at present only the member's Url is stored.
00049 struct Member {
    // The constructor is not `explicit`, so a Url converts implicitly to a
    // Member; with no argument the Url is default-constructed.
00050 Member(const Url& url_=Url()) : url(url_) {}
00051 Url url;
00052 };
00053
    /// Sequence of members, the type returned by getMembers().
00054 typedef std::vector<Member> MemberList;
00055
    /**
     * Construct a Cluster from a name, a Url and a broker reference.
     *
     * NOTE(review): presumably `name` is the cluster (CPG group) name and
     * `url` is the address of this member -- confirm in the constructor
     * definition, which is not part of this header.
     */
00061 Cluster(const std::string& name, const Url& url, broker::Broker&);
00062
    /// Virtual destructor; defined out of line.
00063 virtual ~Cluster();
00064
00065
    /// Returns a copy of the `observer` intrusive pointer held by this object.
00066 boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
00067
    /// Returns a list of members by value.
    /// NOTE(review): presumably a snapshot of the current `members` map -- confirm.
00069 MemberList getMembers() const;
00070
    /// Returns a count; empty() below is defined in terms of it.
    /// NOTE(review): presumably the number of known cluster members -- confirm.
00072 size_t size() const;
00073
    /// True when size() reports zero.
00074 bool empty() const { return size() == 0; }
00075
    /**
     * Wait on a caller-supplied predicate over this Cluster.
     *
     * @param predicate callable taking `const Cluster&` and returning bool.
     * @param timeout   defaults to sys::TIME_INFINITE.
     * @return bool. NOTE(review): presumably true if the predicate became
     *         true before the timeout expired -- confirm in the definition.
     */
00081 bool wait(boost::function<bool(const Cluster&)> predicate,
00082 sys::Duration timeout=sys::TIME_INFINITE) const;
00083
    /// Send a frame. NOTE(review): the role of the FrameHandler* argument is
    /// not visible from this header -- confirm in the definition.
00085 void send(framing::AMQFrame&, framing::FrameHandler*);
00086
00087 private:
    /// Member identifier type, as defined by Cpg.
00088 typedef Cpg::Id Id;
    /// Members keyed by their Cpg identifier.
00089 typedef std::map<Id, Member> MemberMap;
00090
00091 void notify();
00092
    // Message-delivery callback. Parameter names are omitted in the
    // declaration.
    // NOTE(review): the shape matches the CPG deliver callback, whose
    // arguments are (handle, group, nodeid, pid, msg, msg_len) -- confirm
    // against Cpg::Handler.
00093 void deliver(
00094 cpg_handle_t ,
00095 struct cpg_name *group,
00096 uint32_t ,
00097 uint32_t ,
00098 void* ,
00099 int );
00100
    // Configuration-change callback taking three (address array, count)
    // pairs.
    // NOTE(review): in the CPG confchg callback these pairs are, in order,
    // the current members, the members that left and the members that
    // joined -- confirm against Cpg::Handler.
00101 void configChange(
00102 cpg_handle_t ,
00103 struct cpg_name *,
00104 struct cpg_address *, int ,
00105 struct cpg_address *, int ,
00106 struct cpg_address *, int
00107 );
00108
00109 void run();
00110 void handleClusterFrame(Id from, framing::AMQFrame&);
00111
    // Data members. Declaration order is also construction order, so do not
    // reorder these without checking the constructor's initializer list.
    // `lock` is mutable so that const member functions can use it.
00112 mutable sys::Monitor lock;
00113 Cpg cpg;
00114 Cpg::Name name;
00115 Url url;
00116 Id self;
00117 MemberMap members;
00118 sys::Thread dispatcher;
00119 boost::function<void()> callback;
00120 boost::intrusive_ptr<broker::SessionManager::Observer> observer;
00121
    // Stream-insertion operators are friends so they can name the private
    // MemberMap type and read private state.
00122 friend std::ostream& operator <<(std::ostream&, const Cluster&);
00123 friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
00124 friend std::ostream& operator <<(std::ostream&, const MemberMap&);
00125 };
00126
00127 }}
00128
00129
00130
00131 #endif