18 #include <Ice/SliceChecksums.h>
33 Ice::Error error(com->getLogger());
34 error <<
"LMDB error: " << ex;
42 _instance(instance), _impl(impl)
46 virtual TopicPrx create(
const string&
id,
const Ice::Current&)
56 return master->create(
id);
58 catch (
const Ice::ConnectFailedException&)
60 _instance->node()->recovery(generation);
63 catch (
const Ice::TimeoutException&)
65 _instance->node()->recovery(generation);
72 return _impl->create(
id);
77 virtual TopicPrx retrieve(
const string&
id,
const Ice::Current&)
const
81 return _impl->retrieve(
id);
84 virtual TopicDict retrieveAll(
const Ice::Current&)
const
88 return _impl->retrieveAll();
91 virtual Ice::SliceChecksumDict getSliceChecksums(
const Ice::Current&)
const
94 return Ice::sliceChecksums();
97 virtual NodePrx getReplicaNode(
const Ice::Current&)
const
100 return _instance->nodeProxy();
110 return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line));
139 _impl->observerInit(llu, content);
142 virtual void createTopic(
const LogUpdate& llu,
const string& name,
const Ice::Current&)
147 _impl->observerCreateTopic(llu, name);
151 Ice::Warning warn(_instance->traceLevels()->logger);
152 warn <<
"ReplicaObserverI::create: ObserverInconsistencyException: " << e.
reason;
158 virtual void destroyTopic(
const LogUpdate& llu,
const string& name,
const Ice::Current&)
163 _impl->observerDestroyTopic(llu, name);
167 Ice::Warning warn(_instance->traceLevels()->logger);
168 warn <<
"ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.
reason;
180 _impl->observerAddSubscriber(llu, name, rec);
184 Ice::Warning warn(_instance->traceLevels()->logger);
185 warn <<
"ReplicaObserverI::add: ObserverInconsistencyException: " << e.
reason;
191 virtual void removeSubscriber(
const LogUpdate& llu,
const string& name,
const Ice::IdentitySeq&
id,
197 _impl->observerRemoveSubscriber(llu, name,
id);
201 Ice::Warning warn(_instance->traceLevels()->logger);
202 warn <<
"ReplicaObserverI::remove: ObserverInconsistencyException: " << e.
reason;
225 _impl->getContent(llu, content);
237 _lluMap(instance->lluMap()),
238 _subscriberMap(instance->subscriberMap())
244 if (_instance->observer())
246 _instance->observer()->setObserverUpdater(
this);
252 _managerImpl =
new TopicManagerI(instance,
this);
256 if (_instance->nodeAdapter())
258 _observerImpl =
new ReplicaObserverI(instance,
this);
259 _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl);
260 _syncImpl =
new TopicManagerSyncI(
this);
261 _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
276 if (cursor.
get(k,
v, MDB_FIRST))
278 bool moreTopics =
false;
283 assert(k.
id.name.empty() && k.
id.category.empty());
289 while ((moreTopics = cursor.
get(k,
v, MDB_NEXT)) && k.
topic == topic)
291 content.push_back(
v);
295 installTopic(name, topic,
false, content);
306 __setNoDelete(
false);
309 __setNoDelete(
false);
318 if (_topics.find(name) != _topics.end())
339 _subscriberMap.
put(txn, key, rec);
347 logError(_instance->communicator(), ex);
351 _instance->observers()->createTopic(llu, name);
352 return installTopic(name,
id,
true);
363 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
364 if (p == _topics.end())
371 return p->second->proxy();
383 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
385 all.insert(TopicDict::value_type(p->first, p->second->proxy()));
397 if (traceLevels->topicMgr > 0)
399 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
401 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
403 out <<
" topic: " << _instance->communicator()->identityToString(p->id) <<
" subscribers: ";
404 for (SubscriberRecordSeq::const_iterator
q = p->records.begin();
q != p->records.end(); ++
q)
406 if (
q != p->records.begin())
410 out << _instance->communicator()->identityToString(
q->id);
411 if (traceLevels->topicMgr > 1)
427 _subscriberMap.
clear(txn);
429 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
437 _subscriberMap.
put(txn, key, rec);
439 for (SubscriberRecordSeq::const_iterator
q = p->records.begin();
q != p->records.end(); ++
q)
445 _subscriberMap.
put(txn, key, *
q);
452 logError(_instance->communicator(), ex);
461 map<string, TopicImplPtr>::iterator p = _topics.begin();
462 while (p != _topics.end())
464 TopicContentSeq::const_iterator
q;
465 for (
q = content.begin();
q != content.end(); ++
q)
467 if (
q->id == p->second->id())
473 if (
q == content.end())
481 p->second->observerDestroyTopic(llu);
492 for (TopicContentSeq::const_iterator
q = content.begin();
q != content.end(); ++
q)
495 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
496 if (p == _topics.end())
498 installTopic(name,
q->id,
true,
q->records);
502 p->second->update(
q->records);
506 _instance->observers()->clear();
525 if (_subscriberMap.
find(txn, key))
529 _subscriberMap.
put(txn, key, rec);
537 logError(_instance->communicator(), ex);
541 installTopic(name,
id,
true);
549 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
550 if (
q == _topics.end())
554 q->second->observerDestroyTopic(llu);
566 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
567 if (
q == _topics.end())
571 assert(
q != _topics.end());
574 topic->observerAddSubscriber(llu, record);
584 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
585 if (
q == _topics.end())
589 assert(
q != _topics.end());
592 topic->observerRemoveSubscriber(llu,
id);
606 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
609 content.push_back(rec);
617 logError(_instance->communicator(), ex);
633 logError(_instance->communicator(), ex);
647 sync->getContent(llu, content);
678 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
681 content.push_back(rec);
690 logError(_instance->communicator(), ex);
695 _instance->observers()->init(slaves, llu, content);
718 vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
719 for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
721 map<string, TopicImplPtr>::iterator
q = _topics.find(*p);
722 if (
q != _topics.end() &&
q->second->destroyed())
734 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
736 p->second->shutdown();
752 TopicManagerImpl::updateTopicObservers()
755 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
757 p->second->updateObserver();
762 TopicManagerImpl::updateSubscriberObservers()
765 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
767 p->second->updateSubscriberObservers();
772 TopicManagerImpl::installTopic(
const string& name,
const Ice::Identity&
id,
bool create,
779 if (traceLevels->topicMgr > 0)
781 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
784 out <<
"creating new topic \"" << name <<
"\". id: "
785 << _instance->communicator()->identityToString(
id)
787 for (SubscriberRecordSeq::const_iterator
q = subscribers.begin();
q != subscribers.end(); ++
q)
789 if (
q != subscribers.begin())
793 if (traceLevels->topicMgr > 1)
795 out << _instance->communicator()->identityToString(
q->id)
802 out <<
"loading topic \"" << name <<
"\" from database. id: "
803 << _instance->communicator()->identityToString(
id)
805 for (SubscriberRecordSeq::const_iterator
q = subscribers.begin();
q != subscribers.end(); ++
q)
807 if (
q != subscribers.begin())
811 if (traceLevels->topicMgr > 1)
813 out << _instance->communicator()->identityToString(
q->id)
824 _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));
825 _instance->topicAdapter()->add(topicImpl->getServant(),
id);
826 return topicImpl->proxy();