NodeI.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <Ice/Ice.h>
11#include <IceStorm/NodeI.h>
12#include <IceStorm/Observers.h>
14
15using namespace IceStorm;
16using namespace IceStormElection;
17using namespace std;
18
19namespace
20{
21
22 class CheckTask : public IceUtil::TimerTask
23 {
24 const NodeIPtr _node;
25
26 public:
27 CheckTask(const NodeIPtr& node) : _node(node)
28 {
29 }
30
31 virtual void
32 runTimerTask()
33 {
34 _node->check();
35 }
36 };
37
38 class MergeTask : public IceUtil::TimerTask
39 {
40 const NodeIPtr _node;
41 const set<int> _s;
42
43 public:
44 MergeTask(const NodeIPtr& node, const set<int>& s) : _node(node), _s(s)
45 {
46 }
47
48 virtual void
49 runTimerTask()
50 {
51 _node->merge(_s);
52 }
53 };
54
55 class MergeContinueTask : public IceUtil::TimerTask
56 {
57 const NodeIPtr _node;
58
59 public:
60 MergeContinueTask(const NodeIPtr& node) : _node(node)
61 {
62 }
63
64 virtual void
65 runTimerTask()
66 {
67 _node->mergeContinue();
68 }
69 };
70
71 class TimeoutTask : public IceUtil::TimerTask
72 {
73 const NodeIPtr _node;
74
75 public:
76 TimeoutTask(const NodeIPtr& node) : _node(node)
77 {
78 }
79
80 virtual void
81 runTimerTask()
82 {
83 _node->timeout();
84 }
85 };
86
87} // namespace
88
namespace
{

