WorkPile.h

00001 // $Id: WorkPile.h 21 2010-09-05 04:18:17Z cschwarz1 $
00002 
00003 #ifndef BASE_WORKPILE_H
00004 #define BASE_WORKPILE_H
00005 
00006 #include <cerrno>
00007 #include <deque>
00008 #include <list>
00009 #include "Condition.h"
00010 #include "Thread.h"
00011 
00012 namespace base {
00014 
00018     template<class T> class WorkPile: public std::deque<T>, public Condition {
00019     public:
00020         typedef std::deque<T> DEQUE;   
00021 
00023 
00029         WorkPile(unsigned wmlow = 0, unsigned wmhigh = 0) {
00030             _isblocked = false;
00031             _workers = 0;
00032             _wmlow = wmlow;
00033             _wmhigh = wmhigh ? wmhigh : wmlow;
00034             if (_wmhigh) {
00035                 if (!_wmlow)
00036                     _wmlow = 1;
00037                 else if (_wmlow > _wmhigh)
00038                     _wmlow = _wmhigh;
00039                 _wmcond = new Condition(_mutex);
00040             } else
00041                 _wmcond = NULL;
00042         }
00043 
00045         ~WorkPile() {
00046             if (_wmcond)
00047                 delete _wmcond;
00048         }
00049 
00051 
00055         T dequeue() {
00056             T ret;
00057 
00058             lock();
00059             if (DEQUE::empty()) {
00060                 unlock();
00061                 throw IOException(IOException::errThread, ENOENT, "WorkPile is empty");
00062             } else {
00063                 if (DEQUE::size() == _wmlow)
00064                     _wmcond->signal();
00065                 ret = DEQUE::front();
00066                 DEQUE::pop_front();
00067             }
00068             unlock();
00069             return ret;
00070         }
00071 
00073 
00077         std::list<T> dequeue(unsigned count) {
00078             std::list<T> ret;
00079 
00080             lock();
00081             if (count) {
00082                 if (_wmlow && DEQUE::size() > _wmlow && DEQUE::size() < _wmlow + count)
00083                     _wmcond->signal();
00084                 while (count && !DEQUE::empty()) {
00085                     ret.push_back(DEQUE::front());
00086                     DEQUE::pop_front();
00087                     count--;
00088                 }
00089             } else {
00090                 if (_wmlow)
00091                     _wmcond->signal();
00092                 while (!DEQUE::empty()) {
00093                     ret.push_back(DEQUE::front());
00094                     DEQUE::pop_front();
00095                 }
00096             }
00097             unlock();
00098             return ret;
00099         }
00100 
00102 
00107         T dequeue(Thread *worker) {
00108             T ret;
00109 
00110             lock();
00111             if (DEQUE::empty()) {
00112                 _workers++;
00113                 do
00114                     wait();
00115                 while (DEQUE::empty() && !worker->isCancelled());
00116                 _workers--;
00117             }
00118             if (DEQUE::empty()) {
00119                 unlock();
00120                 throw IOException(IOException::errThread, ENOENT, "WorkPile is empty, the thread has received a cancellation request");
00121             } else {
00122                 if (DEQUE::size() == _wmlow)
00123                     _wmcond->signal();
00124                 ret = DEQUE::front();
00125                 DEQUE::pop_front();
00126             }
00127             unlock();
00128             return ret;
00129         }
00130 
00132 
00137         size_t enqueue(T t, bool block = true) {
00138             size_t ret;
00139 
00140             lock();
00141             if (_isblocked || (_wmhigh && DEQUE::size() >= _wmhigh)) {
00142                 if (!block) {
00143                     unlock();
00144                     throw IOException(IOException::errThread, ENOENT, "WorkPile is full");
00145                 }
00146                 _isblocked = true;
00147                 while (DEQUE::size() >= _wmlow)
00148                     _wmcond->wait();
00149                 _isblocked = false;
00150             }
00151             push_back(t);
00152             if (_workers)
00153                 signal();
00154             ret = DEQUE::size() > _workers ? DEQUE::size() - _workers : 0;
00155             unlock();
00156             return ret;
00157         }
00158 
00160 
00163         unsigned getWorkerCount() {
00164             unsigned ret;
00165 
00166             lock();
00167             ret = _workers;
00168             unlock();
00169             return ret;
00170         }
00171 
00173 
00176         size_t size() {
00177             size_t ret;
00178 
00179             lock();
00180             ret = DEQUE::size();
00181             unlock();
00182             return ret;
00183         }
00184 
00185     private:
00186         bool       _isblocked; 
00187         unsigned   _workers;   
00188         Condition *_wmcond;    
00189         unsigned   _wmhigh;    
00190         unsigned   _wmlow;     
00191 
00192         DISALLOW_COPY_CONSTRUCTOR_AND_ASSIGNMENT(WorkPile);
00193     };
00194 }
00195 
00196 #endif