22 class CheckTask :
public IceUtil::TimerTask
27 CheckTask(
const NodeIPtr& node) : _node(node)
38 class MergeTask :
public IceUtil::TimerTask
44 MergeTask(
const NodeIPtr& node,
const set<int>&
s) : _node(node), _s(
s)
55 class MergeContinueTask :
public IceUtil::TimerTask
60 MergeContinueTask(
const NodeIPtr& node) : _node(node)
67 _node->mergeContinue();
71 class TimeoutTask :
public IceUtil::TimerTask
76 TimeoutTask(
const NodeIPtr& node) : _node(node)
96 GroupNodeInfo::GroupNodeInfo(
int i) : id(i), llu(emptyLU)
101 id(i), llu(l), observer(o)
120 #if defined(__clang__) && defined(_LIBCPP_VERSION)
125 const_cast<int&
>(this->
id) = other.
id;
135 getTimeout(
const string& key,
140 int t = properties->getPropertyAsIntWithDefault(key, def);
143 Ice::Warning out(traceLevels->logger);
144 out << traceLevels->electionCat <<
": " << key <<
" < 0; Adjusted to 1";
147 return IceUtil::Time::seconds(t);
155 for (set<int>::const_iterator p =
s.begin(); p !=
s.end(); ++p)
171 const Ice::ObjectPrx& replicaProxy,
173 const map<int, NodePrx>& nodes) :
174 _timer(instance->timer()),
175 _traceLevels(instance->traceLevels()),
176 _observers(instance->observers()),
178 _replicaProxy(replicaProxy),
187 map<int, NodePrx> oneway;
188 for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
190 oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
192 const_cast<map<int, NodePrx>&
>(_nodesOneway) = oneway;
196 instance->serviceName() +
".Election.MasterTimeout", 10, properties, _traceLevels);
198 instance->serviceName() +
".Election.ElectionTimeout", 10, properties, _traceLevels);
200 instance->serviceName() +
".Election.ResponseTimeout", 10, properties, _traceLevels);
229 _checkTask =
new CheckTask(
this);
230 _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
248 _timer->schedule(_checkTask, _electionTimeout);
255 _observers->getReapedSlaves(dead);
258 for (vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
263 if (_traceLevels->election > 0)
265 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
266 out <<
"node " << _id <<
": reaping slave " << *p;
274 if (_up.size() < _nodes.size() / 2)
276 if (_traceLevels->election > 0)
278 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
279 out <<
"node " << _id <<
": stopping replica";
294 for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
302 if (p->second->areYouCoordinator())
308 tmpset.insert(p->first);
311 catch (
const Ice::Exception& ex)
313 if (_traceLevels->election > 0)
315 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
316 out <<
"node " << _id <<
": call on node " << p->first <<
" failed: " << ex;
339 _timer->schedule(_checkTask, _electionTimeout);
346 if (_traceLevels->election > 0)
348 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
349 out <<
"node " << _id <<
": highest priority node count: " <<
max;
356 delay = _mergeTimeout + _mergeTimeout * (
max - _id);
357 if (_traceLevels->election > 0)
359 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
360 out <<
"node " << _id <<
": scheduling merge in " << delay.toDuration() <<
" seconds";
365 _mergeTask =
new MergeTask(
this, tmpset);
366 _timer->schedule(_mergeTask, delay);
379 if (_destroy || _coord == _id)
390 map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
391 assert(p != _nodes.end());
392 if (!p->second->areYouThere(myGroup, _id))
394 if (_traceLevels->election > 0)
396 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
397 out <<
"node " << _id <<
": lost connection to coordinator " << myCoord
398 <<
": areYouThere returned false";
403 catch (
const Ice::Exception& ex)
405 if (_traceLevels->election > 0)
407 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
408 out <<
"node " << _id <<
": lost connection to coordinator " << myCoord <<
": " << ex;
439 while (!_destroy && _updateCounter > 0)
449 os << _id <<
":" << Ice::generateUUID();
453 _invitesAccepted.clear();
454 _invitesIssued.clear();
458 invited = coordinatorSet;
459 for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
461 invited.insert(p->id);
467 if (_traceLevels->election > 0)
469 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
470 out <<
"node " << _id <<
": inviting " <<
toString(invited) <<
" to group " << _group;
474 set<int>::iterator p = invited.begin();
475 while (p != invited.end())
479 if (_traceLevels->election > 0)
481 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
482 out <<
"node " << _id <<
": inviting node " << *p <<
" to group " << gp;
484 map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
485 assert(node != _nodesOneway.end());
486 node->second->invitation(_id, gp);
489 catch (
const Ice::Exception&)
504 _invitesIssued.insert(invited.begin(), invited.end());
506 if (_traceLevels->election > 0)
508 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
509 out <<
"node " << _id <<
": invites pending: " <<
toString(_invitesIssued);
513 assert(_mergeContinueTask == 0);
514 _mergeContinueTask =
new MergeContinueTask(
this);
520 if (_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted)
522 timeout = IceUtil::Time::seconds(0);
524 _timer->schedule(_mergeContinueTask,
timeout);
532 set<GroupNodeInfo> tmpSet;
544 assert(_mergeContinueTask);
545 _mergeContinueTask = 0;
551 if (_traceLevels->election > 0)
553 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
554 out <<
"node " << _id <<
": coordinator for " << (tmpSet.size() + 1)
555 <<
" nodes (including myself)";
563 unsigned int ingroup =
static_cast<unsigned int>(tmpSet.size());
564 if ((_max != _nodes.size() && ingroup != _nodes.size() - 1) || ingroup < _nodes.size() / 2)
566 if (_traceLevels->election > 0)
568 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
569 out <<
"node " << _id <<
": not enough nodes " << (ingroup + 1) <<
"/"
570 << _nodes.size() <<
" for replication to commence";
571 if (_max != _nodes.size())
573 out <<
" (require full participation for startup)";
585 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
587 if (_traceLevels->election > 0)
589 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
590 out <<
"node id=" << p->id <<
" llu=" << p->llu.generation <<
"/" << p->llu.iteration;
600 LogUpdate myLlu = _replica->getLastLogUpdate();
601 if (_traceLevels->election > 0)
603 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
612 if (_traceLevels->election > 0)
614 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
615 out <<
"node " << _id <<
": syncing database state with node " << maxid;
619 map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
620 assert(node != _nodes.end());
621 _replica->sync(node->second->sync());
623 catch (
const Ice::Exception& ex)
625 if (_traceLevels->election > 0)
627 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
628 out <<
"node " << _id <<
": syncing database state with node " << maxid
629 <<
" failed: " << ex;
637 if (_traceLevels->election > 0)
639 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
640 out <<
"node " << _id <<
": I have the latest database state.";
646 unsigned int max =
static_cast<unsigned int>(tmpSet.size()) + 1;
664 _replica->initMaster(tmpSet, maxllu);
666 catch (
const Ice::Exception& ex)
668 if (_traceLevels->election > 0)
670 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
671 out <<
"node " << _id <<
": initMaster failed: " << ex;
678 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
682 map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
683 assert(node != _nodes.end());
684 node->second->ready(_id, gp, _replicaProxy,
max, maxllu.
generation);
686 catch (
const Ice::Exception& ex)
688 if (_traceLevels->election > 0)
690 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
691 out <<
"node " << _id <<
": error calling ready on " << p->id <<
" ex: " << ex;
704 if (_traceLevels->election > 0)
706 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
707 out <<
"node " << _id <<
": reporting for duty in group " << _group
708 <<
" as coordinator. ";
709 out <<
"replication commencing with " << _up.size() + 1 <<
"/" << _nodes.size()
710 <<
" nodes with llu generation: " << maxllu.
generation;
713 _coordinatorProxy = 0;
718 _checkTask =
new CheckTask(
this);
719 _timer->schedule(_checkTask, _electionTimeout);
726 if (_traceLevels->election > 0)
728 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
729 out <<
"node " << _id <<
": invitation from " << j <<
" to group " << gn;
733 if (_nodes.find(j) == _nodes.end())
735 Ice::Warning warn(_traceLevels->logger);
736 warn << _traceLevels->electionCat <<
": ignoring invitation from unknown node " << j;
742 set<GroupNodeInfo> tmpSet;
753 if (_traceLevels->election > 0)
755 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
756 out <<
"node " << _id <<
": invitation ignored";
770 if (!_timer->cancel(_mergeTask))
784 while (!_destroy && _updateCounter > 0)
801 Ice::IntSeq forwardedInvites;
804 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
808 map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
809 assert(node != _nodesOneway.end());
810 node->second->invitation(j, gn);
811 forwardedInvites.push_back(p->id);
813 catch (
const Ice::Exception&)
833 _timeoutTask =
new TimeoutTask(
this);
834 _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
840 map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
841 assert(node != _nodesOneway.end());
842 node->second->accept(
843 _id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(),
max);
845 catch (
const Ice::Exception&)
855 const Ice::ObjectPrx& coordinator,
866 Ice::Warning warn(_traceLevels->logger);
867 warn << _traceLevels->electionCat <<
": ignoring ready call from replica node " << j
868 <<
" (real coordinator is " << _coord <<
")";
874 if (_traceLevels->election > 0)
876 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
877 out <<
"node " << _id <<
": reporting for duty in group " << gn <<
" with coordinator "
881 if (
static_cast<unsigned int>(
max) > _max)
885 _generation = generation;
890 _coordinatorProxy = coordinator;
894 _checkTask =
new CheckTask(
this);
895 _timer->schedule(_checkTask, _electionTimeout);
903 const Ice::IntSeq& forwardedInvites,
904 const Ice::ObjectPrx& observer,
910 if (_nodes.find(j) == _nodes.end())
912 Ice::Warning warn(_traceLevels->logger);
913 warn << _traceLevels->electionCat <<
": ignoring accept from unknown node " << j;
922 if (
static_cast<unsigned int>(
max) > _max)
927 if (_traceLevels->election > 0)
929 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
930 out <<
"node " << _id <<
": accept " << j <<
" forward invites (";
931 for (Ice::IntSeq::const_iterator p = forwardedInvites.begin();
932 p != forwardedInvites.end();
935 if (p != forwardedInvites.begin())
942 <<
" group size " << (_up.size() + 1);
948 _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
950 _invitesAccepted.insert(j);
952 if (_traceLevels->election > 0)
954 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
955 out <<
"node " << _id <<
": invites pending: " <<
toString(_invitesIssued)
956 <<
" invites accepted: " <<
toString(_invitesAccepted);
963 if ((_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted) &&
964 _mergeContinueTask && _timer->cancel(_mergeContinueTask))
966 _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
982 return _group == gn && _coord == _id && _up.find(
GroupNodeInfo(j)) != _up.end();
988 return _replica->getSync();
995 for (map<int, NodePrx>::const_iterator
q = _nodes.begin();
q != _nodes.end(); ++
q)
1012 info.
coord = _coord;
1013 info.
group = _group;
1015 info.
state = _state;
1018 for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
1023 info.
up.push_back(gi);
1036 if (generation != -1 && generation != _generation)
1042 while (!_destroy && _updateCounter > 0)
1052 os << _id <<
":" << Ice::generateUUID();
1059 if (_traceLevels->election > 0)
1061 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1062 out <<
"node " << _id <<
": creating new self-coordinated group " << _group;
1068 _timer->cancel(_mergeTask);
1073 _timer->cancel(_timeoutTask);
1078 _checkTask =
new CheckTask(
this);
1079 _timer->schedule(_checkTask, _electionTimeout);
1089 while (_updateCounter > 0)
1099 _timer->cancel(_checkTask);
1105 _timer->cancel(_timeoutTask);
1111 _timer->cancel(_mergeTask);
1125 "init cannot block when state != NodeStateReorganization");
1137 bool majority = _observers->check();
1142 if (!_coordinatorProxy && !_destroy && _state ==
NodeStateNormal && !majority)
1153 throw Ice::UnknownException(file, line);
1155 if (!_coordinatorProxy)
1159 generation = _generation;
1160 return _coordinatorProxy;
1166 bool majority = _observers->check();
1172 if (_destroy || _coordinatorProxy)
1204 throw Ice::UnknownException(file, line);
1206 generation = _generation;
1208 return _coordinatorProxy;
1217 throw Ice::UnknownException(file, line);
1223 if (!_coordinatorProxy)
1227 if (generation != _generation)
1240 assert(_updateCounter >= 0);
1241 if (_updateCounter == 0)
1259 return "reorganization";
1272 if (_traceLevels->election > 0)
1274 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1275 out <<
"node " << _id <<
": transition from " << stateToString(_state) <<
" to "
1276 << stateToString(
s);