ControlThreadOutputBuffer.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package RobotAPI::RobotUnit
17 * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
18 * @date 2017
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
23
24#include <memory>
25#include <utility>
26
27namespace armarx
28{
// Definition of the static, thread-local instance pointer; it starts out as nullptr on every thread.
31 thread_local ControlThreadOutputBuffer* ControlThreadOutputBuffer::RtLoggingInstance{nullptr};
32
35 {
36 ARMARX_CHECK_EXPRESSION(isInitialized);
37 return entries.at(toBounds(writePosition));
38 }
39
40 void
42 {
43 ARMARX_CHECK_EXPRESSION(isInitialized);
44 getWriteBuffer().iteration = writePosition;
45 ++writePosition;
46 }
47
50 {
51 ARMARX_CHECK_EXPRESSION(isInitialized);
52 if (!onePastReadPosition)
53 {
54 //data is not initialized
55 return entries.back();
56 }
57 return entries.at(toBounds(onePastReadPosition - 1));
58 }
59
60 bool
62 {
63 ARMARX_CHECK_EXPRESSION(isInitialized);
64 std::size_t localWritePosition = this->writePosition;
65 if (onePastReadPosition == localWritePosition)
66 {
67 //already up to date
68 return false;
69 }
70 onePastReadPosition = localWritePosition;
71 return true;
72 }
73
74 void
76 {
77 ARMARX_CHECK_EXPRESSION(isInitialized);
78 onePastLoggingReadPosition = writePosition.load();
79 }
80
81 void
83 {
85 ARMARX_CHECK(isInitialized);
86 ARMARX_DEBUG << VAROUT(entries.size());
87 const size_t writePosition_local = writePosition.load(); // copy to prevent external changes
88 if (writePosition_local - onePastLoggingReadPosition >= numEntries)
89 {
91 << "There are " << writePosition_local - onePastLoggingReadPosition
92 << " unlogged entries, but only the last " << numEntries
93 << " are saved! "
94 "There seems to be something wrong (e.g. the rt logging threw an exception, "
95 "the system load is too high or the logging takes to long). "
96 "The log position will be reset to the newest entry!";
98 }
99 //the number of new entries
100 auto numNewEntries = writePosition_local - onePastLoggingReadPosition;
101
102 if (numNewEntries >= numEntries)
103 {
104 ARMARX_VERBOSE << " more new entries (" << numNewEntries << ") than space ("
105 << numEntries << ") -> Skipping everything else";
106 return;
107 }
108 //consume all
109 const std::size_t num = writePosition_local - onePastLoggingReadPosition;
110 ARMARX_DEBUG << num << " new entries to be treated";
111 for (std::size_t offset = 0; onePastLoggingReadPosition < writePosition_local;
112 ++onePastLoggingReadPosition, ++offset)
113 {
115
117 entries.at(toBounds(onePastLoggingReadPosition));
118 consumer(entry, offset, num);
119 entry.messages.reset(messageBufferSize, messageBufferEntries, entry.iteration);
120
121 //update how many
122 auto newNumNewEntries = writePosition_local - onePastLoggingReadPosition;
123 if (newNumNewEntries * 2 > numEntries)
124 {
125 ARMARX_VERBOSE << deactivateSpam(5) << "RT-Logging is slow! "
126 << "The RT-Thread writes faster new data than the RT-Logging thread "
127 "consumes it! "
128 << " old/new/max number of entries: " << numNewEntries << " /"
129 << newNumNewEntries << " / " << numEntries;
130 }
131 numNewEntries = newNumNewEntries;
132 if (numNewEntries >= numEntries)
133 {
134 ARMARX_VERBOSE << " more new entries (" << numNewEntries << ") than space ("
135 << numEntries << ") -> Skipping everything else";
136 return;
137 }
138 }
139 }
140
141 void
143 size_t numberOfEntriesToLog)
144 {
146 ARMARX_CHECK_EXPRESSION(isInitialized);
147 size_t unloggedEntries = writePosition - onePastLoggingReadPosition;
148 if (unloggedEntries >= numEntries)
149 {
151 << "There are " << unloggedEntries << " unlogged entries, but only the last "
152 << numEntries
153 << " are saved! "
154 "There seems to be something wrong (e.g. the rt logging threw an exception, "
155 "the system load is too high or the logging takes to long). "
156 "The log position will be reset to the newest entry!";
158 unloggedEntries = writePosition - onePastLoggingReadPosition;
159 }
160 if (unloggedEntries > numberOfEntriesToLog)
161 {
162 ARMARX_DEBUG << "not logging all " << unloggedEntries << ". Only the last "
163 << numberOfEntriesToLog << " will be used.";
164
165 const std::size_t onePastLoggingReadPositionLastRead =
166 onePastLoggingReadPosition.load();
167
168 // skip all messages except the latest numberOfEntriesToLog
169 onePastLoggingReadPosition.store(writePosition.load() - numberOfEntriesToLog);
170
171 // reset skipped entries
172 for (std::size_t i = onePastLoggingReadPositionLastRead;
173 i < onePastLoggingReadPosition - 1;
174 i++)
175 {
176 detail::ControlThreadOutputBufferEntry& entry = entries.at(toBounds(i));
177 entry.messages.reset(messageBufferSize, messageBufferEntries, entry.iteration);
178 }
179 }
180
181 // use already existing impl.
182 forEachNewLoggingEntry(std::move(consumer));
183 }
184
185 std::size_t
187 std::size_t initialNumEntries,
190 std::size_t initialMessageBufferSize,
191 std::size_t messageBufferNumberEntries,
192 std::size_t messageBufferMaxSize,
193 std::size_t messageBufferMaxNumberEntries)
194 {
195 ARMARX_CHECK_EXPRESSION(!isInitialized);
196 numEntries = initialNumEntries;
197 messageBufferSize = initialMessageBufferSize;
198 //decide whether to use a triple buffer (in case no rtlogging is used)
199 ARMARX_CHECK_NOT_EQUAL(numEntries, 0);
200 entries.reserve(numEntries);
201 entries.emplace_back(controlDevices,
202 sensorDevices,
203 messageBufferSize,
204 messageBufferNumberEntries,
205 messageBufferMaxSize,
206 messageBufferMaxNumberEntries);
207 for (std::size_t i = 1; i < numEntries; ++i)
208 {
209 entries.emplace_back(entries.at(0));
210 }
211
212 isInitialized = true;
213 return entries.at(0).getDataBufferSize();
214 }
215
217 {
218 if (!isInitialized)
219 {
220 return;
221 }
222 for (Entry& e : entries)
223 {
224 for (auto& s : e.sensors)
225 {
226 s->~SensorValueBase();
227 }
228 for (auto& cs : e.control)
229 {
230 for (auto& c : cs)
231 {
232 c->~ControlTargetBase();
233 }
234 }
235 }
236 }
237
240 {
241 printMsg = true;
242 loggingLevel = lvl;
243 return *this;
244 }
245
248 {
249 printMsg = true;
250 deactivateSpamSec = sec;
251 return *this;
252 }
253
256 {
257 printMsg = true;
258 deactivateSpamTag_ = tag;
259 return *this;
260 }
261
262 void
263 detail::RtMessageLogEntryBase::print(Ice::Int controlThreadId) const
264 {
265 if (printMsg)
266 {
267 (checkLogLevel(loggingLevel))
269 : (*loghelper(file().c_str(), line(), func().c_str())
270 ->setBacktrace(false)
271 ->setTag({"RtLogMessages"})
272 ->setThreadId(controlThreadId))
273 << loggingLevel
274 << ::deactivateSpam(deactivateSpamSec, to_string(deactivateSpamTag_))
275 << format();
276 }
277 }
278
// Yields zero when `minimize` is set and the passed value otherwise.
// The result is returned by value and has the type of `minimize ? 0 : value`.
const auto PotentiallyMinimizeMember = [](auto& value, bool minimize)
{
    using Result = decltype(true ? 0 : value);
    if (minimize)
    {
        return static_cast<Result>(0);
    }
    return static_cast<Result>(value);
};
281
283 bool minimize) :
284 initialBufferSize{PotentiallyMinimizeMember(other.initialBufferSize, minimize)},
285 initialBufferEntryNumbers{
286 PotentiallyMinimizeMember(other.initialBufferEntryNumbers, minimize)},
287 bufferMaxSize{PotentiallyMinimizeMember(other.bufferMaxSize, minimize)},
288 bufferMaxNumberEntries{PotentiallyMinimizeMember(other.bufferMaxNumberEntries, minimize)},
289 buffer(minimize ? other.buffer.size() + other.maxAlign - 1 - other.bufferSpace
290 : other.buffer.size(),
291 0),
292 bufferSpace{buffer.size()},
293 bufferPlace{buffer.data()},
294 entries(minimize ? other.entriesWritten : other.entries.size(), nullptr),
295 entriesWritten{0},
296 requiredAdditionalBufferSpace{other.requiredAdditionalBufferSpace},
297 requiredAdditionalEntries{other.requiredAdditionalEntries},
298 messagesLost{other.messagesLost},
299 maxAlign{1}
300 {
301 ARMARX_DEBUG << "copying RtMessageLogBuffer with minimize = " << minimize << " "
302 << VAROUT(initialBufferSize) << " " << VAROUT(initialBufferEntryNumbers) << " "
303 << VAROUT(bufferMaxSize) << " " << VAROUT(bufferMaxNumberEntries) << " "
304 << VAROUT(buffer.size()) << " " << VAROUT(other.bufferSpace) << " "
305 << VAROUT(bufferPlace) << " " << VAROUT(entries.size()) << " "
306 << VAROUT(entriesWritten) << " " << VAROUT(requiredAdditionalBufferSpace)
307 << " " << VAROUT(requiredAdditionalEntries) << " " << VAROUT(messagesLost)
308 << " " << VAROUT(maxAlign);
309 for (std::size_t idx = 0; idx < other.entries.size() && other.entries.at(idx); ++idx)
310 {
311 const RtMessageLogEntryBase* entry = other.entries.at(idx);
313 maxAlign = std::max(maxAlign, entry->_alignof());
314 void* place = std::align(entry->_alignof(), entry->_sizeof(), bufferPlace, bufferSpace);
315 const auto hint = ARMARX_STREAM_PRINTER
316 {
317 out << "entry " << idx << " of " << other.entriesWritten
318 << "\nbuffer first = " << static_cast<void*>(&buffer.front())
319 << "\nbuffer last = " << static_cast<void*>(&buffer.back())
320 << "\nbuffer size = " << buffer.size()
321 << "\nbuffer place = " << bufferPlace
322 << "\nbuffer space = " << bufferSpace
323 << "\nentry size = " << entry->_sizeof()
324 << "\nentry align = " << entry->_alignof() << "\n"
325 << "\nother buffer size = " << other.buffer.size()
326 << "\nother buffer space = " << other.bufferSpace
327 << "\nother max align = " << other.maxAlign << "\n"
328 << "\nthis = " << this;
329 };
330 ARMARX_CHECK_NOT_NULL(place) << hint;
331 ARMARX_CHECK_LESS_EQUAL(static_cast<void*>(&buffer.front()), static_cast<void*>(place))
332 << hint;
333 ARMARX_CHECK_LESS_EQUAL(static_cast<void*>(place), static_cast<void*>(&buffer.back()))
334 << hint;
336 static_cast<void*>(static_cast<std::uint8_t*>(place) + entry->_sizeof() - 1),
337 static_cast<void*>(&buffer.back()))
338 << hint;
339 ARMARX_CHECK_LESS_EQUAL(place, &buffer.back());
340 ARMARX_CHECK_LESS(entriesWritten, entries.size());
341 ARMARX_CHECK_EXPRESSION(!entries.at(entriesWritten));
342 entries.at(entriesWritten++) = entry->_placementCopyConstruct(place);
343 bufferSpace -= entry->_sizeof();
344 bufferPlace = static_cast<std::uint8_t*>(place) + entry->_sizeof();
345 }
346 ARMARX_CHECK_EQUAL(entriesWritten, other.entriesWritten);
347 ARMARX_CHECK_EQUAL(maxAlign, other.maxAlign);
348 if (minimize)
349 {
350 ARMARX_CHECK_EQUAL(entriesWritten, entries.size());
351 }
352 }
353
354 void
355 detail::RtMessageLogBuffer::reset(std::size_t& bufferSize,
356 std::size_t& numEntries,
357 std::size_t iterationCount)
358 {
359 deleteAll();
360 if (requiredAdditionalEntries || entries.size() < numEntries)
361 {
362 const auto numExcessEntries =
363 std::max(requiredAdditionalEntries, numEntries - entries.size());
364 const auto requiredSize = entries.size() + numExcessEntries;
365 ARMARX_VERBOSE << "Iteration " << iterationCount << " required "
366 << requiredAdditionalEntries << " | " << numExcessEntries
367 << " additional message entries. \n"
368 << "The requested total number of entries is " << requiredSize << ". \n"
369 << "The current number of entries is " << entries.size() << ". \n"
370 << "The maximal number of entries is "
372 if (requiredSize > getMaximalNumberOfBufferEntries())
373 {
374 ARMARX_VERBOSE << deactivateSpam(1, to_string(requiredSize)) << "Iteration "
375 << iterationCount << " would require " << requiredSize
376 << " message entries, but the maximal number of entries is "
378 }
379 numEntries = std::max(requiredSize, getMaximalNumberOfBufferEntries());
380 entries.resize(numEntries, nullptr);
381 requiredAdditionalEntries = 0;
382 }
383 if (requiredAdditionalBufferSpace)
384 {
385 const auto requiredSpace = buffer.size() + requiredAdditionalBufferSpace;
386 ARMARX_WARNING << "Iteration " << iterationCount << " required "
387 << requiredAdditionalBufferSpace
388 << " additional bytes for messages in the buffer. \n"
389 << "Therefore the new required size is " << requiredSpace << ". \n"
390 << "The current size of the buffer is " << buffer.size() << ". \n"
391 << "The maximal size of the buffer is " << getMaximalBufferSize();
392 if (requiredSpace > getMaximalBufferSize())
393 {
394 ARMARX_WARNING << deactivateSpam(1, to_string(requiredSpace)) << "Iteration "
395 << iterationCount << " would require " << requiredSpace
396 << " bytes for messages buffer, but the maximal buffer size is "
398 }
399 bufferSize = std::min(requiredSpace, getMaximalBufferSize());
400 buffer.resize(bufferSize, 0);
401 requiredAdditionalBufferSpace = 0;
402 messagesLost = 0;
403 bufferSpace = buffer.size();
404 bufferPlace = buffer.data();
405 }
406 maxAlign = 1;
407 }
408
409 void
410 detail::RtMessageLogBuffer::deleteAll()
411 {
412 for (std::size_t idx = 0; idx < entries.size() && entries.at(idx); ++idx)
413 {
414 entries.at(idx)->~RtMessageLogEntryBase();
415 entries.at(idx) = nullptr;
416 }
417 bufferSpace = buffer.size();
418 bufferPlace = buffer.data();
419 entriesWritten = 0;
420 }
421
425 std::size_t messageBufferSize,
426 std::size_t messageBufferNumberEntries,
427 std::size_t messageBufferMaxSize,
428 std::size_t messageBufferMaxNumberEntries) :
429 sensorValuesTimestamp{IceUtil::Time::microSeconds(0)},
430 timeSinceLastIteration{IceUtil::Time::microSeconds(0)},
431 messages{messageBufferSize,
432 messageBufferNumberEntries,
433 messageBufferMaxSize,
434 messageBufferMaxNumberEntries}
435 {
436 //calculate size in bytes for one buffer
437 const std::size_t bytes = [&]
438 {
439 std::size_t maxAlign = 1;
440 std::size_t bytes = 0;
441 for (const SensorDevicePtr& sd : sensorDevices.values())
442 {
443 const auto align = sd->getSensorValue()->_alignof();
444 const auto alignShift = (align - bytes % align) % align;
445 bytes += alignShift;
446 maxAlign = std::max(maxAlign, align);
447
448 bytes += sd->getSensorValue()->_sizeof();
449 }
450 for (const ControlDevicePtr& cd : controlDevices.values())
451 {
452 for (const JointController* ctrl : cd->getJointControllers())
453 {
454 const auto align = ctrl->getControlTarget()->_alignof();
455 const auto alignShift = (align - bytes % align) % align;
456 bytes += alignShift;
457 maxAlign = std::max(maxAlign, align);
458
459 bytes += ctrl->getControlTarget()->_sizeof();
460 }
461 }
462 return bytes + maxAlign - 1;
463 }();
464
465 buffer.resize(bytes, 0);
466 void* place = buffer.data();
467 std::size_t space = buffer.size();
468
469 auto getAlignedPlace = [&space, &place, this](std::size_t bytes, std::size_t alignment)
470 {
471 ARMARX_CHECK_EXPRESSION(std::align(alignment, bytes, place, space));
472 ARMARX_CHECK_LESS(bytes, space);
473 const auto resultPlace = place;
474 place = static_cast<std::uint8_t*>(place) + bytes;
475 space -= bytes;
476 ARMARX_CHECK_LESS_EQUAL(place, &buffer.back());
477 return resultPlace;
478 };
479
480 sensors.reserve(sensorDevices.size());
481 for (const SensorDevicePtr& sd : sensorDevices.values())
482 {
483 const SensorValueBase* sv = sd->getSensorValue();
484 sensors.emplace_back(
485 sv->_placementConstruct(getAlignedPlace(sv->_sizeof(), sv->_alignof())));
486 }
487
488 control.reserve(controlDevices.size());
489 for (const ControlDevicePtr& cd : controlDevices.values())
490 {
491 control.emplace_back();
492 auto& ctargs = control.back();
493 const auto ctrls = cd->getJointControllers();
494 ctargs.reserve(ctrls.size());
495 for (const JointController* ctrl : ctrls)
496 {
497 const ControlTargetBase* ct = ctrl->getControlTarget();
498 ctargs.emplace_back(
499 ct->_placementConstruct(getAlignedPlace(ct->_sizeof(), ct->_alignof())));
500 ctargs.back()->reset();
501 }
502 }
503 }
504
507 bool minimize) :
511 iteration{other.iteration},
512 messages{other.messages, minimize},
513 buffer(other.buffer.size(), 0)
514 {
516 ARMARX_DEBUG << "Copy ControlThreadOutputBufferEntry with parameters: " << VAROUT(minimize)
517 << " " << VAROUT(writeTimestamp) << " " << VAROUT(sensorValuesTimestamp) << " "
518 << VAROUT(timeSinceLastIteration) << " " << VAROUT(iteration) << " "
519 << VAROUT(buffer.size()) << " " << VAROUT(messages.buffer.size()) << " "
520 << VAROUT(messages.entries.size());
521 void* place = buffer.data();
522 std::size_t space = buffer.size();
523
524 auto getAlignedPlace = [&space, &place, this](std::size_t bytes, std::size_t alignment)
525 {
527 ARMARX_CHECK_EXPRESSION(std::align(alignment, bytes, place, space));
528 ARMARX_CHECK_LESS(bytes, space);
529 const auto resultPlace = place;
530 place = static_cast<std::uint8_t*>(place) + bytes;
531 space -= bytes;
532 ARMARX_CHECK_LESS_EQUAL(place, &buffer.back());
533 return resultPlace;
534 };
535
536 //copy sensor values
537 sensors.reserve(other.sensors.size());
538 for (const SensorValueBase* sv : other.sensors)
539 {
541 sensors.emplace_back(
542 sv->_placementCopyConstruct(getAlignedPlace(sv->_sizeof(), sv->_alignof())));
543 }
544
545 //copy control targets
546 control.reserve(other.control.size());
547 for (const auto& cdctargs : other.control)
548 {
550 control.emplace_back();
551 auto& ctargs = control.back();
552 ctargs.reserve(cdctargs.size());
553 for (const ControlTargetBase* ct : cdctargs)
554 {
555 ctargs.emplace_back(
556 ct->_placementCopyConstruct(getAlignedPlace(ct->_sizeof(), ct->_alignof())));
557 }
558 }
559 }
560} // namespace armarx
for(;yybottom<=yytop;yybottom++)
Definition Grammar.cpp:705
bool checkLogLevel(MessageTypeT level)
Definition Logging.cpp:150
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
LogSenderPtr loghelper(const char *file, int line, const char *function)
Definition Logging.cpp:143
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
Definition Logging.h:310
const armarx::LogSender _GlobalDummyLogSender
Dummy instance for faster skipped logging (if verbosity level is lower than selected level) - DO NOT ...
Definition Logging.h:124
std::string space
#define VAROUT(x)
constexpr T c
Brief description of class JointControlTargetBase.
The JointController class represents one joint in one control mode.
This class is pretty much similar to a map.
ValContT const & values() const
The SensorValueBase class.
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_CHECK(expression)
Shortcut for ARMARX_CHECK_EXPRESSION.
#define ARMARX_CHECK_LESS(lhs, rhs)
This macro evaluates whether lhs is less (<) than rhs and if it turns out to be false it will throw a...
#define ARMARX_CHECK_LESS_EQUAL(lhs, rhs)
This macro evaluates whether lhs is less than or equal to (<=) rhs and if it turns out to be false it will th...
#define ARMARX_CHECK_NOT_EQUAL(lhs, rhs)
This macro evaluates whether lhs is not equal to (!=) rhs and if it turns out to be false it will throw an...
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_CHECK_EQUAL(lhs, rhs)
This macro evaluates whether lhs is equal (==) rhs and if it turns out to be false it will throw an E...
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
This file offers overloads of toIce() and fromIce() functions for STL container types.
MessageTypeT
Definition LogSender.h:46
const auto PotentiallyMinimizeMember
const std::string & to_string(const std::string &s)
This file is part of ArmarX.
void * align(size_t alignment, size_t bytes, void *&bufferPlace, size_t &bufferSpace) noexcept
std::size_t initialize(std::size_t numEntries, const KeyValueVector< std::string, ControlDevicePtr > &controlDevices, const KeyValueVector< std::string, SensorDevicePtr > &sensorDevices, std::size_t messageBufferSize, std::size_t messageBufferNumberEntries, std::size_t messageBufferMaxSize, std::size_t messageBufferMaxNumberEntries)
void forEachNewLoggingEntry(ConsumerFunctor consumer)
detail::RtMessageLogEntryBase RtMessageLogEntryBase
void forLatestLoggingEntry(ConsumerFunctor consumer, size_t numberOfEntriesToLog)
std::function< void(const Entry &, std::size_t, std::size_t)> ConsumerFunctor
detail::ControlThreadOutputBufferEntry Entry
std::vector< std::vector< PropagateConst< ControlTargetBase * > > > control
IceUtil::Time writeTimestamp
Timestamp in wall time (never use the virtual time for this)
std::vector< PropagateConst< SensorValueBase * > > sensors
void reset(std::size_t &bufferSize, std::size_t &numEntries, std::size_t iterationCount)
RtMessageLogEntryBase & setLoggingLevel(MessageTypeT lvl)
void print(Ice::Int controlThreadId) const
virtual std::string func() const =0
RtMessageLogEntryBase & deactivateSpamTag(std::uint64_t tag)
virtual std::string file() const =0
virtual std::string format() const =0
virtual std::size_t line() const =0
RtMessageLogEntryBase & deactivateSpam(float sec)
#define ARMARX_TRACE
Definition trace.h:77