25 #include <Ice/ObjectAdapter.h>
34 virtual public RobotUnitDataStreaming::Receiver
40 return "RobotUnitDataStreamingReceiver";
64 update_async(
const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
65 const RobotUnitDataStreaming::TimeStepSeq&
data,
67 const Ice::Current&)
override
74 static_assert(
sizeof(std::uint64_t) ==
sizeof(msgSequenceNbr));
75 const auto seq =
static_cast<std::uint64_t
>(msgSequenceNbr);
78 <<
"received " <<
data.size() <<
" timesteps";
84 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>
_data;
94 const RobotUnitDataStreaming::Config& cfg) :
99 _receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();
101 _receiver->_identity.name = _obj->getName() +
"_RobotUnitDataStreamingReceiver_" +
104 auto adapter = _obj->getArmarXManager()->getAdapter();
105 _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
106 adapter->add(_receiver, _receiver->_identity));
108 _description = _ru->startDataStreaming(_proxy, cfg);
115 _receiver->_discard_data =
true;
116 if (!_description.entries.empty())
120 _ru->stopDataStreaming(_proxy);
124 ARMARX_WARNING <<
"did not stop streaming since the network call failed";
127 auto icemanager = _obj->getArmarXManager()->getIceManager();
128 auto adapter = _obj->getArmarXManager()->getAdapter();
129 adapter->remove(_receiver->_identity);
131 while (icemanager->isObjectReachable(_receiver->_identity.name))
140 std::deque<RobotUnitDataStreaming::TimeStep>&
144 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>
data;
146 std::lock_guard g{_receiver->_data_mutex};
149 _tmp_data_buffer.merge(
data);
152 ARMARX_ERROR <<
"Double message sequence IDs! This should not happen!\nIDs:\n"
155 for (
const auto& [key, _] :
data)
157 out <<
" " << key <<
"\n";
162 auto it = _tmp_data_buffer.begin();
163 for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
165 if (_last_iteration_id == -1)
167 _tmp_data_buffer_seq_id = it->first - 1;
169 if (_tmp_data_buffer_seq_id + 1 != it->first)
171 if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
183 _tmp_data_buffer_seq_id = it->first;
184 for (
auto& step : it->second)
186 if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
189 <<
"Missing Iterations or iterations out of order! "
190 <<
"This should not happen. " <<
VAROUT(_last_iteration_id) <<
", "
191 <<
VAROUT(step.iterationId);
193 _last_iteration_id = step.iterationId;
194 _data_buffer.emplace_back(std::move(step));
197 _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
201 const RobotUnitDataStreaming::DataStreamingDescription&
210 std::stringstream
str;
212 str <<
"Received data (" << entr.size() <<
" entries):\n";
213 for (
const auto& [k,
v] : entr)
215 str <<
" " << k <<
": type " <<
v.type <<
" index " <<
v.index <<
"\n";