Updater.h
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package RobotComponents
19 * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
20 * @date 2015
21 * @copyright http://www.gnu.org/licenses/gpl.txt
22 * GNU General Public License
23 */
24#pragma once
25
26#include <functional>
27#include <map>
28#include <mutex>
29#include <vector>
30
32
33#include <RobotComponents/interface/components/MotionPlanning/Tasks/RRTConnect/DataStructures.h>
34
35#include "Tree.h"
36
37namespace armarx::rrtconnect
38{
39 struct UpdateId
40 {
41 UpdateId(Ice::Long workerId = -1, Ice::Long updateSubId = -1) :
43 {
44 }
45
46 UpdateId(const Update& u) :
48 updateSubId{((workerId >= 0) &&
49 (static_cast<std::size_t>(workerId) < u.dependetOnUpdateIds.size()))
50 ? u.dependetOnUpdateIds.at(workerId) + 1
51 : -1}
52 {
53 }
54
55 bool
56 operator<(const UpdateId& other) const
57 {
58 return (workerId < other.workerId) ||
59 ((workerId == other.workerId) && (updateSubId < other.updateSubId));
60 }
61
62 Ice::Long workerId;
63 Ice::Long updateSubId;
64 };
65
66 class Updater
67 {
68 public:
69 Updater() = default;
70
71 void
72 setWorkerCount(std::size_t count)
73 {
74 if (appliedUpdateIds.size() < count)
75 {
76 appliedUpdateIds.resize(count, -1);
77 }
78 }
79
80 void setWorkerId(Ice::Long newId);
81
82 void
83 setTrees(const std::vector<std::reference_wrapper<Tree>>& newTrees)
84 {
85 trees = newTrees;
86 }
87
88 //pending
89 void addPendingUpdate(const Update& u);
90
92
93 bool
94 hasPendingUpdate(const UpdateId& id) const
95 {
96 return pendingUpdateLookupTable.find(id) != pendingUpdateLookupTable.end();
97 }
98
99 Update&
101 {
102 return getPendingUpdate(pendingUpdateLookupTable.at(id));
103 }
104
105 Update&
106 getPendingUpdate(std::size_t id)
107 {
108 return pendingUpdates.at(id);
109 }
110
111 //applying
112 void applyUpdate(const Update& u);
113
114 template <class LockType, class RemoteUpdateGetter>
115 void
116 applyPendingUpdates(LockType&& lock, RemoteUpdateGetter getRemoteUpdate)
117 {
118 applyPendingUpdates(lock, getRemoteUpdate, [](Update&&) {});
119 }
120
121 template <class LockType, class RemoteUpdateGetter, class UpdateConsumer>
122 void
123 applyPendingUpdates(LockType&& lock,
124 RemoteUpdateGetter getRemoteUpdate,
125 UpdateConsumer updateConsumer)
126 {
128 //we want i for an assert after the loop
129 std::size_t i = 0;
130
131 for (; i < pendingUpdates.size(); ++i)
132 {
133 //apply this pending update
134 applyPendingUpdate(getPendingUpdate(i), lock, getRemoteUpdate, updateConsumer);
136 }
137
139 ARMARX_CHECK_EXPRESSION(i == pendingUpdates.size());
140 //clear updates
142 }
143
144 template <class LockType, class RemoteUpdateGetter, class UpdateConsumer>
145 void
147 LockType&& lock,
148 RemoteUpdateGetter getRemoteUpdate,
149 UpdateConsumer updateConsumer)
150 {
152 ARMARX_CHECK_EXPRESSION(u.workerId >= 0);
153
154 if (u.dependetOnUpdateIds.empty())
155 {
156 //this update was moved from! => it was applied because it was queued out of order)
157 return;
158 }
159
160 ARMARX_CHECK_EXPRESSION(static_cast<std::size_t>(u.workerId) <
161 u.dependetOnUpdateIds.size());
162
163 //is it an update from this node or was it already applied?
164 if (
165 //u.workerId == workerId || //self update //now handled by has applied update
166 hasAppliedUpdate(u) //update was already applied
167 )
168 {
169 return;
170 }
171
172 //this update is new => check if it requires prev updates
173 //(if we have them: apply them directly, else get them from remote)
174 prepareUpdate(u.dependetOnUpdateIds, lock, getRemoteUpdate, updateConsumer);
176
177 //now we have all required updates
179 applyUpdate(u);
180 //we dont need the update anymore. move it to the consumer
181 updateConsumer(std::move(u));
182 //mark the update as moved from
183 u.dependetOnUpdateIds.clear();
184 }
185
186 template <class LockType, class RemoteUpdateGetter, class UpdateConsumer>
187 void
188 prepareUpdate(Ice::LongSeq dependetOnUpdateIds,
189 LockType&& lock,
190 RemoteUpdateGetter getRemoteUpdate,
191 UpdateConsumer updateConsumer)
192 {
194
195 for (Ice::Long workerNodeId = 0;
196 static_cast<std::size_t>(workerNodeId) < dependetOnUpdateIds.size();
197 ++workerNodeId)
198 {
199 const auto updateSubId = dependetOnUpdateIds.at(workerNodeId);
200
201 const UpdateId uId{workerNodeId, updateSubId};
202
203 if (hasAppliedUpdate(uId))
204 {
205 //we have this update!
206 continue;
207 }
208 //the update was not applied
209
210 //is this update local?
211 if (hasPendingUpdate(uId))
212 {
213 //local update => recurse
215 getPendingUpdate(uId), lock, getRemoteUpdate, updateConsumer);
216 }
217 else
218 {
220 //get the update from remote. this may take a while => unlock
221 lock.unlock();
222 Update update = getRemoteUpdate(workerNodeId, updateSubId);
223 lock.lock();
224 //prepare this update and apply it
226 update.dependetOnUpdateIds, lock, getRemoteUpdate, updateConsumer);
229 applyUpdate(update);
230 //we dont need the update anymore. move it to the consumer
231 updateConsumer(std::move(update));
232 //mark the update as moved from
233 update.dependetOnUpdateIds.clear();
234 }
235 }
236 }
237
238 //applied
239 bool
240 hasAppliedUpdate(Ice::Long workerId, Ice::Long updateSubId) const
241 {
242 return (this->workerId == workerId) ||
243 ((static_cast<std::size_t>(workerId) < appliedUpdateIds.size())
244 ? updateSubId <= appliedUpdateIds.at(workerId)
245 : false);
246 }
247
248 bool
249 hasAppliedUpdate(const UpdateId& id) const
250 {
251 return hasAppliedUpdate(id.workerId, id.updateSubId);
252 }
253
254 bool canApplyUpdate(const Update& u);
255
256 const std::vector<Ice::Long>&
258 {
259 return appliedUpdateIds;
260 }
261
262 private:
263 std::vector<std::reference_wrapper<Tree>> trees;
264
265 std::deque<Update> pendingUpdates;
266 std::map<UpdateId, std::size_t> pendingUpdateLookupTable;
267
268 std::vector<Ice::Long> appliedUpdateIds;
269 Ice::Long workerId = -1;
270 };
271} // namespace armarx::rrtconnect
auto newId
bool hasPendingUpdate(const UpdateId &id) const
Definition Updater.h:94
void prepareUpdate(Ice::LongSeq dependetOnUpdateIds, LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition Updater.h:188
bool hasAppliedUpdate(const UpdateId &id) const
Definition Updater.h:249
void applyUpdate(const Update &u)
Definition Updater.cpp:51
void setWorkerCount(std::size_t count)
Definition Updater.h:72
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition Updater.h:123
Update & getPendingUpdate(std::size_t id)
Definition Updater.h:106
void setTrees(const std::vector< std::reference_wrapper< Tree > > &newTrees)
Definition Updater.h:83
const std::vector< Ice::Long > & getAppliedUpdateIds() const
Definition Updater.h:257
void applyPendingUpdate(Update &u, LockType &&lock, RemoteUpdateGetter getRemoteUpdate, UpdateConsumer updateConsumer)
Definition Updater.h:146
void setWorkerId(Ice::Long newId)
Definition Updater.cpp:29
void addPendingUpdate(const Update &u)
Definition Updater.cpp:37
bool canApplyUpdate(const Update &u)
Definition Updater.cpp:66
bool hasAppliedUpdate(Ice::Long workerId, Ice::Long updateSubId) const
Definition Updater.h:240
Update & getPendingUpdate(const UpdateId &id)
Definition Updater.h:100
void applyPendingUpdates(LockType &&lock, RemoteUpdateGetter getRemoteUpdate)
Definition Updater.h:116
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
bool operator<(const UpdateId &other) const
Definition Updater.h:56
UpdateId(const Update &u)
Definition Updater.h:46
UpdateId(Ice::Long workerId=-1, Ice::Long updateSubId=-1)
Definition Updater.h:41