10#include <Ice/Communicator.h>
12#include <Ice/Properties.h>
19#include <IceUtil/Timer.h>
35 _topics.push_back(name);
42 vector<string> reaped;
61 _dbLock(
communicator->getProperties()->getPropertyWithDefault(name +
".LMDB.Path", name) +
64 communicator->getProperties()->getPropertyWithDefault(name +
".LMDB.Path", name),
66 IceDB::getMapSize(
communicator->getProperties()->getPropertyAsInt(name +
".LMDB.MapSize")))
115 _discardInterval(
IceUtil::Time::seconds(
116 communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Discard.Interval",
118 _flushInterval(
IceUtil::Time::milliSeconds(
119 communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Flush.Timeout",
122 _sendTimeout(
communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Send.Timeout",
125 communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Send.QueueSizeMax",
135 if (
properties->getProperty(name +
".TopicManager.AdapterId").empty())
137 string p =
properties->getProperty(name +
".ReplicatedTopicManagerEndpoints");
140 const_cast<Ice::ObjectPrx&
>(_topicReplicaProxy) =
143 p =
properties->getProperty(name +
".ReplicatedPublishEndpoints");
146 const_cast<Ice::ObjectPrx&
>(_publisherReplicaProxy) =
151 _batchFlusher =
new IceUtil::Timer();
152 _timer =
new IceUtil::Timer();
154 string policy =
properties->getProperty(name +
".Send.QueueSizeMaxPolicy");
155 if (policy ==
"RemoveSubscriber")
159 else if (policy ==
"DropEvents")
163 else if (!policy.empty())
165 Ice::Warning warn(_traceLevels->logger);
166 warn <<
"invalid value `" << policy <<
"' for `" << name <<
".Send.QueueSizeMaxPolicy'";
173 IceInternal::CommunicatorObserverIPtr o =
174 IceInternal::CommunicatorObserverIPtr::dynamicCast(
communicator->getObserver());
184 __setNoDelete(
false);
188 __setNoDelete(
false);
200 return _instanceName;
212 return _communicator;
218 return _communicator->getProperties();
224 return _publishAdapter;
230 return _topicAdapter;
266 return _batchFlusher;
278 return _topicReplicaProxy;
284 return _publisherReplicaProxy;
302 return _discardInterval;
308 return _flushInterval;
320 return _sendQueueSizeMax;
326 return _sendQueueSizeMaxPolicy;
335 assert(_nodeAdapter);
336 _nodeAdapter->destroy();
339 _topicAdapter->destroy();
340 _publishAdapter->destroy();
353 _batchFlusher->destroy();
void setNode(const IceStormElection::NodeIPtr &)
TraceLevelsPtr traceLevels() const
Ice::ObjectAdapterPtr publishAdapter() const
IceUtil::TimerPtr timer() const
IceUtil::TimerPtr batchFlusher() const
int sendQueueSizeMax() const
IceStormElection::NodeIPtr node() const
Ice::CommunicatorPtr communicator() const
SendQueueSizeMaxPolicy sendQueueSizeMaxPolicy() const
Ice::ObjectAdapterPtr nodeAdapter() const
IceUtil::Time flushInterval() const
Ice::PropertiesPtr properties() const
std::string serviceName() const
IceStormElection::NodePrx nodeProxy() const
Ice::ObjectAdapterPtr topicAdapter() const
Ice::ObjectPrx topicReplicaProxy() const
std::string instanceName() const
Instance(const std::string &, const std::string &, const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &=0, const IceStormElection::NodePrx &=0)
IceStorm::Instrumentation::TopicManagerObserverPtr observer() const
Ice::ObjectPrx publisherReplicaProxy() const
TopicReaperPtr topicReaper() const
IceUtil::Time discardInterval() const
IceStormElection::ObserversPtr observers() const
PersistentInstance(const std::string &, const std::string &, const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &=0, const IceStormElection::NodePrx &=0)
void add(const std::string &)
std::vector< std::string > consumeReapedTopics()
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::Node > NodePrx
IceUtil::Handle< NodeI > NodeIPtr
IceUtil::Handle< Observers > ObserversPtr
int compareSubscriberRecordKey(const MDB_val *v1, const MDB_val *v2)
IceDB::IceContext dbContext
::IceInternal::Handle<::IceStorm::Instrumentation::TopicManagerObserver > TopicManagerObserverPtr
IceDB::Dbi< IceStorm::SubscriberRecordKey, IceStorm::SubscriberRecord, IceDB::IceContext, Ice::OutputStream > SubscriberMap
IceUtil::Handle< TraceLevels > TraceLevelsPtr
IceUtil::Handle< TopicReaper > TopicReaperPtr
IceDB::Dbi< std::string, IceStormElection::LogUpdate, IceDB::IceContext, Ice::OutputStream > LLUMap
IceUtil::Handle< Timer > TimerPtr
::IceInternal::Handle<::Ice::Properties > PropertiesPtr
::IceInternal::Handle<::Ice::Communicator > CommunicatorPtr
::IceInternal::Handle<::Ice::ObjectAdapter > ObjectAdapterPtr