10 #define ICESTORM_SERVICE_API_EXPORTS
24 #include <IceUtil/StringUtil.h>
29 #include <IceGrid/Registry.h>
47 virtual void start(
const string&,
65 void validateProperties(
const string&,
const PropertiesPtr&,
const LoggerPtr&);
73 class FinderI :
public IceStorm::Finder
77 FinderI(
const TopicManagerPrx& topicManager) : _topicManager(topicManager)
82 getTopicManager(
const Ice::Current&)
97 ICESTORM_SERVICE_API ::IceBox::Service*
113 ServiceI* service =
new ServiceI;
115 service->start(communicator, topicAdapter, publishAdapter, name,
id, dbEnv);
123 ServiceI::~ServiceI()
135 validateProperties(name, properties, communicator->getLogger());
137 int id = properties->getPropertyAsIntWithDefault(name +
".NodeId", -1);
144 if (
id != -1 && properties->getProperty(name +
".TopicManager.ThreadPool.SizeMax").empty())
146 properties->setProperty(name +
".TopicManager.ThreadPool.SizeMax",
"100");
155 string instanceName = properties->getPropertyWithDefault(name +
".InstanceName",
"IceStorm");
157 topicManagerId.category = instanceName;
158 topicManagerId.name =
"TopicManager";
160 if (properties->getPropertyAsIntWithDefault(name +
".Transient", 0) > 0)
162 _instance =
new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
166 _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager, topicManagerId));
168 catch (
const Ice::Exception& ex)
173 s <<
"exception while starting IceStorm service " << name <<
":\n";
176 IceBox::FailureException e(__FILE__, __LINE__);
180 topicAdapter->activate();
181 publishAdapter->activate();
190 new PersistentInstance(instanceName, name, communicator, publishAdapter, topicAdapter);
191 _instance = instance;
194 _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(_manager->getServant(), topicManagerId));
196 catch (
const IceUtil::Exception& ex)
201 s <<
"exception while starting IceStorm service " << name <<
":\n";
204 IceBox::FailureException e(__FILE__, __LINE__);
213 map<int, NodePrx> nodes;
215 string topicManagerAdapterId = properties->getProperty(name +
".TopicManager.AdapterId");
221 const string prefix = name +
".Nodes.";
222 Ice::PropertyDict props = properties->getPropertiesForPrefix(prefix);
225 for (Ice::PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
227 int nodeid = atoi(p->first.substr(prefix.size()).c_str());
228 nodes[nodeid] = NodePrx::uncheckedCast(communicator->propertyToProxy(p->first));
236 string nodeAdapterId = properties->getProperty(name +
".Node.AdapterId");
241 const string suffix =
".TopicManager";
242 if (topicManagerAdapterId.empty() || nodeAdapterId.empty() ||
243 topicManagerAdapterId.replace(
244 topicManagerAdapterId.find(suffix), suffix.size(),
".Node") != nodeAdapterId)
246 Ice::Error error(communicator->getLogger());
247 error <<
"deployment error: `" << topicManagerAdapterId <<
"' prefix does not match `"
248 << nodeAdapterId <<
"'";
249 throw IceBox::FailureException(__FILE__, __LINE__,
"IceGrid deployment is incorrect");
261 IceGrid::LocatorPrx locator = IceGrid::LocatorPrx::checkedCast(communicator->getDefaultLocator());
263 IceGrid::QueryPrx query = locator->getLocalQuery();
264 Ice::ObjectProxySeq replicas = query->findAllReplicas(
265 communicator->stringToProxy(instanceName +
"/TopicManager"));
267 for (Ice::ObjectProxySeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
269 string adapterid = (*p)->ice_getAdapterId();
272 adapterid = adapterid.replace(adapterid.find(suffix), suffix.size(),
".Node");
275 if (adapterid.find(instanceName) != 0)
277 Ice::Error error(communicator->getLogger());
278 error <<
"deployment error: `" << adapterid <<
"' does not start with `" << instanceName <<
"'";
279 throw IceBox::FailureException(__FILE__, __LINE__,
"IceGrid deployment is incorrect");
285 string::size_type start = instanceName.size();
286 while (start < adapterid.size() && !IceUtilInternal::isDigit(adapterid[start]))
290 string::size_type end = start;
291 while (end < adapterid.size() && IceUtilInternal::isDigit(adapterid[end]))
299 Ice::Error error(communicator->getLogger());
300 error <<
"deployment error: node id does not follow instance name. instance name:"
301 << instanceName <<
" adapter id: " << adapterid;
302 throw IceBox::FailureException(__FILE__, __LINE__,
"IceGrid deployment is incorrect");
305 int nodeid = atoi(adapterid.substr(start, end - start).c_str());
307 os <<
"node" << nodeid;
309 id.category = instanceName;
312 nodes[nodeid] = NodePrx::uncheckedCast((*p)->ice_adapterId(adapterid)->ice_identity(
id));
316 if (nodes.size() < 3)
318 Ice::Error error(communicator->getLogger());
319 error <<
"Replication requires at least 3 Nodes";
320 throw IceBox::FailureException(__FILE__, __LINE__,
"Replication requires at least 3 Nodes");
328 if (properties->getProperty(name +
".Node.ThreadPool.Size").empty())
331 os << nodes.size() + 1;
332 properties->setProperty(name +
".Node.ThreadPool.Size", os.str());
333 properties->setProperty(name +
".Node.ThreadPool.SizeWarn",
"0");
335 if (properties->getProperty(name +
".Node.MessageSizeMax").empty())
337 properties->setProperty(name +
".Node.MessageSizeMax",
"0");
343 new PersistentInstance(instanceName, name, communicator, publishAdapter, topicAdapter,
344 nodeAdapter, nodes[
id]);
345 _instance = instance;
347 _instance->observers()->setMajority(
static_cast<unsigned int>(nodes.size()) / 2);
351 if (traceLevels->election > 0)
353 Ice::Trace out(traceLevels->logger, traceLevels->electionCat);
354 out <<
"I am node " <<
id <<
"\n";
355 for (map<int, NodePrx>::const_iterator p = nodes.begin(); p != nodes.end(); ++p)
357 out <<
"\tnode: " << p->first <<
" proxy: " << p->second->ice_toString() <<
"\n";
361 if (topicManagerAdapterId.empty())
366 _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId));
372 _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->createIndirectProxy(topicManagerId));
376 topicAdapter->add(_manager->getServant(), topicManagerId);
381 nodeid.category = instanceName;
382 nodeid.name = os.str();
384 NodeIPtr node =
new NodeI(_instance, _manager, _managerProxy,
id, nodes);
385 _instance->setNode(node);
386 nodeAdapter->add(node, nodeid);
387 nodeAdapter->activate();
391 catch (
const IceUtil::Exception& ex)
396 s <<
"exception while starting IceStorm service " << name <<
":\n";
399 IceBox::FailureException e(__FILE__, __LINE__);
405 topicAdapter->add(
new FinderI(TopicManagerPrx::uncheckedCast(topicAdapter->createProxy(topicManagerId))),
406 stringToIdentity(
"IceStorm/Finder"));
408 topicAdapter->activate();
409 publishAdapter->activate();
428 string instanceName = communicator->getProperties()->getPropertyWithDefault(name +
".InstanceName",
"IceStorm");
429 _instance =
new Instance(instanceName, name, communicator, publishAdapter, topicAdapter, 0);
434 _managerProxy = TopicManagerPrx::uncheckedCast(topicAdapter->add(manager,
id));
436 catch (
const Ice::Exception& ex)
440 s <<
"exception while starting IceStorm service " << name <<
":\n";
443 IceBox::FailureException e(__FILE__, __LINE__);
450 ServiceI::getTopicManager()
const
452 return _managerProxy;
459 _instance->shutdown();
466 _manager->shutdown();
468 if (_transientManager)
470 _transientManager->shutdown();
476 _instance->destroy();
480 ServiceI::validateProperties(
const string& name,
const PropertiesPtr& properties,
const LoggerPtr& logger)
482 static const string suffixes[] =
484 "ReplicatedTopicManagerEndpoints",
485 "ReplicatedPublishEndpoints",
491 "Election.MasterTimeout",
492 "Election.ElectionTimeout",
493 "Election.ResponseTimeout",
497 "Publish.PublishedEndpoints",
498 "Publish.ReplicaGroupId",
500 "Publish.ThreadPool.Size",
501 "Publish.ThreadPool.SizeMax",
502 "Publish.ThreadPool.SizeWarn",
503 "Publish.ThreadPool.StackSize",
507 "Node.PublishedEndpoints",
508 "Node.ReplicaGroupId",
510 "Node.ThreadPool.Size",
511 "Node.ThreadPool.SizeMax",
512 "Node.ThreadPool.SizeWarn",
513 "Node.ThreadPool.StackSize",
514 "TopicManager.AdapterId",
515 "TopicManager.Endpoints",
516 "TopicManager.Locator",
517 "TopicManager.Proxy",
518 "TopicManager.Proxy.EndpointSelection",
519 "TopicManager.Proxy.ConnectionCached",
520 "TopicManager.Proxy.PreferSecure",
521 "TopicManager.Proxy.LocatorCacheTimeout",
522 "TopicManager.Proxy.Locator",
523 "TopicManager.Proxy.Router",
524 "TopicManager.Proxy.CollocationOptimization",
525 "TopicManager.PublishedEndpoints",
526 "TopicManager.ReplicaGroupId",
527 "TopicManager.Router",
528 "TopicManager.ThreadPool.Size",
529 "TopicManager.ThreadPool.SizeMax",
530 "TopicManager.ThreadPool.SizeWarn",
531 "TopicManager.ThreadPool.StackSize",
536 "Trace.TopicManager",
539 "Send.QueueSizeMaxPolicy",
545 vector<string> unknownProps;
546 string prefix = name +
".";
547 PropertyDict props = properties->getPropertiesForPrefix(prefix);
548 for (PropertyDict::const_iterator p = props.begin(); p != props.end(); ++p)
551 for (
unsigned int i = 0; i <
sizeof(suffixes) /
sizeof(*suffixes); ++i)
553 string prop = prefix + suffixes[i];
554 if (IceUtilInternal::match(p->first, prop))
562 unknownProps.push_back(p->first);
566 if (!unknownProps.empty())
569 out <<
"found unknown properties for IceStorm service '" << name <<
"':";
570 for (vector<string>::const_iterator p = unknownProps.begin(); p != unknownProps.end(); ++p)