TransientTopicI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
11 #include <IceStorm/Instance.h>
12 #include <IceStorm/Subscriber.h>
13 #include <IceStorm/TraceLevels.h>
14 #include <IceStorm/Util.h>
15 
16 #include <Ice/Ice.h>
17 
18 #include <list>
19 #include <algorithm>
20 
21 using namespace IceStorm;
22 using namespace std;
23 
24 namespace
25 {
26 
27  //
28  // The servant has a 1-1 association with a topic. It is used to
29  // receive events from Publishers.
30  //
31  class TransientPublisherI : public Ice::BlobjectArray
32  {
33  public:
34 
35  TransientPublisherI(const TransientTopicImplPtr& impl) :
36  _impl(impl)
37  {
38  }
39 
40  virtual bool
41  ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
42  Ice::ByteSeq&,
43  const Ice::Current& current)
44  {
45  // Use cached reads.
46  EventDataPtr event = new EventData(
47  current.operation,
48  current.mode,
49  Ice::ByteSeq(),
50  current.ctx);
51 
52  //
53  // COMPILERBUG: gcc 4.0.1 doesn't like this.
54  //
55  //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
56  Ice::ByteSeq data(inParams.first, inParams.second);
57  event->data.swap(data);
58 
60  v.push_back(event);
61  _impl->publish(false, v);
62 
63  return true;
64  }
65 
66  private:
67 
68  const TransientTopicImplPtr _impl;
69  };
70 
71  //
72  // The servant has a 1-1 association with a topic. It is used to
73  // receive events from linked Topics.
74  //
75  class TransientTopicLinkI : public TopicLink
76  {
77  public:
78 
79  TransientTopicLinkI(const TransientTopicImplPtr& impl) :
80  _impl(impl)
81  {
82  }
83 
84  virtual void
85  forward(const EventDataSeq& v, const Ice::Current& /*current*/)
86  {
87  _impl->publish(true, v);
88  }
89 
90  private:
91 
92  const TransientTopicImplPtr _impl;
93  };
94 
95 }
96 
98  const InstancePtr& instance,
99  const string& name,
100  const Ice::Identity& id) :
101  _instance(instance),
102  _name(name),
103  _id(id),
104  _destroyed(false)
105 {
106  //
107  // Create a servant per topic to receive event data. If the
108  // category is empty then we are in backwards compatibility
109  // mode. In this case the servant's identity is
110  // category=<topicname>, name=publish, otherwise the name is
111  // <instancename>/<topicname>.publish. The same applies to the
112  // link proxy.
113  //
114  // Activate the object and save a reference to give to publishers.
115  //
116  Ice::Identity pubid;
117  Ice::Identity linkid;
118  if (id.category.empty())
119  {
120  pubid.category = _name;
121  pubid.name = "publish";
122  linkid.category = _name;
123  linkid.name = "link";
124  }
125  else
126  {
127  pubid.category = id.category;
128  pubid.name = _name + ".publish";
129  linkid.category = id.category;
130  linkid.name = _name + ".link";
131  }
132 
133  _publisherPrx = _instance->publishAdapter()->add(new TransientPublisherI(this), pubid);
134  _linkPrx = TopicLinkPrx::uncheckedCast(_instance->publishAdapter()->add(new TransientTopicLinkI(this), linkid));
135 }
136 
138 {
139 }
140 
141 string
142 TransientTopicImpl::getName(const Ice::Current&) const
143 {
144  // Immutable
145  return _name;
146 }
147 
148 Ice::ObjectPrx
149 TransientTopicImpl::getPublisher(const Ice::Current&) const
150 {
151  // Immutable
152  return _publisherPrx;
153 }
154 
155 Ice::ObjectPrx
157 {
158  // Immutable
159  return _publisherPrx;
160 }
161 
162 void
163 TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
164 {
165  if (!obj)
166  {
167  TraceLevelsPtr traceLevels = _instance->traceLevels();
168  if (traceLevels->topic > 0)
169  {
170  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
171  out << _name << ": subscribe: null proxy";
172  }
173  throw InvalidSubscriber("subscriber is a null proxy");
174  }
175  Ice::Identity id = obj->ice_getIdentity();
176  TraceLevelsPtr traceLevels = _instance->traceLevels();
177  QoS qos = origQoS;
178  if (traceLevels->topic > 0)
179  {
180  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
181  out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
182 
183  if (traceLevels->topic > 1)
184  {
185  out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
186  << " QoS: ";
187  for (QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
188  {
189  if (p != qos.begin())
190  {
191  out << ',';
192  }
193  out << '[' << p->first << "," << p->second << ']';
194  }
195  }
196  }
197 
198  string reliability = "oneway";
199  {
200  QoS::iterator p = qos.find("reliability");
201  if (p != qos.end())
202  {
203  reliability = p->second;
204  qos.erase(p);
205  }
206  }
207 
208  Ice::ObjectPrx newObj = obj;
209  if (reliability == "batch")
210  {
211  if (newObj->ice_isDatagram())
212  {
213  newObj = newObj->ice_batchDatagram();
214  }
215  else
216  {
217  newObj = newObj->ice_batchOneway();
218  }
219  }
220  else if (reliability == "twoway")
221  {
222  newObj = newObj->ice_twoway();
223  }
224  else if (reliability == "twoway ordered")
225  {
226  qos["reliability"] = "ordered";
227  newObj = newObj->ice_twoway();
228  }
229  else // reliability == "oneway"
230  {
231  if (reliability != "oneway" && traceLevels->subscriber > 0)
232  {
233  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
234  out << reliability << " mode not understood.";
235  }
236  if (!newObj->ice_isDatagram())
237  {
238  newObj = newObj->ice_oneway();
239  }
240  }
241 
242  Lock sync(*this);
243  SubscriberRecord record;
244  record.id = id;
245  record.obj = newObj;
246  record.theQoS = qos;
247  record.topicName = _name;
248  record.link = false;
249  record.cost = 0;
250 
251  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
252  if (p != _subscribers.end())
253  {
254  // If we already have this subscriber remove it from our
255  // subscriber list and remove it from the database.
256  (*p)->destroy();
257  _subscribers.erase(p);
258  }
259 
260  SubscriberPtr subscriber = Subscriber::create(_instance, record);
261  _subscribers.push_back(subscriber);
262 }
263 
264 Ice::ObjectPrx
265 TransientTopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
266 {
267  if (!obj)
268  {
269  TraceLevelsPtr traceLevels = _instance->traceLevels();
270  if (traceLevels->topic > 0)
271  {
272  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
273  out << _name << ": subscribe: null proxy";
274  }
275  throw InvalidSubscriber("subscriber is a null proxy");
276  }
277  Ice::Identity id = obj->ice_getIdentity();
278 
279  TraceLevelsPtr traceLevels = _instance->traceLevels();
280  if (traceLevels->topic > 0)
281  {
282  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
283  out << _name << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
284 
285  if (traceLevels->topic > 1)
286  {
287  out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
288  << " QoS: ";
289  for (QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
290  {
291  if (p != qos.begin())
292  {
293  out << ',';
294  }
295 
296  }
297  }
298  }
299 
300  Lock sync(*this);
301 
302  SubscriberRecord record;
303  record.id = id;
304  record.obj = obj;
305  record.theQoS = qos;
306  record.topicName = _name;
307  record.link = false;
308  record.cost = 0;
309 
310  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
311  if (p != _subscribers.end())
312  {
313  throw AlreadySubscribed();
314  }
315 
316  SubscriberPtr subscriber = Subscriber::create(_instance, record);
317  _subscribers.push_back(subscriber);
318 
319  return subscriber->proxy();
320 }
321 
322 void
323 TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
324 {
325  TraceLevelsPtr traceLevels = _instance->traceLevels();
326  if (!subscriber)
327  {
328  if (traceLevels->topic > 0)
329  {
330  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
331  out << _name << ": unsubscribe: null proxy";
332  }
333  throw InvalidSubscriber("subscriber is a null proxy");
334  }
335 
336  Ice::Identity id = subscriber->ice_getIdentity();
337 
338  if (traceLevels->topic > 0)
339  {
340  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
341  out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
342  if (traceLevels->topic > 1)
343  {
344  out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
345  }
346  }
347 
348  Lock sync(*this);
349  // First remove the subscriber from the subscribers list. Note
350  // that its possible that the subscriber isn't in the list, but is
351  // in the database if the subscriber was locally reaped.
352  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
353  if (p != _subscribers.end())
354  {
355  (*p)->destroy();
356  _subscribers.erase(p);
357  }
358 }
359 
362 {
363  // immutable
364  return _linkPrx;
365 }
366 
367 void
368 TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
369 {
370  TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
371  TopicLinkPrx link = internal->getLinkProxy();
372 
373  TraceLevelsPtr traceLevels = _instance->traceLevels();
374  if (traceLevels->topic > 0)
375  {
376  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
377  out << _name << ": link " << _instance->communicator()->identityToString(topic->ice_getIdentity())
378  << " cost " << cost;
379  }
380 
381  Lock sync(*this);
382 
383  Ice::Identity id = topic->ice_getIdentity();
384 
385  SubscriberRecord record;
386  record.id = id;
387  record.obj = link;
388  record.theTopic = topic;
389  record.topicName = _name;
390  record.link = true;
391  record.cost = cost;
392 
393  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
394  if (p != _subscribers.end())
395  {
396  string name = IceStormInternal::identityToTopicName(id);
397  LinkExists ex;
398  ex.name = name;
399  throw ex;
400  }
401 
402  SubscriberPtr subscriber = Subscriber::create(_instance, record);
403  _subscribers.push_back(subscriber);
404 }
405 
406 void
407 TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
408 {
409  Lock sync(*this);
410  if (_destroyed)
411  {
412  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
413  }
414 
415  Ice::Identity id = topic->ice_getIdentity();
416 
417  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
418  if (p == _subscribers.end())
419  {
420  string name = IceStormInternal::identityToTopicName(id);
421  TraceLevelsPtr traceLevels = _instance->traceLevels();
422  if (traceLevels->topic > 0)
423  {
424  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
425  out << _name << ": unlink " << name << " failed - not linked";
426  }
427 
428  NoSuchLink ex;
429  ex.name = name;
430  throw ex;
431  }
432 
433  TraceLevelsPtr traceLevels = _instance->traceLevels();
434  if (traceLevels->topic > 0)
435  {
436  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
437  out << _name << " unlink " << _instance->communicator()->identityToString(id);
438  }
439 
440  // Remove the subscriber from the subscribers list. Note
441  // that its possible that the subscriber isn't in the list, but is
442  // in the database if the subscriber was locally reaped.
443  p = find(_subscribers.begin(), _subscribers.end(), id);
444  if (p != _subscribers.end())
445  {
446  (*p)->destroy();
447  _subscribers.erase(p);
448  }
449 }
450 
451 LinkInfoSeq
452 TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const
453 {
454  Lock sync(*this);
455  LinkInfoSeq seq;
456  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
457  {
458  SubscriberRecord record = (*p)->record();
459  if (record.link && !(*p)->errored())
460  {
461  LinkInfo info;
462  info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
463  info.cost = record.cost;
464  info.theTopic = record.theTopic;
465  seq.push_back(info);
466  }
467  }
468  return seq;
469 }
470 
471 Ice::IdentitySeq
472 TransientTopicImpl::getSubscribers(const Ice::Current&) const
473 {
474  IceUtil::Mutex::Lock sync(*this);
475 
476  Ice::IdentitySeq subscribers;
477  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
478  {
479  subscribers.push_back((*p)->id());
480  }
481  return subscribers;
482 }
483 
484 void
485 TransientTopicImpl::destroy(const Ice::Current&)
486 {
487  Lock sync(*this);
488 
489  if (_destroyed)
490  {
491  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
492  }
493  _destroyed = true;
494 
495  TraceLevelsPtr traceLevels = _instance->traceLevels();
496  if (traceLevels->topic > 0)
497  {
498  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
499  out << _name << ": destroy";
500  }
501 
502  try
503  {
504  _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
505  _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
506  }
507  catch (const Ice::ObjectAdapterDeactivatedException&)
508  {
509  // Ignore -- this could occur on shutdown.
510  }
511 
512  // Destroy all of the subscribers.
513  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
514  {
515  (*p)->destroy();
516  }
517  _subscribers.clear();
518 }
519 
520 void
521 TransientTopicImpl::reap(const Ice::IdentitySeq&, const Ice::Current&)
522 {
523 }
524 
525 bool
527 {
528  Lock sync(*this);
529  return _destroyed;
530 }
531 
534 {
535  // immutable
536  return _id;
537 }
538 
539 void
540 TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
541 {
542  //
543  // Copy of the subscriber list so that event publishing can occur
544  // in parallel.
545  //
546  vector<SubscriberPtr> copy;
547  {
548  Lock sync(*this);
549  copy = _subscribers;
550  }
551 
552  //
553  // Queue each event, gathering a list of those subscribers that
554  // must be reaped.
555  //
556  vector<Ice::Identity> e;
557  for (vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
558  {
559  if (!(*p)->queue(forwarded, events) && (*p)->reap())
560  {
561  e.push_back((*p)->id());
562  }
563  }
564 
565  //
566  // Run through the error list removing those subscribers that are
567  // in error from the subscriber list.
568  //
569  if (!e.empty())
570  {
571  Lock sync(*this);
572  for (vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
573  {
574  //
575  // Its possible for the subscriber to already have been
576  // removed since the copy is iterated over outside of
577  // mutex protection.
578  //
579  // Note that although this could be quicker if we used a
580  // map, the most optimal case should be pushing around
581  // events not searching for a particular subscriber.
582  //
583  // The subscriber is immediately destroyed & removed from
584  // the _subscribers list. Add the subscriber to a list of
585  // error'd subscribers and remove it from the database on
586  // the next reap.
587  //
588  vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
589  if (q != _subscribers.end())
590  {
591  SubscriberPtr subscriber = *q;
592  //
593  // Destroy the subscriber.
594  //
595  subscriber->destroy();
596  _subscribers.erase(q);
597  }
598  }
599  }
600 }
601 
602 void
604 {
605  Lock sync(*this);
606 
607  // Shutdown each subscriber. This waits for the event queues to drain.
608  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
609  {
610  (*p)->shutdown();
611  }
612 }
IceStorm::TransientTopicImpl::getSubscribers
virtual Ice::IdentitySeq getSubscribers(const Ice::Current &) const
Definition: TransientTopicI.cpp:472
IceStorm
Definition: DBTypes.ice:22
IceStorm::TransientTopicImpl::getPublisher
virtual Ice::ObjectPrx getPublisher(const Ice::Current &) const
Definition: TransientTopicI.cpp:149
IceStorm::TransientTopicImpl::TransientTopicImpl
TransientTopicImpl(const InstancePtr &, const std::string &, const Ice::Identity &)
Definition: TransientTopicI.cpp:97
IceStorm::EventDataSeq
std::deque< ::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
Definition: IceStormInternal.h:463
IceStorm::TransientTopicImpl::unlink
virtual void unlink(const TopicPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:407
IceStorm::TransientTopicImpl::unsubscribe
virtual void unsubscribe(const Ice::ObjectPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:323
IceStorm::TopicInternal::getLinkProxy
idempotent TopicLink * getLinkProxy()
Retrieve a proxy to the TopicLink interface.
IceStorm::TransientTopicImpl::getName
virtual std::string getName(const Ice::Current &) const
Definition: TransientTopicI.cpp:142
IceStorm::Subscriber::create
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
Definition: Subscriber.cpp:522
IceStorm::TransientTopicImpl::subscribe
virtual void subscribe(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:163
Util.h
IceStorm::SubscriberRecord::theQoS
::IceStorm::QoS theQoS
Definition: SubscriberRecord.h:220
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:214
IceStorm::TransientTopicImpl::reap
virtual void reap(const Ice::IdentitySeq &, const Ice::Current &)
Definition: TransientTopicI.cpp:521
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:523
copy
Use of this software is granted under one of the following two to be chosen freely by the user Boost Software License Version Marcin Kalicinski Permission is hereby free of to any person or organization obtaining a copy of the software and accompanying documentation covered by this and transmit the and to prepare derivative works of the and to permit third parties to whom the Software is furnished to do all subject to the including the above license this restriction and the following must be included in all copies of the in whole or in and all derivative works of the unless such copies or derivative works are solely in the form of machine executable object code generated by a source language processor THE SOFTWARE IS PROVIDED AS WITHOUT WARRANTY OF ANY EXPRESS OR INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF FITNESS FOR A PARTICULAR TITLE AND NON INFRINGEMENT IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER WHETHER IN TORT OR ARISING OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE The MIT Marcin Kalicinski Permission is hereby free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to copy
Definition: license.txt:39
TransientTopicI.h
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
IceStorm::TransientTopicImpl::id
Ice::Identity id() const
Definition: TransientTopicI.cpp:533
IceStorm::TransientTopicImpl::subscribeAndGetPublisher
virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:265
IceStorm::TransientTopicImpl::~TransientTopicImpl
~TransientTopicImpl()
Definition: TransientTopicI.cpp:137
IceStorm::EventData
The event data.
Definition: IceStormInternal.h:429
IceStorm::SubscriberRecord::obj
::Ice::ObjectPrx obj
Definition: SubscriberRecord.h:219
IceStorm::TransientTopicImpl::publish
void publish(bool, const EventDataSeq &)
Definition: TransientTopicI.cpp:540
Subscriber.h
IceStorm::TransientTopicImpl::destroyed
bool destroyed() const
Definition: TransientTopicI.cpp:526
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStorm::TransientTopicImpl::shutdown
void shutdown()
Definition: TransientTopicI.cpp:603
IceStorm::TransientTopicImpl::getLinkInfoSeq
virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current &) const
Definition: TransientTopicI.cpp:452
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::cost
::Ice::Int cost
Definition: SubscriberRecord.h:221
IceStorm::SubscriberRecord::theTopic
::IceStorm::TopicPrx theTopic
Definition: SubscriberRecord.h:222
TraceLevels.h
IceStormInternal::identityToTopicName
std::string identityToTopicName(const Ice::Identity &)
Definition: Util.cpp:23
std
Definition: Application.h:66
IceUtil::Handle
Definition: forward_declarations.h:29
IceInternal::ProxyHandle< ::IceProxy::IceStorm::TopicLink >
IceStorm::TransientTopicImpl::link
virtual void link(const TopicPrx &, Ice::Int, const Ice::Current &)
Definition: TransientTopicI.cpp:368
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:916
IceStorm::SubscriberRecord::id
::Ice::Identity id
Definition: SubscriberRecord.h:217
IceStorm::TransientTopicImpl::getNonReplicatedPublisher
virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current &) const
Definition: TransientTopicI.cpp:156
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:218
IceStorm::TransientTopicImpl::destroy
virtual void destroy(const Ice::Current &)
Definition: TransientTopicI.cpp:485
Instance.h
IceStorm::SubscriberRecord::topicName
::std::string topicName
Definition: SubscriberRecord.h:216