Instance.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <Ice/Communicator.h>
12#include <Ice/Properties.h>
13#include <Ice/TraceUtil.h>
14#include <IceStorm/Instance.h>
16#include <IceStorm/NodeI.h>
17#include <IceStorm/Observers.h>
19#include <IceUtil/Timer.h>
20
21using namespace std;
22using namespace IceStorm;
23using namespace IceStormElection;
24using namespace IceStormInternal;
25
27{
29}
30
31void
32TopicReaper::add(const string& name)
33{
34 Lock sync(*this);
35 _topics.push_back(name);
36}
37
38vector<string>
40{
41 Lock sync(*this);
42 vector<string> reaped;
43 reaped.swap(_topics);
44 return reaped;
45}
46
48 const string& name,
53 const NodePrx& nodeProxy) :
55 name,
60 nodeProxy),
61 _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) +
62 "/icedb.lock"),
63 _dbEnv(
64 communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name),
65 2,
66 IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize")))
67{
68 try
69 {
70 dbContext.communicator = communicator;
71 dbContext.encoding.minor = 1;
72 dbContext.encoding.major = 1;
73
74 IceDB::ReadWriteTxn txn(_dbEnv);
75
76 _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE);
77 _subscriberMap =
78 SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey);
79
80 txn.commit();
81 }
82 catch (...)
83 {
84 shutdown();
85 destroy();
86
87 throw;
88 }
89}
90
91void
93{
94 _dbEnv.close();
95 dbContext.communicator = 0;
96
98}
99
101 const string& name,
106 const NodePrx& nodeProxy) :
107 _instanceName(instanceName),
108 _serviceName(name),
109 _communicator(communicator),
110 _publishAdapter(publishAdapter),
111 _topicAdapter(topicAdapter),
112 _nodeAdapter(nodeAdapter),
113 _nodeProxy(nodeProxy),
114 _traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
115 _discardInterval(IceUtil::Time::seconds(
116 communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Discard.Interval",
117 60))), // default one minute.
118 _flushInterval(IceUtil::Time::milliSeconds(
119 communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Flush.Timeout",
120 1000))), // default one second.
121 // default one minute.
122 _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout",
123 60 * 1000)),
124 _sendQueueSizeMax(
125 communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax",
126 -1)),
127 _sendQueueSizeMaxPolicy(RemoveSubscriber),
128 _topicReaper(new TopicReaper())
129{
130 try
131 {
132 __setNoDelete(true);
133
134 Ice::PropertiesPtr properties = communicator->getProperties();
135 if (properties->getProperty(name + ".TopicManager.AdapterId").empty())
136 {
137 string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints");
138 if (!p.empty())
139 {
140 const_cast<Ice::ObjectPrx&>(_topicReplicaProxy) =
141 communicator->stringToProxy("dummy:" + p);
142 }
143 p = properties->getProperty(name + ".ReplicatedPublishEndpoints");
144 if (!p.empty())
145 {
146 const_cast<Ice::ObjectPrx&>(_publisherReplicaProxy) =
147 communicator->stringToProxy("dummy:" + p);
148 }
149 }
150 _observers = new Observers(this);
151 _batchFlusher = new IceUtil::Timer();
152 _timer = new IceUtil::Timer();
153
154 string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy");
155 if (policy == "RemoveSubscriber")
156 {
157 const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = RemoveSubscriber;
158 }
159 else if (policy == "DropEvents")
160 {
161 const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = DropEvents;
162 }
163 else if (!policy.empty())
164 {
165 Ice::Warning warn(_traceLevels->logger);
166 warn << "invalid value `" << policy << "' for `" << name << ".Send.QueueSizeMaxPolicy'";
167 }
168
169 //
170 // If an Ice metrics observer is setup on the communicator, also
171 // enable metrics for IceStorm.
172 //
173 IceInternal::CommunicatorObserverIPtr o =
174 IceInternal::CommunicatorObserverIPtr::dynamicCast(communicator->getObserver());
175 if (o)
176 {
177 _observer = new TopicManagerObserverI(o->getFacet());
178 }
179 }
180 catch (...)
181 {
182 shutdown();
183 destroy();
184 __setNoDelete(false);
185
186 throw;
187 }
188 __setNoDelete(false);
189}
190
191void
193{
194 _node = node;
195}
196
197string
199{
200 return _instanceName;
201}
202
203string
205{
206 return _serviceName;
207}
208
211{
212 return _communicator;
213}
214
217{
218 return _communicator->getProperties();
219}
220
223{
224 return _publishAdapter;
225}
226
229{
230 return _topicAdapter;
231}
232
235{
236 return _nodeAdapter;
237}
238
241{
242 return _observers;
243}
244
247{
248 return _node;
249}
250
253{
254 return _nodeProxy;
255}
256
259{
260 return _traceLevels;
261}
262
265{
266 return _batchFlusher;
267}
268
271{
272 return _timer;
273}
274
275Ice::ObjectPrx
277{
278 return _topicReplicaProxy;
279}
280
281Ice::ObjectPrx
283{
284 return _publisherReplicaProxy;
285}
286
289{
290 return _observer;
291}
292
295{
296 return _topicReaper;
297}
298
299IceUtil::Time
301{
302 return _discardInterval;
303}
304
305IceUtil::Time
307{
308 return _flushInterval;
309}
310
311int
313{
314 return _sendTimeout;
315}
316
317int
319{
320 return _sendQueueSizeMax;
321}
322
325{
326 return _sendQueueSizeMaxPolicy;
327}
328
329void
331{
332 if (_node)
333 {
334 _node->destroy();
335 assert(_nodeAdapter);
336 _nodeAdapter->destroy();
337 }
338
339 _topicAdapter->destroy();
340 _publishAdapter->destroy();
341
342 if (_timer)
343 {
344 _timer->destroy();
345 }
346}
347
348void
350{
351 if (_batchFlusher)
352 {
353 _batchFlusher->destroy();
354 }
355
356 // The node instance must be cleared as the node holds the
357 // replica (TopicManager) which holds the instance causing a
358 // cyclic reference.
359 _node = 0;
360 //
361 // The observer instance must be cleared as it holds the
362 // TopicManagerImpl which hodlds the instance causing a
363 // cyclic reference.
364 //
365 _observer = 0;
366}
void commit()
void setNode(const IceStormElection::NodeIPtr &)
Definition Instance.cpp:192
TraceLevelsPtr traceLevels() const
Definition Instance.cpp:258
Ice::ObjectAdapterPtr publishAdapter() const
Definition Instance.cpp:222
IceUtil::TimerPtr timer() const
Definition Instance.cpp:270
IceUtil::TimerPtr batchFlusher() const
Definition Instance.cpp:264
int sendQueueSizeMax() const
Definition Instance.cpp:318
virtual void destroy()
Definition Instance.cpp:349
IceStormElection::NodeIPtr node() const
Definition Instance.cpp:246
Ice::CommunicatorPtr communicator() const
Definition Instance.cpp:210
SendQueueSizeMaxPolicy sendQueueSizeMaxPolicy() const
Definition Instance.cpp:324
Ice::ObjectAdapterPtr nodeAdapter() const
Definition Instance.cpp:234
IceUtil::Time flushInterval() const
Definition Instance.cpp:306
Ice::PropertiesPtr properties() const
Definition Instance.cpp:216
std::string serviceName() const
Definition Instance.cpp:204
IceStormElection::NodePrx nodeProxy() const
Definition Instance.cpp:252
Ice::ObjectAdapterPtr topicAdapter() const
Definition Instance.cpp:228
Ice::ObjectPrx topicReplicaProxy() const
Definition Instance.cpp:276
int sendTimeout() const
Definition Instance.cpp:312
std::string instanceName() const
Definition Instance.cpp:198
Instance(const std::string &, const std::string &, const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &=0, const IceStormElection::NodePrx &=0)
Definition Instance.cpp:100
IceStorm::Instrumentation::TopicManagerObserverPtr observer() const
Definition Instance.cpp:288
Ice::ObjectPrx publisherReplicaProxy() const
Definition Instance.cpp:282
TopicReaperPtr topicReaper() const
Definition Instance.cpp:294
IceUtil::Time discardInterval() const
Definition Instance.cpp:300
IceStormElection::ObserversPtr observers() const
Definition Instance.cpp:240
PersistentInstance(const std::string &, const std::string &, const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &=0, const IceStormElection::NodePrx &=0)
Definition Instance.cpp:47
void add(const std::string &)
Definition Instance.cpp:32
std::vector< std::string > consumeReapedTopics()
Definition Instance.cpp:39
Definition IceDB.h:43
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::Node > NodePrx
Definition Election.h:1210
IceUtil::Handle< NodeI > NodeIPtr
Definition Instance.h:36
IceUtil::Handle< Observers > ObserversPtr
Definition Instance.h:33
int compareSubscriberRecordKey(const MDB_val *v1, const MDB_val *v2)
Definition Util.cpp:74
IceDB::IceContext dbContext
Definition Util.cpp:19
::IceInternal::Handle<::IceStorm::Instrumentation::TopicManagerObserver > TopicManagerObserverPtr
IceDB::Dbi< IceStorm::SubscriberRecordKey, IceStorm::SubscriberRecord, IceDB::IceContext, Ice::OutputStream > SubscriberMap
Definition Util.h:31
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
IceUtil::Handle< TopicReaper > TopicReaperPtr
Definition Instance.h:56
IceDB::Dbi< std::string, IceStormElection::LogUpdate, IceDB::IceContext, Ice::OutputStream > LLUMap
Definition Util.h:34
IceUtil::Handle< Timer > TimerPtr
Definition Instance.h:25
::IceInternal::Handle<::Ice::Properties > PropertiesPtr
::IceInternal::Handle<::Ice::Communicator > CommunicatorPtr
Definition IceManager.h:49
::IceInternal::Handle<::Ice::ObjectAdapter > ObjectAdapterPtr
Definition IceManager.h:52