ManagerNode.cpp
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package RobotComponents
19  * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20  * @date 2015
21  * @copyright http://www.gnu.org/licenses/gpl.txt
22  * GNU General Public License
23  */
24 
25 #include "ManagerNode.h"
26 
27 #include <algorithm>
28 #include <numeric>
29 
30 #include <ArmarXCore/core/ArmarXObjectScheduler.h>
31 
32 #include "../../util/Samplers.h"
33 #include "WorkerNode.h"
34 #include "util.h"
35 
36 namespace armarx::addirrtstar
37 {
38  //from ManagedIceObject
39  void
40  ManagerNode::onInitComponent()
41  {
42  setTag("ManagerNode");
43  ARMARX_VERBOSE_S << "start init";
44  //set other variables
45  killRequest = false;
46 
48  workersPerRemoteObjectNode.resize(remoteObjectNodes.size(), 0);
49 
51  maxWorkersPerRemoteObjectNode.reserve(remoteObjectNodes.size());
52  std::transform(remoteObjectNodes.begin(),
53  remoteObjectNodes.end(),
54  std::back_inserter(maxWorkersPerRemoteObjectNode),
55  [](const RemoteObjectNodeInterfacePrx& prx)
56  {
57  const auto count = static_cast<std::size_t>(prx->getNumberOfCores());
58  ARMARX_VERBOSE_S << "ROI " << prx.get() << " has " << count << " cores";
59  return count;
60  });
 61  //there can't be more workers than cores on the remote object nodes
 62  const Ice::Long maxROCount = std::accumulate(
 63  maxWorkersPerRemoteObjectNode.begin(), maxWorkersPerRemoteObjectNode.end(), Ice::Long{0});
64  maximalWorkerCount = std::min(maximalWorkerCount, maxROCount);
65  initialWorkerCount = std::min(initialWorkerCount, maxROCount);
67 
68  //init tree with start node
69  tree.init(FullIceTree{FullNodeDataListList{FullNodeDataList{FullNodeData{
70  startNode, //config
71  NodeId{0, 0}, //parent
72  std::numeric_limits<Ice::Float>::infinity(), //radius;
73  0.f //fromParentCost
74  }}},
75  Ice::LongSeq{-1}},
76  addParams,
 77  maximalWorkerCount //since this tree performs no updates, the id is not important
78  );
79  tree.increaseWorkerCountTo(maximalWorkerCount);
80 
81  //init prefix
82  updateTopicPrefix = getName() + ':';
83 
84  //register for updates
85  usingTopic(updateTopicPrefix + ADDIRRTSTAR_TREE_UPDATE_TOPIC_NAME);
86 
87  ARMARX_VERBOSE_S << "done init";
88  }
89 
90  void
 91  ManagerNode::onConnectComponent()
 92  {
93  ARMARX_VERBOSE_S << "armarx::addirrtstar::ManagerNode::onConnectComponent()";
94  //start manager thread
95  managerThread = std::thread{[this] { this->managerTask(); }};
96  }
97 
98  void
 99  ManagerNode::onExitComponent()
 100  {
101  ARMARX_VERBOSE_S << "start onExitComponent()";
102  killRequest = true;
103  managerThread.join();
104  ARMARX_VERBOSE_S << "done onExitComponent()";
105  }
106 
107  //from ManagerNodeBase
108  PathWithCost
109  ManagerNode::getBestPath(const Ice::Current&) const
110  {
111  std::lock_guard<std::mutex> lock{treeMutex};
112  auto&& path = tree.getBestPath();
113  path.nodes.emplace_back(goalNode);
114  return path;
115  }
116 
117  Ice::Long
118  ManagerNode::getPathCount(const Ice::Current&) const
119  {
120  std::lock_guard<std::mutex> lock{treeMutex};
121  return tree.getPathCount();
122  }
123 
124  PathWithCost
125  ManagerNode::getNthPathWithCost(Ice::Long n, const Ice::Current&) const
126  {
127  std::lock_guard<std::mutex> lock{treeMutex};
128  auto&& path = tree.getNthPathWithCost(static_cast<std::size_t>(n));
129  path.nodes.emplace_back(goalNode);
130  return path;
131  }
132 
133  PathWithCostSeq
134  ManagerNode::getAllPathsWithCost(const Ice::Current&) const
135  {
136  std::lock_guard<std::mutex> lock{treeMutex};
137  const auto&& pathCount = tree.getPathCount();
138  PathWithCostSeq paths{pathCount};
139 
140  for (std::size_t i = 0; i < pathCount; ++i)
141  {
142  paths.at(i) = tree.getNthPathWithCost(i);
143  paths.at(i).nodes.emplace_back(goalNode);
144  }
145 
146  return paths;
147  }
148 
149  Update
150  ManagerNode::getUpdate(Ice::Long workerId, Ice::Long updateId, const Ice::Current&) const
151  {
152  ARMARX_WARNING_S << "worker requested remote update w/u " << workerId << "/" << updateId;
153  std::lock_guard<std::mutex> lock{updateMutex};
154  ARMARX_CHECK_EXPRESSION(workerId >= 0);
155  ARMARX_CHECK_EXPRESSION(updateId >= 0);
156 
157  const auto uWorkerId = static_cast<std::size_t>(workerId);
158  const auto uUpdateId = static_cast<std::size_t>(updateId);
160 
161  if (hasLocalUpdateRequiresUpdateMutex(uWorkerId, uUpdateId))
162  {
163  return getLocalUpdateRequiresUpdateMutex(uWorkerId, uUpdateId);
164  }
165 
166  return getRemoteUpdate(uWorkerId, uUpdateId);
167  }
168 
169  FullIceTree
170  ManagerNode::getTree(const Ice::Current&) const
171  {
 172  //since we are copying the tree and all update ids, we need both mutexes
173  std::lock(treeMutex, updateMutex);
174  std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
175  std::lock_guard<std::mutex> updateLock(updateMutex, std::adopt_lock);
176  return tree.getIceTree();
177  }
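 //Note: whenever both treeMutex and updateMutex are needed, they are acquired
 //together via std::lock(treeMutex, updateMutex) followed by std::adopt_lock
 //guards, so the two mutexes are always locked in a deadlock-free order.
 //managerTask() and cleanupAllWorkers() below use the same idiom.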
178 
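 //Overview of managerTask() below: it creates the initial workers, then loops,
 //waking up on managerEvent (or every 100 ms) to apply pending tree updates,
 //switch the task status from ePlanning to eRefining once a first path exists,
 //and grow or shrink the set of active workers between 1 and maximalWorkerCount
 //as requested by planningComputingPowerRequestStrategy. The loop ends on a
 //kill request, on timeout, or when planning is done.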
179  void
 180  ManagerNode::managerTask()
 181  {
182  try
183  {
184  ARMARX_CHECK_EXPRESSION(planningComputingPowerRequestStrategy);
 185  //since post-unmarshalling of this object is called BEFORE post-unmarshalling of the member cspace, these asserts have to be done here!
186  const auto dim = static_cast<std::size_t>(cspace->getDimensionality());
187  ARMARX_CHECK_EXPRESSION(startNode.size() == dim);
188  ARMARX_CHECK_EXPRESSION(cspace->isValidConfiguration(
189  std::make_pair(&startNode.front(), &startNode.back() + 1)));
190  ARMARX_CHECK_EXPRESSION(goalNode.size() == dim);
191  ARMARX_CHECK_EXPRESSION(cspace->isValidConfiguration(
192  std::make_pair(&goalNode.front(), &goalNode.back() + 1)));
193 
194  //calculate rotation matrix
 195  {
 196  const UniformProlateSpheroidDistribution<float> informedDistribution(
 197  startNode.begin(), startNode.end(), goalNode.begin());
 198  rotationMatrix = informedDistribution.getRotationMatrix();
 199  }
200 
 201  //after starting, the required proxies are set.
202  getObjectScheduler()->waitForObjectStateMinimum(eManagedIceObjectStarted);
203 
204  ARMARX_IMPORTANT << "Planning Task starts planning";
205  //start initial worker
206  for (Ice::Long i = 0; i < initialWorkerCount; ++i)
207  {
208  createNewWorker();
209  }
210 
211  timepointStart = ClockType::now();
212  TaskStatus::Status status = TaskStatus::ePlanning;
213  planningComputingPowerRequestStrategy->updateTaskStatus(TaskStatus::ePlanning);
214 
215  std::mutex mutexWait;
216  std::unique_lock<std::mutex> waitingLock{mutexWait};
217 
218 
219  planningComputingPowerRequestStrategy->setCurrentStateAsInitialState();
220 
221  while (true)
222  {
223  managerEvent.wait_for(waitingLock, std::chrono::milliseconds{100});
224 
225  if (killRequest)
226  {
227  //update correct status
 228  status = TaskStatus::transitionAtKill(status);
 229  ARMARX_IMPORTANT << "Planning Task received killRequest";
230  break;
231  }
232 
233  if (hasTimeRunOut())
234  {
 235  status = TaskStatus::transitionAtOutoftime(status);
 236  ARMARX_IMPORTANT << "Planning Task encountered timeout ("
237  << maximalPlanningTimeInSeconds << " seconds)";
238  break;
239  }
240 
241  if (isPlanningDone())
242  {
 243  status = TaskStatus::transitionAtDone(status);
 244  ARMARX_IMPORTANT << "Planning Task is finished";
245  break;
246  }
247 
248  {
249  std::lock(treeMutex, updateMutex);
250  std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
251  std::unique_lock<std::mutex> updateLock(updateMutex, std::adopt_lock);
252  //apply all updates.
253  //this touches the updates and tree => both mutexes are required.
254  //when fetching a remote update the tree is in a consistent state and the updates are not used. => both mutexes can be released
255  //since there is no lock guard for this only updateMutex is unlocked.
256 
257  applyUpdatesNotThreadSafe(updateLock);
258 
259  if ((status == TaskStatus::ePlanning) && tree.getPathCount())
260  {
261  status = TaskStatus::eRefining;
262  task->setTaskStatus(status);
263  ARMARX_IMPORTANT << "Planning Task start refining the found solution";
264  planningComputingPowerRequestStrategy->updateTaskStatus(
265  TaskStatus::eRefining);
266  }
267 
268  //update planningComputingPowerRequestStrategy
269  planningComputingPowerRequestStrategy->updateNodeCount(tree.size());
270  }
271 
272  // pause worker processes if maximalWorkerCount was reduced
273  while (getActiveWorkerCount() > static_cast<std::size_t>(maximalWorkerCount) &&
274  getActiveWorkerCount() > 1)
275  {
276  workers.at(activeWorkerCount - 1)->pauseWorker(true);
 277  --activeWorkerCount;
 278  }
 279  // unpause/start a new worker if maximalWorkerCount is greater than the number of active workers and new resources are required
280  if ((getActiveWorkerCount() < static_cast<std::size_t>(maximalWorkerCount)) &&
281  planningComputingPowerRequestStrategy->shouldAllocateComputingPower())
282  {
283  if (activeWorkerCount >= workers.size())
284  {
285  createNewWorker();
286  }
287  else
288  {
289  workers.at(activeWorkerCount - 1)->pauseWorker(false);
291  }
292  }
293  }
294 
295  ARMARX_VERBOSE_S << "finished main loop managerTask()";
296 
 297  //clean up everything
 298  cleanupAllWorkers();
 299 
300  //write back data
301  ARMARX_VERBOSE_S << "storing paths in task";
302  task->setPaths(getAllPathsWithCost());
303 
 304  //the status may have changed! (if aborted / failed it may now be RefinementAborted)
305  if ((status == TaskStatus::ePlanningAborted || status == TaskStatus::ePlanningFailed) &&
306  tree.getPathCount())
307  {
308  //first set the task status
309  task->setTaskStatus(TaskStatus::eRefining);
310  status = TaskStatus::eRefinementAborted;
311  }
312 
313  ARMARX_VERBOSE_S << "setting task status to " << status;
314  //update status
315  task->setTaskStatus(status);
316  ARMARX_VERBOSE_S << "exit managerTask";
317  }
318 
319 #define common_exception_output \
320  "EXCEPTION!\n" \
321  << "\n\ttask name: " << task->getTaskName() << "\n\tice id = " << task->ice_id() \
322  << "\n\told status " << TaskStatus::toString(task->getTaskStatus())
323  catch (Ice::Exception& e)
324  {
325  ARMARX_ERROR_S << common_exception_output << "\n\tWHAT:\n"
326  << e.what() << "\n\tSTACK:\n"
327  << e.ice_stackTrace();
328  task->setTaskStatus(TaskStatus::eException);
329  }
330  catch (std::exception& e)
331  {
332  ARMARX_ERROR_S << common_exception_output << "\n\tWHAT:\n" << e.what();
333  task->setTaskStatus(TaskStatus::eException);
334  }
335  catch (...)
336  {
 337  ARMARX_ERROR_S << common_exception_output
 338  << "\n\tsomething not derived from std::exception was thrown";
339  task->setTaskStatus(TaskStatus::eException);
340  }
341 #undef common_exception_output
342  }
343 
344  void
345  ManagerNode::createNewWorkerOn(std::size_t remoteObjectNodeIndex)
346  {
347  std::lock_guard<std::mutex> lock{workerMutex};
348  ARMARX_VERBOSE_S << "creating worker on " << remoteObjectNodeIndex;
349 
350  ARMARX_CHECK_EXPRESSION(remoteObjectNodeIndex < remoteObjectNodes.size());
351  RemoteObjectNodeInterfacePrx& ron = remoteObjectNodes.at(remoteObjectNodeIndex);
352  //prepare worker
353  const Ice::Long newWorkerId = getWorkerCount();
354 
356  ManagerNodeBasePrx selfProxy = ManagerNodeBasePrx::uncheckedCast(getProxy());
357  ARMARX_CHECK_EXPRESSION(selfProxy);
358 
359  WorkerNodePtr newWorker{new WorkerNode{CSpaceBasePtr::dynamicCast(cspace->clone()),
360  startNode,
361  goalNode,
362  dcdStep,
363  addParams,
364  selfProxy,
365  newWorkerId,
366  batchSize,
367  nodeCountDeltaForGoalConnectionTries,
369  rotationMatrix}};
370  std::stringstream newWorkerName;
371  newWorkerName << newWorker->getName() << ":" << newWorkerId << "@[" << getName() << "]";
372  //init local structures
373  appliedUpdates.emplace_back();
374  workers.emplace_back(ron->registerRemoteHandledObject(newWorkerName.str(), newWorker));
375  //increment count
 376  ++activeWorkerCount;
 377  ++workersPerRemoteObjectNode.at(remoteObjectNodeIndex);
378 
379  ARMARX_VERBOSE_S << "\n created worker"
380  << "\n\t RON index: " << remoteObjectNodeIndex
381  << "\n\t #worker: " << getWorkerCount();
382  planningComputingPowerRequestStrategy->allocatedComputingPower();
383  }
384 
385  void
 386  ManagerNode::createNewWorker()
 387  {
388  if (getWorkerCount() >= static_cast<std::size_t>(maximalWorkerCount))
389  {
390  return;
391  }
392 
393  //select where to create a new worker
394  ARMARX_CHECK_EXPRESSION(remoteObjectNodes.size() == workersPerRemoteObjectNode.size());
395  ARMARX_CHECK_EXPRESSION(remoteObjectNodes.size() == maxWorkersPerRemoteObjectNode.size());
396  std::size_t ronIndex = 0;
397 
398  for (;
399  (ronIndex < remoteObjectNodes.size()) && !(workersPerRemoteObjectNode.at(ronIndex) <
400  maxWorkersPerRemoteObjectNode.at(ronIndex));
401  ++ronIndex)
402  ; //no body
403 
404  ARMARX_CHECK_EXPRESSION(ronIndex < remoteObjectNodes.size());
405  createNewWorkerOn(ronIndex);
406  }
407 
408  //not threadsafe! use only when holding updateMutex
409  Update
410  ManagerNode::getRemoteUpdate(std::size_t workerId, std::size_t updateId) const
411  {
412  std::lock_guard<std::mutex> lock{workerMutex};
413  ARMARX_WARNING_S << "manager requested remote update w/u " << workerId << "/" << updateId;
415  return workers.at(workerId)->getUpdate(updateId);
416  }
417 
418  //not threadsafe! use only when holding updateMutex
419  bool
420  ManagerNode::hasLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
421  {
423  return (updateId < appliedUpdates.at(workerId).size()) ||
424  (tree.hasPendingUpdate(workerId, updateId));
425  }
426 
427  //not threadsafe! use only when holding updateMutex
428  const Update&
429  ManagerNode::getLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
430  {
432 
433  if (updateId < appliedUpdates.at(workerId).size())
434  {
435  return appliedUpdates.at(workerId).at(updateId);
436  }
437 
438  //we have the update. so it has to be pending
439  return tree.getPendingUpdate(workerId, updateId);
440  }
441 
442  void
 443  ManagerNode::setWorkersFinalUpdateId(Ice::Long workerId,
 444  Ice::Long finalUpdateId,
445  const Ice::Current&)
446  {
447  {
448  std::lock_guard<std::mutex> lock{updateMutex};
449  ARMARX_CHECK_EXPRESSION(workerId >= 0);
450  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) <
451  workersFinalUpdateId.size());
452  ARMARX_CHECK_EXPRESSION(workersFinalUpdateId.at(workerId) == -1);
453  workersFinalUpdateId.at(workerId) = finalUpdateId;
454  }
455  ARMARX_VERBOSE_S << "final update id for worker " << workerId << " is " << finalUpdateId;
456  managerEvent.notify_all();
457  }
458 
459  void
460  ManagerNode::updateTree(const Update& u, const Ice::Current&)
461  {
462  std::lock_guard<std::mutex> lock{updateMutex};
 463  tree.addPendingUpdate(u);
 464  managerEvent.notify_all();
465  }
466 
467  void
 468  ManagerNode::cleanupAllWorkers()
 469  {
470  workersFinalUpdateId.resize(getWorkerCount(), -1);
471  //kill all
472  {
473  std::lock_guard<std::mutex> lock{workerMutex};
474 
475  for (auto& worker : workers)
476  {
477  worker->killWorker();
478  }
479  }
480 
481  {
482  std::lock(treeMutex, updateMutex);
483  std::unique_lock<std::mutex> treeLock(treeMutex, std::adopt_lock);
484  std::unique_lock<std::mutex> updateLock(updateMutex, std::adopt_lock);
485 
486  //sync for update counts
487  do
488  {
489  managerEvent.wait_for(updateLock, std::chrono::milliseconds{100});
490  } while (std::any_of(workersFinalUpdateId.begin(),
491  workersFinalUpdateId.end(),
492  [](const Ice::Long& l) { return l == -1; }));
493 
494  //get all updates
 495  tree.prepareUpdate(
 496  workersFinalUpdateId,
 497  updateLock,
498  [this](std::size_t workerId, Ice::Long updateId) //remote update getter
499  {
501  return getRemoteUpdate(workerId, updateId);
502  },
503  [this](Update&& u) //update consumer
504  { cacheAppliedUpdateRequiresUpdateMutex(std::move(u)); });
505 
507 
508  for (std::size_t i = 0; i < workersFinalUpdateId.size(); ++i)
509  {
510  const auto workerAppliedUpdateCount = appliedUpdates.at(i).size();
511  const auto workerUpdateCount =
512  static_cast<std::size_t>(workersFinalUpdateId.at(i) + 1);
513  const auto workersLastUpdateId = tree.getAppliedUpdateIds().at(i);
514  ARMARX_CHECK_EXPRESSION(workersLastUpdateId ==
515  static_cast<Ice::Long>(workerAppliedUpdateCount) - 1);
516  ARMARX_CHECK_EXPRESSION(workerUpdateCount == workerAppliedUpdateCount);
517  }
518  }
519 
520  //now all required data is stored here
521  //destroy worker
522  std::lock_guard<std::mutex> lock{workerMutex};
523  workers.clear();
524  }
525 
526  void
527  ManagerNode::applyUpdatesNotThreadSafe(std::unique_lock<std::mutex>& updateLock)
528  {
529  ARMARX_CHECK_EXPRESSION(updateLock);
531  updateLock,
532  [this](std::size_t workerId, Ice::Long updateId) //remote update getter
533  {
535  return getRemoteUpdate(workerId, updateId);
536  },
537  [this](Update&& u) //update consumer
538  {
539  planningComputingPowerRequestStrategy->updateNodeCreations(u.nodes.size(),
540  batchSize);
541 
 542  cacheAppliedUpdateRequiresUpdateMutex(std::move(u));
 543  });
544  ARMARX_CHECK_EXPRESSION(updateLock);
545  }
546 
547  void
 548  ManagerNode::cacheAppliedUpdateRequiresUpdateMutex(Update&& u)
 549  {
550  //correct worker id
551  const auto workerId = getUpdatesWorkerId(u);
552  ARMARX_CHECK_EXPRESSION(workerId < appliedUpdates.size());
553  //assert the last update from the same worker is the worker's update prior to this update
554  ARMARX_CHECK_EXPRESSION(tree.hasAppliedUpdate(workerId, getUpdatesPreviousUpdateSubId(u)));
555  //the update was applied to the tree
556  ARMARX_CHECK_EXPRESSION(tree.hasAppliedUpdate(workerId, getUpdatesSubId(u)));
557 
558  // ARMARX_DEBUG_S << "caching update (workerId/updateID): " << workerId << " / " << getUpdatesSubId(u);
559  //add the update to the cache
560  appliedUpdates.at(workerId).emplace_back(std::move(u));
561  }
562 
563  Ice::Long
564  ManagerNode::getNodeCount(const Ice::Current&) const
565  {
566  std::lock_guard<std::mutex> treeLock{treeMutex};
567  return static_cast<Ice::Long>(tree.size());
568  }
569 
570  bool
 571  ManagerNode::isPlanningDone()
 572  {
 573  std::lock_guard<std::mutex> treeLock{treeMutex};
574  return tree.getBestCost() <= targetCost;
575  }
576 
577  bool
 578  ManagerNode::hasTimeRunOut()
 579  {
 580  std::lock_guard<std::mutex> treeLock{treeMutex};
581  return ((ClockType::now() - timepointStart) >=
582  std::chrono::seconds{maximalPlanningTimeInSeconds});
583  }
584 
585  void
586  ManagerNode::setMaxCpus(Ice::Int maxCpus, const Ice::Current&)
587  {
588  maximalWorkerCount = maxCpus;
589  }
590 
591  Ice::Int
592  ManagerNode::getMaxCpus(const Ice::Current&) const
593  {
594  return maximalWorkerCount;
595  }
596 } // namespace armarx::addirrtstar
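
The interface implemented above can be queried from any Ice client that holds a proxy to the manager. The fragment below is a minimal sketch of such a query; the proxy name "SomeManagerNode" and the variables communicator and manager are illustrative assumptions, only getPathCount() and getBestPath() come from the interface shown in this file.

    // Hypothetical client-side query (sketch only; proxy name and setup are assumptions).
    ManagerNodeBasePrx manager = ManagerNodeBasePrx::checkedCast(
        communicator->stringToProxy("SomeManagerNode"));
    if (manager && manager->getPathCount() > 0)
    {
        // The returned path already has the goal configuration appended (see getBestPath() above).
        const PathWithCost best = manager->getBestPath();
        ARMARX_VERBOSE_S << "best path has " << best.nodes.size() << " nodes";
    }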
armarx::addirrtstar::ManagerNode::treeMutex
std::mutex treeMutex
protects the tree data
Definition: ManagerNode.h:386
armarx::addirrtstar::ManagerNode::getBestPath
PathWithCost getBestPath(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:109
armarx::addirrtstar::ManagerNode::workersPerRemoteObjectNode
std::vector< std::size_t > workersPerRemoteObjectNode
How many workers are started on each remote object node.
Definition: ManagerNode.h:354
armarx::addirrtstar::ManagerNode::onInitComponent
void onInitComponent() override
Initializes the tree and sampler.
Definition: ManagerNode.cpp:40
common_exception_output
#define common_exception_output
armarx::addirrtstar::Tree::getPendingUpdate
const Update & getPendingUpdate(std::size_t workerId, std::size_t updateId) const
Definition: Tree.h:194
armarx::addirrtstar::ManagerNode::setMaxCpus
void setMaxCpus(Ice::Int maxCpus, const Ice::Current &=Ice::emptyCurrent) override
Definition: ManagerNode.cpp:586
armarx::addirrtstar::ManagerNode::getWorkerCount
std::size_t getWorkerCount() const
Returns the number of currently available workers (both active and paused).
Definition: ManagerNode.h:328
ARMARX_IMPORTANT
#define ARMARX_IMPORTANT
Definition: Logging.h:190
WorkerNode.h
armarx::addirrtstar::Tree::size
std::size_t size() const
Definition: Tree.h:694
armarx::addirrtstar::Tree::getBestPath
PathWithCost getBestPath() const
Definition: Tree.cpp:416
armarx::addirrtstar::ManagerNode::getTree
FullIceTree getTree(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:170
armarx::addirrtstar::Tree::hasAppliedUpdate
bool hasAppliedUpdate(std::size_t workerId, Ice::Long updateId)
Definition: Tree.h:171
util.h
armarx::addirrtstar::ManagerNode::getNodeCount
Ice::Long getNodeCount(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:564
armarx::addirrtstar::Tree::init
void init(const FullIceTree &iceTree, AdaptiveDynamicDomainParameters newaddParams, std::size_t newWorkerId)
Initializes the tree from the given tree.
Definition: Tree.cpp:54
armarx::addirrtstar::Tree::applyPendingUpdates
void applyPendingUpdates()
Applies all pending updates.
Definition: Tree.h:206
armarx::addirrtstar::ManagerNode::createNewWorkerOn
void createNewWorkerOn(std::size_t remoteObjectNodeIndex)
Creates a new worker on the given remote object node.
Definition: ManagerNode.cpp:345
armarx::addirrtstar::ManagerNode::getMaxCpus
Ice::Int getMaxCpus(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:592
armarx::TaskStatus::transitionAtKill
Status transitionAtKill(Status status)
Performs a transition from the given to the appropriate following status in case of a task kill.
Definition: PlanningUtil.cpp:126
armarx::addirrtstar::ManagerNode::updateTopicPrefix
std::string updateTopicPrefix
The update topic's prefix.
Definition: ManagerNode.h:372
armarx::TaskStatus::transitionAtOutoftime
Status transitionAtOutoftime(Status status)
Performs a transition from the given to the appropriate following status in case the task runs out of time.
Definition: PlanningUtil.cpp:156
armarx::addirrtstar::ManagerNode::hasLocalUpdateRequiresUpdateMutex
bool hasLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns whether the given update is cached.
Definition: ManagerNode.cpp:420
armarx::addirrtstar::ManagerNode::managerEvent
std::condition_variable managerEvent
CV used by the manager thread to wait for new updates.
Definition: ManagerNode.h:390
armarx::addirrtstar::ManagerNode::rotationMatrix
Ice::FloatSeq rotationMatrix
The rotation matrix used by the informed samplers.
Definition: ManagerNode.h:399
armarx::addirrtstar::ManagerNode::managerThread
std::thread managerThread
The thread executing managerTask.
Definition: ManagerNode.h:341
armarx::UniformProlateSpheroidDistribution::getRotationMatrix
std::vector< RealType > getRotationMatrix() const
Returns the rotation matrix stored in a vector.
Definition: Samplers.h:506
armarx::addirrtstar::ManagerNode::maxWorkersPerRemoteObjectNode
std::vector< std::size_t > maxWorkersPerRemoteObjectNode
How many workers are maximal allowed on each remote object node.
Definition: ManagerNode.h:358
armarx::addirrtstar::ManagerNode::getAllPathsWithCost
PathWithCostSeq getAllPathsWithCost(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:134
IceInternal::Handle
Definition: forward_declarations.h:8
armarx::addirrtstar::ManagerNode::hasTimeRunOut
bool hasTimeRunOut()
Definition: ManagerNode.cpp:578
armarx::addirrtstar::WorkerNode
Implements the worker side of the batch distributed adaptive dynamic domain informed rrt* planner.
Definition: WorkerNode.h:75
armarx::status
status
Definition: FiniteStateMachine.h:244
armarx::addirrtstar::ManagerNode::getRemoteUpdate
Update getRemoteUpdate(std::size_t workerId, std::size_t updateId) const
Returns the requested update fetched from the corresponding worker.
Definition: ManagerNode.cpp:410
armarx::addirrtstar::Tree::getPathCount
std::size_t getPathCount() const
Definition: Tree.h:789
ManagerNode.h
armarx::TaskStatus::transitionAtDone
Status transitionAtDone(Status status)
Performs a transition from the given to the appropriate following status in case the task is done.
Definition: PlanningUtil.cpp:186
armarx::addirrtstar::ManagerNode::updateTree
void updateTree(const Update &u, const Ice::Current &=Ice::emptyCurrent) override
Adds the given update to the queue of pending updates.
Definition: ManagerNode.cpp:460
armarx::addirrtstar::ManagerNode::setWorkersFinalUpdateId
void setWorkersFinalUpdateId(Ice::Long workerId, Ice::Long finalUpdateId, const Ice::Current &) override
Used by workers to inform the manager about their number of updates before exiting.
Definition: ManagerNode.cpp:443
armarx::addirrtstar::ManagerNode::getPathCount
Ice::Long getPathCount(const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:118
ARMARX_ERROR_S
#define ARMARX_ERROR_S
Definition: Logging.h:216
armarx::addirrtstar::ManagerNode::killRequest
std::atomic_bool killRequest
Flag to signal the manager thread to exit.
Definition: ManagerNode.h:337
armarx::addirrtstar::Tree::getAppliedUpdateIds
const Ice::LongSeq & getAppliedUpdateIds() const
Definition: Tree.h:845
armarx::addirrtstar::ManagerNode::updateMutex
std::mutex updateMutex
Protects the update section of the tree and the update cache of the manager.
Definition: ManagerNode.h:368
armarx::addirrtstar::ManagerNode::onExitComponent
void onExitComponent() override
Stops planning and joins the manager thread.
Definition: ManagerNode.cpp:99
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
armarx::addirrtstar::Tree::hasPendingUpdate
bool hasPendingUpdate(std::size_t workerId, std::size_t updateId) const
Definition: Tree.h:183
armarx::addirrtstar::ManagerNode::managerTask
void managerTask()
The manager task. It checks whether new workers are required and starts them if this is the...
Definition: ManagerNode.cpp:180
armarx::addirrtstar::Tree::getIceTree
FullIceTree getIceTree() const
Definition: Tree.cpp:475
ArmarXObjectScheduler.h
armarx::addirrtstar::ManagerNode::workersFinalUpdateId
std::vector< Ice::Long > workersFinalUpdateId
Used when shutting down to ensure all updates were applied before destroying the nodes' remote objects.
Definition: ManagerNode.h:381
ARMARX_WARNING_S
#define ARMARX_WARNING_S
Definition: Logging.h:213
armarx::addirrtstar::ManagerNode::workers
std::vector< RemoteHandle< WorkerNodeBasePrx > > workers
Worker proxies.
Definition: ManagerNode.h:350
armarx::addirrtstar::ManagerNode::cleanupAllWorkers
void cleanupAllWorkers()
Shuts down and removes all workers.
Definition: ManagerNode.cpp:468
armarx::addirrtstar::ManagerNode::getActiveWorkerCount
std::size_t getActiveWorkerCount() const
getActiveWorkerCount returns the number of currently active workers.
Definition: ManagerNode.h:318
armarx::transform
auto transform(const Container< InputT, Alloc > &in, OutputT(*func)(InputT const &)) -> Container< OutputT, typename std::allocator_traits< Alloc >::template rebind_alloc< OutputT >>
Convenience function (with less typing) to transform a container of type InputT into the same contain...
Definition: algorithm.h:351
armarx::addirrtstar::ManagerNode::cacheAppliedUpdateRequiresUpdateMutex
void cacheAppliedUpdateRequiresUpdateMutex(Update &&u)
Stores the given applied update to the cache.
Definition: ManagerNode.cpp:548
armarx::addirrtstar::ManagerNode::appliedUpdates
std::vector< std::deque< Update > > appliedUpdates
All applied updates.
Definition: ManagerNode.h:376
armarx::ManagedIceObject::usingTopic
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Definition: ManagedIceObject.cpp:254
armarx::addirrtstar::ManagerNode::tree
Tree tree
The RRT.
Definition: ManagerNode.h:394
armarx::addirrtstar::ManagerNode::applyUpdatesNotThreadSafe
void applyUpdatesNotThreadSafe(std::unique_lock< std::mutex > &updateLock)
Applies all pending updates.
Definition: ManagerNode.cpp:527
armarx::addirrtstar::Tree::getNthPathWithCost
PathWithCost getNthPathWithCost(std::size_t n) const
Definition: Tree.cpp:426
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
Definition: ExpressionException.h:73
armarx::addirrtstar::ManagerNode::getNthPathWithCost
PathWithCost getNthPathWithCost(Ice::Long n, const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:125
armarx::addirrtstar::ManagerNode::getUpdate
Update getUpdate(Ice::Long workerId, Ice::Long updateId, const Ice::Current &=Ice::emptyCurrent) const override
Definition: ManagerNode.cpp:150
armarx::addirrtstar::Tree::addPendingUpdate
void addPendingUpdate(const Update &u)
Adds a new update to the list of pending updates.
Definition: Tree.cpp:502
armarx::addirrtstar::ManagerNode::createNewWorker
void createNewWorker()
Creates a new worker.
Definition: ManagerNode.cpp:386
armarx::addirrtstar::Tree::increaseWorkerCountTo
void increaseWorkerCountTo(std::size_t count)
Increases the storage of all data structures to be appropriate for count workers.
Definition: Tree.cpp:108
armarx::addirrtstar::ManagerNode::workerMutex
std::mutex workerMutex
used to lock access to the vector workers
Definition: ManagerNode.h:362
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:917
armarx::addirrtstar::ManagerNode::isPlanningDone
bool isPlanningDone()
Definition: ManagerNode.cpp:571
ARMARX_VERBOSE_S
#define ARMARX_VERBOSE_S
Definition: Logging.h:207
armarx::ManagedIceObject::getName
std::string getName() const
Retrieve name of object.
Definition: ManagedIceObject.cpp:108
armarx::addirrtstar::ManagerNode::getLocalUpdateRequiresUpdateMutex
const Update & getLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns the requested update from the cache.
Definition: ManagerNode.cpp:429
armarx::addirrtstar::Tree::getBestCost
float getBestCost() const
Definition: Tree.cpp:406
armarx::Logging::setTag
void setTag(const LogTag &tag)
Definition: Logging.cpp:54
min
T min(T t1, T t2)
Definition: gdiam.h:44
armarx::UniformProlateSpheroidDistribution< float >
armarx::addirrtstar::ManagerNode::onConnectComponent
void onConnectComponent() override
Starts the manager thread.
Definition: ManagerNode.cpp:91
armarx::addirrtstar::ManagerNode::timepointStart
ClockType::time_point timepointStart
Timepoint when the manager node started planning.
Definition: ManagerNode.h:405
armarx::ManagedIceObject::getProxy
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
Definition: ManagedIceObject.cpp:407
armarx::addirrtstar::ManagerNode::activeWorkerCount
std::atomic_size_t activeWorkerCount
currentlyActiveWorkers the index of the newest planning process in the workers vector (is <= workers....
Definition: ManagerNode.h:345
armarx::addirrtstar::Tree::prepareUpdate
void prepareUpdate(Ice::LongSeq dependetOnUpdateIds, LockType &&treeLock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Prepares an update by applying all depended-on updates.
Definition: Tree.h:461
armarx::ManagedIceObject::getObjectScheduler
ArmarXObjectSchedulerPtr getObjectScheduler() const
Definition: ManagedIceObject.cpp:776
armarx::addirrtstar
Definition: ManagerNode.cpp:36