// Query how many cores each remote object node offers
std::transform(remoteObjectNodes.begin(),
               remoteObjectNodes.end(),
               /* output iterator elided in the listing */,
               [](const RemoteObjectNodeInterfacePrx& prx)
               {
                   const auto count = static_cast<std::size_t>(prx->getNumberOfCores());
                   ARMARX_VERBOSE_S << "ROI " << prx.get() << " has " << count << " cores";
                   // ...
               });
// Cap the worker counts by the accumulated core count of the remote object nodes
const Ice::Long maxROCount = std::accumulate(/* ... */);
maximalWorkerCount = std::min(maximalWorkerCount, maxROCount);
initialWorkerCount = std::min(initialWorkerCount, maxROCount);
// Initialize the tree's root data (it contains an infinite-cost entry) and reserve per-worker bookkeeping
tree.init(FullIceTree{FullNodeDataListList{FullNodeDataList{FullNodeData{
    /* ... */ std::numeric_limits<Ice::Float>::infinity(), /* ... */}}}/* ... */});
tree.increaseWorkerCountTo(maximalWorkerCount);
ARMARX_VERBOSE_S << "armarx::addirrtstar::ManagerNode::onConnectComponent()";
// getBestPath(): return the best path found so far and append the goal node
std::lock_guard<std::mutex> lock{treeMutex};
auto&& path = tree.getBestPath();
path.nodes.emplace_back(goalNode);
// getPathCount()
std::lock_guard<std::mutex> lock{treeMutex};
return tree.getPathCount();
// getNthPathWithCost(n): return the n-th path and append the goal node
std::lock_guard<std::mutex> lock{treeMutex};
auto&& path = tree.getNthPathWithCost(static_cast<std::size_t>(n));
path.nodes.emplace_back(goalNode);
// getAllPathsWithCost(): collect all found paths; the goal node is appended to each
std::lock_guard<std::mutex> lock{treeMutex};
const auto pathCount = tree.getPathCount();
PathWithCostSeq paths{pathCount};
for (std::size_t i = 0; i < pathCount; ++i)
{
    paths.at(i) = tree.getNthPathWithCost(i);
    paths.at(i).nodes.emplace_back(goalNode);
}
// getUpdate(workerId, updateId)
ARMARX_WARNING_S << "worker requested remote update w/u " << workerId << "/" << updateId;
const auto uWorkerId = static_cast<std::size_t>(workerId);
const auto uUpdateId = static_cast<std::size_t>(updateId);
// getTree(): both mutexes are already held at this point and are only adopted here
std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
std::lock_guard<std::mutex> updateLock(updateMutex, std::adopt_lock);
return tree.getIceTree();
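The std::adopt_lock guards above (and in managerTask below) only work because both mutexes are already locked when the guards are constructed; the usual idiom, presumably what the elided lines do, locks them together with std::lock first. A minimal, self-contained sketch of that idiom, independent of the ArmarX classes (all names illustrative):

#include <iostream>
#include <mutex>

std::mutex treeMutex;
std::mutex updateMutex;
int sharedValue = 0;

int readBoth()
{
    std::lock(treeMutex, updateMutex);                                  // lock both mutexes without risking deadlock
    std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);   // adopt the locks already held
    std::lock_guard<std::mutex> updateLock(updateMutex, std::adopt_lock);
    return sharedValue;
}

int main() { std::cout << readBoth() << "\n"; }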
// Fragments: check the dimensionality and hand the start/goal nodes on as iterator ranges
// (the enclosing calls are not part of the listing)
const auto dim = static_cast<std::size_t>(cspace->getDimensionality());
// ... std::make_pair(&startNode.front(), &startNode.back() + 1) ...
// ... std::make_pair(&goalNode.front(), &goalNode.back() + 1) ...
// ... startNode.begin(), startNode.end(), goalNode.begin() ...
// managerTask(): start the initial workers, then run the manager loop with a 100 ms wake-up interval
for (Ice::Long i = 0; i < initialWorkerCount; ++i)
{
    // ... (body not in the listing; presumably createNewWorker())
}
TaskStatus::Status status = TaskStatus::ePlanning;
planningComputingPowerRequestStrategy->updateTaskStatus(TaskStatus::ePlanning);
std::mutex mutexWait;
std::unique_lock<std::mutex> waitingLock{mutexWait};
planningComputingPowerRequestStrategy->setCurrentStateAsInitialState();
// main loop (loop condition not in the listing):
managerEvent.wait_for(waitingLock, std::chrono::milliseconds{100});
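The manager thread blocks on managerEvent with a local mutex so it wakes at least every 100 ms, or earlier when new updates are signalled. A self-contained sketch of that wait pattern (names and exit condition are illustrative only, not the ArmarX code):

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

int main()
{
    std::condition_variable event;
    std::atomic_bool stop{false};

    std::thread loop([&]
    {
        std::mutex waitMutex;                        // only used for the wait itself
        std::unique_lock<std::mutex> lock{waitMutex};
        while (!stop)
        {
            // wake at least every 100 ms, or earlier when event is notified
            event.wait_for(lock, std::chrono::milliseconds{100});
            std::cout << "checking for new updates\n";
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds{350});
    stop = true;
    event.notify_all();                              // wake the loop so it can exit promptly
    loop.join();
}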
// ... << maximalPlanningTimeInSeconds << " seconds)";   (tail of a log message about the time limit)
// Per loop iteration: lock tree and update data (locks taken beforehand, adopted here)
std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
std::unique_lock<std::mutex> updateLock(updateMutex, std::adopt_lock);
// ...
// Once the first path is found, planning turns into refinement
if ((status == TaskStatus::ePlanning) && tree.getPathCount())
{
    status = TaskStatus::eRefining;
    task->setTaskStatus(status);
    planningComputingPowerRequestStrategy->updateTaskStatus(TaskStatus::eRefining);
}
// ...
planningComputingPowerRequestStrategy->updateNodeCount(tree.size());
// ...
if (/* ... */ planningComputingPowerRequestStrategy->shouldAllocateComputingPower())
{
    // ... (start an additional worker)
}
// ...
// If planning was aborted or failed, the task status is stepped through eRefining
// before it finally ends up as eRefinementAborted
if ((status == TaskStatus::ePlanningAborted || status == TaskStatus::ePlanningFailed) && /* ... */)
{
    task->setTaskStatus(TaskStatus::eRefining);
    status = TaskStatus::eRefinementAborted;
}
// ...
task->setTaskStatus(status);
// Error handling for the manager task: a local macro adds the task's name, Ice id
// and previous status to every error message (the ARMARX_ERROR_S calls around the
// surviving lines are reconstructed)
#define common_exception_output \
    /* ... */ \
    << "\n\ttask name: " << task->getTaskName() << "\n\tice id = " << task->ice_id() \
    << "\n\told status " << TaskStatus::toString(task->getTaskStatus())
catch (Ice::Exception& e)
{
    ARMARX_ERROR_S << common_exception_output
                   << e.what() << "\n\tSTACK:\n"
                   << e.ice_stackTrace();
    task->setTaskStatus(TaskStatus::eException);
}
catch (std::exception& e)
{
    ARMARX_ERROR_S << common_exception_output << e.what();
    task->setTaskStatus(TaskStatus::eException);
}
catch (...)
{
    ARMARX_ERROR_S << common_exception_output
                   << "\n\tsomething not derived from std::exception was thrown";
    task->setTaskStatus(TaskStatus::eException);
}
#undef common_exception_output
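The same define/stream/undef pattern, reduced to a self-contained example with std::cerr standing in for the ARMARX_ERROR_S logging macro (task name and messages are illustrative):

#include <iostream>
#include <stdexcept>
#include <string>

int main()
{
    const std::string taskName = "example-task";
// Shared error context, defined only for the catch blocks below
#define common_exception_output "\n\ttask name: " << taskName
    try
    {
        throw std::runtime_error("boom");
    }
    catch (std::exception& e)
    {
        std::cerr << "exception:" << common_exception_output << "\n\twhat: " << e.what() << "\n";
    }
    catch (...)
    {
        std::cerr << "exception:" << common_exception_output
                  << "\n\tsomething not derived from std::exception was thrown\n";
    }
#undef common_exception_output
    return 0;
}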
// createNewWorkerOn(remoteObjectNodeIndex): create a worker and register it on the chosen remote object node
RemoteObjectNodeInterfacePrx& ron = remoteObjectNodes.at(remoteObjectNodeIndex);
ManagerNodeBasePrx selfProxy = ManagerNodeBasePrx::uncheckedCast(getProxy());
// ... (construction of newWorker; its parameters include nodeCountDeltaForGoalConnectionTries)
std::stringstream newWorkerName;
newWorkerName << newWorker->getName() << ":" << newWorkerId << "@[" << getName() << "]";
workers.emplace_back(ron->registerRemoteHandledObject(newWorkerName.str(), newWorker));
// ... log, e.g. << "\n\t RON index: " << remoteObjectNodeIndex ...
planningComputingPowerRequestStrategy->allocatedComputingPower();
// createNewWorker(): do nothing if the worker limit is already reached, otherwise pick a remote object node
if (getWorkerCount() >= static_cast<std::size_t>(maximalWorkerCount))
{
    // ... (not in the listing)
}
std::size_t ronIndex = 0;
// getRemoteUpdate(): fetch the update directly from the worker that produced it
ARMARX_WARNING_S << "manager requested remote update w/u " << workerId << "/" << updateId;
return workers.at(workerId)->getUpdate(updateId);
// ... (tree.hasPendingUpdate(workerId, updateId));   e.g. inside ARMARX_CHECK_EXPRESSION
return tree.getPendingUpdate(workerId, updateId);
// setWorkersFinalUpdateId(workerId, finalUpdateId): a worker announces its last update id before it exits
ARMARX_VERBOSE_S << "final update id for worker " << workerId << " is " << finalUpdateId;
// ...
// updateTree(u): queue an incoming update as pending
tree.addPendingUpdate(u);
worker->killWorker();   // cleanupAllWorkers(): tell each worker to shut down
// Shutdown: wait until every worker has reported its final update id and all of
// its updates have been fetched and applied
std::unique_lock<std::mutex> treeLock(treeMutex, std::adopt_lock);
std::unique_lock<std::mutex> updateLock(updateMutex, std::adopt_lock);
// ...
managerEvent.wait_for(updateLock, std::chrono::milliseconds{100});
// ... check whether all final ids are known, e.g. with a predicate such as
//     [](const Ice::Long& l) { return l == -1; }
// ... apply pending updates, fetching missing ones remotely via
//     [this](std::size_t workerId, Ice::Long updateId) { ... }
// per worker i:
const auto workerAppliedUpdateCount = appliedUpdates.at(i).size();
const auto workerUpdateCount = /* ... */;
const auto workersLastUpdateId = tree.getAppliedUpdateIds().at(i);
// ... compared against static_cast<Ice::Long>(workerAppliedUpdateCount) - 1
// applyUpdatesNotThreadSafe(): apply every pending update; missing ones are fetched via the callback
tree.applyPendingUpdates(
    /* ... */
    [this](std::size_t workerId, Ice::Long updateId)
    { /* ... */ });
// per applied update u:
planningComputingPowerRequestStrategy->updateNodeCreations(u.nodes.size(), /* ... */);
const auto workerId = getUpdatesWorkerId(u);
// getNodeCount()
std::lock_guard<std::mutex> treeLock{treeMutex};
return static_cast<Ice::Long>(tree.size());
// Check whether the target path cost has already been reached
std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
return tree.getBestCost() <= targetCost;
// Check whether the maximal planning time has elapsed
std::lock_guard<std::mutex> treeLock(treeMutex, std::adopt_lock);
// ... std::chrono::seconds{maximalPlanningTimeInSeconds});
// setMaxCpus() / getMaxCpus() map directly to maximalWorkerCount
maximalWorkerCount = maxCpus;
// ...
return maximalWorkerCount;
#define common_exception_output
void setTag(const LogTag &tag)
ArmarXObjectSchedulerPtr getObjectScheduler() const
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
std::string getName() const
Retrieve name of object.
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
std::vector< std::size_t > maxWorkersPerRemoteObjectNode
The maximal number of workers allowed on each remote object node.
void updateTree(const Update &u, const Ice::Current &=Ice::emptyCurrent) override
Adds the given update to the queue of pending updates.
void onInitComponent() override
Initializes the tree and sampler.
std::size_t getActiveWorkerCount() const
getActiveWorkerCount returns the number of currently active workers.
Update getRemoteUpdate(std::size_t workerId, std::size_t updateId) const
Returns the requested update fetched from the corresponding worker.
std::vector< RemoteHandle< WorkerNodeBasePrx > > workers
Worker proxies.
PathWithCostSeq getAllPathsWithCost(const Ice::Current &=Ice::emptyCurrent) const override
Ice::Long getPathCount(const Ice::Current &=Ice::emptyCurrent) const override
bool hasLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns whether the given update is cached.
std::vector< std::size_t > workersPerRemoteObjectNode
How many workers are started on each remote object node.
std::mutex workerMutex
Used to lock access to the workers vector.
void createNewWorkerOn(std::size_t remoteObjectNodeIndex)
Creates a new worker on the given remote object node.
std::atomic_bool killRequest
Flag to signal the manager thread to exit.
std::vector< Ice::Long > workersFinalUpdateId
Used when shutting down to ensure all updates were applied before destroying the node's remote object.
std::string updateTopicPrefix
The update topic's prefix.
std::vector< std::deque< Update > > appliedUpdates
All applied updates.
Update getUpdate(Ice::Long workerId, Ice::Long updateId, const Ice::Current &=Ice::emptyCurrent) const override
std::mutex updateMutex
Protects the update section of the tree and the update cache of the manager.
std::thread managerThread
The thread executing managerTask.
PathWithCost getNthPathWithCost(Ice::Long n, const Ice::Current &=Ice::emptyCurrent) const override
void cleanupAllWorkers()
Shuts down and removes all workers.
Ice::FloatSeq rotationMatrix
The rotation matrix used by the informed samplers.
PathWithCost getBestPath(const Ice::Current &=Ice::emptyCurrent) const override
void cacheAppliedUpdateRequiresUpdateMutex(Update &&u)
Stores the given applied update to the cache.
void onConnectComponent() override
No-op.
void setMaxCpus(Ice::Int maxCpus, const Ice::Current &=Ice::emptyCurrent) override
void setWorkersFinalUpdateId(Ice::Long workerId, Ice::Long finalUpdateId, const Ice::Current &) override
Used by workers to inform the manager about their number of updates before exiting.
FullIceTree getTree(const Ice::Current &=Ice::emptyCurrent) const override
std::atomic_size_t activeWorkerCount
The index of the newest planning process in the workers vector (is <= workers…).
const Update & getLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns the requested update from the cache.
ClockType::time_point timepointStart
Timepoint when the manager node started planning.
void managerTask()
The manager task. It checks whether new workers are required and starts them if this is the…
std::size_t getWorkerCount() const
Returns the number of currently available workers (both active and paused).
void createNewWorker()
Creates a new worker.
void onExitComponent() override
Stops planning and joins the manager thread.
Ice::Long getNodeCount(const Ice::Current &=Ice::emptyCurrent) const override
std::condition_variable managerEvent
CV used by the manager thread to wait for new updates.
Ice::Int getMaxCpus(const Ice::Current &=Ice::emptyCurrent) const override
std::mutex treeMutex
Protects the tree data.
void applyUpdatesNotThreadSafe(std::unique_lock< std::mutex > &updateLock)
Applies all pending updates.
Implements the worker side of the batch distributed adaptive dynamic domain informed rrt* planner.
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_ERROR_S
The logging level for unexpected behaviour, that must be fixed.
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
#define ARMARX_WARNING_S
The logging level for unexpected behaviour, but not a serious problem.
IceInternal::Handle< WorkerNode > WorkerNodePtr
An ice handle for a WorkerNode of the addirrt* algorithm.