12#include <Ice/SliceChecksums.h>
33 Ice::Error error(com->getLogger());
34 error <<
"LMDB error: " << ex;
41 _instance(instance), _impl(impl)
46 create(
const string&
id,
const Ice::Current&)
56 return master->create(
id);
58 catch (
const Ice::ConnectFailedException&)
60 _instance->node()->recovery(generation);
63 catch (
const Ice::TimeoutException&)
65 _instance->node()->recovery(generation);
71 FinishUpdateHelper unlock(_instance->node());
72 return _impl->create(
id);
78 retrieve(
const string&
id,
const Ice::Current&)
const
81 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
82 return _impl->retrieve(
id);
86 retrieveAll(
const Ice::Current&)
const
89 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
90 return _impl->retrieveAll();
93 virtual Ice::SliceChecksumDict
94 getSliceChecksums(
const Ice::Current&)
const
97 return Ice::sliceChecksums();
101 getReplicaNode(
const Ice::Current&)
const
104 return _instance->nodeProxy();
109 getMaster(Ice::Long& generation,
const char* file,
int line)
const
114 return TopicManagerPrx::uncheckedCast(node->startUpdate(generation, file, line));
130 _instance(instance), _impl(impl)
135 init(
const LogUpdate& llu,
const TopicContentSeq& content,
const Ice::Current&)
142 _impl->observerInit(llu, content);
146 createTopic(
const LogUpdate& llu,
const string& name,
const Ice::Current&)
150 ObserverUpdateHelper unlock(_instance->node(), llu.
generation, __FILE__, __LINE__);
151 _impl->observerCreateTopic(llu, name);
153 catch (
const ObserverInconsistencyException& e)
155 Ice::Warning warn(_instance->traceLevels()->logger);
156 warn <<
"ReplicaObserverI::create: ObserverInconsistencyException: " << e.
reason;
163 destroyTopic(
const LogUpdate& llu,
const string& name,
const Ice::Current&)
167 ObserverUpdateHelper unlock(_instance->node(), llu.
generation, __FILE__, __LINE__);
168 _impl->observerDestroyTopic(llu, name);
170 catch (
const ObserverInconsistencyException& e)
172 Ice::Warning warn(_instance->traceLevels()->logger);
173 warn <<
"ReplicaObserverI::destroy: ObserverInconsistencyException: " << e.
reason;
180 addSubscriber(
const LogUpdate& llu,
182 const SubscriberRecord& rec,
187 ObserverUpdateHelper unlock(_instance->node(), llu.
generation, __FILE__, __LINE__);
188 _impl->observerAddSubscriber(llu, name, rec);
190 catch (
const ObserverInconsistencyException& e)
192 Ice::Warning warn(_instance->traceLevels()->logger);
193 warn <<
"ReplicaObserverI::add: ObserverInconsistencyException: " << e.
reason;
200 removeSubscriber(
const LogUpdate& llu,
202 const Ice::IdentitySeq&
id,
207 ObserverUpdateHelper unlock(_instance->node(), llu.
generation, __FILE__, __LINE__);
208 _impl->observerRemoveSubscriber(llu, name,
id);
210 catch (
const ObserverInconsistencyException& e)
212 Ice::Warning warn(_instance->traceLevels()->logger);
213 warn <<
"ReplicaObserverI::remove: ObserverInconsistencyException: " << e.
reason;
232 getContent(LogUpdate& llu,
TopicContentSeq& content,
const Ice::Current&)
234 _impl->getContent(llu, content);
244 _instance(instance), _lluMap(instance->lluMap()), _subscriberMap(instance->subscriberMap())
250 if (_instance->observer())
252 _instance->observer()->setObserverUpdater(
this);
258 _managerImpl =
new TopicManagerI(instance,
this);
262 if (_instance->nodeAdapter())
264 _observerImpl =
new ReplicaObserverI(instance,
this);
265 _observer = _instance->nodeAdapter()->addWithUUID(_observerImpl);
266 _syncImpl =
new TopicManagerSyncI(
this);
267 _sync = _instance->nodeAdapter()->addWithUUID(_syncImpl);
275 _instance->lluMap().put(txn,
lluDbKey, empty);
282 if (cursor.
get(k, v, MDB_FIRST))
284 bool moreTopics =
false;
289 assert(k.
id.name.empty() && k.
id.category.empty());
291 Ice::Identity topic = k.
topic;
295 while ((moreTopics = cursor.
get(k, v, MDB_NEXT)) && k.
topic == topic)
297 content.push_back(v);
301 installTopic(name, topic,
false, content);
302 }
while (moreTopics);
311 __setNoDelete(
false);
314 __setNoDelete(
false);
323 if (_topics.find(name) != _topics.end())
344 _subscriberMap.put(txn, key, rec);
352 logError(_instance->communicator(), ex);
356 _instance->observers()->createTopic(llu, name);
357 return installTopic(name,
id,
true);
368 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
369 if (p == _topics.end())
376 return p->second->proxy();
388 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
390 all.insert(TopicDict::value_type(p->first, p->second->proxy()));
402 if (traceLevels->topicMgr > 0)
404 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
406 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
408 out <<
" topic: " << _instance->communicator()->identityToString(p->id)
410 for (SubscriberRecordSeq::const_iterator
q = p->records.begin();
q != p->records.end();
413 if (
q != p->records.begin())
417 out << _instance->communicator()->identityToString(
q->id);
418 if (traceLevels->topicMgr > 1)
434 _subscriberMap.clear(txn);
436 for (TopicContentSeq::const_iterator p = content.begin(); p != content.end(); ++p)
444 _subscriberMap.put(txn, key, rec);
446 for (SubscriberRecordSeq::const_iterator
q = p->records.begin();
q != p->records.end();
453 _subscriberMap.put(txn, key, *
q);
460 logError(_instance->communicator(), ex);
469 map<string, TopicImplPtr>::iterator p = _topics.begin();
470 while (p != _topics.end())
472 TopicContentSeq::const_iterator
q;
473 for (
q = content.begin();
q != content.end(); ++
q)
475 if (
q->id == p->second->id())
481 if (
q == content.end())
489 p->second->observerDestroyTopic(llu);
500 for (TopicContentSeq::const_iterator
q = content.begin();
q != content.end(); ++
q)
503 map<string, TopicImplPtr>::const_iterator p = _topics.find(name);
504 if (p == _topics.end())
506 installTopic(name,
q->id,
true,
q->records);
510 p->second->update(
q->records);
514 _instance->observers()->clear();
533 if (_subscriberMap.find(txn, key))
537 _subscriberMap.put(txn, key, rec);
545 logError(_instance->communicator(), ex);
549 installTopic(name,
id,
true);
557 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
558 if (
q == _topics.end())
562 q->second->observerDestroyTopic(llu);
576 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
577 if (
q == _topics.end())
581 assert(
q != _topics.end());
584 topic->observerAddSubscriber(llu, record);
590 const Ice::IdentitySeq&
id)
596 map<string, TopicImplPtr>::iterator
q = _topics.find(name);
597 if (
q == _topics.end())
601 assert(
q != _topics.end());
604 topic->observerRemoveSubscriber(llu,
id);
618 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
621 content.push_back(rec);
629 logError(_instance->communicator(), ex);
645 logError(_instance->communicator(), ex);
659 sync->getContent(llu, content);
690 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
693 content.push_back(rec);
702 logError(_instance->communicator(), ex);
707 _instance->observers()->init(slaves, llu, content);
730 vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
731 for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
733 map<string, TopicImplPtr>::iterator
q = _topics.find(*p);
734 if (
q != _topics.end() &&
q->second->destroyed())
746 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
748 p->second->shutdown();
764TopicManagerImpl::updateTopicObservers()
767 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
769 p->second->updateObserver();
774TopicManagerImpl::updateSubscriberObservers()
777 for (map<string, TopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
779 p->second->updateSubscriberObservers();
784TopicManagerImpl::installTopic(
const string& name,
785 const Ice::Identity&
id,
793 if (traceLevels->topicMgr > 0)
795 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
798 out <<
"creating new topic \"" << name
799 <<
"\". id: " << _instance->communicator()->identityToString(
id)
801 for (SubscriberRecordSeq::const_iterator
q = subscribers.begin();
802 q != subscribers.end();
805 if (
q != subscribers.begin())
809 if (traceLevels->topicMgr > 1)
811 out << _instance->communicator()->identityToString(
q->id)
818 out <<
"loading topic \"" << name
819 <<
"\" from database. id: " << _instance->communicator()->identityToString(
id)
821 for (SubscriberRecordSeq::const_iterator
q = subscribers.begin();
822 q != subscribers.end();
825 if (
q != subscribers.begin())
829 if (traceLevels->topicMgr > 1)
831 out << _instance->communicator()->identityToString(
q->id)
839 TopicImplPtr topicImpl =
new TopicImpl(_instance, name,
id, subscribers);
842 _topics.insert(map<string, TopicImplPtr>::value_type(name, topicImpl));
843 _instance->topicAdapter()->add(topicImpl->getServant(),
id);
844 return topicImpl->proxy();
bool get(K &key, D &data, MDB_cursor_op op)
virtual Ice::ObjectPrx getSync() const
TopicPrx retrieve(const std::string &) const
void observerCreateTopic(const IceStormElection::LogUpdate &, const std::string &)
TopicPrx create(const std::string &)
virtual IceStormElection::LogUpdate getLastLogUpdate() const
TopicDict retrieveAll() const
void observerInit(const IceStormElection::LogUpdate &, const IceStormElection::TopicContentSeq &)
Ice::ObjectPtr getServant() const
virtual void sync(const Ice::ObjectPrx &)
virtual Ice::ObjectPrx getObserver() const
void getContent(IceStormElection::LogUpdate &, IceStormElection::TopicContentSeq &)
void observerDestroyTopic(const IceStormElection::LogUpdate &, const std::string &)
TopicManagerImpl(const PersistentInstancePtr &)
void observerAddSubscriber(const IceStormElection::LogUpdate &, const std::string &, const IceStorm::SubscriberRecord &)
void observerRemoveSubscriber(const IceStormElection::LogUpdate &, const std::string &, const Ice::IdentitySeq &)
virtual void initMaster(const std::set< IceStormElection::GroupNodeInfo > &, const IceStormElection::LogUpdate &)
Internal operations for a topic manager.
Thrown if an observer detects an inconsistency.
string reason
The reason for the inconsistency.
Interface used to sync topics.
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::Node > NodePrx
IceUtil::Handle< NodeI > NodeIPtr
::std::vector<::IceStormElection::TopicContent > TopicContentSeq
A sequence of topic content.
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::TopicManagerSync > TopicManagerSyncPrx
std::string identityToTopicName(const Ice::Identity &)
Ice::Identity nameToIdentity(const IceStorm::InstancePtr &, const std::string &)
IceStormElection::LogUpdate getIncrementedLLU(const IceDB::ReadWriteTxn &, IceStorm::LLUMap &)
std::string describeEndpoints(const Ice::ObjectPrx &)
IceDB::ReadWriteCursor< SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream > SubscriberMapRWCursor
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
IceUtil::Handle< TraceLevels > TraceLevelsPtr
::std::vector<::IceStorm::SubscriberRecord > SubscriberRecordSeq
IceUtil::Handle< TopicManagerImpl > TopicManagerImplPtr
IceUtil::Handle< PersistentInstance > PersistentInstancePtr
IceUtil::Handle< TopicImpl > TopicImplPtr
const std::string lluDbKey
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicManager > TopicManagerPrx
::IceInternal::Handle<::Ice::Communicator > CommunicatorPtr
A struct used for marking the last log update.
The key for persistent subscribers, or topics.
Used to store persistent information for persistent subscribers.