TaskContext.cpp

00001 /***************************************************************************
00002   tag: Peter Soetens  Tue Dec 21 22:43:08 CET 2004  TaskContext.cxx 
00003 
00004                         TaskContext.cxx -  description
00005                            -------------------
00006     begin                : Tue December 21 2004
00007     copyright            : (C) 2004 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.ac.be
00009  
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037  
00038  
00039 
00040 #include "TaskContext.hpp"
00041 #include <CommandInterface.hpp>
00042 
00043 #include <string>
00044 #include <algorithm>
00045 #include <functional>
00046 #include <boost/bind.hpp>
00047 #include <boost/mem_fn.hpp>
00048 
00049 #include "DataSource.hpp"
00050 #include "Method.hpp"
00051 #include "ConnectionInterface.hpp"
00052 
00053 #include "rtt-config.h"
00054 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00055 #include "scripting/ParserScriptingAccess.hpp"
00056 #include "scripting/ParserExecutionAccess.hpp"
00057 #endif
00058 #include "MarshallingAccess.hpp"
00059 
00060 namespace RTT
00061 {
00062     
00063     using namespace boost;
00064     using namespace std;
00065 
00066     TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/)
00067         :  TaskCore(name, initial_state)
00068 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00069            ,mscriptAcc(new ParserScriptingAccess(this))
00070 #else
00071            ,mscriptAcc(new ScriptingAccess(this))
00072 #endif
00073 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00074            ,mengAcc( new ParserExecutionAccess(this ) )
00075 #else
00076            ,mengAcc( new ExecutionAccess(this ) )
00077 #endif
00078            ,marshAcc( new MarshallingAccess(this) )
00079            ,dataPorts(this)
00080     {
00081         this->setup();
00082     }
00083 
00084     TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state /*= Stopped*/ )
00085         :  TaskCore(name, parent, initial_state)
00086 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00087            ,mscriptAcc(new ParserScriptingAccess(this))
00088 #else
00089            ,mscriptAcc(new ScriptingAccess(this))
00090 #endif
00091 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00092            ,mengAcc( new ParserExecutionAccess(this ) )
00093 #else
00094            ,mengAcc( new ExecutionAccess(this ) )
00095 #endif
00096            ,marshAcc( new MarshallingAccess(this) )
00097            ,dataPorts(this)
00098     {
00099         this->setup();
00100     }
00101 
00102     void TaskContext::setup()
00103     {
00104         // Work around for bug
00105         this->mevents.setEventProcessor( ee->events() );
00106 
00107         mdescription = "The interface of this TaskContext.";
00108 
00109         this->methods()
00110             ->addMethod( method("configure",&TaskContext::configure, this),
00111                          "Configure this TaskContext (read properties etc)." );
00112         this->methods()
00113             ->addMethod( method("isConfigured",&TaskContext::isConfigured, this),
00114                          "Is this TaskContext configured ?" );
00115         this->methods()
00116             ->addMethod( method("start",&TaskContext::start, this),
00117                          "Start the Execution Engine of this TaskContext (= activate + updateHook() )." );
00118         this->methods()
00119             ->addMethod( method("activate",&TaskContext::activate, this),
00120                          "Activate the Execution Engine of this TaskContext (= events and commands)." );
00121         this->methods()
00122             ->addMethod( method("stop",&TaskContext::stop, this),
00123                          "Stop the Execution Engine of this TaskContext." );
00124         this->methods()
00125             ->addMethod( method("isRunning",&TaskContext::isRunning, this),
00126                          "Is the Execution Engine executing this TaskContext ?" );
00127         this->methods()
00128             ->addMethod( method("getPeriod",&TaskContext::getPeriod, this),
00129                          "Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period." );
00130         this->methods()
00131             ->addMethod( method("isActive",&TaskContext::isActive, this),
00132                          "Is the Execution Engine of this TaskContext processing events and commands ?" );
00133         this->methods()
00134             ->addMethod( method("inFatalError",&TaskContext::inFatalError, this),
00135                          "Check if this TaskContext is in the FatalError state." );
00136         this->methods()
00137             ->addMethod( method("warning",&TaskContext::warning, this),
00138                          "Enter the RunTimeWarning state." );
00139         this->methods()
00140             ->addMethod( method("inRunTimeWarning",&TaskContext::inRunTimeWarning, this),
00141                          "Check if this TaskContext is in the RunTimeWarning state." );
00142         this->methods()
00143             ->addMethod( method("getWarningCount",&TaskContext::getWarningCount, this),
00144                          "Check if the number of times the RunTimeWarning state has been entered." );
00145         this->methods()
00146             ->addMethod( method("error",&TaskContext::error, this),
00147                          "Enter the RunTimeError state." );
00148         this->methods()
00149             ->addMethod( method("inRunTimeError",&TaskContext::inRunTimeError, this),
00150                          "Check if this TaskContext is in the RunTimeError state." );
00151         this->methods()
00152             ->addMethod( method("getErrorCount",&TaskContext::getErrorCount, this),
00153                          "Check if the number of times the RunTimeError state has been entered." );
00154         this->methods()
00155             ->addMethod( method("cleanup",&TaskContext::cleanup, this),
00156                          "Reset this TaskContext to the PreOperational state (write properties etc)." );
00157         this->methods()
00158             ->addMethod( method("resetError",&TaskContext::resetError, this),
00159                          "Reset this TaskContext from the FatalError state." );
00160         this->methods()
00161             ->addMethod( method("update",&TaskContext::doUpdate, this),
00162                          "Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task." );
00163 
00164         this->methods()
00165             ->addMethod( method("trigger",&TaskContext::doTrigger, this),
00166                          "Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task." );
00167     }
00168 
00169         TaskContext::~TaskContext()
00170         {
00171             // We don't call stop() or cleanup() here since this is
00172             // the responsibility of the subclass. Calling these functions
00173             // here would only lead to calling invalid virtual functions.
00174             // [Rule no 1: Don't call virtual functions in a destructor.]
00175             // [Rule no 2: Don't call virtual functions in a constructor.]
00176             mattributes.clear();
00177 
00178             delete mscriptAcc;
00179             delete mengAcc;
00180             delete marshAcc;
00181 
00182             // remove from all users.
00183             while( !musers.empty() ) {
00184                 musers.front()->removePeer(this);
00185             }
00186             // since we are destroyed, be sure that the peer no longer
00187             // has a 'user' pointer to us.
00188             while ( !_task_map.empty() ) {
00189                 _task_map.begin()->second->removeUser(this);
00190                 _task_map.erase( _task_map.begin() );
00191             }
00192             // Do not call this->disconnect() !!!
00193             // Ports are probably already destructed by user code.
00194 
00195         }
00196 
00197     void TaskContext::exportPorts()
00198     {
00199         DataFlowInterface::Ports myports = this->ports()->getPorts();
00200         for (DataFlowInterface::Ports::iterator it = myports.begin();
00201              it != myports.end();
00202              ++it) {
00203             // Add the port to the method interface.
00204             if (this->getObject((*it)->getName()) == 0) {
00205                 OperationInterface* ms = this->ports()->createPortObject((*it)->getName());
00206                 if ( ms )
00207                     if ( this->addObject( ms ) == false )
00208                         delete ms;
00209             }
00210         }
00211     }
00212 
00213     bool TaskContext::connectPorts( TaskContext* peer )
00214     {
00215         bool failure = false;
00216         const std::string& location = this->getName();
00217         Logger::In in( location.c_str()  );
00218         // connect our write ports to the read ports of 'peer'.
00219         // and vice versa.
00220         DataFlowInterface::Ports myports = this->ports()->getPorts();
00221         for (DataFlowInterface::Ports::iterator it = myports.begin();
00222              it != myports.end();
00223              ++it) {
00224             // Then try to get the peer port's connection
00225             PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
00226             if ( !peerport ) {
00227                 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
00228                 continue;
00229             }
00230 
00231             // Detect already connected ports.
00232             if ( peerport->connected() && (*it)->connected() ) {
00233                 if (peerport->connection() == (*it)->connection() )
00234                     continue;
00235                 log(Warning) << "Ports '"<< peerport->getName() << "' of task " <<peer->getName() << " and task " << getName()
00236                       << " have the same name but are not connected to each other." << endlog();
00237             }
00238 
00239             // NOTE: all code below can be replaced by a single line:
00240             // peerport->connectTo( *it ) || (*it)->connectTo(peerport);
00241             // but that has less informational log messages.
00242 
00243             // Our port is connected thus peerport is not connected.
00244             if ( (*it)->connected() ) {
00245                 // ask peer to connect to us:
00246                 if ( peerport->connectTo( *it ) ) {
00247                     log(Info)<< "Connected Port " << (*it)->getName()
00248                                   << " of peer Task "<<peer->getName() << " to existing connection." << endlog();
00249                 }
00250                 else {
00251                     log(Error)<< "Failed to connect Port " << (*it)->getName()
00252                                   << " of peer Task "<<peer->getName() << " to existing connection." << endlog();
00253                     failure = true;
00254                 }
00255                 continue;
00256             }
00257 
00258             // Peer port is connected thus our port is not connected.
00259             if ( peerport->connected() ) {
00260                 if ( (*it)->connectTo( peerport ) ) {
00261                     log(Info)<< "Added Port " << (*it)->getName()
00262                                   << " to existing connection of peer Task "<<peer->getName() << "." << endlog();
00263                 }
00264                 else {
00265                     log(Error)<< "Not connecting Port " << (*it)->getName()
00266                                   << " to existing connection of peer Task "<<peer->getName() << "." << endlog();
00267                     failure = true;
00268                 }
00269                 continue;
00270             }
00271 
00272             // Last resort: both not connected: create new connection.
00273             if ( !(*it)->connectTo( peerport ) ) {
00274                 // real error msg will be produced by factory itself.
00275                 log(Warning)<< "Failed to connect Port " << (*it)->getName() << " of " << getName() << " to peer Task "<<peer->getName() <<"." << endlog();
00276                 failure = true;
00277             } else {
00278                 // all went fine, add peerport as well, will always succeed:
00279                 log(Info)<< "Connected Port " << (*it)->getName() << " to peer Task "<<peer->getName() <<"." << endlog();
00280             }
00281         }
00282         return !failure;
00283     }
00284 
00285     const std::string& TaskContext::getName() const
00286     {
00287         return TaskCore::getName();
00288     }
00289 
00290     const std::string& TaskContext::getDescription() const
00291     {
00292         return mdescription;
00293     }
00294 
00295     void TaskContext::setDescription(const std::string&  d)
00296     {
00297         mdescription = d;
00298     }
00299 
00300     void TaskContext::addUser( TaskContext* peer )
00301     {
00302         musers.push_back(peer);
00303     }
00304 
00305     void TaskContext::removeUser( TaskContext* peer )
00306     {
00307         Users::iterator it = find(musers.begin(), musers.end(), peer);
00308         if ( it != musers.end() )
00309             musers.erase(it);
00310     }
00311 
00312         bool TaskContext::addPeer( TaskContext* peer, std::string alias )
00313         {
00314             if ( alias.empty() )
00315                 alias = peer->getName();
00316             if ( _task_map.count( alias ) != 0 )
00317                 return false;
00318             _task_map[ alias ] = peer;
00319             peer->addUser( this );
00320             return true;
00321         }
00322 
00323         void TaskContext::removePeer( const std::string& name )
00324         {
00325             PeerMap::iterator it = _task_map.find( name );
00326             if ( _task_map.end() != it ) {
00327                 it->second->removeUser( this );
00328                 _task_map.erase( _task_map.find( name ) );
00329             }
00330         }
00331 
00332         void TaskContext::removePeer( TaskContext* peer )
00333         {
00334             for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00335                 if ( it->second == peer ) {
00336                     peer->removeUser( this );
00337                     _task_map.erase( it );
00338                     return;
00339                 }
00340         }
00341 
00342         bool TaskContext::connectPeers( TaskContext* peer )
00343         {
00344             if ( _task_map.count( peer->getName() ) != 0
00345                  || peer->hasPeer( mtask_name ) )
00346                 return false;
00347             this->addPeer ( peer );
00348             peer->addPeer ( this );
00349             return true;
00350         }
00351 
00352     void TaskContext::reconnect()
00353     {
00354         Logger::In in( this->getName().c_str()  );
00355         log(Info) << "Starting reconnection..."<<endlog();
00356         // first disconnect all our ports
00357         DataFlowInterface::Ports myports = this->ports()->getPorts();
00358         for (DataFlowInterface::Ports::iterator it = myports.begin();
00359              it != myports.end();
00360              ++it) {
00361             (*it)->disconnect();
00362         }
00363         // reconnect again to our peers and ask our 'users' to reconnect as well.
00364         for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00365             this->connectPorts(it->second);
00366 
00367         log(Info) << "Reconnection done."<<endlog();
00368     }
00369 
00370     void TaskContext::disconnect() {
00371         Logger::In in( this->getName().c_str()  );
00372         // disconnect all our ports
00373         DataFlowInterface::Ports myports = this->ports()->getPorts();
00374         for (DataFlowInterface::Ports::iterator it = myports.begin();
00375              it != myports.end();
00376              ++it) {
00377             (*it)->disconnect();
00378         }
00379 
00380         // remove from all users.
00381         while( !musers.empty() ) {
00382             musers.front()->removePeer(this);
00383         }
00384 
00385         while ( !_task_map.empty() ) {
00386             _task_map.begin()->second->removeUser(this);
00387             _task_map.erase( _task_map.begin() );
00388         }
00389     }
00390 
00391         void TaskContext::disconnectPeers( const std::string& name )
00392         {
00393             if ( _task_map.end() != _task_map.find( name ) ) {
00394                 TaskContext* peer = _task_map.find(name)->second;
00395                 this->removePeer(peer);
00396                 peer->removePeer(this);
00397             }
00398         }
00399 
00400         std::vector<std::string> TaskContext::getPeerList() const
00401         {
00402             std::vector<std::string> res;
00403             std::transform(_task_map.begin(), _task_map.end(),
00404                            std::back_inserter( res ),
00405                            select1st<PeerMap::value_type>() );
00406             return res;
00407         }
00408 
00409         bool TaskContext::hasPeer( const std::string& peer_name ) const
00410         {
00411             return _task_map.count( peer_name ) == 1;
00412         }
00413 
00414         TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
00415         {
00416             if (this->hasPeer( peer_name ) )
00417                 return _task_map.find(peer_name)->second;
00418             return 0;
00419         }
00420 
00421     bool TaskContext::addObject( OperationInterface *obj ) {
00422         if (OperationInterface::addObject(obj) == false)
00423             return false;
00424         obj->setEngine( this->engine() );
00425         return true;
00426     }
00427 
00428     void TaskContext::clear()
00429     {
00430         this->removeObject("this");
00431         this->properties()->clear();
00432         this->ports()->clear();
00433 
00434         OperationInterface::clear();
00435     }
00436 
00437     bool connectPorts(TaskContext* A, TaskContext* B) { 
00438         return A->connectPorts(B);
00439     }
00440 
00441     bool connectPeers(TaskContext* A, TaskContext* B) {
00442         return A->connectPeers(B);
00443     }
00444     
00445 }
00446 

Generated on Tue Mar 25 17:41:52 2008 for OrocosReal-TimeToolkit by  doxygen 1.5.3