00001
00002
00003 #ifndef BASE_WORKPILE_H
00004 #define BASE_WORKPILE_H
00005
00006 #include <cerrno>
00007 #include <deque>
00008 #include <list>
00009 #include "Condition.h"
00010 #include "Thread.h"
00011
00012 namespace base {
00014
00018 template<class T> class WorkPile: public std::deque<T>, public Condition {
00019 public:
00020 typedef std::deque<T> DEQUE;
00021
00023
00029 WorkPile(unsigned wmlow = 0, unsigned wmhigh = 0) {
00030 _isblocked = false;
00031 _workers = 0;
00032 _wmlow = wmlow;
00033 _wmhigh = wmhigh ? wmhigh : wmlow;
00034 if (_wmhigh) {
00035 if (!_wmlow)
00036 _wmlow = 1;
00037 else if (_wmlow > _wmhigh)
00038 _wmlow = _wmhigh;
00039 _wmcond = new Condition(_mutex);
00040 } else
00041 _wmcond = NULL;
00042 }
00043
00045 ~WorkPile() {
00046 if (_wmcond)
00047 delete _wmcond;
00048 }
00049
00051
00055 T dequeue() {
00056 T ret;
00057
00058 lock();
00059 if (DEQUE::empty()) {
00060 unlock();
00061 throw IOException(IOException::errThread, ENOENT, "WorkPile is empty");
00062 } else {
00063 if (DEQUE::size() == _wmlow)
00064 _wmcond->signal();
00065 ret = DEQUE::front();
00066 DEQUE::pop_front();
00067 }
00068 unlock();
00069 return ret;
00070 }
00071
00073
00077 std::list<T> dequeue(unsigned count) {
00078 std::list<T> ret;
00079
00080 lock();
00081 if (count) {
00082 if (_wmlow && DEQUE::size() > _wmlow && DEQUE::size() < _wmlow + count)
00083 _wmcond->signal();
00084 while (count && !DEQUE::empty()) {
00085 ret.push_back(DEQUE::front());
00086 DEQUE::pop_front();
00087 count--;
00088 }
00089 } else {
00090 if (_wmlow)
00091 _wmcond->signal();
00092 while (!DEQUE::empty()) {
00093 ret.push_back(DEQUE::front());
00094 DEQUE::pop_front();
00095 }
00096 }
00097 unlock();
00098 return ret;
00099 }
00100
00102
00107 T dequeue(Thread *worker) {
00108 T ret;
00109
00110 lock();
00111 if (DEQUE::empty()) {
00112 _workers++;
00113 do
00114 wait();
00115 while (DEQUE::empty() && !worker->isCancelled());
00116 _workers--;
00117 }
00118 if (DEQUE::empty()) {
00119 unlock();
00120 throw IOException(IOException::errThread, ENOENT, "WorkPile is empty, the thread has received a cancellation request");
00121 } else {
00122 if (DEQUE::size() == _wmlow)
00123 _wmcond->signal();
00124 ret = DEQUE::front();
00125 DEQUE::pop_front();
00126 }
00127 unlock();
00128 return ret;
00129 }
00130
00132
00137 size_t enqueue(T t, bool block = true) {
00138 size_t ret;
00139
00140 lock();
00141 if (_isblocked || (_wmhigh && DEQUE::size() >= _wmhigh)) {
00142 if (!block) {
00143 unlock();
00144 throw IOException(IOException::errThread, ENOENT, "WorkPile is full");
00145 }
00146 _isblocked = true;
00147 while (DEQUE::size() >= _wmlow)
00148 _wmcond->wait();
00149 _isblocked = false;
00150 }
00151 push_back(t);
00152 if (_workers)
00153 signal();
00154 ret = DEQUE::size() > _workers ? DEQUE::size() - _workers : 0;
00155 unlock();
00156 return ret;
00157 }
00158
00160
00163 unsigned getWorkerCount() {
00164 unsigned ret;
00165
00166 lock();
00167 ret = _workers;
00168 unlock();
00169 return ret;
00170 }
00171
00173
00176 size_t size() {
00177 size_t ret;
00178
00179 lock();
00180 ret = DEQUE::size();
00181 unlock();
00182 return ret;
00183 }
00184
00185 private:
00186 bool _isblocked;
00187 unsigned _workers;
00188 Condition *_wmcond;
00189 unsigned _wmhigh;
00190 unsigned _wmlow;
00191
00192 DISALLOW_COPY_CONSTRUCTOR_AND_ASSIGNMENT(WorkPile);
00193 };
00194 }
00195
00196 #endif