00001 #ifndef _client_Channel_h
00002 #define _client_Channel_h
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <memory>
00025 #include <boost/scoped_ptr.hpp>
00026 #include "qpid/framing/amqp_framing.h"
00027 #include "qpid/framing/Uuid.h"
00028 #include "Exchange.h"
00029 #include "Message.h"
00030 #include "Queue.h"
00031 #include "ConnectionImpl.h"
00032 #include "qpid/client/Session.h"
00033 #include "qpid/Exception.h"
00034 #include "qpid/sys/Mutex.h"
00035 #include "qpid/sys/Runnable.h"
00036 #include "qpid/sys/Thread.h"
00037 #include "AckMode.h"
00038
00039 namespace qpid {
00040
// Forward declarations of framing-layer classes. Only the names are
// introduced here; neither class is referenced by the Channel declaration
// visible in this header.
00041 namespace framing {
00042 class ChannelCloseBody;
00043 class AMQMethodBody;
00044 }
00045
00046 namespace client {
00047
// Forward declarations of client-layer classes.
// Connection is named as a friend of Channel below, and MessageListener is
// used there by pointer only, so complete types are not required for those
// uses. MessageChannel and ReturnedMessageHandler are not referenced by the
// Channel declaration visible in this header.
00048 class Connection;
00049 class MessageChannel;
00050 class MessageListener;
00051 class ReturnedMessageHandler;
00052
/**
 * Client-side channel interface.
 *
 * Declares operations for managing exchanges and queues, binding them,
 * transaction control (commit/rollback), and consuming, getting and
 * publishing messages.
 *
 * Only declarations are visible in this header (apart from three inline
 * accessors), so behavioural notes below that are marked "presumably" are
 * inferred from signatures and must be confirmed against the implementation.
 *
 * The class derives privately from sys::Runnable, declares run(), and holds
 * a sys::Thread member named `dispatcher`.
 */
00061 class Channel : private sys::Runnable
00062 {
00063 private:
/** Record kept for each entry of ConsumerMap. */
00064 struct Consumer{
// Non-owning as far as this header shows — TODO confirm lifetime rules.
00065 MessageListener* listener;
00066 AckMode ackMode;
// NOTE(review): meaning of `count` is not visible here — confirm.
00067 uint32_t count;
00068 };
// Keyed by std::string; presumably the `tag` passed to consume()/cancel().
00069 typedef std::map<std::string, Consumer> ConsumerMap;
00070
// Declared mutable so it can be taken inside const member functions.
00071 mutable sys::Mutex lock;
00072 sys::Thread dispatcher;
00073
// Value reported by getPrefetch().
00074 uint32_t prefetch;
// const member: fixed once the object has been constructed.
00075 const bool transactional;
// Value reported by getVersion().
00076 framing::ProtocolVersion version;
00077
// Presumably guards `running` — confirm against stop()/start()/run().
00078 mutable sys::Mutex stopLock;
00079 bool running;
00080
00081 ConsumerMap consumers;
00082 Session session;
00083 framing::ChannelId channelId;
// Blocking queue of frame sets; presumably feeds get() — TODO confirm.
00084 sys::BlockingQueue<framing::FrameSet::shared_ptr> gets;
// uniqueId/nameCounter: presumably used to generate unique names — confirm.
00085 framing::Uuid uniqueId;
00086 uint32_t nameCounter;
00087 bool active;
00088
00089 void stop();
00090
// Private lifecycle hooks; Connection is a friend (see below) and so may
// call them.
00091 void open(const Session& session);
00092 void closeInternal();
00093 void join();
00094
00095 void dispatch(framing::FrameSet& msg, const std::string& destination);
00096
// Grants Connection access to the private members above.
00097 friend class Connection;
00098
00099 public:
/**
 * @param transactional Transactional mode; defaults to false.
 * @param prefetch      Prefetch setting; defaults to 0.
 *
 * NOTE(review): `prefetch` is u_int16_t here, but the `prefetch` member and
 * setPrefetch() use uint32_t. The definition is not in this header, so the
 * type is left unchanged; consider aligning them.
 */
00111 Channel(bool transactional = false, u_int16_t prefetch = 0);
00112
00113 ~Channel();
00114
/**
 * Exchange and queue management.
 *
 * `synch` defaults to true on each of these; presumably it selects whether
 * the call waits for completion — confirm against the implementation.
 */
00129 void declareExchange(Exchange& exchange, bool synch = true);
00138 void deleteExchange(Exchange& exchange, bool synch = true);
00147 void declareQueue(Queue& queue, bool synch = true);
00156 void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
/**
 * Binds `queue` to `exchange` using `key`.
 * @param args Optional binding arguments; defaults to an empty FieldTable.
 */
00175 void bind(const Exchange& exchange, const Queue& queue,
00176 const std::string& key,
00177 const framing::FieldTable& args=framing::FieldTable(),
00178 bool synch = true);
00179
// Transaction control; presumably only meaningful when isTransactional()
// is true — confirm.
00192 void commit();
00193
00200 void rollback();
00201
00205 void setPrefetch(uint32_t prefetch);
00206
/** @return the current value of the `prefetch` member. */
00207 uint32_t getPrefetch() const { return prefetch; }
00208
00212 void start();
00213
00218 void close();
00219
/** @return the value of the `transactional` member. */
00221 bool isTransactional() const { return transactional; }
00222
00224 bool isOpen() const;
00225
/** @return the value of the `version` member. */
00227 framing::ProtocolVersion getVersion() const { return version ; }
00228
/**
 * Registers `listener` for messages from `queue` under `tag`.
 * @param listener Raw pointer; ownership is not expressed by the signature.
 * @param ackMode  Defaults to NO_ACK.
 * @param fields   Optional; defaults to a null pointer.
 */
00256 void consume(
00257 Queue& queue, const std::string& tag, MessageListener* listener,
00258 AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true,
00259 framing::FieldTable* fields = 0);
00260
/** Counterpart of consume(); takes the same kind of `tag` string. */
00270 void cancel(const std::string& tag, bool synch = true);
/**
 * @param msg Output parameter (non-const reference).
 * @return bool; presumably true when a message was retrieved — confirm.
 */
00285 bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK);
00286
/** Publishes `msg` to `exchange` with `routingKey`; both flags default to false. */
00304 void publish(Message& msg, const Exchange& exchange,
00305 const std::string& routingKey,
00306 bool mandatory = false, bool immediate = false);
00307
// Public although the sys::Runnable base is private; presumably the
// Runnable entry point executed by `dispatcher` — confirm.
00311 void run();
00312 };
00313
00314 }}
00315
00316 #endif