ListLockFree.hpp

00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:11:39 CET 2006  ListLockFree.hpp
00003 
00004                         ListLockFree.hpp -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_LIST_LOCK_FREE_HPP
00040 #define ORO_LIST_LOCK_FREE_HPP
00041 
00042 #include <vector>
00043 #include "os/oro_atomic.h"
00044 #include "os/CAS.hpp"
00045 #include <boost/intrusive_ptr.hpp>
00046 
00047 #ifdef ORO_PRAGMA_INTERFACE
00048 #pragma interface
00049 #endif
00050 
00051 namespace RTT
00052 {
00053     namespace detail {
00054         struct IntrusiveStorage
00055         {
00056             oro_atomic_t ref;
00057             IntrusiveStorage() {
00058                 oro_atomic_set(&ref,0);
00059             }
00060             virtual ~IntrusiveStorage() {
00061             }
00062         };
00063     }
00064 }
00065 
00066 
00067 void intrusive_ptr_add_ref(RTT::detail::IntrusiveStorage* p );
00068 void intrusive_ptr_release(RTT::detail::IntrusiveStorage* p );
00069 
00070 namespace RTT
00071 {
00082     template< class T>
00083     class ListLockFree
00084     {
00085     public:
00091         const unsigned int MAX_THREADS;
00092 
00093         typedef T value_t;
00094     private:
00095         typedef std::vector<value_t> BufferType;
00096         typedef typename BufferType::iterator Iterator;
00097         typedef typename BufferType::const_iterator CIterator;
00098         struct Item {
00099             Item()  {
00100                 //ORO_ATOMIC_INIT(count);
00101                 oro_atomic_set(&count,-1);
00102             }
00103             mutable oro_atomic_t count;  // refcount
00104             BufferType data;
00105         };
00106 
00107         struct StorageImpl : public detail::IntrusiveStorage
00108         {
00109             Item* items;
00110             StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00111             }
00112             ~StorageImpl() {
00113                 delete[] items;
00114             }
00115             Item& operator[](int i) {
00116                 return items[i];
00117             }
00118         };
00119 
00124         typedef boost::intrusive_ptr<StorageImpl> Storage;
00125 
00126         Storage newStorage(size_t alloc, size_t items, bool init = true)
00127         {
00128             Storage st( new StorageImpl(alloc) );
00129             for (unsigned int i=0; i < alloc; ++i) {
00130                 (*st)[i].data.reserve( items ); // pre-allocate
00131             }
00132             // bootstrap the first list :
00133             if (init) {
00134                 active = &(*st)[0];
00135                 oro_atomic_inc( &active->count );
00136             }
00137 
00138             return st;
00139         }
00140 
00141         Storage bufs;
00142         Item* volatile active;
00143         Item* volatile blankp;
00144 
00145         // each thread has one 'working' buffer, and one 'active' buffer
00146         // lock. Thus we require to allocate twice as much buffers as threads,
00147         // for all the locks to succeed in a worst case scenario.
00148         inline size_t BufNum() const {
00149             return MAX_THREADS * 2;
00150         }
00151 
00152         size_t required;
00153     public:
00162         ListLockFree(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00163             : MAX_THREADS( threads ), blankp(0), required(lsize)
00164         {
00165             const unsigned int BUF_NUM = BufNum();
00166             bufs = newStorage( BUF_NUM, lsize );
00167         }
00168 
00169         ~ListLockFree() {
00170         }
00171 
00172         size_t capacity() const
00173         {
00174             size_t res;
00175             Storage st;
00176             Item* orig = lockAndGetActive(st);
00177             res = orig->data.capacity();
00178             oro_atomic_dec( &orig->count ); // lockAndGetActive
00179             return res;
00180         }
00181 
00182         size_t size() const
00183         {
00184             size_t res;
00185             Storage st;
00186             Item* orig = lockAndGetActive(st);
00187             res = orig->data.size();
00188             oro_atomic_dec( &orig->count ); // lockAndGetActive
00189             return res;
00190         }
00191 
00192         bool empty() const
00193         {
00194             bool res;
00195             Storage st;
00196             Item* orig = lockAndGetActive(st);
00197             res = orig->data.empty();
00198             oro_atomic_dec( &orig->count ); // lockAndGetActive
00199             return res;
00200         }
00201 
00209         void grow(size_t items = 1) {
00210             required += items;
00211             if (required > this->capacity()) {
00212                 this->reserve( required*2 );
00213             }
00214         }
00222         void shrink(size_t items = 1) {
00223             required -= items;
00224         }
00225 
00235         void reserve(size_t lsize)
00236         {
00237             if (lsize <= this->capacity() )
00238                 return;
00239 
00240             const unsigned int BUF_NUM = BufNum();
00241             Storage res( newStorage(BUF_NUM, lsize, false) );
00242 
00243             // init the future 'active' buffer.
00244             Item* nextbuf = &(*res)[0];
00245             oro_atomic_inc( &nextbuf->count );
00246 
00247             // temporary for current active buffer.
00248             Item* orig = 0;
00249 
00250             // prevent current bufs from deletion.
00251             // will free upon return.
00252             Storage save = bufs;
00253             // active points at old, bufs points at new:
00254             // first the refcount is added to res, then
00255             // bufs' pointer is switched to res' pointer,
00256             // and stored in a temporary. Then the temp
00257             // is destructed and decrements bufs' old reference.
00258             bufs = res;
00259             // from now on, any findEmptyBuf will use the new bufs,
00260             // unless the algorithm was entered before the switch.
00261             // then, it will write the result to the old buf.
00262             // if it detects we updated active, it will find an
00263             // empty buf in the new buf. If it gets there before
00264             // our CAS, our CAS will fail and we try to recopy
00265             // everything. This retry may be unnessessary
00266             // if the data already is in the new buf, but for this
00267             // cornercase, we must be sure.
00268 
00269             // copy active into new:
00270             do {
00271                 if (orig)
00272                     oro_atomic_dec(&orig->count);
00273                 orig = lockAndGetActive(); // active is guaranteed to point in valid buffer ( save or bufs )
00274                 nextbuf->data.clear();
00275                 Iterator it( orig->data.begin() );
00276                 while ( it != orig->data.end() ) {
00277                     nextbuf->data.push_back( *it );
00278                     ++it;
00279                 }
00280                 // see explanation above: active could have changed,
00281                 // and still point in old buffer. we could check this
00282                 // with pointer arithmetics, but this is not a performant
00283                 // method.
00284             } while ( OS::CAS(&active, orig, nextbuf ) == false);
00285             // now,
00286             // active is guaranteed to point into bufs.
00287             assert( pointsTo( active, bufs ) );
00288 
00289             oro_atomic_dec( &orig->count ); // lockAndGetActive
00290             oro_atomic_dec( &orig->count ); // ref count
00291         }
00292 
00293         void clear()
00294         {
00295             Storage bufptr;
00296             Item* orig(0);
00297             Item* nextbuf(0);
00298             int items = 0;
00299             do {
00300                 if (orig) {
00301                     oro_atomic_dec(&orig->count);
00302                     oro_atomic_dec(&nextbuf->count);
00303                 }
00304                 orig = lockAndGetActive(bufptr);
00305                 items = orig->data.size();
00306                 nextbuf = findEmptyBuf(bufptr); // find unused Item in bufs
00307                 nextbuf->data.clear();
00308             } while ( OS::CAS(&active, orig, nextbuf ) == false );
00309             oro_atomic_dec( &orig->count ); // lockAndGetActive
00310             oro_atomic_dec( &orig->count ); // ref count
00311         }
00312 
00318         bool append( value_t item )
00319         {
00320             Item* orig=0;
00321             Storage bufptr;
00322             Item* usingbuf(0);
00323             do {
00324                 if (orig) {
00325                     oro_atomic_dec(&orig->count);
00326                     oro_atomic_dec(&usingbuf->count);
00327                 }
00328                 orig = lockAndGetActive( bufptr );
00329                 if ( orig->data.size() == orig->data.capacity() ) { // check for full
00330                     oro_atomic_dec( &orig->count );
00331                     return false;
00332                 }
00333                 usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
00334                 usingbuf->data = orig->data;
00335                 usingbuf->data.push_back( item );
00336             } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00337             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00338             oro_atomic_dec( &orig->count ); // set list free
00339             return true;
00340         }
00341 
00345         value_t front() const
00346         {
00347             Storage bufptr;
00348             Item* orig = lockAndGetActive(bufptr);
00349             value_t ret(orig->data.front());
00350             oro_atomic_dec( &orig->count ); //lockAndGetActive
00351             return ret;
00352         }
00353 
00357         value_t back() const
00358         {
00359             Storage bufptr;
00360             Item* orig = lockAndGetActive(bufptr);
00361             value_t ret(orig->data.back());
00362             oro_atomic_dec( &orig->count ); //lockAndGetActive
00363             return ret;
00364         }
00365 
00371         size_t append(const std::vector<T>& items)
00372         {
00373             Item* usingbuf(0);
00374             Item* orig=0;
00375             int towrite  = items.size();
00376             Storage bufptr;
00377             do {
00378                 if (orig) {
00379                     oro_atomic_dec(&orig->count);
00380                     oro_atomic_dec(&usingbuf->count);
00381                 }
00382 
00383                 orig = lockAndGetActive( bufptr );
00384                 int maxwrite = orig->data.capacity() - orig->data.size();
00385                 if ( maxwrite == 0 ) {
00386                     oro_atomic_dec( &orig->count ); // lockAndGetActive()
00387                     return 0;
00388                 }
00389                 if ( towrite > maxwrite )
00390                     towrite = maxwrite;
00391                 usingbuf = findEmptyBuf( bufptr ); // find unused Item in bufs
00392                 usingbuf->data = orig->data;
00393                 usingbuf->data.insert( usingbuf->data.end(), items.begin(), items.begin() + towrite );
00394             } while ( OS::CAS(&active, orig, usingbuf ) ==false );
00395             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00396             oro_atomic_dec( &orig->count ); // set list free
00397             return towrite;
00398         }
00399 
00400 
00406         bool erase( value_t item )
00407         {
00408             Item* orig=0;
00409             Item* nextbuf(0);
00410             Storage bufptr;
00411             do {
00412                 if (orig) {
00413                     oro_atomic_dec(&orig->count);
00414                     oro_atomic_dec(&nextbuf->count);
00415                 }
00416                 orig = lockAndGetActive( bufptr ); // find active in bufptr
00417                 // we do this in the loop because bufs can change.
00418                 nextbuf = findEmptyBuf( bufptr ); // find unused Item in same buf.
00419                 Iterator it( orig->data.begin() );
00420                 while (it != orig->data.end() && !( *it == item ) ) {
00421                     nextbuf->data.push_back( *it );
00422                     ++it;
00423                 }
00424                 if ( it == orig->data.end() ) {
00425                     oro_atomic_dec( &orig->count );
00426                     oro_atomic_dec( &nextbuf->count );
00427                     return false; // item not found.
00428                 }
00429                 ++it; // skip item.
00430                 while ( it != orig->data.end() ) {
00431                     nextbuf->data.push_back( *it );
00432                     ++it;
00433                 }
00434             } while ( OS::CAS(&active, orig, nextbuf ) ==false );
00435             oro_atomic_dec( &orig->count ); // lockAndGetActive
00436             oro_atomic_dec( &orig->count ); // ref count
00437             return true;
00438         }
00439 
00444         template<class Function>
00445         void apply(Function func )
00446         {
00447             Storage st;
00448             Item* orig = lockAndGetActive(st);
00449             Iterator it( orig->data.begin() );
00450             while ( it != orig->data.end() ) {
00451                 func( *it );
00452                 ++it;
00453             }
00454             oro_atomic_dec( &orig->count ); //lockAndGetActive
00455         }
00456 
00470         template<class Function>
00471         void apply_and_blank(Function func, value_t blank )
00472         {
00473             Storage st;
00474             Item* orig = lockAndGetActive(st);
00475             Item* newp = findEmptyBuf(st);
00476             Iterator it( orig->data.begin() );
00477             // first copy the whole list.
00478             while ( it != orig->data.end() ) {
00479                 newp->data.push_back( *it );
00480                 ++it;
00481             }
00482             blankp = newp;
00483             it = blankp->data.begin();
00484             // iterate over copy and skip blanks.
00485             while ( it != blankp->data.end() ) {
00486                 // XXX Race condition: 'it' can be blanked after
00487                 // comparison or even during func.
00488                 value_t a = *it;
00489                 if ( !(a == blank) )
00490                     func( a );
00491                 ++it;
00492             }
00493             blankp = 0;
00494 
00495             oro_atomic_dec( &orig->count ); //lockAndGetActive
00496             oro_atomic_dec( &newp->count ); //findEmptyBuf
00497         }
00498 
00514         bool erase_and_blank(value_t item, value_t blank )
00515         {
00516             Storage st;
00517             bool res = this->erase(item);
00518             Item* orig = lockAndGetBlank(st);
00519             if (orig) {
00520                 Iterator it( orig->data.begin() );
00521                 // item may still not be present in the blank-list.
00522                 while ( *it != item ) {
00523                     ++it;
00524                     if (it == orig->data.end() ) {
00525                         oro_atomic_dec( &orig->count ); //lockAndGetBlank
00526                         return res;
00527                     }
00528                 }
00529                 (*it) = blank;
00530                 oro_atomic_dec( &orig->count ); //lockAndGetBlank
00531             }
00532             return res;
00533         }
00534 
00540         template<class Function>
00541         value_t find_if( Function func, value_t blank = value_t() )
00542         {
00543             Storage st;
00544             Item* orig = lockAndGetActive(st);
00545             Iterator it( orig->data.begin() );
00546             while ( it != orig->data.end() ) {
00547                 if (func( *it ) == true ) {
00548                     oro_atomic_dec( &orig->count ); //lockAndGetActive
00549                     return *it;
00550                 }
00551                 ++it;
00552             }
00553             oro_atomic_dec( &orig->count ); //lockAndGetActive
00554             return blank;
00555         }
00556     private:
00560         Item* findEmptyBuf(Storage& bufptr) {
00561             // These two functions are copy/pasted from BufferLockFree.
00562             // If MAX_THREADS is large enough, this will always succeed :
00563             Item* start = &(*bufptr)[0];
00564             while( true ) {
00565                 if ( oro_atomic_inc_and_test( &start->count ) )
00566                     break;
00567                 oro_atomic_dec( &start->count );
00568                 ++start;
00569                 if (start == &(*bufptr)[0] + BufNum() )
00570                     start = &(*bufptr)[0]; // in case of races, rewind
00571             }
00572             assert( pointsTo(start, bufptr) );
00573             start->data.clear();
00574             return start; // unique pointer across all threads
00575         }
00576 
00580         Item* lockAndGetActive(Storage& bufptr) const {
00581             // This is a kind-of smart-pointer implementation
00582             // We could move it into Item itself and overload operator=
00583             Item* orig=0;
00584             do {
00585                 if (orig)
00586                     oro_atomic_dec( &orig->count );
00587                 bufptr = bufs;
00588                 orig = active;
00589                 // also check that orig points into bufptr.
00590                 if ( pointsTo(orig, bufptr) )
00591                     oro_atomic_inc( &orig->count );
00592                 else {
00593                     orig = 0;
00594                 }
00595                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00596                 // if active is still equal to orig, the increase of orig->count is
00597                 // surely valid, since no contention (change of active) occured.
00598             } while ( active != orig );
00599             assert( pointsTo(orig, bufptr) );
00600             return orig;
00601         }
00602 
00607         Item* lockAndGetActive() const {
00608             // only operates on active's refcount.
00609             Item* orig=0;
00610             do {
00611                 if (orig)
00612                     oro_atomic_dec( &orig->count );
00613                 orig = active;
00614                 oro_atomic_inc( &orig->count );
00615                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00616                 // if active is still equal to orig, the increase of orig->count is
00617                 // surely valid, since no contention (change of active) occured.
00618             } while ( active != orig );
00619             return orig;
00620         }
00621 
00625         Item* lockAndGetBlank(Storage& bufptr) const {
00626             Item* orig=0;
00627             do {
00628                 if (orig)
00629                     oro_atomic_dec( &orig->count );
00630                 bufptr = bufs;
00631                 orig = blankp;
00632                 if (orig == 0)
00633                     return 0; // no blankp.
00634                 // also check that orig points into bufptr.
00635                 if ( pointsTo(orig, bufptr) )
00636                     oro_atomic_inc( &orig->count );
00637                 else {
00638                     orig = 0;
00639                 }
00640                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00641                 // if active is still equal to orig, the increase of orig->count is
00642                 // surely valid, since no contention (change of active) occured.
00643             } while ( blankp != orig );
00644             assert( pointsTo(orig, bufptr) );
00645             return orig;
00646         }
00647 
00648         inline bool pointsTo( Item* p, const Storage& bf ) const {
00649             return p >= &(*bf)[0] && p <= &(*bf)[ BufNum() - 1 ];
00650         }
00651 
00652     };
00653 }
00654 
00655 #endif

Generated on Tue Jul 13 11:03:23 2010 for Orocos Real-Time Toolkit by  doxygen 1.6.1