WorkerNode.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package RobotComponents
19 * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20 * @date 2015
21 * @copyright http://www.gnu.org/licenses/gpl.txt
22 * GNU General Public License
23 */
24
25#include "WorkerNode.h"
26
27#include "../../CSpace/CSpace.h"
28
29namespace armarx::rrtconnect
30{
31 void
33 {
34 abortRequest = false;
35 usingTopic(topicName);
36 offeringTopic(topicName);
37 updater.setWorkerId(workerId);
40 }
41
42 void
44 {
46 if (!workerThread.joinable())
47 {
48 workerThread = std::thread{[this]
49 {
50 try
51 {
52 workerTask();
53 }
54 catch (Ice::Exception& e)
55 {
57 << "The worker thread of worker " << workerId
58 << " had an Ice::Exception! \nStack trace:\n"
59 << e.ice_stackTrace();
60 }
61 }};
62 }
63 }
64
65 void
67 {
68 abort();
69 workerThread.join();
70 }
71
72 Update
73 WorkerNode::getUpdate(Ice::Long updateId, const Ice::Current&) const
74 {
75 std::lock_guard<std::mutex> lock{updateMutex};
76 ARMARX_VERBOSE_S << "[worker " << workerId << "] request for update " << updateId << " (of "
77 << localUpdates.size() << ")";
78 return localUpdates.at(updateId);
79 }
80
81 void
82 WorkerNode::setWorkerNodes(const WorkerNodeBasePrxList& workers, const Ice::Current&)
83 {
85 {
86 std::lock_guard<std::mutex> lock{updateMutex};
87 this->workers = workers;
88 updater.setWorkerCount(workers.size());
89 }
90 doApplyUpdates = true;
91 }
92
93 void
94 WorkerNode::updateTree(const Update& u, const Ice::Current&)
95 {
96 std::lock_guard<std::mutex> lock{updateMutex};
97 updater.addPendingUpdate(u);
98 }
99
100 void
102 {
103 if (doApplyUpdates)
104 {
105 std::unique_lock<std::mutex> lock{updateMutex};
106 updater.applyPendingUpdates(lock,
107 [this](Ice::Long workerId, Ice::Long updateId)
108 { return getRemoteUpdate(workerId, updateId); });
110 }
111 }
112
113 void
115 {
116 treeFromStart.setMaximalWorkerCount(workerCount);
117 treeFromGoal.setMaximalWorkerCount(workerCount);
118 treeFromStart.setRoot(startCfg);
119 treeFromGoal.setRoot(goalCfg);
120 }
121
122 void
124 {
125 cspace->initCollisionTest();
126
127 //init sampler
128 const auto cspaceDims = cspace->getDimensionsBounds();
129 std::vector<std::pair<float, float>> dimensionBounds{};
130 dimensionBounds.reserve(getDimensionality());
131 std::transform(cspaceDims.begin(),
132 cspaceDims.end(),
133 std::back_inserter(dimensionBounds),
134 [](const FloatRange& dim) { return std::make_pair(dim.min, dim.max); });
135
136 sampler.reset(
137 new CuboidSampler{typename CuboidSampler::DistributionType{dimensionBounds.begin(),
138 dimensionBounds.end()},
139 typename CuboidSampler::GeneratorType{std::random_device{}()}});
140
141 //util functions
142 auto cspaceDerived = CSpacePtr::dynamicCast(cspace);
143 auto steer = [cspaceDerived, this](const ConfigType& from, const ConfigType& to)
144 {
145 ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(cspaceDerived->getDimensionality()) ==
146 from.size());
147 ARMARX_CHECK_EXPRESSION(from.size() == to.size());
148
149 return dcdSteer(from,
150 to,
151 DCDStepSize,
152 [cspaceDerived](const ConfigType& cfg)
153 { return cspaceDerived->isCollisionFree(cfg); });
154 };
155 //init trees
156 treeFromStart.setMaximalWorkerCount(workerCount);
157 treeFromGoal.setMaximalWorkerCount(workerCount);
158 treeFromStart.setRoot(startCfg);
159 treeFromGoal.setRoot(goalCfg);
160
162
163 //plan
164 while (true)
165 {
166 if (abortRequest)
167 {
168 break;
169 }
170 //update
171 applyUpdates();
172
173 //plan
174 auto& primTree = getPrimaryTree();
175 auto& secnTree = getSecondaryTree();
176
177 ConfigType randomCfg(getDimensionality());
178 (*sampler)(randomCfg.data());
179
180 const auto nearId = primTree.getNearestNeighbour(randomCfg);
181 const auto& nearNode = primTree.getNode(nearId);
182
183 const auto reachCfg = steer(nearNode.cfg, randomCfg);
184 if (reachCfg != nearNode.cfg)
185 {
186 //add node
187 const auto reachId = addNodeToPrimaryTree(reachCfg, nearId);
188
189 const auto nearSecondId = secnTree.getNearestNeighbour(reachCfg);
190 const auto& nearSecondNode = secnTree.getNode(nearSecondId);
191
192 const auto reachSecondCfg = steer(nearSecondNode.cfg, reachCfg);
193 if (reachSecondCfg != nearSecondNode.cfg)
194 {
195 //add node
196 const auto reachSecondId = addNodeToSecondaryTree(reachSecondCfg, nearSecondId);
197 if (reachSecondCfg == reachCfg)
198 {
199 //found path. set path + exit
200 const auto startTreeId = fromStartIsPrimaryTree ? reachId : reachSecondId;
201 const auto goalTreeId = fromStartIsPrimaryTree ? reachSecondId : reachId;
202
203 VectorXfSeq p = treeFromStart.getPathTo(startTreeId);
204 p.pop_back(); //pop the middle node (it would be added 2 times)
205
206 VectorXfSeq pPart2 = treeFromGoal.getReversedPathTo(goalTreeId);
207 std::move(pPart2.begin(), pPart2.end(), std::back_inserter(p));
208 task->setPath({p, "Path"});
209 break;
210 }
211 }
212 }
213 sendUpdate();
214 swapTrees();
215 }
216 //cleanup
217 if (!abortRequest)
218 {
219 globalWorkers->abort();
220 }
221 task->workerHasAborted(workerId);
222 }
223
224 void
226 {
227 std::lock_guard<std::mutex> lock{updateMutex};
228 //set dependent updates + id
230 ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) <
231 updater.getAppliedUpdateIds().size());
232 localUpdates.back().workerId = workerId;
233 localUpdates.back().dependetOnUpdateIds = updater.getAppliedUpdateIds();
234 //current update id = #updates -1
235 ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) <
236 localUpdates.back().dependetOnUpdateIds.size());
237 localUpdates.back().dependetOnUpdateIds.at(workerId) =
238 static_cast<Ice::Long>(localUpdates.size()) - 2;
239
240 globalWorkers->updateTree(localUpdates.back());
241 //next update
243 }
244
245 void
247 {
248 localUpdates.emplace_back();
249 localUpdates.back().updatesPerTree.resize(2);
250 }
251
252 NodeId
254 const NodeId& parent,
255 bool addToPrimaryTree)
256 {
257 cspace->initCollisionTest();
258 auto& tree = addToPrimaryTree ? getPrimaryTree() : getSecondaryTree();
259 tree.addNode(cfg, parent, workerId);
260
261 const auto treeId = addToPrimaryTree ? getPrimaryTreeId() : getSecondaryTreeId();
262 ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(treeId) <
263 localUpdates.back().updatesPerTree.size());
264 localUpdates.back().updatesPerTree.at(treeId).nodes.emplace_back(
265 NodeCreationUpdate{cfg, parent});
266
267 ARMARX_CHECK_EXPRESSION(workerId < static_cast<Ice::Long>(tree.getNodes().size()));
268 return {workerId,
269 static_cast<Ice::Long>(tree.getNodes().at(workerId).size()) -
270 1}; //wid + index of new node
271 }
272} // namespace armarx::rrtconnect
void offeringTopic(const std::string &name)
Registers a topic for retrival after initialization.
TopicProxyType getTopic(const std::string &name)
Returns a proxy of the specified topic.
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
void updateTree(const Update &u, const Ice::Current &=Ice::emptyCurrent) override
void onInitComponent() override
Pure virtual hook for the subclass.
void abort(const ::Ice::Current &=Ice::emptyCurrent) override
Definition WorkerNode.h:93
NodeId addNodeToSecondaryTree(const ConfigType &cfg, const NodeId &parent)
Definition WorkerNode.h:144
void workerTask()
The worker task.
std::unique_ptr< CuboidSampler > sampler
Definition WorkerNode.h:206
Ice::Byte getSecondaryTreeId() const
Definition WorkerNode.h:177
std::deque< Update > localUpdates
Definition WorkerNode.h:204
Ice::Byte getPrimaryTreeId() const
Definition WorkerNode.h:171
TreeUpdateInterfacePrx globalWorkers
Definition WorkerNode.h:200
Update getUpdate(Ice::Long updateId, const Ice::Current &=Ice::emptyCurrent) const override
WorkerNodeBasePrxList workers
Definition WorkerNode.h:196
NodeId addNodeToPrimaryTree(const ConfigType &cfg, const NodeId &parent)
Definition WorkerNode.h:138
std::atomic_bool abortRequest
Definition WorkerNode.h:202
std::atomic_bool doApplyUpdates
Definition WorkerNode.h:195
NodeId addNode(const ConfigType &cfg, const NodeId &parent, bool addToPrimaryTree)
void onConnectComponent() override
Pure virtual hook for the subclass.
Update getRemoteUpdate(Ice::Long workerNodeId, Ice::Long updateSubId)
Definition WorkerNode.h:119
void setWorkerNodes(const WorkerNodeBasePrxList &workers, const Ice::Current &=Ice::emptyCurrent) override
void onExitComponent() override
Hook for subclass.
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_ERROR_S
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:216
#define ARMARX_VERBOSE_S
Definition Logging.h:207
ConfigType dcdSteer(const ConfigType &from, const ConfigType &to, RealType dcdStepSize, CollisionChecker isCollisionFree)
Tries to reach to from from using the given stepsize.
Sampler< UniformCuboidDistribution< float >, std::mt19937 > CuboidSampler
Definition Samplers.h:323