PeriodicUpdateWorker.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2012-2025, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package RobotStateComponent::
19 * @author Samet Soenmez (uewtt at student dot kit dot edu)
20 * @date 2025t
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
24
26
27#include <iomanip>
28
29#include <QTimer>
30
33
35{
37 std::shared_ptr<armem::gui::model::MemoryViewerModel> model)
38 {
39 _model = std::move(model);
40
41 }
42
44 {
45 // this function can not be called in the constructur, otherwise
46 // the objects (here timers) will be created in the wrong thread.
47
48 _periodicTimer = new QTimer(this);
49 _periodicTimer->setInterval(1000 / 60); // Keep this stable.
50 connect(_periodicTimer,
51 &QTimer::timeout,
52 this,
53 &This::onPeriodicTimerTimeout);
54
55 _autoUpdateTimer = new QTimer(this);
56 connect(_autoUpdateTimer,
57 &QTimer::timeout,
58 this,
60
61 }
62
64 {
65 _periodicTimer->deleteLater();
66 _periodicTimer = nullptr;
67
68 _autoUpdateTimer->deleteLater();
69 _autoUpdateTimer = nullptr;
70 }
71
73 {
74 _periodicTimer->start();
75 }
76
78 {
79 _periodicTimer->stop();
80 }
81
82 void PeriodicUpdateWorker::onPeriodicTimerTimeout()
83 {
84 updateMemoryReadersWriters();
85 processQueryResults();
86 }
87
89 {
90 _autoUpdateTimer->start();
91 }
92
94 {
95 _autoUpdateTimer->stop();
96 }
97
99 {
100 _autoUpdateTimer->setInterval(freq);
101 }
102
104 {
105 emit showLoadingLabel();
106 startDueQueries();
107 _updateLabelAndTree = true;
108 }
109
110 void PeriodicUpdateWorker::startDueQueries()
111 {
112 armem::client::QueryInput input = _model->queryInput();
113 int recursionDepth = _model->recursionDepth();
114
115 // Can't use a structured binding here because you can't capture those in a lambda
116 // according to the C++ standard.
117 auto enabledMemories = _model->getEnabledMemories();
118 for (const auto& pair : _model->memoryReadersCopy())
119 {
120 // skip if memory should not be queried
121 if (std::find(enabledMemories.begin(), enabledMemories.end(), pair.first) ==
122 enabledMemories.end())
123 {
124 continue;
125 }
126
127 const auto& name = pair.first;
128 const auto& reader = pair.second;
129
130 // skip if query already running
131 if (_runningQueries.count(name) != 0)
132 {
133 continue;
134 }
135
136 // You could pass the query function itself to async here,
137 // but that caused severe template headaches when I tried it.
138 _runningQueries[name] = std::async(
139 std::launch::async,
140 [reader, input, recursionDepth, this]()
141 {
142 // Can't resolve MemoryLinks without data
143 return recursionDepth == 0 || input.dataMode == armem::query::DataMode::NoData
144 ? reader.query(input.toIce())
145 : reader.query(input.toIce(), _model->mns(), recursionDepth);
146 });
147 }
148 }
149
150 void
151 PeriodicUpdateWorker::processQueryResults()
152 {
153 const std::map<std::string, client::QueryResult> results = collectQueryResults();
154
155 int errorCount = 0;
156 if (results.size() > 0)
157 {
158 // this means we have actual query results
159 applyQueryResults(results, &errorCount);
160 _updateLabelAndTree = true;
161 }
162 if (_updateLabelAndTree)
163 {
164 _updateLabelAndTree = false;
165 emit updateLabelAndTree(errorCount);
166 }
167 }
168
169 std::map<std::string, client::QueryResult>
170 PeriodicUpdateWorker::collectQueryResults()
171 {
172
173 TIMING_START(tCollectQueryResults)
174
175 std::map<std::string, client::QueryResult> results;
176 for (auto it = _runningQueries.begin(); it != _runningQueries.end();)
177 {
178 const std::string& name = it->first;
179 std::future<armem::query::data::Result>* queryPromise = &it->second;
180
181 if (queryPromise->wait_for(std::chrono::seconds(0)) == std::future_status::ready)
182 {
183 auto readers = _model->memoryReadersCopy();
184 if (auto jt = readers.find(name); jt != readers.end())
185 {
186 try
187 {
188 results[name] = client::QueryResult::fromIce(queryPromise->get());
189 }
190 catch (const Ice::ConnectionRefusedException&)
191 {
192 // Server is gone (MNS did not know about it yet) => Skip result.
193 }
194 }
195 // else: Server is gone (MNS knew about it) => Skip result.
196
197 // Promise is completed => Clean up in any case.
198 it = _runningQueries.erase(it);
199 }
200 else
201 {
202 ++it; // Uncompleted => Keep.
203 }
204 }
205
206 TIMING_END_STREAM(tCollectQueryResults, ARMARX_VERBOSE)
207 if (_model->debugObserver())
208 {
209 _model->debugObserver()->begin_setDebugChannel(
210 Logging::tag.tagName,
211 {
212 {"t Collect Query Results [ms]",
213 new Variant(tCollectQueryResults.toMilliSecondsDouble())},
214 {"# Collected Query Results", new Variant(static_cast<int>(results.size()))},
215 });
216 }
217
218 return results;
219 }
220
221 void
222 PeriodicUpdateWorker::applyQueryResults(const std::map<std::string, client::QueryResult>& results,
223 int* outErrorCount)
224 {
225 TIMING_START(tProcessQueryResults)
226 auto memoryData = _model->memoryDataCopy();
227 for (const auto& [name, result] : results)
228 {
229 if (result.success)
230 {
231 memoryData[name] = std::move(result.memory);
232 }
233 else
234 {
235 ARMARX_WARNING << "Querying memory server '" << name << "' produced an error: \n"
236 << result.errorMessage;
237 if (outErrorCount)
238 {
239 (*outErrorCount)++;
240 }
241 }
242 }
243
244 // Perhaps remove entries
245 auto enabledMemories = _model->getEnabledMemories();
246 for (auto it = memoryData.begin(); it != memoryData.end();)
247 {
248 // Drop all entries in memoryData which are not in memoryReaders anymore.
249 if (_model->memoryReadersCopy().count(it->first) == 0)
250 {
251 if (_model->dropRemovedMemories())
252 {
253 it = memoryData.erase(it);
254 }
255 else
256 {
257 ++it;
258 }
259 continue;
260 }
261
262 // Drop all entries that are not enabled by user (which means that there is no query result)
263 if (std::find(enabledMemories.begin(), enabledMemories.end(), it->first) ==
264 enabledMemories.end())
265 {
266 if (_model->dropDisabledMemories())
267 {
268 it = memoryData.erase(it);
269 }
270 else
271 {
272 ++it;
273 }
274 continue;
275 }
276
277 // Memory found
278 ++it;
279 }
280 _model->setMemoryData(memoryData);
281
282 TIMING_END_STREAM(tProcessQueryResults, ARMARX_VERBOSE)
283 if (_model->debugObserver())
284 {
285 _model->debugObserver()->begin_setDebugChannel(
286 Logging::tag.tagName,
287 {
288 {"t Process Query Results [ms]",
289 new Variant(tProcessQueryResults.toMilliSecondsDouble())},
290 {"# Processed Query Results", new Variant(static_cast<int>(results.size()))},
291 });
292 }
293 }
294
295 void
296 PeriodicUpdateWorker::updateMemoryReadersWriters()
297 {
298 if (_model->connected() and _model->mns()) // mns must be connected and mns must be available
299 {
300 try
301 {
302
303 auto readers = _model->mns().getAllReaders(true);
304 auto writers = _model->mns().getAllWriters(true);
305 _model->setMemoryReaders(readers);
306 _model->setMemoryWriters(writers);
307
308 std::vector<std::string> activeMemoryNames;
309
310 // add all active memories to update list
311 std::transform(readers.begin(),
312 readers.end(),
313 std::back_inserter(activeMemoryNames),
314 [](const auto& p) { return p.first; });
315
316
317 // only update if memories changed
318 if (activeMemoryNames != _activeMemoryNames)
319 {
320 _activeMemoryNames = activeMemoryNames;
321
322 // convertion into QStringList is necessary for signal/slot in this case
323 QStringList qnames;
324 for (auto &s : activeMemoryNames) {
325 qnames << QString::fromStdString(s);
326 }
327 emit updateQueryWidget(qnames);
328 }
329 }
330 catch (...)
331 {
332 // MNS was killed/stopped
333 // ignore?!
334 }
335 }
336 else
337 {
339 << deactivateSpam()
340 << "MNS not ready yet. Skip update of available memories in query widget.";
341 }
342 }
343
344}
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition Logging.cpp:99
PeriodicUpdateWorker(std::shared_ptr< armem::gui::model::MemoryViewerModel > model)
void updateQueryWidget(QStringList activeMemoryNames)
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
#define TIMING_START(name)
Helper macro to do timing tests.
Definition TimeUtil.h:289
#define TIMING_END_STREAM(name, os)
Prints duration.
Definition TimeUtil.h:310
@ NoData
Just get the structure, but no ARON data.
Definition DataMode.h:8
A query for parts of a memory.
Definition Query.h:24
static QueryResult fromIce(const armem::query::data::Result &ice)
Definition Query.cpp:26