Instance.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <IceStorm/Instance.h>
11 #include <IceStorm/TraceLevels.h>
12 #include <IceStorm/Observers.h>
13 #include <IceStorm/NodeI.h>
15 #include <IceUtil/Timer.h>
16 
17 #include <Ice/InstrumentationI.h>
18 #include <Ice/Communicator.h>
19 #include <Ice/Properties.h>
20 #include <Ice/TraceUtil.h>
21 
22 using namespace std;
23 using namespace IceStorm;
24 using namespace IceStormElection;
25 using namespace IceStormInternal;
26 
28 {
30 }
31 
32 void
33 TopicReaper::add(const string& name)
34 {
35  Lock sync(*this);
36  _topics.push_back(name);
37 }
38 
39 vector<string>
40 TopicReaper::consumeReapedTopics()
41 {
42  Lock sync(*this);
43  vector<string> reaped;
44  reaped.swap(_topics);
45  return reaped;
46 }
47 
48 PersistentInstance::PersistentInstance(
49  const string& instanceName,
50  const string& name,
51  const Ice::CommunicatorPtr& communicator,
52  const Ice::ObjectAdapterPtr& publishAdapter,
53  const Ice::ObjectAdapterPtr& topicAdapter,
54  const Ice::ObjectAdapterPtr& nodeAdapter,
55  const NodePrx& nodeProxy) :
56  Instance(instanceName, name, communicator, publishAdapter, topicAdapter, nodeAdapter, nodeProxy),
57  _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) + "/icedb.lock"),
58  _dbEnv(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name), 2,
59  IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize")))
60 {
61  try
62  {
64  dbContext.encoding.minor = 1;
65  dbContext.encoding.major = 1;
66 
67  IceDB::ReadWriteTxn txn(_dbEnv);
68 
69  _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE);
70  _subscriberMap = SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey);
71 
72  txn.commit();
73  }
74  catch (...)
75  {
76  shutdown();
77  destroy();
78 
79  throw;
80  }
81 }
82 
83 void
85 {
86  _dbEnv.close();
88 
90 }
91 
93  const string& instanceName,
94  const string& name,
95  const Ice::CommunicatorPtr& communicator,
96  const Ice::ObjectAdapterPtr& publishAdapter,
97  const Ice::ObjectAdapterPtr& topicAdapter,
98  const Ice::ObjectAdapterPtr& nodeAdapter,
99  const NodePrx& nodeProxy) :
100  _instanceName(instanceName),
101  _serviceName(name),
102  _communicator(communicator),
103  _publishAdapter(publishAdapter),
104  _topicAdapter(topicAdapter),
105  _nodeAdapter(nodeAdapter),
106  _nodeProxy(nodeProxy),
107  _traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
108  _discardInterval(IceUtil::Time::seconds(communicator->getProperties()->getPropertyAsIntWithDefault(
109  name + ".Discard.Interval", 60))), // default one minute.
110  _flushInterval(IceUtil::Time::milliSeconds(communicator->getProperties()->getPropertyAsIntWithDefault(
111  name + ".Flush.Timeout", 1000))), // default one second.
112  // default one minute.
113  _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout", 60 * 1000)),
114  _sendQueueSizeMax(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax", -1)),
115  _sendQueueSizeMaxPolicy(RemoveSubscriber),
116  _topicReaper(new TopicReaper())
117 {
118  try
119  {
120  __setNoDelete(true);
121 
122  Ice::PropertiesPtr properties = communicator->getProperties();
123  if (properties->getProperty(name + ".TopicManager.AdapterId").empty())
124  {
125  string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints");
126  if (!p.empty())
127  {
128  const_cast<Ice::ObjectPrx&>(_topicReplicaProxy) = communicator->stringToProxy("dummy:" + p);
129  }
130  p = properties->getProperty(name + ".ReplicatedPublishEndpoints");
131  if (!p.empty())
132  {
133  const_cast<Ice::ObjectPrx&>(_publisherReplicaProxy) = communicator->stringToProxy("dummy:" + p);
134  }
135  }
136  _observers = new Observers(this);
137  _batchFlusher = new IceUtil::Timer();
138  _timer = new IceUtil::Timer();
139 
140  string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy");
141  if (policy == "RemoveSubscriber")
142  {
143  const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = RemoveSubscriber;
144  }
145  else if (policy == "DropEvents")
146  {
147  const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = DropEvents;
148  }
149  else if (!policy.empty())
150  {
151  Ice::Warning warn(_traceLevels->logger);
152  warn << "invalid value `" << policy << "' for `" << name << ".Send.QueueSizeMaxPolicy'";
153  }
154 
155  //
156  // If an Ice metrics observer is setup on the communicator, also
157  // enable metrics for IceStorm.
158  //
159  IceInternal::CommunicatorObserverIPtr o =
160  IceInternal::CommunicatorObserverIPtr::dynamicCast(communicator->getObserver());
161  if (o)
162  {
163  _observer = new TopicManagerObserverI(o->getFacet());
164  }
165  }
166  catch (...)
167  {
168  shutdown();
169  destroy();
170  __setNoDelete(false);
171 
172  throw;
173  }
174  __setNoDelete(false);
175 }
176 
177 void
179 {
180  _node = node;
181 }
182 
183 string
185 {
186  return _instanceName;
187 }
188 
189 string
191 {
192  return _serviceName;
193 }
194 
197 {
198  return _communicator;
199 }
200 
203 {
204  return _communicator->getProperties();
205 }
206 
209 {
210  return _publishAdapter;
211 }
212 
215 {
216  return _topicAdapter;
217 }
218 
221 {
222  return _nodeAdapter;
223 }
224 
227 {
228  return _observers;
229 }
230 
231 NodeIPtr
233 {
234  return _node;
235 }
236 
237 NodePrx
239 {
240  return _nodeProxy;
241 }
242 
245 {
246  return _traceLevels;
247 }
248 
251 {
252  return _batchFlusher;
253 }
254 
257 {
258  return _timer;
259 }
260 
261 Ice::ObjectPrx
263 {
264  return _topicReplicaProxy;
265 }
266 
267 Ice::ObjectPrx
269 {
270  return _publisherReplicaProxy;
271 }
272 
275 {
276  return _observer;
277 }
278 
281 {
282  return _topicReaper;
283 }
284 
287 {
288  return _discardInterval;
289 }
290 
293 {
294  return _flushInterval;
295 }
296 
297 int
299 {
300  return _sendTimeout;
301 }
302 
303 int
305 {
306  return _sendQueueSizeMax;
307 }
308 
311 {
312  return _sendQueueSizeMaxPolicy;
313 }
314 
315 void
317 {
318  if (_node)
319  {
320  _node->destroy();
321  assert(_nodeAdapter);
322  _nodeAdapter->destroy();
323  }
324 
325  _topicAdapter->destroy();
326  _publishAdapter->destroy();
327 
328  if (_timer)
329  {
330  _timer->destroy();
331  }
332 }
333 
334 void
336 {
337  if (_batchFlusher)
338  {
339  _batchFlusher->destroy();
340  }
341 
342  // The node instance must be cleared as the node holds the
343  // replica (TopicManager) which holds the instance causing a
344  // cyclic reference.
345  _node = 0;
346  //
347  // The observer instance must be cleared as it holds the
348  // TopicManagerImpl which hodlds the instance causing a
349  // cyclic reference.
350  //
351  _observer = 0;
352 }
IceStorm::Instance::nodeAdapter
Ice::ObjectAdapterPtr nodeAdapter() const
Definition: Instance.cpp:220
IceStorm
Definition: DBTypes.ice:22
IceStorm::Instance::topicAdapter
Ice::ObjectAdapterPtr topicAdapter() const
Definition: Instance.cpp:214
IceDB
Definition: IceDB.h:42
IceDB::ReadWriteTxn
Definition: IceDB.h:206
IceDB::IceContext::encoding
Ice::EncodingVersion encoding
Definition: IceDB.h:502
IceStorm::Instance::node
IceStormElection::NodeIPtr node() const
Definition: Instance.cpp:232
IceStorm::Instance::DropEvents
@ DropEvents
Definition: Instance.h:66
IceDB::Txn::commit
void commit()
IceStorm::Instance::traceLevels
TraceLevelsPtr traceLevels() const
Definition: Instance.cpp:244
IceStorm::Instance::sendQueueSizeMax
int sendQueueSizeMax() const
Definition: Instance.cpp:304
IceStorm::Instance::setNode
void setNode(const IceStormElection::NodeIPtr &)
Definition: Instance.cpp:178
IceStormInternal::dbContext
IceDB::IceContext dbContext
Definition: Util.cpp:19
IceStorm::Instance::Instance
Instance(const std::string &, const std::string &, const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &=0, const IceStormElection::NodePrx &=0)
Definition: Instance.cpp:92
IceStormElection::Observers
Definition: Observers.h:33
IceStorm::Instance::communicator
Ice::CommunicatorPtr communicator() const
Definition: Instance.cpp:196
IceStorm::Instance::publishAdapter
Ice::ObjectAdapterPtr publishAdapter() const
Definition: Instance.cpp:208
IceStorm::Instance::destroy
virtual void destroy()
Definition: Instance.cpp:335
IceUtil
Definition: Instance.h:21
IceDB::getMapSize
ICE_DB_API size_t getMapSize(int)
IceStorm::Instance::topicReaper
TopicReaperPtr topicReaper() const
Definition: Instance.cpp:280
IceStorm::TopicManagerObserverI
Definition: InstrumentationI.h:40
IceInternal::Handle< ::Ice::Communicator >
IceStorm::Instance::flushInterval
IceUtil::Time flushInterval() const
Definition: Instance.cpp:292
IceDB::IceContext::communicator
Ice::CommunicatorPtr communicator
Definition: IceDB.h:501
IceStorm::Instance
Definition: Instance.h:59
IceStorm::Instance::serviceName
std::string serviceName() const
Definition: Instance.cpp:190
IceStorm::Instance::nodeProxy
IceStormElection::NodePrx nodeProxy() const
Definition: Instance.cpp:238
IceStormElection
Definition: DBTypes.ice:17
IceStorm::Instance::properties
Ice::PropertiesPtr properties() const
Definition: Instance.cpp:202
IceStormInternal::compareSubscriberRecordKey
int compareSubscriberRecordKey(const MDB_val *v1, const MDB_val *v2)
Definition: Util.cpp:74
IceStorm::Instance::instanceName
std::string instanceName() const
Definition: Instance.cpp:184
IceDB::Env::close
void close()
InstrumentationI.h
IceStorm::TopicReaper
Definition: Instance.h:46
IceStorm::Instance::publisherReplicaProxy
Ice::ObjectPrx publisherReplicaProxy() const
Definition: Instance.cpp:268
IceDB::IceContext
Definition: IceDB.h:499
IceStorm::Instance::SendQueueSizeMaxPolicy
SendQueueSizeMaxPolicy
Definition: Instance.h:63
IceStorm::Instance::RemoveSubscriber
@ RemoveSubscriber
Definition: Instance.h:65
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
IceStorm::LLUMap
IceDB::Dbi< std::string, IceStormElection::LogUpdate, IceDB::IceContext, Ice::OutputStream > LLUMap
Definition: Util.h:29
IceStorm::Instance::observer
IceStorm::Instrumentation::TopicManagerObserverPtr observer() const
Definition: Instance.cpp:274
IceStorm::TraceLevels
Definition: TraceLevels.h:21
IceStorm::Instance::sendQueueSizeMaxPolicy
SendQueueSizeMaxPolicy sendQueueSizeMaxPolicy() const
Definition: Instance.cpp:310
IceStorm::Instance::shutdown
void shutdown()
Definition: Instance.cpp:316
IceStorm::SubscriberMap
IceDB::Dbi< IceStorm::SubscriberRecordKey, IceStorm::SubscriberRecord, IceDB::IceContext, Ice::OutputStream > SubscriberMap
Definition: Util.h:28
IceStorm::Instance::sendTimeout
int sendTimeout() const
Definition: Instance.cpp:298
InstrumentationI.h
TraceLevels.h
IceStorm::Instance::discardInterval
IceUtil::Time discardInterval() const
Definition: Instance.cpp:286
std
Definition: Application.h:66
IceStormInternal
Definition: Instance.cpp:27
IceUtil::Handle< NodeI >
IceInternal::ProxyHandle< ::IceProxy::IceStormElection::Node >
IceStorm::Instance::observers
IceStormElection::ObserversPtr observers() const
Definition: Instance.cpp:226
Observers.h
TraceUtil.h
IceStorm::Instance::timer
IceUtil::TimerPtr timer() const
Definition: Instance.cpp:256
IceStorm::PersistentInstance::destroy
virtual void destroy()
Definition: Instance.cpp:84
NodeI.h
Instance.h
IceStorm::Instance::topicReplicaProxy
Ice::ObjectPrx topicReplicaProxy() const
Definition: Instance.cpp:262
IceStorm::Instance::batchFlusher
IceUtil::TimerPtr batchFlusher() const
Definition: Instance.cpp:250