WorkerNode.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package RobotComponents
19  * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20  * @date 2015
21  * @copyright http://www.gnu.org/licenses/gpl.txt
22  * GNU General Public License
23  */
24 
25 #include "WorkerNode.h"
26 
27 #include "../../CSpace/CSpace.h"
28 
29 namespace armarx::rrtconnect
30 {
31  void
33  {
34  abortRequest = false;
35  usingTopic(topicName);
36  offeringTopic(topicName);
37  updater.setWorkerId(workerId);
40  }
41 
42  void
44  {
45  globalWorkers = getTopic<TreeUpdateInterfacePrx>(topicName);
46  if (!workerThread.joinable())
47  {
48  workerThread = std::thread{[this]
49  {
50  try
51  {
52  workerTask();
53  }
54  catch (Ice::Exception& e)
55  {
57  << "The worker thread of worker " << workerId
58  << " had an Ice::Exception! \nStack trace:\n"
59  << e.ice_stackTrace();
60  }
61  }};
62  }
63  }
64 
65  void
67  {
68  abort();
69  workerThread.join();
70  }
71 
72  Update
73  WorkerNode::getUpdate(Ice::Long updateId, const Ice::Current&) const
74  {
75  std::lock_guard<std::mutex> lock{updateMutex};
76  ARMARX_VERBOSE_S << "[worker " << workerId << "] request for update " << updateId << " (of "
77  << localUpdates.size() << ")";
78  return localUpdates.at(updateId);
79  }
80 
81  void
82  WorkerNode::setWorkerNodes(const WorkerNodeBasePrxList& workers, const Ice::Current&)
83  {
85  {
86  std::lock_guard<std::mutex> lock{updateMutex};
87  this->workers = workers;
89  }
90  doApplyUpdates = true;
91  }
92 
93  void
94  WorkerNode::updateTree(const Update& u, const Ice::Current&)
95  {
96  std::lock_guard<std::mutex> lock{updateMutex};
98  }
99 
100  void
102  {
103  if (doApplyUpdates)
104  {
105  std::unique_lock<std::mutex> lock{updateMutex};
107  [this](Ice::Long workerId, Ice::Long updateId)
108  { return getRemoteUpdate(workerId, updateId); });
110  }
111  }
112 
113  void
115  {
117  treeFromGoal.setMaximalWorkerCount(workerCount);
118  treeFromStart.setRoot(startCfg);
119  treeFromGoal.setRoot(goalCfg);
120  }
121 
122  void
124  {
125  cspace->initCollisionTest();
126 
127  //init sampler
128  const auto cspaceDims = cspace->getDimensionsBounds();
129  std::vector<std::pair<float, float>> dimensionBounds{};
130  dimensionBounds.reserve(getDimensionality());
131  std::transform(cspaceDims.begin(),
132  cspaceDims.end(),
133  std::back_inserter(dimensionBounds),
134  [](const FloatRange& dim) { return std::make_pair(dim.min, dim.max); });
135 
136  sampler.reset(
137  new CuboidSampler{typename CuboidSampler::DistributionType{dimensionBounds.begin(),
138  dimensionBounds.end()},
139  typename CuboidSampler::GeneratorType{std::random_device{}()}});
140 
141  //util functions
142  auto cspaceDerived = CSpacePtr::dynamicCast(cspace);
143  auto steer = [cspaceDerived, this](const ConfigType& from, const ConfigType& to)
144  {
145  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(cspaceDerived->getDimensionality()) ==
146  from.size());
147  ARMARX_CHECK_EXPRESSION(from.size() == to.size());
148 
149  return dcdSteer(from,
150  to,
151  DCDStepSize,
152  [cspaceDerived](const ConfigType& cfg)
153  { return cspaceDerived->isCollisionFree(cfg); });
154  };
155  //init trees
157  treeFromGoal.setMaximalWorkerCount(workerCount);
158  treeFromStart.setRoot(startCfg);
159  treeFromGoal.setRoot(goalCfg);
160 
162 
163  //plan
164  while (true)
165  {
166  if (abortRequest)
167  {
168  break;
169  }
170  //update
171  applyUpdates();
172 
173  //plan
174  auto& primTree = getPrimaryTree();
175  auto& secnTree = getSecondaryTree();
176 
177  ConfigType randomCfg(getDimensionality());
178  (*sampler)(randomCfg.data());
179 
180  const auto nearId = primTree.getNearestNeighbour(randomCfg);
181  const auto& nearNode = primTree.getNode(nearId);
182 
183  const auto reachCfg = steer(nearNode.cfg, randomCfg);
184  if (reachCfg != nearNode.cfg)
185  {
186  //add node
187  const auto reachId = addNodeToPrimaryTree(reachCfg, nearId);
188 
189  const auto nearSecondId = secnTree.getNearestNeighbour(reachCfg);
190  const auto& nearSecondNode = secnTree.getNode(nearSecondId);
191 
192  const auto reachSecondCfg = steer(nearSecondNode.cfg, reachCfg);
193  if (reachSecondCfg != nearSecondNode.cfg)
194  {
195  //add node
196  const auto reachSecondId = addNodeToSecondaryTree(reachSecondCfg, nearSecondId);
197  if (reachSecondCfg == reachCfg)
198  {
199  //found path. set path + exit
200  const auto startTreeId = fromStartIsPrimaryTree ? reachId : reachSecondId;
201  const auto goalTreeId = fromStartIsPrimaryTree ? reachSecondId : reachId;
202 
203  VectorXfSeq p = treeFromStart.getPathTo(startTreeId);
204  p.pop_back(); //pop the middle node (it would be added 2 times)
205 
206  VectorXfSeq pPart2 = treeFromGoal.getReversedPathTo(goalTreeId);
207  std::move(pPart2.begin(), pPart2.end(), std::back_inserter(p));
208  task->setPath({p, "Path"});
209  break;
210  }
211  }
212  }
213  sendUpdate();
214  swapTrees();
215  }
216  //cleanup
217  if (!abortRequest)
218  {
219  globalWorkers->abort();
220  }
221  task->workerHasAborted(workerId);
222  }
223 
224  void
226  {
227  std::lock_guard<std::mutex> lock{updateMutex};
228  //set dependent updates + id
230  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) <
231  updater.getAppliedUpdateIds().size());
232  localUpdates.back().workerId = workerId;
233  localUpdates.back().dependetOnUpdateIds = updater.getAppliedUpdateIds();
234  //current update id = #updates -1
235  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(workerId) <
236  localUpdates.back().dependetOnUpdateIds.size());
237  localUpdates.back().dependetOnUpdateIds.at(workerId) =
238  static_cast<Ice::Long>(localUpdates.size()) - 2;
239 
240  globalWorkers->updateTree(localUpdates.back());
241  //next update
243  }
244 
245  void
247  {
248  localUpdates.emplace_back();
249  localUpdates.back().updatesPerTree.resize(2);
250  }
251 
252  NodeId
254  const NodeId& parent,
255  bool addToPrimaryTree)
256  {
257  cspace->initCollisionTest();
258  auto& tree = addToPrimaryTree ? getPrimaryTree() : getSecondaryTree();
259  tree.addNode(cfg, parent, workerId);
260 
261  const auto treeId = addToPrimaryTree ? getPrimaryTreeId() : getSecondaryTreeId();
262  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(treeId) <
263  localUpdates.back().updatesPerTree.size());
264  localUpdates.back().updatesPerTree.at(treeId).nodes.emplace_back(
265  NodeCreationUpdate{cfg, parent});
266 
267  ARMARX_CHECK_EXPRESSION(workerId < static_cast<Ice::Long>(tree.getNodes().size()));
268  return {workerId,
269  static_cast<Ice::Long>(tree.getNodes().at(workerId).size()) -
270  1}; //wid + index of new node
271  }
272 } // namespace armarx::rrtconnect
armarx::rrtconnect::Updater::getAppliedUpdateIds
const std::vector< Ice::Long > & getAppliedUpdateIds() const
Definition: Updater.h:257
armarx::rrtconnect::WorkerNode::treeFromStart
Tree treeFromStart
Definition: WorkerNode.h:188
armarx::rrtconnect::Tree::getPathTo
std::vector< ConfigType > getPathTo(const NodeId &nodeId) const
armarx::rrtconnect::WorkerNode::ConfigType
VectorXf ConfigType
Definition: WorkerNode.h:58
armarx::rrtconnect::WorkerNode::abort
void abort(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: WorkerNode.h:93
armarx::rrtconnect::Updater::setWorkerCount
void setWorkerCount(std::size_t count)
Definition: Updater.h:72
armarx::rrtconnect::WorkerNode::onInitComponent
void onInitComponent() override
Pure virtual hook for the subclass.
Definition: WorkerNode.cpp:32
armarx::rrtconnect::WorkerNode::abortRequest
std::atomic_bool abortRequest
Definition: WorkerNode.h:202
armarx::rrtconnect::Tree::setRoot
void setRoot(const ConfigType &root)
Definition: Tree.cpp:37
armarx::Sampler
Stores a distribution and a generator.
Definition: Samplers.h:47
armarx::rrtconnect::WorkerNode::workers
WorkerNodeBasePrxList workers
Definition: WorkerNode.h:196
armarx::view_selection::skills::direction::state::from
state::Type from(Eigen::Vector3f targetPosition)
Definition: LookDirection.cpp:194
armarx::rrtconnect::WorkerNode::addNode
NodeId addNode(const ConfigType &cfg, const NodeId &parent, bool addToPrimaryTree)
Definition: WorkerNode.cpp:253
armarx::rrtconnect::Updater::setTrees
void setTrees(const std::vector< std::reference_wrapper< Tree >> &newTrees)
Definition: Updater.h:83
armarx::rrtconnect::WorkerNode::localUpdates
std::deque< Update > localUpdates
Definition: WorkerNode.h:204
armarx::rrtconnect::Tree::getReversedPathTo
std::vector< ConfigType > getReversedPathTo(NodeId nodeId) const
Definition: Tree.cpp:51
armarx::rrtconnect::WorkerNode::updateMutex
std::mutex updateMutex
Definition: WorkerNode.h:192
armarx::rrtconnect::WorkerNode::addNodeToPrimaryTree
NodeId addNodeToPrimaryTree(const ConfigType &cfg, const NodeId &parent)
Definition: WorkerNode.h:138
armarx::rrtconnect::WorkerNode::sampler
std::unique_ptr< CuboidSampler > sampler
Definition: WorkerNode.h:206
armarx::rrtconnect::Tree::setMaximalWorkerCount
void setMaximalWorkerCount(std::size_t count)
Sets the maximal worker count to the given value.
Definition: Tree.h:66
armarx::rrtconnect::WorkerNode::getPrimaryTreeId
Ice::Byte getPrimaryTreeId() const
Definition: WorkerNode.h:171
armarx::rrtconnect::WorkerNode::treeFromGoal
Tree treeFromGoal
Definition: WorkerNode.h:189
armarx::rrtconnect::WorkerNode::doApplyUpdates
std::atomic_bool doApplyUpdates
Definition: WorkerNode.h:195
ARMARX_ERROR_S
#define ARMARX_ERROR_S
Definition: Logging.h:216
armarx::rrtconnect::WorkerNode::getDimensionality
std::size_t getDimensionality()
Definition: WorkerNode.h:153
armarx::Sampler::DistributionType
Distribution DistributionType
Type of the stored distribution.
Definition: Samplers.h:53
armarx::dcdSteer
ConfigType dcdSteer(const ConfigType &from, const ConfigType &to, RealType dcdStepSize, CollisionChecker isCollisionFree)
Tries to reach to from from using the given stepsize.
Definition: CollisionCheckUtil.h:47
armarx::rrtconnect
Definition: Task.cpp:29
armarx::rrtconnect::Updater::applyPendingUpdates
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate)
Definition: Updater.h:116
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
armarx::rrtconnect::WorkerNode::fromStartIsPrimaryTree
bool fromStartIsPrimaryTree
Definition: WorkerNode.h:190
armarx::rrtconnect::WorkerNode::getSecondaryTree
Tree & getSecondaryTree()
Definition: WorkerNode.h:165
armarx::rrtconnect::WorkerNode::sendUpdate
void sendUpdate()
Definition: WorkerNode.cpp:225
armarx::rrtconnect::WorkerNode::onConnectComponent
void onConnectComponent() override
Pure virtual hook for the subclass.
Definition: WorkerNode.cpp:43
armarx::rrtconnect::WorkerNode::getRemoteUpdate
Update getRemoteUpdate(Ice::Long workerNodeId, Ice::Long updateSubId)
Definition: WorkerNode.h:119
armarx::rrtconnect::WorkerNode::workerTask
void workerTask()
The worker task.
Definition: WorkerNode.cpp:123
armarx::rrtconnect::WorkerNode::prepareNextUpdate
void prepareNextUpdate()
Definition: WorkerNode.cpp:246
armarx::transform
auto transform(const Container< InputT, Alloc > &in, OutputT(*func)(InputT const &)) -> Container< OutputT, typename std::allocator_traits< Alloc >::template rebind_alloc< OutputT >>
Convenience function (with less typing) to transform a container of type InputT into the same contain...
Definition: algorithm.h:351
armarx::rrtconnect::WorkerNode::updateTree
void updateTree(const Update &u, const Ice::Current &=Ice::emptyCurrent) override
Definition: WorkerNode.cpp:94
armarx::ManagedIceObject::usingTopic
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Definition: ManagedIceObject.cpp:254
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
Definition: ExpressionException.h:73
armarx::rrtconnect::WorkerNode::onExitComponent
void onExitComponent() override
Hook for subclass.
Definition: WorkerNode.cpp:66
armarx::rrtconnect::WorkerNode::applyUpdates
void applyUpdates()
Definition: WorkerNode.cpp:101
armarx::rrtconnect::WorkerNode::addNodeToSecondaryTree
NodeId addNodeToSecondaryTree(const ConfigType &cfg, const NodeId &parent)
Definition: WorkerNode.h:144
armarx::ManagedIceObject::offeringTopic
void offeringTopic(const std::string &name)
Registers a topic for retrieval after initialization.
Definition: ManagedIceObject.cpp:300
armarx::rrtconnect::WorkerNode::getPrimaryTree
Tree & getPrimaryTree()
Definition: WorkerNode.h:159
armarx::rrtconnect::Updater::addPendingUpdate
void addPendingUpdate(const Update &u)
Definition: Updater.cpp:37
armarx::rrtconnect::WorkerNode::workerThread
std::thread workerThread
Definition: WorkerNode.h:198
WorkerNode.h
armarx::rrtconnect::Updater::setWorkerId
void setWorkerId(Ice::Long newId)
Definition: Updater.cpp:29
ARMARX_VERBOSE_S
#define ARMARX_VERBOSE_S
Definition: Logging.h:207
armarx::rrtconnect::WorkerNode::globalWorkers
TreeUpdateInterfacePrx globalWorkers
Definition: WorkerNode.h:200
armarx::rrtconnect::WorkerNode::swapTrees
void swapTrees()
Definition: WorkerNode.h:183
armarx::Sampler::GeneratorType
Generator GeneratorType
Type of the stored generator.
Definition: Samplers.h:57
armarx::rrtconnect::WorkerNode::getUpdate
Update getUpdate(Ice::Long updateId, const Ice::Current &=Ice::emptyCurrent) const override
Definition: WorkerNode.cpp:73
armarx::rrtconnect::WorkerNode::updater
Updater updater
Definition: WorkerNode.h:193
armarx::rrtconnect::WorkerNode::getSecondaryTreeId
Ice::Byte getSecondaryTreeId() const
Definition: WorkerNode.h:177
armarx::rrtconnect::WorkerNode::ice_postUnmarshal
void ice_postUnmarshal() override
Definition: WorkerNode.cpp:114
armarx::rrtconnect::WorkerNode::setWorkerNodes
void setWorkerNodes(const WorkerNodeBasePrxList &workers, const Ice::Current &=Ice::emptyCurrent) override
Definition: WorkerNode.cpp:82