TopicManagerI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <IceStorm/TopicManagerI.h>
11 #include <IceStorm/TopicI.h>
12 #include <IceStorm/TraceLevels.h>
13 #include <IceStorm/Instance.h>
14 #include <IceStorm/NodeI.h>
15 #include <IceStorm/Observers.h>
16 #include <IceStorm/Subscriber.h>
17 #include <IceStorm/Util.h>
18 #include <Ice/SliceChecksums.h>
19 
20 #include <functional>
21 
22 using namespace std;
23 using namespace IceStorm;
24 using namespace IceStormElection;
25 using namespace IceStormInternal;
26 
27 namespace
28 {
29 
30  void
31  logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
32  {
33  Ice::Error error(com->getLogger());
34  error << "LMDB error: " << ex;
35  }
36 
37  class TopicManagerI : public TopicManagerInternal
38  {
39  public:
40 
41  TopicManagerI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
42  _instance(instance), _impl(impl)
43  {
44  }
45 
46  virtual TopicPrx create(const string& id, const Ice::Current&)
47  {
48  while (true)
49  {
50  Ice::Long generation;
51  TopicManagerPrx master = getMaster(generation, __FILE__, __LINE__);
52  if (master)
53  {
54  try
55  {
56  return master->create(id);
57  }
58  catch (const Ice::ConnectFailedException&)
59  {
60  _instance->node()->recovery(generation);
61  continue;
62  }
63  catch (const Ice::TimeoutException&)
64  {
65  _instance->node()->recovery(generation);
66  continue;
67  }
68  }
69  else
70  {
71  FinishUpdateHelper unlock(_instance->node());
72  return _impl->create(id);
73  }
74  }
75  }
76 
77  virtual TopicPrx retrieve(const string& id, const Ice::Current&) const
78  {
79  // Use cached reads.
80  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
81  return _impl->retrieve(id);
82  }
83 
84  virtual TopicDict retrieveAll(const Ice::Current&) const
85  {
86  // Use cached reads.
87  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
88  return _impl->retrieveAll();
89  }
90 
91  virtual Ice::SliceChecksumDict getSliceChecksums(const Ice::Current&) const
92  {
93  // This doesn't require the replication to be running.
94  return Ice::sliceChecksums();
95  }
96 
97  virtual NodePrx getReplicaNode(const Ice::Current&) const
98  {
99  // This doesn't require the replication to be running.
100  return _instance->nodeProxy();
101  }
102 
103  private:
104 
105  TopicManagerPrx getMaster(Ice::Long& generation, const char* file, int line) const
106  {
107  NodeIPtr node = _instance->node();
108  if (node)
109  {
110  return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line));
111  }
112  else
113  {
114  return TopicManagerPrx();
115  }
116  }
117 
118  const PersistentInstancePtr _instance;
119  const TopicManagerImplPtr _impl;
120  };
121 
122  class ReplicaObserverI : public ReplicaObserver
123  {
124  public:
125 
126  ReplicaObserverI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
127  _instance(instance),
128  _impl(impl)
129  {
130  }
131 
132  virtual void init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&)
133  {
134  NodeIPtr node = _instance->node();
135  if (node)
136  {
137  node->checkObserverInit(llu.generation);
138  }
139  _impl->observerInit(llu, content);
140  }
141 
142  virtual void createTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
143  {
144  try
145  {
146  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
147  _impl->observerCreateTopic(llu, name);
148  }
149  catch (const ObserverInconsistencyException& e)
150  {
151  Ice::Warning warn(_instance->traceLevels()->logger);
152  warn << "ReplicaObserverI::create: ObserverInconsistencyException: " << e.reason;
153  _instance->node()->recovery(llu.generation);
154  throw;
155  }
156  }
157 
158  virtual void destroyTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
159  {
160  try
161  {
162  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
163  _impl->observerDestroyTopic(llu, name);
164  }
165  catch (const ObserverInconsistencyException& e)
166  {
167  Ice::Warning warn(_instance->traceLevels()->logger);
168  warn << "ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.reason;
169  _instance->node()->recovery(llu.generation);
170  throw;
171  }
172  }
173 
174  virtual void addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec,
175  const Ice::Current&)
176  {
177  try
178  {
179  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
180  _impl->observerAddSubscriber(llu, name, rec);
181  }
182  catch (const ObserverInconsistencyException& e)
183  {
184  Ice::Warning warn(_instance->traceLevels()->logger);
185  warn << "ReplicaObserverI::add: ObserverInconsistencyException: " << e.reason;
186  _instance->node()->recovery(llu.generation);
187  throw;
188  }
189  }
190 
191  virtual void removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id,
192  const Ice::Current&)
193  {
194  try
195  {
196  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
197  _impl->observerRemoveSubscriber(llu, name, id);
198  }
199  catch (const ObserverInconsistencyException& e)
200  {
201  Ice::Warning warn(_instance->traceLevels()->logger);
202  warn << "ReplicaObserverI::remove: ObserverInconsistencyException: " << e.reason;
203  _instance->node()->recovery(llu.generation);
204  throw;
205  }
206  }
207 
208  private:
209 
210  const PersistentInstancePtr _instance;
211  const TopicManagerImplPtr _impl;
212  };
213 
214  class TopicManagerSyncI : public TopicManagerSync
215  {
216  public:
217 
218  TopicManagerSyncI(const TopicManagerImplPtr& impl) :
219  _impl(impl)
220  {
221  }
222 
223  virtual void getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&)
224  {
225  _impl->getContent(llu, content);
226  }
227 
228  private:
229 
230  const TopicManagerImplPtr _impl;
231  };
232 
233 }
234 
235 TopicManagerImpl::TopicManagerImpl(const PersistentInstancePtr& instance) :
236  _instance(instance),
237  _lluMap(instance->lluMap()),
238  _subscriberMap(instance->subscriberMap())
239 {
240  try
241  {
242  __setNoDelete(true);
243 
244  if (_instance->observer())
245  {
246  _instance->observer()->setObserverUpdater(this);
247  }
248 
249  // TODO: If we want to improve the performance of the
250  // non-replicated case we could allocate a null-topic manager impl
251  // here.
252  _managerImpl = new TopicManagerI(instance, this);
253 
254  // If there is no node adapter we don't need to start the
255  // observer, nor sync since we're not replicating.
256  if (_instance->nodeAdapter())
257  {
258  _observerImpl = new ReplicaObserverI(instance, this);
259  _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl);
260  _syncImpl = new TopicManagerSyncI(this);
261  _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
262  }
263 
264  {
265  IceDB::ReadWriteTxn txn(_instance->dbEnv());
266 
267  // Ensure that the llu counter is present in the log.
268  LogUpdate empty = {0, 0};
269  _instance->lluMap().put(txn, lluDbKey, empty);
270 
271  // Recreate each of the topics.
274 
275  SubscriberMapRWCursor cursor(_subscriberMap, txn);
276  if (cursor.get(k, v, MDB_FIRST))
277  {
278  bool moreTopics = false;
279  do
280  {
281  // This record has to be a place holder record, otherwise
282  // there is a database bug.
283  assert(k.id.name.empty() && k.id.category.empty());
284 
285  Ice::Identity topic = k.topic;
286 
287  bool moreTopics;
288  SubscriberRecordSeq content;
289  while ((moreTopics = cursor.get(k, v, MDB_NEXT)) && k.topic == topic)
290  {
291  content.push_back(v);
292  }
293 
294  string name = identityToTopicName(topic);
295  installTopic(name, topic, false, content);
296  }
297  while (moreTopics);
298  }
299 
300  txn.commit();
301  }
302  }
303  catch (...)
304  {
305  shutdown();
306  __setNoDelete(false);
307  throw;
308  }
309  __setNoDelete(false);
310 }
311 
312 TopicPrx
313 TopicManagerImpl::create(const string& name)
314 {
315  Lock sync(*this);
316 
317  reap();
318  if (_topics.find(name) != _topics.end())
319  {
320  TopicExists ex;
321  ex.name = name;
322  throw ex;
323  }
324 
325  // Identity is <instanceName>/topic.<topicname>
326  Ice::Identity id = nameToIdentity(_instance, name);
327 
328  LogUpdate llu;
329  try
330  {
331  IceDB::ReadWriteTxn txn(_instance->dbEnv());
332 
334  key.topic = id;
335  SubscriberRecord rec;
336  rec.link = false;
337  rec.cost = 0;
338 
339  _subscriberMap.put(txn, key, rec);
340 
341  llu = getIncrementedLLU(txn, _lluMap);
342 
343  txn.commit();
344  }
345  catch (const IceDB::LMDBException& ex)
346  {
347  logError(_instance->communicator(), ex);
348  throw; // will become UnknownException in caller
349  }
350 
351  _instance->observers()->createTopic(llu, name);
352  return installTopic(name, id, true);
353 }
354 
355 TopicPrx
356 TopicManagerImpl::retrieve(const string& name) const
357 {
358  Lock sync(*this);
359 
360  TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
361  This->reap();
362 
363  map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
364  if (p == _topics.end())
365  {
366  NoSuchTopic ex;
367  ex.name = name;
368  throw ex;
369  }
370 
371  return p->second->proxy();
372 }
373 
374 TopicDict
376 {
377  Lock sync(*this);
378 
379  TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
380  This->reap();
381 
382  TopicDict all;
383  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
384  {
385  all.insert(TopicDict::value_type(p->first, p->second->proxy()));
386  }
387 
388  return all;
389 }
390 
391 void
393 {
394  Lock sync(*this);
395 
396  TraceLevelsPtr traceLevels = _instance->traceLevels();
397  if (traceLevels->topicMgr > 0)
398  {
399  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
400  out << "init";
401  for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
402  {
403  out << " topic: " << _instance->communicator()->identityToString(p->id) << " subscribers: ";
404  for (SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
405  {
406  if (q != p->records.begin())
407  {
408  out << ",";
409  }
410  out << _instance->communicator()->identityToString(q->id);
411  if (traceLevels->topicMgr > 1)
412  {
413  out << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
414  }
415  }
416  }
417  }
418 
419  // First we update the database state, and then we update our
420  // internal state.
421  try
422  {
423  IceDB::ReadWriteTxn txn(_instance->dbEnv());
424 
425  _lluMap.put(txn, lluDbKey, llu);
426 
427  _subscriberMap.clear(txn);
428 
429  for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
430  {
432  key.topic = p->id;
433  SubscriberRecord rec;
434  rec.link = false;
435  rec.cost = 0;
436 
437  _subscriberMap.put(txn, key, rec);
438 
439  for (SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end(); ++q)
440  {
442  key.topic = p->id;
443  key.id = q->id;
444 
445  _subscriberMap.put(txn, key, *q);
446  }
447  }
448  txn.commit();
449  }
450  catch (const IceDB::LMDBException& ex)
451  {
452  logError(_instance->communicator(), ex);
453  throw; // will become UnknownException in caller
454  }
455 
456  // We do this with two scans. The first runs through the topics
457  // that we have and removes those not in the init list. The second
458  // runs through the init list and either adds the ones that don't
459  // exist, or updates those that do.
460 
461  map<string, TopicImplPtr>::iterator p = _topics.begin();
462  while (p != _topics.end())
463  {
464  TopicContentSeq::const_iterator q;
465  for (q = content.begin(); q != content.end(); ++q)
466  {
467  if (q->id == p->second->id())
468  {
469  break;
470  }
471  }
472 
473  if (q == content.end())
474  {
475  // Note that this destroy should not remove anything from
476  // the database since we've already synced up the db
477  // state.
478  //
479  // TODO: We could short circuit the database operations in
480  // the topic by calling a third form of destroy.
481  p->second->observerDestroyTopic(llu);
482  _topics.erase(p++);
483  }
484  else
485  {
486  ++p;
487  }
488  }
489 
490  // Now run through the contents updating the topics that do exist,
491  // and creating those that do not.
492  for (TopicContentSeq::const_iterator q = content.begin(); q != content.end(); ++q)
493  {
494  string name = identityToTopicName(q->id);
495  map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
496  if (p == _topics.end())
497  {
498  installTopic(name, q->id, true, q->records);
499  }
500  else
501  {
502  p->second->update(q->records);
503  }
504  }
505  // Clear the set of observers.
506  _instance->observers()->clear();
507 }
508 
509 void
510 TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
511 {
512  Lock sync(*this);
513  Ice::Identity id = nameToIdentity(_instance, name);
514 
515  try
516  {
517  IceDB::ReadWriteTxn txn(_instance->dbEnv());
518 
520  key.topic = id;
521  SubscriberRecord rec;
522  rec.link = false;
523  rec.cost = 0;
524 
525  if (_subscriberMap.find(txn, key))
526  {
527  throw ObserverInconsistencyException("topic exists: " + name);
528  }
529  _subscriberMap.put(txn, key, rec);
530 
531  _lluMap.put(txn, lluDbKey, llu);
532 
533  txn.commit();
534  }
535  catch (const IceDB::LMDBException& ex)
536  {
537  logError(_instance->communicator(), ex);
538  throw; // will become UnknownException in caller
539  }
540 
541  installTopic(name, id, true);
542 }
543 
544 void
545 TopicManagerImpl::observerDestroyTopic(const LogUpdate& llu, const string& name)
546 {
547  Lock sync(*this);
548 
549  map<string, TopicImplPtr>::iterator q = _topics.find(name);
550  if (q == _topics.end())
551  {
552  throw ObserverInconsistencyException("no topic: " + name);
553  }
554  q->second->observerDestroyTopic(llu);
555 
556  _topics.erase(q);
557 }
558 
559 void
560 TopicManagerImpl::observerAddSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& record)
561 {
562  TopicImplPtr topic;
563  {
564  Lock sync(*this);
565 
566  map<string, TopicImplPtr>::iterator q = _topics.find(name);
567  if (q == _topics.end())
568  {
569  throw ObserverInconsistencyException("no topic: " + name);
570  }
571  assert(q != _topics.end());
572  topic = q->second;
573  }
574  topic->observerAddSubscriber(llu, record);
575 }
576 
577 void
578 TopicManagerImpl::observerRemoveSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id)
579 {
580  TopicImplPtr topic;
581  {
582  Lock sync(*this);
583 
584  map<string, TopicImplPtr>::iterator q = _topics.find(name);
585  if (q == _topics.end())
586  {
587  throw ObserverInconsistencyException("no topic: " + name);
588  }
589  assert(q != _topics.end());
590  topic = q->second;
591  }
592  topic->observerRemoveSubscriber(llu, id);
593 }
594 
595 void
597 {
598  {
599  Lock sync(*this);
600  reap();
601  }
602 
603  try
604  {
605  content.clear();
606  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
607  {
608  TopicContent rec = p->second->getContent();
609  content.push_back(rec);
610  }
611 
612  IceDB::ReadOnlyTxn txn(_instance->dbEnv());
613  _lluMap.get(txn, lluDbKey, llu);
614  }
615  catch (const IceDB::LMDBException& ex)
616  {
617  logError(_instance->communicator(), ex);
618  throw; // will become UnknownException in caller
619  }
620 }
621 
622 LogUpdate
624 {
625  LogUpdate llu;
626  try
627  {
628  IceDB::ReadOnlyTxn txn(_instance->dbEnv());
629  _lluMap.get(txn, lluDbKey, llu);
630  }
631  catch (const IceDB::LMDBException& ex)
632  {
633  logError(_instance->communicator(), ex);
634  throw; // will become UnknownException in caller
635  }
636 
637  return llu;
638 }
639 
640 void
641 TopicManagerImpl::sync(const Ice::ObjectPrx& master)
642 {
643  TopicManagerSyncPrx sync = TopicManagerSyncPrx::uncheckedCast(master);
644 
645  LogUpdate llu;
646  TopicContentSeq content;
647  sync->getContent(llu, content);
648 
649  observerInit(llu, content);
650 }
651 
652 void
653 TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& llu)
654 {
655  Lock sync(*this);
656 
657  reap();
658 
659  TopicContentSeq content;
660 
661  // Update the database llu. This prevents the following case:
662  //
663  // Three replicas 1, 2, 3. 3 is the master. It accepts a change
664  // (say A=10, old value 9), writes to disk and then crashes. Now 2
665  // becomes the master. The client can ask this master for A and it
666  // returns 9. Now 3 comes back online, it has the last database
667  // state, so it syncs this state with 1, 2. The client will now
668  // magically get A==10. The solution here is when a new master is
669  // elected and gets the latest database state it immediately
670  // updates the llu stamp.
671  //
672  try
673  {
674  content.clear();
675 
676  IceDB::ReadWriteTxn txn(_instance->dbEnv());
677 
678  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
679  {
680  TopicContent rec = p->second->getContent();
681  content.push_back(rec);
682  }
683 
684  _lluMap.put(txn, lluDbKey, llu);
685 
686  txn.commit();
687  }
688  catch (const IceDB::LMDBException& ex)
689  {
690  logError(_instance->communicator(), ex);
691  throw; // will become UnknownException in caller
692  }
693 
694  // Now initialize the observers.
695  _instance->observers()->init(slaves, llu, content);
696 }
697 
698 Ice::ObjectPrx
700 {
701  return _observer;
702 }
703 
704 Ice::ObjectPrx
706 {
707  return _sync;
708 }
709 
710 void
712 {
713  //
714  // Always called with mutex locked.
715  //
716  // Lock sync(*this);
717  //
718  vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
719  for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
720  {
721  map<string, TopicImplPtr>::iterator q = _topics.find(*p);
722  if (q != _topics.end() && q->second->destroyed())
723  {
724  _topics.erase(q);
725  }
726  }
727 }
728 
729 void
731 {
732  Lock sync(*this);
733 
734  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
735  {
736  p->second->shutdown();
737  }
738  _topics.clear();
739 
740  _observerImpl = 0;
741  _syncImpl = 0;
742  _managerImpl = 0;
743 }
744 
747 {
748  return _managerImpl;
749 }
750 
751 void
752 TopicManagerImpl::updateTopicObservers()
753 {
754  Lock sync(*this);
755  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
756  {
757  p->second->updateObserver();
758  }
759 }
760 
761 void
762 TopicManagerImpl::updateSubscriberObservers()
763 {
764  Lock sync(*this);
765  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
766  {
767  p->second->updateSubscriberObservers();
768  }
769 }
770 
771 TopicPrx
772 TopicManagerImpl::installTopic(const string& name, const Ice::Identity& id, bool create,
773  const IceStorm::SubscriberRecordSeq& subscribers)
774 {
775  //
776  // Called by constructor or with 'this' mutex locked.
777  //
778  TraceLevelsPtr traceLevels = _instance->traceLevels();
779  if (traceLevels->topicMgr > 0)
780  {
781  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
782  if (create)
783  {
784  out << "creating new topic \"" << name << "\". id: "
785  << _instance->communicator()->identityToString(id)
786  << " subscribers: ";
787  for (SubscriberRecordSeq::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q)
788  {
789  if (q != subscribers.begin())
790  {
791  out << ",";
792  }
793  if (traceLevels->topicMgr > 1)
794  {
795  out << _instance->communicator()->identityToString(q->id)
796  << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
797  }
798  }
799  }
800  else
801  {
802  out << "loading topic \"" << name << "\" from database. id: "
803  << _instance->communicator()->identityToString(id)
804  << " subscribers: ";
805  for (SubscriberRecordSeq::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q)
806  {
807  if (q != subscribers.begin())
808  {
809  out << ",";
810  }
811  if (traceLevels->topicMgr > 1)
812  {
813  out << _instance->communicator()->identityToString(q->id)
814  << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
815  }
816  }
817  }
818  }
819 
820  // Create topic implementation
821  TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers);
822 
823  // The identity is the name of the Topic.
824  _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));
825  _instance->topicAdapter()->add(topicImpl->getServant(), id);
826  return topicImpl->proxy();
827 }
IceStorm::SubscriberRecordSeq
::std::vector< ::IceStorm::SubscriberRecord > SubscriberRecordSeq
Definition: SubscriberRecord.h:225
IceStorm
Definition: DBTypes.ice:22
IceStorm::TopicManagerImpl::reap
void reap()
Definition: TopicManagerI.cpp:711
IceStormElection::TopicManagerSync
Interface used to sync topics.
Definition: Election.ice:125
IceDB::ReadWriteTxn
Definition: IceDB.h:206
IceStorm::SubscriberRecordKey::topic
::Ice::Identity topic
Definition: SubscriberRecord.h:151
IceStorm::TopicManagerImpl::shutdown
void shutdown()
Definition: TopicManagerI.cpp:730
TopicI.h
IceDB::Txn::commit
void commit()
IceStorm::SubscriberRecordKey
The key for persistent subscribers, or topics.
Definition: SubscriberRecord.h:149
IceStorm::TopicManagerInternal
Internal operations for a topic manager.
Definition: IceStormInternal.h:773
IceStorm::TopicManagerImpl::getServant
Ice::ObjectPtr getServant() const
Definition: TopicManagerI.cpp:746
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:100
IceDB::ReadOnlyTxn
Definition: IceDB.h:194
IceStorm::TopicManagerImpl::getSync
virtual Ice::ObjectPrx getSync() const
Definition: TopicManagerI.cpp:705
IceStormElection::ReplicaObserver
The replica observer.
Definition: Election.ice:42
IceDB::Dbi::find
bool find(const Txn &txn, const K &key) const
Definition: IceDB.h:291
TopicManagerI.h
IceStorm::TopicManagerImpl::getContent
void getContent(IceStormElection::LogUpdate &, IceStormElection::TopicContentSeq &)
Definition: TopicManagerI.cpp:596
IceStorm::TopicManagerImpl::initMaster
virtual void initMaster(const std::set< IceStormElection::GroupNodeInfo > &, const IceStormElection::LogUpdate &)
Definition: TopicManagerI.cpp:653
Util.h
cxxopts::empty
bool empty(const std::string &s)
Definition: cxxopts.hpp:255
IceStormElection::ObserverInconsistencyException
Thrown if an observer detects an inconsistency.
Definition: Election.ice:35
IceInternal::Handle< ::Ice::Communicator >
IceDB::Cursor::get
bool get(K &key, D &data, MDB_cursor_op op)
Definition: IceDB.h:400
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:214
IceStormElection::ObserverInconsistencyException::reason
string reason
The reason for the inconsistency.
Definition: Election.ice:38
IceStormElection::TopicContentSeq
::std::vector< ::IceStormElection::TopicContent > TopicContentSeq
A sequence of topic content.
Definition: Election.h:819
IceDB::Dbi::put
void put(const ReadWriteTxn &txn, const K &key, const D &data, unsigned int flags=0)
Definition: IceDB.h:273
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:523
IceStormElection::TopicContent
The contents of topic.
Definition: Election.ice:23
armarx::armem::client::query_fns::all
auto all()
Definition: query_fns.h:10
IceStormElection::LogUpdate::generation
::Ice::Long generation
Definition: LLURecord.h:102
IceStormElection
Definition: DBTypes.ice:17
IceStorm::TopicManagerPrx
::IceInternal::ProxyHandle< ::IceProxy::IceStorm::TopicManager > TopicManagerPrx
Definition: IceManager.h:69
IceStorm::TopicManagerImpl::sync
virtual void sync(const Ice::ObjectPrx &)
Definition: TopicManagerI.cpp:641
IceStorm::TopicImpl
Definition: TopicI.h:30
IceStorm::lluDbKey
const std::string lluDbKey
Definition: Util.h:31
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:917
IceStorm::TopicManagerImpl::observerInit
void observerInit(const IceStormElection::LogUpdate &, const IceStormElection::TopicContentSeq &)
Definition: TopicManagerI.cpp:392
IceDB::ReadWriteCursor
Definition: IceDB.h:448
IceStorm::TopicManagerImpl::observerAddSubscriber
void observerAddSubscriber(const IceStormElection::LogUpdate &, const std::string &, const IceStorm::SubscriberRecord &)
Definition: TopicManagerI.cpp:560
Subscriber.h
IceStorm::TopicManagerImpl::getObserver
virtual Ice::ObjectPrx getObserver() const
Definition: TopicManagerI.cpp:699
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
IceStorm::TopicManagerImpl
Definition: TopicManagerI.h:37
IceStorm::TopicManagerImpl::observerDestroyTopic
void observerDestroyTopic(const IceStormElection::LogUpdate &, const std::string &)
Definition: TopicManagerI.cpp:545
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStormInternal::nameToIdentity
Ice::Identity nameToIdentity(const IceStorm::InstancePtr &, const std::string &)
IceDB::Dbi::get
bool get(const Txn &txn, const K &key, D &data) const
Definition: IceDB.h:256
IceStormInternal::getIncrementedLLU
IceStormElection::LogUpdate getIncrementedLLU(const IceDB::ReadWriteTxn &, IceStorm::LLUMap &)
Definition: Util.cpp:94
IceStorm::TopicManagerImpl::retrieve
TopicPrx retrieve(const std::string &) const
Definition: TopicManagerI.cpp:356
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::cost
::Ice::Int cost
Definition: SubscriberRecord.h:221
TraceLevels.h
IceStormInternal::identityToTopicName
std::string identityToTopicName(const Ice::Identity &)
Definition: Util.cpp:23
std
Definition: Application.h:66
IceStormElection::CachedReadHelper
Definition: NodeI.h:127
IceStormInternal
Definition: Instance.cpp:27
IceUtil::Handle
Definition: forward_declarations.h:29
IceStorm::SubscriberRecordKey::id
::Ice::Identity id
Definition: SubscriberRecord.h:152
IceStorm::TopicManagerImpl::observerCreateTopic
void observerCreateTopic(const IceStormElection::LogUpdate &, const std::string &)
Definition: TopicManagerI.cpp:510
IceInternal::ProxyHandle< ::IceProxy::IceStorm::Topic >
Observers.h
armarx::aron::type::ObjectPtr
std::shared_ptr< Object > ObjectPtr
Definition: Object.h:36
IceStorm::TopicManagerImpl::observerRemoveSubscriber
void observerRemoveSubscriber(const IceStormElection::LogUpdate &, const std::string &, const Ice::IdentitySeq &)
Definition: TopicManagerI.cpp:578
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:218
IceStormElection::ObserverUpdateHelper
Definition: NodeI.h:173
IceStorm::TopicManagerImpl::retrieveAll
TopicDict retrieveAll() const
Definition: TopicManagerI.cpp:375
IceStorm::TopicManagerImpl::create
TopicPrx create(const std::string &)
Definition: TopicManagerI.cpp:313
NodeI.h
Instance.h
IceDB::LMDBException
Definition: IceDB.h:51
IceStorm::TopicManagerImpl::getLastLogUpdate
virtual IceStormElection::LogUpdate getLastLogUpdate() const
Definition: TopicManagerI.cpp:623
IceDB::DbiBase::clear
void clear(const ReadWriteTxn &)
IceStormElection::FinishUpdateHelper
Definition: NodeI.h:105