00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "TaskContext.hpp"
00041 #include <CommandInterface.hpp>
00042
00043 #include <string>
00044 #include <algorithm>
00045 #include <functional>
00046 #include <boost/bind.hpp>
00047 #include <boost/mem_fn.hpp>
00048
00049 #include "DataSource.hpp"
00050 #include "Method.hpp"
00051 #include "ConnectionInterface.hpp"
00052
00053 #include "rtt-config.h"
00054 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00055 #include "scripting/ParserScriptingAccess.hpp"
00056 #include "scripting/ParserExecutionAccess.hpp"
00057 #endif
00058 #include "MarshallingAccess.hpp"
00059
00060 namespace RTT
00061 {
00062
00063 using namespace boost;
00064 using namespace std;
00065
00066 TaskContext::TaskContext(const std::string& name, TaskState initial_state )
00067 : TaskCore(name, initial_state)
00068 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00069 ,mscriptAcc(new ParserScriptingAccess(this))
00070 #else
00071 ,mscriptAcc(new ScriptingAccess(this))
00072 #endif
00073 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00074 ,mengAcc( new ParserExecutionAccess(this ) )
00075 #else
00076 ,mengAcc( new ExecutionAccess(this ) )
00077 #endif
00078 ,marshAcc( new MarshallingAccess(this) )
00079 ,dataPorts(this)
00080 {
00081 this->setup();
00082 }
00083
00084 TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state )
00085 : TaskCore(name, parent, initial_state)
00086 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00087 ,mscriptAcc(new ParserScriptingAccess(this))
00088 #else
00089 ,mscriptAcc(new ScriptingAccess(this))
00090 #endif
00091 #if !defined(ORO_EMBEDDED) && defined(OROPKG_EXECUTION_PROGRAM_PARSER)
00092 ,mengAcc( new ParserExecutionAccess(this ) )
00093 #else
00094 ,mengAcc( new ExecutionAccess(this ) )
00095 #endif
00096 ,marshAcc( new MarshallingAccess(this) )
00097 ,dataPorts(this)
00098 {
00099 this->setup();
00100 }
00101
00102 void TaskContext::setup()
00103 {
00104
00105 this->mevents.setEventProcessor( ee->events() );
00106
00107 mdescription = "The interface of this TaskContext.";
00108
00109 this->methods()
00110 ->addMethod( method("configure",&TaskContext::configure, this),
00111 "Configure this TaskContext (read properties etc)." );
00112 this->methods()
00113 ->addMethod( method("isConfigured",&TaskContext::isConfigured, this),
00114 "Is this TaskContext configured ?" );
00115 this->methods()
00116 ->addMethod( method("start",&TaskContext::start, this),
00117 "Start the Execution Engine of this TaskContext (= activate + updateHook() )." );
00118 this->methods()
00119 ->addMethod( method("activate",&TaskContext::activate, this),
00120 "Activate the Execution Engine of this TaskContext (= events and commands)." );
00121 this->methods()
00122 ->addMethod( method("stop",&TaskContext::stop, this),
00123 "Stop the Execution Engine of this TaskContext." );
00124 this->methods()
00125 ->addMethod( method("isRunning",&TaskContext::isRunning, this),
00126 "Is the Execution Engine executing this TaskContext ?" );
00127 this->methods()
00128 ->addMethod( method("getPeriod",&TaskContext::getPeriod, this),
00129 "Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period." );
00130 this->methods()
00131 ->addMethod( method("isActive",&TaskContext::isActive, this),
00132 "Is the Execution Engine of this TaskContext processing events and commands ?" );
00133 this->methods()
00134 ->addMethod( method("inFatalError",&TaskContext::inFatalError, this),
00135 "Check if this TaskContext is in the FatalError state." );
00136 this->methods()
00137 ->addMethod( method("warning",&TaskContext::warning, this),
00138 "Enter the RunTimeWarning state." );
00139 this->methods()
00140 ->addMethod( method("inRunTimeWarning",&TaskContext::inRunTimeWarning, this),
00141 "Check if this TaskContext is in the RunTimeWarning state." );
00142 this->methods()
00143 ->addMethod( method("getWarningCount",&TaskContext::getWarningCount, this),
00144 "Check if the number of times the RunTimeWarning state has been entered." );
00145 this->methods()
00146 ->addMethod( method("error",&TaskContext::error, this),
00147 "Enter the RunTimeError state." );
00148 this->methods()
00149 ->addMethod( method("inRunTimeError",&TaskContext::inRunTimeError, this),
00150 "Check if this TaskContext is in the RunTimeError state." );
00151 this->methods()
00152 ->addMethod( method("getErrorCount",&TaskContext::getErrorCount, this),
00153 "Check if the number of times the RunTimeError state has been entered." );
00154 this->methods()
00155 ->addMethod( method("cleanup",&TaskContext::cleanup, this),
00156 "Reset this TaskContext to the PreOperational state (write properties etc)." );
00157 this->methods()
00158 ->addMethod( method("resetError",&TaskContext::resetError, this),
00159 "Reset this TaskContext from the FatalError state." );
00160 this->methods()
00161 ->addMethod( method("update",&TaskContext::doUpdate, this),
00162 "Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task." );
00163
00164 this->methods()
00165 ->addMethod( method("trigger",&TaskContext::doTrigger, this),
00166 "Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task." );
00167 }
00168
00169 TaskContext::~TaskContext()
00170 {
00171
00172
00173
00174
00175
00176 mattributes.clear();
00177
00178 delete mscriptAcc;
00179 delete mengAcc;
00180 delete marshAcc;
00181
00182
00183 while( !musers.empty() ) {
00184 musers.front()->removePeer(this);
00185 }
00186
00187
00188 while ( !_task_map.empty() ) {
00189 _task_map.begin()->second->removeUser(this);
00190 _task_map.erase( _task_map.begin() );
00191 }
00192
00193
00194
00195 }
00196
00197 void TaskContext::exportPorts()
00198 {
00199 DataFlowInterface::Ports myports = this->ports()->getPorts();
00200 for (DataFlowInterface::Ports::iterator it = myports.begin();
00201 it != myports.end();
00202 ++it) {
00203
00204 if (this->getObject((*it)->getName()) == 0) {
00205 OperationInterface* ms = this->ports()->createPortObject((*it)->getName());
00206 if ( ms )
00207 if ( this->addObject( ms ) == false )
00208 delete ms;
00209 }
00210 }
00211 }
00212
00213 bool TaskContext::connectPorts( TaskContext* peer )
00214 {
00215 bool failure = false;
00216 const std::string& location = this->getName();
00217 Logger::In in( location.c_str() );
00218
00219
00220 DataFlowInterface::Ports myports = this->ports()->getPorts();
00221 for (DataFlowInterface::Ports::iterator it = myports.begin();
00222 it != myports.end();
00223 ++it) {
00224
00225 PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
00226 if ( !peerport ) {
00227 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
00228 continue;
00229 }
00230
00231
00232 if ( peerport->connected() && (*it)->connected() ) {
00233 if (peerport->connection() == (*it)->connection() )
00234 continue;
00235 log(Warning) << "Ports '"<< peerport->getName() << "' of task " <<peer->getName() << " and task " << getName()
00236 << " have the same name but are not connected to each other." << endlog();
00237 }
00238
00239
00240
00241
00242
00243
00244 if ( (*it)->connected() ) {
00245
00246 if ( peerport->connectTo( *it ) ) {
00247 log(Info)<< "Connected Port " << (*it)->getName()
00248 << " of peer Task "<<peer->getName() << " to existing connection." << endlog();
00249 }
00250 else {
00251 log(Error)<< "Failed to connect Port " << (*it)->getName()
00252 << " of peer Task "<<peer->getName() << " to existing connection." << endlog();
00253 failure = true;
00254 }
00255 continue;
00256 }
00257
00258
00259 if ( peerport->connected() ) {
00260 if ( (*it)->connectTo( peerport ) ) {
00261 log(Info)<< "Added Port " << (*it)->getName()
00262 << " to existing connection of peer Task "<<peer->getName() << "." << endlog();
00263 }
00264 else {
00265 log(Error)<< "Not connecting Port " << (*it)->getName()
00266 << " to existing connection of peer Task "<<peer->getName() << "." << endlog();
00267 failure = true;
00268 }
00269 continue;
00270 }
00271
00272
00273 if ( !(*it)->connectTo( peerport ) ) {
00274
00275 log(Warning)<< "Failed to connect Port " << (*it)->getName() << " of " << getName() << " to peer Task "<<peer->getName() <<"." << endlog();
00276 failure = true;
00277 } else {
00278
00279 log(Info)<< "Connected Port " << (*it)->getName() << " to peer Task "<<peer->getName() <<"." << endlog();
00280 }
00281 }
00282 return !failure;
00283 }
00284
00285 const std::string& TaskContext::getName() const
00286 {
00287 return TaskCore::getName();
00288 }
00289
00290 const std::string& TaskContext::getDescription() const
00291 {
00292 return mdescription;
00293 }
00294
00295 void TaskContext::setDescription(const std::string& d)
00296 {
00297 mdescription = d;
00298 }
00299
00300 void TaskContext::addUser( TaskContext* peer )
00301 {
00302 musers.push_back(peer);
00303 }
00304
00305 void TaskContext::removeUser( TaskContext* peer )
00306 {
00307 Users::iterator it = find(musers.begin(), musers.end(), peer);
00308 if ( it != musers.end() )
00309 musers.erase(it);
00310 }
00311
00312 bool TaskContext::addPeer( TaskContext* peer, std::string alias )
00313 {
00314 if ( alias.empty() )
00315 alias = peer->getName();
00316 if ( _task_map.count( alias ) != 0 )
00317 return false;
00318 _task_map[ alias ] = peer;
00319 peer->addUser( this );
00320 return true;
00321 }
00322
00323 void TaskContext::removePeer( const std::string& name )
00324 {
00325 PeerMap::iterator it = _task_map.find( name );
00326 if ( _task_map.end() != it ) {
00327 it->second->removeUser( this );
00328 _task_map.erase( _task_map.find( name ) );
00329 }
00330 }
00331
00332 void TaskContext::removePeer( TaskContext* peer )
00333 {
00334 for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00335 if ( it->second == peer ) {
00336 peer->removeUser( this );
00337 _task_map.erase( it );
00338 return;
00339 }
00340 }
00341
00342 bool TaskContext::connectPeers( TaskContext* peer )
00343 {
00344 if ( _task_map.count( peer->getName() ) != 0
00345 || peer->hasPeer( mtask_name ) )
00346 return false;
00347 this->addPeer ( peer );
00348 peer->addPeer ( this );
00349 return true;
00350 }
00351
00352 void TaskContext::reconnect()
00353 {
00354 Logger::In in( this->getName().c_str() );
00355 log(Info) << "Starting reconnection..."<<endlog();
00356
00357 DataFlowInterface::Ports myports = this->ports()->getPorts();
00358 for (DataFlowInterface::Ports::iterator it = myports.begin();
00359 it != myports.end();
00360 ++it) {
00361 (*it)->disconnect();
00362 }
00363
00364 for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00365 this->connectPorts(it->second);
00366
00367 log(Info) << "Reconnection done."<<endlog();
00368 }
00369
00370 void TaskContext::disconnect() {
00371 Logger::In in( this->getName().c_str() );
00372
00373 DataFlowInterface::Ports myports = this->ports()->getPorts();
00374 for (DataFlowInterface::Ports::iterator it = myports.begin();
00375 it != myports.end();
00376 ++it) {
00377 (*it)->disconnect();
00378 }
00379
00380
00381 while( !musers.empty() ) {
00382 musers.front()->removePeer(this);
00383 }
00384
00385 while ( !_task_map.empty() ) {
00386 _task_map.begin()->second->removeUser(this);
00387 _task_map.erase( _task_map.begin() );
00388 }
00389 }
00390
00391 void TaskContext::disconnectPeers( const std::string& name )
00392 {
00393 if ( _task_map.end() != _task_map.find( name ) ) {
00394 TaskContext* peer = _task_map.find(name)->second;
00395 this->removePeer(peer);
00396 peer->removePeer(this);
00397 }
00398 }
00399
00400 std::vector<std::string> TaskContext::getPeerList() const
00401 {
00402 std::vector<std::string> res;
00403 std::transform(_task_map.begin(), _task_map.end(),
00404 std::back_inserter( res ),
00405 select1st<PeerMap::value_type>() );
00406 return res;
00407 }
00408
00409 bool TaskContext::hasPeer( const std::string& peer_name ) const
00410 {
00411 return _task_map.count( peer_name ) == 1;
00412 }
00413
00414 TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
00415 {
00416 if (this->hasPeer( peer_name ) )
00417 return _task_map.find(peer_name)->second;
00418 return 0;
00419 }
00420
00421 bool TaskContext::addObject( OperationInterface *obj ) {
00422 if (OperationInterface::addObject(obj) == false)
00423 return false;
00424 obj->setEngine( this->engine() );
00425 return true;
00426 }
00427
00428 void TaskContext::clear()
00429 {
00430 this->removeObject("this");
00431 this->properties()->clear();
00432 this->ports()->clear();
00433
00434 OperationInterface::clear();
00435 }
00436
00437 bool connectPorts(TaskContext* A, TaskContext* B) {
00438 return A->connectPorts(B);
00439 }
00440
00441 bool connectPeers(TaskContext* A, TaskContext* B) {
00442 return A->connectPeers(B);
00443 }
00444
00445 }
00446