12 #include <Ice/SliceChecksums.h>
33 Ice::Error error(com->getLogger());
34 error <<
"LMDB error: " << ex;
41 _instance(instance), _impl(impl)
46 create(
const string&
id,
const Ice::Current&)
56 return master->create(
id);
58 catch (
const Ice::ConnectFailedException&)
60 _instance->node()->recovery(generation);
63 catch (
const Ice::TimeoutException&)
65 _instance->node()->recovery(generation);
72 return _impl->create(
id);
78 retrieve(
const string&
id,
const Ice::Current&)
const
82 return _impl->retrieve(
id);
86 retrieveAll(
const Ice::Current&)
const
90 return _impl->retrieveAll();
93 virtual Ice::SliceChecksumDict
94 getSliceChecksums(
const Ice::Current&)
const
97 return Ice::sliceChecksums();
101 getReplicaNode(
const Ice::Current&)
const
104 return _instance->nodeProxy();
109 getMaster(
Ice::Long& generation,
const char* file,
int line)
const
114 return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line));
130 _instance(instance), _impl(impl)
142 _impl->observerInit(llu, content);
146 createTopic(
const LogUpdate& llu,
const string& name,
const Ice::Current&)
151 _impl->observerCreateTopic(llu, name);
155 Ice::Warning warn(_instance->traceLevels()->logger);
156 warn <<
"ReplicaObserverI::create: ObserverInconsistencyException: " << e.
reason;
163 destroyTopic(
const LogUpdate& llu,
const string& name,
const Ice::Current&)
168 _impl->observerDestroyTopic(llu, name);
172 Ice::Warning warn(_instance->traceLevels()->logger);
173 warn <<
"ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.
reason;
188 _impl->observerAddSubscriber(llu, name, rec);
192 Ice::Warning warn(_instance->traceLevels()->logger);
193 warn <<
"ReplicaObserverI::add: ObserverInconsistencyException: " << e.
reason;
202 const Ice::IdentitySeq&
id,
208 _impl->observerRemoveSubscriber(llu, name,
id);
212 Ice::Warning warn(_instance->traceLevels()->logger);
213 warn <<
"ReplicaObserverI::remove: ObserverInconsistencyException: " << e.
reason;
234 _impl->getContent(llu, content);
244 _instance(instance), _lluMap(instance->lluMap()), _subscriberMap(instance->subscriberMap())
250 if (_instance->observer())
252 _instance->observer()->setObserverUpdater(
this);
258 _managerImpl =
new TopicManagerI(instance,
this);
262 if (_instance->nodeAdapter())
264 _observerImpl =
new ReplicaObserverI(instance,
this);
265 _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl);
266 _syncImpl =
new TopicManagerSyncI(
this);
267 _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
282 if (cursor.
get(k,
v, MDB_FIRST))
284 bool moreTopics =
false;
289 assert(k.
id.name.empty() && k.
id.category.empty());
295 while ((moreTopics = cursor.
get(k,
v, MDB_NEXT)) && k.
topic == topic)
297 content.push_back(
v);
301 installTopic(name, topic,
false, content);
302 }
while (moreTopics);
311 __setNoDelete(
false);
314 __setNoDelete(
false);
323 if (_topics.find(name) != _topics.end())
344 _subscriberMap.
put(txn, key, rec);
352 logError(_instance->communicator(), ex);
356 _instance->observers()->createTopic(llu, name);
357 return installTopic(name,
id,
true);
368 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
369 if (p == _topics.end())
376 return p->second->proxy();
388 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
390 all.insert(TopicDict::value_type(p->first, p->second->proxy()));
402 if (traceLevels->topicMgr > 0)
404 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
406 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
408 out <<
" topic: " << _instance->communicator()->identityToString(p->id)
410 for (SubscriberRecordSeq::const_iterator
q = p->records.begin();
q != p->records.end();
413 if (
q != p->records.begin())
417 out << _instance->communicator()->identityToString(
q->id);
418 if (traceLevels->topicMgr > 1)
434 _subscriberMap.
clear(txn);
436 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
444 _subscriberMap.
put(txn, key, rec);
446 for (SubscriberRecordSeq::const_iterator
q = p->records.begin();
q != p->records.end();
453 _subscriberMap.
put(txn, key, *
q);
460 logError(_instance->communicator(), ex);
469 map<string, TopicImplPtr>::iterator p = _topics.begin();
470 while (p != _topics.end())
472 TopicContentSeq::const_iterator
q;
473 for (
q = content.begin();
q != content.end(); ++
q)
475 if (
q->id == p->second->id())
481 if (
q == content.end())
489 p->second->observerDestroyTopic(llu);
500 for (TopicContentSeq::const_iterator
q = content.begin();
q != content.end(); ++
q)
503 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
504 if (p == _topics.end())
506 installTopic(name,
q->id,
true,
q->records);
510 p->second->update(
q->records);
514 _instance->observers()->clear();
533 if (_subscriberMap.
find(txn, key))
537 _subscriberMap.
put(txn, key, rec);
545 logError(_instance->communicator(), ex);
549 installTopic(name,
id,
true);
557 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
558 if (
q == _topics.end())
562 q->second->observerDestroyTopic(llu);
576 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
577 if (
q == _topics.end())
581 assert(
q != _topics.end());
584 topic->observerAddSubscriber(llu, record);
590 const Ice::IdentitySeq&
id)
596 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
597 if (
q == _topics.end())
601 assert(
q != _topics.end());
604 topic->observerRemoveSubscriber(llu,
id);
618 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
621 content.push_back(rec);
629 logError(_instance->communicator(), ex);
645 logError(_instance->communicator(), ex);
659 sync->getContent(llu, content);
690 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
693 content.push_back(rec);
702 logError(_instance->communicator(), ex);
707 _instance->observers()->init(slaves, llu, content);
730 vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
731 for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
733 map<string, TopicImplPtr>::iterator
q = _topics.find(*p);
734 if (
q != _topics.end() &&
q->second->destroyed())
746 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
748 p->second->shutdown();
764 TopicManagerImpl::updateTopicObservers()
767 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
769 p->second->updateObserver();
774 TopicManagerImpl::updateSubscriberObservers()
777 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
779 p->second->updateSubscriberObservers();
784 TopicManagerImpl::installTopic(
const string& name,
793 if (traceLevels->topicMgr > 0)
795 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
798 out <<
"creating new topic \"" << name
799 <<
"\". id: " << _instance->communicator()->identityToString(
id)
801 for (SubscriberRecordSeq::const_iterator
q = subscribers.begin();
802 q != subscribers.end();
805 if (
q != subscribers.begin())
809 if (traceLevels->topicMgr > 1)
811 out << _instance->communicator()->identityToString(
q->id)
818 out <<
"loading topic \"" << name
819 <<
"\" from database. id: " << _instance->communicator()->identityToString(
id)
821 for (SubscriberRecordSeq::const_iterator
q = subscribers.begin();
822 q != subscribers.end();
825 if (
q != subscribers.begin())
829 if (traceLevels->topicMgr > 1)
831 out << _instance->communicator()->identityToString(
q->id)
842 _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));
843 _instance->topicAdapter()->add(topicImpl->getServant(),
id);
844 return topicImpl->proxy();