25 #include <Ice/ObjectAdapter.h>
// Fragment of the receiver class: it inherits virtually from the
// RobotUnitDataStreaming::Receiver interface.
34 virtual public RobotUnitDataStreaming::Receiver
// Fixed name string returned by a member whose signature is not visible in this excerpt.
40 return "RobotUnitDataStreamingReceiver";
// Update handler (override). Takes the AMD callback handle `ptr`, the received
// time steps `data`, and an unnamed Ice::Current, which therefore cannot be
// referenced inside the body.
64 update_async(
const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
65 const RobotUnitDataStreaming::TimeStepSeq&
data,
67 const Ice::Current&)
override
// Compile-time guard: msgSequenceNbr must have exactly the size of
// std::uint64_t, so the static_cast below does not change the value's width.
// NOTE(review): where msgSequenceNbr comes from is not visible here.
74 static_assert(
sizeof(std::uint64_t) ==
sizeof(msgSequenceNbr));
75 const auto seq =
static_cast<std::uint64_t
>(msgSequenceNbr);
// Time step sequences keyed by a 64-bit unsigned id.
// NOTE(review): presumably the key is the message sequence number `seq`;
// the insertion itself is not part of this excerpt -- confirm.
83 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>
_data;
// Constructor-like setup fragment (NOTE(review): the full signature is not
// visible; only the trailing `cfg` parameter is). `cfg` is the streaming
// configuration forwarded to startDataStreaming() below.
93 const RobotUnitDataStreaming::Config& cfg) :
// Create the receiver object that is registered with the adapter below.
98 _receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();
// The identity name starts with the owning object's name followed by a fixed
// infix; the remaining parts of the expression are not visible in this excerpt.
100 _receiver->_identity.name = _obj->getName() +
"_RobotUnitDataStreamingReceiver_" +
// Register the receiver under its identity with the ArmarX manager's adapter
// and keep an unchecked-cast ReceiverPrx proxy to the registered object.
103 auto adapter = _obj->getArmarXManager()->getAdapter();
104 _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
105 adapter->add(_receiver, _receiver->_identity));
// Start streaming towards the proxy with the given config and store the
// description returned by the call.
107 _description = _ru->startDataStreaming(_proxy, cfg);
// Teardown fragment (NOTE(review): presumably the destructor -- the enclosing
// signature is not visible). First, set the receiver's _discard_data flag.
114 _receiver->_discard_data =
true;
// Check whether the stored description has any entries.
// NOTE(review): the lines between this check and the stop call are missing,
// so how the check gates stopDataStreaming() cannot be confirmed from here.
115 if (!_description.entries.empty())
119 _ru->stopDataStreaming(_proxy);
// Warning logged when the stop call failed (per the message text; the
// surrounding error handling is not visible in this excerpt).
123 ARMARX_WARNING <<
"did not stop streaming since the network call failed";
// Remove the receiver's identity from the adapter it was registered with.
126 auto icemanager = _obj->getArmarXManager()->getIceManager();
127 auto adapter = _obj->getArmarXManager()->getAdapter();
128 adapter->remove(_receiver->_identity);
// Loop for as long as the ice manager still reports the identity name as
// reachable (loop body not visible in this excerpt).
130 while (icemanager->isObjectReachable(_receiver->_identity.name))
// Return type of the buffer accessor: a mutable reference to a deque of time
// steps (NOTE(review): the function name is not visible in this excerpt).
139 std::deque<RobotUnitDataStreaming::TimeStep>&
// Local map of time step sequences keyed by a 64-bit id.
// NOTE(review): how it is filled (presumably from the receiver under the lock
// below) is not visible here -- confirm.
143 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>
data;
// Hold the receiver's data mutex for the rest of the enclosing scope.
145 std::lock_guard g{_receiver->_data_mutex};
// Merge the local map into _tmp_data_buffer.
// NOTE(review): assuming _tmp_data_buffer is a std::map, entries whose key
// already exists in the target are left behind in `data`.
148 _tmp_data_buffer.merge(
data);
// Error report for duplicate message sequence ids; the loop below writes each
// key still present in `data` on its own line.
151 ARMARX_ERROR <<
"Double message sequence IDs! This should not happen!\nIDs:\n"
154 for (
const auto& [key, _] :
data)
156 out <<
" " << key <<
"\n";
// Walk _tmp_data_buffer from its beginning; idx counts the current position.
161 auto it = _tmp_data_buffer.begin();
162 for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
// While _last_iteration_id is still -1, the expected sequence id is set to
// the current key minus one, so the successor check below passes for it.
// NOTE(review): -1 presumably means "no time step consumed yet" -- confirm.
164 if (_last_iteration_id == -1)
166 _tmp_data_buffer_seq_id = it->first - 1;
// Gap check: the current key is not the direct successor of the last
// recorded sequence id.
168 if (_tmp_data_buffer_seq_id + 1 != it->first)
// True only for entries that are not among the last 10 of the buffer; the
// size() > 10 guard keeps the subtraction from going below zero.
// NOTE(review): the action taken in either case is not visible here.
170 if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
// Record the key of the message that is consumed now.
182 _tmp_data_buffer_seq_id = it->first;
// Move every time step of this message into _data_buffer. Before each move,
// report when the step's iteration id is not the direct successor of the
// previously seen one (skipped while _last_iteration_id is still -1).
183 for (
auto& step : it->second)
185 if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
188 <<
"Missing Iterations or iterations out of order! "
189 <<
"This should not happen. " <<
VAROUT(_last_iteration_id) <<
", "
190 <<
VAROUT(step.iterationId);
192 _last_iteration_id = step.iterationId;
193 _data_buffer.emplace_back(std::move(step));
// Drop all messages in front of `it` from the temporary buffer.
196 _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
// Return type of an accessor yielding a const reference to the streaming
// description (NOTE(review): name and body are not visible in this excerpt).
200 const RobotUnitDataStreaming::DataStreamingDescription&
// Build a textual listing in `str`: a header line with the number of entries
// in `entr`, then one line per entry with its key, `type` and `index`.
// NOTE(review): `entr` is defined outside this excerpt -- presumably the
// description's entries; confirm.
209 std::stringstream
str;
211 str <<
"Received data (" << entr.size() <<
" entries):\n";
212 for (
const auto& [k,
v] : entr)
214 str <<
"    " << k <<
": type " <<
v.type <<
" index " <<
v.index <<
"\n";