WorkerNode.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package RobotComponents
19  * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20  * @date 2015
21  * @copyright http://www.gnu.org/licenses/gpl.txt
22  * GNU General Public License
23  */
24 
25 #include "WorkerNode.h"
26 #include "../../CSpace/CSpace.h"
27 namespace armarx::rrtconnect
28 {
30  {
// Body of WorkerNode::onInitComponent(); the signature (listing line 29) is not part of this extract.
// Start without a pending abort request.
31  abortRequest = false;
// The same topic is registered for subscription and as an offered topic.
32  usingTopic(topicName);
33  offeringTopic(topicName);
// Pass this node's worker id on to the updater.
34  updater.setWorkerId(workerId);
// NOTE(review): listing lines 35-36 are absent from this extract; check the real file for the remaining setup.
37  }
38 
40  {
41  globalWorkers = getTopic<TreeUpdateInterfacePrx>(topicName);
42  if (!workerThread.joinable())
43  {
44  workerThread = std::thread
45  {
46  [this]
47  {
48  try
49  {
50  workerTask();
51  }
52  catch (Ice::Exception& e)
53  {
54  ARMARX_ERROR_S << "The worker thread of worker " << workerId << " had an Ice::Exception! \nStack trace:\n"
55  << e.ice_stackTrace();
56  }
57  }
58  };
59  }
60  }
61 
63  {
64  abort();
65  workerThread.join();
66  }
67 
68  Update WorkerNode::getUpdate(Ice::Long updateId, const Ice::Current&) const
69  {
70  std::lock_guard<std::mutex> lock {updateMutex};
71  ARMARX_VERBOSE_S << "[worker " << workerId
72  << "] request for update " << updateId << " (of " << localUpdates.size() << ")";
73  return localUpdates.at(updateId);
74  }
75 
// Stores the list of worker proxies and afterwards enables the application of updates.
// @param workers Proxies of the worker nodes; copied into this->workers.
// NOTE(review): listing lines 78 and 82 are absent from this extract. The cross-reference
// index lists Updater::setWorkerCount, which may be called here — confirm against the real file.
76  void WorkerNode::setWorkerNodes(const WorkerNodeBasePrxList& workers, const Ice::Current&)
77  {
79  {
// The worker list is replaced while holding updateMutex.
80  std::lock_guard<std::mutex> lock {updateMutex};
81  this->workers = workers;
83  }
// Set only after the lock scope above has ended; applyUpdates() does nothing until this flag is true.
84  doApplyUpdates = true;
85  }
86 
// Receives an update u; updateMutex is held for the duration of the call.
// NOTE(review): listing line 90 — the statement that consumes u — is absent from this extract.
// Presumably it hands u to the updater (Updater::addPendingUpdate appears in the
// cross-reference index); confirm against the real file.
87  void WorkerNode::updateTree(const Update& u, const Ice::Current&)
88  {
89  std::lock_guard<std::mutex> lock {updateMutex};
91  }
92 
94  {
// Body of WorkerNode::applyUpdates(); the signature (listing line 93) is not part of this extract.
// Nothing happens until setWorkerNodes() has set doApplyUpdates.
95  if (doApplyUpdates)
96  {
97  std::unique_lock<std::mutex> lock {updateMutex};
// NOTE(review): the call that receives the two arguments below is on the missing listing line 98.
// Presumably it is updater.applyPendingUpdates(lock, getter), whose signature in the
// cross-reference index takes a lock and a RemoteUpdateGetter — confirm against the real file.
99  lock,
// Getter for updates that are not available locally: forwards to getRemoteUpdate().
100  [this](Ice::Long workerId, Ice::Long updateId)
101  {
102  return getRemoteUpdate(workerId, updateId);
103  }
104  );
// NOTE(review): listing line 105 is absent from this extract.
106  }
107  }
108 
110  {
// Body of WorkerNode::ice_postUnmarshal(); the signature (listing line 109) is not part of this extract.
// Sets the worker count on the goal tree and the root configurations of both trees from
// the members workerCount, startCfg and goalCfg.
// NOTE(review): listing line 111 is absent from this extract; presumably it is the matching
// treeFromStart.setMaximalWorkerCount(workerCount) call — confirm against the real file.
112  treeFromGoal.setMaximalWorkerCount(workerCount);
113  treeFromStart.setRoot(startCfg);
114  treeFromGoal.setRoot(goalCfg);
115  }
116 
118  {
// Body of WorkerNode::workerTask() ("The worker task"); the signature (listing line 117) is
// not part of this extract.
// Sets up the sampler and a steering helper, then loops: apply updates, sample a
// configuration, try to extend the primary tree towards it, try to extend the secondary tree
// towards the new node, publish the local update and swap the trees. The loop ends on an
// abort request or once both trees meet; in the latter case the joined path is handed to task.
// NOTE(review): listing lines 125, 155 and 160 are absent from this extract.
119  cspace->initCollisionTest();
120 
121  //init sampler
122  const auto cspaceDims = cspace->getDimensionsBounds();
123  std::vector<std::pair<float, float>> dimensionBounds {};
124  dimensionBounds.reserve(getDimensionality());
// NOTE(review): the call these arguments belong to is on the missing listing line 125
// (presumably std::transform — confirm). Each FloatRange becomes a (min, max) pair that is
// appended to dimensionBounds.
126  cspaceDims.begin(), cspaceDims.end(),
127  std::back_inserter(dimensionBounds),
128  [](const FloatRange & dim)
129  {
130  return std::make_pair(dim.min, dim.max);
131  }
132  );
133 
// The sampler's distribution is built from the per-dimension bounds; its generator is
// constructed from a std::random_device value.
134  sampler.reset(
135  new CuboidSampler
136  {
137  typename CuboidSampler::DistributionType{dimensionBounds.begin(), dimensionBounds.end()},
138  typename CuboidSampler::GeneratorType{std::random_device{}()}
139  }
140  );
141 
142  //util functions
143  auto cspaceDerived = CSpacePtr::dynamicCast(cspace);
// steer(from, to): checks that both configurations have the cspace's dimensionality, then
// delegates to dcdSteer with step size DCDStepSize and a collision predicate backed by
// cspaceDerived. The result is the configuration that was reached.
144  auto steer = [cspaceDerived, this](const ConfigType & from, const ConfigType & to)
145  {
146  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(cspaceDerived->getDimensionality()) == from.size());
147  ARMARX_CHECK_EXPRESSION(from.size() == to.size());
148 
149  return dcdSteer(from, to, DCDStepSize, [cspaceDerived](const ConfigType & cfg)
150  {
151  return cspaceDerived->isCollisionFree(cfg);
152  });
153  };
154  //init trees
// NOTE(review): listing line 155 is absent; presumably the matching
// treeFromStart.setMaximalWorkerCount(workerCount) call — confirm.
156  treeFromGoal.setMaximalWorkerCount(workerCount);
157  treeFromStart.setRoot(startCfg);
158  treeFromGoal.setRoot(goalCfg);
159 
// NOTE(review): listing line 160 is absent from this extract.
161 
162  //plan
163  while (true)
164  {
// The abort flag is checked once per iteration.
165  if (abortRequest)
166  {
167  break;
168  }
169  //update
170  applyUpdates();
171 
172  //plan
173  auto& primTree = getPrimaryTree();
174  auto& secnTree = getSecondaryTree();
175 
// Draw a random configuration with one entry per dimension.
176  ConfigType randomCfg(getDimensionality());
177  (*sampler)(randomCfg.data());
178 
179  const auto nearId = primTree.getNearestNeighbour(randomCfg);
180  const auto& nearNode = primTree.getNode(nearId);
181 
// A node is only added when steering returned something other than the start node's configuration.
182  const auto reachCfg = steer(nearNode.cfg, randomCfg);
183  if (reachCfg != nearNode.cfg)
184  {
185  //add node
186  const auto reachId = addNodeToPrimaryTree(reachCfg, nearId);
187 
// Try to extend the secondary tree towards the node that was just added to the primary tree.
188  const auto nearSecondId = secnTree.getNearestNeighbour(reachCfg);
189  const auto& nearSecondNode = secnTree.getNode(nearSecondId);
190 
191  const auto reachSecondCfg = steer(nearSecondNode.cfg, reachCfg);
192  if (reachSecondCfg != nearSecondNode.cfg)
193  {
194  //add node
195  const auto reachSecondId = addNodeToSecondaryTree(reachSecondCfg, nearSecondId);
// The secondary tree reached exactly the new primary node: both trees are connected.
196  if (reachSecondCfg == reachCfg)
197  {
198  //found path. set path + exit
// Map the primary/secondary node ids back to the start tree and the goal tree.
199  const auto startTreeId = fromStartIsPrimaryTree ? reachId : reachSecondId;
200  const auto goalTreeId = fromStartIsPrimaryTree ? reachSecondId : reachId;
201 
202  VectorXfSeq p = treeFromStart.getPathTo(startTreeId);
203  p.pop_back();//pop the middle node (it would be added 2 times)
204 
// Append the goal tree's reversed path to the start tree's path.
205  VectorXfSeq pPart2 = treeFromGoal.getReversedPathTo(goalTreeId);
206  std::move(pPart2.begin(), pPart2.end(), std::back_inserter(p));
207  task->setPath({p, "Path"});
// This break skips the sendUpdate()/swapTrees() calls at the end of the loop body.
208  break;
209  }
210  }
211  }
212  sendUpdate();
213  swapTrees();
214  }
215  //cleanup
// Leaving the loop without an abort request means a path was found (the only other break);
// in that case abort() is called on the globalWorkers topic proxy.
216  if (!abortRequest)
217  {
218  globalWorkers->abort();
219  }
// Report to the task that this worker has finished.
220  task->workerHasAborted(workerId);
221  }
222 
224  {
// Body of WorkerNode::sendUpdate(); the signature (listing line 223) is not part of this extract.
// Completes the newest entry of localUpdates (worker id and dependency ids) and publishes
// it via globalWorkers while holding updateMutex.
// NOTE(review): listing lines 227 and 237 are absent from this extract; line 237 follows the
// "next update" comment and presumably calls prepareNextUpdate() — confirm against the real file.
225  std::lock_guard<std::mutex> lock {updateMutex};
226  //set dependent updates + id
228  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) < updater.getAppliedUpdateIds().size());
229  localUpdates.back().workerId = workerId;
// Start from the ids of the updates the updater has applied so far.
230  localUpdates.back().dependetOnUpdateIds = updater.getAppliedUpdateIds();
231  //current update id = #updates -1
232  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) < localUpdates.back().dependetOnUpdateIds.size());
// This worker's own entry is the id before the current one (#updates - 2); that is -1 for the first update.
233  localUpdates.back().dependetOnUpdateIds.at(workerId) = static_cast<Ice::Long>(localUpdates.size()) - 2;
234 
235  globalWorkers->updateTree(localUpdates.back());
236  //next update
238  }
239 
241  {
242  localUpdates.emplace_back();
243  localUpdates.back().updatesPerTree.resize(2);
244  }
245 
246  NodeId WorkerNode::addNode(const WorkerNode::ConfigType& cfg, const NodeId& parent, bool addToPrimaryTree)
247  {
248  cspace->initCollisionTest();
249  auto& tree = addToPrimaryTree ? getPrimaryTree() : getSecondaryTree();
250  tree.addNode(cfg, parent, workerId);
251 
252  const auto treeId = addToPrimaryTree ? getPrimaryTreeId() : getSecondaryTreeId();
253  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(treeId) < localUpdates.back().updatesPerTree.size());
254  localUpdates.back().updatesPerTree.at(treeId).nodes.emplace_back(NodeCreationUpdate {cfg, parent});
255 
256  ARMARX_CHECK_EXPRESSION(workerId < static_cast<Ice::Long>(tree.getNodes().size()));
257  return {workerId, static_cast<Ice::Long>(tree.getNodes().at(workerId).size()) - 1}; //wid + index of new node
258  }
259 }
armarx::rrtconnect::Updater::getAppliedUpdateIds
const std::vector< Ice::Long > & getAppliedUpdateIds() const
Definition: Updater.h:237
armarx::rrtconnect::WorkerNode::treeFromStart
Tree treeFromStart
Definition: WorkerNode.h:171
armarx::rrtconnect::Tree::getPathTo
std::vector< ConfigType > getPathTo(const NodeId &nodeId) const
armarx::rrtconnect::WorkerNode::ConfigType
VectorXf ConfigType
Definition: WorkerNode.h:58
armarx::rrtconnect::WorkerNode::abort
void abort(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: WorkerNode.h:88
armarx::rrtconnect::Updater::setWorkerCount
void setWorkerCount(std::size_t count)
Definition: Updater.h:70
armarx::rrtconnect::WorkerNode::onInitComponent
void onInitComponent() override
Pure virtual hook for the subclass.
Definition: WorkerNode.cpp:29
armarx::rrtconnect::WorkerNode::abortRequest
std::atomic_bool abortRequest
Definition: WorkerNode.h:185
armarx::rrtconnect::Tree::setRoot
void setRoot(const ConfigType &root)
Definition: Tree.cpp:39
armarx::Sampler
Stores a distribution and a generator.
Definition: Samplers.h:47
armarx::rrtconnect::WorkerNode::workers
WorkerNodeBasePrxList workers
Definition: WorkerNode.h:179
armarx::rrtconnect::WorkerNode::addNode
NodeId addNode(const ConfigType &cfg, const NodeId &parent, bool addToPrimaryTree)
Definition: WorkerNode.cpp:246
armarx::rrtconnect::Updater::setTrees
void setTrees(const std::vector< std::reference_wrapper< Tree >> &newTrees)
Definition: Updater.h:80
armarx::rrtconnect::WorkerNode::localUpdates
std::deque< Update > localUpdates
Definition: WorkerNode.h:187
armarx::rrtconnect::Tree::getReversedPathTo
std::vector< ConfigType > getReversedPathTo(NodeId nodeId) const
Definition: Tree.cpp:52
armarx::rrtconnect::WorkerNode::updateMutex
std::mutex updateMutex
Definition: WorkerNode.h:175
armarx::rrtconnect::WorkerNode::addNodeToPrimaryTree
NodeId addNodeToPrimaryTree(const ConfigType &cfg, const NodeId &parent)
Definition: WorkerNode.h:131
armarx::rrtconnect::WorkerNode::sampler
std::unique_ptr< CuboidSampler > sampler
Definition: WorkerNode.h:189
armarx::rrtconnect::Tree::setMaximalWorkerCount
void setMaximalWorkerCount(std::size_t count)
Sets the maximal worker count to the given value.
Definition: Tree.h:66
armarx::rrtconnect::WorkerNode::getPrimaryTreeId
Ice::Byte getPrimaryTreeId() const
Definition: WorkerNode.h:157
armarx::rrtconnect::WorkerNode::treeFromGoal
Tree treeFromGoal
Definition: WorkerNode.h:172
armarx::rrtconnect::WorkerNode::doApplyUpdates
std::atomic_bool doApplyUpdates
Definition: WorkerNode.h:178
ARMARX_ERROR_S
#define ARMARX_ERROR_S
Definition: Logging.h:209
armarx::rrtconnect::WorkerNode::getDimensionality
std::size_t getDimensionality()
Definition: WorkerNode.h:143
armarx::Sampler::DistributionType
Distribution DistributionType
Type of the stored distribution.
Definition: Samplers.h:53
armarx::dcdSteer
ConfigType dcdSteer(const ConfigType &from, const ConfigType &to, RealType dcdStepSize, CollisionChecker isCollisionFree)
Tries to reach to from from using the given stepsize.
Definition: CollisionCheckUtil.h:46
armarx::rrtconnect
Definition: Task.cpp:29
armarx::rrtconnect::Updater::applyPendingUpdates
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate)
Definition: Updater.h:109
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:917
armarx::rrtconnect::WorkerNode::fromStartIsPrimaryTree
bool fromStartIsPrimaryTree
Definition: WorkerNode.h:173
armarx::rrtconnect::WorkerNode::getSecondaryTree
Tree & getSecondaryTree()
Definition: WorkerNode.h:152
armarx::rrtconnect::WorkerNode::sendUpdate
void sendUpdate()
Definition: WorkerNode.cpp:223
armarx::rrtconnect::WorkerNode::onConnectComponent
void onConnectComponent() override
Pure virtual hook for the subclass.
Definition: WorkerNode.cpp:39
armarx::rrtconnect::WorkerNode::getRemoteUpdate
Update getRemoteUpdate(Ice::Long workerNodeId, Ice::Long updateSubId)
Definition: WorkerNode.h:113
armarx::rrtconnect::WorkerNode::workerTask
void workerTask()
The worker task.
Definition: WorkerNode.cpp:117
armarx::rrtconnect::WorkerNode::prepareNextUpdate
void prepareNextUpdate()
Definition: WorkerNode.cpp:240
armarx::rrtconnect::WorkerNode::updateTree
void updateTree(const Update &u, const Ice::Current &=Ice::emptyCurrent) override
Definition: WorkerNode.cpp:87
armarx::ManagedIceObject::usingTopic
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Definition: ManagedIceObject.cpp:248
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
Definition: ExpressionException.h:73
armarx::rrtconnect::WorkerNode::onExitComponent
void onExitComponent() override
Hook for subclass.
Definition: WorkerNode.cpp:62
armarx::rrtconnect::WorkerNode::applyUpdates
void applyUpdates()
Definition: WorkerNode.cpp:93
armarx::rrtconnect::WorkerNode::addNodeToSecondaryTree
NodeId addNodeToSecondaryTree(const ConfigType &cfg, const NodeId &parent)
Definition: WorkerNode.h:135
armarx::ManagedIceObject::offeringTopic
void offeringTopic(const std::string &name)
Registers a topic for retrieval after initialization.
Definition: ManagedIceObject.cpp:290
armarx::transform
auto transform(const Container< InputT, Alloc > &in, OutputT(*func)(InputT const &)) -> Container< OutputT, typename std::allocator_traits< Alloc >::template rebind_alloc< OutputT > >
Convenience function (with less typing) to transform a container of type InputT into the same contain...
Definition: algorithm.h:315
armarx::rrtconnect::WorkerNode::getPrimaryTree
Tree & getPrimaryTree()
Definition: WorkerNode.h:148
armarx::rrtconnect::Updater::addPendingUpdate
void addPendingUpdate(const Update &u)
Definition: Updater.cpp:35
armarx::rrtconnect::WorkerNode::workerThread
std::thread workerThread
Definition: WorkerNode.h:181
WorkerNode.h
armarx::rrtconnect::Updater::setWorkerId
void setWorkerId(Ice::Long newId)
Definition: Updater.cpp:28
ARMARX_VERBOSE_S
#define ARMARX_VERBOSE_S
Definition: Logging.h:200
armarx::rrtconnect::WorkerNode::globalWorkers
TreeUpdateInterfacePrx globalWorkers
Definition: WorkerNode.h:183
armarx::rrtconnect::WorkerNode::swapTrees
void swapTrees()
Definition: WorkerNode.h:166
armarx::Sampler::GeneratorType
Generator GeneratorType
Type of the stored generator.
Definition: Samplers.h:57
armarx::rrtconnect::WorkerNode::getUpdate
Update getUpdate(Ice::Long updateId, const Ice::Current &=Ice::emptyCurrent) const override
Definition: WorkerNode.cpp:68
armarx::rrtconnect::WorkerNode::updater
Updater updater
Definition: WorkerNode.h:176
armarx::rrtconnect::WorkerNode::getSecondaryTreeId
Ice::Byte getSecondaryTreeId() const
Definition: WorkerNode.h:161
armarx::rrtconnect::WorkerNode::ice_postUnmarshal
void ice_postUnmarshal() override
Definition: WorkerNode.cpp:109
armarx::rrtconnect::WorkerNode::setWorkerNodes
void setWorkerNodes(const WorkerNodeBasePrxList &workers, const Ice::Current &=Ice::emptyCurrent) override
Definition: WorkerNode.cpp:76