ApproximateTimeQueue.cpp
Go to the documentation of this file.
1 #include "ApproximateTimeQueue.h"
2 
3 #include <algorithm>
4 #include <cstdint>
5 #include <mutex>
6 
12 
14 
16 {
17 
19  const int64_t dtHistoryLength,
20  const std::set<std::string>& laserSensorIds,
21  MessageCallee& messageCallee,
22  bool useOdometry) :
23  dtEps(std::max<int64_t>(dt, 0)),
24  dtHistoryLength(dtHistoryLength),
25  messageCallee(messageCallee),
26  useOdometry(useOdometry)
27  {
28  if (dt > 0)
29  {
31  << "Requested to throttle processing of data. This is currently not available.";
32  }
33 
34  const std::size_t laserMaxQueueSize = 10;
35 
37  for (const auto& laserSensorId : laserSensorIds)
38  {
39  ARMARX_INFO << VAROUT(laserSensorId);
40  laserQueues.insert({laserSensorId, LaserTimeQueue(laserMaxQueueSize)});
41  }
42  messageCalleeTask =
43  new SimpleRunningTask<>([this]() { serveMessageCallee(); }, "Serve Message Callee");
44  messageCalleeTask->start();
45  }
46 
47  void
49  {
51  // ARMARX_INFO << "Laser data " << message->frame << " ts: " << message->timestamp;
52 
53  // ARMARX_CHECK(laserQueues.count(message.frame) > 0);
54  auto entry = laserQueues.find(message.frame);
55  if (entry == laserQueues.end())
56  {
57  ARMARX_INFO << "Received laser data for unknown frame `" << message.frame << "`.";
58  return;
59  }
60  LaserTimeQueue& queue = entry->second;
61 
62  try
63  {
64  queue.insert(std::move(message));
65  processAllAvailableData(message.timestamp);
66  }
67  catch (const armarx::LocalException& e)
68  {
69  ARMARX_INFO << "Unable to insert message into queue. Reason: " << e.what();
70  queue.clear();
71  }
72  }
73 
74  void
77  {
79  // ARMARX_INFO << "odom data ts:" << odomPose->timestamp;
80  const auto timestamp = odomPose.timestamp;
81 
82  odomQueue.insert(std::move(odomPose));
83  processAllAvailableData(timestamp);
84  }
85 
86  bool
87  ApproximateTimeQueue::allDataAvailable(const int64_t timestamp) const
88  {
90  auto hasTimestamp = [timestamp](const auto& q) { return q.has(timestamp); };
91 
92  if (useOdometry and not hasTimestamp(odomQueue))
93  {
94  return false;
95  }
96 
97  // TODO(fabian.reister): all_of(..., hasTimestamp)
98  for (const auto& [sensorName, sensorQueue] : laserQueues)
99  {
100  if (not hasTimestamp(sensorQueue))
101  {
102  return false;
103  }
104  }
105 
106  return true;
107  }
108 
109  void
110  ApproximateTimeQueue::processAllAvailableData(const int64_t timestamp)
111  {
112  ARMARX_TRACE;
113 
114  // This is inspired by ROS' approximate time synchronizer, but a bit simpler.
115  // It works as follows:
116  // (1) Try to find a pivot element. That is an element that has matches in other queues.
117  // The pivot's timestamp is larger than the timestamp of the matches.
118  // (2) Start at the beginning of the queues, traverse them and check for matches
119  // (3) Once a match is found, it is processed.
120  // (4) All messages older than the pivot element are deleted.
121 
122  std::set<int64_t> timestamps;
123 
124  const auto insertTimestamps = [&timestamps](std::vector<int64_t> ts)
125  {
126  if (ts.empty())
127  {
128  return;
129  }
130 
131  std::copy(ts.begin(), ts.end(), std::inserter(timestamps, timestamps.end()));
132  };
133 
134 
135  for (const auto& [_, q] : laserQueues)
136  {
137  insertTimestamps(q.timestamps());
138  }
139 
140  // std::set is sorted :)
141 
142  // this will now go through the queues. whenever a match is found, it is processed
143  // and older messages are dropped.
144  std::for_each(timestamps.begin(),
145  timestamps.end(),
146  [&](const auto& ts) { processAvailableData(ts); });
147  }
148 
149  void
150  ApproximateTimeQueue::processAvailableData(const int64_t timestamp)
151  {
152  ARMARX_TRACE;
153  ARMARX_DEBUG << "Processing available data at " << timestamp;
154 
155  const int64_t dt = timestamp - lastProcessedTimestamp;
156 
157  if (dt < 0)
158  {
159  ARMARX_DEBUG << "Trying to process a timestamp that is " << dt
160  << " older than last processed timestamp.";
161  return;
162  }
163 
164  // only process data at a certain frequency
165  // WARNING: this feature is broken.
166  // if (dt <= dtEps)
167  // {
168  // return;
169  // }
170 
171  // process no messages that are too old
172  // trimUntil(timestamp - dtHistoryLength);
173 
174  if (not allDataAvailable(timestamp))
175  {
176  return;
177  }
178 
179  ARMARX_DEBUG << "Processing approximate time data " << timestamp;
180 
181  std::map<std::string, std::size_t> queueSizes;
182  queueSizes["odometry"] = odomQueue.size();
183 
184  for (const auto& [sensorName, sensorQueue] : laserQueues)
185  {
186  queueSizes["laser_" + sensorName] = sensorQueue.size();
187  }
188 
189  ARMARX_VERBOSE << deactivateSpam(1) << "queue sizes: " << queueSizes;
190 
191  lastProcessedTimestamp = timestamp;
192 
193  try
194  {
195  ARMARX_TRACE;
196 
197  const Eigen::Isometry3f odom_T_lookup =
198  useOdometry ? odomQueue.lookupInterpolate(timestamp).pose
200 
201  TimedData timedData;
202  timedData.timestamp = timestamp;
203 
204  timedData.odomPose.timestamp = timestamp;
205  timedData.odomPose.pose = odom_T_lookup;
206 
207  for (const auto& [sensorName, sensorQueue] : laserQueues)
208  {
209  ARMARX_TRACE;
210  const auto& message = sensorQueue.lookupAt(timestamp);
211 
212  // The lookup timestamp and the message timestamp do not match exaclty.
213  // Therefore, we use the odometry information to "project" the sensor data.
214  const auto odom_T_measurement = odomQueue.lookupInterpolate(message.timestamp).pose;
215  const Eigen::Isometry3f measurement_T_lookup =
216  odom_T_measurement.inverse() * odom_T_lookup;
217 
218  LaserMessage laserData;
219  laserData.data = message;
220  laserData.robotPoseCorrection = measurement_T_lookup;
221 
222  timedData.laserData.push_back(laserData);
223  }
224 
225  ARMARX_TRACE;
226  timedDataQueue.push(timedData);
227 
228  // house keeping
229  trimUntil(timestamp);
230  }
231  catch (...)
232  {
233  return;
234  }
235  }
236 
237  void
238  ApproximateTimeQueue::trimUntil(const int64_t& timestamp)
239  {
240  ARMARX_TRACE;
241 
242  odomQueue.trimUntil(timestamp - 1'000'000);
243  for (auto& [sensorName, sensorQueue] : laserQueues)
244  {
245  sensorQueue.trimUntil(timestamp);
246  }
247  }
248 
249  void
250  ApproximateTimeQueue::serveMessageCallee()
251  {
252  ARMARX_TRACE;
253  while (messageCalleeTask and not messageCalleeTask->isStopped())
254  {
255  TimedData timedData;
256  if (const auto status = timedDataQueue.wait_pull(timedData);
257  status == boost::queue_op_status::success)
258  {
259  ARMARX_TRACE;
260  ARMARX_DEBUG << "Serving message callee with data at " << timedData.timestamp;
261  messageCallee.onTimedDataAvailable(timedData);
262  }
263  else
264  {
265  ARMARX_WARNING << "Queue pull was not successful. "
266  << static_cast<std::underlying_type<boost::queue_op_status>::type>(
267  status);
268  }
269  }
270  }
271 
272 } // namespace armarx::localization_and_mapping::cartographer_adapter
armarx::SimpleRunningTask
Usage:
Definition: TaskUtil.h:85
ARMARX_VERBOSE
#define ARMARX_VERBOSE
Definition: Logging.h:187
armarx::localization_and_mapping::cartographer_adapter::PoseStamped::timestamp
int64_t timestamp
Definition: InterpolatingTimeQueue.h:46
LocalException.h
armarx::localization_and_mapping::cartographer_adapter::LaserScannerMessage
Definition: types.h:153
Frequency.h
interfaces.h
armarx::max
std::vector< T > max(const std::vector< T > &v1, const std::vector< T > &v2)
Definition: VectorHelpers.h:297
message
message(STATUS "Boost-Library-Dir: " "${Boost_LIBRARY_DIRS}") message(STATUS "Boost-LIBRARIES
Definition: CMakeLists.txt:8
armarx::localization_and_mapping::cartographer_adapter::PoseStamped
Definition: InterpolatingTimeQueue.h:43
ApproximateTimeQueue.h
deactivateSpam
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition: Logging.cpp:75
armarx::localization_and_mapping::cartographer_adapter::ApproximateTimeQueue::insertLaserData
void insertLaserData(LaserScannerMessage message)
Definition: ApproximateTimeQueue.cpp:48
ARMARX_TRACE
#define ARMARX_TRACE
Definition: trace.h:77
armarx::status
status
Definition: FiniteStateMachine.h:244
armarx::localization_and_mapping::cartographer_adapter::TimeQueue::trimUntil
void trimUntil(int64_t timestamp)
Definition: TimeQueue.h:138
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
copy
Use of this software is granted under one of the following two to be chosen freely by the user Boost Software License Version Marcin Kalicinski Permission is hereby free of to any person or organization obtaining a copy of the software and accompanying documentation covered by this and transmit the and to prepare derivative works of the and to permit third parties to whom the Software is furnished to do all subject to the including the above license this restriction and the following must be included in all copies of the in whole or in and all derivative works of the unless such copies or derivative works are solely in the form of machine executable object code generated by a source language processor THE SOFTWARE IS PROVIDED AS WITHOUT WARRANTY OF ANY EXPRESS OR INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF FITNESS FOR A PARTICULAR TITLE AND NON INFRINGEMENT IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER WHETHER IN TORT OR ARISING OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE The MIT Marcin Kalicinski Permission is hereby free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to copy
Definition: license.txt:39
armarx::localization_and_mapping::cartographer_adapter::ApproximateTimeQueue::insertOdomData
void insertOdomData(armarx::localization_and_mapping::cartographer_adapter::PoseStamped odomPose)
Definition: ApproximateTimeQueue.cpp:75
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:184
armarx::localization_and_mapping::cartographer_adapter::TimeQueue::insert
void insert(MessageType message)
Definition: TimeQueue.h:67
timestamp
std::string timestamp()
Definition: CartographerAdapter.cpp:85
Metronome.h
ARMARX_ERROR
#define ARMARX_ERROR
Definition: Logging.h:196
armarx::localization_and_mapping::cartographer_adapter
This file is part of ArmarX.
Definition: ApproximateTimeQueue.cpp:15
armarx::localization_and_mapping::cartographer_adapter::ApproximateTimeQueue::ApproximateTimeQueue
ApproximateTimeQueue(int64_t dt, int64_t dtHistoryLength, const std::set< std::string > &laserSensorIds, MessageCallee &messageCallee, bool useOdometry)
Construct a new Approximate Time Queue object.
Definition: ApproximateTimeQueue.cpp:18
q
#define q
ExpressionException.h
armarx::localization_and_mapping::cartographer_adapter::PoseStamped::pose
Eigen::Isometry3f pose
Definition: InterpolatingTimeQueue.h:45
armarx::localization_and_mapping::cartographer_adapter::MessageCallee
Definition: interfaces.h:43
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:181
std
Definition: Application.h:66
VAROUT
#define VAROUT(x)
Definition: StringHelpers.h:198
armarx::localization_and_mapping::cartographer_adapter::TimeQueue
Definition: TimeQueue.h:50
armarx::localization_and_mapping::cartographer_adapter::MessageCallee::onTimedDataAvailable
virtual void onTimedDataAvailable(const TimedData &data)=0
armarx::localization_and_mapping::cartographer_adapter::InterpolatingTimeQueue::size
std::size_t size() const noexcept
Definition: InterpolatingTimeQueue.h:88
Logging.h
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
dt
constexpr T dt
Definition: UnscentedKalmanFilterTest.cpp:45
armarx::localization_and_mapping::cartographer_adapter::TimeQueue::clear
void clear()
Definition: TimeQueue.h:158
armarx::localization_and_mapping::cartographer_adapter::InterpolatingTimeQueue::lookupInterpolate
MessageType lookupInterpolate(int64_t timestamp) const
Definition: InterpolatingTimeQueue.h:61