TopicI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <algorithm>
11 
12 #include <Ice/LoggerUtil.h>
13 #include <IceStorm/Instance.h>
14 #include <IceStorm/NodeI.h>
15 #include <IceStorm/Observers.h>
16 #include <IceStorm/Subscriber.h>
17 #include <IceStorm/TopicI.h>
18 #include <IceStorm/TraceLevels.h>
19 #include <IceStorm/Util.h>
20 
21 using namespace std;
22 using namespace IceStorm;
23 using namespace IceStormElection;
24 using namespace IceStormInternal;
25 
26 namespace
27 {
28 
29  void
30  logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
31  {
32  Ice::Error error(com->getLogger());
33  error << "LMDB error: " << ex;
34  }
35 
36  //
37  // The servant has a 1-1 association with a topic. It is used to
38  // receive events from Publishers.
39  //
40  class PublisherI : public Ice::BlobjectArray
41  {
42  public:
43  PublisherI(const TopicImplPtr& topic, const PersistentInstancePtr& instance) :
44  _topic(topic), _instance(instance)
45  {
46  }
47 
48  virtual bool
49  ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
50  Ice::ByteSeq&,
51  const Ice::Current& current)
52  {
53  // The publish call does a cached read.
54  EventDataPtr event =
55  new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
56 
57  //
58  // COMPILERBUG: gcc 4.0.1 doesn't like this.
59  //
60  //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
61  Ice::ByteSeq data(inParams.first, inParams.second);
62  event->data.swap(data);
63 
65  v.push_back(event);
66  _topic->publish(false, v);
67 
68  return true;
69  }
70 
71  private:
72  const TopicImplPtr _topic;
73  const PersistentInstancePtr _instance;
74  };
75 
76  //
77  // The servant has a 1-1 association with a topic. It is used to
78  // receive events from linked Topics.
79  //
80  class TopicLinkI : public TopicLink
81  {
82  public:
83  TopicLinkI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
84  _impl(impl), _instance(instance)
85  {
86  }
87 
88  virtual void
89  forward(const EventDataSeq& v, const Ice::Current& /*current*/)
90  {
91  // The publish call does a cached read.
92  _impl->publish(true, v);
93  }
94 
95  private:
96  const TopicImplPtr _impl;
97  const PersistentInstancePtr _instance;
98  };
99 
100  class TopicI : public TopicInternal
101  {
102  public:
103  TopicI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
104  _impl(impl), _instance(instance)
105  {
106  }
107 
108  virtual string
109  getName(const Ice::Current&) const
110  {
111  // Use cached reads.
112  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
113  return _impl->getName();
114  }
115 
116  virtual Ice::ObjectPrx
117  getPublisher(const Ice::Current&) const
118  {
119  // Use cached reads.
120  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
121  return _impl->getPublisher();
122  }
123 
124  virtual Ice::ObjectPrx
125  getNonReplicatedPublisher(const Ice::Current&) const
126  {
127  // Use cached reads.
128  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
129  return _impl->getNonReplicatedPublisher();
130  }
131 
132  virtual Ice::ObjectPrx
133  subscribeAndGetPublisher(const QoS& qos,
134  const Ice::ObjectPrx& obj,
135  const Ice::Current& current)
136  {
137  while (true)
138  {
139  Ice::Long generation = -1;
140  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
141  if (master)
142  {
143  try
144  {
145  return master->subscribeAndGetPublisher(qos, obj);
146  }
147  catch (const Ice::ConnectFailedException&)
148  {
149  _instance->node()->recovery(generation);
150  continue;
151  }
152  catch (const Ice::TimeoutException&)
153  {
154  _instance->node()->recovery(generation);
155  continue;
156  }
157  }
158  else
159  {
160  FinishUpdateHelper unlock(_instance->node());
161  return _impl->subscribeAndGetPublisher(qos, obj);
162  }
163  }
164  }
165 
166  virtual void
167  unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current)
168  {
169  while (true)
170  {
171  Ice::Long generation = -1;
172  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
173  if (master)
174  {
175  try
176  {
177  master->unsubscribe(subscriber);
178  }
179  catch (const Ice::ConnectFailedException&)
180  {
181  _instance->node()->recovery(generation);
182  continue;
183  }
184  catch (const Ice::TimeoutException&)
185  {
186  _instance->node()->recovery(generation);
187  continue;
188  }
189  }
190  else
191  {
192  FinishUpdateHelper unlock(_instance->node());
193  _impl->unsubscribe(subscriber);
194  }
195  break;
196  }
197  }
198 
199  virtual TopicLinkPrx
200  getLinkProxy(const Ice::Current&)
201  {
202  // Use cached reads.
203  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
204  return _impl->getLinkProxy();
205  }
206 
207  virtual void
208  reap(const Ice::IdentitySeq& ids, const Ice::Current& /*current*/)
209  {
210  NodeIPtr node = _instance->node();
211  if (!node->updateMaster(__FILE__, __LINE__))
212  {
213  throw ReapWouldBlock();
214  }
215  FinishUpdateHelper unlock(node);
216  _impl->reap(ids);
217  }
218 
219  virtual void
220  link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current)
221  {
222  while (true)
223  {
224  Ice::Long generation = -1;
225  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
226  if (master)
227  {
228  try
229  {
230  master->link(topic, cost);
231  }
232  catch (const Ice::ConnectFailedException&)
233  {
234  _instance->node()->recovery(generation);
235  continue;
236  }
237  catch (const Ice::TimeoutException&)
238  {
239  _instance->node()->recovery(generation);
240  continue;
241  }
242  }
243  else
244  {
245  FinishUpdateHelper unlock(_instance->node());
246  _impl->link(topic, cost);
247  }
248  break;
249  }
250  }
251 
252  virtual void
253  unlink(const TopicPrx& topic, const Ice::Current& current)
254  {
255  while (true)
256  {
257  Ice::Long generation = -1;
258  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
259  if (master)
260  {
261  try
262  {
263  master->unlink(topic);
264  }
265  catch (const Ice::ConnectFailedException&)
266  {
267  _instance->node()->recovery(generation);
268  continue;
269  }
270  catch (const Ice::TimeoutException&)
271  {
272  _instance->node()->recovery(generation);
273  continue;
274  }
275  }
276  else
277  {
278  FinishUpdateHelper unlock(_instance->node());
279  _impl->unlink(topic);
280  }
281  break;
282  }
283  }
284 
285  virtual LinkInfoSeq
286  getLinkInfoSeq(const Ice::Current&) const
287  {
288  // Use cached reads.
289  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
290  return _impl->getLinkInfoSeq();
291  }
292 
293  virtual Ice::IdentitySeq
294  getSubscribers(const Ice::Current&) const
295  {
296  return _impl->getSubscribers();
297  }
298 
299  virtual void
300  destroy(const Ice::Current& current)
301  {
302  while (true)
303  {
304  Ice::Long generation = -1;
305  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
306  if (master)
307  {
308  try
309  {
310  master->destroy();
311  }
312  catch (const Ice::ConnectFailedException&)
313  {
314  _instance->node()->recovery(generation);
315  continue;
316  }
317  catch (const Ice::TimeoutException&)
318  {
319  _instance->node()->recovery(generation);
320  continue;
321  }
322  }
323  else
324  {
325  FinishUpdateHelper unlock(_instance->node());
326  _impl->destroy();
327  }
328  break;
329  }
330  }
331 
332  private:
333  TopicPrx
334  getMasterFor(const Ice::Current& cur,
335  Ice::Long& generation,
336  const char* file,
337  int line) const
338  {
339  NodeIPtr node = _instance->node();
340  Ice::ObjectPrx master;
341  if (node)
342  {
343  master = _instance->node()->startUpdate(generation, file, line);
344  }
345  return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx();
346  }
347 
348  const TopicImplPtr _impl;
349  const PersistentInstancePtr _instance;
350  };
351 
352 } // namespace
353 
354 TopicImpl::TopicImpl(const PersistentInstancePtr& instance,
355  const string& name,
356  const Ice::Identity& id,
357  const SubscriberRecordSeq& subscribers) :
358  _instance(instance),
359  _name(name),
360  _id(id),
361  _destroyed(false),
362  _lluMap(_instance->lluMap()),
363  _subscriberMap(_instance->subscriberMap())
364 {
365  try
366  {
367  __setNoDelete(true);
368 
369  // TODO: If we want to improve the performance of the
370  // non-replicated case we could allocate a null-topic impl here.
371  _servant = new TopicI(this, instance);
372 
373  //
374  // Create a servant per topic to receive event data. If the
375  // category is empty then we are in backwards compatibility
376  // mode. In this case the servant's identity is
377  // category=<topicname>, name=publish, otherwise the name is
378  // <instancename>/<topicname>.publish. The same applies to the
379  // link proxy.
380  //
381  // Activate the object and save a reference to give to publishers.
382  //
383  Ice::Identity pubid;
384  Ice::Identity linkid;
385  if (id.category.empty())
386  {
387  pubid.category = _name;
388  pubid.name = "publish";
389  linkid.category = _name;
390  linkid.name = "link";
391  }
392  else
393  {
394  pubid.category = id.category;
395  pubid.name = _name + ".publish";
396  linkid.category = id.category;
397  linkid.name = _name + ".link";
398  }
399 
400  _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid);
401  _linkPrx = TopicLinkPrx::uncheckedCast(
402  _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid));
403 
404  //
405  // Re-establish subscribers.
406  //
407  for (SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end();
408  ++p)
409  {
410  Ice::Identity id = p->obj->ice_getIdentity();
411  TraceLevelsPtr traceLevels = _instance->traceLevels();
412  if (traceLevels->topic > 0)
413  {
414  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
415  out << _name << " recreate " << _instance->communicator()->identityToString(id);
416  if (traceLevels->topic > 1)
417  {
418  out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
419  }
420  }
421 
422  try
423  {
424  //
425  // Create the subscriber object add it to the set of
426  // subscribers.
427  //
428  SubscriberPtr subscriber = Subscriber::create(_instance, *p);
429  _subscribers.push_back(subscriber);
430  }
431  catch (const Ice::Exception& ex)
432  {
433  Ice::Warning out(traceLevels->logger);
434  out << _name << " recreate " << _instance->communicator()->identityToString(id);
435  if (traceLevels->topic > 1)
436  {
437  out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
438  }
439  out << " failed: " << ex;
440  }
441  }
442 
443  if (_instance->observer())
444  {
445  _observer.attach(
446  _instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0));
447  }
448  }
449  catch (...)
450  {
451  shutdown();
452  __setNoDelete(false);
453  throw;
454  }
455  __setNoDelete(false);
456 }
457 
458 string
460 {
461  // Immutable
462  return _name;
463 }
464 
465 Ice::ObjectPrx
467 {
468  // Immutable
469  if (_instance->publisherReplicaProxy())
470  {
471  return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity());
472  }
473  return _publisherPrx;
474 }
475 
476 Ice::ObjectPrx
478 {
479  // If there is an adapter id configured then we're using icegrid
480  // so create an indirect proxy, otherwise create a direct proxy.
481  if (!_publisherPrx->ice_getAdapterId().empty())
482  {
483  return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity());
484  }
485  else
486  {
487  return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity());
488  }
489 }
490 
491 namespace
492 {
493  void
494  trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<SubscriberPtr>& s)
495  {
496  out << '[';
497  for (vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p)
498  {
499  if (p != s.begin())
500  {
501  out << ",";
502  }
503  out << instance->communicator()->identityToString((*p)->id());
504  }
505  out << "]";
506  }
507 } // namespace
508 
509 Ice::ObjectPrx
510 TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
511 {
512  if (!obj)
513  {
514  TraceLevelsPtr traceLevels = _instance->traceLevels();
515  if (traceLevels->topic > 0)
516  {
517  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
518  out << _name << ": subscribeAndGetPublisher: null proxy";
519  }
520  throw InvalidSubscriber("subscriber is a null proxy");
521  }
522  Ice::Identity id = obj->ice_getIdentity();
523 
524  TraceLevelsPtr traceLevels = _instance->traceLevels();
525  if (traceLevels->topic > 0)
526  {
527  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
528  out << _name
529  << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
530 
531  if (traceLevels->topic > 1)
532  {
533  out << " endpoints: " << IceStormInternal::describeEndpoints(obj) << " QoS: ";
534  for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
535  {
536  if (p != qos.begin())
537  {
538  out << ',';
539  }
540  }
541  out << " subscriptions: ";
542  trace(out, _instance, _subscribers);
543  }
544  }
545 
546  IceUtil::Mutex::Lock sync(_subscribersMutex);
547 
548  SubscriberRecord record;
549  record.id = id;
550  record.obj = obj;
551  record.theQoS = qos;
552  record.topicName = _name;
553  record.link = false;
554  record.cost = 0;
555 
556  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
557  if (p != _subscribers.end())
558  {
559  throw AlreadySubscribed();
560  }
561 
562  LogUpdate llu;
563 
564  SubscriberPtr subscriber = Subscriber::create(_instance, record);
565  try
566  {
567  IceDB::ReadWriteTxn txn(_instance->dbEnv());
568 
570  key.topic = _id;
571  key.id = subscriber->id();
572 
573  _subscriberMap.put(txn, key, record);
574 
575  llu = getIncrementedLLU(txn, _lluMap);
576 
577  txn.commit();
578  }
579  catch (const IceDB::LMDBException& ex)
580  {
581  logError(_instance->communicator(), ex);
582  throw; // will become UnknownException in caller
583  }
584 
585  _subscribers.push_back(subscriber);
586 
587  _instance->observers()->addSubscriber(llu, _name, record);
588 
589  return subscriber->proxy();
590 }
591 
592 void
593 TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
594 {
595  TraceLevelsPtr traceLevels = _instance->traceLevels();
596  if (!subscriber)
597  {
598  if (traceLevels->topic > 0)
599  {
600  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
601  out << _name << ": unsubscribe: null proxy";
602  }
603  throw InvalidSubscriber("subscriber is a null proxy");
604  }
605 
606  Ice::Identity id = subscriber->ice_getIdentity();
607 
608  if (traceLevels->topic > 0)
609  {
610  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
611  out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
612 
613  if (traceLevels->topic > 1)
614  {
615  out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
616  trace(out, _instance, _subscribers);
617  }
618  }
619 
620  IceUtil::Mutex::Lock sync(_subscribersMutex);
621  Ice::IdentitySeq ids;
622  ids.push_back(id);
623  removeSubscribers(ids);
624 }
625 
628 {
629  // immutable
630  if (_instance->publisherReplicaProxy())
631  {
632  return TopicLinkPrx::uncheckedCast(
633  _instance->publisherReplicaProxy()->ice_identity(_linkPrx->ice_getIdentity()));
634  }
635  return _linkPrx;
636 }
637 
638 void
639 TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
640 {
641  TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
642  TopicLinkPrx link = internal->getLinkProxy();
643 
644  TraceLevelsPtr traceLevels = _instance->traceLevels();
645  if (traceLevels->topic > 0)
646  {
647  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
648  out << _name << ": link "
649  << _instance->communicator()->identityToString(topic->ice_getIdentity()) << " cost "
650  << cost;
651  }
652 
653  IceUtil::Mutex::Lock sync(_subscribersMutex);
654 
655  Ice::Identity id = topic->ice_getIdentity();
656 
657  SubscriberRecord record;
658  record.id = id;
659  record.obj = link;
660  record.theTopic = topic;
661  record.topicName = _name;
662  record.link = true;
663  record.cost = cost;
664 
665  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
666  if (p != _subscribers.end())
667  {
668  string name = IceStormInternal::identityToTopicName(id);
669  LinkExists ex;
670  ex.name = name;
671  throw ex;
672  }
673 
674  LogUpdate llu;
675 
676  SubscriberPtr subscriber = Subscriber::create(_instance, record);
677 
678  try
679  {
680  IceDB::ReadWriteTxn txn(_instance->dbEnv());
681 
683  key.topic = _id;
684  key.id = id;
685 
686  _subscriberMap.put(txn, key, record);
687 
688  llu = getIncrementedLLU(txn, _lluMap);
689 
690  txn.commit();
691  }
692  catch (const IceDB::LMDBException& ex)
693  {
694  logError(_instance->communicator(), ex);
695  throw; // will become UnknownException in caller
696  }
697 
698  _subscribers.push_back(subscriber);
699 
700  _instance->observers()->addSubscriber(llu, _name, record);
701 }
702 
703 void
705 {
706  IceUtil::Mutex::Lock sync(_subscribersMutex);
707  if (_destroyed)
708  {
709  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
710  }
711 
712  Ice::Identity id = topic->ice_getIdentity();
713 
714  vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
715  if (p == _subscribers.end())
716  {
717  string name = IceStormInternal::identityToTopicName(id);
718  TraceLevelsPtr traceLevels = _instance->traceLevels();
719  if (traceLevels->topic > 0)
720  {
721  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
722  out << _name << ": unlink " << name << " failed - not linked";
723  }
724 
725  NoSuchLink ex;
726  ex.name = name;
727  throw ex;
728  }
729 
730  TraceLevelsPtr traceLevels = _instance->traceLevels();
731  if (traceLevels->topic > 0)
732  {
733  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
734  out << _name << " unlink " << _instance->communicator()->identityToString(id);
735  }
736 
737  Ice::IdentitySeq ids;
738  ids.push_back(id);
739  removeSubscribers(ids);
740 }
741 
742 void
743 TopicImpl::reap(const Ice::IdentitySeq& ids)
744 {
745  IceUtil::Mutex::Lock sync(_subscribersMutex);
746 
747  TraceLevelsPtr traceLevels = _instance->traceLevels();
748  if (traceLevels->topic > 0)
749  {
750  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
751  out << _name << ": reap ";
752  for (Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end(); ++p)
753  {
754  if (p != ids.begin())
755  {
756  out << ",";
757  }
758  out << _instance->communicator()->identityToString(*p);
759  }
760  }
761 
762  removeSubscribers(ids);
763 }
764 
765 void
767 {
768  IceUtil::Mutex::Lock sync(_subscribersMutex);
769  _servant = 0;
770 
771  // Shutdown each subscriber. This waits for the event queues to drain.
772  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
773  ++p)
774  {
775  (*p)->shutdown();
776  }
777 
778  _observer.detach();
779 }
780 
781 LinkInfoSeq
783 {
784  IceUtil::Mutex::Lock sync(_subscribersMutex);
785 
786  LinkInfoSeq seq;
787  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
788  ++p)
789  {
790  SubscriberRecord record = (*p)->record();
791  if (record.link && !(*p)->errored())
792  {
793  LinkInfo info;
794  info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
795  info.cost = record.cost;
796  info.theTopic = record.theTopic;
797  seq.push_back(info);
798  }
799  }
800  return seq;
801 }
802 
803 Ice::IdentitySeq
805 {
806  IceUtil::Mutex::Lock sync(_subscribersMutex);
807 
808  Ice::IdentitySeq subscribers;
809  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
810  ++p)
811  {
812  subscribers.push_back((*p)->id());
813  }
814  return subscribers;
815 }
816 
817 void
819 {
820  IceUtil::Mutex::Lock sync(_subscribersMutex);
821 
822  if (_destroyed)
823  {
824  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
825  }
826  _destroyed = true;
827 
828  TraceLevelsPtr traceLevels = _instance->traceLevels();
829  if (traceLevels->topic > 0)
830  {
831  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
832  out << _name << ": destroy";
833  }
834 
835  // destroyInternal clears out the topic content.
836  LogUpdate llu = {0, 0};
837  _instance->observers()->destroyTopic(destroyInternal(llu, true), _name);
838 
839  _observer.detach();
840 }
841 
844 {
845  IceUtil::Mutex::Lock sync(_subscribersMutex);
846 
847  TopicContent content;
848  content.id = _id;
849  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
850  ++p)
851  {
852  // Don't return errored subscribers (subscribers that have
853  // errored out, but not reaped due to a failure with the
854  // master). This means we can avoid the reaping step later.
855  if (!(*p)->errored())
856  {
857  content.records.push_back((*p)->record());
858  }
859  }
860  return content;
861 }
862 
863 void
865 {
866  IceUtil::Mutex::Lock sync(_subscribersMutex);
867 
868  // We do this with two scans. The first runs through the subscribers
869  // that we have and removes those not in the init list. The second
870  // runs through the init list and add the ones that don't
871  // exist.
872 
873  {
874  vector<SubscriberPtr>::iterator p = _subscribers.begin();
875  while (p != _subscribers.end())
876  {
877  SubscriberRecordSeq::const_iterator q;
878  for (q = records.begin(); q != records.end(); ++q)
879  {
880  if ((*p)->id() == q->id)
881  {
882  break;
883  }
884  }
885  // The subscriber doesn't exist in the incoming subscriber
886  // set so destroy it.
887  if (q == records.end())
888  {
889  (*p)->destroy();
890  p = _subscribers.erase(p);
891  }
892  else
893  {
894  // Otherwise reset the reaped status if necessary.
895  (*p)->resetIfReaped();
896  ++p;
897  }
898  }
899  }
900 
901  for (SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
902  {
903  vector<SubscriberPtr>::iterator q;
904  for (q = _subscribers.begin(); q != _subscribers.end(); ++q)
905  {
906  if ((*q)->id() == p->id)
907  {
908  break;
909  }
910  }
911  if (q == _subscribers.end())
912  {
913  SubscriberPtr subscriber = Subscriber::create(_instance, *p);
914  _subscribers.push_back(subscriber);
915  }
916  }
917 }
918 
919 bool
921 {
922  IceUtil::Mutex::Lock sync(_subscribersMutex);
923  return _destroyed;
924 }
925 
928 {
929  // immutable
930  return _id;
931 }
932 
933 TopicPrx
935 {
936  // immutable
937  Ice::ObjectPrx prx;
938  if (_instance->topicReplicaProxy())
939  {
940  prx = _instance->topicReplicaProxy()->ice_identity(_id);
941  }
942  else
943  {
944  prx = _instance->topicAdapter()->createProxy(_id);
945  }
946  return TopicPrx::uncheckedCast(prx);
947 }
948 
949 namespace
950 {
951 
952  class TopicInternalReapCB : public IceUtil::Shared
953  {
954  public:
955  TopicInternalReapCB(const PersistentInstancePtr& instance, Ice::Long generation) :
956  _instance(instance), _generation(generation)
957  {
958  }
959 
960  virtual void
961  exception(const Ice::Exception& ex)
962  {
963  TraceLevelsPtr traceLevels = _instance->traceLevels();
964  if (traceLevels->topic > 0)
965  {
966  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
967  out << "exception when calling `reap' on the master replica: " << ex;
968  }
969  _instance->node()->recovery(_generation);
970  }
971 
972  private:
973  const PersistentInstancePtr _instance;
974  const Ice::Long _generation;
975  };
976 
977 } // namespace
978 
979 void
980 TopicImpl::publish(bool forwarded, const EventDataSeq& events)
981 {
982  TopicInternalPrx masterInternal;
983  Ice::Long generation = -1;
984  Ice::IdentitySeq reap;
985  {
986  // Use cached reads.
987  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
988 
989  //
990  // Copy of the subscriber list so that event publishing can occur
991  // in parallel.
992  //
993  vector<SubscriberPtr> copy;
994  {
995  IceUtil::Mutex::Lock sync(_subscribersMutex);
996  if (_observer)
997  {
998  if (forwarded)
999  {
1000  _observer->forwarded();
1001  }
1002  else
1003  {
1004  _observer->published();
1005  }
1006  }
1007  copy = _subscribers;
1008  }
1009 
1010  //
1011  // Queue each event, gathering a list of those subscribers that
1012  // must be reaped.
1013  //
1014  for (vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
1015  {
1016  if (!(*p)->queue(forwarded, events) && (*p)->reap())
1017  {
1018  reap.push_back((*p)->id());
1019  }
1020  }
1021 
1022  // If there are no subscribers in error then we're done.
1023  if (reap.empty())
1024  {
1025  return;
1026  }
1027  if (!unlock.getMaster())
1028  {
1029  IceUtil::Mutex::Lock sync(_subscribersMutex);
1030  removeSubscribers(reap);
1031  return;
1032  }
1033  masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id));
1034  generation = unlock.generation();
1035  }
1036 
1037  // Tell the master to reap this set of subscribers. This is an
1038  // AMI invocation so it shouldn't block the caller (in the
1039  // typical case) we do it outside of the mutex lock for
1040  // performance reasons.
1041  //
1042  // We must release the cached lock before calling this as the AMI
1043  // call may raise an exception in the caller (that is directly
1044  // call ice_exception) which calls recover() on the node which
1045  // would result in a deadlock since the node is locked.
1046  masterInternal->begin_reap(
1047  reap,
1048  newCallback_TopicInternal_reap(new TopicInternalReapCB(_instance, generation),
1049  &TopicInternalReapCB::exception));
1050 }
1051 
1052 void
1054 {
1055  IceUtil::Mutex::Lock sync(_subscribersMutex);
1056 
1057  TraceLevelsPtr traceLevels = _instance->traceLevels();
1058  if (traceLevels->topic > 0)
1059  {
1060  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1061  out << _name
1062  << ": add replica observer: " << _instance->communicator()->identityToString(record.id);
1063 
1064  if (traceLevels->topic > 1)
1065  {
1066  out << " endpoints: " << IceStormInternal::describeEndpoints(record.obj) << " QoS: ";
1067  for (QoS::const_iterator p = record.theQoS.begin(); p != record.theQoS.end(); ++p)
1068  {
1069  if (p != record.theQoS.begin())
1070  {
1071  out << ',';
1072  }
1073  out << '[' << p->first << "," << p->second << ']';
1074  }
1075  }
1076  out << " llu: " << llu.generation << "/" << llu.iteration;
1077  }
1078 
1079  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
1080  if (p != _subscribers.end())
1081  {
1082  // If the subscriber is already in the database display a
1083  // diagnostic.
1084  TraceLevelsPtr traceLevels = _instance->traceLevels();
1085  if (traceLevels->topic > 0)
1086  {
1087  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1088  out << _instance->communicator()->identityToString(record.id) << ": already subscribed";
1089  }
1090  return;
1091  }
1092 
1093  SubscriberPtr subscriber = Subscriber::create(_instance, record);
1094  try
1095  {
1096  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1097 
1098  SubscriberRecordKey key;
1099  key.topic = _id;
1100  key.id = subscriber->id();
1101 
1102  _subscriberMap.put(txn, key, record);
1103 
1104  // Update the LLU.
1105  _lluMap.put(txn, lluDbKey, llu);
1106 
1107  txn.commit();
1108  }
1109  catch (const IceDB::LMDBException& ex)
1110  {
1111  logError(_instance->communicator(), ex);
1112  throw; // will become UnknownException in caller
1113  }
1114 
1115  _subscribers.push_back(subscriber);
1116 }
1117 
1118 void
1119 TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids)
1120 {
1121  TraceLevelsPtr traceLevels = _instance->traceLevels();
1122  if (traceLevels->topic > 0)
1123  {
1124  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1125  out << _name << ": remove replica observer: ";
1126  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1127  {
1128  if (id != ids.begin())
1129  {
1130  out << ",";
1131  }
1132  out << _instance->communicator()->identityToString(*id);
1133  }
1134  out << " llu: " << llu.generation << "/" << llu.iteration;
1135  }
1136 
1137  IceUtil::Mutex::Lock sync(_subscribersMutex);
1138 
1139  // First remove from the database.
1140  try
1141  {
1142  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1143 
1144  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1145  {
1146  SubscriberRecordKey key;
1147  key.topic = _id;
1148  key.id = *id;
1149 
1150  _subscriberMap.del(txn, key);
1151  }
1152 
1153  _lluMap.put(txn, lluDbKey, llu);
1154 
1155  txn.commit();
1156  }
1157  catch (const IceDB::LMDBException& ex)
1158  {
1159  logError(_instance->communicator(), ex);
1160  throw; // will become UnknownException in caller
1161  }
1162 
1163  // Then remove the subscriber from the subscribers list. If the
1164  // subscriber had a local failure and was removed from the
1165  // subscriber list it could already be gone. That's not a problem.
1166  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1167  {
1168  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1169  if (p != _subscribers.end())
1170  {
1171  (*p)->destroy();
1172  _subscribers.erase(p);
1173  }
1174  }
1175 }
1176 
1177 void
1179 {
1180  IceUtil::Mutex::Lock sync(_subscribersMutex);
1181 
1182  if (_destroyed)
1183  {
1184  return;
1185  }
1186  _destroyed = true;
1187 
1188  TraceLevelsPtr traceLevels = _instance->traceLevels();
1189  if (traceLevels->topic > 0)
1190  {
1191  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1192  out << _name << ": destroyed";
1193  out << " llu: " << llu.generation << "/" << llu.iteration;
1194  }
1195  destroyInternal(llu, false);
1196 }
1197 
1200 {
1201  return _servant;
1202 }
1203 
1204 void
1206 {
1207  IceUtil::Mutex::Lock sync(_subscribersMutex);
1208  if (_instance->observer())
1209  {
1210  _observer.attach(_instance->observer()->getTopicObserver(
1211  _instance->serviceName(), _name, _observer.get()));
1212  }
1213 }
1214 
1215 void
1217 {
1218  IceUtil::Mutex::Lock sync(_subscribersMutex);
1219  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
1220  ++p)
1221  {
1222  (*p)->updateObserver();
1223  }
1224 }
1225 
1226 LogUpdate
1227 TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
1228 {
1229 
1230  // Clear out the database records related to this topic.
1231  LogUpdate llu;
1232  try
1233  {
1234  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1235 
1236  // Erase all subscriber records and the topic record.
1237  SubscriberRecordKey key;
1238  key.topic = _id;
1239 
1240  SubscriberMapRWCursor cursor(_subscriberMap, txn);
1241  if (cursor.find(key))
1242  {
1243  _subscriberMap.del(txn, key);
1244 
1247  while (cursor.get(k, v, MDB_NEXT) && k.topic == key.topic)
1248  {
1249  _subscriberMap.del(txn, k);
1250  }
1251  }
1252 
1253  // Update the LLU.
1254  if (master)
1255  {
1256  llu = getIncrementedLLU(txn, _lluMap);
1257  }
1258  else
1259  {
1260  llu = origLLU;
1261  _lluMap.put(txn, lluDbKey, llu);
1262  }
1263 
1264  txn.commit();
1265  }
1266  catch (const IceDB::LMDBException& ex)
1267  {
1268  logError(_instance->communicator(), ex);
1269  throw; // will become UnknownException in caller
1270  }
1271 
1272  _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
1273  _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
1274  _instance->topicReaper()->add(_name);
1275 
1276  // Destroy each of the subscribers.
1277  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
1278  ++p)
1279  {
1280  (*p)->destroy();
1281  }
1282  _subscribers.clear();
1283 
1284  _instance->topicAdapter()->remove(_id);
1285 
1286  _servant = 0;
1287 
1288  return llu;
1289 }
1290 
1291 void
1292 TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
1293 {
1294  // First update the database
1295 
1296  LogUpdate llu;
1297  bool found = false;
1298  try
1299  {
1300  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1301 
1302  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1303  {
1304  SubscriberRecordKey key;
1305  key.topic = _id;
1306  key.id = *id;
1307 
1308  if (_subscriberMap.del(txn, key))
1309  {
1310  found = true;
1311  }
1312  }
1313 
1314  if (found)
1315  {
1316  llu = getIncrementedLLU(txn, _lluMap);
1317  txn.commit();
1318  }
1319  else
1320  {
1321  txn.rollback();
1322  }
1323  }
1324  catch (const IceDB::LMDBException& ex)
1325  {
1326  logError(_instance->communicator(), ex);
1327  throw; // will become UnknownException in caller
1328  }
1329 
1330  if (found)
1331  {
1332  // Then remove the subscriber from the subscribers list. Its
1333  // possible that some of these subscribers have already been
1334  // removed (consider, for example, a concurrent reap call from two
1335  // replicas on the same subscriber). To avoid sending unnecessary
1336  // observer updates keep track of the observers that are actually
1337  // removed.
1338  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1339  {
1340  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1341  if (p != _subscribers.end())
1342  {
1343  (*p)->destroy();
1344  _subscribers.erase(p);
1345  }
1346  }
1347 
1348  _instance->observers()->removeSubscriber(llu, _name, ids);
1349  }
1350 }
IceStorm::SubscriberRecordSeq
::std::vector<::IceStorm::SubscriberRecord > SubscriberRecordSeq
Definition: SubscriberRecord.h:244
IceStorm::TopicImpl::destroy
void destroy()
Definition: TopicI.cpp:818
IceStorm::TopicImpl::getLinkInfoSeq
LinkInfoSeq getLinkInfoSeq() const
Definition: TopicI.cpp:782
IceStorm
Definition: DBTypes.ice:22
IceDB::ReadWriteTxn
Definition: IceDB.h:194
IceStorm::SubscriberRecordKey::topic
::Ice::Identity topic
Definition: SubscriberRecord.h:161
IceStorm::TopicImpl::getName
std::string getName() const
Definition: TopicI.cpp:459
IceStorm::TopicImpl::observerAddSubscriber
void observerAddSubscriber(const IceStormElection::LogUpdate &, const SubscriberRecord &)
Definition: TopicI.cpp:1053
TopicI.h
IceDB::Txn::commit
void commit()
IceStorm::SubscriberRecordKey
The key for persistent subscribers, or topics.
Definition: SubscriberRecord.h:159
IceStorm::EventDataSeq
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
Definition: IceStormInternal.h:528
IceStorm::TopicImpl::getServant
Ice::ObjectPtr getServant() const
Definition: TopicI.cpp:1199
IceStorm::TopicImpl::shutdown
void shutdown()
Definition: TopicI.cpp:766
IceStorm::ReapWouldBlock
Thrown if the reap call would block.
Definition: IceStormInternal.h:530
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:102
IceStorm::TopicImpl::destroyed
bool destroyed() const
Definition: TopicI.cpp:920
IceStorm::TopicImpl::updateObserver
void updateObserver()
Definition: TopicI.cpp:1205
IceStormElection::CachedReadHelper::getMaster
Ice::ObjectPrx getMaster() const
Definition: NodeI.h:154
IceStorm::Subscriber::create
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
Definition: Subscriber.cpp:522
IceStorm::TopicImpl::unsubscribe
void unsubscribe(const Ice::ObjectPrx &)
Definition: TopicI.cpp:593
IceStorm::TopicImpl::update
void update(const SubscriberRecordSeq &)
Definition: TopicI.cpp:864
IceStorm::TopicImpl::unlink
void unlink(const TopicPrx &)
Definition: TopicI.cpp:704
IceStorm::newCallback_TopicInternal_reap
Callback_TopicInternal_reapPtr newCallback_TopicInternal_reap(const IceUtil::Handle< T > &instance, void(T::*cb)(), void(T::*excb)(const ::Ice::Exception &), void(T::*sentcb)(bool)=0)
Definition: IceStormInternal.h:1298
Util.h
IceStorm::SubscriberRecord::theQoS
::IceStorm::QoS theQoS
Definition: SubscriberRecord.h:239
IceInternal::Handle<::Ice::Communicator >
IceStorm::TopicImpl::getPublisher
Ice::ObjectPrx getPublisher() const
Definition: TopicI.cpp:466
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:233
IceDB::Dbi::put
void put(const ReadWriteTxn &txn, const K &key, const D &data, unsigned int flags=0)
Definition: IceDB.h:261
IceStorm::TopicImpl::reap
void reap(const Ice::IdentitySeq &)
Definition: TopicI.cpp:743
IceStorm::TopicInternal
Internal operations for a topic.
Definition: IceStormInternal.h:878
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
IceStormElection::TopicContent::id
Ice::Identity id
The topic identity.
Definition: Election.ice:26
IceStormElection::TopicContent
The contents of topic.
Definition: Election.ice:23
IceStormElection::LogUpdate::generation
::Ice::Long generation
Definition: LLURecord.h:104
copy
Use of this software is granted under one of the following two to be chosen freely by the user Boost Software License Version Marcin Kalicinski Permission is hereby free of to any person or organization obtaining a copy of the software and accompanying documentation covered by this and transmit the and to prepare derivative works of the and to permit third parties to whom the Software is furnished to do all subject to the including the above license this restriction and the following must be included in all copies of the in whole or in and all derivative works of the unless such copies or derivative works are solely in the form of machine executable object code generated by a source language processor THE SOFTWARE IS PROVIDED AS WITHOUT WARRANTY OF ANY EXPRESS OR INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF FITNESS FOR A PARTICULAR TITLE AND NON INFRINGEMENT IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER WHETHER IN TORT OR ARISING OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE The MIT Marcin Kalicinski Permission is hereby free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to copy
Definition: license.txt:39
IceStormElection
Definition: DBTypes.ice:17
IceStormElection::LogUpdate::iteration
::Ice::Long iteration
Definition: LLURecord.h:105
IceStorm::TopicImpl::updateSubscriberObservers
void updateSubscriberObservers()
Definition: TopicI.cpp:1216
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
armarx::ProxyType::topic
@ topic
IceStorm::lluDbKey
const std::string lluDbKey
Definition: Util.h:36
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
IceDB::ReadWriteCursor
Definition: IceDB.h:435
IceStorm::TopicImpl::proxy
TopicPrx proxy() const
Definition: TopicI.cpp:934
IceStorm::EventData
The event data.
Definition: IceStormInternal.h:494
IceStorm::SubscriberRecord::obj
::Ice::ObjectPrx obj
Definition: SubscriberRecord.h:238
Subscriber.h
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
IceStorm::TopicImpl::publish
void publish(bool, const EventDataSeq &)
Definition: TopicI.cpp:980
IceStorm::TopicImpl::observerRemoveSubscriber
void observerRemoveSubscriber(const IceStormElection::LogUpdate &, const Ice::IdentitySeq &)
Definition: TopicI.cpp:1119
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStorm::TopicImpl::getLinkProxy
TopicLinkPrx getLinkProxy()
Definition: TopicI.cpp:627
IceStormElection::CachedReadHelper::generation
Ice::Long generation() const
Definition: NodeI.h:160
IceStormInternal::getIncrementedLLU
IceStormElection::LogUpdate getIncrementedLLU(const IceDB::ReadWriteTxn &, IceStorm::LLUMap &)
Definition: Util.cpp:96
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::cost
::Ice::Int cost
Definition: SubscriberRecord.h:240
IceStorm::SubscriberRecord::theTopic
::IceStorm::TopicPrx theTopic
Definition: SubscriberRecord.h:241
TraceLevels.h
IceStormInternal::identityToTopicName
std::string identityToTopicName(const Ice::Identity &)
Definition: Util.cpp:23
std
Definition: Application.h:66
IceStormElection::CachedReadHelper
Definition: NodeI.h:134
IceStormInternal
Definition: Instance.cpp:26
IceUtil::Handle
Definition: forward_declarations.h:30
IceStorm::SubscriberRecordKey::id
::Ice::Identity id
Definition: SubscriberRecord.h:162
IceStorm::TopicImpl::id
Ice::Identity id() const
Definition: TopicI.cpp:927
IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic >
Observers.h
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:917
IceStorm::TopicImpl::observerDestroyTopic
void observerDestroyTopic(const IceStormElection::LogUpdate &)
Definition: TopicI.cpp:1178
IceStorm::SubscriberRecord::id
::Ice::Identity id
Definition: SubscriberRecord.h:236
IceDB::Dbi::del
bool del(const ReadWriteTxn &txn, const K &key, const D &data)
Definition: IceDB.h:295
IceStorm::TopicImpl::subscribeAndGetPublisher
Ice::ObjectPrx subscribeAndGetPublisher(const QoS &, const Ice::ObjectPrx &)
Definition: TopicI.cpp:510
armarx::aron::type::ObjectPtr
std::shared_ptr< Object > ObjectPtr
Definition: Object.h:36
IceStorm::TopicImpl::link
void link(const TopicPrx &, Ice::Int)
Definition: TopicI.cpp:639
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:237
IceStorm::TopicImpl::getNonReplicatedPublisher
Ice::ObjectPrx getNonReplicatedPublisher() const
Definition: TopicI.cpp:477
NodeI.h
IceStorm::TopicImpl::getSubscribers
Ice::IdentitySeq getSubscribers() const
Definition: TopicI.cpp:804
Instance.h
IceDB::LMDBException
Definition: IceDB.h:51
IceStormElection::TopicContent::records
IceStorm::SubscriberRecordSeq records
The topic subscribers.
Definition: Election.ice:28
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
IceInternal::trace
void trace(const char *, const ::Ice::OutputStream &, const ::Ice::LoggerPtr &, const TraceLevelsPtr &)
IceStorm::SubscriberRecord::topicName
::std::string topicName
Definition: SubscriberRecord.h:235
IceStorm::TopicImpl::getContent
IceStormElection::TopicContent getContent() const
Definition: TopicI.cpp:843
IceStormElection::FinishUpdateHelper
Definition: NodeI.h:115