22 class CheckTask :
public IceUtil::TimerTask
28 CheckTask(
const NodeIPtr& node) : _node(node) { }
29 virtual void runTimerTask()
35 class MergeTask :
public IceUtil::TimerTask
42 MergeTask(
const NodeIPtr& node,
const set<int>&
s) : _node(node), _s(
s) { }
43 virtual void runTimerTask()
49 class MergeContinueTask :
public IceUtil::TimerTask
55 MergeContinueTask(
const NodeIPtr& node) : _node(node) { }
56 virtual void runTimerTask()
58 _node->mergeContinue();
62 class TimeoutTask:
public IceUtil::TimerTask
68 TimeoutTask(
const NodeIPtr& node) : _node(node) { }
69 virtual void runTimerTask()
84 GroupNodeInfo::GroupNodeInfo(
int i) :
90 id(i), llu(l), observer(o)
109 #if defined(__clang__) && defined(_LIBCPP_VERSION)
114 const_cast<int&
>(this->
id) = other.
id;
126 int t = properties->getPropertyAsIntWithDefault(key, def);
129 Ice::Warning out(traceLevels->logger);
130 out << traceLevels->electionCat <<
": " << key <<
" < 0; Adjusted to 1";
133 return IceUtil::Time::seconds(t);
141 for (set<int>::const_iterator p =
s.begin(); p !=
s.end(); ++p)
157 const Ice::ObjectPrx& replicaProxy,
158 int id,
const map<int, NodePrx>& nodes) :
159 _timer(instance->timer()),
160 _traceLevels(instance->traceLevels()),
161 _observers(instance->observers()),
163 _replicaProxy(replicaProxy),
172 map<int, NodePrx> oneway;
173 for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
175 oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
177 const_cast<map<int, NodePrx>&
>(_nodesOneway) = oneway;
181 instance->serviceName() +
".Election.MasterTimeout", 10, properties, _traceLevels);
183 instance->serviceName() +
".Election.ElectionTimeout", 10, properties, _traceLevels);
185 instance->serviceName() +
".Election.ResponseTimeout", 10, properties, _traceLevels);
214 _checkTask =
new CheckTask(
this);
215 _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
233 _timer->schedule(_checkTask, _electionTimeout);
240 _observers->getReapedSlaves(dead);
243 for (vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
248 if (_traceLevels->election > 0)
250 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
251 out <<
"node " << _id <<
": reaping slave " << *p;
259 if (_up.size() < _nodes.size() / 2)
261 if (_traceLevels->election > 0)
263 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
264 out <<
"node " << _id <<
": stopping replica";
279 for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
287 if (p->second->areYouCoordinator())
293 tmpset.insert(p->first);
296 catch (
const Ice::Exception& ex)
298 if (_traceLevels->election > 0)
300 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
301 out <<
"node " << _id <<
": call on node " << p->first <<
" failed: " << ex;
323 _timer->schedule(_checkTask, _electionTimeout);
330 if (_traceLevels->election > 0)
332 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
333 out <<
"node " << _id <<
": highest priority node count: " <<
max;
340 delay = _mergeTimeout + _mergeTimeout * (
max - _id);
341 if (_traceLevels->election > 0)
343 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
344 out <<
"node " << _id <<
": scheduling merge in " << delay.toDuration()
350 _mergeTask =
new MergeTask(
this, tmpset);
351 _timer->schedule(_mergeTask, delay);
364 if (_destroy || _coord == _id)
375 map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
376 assert(p != _nodes.end());
377 if (!p->second->areYouThere(myGroup, _id))
379 if (_traceLevels->election > 0)
381 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
382 out <<
"node " << _id <<
": lost connection to coordinator " << myCoord
383 <<
": areYouThere returned false";
388 catch (
const Ice::Exception& ex)
390 if (_traceLevels->election > 0)
392 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
393 out <<
"node " << _id <<
": lost connection to coordinator " << myCoord <<
": " << ex;
424 while (!_destroy && _updateCounter > 0)
434 os << _id <<
":" << Ice::generateUUID();
438 _invitesAccepted.clear();
439 _invitesIssued.clear();
443 invited = coordinatorSet;
444 for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
446 invited.insert(p->id);
452 if (_traceLevels->election > 0)
454 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
455 out <<
"node " << _id <<
": inviting " <<
toString(invited) <<
" to group " << _group;
459 set<int>::iterator p = invited.begin();
460 while (p != invited.end())
464 if (_traceLevels->election > 0)
466 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
467 out <<
"node " << _id <<
": inviting node " << *p <<
" to group " << gp;
469 map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
470 assert(node != _nodesOneway.end());
471 node->second->invitation(_id, gp);
474 catch (
const Ice::Exception&)
489 _invitesIssued.insert(invited.begin(), invited.end());
491 if (_traceLevels->election > 0)
493 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
494 out <<
"node " << _id <<
": invites pending: " <<
toString(_invitesIssued);
498 assert(_mergeContinueTask == 0);
499 _mergeContinueTask =
new MergeContinueTask(
this);
505 if (_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted)
507 timeout = IceUtil::Time::seconds(0);
509 _timer->schedule(_mergeContinueTask,
timeout);
517 set<GroupNodeInfo> tmpSet;
529 assert(_mergeContinueTask);
530 _mergeContinueTask = 0;
536 if (_traceLevels->election > 0)
538 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
539 out <<
"node " << _id <<
": coordinator for " << (tmpSet.size() + 1) <<
" nodes (including myself)";
547 unsigned int ingroup =
static_cast<unsigned int>(tmpSet.size());
548 if ((_max != _nodes.size() && ingroup != _nodes.size() - 1) || ingroup < _nodes.size() / 2)
550 if (_traceLevels->election > 0)
552 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
553 out <<
"node " << _id <<
": not enough nodes " << (ingroup + 1) <<
"/" << _nodes.size()
554 <<
" for replication to commence";
555 if (_max != _nodes.size())
557 out <<
" (require full participation for startup)";
569 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
571 if (_traceLevels->election > 0)
573 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
574 out <<
"node id=" << p->id <<
" llu=" << p->llu.generation <<
"/" << p->llu.iteration;
584 LogUpdate myLlu = _replica->getLastLogUpdate();
585 if (_traceLevels->election > 0)
587 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
596 if (_traceLevels->election > 0)
598 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
599 out <<
"node " << _id <<
": syncing database state with node " << maxid;
603 map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
604 assert(node != _nodes.end());
605 _replica->sync(node->second->sync());
607 catch (
const Ice::Exception& ex)
609 if (_traceLevels->election > 0)
611 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
612 out <<
"node " << _id <<
": syncing database state with node "
613 << maxid <<
" failed: " << ex;
621 if (_traceLevels->election > 0)
623 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
624 out <<
"node " << _id <<
": I have the latest database state.";
630 unsigned int max =
static_cast<unsigned int>(tmpSet.size()) + 1;
648 _replica->initMaster(tmpSet, maxllu);
650 catch (
const Ice::Exception& ex)
652 if (_traceLevels->election > 0)
654 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
655 out <<
"node " << _id <<
": initMaster failed: " << ex;
662 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
666 map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
667 assert(node != _nodes.end());
668 node->second->ready(_id, gp, _replicaProxy,
max, maxllu.
generation);
670 catch (
const Ice::Exception& ex)
672 if (_traceLevels->election > 0)
674 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
675 out <<
"node " << _id <<
": error calling ready on " << p->id <<
" ex: " << ex;
688 if (_traceLevels->election > 0)
690 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
691 out <<
"node " << _id <<
": reporting for duty in group " << _group <<
" as coordinator. ";
692 out <<
"replication commencing with " << _up.size() + 1 <<
"/" << _nodes.size()
693 <<
" nodes with llu generation: " << maxllu.
generation;
696 _coordinatorProxy = 0;
701 _checkTask =
new CheckTask(
this);
702 _timer->schedule(_checkTask, _electionTimeout);
709 if (_traceLevels->election > 0)
711 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
712 out <<
"node " << _id <<
": invitation from " << j <<
" to group " << gn;
716 if (_nodes.find(j) == _nodes.end())
718 Ice::Warning warn(_traceLevels->logger);
719 warn << _traceLevels->electionCat <<
": ignoring invitation from unknown node " << j;
725 set<GroupNodeInfo> tmpSet;
736 if (_traceLevels->election > 0)
738 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
739 out <<
"node " << _id <<
": invitation ignored";
753 if (!_timer->cancel(_mergeTask))
767 while (!_destroy && _updateCounter > 0)
784 Ice::IntSeq forwardedInvites;
787 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
791 map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
792 assert(node != _nodesOneway.end());
793 node->second->invitation(j, gn);
794 forwardedInvites.push_back(p->id);
796 catch (
const Ice::Exception&)
816 _timeoutTask =
new TimeoutTask(
this);
817 _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
823 map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
824 assert(node != _nodesOneway.end());
825 node->second->accept(_id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(),
max);
827 catch (
const Ice::Exception&)
844 Ice::Warning warn(_traceLevels->logger);
845 warn << _traceLevels->electionCat <<
": ignoring ready call from replica node " << j
846 <<
" (real coordinator is " << _coord <<
")";
852 if (_traceLevels->election > 0)
854 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
855 out <<
"node " << _id <<
": reporting for duty in group " << gn <<
" with coordinator " << j;
858 if (
static_cast<unsigned int>(
max) > _max)
862 _generation = generation;
867 _coordinatorProxy = coordinator;
871 _checkTask =
new CheckTask(
this);
872 _timer->schedule(_checkTask, _electionTimeout);
878 NodeI::accept(
int j,
const string& gn,
const Ice::IntSeq& forwardedInvites,
const Ice::ObjectPrx& observer,
882 if (_nodes.find(j) == _nodes.end())
884 Ice::Warning warn(_traceLevels->logger);
885 warn << _traceLevels->electionCat <<
": ignoring accept from unknown node " << j;
894 if (
static_cast<unsigned int>(
max) > _max)
899 if (_traceLevels->election > 0)
901 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
902 out <<
"node " << _id <<
": accept " << j <<
" forward invites (";
903 for (Ice::IntSeq::const_iterator p = forwardedInvites.begin(); p != forwardedInvites.end(); ++p)
905 if (p != forwardedInvites.begin())
913 <<
" group size " << (_up.size() + 1);
919 _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
921 _invitesAccepted.insert(j);
923 if (_traceLevels->election > 0)
925 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
926 out <<
"node " << _id <<
": invites pending: " <<
toString(_invitesIssued)
927 <<
" invites accepted: " <<
toString(_invitesAccepted);
934 if ((_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted) &&
935 _mergeContinueTask && _timer->cancel(_mergeContinueTask))
937 _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
953 return _group == gn && _coord == _id && _up.find(
GroupNodeInfo(j)) != _up.end();
959 return _replica->getSync();
966 for (map<int, NodePrx>::const_iterator
q = _nodes.begin();
q != _nodes.end(); ++
q)
989 for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
994 info.
up.push_back(gi);
1007 if (generation != -1 && generation != _generation)
1013 while (!_destroy && _updateCounter > 0)
1023 os << _id <<
":" << Ice::generateUUID();
1030 if (_traceLevels->election > 0)
1032 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1033 out <<
"node " << _id <<
": creating new self-coordinated group " << _group;
1039 _timer->cancel(_mergeTask);
1044 _timer->cancel(_timeoutTask);
1049 _checkTask =
new CheckTask(
this);
1050 _timer->schedule(_checkTask, _electionTimeout);
1060 while (_updateCounter > 0)
1070 _timer->cancel(_checkTask);
1076 _timer->cancel(_timeoutTask);
1082 _timer->cancel(_mergeTask);
1107 bool majority = _observers->check();
1112 if (!_coordinatorProxy && !_destroy && _state ==
NodeStateNormal && !majority)
1123 throw Ice::UnknownException(file, line);
1125 if (!_coordinatorProxy)
1129 generation = _generation;
1130 return _coordinatorProxy;
1136 bool majority = _observers->check();
1142 if (_destroy || _coordinatorProxy)
1174 throw Ice::UnknownException(file, line);
1176 generation = _generation;
1178 return _coordinatorProxy;
1187 throw Ice::UnknownException(file, line);
1193 if (!_coordinatorProxy)
1197 if (generation != _generation)
1210 assert(_updateCounter >= 0);
1211 if (_updateCounter == 0)
1229 return "reorganization";
1242 if (_traceLevels->election > 0)
1244 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1245 out <<
"node " << _id <<
": transition from " << stateToString(_state) <<
" to "
1246 << stateToString(
s);