// Definition of the thread-local static member ControlThreadOutputBuffer::RtLoggingInstance.
// It is brace-initialised to nullptr, so every thread starts with no instance set.
31 thread_local ControlThreadOutputBuffer* ControlThreadOutputBuffer::RtLoggingInstance{
nullptr};
// Returns the element of `entries` whose index is toBounds(writePosition).
// NOTE(review): the enclosing function header is not visible in this excerpt.
37 return entries.at(toBounds(writePosition));
// onePastReadPosition == 0 has no "position - 1" to look up, so that case is tested first.
// NOTE(review): the braces between these lines are not visible; the `entries.back()` return
// is presumably the body of this branch.
52 if (!onePastReadPosition)
55 return entries.back();
// Otherwise return the entry just before onePastReadPosition, mapped through toBounds().
57 return entries.at(toBounds(onePastReadPosition - 1));
// Take one snapshot of writePosition so the comparison and the assignment below
// both use the same value.
64 std::size_t localWritePosition = this->writePosition;
// Read position already equals the snapshot.
// NOTE(review): the body of this branch is not visible in this excerpt.
65 if (onePastReadPosition == localWritePosition)
// Advance the read position to the snapshot.
70 onePastReadPosition = localWritePosition;
// Sets the logging read position to the current value of writePosition.
78 onePastLoggingReadPosition = writePosition.load();
// Fragment: passes the entries between onePastLoggingReadPosition and a snapshot of
// writePosition to `consumer`, with backlog checks around the loop.
// NOTE(review): the function header, braces and several statements are not visible in this
// excerpt; comments below describe only the visible lines.
87 const size_t writePosition_local = writePosition.load();
// Backlog is at least as large as numEntries; the message below says the log position
// will be reset to the newest entry.
88 if (writePosition_local - onePastLoggingReadPosition >= numEntries)
91 <<
"There are " << writePosition_local - onePastLoggingReadPosition
92 <<
" unlogged entries, but only the last " << numEntries
94 "There seems to be something wrong (e.g. the rt logging threw an exception, "
95 "the system load is too high or the logging takes to long). "
96 "The log position will be reset to the newest entry!";
// Number of entries written since the last logged one (unsigned difference).
100 auto numNewEntries = writePosition_local - onePastLoggingReadPosition;
102 if (numNewEntries >= numEntries)
104 ARMARX_VERBOSE <<
" more new entries (" << numNewEntries <<
") than space ("
105 << numEntries <<
") -> Skipping everything else";
// `num` is the batch size handed to the consumer together with each entry.
109 const std::size_t num = writePosition_local - onePastLoggingReadPosition;
// Walk the logging read position up to the snapshot; `offset` counts entries within the batch.
111 for (std::size_t offset = 0; onePastLoggingReadPosition < writePosition_local;
112 ++onePastLoggingReadPosition, ++offset)
117 entries.at(toBounds(onePastLoggingReadPosition));
// The consumer receives the entry, its offset within this batch and the batch size.
118 consumer(entry, offset, num);
// Recompute the backlog size.
// NOTE(review): whether writePosition_local is refreshed before this line is not visible.
122 auto newNumNewEntries = writePosition_local - onePastLoggingReadPosition;
// Backlog is more than half of numEntries.
123 if (newNumNewEntries * 2 > numEntries)
126 <<
"The RT-Thread writes faster new data than the RT-Logging thread "
128 <<
" old/new/max number of entries: " << numNewEntries <<
" /"
129 << newNumNewEntries <<
" / " << numEntries;
131 numNewEntries = newNumNewEntries;
132 if (numNewEntries >= numEntries)
134 ARMARX_VERBOSE <<
" more new entries (" << numNewEntries <<
") than space ("
135 << numEntries <<
") -> Skipping everything else";
// Fragment: handles only the newest `numberOfEntriesToLog` entries.
// NOTE(review): the function name, the remaining parameters and the loop body are not
// visible in this excerpt.
143 size_t numberOfEntriesToLog)
// Entries written but not yet logged (unsigned difference of the two positions).
147 size_t unloggedEntries = writePosition - onePastLoggingReadPosition;
// Backlog is at least as large as numEntries; the message below says the log position
// will be reset to the newest entry.
148 if (unloggedEntries >= numEntries)
151 <<
"There are " << unloggedEntries <<
" unlogged entries, but only the last "
154 "There seems to be something wrong (e.g. the rt logging threw an exception, "
155 "the system load is too high or the logging takes to long). "
156 "The log position will be reset to the newest entry!";
// Recompute the backlog.
// NOTE(review): the statement that changes the read position before this line is not visible.
158 unloggedEntries = writePosition - onePastLoggingReadPosition;
160 if (unloggedEntries > numberOfEntriesToLog)
162 ARMARX_DEBUG <<
"not logging all " << unloggedEntries <<
". Only the last "
163 << numberOfEntriesToLog <<
" will be used.";
// Remember the read position before it is moved; the loop below starts from it.
165 const std::size_t onePastLoggingReadPositionLastRead =
166 onePastLoggingReadPosition.load();
// Move the read position so that numberOfEntriesToLog entries remain before writePosition.
169 onePastLoggingReadPosition.store(writePosition.load() - numberOfEntriesToLog);
// Iterates from the previous read position up to (not including) the new position minus one.
// NOTE(review): the increment and the loop body are not visible in this excerpt.
172 for (std::size_t i = onePastLoggingReadPositionLastRead;
173 i < onePastLoggingReadPosition - 1;
// Fragment: fills `entries` with numEntries elements and marks the object as initialised.
// NOTE(review): the function name and some parameters/arguments are not visible in this excerpt.
187 std::size_t initialNumEntries,
190 std::size_t initialMessageBufferSize,
191 std::size_t messageBufferNumberEntries,
192 std::size_t messageBufferMaxSize,
193 std::size_t messageBufferMaxNumberEntries)
196 numEntries = initialNumEntries;
197 messageBufferSize = initialMessageBufferSize;
// Reserve space for all entries up front, so the reference to entries.at(0) used by the
// copy loop below is not invalidated by a reallocation.
200 entries.reserve(numEntries);
// Construct the first entry from the devices and message-buffer parameters.
201 entries.emplace_back(controlDevices,
204 messageBufferNumberEntries,
205 messageBufferMaxSize,
206 messageBufferMaxNumberEntries);
// Every further entry is a copy of the first one.
207 for (std::size_t i = 1; i < numEntries; ++i)
209 entries.emplace_back(entries.at(0));
212 isInitialized =
true;
// The result is the data buffer size reported by the first entry.
213 return entries.at(0).getDataBufferSize();
// Fragment: runs the destructors of all sensor values and control targets of every entry
// by explicit destructor call (no delete is visible here).
// NOTE(review): elsewhere in this excerpt these objects are created with
// _placementConstruct into a byte buffer, which presumably is why destructors are called
// manually — confirm.
222 for (
Entry& e : entries)
224 for (
auto&
s : e.sensors)
226 s->~SensorValueBase();
228 for (
auto& cs : e.control)
// NOTE(review): `c` is presumably the variable of an inner loop over `cs` that is not
// visible in this excerpt.
232 c->~ControlTargetBase();
// Stores `sec` in deactivateSpamSec.
// NOTE(review): the enclosing function is not visible in this excerpt.
250 deactivateSpamSec = sec;
// Stores `tag` in deactivateSpamTag_.
// NOTE(review): the enclosing function is not visible in this excerpt.
258 deactivateSpamTag_ =
tag;
// Fragment: one alternative of an expression whose start is not visible.
// It dereferences a log helper created for this message's file(), line() and func(),
// with backtrace disabled, the tag "RtLogMessages" and the thread id set to controlThreadId.
269 : (*
loghelper(file().c_str(), line(), func().c_str())
270 ->setBacktrace(
false)
271 ->setTag({
"RtLogMessages"})
272 ->setThreadId(controlThreadId))
// Fragment: copy of an RtMessageLogBuffer from `other`, optionally minimised (the debug
// output below reads "copying RtMessageLogBuffer with minimize = ...").
// NOTE(review): the signature, parts of the initializer list and several statements are
// not visible in this excerpt.
// Helper body: yields 0 when `minimize` is set, otherwise the given member value.
280 {
return minimize ? 0 : member; };
285 initialBufferEntryNumbers{
// When minimising, the buffer gets the bytes used in `other` (size minus remaining space)
// plus maxAlign - 1 bytes of slack; otherwise it gets the same size as other's buffer.
289 buffer(minimize ? other.buffer.size() + other.maxAlign - 1 - other.bufferSpace
290 : other.buffer.size(),
// The whole new buffer is free at this point.
292 bufferSpace{buffer.size()},
293 bufferPlace{buffer.data()},
// Entry table: only as many slots as were written when minimising, else the same size as other's.
294 entries(minimize ? other.entriesWritten : other.entries.size(),
nullptr),
296 requiredAdditionalBufferSpace{other.requiredAdditionalBufferSpace},
297 requiredAdditionalEntries{other.requiredAdditionalEntries},
298 messagesLost{other.messagesLost},
// Debug dump of the freshly initialised members.
301 ARMARX_DEBUG <<
"copying RtMessageLogBuffer with minimize = " << minimize <<
" "
302 <<
VAROUT(initialBufferSize) <<
" " <<
VAROUT(initialBufferEntryNumbers) <<
" "
303 <<
VAROUT(bufferMaxSize) <<
" " <<
VAROUT(bufferMaxNumberEntries) <<
" "
304 <<
VAROUT(buffer.size()) <<
" " <<
VAROUT(other.bufferSpace) <<
" "
305 <<
VAROUT(bufferPlace) <<
" " <<
VAROUT(entries.size()) <<
" "
306 <<
VAROUT(entriesWritten) <<
" " <<
VAROUT(requiredAdditionalBufferSpace)
307 <<
" " <<
VAROUT(requiredAdditionalEntries) <<
" " <<
VAROUT(messagesLost)
308 <<
" " <<
VAROUT(maxAlign);
// Copy the entries of `other` one by one; the loop stops at the first null slot.
309 for (std::size_t idx = 0; idx < other.entries.size() && other.entries.at(idx); ++idx)
311 const RtMessageLogEntryBase* entry = other.entries.at(idx);
// Track the largest alignment of any copied entry.
313 maxAlign =
std::max(maxAlign, entry->_alignof());
// std::align advances bufferPlace to a suitably aligned address and reduces bufferSpace by
// the padding; it yields nullptr when the entry does not fit.
314 void* place =
std::align(entry->_alignof(), entry->_sizeof(), bufferPlace, bufferSpace);
// Diagnostic text describing buffer and entry geometry.
// NOTE(review): the code that creates and consumes `out` is not visible in this excerpt.
317 out <<
"entry " << idx <<
" of " << other.entriesWritten
318 <<
"\nbuffer first = " <<
static_cast<void*
>(&buffer.front())
319 <<
"\nbuffer last = " <<
static_cast<void*
>(&buffer.back())
320 <<
"\nbuffer size = " << buffer.size()
321 <<
"\nbuffer place = " << bufferPlace
322 <<
"\nbuffer space = " << bufferSpace
323 <<
"\nentry size = " << entry->_sizeof()
324 <<
"\nentry align = " << entry->_alignof() <<
"\n"
325 <<
"\nother buffer size = " << other.buffer.size()
326 <<
"\nother buffer space = " << other.bufferSpace
327 <<
"\nother max align = " << other.maxAlign <<
"\n"
328 <<
"\nthis = " <<
this;
// Arguments: address of the entry's last byte and address of the buffer's last byte.
// NOTE(review): presumably operands of a bounds check whose head is not visible.
336 static_cast<void*
>(
static_cast<std::uint8_t*
>(place) + entry->_sizeof() - 1),
337 static_cast<void*
>(&buffer.back()))
// Copy-construct the entry into the aligned place and record it in the next table slot.
342 entries.at(entriesWritten++) = entry->_placementCopyConstruct(place);
// Account for the bytes of the entry itself (std::align only accounted for the padding).
343 bufferSpace -= entry->_sizeof();
344 bufferPlace =
static_cast<std::uint8_t*
>(place) + entry->_sizeof();
356 std::size_t& numEntries,
357 std::size_t iterationCount)
360 if (requiredAdditionalEntries || entries.size() < numEntries)
362 const auto numExcessEntries =
363 std::max(requiredAdditionalEntries, numEntries - entries.size());
364 const auto requiredSize = entries.size() + numExcessEntries;
366 << requiredAdditionalEntries <<
" | " << numExcessEntries
367 <<
" additional message entries. \n"
368 <<
"The requested total number of entries is " << requiredSize <<
". \n"
369 <<
"The current number of entries is " << entries.size() <<
". \n"
370 <<
"The maximal number of entries is "
371 << getMaximalNumberOfBufferEntries();
372 if (requiredSize > getMaximalNumberOfBufferEntries())
375 << iterationCount <<
" would require " << requiredSize
376 <<
" message entries, but the maximal number of entries is "
377 << getMaximalNumberOfBufferEntries();
379 numEntries =
std::max(requiredSize, getMaximalNumberOfBufferEntries());
380 entries.resize(numEntries,
nullptr);
381 requiredAdditionalEntries = 0;
383 if (requiredAdditionalBufferSpace)
385 const auto requiredSpace = buffer.size() + requiredAdditionalBufferSpace;
387 << requiredAdditionalBufferSpace
388 <<
" additional bytes for messages in the buffer. \n"
389 <<
"Therefore the new required size is " << requiredSpace <<
". \n"
390 <<
"The current size of the buffer is " << buffer.size() <<
". \n"
391 <<
"The maximal size of the buffer is " << getMaximalBufferSize();
392 if (requiredSpace > getMaximalBufferSize())
395 << iterationCount <<
" would require " << requiredSpace
396 <<
" bytes for messages buffer, but the maximal buffer size is "
397 << getMaximalBufferSize();
399 bufferSize =
std::min(requiredSpace, getMaximalBufferSize());
400 buffer.resize(bufferSize, 0);
401 requiredAdditionalBufferSpace = 0;
403 bufferSpace = buffer.size();
404 bufferPlace = buffer.data();
// Destroys all stored message entries and marks the whole buffer as free.
// NOTE(review): the return type and braces are not visible in this excerpt.
410 detail::RtMessageLogBuffer::deleteAll()
// Entries are filled from the front; the loop stops at the first null slot.
412 for (std::size_t idx = 0; idx < entries.size() && entries.at(idx); ++idx)
// Explicit destructor call without delete: elsewhere in this excerpt entries are created
// with _placementCopyConstruct inside `buffer`.
414 entries.at(idx)->~RtMessageLogEntryBase();
415 entries.at(idx) =
nullptr;
// Reset the allocation cursor to the start of the buffer.
417 bufferSpace = buffer.size();
418 bufferPlace = buffer.data();
// Fragment: constructs an entry by sizing one byte buffer for all sensor values and control
// targets and placement-constructing them into it.
// NOTE(review): the constructor name, the first parameters and several statements are not
// visible in this excerpt.
425 std::size_t messageBufferSize,
426 std::size_t messageBufferNumberEntries,
427 std::size_t messageBufferMaxSize,
428 std::size_t messageBufferMaxNumberEntries) :
// Both time members start at zero microseconds.
429 sensorValuesTimestamp{IceUtil::Time::microSeconds(0)},
430 timeSinceLastIteration{IceUtil::Time::microSeconds(0)},
431 messages{messageBufferSize,
432 messageBufferNumberEntries,
433 messageBufferMaxSize,
434 messageBufferMaxNumberEntries}
// Compute the number of bytes needed: sum of the sizes of all sensor values and control
// targets, plus maxAlign - 1.
// NOTE(review): the lines that use `align` (presumably to update maxAlign and/or pad
// `bytes`) are not visible.
437 const std::size_t bytes = [&]
439 std::size_t maxAlign = 1;
440 std::size_t bytes = 0;
441 for (
const SensorDevicePtr& sd : sensorDevices.values())
443 const auto align = sd->getSensorValue()->_alignof();
448 bytes += sd->getSensorValue()->_sizeof();
450 for (
const ControlDevicePtr& cd : controlDevices.values())
452 for (
const JointController* ctrl : cd->getJointControllers())
454 const auto align = ctrl->getControlTarget()->_alignof();
459 bytes += ctrl->getControlTarget()->_sizeof();
462 return bytes + maxAlign - 1;
// Zero-filled storage; `place` and `space` track the unused remainder.
465 buffer.resize(bytes, 0);
466 void* place = buffer.data();
467 std::size_t
space = buffer.size();
// Helper that hands out a location for an object of the given size and alignment and then
// advances `place` past it.
// NOTE(review): the alignment step itself is not visible in this excerpt.
469 auto getAlignedPlace = [&
space, &place,
this](std::size_t bytes, std::size_t alignment)
473 const auto resultPlace = place;
474 place =
static_cast<std::uint8_t*
>(place) + bytes;
// One placement-constructed sensor value per sensor device.
480 sensors.reserve(sensorDevices.size());
481 for (
const SensorDevicePtr& sd : sensorDevices.values())
483 const SensorValueBase* sv = sd->getSensorValue();
484 sensors.emplace_back(
485 sv->_placementConstruct(getAlignedPlace(sv->_sizeof(), sv->_alignof())));
// One list of control targets per control device, one target per joint controller.
488 control.reserve(controlDevices.size());
489 for (
const ControlDevicePtr& cd : controlDevices.values())
493 const auto ctrls = cd->getJointControllers();
494 ctargs.reserve(ctrls.size());
495 for (
const JointController* ctrl : ctrls)
497 const ControlTargetBase* ct = ctrl->getControlTarget();
499 ct->_placementConstruct(getAlignedPlace(ct->_sizeof(), ct->_alignof())));
// reset() is called on each freshly constructed target.
500 ctargs.back()->reset();
// Fragment: copy of an entry from `other` (the debug output below reads
// "Copy ControlThreadOutputBufferEntry with parameters: ...").
// NOTE(review): the signature, some initializers and the end of the body are not visible in
// this excerpt.
509 sensorValuesTimestamp{other.sensorValuesTimestamp},
510 timeSinceLastIteration{other.timeSinceLastIteration},
511 iteration{other.iteration},
// The message buffer is copied with the `minimize` flag passed through.
512 messages{other.messages, minimize},
// New zero-filled byte buffer of the same size as other's.
513 buffer(other.buffer.size(), 0)
516 ARMARX_DEBUG <<
"Copy ControlThreadOutputBufferEntry with parameters: " <<
VAROUT(minimize)
517 <<
" " <<
VAROUT(writeTimestamp) <<
" " <<
VAROUT(sensorValuesTimestamp) <<
" "
518 <<
VAROUT(timeSinceLastIteration) <<
" " <<
VAROUT(iteration) <<
" "
519 <<
VAROUT(buffer.size()) <<
" " <<
VAROUT(messages.buffer.size()) <<
" "
520 <<
VAROUT(messages.entries.size());
// `place` and `space` track the unused remainder of the new buffer.
521 void* place = buffer.data();
522 std::size_t
space = buffer.size();
// Helper that hands out a location for an object of the given size and alignment and then
// advances `place` past it.
// NOTE(review): the alignment step itself is not visible in this excerpt.
524 auto getAlignedPlace = [&
space, &place,
this](std::size_t bytes, std::size_t alignment)
529 const auto resultPlace = place;
530 place =
static_cast<std::uint8_t*
>(place) + bytes;
// Copy-construct every sensor value of `other` into the new buffer.
537 sensors.reserve(other.sensors.size());
538 for (
const SensorValueBase* sv : other.sensors)
541 sensors.emplace_back(
542 sv->_placementCopyConstruct(getAlignedPlace(sv->_sizeof(), sv->_alignof())));
// Copy-construct every control target of `other`, keeping the per-device grouping.
546 control.reserve(other.control.size());
547 for (
const auto& cdctargs : other.control)
552 ctargs.reserve(cdctargs.size());
553 for (
const ControlTargetBase* ct : cdctargs)
556 ct->_placementCopyConstruct(getAlignedPlace(ct->_sizeof(), ct->_alignof())));