RobotUnitDataStreamingReceiver.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * ArmarX is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 2 as
6  * published by the Free Software Foundation.
7  *
8  * ArmarX is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  *
16  * @package RobotAPI::ArmarXObjects::RobotUnitDataStreamingReceiver
17  * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
18  * @date 2020
19  * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20  * GNU General Public License
21  */
22 
24 
25 #include <Ice/ObjectAdapter.h>
26 
29 
31 {
32  class Receiver :
33  virtual public ManagedIceObject,
34  virtual public RobotUnitDataStreaming::Receiver
35  {
36  public:
37  std::string
38  getDefaultName() const override
39  {
40  return "RobotUnitDataStreamingReceiver";
41  }
42 
44  {
45  std::lock_guard g{_data_mutex};
46  }
47 
48  void
49  onInitComponent() override
50  {
51  }
52 
53  void
54  onConnectComponent() override
55  {
56  }
57 
58  void
59  onExitComponent() override
60  {
61  }
62 
63  void
64  update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
65  const RobotUnitDataStreaming::TimeStepSeq& data,
66  Ice::Long msgSequenceNbr,
67  const Ice::Current&) override
68  {
69  ptr->ice_response();
70  if (_discard_data)
71  {
72  return;
73  }
74  static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr));
75  const auto seq = static_cast<std::uint64_t>(msgSequenceNbr);
76  std::lock_guard g{_data_mutex};
77  ARMARX_VERBOSE << deactivateSpam() << "received " << data.size() << " timesteps";
78  _data[seq] = data;
79  }
80 
81  std::atomic_bool _discard_data = false;
82  std::mutex _data_mutex;
83  std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data;
85  };
86 } // namespace armarx::detail::RobotUnitDataStreamingReceiver
87 
88 namespace armarx
89 {
91  const ManagedIceObjectPtr& obj,
92  const RobotUnitInterfacePrx& ru,
93  const RobotUnitDataStreaming::Config& cfg) :
94  _obj{obj}, _ru{ru}
95  {
98  _receiver = make_shared<detail::RobotUnitDataStreamingReceiver::Receiver>();
99 
100  _receiver->_identity.name = _obj->getName() + "_RobotUnitDataStreamingReceiver_" +
101  std::to_string(clock_t::now().time_since_epoch().count());
102 
103  auto adapter = _obj->getArmarXManager()->getAdapter();
104  _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
105  adapter->add(_receiver, _receiver->_identity));
106 
107  _description = _ru->startDataStreaming(_proxy, cfg);
108  }
109 
111  {
112  if (_proxy)
113  {
114  _receiver->_discard_data = true;
115  if (!_description.entries.empty())
116  {
117  try
118  {
119  _ru->stopDataStreaming(_proxy);
120  }
121  catch (...)
122  {
123  ARMARX_WARNING << "did not stop streaming since the network call failed";
124  }
125  }
126  auto icemanager = _obj->getArmarXManager()->getIceManager();
127  auto adapter = _obj->getArmarXManager()->getAdapter();
128  adapter->remove(_receiver->_identity);
129 
130  while (icemanager->isObjectReachable(_receiver->_identity.name))
131  {
132  ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice";
133  }
134  }
135  _proxy = nullptr;
136  _receiver = nullptr;
137  }
138 
// Moves the batches received so far out of the receiver, appends the time steps
// of all batches that are in sequence to _data_buffer and returns _data_buffer.
// Batches that follow a (small) gap in the sequence numbers stay in
// _tmp_data_buffer until a later call.
// NOTE(review): listing lines 140, 152 and 187 are missing from this extract.
// The symbol index attributes line 140 to
// RobotUnitDataStreamingReceiver::getDataBuffer() and lists ARMARX_STREAM_PRINTER,
// which presumably is the missing line 152; line 187 presumably is the log
// macro that starts the "Missing Iterations" message -- confirm against the
// original file.
139  std::deque<RobotUnitDataStreaming::TimeStep>&
141  {
142  ARMARX_CHECK_NOT_NULL(_receiver);
143  std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data;
144  {
// Take over everything the receiver stored; the lock is only held for the swap.
145  std::lock_guard g{_receiver->_data_mutex};
146  std::swap(data, _receiver->_data);
147  }
// std::map::merge leaves elements in `data` whose key already exists in
// _tmp_data_buffer, so a non-empty `data` afterwards means duplicated
// sequence numbers.
148  _tmp_data_buffer.merge(data);
149  if (!data.empty())
150  {
151  ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n"
153  {
154  for (const auto& [key, _] : data)
155  {
156  out << " " << key << "\n";
157  }
158  };
159  }
160 
// Walk the buffered batches in ascending sequence number; `idx` counts the
// entries visited so far.
161  auto it = _tmp_data_buffer.begin();
162  for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
163  {
// -1 presumably means that no time step has been consumed yet (TODO confirm
// against the header); in that case the expected sequence number is re-based
// on the current entry so the gap check below passes.
164  if (_last_iteration_id == -1)
165  {
166  _tmp_data_buffer_seq_id = it->first - 1;
167  }
// Gap check: the current batch is not the direct successor of the last one consumed.
168  if (_tmp_data_buffer_seq_id + 1 != it->first)
169  {
// More than 10 entries follow the gap: report the loss and keep consuming.
170  if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
171  {
172  //there is a lot more data (10 updates) in the buffer!
173  //-> some message calls went missing!
174  ARMARX_ERROR << "some update messages went missing!";
175  }
176  else
177  {
178  //maybe one or two frames are missing (due to async calls) -> wait
179  break;
180  }
181  }
182  _tmp_data_buffer_seq_id = it->first;
// Move every time step of the batch into the result buffer and check that the
// iteration ids increase by exactly one.
183  for (auto& step : it->second)
184  {
185  if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
186  {
188  << "Missing Iterations or iterations out of order! "
189  << "This should not happen. " << VAROUT(_last_iteration_id) << ", "
190  << VAROUT(step.iterationId);
191  }
192  _last_iteration_id = step.iterationId;
193  _data_buffer.emplace_back(std::move(step));
194  }
195  }
// Drop the batches that were consumed above; `it` is the first one not consumed.
196  _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
197  return _data_buffer;
198  }
199 
200  const RobotUnitDataStreaming::DataStreamingDescription&
202  {
203  return _description;
204  }
205 
206  std::string
208  {
209  std::stringstream str;
210  const auto& entr = getDataDescription().entries;
211  str << "Received data (" << entr.size() << " entries):\n";
212  for (const auto& [k, v] : entr)
213  {
214  str << " " << k << ": type " << v.type << " index " << v.index << "\n";
215  }
216  return str.str();
217  }
218 } // namespace armarx
armarx::RobotUnitDataStreamingReceiver::getDataDescription
const RobotUnitDataStreaming::DataStreamingDescription & getDataDescription() const
Definition: RobotUnitDataStreamingReceiver.cpp:201
ARMARX_VERBOSE
#define ARMARX_VERBOSE
Definition: Logging.h:187
str
std::string str(const T &t)
Definition: UserAssistedSegmenterGuiWidgetController.cpp:43
armarx::detail::RobotUnitDataStreamingReceiver
Definition: RobotUnitDataStreamingReceiver.cpp:30
ArmarXManager.h
ARMARX_CHECK_NOT_NULL
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and, if it turns out to be false, it will throw an ExpressionException.
Definition: ExpressionException.h:206
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::onConnectComponent
void onConnectComponent() override
Pure virtual hook for the subclass.
Definition: RobotUnitDataStreamingReceiver.cpp:54
IceInternal::Handle< ManagedIceObject >
RobotUnitDataStreamingReceiver.h
armarx::armem::client::util::swap
void swap(SubscriptionHandle &first, SubscriptionHandle &second)
Definition: SubscriptionHandle.cpp:66
deactivateSpam
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition: Logging.cpp:75
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::onInitComponent
void onInitComponent() override
Pure virtual hook for the subclass.
Definition: RobotUnitDataStreamingReceiver.cpp:49
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::update_async
void update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr &ptr, const RobotUnitDataStreaming::TimeStepSeq &data, Ice::Long msgSequenceNbr, const Ice::Current &) override
Definition: RobotUnitDataStreamingReceiver.cpp:64
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::_data
std::map< std::uint64_t, RobotUnitDataStreaming::TimeStepSeq > _data
Definition: RobotUnitDataStreamingReceiver.cpp:83
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::_discard_data
std::atomic_bool _discard_data
Definition: RobotUnitDataStreamingReceiver.cpp:81
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::onExitComponent
void onExitComponent() override
Hook for subclass.
Definition: RobotUnitDataStreamingReceiver.cpp:59
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::_data_mutex
std::mutex _data_mutex
Definition: RobotUnitDataStreamingReceiver.cpp:82
armarx::RobotUnitDataStreamingReceiver::RobotUnitDataStreamingReceiver
RobotUnitDataStreamingReceiver(const ManagedIceObjectPtr &obj, const RobotUnitInterfacePrx &ru, const RobotUnitDataStreaming::Config &cfg)
Definition: RobotUnitDataStreamingReceiver.cpp:90
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::getDefaultName
std::string getDefaultName() const override
Retrieve default name of component.
Definition: RobotUnitDataStreamingReceiver.cpp:38
ARMARX_ERROR
#define ARMARX_ERROR
Definition: Logging.h:196
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::~Receiver
~Receiver()
Definition: RobotUnitDataStreamingReceiver.cpp:43
armarx::to_string
const std::string & to_string(const std::string &s)
Definition: StringHelpers.h:41
armarx::detail::RobotUnitDataStreamingReceiver::Receiver::_identity
Ice::Identity _identity
Definition: RobotUnitDataStreamingReceiver.cpp:84
armarx::RobotUnitDataStreamingReceiver::~RobotUnitDataStreamingReceiver
~RobotUnitDataStreamingReceiver()
Definition: RobotUnitDataStreamingReceiver.cpp:110
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
armarx::ManagedIceObject
The ManagedIceObject is the base class for all ArmarX objects.
Definition: ManagedIceObject.h:162
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:181
VAROUT
#define VAROUT(x)
Definition: StringHelpers.h:198
IceInternal::ProxyHandle<::IceProxy::armarx::RobotUnitInterface >
armarx::Logging::deactivateSpam
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition: Logging.cpp:99
Logging.h
armarx::RobotUnitDataStreamingReceiver::getDataDescriptionString
std::string getDataDescriptionString() const
Definition: RobotUnitDataStreamingReceiver.cpp:207
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::detail::RobotUnitDataStreamingReceiver::Receiver
Definition: RobotUnitDataStreamingReceiver.cpp:32
armarx
This file offers overloads of toIce() and fromIce() functions for STL container types.
Definition: ArmarXTimeserver.cpp:27
armarx::RobotUnitDataStreamingReceiver::getDataBuffer
std::deque< timestep_t > & getDataBuffer()
Definition: RobotUnitDataStreamingReceiver.cpp:140
ARMARX_STREAM_PRINTER
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
Definition: Logging.h:310