Instance.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <Ice/Communicator.h>
11 #include <Ice/InstrumentationI.h>
12 #include <Ice/Properties.h>
13 #include <Ice/TraceUtil.h>
14 #include <IceStorm/Instance.h>
16 #include <IceStorm/NodeI.h>
17 #include <IceStorm/Observers.h>
18 #include <IceStorm/TraceLevels.h>
19 #include <IceUtil/Timer.h>
20 
21 using namespace std;
22 using namespace IceStorm;
23 using namespace IceStormElection;
24 using namespace IceStormInternal;
25 
27 {
29 }
30 
31 void
32 TopicReaper::add(const string& name)
33 {
34  Lock sync(*this);
35  _topics.push_back(name);
36 }
37 
38 vector<string>
39 TopicReaper::consumeReapedTopics()
40 {
41  Lock sync(*this);
42  vector<string> reaped;
43  reaped.swap(_topics);
44  return reaped;
45 }
46 
47 PersistentInstance::PersistentInstance(const string& instanceName,
48  const string& name,
49  const Ice::CommunicatorPtr& communicator,
50  const Ice::ObjectAdapterPtr& publishAdapter,
51  const Ice::ObjectAdapterPtr& topicAdapter,
52  const Ice::ObjectAdapterPtr& nodeAdapter,
53  const NodePrx& nodeProxy) :
54  Instance(instanceName,
55  name,
56  communicator,
57  publishAdapter,
58  topicAdapter,
59  nodeAdapter,
60  nodeProxy),
61  _dbLock(communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name) +
62  "/icedb.lock"),
63  _dbEnv(
64  communicator->getProperties()->getPropertyWithDefault(name + ".LMDB.Path", name),
65  2,
66  IceDB::getMapSize(communicator->getProperties()->getPropertyAsInt(name + ".LMDB.MapSize")))
67 {
68  try
69  {
71  dbContext.encoding.minor = 1;
72  dbContext.encoding.major = 1;
73 
74  IceDB::ReadWriteTxn txn(_dbEnv);
75 
76  _lluMap = LLUMap(txn, "llu", dbContext, MDB_CREATE);
77  _subscriberMap =
78  SubscriberMap(txn, "subscribers", dbContext, MDB_CREATE, compareSubscriberRecordKey);
79 
80  txn.commit();
81  }
82  catch (...)
83  {
84  shutdown();
85  destroy();
86 
87  throw;
88  }
89 }
90 
91 void
93 {
94  _dbEnv.close();
96 
98 }
99 
100 Instance::Instance(const string& instanceName,
101  const string& name,
102  const Ice::CommunicatorPtr& communicator,
103  const Ice::ObjectAdapterPtr& publishAdapter,
104  const Ice::ObjectAdapterPtr& topicAdapter,
105  const Ice::ObjectAdapterPtr& nodeAdapter,
106  const NodePrx& nodeProxy) :
107  _instanceName(instanceName),
108  _serviceName(name),
109  _communicator(communicator),
110  _publishAdapter(publishAdapter),
111  _topicAdapter(topicAdapter),
112  _nodeAdapter(nodeAdapter),
113  _nodeProxy(nodeProxy),
114  _traceLevels(new TraceLevels(name, communicator->getProperties(), communicator->getLogger())),
115  _discardInterval(IceUtil::Time::seconds(
116  communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Discard.Interval",
117  60))), // default one minute.
118  _flushInterval(IceUtil::Time::milliSeconds(
119  communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Flush.Timeout",
120  1000))), // default one second.
121  // default one minute.
122  _sendTimeout(communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.Timeout",
123  60 * 1000)),
124  _sendQueueSizeMax(
125  communicator->getProperties()->getPropertyAsIntWithDefault(name + ".Send.QueueSizeMax",
126  -1)),
127  _sendQueueSizeMaxPolicy(RemoveSubscriber),
128  _topicReaper(new TopicReaper())
129 {
130  try
131  {
132  __setNoDelete(true);
133 
134  Ice::PropertiesPtr properties = communicator->getProperties();
135  if (properties->getProperty(name + ".TopicManager.AdapterId").empty())
136  {
137  string p = properties->getProperty(name + ".ReplicatedTopicManagerEndpoints");
138  if (!p.empty())
139  {
140  const_cast<Ice::ObjectPrx&>(_topicReplicaProxy) =
141  communicator->stringToProxy("dummy:" + p);
142  }
143  p = properties->getProperty(name + ".ReplicatedPublishEndpoints");
144  if (!p.empty())
145  {
146  const_cast<Ice::ObjectPrx&>(_publisherReplicaProxy) =
147  communicator->stringToProxy("dummy:" + p);
148  }
149  }
150  _observers = new Observers(this);
151  _batchFlusher = new IceUtil::Timer();
152  _timer = new IceUtil::Timer();
153 
154  string policy = properties->getProperty(name + ".Send.QueueSizeMaxPolicy");
155  if (policy == "RemoveSubscriber")
156  {
157  const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = RemoveSubscriber;
158  }
159  else if (policy == "DropEvents")
160  {
161  const_cast<SendQueueSizeMaxPolicy&>(_sendQueueSizeMaxPolicy) = DropEvents;
162  }
163  else if (!policy.empty())
164  {
165  Ice::Warning warn(_traceLevels->logger);
166  warn << "invalid value `" << policy << "' for `" << name << ".Send.QueueSizeMaxPolicy'";
167  }
168 
169  //
170  // If an Ice metrics observer is setup on the communicator, also
171  // enable metrics for IceStorm.
172  //
173  IceInternal::CommunicatorObserverIPtr o =
174  IceInternal::CommunicatorObserverIPtr::dynamicCast(communicator->getObserver());
175  if (o)
176  {
177  _observer = new TopicManagerObserverI(o->getFacet());
178  }
179  }
180  catch (...)
181  {
182  shutdown();
183  destroy();
184  __setNoDelete(false);
185 
186  throw;
187  }
188  __setNoDelete(false);
189 }
190 
191 void
193 {
194  _node = node;
195 }
196 
197 string
199 {
200  return _instanceName;
201 }
202 
203 string
205 {
206  return _serviceName;
207 }
208 
211 {
212  return _communicator;
213 }
214 
217 {
218  return _communicator->getProperties();
219 }
220 
223 {
224  return _publishAdapter;
225 }
226 
229 {
230  return _topicAdapter;
231 }
232 
235 {
236  return _nodeAdapter;
237 }
238 
241 {
242  return _observers;
243 }
244 
245 NodeIPtr
247 {
248  return _node;
249 }
250 
251 NodePrx
253 {
254  return _nodeProxy;
255 }
256 
259 {
260  return _traceLevels;
261 }
262 
265 {
266  return _batchFlusher;
267 }
268 
271 {
272  return _timer;
273 }
274 
275 Ice::ObjectPrx
277 {
278  return _topicReplicaProxy;
279 }
280 
281 Ice::ObjectPrx
283 {
284  return _publisherReplicaProxy;
285 }
286 
289 {
290  return _observer;
291 }
292 
295 {
296  return _topicReaper;
297 }
298 
301 {
302  return _discardInterval;
303 }
304 
307 {
308  return _flushInterval;
309 }
310 
311 int
313 {
314  return _sendTimeout;
315 }
316 
317 int
319 {
320  return _sendQueueSizeMax;
321 }
322 
325 {
326  return _sendQueueSizeMaxPolicy;
327 }
328 
329 void
331 {
332  if (_node)
333  {
334  _node->destroy();
335  assert(_nodeAdapter);
336  _nodeAdapter->destroy();
337  }
338 
339  _topicAdapter->destroy();
340  _publishAdapter->destroy();
341 
342  if (_timer)
343  {
344  _timer->destroy();
345  }
346 }
347 
348 void
350 {
351  if (_batchFlusher)
352  {
353  _batchFlusher->destroy();
354  }
355 
356  // The node instance must be cleared as the node holds the
357  // replica (TopicManager) which holds the instance causing a
358  // cyclic reference.
359  _node = 0;
360  //
361  // The observer instance must be cleared as it holds the
362  // TopicManagerImpl which holds the instance causing a
363  // cyclic reference.
364  //
365  _observer = 0;
366 }
IceStorm::Instance::nodeAdapter
Ice::ObjectAdapterPtr nodeAdapter() const
Definition: Instance.cpp:234
IceStorm
Definition: DBTypes.ice:22
IceStorm::Instance::topicAdapter
Ice::ObjectAdapterPtr topicAdapter() const
Definition: Instance.cpp:228
IceDB
Definition: IceDB.h:42
IceDB::ReadWriteTxn
Definition: IceDB.h:194
IceDB::IceContext::encoding
Ice::EncodingVersion encoding
Definition: IceDB.h:490
IceStorm::Instance::node
IceStormElection::NodeIPtr node() const
Definition: Instance.cpp:246
IceStorm::Instance::DropEvents
@ DropEvents
Definition: Instance.h:64
IceDB::Txn::commit
void commit()
IceStorm::Instance::traceLevels
TraceLevelsPtr traceLevels() const
Definition: Instance.cpp:258
IceStorm::Instance::sendQueueSizeMax
int sendQueueSizeMax() const
Definition: Instance.cpp:318
IceStorm::Instance::setNode
void setNode(const IceStormElection::NodeIPtr &)
Definition: Instance.cpp:192
IceStormInternal::dbContext
IceDB::IceContext dbContext
Definition: Util.cpp:19
IceStorm::Instance::Instance
Instance(const std::string &, const std::string &, const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &=0, const IceStormElection::NodePrx &=0)
Definition: Instance.cpp:100
IceStormElection::Observers
Definition: Observers.h:33
IceStorm::Instance::communicator
Ice::CommunicatorPtr communicator() const
Definition: Instance.cpp:210
IceStorm::Instance::publishAdapter
Ice::ObjectAdapterPtr publishAdapter() const
Definition: Instance.cpp:222
IceStorm::Instance::destroy
virtual void destroy()
Definition: Instance.cpp:349
IceUtil
Definition: Instance.h:21
IceDB::getMapSize
ICE_DB_API size_t getMapSize(int)
IceStorm::Instance::topicReaper
TopicReaperPtr topicReaper() const
Definition: Instance.cpp:294
IceStorm::TopicManagerObserverI
Definition: InstrumentationI.h:39
IceInternal::Handle<::Ice::Communicator >
IceStorm::Instance::flushInterval
IceUtil::Time flushInterval() const
Definition: Instance.cpp:306
IceDB::IceContext::communicator
Ice::CommunicatorPtr communicator
Definition: IceDB.h:489
IceStorm::Instance
Definition: Instance.h:58
IceStorm::Instance::serviceName
std::string serviceName() const
Definition: Instance.cpp:204
IceStorm::Instance::nodeProxy
IceStormElection::NodePrx nodeProxy() const
Definition: Instance.cpp:252
IceStormElection
Definition: DBTypes.ice:17
IceStorm::Instance::properties
Ice::PropertiesPtr properties() const
Definition: Instance.cpp:216
IceStormInternal::compareSubscriberRecordKey
int compareSubscriberRecordKey(const MDB_val *v1, const MDB_val *v2)
Definition: Util.cpp:74
IceStorm::Instance::instanceName
std::string instanceName() const
Definition: Instance.cpp:198
IceDB::Env::close
void close()
InstrumentationI.h
IceStorm::TopicReaper
Definition: Instance.h:46
IceStorm::Instance::publisherReplicaProxy
Ice::ObjectPrx publisherReplicaProxy() const
Definition: Instance.cpp:282
IceDB::IceContext
Definition: IceDB.h:487
IceStorm::Instance::SendQueueSizeMaxPolicy
SendQueueSizeMaxPolicy
Definition: Instance.h:61
IceStorm::Instance::RemoveSubscriber
@ RemoveSubscriber
Definition: Instance.h:63
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
IceStorm::Instance::observer
IceStorm::Instrumentation::TopicManagerObserverPtr observer() const
Definition: Instance.cpp:288
IceStorm::TraceLevels
Definition: TraceLevels.h:21
IceStorm::Instance::sendQueueSizeMaxPolicy
SendQueueSizeMaxPolicy sendQueueSizeMaxPolicy() const
Definition: Instance.cpp:324
IceStorm::Instance::shutdown
void shutdown()
Definition: Instance.cpp:330
IceStorm::SubscriberMap
IceDB::Dbi< IceStorm::SubscriberRecordKey, IceStorm::SubscriberRecord, IceDB::IceContext, Ice::OutputStream > SubscriberMap
Definition: Util.h:31
IceStorm::Instance::sendTimeout
int sendTimeout() const
Definition: Instance.cpp:312
InstrumentationI.h
TraceLevels.h
IceStorm::Instance::discardInterval
IceUtil::Time discardInterval() const
Definition: Instance.cpp:300
std
Definition: Application.h:66
IceStormInternal
Definition: Instance.cpp:26
IceUtil::Handle< NodeI >
IceInternal::ProxyHandle<::IceProxy::IceStormElection::Node >
IceStorm::Instance::observers
IceStormElection::ObserversPtr observers() const
Definition: Instance.cpp:240
Observers.h
TraceUtil.h
IceStorm::Instance::timer
IceUtil::TimerPtr timer() const
Definition: Instance.cpp:270
IceStorm::PersistentInstance::destroy
virtual void destroy()
Definition: Instance.cpp:92
IceStorm::LLUMap
IceDB::Dbi< std::string, IceStormElection::LogUpdate, IceDB::IceContext, Ice::OutputStream > LLUMap
Definition: Util.h:34
NodeI.h
Instance.h
IceStorm::Instance::topicReplicaProxy
Ice::ObjectPrx topicReplicaProxy() const
Definition: Instance.cpp:276
IceStorm::Instance::batchFlusher
IceUtil::TimerPtr batchFlusher() const
Definition: Instance.cpp:264