Service.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#define ICESTORM_SERVICE_API_EXPORTS
11
12//#include <Ice/PluginManagerI.h> // For loadPlugin
13
14#include <IceGrid/Registry.h>
15#include <IceStorm/Instance.h>
16#include <IceStorm/NodeI.h>
17#include <IceStorm/Observers.h>
18#include <IceStorm/Service.h>
19#include <IceStorm/TopicI.h>
24#include <IceStorm/Util.h>
25#include <IceUtil/StringUtil.h>
26
27using namespace std;
28using namespace Ice;
29using namespace IceStorm;
30using namespace IceStormInternal;
31using namespace IceStormElection;
32
33namespace
34{
35
36 class ServiceI : public IceStormInternal::Service
37 {
38 public:
39 ServiceI();
40 virtual ~ServiceI();
41
42 virtual void start(const string&, const CommunicatorPtr&, const StringSeq&);
43
44 virtual void start(const CommunicatorPtr&,
45 const ObjectAdapterPtr&,
46 const ObjectAdapterPtr&,
47 const string&,
48 const Ice::Identity&,
49 const string&);
50
51 virtual TopicManagerPrx getTopicManager() const;
52
53 virtual void stop();
54
55 private:
56 void createDbEnv(const Ice::CommunicatorPtr&);
57 void validateProperties(const string&, const PropertiesPtr&, const LoggerPtr&);
58
59 TopicManagerImplPtr _manager;
60 TransientTopicManagerImplPtr _transientManager;
61 TopicManagerPrx _managerProxy;
62 InstancePtr _instance;
63 };
64
65 class FinderI : public IceStorm::Finder
66 {
67 public:
68 FinderI(const TopicManagerPrx& topicManager) : _topicManager(topicManager)
69 {
70 }
71
72 virtual TopicManagerPrx
73 getTopicManager(const Ice::Current&)
74 {
75 return _topicManager;
76 }
77
78 private:
79 const TopicManagerPrx _topicManager;
80 };
81
82} // namespace
83
84extern "C"
85{
86
87 ICESTORM_SERVICE_API ::IceBox::Service*
89 {
90 return new ServiceI;
91 }
92}
93
96 const ObjectAdapterPtr& topicAdapter,
97 const ObjectAdapterPtr& publishAdapter,
98 const string& name,
99 const Ice::Identity& id,
100 const string& dbEnv)
101{
102 ServiceI* service = new ServiceI;
103 ServicePtr svc = service;
104 service->start(communicator, topicAdapter, publishAdapter, name, id, dbEnv);
105 return svc;
106}
107
108ServiceI::ServiceI()
109{
110}
111
112ServiceI::~ServiceI()
113{
114}
115
116void
117ServiceI::start(const string& name, const CommunicatorPtr& communicator, const StringSeq& /*args*/)
118{
119 PropertiesPtr properties = communicator->getProperties();
120
121 validateProperties(name, properties, communicator->getLogger());
122
123 int id = properties->getPropertyAsIntWithDefault(name + ".NodeId", -1);
124
125 // If we are using a replicated deployment and if the topic
126 // manager thread pool max size is not set then ensure it is set
127 // to some suitably high number. This ensures no deadlocks in the
128 // replicated case due to call forwarding from replicas to
129 // coordinators.
130 if (id != -1 && properties->getProperty(name + ".TopicManager.ThreadPool.SizeMax").empty())
131 {
132 properties->setProperty(name + ".TopicManager.ThreadPool.SizeMax", "100");
133 }
134
135 Ice::ObjectAdapterPtr topicAdapter = communicator->createObjectAdapter(name + ".TopicManager");
136 Ice::ObjectAdapterPtr publishAdapter = communicator->createObjectAdapter(name + ".Publish");
137
138 //
139 // We use the name of the service for the name of the database environment.
140 //
141 string instanceName = properties->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
142 Identity topicManagerId;
143 topicManagerId.category = instanceName;
144 topicManagerId.name = "TopicManager";
145
146 if (properties->getPropertyAsIntWithDefault(name + ".Transient", 0) > 0)
147 {
148 _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
149 try
150 {
151 TransientTopicManagerImplPtr manager = new TransientTopicManagerImpl(_instance);
152 _managerProxy =
153 TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, topicManagerId));
154 }
155 catch (const Ice::Exception& ex)
156 {
157 _instance = 0;
158
159 LoggerOutputBase s;
160 s << "exception while starting IceStorm service " << name << ":\n";
161 s << ex;
162
163 IceBox::FailureException e(__FILE__, __LINE__);
164 e.reason = s.str();
165 throw e;
166 }
167 topicAdapter->activate();
168 publishAdapter->activate();
169 return;
170 }
171
172 if (id == -1) // No replication.
173 {
174 try
175 {
176 PersistentInstancePtr instance = new PersistentInstance(
177 instanceName, name, communicator, publishAdapter, topicAdapter);
178 _instance = instance;
179
180 _manager = new TopicManagerImpl(instance);
181 _managerProxy = TopicManagerPrx::uncheckedCast(
182 topicAdapter->add(_manager->getServant(), topicManagerId));
183 }
184 catch (const IceUtil::Exception& ex)
185 {
186 _instance = 0;
187
188 LoggerOutputBase s;
189 s << "exception while starting IceStorm service " << name << ":\n";
190 s << ex;
191
192 IceBox::FailureException e(__FILE__, __LINE__);
193 e.reason = s.str();
194 throw e;
195 }
196 }
197 else
198 {
199 // Here we want to create a map of id -> election node
200 // proxies.
201 map<int, NodePrx> nodes;
202
203 string topicManagerAdapterId = properties->getProperty(name + ".TopicManager.AdapterId");
204
205 // We support two possible deployments. The first is a manual
206 // deployment, the second is IceGrid.
207 //
208 // Here we check for the manual deployment
209 const string prefix = name + ".Nodes.";
210 Ice::PropertyDict props = properties->getPropertiesForPrefix(prefix);
211 if (!props.empty())
212 {
213 for (Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
214 {
215 int nodeid = atoi(p->first.substr(prefix.size()).c_str());
216 nodes[nodeid] = NodePrx::uncheckedCast(communicator->propertyToProxy(p->first));
217 }
218 }
219 else
220 {
221 // If adapter id's are defined for the topic manager or
222 // node adapters then we consider this an IceGrid based
223 // deployment.
224 string nodeAdapterId = properties->getProperty(name + ".Node.AdapterId");
225
226 // Validate first that the adapter ids match for the node
227 // and the topic manager otherwise some other deployment
228 // is being used.
229 const string suffix = ".TopicManager";
230 if (topicManagerAdapterId.empty() || nodeAdapterId.empty() ||
231 topicManagerAdapterId.replace(
232 topicManagerAdapterId.find(suffix), suffix.size(), ".Node") != nodeAdapterId)
233 {
234 Ice::Error error(communicator->getLogger());
235 error << "deployment error: `" << topicManagerAdapterId
236 << "' prefix does not match `" << nodeAdapterId << "'";
237 throw IceBox::FailureException(
238 __FILE__, __LINE__, "IceGrid deployment is incorrect");
239 }
240
241 // Determine the set of node id and node proxies.
242 //
243 // This is determined by locating all topic manager
244 // replicas, and then working out the node for that
245 // replica.
246 //
247 // We work out the node id by removing the instance
248 // name. The node id must follow.
249 //
250 IceGrid::LocatorPrx locator =
251 IceGrid::LocatorPrx::checkedCast(communicator->getDefaultLocator());
252 assert(locator);
253 IceGrid::QueryPrx query = locator->getLocalQuery();
254 Ice::ObjectProxySeq replicas =
255 query->findAllReplicas(communicator->stringToProxy(instanceName + "/TopicManager"));
256
257 for (Ice::ObjectProxySeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
258 {
259 string adapterid = (*p)->ice_getAdapterId();
260
261 // Replace TopicManager with the node endpoint.
262 adapterid = adapterid.replace(adapterid.find(suffix), suffix.size(), ".Node");
263
264 // The adapter id must start with the instance name.
265 if (adapterid.find(instanceName) != 0)
266 {
267 Ice::Error error(communicator->getLogger());
268 error << "deployment error: `" << adapterid << "' does not start with `"
269 << instanceName << "'";
270 throw IceBox::FailureException(
271 __FILE__, __LINE__, "IceGrid deployment is incorrect");
272 }
273
274 // The node id follows. We find the first digit (the
275 // start of the node id, and then the end of the
276 // digits).
277 string::size_type start = instanceName.size();
278 while (start < adapterid.size() && !IceUtilInternal::isDigit(adapterid[start]))
279 {
280 ++start;
281 }
282 string::size_type end = start;
283 while (end < adapterid.size() && IceUtilInternal::isDigit(adapterid[end]))
284 {
285 ++end;
286 }
287 if (start == end)
288 {
289 // We must have at least one digit, otherwise there is
290 // some sort of deployment error.
291 Ice::Error error(communicator->getLogger());
292 error
293 << "deployment error: node id does not follow instance name. instance name:"
294 << instanceName << " adapter id: " << adapterid;
295 throw IceBox::FailureException(
296 __FILE__, __LINE__, "IceGrid deployment is incorrect");
297 }
298
299 int nodeid = atoi(adapterid.substr(start, end - start).c_str());
300 ostringstream os;
301 os << "node" << nodeid;
302 Ice::Identity id;
303 id.category = instanceName;
304 id.name = os.str();
305
306 nodes[nodeid] =
307 NodePrx::uncheckedCast((*p)->ice_adapterId(adapterid)->ice_identity(id));
308 }
309 }
310
311 if (nodes.size() < 3)
312 {
313 Ice::Error error(communicator->getLogger());
314 error << "Replication requires at least 3 Nodes";
315 throw IceBox::FailureException(
316 __FILE__, __LINE__, "Replication requires at least 3 Nodes");
317 }
318
319 try
320 {
321 // If the node thread pool size is not set then initialize
322 // to the number of nodes + 1 and disable thread pool size
323 // warnings.
324 if (properties->getProperty(name + ".Node.ThreadPool.Size").empty())
325 {
326 ostringstream os;
327 os << nodes.size() + 1;
328 properties->setProperty(name + ".Node.ThreadPool.Size", os.str());
329 properties->setProperty(name + ".Node.ThreadPool.SizeWarn", "0");
330 }
331 if (properties->getProperty(name + ".Node.MessageSizeMax").empty())
332 {
333 properties->setProperty(name + ".Node.MessageSizeMax",
334 "0"); // No limit on data exchanged internally
335 }
336
337 Ice::ObjectAdapterPtr nodeAdapter = communicator->createObjectAdapter(name + ".Node");
338
339 PersistentInstancePtr instance = new PersistentInstance(instanceName,
340 name,
341 communicator,
342 publishAdapter,
343 topicAdapter,
344 nodeAdapter,
345 nodes[id]);
346 _instance = instance;
347
348 _instance->observers()->setMajority(static_cast<unsigned int>(nodes.size()) / 2);
349
350 // Trace replication information.
351 TraceLevelsPtr traceLevels = _instance->traceLevels();
352 if (traceLevels->election > 0)
353 {
354 Ice::Trace out(traceLevels->logger, traceLevels->electionCat);
355 out << "I am node " << id << "\n";
356 for (map<int, NodePrx>::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
357 {
358 out << "\tnode: " << p->first << " proxy: " << p->second->ice_toString()
359 << "\n";
360 }
361 }
362
363 if (topicManagerAdapterId.empty())
364 {
365 // We're not using an IceGrid deployment. Here we need
366 // a proxy which is used to create proxies to the
367 // replicas later.
368 _managerProxy =
369 TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId));
370 }
371 else
372 {
373 // If we're using IceGrid deployment we need to create
374 // indirect proxies.
375 _managerProxy = TopicManagerPrx::uncheckedCast(
376 topicAdapter->createIndirectProxy(topicManagerId));
377 }
378
379 _manager = new TopicManagerImpl(instance);
380 topicAdapter->add(_manager->getServant(), topicManagerId);
381
382 ostringstream os; // The node object identity.
383 os << "node" << id;
384 Ice::Identity nodeid;
385 nodeid.category = instanceName;
386 nodeid.name = os.str();
387
388 NodeIPtr node = new NodeI(_instance, _manager, _managerProxy, id, nodes);
389 _instance->setNode(node);
390 nodeAdapter->add(node, nodeid);
391 nodeAdapter->activate();
392
393 node->start();
394 }
395 catch (const IceUtil::Exception& ex)
396 {
397 _instance = 0;
398
399 LoggerOutputBase s;
400 s << "exception while starting IceStorm service " << name << ":\n";
401 s << ex;
402
403 IceBox::FailureException e(__FILE__, __LINE__);
404 e.reason = s.str();
405 throw e;
406 }
407 }
408
409 topicAdapter->add(
410 new FinderI(TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId))),
411 stringToIdentity("IceStorm/Finder"));
412
413 topicAdapter->activate();
414 publishAdapter->activate();
415}
416
417void
418ServiceI::start(const CommunicatorPtr& communicator,
419 const ObjectAdapterPtr& topicAdapter,
420 const ObjectAdapterPtr& publishAdapter,
421 const string& name,
422 const Ice::Identity& id,
423 const string& /*dbEnv*/)
424{
425 //
426 // For IceGrid we don't validate the properties as all sorts of
427 // non-IceStorm properties are included in the prefix.
428 //
429 //validateProperties(name, communicator->getProperties(), communicator->getLogger());
430
431 // This is for IceGrid only and as such we use a transient
432 // implementation of IceStorm.
433 string instanceName =
434 communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
435 _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
436
437 try
438 {
439 TransientTopicManagerImplPtr manager = new TransientTopicManagerImpl(_instance);
440 _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, id));
441 }
442 catch (const Ice::Exception& ex)
443 {
444 _instance = 0;
445 LoggerOutputBase s;
446 s << "exception while starting IceStorm service " << name << ":\n";
447 s << ex;
448
449 IceBox::FailureException e(__FILE__, __LINE__);
450 e.reason = s.str();
451 throw e;
452 }
453}
454
456ServiceI::getTopicManager() const
457{
458 return _managerProxy;
459}
460
461void
462ServiceI::stop()
463{
464 // Shutdown the instance. This deactivates all OAs.
465 _instance->shutdown();
466
467 //
468 // It's necessary to reap all destroyed topics on shutdown.
469 //
470 if (_manager)
471 {
472 _manager->shutdown();
473 }
474 if (_transientManager)
475 {
476 _transientManager->shutdown();
477 }
478
479 //
480 // Destroy the instance. This step must occur last.
481 //
482 _instance->destroy();
483}
484
485void
486ServiceI::validateProperties(const string& name,
487 const PropertiesPtr& properties,
488 const LoggerPtr& logger)
489{
490 static const string suffixes[] = {"ReplicatedTopicManagerEndpoints",
491 "ReplicatedPublishEndpoints",
492 "Nodes.*",
493 "Transient",
494 "NodeId",
495 "Flush.Timeout",
496 "InstanceName",
497 "Election.MasterTimeout",
498 "Election.ElectionTimeout",
499 "Election.ResponseTimeout",
500 "Publish.AdapterId",
501 "Publish.Endpoints",
502 "Publish.Locator",
503 "Publish.PublishedEndpoints",
504 "Publish.ReplicaGroupId",
505 "Publish.Router",
506 "Publish.ThreadPool.Size",
507 "Publish.ThreadPool.SizeMax",
508 "Publish.ThreadPool.SizeWarn",
509 "Publish.ThreadPool.StackSize",
510 "Node.AdapterId",
511 "Node.Endpoints",
512 "Node.Locator",
513 "Node.PublishedEndpoints",
514 "Node.ReplicaGroupId",
515 "Node.Router",
516 "Node.ThreadPool.Size",
517 "Node.ThreadPool.SizeMax",
518 "Node.ThreadPool.SizeWarn",
519 "Node.ThreadPool.StackSize",
520 "TopicManager.AdapterId",
521 "TopicManager.Endpoints",
522 "TopicManager.Locator",
523 "TopicManager.Proxy",
524 "TopicManager.Proxy.EndpointSelection",
525 "TopicManager.Proxy.ConnectionCached",
526 "TopicManager.Proxy.PreferSecure",
527 "TopicManager.Proxy.LocatorCacheTimeout",
528 "TopicManager.Proxy.Locator",
529 "TopicManager.Proxy.Router",
530 "TopicManager.Proxy.CollocationOptimization",
531 "TopicManager.PublishedEndpoints",
532 "TopicManager.ReplicaGroupId",
533 "TopicManager.Router",
534 "TopicManager.ThreadPool.Size",
535 "TopicManager.ThreadPool.SizeMax",
536 "TopicManager.ThreadPool.SizeWarn",
537 "TopicManager.ThreadPool.StackSize",
538 "Trace.Election",
539 "Trace.Replication",
540 "Trace.Subscriber",
541 "Trace.Topic",
542 "Trace.TopicManager",
543 "Send.Timeout",
544 "Send.QueueSizeMax",
545 "Send.QueueSizeMaxPolicy",
546 "Discard.Interval",
547 "LMDB.Path",
548 "LMDB.MapSize"};
549
550 vector<string> unknownProps;
551 string prefix = name + ".";
552 PropertyDict props = properties->getPropertiesForPrefix(prefix);
553 for (PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
554 {
555 bool valid = false;
556 for (unsigned int i = 0; i < sizeof(suffixes) / sizeof(*suffixes); ++i)
557 {
558 string prop = prefix + suffixes[i];
559 if (IceUtilInternal::match(p->first, prop))
560 {
561 valid = true;
562 break;
563 }
564 }
565 if (!valid)
566 {
567 unknownProps.push_back(p->first);
568 }
569 }
570
571 if (!unknownProps.empty())
572 {
573 Warning out(logger);
574 out << "found unknown properties for IceStorm service '" << name << "':";
575 for (vector<string>::const_iterator p = unknownProps.begin(); p != unknownProps.end(); ++p)
576 {
577 out << "\n " << *p;
578 }
579 }
580}
ICESTORM_SERVICE_API::IceBox::Service * createIceStorm(CommunicatorPtr communicator)
Definition Service.cpp:88
static ICESTORM_SERVICE_API ServicePtr create(const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const std::string &, const Ice::Identity &, const std::string &)
Definition Service.cpp:95
void Identity(MatrixXX< N, N, T > *a)
Definition MatrixXX.h:570
IceUtil::Handle< NodeI > NodeIPtr
Definition Instance.h:36
::IceInternal::Handle< IceStormInternal::Service > ServicePtr
Definition Service.h:44
IceUtil::Handle< TransientTopicManagerImpl > TransientTopicManagerImplPtr
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
IceUtil::Handle< TopicManagerImpl > TopicManagerImplPtr
IceUtil::Handle< PersistentInstance > PersistentInstancePtr
Definition Instance.h:172
IceUtil::Handle< Instance > InstancePtr
Definition Instance.h:128
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicManager > TopicManagerPrx
Definition IceManager.h:69
::IceInternal::Handle<::Ice::Properties > PropertiesPtr
::IceInternal::Handle<::Ice::Communicator > CommunicatorPtr
Definition IceManager.h:49
::IceInternal::Handle<::Ice::ObjectAdapter > ObjectAdapterPtr
Definition IceManager.h:52
double s(double t, double s0, double v0, double a0, double j)
Definition CtrlUtil.h:33