19 const int64_t dtHistoryLength,
20 const std::set<std::string>& laserSensorIds,
24 dtHistoryLength(dtHistoryLength),
25 messageCallee(messageCallee),
26 useOdometry(useOdometry)
31 <<
"Requested to throttle processing of data. This is currently not available.";
34 const std::size_t laserMaxQueueSize = 10;
37 for (
const auto& laserSensorId : laserSensorIds)
40 laserQueues.insert({laserSensorId,
LaserTimeQueue(laserMaxQueueSize)});
44 messageCalleeTask->start();
54 auto entry = laserQueues.find(
message.frame);
55 if (entry == laserQueues.end())
65 processAllAvailableData(
message.timestamp);
67 catch (
const armarx::LocalException& e)
69 ARMARX_INFO <<
"Unable to insert message into queue. Reason: " << e.what();
82 odomQueue.
insert(std::move(odomPose));
87 ApproximateTimeQueue::allDataAvailable(
const int64_t
timestamp)
const
92 if (useOdometry and not hasTimestamp(odomQueue))
98 for (
const auto& [sensorName, sensorQueue] : laserQueues)
100 if (not hasTimestamp(sensorQueue))
110 ApproximateTimeQueue::processAllAvailableData(
const int64_t
timestamp)
122 std::set<int64_t> timestamps;
124 const auto insertTimestamps = [×tamps](std::vector<int64_t> ts)
131 std::copy(ts.begin(), ts.end(), std::inserter(timestamps, timestamps.end()));
135 for (
const auto& [_,
q] : laserQueues)
137 insertTimestamps(
q.timestamps());
144 std::for_each(timestamps.begin(),
146 [&](
const auto& ts) { processAvailableData(ts); });
150 ApproximateTimeQueue::processAvailableData(
const int64_t
timestamp)
155 const int64_t
dt =
timestamp - lastProcessedTimestamp;
160 <<
" older than last processed timestamp.";
181 std::map<std::string, std::size_t> queueSizes;
182 queueSizes[
"odometry"] = odomQueue.
size();
184 for (
const auto& [sensorName, sensorQueue] : laserQueues)
186 queueSizes[
"laser_" + sensorName] = sensorQueue.size();
197 const Eigen::Isometry3f odom_T_lookup =
204 timedData.odomPose.timestamp =
timestamp;
205 timedData.odomPose.pose = odom_T_lookup;
207 for (
const auto& [sensorName, sensorQueue] : laserQueues)
215 const Eigen::Isometry3f measurement_T_lookup =
216 odom_T_measurement.inverse() * odom_T_lookup;
218 LaserMessage laserData;
220 laserData.robotPoseCorrection = measurement_T_lookup;
222 timedData.laserData.push_back(laserData);
226 timedDataQueue.push(timedData);
238 ApproximateTimeQueue::trimUntil(
const int64_t&
timestamp)
243 for (
auto& [sensorName, sensorQueue] : laserQueues)
250 ApproximateTimeQueue::serveMessageCallee()
253 while (messageCalleeTask and not messageCalleeTask->isStopped())
256 if (
const auto status = timedDataQueue.wait_pull(timedData);
257 status == boost::queue_op_status::success)
260 ARMARX_DEBUG <<
"Serving message callee with data at " << timedData.timestamp;
266 <<
static_cast<std::underlying_type<boost::queue_op_status>::type
>(