    // Sentinel log position used to initialize GroupNodeInfo entries
    // created without an explicit LogUpdate (generation 0, iteration 0).
    LogUpdate emptyLU = {0, 0};

}
95
// Key-only constructor: used to build a lookup key for set<GroupNodeInfo>
// (ordering/equality below compare ids only); llu is the empty position.
GroupNodeInfo::GroupNodeInfo(int i) : id(i), llu(emptyLU)
{
}
99
// Full constructor: node id, its last known log update and its
// observer proxy (used by the coordinator to push replica updates).
GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, const Ice::ObjectPrx& o) :
    id(i), llu(l), observer(o)
{
}
104
105bool
107{
108 return id < rhs.id;
109}
110
111bool
113{
114 return id == rhs.id;
115}
116
//
// COMPILER FIX: Clang using libc++ requires to define operator=
//
#if defined(__clang__) && defined(_LIBCPP_VERSION)
// The members are declared const; constness is cast away so the struct
// remains assignable as libc++ containers require.
// NOTE(review): the GroupNodeInfo& return-type line was dropped by the
// documentation extraction; restored here.
GroupNodeInfo&
GroupNodeInfo::operator=(const GroupNodeInfo& other)
{
    const_cast<int&>(this->id) = other.id;
    const_cast<LogUpdate&>(this->llu) = other.llu;
    const_cast<Ice::ObjectPrx&>(this->observer) = other.observer;
    return *this;
}
#endif
131
132namespace
133{
134 static IceUtil::Time
135 getTimeout(const string& key,
136 int def,
137 const Ice::PropertiesPtr& properties,
138 const TraceLevelsPtr& traceLevels)
139 {
140 int t = properties->getPropertyAsIntWithDefault(key, def);
141 if (t < 0)
142 {
143 Ice::Warning out(traceLevels->logger);
144 out << traceLevels->electionCat << ": " << key << " < 0; Adjusted to 1";
145 t = 1;
146 }
147 return IceUtil::Time::seconds(t);
148 }
149
150 static string
151 toString(const set<int>& s)
152 {
153 ostringstream os;
154 os << "(";
155 for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p)
156 {
157 if (p != s.begin())
158 {
159 os << ",";
160 }
161 os << *p;
162 }
163 os << ")";
164 return os.str();
165 }
166
167} // namespace
168
// Construct the election node.
//
// instance     - shared service instance (timer, tracing, observers).
// replica      - the local replica this node manages.
// replicaProxy - proxy to that replica, forwarded to peers on ready().
// id           - this node's priority/id within the replica group.
// nodes        - proxies for every node in the group, keyed by id.
//
// Note: several members are declared const and are filled in here via
// const_cast; they are effectively immutable after construction.
NodeI::NodeI(const InstancePtr& instance,
             const ReplicaPtr& replica,
             const Ice::ObjectPrx& replicaProxy,
             int id,
             const map<int, NodePrx>& nodes) :
    _timer(instance->timer()),
    _traceLevels(instance->traceLevels()),
    _observers(instance->observers()),
    _replica(replica),
    _replicaProxy(replicaProxy),
    _id(id),
    _nodes(nodes),
    _state(NodeStateInactive),
    _updateCounter(0),
    _max(0),
    _generation(-1),
    _destroy(false)
{
    // Build oneway variants of all node proxies; invitations are sent
    // oneway so a dead peer cannot block the sender.
    map<int, NodePrx> oneway;
    for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
    {
        oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
    }
    const_cast<map<int, NodePrx>&>(_nodesOneway) = oneway;

    // All three timeouts default to 10s and are clamped to >= 1s.
    Ice::PropertiesPtr properties = instance->communicator()->getProperties();
    const_cast<IceUtil::Time&>(_masterTimeout) = getTimeout(
        instance->serviceName() + ".Election.MasterTimeout", 10, properties, _traceLevels);
    const_cast<IceUtil::Time&>(_electionTimeout) = getTimeout(
        instance->serviceName() + ".Election.ElectionTimeout", 10, properties, _traceLevels);
    const_cast<IceUtil::Time&>(_mergeTimeout) = getTimeout(
        instance->serviceName() + ".Election.ResponseTimeout", 10, properties, _traceLevels);
}
202
203void
205{
206 // As an optimization we want the initial election to occur as
207 // soon as possible.
208 //
209 // However, if we have the node trigger the election immediately
210 // upon startup then we'll have a clash with lower priority nodes
211 // starting an election denying a higher priority node the
212 // opportunity to start the election that results in it becoming
213 // the leader. Of course, things will eventually reach a stable
214 // state but it will take longer.
215 //
216 // As such as we schedule the initial election check inversely
217 // proportional to our priority.
218 //
219 // By setting _checkTask first we stop recovery() from setting it
220 // to the regular election interval.
221 //
222
223 //
224 // We use this lock to ensure that recovery is called before CheckTask
225 // is scheduled, even if timeout is 0
226 //
227 Lock sync(*this);
228
229 _checkTask = new CheckTask(this);
230 _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
231 recovery();
232}
233
234void
236{
237 {
238 Lock sync(*this);
239 if (_destroy)
240 {
241 return;
242 }
243 assert(!_mergeTask);
244
245 if (_state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
246 {
247 assert(_checkTask);
248 _timer->schedule(_checkTask, _electionTimeout);
249 return;
250 }
251
252 // Next get the set of nodes that were detected as unreachable
253 // from the replica and remove them from our slave list.
254 vector<int> dead;
255 _observers->getReapedSlaves(dead);
256 if (!dead.empty())
257 {
258 for (vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
259 {
260 set<GroupNodeInfo>::iterator q = _up.find(GroupNodeInfo(*p));
261 if (q != _up.end())
262 {
263 if (_traceLevels->election > 0)
264 {
265 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
266 out << "node " << _id << ": reaping slave " << *p;
267 }
268 _up.erase(q);
269 }
270 }
271
272 // If we no longer have the majority of the nodes under our
273 // care then we need to stop our replica.
274 if (_up.size() < _nodes.size() / 2)
275 {
276 if (_traceLevels->election > 0)
277 {
278 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
279 out << "node " << _id << ": stopping replica";
280 }
281 // Clear _checkTask -- recovery() will reset the
282 // timer.
283 assert(_checkTask);
284 _checkTask = 0;
285 recovery();
286 return;
287 }
288 }
289 }
290
291 // See if other groups exist for possible merge.
292 set<int> tmpset;
293 int max = -1;
294 for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
295 {
296 if (p->first == _id)
297 {
298 continue;
299 }
300 try
301 {
302 if (p->second->areYouCoordinator())
303 {
304 if (p->first > max)
305 {
306 max = p->first;
307 }
308 tmpset.insert(p->first);
309 }
310 }
311 catch (const Ice::Exception& ex)
312 {
313 if (_traceLevels->election > 0)
314 {
315 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
316 out << "node " << _id << ": call on node " << p->first << " failed: " << ex;
317 }
318 }
319 }
320
321 Lock sync(*this);
322
323 // If the node state has changed while the mutex has been released
324 // then bail. We don't schedule a re-check since we're either
325 // destroyed in which case we're going to terminate or the end of
326 // the election/reorg will re-schedule the check.
327 if (_destroy || _state == NodeStateElection || _state == NodeStateReorganization ||
328 _coord != _id)
329 {
330 _checkTask = 0;
331 return;
332 }
333
334 // If we didn't find any coordinators then we're done. Reschedule
335 // the next check and terminate.
336 if (tmpset.empty())
337 {
338 assert(_checkTask);
339 _timer->schedule(_checkTask, _electionTimeout);
340 return;
341 }
342
343 // _checkTask == 0 means that the check isn't scheduled.
344 _checkTask = 0;
345
346 if (_traceLevels->election > 0)
347 {
348 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
349 out << "node " << _id << ": highest priority node count: " << max;
350 }
351
352 IceUtil::Time delay = IceUtil::Time::seconds(0);
353 if (_id < max)
354 {
355 // Reschedule timer proportial to p-i.
356 delay = _mergeTimeout + _mergeTimeout * (max - _id);
357 if (_traceLevels->election > 0)
358 {
359 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
360 out << "node " << _id << ": scheduling merge in " << delay.toDuration() << " seconds";
361 }
362 }
363
364 assert(!_mergeTask);
365 _mergeTask = new MergeTask(this, tmpset);
366 _timer->schedule(_mergeTask, delay);
367}
368
369// Called if the node has not heard from the coordinator in some time.
370void
372{
373 int myCoord;
374 string myGroup;
375 {
376 Lock sync(*this);
377 // If we're destroyed or we are our own coordinator then we're
378 // done.
379 if (_destroy || _coord == _id)
380 {
381 return;
382 }
383 myCoord = _coord;
384 myGroup = _group;
385 }
386
387 bool failed = false;
388 try
389 {
390 map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
391 assert(p != _nodes.end());
392 if (!p->second->areYouThere(myGroup, _id))
393 {
394 if (_traceLevels->election > 0)
395 {
396 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
397 out << "node " << _id << ": lost connection to coordinator " << myCoord
398 << ": areYouThere returned false";
399 }
400 failed = true;
401 }
402 }
403 catch (const Ice::Exception& ex)
404 {
405 if (_traceLevels->election > 0)
406 {
407 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
408 out << "node " << _id << ": lost connection to coordinator " << myCoord << ": " << ex;
409 }
410 failed = true;
411 }
412 if (failed)
413 {
414 recovery();
415 }
416}
417
// First phase of a merge (driven by MergeTask): form a fresh group,
// invite the union of our slaves and the rival coordinators found by
// check(), then schedule mergeContinue() to collect responses.
void
NodeI::merge(const set<int>& coordinatorSet)
{
    set<int> invited;
    string gp;
    {
        Lock sync(*this);
        _mergeTask = 0;

        // If the node is currently in an election, or reorganizing
        // then we're done.
        if (_state == NodeStateElection || _state == NodeStateReorganization)
        {
            return;
        }

        // This state change prevents this node from accepting
        // invitations while the merge is executing.
        setState(NodeStateElection);

        // No more replica changes are permitted.
        while (!_destroy && _updateCounter > 0)
        {
            wait();
        }
        if (_destroy)
        {
            return;
        }

        // New globally-unique group name: "<id>:<uuid>".
        ostringstream os;
        os << _id << ":" << Ice::generateUUID();
        _group = os.str();
        gp = _group;

        _invitesAccepted.clear();
        _invitesIssued.clear();

        // Construct a set of node ids to invite. This is the union of
        // _up and set of coordinators gathered in the check stage.
        invited = coordinatorSet;
        for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
        {
            invited.insert(p->id);
        }

        _coord = _id;
        _up.clear();

        if (_traceLevels->election > 0)
        {
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
            out << "node " << _id << ": inviting " << toString(invited) << " to group " << _group;
        }
    }

    // Send the invitations unlocked (oneway proxies); nodes whose
    // invitation fails are dropped from the invited set.
    set<int>::iterator p = invited.begin();
    while (p != invited.end())
    {
        try
        {
            if (_traceLevels->election > 0)
            {
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
                out << "node " << _id << ": inviting node " << *p << " to group " << gp;
            }
            map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
            assert(node != _nodesOneway.end());
            node->second->invitation(_id, gp);
            ++p;
        }
        catch (const Ice::Exception&)
        {
            invited.erase(p++);
        }
    }

    // Now we wait for responses to our invitation.
    {
        Lock sync(*this);
        if (_destroy)
        {
            return;
        }

        // Add each of the invited nodes in the invites issued set.
        _invitesIssued.insert(invited.begin(), invited.end());

        if (_traceLevels->election > 0)
        {
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
            out << "node " << _id << ": invites pending: " << toString(_invitesIssued);
        }

        // Schedule the mergeContinueTask.
        assert(_mergeContinueTask == 0);
        _mergeContinueTask = new MergeContinueTask(this);

        // At this point we may have already accepted all of the
        // invitations, if so then we want to schedule the
        // mergeContinue immediately.
        IceUtil::Time timeout = _mergeTimeout;
        if (_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted)
        {
            timeout = IceUtil::Time::seconds(0);
        }
        _timer->schedule(_mergeContinueTask, timeout);
    }
}
527
528void
530{
531 string gp;
532 set<GroupNodeInfo> tmpSet;
533 {
534 Lock sync(*this);
535 if (_destroy)
536 {
537 return;
538 }
539
540 // Copy variables for thread safety.
541 gp = _group;
542 tmpSet = _up;
543
544 assert(_mergeContinueTask);
545 _mergeContinueTask = 0;
546
547 // The node is now reorganizing.
548 assert(_state == NodeStateElection);
549 setState(NodeStateReorganization);
550
551 if (_traceLevels->election > 0)
552 {
553 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
554 out << "node " << _id << ": coordinator for " << (tmpSet.size() + 1)
555 << " nodes (including myself)";
556 }
557
558 // Now we need to decide whether we can start serving content. If
559 // we're on initial startup then we need all nodes to participate
560 // in the election. If we're running a subsequent election then we
561 // need a majority of the nodes to be active in order to start
562 // running.
563 unsigned int ingroup = static_cast<unsigned int>(tmpSet.size());
564 if ((_max != _nodes.size() && ingroup != _nodes.size() - 1) || ingroup < _nodes.size() / 2)
565 {
566 if (_traceLevels->election > 0)
567 {
568 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
569 out << "node " << _id << ": not enough nodes " << (ingroup + 1) << "/"
570 << _nodes.size() << " for replication to commence";
571 if (_max != _nodes.size())
572 {
573 out << " (require full participation for startup)";
574 }
575 }
576 recovery();
577 return;
578 }
579 }
580
581 // Find out who has the highest available set of database
582 // updates.
583 int maxid = -1;
584 LogUpdate maxllu = {-1, 0};
585 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
586 {
587 if (_traceLevels->election > 0)
588 {
589 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
590 out << "node id=" << p->id << " llu=" << p->llu.generation << "/" << p->llu.iteration;
591 }
592 if (p->llu.generation > maxllu.generation ||
593 (p->llu.generation == maxllu.generation && p->llu.iteration > maxllu.iteration))
594 {
595 maxid = p->id;
596 maxllu = p->llu;
597 }
598 }
599
600 LogUpdate myLlu = _replica->getLastLogUpdate();
601 if (_traceLevels->election > 0)
602 {
603 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
604 out << "node id=" << _id << " llu=" << myLlu.generation << "/" << myLlu.iteration;
605 }
606
607 // If its not us then we have to get the latest database data from
608 // the replica with the latest set.
609 //if(maxllu > _replica->getLastLogUpdate())
610 if (maxllu > myLlu)
611 {
612 if (_traceLevels->election > 0)
613 {
614 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
615 out << "node " << _id << ": syncing database state with node " << maxid;
616 }
617 try
618 {
619 map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
620 assert(node != _nodes.end());
621 _replica->sync(node->second->sync());
622 }
623 catch (const Ice::Exception& ex)
624 {
625 if (_traceLevels->election > 0)
626 {
627 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
628 out << "node " << _id << ": syncing database state with node " << maxid
629 << " failed: " << ex;
630 }
631 recovery();
632 return;
633 }
634 }
635 else
636 {
637 if (_traceLevels->election > 0)
638 {
639 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
640 out << "node " << _id << ": I have the latest database state.";
641 }
642 }
643
644 // At this point we've ensured that we have the latest database
645 // state, as such we can set our _max flag.
646 unsigned int max = static_cast<unsigned int>(tmpSet.size()) + 1;
647 {
648 Lock sync(*this);
649 if (max > _max)
650 {
651 _max = max;
652 }
653 max = _max;
654 }
655
656 // Prepare the LogUpdate for this generation.
657 maxllu.generation++;
658 maxllu.iteration = 0;
659
660 try
661 {
662 // Tell the replica that it is now the master with the given
663 // set of slaves and llu generation.
664 _replica->initMaster(tmpSet, maxllu);
665 }
666 catch (const Ice::Exception& ex)
667 {
668 if (_traceLevels->election > 0)
669 {
670 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
671 out << "node " << _id << ": initMaster failed: " << ex;
672 }
673 recovery();
674 return;
675 }
676
677 // Tell each node to go.
678 for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
679 {
680 try
681 {
682 map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
683 assert(node != _nodes.end());
684 node->second->ready(_id, gp, _replicaProxy, max, maxllu.generation);
685 }
686 catch (const Ice::Exception& ex)
687 {
688 if (_traceLevels->election > 0)
689 {
690 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
691 out << "node " << _id << ": error calling ready on " << p->id << " ex: " << ex;
692 }
693 recovery();
694 return;
695 }
696 }
697
698 {
699 Lock sync(*this);
700 if (_destroy)
701 {
702 return;
703 }
704 if (_traceLevels->election > 0)
705 {
706 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
707 out << "node " << _id << ": reporting for duty in group " << _group
708 << " as coordinator. ";
709 out << "replication commencing with " << _up.size() + 1 << "/" << _nodes.size()
710 << " nodes with llu generation: " << maxllu.generation;
711 }
712 setState(NodeStateNormal);
713 _coordinatorProxy = 0;
714
715 _generation = maxllu.generation;
716
717 assert(!_checkTask);
718 _checkTask = new CheckTask(this);
719 _timer->schedule(_checkTask, _electionTimeout);
720 }
721}
722
// Remote invocation: node j invites us into group gn. If we were a
// coordinator ourselves, forward the invitation to our old members,
// then accept toward j with our log position and observer.
void
NodeI::invitation(int j, const string& gn, const Ice::Current&)
{
    if (_traceLevels->election > 0)
    {
        Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
        out << "node " << _id << ": invitation from " << j << " to group " << gn;
    }

    // Verify that j exists in our node set.
    if (_nodes.find(j) == _nodes.end())
    {
        Ice::Warning warn(_traceLevels->logger);
        warn << _traceLevels->electionCat << ": ignoring invitation from unknown node " << j;
        return;
    }

    int tmpCoord = -1;
    int max = -1;
    set<GroupNodeInfo> tmpSet;
    {
        Lock sync(*this);
        if (_destroy)
        {
            return;
        }
        // If we're in the election or reorg state a merge has already
        // started, so ignore the invitation.
        if (_state == NodeStateElection || _state == NodeStateReorganization)
        {
            if (_traceLevels->election > 0)
            {
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
                out << "node " << _id << ": invitation ignored";
            }
            return;
        }

        //
        // Upon receipt of an invitation we cancel any pending merge
        // task.
        //
        if (_mergeTask)
        {
            // If the timer doesn't cancel it means that the timer has
            // fired and the merge is currently in-progress in which
            // case we should reject the invitation.
            if (!_timer->cancel(_mergeTask))
            {
                // The merge task is cleared in the merge. This
                // ensures two invitations cannot cause a race with
                // the merge.
                //_mergeTask = 0;
                return;
            }
            _mergeTask = 0;
        }

        // We're now joining with another group. If we are active we
        // must stop serving as a master or slave.
        setState(NodeStateElection);
        while (!_destroy && _updateCounter > 0)
        {
            wait();
        }
        if (_destroy)
        {
            return;
        }

        tmpCoord = _coord;
        tmpSet = _up;

        _coord = j;
        _group = gn;
        max = _max;
    }

    // Forwarding happens unlocked; failed forwards are silently
    // dropped (the peer will recover via its own timeout).
    Ice::IntSeq forwardedInvites;
    if (tmpCoord == _id) // Forward invitation to my old members.
    {
        for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
        {
            try
            {
                map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
                assert(node != _nodesOneway.end());
                node->second->invitation(j, gn);
                forwardedInvites.push_back(p->id);
            }
            catch (const Ice::Exception&)
            {
            }
        }
    }

    // Set the state and timer before calling accept. This ensures
    // that if ready is called directly after accept is called then
    // everything is fine. Setting the state *after* calling accept
    // can cause a race.
    {
        Lock sync(*this);
        if (_destroy)
        {
            return;
        }
        assert(_state == NodeStateElection);
        setState(NodeStateReorganization);
        if (!_timeoutTask)
        {
            _timeoutTask = new TimeoutTask(this);
            _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
        }
    }

    try
    {
        map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
        assert(node != _nodesOneway.end());
        node->second->accept(
            _id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(), max);
    }
    catch (const Ice::Exception&)
    {
        recovery();
        return;
    }
}
851
852void
854 const string& gn,
855 const Ice::ObjectPrx& coordinator,
856 int max,
857 Ice::Long generation,
858 const Ice::Current&)
859{
860 Lock sync(*this);
861 if (!_destroy && _state == NodeStateReorganization && _group == gn)
862 {
863 // The coordinator must be j (this was set in the invitation).
864 if (_coord != j)
865 {
866 Ice::Warning warn(_traceLevels->logger);
867 warn << _traceLevels->electionCat << ": ignoring ready call from replica node " << j
868 << " (real coordinator is " << _coord << ")";
869 return;
870 }
871
872 // Here we've already validated j in the invite call
873 // (otherwise _group != gn).
874 if (_traceLevels->election > 0)
875 {
876 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
877 out << "node " << _id << ": reporting for duty in group " << gn << " with coordinator "
878 << j;
879 }
880
881 if (static_cast<unsigned int>(max) > _max)
882 {
883 _max = max;
884 }
885 _generation = generation;
886
887 // Activate the replica here since the replica is now ready
888 // for duty.
889 setState(NodeStateNormal);
890 _coordinatorProxy = coordinator;
891
892 if (!_checkTask)
893 {
894 _checkTask = new CheckTask(this);
895 _timer->schedule(_checkTask, _electionTimeout);
896 }
897 }
898}
899
900void
902 const string& gn,
903 const Ice::IntSeq& forwardedInvites,
904 const Ice::ObjectPrx& observer,
905 const LogUpdate& llu,
906 int max,
907 const Ice::Current&)
908{
909 // Verify that j exists in our node set.
910 if (_nodes.find(j) == _nodes.end())
911 {
912 Ice::Warning warn(_traceLevels->logger);
913 warn << _traceLevels->electionCat << ": ignoring accept from unknown node " << j;
914 return;
915 }
916
917 Lock sync(*this);
918 if (!_destroy && _state == NodeStateElection && _group == gn && _coord == _id)
919 {
920 _up.insert(GroupNodeInfo(j, llu, observer));
921
922 if (static_cast<unsigned int>(max) > _max)
923 {
924 _max = max;
925 }
926
927 if (_traceLevels->election > 0)
928 {
929 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
930 out << "node " << _id << ": accept " << j << " forward invites (";
931 for (Ice::IntSeq::const_iterator p = forwardedInvites.begin();
932 p != forwardedInvites.end();
933 ++p)
934 {
935 if (p != forwardedInvites.begin())
936 {
937 out << ",";
938 }
939 out << *p;
940 }
941 out << ") with llu " << llu.generation << "/" << llu.iteration << " into group " << gn
942 << " group size " << (_up.size() + 1);
943 }
944
945 // Add each of the forwarded invites to the list of issued
946 // invitations. This doesn't use set_union since
947 // forwardedInvites may not be sorted.
948 _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
949 // We've accepted the invitation from node j.
950 _invitesAccepted.insert(j);
951
952 if (_traceLevels->election > 0)
953 {
954 Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
955 out << "node " << _id << ": invites pending: " << toString(_invitesIssued)
956 << " invites accepted: " << toString(_invitesAccepted);
957 }
958
959 // If invitations have been accepted from all nodes and the
960 // merge task has already been scheduled then reschedule the
961 // merge continue immediately. Otherwise, we let the existing
962 // merge() schedule continue.
963 if ((_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted) &&
964 _mergeContinueTask && _timer->cancel(_mergeContinueTask))
965 {
966 _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
967 }
968 }
969}
970
// Remote query used by check(): true iff this node is a stable (not
// electing/reorganizing) coordinator of its own group.
bool
NodeI::areYouCoordinator(const Ice::Current&) const
{
    Lock sync(*this);
    return _state != NodeStateElection && _state != NodeStateReorganization && _coord == _id;
}
977
// Remote query used by timeout(): true iff we coordinate group gn and
// still count node j among our slaves.
bool
NodeI::areYouThere(const string& gn, int j, const Ice::Current&) const
{
    Lock sync(*this);
    return _group == gn && _coord == _id && _up.find(GroupNodeInfo(j)) != _up.end();
}
984
// Remote query: hand out the replica's sync object so a peer can pull
// our database state during a merge.
Ice::ObjectPrx
NodeI::sync(const Ice::Current&) const
{
    return _replica->getSync();
}
990
992NodeI::nodes(const Ice::Current&) const
993{
994 NodeInfoSeq seq;
995 for (map<int, NodePrx>::const_iterator q = _nodes.begin(); q != _nodes.end(); ++q)
996 {
997 NodeInfo ni;
998 ni.id = q->first;
999 ni.n = q->second;
1000 seq.push_back(ni);
1001 }
1002
1003 return seq;
1004}
1005
1007NodeI::query(const Ice::Current&) const
1008{
1009 Lock sync(*this);
1010 QueryInfo info;
1011 info.id = _id;
1012 info.coord = _coord;
1013 info.group = _group;
1014 info.replica = _replicaProxy;
1015 info.state = _state;
1016 info.max = _max;
1017
1018 for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
1019 {
1020 GroupInfo gi;
1021 gi.id = p->id;
1022 gi.llu = p->llu;
1023 info.up.push_back(gi);
1024 }
1025
1026 return info;
1027}
1028
// Drop out of the current group and become an inactive, self-coordinated
// singleton group, then schedule a check so a new election can happen.
// generation: if != -1, the recovery is ignored when the node has
// already advanced past that llu generation (stale failure report).
void
NodeI::recovery(Ice::Long generation)
{
    Lock sync(*this);

    // Ignore the recovery if the node has already advanced a
    // generation.
    if (generation != -1 && generation != _generation)
    {
        return;
    }

    setState(NodeStateInactive);
    // Wait for in-flight updates to drain before tearing down.
    while (!_destroy && _updateCounter > 0)
    {
        wait();
    }
    if (_destroy)
    {
        return;
    }

    // Fresh unique group name: "<id>:<uuid>".
    ostringstream os;
    os << _id << ":" << Ice::generateUUID();
    _group = os.str();

    _generation = -1;
    _coord = _id;
    _up.clear();

    if (_traceLevels->election > 0)
    {
        Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
        out << "node " << _id << ": creating new self-coordinated group " << _group;
    }

    // Reset the timer states.
    if (_mergeTask)
    {
        _timer->cancel(_mergeTask);
        _mergeTask = 0;
    }
    if (_timeoutTask)
    {
        _timer->cancel(_timeoutTask);
        _timeoutTask = 0;
    }
    if (!_checkTask)
    {
        _checkTask = new CheckTask(this);
        _timer->schedule(_checkTask, _electionTimeout);
    }
}
1082
1083void
1085{
1086 Lock sync(*this);
1087 assert(!_destroy);
1088
1089 while (_updateCounter > 0)
1090 {
1091 wait();
1092 }
1093 _destroy = true;
1094 notifyAll();
1095
1096 // Cancel the timers.
1097 if (_checkTask)
1098 {
1099 _timer->cancel(_checkTask);
1100 _checkTask = 0;
1101 }
1102
1103 if (_timeoutTask)
1104 {
1105 _timer->cancel(_timeoutTask);
1106 _timeoutTask = 0;
1107 }
1108
1109 if (_mergeTask)
1110 {
1111 _timer->cancel(_mergeTask);
1112 _mergeTask = 0;
1113 }
1114}
1115
1116// A node should only receive an observer init call if the node is
1117// reorganizing and its not the coordinator.
1118void
1119NodeI::checkObserverInit(Ice::Long /*generation*/)
1120{
1121 Lock sync(*this);
1122 if (_state != NodeStateReorganization)
1123 {
1125 "init cannot block when state != NodeStateReorganization");
1126 }
1127 if (_coord == _id)
1128 {
1129 throw ObserverInconsistencyException("init called on coordinator");
1130 }
1131}
1132
// Notify the node that we're about to start an update.
// Blocks until the node is replicating normally (or destroyed, in which
// case Ice::UnknownException is thrown with the caller's file/line).
// Returns the coordinator proxy when this node is a slave (caller must
// forward the update), or 0 when this node is the master (in which case
// the update counter has been incremented and finishUpdate() must be
// called).
Ice::ObjectPrx
NodeI::startUpdate(Ice::Long& generation, const char* file, int line)
{
    bool majority = _observers->check();

    Lock sync(*this);

    // If we're actively replicating & lost the majority of our replicas then recover.
    if (!_coordinatorProxy && !_destroy && _state == NodeStateNormal && !majority)
    {
        recovery();
    }

    while (!_destroy && _state != NodeStateNormal)
    {
        wait();
    }
    if (_destroy)
    {
        throw Ice::UnknownException(file, line);
    }
    if (!_coordinatorProxy)
    {
        ++_updateCounter;
    }
    generation = _generation;
    return _coordinatorProxy;
}
1162
// Non-blocking variant of startUpdate for the master only.
// Returns true and increments the update counter iff this node is the
// actively replicating master; returns false when destroyed, a slave,
// or not in the normal state.
bool
NodeI::updateMaster(const char* /*file*/, int /*line*/)
{
    bool majority = _observers->check();

    Lock sync(*this);

    // If the node is destroyed, or is not a coordinator then we're
    // done.
    if (_destroy || _coordinatorProxy)
    {
        return false;
    }

    // If we've lost the majority of our replicas then recover.
    if (_state == NodeStateNormal && !majority)
    {
        recovery();
    }

    // If we're not replicating then we're done.
    if (_state != NodeStateNormal)
    {
        return false;
    }

    // Otherwise adjust the update counter, and return success.
    ++_updateCounter;
    return true;
}
1193
// Begin a read against possibly-cached state: wait for the normal
// state, record the current generation, and pin the node by bumping the
// update counter (finishUpdate() must be called). Throws
// Ice::UnknownException with the caller's file/line if destroyed.
// Returns the coordinator proxy (0 when this node is the master).
Ice::ObjectPrx
NodeI::startCachedRead(Ice::Long& generation, const char* file, int line)
{
    Lock sync(*this);
    while (!_destroy && _state != NodeStateNormal)
    {
        wait();
    }
    if (_destroy)
    {
        throw Ice::UnknownException(file, line);
    }
    generation = _generation;
    ++_updateCounter;
    return _coordinatorProxy;
}
1210
// Begin an observer-driven update on a slave. Validates that the node
// is an active slave at the expected generation, throwing
// ObserverInconsistencyException otherwise, then bumps the update
// counter (finishUpdate() must be called). Throws Ice::UnknownException
// with the caller's file/line if the node is destroyed.
void
NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line)
{
    Lock sync(*this);
    if (_destroy)
    {
        throw Ice::UnknownException(file, line);
    }
    if (_state != NodeStateNormal)
    {
        throw ObserverInconsistencyException("update called on inactive node");
    }
    if (!_coordinatorProxy)
    {
        throw ObserverInconsistencyException("update called on the master");
    }
    if (generation != _generation)
    {
        throw ObserverInconsistencyException("invalid generation");
    }
    ++_updateCounter;
}
1233
1234void
1236{
1237 Lock sync(*this);
1238 assert(!_destroy);
1239 --_updateCounter;
1240 assert(_updateCounter >= 0);
1241 if (_updateCounter == 0)
1242 {
1243 notifyAll();
1244 }
1245}
1246
1247namespace
1248{
1249 static string
1250 stateToString(NodeState s)
1251 {
1252 switch (s)
1253 {
1254 case NodeStateInactive:
1255 return "inactive";
1256 case NodeStateElection:
1257 return "election";
1259 return "reorganization";
1260 case NodeStateNormal:
1261 return "normal";
1262 }
1263 return "unknown";
1264 }
1265} // namespace
1266
// Transition the election state machine, tracing the transition and
// waking threads blocked in start*Update when we enter the normal
// (replicating) state. Caller must hold the node's lock.
void
NodeI::setState(NodeState s)
{
    if (s != _state)
    {
        if (_traceLevels->election > 0)
        {
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
            out << "node " << _id << ": transition from " << stateToString(_state) << " to "
                << stateToString(s);
        }
        _state = s;
        if (_state == NodeStateNormal)
        {
            notifyAll();
        }
    }
}
void merge(const std::set< int > &)
Definition NodeI.cpp:419
void checkObserverInit(Ice::Long)
Definition NodeI.cpp:1119
Ice::ObjectPrx startUpdate(Ice::Long &, const char *, int)
Definition NodeI.cpp:1135
virtual void accept(int, const std::string &, const Ice::IntSeq &, const Ice::ObjectPrx &, const LogUpdate &, int, const Ice::Current &)
Definition NodeI.cpp:901
virtual bool areYouThere(const std::string &, int, const Ice::Current &) const
Definition NodeI.cpp:979
virtual void invitation(int, const std::string &, const Ice::Current &)
Definition NodeI.cpp:724
bool updateMaster(const char *, int)
Definition NodeI.cpp:1164
NodeI(const IceStorm::InstancePtr &, const ReplicaPtr &, const Ice::ObjectPrx &, int, const std::map< int, NodePrx > &)
Definition NodeI.cpp:169
void recovery(Ice::Long=-1)
Definition NodeI.cpp:1030
virtual NodeInfoSeq nodes(const Ice::Current &) const
Definition NodeI.cpp:992
virtual void ready(int, const std::string &, const Ice::ObjectPrx &, int, Ice::Long, const Ice::Current &)
Definition NodeI.cpp:853
void startObserverUpdate(Ice::Long, const char *, int)
Definition NodeI.cpp:1212
Ice::ObjectPrx startCachedRead(Ice::Long &, const char *, int)
Definition NodeI.cpp:1195
Thrown if an observer detects an inconsistency.
Definition Election.ice:37
T max(T t1, T t2)
Definition gdiam.h:51
idempotent Object * sync()
Get the sync object for the replica hosted by this node.
idempotent QueryInfo query()
Get the query information for the given node.
idempotent NodeInfoSeq nodes()
Get the replication group information.
idempotent bool areYouCoordinator()
Determine if this node is a coordinator.
#define q
NodeState
The node state.
Definition Election.ice:138
@ NodeStateElection
The node is electing a leader.
Definition Election.ice:142
@ NodeStateNormal
The replica group is active & replicating.
Definition Election.ice:146
@ NodeStateInactive
The node is inactive and awaiting an election.
Definition Election.ice:140
@ NodeStateReorganization
The replica group is reorganizing.
Definition Election.ice:144
::std::vector<::IceStormElection::NodeInfo > NodeInfoSeq
A sequence of node info.
Definition Election.h:1264
IceUtil::Handle< NodeI > NodeIPtr
Definition Instance.h:36
IceUtil::Handle< Replica > ReplicaPtr
Definition Replica.h:49
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
IceUtil::Handle< Instance > InstancePtr
Definition Instance.h:128
::IceInternal::Handle<::Ice::Properties > PropertiesPtr
double s(double t, double s0, double v0, double a0, double j)
Definition CtrlUtil.h:33
const char * toString(InteractionFeedbackType type)
Definition Interaction.h:28
LogUpdate llu
The last known log update for this node.
Definition Election.ice:171
int id
The identity of the node.
Definition Election.ice:169
bool operator==(const GroupNodeInfo &rhs) const
Definition NodeI.cpp:112
const Ice::ObjectPrx observer
Definition Replica.h:35
bool operator<(const GroupNodeInfo &rhs) const
Definition NodeI.cpp:106
A struct used for marking the last log update.
Definition LLURecord.h:103
All nodes in the replication group.
Definition Election.ice:154
Node * n
The node proxy.
Definition Election.ice:158
int id
The identity of the node.
Definition Election.ice:156
NodeState state
The node state.
Definition Election.ice:192
Object * replica
The replica the node is managing.
Definition Election.ice:189
string group
The nodes group name.
Definition Election.ice:186
GroupInfoSeq up
The sequence of nodes in this nodes group.
Definition Election.ice:195
int coord
The nodes coordinator.
Definition Election.ice:183
int max
The highest priority node that this node has seen.
Definition Election.ice:198