25#include <Ice/ObjectAdapter.h>
34 virtual public RobotUnitDataStreaming::Receiver
40 return "RobotUnitDataStreamingReceiver";
64 update_async(
const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
65 const RobotUnitDataStreaming::TimeStepSeq&
data,
66 Ice::Long msgSequenceNbr,
67 const Ice::Current&)
override
74 static_assert(
sizeof(std::uint64_t) ==
sizeof(msgSequenceNbr));
75 const auto seq =
static_cast<std::uint64_t
>(msgSequenceNbr);
83 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>
_data;
93 const RobotUnitDataStreaming::Config& cfg) :
100 _receiver->_identity.name = _obj->getName() +
"_RobotUnitDataStreamingReceiver_" +
101 std::to_string(clock_t::now().time_since_epoch().count());
103 auto adapter = _obj->getArmarXManager()->getAdapter();
104 _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
105 adapter->add(_receiver, _receiver->_identity));
107 _description = _ru->startDataStreaming(_proxy, cfg);
114 _receiver->_discard_data =
true;
115 if (!_description.entries.empty())
119 _ru->stopDataStreaming(_proxy);
123 ARMARX_WARNING <<
"did not stop streaming since the network call failed";
126 auto icemanager = _obj->getArmarXManager()->getIceManager();
127 auto adapter = _obj->getArmarXManager()->getAdapter();
128 adapter->remove(_receiver->_identity);
130 while (icemanager->isObjectReachable(_receiver->_identity.name))
139 std::deque<RobotUnitDataStreaming::TimeStep>&
143 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq>
data;
145 std::lock_guard g{_receiver->_data_mutex};
146 std::swap(
data, _receiver->_data);
148 _tmp_data_buffer.merge(
data);
151 ARMARX_ERROR <<
"Double message sequence IDs! This should not happen!\nIDs:\n"
154 for (
const auto& [key, _] :
data)
156 out <<
" " << key <<
"\n";
161 auto it = _tmp_data_buffer.begin();
162 for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
164 if (_last_iteration_id == -1)
166 _tmp_data_buffer_seq_id = it->first - 1;
168 if (_tmp_data_buffer_seq_id + 1 != it->first)
170 if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
182 _tmp_data_buffer_seq_id = it->first;
183 for (
auto& step : it->second)
185 if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
188 <<
"Missing Iterations or iterations out of order! "
189 <<
"This should not happen. " <<
VAROUT(_last_iteration_id) <<
", "
190 <<
VAROUT(step.iterationId);
192 _last_iteration_id = step.iterationId;
193 _data_buffer.emplace_back(std::move(step));
196 _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
200 const RobotUnitDataStreaming::DataStreamingDescription&
209 std::stringstream
str;
211 str <<
"Received data (" << entr.size() <<
" entries):\n";
212 for (
const auto& [k, v] : entr)
214 str <<
" " << k <<
": type " << v.type <<
" index " << v.index <<
"\n";
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
ManagedIceObject(ManagedIceObject const &other)
const RobotUnitDataStreaming::DataStreamingDescription & getDataDescription() const
RobotUnitDataStreamingReceiver(const ManagedIceObjectPtr &obj, const RobotUnitInterfacePrx &ru, const RobotUnitDataStreaming::Config &cfg)
std::deque< timestep_t > & getDataBuffer()
std::string getDataDescriptionString() const
~RobotUnitDataStreamingReceiver()
void onInitComponent() override
Pure virtual hook for the subclass.
void update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr &ptr, const RobotUnitDataStreaming::TimeStepSeq &data, Ice::Long msgSequenceNbr, const Ice::Current &) override
void onConnectComponent() override
Pure virtual hook for the subclass.
std::atomic_bool _discard_data
std::map< std::uint64_t, RobotUnitDataStreaming::TimeStepSeq > _data
void onExitComponent() override
Hook for subclass.
std::string getDefaultName() const override
Retrieve default name of component.
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_INFO
The normal logging level.
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
#define ARMARX_VERBOSE
The logging level for verbose information.
This file offers overloads of toIce() and fromIce() functions for STL container types.
::IceInternal::ProxyHandle<::IceProxy::armarx::RobotUnitInterface > RobotUnitInterfacePrx
auto make_shared(Args &&... args)
IceInternal::Handle< ManagedIceObject > ManagedIceObjectPtr