Service.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #define ICESTORM_SERVICE_API_EXPORTS
11 
12 //#include <Ice/PluginManagerI.h> // For loadPlugin
13 
14 #include <IceGrid/Registry.h>
15 #include <IceStorm/Instance.h>
16 #include <IceStorm/NodeI.h>
17 #include <IceStorm/Observers.h>
18 #include <IceStorm/Service.h>
19 #include <IceStorm/TopicI.h>
20 #include <IceStorm/TopicManagerI.h>
21 #include <IceStorm/TraceLevels.h>
24 #include <IceStorm/Util.h>
25 #include <IceUtil/StringUtil.h>
26 
27 using namespace std;
28 using namespace Ice;
29 using namespace IceStorm;
30 using namespace IceStormInternal;
31 using namespace IceStormElection;
32 
33 namespace
34 {
35 
36  class ServiceI : public IceStormInternal::Service
37  {
38  public:
39  ServiceI();
40  virtual ~ServiceI();
41 
42  virtual void start(const string&, const CommunicatorPtr&, const StringSeq&);
43 
44  virtual void start(const CommunicatorPtr&,
45  const ObjectAdapterPtr&,
46  const ObjectAdapterPtr&,
47  const string&,
48  const Ice::Identity&,
49  const string&);
50 
51  virtual TopicManagerPrx getTopicManager() const;
52 
53  virtual void stop();
54 
55  private:
56  void createDbEnv(const Ice::CommunicatorPtr&);
57  void validateProperties(const string&, const PropertiesPtr&, const LoggerPtr&);
58 
59  TopicManagerImplPtr _manager;
60  TransientTopicManagerImplPtr _transientManager;
61  TopicManagerPrx _managerProxy;
62  InstancePtr _instance;
63  };
64 
65  class FinderI : public IceStorm::Finder
66  {
67  public:
68  FinderI(const TopicManagerPrx& topicManager) : _topicManager(topicManager)
69  {
70  }
71 
72  virtual TopicManagerPrx
73  getTopicManager(const Ice::Current&)
74  {
75  return _topicManager;
76  }
77 
78  private:
79  const TopicManagerPrx _topicManager;
80  };
81 
82 } // namespace
83 
84 extern "C"
85 {
86 
87  ICESTORM_SERVICE_API ::IceBox::Service*
89  {
90  return new ServiceI;
91  }
92 }
93 
96  const ObjectAdapterPtr& topicAdapter,
97  const ObjectAdapterPtr& publishAdapter,
98  const string& name,
99  const Ice::Identity& id,
100  const string& dbEnv)
101 {
102  ServiceI* service = new ServiceI;
103  ServicePtr svc = service;
104  service->start(communicator, topicAdapter, publishAdapter, name, id, dbEnv);
105  return svc;
106 }
107 
108 ServiceI::ServiceI()
109 {
110 }
111 
112 ServiceI::~ServiceI()
113 {
114 }
115 
116 void
117 ServiceI::start(const string& name, const CommunicatorPtr& communicator, const StringSeq& /*args*/)
118 {
119  PropertiesPtr properties = communicator->getProperties();
120 
121  validateProperties(name, properties, communicator->getLogger());
122 
123  int id = properties->getPropertyAsIntWithDefault(name + ".NodeId", -1);
124 
125  // If we are using a replicated deployment and if the topic
126  // manager thread pool max size is not set then ensure it is set
127  // to some suitably high number. This ensures no deadlocks in the
128  // replicated case due to call forwarding from replicas to
129  // coordinators.
130  if (id != -1 && properties->getProperty(name + ".TopicManager.ThreadPool.SizeMax").empty())
131  {
132  properties->setProperty(name + ".TopicManager.ThreadPool.SizeMax", "100");
133  }
134 
135  Ice::ObjectAdapterPtr topicAdapter = communicator->createObjectAdapter(name + ".TopicManager");
136  Ice::ObjectAdapterPtr publishAdapter = communicator->createObjectAdapter(name + ".Publish");
137 
138  //
139  // We use the name of the service for the name of the database environment.
140  //
141  string instanceName = properties->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
142  Identity topicManagerId;
143  topicManagerId.category = instanceName;
144  topicManagerId.name = "TopicManager";
145 
146  if (properties->getPropertyAsIntWithDefault(name + ".Transient", 0) > 0)
147  {
148  _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
149  try
150  {
152  _managerProxy =
153  TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, topicManagerId));
154  }
155  catch (const Ice::Exception& ex)
156  {
157  _instance = 0;
158 
159  LoggerOutputBase s;
160  s << "exception while starting IceStorm service " << name << ":\n";
161  s << ex;
162 
163  IceBox::FailureException e(__FILE__, __LINE__);
164  e.reason = s.str();
165  throw e;
166  }
167  topicAdapter->activate();
168  publishAdapter->activate();
169  return;
170  }
171 
172  if (id == -1) // No replication.
173  {
174  try
175  {
177  instanceName, name, communicator, publishAdapter, topicAdapter);
178  _instance = instance;
179 
180  _manager = new TopicManagerImpl(instance);
181  _managerProxy = TopicManagerPrx::uncheckedCast(
182  topicAdapter->add(_manager->getServant(), topicManagerId));
183  }
184  catch (const IceUtil::Exception& ex)
185  {
186  _instance = 0;
187 
188  LoggerOutputBase s;
189  s << "exception while starting IceStorm service " << name << ":\n";
190  s << ex;
191 
192  IceBox::FailureException e(__FILE__, __LINE__);
193  e.reason = s.str();
194  throw e;
195  }
196  }
197  else
198  {
199  // Here we want to create a map of id -> election node
200  // proxies.
201  map<int, NodePrx> nodes;
202 
203  string topicManagerAdapterId = properties->getProperty(name + ".TopicManager.AdapterId");
204 
205  // We support two possible deployments. The first is a manual
206  // deployment, the second is IceGrid.
207  //
208  // Here we check for the manual deployment
209  const string prefix = name + ".Nodes.";
210  Ice::PropertyDict props = properties->getPropertiesForPrefix(prefix);
211  if (!props.empty())
212  {
213  for (Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
214  {
215  int nodeid = atoi(p->first.substr(prefix.size()).c_str());
216  nodes[nodeid] = NodePrx::uncheckedCast(communicator->propertyToProxy(p->first));
217  }
218  }
219  else
220  {
221  // If adapter id's are defined for the topic manager or
222  // node adapters then we consider this an IceGrid based
223  // deployment.
224  string nodeAdapterId = properties->getProperty(name + ".Node.AdapterId");
225 
226  // Validate first that the adapter ids match for the node
227  // and the topic manager otherwise some other deployment
228  // is being used.
229  const string suffix = ".TopicManager";
230  if (topicManagerAdapterId.empty() || nodeAdapterId.empty() ||
231  topicManagerAdapterId.replace(
232  topicManagerAdapterId.find(suffix), suffix.size(), ".Node") != nodeAdapterId)
233  {
234  Ice::Error error(communicator->getLogger());
235  error << "deployment error: `" << topicManagerAdapterId
236  << "' prefix does not match `" << nodeAdapterId << "'";
237  throw IceBox::FailureException(
238  __FILE__, __LINE__, "IceGrid deployment is incorrect");
239  }
240 
241  // Determine the set of node id and node proxies.
242  //
243  // This is determined by locating all topic manager
244  // replicas, and then working out the node for that
245  // replica.
246  //
247  // We work out the node id by removing the instance
248  // name. The node id must follow.
249  //
250  IceGrid::LocatorPrx locator =
251  IceGrid::LocatorPrx::checkedCast(communicator->getDefaultLocator());
252  assert(locator);
253  IceGrid::QueryPrx query = locator->getLocalQuery();
254  Ice::ObjectProxySeq replicas =
255  query->findAllReplicas(communicator->stringToProxy(instanceName + "/TopicManager"));
256 
257  for (Ice::ObjectProxySeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
258  {
259  string adapterid = (*p)->ice_getAdapterId();
260 
261  // Replace TopicManager with the node endpoint.
262  adapterid = adapterid.replace(adapterid.find(suffix), suffix.size(), ".Node");
263 
264  // The adapter id must start with the instance name.
265  if (adapterid.find(instanceName) != 0)
266  {
267  Ice::Error error(communicator->getLogger());
268  error << "deployment error: `" << adapterid << "' does not start with `"
269  << instanceName << "'";
270  throw IceBox::FailureException(
271  __FILE__, __LINE__, "IceGrid deployment is incorrect");
272  }
273 
274  // The node id follows. We find the first digit (the
275  // start of the node id, and then the end of the
276  // digits).
277  string::size_type start = instanceName.size();
278  while (start < adapterid.size() && !IceUtilInternal::isDigit(adapterid[start]))
279  {
280  ++start;
281  }
282  string::size_type end = start;
283  while (end < adapterid.size() && IceUtilInternal::isDigit(adapterid[end]))
284  {
285  ++end;
286  }
287  if (start == end)
288  {
289  // We must have at least one digit, otherwise there is
290  // some sort of deployment error.
291  Ice::Error error(communicator->getLogger());
292  error
293  << "deployment error: node id does not follow instance name. instance name:"
294  << instanceName << " adapter id: " << adapterid;
295  throw IceBox::FailureException(
296  __FILE__, __LINE__, "IceGrid deployment is incorrect");
297  }
298 
299  int nodeid = atoi(adapterid.substr(start, end - start).c_str());
300  ostringstream os;
301  os << "node" << nodeid;
302  Ice::Identity id;
303  id.category = instanceName;
304  id.name = os.str();
305 
306  nodes[nodeid] =
307  NodePrx::uncheckedCast((*p)->ice_adapterId(adapterid)->ice_identity(id));
308  }
309  }
310 
311  if (nodes.size() < 3)
312  {
313  Ice::Error error(communicator->getLogger());
314  error << "Replication requires at least 3 Nodes";
315  throw IceBox::FailureException(
316  __FILE__, __LINE__, "Replication requires at least 3 Nodes");
317  }
318 
319  try
320  {
321  // If the node thread pool size is not set then initialize
322  // to the number of nodes + 1 and disable thread pool size
323  // warnings.
324  if (properties->getProperty(name + ".Node.ThreadPool.Size").empty())
325  {
326  ostringstream os;
327  os << nodes.size() + 1;
328  properties->setProperty(name + ".Node.ThreadPool.Size", os.str());
329  properties->setProperty(name + ".Node.ThreadPool.SizeWarn", "0");
330  }
331  if (properties->getProperty(name + ".Node.MessageSizeMax").empty())
332  {
333  properties->setProperty(name + ".Node.MessageSizeMax",
334  "0"); // No limit on data exchanged internally
335  }
336 
337  Ice::ObjectAdapterPtr nodeAdapter = communicator->createObjectAdapter(name + ".Node");
338 
339  PersistentInstancePtr instance = new PersistentInstance(instanceName,
340  name,
341  communicator,
342  publishAdapter,
343  topicAdapter,
344  nodeAdapter,
345  nodes[id]);
346  _instance = instance;
347 
348  _instance->observers()->setMajority(static_cast<unsigned int>(nodes.size()) / 2);
349 
350  // Trace replication information.
351  TraceLevelsPtr traceLevels = _instance->traceLevels();
352  if (traceLevels->election > 0)
353  {
354  Ice::Trace out(traceLevels->logger, traceLevels->electionCat);
355  out << "I am node " << id << "\n";
356  for (map<int, NodePrx>::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
357  {
358  out << "\tnode: " << p->first << " proxy: " << p->second->ice_toString()
359  << "\n";
360  }
361  }
362 
363  if (topicManagerAdapterId.empty())
364  {
365  // We're not using an IceGrid deployment. Here we need
366  // a proxy which is used to create proxies to the
367  // replicas later.
368  _managerProxy =
369  TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId));
370  }
371  else
372  {
373  // If we're using IceGrid deployment we need to create
374  // indirect proxies.
375  _managerProxy = TopicManagerPrx::uncheckedCast(
376  topicAdapter->createIndirectProxy(topicManagerId));
377  }
378 
379  _manager = new TopicManagerImpl(instance);
380  topicAdapter->add(_manager->getServant(), topicManagerId);
381 
382  ostringstream os; // The node object identity.
383  os << "node" << id;
384  Ice::Identity nodeid;
385  nodeid.category = instanceName;
386  nodeid.name = os.str();
387 
388  NodeIPtr node = new NodeI(_instance, _manager, _managerProxy, id, nodes);
389  _instance->setNode(node);
390  nodeAdapter->add(node, nodeid);
391  nodeAdapter->activate();
392 
393  node->start();
394  }
395  catch (const IceUtil::Exception& ex)
396  {
397  _instance = 0;
398 
399  LoggerOutputBase s;
400  s << "exception while starting IceStorm service " << name << ":\n";
401  s << ex;
402 
403  IceBox::FailureException e(__FILE__, __LINE__);
404  e.reason = s.str();
405  throw e;
406  }
407  }
408 
409  topicAdapter->add(
410  new FinderI(TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId))),
411  stringToIdentity("IceStorm/Finder"));
412 
413  topicAdapter->activate();
414  publishAdapter->activate();
415 }
416 
417 void
418 ServiceI::start(const CommunicatorPtr& communicator,
419  const ObjectAdapterPtr& topicAdapter,
420  const ObjectAdapterPtr& publishAdapter,
421  const string& name,
422  const Ice::Identity& id,
423  const string& /*dbEnv*/)
424 {
425  //
426  // For IceGrid we don't validate the properties as all sorts of
427  // non-IceStorm properties are included in the prefix.
428  //
429  //validateProperties(name, communicator->getProperties(), communicator->getLogger());
430 
431  // This is for IceGrid only and as such we use a transient
432  // implementation of IceStorm.
433  string instanceName =
434  communicator->getProperties()->getPropertyWithDefault(name + ".InstanceName", "IceStorm");
435  _instance = new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
436 
437  try
438  {
440  _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, id));
441  }
442  catch (const Ice::Exception& ex)
443  {
444  _instance = 0;
445  LoggerOutputBase s;
446  s << "exception while starting IceStorm service " << name << ":\n";
447  s << ex;
448 
449  IceBox::FailureException e(__FILE__, __LINE__);
450  e.reason = s.str();
451  throw e;
452  }
453 }
454 
456 ServiceI::getTopicManager() const
457 {
458  return _managerProxy;
459 }
460 
461 void
462 ServiceI::stop()
463 {
464  // Shutdown the instance. This deactivates all OAs.
465  _instance->shutdown();
466 
467  //
468  // It's necessary to reap all destroyed topics on shutdown.
469  //
470  if (_manager)
471  {
472  _manager->shutdown();
473  }
474  if (_transientManager)
475  {
476  _transientManager->shutdown();
477  }
478 
479  //
480  // Destroy the instance. This step must occur last.
481  //
482  _instance->destroy();
483 }
484 
485 void
486 ServiceI::validateProperties(const string& name,
487  const PropertiesPtr& properties,
488  const LoggerPtr& logger)
489 {
490  static const string suffixes[] = {"ReplicatedTopicManagerEndpoints",
491  "ReplicatedPublishEndpoints",
492  "Nodes.*",
493  "Transient",
494  "NodeId",
495  "Flush.Timeout",
496  "InstanceName",
497  "Election.MasterTimeout",
498  "Election.ElectionTimeout",
499  "Election.ResponseTimeout",
500  "Publish.AdapterId",
501  "Publish.Endpoints",
502  "Publish.Locator",
503  "Publish.PublishedEndpoints",
504  "Publish.ReplicaGroupId",
505  "Publish.Router",
506  "Publish.ThreadPool.Size",
507  "Publish.ThreadPool.SizeMax",
508  "Publish.ThreadPool.SizeWarn",
509  "Publish.ThreadPool.StackSize",
510  "Node.AdapterId",
511  "Node.Endpoints",
512  "Node.Locator",
513  "Node.PublishedEndpoints",
514  "Node.ReplicaGroupId",
515  "Node.Router",
516  "Node.ThreadPool.Size",
517  "Node.ThreadPool.SizeMax",
518  "Node.ThreadPool.SizeWarn",
519  "Node.ThreadPool.StackSize",
520  "TopicManager.AdapterId",
521  "TopicManager.Endpoints",
522  "TopicManager.Locator",
523  "TopicManager.Proxy",
524  "TopicManager.Proxy.EndpointSelection",
525  "TopicManager.Proxy.ConnectionCached",
526  "TopicManager.Proxy.PreferSecure",
527  "TopicManager.Proxy.LocatorCacheTimeout",
528  "TopicManager.Proxy.Locator",
529  "TopicManager.Proxy.Router",
530  "TopicManager.Proxy.CollocationOptimization",
531  "TopicManager.PublishedEndpoints",
532  "TopicManager.ReplicaGroupId",
533  "TopicManager.Router",
534  "TopicManager.ThreadPool.Size",
535  "TopicManager.ThreadPool.SizeMax",
536  "TopicManager.ThreadPool.SizeWarn",
537  "TopicManager.ThreadPool.StackSize",
538  "Trace.Election",
539  "Trace.Replication",
540  "Trace.Subscriber",
541  "Trace.Topic",
542  "Trace.TopicManager",
543  "Send.Timeout",
544  "Send.QueueSizeMax",
545  "Send.QueueSizeMaxPolicy",
546  "Discard.Interval",
547  "LMDB.Path",
548  "LMDB.MapSize"};
549 
550  vector<string> unknownProps;
551  string prefix = name + ".";
552  PropertyDict props = properties->getPropertiesForPrefix(prefix);
553  for (PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
554  {
555  bool valid = false;
556  for (unsigned int i = 0; i < sizeof(suffixes) / sizeof(*suffixes); ++i)
557  {
558  string prop = prefix + suffixes[i];
559  if (IceUtilInternal::match(p->first, prop))
560  {
561  valid = true;
562  break;
563  }
564  }
565  if (!valid)
566  {
567  unknownProps.push_back(p->first);
568  }
569  }
570 
571  if (!unknownProps.empty())
572  {
573  Warning out(logger);
574  out << "found unknown properties for IceStorm service '" << name << "':";
575  for (vector<string>::const_iterator p = unknownProps.begin(); p != unknownProps.end(); ++p)
576  {
577  out << "\n " << *p;
578  }
579  }
580 }
IceStorm
Definition: DBTypes.ice:22
TopicI.h
createIceStorm
ICESTORM_SERVICE_API ::IceBox::Service * createIceStorm(CommunicatorPtr communicator)
Definition: Service.cpp:88
TopicManagerI.h
Util.h
IceInternal::Handle<::Ice::Communicator >
Service.h
IceStormInternal::Service::create
static ICESTORM_SERVICE_API ServicePtr create(const Ice::CommunicatorPtr &, const Ice::ObjectAdapterPtr &, const Ice::ObjectAdapterPtr &, const std::string &, const Ice::Identity &, const std::string &)
Definition: Service.cpp:95
IceStorm::Instance
Definition: Instance.h:58
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
IceStormElection
Definition: DBTypes.ice:17
TransientTopicI.h
IceStormElection::NodeI
Definition: NodeI.h:28
IceStormInternal::Service
Definition: Service.h:46
IceStorm::TopicManagerImpl
Definition: TopicManagerI.h:35
IceStorm::TransientTopicManagerImpl
Definition: TransientTopicManagerI.h:30
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStorm::PersistentInstance
Definition: Instance.h:134
armarx::control::ethercat::reporting::Severity::Warning
@ Warning
Ice
Definition: DBTypes.cpp:63
TraceLevels.h
std
Definition: Application.h:66
IceStormInternal
Definition: Instance.cpp:26
IceUtil::Handle
Definition: forward_declarations.h:30
IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicManager >
Observers.h
TransientTopicManagerI.h
NodeI.h
Instance.h
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33