TopicManagerI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <functional>
11 
12 #include <Ice/SliceChecksums.h>
13 #include <IceStorm/Instance.h>
14 #include <IceStorm/NodeI.h>
15 #include <IceStorm/Observers.h>
16 #include <IceStorm/Subscriber.h>
17 #include <IceStorm/TopicI.h>
18 #include <IceStorm/TopicManagerI.h>
19 #include <IceStorm/TraceLevels.h>
20 #include <IceStorm/Util.h>
21 
22 using namespace std;
23 using namespace IceStorm;
24 using namespace IceStormElection;
25 using namespace IceStormInternal;
26 
27 namespace
28 {
29 
30  void
31  logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
32  {
33  Ice::Error error(com->getLogger());
34  error << "LMDB error: " << ex;
35  }
36 
37  class TopicManagerI : public TopicManagerInternal
38  {
39  public:
40  TopicManagerI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
41  _instance(instance), _impl(impl)
42  {
43  }
44 
45  virtual TopicPrx
46  create(const string& id, const Ice::Current&)
47  {
48  while (true)
49  {
50  Ice::Long generation;
51  TopicManagerPrx master = getMaster(generation, __FILE__, __LINE__);
52  if (master)
53  {
54  try
55  {
56  return master->create(id);
57  }
58  catch (const Ice::ConnectFailedException&)
59  {
60  _instance->node()->recovery(generation);
61  continue;
62  }
63  catch (const Ice::TimeoutException&)
64  {
65  _instance->node()->recovery(generation);
66  continue;
67  }
68  }
69  else
70  {
71  FinishUpdateHelper unlock(_instance->node());
72  return _impl->create(id);
73  }
74  }
75  }
76 
77  virtual TopicPrx
78  retrieve(const string& id, const Ice::Current&) const
79  {
80  // Use cached reads.
81  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
82  return _impl->retrieve(id);
83  }
84 
85  virtual TopicDict
86  retrieveAll(const Ice::Current&) const
87  {
88  // Use cached reads.
89  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
90  return _impl->retrieveAll();
91  }
92 
93  virtual Ice::SliceChecksumDict
94  getSliceChecksums(const Ice::Current&) const
95  {
96  // This doesn't require the replication to be running.
97  return Ice::sliceChecksums();
98  }
99 
100  virtual NodePrx
101  getReplicaNode(const Ice::Current&) const
102  {
103  // This doesn't require the replication to be running.
104  return _instance->nodeProxy();
105  }
106 
107  private:
109  getMaster(Ice::Long& generation, const char* file, int line) const
110  {
111  NodeIPtr node = _instance->node();
112  if (node)
113  {
114  return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line));
115  }
116  else
117  {
118  return TopicManagerPrx();
119  }
120  }
121 
122  const PersistentInstancePtr _instance;
123  const TopicManagerImplPtr _impl;
124  };
125 
126  class ReplicaObserverI : public ReplicaObserver
127  {
128  public:
129  ReplicaObserverI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
130  _instance(instance), _impl(impl)
131  {
132  }
133 
134  virtual void
135  init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&)
136  {
137  NodeIPtr node = _instance->node();
138  if (node)
139  {
140  node->checkObserverInit(llu.generation);
141  }
142  _impl->observerInit(llu, content);
143  }
144 
145  virtual void
146  createTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
147  {
148  try
149  {
150  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
151  _impl->observerCreateTopic(llu, name);
152  }
153  catch (const ObserverInconsistencyException& e)
154  {
155  Ice::Warning warn(_instance->traceLevels()->logger);
156  warn << "ReplicaObserverI::create: ObserverInconsistencyException: " << e.reason;
157  _instance->node()->recovery(llu.generation);
158  throw;
159  }
160  }
161 
162  virtual void
163  destroyTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
164  {
165  try
166  {
167  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
168  _impl->observerDestroyTopic(llu, name);
169  }
170  catch (const ObserverInconsistencyException& e)
171  {
172  Ice::Warning warn(_instance->traceLevels()->logger);
173  warn << "ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.reason;
174  _instance->node()->recovery(llu.generation);
175  throw;
176  }
177  }
178 
179  virtual void
180  addSubscriber(const LogUpdate& llu,
181  const string& name,
182  const SubscriberRecord& rec,
183  const Ice::Current&)
184  {
185  try
186  {
187  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
188  _impl->observerAddSubscriber(llu, name, rec);
189  }
190  catch (const ObserverInconsistencyException& e)
191  {
192  Ice::Warning warn(_instance->traceLevels()->logger);
193  warn << "ReplicaObserverI::add: ObserverInconsistencyException: " << e.reason;
194  _instance->node()->recovery(llu.generation);
195  throw;
196  }
197  }
198 
199  virtual void
200  removeSubscriber(const LogUpdate& llu,
201  const string& name,
202  const Ice::IdentitySeq& id,
203  const Ice::Current&)
204  {
205  try
206  {
207  ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
208  _impl->observerRemoveSubscriber(llu, name, id);
209  }
210  catch (const ObserverInconsistencyException& e)
211  {
212  Ice::Warning warn(_instance->traceLevels()->logger);
213  warn << "ReplicaObserverI::remove: ObserverInconsistencyException: " << e.reason;
214  _instance->node()->recovery(llu.generation);
215  throw;
216  }
217  }
218 
219  private:
220  const PersistentInstancePtr _instance;
221  const TopicManagerImplPtr _impl;
222  };
223 
224  class TopicManagerSyncI : public TopicManagerSync
225  {
226  public:
227  TopicManagerSyncI(const TopicManagerImplPtr& impl) : _impl(impl)
228  {
229  }
230 
231  virtual void
232  getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&)
233  {
234  _impl->getContent(llu, content);
235  }
236 
237  private:
238  const TopicManagerImplPtr _impl;
239  };
240 
241 } // namespace
242 
243 TopicManagerImpl::TopicManagerImpl(const PersistentInstancePtr& instance) :
244  _instance(instance), _lluMap(instance->lluMap()), _subscriberMap(instance->subscriberMap())
245 {
246  try
247  {
248  __setNoDelete(true);
249 
250  if (_instance->observer())
251  {
252  _instance->observer()->setObserverUpdater(this);
253  }
254 
255  // TODO: If we want to improve the performance of the
256  // non-replicated case we could allocate a null-topic manager impl
257  // here.
258  _managerImpl = new TopicManagerI(instance, this);
259 
260  // If there is no node adapter we don't need to start the
261  // observer, nor sync since we're not replicating.
262  if (_instance->nodeAdapter())
263  {
264  _observerImpl = new ReplicaObserverI(instance, this);
265  _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl);
266  _syncImpl = new TopicManagerSyncI(this);
267  _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
268  }
269 
270  {
271  IceDB::ReadWriteTxn txn(_instance->dbEnv());
272 
273  // Ensure that the llu counter is present in the log.
274  LogUpdate empty = {0, 0};
275  _instance->lluMap().put(txn, lluDbKey, empty);
276 
277  // Recreate each of the topics.
280 
281  SubscriberMapRWCursor cursor(_subscriberMap, txn);
282  if (cursor.get(k, v, MDB_FIRST))
283  {
284  bool moreTopics = false;
285  do
286  {
287  // This record has to be a place holder record, otherwise
288  // there is a database bug.
289  assert(k.id.name.empty() && k.id.category.empty());
290 
291  Ice::Identity topic = k.topic;
292 
293  bool moreTopics;
294  SubscriberRecordSeq content;
295  while ((moreTopics = cursor.get(k, v, MDB_NEXT)) && k.topic == topic)
296  {
297  content.push_back(v);
298  }
299 
300  string name = identityToTopicName(topic);
301  installTopic(name, topic, false, content);
302  } while (moreTopics);
303  }
304 
305  txn.commit();
306  }
307  }
308  catch (...)
309  {
310  shutdown();
311  __setNoDelete(false);
312  throw;
313  }
314  __setNoDelete(false);
315 }
316 
317 TopicPrx
318 TopicManagerImpl::create(const string& name)
319 {
320  Lock sync(*this);
321 
322  reap();
323  if (_topics.find(name) != _topics.end())
324  {
325  TopicExists ex;
326  ex.name = name;
327  throw ex;
328  }
329 
330  // Identity is <instanceName>/topic.<topicname>
331  Ice::Identity id = nameToIdentity(_instance, name);
332 
333  LogUpdate llu;
334  try
335  {
336  IceDB::ReadWriteTxn txn(_instance->dbEnv());
337 
339  key.topic = id;
340  SubscriberRecord rec;
341  rec.link = false;
342  rec.cost = 0;
343 
344  _subscriberMap.put(txn, key, rec);
345 
346  llu = getIncrementedLLU(txn, _lluMap);
347 
348  txn.commit();
349  }
350  catch (const IceDB::LMDBException& ex)
351  {
352  logError(_instance->communicator(), ex);
353  throw; // will become UnknownException in caller
354  }
355 
356  _instance->observers()->createTopic(llu, name);
357  return installTopic(name, id, true);
358 }
359 
360 TopicPrx
361 TopicManagerImpl::retrieve(const string& name) const
362 {
363  Lock sync(*this);
364 
365  TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
366  This->reap();
367 
368  map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
369  if (p == _topics.end())
370  {
371  NoSuchTopic ex;
372  ex.name = name;
373  throw ex;
374  }
375 
376  return p->second->proxy();
377 }
378 
379 TopicDict
381 {
382  Lock sync(*this);
383 
384  TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
385  This->reap();
386 
387  TopicDict all;
388  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
389  {
390  all.insert(TopicDict::value_type(p->first, p->second->proxy()));
391  }
392 
393  return all;
394 }
395 
396 void
398 {
399  Lock sync(*this);
400 
401  TraceLevelsPtr traceLevels = _instance->traceLevels();
402  if (traceLevels->topicMgr > 0)
403  {
404  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
405  out << "init";
406  for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
407  {
408  out << " topic: " << _instance->communicator()->identityToString(p->id)
409  << " subscribers: ";
410  for (SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end();
411  ++q)
412  {
413  if (q != p->records.begin())
414  {
415  out << ",";
416  }
417  out << _instance->communicator()->identityToString(q->id);
418  if (traceLevels->topicMgr > 1)
419  {
420  out << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
421  }
422  }
423  }
424  }
425 
426  // First we update the database state, and then we update our
427  // internal state.
428  try
429  {
430  IceDB::ReadWriteTxn txn(_instance->dbEnv());
431 
432  _lluMap.put(txn, lluDbKey, llu);
433 
434  _subscriberMap.clear(txn);
435 
436  for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
437  {
439  key.topic = p->id;
440  SubscriberRecord rec;
441  rec.link = false;
442  rec.cost = 0;
443 
444  _subscriberMap.put(txn, key, rec);
445 
446  for (SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end();
447  ++q)
448  {
450  key.topic = p->id;
451  key.id = q->id;
452 
453  _subscriberMap.put(txn, key, *q);
454  }
455  }
456  txn.commit();
457  }
458  catch (const IceDB::LMDBException& ex)
459  {
460  logError(_instance->communicator(), ex);
461  throw; // will become UnknownException in caller
462  }
463 
464  // We do this with two scans. The first runs through the topics
465  // that we have and removes those not in the init list. The second
466  // runs through the init list and either adds the ones that don't
467  // exist, or updates those that do.
468 
469  map<string, TopicImplPtr>::iterator p = _topics.begin();
470  while (p != _topics.end())
471  {
472  TopicContentSeq::const_iterator q;
473  for (q = content.begin(); q != content.end(); ++q)
474  {
475  if (q->id == p->second->id())
476  {
477  break;
478  }
479  }
480 
481  if (q == content.end())
482  {
483  // Note that this destroy should not remove anything from
484  // the database since we've already synced up the db
485  // state.
486  //
487  // TODO: We could short circuit the database operations in
488  // the topic by calling a third form of destroy.
489  p->second->observerDestroyTopic(llu);
490  _topics.erase(p++);
491  }
492  else
493  {
494  ++p;
495  }
496  }
497 
498  // Now run through the contents updating the topics that do exist,
499  // and creating those that do not.
500  for (TopicContentSeq::const_iterator q = content.begin(); q != content.end(); ++q)
501  {
502  string name = identityToTopicName(q->id);
503  map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
504  if (p == _topics.end())
505  {
506  installTopic(name, q->id, true, q->records);
507  }
508  else
509  {
510  p->second->update(q->records);
511  }
512  }
513  // Clear the set of observers.
514  _instance->observers()->clear();
515 }
516 
517 void
518 TopicManagerImpl::observerCreateTopic(const LogUpdate& llu, const string& name)
519 {
520  Lock sync(*this);
521  Ice::Identity id = nameToIdentity(_instance, name);
522 
523  try
524  {
525  IceDB::ReadWriteTxn txn(_instance->dbEnv());
526 
528  key.topic = id;
529  SubscriberRecord rec;
530  rec.link = false;
531  rec.cost = 0;
532 
533  if (_subscriberMap.find(txn, key))
534  {
535  throw ObserverInconsistencyException("topic exists: " + name);
536  }
537  _subscriberMap.put(txn, key, rec);
538 
539  _lluMap.put(txn, lluDbKey, llu);
540 
541  txn.commit();
542  }
543  catch (const IceDB::LMDBException& ex)
544  {
545  logError(_instance->communicator(), ex);
546  throw; // will become UnknownException in caller
547  }
548 
549  installTopic(name, id, true);
550 }
551 
552 void
553 TopicManagerImpl::observerDestroyTopic(const LogUpdate& llu, const string& name)
554 {
555  Lock sync(*this);
556 
557  map<string, TopicImplPtr>::iterator q = _topics.find(name);
558  if (q == _topics.end())
559  {
560  throw ObserverInconsistencyException("no topic: " + name);
561  }
562  q->second->observerDestroyTopic(llu);
563 
564  _topics.erase(q);
565 }
566 
567 void
569  const string& name,
570  const SubscriberRecord& record)
571 {
572  TopicImplPtr topic;
573  {
574  Lock sync(*this);
575 
576  map<string, TopicImplPtr>::iterator q = _topics.find(name);
577  if (q == _topics.end())
578  {
579  throw ObserverInconsistencyException("no topic: " + name);
580  }
581  assert(q != _topics.end());
582  topic = q->second;
583  }
584  topic->observerAddSubscriber(llu, record);
585 }
586 
587 void
589  const string& name,
590  const Ice::IdentitySeq& id)
591 {
592  TopicImplPtr topic;
593  {
594  Lock sync(*this);
595 
596  map<string, TopicImplPtr>::iterator q = _topics.find(name);
597  if (q == _topics.end())
598  {
599  throw ObserverInconsistencyException("no topic: " + name);
600  }
601  assert(q != _topics.end());
602  topic = q->second;
603  }
604  topic->observerRemoveSubscriber(llu, id);
605 }
606 
607 void
609 {
610  {
611  Lock sync(*this);
612  reap();
613  }
614 
615  try
616  {
617  content.clear();
618  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
619  {
620  TopicContent rec = p->second->getContent();
621  content.push_back(rec);
622  }
623 
624  IceDB::ReadOnlyTxn txn(_instance->dbEnv());
625  _lluMap.get(txn, lluDbKey, llu);
626  }
627  catch (const IceDB::LMDBException& ex)
628  {
629  logError(_instance->communicator(), ex);
630  throw; // will become UnknownException in caller
631  }
632 }
633 
634 LogUpdate
636 {
637  LogUpdate llu;
638  try
639  {
640  IceDB::ReadOnlyTxn txn(_instance->dbEnv());
641  _lluMap.get(txn, lluDbKey, llu);
642  }
643  catch (const IceDB::LMDBException& ex)
644  {
645  logError(_instance->communicator(), ex);
646  throw; // will become UnknownException in caller
647  }
648 
649  return llu;
650 }
651 
652 void
653 TopicManagerImpl::sync(const Ice::ObjectPrx& master)
654 {
655  TopicManagerSyncPrx sync = TopicManagerSyncPrx::uncheckedCast(master);
656 
657  LogUpdate llu;
658  TopicContentSeq content;
659  sync->getContent(llu, content);
660 
661  observerInit(llu, content);
662 }
663 
664 void
665 TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& llu)
666 {
667  Lock sync(*this);
668 
669  reap();
670 
671  TopicContentSeq content;
672 
673  // Update the database llu. This prevents the following case:
674  //
675  // Three replicas 1, 2, 3. 3 is the master. It accepts a change
676  // (say A=10, old value 9), writes to disk and then crashes. Now 2
677  // becomes the master. The client can ask this master for A and it
678  // returns 9. Now 3 comes back online, it has the last database
679  // state, so it syncs this state with 1, 2. The client will now
680  // magically get A==10. The solution here is when a new master is
681  // elected and gets the latest database state it immediately
682  // updates the llu stamp.
683  //
684  try
685  {
686  content.clear();
687 
688  IceDB::ReadWriteTxn txn(_instance->dbEnv());
689 
690  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
691  {
692  TopicContent rec = p->second->getContent();
693  content.push_back(rec);
694  }
695 
696  _lluMap.put(txn, lluDbKey, llu);
697 
698  txn.commit();
699  }
700  catch (const IceDB::LMDBException& ex)
701  {
702  logError(_instance->communicator(), ex);
703  throw; // will become UnknownException in caller
704  }
705 
706  // Now initialize the observers.
707  _instance->observers()->init(slaves, llu, content);
708 }
709 
710 Ice::ObjectPrx
712 {
713  return _observer;
714 }
715 
716 Ice::ObjectPrx
718 {
719  return _sync;
720 }
721 
722 void
724 {
725  //
726  // Always called with mutex locked.
727  //
728  // Lock sync(*this);
729  //
730  vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
731  for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
732  {
733  map<string, TopicImplPtr>::iterator q = _topics.find(*p);
734  if (q != _topics.end() && q->second->destroyed())
735  {
736  _topics.erase(q);
737  }
738  }
739 }
740 
741 void
743 {
744  Lock sync(*this);
745 
746  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
747  {
748  p->second->shutdown();
749  }
750  _topics.clear();
751 
752  _observerImpl = 0;
753  _syncImpl = 0;
754  _managerImpl = 0;
755 }
756 
759 {
760  return _managerImpl;
761 }
762 
763 void
764 TopicManagerImpl::updateTopicObservers()
765 {
766  Lock sync(*this);
767  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
768  {
769  p->second->updateObserver();
770  }
771 }
772 
773 void
774 TopicManagerImpl::updateSubscriberObservers()
775 {
776  Lock sync(*this);
777  for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
778  {
779  p->second->updateSubscriberObservers();
780  }
781 }
782 
783 TopicPrx
784 TopicManagerImpl::installTopic(const string& name,
785  const Ice::Identity& id,
786  bool create,
787  const IceStorm::SubscriberRecordSeq& subscribers)
788 {
789  //
790  // Called by constructor or with 'this' mutex locked.
791  //
792  TraceLevelsPtr traceLevels = _instance->traceLevels();
793  if (traceLevels->topicMgr > 0)
794  {
795  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
796  if (create)
797  {
798  out << "creating new topic \"" << name
799  << "\". id: " << _instance->communicator()->identityToString(id)
800  << " subscribers: ";
801  for (SubscriberRecordSeq::const_iterator q = subscribers.begin();
802  q != subscribers.end();
803  ++q)
804  {
805  if (q != subscribers.begin())
806  {
807  out << ",";
808  }
809  if (traceLevels->topicMgr > 1)
810  {
811  out << _instance->communicator()->identityToString(q->id)
812  << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
813  }
814  }
815  }
816  else
817  {
818  out << "loading topic \"" << name
819  << "\" from database. id: " << _instance->communicator()->identityToString(id)
820  << " subscribers: ";
821  for (SubscriberRecordSeq::const_iterator q = subscribers.begin();
822  q != subscribers.end();
823  ++q)
824  {
825  if (q != subscribers.begin())
826  {
827  out << ",";
828  }
829  if (traceLevels->topicMgr > 1)
830  {
831  out << _instance->communicator()->identityToString(q->id)
832  << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
833  }
834  }
835  }
836  }
837 
838  // Create topic implementation
839  TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers);
840 
841  // The identity is the name of the Topic.
842  _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));
843  _instance->topicAdapter()->add(topicImpl->getServant(), id);
844  return topicImpl->proxy();
845 }
IceStorm::SubscriberRecordSeq
::std::vector<::IceStorm::SubscriberRecord > SubscriberRecordSeq
Definition: SubscriberRecord.h:244
IceStorm
Definition: DBTypes.ice:22
IceStorm::TopicManagerImpl::reap
void reap()
Definition: TopicManagerI.cpp:723
IceStormElection::TopicManagerSync
Interface used to sync topics.
Definition: Election.ice:123
IceDB::ReadWriteTxn
Definition: IceDB.h:194
IceStorm::SubscriberRecordKey::topic
::Ice::Identity topic
Definition: SubscriberRecord.h:161
IceStorm::TopicManagerImpl::shutdown
void shutdown()
Definition: TopicManagerI.cpp:742
TopicI.h
IceDB::Txn::commit
void commit()
IceStorm::SubscriberRecordKey
The key for persistent subscribers, or topics.
Definition: SubscriberRecord.h:159
IceStorm::TopicManagerInternal
Internal operations for a topic manager.
Definition: IceStormInternal.h:921
IceStorm::TopicManagerImpl::getServant
Ice::ObjectPtr getServant() const
Definition: TopicManagerI.cpp:758
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:102
IceDB::ReadOnlyTxn
Definition: IceDB.h:183
IceStorm::TopicManagerImpl::getSync
virtual Ice::ObjectPrx getSync() const
Definition: TopicManagerI.cpp:717
IceStormElection::ReplicaObserver
The replica observer.
Definition: Election.ice:43
IceDB::Dbi::find
bool find(const Txn &txn, const K &key) const
Definition: IceDB.h:280
TopicManagerI.h
IceStorm::TopicManagerImpl::getContent
void getContent(IceStormElection::LogUpdate &, IceStormElection::TopicContentSeq &)
Definition: TopicManagerI.cpp:608
IceStorm::TopicManagerImpl::initMaster
virtual void initMaster(const std::set< IceStormElection::GroupNodeInfo > &, const IceStormElection::LogUpdate &)
Definition: TopicManagerI.cpp:665
Util.h
cxxopts::empty
bool empty(const std::string &s)
Definition: cxxopts.hpp:234
IceStormElection::ObserverInconsistencyException
Thrown if an observer detects an inconsistency.
Definition: Election.ice:36
IceInternal::Handle<::Ice::Communicator >
IceDB::Cursor::get
bool get(K &key, D &data, MDB_cursor_op op)
Definition: IceDB.h:386
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:233
IceStormElection::ObserverInconsistencyException::reason
string reason
The reason for the inconsistency.
Definition: Election.ice:39
IceStormElection::TopicContentSeq
::std::vector<::IceStormElection::TopicContent > TopicContentSeq
A sequence of topic content.
Definition: Election.h:1225
IceDB::Dbi::put
void put(const ReadWriteTxn &txn, const K &key, const D &data, unsigned int flags=0)
Definition: IceDB.h:261
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
IceStormElection::TopicContent
The contents of topic.
Definition: Election.ice:23
armarx::armem::client::query_fns::all
auto all()
Definition: query_fns.h:9
IceStormElection::LogUpdate::generation
::Ice::Long generation
Definition: LLURecord.h:104
IceStormElection
Definition: DBTypes.ice:17
IceStorm::TopicManagerImpl::sync
virtual void sync(const Ice::ObjectPrx &)
Definition: TopicManagerI.cpp:653
IceStorm::TopicImpl
Definition: TopicI.h:31
IceStorm::lluDbKey
const std::string lluDbKey
Definition: Util.h:36
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
IceStorm::TopicManagerImpl::observerInit
void observerInit(const IceStormElection::LogUpdate &, const IceStormElection::TopicContentSeq &)
Definition: TopicManagerI.cpp:397
IceDB::ReadWriteCursor
Definition: IceDB.h:435
IceStorm::TopicManagerImpl::observerAddSubscriber
void observerAddSubscriber(const IceStormElection::LogUpdate &, const std::string &, const IceStorm::SubscriberRecord &)
Definition: TopicManagerI.cpp:568
Subscriber.h
IceStorm::TopicManagerImpl::getObserver
virtual Ice::ObjectPrx getObserver() const
Definition: TopicManagerI.cpp:711
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
IceStorm::TopicManagerImpl
Definition: TopicManagerI.h:35
IceStorm::TopicManagerImpl::observerDestroyTopic
void observerDestroyTopic(const IceStormElection::LogUpdate &, const std::string &)
Definition: TopicManagerI.cpp:553
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStorm::TopicManagerPrx
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicManager > TopicManagerPrx
Definition: IceManager.h:69
IceStormInternal::nameToIdentity
Ice::Identity nameToIdentity(const IceStorm::InstancePtr &, const std::string &)
IceDB::Dbi::get
bool get(const Txn &txn, const K &key, D &data) const
Definition: IceDB.h:243
IceStormInternal::getIncrementedLLU
IceStormElection::LogUpdate getIncrementedLLU(const IceDB::ReadWriteTxn &, IceStorm::LLUMap &)
Definition: Util.cpp:96
IceStorm::TopicManagerImpl::retrieve
TopicPrx retrieve(const std::string &) const
Definition: TopicManagerI.cpp:361
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::cost
::Ice::Int cost
Definition: SubscriberRecord.h:240
TraceLevels.h
IceStormInternal::identityToTopicName
std::string identityToTopicName(const Ice::Identity &)
Definition: Util.cpp:23
std
Definition: Application.h:66
IceStormElection::CachedReadHelper
Definition: NodeI.h:134
IceStormInternal
Definition: Instance.cpp:26
IceUtil::Handle
Definition: forward_declarations.h:30
IceStorm::SubscriberRecordKey::id
::Ice::Identity id
Definition: SubscriberRecord.h:162
IceStorm::TopicManagerImpl::observerCreateTopic
void observerCreateTopic(const IceStormElection::LogUpdate &, const std::string &)
Definition: TopicManagerI.cpp:518
IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic >
Observers.h
armarx::aron::type::ObjectPtr
std::shared_ptr< Object > ObjectPtr
Definition: Object.h:36
IceStorm::TopicManagerImpl::observerRemoveSubscriber
void observerRemoveSubscriber(const IceStormElection::LogUpdate &, const std::string &, const Ice::IdentitySeq &)
Definition: TopicManagerI.cpp:588
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:237
IceStormElection::ObserverUpdateHelper
Definition: NodeI.h:177
IceStorm::TopicManagerImpl::retrieveAll
TopicDict retrieveAll() const
Definition: TopicManagerI.cpp:380
IceStorm::TopicManagerImpl::create
TopicPrx create(const std::string &)
Definition: TopicManagerI.cpp:318
NodeI.h
Instance.h
IceDB::LMDBException
Definition: IceDB.h:51
IceStorm::TopicManagerImpl::getLastLogUpdate
virtual IceStormElection::LogUpdate getLastLogUpdate() const
Definition: TopicManagerI.cpp:635
IceDB::DbiBase::clear
void clear(const ReadWriteTxn &)
IceStormElection::FinishUpdateHelper
Definition: NodeI.h:115