Updater.h
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package RobotComponents
19  * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20  * @date 2015
21  * @copyright http://www.gnu.org/licenses/gpl.txt
22  * GNU General Public License
23  */
24 #pragma once
25 
26 #include <functional>
27 #include <vector>
28 #include <map>
29 #include <mutex>
30 
32 
33 #include <RobotComponents/interface/components/MotionPlanning/Tasks/RRTConnect/DataStructures.h>
34 
35 #include "Tree.h"
36 namespace armarx::rrtconnect
37 {
38  struct UpdateId
39  {
43  {}
44 
45  UpdateId(const Update& u):
46  workerId {u.workerId},
48  {
49  ((workerId >= 0) && (static_cast<std::size_t>(workerId) < u.dependetOnUpdateIds.size())) ?
50  u.dependetOnUpdateIds.at(workerId) + 1 :
51  -1
52  }
53  {}
54 
55  bool operator <(const UpdateId& other) const
56  {
57  return (workerId < other.workerId) || ((workerId == other.workerId) && (updateSubId < other.updateSubId));
58  }
59 
62  };
63 
64  class Updater
65  {
66  public:
67  Updater() = default;
68 
69 
70  void setWorkerCount(std::size_t count)
71  {
72  if (appliedUpdateIds.size() < count)
73  {
74  appliedUpdateIds.resize(count, -1);
75  }
76  }
77 
79 
80  void setTrees(const std::vector<std::reference_wrapper<Tree>>& newTrees)
81  {
82  trees = newTrees;
83  }
84 
85  //pending
86  void addPendingUpdate(const Update& u);
87 
88  void clearPendingUpdates();
89 
90  bool hasPendingUpdate(const UpdateId& id) const
91  {
92  return pendingUpdateLookupTable.find(id) != pendingUpdateLookupTable.end();
93  }
94 
95  Update& getPendingUpdate(const UpdateId& id)
96  {
97  return getPendingUpdate(pendingUpdateLookupTable.at(id));
98  }
99 
100  Update& getPendingUpdate(std::size_t id)
101  {
102  return pendingUpdates.at(id);
103  }
104 
105  //applying
106  void applyUpdate(const Update& u);
107 
108  template<class LockType, class RemoteUpdateGetter>
109  void applyPendingUpdates(LockType&& lock, RemoteUpdateGetter getRemoteUpdate)
110  {
111  applyPendingUpdates(lock, getRemoteUpdate, [](Update&&) {});
112  }
113 
114  template<class LockType, class RemoteUpdateGetter, class UpdateConsumer>
115  void applyPendingUpdates(LockType&& lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
116  {
118  //we want i for an assert after the loop
119  std::size_t i = 0;
120 
121  for (; i < pendingUpdates.size(); ++i)
122  {
123  //apply this pending update
124  applyPendingUpdate(getPendingUpdate(i), lock, getRemoteUpdate, updateConsumer);
126  }
127 
129  ARMARX_CHECK_EXPRESSION(i == pendingUpdates.size());
130  //clear updates
132  }
133 
134  template<class LockType, class RemoteUpdateGetter, class UpdateConsumer>
135  void applyPendingUpdate(Update& u, LockType&& lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
136  {
138  ARMARX_CHECK_EXPRESSION(u.workerId >= 0);
139 
140  if (u.dependetOnUpdateIds.empty())
141  {
142  //this update was moved from! => it was applied because it was queued out of order)
143  return;
144  }
145 
146  ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(u.workerId) < u.dependetOnUpdateIds.size());
147 
148  //is it an update from this node or was it already applied?
149  if (
150  //u.workerId == workerId || //self update //now handled by has applied update
151  hasAppliedUpdate(u)//update was already applied
152  )
153  {
154  return;
155  }
156 
157  //this update is new => check if it requires prev updates
158  //(if we have them: apply them directly, else get them from remote)
159  prepareUpdate(u.dependetOnUpdateIds, lock, getRemoteUpdate, updateConsumer);
161 
162  //now we have all required updates
164  applyUpdate(u);
165  //we dont need the update anymore. move it to the consumer
166  updateConsumer(std::move(u));
167  //mark the update as moved from
168  u.dependetOnUpdateIds.clear();
169  }
170 
171  template<class LockType, class RemoteUpdateGetter, class UpdateConsumer>
173  Ice::LongSeq dependetOnUpdateIds,
174  LockType&& lock,
175  RemoteUpdateGetter getRemoteUpdate,
176  UpdateConsumer updateConsumer
177  )
178  {
180 
181  for (Ice::Long workerNodeId = 0; static_cast<std::size_t>(workerNodeId) < dependetOnUpdateIds.size(); ++workerNodeId)
182  {
183  const auto updateSubId = dependetOnUpdateIds.at(workerNodeId);
184 
185  const UpdateId uId
186  {
187  workerNodeId, updateSubId
188  };
189 
190  if (hasAppliedUpdate(uId))
191  {
192  //we have this update!
193  continue;
194  }
195  //the update was not applied
196 
197  //is this update local?
198  if (hasPendingUpdate(uId))
199  {
200  //local update => recurse
201  applyPendingUpdate(getPendingUpdate(uId), lock, getRemoteUpdate, updateConsumer);
202  }
203  else
204  {
206  //get the update from remote. this may take a while => unlock
207  lock.unlock();
208  Update update = getRemoteUpdate(workerNodeId, updateSubId);
209  lock.lock();
210  //prepare this update and apply it
211  prepareUpdate(update.dependetOnUpdateIds, lock, getRemoteUpdate, updateConsumer);
215  //we dont need the update anymore. move it to the consumer
216  updateConsumer(std::move(update));
217  //mark the update as moved from
218  update.dependetOnUpdateIds.clear();
219  }
220  }
221  }
222 
223  //applied
224  bool hasAppliedUpdate(Ice::Long workerId, Ice::Long updateSubId) const
225  {
226  return (this->workerId == workerId) || ((static_cast<std::size_t>(workerId) < appliedUpdateIds.size()) ?
227  updateSubId <= appliedUpdateIds.at(workerId) : false);
228  }
229 
230  bool hasAppliedUpdate(const UpdateId& id) const
231  {
232  return hasAppliedUpdate(id.workerId, id.updateSubId);
233  }
234 
235  bool canApplyUpdate(const Update& u);
236 
237  const std::vector<Ice::Long>& getAppliedUpdateIds() const
238  {
239  return appliedUpdateIds;
240  }
241 
242  private:
243  std::vector<std::reference_wrapper<Tree>> trees;
244 
245  std::deque<Update> pendingUpdates;
246  std::map<UpdateId, std::size_t> pendingUpdateLookupTable;
247 
248  std::vector<Ice::Long> appliedUpdateIds;
249  Ice::Long workerId = -1;
250  };
251 }
armarx::rrtconnect::Updater::getAppliedUpdateIds
const std::vector< Ice::Long > & getAppliedUpdateIds() const
Definition: Updater.h:237
armarx::rrtconnect::UpdateId::updateSubId
Ice::Long updateSubId
Definition: Updater.h:61
armarx::rrtconnect::Updater::hasAppliedUpdate
bool hasAppliedUpdate(const UpdateId &id) const
Definition: Updater.h:230
armarx::rrtconnect::UpdateId::workerId
Ice::Long workerId
Definition: Updater.h:60
armarx::rrtconnect::Updater::setWorkerCount
void setWorkerCount(std::size_t count)
Definition: Updater.h:70
armarx::rrtconnect::Updater::hasPendingUpdate
bool hasPendingUpdate(const UpdateId &id) const
Definition: Updater.h:90
armarx::rrtconnect::Updater
Definition: Updater.h:64
armarx::rrtconnect::Updater::applyUpdate
void applyUpdate(const Update &u)
Definition: Updater.cpp:47
armarx::rrtconnect::UpdateId
Definition: Updater.h:38
armarx::rrtconnect::Updater::applyPendingUpdates
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition: Updater.h:115
armarx::rrtconnect::UpdateId::UpdateId
UpdateId(Ice::Long workerId=-1, Ice::Long updateSubId=-1)
Definition: Updater.h:40
armarx::rrtconnect::Updater::getPendingUpdate
Update & getPendingUpdate(const UpdateId &id)
Definition: Updater.h:95
armarx::rrtconnect::Updater::setTrees
void setTrees(const std::vector< std::reference_wrapper< Tree >> &newTrees)
Definition: Updater.h:80
armarx::rrtconnect::Updater::prepareUpdate
void prepareUpdate(Ice::LongSeq dependetOnUpdateIds, LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition: Updater.h:172
Tree.h
newId
auto newId
Definition: GraspingManager.cpp:90
armarx::rrtconnect::Updater::clearPendingUpdates
void clearPendingUpdates()
Definition: Updater.cpp:41
armarx::rrtconnect
Definition: Task.cpp:29
armarx::rrtconnect::Updater::Updater
Updater()=default
armarx::rrtconnect::Updater::applyPendingUpdates
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate)
Definition: Updater.h:109
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:917
armarx::rrtconnect::Updater::getPendingUpdate
Update & getPendingUpdate(std::size_t id)
Definition: Updater.h:100
armarx::rrtconnect::Updater::applyPendingUpdate
void applyPendingUpdate(Update &u, LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition: Updater.h:135
armarx::armem::server::ltm::util::mongodb::detail::update
bool update(mongocxx::collection &coll, const nlohmann::json &query, const nlohmann::json &update)
Definition: mongodb.cpp:67
ExpressionException.h
armarx::rrtconnect::UpdateId::operator<
bool operator<(const UpdateId &other) const
Definition: Updater.h:55
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
Definition: ExpressionException.h:73
armarx::rrtconnect::Updater::addPendingUpdate
void addPendingUpdate(const Update &u)
Definition: Updater.cpp:35
armarx::rrtconnect::Updater::setWorkerId
void setWorkerId(Ice::Long newId)
Definition: Updater.cpp:28
armarx::rrtconnect::Updater::hasAppliedUpdate
bool hasAppliedUpdate(Ice::Long workerId, Ice::Long updateSubId) const
Definition: Updater.h:224
armarx::rrtconnect::Updater::canApplyUpdate
bool canApplyUpdate(const Update &u)
Definition: Updater.cpp:61
armarx::rrtconnect::UpdateId::UpdateId
UpdateId(const Update &u)
Definition: Updater.h:45