Updater.h
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package RobotComponents
19  * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20  * @date 2015
21  * @copyright http://www.gnu.org/licenses/gpl.txt
22  * GNU General Public License
23  */
24 #pragma once
25 
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <vector>

#include <ArmarXCore/core/exceptions/local/ExpressionException.h>

#include <RobotComponents/interface/components/MotionPlanning/Tasks/RRTConnect/DataStructures.h>

#include "Tree.h"
36 
37 namespace armarx::rrtconnect
38 {
39  struct UpdateId
40  {
43  {
44  }
45 
46  UpdateId(const Update& u) :
47  workerId{u.workerId},
48  updateSubId{((workerId >= 0) &&
49  (static_cast<std::size_t>(workerId) < u.dependetOnUpdateIds.size()))
50  ? u.dependetOnUpdateIds.at(workerId) + 1
51  : -1}
52  {
53  }
54 
55  bool
56  operator<(const UpdateId& other) const
57  {
58  return (workerId < other.workerId) ||
59  ((workerId == other.workerId) && (updateSubId < other.updateSubId));
60  }
61 
64  };
65 
66  class Updater
67  {
68  public:
69  Updater() = default;
70 
71  void
72  setWorkerCount(std::size_t count)
73  {
74  if (appliedUpdateIds.size() < count)
75  {
76  appliedUpdateIds.resize(count, -1);
77  }
78  }
79 
81 
82  void
83  setTrees(const std::vector<std::reference_wrapper<Tree>>& newTrees)
84  {
85  trees = newTrees;
86  }
87 
88  //pending
89  void addPendingUpdate(const Update& u);
90 
91  void clearPendingUpdates();
92 
93  bool
94  hasPendingUpdate(const UpdateId& id) const
95  {
96  return pendingUpdateLookupTable.find(id) != pendingUpdateLookupTable.end();
97  }
98 
99  Update&
101  {
102  return getPendingUpdate(pendingUpdateLookupTable.at(id));
103  }
104 
105  Update&
106  getPendingUpdate(std::size_t id)
107  {
108  return pendingUpdates.at(id);
109  }
110 
111  //applying
112  void applyUpdate(const Update& u);
113 
114  template <class LockType, class RemoteUpdateGetter>
115  void
116  applyPendingUpdates(LockType&& lock, RemoteUpdateGetter getRemoteUpdate)
117  {
118  applyPendingUpdates(lock, getRemoteUpdate, [](Update&&) {});
119  }
120 
121  template <class LockType, class RemoteUpdateGetter, class UpdateConsumer>
122  void
123  applyPendingUpdates(LockType&& lock,
124  RemoteUpdateGetter getRemoteUpdate,
125  UpdateConsumer updateConsumer)
126  {
128  //we want i for an assert after the loop
129  std::size_t i = 0;
130 
131  for (; i < pendingUpdates.size(); ++i)
132  {
133  //apply this pending update
134  applyPendingUpdate(getPendingUpdate(i), lock, getRemoteUpdate, updateConsumer);
136  }
137 
139  ARMARX_CHECK_EXPRESSION(i == pendingUpdates.size());
140  //clear updates
142  }
143 
144  template <class LockType, class RemoteUpdateGetter, class UpdateConsumer>
145  void
147  LockType&& lock,
148  RemoteUpdateGetter getRemoteUpdate,
149  UpdateConsumer updateConsumer)
150  {
152  ARMARX_CHECK_EXPRESSION(u.workerId >= 0);
153 
154  if (u.dependetOnUpdateIds.empty())
155  {
156  //this update was moved from! => it was applied because it was queued out of order)
157  return;
158  }
159 
160  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(u.workerId) <
161  u.dependetOnUpdateIds.size());
162 
163  //is it an update from this node or was it already applied?
164  if (
165  //u.workerId == workerId || //self update //now handled by has applied update
166  hasAppliedUpdate(u) //update was already applied
167  )
168  {
169  return;
170  }
171 
172  //this update is new => check if it requires prev updates
173  //(if we have them: apply them directly, else get them from remote)
174  prepareUpdate(u.dependetOnUpdateIds, lock, getRemoteUpdate, updateConsumer);
176 
177  //now we have all required updates
179  applyUpdate(u);
180  //we dont need the update anymore. move it to the consumer
181  updateConsumer(std::move(u));
182  //mark the update as moved from
183  u.dependetOnUpdateIds.clear();
184  }
185 
186  template <class LockType, class RemoteUpdateGetter, class UpdateConsumer>
187  void
188  prepareUpdate(Ice::LongSeq dependetOnUpdateIds,
189  LockType&& lock,
190  RemoteUpdateGetter getRemoteUpdate,
191  UpdateConsumer updateConsumer)
192  {
194 
195  for (Ice::Long workerNodeId = 0;
196  static_cast<std::size_t>(workerNodeId) < dependetOnUpdateIds.size();
197  ++workerNodeId)
198  {
199  const auto updateSubId = dependetOnUpdateIds.at(workerNodeId);
200 
201  const UpdateId uId{workerNodeId, updateSubId};
202 
203  if (hasAppliedUpdate(uId))
204  {
205  //we have this update!
206  continue;
207  }
208  //the update was not applied
209 
210  //is this update local?
211  if (hasPendingUpdate(uId))
212  {
213  //local update => recurse
215  getPendingUpdate(uId), lock, getRemoteUpdate, updateConsumer);
216  }
217  else
218  {
220  //get the update from remote. this may take a while => unlock
221  lock.unlock();
222  Update update = getRemoteUpdate(workerNodeId, updateSubId);
223  lock.lock();
224  //prepare this update and apply it
226  update.dependetOnUpdateIds, lock, getRemoteUpdate, updateConsumer);
230  //we dont need the update anymore. move it to the consumer
231  updateConsumer(std::move(update));
232  //mark the update as moved from
233  update.dependetOnUpdateIds.clear();
234  }
235  }
236  }
237 
238  //applied
239  bool
240  hasAppliedUpdate(Ice::Long workerId, Ice::Long updateSubId) const
241  {
242  return (this->workerId == workerId) ||
243  ((static_cast<std::size_t>(workerId) < appliedUpdateIds.size())
244  ? updateSubId <= appliedUpdateIds.at(workerId)
245  : false);
246  }
247 
248  bool
249  hasAppliedUpdate(const UpdateId& id) const
250  {
251  return hasAppliedUpdate(id.workerId, id.updateSubId);
252  }
253 
254  bool canApplyUpdate(const Update& u);
255 
256  const std::vector<Ice::Long>&
258  {
259  return appliedUpdateIds;
260  }
261 
262  private:
263  std::vector<std::reference_wrapper<Tree>> trees;
264 
265  std::deque<Update> pendingUpdates;
266  std::map<UpdateId, std::size_t> pendingUpdateLookupTable;
267 
268  std::vector<Ice::Long> appliedUpdateIds;
269  Ice::Long workerId = -1;
270  };
271 } // namespace armarx::rrtconnect
armarx::rrtconnect::Updater::getAppliedUpdateIds
const std::vector< Ice::Long > & getAppliedUpdateIds() const
Definition: Updater.h:257
armarx::rrtconnect::UpdateId::updateSubId
Ice::Long updateSubId
Definition: Updater.h:63
armarx::rrtconnect::Updater::hasAppliedUpdate
bool hasAppliedUpdate(const UpdateId &id) const
Definition: Updater.h:249
armarx::rrtconnect::UpdateId::workerId
Ice::Long workerId
Definition: Updater.h:62
armarx::rrtconnect::Updater::setWorkerCount
void setWorkerCount(std::size_t count)
Definition: Updater.h:72
armarx::rrtconnect::Updater::hasPendingUpdate
bool hasPendingUpdate(const UpdateId &id) const
Definition: Updater.h:94
armarx::rrtconnect::Updater
Definition: Updater.h:66
armarx::rrtconnect::Updater::applyUpdate
void applyUpdate(const Update &u)
Definition: Updater.cpp:51
armarx::rrtconnect::UpdateId
Definition: Updater.h:39
armarx::rrtconnect::Updater::applyPendingUpdates
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition: Updater.h:123
armarx::rrtconnect::UpdateId::UpdateId
UpdateId(Ice::Long workerId=-1, Ice::Long updateSubId=-1)
Definition: Updater.h:41
armarx::rrtconnect::Updater::getPendingUpdate
Update & getPendingUpdate(const UpdateId &id)
Definition: Updater.h:100
armarx::rrtconnect::Updater::setTrees
void setTrees(const std::vector< std::reference_wrapper< Tree >> &newTrees)
Definition: Updater.h:83
armarx::rrtconnect::Updater::prepareUpdate
void prepareUpdate(Ice::LongSeq dependetOnUpdateIds, LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition: Updater.h:188
Tree.h
newId
auto newId
Definition: GraspingManager.cpp:76
armarx::rrtconnect::Updater::clearPendingUpdates
void clearPendingUpdates()
Definition: Updater.cpp:44
armarx::rrtconnect
Definition: Task.cpp:29
armarx::rrtconnect::Updater::Updater
Updater()=default
armarx::rrtconnect::Updater::applyPendingUpdates
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate)
Definition: Updater.h:116
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
armarx::rrtconnect::Updater::getPendingUpdate
Update & getPendingUpdate(std::size_t id)
Definition: Updater.h:106
armarx::rrtconnect::Updater::applyPendingUpdate
void applyPendingUpdate(Update &u, LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition: Updater.h:146
armarx::armem::server::ltm::util::mongodb::detail::update
bool update(mongocxx::collection &coll, const nlohmann::json &query, const nlohmann::json &update)
Definition: mongodb.cpp:68
ExpressionException.h
armarx::rrtconnect::UpdateId::operator<
bool operator<(const UpdateId &other) const
Definition: Updater.h:56
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and, if it turns out to be false, throws an ExpressionException.
Definition: ExpressionException.h:73
armarx::rrtconnect::Updater::addPendingUpdate
void addPendingUpdate(const Update &u)
Definition: Updater.cpp:37
armarx::rrtconnect::Updater::setWorkerId
void setWorkerId(Ice::Long newId)
Definition: Updater.cpp:29
armarx::rrtconnect::Updater::hasAppliedUpdate
bool hasAppliedUpdate(Ice::Long workerId, Ice::Long updateSubId) const
Definition: Updater.h:240
armarx::rrtconnect::Updater::canApplyUpdate
bool canApplyUpdate(const Update &u)
Definition: Updater.cpp:66
armarx::rrtconnect::UpdateId::UpdateId
UpdateId(const Update &u)
Definition: Updater.h:46