ListLockFree.hpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #ifndef ORO_LIST_LOCK_FREE_HPP
00040 #define ORO_LIST_LOCK_FREE_HPP
00041
00042 #include <vector>
00043 #include "os/oro_atomic.h"
00044 #include "os/CAS.hpp"
00045 #include <boost/intrusive_ptr.hpp>
00046
00047 #ifdef ORO_PRAGMA_INTERFACE
00048 #pragma interface
00049 #endif
00050
00051 namespace RTT
00052 {
00053 namespace detail {
00054 struct IntrusiveStorage
00055 {
00056 oro_atomic_t ref;
00057 IntrusiveStorage() {
00058 oro_atomic_set(&ref,0);
00059 }
00060 virtual ~IntrusiveStorage() {
00061 }
00062 };
00063 }
00064 }
00065
00066
00067 void intrusive_ptr_add_ref(RTT::detail::IntrusiveStorage* p );
00068 void intrusive_ptr_release(RTT::detail::IntrusiveStorage* p );
00069
00070 namespace RTT
00071 {
00082 template< class T>
00083 class ListLockFree
00084 {
00085 public:
00091 const unsigned int MAX_THREADS;
00092
00093 typedef T value_t;
00094 private:
00095 typedef std::vector<value_t> BufferType;
00096 typedef typename BufferType::iterator Iterator;
00097 typedef typename BufferType::const_iterator CIterator;
00098 struct Item {
00099 Item() {
00100
00101 oro_atomic_set(&count,-1);
00102 }
00103 mutable oro_atomic_t count;
00104 BufferType data;
00105 };
00106
00107 struct StorageImpl : public detail::IntrusiveStorage
00108 {
00109 Item* items;
00110 StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00111 }
00112 ~StorageImpl() {
00113 delete[] items;
00114 }
00115 Item& operator[](int i) {
00116 return items[i];
00117 }
00118 };
00119
00124 typedef boost::intrusive_ptr<StorageImpl> Storage;
00125
00126 Storage newStorage(size_t alloc, size_t items, bool init = true)
00127 {
00128 Storage st( new StorageImpl(alloc) );
00129 for (unsigned int i=0; i < alloc; ++i) {
00130 (*st)[i].data.reserve( items );
00131 }
00132
00133 if (init) {
00134 active = &(*st)[0];
00135 oro_atomic_inc( &active->count );
00136 }
00137
00138 return st;
00139 }
00140
00141 Storage bufs;
00142 Item* volatile active;
00143 Item* volatile blankp;
00144
00145
00146
00147
00148 inline size_t BufNum() const {
00149 return MAX_THREADS * 2;
00150 }
00151
00152 size_t required;
00153 public:
00162 ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00163 : MAX_THREADS( threads ), blankp(0), required(lsize)
00164 {
00165 const unsigned int BUF_NUM = BufNum();
00166 bufs = newStorage( BUF_NUM, lsize );
00167 }
00168
00169 ~ListLockFree() {
00170 }
00171
00172 size_t capacity() const
00173 {
00174 size_t res;
00175 Storage st;
00176 Item* orig = lockAndGetActive(st);
00177 res = orig->data.capacity();
00178 oro_atomic_dec( &orig->count );
00179 return res;
00180 }
00181
00182 size_t size() const
00183 {
00184 size_t res;
00185 Storage st;
00186 Item* orig = lockAndGetActive(st);
00187 res = orig->data.size();
00188 oro_atomic_dec( &orig->count );
00189 return res;
00190 }
00191
00192 bool empty() const
00193 {
00194 bool res;
00195 Storage st;
00196 Item* orig = lockAndGetActive(st);
00197 res = orig->data.empty();
00198 oro_atomic_dec( &orig->count );
00199 return res;
00200 }
00201
00209 void grow(size_t items = 1) {
00210 required += items;
00211 if (required > this->capacity()) {
00212 this->reserve( required*2 );
00213 }
00214 }
00222 void shrink(size_t items = 1) {
00223 required -= items;
00224 }
00225
00235 void reserve(size_t lsize)
00236 {
00237 if (lsize <= this->capacity() )
00238 return;
00239
00240 const unsigned int BUF_NUM = BufNum();
00241 Storage res( newStorage(BUF_NUM, lsize, false) );
00242
00243
00244 Item* nextbuf = &(*res)[0];
00245 oro_atomic_inc( &nextbuf->count );
00246
00247
00248 Item* orig = 0;
00249
00250
00251
00252 Storage save = bufs;
00253
00254
00255
00256
00257
00258 bufs = res;
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270 do {
00271 if (orig)
00272 oro_atomic_dec(&orig->count);
00273 orig = lockAndGetActive();
00274 nextbuf->data.clear();
00275 Iterator it( orig->data.begin() );
00276 while ( it != orig->data.end() ) {
00277 nextbuf->data.push_back( *it );
00278 ++it;
00279 }
00280
00281
00282
00283
00284 } while ( OS::CAS(&active, orig, nextbuf ) == false);
00285
00286
00287 assert( pointsTo( active, bufs ) );
00288
00289 oro_atomic_dec( &orig->count );
00290 oro_atomic_dec( &orig->count );
00291 }
00292
00293 void clear()
00294 {
00295 Storage bufptr;
00296 Item* orig(0);
00297 Item* nextbuf(0);
00298 int items = 0;
00299 do {
00300 if (orig) {
00301 oro_atomic_dec(&orig->count);
00302 oro_atomic_dec(&nextbuf->count);
00303 }
00304 orig = lockAndGetActive(bufptr);
00305 items = orig->data.size();
00306 nextbuf = findEmptyBuf(bufptr);
00307 nextbuf->data.clear();
00308 } while ( OS::CAS(&active, orig, nextbuf ) == false );
00309 oro_atomic_dec( &orig->count );
00310 oro_atomic_dec( &orig->count );
00311 }
00312
00318 bool append( value_t item )
00319 {
00320 Item* orig=0;
00321 Storage bufptr;
00322 Item* usingbuf(0);
00323 do {
00324 if (orig) {
00325 oro_atomic_dec(&orig->count);
00326 oro_atomic_dec(&usingbuf->count);
00327 }
00328 orig = lockAndGetActive( bufptr );
00329 if ( orig->data.size() == orig->data.capacity() ) {
00330 oro_atomic_dec( &orig->count );
00331 return false;
00332 }
00333 usingbuf = findEmptyBuf( bufptr );
00334 usingbuf->data = orig->data;
00335 usingbuf->data.push_back( item );
00336 } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00337 oro_atomic_dec( &orig->count );
00338 oro_atomic_dec( &orig->count );
00339 return true;
00340 }
00341
00345 value_t front() const
00346 {
00347 Storage bufptr;
00348 Item* orig = lockAndGetActive(bufptr);
00349 value_t ret(orig->data.front());
00350 oro_atomic_dec( &orig->count );
00351 return ret;
00352 }
00353
00357 value_t back() const
00358 {
00359 Storage bufptr;
00360 Item* orig = lockAndGetActive(bufptr);
00361 value_t ret(orig->data.back());
00362 oro_atomic_dec( &orig->count );
00363 return ret;
00364 }
00365
00371 size_t append(const std::vector<T>& items)
00372 {
00373 Item* usingbuf(0);
00374 Item* orig=0;
00375 int towrite = items.size();
00376 Storage bufptr;
00377 do {
00378 if (orig) {
00379 oro_atomic_dec(&orig->count);
00380 oro_atomic_dec(&usingbuf->count);
00381 }
00382
00383 orig = lockAndGetActive( bufptr );
00384 int maxwrite = orig->data.capacity() - orig->data.size();
00385 if ( maxwrite == 0 ) {
00386 oro_atomic_dec( &orig->count );
00387 return 0;
00388 }
00389 if ( towrite > maxwrite )
00390 towrite = maxwrite;
00391 usingbuf = findEmptyBuf( bufptr );
00392 usingbuf->data = orig->data;
00393 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
00394 } while ( OS::CAS(&active, orig, usingbuf ) ==false );
00395 oro_atomic_dec( &orig->count );
00396 oro_atomic_dec( &orig->count );
00397 return towrite;
00398 }
00399
00400
00406 bool erase( value_t item )
00407 {
00408 Item* orig=0;
00409 Item* nextbuf(0);
00410 Storage bufptr;
00411 do {
00412 if (orig) {
00413 oro_atomic_dec(&orig->count);
00414 oro_atomic_dec(&nextbuf->count);
00415 }
00416 orig = lockAndGetActive( bufptr );
00417
00418 nextbuf = findEmptyBuf( bufptr );
00419 Iterator it( orig->data.begin() );
00420 while (it != orig->data.end() && !( *it == item ) ) {
00421 nextbuf->data.push_back( *it );
00422 ++it;
00423 }
00424 if ( it == orig->data.end() ) {
00425 oro_atomic_dec( &orig->count );
00426 oro_atomic_dec( &nextbuf->count );
00427 return false;
00428 }
00429 ++it;
00430 while ( it != orig->data.end() ) {
00431 nextbuf->data.push_back( *it );
00432 ++it;
00433 }
00434 } while ( OS::CAS(&active, orig, nextbuf ) ==false );
00435 oro_atomic_dec( &orig->count );
00436 oro_atomic_dec( &orig->count );
00437 return true;
00438 }
00439
00444 template<class Function>
00445 void apply(Function func )
00446 {
00447 Storage st;
00448 Item* orig = lockAndGetActive(st);
00449 Iterator it( orig->data.begin() );
00450 while ( it != orig->data.end() ) {
00451 func( *it );
00452 ++it;
00453 }
00454 oro_atomic_dec( &orig->count );
00455 }
00456
00470 template<class Function>
00471 void apply_and_blank(Function func, value_t blank )
00472 {
00473 Storage st;
00474 Item* orig = lockAndGetActive(st);
00475 Item* newp = findEmptyBuf(st);
00476 Iterator it( orig->data.begin() );
00477
00478 while ( it != orig->data.end() ) {
00479 newp->data.push_back( *it );
00480 ++it;
00481 }
00482 blankp = newp;
00483 it = blankp->data.begin();
00484
00485 while ( it != blankp->data.end() ) {
00486
00487
00488 value_t a = *it;
00489 if ( !(a == blank) )
00490 func( a );
00491 ++it;
00492 }
00493 blankp = 0;
00494
00495 oro_atomic_dec( &orig->count );
00496 oro_atomic_dec( &newp->count );
00497 }
00498
00514 bool erase_and_blank(value_t item, value_t blank )
00515 {
00516 Storage st;
00517 bool res = this->erase(item);
00518 Item* orig = lockAndGetBlank(st);
00519 if (orig) {
00520 Iterator it( orig->data.begin() );
00521
00522 while ( *it != item ) {
00523 ++it;
00524 if (it == orig->data.end() ) {
00525 oro_atomic_dec( &orig->count );
00526 return res;
00527 }
00528 }
00529 (*it) = blank;
00530 oro_atomic_dec( &orig->count );
00531 }
00532 return res;
00533 }
00534
00540 template<class Function>
00541 value_t find_if( Function func, value_t blank = value_t() )
00542 {
00543 Storage st;
00544 Item* orig = lockAndGetActive(st);
00545 Iterator it( orig->data.begin() );
00546 while ( it != orig->data.end() ) {
00547 if (func( *it ) == true ) {
00548 oro_atomic_dec( &orig->count );
00549 return *it;
00550 }
00551 ++it;
00552 }
00553 oro_atomic_dec( &orig->count );
00554 return blank;
00555 }
00556 private:
00560 Item* findEmptyBuf(Storage& bufptr) {
00561
00562
00563 Item* start = &(*bufptr)[0];
00564 while( true ) {
00565 if ( oro_atomic_inc_and_test( &start->count ) )
00566 break;
00567 oro_atomic_dec( &start->count );
00568 ++start;
00569 if (start == &(*bufptr)[0] + BufNum() )
00570 start = &(*bufptr)[0];
00571 }
00572 assert( pointsTo(start, bufptr) );
00573 start->data.clear();
00574 return start;
00575 }
00576
00580 Item* lockAndGetActive(Storage& bufptr) const {
00581
00582
00583 Item* orig=0;
00584 do {
00585 if (orig)
00586 oro_atomic_dec( &orig->count );
00587 bufptr = bufs;
00588 orig = active;
00589
00590 if ( pointsTo(orig, bufptr) )
00591 oro_atomic_inc( &orig->count );
00592 else {
00593 orig = 0;
00594 }
00595
00596
00597
00598 } while ( active != orig );
00599 assert( pointsTo(orig, bufptr) );
00600 return orig;
00601 }
00602
00607 Item* lockAndGetActive() const {
00608
00609 Item* orig=0;
00610 do {
00611 if (orig)
00612 oro_atomic_dec( &orig->count );
00613 orig = active;
00614 oro_atomic_inc( &orig->count );
00615
00616
00617
00618 } while ( active != orig );
00619 return orig;
00620 }
00621
00625 Item* lockAndGetBlank(Storage& bufptr) const {
00626 Item* orig=0;
00627 do {
00628 if (orig)
00629 oro_atomic_dec( &orig->count );
00630 bufptr = bufs;
00631 orig = blankp;
00632 if (orig == 0)
00633 return 0;
00634
00635 if ( pointsTo(orig, bufptr) )
00636 oro_atomic_inc( &orig->count );
00637 else {
00638 orig = 0;
00639 }
00640
00641
00642
00643 } while ( blankp != orig );
00644 assert( pointsTo(orig, bufptr) );
00645 return orig;
00646 }
00647
00648 inline bool pointsTo( Item* p, const Storage& bf ) const {
00649 return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
00650 }
00651
00652 };
00653 }
00654
00655 #endif