00001 #ifndef SERIALIZER_H
00002 #define SERIALIZER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "qpid/Exception.h"
00027 #include "qpid/sys/Runnable.h"
00028 #include "qpid/sys/Monitor.h"
00029 #include "qpid/sys/Thread.h"
00030
00031 #include <boost/function.hpp>
00032 #include <boost/noncopyable.hpp>
00033
00034 #include <deque>
00035
00036 namespace qpid {
00037 namespace sys {
00038
00040 class SerializerBase : private boost::noncopyable, private Runnable
00041 {
00042 public:
00043 typedef boost::function<void()> VoidFn0;
00044 struct ShutdownException : public Exception {};
00045
00047 SerializerBase(bool immediate=true);
00048
00049 virtual ~SerializerBase() { shutdown(); }
00050
00051 virtual void dispatch() = 0;
00052 protected:
00053 enum State {
00054 IDLE,
00055 EXECUTING,
00056 DISPATCHING,
00057 SHUTDOWN
00058 };
00059
00060 void shutdown();
00061 void notifyWorker();
00062 void run();
00063 virtual bool empty() = 0;
00064 bool running();
00065 void wait();
00066
00067 Monitor lock;
00068 State state;
00069 bool immediate;
00070 Thread worker;
00071 };
00072
00073
00085 template <class Task>
00086 class Serializer : public SerializerBase {
00087
00088 std::deque<Task> queue;
00089
00090 bool empty() { return queue.empty(); }
00091 void dispatch(Task& task);
00092
00093 public:
00099 Serializer(bool immediate=true)
00100 : SerializerBase(immediate) {}
00101
00102 ~Serializer() { shutdown(); }
00109 void execute(Task& task);
00110
00111
00117 void dispatch();
00118 };
00119
00120
00121 template <class Task>
00122 void Serializer<Task>::execute(Task& task) {
00123 Mutex::ScopedLock l(lock);
00124 assert(state != SHUTDOWN);
00125 if (immediate && state == IDLE) {
00126 state = EXECUTING;
00127 dispatch(task);
00128 if (state != SHUTDOWN) {
00129 assert(state == EXECUTING);
00130 state = IDLE;
00131 }
00132 }
00133 else
00134 queue.push_back(task);
00135 if (!queue.empty() && state == IDLE) {
00136 state = DISPATCHING;
00137 notifyWorker();
00138 }
00139 }
00140
00141 template <class Task>
00142 void Serializer<Task>::dispatch() {
00143 Mutex::ScopedLock l(lock);
00144
00145
00146
00147
00148
00149 while (!queue.empty() && state != SHUTDOWN) {
00150 assert(state == DISPATCHING);
00151 dispatch(queue.front());
00152 queue.pop_front();
00153 }
00154 if (state != SHUTDOWN) {
00155 assert(state == DISPATCHING);
00156 state = IDLE;
00157 }
00158 }
00159
00160 template <class Task>
00161 void Serializer<Task>::dispatch(Task& task) {
00162
00163 assert(state != IDLE);
00164 assert(state != SHUTDOWN);
00165 assert(state == EXECUTING || state == DISPATCHING);
00166 Mutex::ScopedUnlock u(lock);
00167
00168 notifyWorker();
00169 try { task(); } catch (...) { assert(0); }
00170 }
00171
00172
00173
00174
00175 }}
00176
00177
00178
00179
00180
00181 #endif