TopicManagerI.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <functional>
11
12#include <Ice/SliceChecksums.h>
13#include <IceStorm/Instance.h>
14#include <IceStorm/NodeI.h>
15#include <IceStorm/Observers.h>
16#include <IceStorm/Subscriber.h>
17#include <IceStorm/TopicI.h>
20#include <IceStorm/Util.h>
21
22using namespace std;
23using namespace IceStorm;
24using namespace IceStormElection;
25using namespace IceStormInternal;
26
27namespace
28{
29
30 void
31 logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
32 {
33 Ice::Error error(com->getLogger());
34 error << "LMDB error: " << ex;
35 }
36
37 class TopicManagerI : public TopicManagerInternal
38 {
39 public:
40 TopicManagerI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
41 _instance(instance), _impl(impl)
42 {
43 }
44
45 virtual TopicPrx
46 create(const string& id, const Ice::Current&)
47 {
48 while (true)
49 {
50 Ice::Long generation;
51 TopicManagerPrx master = getMaster(generation, __FILE__, __LINE__);
52 if (master)
53 {
54 try
55 {
56 return master->create(id);
57 }
58 catch (const Ice::ConnectFailedException&)
59 {
60 _instance->node()->recovery(generation);
61 continue;
62 }
63 catch (const Ice::TimeoutException&)
64 {
65 _instance->node()->recovery(generation);
66 continue;
67 }
68 }
69 else
70 {
71 FinishUpdateHelper unlock(_instance->node());
72 return _impl->create(id);
73 }
74 }
75 }
76
77 virtual TopicPrx
78 retrieve(const string& id, const Ice::Current&) const
79 {
80 // Use cached reads.
81 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
82 return _impl->retrieve(id);
83 }
84
85 virtual TopicDict
86 retrieveAll(const Ice::Current&) const
87 {
88 // Use cached reads.
89 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
90 return _impl->retrieveAll();
91 }
92
93 virtual Ice::SliceChecksumDict
94 getSliceChecksums(const Ice::Current&) const
95 {
96 // This doesn't require the replication to be running.
97 return Ice::sliceChecksums();
98 }
99
100 virtual NodePrx
101 getReplicaNode(const Ice::Current&) const
102 {
103 // This doesn't require the replication to be running.
104 return _instance->nodeProxy();
105 }
106
107 private:
109 getMaster(Ice::Long& generation, const char* file, int line) const
110 {
111 NodeIPtr node = _instance->node();
112 if (node)
113 {
114 return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line));
115 }
116 else
117 {
118 return TopicManagerPrx();
119 }
120 }
121
122 const PersistentInstancePtr _instance;
123 const TopicManagerImplPtr _impl;
124 };
125
126 class ReplicaObserverI : public ReplicaObserver
127 {
128 public:
129 ReplicaObserverI(const PersistentInstancePtr& instance, const TopicManagerImplPtr& impl) :
130 _instance(instance), _impl(impl)
131 {
132 }
133
134 virtual void
135 init(const LogUpdate& llu, const TopicContentSeq& content, const Ice::Current&)
136 {
137 NodeIPtr node = _instance->node();
138 if (node)
139 {
140 node->checkObserverInit(llu.generation);
141 }
142 _impl->observerInit(llu, content);
143 }
144
145 virtual void
146 createTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
147 {
148 try
149 {
150 ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
151 _impl->observerCreateTopic(llu, name);
152 }
153 catch (const ObserverInconsistencyException& e)
154 {
155 Ice::Warning warn(_instance->traceLevels()->logger);
156 warn << "ReplicaObserverI::create: ObserverInconsistencyException: " << e.reason;
157 _instance->node()->recovery(llu.generation);
158 throw;
159 }
160 }
161
162 virtual void
163 destroyTopic(const LogUpdate& llu, const string& name, const Ice::Current&)
164 {
165 try
166 {
167 ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
168 _impl->observerDestroyTopic(llu, name);
169 }
170 catch (const ObserverInconsistencyException& e)
171 {
172 Ice::Warning warn(_instance->traceLevels()->logger);
173 warn << "ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.reason;
174 _instance->node()->recovery(llu.generation);
175 throw;
176 }
177 }
178
179 virtual void
180 addSubscriber(const LogUpdate& llu,
181 const string& name,
182 const SubscriberRecord& rec,
183 const Ice::Current&)
184 {
185 try
186 {
187 ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
188 _impl->observerAddSubscriber(llu, name, rec);
189 }
190 catch (const ObserverInconsistencyException& e)
191 {
192 Ice::Warning warn(_instance->traceLevels()->logger);
193 warn << "ReplicaObserverI::add: ObserverInconsistencyException: " << e.reason;
194 _instance->node()->recovery(llu.generation);
195 throw;
196 }
197 }
198
199 virtual void
200 removeSubscriber(const LogUpdate& llu,
201 const string& name,
202 const Ice::IdentitySeq& id,
203 const Ice::Current&)
204 {
205 try
206 {
207 ObserverUpdateHelper unlock(_instance->node(), llu.generation, __FILE__, __LINE__);
208 _impl->observerRemoveSubscriber(llu, name, id);
209 }
210 catch (const ObserverInconsistencyException& e)
211 {
212 Ice::Warning warn(_instance->traceLevels()->logger);
213 warn << "ReplicaObserverI::remove: ObserverInconsistencyException: " << e.reason;
214 _instance->node()->recovery(llu.generation);
215 throw;
216 }
217 }
218
219 private:
220 const PersistentInstancePtr _instance;
221 const TopicManagerImplPtr _impl;
222 };
223
224 class TopicManagerSyncI : public TopicManagerSync
225 {
226 public:
227 TopicManagerSyncI(const TopicManagerImplPtr& impl) : _impl(impl)
228 {
229 }
230
231 virtual void
232 getContent(LogUpdate& llu, TopicContentSeq& content, const Ice::Current&)
233 {
234 _impl->getContent(llu, content);
235 }
236
237 private:
238 const TopicManagerImplPtr _impl;
239 };
240
241} // namespace
242
244 _instance(instance), _lluMap(instance->lluMap()), _subscriberMap(instance->subscriberMap())
245{
246 try
247 {
248 __setNoDelete(true);
249
250 if (_instance->observer())
251 {
252 _instance->observer()->setObserverUpdater(this);
253 }
254
255 // TODO: If we want to improve the performance of the
256 // non-replicated case we could allocate a null-topic manager impl
257 // here.
258 _managerImpl = new TopicManagerI(instance, this);
259
260 // If there is no node adapter we don't need to start the
261 // observer, nor sync since we're not replicating.
262 if (_instance->nodeAdapter())
263 {
264 _observerImpl = new ReplicaObserverI(instance, this);
265 _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl);
266 _syncImpl = new TopicManagerSyncI(this);
267 _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
268 }
269
270 {
271 IceDB::ReadWriteTxn txn(_instance->dbEnv());
272
273 // Ensure that the llu counter is present in the log.
274 LogUpdate empty = {0, 0};
275 _instance->lluMap().put(txn, lluDbKey, empty);
276
277 // Recreate each of the topics.
280
281 SubscriberMapRWCursor cursor(_subscriberMap, txn);
282 if (cursor.get(k, v, MDB_FIRST))
283 {
284 bool moreTopics = false;
285 do
286 {
287 // This record has to be a place holder record, otherwise
288 // there is a database bug.
289 assert(k.id.name.empty() && k.id.category.empty());
290
291 Ice::Identity topic = k.topic;
292
293 bool moreTopics;
294 SubscriberRecordSeq content;
295 while ((moreTopics = cursor.get(k, v, MDB_NEXT)) && k.topic == topic)
296 {
297 content.push_back(v);
298 }
299
300 string name = identityToTopicName(topic);
301 installTopic(name, topic, false, content);
302 } while (moreTopics);
303 }
304
305 txn.commit();
306 }
307 }
308 catch (...)
309 {
310 shutdown();
311 __setNoDelete(false);
312 throw;
313 }
314 __setNoDelete(false);
315}
316
318TopicManagerImpl::create(const string& name)
319{
320 Lock sync(*this);
321
322 reap();
323 if (_topics.find(name) != _topics.end())
324 {
325 TopicExists ex;
326 ex.name = name;
327 throw ex;
328 }
329
330 // Identity is <instanceName>/topic.<topicname>
331 Ice::Identity id = nameToIdentity(_instance, name);
332
333 LogUpdate llu;
334 try
335 {
336 IceDB::ReadWriteTxn txn(_instance->dbEnv());
337
339 key.topic = id;
341 rec.link = false;
342 rec.cost = 0;
343
344 _subscriberMap.put(txn, key, rec);
345
346 llu = getIncrementedLLU(txn, _lluMap);
347
348 txn.commit();
349 }
350 catch (const IceDB::LMDBException& ex)
351 {
352 logError(_instance->communicator(), ex);
353 throw; // will become UnknownException in caller
354 }
355
356 _instance->observers()->createTopic(llu, name);
357 return installTopic(name, id, true);
358}
359
361TopicManagerImpl::retrieve(const string& name) const
362{
363 Lock sync(*this);
364
365 TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
366 This->reap();
367
368 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
369 if (p == _topics.end())
370 {
371 NoSuchTopic ex;
372 ex.name = name;
373 throw ex;
374 }
375
376 return p->second->proxy();
377}
378
379TopicDict
381{
382 Lock sync(*this);
383
384 TopicManagerImpl* This = const_cast<TopicManagerImpl*>(this);
385 This->reap();
386
387 TopicDict all;
388 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
389 {
390 all.insert(TopicDict::value_type(p->first, p->second->proxy()));
391 }
392
393 return all;
394}
395
396void
398{
399 Lock sync(*this);
400
401 TraceLevelsPtr traceLevels = _instance->traceLevels();
402 if (traceLevels->topicMgr > 0)
403 {
404 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
405 out << "init";
406 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
407 {
408 out << " topic: " << _instance->communicator()->identityToString(p->id)
409 << " subscribers: ";
410 for (SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end();
411 ++q)
412 {
413 if (q != p->records.begin())
414 {
415 out << ",";
416 }
417 out << _instance->communicator()->identityToString(q->id);
418 if (traceLevels->topicMgr > 1)
419 {
420 out << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
421 }
422 }
423 }
424 }
425
426 // First we update the database state, and then we update our
427 // internal state.
428 try
429 {
430 IceDB::ReadWriteTxn txn(_instance->dbEnv());
431
432 _lluMap.put(txn, lluDbKey, llu);
433
434 _subscriberMap.clear(txn);
435
436 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
437 {
439 key.topic = p->id;
441 rec.link = false;
442 rec.cost = 0;
443
444 _subscriberMap.put(txn, key, rec);
445
446 for (SubscriberRecordSeq::const_iterator q = p->records.begin(); q != p->records.end();
447 ++q)
448 {
450 key.topic = p->id;
451 key.id = q->id;
452
453 _subscriberMap.put(txn, key, *q);
454 }
455 }
456 txn.commit();
457 }
458 catch (const IceDB::LMDBException& ex)
459 {
460 logError(_instance->communicator(), ex);
461 throw; // will become UnknownException in caller
462 }
463
464 // We do this with two scans. The first runs through the topics
465 // that we have and removes those not in the init list. The second
466 // runs through the init list and either adds the ones that don't
467 // exist, or updates those that do.
468
469 map<string, TopicImplPtr>::iterator p = _topics.begin();
470 while (p != _topics.end())
471 {
472 TopicContentSeq::const_iterator q;
473 for (q = content.begin(); q != content.end(); ++q)
474 {
475 if (q->id == p->second->id())
476 {
477 break;
478 }
479 }
480
481 if (q == content.end())
482 {
483 // Note that this destroy should not remove anything from
484 // the database since we've already synced up the db
485 // state.
486 //
487 // TODO: We could short circuit the database operations in
488 // the topic by calling a third form of destroy.
489 p->second->observerDestroyTopic(llu);
490 _topics.erase(p++);
491 }
492 else
493 {
494 ++p;
495 }
496 }
497
498 // Now run through the contents updating the topics that do exist,
499 // and creating those that do not.
500 for (TopicContentSeq::const_iterator q = content.begin(); q != content.end(); ++q)
501 {
502 string name = identityToTopicName(q->id);
503 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
504 if (p == _topics.end())
505 {
506 installTopic(name, q->id, true, q->records);
507 }
508 else
509 {
510 p->second->update(q->records);
511 }
512 }
513 // Clear the set of observers.
514 _instance->observers()->clear();
515}
516
517void
519{
520 Lock sync(*this);
521 Ice::Identity id = nameToIdentity(_instance, name);
522
523 try
524 {
525 IceDB::ReadWriteTxn txn(_instance->dbEnv());
526
528 key.topic = id;
530 rec.link = false;
531 rec.cost = 0;
532
533 if (_subscriberMap.find(txn, key))
534 {
535 throw ObserverInconsistencyException("topic exists: " + name);
536 }
537 _subscriberMap.put(txn, key, rec);
538
539 _lluMap.put(txn, lluDbKey, llu);
540
541 txn.commit();
542 }
543 catch (const IceDB::LMDBException& ex)
544 {
545 logError(_instance->communicator(), ex);
546 throw; // will become UnknownException in caller
547 }
548
549 installTopic(name, id, true);
550}
551
552void
554{
555 Lock sync(*this);
556
557 map<string, TopicImplPtr>::iterator q = _topics.find(name);
558 if (q == _topics.end())
559 {
560 throw ObserverInconsistencyException("no topic: " + name);
561 }
562 q->second->observerDestroyTopic(llu);
563
564 _topics.erase(q);
565}
566
567void
569 const string& name,
570 const SubscriberRecord& record)
571{
572 TopicImplPtr topic;
573 {
574 Lock sync(*this);
575
576 map<string, TopicImplPtr>::iterator q = _topics.find(name);
577 if (q == _topics.end())
578 {
579 throw ObserverInconsistencyException("no topic: " + name);
580 }
581 assert(q != _topics.end());
582 topic = q->second;
583 }
584 topic->observerAddSubscriber(llu, record);
585}
586
587void
589 const string& name,
590 const Ice::IdentitySeq& id)
591{
592 TopicImplPtr topic;
593 {
594 Lock sync(*this);
595
596 map<string, TopicImplPtr>::iterator q = _topics.find(name);
597 if (q == _topics.end())
598 {
599 throw ObserverInconsistencyException("no topic: " + name);
600 }
601 assert(q != _topics.end());
602 topic = q->second;
603 }
604 topic->observerRemoveSubscriber(llu, id);
605}
606
607void
609{
610 {
611 Lock sync(*this);
612 reap();
613 }
614
615 try
616 {
617 content.clear();
618 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
619 {
620 TopicContent rec = p->second->getContent();
621 content.push_back(rec);
622 }
623
624 IceDB::ReadOnlyTxn txn(_instance->dbEnv());
625 _lluMap.get(txn, lluDbKey, llu);
626 }
627 catch (const IceDB::LMDBException& ex)
628 {
629 logError(_instance->communicator(), ex);
630 throw; // will become UnknownException in caller
631 }
632}
633
636{
637 LogUpdate llu;
638 try
639 {
640 IceDB::ReadOnlyTxn txn(_instance->dbEnv());
641 _lluMap.get(txn, lluDbKey, llu);
642 }
643 catch (const IceDB::LMDBException& ex)
644 {
645 logError(_instance->communicator(), ex);
646 throw; // will become UnknownException in caller
647 }
648
649 return llu;
650}
651
652void
653TopicManagerImpl::sync(const Ice::ObjectPrx& master)
654{
655 TopicManagerSyncPrx sync = TopicManagerSyncPrx::uncheckedCast(master);
656
657 LogUpdate llu;
658 TopicContentSeq content;
659 sync->getContent(llu, content);
660
661 observerInit(llu, content);
662}
663
664void
665TopicManagerImpl::initMaster(const set<GroupNodeInfo>& slaves, const LogUpdate& llu)
666{
667 Lock sync(*this);
668
669 reap();
670
671 TopicContentSeq content;
672
673 // Update the database llu. This prevents the following case:
674 //
675 // Three replicas 1, 2, 3. 3 is the master. It accepts a change
676 // (say A=10, old value 9), writes to disk and then crashes. Now 2
677 // becomes the master. The client can ask this master for A and it
678 // returns 9. Now 3 comes back online, it has the last database
679 // state, so it syncs this state with 1, 2. The client will now
680 // magically get A==10. The solution here is when a new master is
681 // elected and gets the latest database state it immediately
682 // updates the llu stamp.
683 //
684 try
685 {
686 content.clear();
687
688 IceDB::ReadWriteTxn txn(_instance->dbEnv());
689
690 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
691 {
692 TopicContent rec = p->second->getContent();
693 content.push_back(rec);
694 }
695
696 _lluMap.put(txn, lluDbKey, llu);
697
698 txn.commit();
699 }
700 catch (const IceDB::LMDBException& ex)
701 {
702 logError(_instance->communicator(), ex);
703 throw; // will become UnknownException in caller
704 }
705
706 // Now initialize the observers.
707 _instance->observers()->init(slaves, llu, content);
708}
709
710Ice::ObjectPrx
712{
713 return _observer;
714}
715
716Ice::ObjectPrx
718{
719 return _sync;
720}
721
722void
724{
725 //
726 // Always called with mutex locked.
727 //
728 // Lock sync(*this);
729 //
730 vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
731 for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
732 {
733 map<string, TopicImplPtr>::iterator q = _topics.find(*p);
734 if (q != _topics.end() && q->second->destroyed())
735 {
736 _topics.erase(q);
737 }
738 }
739}
740
741void
743{
744 Lock sync(*this);
745
746 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
747 {
748 p->second->shutdown();
749 }
750 _topics.clear();
751
752 _observerImpl = 0;
753 _syncImpl = 0;
754 _managerImpl = 0;
755}
756
757Ice::ObjectPtr
759{
760 return _managerImpl;
761}
762
763void
764TopicManagerImpl::updateTopicObservers()
765{
766 Lock sync(*this);
767 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
768 {
769 p->second->updateObserver();
770 }
771}
772
773void
774TopicManagerImpl::updateSubscriberObservers()
775{
776 Lock sync(*this);
777 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
778 {
779 p->second->updateSubscriberObservers();
780 }
781}
782
784TopicManagerImpl::installTopic(const string& name,
785 const Ice::Identity& id,
786 bool create,
787 const IceStorm::SubscriberRecordSeq& subscribers)
788{
789 //
790 // Called by constructor or with 'this' mutex locked.
791 //
792 TraceLevelsPtr traceLevels = _instance->traceLevels();
793 if (traceLevels->topicMgr > 0)
794 {
795 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
796 if (create)
797 {
798 out << "creating new topic \"" << name
799 << "\". id: " << _instance->communicator()->identityToString(id)
800 << " subscribers: ";
801 for (SubscriberRecordSeq::const_iterator q = subscribers.begin();
802 q != subscribers.end();
803 ++q)
804 {
805 if (q != subscribers.begin())
806 {
807 out << ",";
808 }
809 if (traceLevels->topicMgr > 1)
810 {
811 out << _instance->communicator()->identityToString(q->id)
812 << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
813 }
814 }
815 }
816 else
817 {
818 out << "loading topic \"" << name
819 << "\" from database. id: " << _instance->communicator()->identityToString(id)
820 << " subscribers: ";
821 for (SubscriberRecordSeq::const_iterator q = subscribers.begin();
822 q != subscribers.end();
823 ++q)
824 {
825 if (q != subscribers.begin())
826 {
827 out << ",";
828 }
829 if (traceLevels->topicMgr > 1)
830 {
831 out << _instance->communicator()->identityToString(q->id)
832 << " endpoints: " << IceStormInternal::describeEndpoints(q->obj);
833 }
834 }
835 }
836 }
837
838 // Create topic implementation
839 TopicImplPtr topicImpl = new TopicImpl(_instance, name, id, subscribers);
840
841 // The identity is the name of the Topic.
842 _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));
843 _instance->topicAdapter()->add(topicImpl->getServant(), id);
844 return topicImpl->proxy();
845}
bool get(K &key, D &data, MDB_cursor_op op)
Definition IceDB.h:386
void commit()
virtual Ice::ObjectPrx getSync() const
TopicPrx retrieve(const std::string &) const
void observerCreateTopic(const IceStormElection::LogUpdate &, const std::string &)
TopicPrx create(const std::string &)
virtual IceStormElection::LogUpdate getLastLogUpdate() const
TopicDict retrieveAll() const
void observerInit(const IceStormElection::LogUpdate &, const IceStormElection::TopicContentSeq &)
Ice::ObjectPtr getServant() const
virtual void sync(const Ice::ObjectPrx &)
virtual Ice::ObjectPrx getObserver() const
void getContent(IceStormElection::LogUpdate &, IceStormElection::TopicContentSeq &)
void observerDestroyTopic(const IceStormElection::LogUpdate &, const std::string &)
TopicManagerImpl(const PersistentInstancePtr &)
void observerAddSubscriber(const IceStormElection::LogUpdate &, const std::string &, const IceStorm::SubscriberRecord &)
void observerRemoveSubscriber(const IceStormElection::LogUpdate &, const std::string &, const Ice::IdentitySeq &)
virtual void initMaster(const std::set< IceStormElection::GroupNodeInfo > &, const IceStormElection::LogUpdate &)
Internal operations for a topic manager.
Thrown if an observer detects an inconsistency.
Definition Election.ice:37
string reason
The reason for the inconsistency.
Definition Election.ice:39
The replica observer.
Definition Election.ice:44
Interface used to sync topics.
Definition Election.ice:124
#define q
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::Node > NodePrx
Definition Election.h:1210
IceUtil::Handle< NodeI > NodeIPtr
Definition Instance.h:36
::std::vector<::IceStormElection::TopicContent > TopicContentSeq
A sequence of topic content.
Definition Election.h:1225
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::TopicManagerSync > TopicManagerSyncPrx
Definition Election.h:1203
std::string identityToTopicName(const Ice::Identity &)
Definition Util.cpp:23
Ice::Identity nameToIdentity(const IceStorm::InstancePtr &, const std::string &)
IceStormElection::LogUpdate getIncrementedLLU(const IceDB::ReadWriteTxn &, IceStorm::LLUMap &)
Definition Util.cpp:96
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition Util.cpp:51
IceDB::ReadWriteCursor< SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream > SubscriberMapRWCursor
Definition Instance.h:132
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
Definition IceManager.h:70
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
::std::vector<::IceStorm::SubscriberRecord > SubscriberRecordSeq
IceUtil::Handle< TopicManagerImpl > TopicManagerImplPtr
IceUtil::Handle< PersistentInstance > PersistentInstancePtr
Definition Instance.h:172
IceUtil::Handle< TopicImpl > TopicImplPtr
Definition TopicI.h:110
const std::string lluDbKey
Definition Util.h:36
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicManager > TopicManagerPrx
Definition IceManager.h:69
::IceInternal::Handle<::Ice::Communicator > CommunicatorPtr
Definition IceManager.h:49
A struct used for marking the last log update.
Definition LLURecord.h:103
The contents of topic.
Definition Election.ice:24
The key for persistent subscribers, or topics.
Used to store persistent information for persistent subscribers.