ApproximateTimeQueue.cpp
Go to the documentation of this file.
2
3#include <algorithm>
4#include <cstdint>
5#include <mutex>
6
12
14
16{
17
19 const int64_t dtHistoryLength,
20 const std::set<std::string>& laserSensorIds,
21 MessageCallee& messageCallee,
22 bool useOdometry) :
23 dtEps(std::max<int64_t>(dt, 0)),
24 dtHistoryLength(dtHistoryLength),
25 messageCallee(messageCallee),
26 useOdometry(useOdometry)
27 {
28 if (dt > 0)
29 {
31 << "Requested to throttle processing of data. This is currently not available.";
32 }
33
34 const std::size_t laserMaxQueueSize = 10;
35
37 for (const auto& laserSensorId : laserSensorIds)
38 {
39 ARMARX_INFO << VAROUT(laserSensorId);
40 laserQueues.insert({laserSensorId, LaserTimeQueue(laserMaxQueueSize)});
41 }
42 messageCalleeTask =
43 new SimpleRunningTask<>([this]() { serveMessageCallee(); }, "Serve Message Callee");
44 messageCalleeTask->start();
45 }
46
47 void
49 {
51 // ARMARX_INFO << "Laser data " << message->frame << " ts: " << message->timestamp;
52
53 // ARMARX_CHECK(laserQueues.count(message.frame) > 0);
54 auto entry = laserQueues.find(message.frame);
55 if (entry == laserQueues.end())
56 {
57 ARMARX_INFO << "Received laser data for unknown frame `" << message.frame << "`.";
58 return;
59 }
60 LaserTimeQueue& queue = entry->second;
61
62 try
63 {
64 queue.insert(std::move(message));
65 processAllAvailableData(message.timestamp);
66 }
67 catch (const armarx::LocalException& e)
68 {
69 ARMARX_INFO << "Unable to insert message into queue. Reason: " << e.what();
70 queue.clear();
71 }
72 }
73
74 void
77 {
79 // ARMARX_INFO << "odom data ts:" << odomPose->timestamp;
80 const auto timestamp = odomPose.timestamp;
81
82 odomQueue.insert(std::move(odomPose));
83 processAllAvailableData(timestamp);
84 }
85
86 bool
87 ApproximateTimeQueue::allDataAvailable(const int64_t timestamp) const
88 {
90 auto hasTimestamp = [timestamp](const auto& q) { return q.has(timestamp); };
91
92 if (useOdometry and not hasTimestamp(odomQueue))
93 {
94 return false;
95 }
96
97 // TODO(fabian.reister): all_of(..., hasTimestamp)
98 for (const auto& [sensorName, sensorQueue] : laserQueues)
99 {
100 if (not hasTimestamp(sensorQueue))
101 {
102 return false;
103 }
104 }
105
106 return true;
107 }
108
109 void
110 ApproximateTimeQueue::processAllAvailableData(const int64_t timestamp)
111 {
113
114 // This is inspired by ROS' approximate time synchronizer, but a bit simpler.
115 // It works as follows:
116 // (1) Try to find a pivot element. That is an element that has matches in other queues.
117 // The pivot's timestamp is larger than the timestamp of the matches.
118 // (2) Start at the beginning of the queues, traverse them and check for matches
119 // (3) Once a match is found, it is processed.
120 // (4) All messages older than the pivot element are deleted.
121
122 std::set<int64_t> timestamps;
123
124 const auto insertTimestamps = [&timestamps](std::vector<int64_t> ts)
125 {
126 if (ts.empty())
127 {
128 return;
129 }
130
131 std::copy(ts.begin(), ts.end(), std::inserter(timestamps, timestamps.end()));
132 };
133
134
135 for (const auto& [_, q] : laserQueues)
136 {
137 insertTimestamps(q.timestamps());
138 }
139
140 // std::set is sorted :)
141
142 // this will now go through the queues. whenever a match is found, it is processed
143 // and older messages are dropped.
144 std::for_each(timestamps.begin(),
145 timestamps.end(),
146 [&](const auto& ts) { processAvailableData(ts); });
147 }
148
149 void
150 ApproximateTimeQueue::processAvailableData(const int64_t timestamp)
151 {
153 ARMARX_DEBUG << "Processing available data at " << timestamp;
154
155 const int64_t dt = timestamp - lastProcessedTimestamp;
156
157 if (dt < 0)
158 {
159 ARMARX_DEBUG << "Trying to process a timestamp that is " << dt
160 << " older than last processed timestamp.";
161 return;
162 }
163
164 // only process data at a certain frequency
165 // WARNING: this feature is broken.
166 // if (dt <= dtEps)
167 // {
168 // return;
169 // }
170
171 // process no messages that are too old
172 // trimUntil(timestamp - dtHistoryLength);
173
174 if (not allDataAvailable(timestamp))
175 {
176 return;
177 }
178
179 ARMARX_DEBUG << "Processing approximate time data " << timestamp;
180
181 std::map<std::string, std::size_t> queueSizes;
182 queueSizes["odometry"] = odomQueue.size();
183
184 for (const auto& [sensorName, sensorQueue] : laserQueues)
185 {
186 queueSizes["laser_" + sensorName] = sensorQueue.size();
187 }
188
189 ARMARX_VERBOSE << deactivateSpam(1) << "queue sizes: " << queueSizes;
190
191 lastProcessedTimestamp = timestamp;
192
193 try
194 {
196
197 const Eigen::Isometry3f odom_T_lookup =
198 useOdometry ? odomQueue.lookupInterpolate(timestamp).pose
199 : Eigen::Isometry3f::Identity();
200
201 TimedData timedData;
202 timedData.timestamp = timestamp;
203
204 timedData.odomPose.timestamp = timestamp;
205 timedData.odomPose.pose = odom_T_lookup;
206
207 for (const auto& [sensorName, sensorQueue] : laserQueues)
208 {
210 const auto& message = sensorQueue.lookupAt(timestamp);
211
212 // The lookup timestamp and the message timestamp do not match exaclty.
213 // Therefore, we use the odometry information to "project" the sensor data.
214 const auto odom_T_measurement = odomQueue.lookupInterpolate(message.timestamp).pose;
215 const Eigen::Isometry3f measurement_T_lookup =
216 odom_T_measurement.inverse() * odom_T_lookup;
217
218 LaserMessage laserData;
219 laserData.data = message;
220 laserData.robotPoseCorrection = measurement_T_lookup;
221
222 timedData.laserData.push_back(laserData);
223 }
224
226 timedDataQueue.push(timedData);
227
228 // house keeping
229 trimUntil(timestamp);
230 }
231 catch (...)
232 {
233 return;
234 }
235 }
236
237 void
238 ApproximateTimeQueue::trimUntil(const int64_t& timestamp)
239 {
241
242 odomQueue.trimUntil(timestamp - 1'000'000);
243 for (auto& [sensorName, sensorQueue] : laserQueues)
244 {
245 sensorQueue.trimUntil(timestamp);
246 }
247 }
248
249 void
250 ApproximateTimeQueue::serveMessageCallee()
251 {
253 while (messageCalleeTask and not messageCalleeTask->isStopped())
254 {
255 TimedData timedData;
256 if (const auto status = timedDataQueue.wait_pull(timedData);
257 status == boost::queue_op_status::success)
258 {
260 ARMARX_DEBUG << "Serving message callee with data at " << timedData.timestamp;
261 messageCallee.onTimedDataAvailable(timedData);
262 }
263 else
264 {
265 ARMARX_WARNING << "Queue pull was not successful. "
266 << static_cast<std::underlying_type<boost::queue_op_status>::type>(
267 status);
268 }
269 }
270 }
271
272} // namespace armarx::localization_and_mapping::cartographer_adapter
std::string timestamp()
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
#define VAROUT(x)
constexpr T dt
ApproximateTimeQueue(int64_t dt, int64_t dtHistoryLength, const std::set< std::string > &laserSensorIds, MessageCallee &messageCallee, bool useOdometry)
Construct a new Approximate Time Queue object.
void insertOdomData(armarx::localization_and_mapping::cartographer_adapter::PoseStamped odomPose)
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:196
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
#define q
std::vector< T > max(const std::vector< T > &v1, const std::vector< T > &v2)
SimpleRunningTask(Ts...) -> SimpleRunningTask< std::function< void(void)> >
#define ARMARX_TRACE
Definition trace.h:77