15 #include <IceUtil/Timer.h>
18 #include <Ice/Communicator.h>
19 #include <Ice/Properties.h>
// TopicReaper::add: appends `name` to the _topics container.
// NOTE(review): only part of this definition is visible in this excerpt; whether the
// append is guarded by a lock cannot be confirmed from here.
33 TopicReaper::add(
const string& name)
36 _topics.push_back(name);
// TopicReaper::consumeReapedTopics: begins by declaring an empty local vector<string>
// named `reaped`.
// NOTE(review): the remainder of the body is not visible in this excerpt; presumably
// `reaped` takes over the accumulated topic names and is returned to the caller -- confirm.
40 TopicReaper::consumeReapedTopics()
43 vector<string> reaped;
// PersistentInstance constructor (parameter list only partially visible in this excerpt).
// Forwards its arguments to the Instance base class, then initialises the two
// database-related members from communicator properties keyed by `name`.
48 PersistentInstance::PersistentInstance(
49 const string& instanceName,
56 Instance(instanceName, name, communicator, publishAdapter, topicAdapter, nodeAdapter, nodeProxy),
// _dbLock is built from "<LMDB path>/icedb.lock", where the LMDB path is the value of
// the "<name>.LMDB.Path" property and falls back to `name` itself when that is unset.
57 _dbLock(communicator->getProperties()->getPropertyWithDefault(name +
".LMDB.Path", name) +
"/icedb.lock"),
// _dbEnv is opened on the same LMDB path. The map size is derived by IceDB::getMapSize
// from the integer property "<name>.LMDB.MapSize".
// NOTE(review): the literal 2 is presumably the number of named databases the
// environment must accommodate -- confirm against the IceDB::Env constructor.
58 _dbEnv(communicator->getProperties()->getPropertyWithDefault(name +
".LMDB.Path", name), 2,
59 IceDB::
getMapSize(communicator->getProperties()->getPropertyAsInt(name +
".LMDB.MapSize")))
// Instance constructor.
// NOTE(review): the header line and several body lines are missing from this excerpt;
// the member names in the initializer list indicate this is Instance::Instance -- confirm.
// Visible behaviour: stores the supplied adapters/proxies, reads tuning values from
// communicator properties prefixed with `name`, optionally builds replica proxies,
// creates two timers, parses the send-queue overflow policy, and looks up the
// communicator's observer.
93 const string& instanceName,
100 _instanceName(instanceName),
102 _communicator(communicator),
103 _publishAdapter(publishAdapter),
104 _topicAdapter(topicAdapter),
105 _nodeAdapter(nodeAdapter),
106 _nodeProxy(nodeProxy),
// Trace levels are configured from the same properties and the communicator's logger.
107 _traceLevels(new
TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
// "<name>.Discard.Interval" is read as an integer (default 60) and converted to an
// IceUtil::Time in seconds.
108 _discardInterval(
IceUtil::
Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault(
109 name +
".Discard.Interval", 60))),
// "<name>.Flush.Timeout" is read as an integer (default 1000) and converted to an
// IceUtil::Time in milliseconds.
110 _flushInterval(
IceUtil::
Time::milliSeconds(communicator->getProperties()->getPropertyAsIntWithDefault(
111 name +
".Flush.Timeout", 1000))),
// "<name>.Send.Timeout" defaults to 60 * 1000.
// NOTE(review): presumably expressed in milliseconds (i.e. one minute) -- confirm.
113 _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Send.Timeout", 60 * 1000)),
// "<name>.Send.QueueSizeMax" defaults to -1.
// NOTE(review): presumably -1 means "no limit" -- confirm where the value is consumed.
114 _sendQueueSizeMax(communicator->getProperties()->getPropertyAsIntWithDefault(name +
".Send.QueueSizeMax", -1)),
// Initial policy is RemoveSubscriber; the "<name>.Send.QueueSizeMaxPolicy" property
// is examined further down.
115 _sendQueueSizeMaxPolicy(RemoveSubscriber),
// Replica proxies are only considered when "<name>.TopicManager.AdapterId" is empty.
123 if (
properties->getProperty(name +
".TopicManager.AdapterId").empty())
125 string p =
properties->getProperty(name +
".ReplicatedTopicManagerEndpoints");
// The endpoint string is prefixed with "dummy:" before being turned into a proxy.
// NOTE(review): the const_cast suggests _topicReplicaProxy is declared const and is
// only assigned here during construction -- confirm in the header. Any guard on `p`
// being non-empty is not visible in this excerpt.
128 const_cast<Ice::ObjectPrx&
>(_topicReplicaProxy) =
communicator->stringToProxy(
"dummy:" + p);
130 p =
properties->getProperty(name +
".ReplicatedPublishEndpoints");
// Same pattern for the publisher replica proxy.
133 const_cast<Ice::ObjectPrx&
>(_publisherReplicaProxy) =
communicator->stringToProxy(
"dummy:" + p);
// Two separate IceUtil::Timer instances are created.
137 _batchFlusher =
new IceUtil::Timer();
138 _timer =
new IceUtil::Timer();
// Parse the queue-overflow policy: "RemoveSubscriber" and "DropEvents" are the two
// recognised values (their handling lines are not visible here); any other non-empty
// value only produces a warning on the trace-level logger.
140 string policy =
properties->getProperty(name +
".Send.QueueSizeMaxPolicy");
141 if (policy ==
"RemoveSubscriber")
145 else if (policy ==
"DropEvents")
149 else if (!policy.empty())
151 Ice::Warning warn(_traceLevels->logger);
152 warn <<
"invalid value `" << policy <<
"' for `" << name <<
".Send.QueueSizeMaxPolicy'";
// Downcast the communicator's observer to the internal observer type; what is done
// with `o` is not visible in this excerpt.
159 IceInternal::CommunicatorObserverIPtr o =
160 IceInternal::CommunicatorObserverIPtr::dynamicCast(
communicator->getObserver());
// __setNoDelete(false) appears twice.
// NOTE(review): presumably once on an error/cleanup path and once on the normal exit
// path, paired with an earlier __setNoDelete(true) that is not visible here -- confirm.
170 __setNoDelete(
false);
174 __setNoDelete(
false);
// Return statements of a series of accessors.
// NOTE(review): the enclosing signatures are not visible in this excerpt.
// Each line hands back the corresponding member unchanged, except the third, which
// returns the properties object obtained from _communicator.
186 return _instanceName;
198 return _communicator;
204 return _communicator->getProperties();
210 return _publishAdapter;
216 return _topicAdapter;
252 return _batchFlusher;
264 return _topicReplicaProxy;
270 return _publisherReplicaProxy;
288 return _discardInterval;
294 return _flushInterval;
306 return _sendQueueSizeMax;
312 return _sendQueueSizeMaxPolicy;
// Teardown fragments.
// NOTE(review): function boundaries are not visible in this excerpt; these lines
// appear to span the shutdown and destroy routines -- confirm.
// The node adapter is asserted non-null before being destroyed; the topic and publish
// adapters are destroyed without a visible check.
321 assert(_nodeAdapter);
322 _nodeAdapter->destroy();
325 _topicAdapter->destroy();
326 _publishAdapter->destroy();
// Stops the batch-flush timer created in the constructor.
// NOTE(review): any null check on _batchFlusher is not visible here.
339 _batchFlusher->destroy();