RobotUnitDataStreamingReceiver.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package RobotAPI::ArmarXObjects::RobotUnitDataStreamingReceiver
17 * @author Raphael Grimm ( raphael dot grimm at kit dot edu )
18 * @date 2020
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
22
24
#include <chrono>
#include <thread>

#include <Ice/ObjectAdapter.h>
26
29
31{
32 class Receiver :
33 virtual public ManagedIceObject,
34 virtual public RobotUnitDataStreaming::Receiver
35 {
36 public:
37 std::string
38 getDefaultName() const override
39 {
40 return "RobotUnitDataStreamingReceiver";
41 }
42
44 {
45 std::lock_guard g{_data_mutex};
46 }
47
48 void
49 onInitComponent() override
50 {
51 }
52
53 void
55 {
56 }
57
58 void
59 onExitComponent() override
60 {
61 }
62
63 void
64 update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr& ptr,
65 const RobotUnitDataStreaming::TimeStepSeq& data,
66 Ice::Long msgSequenceNbr,
67 const Ice::Current&) override
68 {
69 ptr->ice_response();
70 if (_discard_data)
71 {
72 return;
73 }
74 static_assert(sizeof(std::uint64_t) == sizeof(msgSequenceNbr));
75 const auto seq = static_cast<std::uint64_t>(msgSequenceNbr);
76 std::lock_guard g{_data_mutex};
77 ARMARX_VERBOSE << deactivateSpam() << "received " << data.size() << " timesteps";
78 _data[seq] = data;
79 }
80
81 std::atomic_bool _discard_data = false;
82 std::mutex _data_mutex;
83 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> _data;
84 Ice::Identity _identity;
85 };
86} // namespace armarx::detail::RobotUnitDataStreamingReceiver
87
88namespace armarx
89{
// Constructor of the streaming receiver: stores `obj` and `ru`, registers the
// receiver servant with the Ice adapter of obj's ArmarXManager and asks `ru`
// to start streaming to the resulting proxy using `cfg`.
//   obj - object whose name and ArmarXManager/adapter are used
//   ru  - RobotUnit proxy on which startDataStreaming() is called
//   cfg - streaming configuration, forwarded unchanged
// NOTE(review): listing lines 90 and 96-98 are absent from this extract.
// Presumably they open the signature and create `_receiver` before it is
// dereferenced below - confirm against the real file.
91 const ManagedIceObjectPtr& obj,
92 const RobotUnitInterfacePrx& ru,
93 const RobotUnitDataStreaming::Config& cfg) :
94 _obj{obj}, _ru{ru}
95 {
99
// Identity name = owner's name + fixed infix + current clock tick count
// (presumably to keep the identities of several receivers distinct).
100 _receiver->_identity.name = _obj->getName() + "_RobotUnitDataStreamingReceiver_" +
101 std::to_string(clock_t::now().time_since_epoch().count());
102
// Register the servant and keep an unchecked-cast proxy to it.
103 auto adapter = _obj->getArmarXManager()->getAdapter();
104 _proxy = RobotUnitDataStreaming::ReceiverPrx::uncheckedCast(
105 adapter->add(_receiver, _receiver->_identity));
106
// NOTE(review): if startDataStreaming() throws, the servant added above stays
// registered in the adapter, because the destructor does not run for an
// object whose constructor did not complete.
107 _description = _ru->startDataStreaming(_proxy, cfg);
108 }
109
111 {
112 if (_proxy)
113 {
114 _receiver->_discard_data = true;
115 if (!_description.entries.empty())
116 {
117 try
118 {
119 _ru->stopDataStreaming(_proxy);
120 }
121 catch (...)
122 {
123 ARMARX_WARNING << "did not stop streaming since the network call failed";
124 }
125 }
126 auto icemanager = _obj->getArmarXManager()->getIceManager();
127 auto adapter = _obj->getArmarXManager()->getAdapter();
128 adapter->remove(_receiver->_identity);
129
130 while (icemanager->isObjectReachable(_receiver->_identity.name))
131 {
132 ARMARX_INFO << deactivateSpam() << "waiting until receiver is removed from ice";
133 }
134 }
135 _proxy = nullptr;
136 _receiver = nullptr;
137 }
138
// Collects everything the receiver has accumulated, appends the time steps to
// `_data_buffer` in message-sequence order and returns a reference to
// `_data_buffer`. Sequences that cannot be consumed yet (a preceding message
// is still missing) stay in `_tmp_data_buffer` for a later call.
// Requires `_receiver` to be non-null (checked below).
// NOTE(review): the line carrying the function name (listing line 140) and
// listing lines 152 and 187 are absent from this extract.
139 std::deque<RobotUnitDataStreaming::TimeStep>&
141 {
142 ARMARX_CHECK_NOT_NULL(_receiver);
143 std::map<std::uint64_t, RobotUnitDataStreaming::TimeStepSeq> data;
144 {
// Hold the receiver's mutex only for the swap; `data` then owns all
// sequences received so far and the receiver's map is empty.
145 std::lock_guard g{_receiver->_data_mutex};
146 std::swap(data, _receiver->_data);
147 }
// std::map::merge leaves entries whose key already exists in the target
// inside `data`, so anything remaining is a duplicated sequence id.
148 _tmp_data_buffer.merge(data);
149 if (!data.empty())
150 {
151 ARMARX_ERROR << "Double message sequence IDs! This should not happen!\nIDs:\n"
153 {
154 for (const auto& [key, _] : data)
155 {
156 out << " " << key << "\n";
157 }
158 };
159 }
160
// Walk the pending sequences in ascending sequence-id order.
161 auto it = _tmp_data_buffer.begin();
162 for (std::size_t idx = 0; it != _tmp_data_buffer.end(); ++it, ++idx)
163 {
// While `_last_iteration_id` is still -1 (apparently: no step consumed yet),
// the expected id is set so that the current sequence passes the gap check.
164 if (_last_iteration_id == -1)
165 {
166 _tmp_data_buffer_seq_id = it->first - 1;
167 }
// Gap between the last consumed sequence id and this one.
168 if (_tmp_data_buffer_seq_id + 1 != it->first)
169 {
// Only give up on the gap when more than 10 further sequences are queued
// behind this position; otherwise stop and retry on the next call.
170 if (_tmp_data_buffer.size() > 10 && idx < _tmp_data_buffer.size() - 10)
171 {
172 //there is a lot more data (10 updates) in the buffer!
173 //-> some message calls went missing!
174 ARMARX_ERROR << "some update messages went missing!";
175 }
176 else
177 {
178 //maybe one or two frames are missing (due to async calls) -> wait
179 break;
180 }
181 }
182 _tmp_data_buffer_seq_id = it->first;
// Move every step of this sequence into the output buffer, reporting
// iteration ids that do not directly follow the previous one.
183 for (auto& step : it->second)
184 {
185 if (_last_iteration_id != -1 && _last_iteration_id + 1 != step.iterationId)
186 {
188 << "Missing Iterations or iterations out of order! "
189 << "This should not happen. " << VAROUT(_last_iteration_id) << ", "
190 << VAROUT(step.iterationId);
191 }
192 _last_iteration_id = step.iterationId;
193 _data_buffer.emplace_back(std::move(step));
194 }
195 }
// Drop the sequences consumed above; `it` is the first one left pending.
196 _tmp_data_buffer.erase(_tmp_data_buffer.begin(), it);
197 return _data_buffer;
198 }
199
200 const RobotUnitDataStreaming::DataStreamingDescription&
202 {
203 return _description;
204 }
205
// Builds a human-readable, multi-line listing of the stream description:
// a header line with the number of entries, followed by one line per entry
// showing its key, type and index. Returns the text as a std::string.
// NOTE(review): the line carrying the function name (listing line 207) is
// absent from this extract.
206 std::string
208 {
209 std::stringstream str;
210 const auto& entr = getDataDescription().entries;
211 str << "Received data (" << entr.size() << " entries):\n";
// One line per (key, entry) pair of the description.
212 for (const auto& [k, v] : entr)
213 {
214 str << " " << k << ": type " << v.type << " index " << v.index << "\n";
215 }
216 return str.str();
217 }
218} // namespace armarx
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
Definition Logging.h:310
#define VAROUT(x)
std::string str(const T &t)
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition Logging.cpp:99
ManagedIceObject(ManagedIceObject const &other)
const RobotUnitDataStreaming::DataStreamingDescription & getDataDescription() const
RobotUnitDataStreamingReceiver(const ManagedIceObjectPtr &obj, const RobotUnitInterfacePrx &ru, const RobotUnitDataStreaming::Config &cfg)
void onInitComponent() override
Pure virtual hook for the subclass.
void update_async(const RobotUnitDataStreaming::AMD_Receiver_updatePtr &ptr, const RobotUnitDataStreaming::TimeStepSeq &data, Ice::Long msgSequenceNbr, const Ice::Current &) override
void onConnectComponent() override
Pure virtual hook for the subclass.
std::map< std::uint64_t, RobotUnitDataStreaming::TimeStepSeq > _data
std::string getDefaultName() const override
Retrieve default name of component.
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:196
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
This file offers overloads of toIce() and fromIce() functions for STL container types.
::IceInternal::ProxyHandle<::IceProxy::armarx::RobotUnitInterface > RobotUnitInterfacePrx
auto make_shared(Args &&... args)
IceInternal::Handle< ManagedIceObject > ManagedIceObjectPtr
Definition ArmarXFwd.h:42