RobotUnitModuleLogging.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package RobotAPI::ArmarXObjects::RobotUnit
17 * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
18 * @date 2018
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
22
23
25
26#include <regex>
27
32
34
38
// Body of namespace armarx::RobotUnitModule::details (named by the closing comment below).
// NOTE(review): this listing omits several original source lines (embedded numbers 39, 41,
// 43, 45, 53, 60 and 74-86), among them the struct headers, two method names and the member
// list of the outer struct. Code elsewhere in this file uses the outer type as
// details::DoLoggingDurations with members such as header, sens, ctrl, backlog and msg, each
// offering start(), stop(), ms() and n — presumably the inner struct below; TODO confirm.
40{
42 {
44 {
// Presumably the constructor body of the inner struct: resets the accumulated time to zero.
46 {
47 t = IceUtil::Time::seconds(0);
48 }
49
// Accumulated time. A measurement subtracts the current time at its beginning and adds the
// current time at its end, so t grows by the elapsed time of every begin/end pair.
50 IceUtil::Time t;
51
// Begin of a measurement (presumably start(); the name line is missing): subtracts the
// current rt time and counts the call.
52 void
54 {
55 t -= armarx::rtNow();
56 ++n;
57 }
58
// End of a measurement (presumably stop(); the name line is missing): adds the current rt time.
59 void
61 {
62 t += armarx::rtNow();
63 }
64
// Returns the accumulated time in milliseconds.
65 double
66 ms() const
67 {
68 return t.toMilliSecondsDouble();
69 }
70
// Number of measurements that were begun.
71 std::size_t n = 0;
72 };
73
87 };
88} // namespace armarx::RobotUnitModule::details
89
91{
92 void
93 Logging::addMarkerToRtLog(const SimpleRemoteReferenceCounterBasePtr& token,
94 const std::string& marker,
95 const Ice::Current&)
96 {
98 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
99 std::lock_guard<std::mutex> guard{rtLoggingMutex};
100 if (!rtLoggingEntries.count(token->getId()))
101 {
102 throw InvalidArgumentException{"addMarkerToRtLog called for a nonexistent log"};
103 }
104 rtLoggingEntries.at(token->getId())->marker = marker;
105 }
106
107 SimpleRemoteReferenceCounterBasePtr
108 Logging::startRtLogging(const std::string& formatString,
109 const Ice::StringSeq& loggingNames,
110 const Ice::Current&)
111 {
113 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
114 StringStringDictionary alias;
115 for (const auto& name : loggingNames)
116 {
117 alias.emplace(name, "");
118 }
119 return startRtLoggingWithAliasNames(formatString, alias, Ice::emptyCurrent);
120 }
121
122 void
123 Logging::stopRtLogging(const SimpleRemoteReferenceCounterBasePtr& token, const Ice::Current&)
124 {
126 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
127 std::lock_guard<std::mutex> guard{rtLoggingMutex};
129 try
130 {
131 if (!rtLoggingEntries.count(token->getId()))
132 {
133 throw InvalidArgumentException{"stopRtLogging called for a nonexistent log"};
134 }
135 ARMARX_DEBUG_S << "RobotUnit: stop RtLogging for file "
136 << rtLoggingEntries.at(token->getId())->filename;
137 rtLoggingEntries.at(token->getId())->stopLogging = true;
138 }
139 catch (...)
140 {
141 ARMARX_WARNING << "Error during attempting to stop rt logging";
142 }
143 }
144
145 Ice::StringSeq
146 Logging::getLoggingNames(const Ice::Current&) const
147 {
149 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
150 Ice::StringSeq result;
151 for (const auto& data : sensorDeviceValueMetaData)
152 {
153 for (auto& fieldData : data.fields)
154 {
155 result.push_back(fieldData.name);
156 }
157 }
158 for (const auto& datas : controlDeviceValueMetaData)
159 {
160 for (const auto& data : datas)
161 {
162 for (auto& fieldData : data.fields)
163 {
164 result.push_back(fieldData.name);
165 }
166 }
167 }
168 return result;
169 }
170
171 SimpleRemoteReferenceCounterBasePtr
172 Logging::startRtLoggingWithAliasNames(const std::string& formatString,
173 const StringStringDictionary& aliasNames,
174 const Ice::Current&)
175 {
177 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
178 FileSystemPathBuilder pb{formatString};
179 std::lock_guard<std::mutex> guard{rtLoggingMutex};
180 if (rtLoggingEntries.count(pb.getPath()))
181 {
182 throw InvalidArgumentException{"There already is a logger for the path '" +
183 pb.getPath() + "'"};
184 }
185 rtLoggingEntries[pb.getPath()].reset(new CSVLoggingEntry());
186 auto ptr = rtLoggingEntries[pb.getPath()];
187 CSVLoggingEntry& e = *ptr;
188 e.filename = pb.getPath();
189 pb.createParentDirectories();
190 e.stream.open(e.filename);
191 if (!e.stream)
192 {
193 rtLoggingEntries.erase(pb.getPath());
194 throw LogicError{"RtLogging could not open filestream for '" + pb.getPath() + "'"};
195 }
196 ARMARX_VERBOSE << "Start logging to " << e.filename
197 << ". Names (pattern, replacement name): " << aliasNames;
198
199 std::stringstream header;
200 header << "marker;iteration;timestamp;TimeSinceLastIteration";
201 auto logDev = [&](const std::string& dev)
202 {
204 for (const auto& [key, value] : aliasNames)
205 {
206 if (MatchName(key, dev))
207 {
208 header << ";" << (value.empty() ? dev : value);
209 return true;
210 }
211 }
212 return false;
213 };
214
215 //get logged sensor device values
216 {
218 e.loggedSensorDeviceValues.reserve(sensorDeviceValueMetaData.size());
219 for (const auto& valData : sensorDeviceValueMetaData)
220 {
221 e.loggedSensorDeviceValues.emplace_back();
222 auto& svfieldsFlags = e.loggedSensorDeviceValues.back(); //vv
223 svfieldsFlags.reserve(valData.fields.size());
224 for (const auto& field : valData.fields)
225 {
226 svfieldsFlags.emplace_back(logDev(field.name));
227 }
228 }
229 }
230 //get logged control device values
231 {
233 e.loggedControlDeviceValues.reserve(controlDeviceValueMetaData.size());
234 for (const auto& datas : controlDeviceValueMetaData)
235 {
236 e.loggedControlDeviceValues.emplace_back();
237 auto& deviceCtrlFlags = e.loggedControlDeviceValues.back(); //vv
238 deviceCtrlFlags.reserve(datas.size());
239 for (const auto& valData : datas)
240 {
241 deviceCtrlFlags.emplace_back();
242 auto& ctrlFieldFlags = deviceCtrlFlags.back(); //v
243 ctrlFieldFlags.reserve(valData.fields.size());
244
245 for (const auto& field : valData.fields)
246 {
247 ctrlFieldFlags.emplace_back(logDev(field.name));
248 }
249 }
250 }
251 }
253
254 //write header
255 e.stream << header.str()
256 << std::flush; // newline is written at the beginning of each log line
257 //create and return handle
258 auto block = getArmarXManager()->createSimpleRemoteReferenceCount(
259 [ptr]()
260 {
261 ARMARX_DEBUG_S << "RobotUnit: stop RtLogging for file " << ptr->filename;
262 ptr->stopLogging = true;
263 },
264 e.filename,
265 IceUtil::Time::milliSeconds(100));
266 auto counter = block->getReferenceCounter();
267 block->activateCounting();
268 ARMARX_DEBUG_S << "RobotUnit: start RtLogging for file " << ptr->filename;
269 return counter;
270 }
271
272 void
273 Logging::writeRecentIterationsToFile(const std::string& formatString, const Ice::Current&) const
274 {
276 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
277 std::lock_guard<std::mutex> guard{rtLoggingMutex};
278 FileSystemPathBuilder pb{formatString};
280 std::ofstream outCSV{pb.getPath() + ".csv"};
281 if (!outCSV)
282 {
283 throw LogicError{"writeRecentIterationsToFile could not open filestream for '" +
284 pb.getPath() + ".csv'"};
285 }
286 std::ofstream outMsg{pb.getPath() + ".messages"};
287 if (!outMsg)
288 {
289 throw LogicError{"writeRecentIterationsToFile could not open filestream for '" +
290 pb.getPath() + ".messages'"};
291 }
292 ARMARX_INFO << "writing the last " << backlog.size() << " iterations to " << pb.getPath()
293 << ".{csv, messages}";
294 //write csv header
295 {
296 outCSV << "iteration;timestamp";
297 for (const auto& vs : sensorDeviceValueMetaData)
298 {
299 for (const auto& f : vs.fields)
300 {
301 outCSV << ";" << f.name;
302 }
303 }
304 for (const auto& vvs : controlDeviceValueMetaData)
305 {
306 for (const auto& vs : vvs)
307 {
308 for (const auto& f : vs.fields)
309 {
310 outCSV << ";" << f.name;
311 }
312 }
313 }
314 outCSV << std::endl;
315 }
316
317 for (const ::armarx::detail::ControlThreadOutputBufferEntry& iteration : backlog)
318 {
319 //write csv data
320 {
321 outCSV << iteration.iteration << ";" << iteration.sensorValuesTimestamp;
322 //sens
323 {
324 for (const SensorValueBase* val : iteration.sensors)
325 {
326 for (std::size_t idxField = 0; idxField < val->getNumberOfDataFields();
327 ++idxField)
328 {
329 std::string s;
330 val->getDataFieldAs(idxField, s);
331 outCSV << ";" << s;
332 }
333 }
334 }
335 //ctrl
336 {
337 for (const auto& vals : iteration.control)
338 {
339 for (const ControlTargetBase* val : vals)
340 {
341 for (std::size_t idxField = 0; idxField < val->getNumberOfDataFields();
342 ++idxField)
343 {
344 std::string s;
345 val->getDataFieldAs(idxField, s);
346 outCSV << ";" << s;
347 }
348 }
349 }
350 }
351 outCSV << std::endl;
352 }
353 //write message data
354 {
355 bool atLeastOneMessage = false;
356 for (const ::armarx::detail::RtMessageLogEntryBase* msg :
357 iteration.messages.getEntries())
358 {
359 if (!msg)
360 {
361 break;
362 }
363 outMsg << "[" << msg->getTime().toDateTime() << "] iteration "
364 << iteration.iteration << ":\n"
365 << msg->format() << std::endl;
366 atLeastOneMessage = true;
367 }
368 if (atLeastOneMessage)
369 {
370 outMsg << "\nmessages lost: " << iteration.messages.messagesLost
371 << " (required additional "
372 << iteration.messages.requiredAdditionalBufferSpace << " bytes, "
373 << iteration.messages.requiredAdditionalEntries << " message entries)\n"
374 << std::endl;
375 }
376 }
377 }
378 }
379
// Registers a data streaming receiver and describes the data it will receive.
//
// For every data field whose name matches one of config.loggingNames (see MatchName), the
// returned description maps the field name to a node type and to an index into the value
// sequence of that type. The same layout is stored per field in the new streaming entry
// (sensDevs / ctrlDevs) together with the per-type counters (num<Type>s).
//
// Throws InvalidArgumentException if the receiver proxy is null or already registered;
// MatchName() additionally throws it for invalid patterns.
//
// NOTE(review): the entry is created in rtDataStreamingEntry (operator[]) before the patterns
// are evaluated. If MatchName() throws for an invalid pattern, the partially set up entry
// appears to stay registered — confirm and consider registering it last.
// NOTE(review): this listing omits the original source lines with the embedded numbers 385,
// 422, 453, 466 and 475. Line 453 is the head of the statement that streams the
// "unreachable" message below.
380 RobotUnitDataStreaming::DataStreamingDescription
381 Logging::startDataStreaming(const RobotUnitDataStreaming::ReceiverPrx& receiver,
382 const RobotUnitDataStreaming::Config& config,
383 const Ice::Current&)
384 {
386 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
387 if (!receiver)
388 {
389 throw InvalidArgumentException{"Receiver proxy is NULL!"};
390 }
391 std::lock_guard<std::mutex> guard{rtLoggingMutex};
392 if (rtDataStreamingEntry.count(receiver))
393 {
394 throw InvalidArgumentException{"There already is a logger for the given receiver"};
395 }
396
397 RobotUnitDataStreaming::DataStreamingDescription result;
// operator[] default-constructs and registers the entry for this receiver
398 DataStreamingEntry& streamingEntry = rtDataStreamingEntry[receiver];
399 getProperty(streamingEntry.rtStreamMaxClientErrors,
400 "RTLogging_StreamingDataMaxClientConnectionFailures");
401
402
403 ARMARX_INFO << "start data streaming to " << receiver->ice_getIdentity().name
404 << ". Values: " << config.loggingNames;
// true if the field name matches at least one of the requested patterns
405 auto devMatchesAnyKey = [&](const std::string& dev)
406 {
407 for (const auto& key : config.loggingNames)
408 {
409 if (MatchName(key, dev))
410 {
411 return true;
412 }
413 }
414 return false;
415 };
416
// Creates one OutVal per field of valData. Fields that are not requested keep their
// default-constructed OutVal; requested fields get the value type and the next free
// index of that type, and are added to the description.
417 const auto handleVal = [&](const ValueMetaData& valData,
418 DataStreamingEntry& streamingEntry,
419 RobotUnitDataStreaming::DataStreamingDescription& descr)
420 -> std::vector<DataStreamingEntry::OutVal>
421 {
423 std::vector<DataStreamingEntry::OutVal> result;
424 result.resize(valData.fields.size());
425 for (std::size_t i = 0; i < valData.fields.size(); ++i)
426 {
427 if (!devMatchesAnyKey(valData.fields.at(i).name))
428 {
429 continue; //do not add to result and skip during processing
430 }
431 auto& descrEntr = descr.entries[valData.fields.at(i).name];
432 //*INDENT-OFF*
433 // clang-format off
// make_case(Type, TName) expands to the condition "field type is Type" plus a body that
// assigns the next index of TName to the description entry and to the OutVal and then
// increments the counter num<TName>s of the streaming entry.
434#define make_case(Type, TName) \
435 (typeid(Type) == *valData.fields.at(i).type) \
436 { \
437 descrEntr.index = streamingEntry.num##TName##s; \
438 descrEntr.type = RobotUnitDataStreaming::NodeType##TName; \
439 result.at(i).idx = streamingEntry.num##TName##s; \
440 result.at(i).value = DataStreamingEntry::ValueT::TName; \
441 ++streamingEntry.num##TName##s; \
442 }
// std::uint16_t and std::uint32_t fields are streamed as Long values
443 if make_case (bool, Bool)
444 else if make_case (Ice::Byte, Byte)
445 else if make_case (Ice::Short, Short)
446 else if make_case (Ice::Int, Int)
447 else if make_case (Ice::Long, Long)
448 else if make_case (Ice::Float, Float)
449 else if make_case (Ice::Double, Double)
450 else if make_case (std::uint16_t, Long)
451 else if make_case (std::uint32_t, Long)
452 else {
454 << "This code sould be unreachable! "
455 "The type of "
456 << valData.fields.at(i).name << " is not handled correctly!";
457 }
458#undef make_case
459 }
460 return result;
461 };
462 //*INDENT-ON*
463 // clang-format on
464 //get logged sensor device values
465 {
467 streamingEntry.sensDevs.reserve(sensorDeviceValueMetaData.size());
468 for (const auto& valData : sensorDeviceValueMetaData)
469 {
470 streamingEntry.sensDevs.emplace_back(handleVal(valData, streamingEntry, result));
471 }
472 }
473 //get logged control device values
474 {
476 streamingEntry.ctrlDevs.reserve(controlDeviceValueMetaData.size());
477 for (const auto& devData : controlDeviceValueMetaData)
478 {
479 streamingEntry.ctrlDevs.emplace_back();
480 auto& ctrlDevEntrs = streamingEntry.ctrlDevs.back();
481 ctrlDevEntrs.reserve(devData.size());
482 for (const auto& valData : devData)
483 {
484 ctrlDevEntrs.emplace_back(handleVal(valData, streamingEntry, result));
485 }
486 }
487 }
488
489 return result;
490 }
491
492 void
493 Logging::stopDataStreaming(const RobotUnitDataStreaming::ReceiverPrx& receiver,
494 const Ice::Current&)
495 {
497 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
498 std::lock_guard<std::mutex> guard{rtLoggingMutex};
499
500 if (rtDataStreamingEntry.count(receiver) == 0u)
501 {
502 ARMARX_INFO << "stopDataStreaming called for a nonexistent log";
503 return;
504 }
505
506 ARMARX_INFO_S << "RobotUnit: request to stop DataStreaming for " << receiver->ice_id();
507 rtDataStreamingEntry.at(receiver).stopStreaming = true;
508 }
509
510 void
511 Logging::_preFinishRunning()
512 {
514 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
515 defaultLogHandle = nullptr;
516 if (rtLoggingTask.joinable())
517 {
518 ARMARX_DEBUG << "shutting down rt logging task";
519 stopRtLoggingTask = true;
520 rtLoggingTask.join();
521 ARMARX_DEBUG << "shutting down rt logging task done";
522 }
523 }
524
525 void
526 Logging::_preFinishControlThreadInitialization()
527 {
529 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
530 controlThreadId = LogSender::getThreadId();
531 ControlThreadOutputBuffer::RtLoggingInstance =
532 &(_module<ControlThreadDataBuffer>().getControlThreadOutputBuffer());
533
534 ARMARX_INFO << "starting rt logging with timestep " << rtLoggingTimestepMs;
535 stopRtLoggingTask = false;
536 Metronome metronome(Duration::MilliSeconds(rtLoggingTimestepMs));
537 rtLoggingTask = std::thread{
538 [this]
539 {
540 using clock_t = std::chrono::steady_clock;
541 const auto now = [] { return clock_t::now(); };
542 Metronome metronome(Duration::MilliSeconds(rtLoggingTimestepMs));
543 while (!stopRtLoggingTask)
544 {
545 const auto start = now();
546 doLogging();
547 const std::uint64_t ms =
548 std::chrono::duration_cast<std::chrono::milliseconds>(now() - start)
549 .count();
550 if (ms > rtLoggingTimestepMs)
551 {
552 ARMARX_WARNING << deactivateSpam(10) << "logging thread took " << ms
553 << " ms > " << rtLoggingTimestepMs
554 << " ms (message printed every 10 seconds)";
555 }
556 metronome.waitForNextTick();
557 }
558 }};
559 }
560
// One pass of the logging thread. While holding rtLoggingMutex it
//  1. drops backlog entries that are older than the retention time or exceed the maximal
//     backlog size (only if the backlog is enabled),
//  2. hands entries of the control thread output buffer to the doLogging overload taking the
//     durations (all new entries if numberOfEntriesToLog < 1, otherwise via
//     forLatestLoggingEntry with numberOfEntriesToLog),
//  3. flushes the streams of all CSV logs,
//  4. erases all CSV logs that were asked to stop,
//  5. sends the buffered data of every streaming entry and erases the ones asked to stop,
//  6. reports the time spent in each of these steps.
//
// NOTE(review): this listing omits the original source lines with the embedded numbers 564,
// 596, 601, 607, 612, 632, 652 and 677. Lines 596 and 607 hold the receiver expression of the
// two call chains starting with .getControlThreadOutputBuffer(); line 677 is the head of the
// statement that streams the timing report.
561 void
562 Logging::doLogging()
563 {
565 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
566 std::lock_guard<std::mutex> guard{rtLoggingMutex};
// reference time of this pass (backlog retention and total duration)
567 const auto now = armarx::rtNow();
568 // entries are removed last
569
570 //remove backlog entries
571 const auto start_time_remove_backlog_entries = armarx::rtNow();
572 {
573 if (rtLoggingBacklogEnabled)
574 {
575 while (!backlog.empty() &&
576 (backlog.front().writeTimestamp + rtLoggingBacklogRetentionTime < now or
577 backlog.size() > rtLoggingBacklogMaxSize))
578 {
579 backlog.pop_front();
580 }
581 }
582 }
583 //log all
584 const auto start_time_log_all = armarx::rtNow();
// accumulates the time spent in the individual parts of the per-entry processing
585 details::DoLoggingDurations dlogdurs;
586 {
587 if (!rtLoggingEntries.empty() || !rtDataStreamingEntry.empty())
588 {
589 ARMARX_DEBUG << deactivateSpam() << "Number of logs " << rtLoggingEntries.size()
590 << '\n'
591 << "Number of streams " << rtDataStreamingEntry.size();
592 }
593
594 if (numberOfEntriesToLog < 1)
595 {
597 .getControlThreadOutputBuffer()
598 .forEachNewLoggingEntry(
599 [this, &dlogdurs, &now](const auto& data, auto i, auto num)
600 {
602 doLogging(dlogdurs, now, data, i, num);
603 });
604 }
605 else
606 { // only log newest entry
608 .getControlThreadOutputBuffer()
609 .forLatestLoggingEntry(
610 [this, &dlogdurs, &now](const auto& data, auto i, auto num)
611 {
613 doLogging(dlogdurs, now, data, i, num);
614 },
615 numberOfEntriesToLog);
616 }
617 }
618 ARMARX_DEBUG << ::deactivateSpam() << "the last " << backlog.size()
619 << " iterations are stored";
620 //flush all files
621 const auto start_time_flush_all_files = armarx::rtNow();
622 {
623 for (auto& pair : rtLoggingEntries)
624 {
625 pair.second->stream << std::flush;
626 }
627 }
628
629 //remove entries
630 const auto start_time_remove_entries = armarx::rtNow();
631 {
633 std::vector<std::string> toRemove;
634 toRemove.reserve(rtLoggingEntries.size());
635 for (auto& [key, value] : rtLoggingEntries)
636 {
637 if (value->stopLogging)
638 {
639 //can't remove the current element
640 //(this would invalidate the current iterator)
641 toRemove.emplace_back(key);
642 }
643 }
644 for (const auto& rem : toRemove)
645 {
646 rtLoggingEntries.erase(rem);
647 }
648 }
649 //deal with data streaming
650 const auto start_time_data_streaming = armarx::rtNow();
651 {
653 std::vector<RobotUnitDataStreaming::ReceiverPrx> toRemove;
654 toRemove.reserve(rtDataStreamingEntry.size());
655 for (auto& [prx, data] : rtDataStreamingEntry)
656 {
657 ARMARX_DEBUG_S << data.entryBuffer.size() << " entries to send to "
658 << prx->ice_toString();
659 if (data.stopStreaming)
660 {
661 toRemove.emplace_back(prx);
662 }
663 else
664 {
665 data.send(prx, rtDataStreamingMsgID);
666 }
667 }
// all receivers get the same message id within one pass
668 ++rtDataStreamingMsgID;
669 for (const auto& prx : toRemove)
670 {
671 rtDataStreamingEntry.erase(prx);
672 }
673 }
674 // clang-format off
675 const auto end_time = armarx::rtNow();
676 const auto time_total = (end_time - now).toMilliSecondsDouble();
678 << "rtlogging time required: " << time_total << "ms\n"
679 << " time_remove_backlog_entries "
680 << (start_time_log_all - start_time_remove_backlog_entries).toMilliSecondsDouble() << "ms\n"
681 << " time_log_all "
682 << (start_time_flush_all_files - start_time_log_all).toMilliSecondsDouble() << "ms\n"
683 << " header " << dlogdurs.header.ms() << "ms\t(" << dlogdurs.header.n
684 << " calls)\n"
685 << " csv " << dlogdurs.header_csv.ms() << "ms\t("
686 << dlogdurs.header_csv.n << " calls)\n"
687 << " stream " << dlogdurs.header_stream.ms() << "ms\t("
688 << dlogdurs.header_stream.n << " calls)\n"
689 << " sens " << dlogdurs.sens.ms() << "ms\t(" << dlogdurs.sens.n
690 << " calls)\n"
691 << " csv " << dlogdurs.sens_csv.ms() << "ms\t(" << dlogdurs.sens_csv.n
692 << " calls)\n"
693 << " stream " << dlogdurs.sens_stream.ms() << "ms\t("
694 << dlogdurs.sens_stream.n << " calls)\n"
695 << " per elem " << dlogdurs.sens_stream_elem.ms() << "ms\t("
696 << dlogdurs.sens_stream_elem.n << " calls)\n"
697 << " ctrl " << dlogdurs.ctrl.ms() << "ms\t(" << dlogdurs.ctrl.n
698 << " calls)\n"
699 << " csv " << dlogdurs.ctrl_csv.ms() << "ms\t(" << dlogdurs.ctrl_csv.n
700 << " calls)\n"
701 << " stream " << dlogdurs.ctrl_stream.ms() << "ms\t("
702 << dlogdurs.ctrl_stream.n << " calls)\n"
703 << " per elem " << dlogdurs.ctrl_stream_elem.ms() << "ms\t("
704 << dlogdurs.ctrl_stream_elem.n << " calls)\n"
705 << " backlog " << dlogdurs.backlog.ms() << "ms\t(" << dlogdurs.backlog.n
706 << " calls)\n"
707 << " msg " << dlogdurs.msg.ms() << "ms\t(" << dlogdurs.msg.n
708 << " calls)\n"
709 << " time_flush_all_files "
710 << (start_time_remove_entries - start_time_flush_all_files).toMilliSecondsDouble() << "ms\n"
711 << " time_remove_entries "
712 << (start_time_data_streaming - start_time_remove_entries).toMilliSecondsDouble() << "ms\n"
713 << " time_data_streaming "
714 << (end_time - start_time_data_streaming).toMilliSecondsDouble() << "ms\n";
715 // clang-format on
716 }
717
// Processes one entry of the control thread output buffer (called from the parameterless
// doLogging() for every entry it visits):
//  - starts a new row in every CSV log (marker, iteration, timestamps) and clears the marker,
//  - starts a new time step in every streaming entry,
//  - appends every sensor and control data field to the CSV logs that selected it and passes
//    it to every streaming entry,
//  - stores the entry in the backlog (if enabled and not older than the retention time),
//  - prints the messages stored with the entry.
// The time spent in each part is accumulated in `durations`.
//
// NOTE(review): this listing omits the original source lines with the embedded numbers 721,
// 726, 731, 762, 804, 858 and 869. Line 721 is the parameter between `now` and `i`; the body
// and the call sites use it as `data`. The parameters i and num are not used by the visible
// code.
718 void
719 Logging::doLogging(details::DoLoggingDurations& durations,
720 const IceUtil::Time& now,
722 std::size_t i,
723 std::size_t num)
724 {
725
727
728 // Header.
729 {
730 durations.header.start();
732 //base (marker;iteration;timestamp)
733 if (!rtLoggingEntries.empty())
734 {
735 durations.header_csv.start();
736 for (auto& [_, e] : rtLoggingEntries)
737 {
// rows are terminated lazily: the newline is written at the start of the next row
738 e->stream << "\n"
739 << e->marker << ";" << data.iteration << ";"
740 << data.sensorValuesTimestamp.toMicroSeconds() << ";"
741 << data.timeSinceLastIteration.toMicroSeconds();
742 e->marker.clear();
743 }
744 durations.header_csv.stop();
745 }
746 //streaming
747 if (!rtDataStreamingEntry.empty())
748 {
749 durations.header_stream.start();
750 for (auto& [_, e] : rtDataStreamingEntry)
751 {
752 e.processHeader(data);
753 }
754 durations.header_stream.stop();
755 }
756 durations.header.stop();
757 }
758
759 // Process devices.
760 { // Sensors.
761 {
763 durations.sens.start();
764 //sensors
765 for (std::size_t idxDev = 0; idxDev < data.sensors.size(); ++idxDev)
766 {
767 const SensorValueBase* val = data.sensors.at(idxDev);
768 //dimensions of sensor value (e.g. vel, tor, f_x, f_y, ...)
769 for (std::size_t idxField = 0; idxField < val->getNumberOfDataFields();
770 ++idxField)
771 {
772 if (!rtLoggingEntries.empty())
773 {
774 durations.sens_csv.start();
// the string is created once per field and shared by all CSV logs
775 const auto str = val->getDataFieldAs<std::string>(idxField);
776 for (auto& [_, entry] : rtLoggingEntries)
777 {
778 if (entry->loggedSensorDeviceValues.at(idxDev).at(idxField))
779 {
780 entry->stream << ";" << str;
781 }
782 }
783 durations.sens_csv.stop();
784 }
785 if (!rtDataStreamingEntry.empty())
786 {
787 durations.sens_stream.start();
788 for (auto& [_, rtStreamingEntry] : rtDataStreamingEntry)
789 {
790 durations.sens_stream_elem.start();
791 rtStreamingEntry.processSens(*val, idxDev, idxField);
792 durations.sens_stream_elem.stop();
793 }
794 durations.sens_stream.stop();
795 }
796 }
797 }
798 durations.sens.stop();
799 }
800
801 // Controller.
802 {
803 durations.ctrl.start();
805 //joint controllers
806 for (std::size_t idxDev = 0; idxDev < data.control.size(); ++idxDev)
807 {
808 const auto& vals = data.control.at(idxDev);
809 //control value (e.g. v_platform)
810 for (std::size_t idxVal = 0; idxVal < vals.size(); ++idxVal)
811 {
812 const ControlTargetBase* val = vals.at(idxVal);
813 //dimensions of control value (e.g. v_platform_x, v_platform_y, v_platform_rotate)
814 for (std::size_t idxField = 0; idxField < val->getNumberOfDataFields();
815 ++idxField)
816 {
817 if (!rtLoggingEntries.empty())
818 {
819 durations.ctrl_csv.start();
820 std::string str;
821 val->getDataFieldAs(idxField, str); // expensive function
822 for (auto& [_, entry] : rtLoggingEntries)
823 {
824 if (entry->loggedControlDeviceValues.at(idxDev).at(idxVal).at(
825 idxField))
826 {
827 entry->stream << ";" << str;
828 }
829 }
830 durations.ctrl_csv.stop();
831 }
832 if (!rtDataStreamingEntry.empty())
833 {
834 durations.ctrl_stream.start();
835 for (auto& [_, rtStreamingEntry] : rtDataStreamingEntry)
836 {
837 durations.ctrl_stream_elem.start();
838 rtStreamingEntry.processCtrl(*val, idxDev, idxVal, idxField);
839 durations.ctrl_stream_elem.stop();
840 }
841 durations.ctrl_stream.stop();
842 }
843 }
844 }
845 }
846
847 durations.ctrl.stop();
848 }
849 } // end of the "Process devices" block (this brace does not close a namespace)
850
851 //finish processing
852 {
853 //store data to backlog
854 {
855 if (rtLoggingBacklogEnabled)
856 {
857 durations.backlog.start();
859 if (data.writeTimestamp + rtLoggingBacklogRetentionTime >= now)
860 {
861 backlog.emplace_back(data, true); //true for minimal copy
862 }
863 durations.backlog.stop();
864 }
865 }
866 //print + reset messages
867 {
868 durations.msg.start();
870 for (const ::armarx::detail::RtMessageLogEntryBase* ptr :
871 data.messages.getEntries())
872 {
// the first null entry ends the list of messages
873 if (!ptr)
874 {
875 break;
876 }
877 ptr->print(controlThreadId);
878 }
879 durations.msg.stop();
880 }
881 }
882 }
883
884 bool
885 Logging::MatchName(const std::string& pattern, const std::string& name)
886 {
888 if (pattern.empty())
889 {
890 return false;
891 }
892 static const std::regex pattern_regex{R"(^\^?[- ._*a-zA-Z0-9]+\$?$)"};
893 if (!std::regex_match(pattern, pattern_regex))
894 {
895 throw InvalidArgumentException{"Pattern '" + pattern + "' is invalid"};
896 }
897 static const std::regex reg_dot{"[.]"};
898 static const std::regex reg_star{"[*]"};
899 const std::string rpl1 = std::regex_replace(pattern, reg_dot, "\\.");
900 const std::string rpl2 = std::regex_replace(rpl1, reg_star, ".*");
901 const std::regex key_regex{rpl2};
902 return std::regex_search(name, key_regex);
903 }
904
905 void
906 Logging::_postOnInitRobotUnit()
907 {
909 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
910 rtLoggingTimestepMs = getProperty<std::size_t>("RTLogging_PeriodMs");
911 ARMARX_CHECK_LESS(0, rtLoggingTimestepMs) << "The property RTLoggingPeriodMs must not be 0";
912
913 messageBufferSize = getProperty<std::size_t>("RTLogging_MessageBufferSize");
914 messageBufferMaxSize = getProperty<std::size_t>("RTLogging_MaxMessageBufferSize");
915 ARMARX_CHECK_LESS_EQUAL(messageBufferSize, messageBufferMaxSize);
916
917 messageBufferNumberEntries = getProperty<std::size_t>("RTLogging_MessageNumber");
918 messageBufferMaxNumberEntries = getProperty<std::size_t>("RTLogging_MaxMessageNumber");
919 ARMARX_CHECK_LESS_EQUAL(messageBufferNumberEntries, messageBufferMaxNumberEntries);
920
921 rtLoggingBacklogRetentionTime =
922 IceUtil::Time::milliSeconds(getProperty<std::size_t>("RTLogging_KeepIterationsForMs"));
923 rtLoggingBacklogMaxSize = getProperty<std::size_t>("RTLogging_MaxBacklogSize");
924 rtLoggingBacklogEnabled = getProperty<bool>("RTLogging_EnableBacklog");
925 getProperty(numberOfEntriesToLog, "RTLogging_LogLastNMessagesOnly");
927 ARMARX_IMPORTANT << "Initializing RTLogging with the following parameters: "
928 << VAROUT(rtLoggingTimestepMs) << VAROUT(messageBufferSize)
929 << VAROUT(messageBufferMaxSize) << VAROUT(messageBufferNumberEntries)
930 << VAROUT(messageBufferMaxNumberEntries)
931 << VAROUT(rtLoggingBacklogRetentionTime) << VAROUT(rtLoggingBacklogMaxSize)
932 << VAROUT(rtLoggingBacklogEnabled) << VAROUT(numberOfEntriesToLog);
933 }
934
935 void
936 Logging::_postFinishDeviceInitialization()
937 {
939 throwIfInControlThread(BOOST_CURRENT_FUNCTION);
940 //init buffer
941 {
943 std::size_t ctrlThreadPeriodUs =
944 static_cast<std::size_t>(getControlThreadTargetPeriod().toMicroSeconds());
945 std::size_t logThreadPeriodUs = rtLoggingTimestepMs * 1000;
946 std::size_t nBuffers = (logThreadPeriodUs / ctrlThreadPeriodUs + 1) * 100;
947
948 const auto bufferSize =
949 _module<ControlThreadDataBuffer>().getControlThreadOutputBuffer().initialize(
950 nBuffers,
951 _module<Devices>().getControlDevices(),
952 _module<Devices>().getSensorDevices(),
953 messageBufferSize,
954 messageBufferNumberEntries,
955 messageBufferMaxSize,
956 messageBufferMaxNumberEntries);
957 ARMARX_INFO << "RTLogging activated. Using " << nBuffers << " buffers "
958 << "(buffersize = " << bufferSize << " bytes)";
959 }
960 //init logging names + field types
961 {
963 const auto makeValueMetaData = [&](auto* val, const std::string& namePre)
964 {
965 ValueMetaData data;
966 const auto names = val->getDataFieldNames();
967 data.fields.resize(names.size());
968
969 for (std::size_t fieldIdx = 0; fieldIdx < names.size(); ++fieldIdx)
970 {
971 std::string const& fieldName = names[fieldIdx];
972 data.fields.at(fieldIdx).name = namePre + '.' + fieldName;
973 data.fields.at(fieldIdx).type = &(val->getDataFieldType(fieldIdx));
974 }
975 return data;
976 };
977
978 //sensorDevices
979 controlDeviceValueMetaData.reserve(_module<Devices>().getControlDevices().size());
980 for (const auto& cd : _module<Devices>().getControlDevices().values())
981 {
983 controlDeviceValueMetaData.emplace_back();
984 auto& dataForDev = controlDeviceValueMetaData.back();
985 dataForDev.reserve(cd->getJointControllers().size());
986 for (auto jointC : cd->getJointControllers())
987 {
988 dataForDev.emplace_back(makeValueMetaData(jointC->getControlTarget(),
989 "ctrl." + cd->getDeviceName() + "." +
990 jointC->getControlMode()));
991 }
992 }
993 //sensorDevices
994 sensorDeviceValueMetaData.reserve(_module<Devices>().getSensorDevices().size());
995 for (const auto& sd : _module<Devices>().getSensorDevices().values())
996 {
998 sensorDeviceValueMetaData.emplace_back(
999 makeValueMetaData(sd->getSensorValue(), "sens." + sd->getDeviceName()));
1000 }
1001 }
1002 //start logging thread is done in rtinit
1003 //maybe add the default log
1004 {
1006 const auto loggingpath = getProperty<std::string>("RTLogging_DefaultLog").getValue();
1007 if (!loggingpath.empty())
1008 {
1009 defaultLogHandle = startRtLogging(loggingpath, getLoggingNames());
1010 }
1011 }
1012 }
1013
1014 void
1015 Logging::DataStreamingEntry::clearResult()
1016 {
1018 for (auto& e : result)
1019 {
1020 entryBuffer.emplace_back(std::move(e));
1021 }
1022 result.clear();
1023 }
1024
1025 RobotUnitDataStreaming::TimeStep
1026 Logging::DataStreamingEntry::allocateResultElement() const
1027 {
1029 RobotUnitDataStreaming::TimeStep data;
1030 data.bools.resize(numBools);
1031 data.bytes.resize(numBytes);
1032 data.shorts.resize(numShorts);
1033 data.ints.resize(numInts);
1034 data.longs.resize(numLongs);
1035 data.floats.resize(numFloats);
1036 data.doubles.resize(numDoubles);
1037 return data;
1038 }
1039
1040 RobotUnitDataStreaming::TimeStep
1041 Logging::DataStreamingEntry::getResultElement()
1042 {
1044 if (entryBuffer.empty())
1045 {
1046 return allocateResultElement();
1047 }
1048 auto e = std::move(entryBuffer.back());
1049 entryBuffer.pop_back();
1050 return e;
1051 }
1052
1053 void
1054 Logging::DataStreamingEntry::processHeader(const ControlThreadOutputBuffer::Entry& e)
1055 {
1057 if (stopStreaming)
1058 {
1059 return;
1060 }
1061
1062 auto& data = result.emplace_back(getResultElement());
1063
1064 data.iterationId = e.iteration;
1065 data.timestampUSec =
1066 armarx::mapRtTimestampToNonRtTimestamp(e.sensorValuesTimestamp).toMicroSeconds();
1067 data.timesSinceLastIterationUSec = e.timeSinceLastIteration.toMicroSeconds();
1068 }
1069
1070 void
1071 WriteTo(const auto& dentr,
1073 const auto& val,
1074 std::size_t fidx,
1075 auto& data)
1076 {
1078 using enum_t = Logging::DataStreamingEntry::ValueT;
1079 try
1080 {
1082 switch (out.value)
1083 {
1084 case enum_t::Bool:
1085 bool b;
1086 val.getDataFieldAs(fidx, b);
1087 data.bools.at(out.idx) = b;
1088 return;
1089 case enum_t::Byte:
1090 val.getDataFieldAs(fidx, data.bytes.at(out.idx));
1091 return;
1092 case enum_t::Short:
1093 val.getDataFieldAs(fidx, data.shorts.at(out.idx));
1094 return;
1095 case enum_t::Int:
1096 val.getDataFieldAs(fidx, data.ints.at(out.idx));
1097 return;
1098 case enum_t::Long:
1099 val.getDataFieldAs(fidx, data.longs.at(out.idx));
1100 return;
1101 case enum_t::Float:
1102 val.getDataFieldAs(fidx, data.floats.at(out.idx));
1103 return;
1104 case enum_t::Double:
1105 val.getDataFieldAs(fidx, data.doubles.at(out.idx));
1106 return;
1107 case enum_t::Skipped:
1108 return;
1109 }
1110 }
1111 catch (...)
1112 {
1113 ARMARX_ERROR << GetHandledExceptionString() << "\ntype " << static_cast<int>(out.value)
1114 << "\n"
1115 << VAROUT(data.bools.size()) << " " << VAROUT(dentr.numBools) << "\n"
1116 << VAROUT(data.bytes.size()) << " " << VAROUT(dentr.numBytes) << "\n"
1117 << VAROUT(data.shorts.size()) << " " << VAROUT(dentr.numShorts) << "\n"
1118 << VAROUT(data.ints.size()) << " " << VAROUT(dentr.numInts) << "\n"
1119 << VAROUT(data.longs.size()) << " " << VAROUT(dentr.numLongs) << "\n"
1120 << VAROUT(data.floats.size()) << " " << VAROUT(dentr.numFloats) << "\n"
1121 << VAROUT(data.doubles.size()) << " " << VAROUT(dentr.numDoubles);
1122 throw;
1123 }
1124 }
1125
1126 void
1127 Logging::DataStreamingEntry::processCtrl(const ControlTargetBase& val,
1128 std::size_t didx,
1129 std::size_t vidx,
1130 std::size_t fidx)
1131 {
1133 if (stopStreaming)
1134 {
1135 return;
1136 }
1137 auto& data = result.back();
1138 const OutVal& o = ctrlDevs.at(didx).at(vidx).at(fidx);
1139 WriteTo(*this, o, val, fidx, data);
1140 }
1141
1142 void
1143 Logging::DataStreamingEntry::processSens(const SensorValueBase& val,
1144 std::size_t didx,
1145 std::size_t fidx)
1146 {
1148 if (stopStreaming)
1149 {
1150 return;
1151 }
1152 auto& data = result.back();
1153 const OutVal& o = sensDevs.at(didx).at(fidx);
1154 WriteTo(*this, o, val, fidx, data);
1155 }
1156
1157 void
1158 Logging::DataStreamingEntry::send(const RobotUnitDataStreaming::ReceiverPrx& r,
1159 std::uint64_t msgId)
1160 {
1162 const auto start_send = armarx::rtNow();
1163 const auto num_timesteps = result.size();
1164 updateCalls.emplace_back(r->begin_update(result, static_cast<Ice::Long>(msgId)));
1165 const auto start_clear = armarx::rtNow();
1166 clearResult();
1167 const auto end = armarx::rtNow();
1168 ARMARX_DEBUG_S << "Logging::DataStreamingEntry::send"
1169 << "\n update " << (start_clear - start_send).toMilliSecondsDouble()
1170 << "ms (" << num_timesteps << " timesteps)"
1171 << "\n clear " << (end - start_clear).toMilliSecondsDouble() << "ms";
1172
1173 //now execute all ready callbacks
1174 std::size_t i = 0;
1175 for (; i < updateCalls.size(); ++i)
1176 {
1177 try
1178 {
1179 if (!updateCalls.at(i)->isCompleted())
1180 {
1181 break;
1182 }
1183 r->end_update(updateCalls.at(i));
1184 connectionFailures = 0;
1185 }
1186 catch (...)
1187 {
1189 ++connectionFailures;
1190 if (connectionFailures > rtStreamMaxClientErrors)
1191 {
1192 stopStreaming = true;
1193 ARMARX_WARNING_S << "DataStreaming Receiver was not reachable for "
1194 << connectionFailures << " iterations! Removing receiver";
1195 updateCalls.clear();
1196 break;
1197 }
1198 }
1199 }
1200 if (!updateCalls.empty())
1201 {
1202 updateCalls.erase(updateCalls.begin(), updateCalls.begin() + i);
1203 }
1204 }
1205} // namespace armarx::RobotUnitModule
uint8_t data[1]
#define make_case(Type, TName)
#define VAROUT(x)
constexpr long num_timesteps
std::string str(const T &t)
Property< PropertyType > getProperty(const std::string &name)
Brief description of class JointControlTargetBase.
static Duration MilliSeconds(std::int64_t milliSeconds)
Constructs a duration in milliseconds.
Definition Duration.cpp:48
static long getThreadId()
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition Logging.cpp:99
ArmarXManagerPtr getArmarXManager() const
Returns the ArmarX manager used to add and remove components.
Property< PropertyType > getProperty(const std::string &name)
Property creation and retrieval.
SimpleRemoteReferenceCounterBasePtr startRtLoggingWithAliasNames(const std::string &formatString, const StringStringDictionary &aliasNames, const Ice::Current &=Ice::emptyCurrent) override
Starts logging to a CSV file.
Ice::StringSeq getLoggingNames(const Ice::Current &=Ice::emptyCurrent) const override
Returns the names of all loggable data fields.
void writeRecentIterationsToFile(const std::string &formatString, const Ice::Current &=Ice::emptyCurrent) const override
Dumps the backlog of all recent iterations to the given file.
SimpleRemoteReferenceCounterBasePtr startRtLogging(const std::string &formatString, const Ice::StringSeq &loggingNames, const Ice::Current &=Ice::emptyCurrent) override
Starts logging to a CSV file.
void stopRtLogging(const armarx::SimpleRemoteReferenceCounterBasePtr &token, const Ice::Current &=Ice::emptyCurrent) override
Stops logging to the given log.
RobotUnitDataStreaming::DataStreamingDescription startDataStreaming(const RobotUnitDataStreaming::ReceiverPrx &receiver, const RobotUnitDataStreaming::Config &config, const Ice::Current &=Ice::emptyCurrent) override
friend void WriteTo(const auto &dentr, const Logging::DataStreamingEntry::OutVal &out, const auto &val, std::size_t fidx, auto &data)
void stopDataStreaming(const RobotUnitDataStreaming::ReceiverPrx &receiver, const Ice::Current &=Ice::emptyCurrent) override
void addMarkerToRtLog(const SimpleRemoteReferenceCounterBasePtr &token, const std::string &marker, const Ice::Current &=Ice::emptyCurrent) override
Adds a string to the log (it is added in a special column).
T & _module()
Returns this as ref to the given type.
void throwIfInControlThread(const std::string &fnc) const
Throws if the current thread is the ControlThread.
virtual IceUtil::Time getControlThreadTargetPeriod() const =0
The ControlThread's period.
The SensorValueBase class.
virtual std::size_t getNumberOfDataFields() const =0
virtual void getDataFieldAs(std::size_t i, bool &out) const =0
#define ARMARX_CHECK_GREATER(lhs, rhs)
This macro evaluates whether lhs is greater (>) than rhs and if it turns out to be false it will throw an ExpressionException.
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionException.
#define ARMARX_CHECK_LESS(lhs, rhs)
This macro evaluates whether lhs is less (<) than rhs and if it turns out to be false it will throw an ExpressionException.
#define ARMARX_CHECK_LESS_EQUAL(lhs, rhs)
This macro evaluates whether lhs is less than or equal to (<=) rhs and if it turns out to be false it will throw an ExpressionException.
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an ExpressionException.
#define ARMARX_DEBUG_S
The logging level for output that is only interesting while debugging.
Definition Logging.h:205
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WARNING).
Definition Logging.h:190
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:196
#define ARMARX_INFO_S
Definition Logging.h:202
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
#define ARMARX_WARNING_S
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:213
const simox::meta::IntEnumNames names
std::string GetHandledExceptionString()
IceUtil::Time mapRtTimestampToNonRtTimestamp(const IceUtil::Time &time_monotic_raw)
Definition NonRtTiming.h:46
IceUtil::Time rtNow()
Definition RtTiming.h:40
std::shared_ptr< Value > value()
Definition cxxopts.hpp:855
detail::ControlThreadOutputBufferEntry Entry
Helps to build a path via format strings: All format strings are of the form '{[^{}...
#define ARMARX_TRACE
Definition trace.h:77
#define ARMARX_TRACE_LITE
Definition trace.h:98