ZeroMQController.cpp
Go to the documentation of this file.
1#include "ZeroMQController.h"
2
3#include <Eigen/Dense>
4
7#include <ArmarXCore/interface/observers/ObserverInterface.h>
8#include <ArmarXCore/interface/observers/VariantBase.h>
10
11// for ARMARX_RT_LOGF_WARN
12#include <chrono>
13#include <cstring>
14#include <mutex>
15#include <string>
16
17#include <boost/date_time/posix_time/posix_time.hpp>
18
19#include <Ice/Current.h>
20#include <Ice/Object.h>
21
22#include <VirtualRobot/VirtualRobot.h>
23
27#include <RobotAPI/interface/units/RobotUnit/NJointController.h>
28#include <RobotAPI/interface/visualization/DebugDrawerInterface.h>
30
35
36#include <nlohmann/json.hpp>
37#include <zmq.h>
38#include <zmq.hpp>
39
41{
42 namespace base = armarx::control::njoint_controller::task_space;
43
44 armarx::NJointControllerRegistration<
49
55
61
67
69 base::NJointTSMixImpVelColController>>
73
74 // Implement getClassName() method for templated NJointZeroMQTaskspaceController
75 template <>
76 std::string
83
84 template <>
85 std::string
92
93 template <>
94 std::string
101
102 template <>
103 std::string
105 class base::NJointTSImpColController>::
106 getClassName(const Ice::Current&) const
107 {
110 }
111
112 template <>
113 std::string
115 class base::NJointTSMixImpVelColController>::
116 getClassName(const Ice::Current&) const
117 {
120 }
121
122 template <typename NJointTaskspaceController>
125 const armarx::NJointControllerConfigPtr& config,
126 const VirtualRobot::RobotPtr& robot) :
127 NJointTaskspaceController{robotUnit, config, robot},
128 context(1),
129 socket(context, ZMQ_PULL),
130 publishSocket_(context, ZMQ_PUSH)
131 {
132 init();
133 }
134
135 // template <>
136 // NJointZeroMQTaskspaceController<
137 // class base::NJointTaskspaceCollisionAvoidanceImpedanceController>::
138 // NJointZeroMQTaskspaceController(const armarx::RobotUnitPtr& robotUnit,
139 // const armarx::NJointControllerConfigPtr& config,
140 // const VirtualRobot::RobotPtr& robot) :
141 // base::NJointTaskspaceImpedanceController{robotUnit, config, robot},
142 // base::NJointTaskspaceCollisionAvoidanceImpedanceController{robotUnit, config, robot},
143 // context(1),
144 // socket(context, ZMQ_PULL),
145 // publishSocket_(context, ZMQ_PUSH)
146 // {
147 // init();
148 // }
149
150 // template <>
151 // NJointZeroMQTaskspaceController<
152 // class base::NJointTaskspaceCollisionAvoidanceMixedImpedanceVelocityController>::
153 // NJointZeroMQTaskspaceController(const armarx::RobotUnitPtr& robotUnit,
154 // const armarx::NJointControllerConfigPtr& config,
155 // const VirtualRobot::RobotPtr& robot) :
156 // base::NJointTaskspaceMixedImpedanceVelocityController{robotUnit, config, robot},
157 // base::NJointTaskspaceCollisionAvoidanceMixedImpedanceVelocityController{robotUnit,
158 // config,
159 // robot},
160 // context(1),
161 // socket(context, ZMQ_PULL),
162 // publishSocket_(context, ZMQ_PUSH)
163 // {
164 // init();
165 // }
166
167 // Method(s) of NJointZeroMQTaskspaceController in general
168 template <typename NJointTaskspaceController>
169 void
171 {
172 // Bind to ZeroMQ socket
173 int port = 5556; // TODO(@rietsch): Make configurable
174 std::string ip_address = "*";
175 std::string bind_address = "tcp://" + ip_address + ":" + std::to_string(port);
176 try
177 {
178 socket.bind(bind_address);
179 ARMARX_INFO << "Controller bound to " << bind_address << " through ZeroMQ";
180 }
181 catch (const zmq::error_t& e)
182 {
183 ARMARX_INFO << "ZeroMQ bind error: " << e.what();
184 return;
185 }
186
187 time_last_packet = std::chrono::steady_clock::now();
188 }
189
190 template <typename NJointTaskspaceController>
191 void
192 NJointZeroMQTaskspaceController<NJointTaskspaceController>::setPublishIPAddress(
193 const std::string& ipAddress)
194 {
195 ARMARX_INFO << "Trying to set publisher socket";
196 int publishPort = 5557; // TODO(@rietsch): Make configurable
197 std::string pub_bind_address = "tcp://" + ipAddress + ":" + std::to_string(publishPort);
198 try
199 {
200 std::unique_lock<std::mutex> lock(publishSocketMutex_);
201 // publishSocket_.bind(pub_bind_address);
202 publishSocket_.connect(pub_bind_address);
203 publishSocketBound_.store(true);
204 publishSocket_.set(zmq::sockopt::linger, 0); // Important for socket to be non-blocking on deletion
205 ARMARX_INFO << "Controller publish socket bound to " << pub_bind_address
206 << " through ZeroMQ";
207 }
208 catch (const zmq::error_t& e)
209 {
210 ARMARX_WARNING << "ZeroMQ publish bind error: " << e.what();
211 }
212 }
213
214 template <typename NJointTaskspaceController>
215 void
217 {
218 zmq::message_t config_msg;
219 auto received = socket.recv(config_msg, zmq::recv_flags::dontwait); // Try receive message, don't wait
220 if (received)
221 {
222 // Measure time passed since last packet was received
223 auto currentTime = std::chrono::steady_clock::now();
224 std::chrono::duration<double> elapsed_seconds = currentTime - time_last_packet;
225 frequency_ms = elapsed_seconds.count() * 1000;
226 time_last_packet = currentTime;
227
228 // Parse config message into nlohmann::json
229 std::string config_json_str(static_cast<char*>(config_msg.data()), config_msg.size());
230 nlohmann::json config_json = nlohmann::json::parse(config_json_str);
231
232 // Calculate latency (might be imprecise due to clocks being not being synchronized?)
233 if (config_json.contains("timestamp") && config_json["timestamp"].is_number())
234 {
235 auto arrival_time = std::chrono::duration<double>(
236 std::chrono::system_clock::now().time_since_epoch())
237 .count();
238 double sender_time = config_json["timestamp"];
239 latency_ms = (arrival_time - sender_time) * 1000; // Convert to milliseconds
240 config_json.erase(
241 "timestamp"); // Important! Remove any data that is not part of the config
242 }
243
244 // If the sender send "publish_ip", the controller will send out robot data to that address
245 if (config_json.contains("publish_ip") && !publishSocketBound_.load())
246 {
247 setPublishIPAddress(config_json["publish_ip"].get<std::string>());
248 config_json.erase("publish_ip");
249 }
250
251 // Read into config data structure
253 typename NJointTaskspaceController::ConfigDict receivedConfig;
254 receivedConfig.read(reader, config_json);
255 auto prevCfg = this->userConfig;
256 for (auto& pair : this->limb)
257 {
258 auto limbName = pair.first;
259 auto& limb = pair.second;
260
261 auto limbConfig = prevCfg.limbs.at(limbName);
262 auto limbConfigNew = receivedConfig.limbs.at(limbName);
263 limbConfig.desiredPose = limbConfigNew.desiredPose;
264 limb->bufferConfigUserToNonRt.getWriteBuffer() =
265 limbConfig; // Overwrite user config
266 limb->bufferConfigUserToNonRt.commitWrite();
267 }
268
269 // Update hands
270 if (this->hands)
271 {
272 this->hands->updateConfig(receivedConfig.hands);
273 }
274 }
275
276 auto [rtTargetSafe, rtSafe] = this->additionalTaskUpdateStatus();
278 if (not rtTargetSafe)
279 {
281 }
282 }
283
284 template <typename NJointTaskspaceController>
285 void
287 const SensorAndControl&,
289 const DebugObserverInterfacePrx& debugObs)
290 {
291 // Log metrics
292 StringVariantBaseMap datafields;
293 datafields["frequency_ms"] = new Variant(frequency_ms);
294 datafields["latency_ms"] = new Variant(latency_ms);
295 debugObs->setDebugChannel(getClassName(), datafields);
296
297 // Do limb publish
298 for (auto& pair : this->limb)
299 {
300 if (not pair.second->rtReady.load())
301 {
302 continue;
303 }
304 this->limbPublish(pair.second, debugObs);
305 }
306
307 // If there's no publisher socket, skip rest
308 if (!publishSocketBound_.load())
309 {
310 return;
311 }
312
313 nlohmann::json stateJson;
314 stateJson["timestamp"] =
315 std::chrono::duration<double>(std::chrono::system_clock::now().time_since_epoch())
316 .count();
317
318 auto mat4ToJson = [](const Eigen::Matrix4f& m)
319 {
320 nlohmann::json j = nlohmann::json::array();
321 for (int r = 0; r < 4; ++r)
322 for (int c = 0; c < 4; ++c)
323 j.push_back(m(r, c));
324 return j;
325 };
326
327 // Extract limb data
328 for (auto& pair : this->limb)
329 {
330 if (not pair.second->rtReady.load())
331 continue;
332 auto rtData = pair.second->bufferConfigRtToOnPublish.getUpToDateReadBuffer();
333 auto rtStatus = pair.second->bufferRtStatusToOnPublish.getUpToDateReadBuffer();
334 const auto& nodeSetName = pair.second->kinematicChainName;
335 const auto& jointNames = pair.second->jointNames;
336 auto jointPosition = rtStatus.jointPosition;
337 auto jointVelocity = rtStatus.jointVelocity;
338 auto jointTorque = rtStatus.jointTorque;
339 auto desiredJointTorque = rtStatus.desiredJointTorque;
340 auto desiredJointVelocity = rtStatus.desiredJointVelocity;
341 auto desiredJointPosition = rtStatus.desiredJointPosition;
342 Eigen::Matrix4f currentPose = rtStatus.currentPose;
343 Eigen::Matrix4f desiredPose = rtStatus.desiredPose;
344
345 for (size_t i = 0; i < jointNames.size(); ++i)
346 {
347 stateJson[nodeSetName]["jointPosition"][jointNames[i]] = jointPosition(i);
348 stateJson[nodeSetName]["jointVelocity"][jointNames[i]] = jointVelocity(i);
349 stateJson[nodeSetName]["jointTorque"][jointNames[i]] = jointTorque(i);
350 stateJson[nodeSetName]["desiredJointTorque"][jointNames[i]] = desiredJointTorque(i);
351 stateJson[nodeSetName]["desiredJointVelocity"][jointNames[i]] = desiredJointVelocity(i);
352 stateJson[nodeSetName]["desiredJointPosition"][jointNames[i]] = desiredJointPosition(i);
353 }
354 stateJson[nodeSetName]["currentPose"] = mat4ToJson(currentPose);
355 stateJson[nodeSetName]["desiredPose"] = mat4ToJson(desiredPose);
356 }
357
358 // Extract hand data
359 if (this->hands)
360 {
361 for (auto& [handName, hand] : this->hands->hands)
362 {
363 const auto& jointNames = hand->jointNames;
364 const auto desiredConfig =
365 hand->bufferConfigNonRTToOnPublish.getUpToDateReadBuffer();
366 const auto currentJointPositionMap = hand->ctrl->getJointValuesMap();
367
368 for (size_t i = 0; i < jointNames.size(); ++i)
369 {
370 const auto it = currentJointPositionMap.find(jointNames[i]);
371 stateJson[handName]["jointPosition"][jointNames[i]] =
372 (it != currentJointPositionMap.end()) ? it->second : 0.F;
373 if (desiredConfig.jointPosition.has_value())
374 {
375 stateJson[handName]["desiredJointPosition"][jointNames[i]] =
376 desiredConfig.jointPosition.value()(i);
377 }
378 }
379 }
380 }
381
382 nlohmann::json metadata_json;
383 metadata_json["name"] = "robot_state";
384 metadata_json["type"] = "json";
385 metadata_json["timestamp_processed"] = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
386 std::string metadata_string = metadata_json.dump();
387
388 // Send data via ZMQ
389 const std::string stateStr = stateJson.dump();
390 zmq::message_t msg(stateStr.size());
391 std::memcpy(msg.data(), stateStr.data(), stateStr.size());
392 zmq::message_t metadata_msg(metadata_string.size());
393 memcpy(metadata_msg.data(), metadata_string.data(), metadata_string.size());
394 try
395 {
396 std::unique_lock<std::mutex> lock(publishSocketMutex_);
397 publishSocket_.send(metadata_msg, zmq::send_flags::sndmore);
398 publishSocket_.send(msg, zmq::send_flags::dontwait);
399 }
400 catch (const zmq::error_t& e)
401 {
402 ARMARX_RT_LOGF_WARN("ZeroMQ joint state publish error: %s", e.what());
403 }
404 }
405
406 template <typename NJointTaskspaceController>
408 {
409 if (publishSocketBound_.load())
410 {
411 std::unique_lock<std::mutex> lock(publishSocketMutex_);
412 publishSocket_.close();
413 publishSocketBound_.store(false);
414 }
415 }
416
417} // namespace armarx::control::njoint_controller::task_space
#define ARMARX_RT_LOGF_WARN(...)
constexpr T c
The Variant class is described here: Variants.
Definition Variant.h:224
NJointTaskspaceController(const RobotUnitPtr &robotUnit, const NJointControllerConfigPtr &config, const VirtualRobot::RobotPtr &)
Definition Base.cpp:138
typename NJointTaskspaceControllerType::ConfigDict ConfigDict
Definition Base.h:64
void limbPublish(ArmPtr &arm, const DebugObserverInterfacePrx &debugObs)
Definition Base.cpp:804
void onPublish(const SensorAndControl &, const DebugDrawerInterfacePrx &, const DebugObserverInterfacePrx &) override
NJointZeroMQTaskspaceController(const armarx::RobotUnitPtr &robotUnit, const armarx::NJointControllerConfigPtr &config, const VirtualRobot::RobotPtr &robot)
std::string getClassName(const Ice::Current &=Ice::emptyCurrent) const final
void onPublishDeactivation(const DebugDrawerInterfacePrx &, const DebugObserverInterfacePrx &) override
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
std::shared_ptr< class Robot > RobotPtr
Definition Bus.h:19
const simox::meta::EnumNames< ControllerType > ControllerTypeNames
Definition type.h:170
armarx::NJointControllerRegistration< NJointSharedMemoryTaskspaceController< base::NJointTSMixImpVelColController > > registrationControllerNJointCollisionAvoidanceTSMixedImpedanceVelocityController(armarx::control::common::ControllerTypeNames.to_name(armarx::control::common::ControllerType::SharedMemoryTSMixImpVelCol))
armarx::NJointControllerRegistration< NJointSharedMemoryTaskspaceController< base::NJointTSMixImpVelController > > registrationControllerNJointTSMixedImpedanceVelocityController(armarx::control::common::ControllerTypeNames.to_name(armarx::control::common::ControllerType::SharedMemoryTSMixImpVel))
armarx::NJointControllerRegistration< NJointSharedMemoryTaskspaceController< base::NJointTSImpController > > registrationControllerNJointTSImpedanceController(armarx::control::common::ControllerTypeNames.to_name(armarx::control::common::ControllerType::SharedMemoryTSImp))
armarx::NJointControllerRegistration< NJointSharedMemoryTaskspaceController< base::NJointTSVelController > > registrationControllerNJointTSVelocityController(armarx::control::common::ControllerTypeNames.to_name(armarx::control::common::ControllerType::SharedMemoryTSVel))
armarx::NJointControllerRegistration< NJointSharedMemoryTaskspaceController< base::NJointTSImpColController > > registrationControllerNJointCollisionAvoidanceTSImpedanceController(armarx::control::common::ControllerTypeNames.to_name(armarx::control::common::ControllerType::SharedMemoryTSImpCol))
::IceInternal::ProxyHandle<::IceProxy::armarx::DebugObserverInterface > DebugObserverInterfacePrx
std::string Variant::get< std::string >() const
Definition Variant.cpp:284
std::map< std::string, VariantBasePtr > StringVariantBaseMap
IceUtil::Handle< class RobotUnit > RobotUnitPtr
Definition FTSensor.h:34
::IceInternal::ProxyHandle<::IceProxy::armarx::DebugDrawerInterface > DebugDrawerInterfacePrx
detail::ControlThreadOutputBufferEntry SensorAndControl