ManagerNode.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package RobotComponents
19  * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20  * @date 2015
21  * @copyright http://www.gnu.org/licenses/gpl.txt
22  * GNU General Public License
23  */
24 
25 #include <numeric>
26 #include <algorithm>
27 
28 #include <ArmarXCore/core/ArmarXObjectScheduler.h>
29 #include "../../util/Samplers.h"
30 
31 #include "WorkerNode.h"
32 #include "ManagerNode.h"
33 #include "util.h"
34 
35 namespace armarx::addirrtstar
36 {
37  //from managedIceObject
38  void ManagerNode::onInitComponent()
39  {
40  setTag("ManagerNode");
41  ARMARX_VERBOSE_S << "start init";
42  //set other variables
43  killRequest = false;
44 
46  workersPerRemoteObjectNode.resize(remoteObjectNodes.size(), 0);
47 
49  maxWorkersPerRemoteObjectNode.reserve(remoteObjectNodes.size());
50  std::transform(
51  remoteObjectNodes.begin(), remoteObjectNodes.end(),
52  std::back_inserter(maxWorkersPerRemoteObjectNode),
53  [](const RemoteObjectNodeInterfacePrx & prx)
54  {
55  const auto count = static_cast<std::size_t>(prx->getNumberOfCores());
56  ARMARX_VERBOSE_S << "ROI " << prx.get() << " has " << count << " cores";
57  return count;
58  }
59  );
60  //there can't be more workers than cores on the remote object nodes
61  const Ice::Long maxROCount = std::accumulate(maxWorkersPerRemoteObjectNode.begin(), maxWorkersPerRemoteObjectNode.end(), 0);
62  maximalWorkerCount = std::min(maximalWorkerCount, maxROCount);
63  initialWorkerCount = std::min(initialWorkerCount, maxROCount);
65 
66  //init tree with start node
67  tree.init(
68  FullIceTree
69  {
70  FullNodeDataListList{
71  FullNodeDataList{
72  FullNodeData{
73  startNode, //config
74  NodeId{0, 0}, //parent
75  std::numeric_limits<Ice::Float>::infinity(), //radius;
76  0.f //fromParentCost
77  }
78  }
79  },
80  Ice::LongSeq{ -1}
81  },
82  addParams,
83  maximalWorkerCount //since this tree does no updates the id is not important
84  );
85  tree.increaseWorkerCountTo(maximalWorkerCount);
86 
87  //init prefix
88  updateTopicPrefix = getName() + ':';
89 
90  //register for updates
91  usingTopic(updateTopicPrefix + ADDIRRTSTAR_TREE_UPDATE_TOPIC_NAME);
92 
93  ARMARX_VERBOSE_S << "done init";
94  }
95 
96  void ManagerNode::onConnectComponent()
97  {
98  ARMARX_VERBOSE_S << "armarx::addirrtstar::ManagerNode::onConnectComponent()";
99  //start manager thread
100  managerThread = std::thread {[this]{this->managerTask();}};
101  }
102 
103 
104  void ManagerNode::onExitComponent()
105  {
106  ARMARX_VERBOSE_S << "start onExitComponent()";
107  killRequest = true;
108  managerThread.join();
109  ARMARX_VERBOSE_S << "done onExitComponent()";
110  }
111 
112  //from ManagerNodeBase
113  PathWithCost ManagerNode::getBestPath(const Ice::Current&) const
114  {
115  std::lock_guard<std::mutex> lock {treeMutex};
116  auto&& path = tree.getBestPath();
117  path.nodes.emplace_back(goalNode);
118  return path;
119  }
120  Ice::Long ManagerNode::getPathCount(const Ice::Current&) const
121  {
122  std::lock_guard<std::mutex> lock {treeMutex};
123  return tree.getPathCount();
124  }
125  PathWithCost ManagerNode::getNthPathWithCost(Ice::Long n, const Ice::Current&) const
126  {
127  std::lock_guard<std::mutex> lock {treeMutex};
128  auto&& path = tree.getNthPathWithCost(static_cast<std::size_t>(n));
129  path.nodes.emplace_back(goalNode);
130  return path;
131  }
132 
133  PathWithCostSeq ManagerNode::getAllPathsWithCost(const Ice::Current&) const
134  {
135  std::lock_guard<std::mutex> lock {treeMutex};
136  const auto pathCount = tree.getPathCount();
137  PathWithCostSeq paths {pathCount};
138 
139  for (std::size_t i = 0; i < pathCount; ++i)
140  {
141  paths.at(i) = tree.getNthPathWithCost(i);
142  paths.at(i).nodes.emplace_back(goalNode);
143  }
144 
145  return paths;
146  }
147 
148  Update ManagerNode::getUpdate(Ice::Long workerId, Ice::Long updateId, const Ice::Current&) const
149  {
150  ARMARX_WARNING_S << "worker requested remote update w/u " << workerId << "/" << updateId;
151  std::lock_guard<std::mutex> lock {updateMutex};
152  ARMARX_CHECK_EXPRESSION(workerId >= 0);
153  ARMARX_CHECK_EXPRESSION(updateId >= 0);
154 
155  const auto uWorkerId = static_cast<std::size_t>(workerId);
156  const auto uUpdateId = static_cast<std::size_t>(updateId);
158 
159  if (hasLocalUpdateRequiresUpdateMutex(uWorkerId, uUpdateId))
160  {
161  return getLocalUpdateRequiresUpdateMutex(uWorkerId, uUpdateId);
162  }
163 
164  return getRemoteUpdate(uWorkerId, uUpdateId);
165  }
166 
167  FullIceTree ManagerNode::getTree(const Ice::Current&) const
168  {
169  //since we are copying the tree and all update ids, we need both mutexes
170  std::lock(treeMutex, updateMutex);
171  std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
172  std::lock_guard<std::mutex> updateLock(updateMutex, std::adopt_lock);
173  return tree.getIceTree();
174  }
175 
176  void ManagerNode::managerTask()
177  {
178  try
179  {
180  ARMARX_CHECK_EXPRESSION(planningComputingPowerRequestStrategy);
181  //since post unmarshall of this is called BEFORE post unmarshall of the member cspace is called, this assert has to be done here!
182  const auto dim = static_cast<std::size_t>(cspace->getDimensionality());
183  ARMARX_CHECK_EXPRESSION(startNode.size() == dim);
184  ARMARX_CHECK_EXPRESSION(cspace->isValidConfiguration(std::make_pair(&startNode.front(), &startNode.back() + 1)));
185  ARMARX_CHECK_EXPRESSION(goalNode.size() == dim);
186  ARMARX_CHECK_EXPRESSION(cspace->isValidConfiguration(std::make_pair(&goalNode.front(), &goalNode.back() + 1)));
187 
188  //calculate rotation matrix
189  {
190  UniformProlateSpheroidDistribution<float> dist(startNode.begin(), startNode.end(), goalNode.begin());
191  rotationMatrix = dist.getRotationMatrix();
192  }
193 
194  //after starting, the required proxies are set.
195  getObjectScheduler()->waitForObjectStateMinimum(eManagedIceObjectStarted);
196 
197  ARMARX_IMPORTANT << "Planning Task starts planning";
198  //start initial worker
199  for (Ice::Long i = 0; i < initialWorkerCount; ++i)
200  {
201  createNewWorker();
202  }
203 
204  timepointStart = ClockType::now();
205  TaskStatus::Status status = TaskStatus::ePlanning;
206  planningComputingPowerRequestStrategy->updateTaskStatus(TaskStatus::ePlanning);
207 
208  std::mutex mutexWait;
209  std::unique_lock<std::mutex> waitingLock {mutexWait};
210 
211 
212  planningComputingPowerRequestStrategy->setCurrentStateAsInitialState();
213 
214  while (true)
215  {
216  managerEvent.wait_for(waitingLock, std::chrono::milliseconds {100});
217 
218  if (killRequest)
219  {
220  //update correct status
221  status = TaskStatus::transitionAtKill(status);
222  ARMARX_IMPORTANT << "Planning Task received killRequest";
223  break;
224  }
225 
226  if (hasTimeRunOut())
227  {
228  status = TaskStatus::transitionAtOutoftime(status);
229  ARMARX_IMPORTANT << "Planning Task encountered timeout (" << maximalPlanningTimeInSeconds << " seconds)";
230  break;
231  }
232 
233  if (isPlanningDone())
234  {
235  status = TaskStatus::transitionAtDone(status);
236  ARMARX_IMPORTANT << "Planning Task is finished";
237  break;
238  }
239 
240  {
241  std::lock(treeMutex, updateMutex);
242  std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
243  std::unique_lock<std::mutex> updateLock(updateMutex, std::adopt_lock);
244  //apply all updates.
245  //this touches the updates and tree => both mutexes are required.
246  //when fetching a remote update the tree is in a consistent state and the updates are not used. => both mutexes can be released
247  //since there is no lock guard for this only updateMutex is unlocked.
248 
249  applyUpdatesNotThreadSafe(updateLock);
250 
251  if ((status == TaskStatus::ePlanning) && tree.getPathCount())
252  {
253  status = TaskStatus::eRefining;
254  task->setTaskStatus(status);
255  ARMARX_IMPORTANT << "Planning Task start refining the found solution";
256  planningComputingPowerRequestStrategy->updateTaskStatus(TaskStatus::eRefining);
257  }
258 
259  //update planningComputingPowerRequestStrategy
260  planningComputingPowerRequestStrategy->updateNodeCount(tree.size());
261  }
262 
263  // pause worker processes if maximalWorkerCount was reduced
264  while (getActiveWorkerCount() > static_cast<std::size_t>(maximalWorkerCount) && getActiveWorkerCount() > 1)
265  {
266  workers.at(activeWorkerCount - 1)->pauseWorker(true);
267  --activeWorkerCount;
268  }
269  // unpause/start a new worker if maximalWorkerCount is greater than the number of active workers and new resources are required
270  if ((getActiveWorkerCount() < static_cast<std::size_t>(maximalWorkerCount)) && planningComputingPowerRequestStrategy->shouldAllocateComputingPower())
271  {
272  if (activeWorkerCount >= workers.size())
273  {
274  createNewWorker();
275  }
276  else
277  {
278  workers.at(activeWorkerCount - 1)->pauseWorker(false);
279  ++activeWorkerCount;
280  }
281  }
282  }
283 
284  ARMARX_VERBOSE_S << "finished main loop managerTask()";
285 
286  //clean up everything
287  cleanupAllWorkers();
288 
289  //write back data
290  ARMARX_VERBOSE_S << "storing paths in task";
291  task->setPaths(getAllPathsWithCost());
292 
293  //the status may have changed! (if aborted / failed it may now be RefinementAborted)
294  if ((status == TaskStatus::ePlanningAborted || status == TaskStatus::ePlanningFailed) && tree.getPathCount())
295  {
296  //first set the task status
297  task->setTaskStatus(TaskStatus::eRefining);
298  status = TaskStatus::eRefinementAborted;
299  }
300 
301  ARMARX_VERBOSE_S << "setting task status to " << status;
302  //update status
303  task->setTaskStatus(status);
304  ARMARX_VERBOSE_S << "exit managerTask";
305  }
306 
307 #define common_exception_output "EXCEPTION!\n"\
308  << "\n\ttask name: " << task->getTaskName()\
309  << "\n\tice id = " << task->ice_id()\
310  << "\n\told status " << TaskStatus::toString(task->getTaskStatus())
311  catch (Ice::Exception& e)
312  {
313  ARMARX_ERROR_S << common_exception_output
314  << "\n\tWHAT:\n" << e.what()
315  << "\n\tSTACK:\n" << e.ice_stackTrace();
316  task->setTaskStatus(TaskStatus::eException);
317  }
318  catch (std::exception& e)
319  {
320  ARMARX_ERROR_S << common_exception_output
321  << "\n\tWHAT:\n" << e.what();
322  task->setTaskStatus(TaskStatus::eException);
323  }
324  catch (...)
325  {
326  ARMARX_ERROR_S << common_exception_output
327  << "\n\tsomething not derived from std::exception was thrown";
328  task->setTaskStatus(TaskStatus::eException);
329  }
330 #undef common_exception_output
331  }
332 
333 
334 
335  void ManagerNode::createNewWorkerOn(std::size_t remoteObjectNodeIndex)
336  {
337  std::lock_guard<std::mutex> lock {workerMutex};
338  ARMARX_VERBOSE_S << "creating worker on " << remoteObjectNodeIndex;
339 
340  ARMARX_CHECK_EXPRESSION(remoteObjectNodeIndex < remoteObjectNodes.size());
341  RemoteObjectNodeInterfacePrx& ron = remoteObjectNodes.at(remoteObjectNodeIndex);
342  //prepare worker
343  const Ice::Long newWorkerId = getWorkerCount();
344 
346  ManagerNodeBasePrx selfProxy = ManagerNodeBasePrx::uncheckedCast(getProxy());
347  ARMARX_CHECK_EXPRESSION(selfProxy);
348 
349  WorkerNodePtr newWorker {new WorkerNode{
350  CSpaceBasePtr::dynamicCast(cspace->clone()),
351  startNode, goalNode, dcdStep, addParams,
352  selfProxy,
353  newWorkerId,
354  batchSize,
355  nodeCountDeltaForGoalConnectionTries,
358  }
359  };
360  std::stringstream newWorkerName;
361  newWorkerName << newWorker->getName() << ":" << newWorkerId << "@[" << getName() << "]";
362  //init local structures
363  appliedUpdates.emplace_back();
364  workers.emplace_back(ron->registerRemoteHandledObject(newWorkerName.str(), newWorker));
365  //increment count
366  ++activeWorkerCount;
367  ++workersPerRemoteObjectNode.at(remoteObjectNodeIndex);
368 
369  ARMARX_VERBOSE_S << "\n created worker"
370  << "\n\t RON index: " << remoteObjectNodeIndex
371  << "\n\t #worker: " << getWorkerCount();
372  planningComputingPowerRequestStrategy->allocatedComputingPower();
373  }
374 
375  void ManagerNode::createNewWorker()
376  {
377  if (getWorkerCount() >= static_cast<std::size_t>(maximalWorkerCount))
378  {
379  return;
380  }
381 
382  //select where to create a new worker
383  ARMARX_CHECK_EXPRESSION(remoteObjectNodes.size() == workersPerRemoteObjectNode.size());
384  ARMARX_CHECK_EXPRESSION(remoteObjectNodes.size() == maxWorkersPerRemoteObjectNode.size());
385  std::size_t ronIndex = 0;
386 
387  for (; (ronIndex < remoteObjectNodes.size()) &&
388  !(workersPerRemoteObjectNode.at(ronIndex) < maxWorkersPerRemoteObjectNode.at(ronIndex)); ++ronIndex);//no body
389 
390  ARMARX_CHECK_EXPRESSION(ronIndex < remoteObjectNodes.size());
391  createNewWorkerOn(ronIndex);
392  }
393 
394  //not threadsafe! use only when holding updateMutex
395  Update ManagerNode::getRemoteUpdate(std::size_t workerId, std::size_t updateId) const
396  {
397  std::lock_guard<std::mutex> lock {workerMutex};
398  ARMARX_WARNING_S << "manager requested remote update w/u " << workerId << "/" << updateId;
400  return workers.at(workerId)->getUpdate(updateId);
401  }
402  //not threadsafe! use only when holding updateMutex
403  bool ManagerNode::hasLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
404  {
406  return (updateId < appliedUpdates.at(workerId).size()) || (tree.hasPendingUpdate(workerId, updateId));
407  }
408  //not threadsafe! use only when holding updateMutex
409  const Update& ManagerNode::getLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
410  {
412 
413  if (updateId < appliedUpdates.at(workerId).size())
414  {
415  return appliedUpdates.at(workerId).at(updateId);
416  }
417 
418  //we have the update. so it has to be pending
419  return tree.getPendingUpdate(workerId, updateId);
420  }
421 
422  void ManagerNode::setWorkersFinalUpdateId(Ice::Long workerId, Ice::Long finalUpdateId, const Ice::Current&)
423  {
424  {
425  std::lock_guard<std::mutex> lock {updateMutex};
426  ARMARX_CHECK_EXPRESSION(workerId >= 0);
427  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) < workersFinalUpdateId.size());
428  ARMARX_CHECK_EXPRESSION(workersFinalUpdateId.at(workerId) == -1);
429  workersFinalUpdateId.at(workerId) = finalUpdateId;
430  }
431  ARMARX_VERBOSE_S << "final update id for worker " << workerId << " is " << finalUpdateId;
432  managerEvent.notify_all();
433  }
434 
435  void ManagerNode::updateTree(const Update& u, const Ice::Current&)
436  {
437  std::lock_guard<std::mutex> lock {updateMutex};
438  tree.addPendingUpdate(u);
439  managerEvent.notify_all();
440  }
441 
442  void ManagerNode::cleanupAllWorkers()
443  {
444  workersFinalUpdateId.resize(getWorkerCount(), -1);
445  //kill all
446  {
447  std::lock_guard<std::mutex> lock {workerMutex};
448 
449  for (auto& worker : workers)
450  {
451  worker->killWorker();
452  }
453  }
454 
455  {
456  std::lock(treeMutex, updateMutex);
457  std::unique_lock<std::mutex> treeLock(treeMutex, std::adopt_lock);
458  std::unique_lock<std::mutex> updateLock(updateMutex, std::adopt_lock);
459 
460  //sync for update counts
461  do
462  {
463  managerEvent.wait_for(updateLock, std::chrono::milliseconds {100});
464  }
465  while (std::any_of(
466  workersFinalUpdateId.begin(), workersFinalUpdateId.end(),
467  [](const Ice::Long & l)
468  {
469  return l == -1;
470  }
471  ));
472 
473  //get all updates
474  tree.prepareUpdate(workersFinalUpdateId, std::move(treeLock),
475  [this](std::size_t workerId, Ice::Long updateId)//remote update getter
476  {
478  return getRemoteUpdate(workerId, updateId);
479  },
480  [this](Update && u) //update consumer
481  {
482  cacheAppliedUpdateRequiresUpdateMutex(std::move(u));
483  }
484  );
485 
487 
488  for (std::size_t i = 0; i < workersFinalUpdateId.size(); ++i)
489  {
490  const auto workerAppliedUpdateCount = appliedUpdates.at(i).size();
491  const auto workerUpdateCount = static_cast<std::size_t>(workersFinalUpdateId.at(i) + 1);
492  const auto workersLastUpdateId = tree.getAppliedUpdateIds().at(i);
493  ARMARX_CHECK_EXPRESSION(workersLastUpdateId == static_cast<Ice::Long>(workerAppliedUpdateCount) - 1);
494  ARMARX_CHECK_EXPRESSION(workerUpdateCount == workerAppliedUpdateCount);
495  }
496  }
497 
498  //now all required data is stored here
499  //destroy worker
500  std::lock_guard<std::mutex> lock {workerMutex};
501  workers.clear();
502  }
503 
504  void ManagerNode::applyUpdatesNotThreadSafe(std::unique_lock<std::mutex>& updateLock)
505  {
506  ARMARX_CHECK_EXPRESSION(updateLock);
507  tree.applyPendingUpdates(updateLock,
508  [this](std::size_t workerId, Ice::Long updateId)//remote update getter
509  {
511  return getRemoteUpdate(workerId, updateId);
512  },
513  [this](Update && u) //update consumer
514  {
515  planningComputingPowerRequestStrategy->updateNodeCreations(u.nodes.size(), batchSize);
516 
517  cacheAppliedUpdateRequiresUpdateMutex(std::move(u));
518  }
519  );
520  ARMARX_CHECK_EXPRESSION(updateLock);
521  }
522 
523  void ManagerNode::cacheAppliedUpdateRequiresUpdateMutex(Update&& u)
524  {
525  //correct worker id
526  const auto workerId = getUpdatesWorkerId(u);
527  ARMARX_CHECK_EXPRESSION(workerId < appliedUpdates.size());
528  //assert the last update from the same worker is the worker's update prior to this update
529  ARMARX_CHECK_EXPRESSION(tree.hasAppliedUpdate(workerId, getUpdatesPreviousUpdateSubId(u)));
530  //the update was applied to the tree
531  ARMARX_CHECK_EXPRESSION(tree.hasAppliedUpdate(workerId, getUpdatesSubId(u)));
532 
533  // ARMARX_DEBUG_S << "caching update (workerId/updateID): " << workerId << " / " << getUpdatesSubId(u);
534  //add the update to the cache
535  appliedUpdates.at(workerId).emplace_back(std::move(u));
536  }
537 
538  Ice::Long ManagerNode::getNodeCount(const Ice::Current&) const
539  {
540  std::lock_guard<std::mutex> treeLock {treeMutex};
541  return static_cast<Ice::Long>(tree.size());
542  }
543 
544  bool ManagerNode::isPlanningDone()
545  {
546  std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
547  return tree.getBestCost() <= targetCost;
548  }
549 
550  bool ManagerNode::hasTimeRunOut()
551  {
552  std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
553  return ((ClockType::now() - timepointStart) >= std::chrono::seconds {maximalPlanningTimeInSeconds});
554  }
555 
556  void ManagerNode::setMaxCpus(Ice::Int maxCpus, const Ice::Current&)
557  {
558  maximalWorkerCount = maxCpus;
559  }
560 
561  Ice::Int ManagerNode::getMaxCpus(const Ice::Current&) const
562  {
563  return maximalWorkerCount;
564  }
565 }
566 
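Several member functions above (getTree(), the locked block inside managerTask(), and cleanupAllWorkers()) acquire treeMutex and updateMutex together by first calling std::lock on both and then adopting them into scope-bound guards with std::adopt_lock. The standalone sketch below shows only this locking idiom; the type DoubleGuarded, its members, and touchBoth are illustrative placeholders, not part of ManagerNode.

#include <mutex>

struct DoubleGuarded
{
    std::mutex treeMutex;   // protects one resource (cf. the tree)
    std::mutex updateMutex; // protects another (cf. the update cache)
    int sharedValue = 0;
};

void touchBoth(DoubleGuarded& g)
{
    // lock both mutexes deadlock-free, then hand ownership to guards
    // so they are released automatically at the end of the scope
    std::lock(g.treeMutex, g.updateMutex);
    std::lock_guard<std::mutex> treeLock(g.treeMutex, std::adopt_lock);
    std::lock_guard<std::mutex> updateLock(g.updateMutex, std::adopt_lock);
    ++g.sharedValue; // both locks are held here
}

In C++17 a single std::scoped_lock over both mutexes achieves the same effect; the explicit std::lock / std::adopt_lock form matches the style used throughout this file.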
armarx::addirrtstar::ManagerNode::treeMutex
std::mutex treeMutex
protects the tree data
Definition: ManagerNode.h:371
armarx::addirrtstar::ManagerNode::getBestPath
PathWithCost getBestPath(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:113
armarx::addirrtstar::ManagerNode::workersPerRemoteObjectNode
std::vector< std::size_t > workersPerRemoteObjectNode
How many workers are started on each remote object node.
Definition: ManagerNode.h:339
armarx::addirrtstar::ManagerNode::onInitComponent
void onInitComponent() override
Initializes the tree and sampler.
Definition: ManagerNode.cpp:38
common_exception_output
#define common_exception_output
armarx::addirrtstar::Tree::getPendingUpdate
const Update & getPendingUpdate(std::size_t workerId, std::size_t updateId) const
Definition: Tree.h:186
armarx::addirrtstar::ManagerNode::setMaxCpus
void setMaxCpus(Ice::Int maxCpus, const Ice::Current &=Ice::emptyCurrent) override
Definition: ManagerNode.cpp:556
armarx::addirrtstar::ManagerNode::getWorkerCount
std::size_t getWorkerCount() const
Returns the number of currently available workers (both active and paused).
Definition: ManagerNode.h:313
ARMARX_IMPORTANT
#define ARMARX_IMPORTANT
Definition: Logging.h:183
WorkerNode.h
armarx::addirrtstar::Tree::size
std::size_t size() const
Definition: Tree.h:623
armarx::addirrtstar::Tree::getBestPath
PathWithCost getBestPath() const
Definition: Tree.cpp:406
armarx::addirrtstar::ManagerNode::getTree
FullIceTree getTree(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:167
armarx::addirrtstar::Tree::hasAppliedUpdate
bool hasAppliedUpdate(std::size_t workerId, Ice::Long updateId)
Definition: Tree.h:166
util.h
armarx::addirrtstar::ManagerNode::getNodeCount
Ice::Long getNodeCount(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:538
armarx::addirrtstar::Tree::init
void init(const FullIceTree &iceTree, AdaptiveDynamicDomainParameters newaddParams, std::size_t newWorkerId)
Initializes the tree from the given tree.
Definition: Tree.cpp:65
armarx::addirrtstar::Tree::applyPendingUpdates
void applyPendingUpdates()
Applies all pending updates.
Definition: Tree.h:197
armarx::addirrtstar::ManagerNode::createNewWorkerOn
void createNewWorkerOn(std::size_t remoteObjectNodeIndex)
Creates a new worker on the given remote object node.
Definition: ManagerNode.cpp:335
armarx::addirrtstar::ManagerNode::getMaxCpus
Ice::Int getMaxCpus(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:561
armarx::TaskStatus::transitionAtKill
Status transitionAtKill(Status status)
Performs a transition from the given to the appropriate following status in case of a task kill.
Definition: PlanningUtil.cpp:122
armarx::addirrtstar::ManagerNode::updateTopicPrefix
std::string updateTopicPrefix
The update topic's prefix.
Definition: ManagerNode.h:357
armarx::TaskStatus::transitionAtOutoftime
Status transitionAtOutoftime(Status status)
Performs a transition from the given to the appropriate following status in case of the task failing.
Definition: PlanningUtil.cpp:150
armarx::addirrtstar::ManagerNode::hasLocalUpdateRequiresUpdateMutex
bool hasLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns whether the given update is cached.
Definition: ManagerNode.cpp:403
armarx::addirrtstar::ManagerNode::managerEvent
std::condition_variable managerEvent
CV used by the manager thread to wait for new updates.
Definition: ManagerNode.h:375
armarx::addirrtstar::ManagerNode::rotationMatrix
Ice::FloatSeq rotationMatrix
The rotation matrix used by the informed samplers.
Definition: ManagerNode.h:384
armarx::addirrtstar::ManagerNode::managerThread
std::thread managerThread
The thread executing managerTask.
Definition: ManagerNode.h:326
armarx::UniformProlateSpheroidDistribution::getRotationMatrix
std::vector< RealType > getRotationMatrix() const
Returns the rotation matrix stored in a vector.
Definition: Samplers.h:487
armarx::addirrtstar::ManagerNode::maxWorkersPerRemoteObjectNode
std::vector< std::size_t > maxWorkersPerRemoteObjectNode
How many workers are maximal allowed on each remote object node.
Definition: ManagerNode.h:343
armarx::addirrtstar::ManagerNode::getAllPathsWithCost
PathWithCostSeq getAllPathsWithCost(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:133
IceInternal::Handle
Definition: forward_declarations.h:8
armarx::addirrtstar::ManagerNode::hasTimeRunOut
bool hasTimeRunOut()
Definition: ManagerNode.cpp:550
armarx::addirrtstar::WorkerNode
Implements the worker side of the batch distributed adaptive dynamic domain informed rrt* planner.
Definition: WorkerNode.h:73
armarx::status
status
Definition: FiniteStateMachine.h:259
armarx::addirrtstar::ManagerNode::getRemoteUpdate
Update getRemoteUpdate(std::size_t workerId, std::size_t updateId) const
Returns the requested update fetched from the corresponding worker.
Definition: ManagerNode.cpp:395
armarx::addirrtstar::Tree::getPathCount
std::size_t getPathCount() const
Definition: Tree.h:708
ManagerNode.h
armarx::TaskStatus::transitionAtDone
Status transitionAtDone(Status status)
Performs a transition from the given to the appropiate following status in case the task is done.
Definition: PlanningUtil.cpp:178
armarx::addirrtstar::ManagerNode::updateTree
void updateTree(const Update &u, const Ice::Current &=Ice::emptyCurrent) override
Adds the given update to the queue of pending updates.
Definition: ManagerNode.cpp:435
armarx::addirrtstar::ManagerNode::setWorkersFinalUpdateId
void setWorkersFinalUpdateId(Ice::Long workerId, Ice::Long finalUpdateId, const Ice::Current &) override
Used by workers to inform the manager about their number of updates before exiting.
Definition: ManagerNode.cpp:422
armarx::addirrtstar::ManagerNode::getPathCount
Ice::Long getPathCount(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:120
ARMARX_ERROR_S
#define ARMARX_ERROR_S
Definition: Logging.h:209
armarx::addirrtstar::ManagerNode::killRequest
std::atomic_bool killRequest
Flag to signal the manager thread to exit.
Definition: ManagerNode.h:322
armarx::addirrtstar::Tree::getAppliedUpdateIds
const Ice::LongSeq & getAppliedUpdateIds() const
Definition: Tree.h:758
armarx::addirrtstar::ManagerNode::updateMutex
std::mutex updateMutex
Protects the update section of the tree and the update cache of the manager.
Definition: ManagerNode.h:353
armarx::addirrtstar::ManagerNode::onExitComponent
void onExitComponent() override
Stops planning and joins the manager thread.
Definition: ManagerNode.cpp:104
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:917
armarx::addirrtstar::Tree::hasPendingUpdate
bool hasPendingUpdate(std::size_t workerId, std::size_t updateId) const
Definition: Tree.h:176
armarx::addirrtstar::ManagerNode::managerTask
void managerTask()
The manager task. It checks whether new workers are required and starts them if this is the...
Definition: ManagerNode.cpp:176
armarx::addirrtstar::Tree::getIceTree
FullIceTree getIceTree() const
Definition: Tree.cpp:458
ArmarXObjectScheduler.h
armarx::addirrtstar::ManagerNode::workersFinalUpdateId
std::vector< Ice::Long > workersFinalUpdateId
Used when shutting down to ensure all updates were applied before destroying the nodes' remote objects.
Definition: ManagerNode.h:366
ARMARX_WARNING_S
#define ARMARX_WARNING_S
Definition: Logging.h:206
armarx::addirrtstar::ManagerNode::workers
std::vector< RemoteHandle< WorkerNodeBasePrx > > workers
Worker proxies.
Definition: ManagerNode.h:335
armarx::addirrtstar::ManagerNode::cleanupAllWorkers
void cleanupAllWorkers()
Shuts down and removes all workers.
Definition: ManagerNode.cpp:442
armarx::addirrtstar::ManagerNode::getActiveWorkerCount
std::size_t getActiveWorkerCount() const
getActiveWorkerCount returns the number of currently active workers.
Definition: ManagerNode.h:304
armarx::addirrtstar::ManagerNode::cacheAppliedUpdateRequiresUpdateMutex
void cacheAppliedUpdateRequiresUpdateMutex(Update &&u)
Stores the given applied update to the cache.
Definition: ManagerNode.cpp:523
armarx::addirrtstar::ManagerNode::appliedUpdates
std::vector< std::deque< Update > > appliedUpdates
All applied updates.
Definition: ManagerNode.h:361
armarx::ManagedIceObject::usingTopic
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Definition: ManagedIceObject.cpp:248
armarx::addirrtstar::ManagerNode::tree
Tree tree
The RRT.
Definition: ManagerNode.h:379
armarx::addirrtstar::ManagerNode::applyUpdatesNotThreadSafe
void applyUpdatesNotThreadSafe(std::unique_lock< std::mutex > &updateLock)
Applies all pending updates.
Definition: ManagerNode.cpp:504
armarx::addirrtstar::Tree::getNthPathWithCost
PathWithCost getNthPathWithCost(std::size_t n) const
Definition: Tree.cpp:413
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
Definition: ExpressionException.h:73
armarx::addirrtstar::ManagerNode::getNthPathWithCost
PathWithCost getNthPathWithCost(Ice::Long n, const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:125
armarx::addirrtstar::ManagerNode::getUpdate
Update getUpdate(Ice::Long workerId, Ice::Long updateId, const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:148
armarx::addirrtstar::Tree::addPendingUpdate
void addPendingUpdate(const Update &u)
Adds a new update to the list of pending updates.
Definition: Tree.cpp:483
armarx::transform
auto transform(const Container< InputT, Alloc > &in, OutputT(*func)(InputT const &)) -> Container< OutputT, typename std::allocator_traits< Alloc >::template rebind_alloc< OutputT > >
Convenience function (with less typing) to transform a container of type InputT into the same contain...
Definition: algorithm.h:315
armarx::addirrtstar::ManagerNode::createNewWorker
void createNewWorker()
Creates a new worker.
Definition: ManagerNode.cpp:375
armarx::addirrtstar::Tree::increaseWorkerCountTo
void increaseWorkerCountTo(std::size_t count)
Increases the storage of all data structures to be appropriate for count workers.
Definition: Tree.cpp:122
armarx::addirrtstar::ManagerNode::workerMutex
std::mutex workerMutex
used to lock access to the vector workers
Definition: ManagerNode.h:347
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:916
armarx::addirrtstar::ManagerNode::isPlanningDone
bool isPlanningDone()
Definition: ManagerNode.cpp:544
ARMARX_VERBOSE_S
#define ARMARX_VERBOSE_S
Definition: Logging.h:200
armarx::ManagedIceObject::getName
std::string getName() const
Retrieve name of object.
Definition: ManagedIceObject.cpp:107
armarx::addirrtstar::ManagerNode::getLocalUpdateRequiresUpdateMutex
const Update & getLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns the requested update from the cache.
Definition: ManagerNode.cpp:409
armarx::addirrtstar::Tree::getBestCost
float getBestCost() const
Definition: Tree.cpp:399
armarx::Logging::setTag
void setTag(const LogTag &tag)
Definition: Logging.cpp:55
min
T min(T t1, T t2)
Definition: gdiam.h:42
armarx::UniformProlateSpheroidDistribution< float >
armarx::addirrtstar::ManagerNode::onConnectComponent
void onConnectComponent() override
noop.
Definition: ManagerNode.cpp:96
armarx::addirrtstar::ManagerNode::timepointStart
ClockType::time_point timepointStart
Timepoint when the manager node started planning.
Definition: ManagerNode.h:390
armarx::ManagedIceObject::getProxy
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
Definition: ManagedIceObject.cpp:393
armarx::addirrtstar::ManagerNode::activeWorkerCount
std::atomic_size_t activeWorkerCount
currentlyActiveWorkers the index of the newest planning process in the workers vector (is <= workers....
Definition: ManagerNode.h:330
armarx::addirrtstar::Tree::prepareUpdate
void prepareUpdate(Ice::LongSeq dependetOnUpdateIds, LockType &&treeLock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Prepares an update by applying all dependent-on updates.
Definition: Tree.h:423
armarx::ManagedIceObject::getObjectScheduler
ArmarXObjectSchedulerPtr getObjectScheduler() const
Definition: ManagedIceObject.cpp:731
armarx::addirrtstar
Definition: ManagerNode.cpp:35
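For orientation, the query methods implemented in this file (getPathCount(), getNthPathWithCost(), getBestPath(), getAllPathsWithCost()) are what a planning client calls through the ManagerNodeBase Ice interface. A minimal client-side sketch, assuming a ManagerNodeBasePrx proxy named manager has already been obtained and the generated Slice headers are included, might look as follows; fetchAllPaths is a hypothetical helper, not part of ArmarX.

// hypothetical helper: collect all solution paths found so far, one remote call per path
PathWithCostSeq fetchAllPaths(const ManagerNodeBasePrx& manager)
{
    PathWithCostSeq paths;
    const Ice::Long pathCount = manager->getPathCount();
    for (Ice::Long i = 0; i < pathCount; ++i)
    {
        paths.push_back(manager->getNthPathWithCost(i));
    }
    return paths;
}

A single call to getAllPathsWithCost() returns the same data in one round trip, which is what the manager itself uses when writing results back to the task.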