22 class CheckTask :
public IceUtil::TimerTask
27 CheckTask(
const NodeIPtr& node) : _node(node)
38 class MergeTask :
public IceUtil::TimerTask
44 MergeTask(
const NodeIPtr& node,
const set<int>& s) : _node(node), _s(
s)
55 class MergeContinueTask :
public IceUtil::TimerTask
60 MergeContinueTask(
const NodeIPtr& node) : _node(node)
67 _node->mergeContinue();
71 class TimeoutTask :
public IceUtil::TimerTask
76 TimeoutTask(
const NodeIPtr& node) : _node(node)
120#if defined(__clang__) && defined(_LIBCPP_VERSION)
125 const_cast<int&
>(this->
id) = other.
id;
127 const_cast<Ice::ObjectPrx&
>(this->
observer) = other.observer;
135 getTimeout(
const string& key,
140 int t = properties->getPropertyAsIntWithDefault(key, def);
143 Ice::Warning out(traceLevels->logger);
144 out << traceLevels->electionCat <<
": " << key <<
" < 0; Adjusted to 1";
147 return IceUtil::Time::seconds(t);
155 for (set<int>::const_iterator p =
s.begin(); p !=
s.end(); ++p)
171 const Ice::ObjectPrx& replicaProxy,
173 const map<int, NodePrx>&
nodes) :
174 _timer(instance->timer()),
175 _traceLevels(instance->traceLevels()),
176 _observers(instance->observers()),
178 _replicaProxy(replicaProxy),
187 map<int, NodePrx> oneway;
188 for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
190 oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
192 const_cast<map<int, NodePrx>&
>(_nodesOneway) = oneway;
195 const_cast<IceUtil::Time&
>(_masterTimeout) = getTimeout(
196 instance->serviceName() +
".Election.MasterTimeout", 10, properties, _traceLevels);
197 const_cast<IceUtil::Time&
>(_electionTimeout) = getTimeout(
198 instance->serviceName() +
".Election.ElectionTimeout", 10, properties, _traceLevels);
199 const_cast<IceUtil::Time&
>(_mergeTimeout) = getTimeout(
200 instance->serviceName() +
".Election.ResponseTimeout", 10, properties, _traceLevels);
229 _checkTask =
new CheckTask(
this);
230 _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
248 _timer->schedule(_checkTask, _electionTimeout);
255 _observers->getReapedSlaves(dead);
258 for (vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
263 if (_traceLevels->election > 0)
265 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
266 out <<
"node " << _id <<
": reaping slave " << *p;
274 if (_up.size() < _nodes.size() / 2)
276 if (_traceLevels->election > 0)
278 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
279 out <<
"node " << _id <<
": stopping replica";
294 for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
302 if (p->second->areYouCoordinator())
308 tmpset.insert(p->first);
311 catch (
const Ice::Exception& ex)
313 if (_traceLevels->election > 0)
315 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
316 out <<
"node " << _id <<
": call on node " << p->first <<
" failed: " << ex;
339 _timer->schedule(_checkTask, _electionTimeout);
346 if (_traceLevels->election > 0)
348 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
349 out <<
"node " << _id <<
": highest priority node count: " <<
max;
352 IceUtil::Time delay = IceUtil::Time::seconds(0);
356 delay = _mergeTimeout + _mergeTimeout * (
max - _id);
357 if (_traceLevels->election > 0)
359 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
360 out <<
"node " << _id <<
": scheduling merge in " << delay.toDuration() <<
" seconds";
365 _mergeTask =
new MergeTask(
this, tmpset);
366 _timer->schedule(_mergeTask, delay);
379 if (_destroy || _coord == _id)
390 map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
391 assert(p != _nodes.end());
392 if (!p->second->areYouThere(myGroup, _id))
394 if (_traceLevels->election > 0)
396 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
397 out <<
"node " << _id <<
": lost connection to coordinator " << myCoord
398 <<
": areYouThere returned false";
403 catch (
const Ice::Exception& ex)
405 if (_traceLevels->election > 0)
407 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
408 out <<
"node " << _id <<
": lost connection to coordinator " << myCoord <<
": " << ex;
439 while (!_destroy && _updateCounter > 0)
449 os << _id <<
":" << Ice::generateUUID();
453 _invitesAccepted.clear();
454 _invitesIssued.clear();
458 invited = coordinatorSet;
459 for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
461 invited.insert(p->id);
467 if (_traceLevels->election > 0)
469 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
470 out <<
"node " << _id <<
": inviting " << toString(invited) <<
" to group " << _group;
474 set<int>::iterator p = invited.begin();
475 while (p != invited.end())
479 if (_traceLevels->election > 0)
481 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
482 out <<
"node " << _id <<
": inviting node " << *p <<
" to group " << gp;
484 map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
485 assert(node != _nodesOneway.end());
486 node->second->invitation(_id, gp);
489 catch (
const Ice::Exception&)
504 _invitesIssued.insert(invited.begin(), invited.end());
506 if (_traceLevels->election > 0)
508 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
509 out <<
"node " << _id <<
": invites pending: " << toString(_invitesIssued);
513 assert(_mergeContinueTask == 0);
514 _mergeContinueTask =
new MergeContinueTask(
this);
519 IceUtil::Time
timeout = _mergeTimeout;
520 if (_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted)
522 timeout = IceUtil::Time::seconds(0);
524 _timer->schedule(_mergeContinueTask,
timeout);
532 set<GroupNodeInfo> tmpSet;
544 assert(_mergeContinueTask);
545 _mergeContinueTask = 0;
551 if (_traceLevels->election > 0)
553 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
554 out <<
"node " << _id <<
": coordinator for " << (tmpSet.size() + 1)
555 <<
" nodes (including myself)";
563 unsigned int ingroup =
static_cast<unsigned int>(tmpSet.size());
564 if ((_max != _nodes.size() && ingroup != _nodes.size() - 1) || ingroup < _nodes.size() / 2)
566 if (_traceLevels->election > 0)
568 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
569 out <<
"node " << _id <<
": not enough nodes " << (ingroup + 1) <<
"/"
570 << _nodes.size() <<
" for replication to commence";
571 if (_max != _nodes.size())
573 out <<
" (require full participation for startup)";
585 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
587 if (_traceLevels->election > 0)
589 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
590 out <<
"node id=" << p->id <<
" llu=" << p->llu.generation <<
"/" << p->llu.iteration;
600 LogUpdate myLlu = _replica->getLastLogUpdate();
601 if (_traceLevels->election > 0)
603 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
612 if (_traceLevels->election > 0)
614 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
615 out <<
"node " << _id <<
": syncing database state with node " << maxid;
619 map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
620 assert(node != _nodes.end());
621 _replica->sync(node->second->sync());
623 catch (
const Ice::Exception& ex)
625 if (_traceLevels->election > 0)
627 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
628 out <<
"node " << _id <<
": syncing database state with node " << maxid
629 <<
" failed: " << ex;
637 if (_traceLevels->election > 0)
639 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
640 out <<
"node " << _id <<
": I have the latest database state.";
646 unsigned int max =
static_cast<unsigned int>(tmpSet.size()) + 1;
664 _replica->initMaster(tmpSet, maxllu);
666 catch (
const Ice::Exception& ex)
668 if (_traceLevels->election > 0)
670 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
671 out <<
"node " << _id <<
": initMaster failed: " << ex;
678 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
682 map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
683 assert(node != _nodes.end());
684 node->second->ready(_id, gp, _replicaProxy,
max, maxllu.
generation);
686 catch (
const Ice::Exception& ex)
688 if (_traceLevels->election > 0)
690 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
691 out <<
"node " << _id <<
": error calling ready on " << p->id <<
" ex: " << ex;
704 if (_traceLevels->election > 0)
706 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
707 out <<
"node " << _id <<
": reporting for duty in group " << _group
708 <<
" as coordinator. ";
709 out <<
"replication commencing with " << _up.size() + 1 <<
"/" << _nodes.size()
710 <<
" nodes with llu generation: " << maxllu.
generation;
713 _coordinatorProxy = 0;
718 _checkTask =
new CheckTask(
this);
719 _timer->schedule(_checkTask, _electionTimeout);
726 if (_traceLevels->election > 0)
728 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
729 out <<
"node " << _id <<
": invitation from " << j <<
" to group " << gn;
733 if (_nodes.find(j) == _nodes.end())
735 Ice::Warning warn(_traceLevels->logger);
736 warn << _traceLevels->electionCat <<
": ignoring invitation from unknown node " << j;
742 set<GroupNodeInfo> tmpSet;
753 if (_traceLevels->election > 0)
755 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
756 out <<
"node " << _id <<
": invitation ignored";
770 if (!_timer->cancel(_mergeTask))
784 while (!_destroy && _updateCounter > 0)
801 Ice::IntSeq forwardedInvites;
804 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
808 map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
809 assert(node != _nodesOneway.end());
810 node->second->invitation(j, gn);
811 forwardedInvites.push_back(p->id);
813 catch (
const Ice::Exception&)
833 _timeoutTask =
new TimeoutTask(
this);
834 _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
840 map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
841 assert(node != _nodesOneway.end());
842 node->second->accept(
843 _id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(),
max);
845 catch (
const Ice::Exception&)
855 const Ice::ObjectPrx& coordinator,
857 Ice::Long generation,
866 Ice::Warning warn(_traceLevels->logger);
867 warn << _traceLevels->electionCat <<
": ignoring ready call from replica node " << j
868 <<
" (real coordinator is " << _coord <<
")";
874 if (_traceLevels->election > 0)
876 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
877 out <<
"node " << _id <<
": reporting for duty in group " << gn <<
" with coordinator "
881 if (
static_cast<unsigned int>(
max) > _max)
885 _generation = generation;
890 _coordinatorProxy = coordinator;
894 _checkTask =
new CheckTask(
this);
895 _timer->schedule(_checkTask, _electionTimeout);
903 const Ice::IntSeq& forwardedInvites,
904 const Ice::ObjectPrx& observer,
910 if (_nodes.find(j) == _nodes.end())
912 Ice::Warning warn(_traceLevels->logger);
913 warn << _traceLevels->electionCat <<
": ignoring accept from unknown node " << j;
922 if (
static_cast<unsigned int>(
max) > _max)
927 if (_traceLevels->election > 0)
929 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
930 out <<
"node " << _id <<
": accept " << j <<
" forward invites (";
931 for (Ice::IntSeq::const_iterator p = forwardedInvites.begin();
932 p != forwardedInvites.end();
935 if (p != forwardedInvites.begin())
942 <<
" group size " << (_up.size() + 1);
948 _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
950 _invitesAccepted.insert(j);
952 if (_traceLevels->election > 0)
954 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
955 out <<
"node " << _id <<
": invites pending: " << toString(_invitesIssued)
956 <<
" invites accepted: " << toString(_invitesAccepted);
963 if ((_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted) &&
964 _mergeContinueTask && _timer->cancel(_mergeContinueTask))
966 _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
982 return _group == gn && _coord == _id && _up.find(
GroupNodeInfo(j)) != _up.end();
988 return _replica->getSync();
995 for (map<int, NodePrx>::const_iterator
q = _nodes.begin();
q != _nodes.end(); ++
q)
1012 info.
coord = _coord;
1013 info.
group = _group;
1015 info.
state = _state;
1018 for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
1023 info.
up.push_back(gi);
1036 if (generation != -1 && generation != _generation)
1042 while (!_destroy && _updateCounter > 0)
1052 os << _id <<
":" << Ice::generateUUID();
1059 if (_traceLevels->election > 0)
1061 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1062 out <<
"node " << _id <<
": creating new self-coordinated group " << _group;
1068 _timer->cancel(_mergeTask);
1073 _timer->cancel(_timeoutTask);
1078 _checkTask =
new CheckTask(
this);
1079 _timer->schedule(_checkTask, _electionTimeout);
1089 while (_updateCounter > 0)
1099 _timer->cancel(_checkTask);
1105 _timer->cancel(_timeoutTask);
1111 _timer->cancel(_mergeTask);
1125 "init cannot block when state != NodeStateReorganization");
1137 bool majority = _observers->check();
1142 if (!_coordinatorProxy && !_destroy && _state ==
NodeStateNormal && !majority)
1153 throw Ice::UnknownException(file, line);
1155 if (!_coordinatorProxy)
1159 generation = _generation;
1160 return _coordinatorProxy;
1166 bool majority = _observers->check();
1172 if (_destroy || _coordinatorProxy)
1204 throw Ice::UnknownException(file, line);
1206 generation = _generation;
1208 return _coordinatorProxy;
1217 throw Ice::UnknownException(file, line);
1223 if (!_coordinatorProxy)
1227 if (generation != _generation)
1240 assert(_updateCounter >= 0);
1241 if (_updateCounter == 0)
1259 return "reorganization";
1272 if (_traceLevels->election > 0)
1274 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1275 out <<
"node " << _id <<
": transition from " << stateToString(_state) <<
" to "
1276 << stateToString(s);
void merge(const std::set< int > &)
void checkObserverInit(Ice::Long)
Ice::ObjectPrx startUpdate(Ice::Long &, const char *, int)
virtual void accept(int, const std::string &, const Ice::IntSeq &, const Ice::ObjectPrx &, const LogUpdate &, int, const Ice::Current &)
virtual bool areYouThere(const std::string &, int, const Ice::Current &) const
virtual void invitation(int, const std::string &, const Ice::Current &)
bool updateMaster(const char *, int)
NodeI(const IceStorm::InstancePtr &, const ReplicaPtr &, const Ice::ObjectPrx &, int, const std::map< int, NodePrx > &)
void recovery(Ice::Long=-1)
virtual NodeInfoSeq nodes(const Ice::Current &) const
virtual void ready(int, const std::string &, const Ice::ObjectPrx &, int, Ice::Long, const Ice::Current &)
void startObserverUpdate(Ice::Long, const char *, int)
Ice::ObjectPrx startCachedRead(Ice::Long &, const char *, int)
Thrown if an observer detects an inconsistency.
idempotent Object * sync()
Get the sync object for the replica hosted by this node.
idempotent QueryInfo query()
Get the query information for the given node.
idempotent NodeInfoSeq nodes()
Get the replication group information.
idempotent bool areYouCoordinator()
Determine if this node is a coordinator.
@ NodeStateElection
The node is electing a leader.
@ NodeStateNormal
The replica group is active and replicating.
@ NodeStateInactive
The node is inactive and awaiting an election.
@ NodeStateReorganization
The replica group is reorganizing.
::std::vector<::IceStormElection::NodeInfo > NodeInfoSeq
A sequence of node info.
IceUtil::Handle< NodeI > NodeIPtr
IceUtil::Handle< Replica > ReplicaPtr
IceUtil::Handle< TraceLevels > TraceLevelsPtr
IceUtil::Handle< Instance > InstancePtr
::IceInternal::Handle<::Ice::Properties > PropertiesPtr
double s(double t, double s0, double v0, double a0, double j)
const char * toString(InteractionFeedbackType type)
LogUpdate llu
The last known log update for this node.
int id
The identity of the node.
bool operator==(const GroupNodeInfo &rhs) const
const Ice::ObjectPrx observer
bool operator<(const GroupNodeInfo &rhs) const
A struct used for marking the last log update.
All nodes in the replication group.
int id
The identity of the node.
NodeState state
The node state.
Object * replica
The replica the node is managing.
string group
The node's group name.
GroupInfoSeq up
The sequence of nodes in this node's group.
int coord
The node's coordinator.
int max
The highest priority node that this node has seen.