10 #define ICESTORM_SERVICE_API_EXPORTS
14 #include <IceGrid/Registry.h>
25 #include <IceUtil/StringUtil.h>
// Starts the service. The arguments are, in order, the service name, the
// communicator and a string sequence (parameter names are omitted in this
// declaration; the definition names the first two `name` and `communicator`
// and leaves the third unnamed).
42 virtual void start(
const string&,
const CommunicatorPtr&,
const StringSeq&);
// Compares the properties found under the "<service name>." prefix with the
// table of known IceStorm property suffixes and collects the ones that do not
// match, so they can be reported (presumably through the supplied logger --
// the construction of the output stream is not visible here).
57 void validateProperties(
const string&,
const PropertiesPtr&,
const LoggerPtr&);
// Servant class deriving from IceStorm::Finder. It is constructed with a
// topic manager proxy.
65 class FinderI :
public IceStorm::Finder
// Stores the given topic manager proxy in _topicManager.
68 FinderI(
const TopicManagerPrx& topicManager) : _topicManager(topicManager)
// Finder operation taking only the Ice::Current. The body is not shown here;
// presumably it returns _topicManager -- confirm.
73 getTopicManager(
const Ice::Current&)
// Return type of a function exported through ICESTORM_SERVICE_API that yields
// a raw IceBox::Service pointer (its name and body are not shown here).
87 ICESTORM_SERVICE_API ::IceBox::Service*
// Allocate the service implementation ...
102 ServiceI* service =
new ServiceI;
// ... and start it through the start() overload that receives the
// communicator, both object adapters, the service name, an id and dbEnv
// explicitly.
104 service->start(communicator, topicAdapter, publishAdapter, name,
id, dbEnv);
// ServiceI destructor (body not shown here).
112 ServiceI::~ServiceI()
// Starts the IceStorm service called `name` on `communicator`. The third
// (unnamed) string-sequence argument is not used.
//
// Visible flow:
//   1. validate the service properties and read the node id / instance name;
//   2. if "<name>.Transient" is > 0, create a transient instance, register the
//      manager servant and activate the adapters;
//   3. otherwise create the instance and register the topic manager; for a
//      replicated setup additionally discover the replica nodes (from
//      "<name>.Nodes.*" properties or through an IceGrid query), require at
//      least three of them, and register the election node servant;
//   4. register the "IceStorm/Finder" servant and activate the adapters.
//
// Failures caught while starting are turned into IceBox::FailureException.
117 ServiceI::start(
const string& name,
const CommunicatorPtr& communicator,
const StringSeq& )
// Check the "<name>.*" properties against the list of known suffixes.
121 validateProperties(name, properties, communicator->getLogger());
// Node id of this replica; -1 when "<name>.NodeId" is not set.
123 int id = properties->getPropertyAsIntWithDefault(name +
".NodeId", -1);
// With a node id configured and no explicit value for the topic manager
// thread pool's SizeMax, default it to 100.
130 if (
id != -1 && properties->getProperty(name +
".TopicManager.ThreadPool.SizeMax").empty())
132 properties->setProperty(name +
".TopicManager.ThreadPool.SizeMax",
"100");
// The instance name defaults to "IceStorm"; it becomes the category of the
// topic manager identity "<instanceName>/TopicManager".
141 string instanceName = properties->getPropertyWithDefault(name +
".InstanceName",
"IceStorm");
143 topicManagerId.category = instanceName;
144 topicManagerId.name =
"TopicManager";
// Transient mode is selected by a "<name>.Transient" value greater than 0.
146 if (properties->getPropertyAsIntWithDefault(name +
".Transient", 0) > 0)
// The transient instance is created with a trailing 0 argument (its meaning
// is defined by Instance's constructor, which is not shown here).
148 _instance =
new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
// Register `manager` with the topic adapter under the topic manager identity
// (the variable receiving the proxy is on a line not shown here).
153 TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, topicManagerId));
// Ice exceptions raised while starting are reported with the service name and
// converted into an IceBox::FailureException.
155 catch (
const Ice::Exception& ex)
160 s <<
"exception while starting IceStorm service " << name <<
":\n";
163 IceBox::FailureException e(__FILE__, __LINE__);
167 topicAdapter->activate();
168 publishAdapter->activate();
// Tail of the argument list of an instance construction (its beginning is not
// shown here); the result is kept in _instance.
177 instanceName, name, communicator, publishAdapter, topicAdapter);
178 _instance = instance;
// Register the topic manager servant and remember its proxy.
181 _managerProxy = TopicManagerPrx::uncheckedCast(
182 topicAdapter->add(_manager->getServant(), topicManagerId));
184 catch (
const IceUtil::Exception& ex)
189 s <<
"exception while starting IceStorm service " << name <<
":\n";
192 IceBox::FailureException e(__FILE__, __LINE__);
// Replica set: node id -> proxy of that node.
201 map<int, NodePrx> nodes;
203 string topicManagerAdapterId = properties->getProperty(name +
".TopicManager.AdapterId");
// Nodes listed explicitly as "<name>.Nodes.<id>" properties: the text after
// the prefix is the node id and the property value is converted to a proxy.
209 const string prefix = name +
".Nodes.";
210 Ice::PropertyDict props = properties->getPropertiesForPrefix(prefix);
213 for (Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
// NOTE(review): atoi() yields 0 for a non-numeric suffix, so a malformed
// "<name>.Nodes.<x>" key would silently be treated as node 0 -- confirm this
// is acceptable.
215 int nodeid = atoi(p->first.substr(prefix.size()).c_str());
216 nodes[nodeid] = NodePrx::uncheckedCast(communicator->propertyToProxy(p->first));
224 string nodeAdapterId = properties->getProperty(name +
".Node.AdapterId");
// Both adapter ids must be set, and the topic manager adapter id with
// ".TopicManager" replaced by ".Node" must equal the node adapter id.
// NOTE(review): std::string::replace() modifies topicManagerAdapterId in
// place, so when both ids are non-empty the error message below prints the
// already rewritten id. Also, if the id does not contain ".TopicManager",
// find() returns npos and replace() throws std::out_of_range -- confirm
// whether that case is handled on a line not shown here.
229 const string suffix =
".TopicManager";
230 if (topicManagerAdapterId.empty() || nodeAdapterId.empty() ||
231 topicManagerAdapterId.replace(
232 topicManagerAdapterId.find(suffix), suffix.size(),
".Node") != nodeAdapterId)
234 Ice::Error error(communicator->getLogger());
235 error <<
"deployment error: `" << topicManagerAdapterId
236 <<
"' prefix does not match `" << nodeAdapterId <<
"'";
237 throw IceBox::FailureException(
238 __FILE__, __LINE__,
"IceGrid deployment is incorrect");
// Ask the IceGrid locator's local query object for every replica of
// "<instanceName>/TopicManager".
// NOTE(review): `locator` comes from a checkedCast and is dereferenced on the
// next visible line; a validity check may exist on a line not shown -- confirm.
250 IceGrid::LocatorPrx locator =
251 IceGrid::LocatorPrx::checkedCast(communicator->getDefaultLocator());
253 IceGrid::QueryPrx query = locator->getLocalQuery();
254 Ice::ObjectProxySeq replicas =
255 query->findAllReplicas(communicator->stringToProxy(instanceName +
"/TopicManager"));
257 for (Ice::ObjectProxySeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
259 string adapterid = (*p)->ice_getAdapterId();
// Derive the node adapter id from the replica's topic manager adapter id.
// NOTE(review): same npos concern as above if ".TopicManager" is absent.
262 adapterid = adapterid.replace(adapterid.find(suffix), suffix.size(),
".Node");
// The derived adapter id has to begin with the instance name.
265 if (adapterid.find(instanceName) != 0)
267 Ice::Error error(communicator->getLogger());
268 error <<
"deployment error: `" << adapterid <<
"' does not start with `"
269 << instanceName <<
"'";
270 throw IceBox::FailureException(
271 __FILE__, __LINE__,
"IceGrid deployment is incorrect");
// Locate the first run of digits after the instance name: `start` scans over
// non-digit characters, `end` over the digits (loop bodies not shown here).
277 string::size_type start = instanceName.size();
278 while (start < adapterid.size() && !IceUtilInternal::isDigit(adapterid[start]))
282 string::size_type end = start;
283 while (end < adapterid.size() && IceUtilInternal::isDigit(adapterid[end]))
// Deployment error raised when no node id follows the instance name (the
// guarding condition is on a line not shown here).
291 Ice::Error error(communicator->getLogger());
293 <<
"deployment error: node id does not follow instance name. instance name:"
294 << instanceName <<
" adapter id: " << adapterid;
295 throw IceBox::FailureException(
296 __FILE__, __LINE__,
"IceGrid deployment is incorrect");
// The digits form the node id; the node identity name is built as
// "node<nodeid>" with the instance name as category. Here `id` is a local
// identity object (it has a `category` member), not the integer node id
// read at the top of the function.
299 int nodeid = atoi(adapterid.substr(start, end - start).c_str());
301 os <<
"node" << nodeid;
303 id.category = instanceName;
// Node proxy: the replica proxy retargeted to the node adapter id and the
// node identity.
307 NodePrx::uncheckedCast((*p)->ice_adapterId(adapterid)->ice_identity(
id));
// Fewer than three known nodes is rejected as a deployment error.
311 if (nodes.size() < 3)
313 Ice::Error error(communicator->getLogger());
314 error <<
"Replication requires at least 3 Nodes";
315 throw IceBox::FailureException(
316 __FILE__, __LINE__,
"Replication requires at least 3 Nodes");
// Unless configured explicitly, size the node thread pool to the number of
// nodes plus one and set its size warning to "0".
324 if (properties->getProperty(name +
".Node.ThreadPool.Size").empty())
327 os << nodes.size() + 1;
328 properties->setProperty(name +
".Node.ThreadPool.Size", os.str());
329 properties->setProperty(name +
".Node.ThreadPool.SizeWarn",
"0");
// Default "<name>.Node.MessageSizeMax" when unset (the default value itself
// is on a line not shown here).
331 if (properties->getProperty(name +
".Node.MessageSizeMax").empty())
333 properties->setProperty(name +
".Node.MessageSizeMax",
346 _instance = instance;
// The observers' majority is set to half the node count (integer division).
348 _instance->observers()->setMajority(
static_cast<unsigned int>(nodes.size()) / 2);
// With election tracing enabled, log this node's id and every known node.
352 if (traceLevels->election > 0)
354 Ice::Trace out(traceLevels->logger, traceLevels->electionCat);
355 out <<
"I am node " <<
id <<
"\n";
356 for (map<int, NodePrx>::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
358 out <<
"\tnode: " << p->first <<
" proxy: " << p->second->ice_toString()
// Without a topic manager adapter id a direct proxy is created; otherwise an
// indirect proxy is used for _managerProxy.
363 if (topicManagerAdapterId.empty())
369 TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId));
375 _managerProxy = TopicManagerPrx::uncheckedCast(
376 topicAdapter->createIndirectProxy(topicManagerId));
// Register the topic manager servant under the topic manager identity.
380 topicAdapter->add(_manager->getServant(), topicManagerId);
// Identity of this replica's node servant: instance name as category, name
// taken from `os` (filled on lines not shown here).
385 nodeid.category = instanceName;
386 nodeid.name = os.str();
// Create the node servant, hand it to the instance, register it with the node
// adapter and activate that adapter.
388 NodeIPtr node =
new NodeI(_instance, _manager, _managerProxy,
id, nodes);
389 _instance->setNode(node);
390 nodeAdapter->add(node, nodeid);
391 nodeAdapter->activate();
395 catch (
const IceUtil::Exception& ex)
400 s <<
"exception while starting IceStorm service " << name <<
":\n";
403 IceBox::FailureException e(__FILE__, __LINE__);
// Finder servant wrapping a direct topic manager proxy, associated with the
// identity "IceStorm/Finder" (the registering call starts on a line not shown).
410 new FinderI(TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId))),
411 stringToIdentity(
"IceStorm/Finder"));
// Finally activate the topic and publish adapters.
413 topicAdapter->activate();
414 publishAdapter->activate();
// Body fragment of a start routine whose signature is not shown here
// (presumably the overload that receives the adapters, name, id and dbEnv
// explicitly -- confirm).
// The instance name is read from "<name>.InstanceName", defaulting to
// "IceStorm".
433 string instanceName =
434 communicator->getProperties()->getPropertyWithDefault(name +
".InstanceName",
"IceStorm");
// Create the instance with a trailing 0 argument (meaning defined by
// Instance's constructor, not shown here).
435 _instance =
new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
// Register `manager` under the identity `id` and keep the resulting proxy.
440 _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager,
id));
// Ice exceptions raised while starting are reported with the service name and
// converted into an IceBox::FailureException.
442 catch (
const Ice::Exception& ex)
446 s <<
"exception while starting IceStorm service " << name <<
":\n";
449 IceBox::FailureException e(__FILE__, __LINE__);
// Const accessor returning the stored topic manager proxy (_managerProxy).
456 ServiceI::getTopicManager()
const
458 return _managerProxy;
// Shutdown sequence (the enclosing function's signature is not shown here;
// presumably the service's stop routine -- confirm).
// The instance is shut down first ...
465 _instance->shutdown();
// ... then the topic manager (any guard around this call is not visible) ...
472 _manager->shutdown();
// ... then the transient manager, only when one exists ...
474 if (_transientManager)
476 _transientManager->shutdown();
// ... and finally the instance is destroyed.
482 _instance->destroy();
486 ServiceI::validateProperties(
const string& name,
488 const LoggerPtr& logger)
490 static const string suffixes[] = {
"ReplicatedTopicManagerEndpoints",
491 "ReplicatedPublishEndpoints",
497 "Election.MasterTimeout",
498 "Election.ElectionTimeout",
499 "Election.ResponseTimeout",
503 "Publish.PublishedEndpoints",
504 "Publish.ReplicaGroupId",
506 "Publish.ThreadPool.Size",
507 "Publish.ThreadPool.SizeMax",
508 "Publish.ThreadPool.SizeWarn",
509 "Publish.ThreadPool.StackSize",
513 "Node.PublishedEndpoints",
514 "Node.ReplicaGroupId",
516 "Node.ThreadPool.Size",
517 "Node.ThreadPool.SizeMax",
518 "Node.ThreadPool.SizeWarn",
519 "Node.ThreadPool.StackSize",
520 "TopicManager.AdapterId",
521 "TopicManager.Endpoints",
522 "TopicManager.Locator",
523 "TopicManager.Proxy",
524 "TopicManager.Proxy.EndpointSelection",
525 "TopicManager.Proxy.ConnectionCached",
526 "TopicManager.Proxy.PreferSecure",
527 "TopicManager.Proxy.LocatorCacheTimeout",
528 "TopicManager.Proxy.Locator",
529 "TopicManager.Proxy.Router",
530 "TopicManager.Proxy.CollocationOptimization",
531 "TopicManager.PublishedEndpoints",
532 "TopicManager.ReplicaGroupId",
533 "TopicManager.Router",
534 "TopicManager.ThreadPool.Size",
535 "TopicManager.ThreadPool.SizeMax",
536 "TopicManager.ThreadPool.SizeWarn",
537 "TopicManager.ThreadPool.StackSize",
542 "Trace.TopicManager",
545 "Send.QueueSizeMaxPolicy",
550 vector<string> unknownProps;
551 string prefix = name +
".";
552 PropertyDict props = properties->getPropertiesForPrefix(prefix);
553 for (PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
556 for (
unsigned int i = 0; i <
sizeof(suffixes) /
sizeof(*suffixes); ++i)
558 string prop = prefix + suffixes[i];
559 if (IceUtilInternal::match(p->first, prop))
567 unknownProps.push_back(p->first);
571 if (!unknownProps.empty())
574 out <<
"found unknown properties for IceStorm service '" << name <<
"':";
575 for (vector<string>::const_iterator p = unknownProps.begin(); p != unknownProps.end(); ++p)