Service.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #define ICESTORM_SERVICE_API_EXPORTS
11 
12 //#include <Ice/PluginManagerI.h> // For loadPlugin
13 
14 #include <IceStorm/TopicI.h>
15 #include <IceStorm/TopicManagerI.h>
17 #include <IceStorm/Instance.h>
18 #include <IceStorm/Util.h>
19 
20 #include <IceStorm/Service.h>
21 
22 #include <IceStorm/Observers.h>
23 #include <IceStorm/TraceLevels.h>
24 #include <IceUtil/StringUtil.h>
25 
26 #include <IceStorm/NodeI.h>
28 
29 #include <IceGrid/Registry.h>
30 
31 using namespace std;
32 using namespace Ice;
33 using namespace IceStorm;
34 using namespace IceStormInternal;
35 using namespace IceStormElection;
36 
37 namespace
38 {
39 
40  class ServiceI : public IceStormInternal::Service
41  {
42  public:
43 
44  ServiceI();
45  virtual ~ServiceI();
46 
47  virtual void start(const string&,
48  const CommunicatorPtr&,
49  const StringSeq&);
50 
51  virtual void start(const CommunicatorPtr&,
52  const ObjectAdapterPtr&,
53  const ObjectAdapterPtr&,
54  const string&,
55  const Ice::Identity&,
56  const string&);
57 
58  virtual TopicManagerPrx getTopicManager() const;
59 
60  virtual void stop();
61 
62  private:
63 
64  void createDbEnv(const Ice::CommunicatorPtr&);
65  void validateProperties(const string&, const PropertiesPtr&, const LoggerPtr&);
66 
67  TopicManagerImplPtr _manager;
68  TransientTopicManagerImplPtr _transientManager;
69  TopicManagerPrx _managerProxy;
70  InstancePtr _instance;
71  };
72 
73  class FinderI : public IceStorm::Finder
74  {
75  public:
76 
77  FinderI(const TopicManagerPrx& topicManager) : _topicManager(topicManager)
78  {
79  }
80 
81  virtual TopicManagerPrx
82  getTopicManager(const Ice::Current&)
83  {
84  return _topicManager;
85  }
86 
87  private:
88 
89  const TopicManagerPrx _topicManager;
90  };
91 
92 }
93 
94 extern "C"
95 {
96 
97  ICESTORM_SERVICE_API ::IceBox::Service*
99  {
100  return new ServiceI;
101  }
102 
103 }
104 
107  const ObjectAdapterPtr& topicAdapter,
108  const ObjectAdapterPtr& publishAdapter,
109  const string& name,
110  const Ice::Identity& id,
111  const string& dbEnv)
112 {
113  ServiceI* service = new ServiceI;
114  ServicePtr svc = service;
115  service->start(communicator, topicAdapter, publishAdapter, name, id, dbEnv);
116  return svc;
117 }
118 
119 ServiceI::ServiceI()
120 {
121 }
122 
123 ServiceI::~ServiceI()
124 {
125 }
126 
127 void
128 ServiceI::start(
129  const string& name,
130  const CommunicatorPtr& communicator,
131  const StringSeq& /*args*/)
132 {
133  PropertiesPtr properties = communicator->getProperties();
134 
135  validateProperties(name, properties, communicator->getLogger());
136 
137  int id = properties->getPropertyAsIntWithDefault(name + ".NodeId", -1);
138 
139  // If we are using a replicated deployment and if the topic
140  // manager thread pool max size is not set then ensure it is set
141  // to some suitably high number. This ensures no deadlocks in the
142  // replicated case due to call forwarding from replicas to
143  // coordinators.
144  if (id != -1 && properties->getProperty(name + ".TopicManager.ThreadPool.SizeMax").empty())
145  {
146  properties->setProperty(name + ".TopicManager.ThreadPool.SizeMax", "100");
147  }
148 
149  Ice::ObjectAdapterPtr topicAdapter = communicator->createObjectAdapter(name + ".TopicManager");
150  Ice::ObjectAdapterPtr publishAdapter = communicator->createObjectAdapter(name + ".Publish");
151 
152  //
153  // We use the name of the service for the name of the database environment.
154  //
155  string instanceName = properties->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
156  Identity topicManagerId;
157  topicManagerId.category = instanceName;
158  topicManagerId.name = "TopicManager";
159 
160  if (properties->getPropertyAsIntWithDefault(name + ".Transient", 0) > 0)
161  {
162  _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
163  try
164  {
166  _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, topicManagerId));
167  }
168  catch (const Ice::Exception& ex)
169  {
170  _instance = 0;
171 
172  LoggerOutputBase s;
173  s << "exception while starting IceStorm service " << name << ":\n";
174  s << ex;
175 
176  IceBox::FailureException e(__FILE__, __LINE__);
177  e.reason = s.str();
178  throw e;
179  }
180  topicAdapter->activate();
181  publishAdapter->activate();
182  return;
183  }
184 
185  if (id == -1) // No replication.
186  {
187  try
188  {
189  PersistentInstancePtr instance =
190  new PersistentInstance(instanceName, name, communicator, publishAdapter, topicAdapter);
191  _instance = instance;
192 
193  _manager = new TopicManagerImpl(instance);
194  _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager->getServant(), topicManagerId));
195  }
196  catch (const IceUtil::Exception& ex)
197  {
198  _instance = 0;
199 
200  LoggerOutputBase s;
201  s << "exception while starting IceStorm service " << name << ":\n";
202  s << ex;
203 
204  IceBox::FailureException e(__FILE__, __LINE__);
205  e.reason = s.str();
206  throw e;
207  }
208  }
209  else
210  {
211  // Here we want to create a map of id -> election node
212  // proxies.
213  map<int, NodePrx> nodes;
214 
215  string topicManagerAdapterId = properties->getProperty(name + ".TopicManager.AdapterId");
216 
217  // We support two possible deployments. The first is a manual
218  // deployment, the second is IceGrid.
219  //
220  // Here we check for the manual deployment
221  const string prefix = name + ".Nodes.";
222  Ice::PropertyDict props = properties->getPropertiesForPrefix(prefix);
223  if (!props.empty())
224  {
225  for (Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
226  {
227  int nodeid = atoi(p->first.substr(prefix.size()).c_str());
228  nodes[nodeid] = NodePrx::uncheckedCast(communicator->propertyToProxy(p->first));
229  }
230  }
231  else
232  {
233  // If adapter id's are defined for the topic manager or
234  // node adapters then we consider this an IceGrid based
235  // deployment.
236  string nodeAdapterId = properties->getProperty(name + ".Node.AdapterId");
237 
238  // Validate first that the adapter ids match for the node
239  // and the topic manager otherwise some other deployment
240  // is being used.
241  const string suffix = ".TopicManager";
242  if (topicManagerAdapterId.empty() || nodeAdapterId.empty() ||
243  topicManagerAdapterId.replace(
244  topicManagerAdapterId.find(suffix), suffix.size(), ".Node") != nodeAdapterId)
245  {
246  Ice::Error error(communicator->getLogger());
247  error << "deployment error: `" << topicManagerAdapterId << "' prefix does not match `"
248  << nodeAdapterId << "'";
249  throw IceBox::FailureException(__FILE__, __LINE__, "IceGrid deployment is incorrect");
250  }
251 
252  // Determine the set of node id and node proxies.
253  //
254  // This is determined by locating all topic manager
255  // replicas, and then working out the node for that
256  // replica.
257  //
258  // We work out the node id by removing the instance
259  // name. The node id must follow.
260  //
261  IceGrid::LocatorPrx locator = IceGrid::LocatorPrx::checkedCast(communicator->getDefaultLocator());
262  assert(locator);
263  IceGrid::QueryPrx query = locator->getLocalQuery();
264  Ice::ObjectProxySeq replicas = query->findAllReplicas(
265  communicator->stringToProxy(instanceName + "/TopicManager"));
266 
267  for (Ice::ObjectProxySeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
268  {
269  string adapterid = (*p)->ice_getAdapterId();
270 
271  // Replace TopicManager with the node endpoint.
272  adapterid = adapterid.replace(adapterid.find(suffix), suffix.size(), ".Node");
273 
274  // The adapter id must start with the instance name.
275  if (adapterid.find(instanceName) != 0)
276  {
277  Ice::Error error(communicator->getLogger());
278  error << "deployment error: `" << adapterid << "' does not start with `" << instanceName << "'";
279  throw IceBox::FailureException(__FILE__, __LINE__, "IceGrid deployment is incorrect");
280  }
281 
282  // The node id follows. We find the first digit (the
283  // start of the node id, and then the end of the
284  // digits).
285  string::size_type start = instanceName.size();
286  while (start < adapterid.size() && !IceUtilInternal::isDigit(adapterid[start]))
287  {
288  ++start;
289  }
290  string::size_type end = start;
291  while (end < adapterid.size() && IceUtilInternal::isDigit(adapterid[end]))
292  {
293  ++end;
294  }
295  if (start == end)
296  {
297  // We must have at least one digit, otherwise there is
298  // some sort of deployment error.
299  Ice::Error error(communicator->getLogger());
300  error << "deployment error: node id does not follow instance name. instance name:"
301  << instanceName << " adapter id: " << adapterid;
302  throw IceBox::FailureException(__FILE__, __LINE__, "IceGrid deployment is incorrect");
303  }
304 
305  int nodeid = atoi(adapterid.substr(start, end - start).c_str());
306  ostringstream os;
307  os << "node" << nodeid;
308  Ice::Identity id;
309  id.category = instanceName;
310  id.name = os.str();
311 
312  nodes[nodeid] = NodePrx::uncheckedCast((*p)->ice_adapterId(adapterid)->ice_identity(id));
313  }
314  }
315 
316  if (nodes.size() < 3)
317  {
318  Ice::Error error(communicator->getLogger());
319  error << "Replication requires at least 3 Nodes";
320  throw IceBox::FailureException(__FILE__, __LINE__, "Replication requires at least 3 Nodes");
321  }
322 
323  try
324  {
325  // If the node thread pool size is not set then initialize
326  // to the number of nodes + 1 and disable thread pool size
327  // warnings.
328  if (properties->getProperty(name + ".Node.ThreadPool.Size").empty())
329  {
330  ostringstream os;
331  os << nodes.size() + 1;
332  properties->setProperty(name + ".Node.ThreadPool.Size", os.str());
333  properties->setProperty(name + ".Node.ThreadPool.SizeWarn", "0");
334  }
335  if (properties->getProperty(name + ".Node.MessageSizeMax").empty())
336  {
337  properties->setProperty(name + ".Node.MessageSizeMax", "0"); // No limit on data exchanged internally
338  }
339 
340  Ice::ObjectAdapterPtr nodeAdapter = communicator->createObjectAdapter(name + ".Node");
341 
342  PersistentInstancePtr instance =
343  new PersistentInstance(instanceName, name, communicator, publishAdapter, topicAdapter,
344  nodeAdapter, nodes[id]);
345  _instance = instance;
346 
347  _instance->observers()->setMajority(static_cast<unsigned int>(nodes.size()) / 2);
348 
349  // Trace replication information.
350  TraceLevelsPtr traceLevels = _instance->traceLevels();
351  if (traceLevels->election > 0)
352  {
353  Ice::Trace out(traceLevels->logger, traceLevels->electionCat);
354  out << "I am node " << id << "\n";
355  for (map<int, NodePrx>::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
356  {
357  out << "\tnode: " << p->first << " proxy: " << p->second->ice_toString() << "\n";
358  }
359  }
360 
361  if (topicManagerAdapterId.empty())
362  {
363  // We're not using an IceGrid deployment. Here we need
364  // a proxy which is used to create proxies to the
365  // replicas later.
366  _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId));
367  }
368  else
369  {
370  // If we're using IceGrid deployment we need to create
371  // indirect proxies.
372  _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createIndirectProxy(topicManagerId));
373  }
374 
375  _manager = new TopicManagerImpl(instance);
376  topicAdapter->add(_manager->getServant(), topicManagerId);
377 
378  ostringstream os; // The node object identity.
379  os << "node" << id;
380  Ice::Identity nodeid;
381  nodeid.category = instanceName;
382  nodeid.name = os.str();
383 
384  NodeIPtr node = new NodeI(_instance, _manager, _managerProxy, id, nodes);
385  _instance->setNode(node);
386  nodeAdapter->add(node, nodeid);
387  nodeAdapter->activate();
388 
389  node->start();
390  }
391  catch (const IceUtil::Exception& ex)
392  {
393  _instance = 0;
394 
395  LoggerOutputBase s;
396  s << "exception while starting IceStorm service " << name << ":\n";
397  s << ex;
398 
399  IceBox::FailureException e(__FILE__, __LINE__);
400  e.reason = s.str();
401  throw e;
402  }
403  }
404 
405  topicAdapter->add(new FinderI(TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId))),
406  stringToIdentity("IceStorm/Finder"));
407 
408  topicAdapter->activate();
409  publishAdapter->activate();
410 }
411 
412 void
413 ServiceI::start(const CommunicatorPtr& communicator,
414  const ObjectAdapterPtr& topicAdapter,
415  const ObjectAdapterPtr& publishAdapter,
416  const string& name,
417  const Ice::Identity& id,
418  const string& /*dbEnv*/)
419 {
420  //
421  // For IceGrid we don't validate the properties as all sorts of
422  // non-IceStorm properties are included in the prefix.
423  //
424  //validateProperties(name, communicator->getProperties(), communicator->getLogger());
425 
426  // This is for IceGrid only and as such we use a transient
427  // implementation of IceStorm.
428  string instanceName = communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
429  _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
430 
431  try
432  {
434  _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, id));
435  }
436  catch (const Ice::Exception& ex)
437  {
438  _instance = 0;
439  LoggerOutputBase s;
440  s << "exception while starting IceStorm service " << name << ":\n";
441  s << ex;
442 
443  IceBox::FailureException e(__FILE__, __LINE__);
444  e.reason = s.str();
445  throw e;
446  }
447 }
448 
450 ServiceI::getTopicManager() const
451 {
452  return _managerProxy;
453 }
454 
455 void
456 ServiceI::stop()
457 {
458  // Shutdown the instance. This deactivates all OAs.
459  _instance->shutdown();
460 
461  //
462  // It's necessary to reap all destroyed topics on shutdown.
463  //
464  if (_manager)
465  {
466  _manager->shutdown();
467  }
468  if (_transientManager)
469  {
470  _transientManager->shutdown();
471  }
472 
473  //
474  // Destroy the instance. This step must occur last.
475  //
476  _instance->destroy();
477 }
478 
479 void
480 ServiceI::validateProperties(const string& name, const PropertiesPtr& properties, const LoggerPtr& logger)
481 {
482  static const string suffixes[] =
483  {
484  "ReplicatedTopicManagerEndpoints",
485  "ReplicatedPublishEndpoints",
486  "Nodes.*",
487  "Transient",
488  "NodeId",
489  "Flush.Timeout",
490  "InstanceName",
491  "Election.MasterTimeout",
492  "Election.ElectionTimeout",
493  "Election.ResponseTimeout",
494  "Publish.AdapterId",
495  "Publish.Endpoints",
496  "Publish.Locator",
497  "Publish.PublishedEndpoints",
498  "Publish.ReplicaGroupId",
499  "Publish.Router",
500  "Publish.ThreadPool.Size",
501  "Publish.ThreadPool.SizeMax",
502  "Publish.ThreadPool.SizeWarn",
503  "Publish.ThreadPool.StackSize",
504  "Node.AdapterId",
505  "Node.Endpoints",
506  "Node.Locator",
507  "Node.PublishedEndpoints",
508  "Node.ReplicaGroupId",
509  "Node.Router",
510  "Node.ThreadPool.Size",
511  "Node.ThreadPool.SizeMax",
512  "Node.ThreadPool.SizeWarn",
513  "Node.ThreadPool.StackSize",
514  "TopicManager.AdapterId",
515  "TopicManager.Endpoints",
516  "TopicManager.Locator",
517  "TopicManager.Proxy",
518  "TopicManager.Proxy.EndpointSelection",
519  "TopicManager.Proxy.ConnectionCached",
520  "TopicManager.Proxy.PreferSecure",
521  "TopicManager.Proxy.LocatorCacheTimeout",
522  "TopicManager.Proxy.Locator",
523  "TopicManager.Proxy.Router",
524  "TopicManager.Proxy.CollocationOptimization",
525  "TopicManager.PublishedEndpoints",
526  "TopicManager.ReplicaGroupId",
527  "TopicManager.Router",
528  "TopicManager.ThreadPool.Size",
529  "TopicManager.ThreadPool.SizeMax",
530  "TopicManager.ThreadPool.SizeWarn",
531  "TopicManager.ThreadPool.StackSize",
532  "Trace.Election",
533  "Trace.Replication",
534  "Trace.Subscriber",
535  "Trace.Topic",
536  "Trace.TopicManager",
537  "Send.Timeout",
538  "Send.QueueSizeMax",
539  "Send.QueueSizeMaxPolicy",
540  "Discard.Interval",
541  "LMDB.Path",
542  "LMDB.MapSize"
543  };
544 
545  vector<string> unknownProps;
546  string prefix = name + ".";
547  PropertyDict props = properties->getPropertiesForPrefix(prefix);
548  for (PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
549  {
550  bool valid = false;
551  for (unsigned int i = 0; i < sizeof(suffixes) / sizeof(*suffixes); ++i)
552  {
553  string prop = prefix + suffixes[i];
554  if (IceUtilInternal::match(p->first, prop))
555  {
556  valid = true;
557  break;
558  }
559  }
560  if (!valid)
561  {
562  unknownProps.push_back(p->first);
563  }
564  }
565 
566  if (!unknownProps.empty())
567  {
568  Warning out(logger);
569  out << "found unknown properties for IceStorm service '" << name << "':";
570  for (vector<string>::const_iterator p = unknownProps.begin(); p != unknownProps.end(); ++p)
571  {
572  out << "\n " << *p;
573  }
574  }
575 }
IceStorm
Definition: DBTypes.ice:22
TopicI.h
createIceStorm
ICESTORM_SERVICE_API ::IceBox::Service * createIceStorm(CommunicatorPtr communicator)
Definition: Service.cpp:98
TopicManagerI.h
Util.h
IceInternal::Handle< ::Ice::Communicator >
Service.h
IceStormInternal::Service::create
static ICESTORM_SERVICE_API ServicePtr create(const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const std::string &, const Ice::Identity &, const std::string &)
Definition: Service.cpp:106
IceStorm::Instance
Definition: Instance.h:59
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:523
IceStormElection
Definition: DBTypes.ice:17
TransientTopicI.h
IceStormElection::NodeI
Definition: NodeI.h:27
IceStormInternal::Service
Definition: Service.h:46
IceStorm::TopicManagerImpl
Definition: TopicManagerI.h:37
IceStorm::TransientTopicManagerImpl
Definition: TransientTopicManagerI.h:30
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStorm::PersistentInstance
Definition: Instance.h:131
armarx::control::ethercat::reporting::Severity::Warning
@ Warning
Ice
Definition: DBTypes.cpp:64
TraceLevels.h
std
Definition: Application.h:66
IceStormInternal
Definition: Instance.cpp:27
IceUtil::Handle
Definition: forward_declarations.h:29
IceInternal::ProxyHandle< ::IceProxy::IceStorm::TopicManager >
Observers.h
TransientTopicManagerI.h
NodeI.h
Instance.h
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33