ManagerNode.h
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package RobotComponents
19 * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20 * @date 2015
21 * @copyright http://www.gnu.org/licenses/gpl.txt
22 * GNU General Public License
23 */
24#pragma once
25
26#include <atomic>
27#include <condition_variable>
28#include <mutex>
29#include <thread>
30
33
34#include <RobotComponents/interface/components/MotionPlanning/Tasks/AdaptiveDynamicDomainInformedRRTStar/ManagerNode.h>
35#include <RobotComponents/interface/components/MotionPlanning/Tasks/AdaptiveDynamicDomainInformedRRTStar/WorkerNode.h>
36
38#include "Tree.h"
39
40namespace armarx
41{
42 template <class IceBaseClass, class DerivedClass>
43 class GenericFactory;
44}
45
46namespace armarx::addirrtstar
47{
48 class ManagerNode;
49 /**
50 * @brief An ice handle for a ManagerNode.
51 */
53
54 /**
55 * @brief Manages the planning of the addirrt* algorithm.
56 * Starts workers and checks whether the planning should end.
57 */
58 class ManagerNode : virtual public ManagerNodeBase, virtual public ManagedIceObject
59 {
60 /**
61 * @brief The clock used to meassure durations.
62 */
63 using ClockType = std::chrono::system_clock;
64
65 public:
66 /**
67 * @brief Ctor.
68 * @param task The manager node's task. The manager will store its result in it.
69 * @param remoteObjectNodes RemoteObjectNodes used for parallelization.
70 * @param initialWorkerCount The initial worker count.
71 * @param maximalWorkerCount The maximal worker count.
72 * @param planningComputingPowerRequestStrategy The used cprs.
73 * @param dcdStep The dcd step size.
74 * @param maximalPlanningTimeInSeconds The maximal planning time in seconds. Planning will end after this time.
75 * @param batchSize The size of a batch.
76 * @param nodeCountDeltaForGoalConnectionTries deprecated
77 * @param cspace The used planning cspace.
78 * @param startCfg The start configuration.
79 * @param goalCfg The goal configuration.
80 * @param addParams Parameters for adaptive dynamic domain.
81 * @param targetCost The target cost. Planning will end after shorter path was found..
82 */
84 TaskBasePrx task,
85 RemoteObjectNodePrxList remoteObjectNodes,
86 //management
87 Ice::Long initialWorkerCount,
88 Ice::Long maximalWorkerCount,
89 const cprs::ComputingPowerRequestStrategyBasePtr& planningComputingPowerRequestStrategy,
90 //general
91 float dcdStep,
92 Ice::Long maximalPlanningTimeInSeconds,
93 Ice::Long batchSize,
94 Ice::Long nodeCountDeltaForGoalConnectionTries,
95 //problem
96 CSpaceBasePtr cspace,
97 VectorXf startCfg,
98 VectorXf goalCfg,
99 AdaptiveDynamicDomainParameters addParams,
100 float targetCost) :
101 ManagerNodeBase(task,
102 remoteObjectNodes,
103 initialWorkerCount,
104 maximalWorkerCount,
105 planningComputingPowerRequestStrategy,
106 dcdStep,
107 maximalPlanningTimeInSeconds,
108 cspace,
109 startCfg,
110 goalCfg,
111 addParams,
112 targetCost,
113 batchSize,
114 nodeCountDeltaForGoalConnectionTries)
115 {
116 //other variables get initialized in onInitComponent
117 }
118
119 /**
120 * @brief dtor.
121 * asserts the manager thread was joined.
122 */
123 ~ManagerNode() override
124 {
125#pragma GCC diagnostic push
126#pragma GCC diagnostic ignored "-Wterminate"
128#pragma GCC diagnostic pop
129 }
130
131 //from managedIceObject
132 /**
133 * @brief Initializes the tree and sampler.
134 * Starts the manager thread.
135 */
136 void onInitComponent() override;
137 /**
138 * @brief noop. (debug output)
139 */
140 void onConnectComponent() override;
141
142 /**
143 * @brief noop. (debug output)
144 */
145 void
147 {
148 ARMARX_VERBOSE_S << "armarx::addirrtstar::ManagerNode::onDisconnectComponent()";
149 }
150
151 /**
152 * @brief Stopps planning and joins the manager thread.
153 */
154 void onExitComponent() override;
155
156 /**
157 * @return The components default name.
158 */
159 std::string
160 getDefaultName() const override
161 {
162 return "ADDIRRTStarManagerNode";
163 }
164
165 //from ManagerNodeBase
166 /**
167 * @brief Sets the flag to stop planning
168 */
169 void
170 kill(const Ice::Current& = Ice::emptyCurrent) override
171 {
172 killRequest = true;
173 }
174
175 /**
176 * @return The shortest found path. (with its cost)
177 */
178 PathWithCost getBestPath(const Ice::Current& = Ice::emptyCurrent) const override;
179 /**
180 * @return The number of found paths.
181 */
182 Ice::Long getPathCount(const Ice::Current& = Ice::emptyCurrent) const override;
183 /**
184 * @param index The index.
185 * @return The path at the given index.
186 */
187 PathWithCost getNthPathWithCost(Ice::Long n,
188 const Ice::Current& = Ice::emptyCurrent) const override;
189 /**
190 * @return All found paths.
191 */
192 PathWithCostSeq getAllPathsWithCost(const Ice::Current& = Ice::emptyCurrent) const override;
193
194 /**
195 * @param workerId The updates worker id.
196 * @param updateId The updates sub id.
197 * @return The requested update. If the update is not cached it will be fetched from the corresponding worker.
198 */
199 Update getUpdate(Ice::Long workerId,
200 Ice::Long updateId,
201 const Ice::Current& = Ice::emptyCurrent) const override;
202 /**
203 * @return The current tree with all updates applied.
204 */
205 FullIceTree getTree(const Ice::Current& = Ice::emptyCurrent) const override;
206
207 /**
208 * @brief Used by workers to inform the manager about their number of updates before exiting.
209 * Used by the manager to fetch all remaining updates before exiting.
210 * @param workerId The worker.
211 * @param finalUpdateId Its final update id.
212 */
213 void setWorkersFinalUpdateId(Ice::Long workerId,
214 Ice::Long finalUpdateId,
215 const Ice::Current&) override;
216
217 //from TreeUpdateInterface
218 /**
219 * @brief Adds the given update to the queue of pending updates.
220 * @param u The update.
221 */
222 void updateTree(const Update& u, const Ice::Current& = Ice::emptyCurrent) override;
223
224 /**
225 * @brief Sents the manager's collected test data to the task.
226 */
228 /**
229 * @return The RRT's node count.
230 */
231 Ice::Long getNodeCount(const Ice::Current& = Ice::emptyCurrent) const override;
232
233 // ResourceManagementInterface interface
234 void setMaxCpus(Ice::Int maxCpus, const Ice::Current& = Ice::emptyCurrent) override;
235
236 Ice::Int getMaxCpus(const Ice::Current& = Ice::emptyCurrent) const override;
237
238 protected:
239 friend class GenericFactory<ManagerNodeBase, ManagerNode>;
240 /**
241 * @brief Ctor used by ice factories.
242 */
243 ManagerNode() = default;
244
245 /**
246 * @brief Creates a new worker on the given remote object node.
247 * @param remoteObjectNodeIndex The remote object node's index.
248 */
249 void createNewWorkerOn(std::size_t remoteObjectNodeIndex);
250
251 /**
252 * @brief The managet task.checkedCastIt checks whether new workers are required and starts them if this is the case.
253 * Ith checks whether planing has finished.
254 */
255 void managerTask();
256
257 //not threadsafe! use only when holding updateMutex
258 /**
259 * @brief Returns whether the given update is cached. (requires the caller to hold the updateMutex)
260 * @param workerId The updates worker id.
261 * @param updateId The updates sub id.
262 * @return Whether the given update is cached.
263 */
264 bool hasLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const;
265 //not threadsafe! use only when holding updateMutex
266 /**
267 * @brief Returns the requested update from the cache. (requires the caller to hold the updateMutex)
268 * @param workerId The updates worker id.
269 * @param updateId The updates sub id.
270 * @return The requested update from the cache.
271 */
272 const Update& getLocalUpdateRequiresUpdateMutex(std::size_t workerId,
273 std::size_t updateId) const;
274 //not threadsafe! use only when holding updateMutex
275 /**
276 * @brief Returns the requested update fetched from the corresponding worker. (requires the caller to hold the updateMutex)
277 * @param workerId The updates worker id.
278 * @param updateId The updates sub id.
279 * @return The requested update fetched from the corresponding worker.
280 */
281 Update getRemoteUpdate(std::size_t workerId, std::size_t updateId) const;
282
283 /**
284 * @brief Applies all pending updates. (requires the user to hold update and tree mutex)
285 * @param updateLock Lock for the update mutex. it will be unlocked when getting a remote update.
286 */
287 void applyUpdatesNotThreadSafe(std::unique_lock<std::mutex>& updateLock);
288 /**
289 * @brief Creates a new worker. (the remote object node is selected automatically)
290 */
291 void createNewWorker();
292 /**
293 * @brief Shuts down and removes all workers. (Their newest data is fetched before destruction.)
294 */
295 void cleanupAllWorkers();
296
297 /**
298 * @brief Stores the given applied update to the cache. (requires the caller to hold the updateMutex)
299 * @param u The update.
300 */
302
303 /**
304 * @return Whether planning is done.
305 */
306 bool isPlanningDone();
307
308 /**
309 * @return Whether planning has failed.
310 */
311 bool hasTimeRunOut();
312
313 /**
314 * @brief getActiveWorkerCount returns the number of currently active workers.
315 * @return
316 */
317 std::size_t
319 {
320 return activeWorkerCount;
321 }
322
323 /**
324 * @brief Returns the number of currently available workers (both active and paused).
325 * @return
326 */
327 std::size_t
329 {
330 return workers.size();
331 }
332
333 //management
334 /**
335 * @brief Flag to signal the manager thread to exit.
336 */
337 std::atomic_bool killRequest;
338 /**
339 * @brief The tread executing \ref managerTask.
340 */
341 std::thread managerThread;
342 /**
343 * @brief currentlyActiveWorkers the index of the newest planning process in the workers vector (is <= workers.size()).
344 */
345 std::atomic_size_t activeWorkerCount;
346 /**
347 * @brief Worker proxies.
348 * worker[i][j] is the j-th worker on the remote object node remoteObjectNodes[i]
349 */
350 std::vector<RemoteHandle<WorkerNodeBasePrx>> workers;
351 /**
352 * @brief How many workers are started on each remote object node.
353 */
354 std::vector<std::size_t> workersPerRemoteObjectNode;
355 /**
356 * @brief How many workers are maximal allowed on each remote object node.
357 */
358 std::vector<std::size_t> maxWorkersPerRemoteObjectNode;
359 /**
360 * @brief used to lock access to the vector workers
361 */
362 mutable std::mutex workerMutex;
363
364 //updates
365 /**
366 * @brief Protects the update section of the tree and the update cache of the manager
367 */
368 mutable std::mutex updateMutex;
369 /**
370 * @brief The update topic's prefix. (to ensure unique names.)
371 */
372 std::string updateTopicPrefix;
373 /**
374 * @brief All applied updates. (per worker)
375 */
376 std::vector<std::deque<Update>> appliedUpdates;
377
378 /**
379 * @brief Used when shutting down to ensure all updates were applied before destroying the nodes remote object.
380 */
381 std::vector<Ice::Long> workersFinalUpdateId;
382 //rrt
383 /**
384 * @brief protects the tree data
385 */
386 mutable std::mutex treeMutex;
387 /**
388 * @brief CV used by the manager thread to wait for new updates.
389 */
390 std::condition_variable managerEvent;
391 /**
392 * @brief The RRT
393 */
395
396 /**
397 * @brief The rotation matrix used by the informed samplers.
398 */
399 Ice::FloatSeq rotationMatrix;
400
401 /**
402 * @brief Timepoint when the manager node started planning.
403 * Used to check whether the maximal planning time was exceeded.
404 */
405 ClockType::time_point timepointStart;
406
407 /**
408 *@brief required for static factory methode create
409 */
410 friend class ManagedIceObject;
411 };
412} // namespace armarx::addirrtstar
Manages the planning of the addirrt* algorithm.
Definition ManagerNode.h:59
std::vector< std::size_t > maxWorkersPerRemoteObjectNode
How many workers are maximal allowed on each remote object node.
void updateTree(const Update &u, const Ice::Current &=Ice::emptyCurrent) override
Adds the given update to the queue of pending updates.
void onInitComponent() override
Initializes the tree and sampler.
std::size_t getActiveWorkerCount() const
getActiveWorkerCount returns the number of currently active workers.
Update getRemoteUpdate(std::size_t workerId, std::size_t updateId) const
Returns the requested update fetched from the corresponding worker.
std::vector< RemoteHandle< WorkerNodeBasePrx > > workers
Worker proxies.
PathWithCostSeq getAllPathsWithCost(const Ice::Current &=Ice::emptyCurrent) const override
Ice::Long getPathCount(const Ice::Current &=Ice::emptyCurrent) const override
bool hasLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns whether the given update is cached.
std::vector< std::size_t > workersPerRemoteObjectNode
How many workers are started on each remote object node.
ManagerNode(TaskBasePrx task, RemoteObjectNodePrxList remoteObjectNodes, Ice::Long initialWorkerCount, Ice::Long maximalWorkerCount, const cprs::ComputingPowerRequestStrategyBasePtr &planningComputingPowerRequestStrategy, float dcdStep, Ice::Long maximalPlanningTimeInSeconds, Ice::Long batchSize, Ice::Long nodeCountDeltaForGoalConnectionTries, CSpaceBasePtr cspace, VectorXf startCfg, VectorXf goalCfg, AdaptiveDynamicDomainParameters addParams, float targetCost)
Ctor.
Definition ManagerNode.h:83
void onDisconnectComponent() override
noop.
std::mutex workerMutex
used to lock access to the vector workers
void createNewWorkerOn(std::size_t remoteObjectNodeIndex)
Creates a new worker on the given remote object node.
std::atomic_bool killRequest
Flag to signal the manager thread to exit.
std::vector< Ice::Long > workersFinalUpdateId
Used when shutting down to ensure all updates were applied before destroying the nodes remote object.
std::string updateTopicPrefix
The update topic's prefix.
std::vector< std::deque< Update > > appliedUpdates
All applied updates.
Update getUpdate(Ice::Long workerId, Ice::Long updateId, const Ice::Current &=Ice::emptyCurrent) const override
friend class ManagedIceObject
required for static factory methode create
std::mutex updateMutex
Protects the update section of the tree and the update cache of the manager.
std::thread managerThread
The tread executing managerTask.
PathWithCost getNthPathWithCost(Ice::Long n, const Ice::Current &=Ice::emptyCurrent) const override
void cleanupAllWorkers()
Shuts down and removes all workers.
Ice::FloatSeq rotationMatrix
The rotation matrix used by the informed samplers.
void sendManagerNodeData() const
Sents the manager's collected test data to the task.
PathWithCost getBestPath(const Ice::Current &=Ice::emptyCurrent) const override
void cacheAppliedUpdateRequiresUpdateMutex(Update &&u)
Stores the given applied update to the cache.
void onConnectComponent() override
noop.
void setMaxCpus(Ice::Int maxCpus, const Ice::Current &=Ice::emptyCurrent) override
void setWorkersFinalUpdateId(Ice::Long workerId, Ice::Long finalUpdateId, const Ice::Current &) override
Used by workers to inform the manager about their number of updates before exiting.
FullIceTree getTree(const Ice::Current &=Ice::emptyCurrent) const override
ManagerNode()=default
Ctor used by ice factories.
std::atomic_size_t activeWorkerCount
currentlyActiveWorkers the index of the newest planning process in the workers vector (is <= workers....
const Update & getLocalUpdateRequiresUpdateMutex(std::size_t workerId, std::size_t updateId) const
Returns the requested update from the cache.
ClockType::time_point timepointStart
Timepoint when the manager node started planning.
void managerTask()
The managet task.checkedCastIt checks whether new workers are required and starts them if this is the...
std::size_t getWorkerCount() const
Returns the number of currently available workers (both active and paused).
void createNewWorker()
Creates a new worker.
void onExitComponent() override
Stopps planning and joins the manager thread.
Ice::Long getNodeCount(const Ice::Current &=Ice::emptyCurrent) const override
std::condition_variable managerEvent
CV used by the manager thread to wait for new updates.
Ice::Int getMaxCpus(const Ice::Current &=Ice::emptyCurrent) const override
std::mutex treeMutex
protects the tree data
void kill(const Ice::Current &=Ice::emptyCurrent) override
Sets the flag to stop planning.
std::string getDefaultName() const override
void applyUpdatesNotThreadSafe(std::unique_lock< std::mutex > &updateLock)
Applies all pending updates.
A structure holding and managing all data connected to the tree used in the ADDIRRT* algorithm.
Definition Tree.h:50
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_VERBOSE_S
Definition Logging.h:207
IceInternal::Handle< ManagerNode > ManagerNodePtr
An ice handle for a ManagerNode.
Definition ManagerNode.h:52
This file offers overloads of toIce() and fromIce() functions for STL container types.