AtomicQueue.hpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_QUEUE_LOCK_FREE_HPP
00040 #define ORO_QUEUE_LOCK_FREE_HPP
00041
00042 #include <vector>
00043 #include "os/oro_atomic.h"
00044 #include "os/CAS.hpp"
00045 #include "BufferPolicy.hpp"
00046
00047 namespace RTT
00048 {
00065 template< class T, class ReadPolicy = NonBlockingPolicy, class WritePolicy = NonBlockingPolicy>
00066 class AtomicQueue
00067 {
00068 public:
00074 const unsigned int MAX_THREADS;
00075
00076 typedef T value_t;
00077 private:
00078 typedef std::vector<value_t> BufferType;
00079 typedef typename BufferType::iterator Iterator;
00080 typedef typename BufferType::const_iterator CIterator;
00081 struct Item {
00082 Item() {
00083
00084 oro_atomic_set(&count,-1);
00085 }
00086 mutable oro_atomic_t count;
00087 BufferType data;
00088 };
00089
00090 struct StorageImpl
00091 {
00092 Item* items;
00093 StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00094 }
00095 ~StorageImpl() {
00096 delete[] items;
00097 }
00098 Item& operator[](int i) {
00099 return items[i];
00100 }
00101 };
00102
00106 typedef StorageImpl* Storage;
00107
00108 Storage newStorage(size_t alloc, size_t items, bool init = true)
00109 {
00110 Storage st( new StorageImpl(alloc) );
00111 for (unsigned int i=0; i < alloc; ++i) {
00112 (*st)[i].data.reserve( items );
00113 }
00114
00115 if (init) {
00116 active = &(*st)[0];
00117 oro_atomic_inc( &active->count );
00118 }
00119
00120 return st;
00121 }
00122
00123 Storage bufs;
00124 Item* volatile active;
00125
00126
00127
00128
00129 inline size_t BufNum() const {
00130 return MAX_THREADS * 2;
00131 }
00132
00133 WritePolicy write_policy;
00134 ReadPolicy read_policy;
00135
00136 oro_atomic_t counter;
00137 oro_atomic_t dcounter;
00138 public:
00139 typedef unsigned int size_type;
00140
00149 AtomicQueue(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00150 : MAX_THREADS( threads ), write_policy(lsize), read_policy(0)
00151 {
00152 const unsigned int BUF_NUM = BufNum();
00153 bufs = newStorage( BUF_NUM, lsize );
00154 oro_atomic_set(&counter,0);
00155 oro_atomic_set(&dcounter,0);
00156 }
00157
00158 ~AtomicQueue() {
00159 delete bufs;
00160 }
00161
00162 size_type capacity() const
00163 {
00164 size_type res;
00165 Item* orig = lockAndGetActive();
00166 res = orig->data.capacity();
00167 oro_atomic_dec( &orig->count );
00168 return res;
00169 }
00170
00171 size_type size() const
00172 {
00173 size_type res;
00174 Item* orig = lockAndGetActive();
00175 res = orig->data.size();
00176 oro_atomic_dec( &orig->count );
00177 return res;
00178 }
00179
00184 bool isEmpty() const
00185 {
00186 bool res;
00187 Item* orig = lockAndGetActive();
00188 res = orig->data.empty();
00189 oro_atomic_dec( &orig->count );
00190 return res;
00191 }
00192
00197 bool isFull() const
00198 {
00199 bool res;
00200 Item* orig = lockAndGetActive();
00201 res = (orig->data.size() == orig->data.capacity());
00202 oro_atomic_dec( &orig->count );
00203 return res;
00204 }
00205
00206 void clear()
00207 {
00208 Item* orig(0);
00209 Item* nextbuf(0);
00210 int items = 0;
00211 do {
00212 if (orig) {
00213 oro_atomic_dec(&orig->count);
00214 oro_atomic_dec(&nextbuf->count);
00215 }
00216 orig = lockAndGetActive();
00217 items = orig->data.size();
00218 nextbuf = findEmptyBuf();
00219 } while ( OS::CAS(&active, orig, nextbuf ) == false );
00220 oro_atomic_dec( &orig->count );
00221 oro_atomic_dec( &orig->count );
00222 oro_atomic_set(&counter,0);
00223 oro_atomic_set(&dcounter,0);
00224 }
00225
00231 bool enqueue(const T& value)
00232 {
00233 Item* orig=0;
00234 Item* usingbuf(0);
00235 write_policy.pop();
00236 do {
00237 if (orig) {
00238 oro_atomic_dec(&orig->count);
00239 oro_atomic_dec(&usingbuf->count);
00240 }
00241 orig = lockAndGetActive();
00242 if ( orig->data.size() == orig->data.capacity() ) {
00243 oro_atomic_dec( &orig->count );
00244 return false;
00245 }
00246 usingbuf = findEmptyBuf();
00247 usingbuf->data = orig->data;
00248 usingbuf->data.push_back( value );
00249 } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00250 oro_atomic_dec( &orig->count );
00251 oro_atomic_dec( &orig->count );
00252 read_policy.push();
00253 return true;
00254 }
00255
00262 int enqueueCounted(const T& value)
00263 {
00264 if ( enqueue( value ) ) {
00265 oro_atomic_inc(&counter);
00266 return oro_atomic_read(&counter);
00267 }
00268 return 0;
00269 }
00270
00276 bool dequeue( T& result )
00277 {
00278 Item* orig=0;
00279 Item* usingbuf(0);
00280 read_policy.pop();
00281 do {
00282 if (orig) {
00283 oro_atomic_dec(&orig->count);
00284 oro_atomic_dec(&usingbuf->count);
00285 }
00286 orig = lockAndGetActive();
00287 if ( orig->data.empty() ) {
00288 oro_atomic_dec( &orig->count );
00289 return false;
00290 }
00291 usingbuf = findEmptyBuf();
00292 result = orig->data.front();
00293 CIterator it = ++(orig->data.begin());
00294 for ( ; it != orig->data.end(); ++it )
00295 usingbuf->data.push_back(*it);
00296
00297 } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00298 oro_atomic_dec( &orig->count );
00299 oro_atomic_dec( &orig->count );
00300 write_policy.push();
00301 return true;
00302 }
00303
00310 int dequeueCounted( T& result )
00311 {
00312 if (dequeue(result) ) {
00313 oro_atomic_inc(&dcounter);
00314 return oro_atomic_read(&dcounter);
00315 }
00316 return 0;
00317 }
00318
00325 template<class MPoolType>
00326 T lockfront(MPoolType& mp) const
00327 {
00328 bool was_locked = false;
00329 Item* orig=0;
00330 T result;
00331 do {
00332 if (orig) {
00333 mp.unlock( orig->data.front() );
00334 oro_atomic_dec(&orig->count);
00335 }
00336 orig = lockAndGetActive();
00337 if ( orig->data.empty() ) {
00338 oro_atomic_dec( &orig->count );
00339 return 0;
00340 }
00341
00342 was_locked = mp.lock( orig->data.front() );
00343
00344 } while( !was_locked );
00345 result = orig->data.front();
00346 oro_atomic_dec( &orig->count );
00347 return result;
00348 }
00349
00353 value_t front() const
00354 {
00355 Item* orig = lockAndGetActive();
00356 value_t ret(orig->data.front());
00357 oro_atomic_dec( &orig->count );
00358 return ret;
00359 }
00360
00364 value_t back() const
00365 {
00366 Item* orig = lockAndGetActive();
00367 value_t ret(orig->data.back());
00368 oro_atomic_dec( &orig->count );
00369 return ret;
00370 }
00371
00372 private:
00376 Item* findEmptyBuf() {
00377
00378
00379 Item* start = &(*bufs)[0];
00380 while( true ) {
00381 if ( oro_atomic_inc_and_test( &start->count ) )
00382 break;
00383 oro_atomic_dec( &start->count );
00384 ++start;
00385 if (start == &(*bufs)[0] + BufNum() )
00386 start = &(*bufs)[0];
00387 }
00388 start->data.clear();
00389 return start;
00390 }
00391
00396 Item* lockAndGetActive() const {
00397
00398 Item* orig=0;
00399 do {
00400 if (orig)
00401 oro_atomic_dec( &orig->count );
00402 orig = active;
00403 oro_atomic_inc( &orig->count );
00404
00405
00406
00407 } while ( active != orig );
00408 return orig;
00409 }
00410 };
00411
00412 }
00413
00414 #endif