10 #include <Ice/Communicator.h>
12 #include <Ice/Properties.h>
19 #include <IceUtil/Timer.h>
32 TopicReaper::add(
const string& name)
35 _topics.push_back(name);
39 TopicReaper::consumeReapedTopics()
42 vector<string> reaped;
47 PersistentInstance::PersistentInstance(
const string& instanceName,
61 _dbLock(communicator->getProperties()->getPropertyWithDefault(name +
".LMDB.Path", name) +
64 communicator->getProperties()->getPropertyWithDefault(name +
".LMDB.Path", name),
66 IceDB::
getMapSize(communicator->getProperties()->getPropertyAsInt(name +
".LMDB.MapSize")))
107 _instanceName(instanceName),
109 _communicator(communicator),
110 _publishAdapter(publishAdapter),
111 _topicAdapter(topicAdapter),
112 _nodeAdapter(nodeAdapter),
113 _nodeProxy(nodeProxy),
114 _traceLevels(new
TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
116 communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Discard.Interval",
119 communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Flush.Timeout",
122 _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Send.Timeout",
125 communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Send.QueueSizeMax",
127 _sendQueueSizeMaxPolicy(RemoveSubscriber),
135 if (
properties->getProperty(name +
".TopicManager.AdapterId").empty())
137 string p =
properties->getProperty(name +
".ReplicatedTopicManagerEndpoints");
140 const_cast<Ice::ObjectPrx&
>(_topicReplicaProxy) =
143 p =
properties->getProperty(name +
".ReplicatedPublishEndpoints");
146 const_cast<Ice::ObjectPrx&
>(_publisherReplicaProxy) =
151 _batchFlusher =
new IceUtil::Timer();
152 _timer =
new IceUtil::Timer();
154 string policy =
properties->getProperty(name +
".Send.QueueSizeMaxPolicy");
155 if (policy ==
"RemoveSubscriber")
159 else if (policy ==
"DropEvents")
163 else if (!policy.empty())
165 Ice::Warning warn(_traceLevels->logger);
166 warn <<
"invalid value `" << policy <<
"' for `" << name <<
".Send.QueueSizeMaxPolicy'";
173 IceInternal::CommunicatorObserverIPtr o =
174 IceInternal::CommunicatorObserverIPtr::dynamicCast(
communicator->getObserver());
184 __setNoDelete(
false);
188 __setNoDelete(
false);
200 return _instanceName;
212 return _communicator;
218 return _communicator->getProperties();
224 return _publishAdapter;
230 return _topicAdapter;
266 return _batchFlusher;
278 return _topicReplicaProxy;
284 return _publisherReplicaProxy;
302 return _discardInterval;
308 return _flushInterval;
320 return _sendQueueSizeMax;
326 return _sendQueueSizeMaxPolicy;
335 assert(_nodeAdapter);
336 _nodeAdapter->destroy();
339 _topicAdapter->destroy();
340 _publishAdapter->destroy();
353 _batchFlusher->destroy();