NodeI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <Ice/Ice.h>
11 #include <IceStorm/NodeI.h>
12 #include <IceStorm/Observers.h>
13 #include <IceStorm/TraceLevels.h>
14 
15 using namespace IceStorm;
16 using namespace IceStormElection;
17 using namespace std;
18 
19 namespace
20 {
21 
22  class CheckTask : public IceUtil::TimerTask
23  {
24  const NodeIPtr _node;
25 
26  public:
27 
28  CheckTask(const NodeIPtr& node) : _node(node) { }
29  virtual void runTimerTask()
30  {
31  _node->check();
32  }
33  };
34 
35  class MergeTask : public IceUtil::TimerTask
36  {
37  const NodeIPtr _node;
38  const set<int> _s;
39 
40  public:
41 
42  MergeTask(const NodeIPtr& node, const set<int>& s) : _node(node), _s(s) { }
43  virtual void runTimerTask()
44  {
45  _node->merge(_s);
46  }
47  };
48 
49  class MergeContinueTask : public IceUtil::TimerTask
50  {
51  const NodeIPtr _node;
52 
53  public:
54 
55  MergeContinueTask(const NodeIPtr& node) : _node(node) { }
56  virtual void runTimerTask()
57  {
58  _node->mergeContinue();
59  }
60  };
61 
62  class TimeoutTask: public IceUtil::TimerTask
63  {
64  const NodeIPtr _node;
65 
66  public:
67 
68  TimeoutTask(const NodeIPtr& node) : _node(node) { }
69  virtual void runTimerTask()
70  {
71  _node->timeout();
72  }
73  };
74 
75 }
76 
77 namespace
78 {
79 
80  LogUpdate emptyLU = {0, 0};
81 
82 }
83 
84 GroupNodeInfo::GroupNodeInfo(int i) :
85  id(i), llu(emptyLU)
86 {
87 }
88 
89 GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, const Ice::ObjectPrx& o) :
90  id(i), llu(l), observer(o)
91 {
92 }
93 
94 bool
96 {
97  return id < rhs.id;
98 }
99 
100 bool
102 {
103  return id == rhs.id;
104 }
105 
//
// COMPILER FIX: Clang using libc++ requires to define operator=
//
#if defined(__clang__) && defined(_LIBCPP_VERSION)
GroupNodeInfo&
GroupNodeInfo::operator=(const GroupNodeInfo& other)
{
    // The members are declared const; cast that away so libc++
    // containers can assign elements.
    const_cast<int&>(this->id) = other.id;
    const_cast<LogUpdate&>(this->llu) = other.llu;
    const_cast<Ice::ObjectPrx&>(this->observer) = other.observer;
    return *this;
}
#endif
120 
121 namespace
122 {
123  static IceUtil::Time
124  getTimeout(const string& key, int def, const Ice::PropertiesPtr& properties, const TraceLevelsPtr& traceLevels)
125  {
126  int t = properties->getPropertyAsIntWithDefault(key, def);
127  if (t < 0)
128  {
129  Ice::Warning out(traceLevels->logger);
130  out << traceLevels->electionCat << ": " << key << " < 0; Adjusted to 1";
131  t = 1;
132  }
133  return IceUtil::Time::seconds(t);
134  }
135 
136  static string
137  toString(const set<int>& s)
138  {
139  ostringstream os;
140  os << "(";
141  for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p)
142  {
143  if (p != s.begin())
144  {
145  os << ",";
146  }
147  os << *p;
148  }
149  os << ")";
150  return os.str();
151  }
152 
153 }
154 
155 NodeI::NodeI(const InstancePtr& instance,
156  const ReplicaPtr& replica,
157  const Ice::ObjectPrx& replicaProxy,
158  int id, const map<int, NodePrx>& nodes) :
159  _timer(instance->timer()),
160  _traceLevels(instance->traceLevels()),
161  _observers(instance->observers()),
162  _replica(replica),
163  _replicaProxy(replicaProxy),
164  _id(id),
165  _nodes(nodes),
166  _state(NodeStateInactive),
167  _updateCounter(0),
168  _max(0),
169  _generation(-1),
170  _destroy(false)
171 {
172  map<int, NodePrx> oneway;
173  for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
174  {
175  oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
176  }
177  const_cast<map<int, NodePrx>& >(_nodesOneway) = oneway;
178 
179  Ice::PropertiesPtr properties = instance->communicator()->getProperties();
180  const_cast<IceUtil::Time&>(_masterTimeout) = getTimeout(
181  instance->serviceName() + ".Election.MasterTimeout", 10, properties, _traceLevels);
182  const_cast<IceUtil::Time&>(_electionTimeout) = getTimeout(
183  instance->serviceName() + ".Election.ElectionTimeout", 10, properties, _traceLevels);
184  const_cast<IceUtil::Time&>(_mergeTimeout) = getTimeout(
185  instance->serviceName() + ".Election.ResponseTimeout", 10, properties, _traceLevels);
186 }
187 
188 void
190 {
191  // As an optimization we want the initial election to occur as
192  // soon as possible.
193  //
194  // However, if we have the node trigger the election immediately
195  // upon startup then we'll have a clash with lower priority nodes
196  // starting an election denying a higher priority node the
197  // opportunity to start the election that results in it becoming
198  // the leader. Of course, things will eventually reach a stable
199  // state but it will take longer.
200  //
201  // As such as we schedule the initial election check inversely
202  // proportional to our priority.
203  //
204  // By setting _checkTask first we stop recovery() from setting it
205  // to the regular election interval.
206  //
207 
208  //
209  // We use this lock to ensure that recovery is called before CheckTask
210  // is scheduled, even if timeout is 0
211  //
212  Lock sync(*this);
213 
214  _checkTask = new CheckTask(this);
215  _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
216  recovery();
217 }
218 
219 void
221 {
222  {
223  Lock sync(*this);
224  if (_destroy)
225  {
226  return;
227  }
228  assert(!_mergeTask);
229 
230  if (_state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
231  {
232  assert(_checkTask);
233  _timer->schedule(_checkTask, _electionTimeout);
234  return;
235  }
236 
237  // Next get the set of nodes that were detected as unreachable
238  // from the replica and remove them from our slave list.
239  vector<int> dead;
240  _observers->getReapedSlaves(dead);
241  if (!dead.empty())
242  {
243  for (vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
244  {
245  set<GroupNodeInfo>::iterator q = _up.find(GroupNodeInfo(*p));
246  if (q != _up.end())
247  {
248  if (_traceLevels->election > 0)
249  {
250  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
251  out << "node " << _id << ": reaping slave " << *p;
252  }
253  _up.erase(q);
254  }
255  }
256 
257  // If we no longer have the majority of the nodes under our
258  // care then we need to stop our replica.
259  if (_up.size() < _nodes.size() / 2)
260  {
261  if (_traceLevels->election > 0)
262  {
263  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
264  out << "node " << _id << ": stopping replica";
265  }
266  // Clear _checkTask -- recovery() will reset the
267  // timer.
268  assert(_checkTask);
269  _checkTask = 0;
270  recovery();
271  return;
272  }
273  }
274  }
275 
276  // See if other groups exist for possible merge.
277  set<int> tmpset;
278  int max = -1;
279  for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
280  {
281  if (p->first == _id)
282  {
283  continue;
284  }
285  try
286  {
287  if (p->second->areYouCoordinator())
288  {
289  if (p->first > max)
290  {
291  max = p->first;
292  }
293  tmpset.insert(p->first);
294  }
295  }
296  catch (const Ice::Exception& ex)
297  {
298  if (_traceLevels->election > 0)
299  {
300  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
301  out << "node " << _id << ": call on node " << p->first << " failed: " << ex;
302  }
303  }
304  }
305 
306  Lock sync(*this);
307 
308  // If the node state has changed while the mutex has been released
309  // then bail. We don't schedule a re-check since we're either
310  // destroyed in which case we're going to terminate or the end of
311  // the election/reorg will re-schedule the check.
312  if (_destroy || _state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
313  {
314  _checkTask = 0;
315  return;
316  }
317 
318  // If we didn't find any coordinators then we're done. Reschedule
319  // the next check and terminate.
320  if (tmpset.empty())
321  {
322  assert(_checkTask);
323  _timer->schedule(_checkTask, _electionTimeout);
324  return;
325  }
326 
327  // _checkTask == 0 means that the check isn't scheduled.
328  _checkTask = 0;
329 
330  if (_traceLevels->election > 0)
331  {
332  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
333  out << "node " << _id << ": highest priority node count: " << max;
334  }
335 
336  IceUtil::Time delay = IceUtil::Time::seconds(0);
337  if (_id < max)
338  {
339  // Reschedule timer proportial to p-i.
340  delay = _mergeTimeout + _mergeTimeout * (max - _id);
341  if (_traceLevels->election > 0)
342  {
343  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
344  out << "node " << _id << ": scheduling merge in " << delay.toDuration()
345  << " seconds";
346  }
347  }
348 
349  assert(!_mergeTask);
350  _mergeTask = new MergeTask(this, tmpset);
351  _timer->schedule(_mergeTask, delay);
352 }
353 
354 // Called if the node has not heard from the coordinator in some time.
355 void
357 {
358  int myCoord;
359  string myGroup;
360  {
361  Lock sync(*this);
362  // If we're destroyed or we are our own coordinator then we're
363  // done.
364  if (_destroy || _coord == _id)
365  {
366  return;
367  }
368  myCoord = _coord;
369  myGroup = _group;
370  }
371 
372  bool failed = false;
373  try
374  {
375  map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
376  assert(p != _nodes.end());
377  if (!p->second->areYouThere(myGroup, _id))
378  {
379  if (_traceLevels->election > 0)
380  {
381  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
382  out << "node " << _id << ": lost connection to coordinator " << myCoord
383  << ": areYouThere returned false";
384  }
385  failed = true;
386  }
387  }
388  catch (const Ice::Exception& ex)
389  {
390  if (_traceLevels->election > 0)
391  {
392  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
393  out << "node " << _id << ": lost connection to coordinator " << myCoord << ": " << ex;
394  }
395  failed = true;
396  }
397  if (failed)
398  {
399  recovery();
400  }
401 }
402 
403 void
404 NodeI::merge(const set<int>& coordinatorSet)
405 {
406  set<int> invited;
407  string gp;
408  {
409  Lock sync(*this);
410  _mergeTask = 0;
411 
412  // If the node is currently in an election, or reorganizing
413  // then we're done.
414  if (_state == NodeStateElection || _state == NodeStateReorganization)
415  {
416  return;
417  }
418 
419  // This state change prevents this node from accepting
420  // invitations while the merge is executing.
421  setState(NodeStateElection);
422 
423  // No more replica changes are permitted.
424  while (!_destroy && _updateCounter > 0)
425  {
426  wait();
427  }
428  if (_destroy)
429  {
430  return;
431  }
432 
433  ostringstream os;
434  os << _id << ":" << Ice::generateUUID();
435  _group = os.str();
436  gp = _group;
437 
438  _invitesAccepted.clear();
439  _invitesIssued.clear();
440 
441  // Construct a set of node ids to invite. This is the union of
442  // _up and set of coordinators gathered in the check stage.
443  invited = coordinatorSet;
444  for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
445  {
446  invited.insert(p->id);
447  }
448 
449  _coord = _id;
450  _up.clear();
451 
452  if (_traceLevels->election > 0)
453  {
454  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
455  out << "node " << _id << ": inviting " << toString(invited) << " to group " << _group;
456  }
457  }
458 
459  set<int>::iterator p = invited.begin();
460  while (p != invited.end())
461  {
462  try
463  {
464  if (_traceLevels->election > 0)
465  {
466  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
467  out << "node " << _id << ": inviting node " << *p << " to group " << gp;
468  }
469  map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
470  assert(node != _nodesOneway.end());
471  node->second->invitation(_id, gp);
472  ++p;
473  }
474  catch (const Ice::Exception&)
475  {
476  invited.erase(p++);
477  }
478  }
479 
480  // Now we wait for responses to our invitation.
481  {
482  Lock sync(*this);
483  if (_destroy)
484  {
485  return;
486  }
487 
488  // Add each of the invited nodes in the invites issed set.
489  _invitesIssued.insert(invited.begin(), invited.end());
490 
491  if (_traceLevels->election > 0)
492  {
493  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
494  out << "node " << _id << ": invites pending: " << toString(_invitesIssued);
495  }
496 
497  // Schedule the mergeContinueTask.
498  assert(_mergeContinueTask == 0);
499  _mergeContinueTask = new MergeContinueTask(this);
500 
501  // At this point we may have already accepted all of the
502  // invitations, if so then we want to schedule the
503  // mergeContinue immediately.
504  IceUtil::Time timeout = _mergeTimeout;
505  if (_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted)
506  {
507  timeout = IceUtil::Time::seconds(0);
508  }
509  _timer->schedule(_mergeContinueTask, timeout);
510  }
511 }
512 
513 void
515 {
516  string gp;
517  set<GroupNodeInfo> tmpSet;
518  {
519  Lock sync(*this);
520  if (_destroy)
521  {
522  return;
523  }
524 
525  // Copy variables for thread safety.
526  gp = _group;
527  tmpSet = _up;
528 
529  assert(_mergeContinueTask);
530  _mergeContinueTask = 0;
531 
532  // The node is now reorganizing.
533  assert(_state == NodeStateElection);
534  setState(NodeStateReorganization);
535 
536  if (_traceLevels->election > 0)
537  {
538  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
539  out << "node " << _id << ": coordinator for " << (tmpSet.size() + 1) << " nodes (including myself)";
540  }
541 
542  // Now we need to decide whether we can start serving content. If
543  // we're on initial startup then we need all nodes to participate
544  // in the election. If we're running a subsequent election then we
545  // need a majority of the nodes to be active in order to start
546  // running.
547  unsigned int ingroup = static_cast<unsigned int>(tmpSet.size());
548  if ((_max != _nodes.size() && ingroup != _nodes.size() - 1) || ingroup < _nodes.size() / 2)
549  {
550  if (_traceLevels->election > 0)
551  {
552  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
553  out << "node " << _id << ": not enough nodes " << (ingroup + 1) << "/" << _nodes.size()
554  << " for replication to commence";
555  if (_max != _nodes.size())
556  {
557  out << " (require full participation for startup)";
558  }
559  }
560  recovery();
561  return;
562  }
563  }
564 
565  // Find out who has the highest available set of database
566  // updates.
567  int maxid = -1;
568  LogUpdate maxllu = { -1, 0 };
569  for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
570  {
571  if (_traceLevels->election > 0)
572  {
573  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
574  out << "node id=" << p->id << " llu=" << p->llu.generation << "/" << p->llu.iteration;
575  }
576  if (p->llu.generation > maxllu.generation ||
577  (p->llu.generation == maxllu.generation && p->llu.iteration > maxllu.iteration))
578  {
579  maxid = p->id;
580  maxllu = p->llu;
581  }
582  }
583 
584  LogUpdate myLlu = _replica->getLastLogUpdate();
585  if (_traceLevels->election > 0)
586  {
587  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
588  out << "node id=" << _id << " llu=" << myLlu.generation << "/" << myLlu.iteration;
589  }
590 
591  // If its not us then we have to get the latest database data from
592  // the replica with the latest set.
593  //if(maxllu > _replica->getLastLogUpdate())
594  if (maxllu > myLlu)
595  {
596  if (_traceLevels->election > 0)
597  {
598  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
599  out << "node " << _id << ": syncing database state with node " << maxid;
600  }
601  try
602  {
603  map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
604  assert(node != _nodes.end());
605  _replica->sync(node->second->sync());
606  }
607  catch (const Ice::Exception& ex)
608  {
609  if (_traceLevels->election > 0)
610  {
611  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
612  out << "node " << _id << ": syncing database state with node "
613  << maxid << " failed: " << ex;
614  }
615  recovery();
616  return;
617  }
618  }
619  else
620  {
621  if (_traceLevels->election > 0)
622  {
623  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
624  out << "node " << _id << ": I have the latest database state.";
625  }
626  }
627 
628  // At this point we've ensured that we have the latest database
629  // state, as such we can set our _max flag.
630  unsigned int max = static_cast<unsigned int>(tmpSet.size()) + 1;
631  {
632  Lock sync(*this);
633  if (max > _max)
634  {
635  _max = max;
636  }
637  max = _max;
638  }
639 
640  // Prepare the LogUpdate for this generation.
641  maxllu.generation++;
642  maxllu.iteration = 0;
643 
644  try
645  {
646  // Tell the replica that it is now the master with the given
647  // set of slaves and llu generation.
648  _replica->initMaster(tmpSet, maxllu);
649  }
650  catch (const Ice::Exception& ex)
651  {
652  if (_traceLevels->election > 0)
653  {
654  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
655  out << "node " << _id << ": initMaster failed: " << ex;
656  }
657  recovery();
658  return;
659  }
660 
661  // Tell each node to go.
662  for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
663  {
664  try
665  {
666  map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
667  assert(node != _nodes.end());
668  node->second->ready(_id, gp, _replicaProxy, max, maxllu.generation);
669  }
670  catch (const Ice::Exception& ex)
671  {
672  if (_traceLevels->election > 0)
673  {
674  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
675  out << "node " << _id << ": error calling ready on " << p->id << " ex: " << ex;
676  }
677  recovery();
678  return;
679  }
680  }
681 
682  {
683  Lock sync(*this);
684  if (_destroy)
685  {
686  return;
687  }
688  if (_traceLevels->election > 0)
689  {
690  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
691  out << "node " << _id << ": reporting for duty in group " << _group << " as coordinator. ";
692  out << "replication commencing with " << _up.size() + 1 << "/" << _nodes.size()
693  << " nodes with llu generation: " << maxllu.generation;
694  }
695  setState(NodeStateNormal);
696  _coordinatorProxy = 0;
697 
698  _generation = maxllu.generation;
699 
700  assert(!_checkTask);
701  _checkTask = new CheckTask(this);
702  _timer->schedule(_checkTask, _electionTimeout);
703  }
704 }
705 
706 void
707 NodeI::invitation(int j, const string& gn, const Ice::Current&)
708 {
709  if (_traceLevels->election > 0)
710  {
711  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
712  out << "node " << _id << ": invitation from " << j << " to group " << gn;
713  }
714 
715  // Verify that j exists in our node set.
716  if (_nodes.find(j) == _nodes.end())
717  {
718  Ice::Warning warn(_traceLevels->logger);
719  warn << _traceLevels->electionCat << ": ignoring invitation from unknown node " << j;
720  return;
721  }
722 
723  int tmpCoord = -1;
724  int max = -1;
725  set<GroupNodeInfo> tmpSet;
726  {
727  Lock sync(*this);
728  if (_destroy)
729  {
730  return;
731  }
732  // If we're in the election or reorg state a merge has already
733  // started, so ignore the invitation.
734  if (_state == NodeStateElection || _state == NodeStateReorganization)
735  {
736  if (_traceLevels->election > 0)
737  {
738  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
739  out << "node " << _id << ": invitation ignored";
740  }
741  return;
742  }
743 
744  //
745  // Upon receipt of an invitation we cancel any pending merge
746  // task.
747  //
748  if (_mergeTask)
749  {
750  // If the timer doesn't cancel it means that the timer has
751  // fired and the merge is currently in-progress in which
752  // case we should reject the invitation.
753  if (!_timer->cancel(_mergeTask))
754  {
755  // The merge task is cleared in the merge. This
756  // ensures two invitations cannot cause a race with
757  // the merge.
758  //_mergeTask = 0;
759  return;
760  }
761  _mergeTask = 0;
762  }
763 
764  // We're now joining with another group. If we are active we
765  // must stop serving as a master or slave.
766  setState(NodeStateElection);
767  while (!_destroy && _updateCounter > 0)
768  {
769  wait();
770  }
771  if (_destroy)
772  {
773  return;
774  }
775 
776  tmpCoord = _coord;
777  tmpSet = _up;
778 
779  _coord = j;
780  _group = gn;
781  max = _max;
782  }
783 
784  Ice::IntSeq forwardedInvites;
785  if (tmpCoord == _id) // Forward invitation to my old members.
786  {
787  for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
788  {
789  try
790  {
791  map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
792  assert(node != _nodesOneway.end());
793  node->second->invitation(j, gn);
794  forwardedInvites.push_back(p->id);
795  }
796  catch (const Ice::Exception&)
797  {
798  }
799  }
800  }
801 
802  // Set the state and timer before calling accept. This ensures
803  // that if ready is called directly after accept is called then
804  // everything is fine. Setting the state *after* calling accept
805  // can cause a race.
806  {
807  Lock sync(*this);
808  if (_destroy)
809  {
810  return;
811  }
812  assert(_state == NodeStateElection);
813  setState(NodeStateReorganization);
814  if (!_timeoutTask)
815  {
816  _timeoutTask = new TimeoutTask(this);
817  _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
818  }
819  }
820 
821  try
822  {
823  map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
824  assert(node != _nodesOneway.end());
825  node->second->accept(_id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(), max);
826  }
827  catch (const Ice::Exception&)
828  {
829  recovery();
830  return;
831  }
832 }
833 
834 void
835 NodeI::ready(int j, const string& gn, const Ice::ObjectPrx& coordinator, int max, Ice::Long generation,
836  const Ice::Current&)
837 {
838  Lock sync(*this);
839  if (!_destroy && _state == NodeStateReorganization && _group == gn)
840  {
841  // The coordinator must be j (this was set in the invitation).
842  if (_coord != j)
843  {
844  Ice::Warning warn(_traceLevels->logger);
845  warn << _traceLevels->electionCat << ": ignoring ready call from replica node " << j
846  << " (real coordinator is " << _coord << ")";
847  return;
848  }
849 
850  // Here we've already validated j in the invite call
851  // (otherwise _group != gn).
852  if (_traceLevels->election > 0)
853  {
854  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
855  out << "node " << _id << ": reporting for duty in group " << gn << " with coordinator " << j;
856  }
857 
858  if (static_cast<unsigned int>(max) > _max)
859  {
860  _max = max;
861  }
862  _generation = generation;
863 
864  // Activate the replica here since the replica is now ready
865  // for duty.
866  setState(NodeStateNormal);
867  _coordinatorProxy = coordinator;
868 
869  if (!_checkTask)
870  {
871  _checkTask = new CheckTask(this);
872  _timer->schedule(_checkTask, _electionTimeout);
873  }
874  }
875 }
876 
877 void
878 NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, const Ice::ObjectPrx& observer,
879  const LogUpdate& llu, int max, const Ice::Current&)
880 {
881  // Verify that j exists in our node set.
882  if (_nodes.find(j) == _nodes.end())
883  {
884  Ice::Warning warn(_traceLevels->logger);
885  warn << _traceLevels->electionCat << ": ignoring accept from unknown node " << j;
886  return;
887  }
888 
889  Lock sync(*this);
890  if (!_destroy && _state == NodeStateElection && _group == gn && _coord == _id)
891  {
892  _up.insert(GroupNodeInfo(j, llu, observer));
893 
894  if (static_cast<unsigned int>(max) > _max)
895  {
896  _max = max;
897  }
898 
899  if (_traceLevels->election > 0)
900  {
901  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
902  out << "node " << _id << ": accept " << j << " forward invites (";
903  for (Ice::IntSeq::const_iterator p = forwardedInvites.begin(); p != forwardedInvites.end(); ++p)
904  {
905  if (p != forwardedInvites.begin())
906  {
907  out << ",";
908  }
909  out << *p;
910  }
911  out << ") with llu "
912  << llu.generation << "/" << llu.iteration << " into group " << gn
913  << " group size " << (_up.size() + 1);
914  }
915 
916  // Add each of the forwarded invites to the list of issued
917  // invitations. This doesn't use set_union since
918  // forwardedInvites may not be sorted.
919  _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
920  // We've accepted the invitation from node j.
921  _invitesAccepted.insert(j);
922 
923  if (_traceLevels->election > 0)
924  {
925  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
926  out << "node " << _id << ": invites pending: " << toString(_invitesIssued)
927  << " invites accepted: " << toString(_invitesAccepted);
928  }
929 
930  // If invitations have been accepted from all nodes and the
931  // merge task has already been scheduled then reschedule the
932  // merge continue immediately. Otherwise, we let the existing
933  // merge() schedule continue.
934  if ((_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted) &&
935  _mergeContinueTask && _timer->cancel(_mergeContinueTask))
936  {
937  _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
938  }
939  }
940 }
941 
942 bool
943 NodeI::areYouCoordinator(const Ice::Current&) const
944 {
945  Lock sync(*this);
946  return _state != NodeStateElection && _state != NodeStateReorganization && _coord == _id;
947 }
948 
949 bool
950 NodeI::areYouThere(const string& gn, int j, const Ice::Current&) const
951 {
952  Lock sync(*this);
953  return _group == gn && _coord == _id && _up.find(GroupNodeInfo(j)) != _up.end();
954 }
955 
956 Ice::ObjectPrx
957 NodeI::sync(const Ice::Current&) const
958 {
959  return _replica->getSync();
960 }
961 
963 NodeI::nodes(const Ice::Current&) const
964 {
965  NodeInfoSeq seq;
966  for (map<int, NodePrx>::const_iterator q = _nodes.begin(); q != _nodes.end(); ++q)
967  {
968  NodeInfo ni;
969  ni.id = q->first;
970  ni.n = q->second;
971  seq.push_back(ni);
972  }
973 
974  return seq;
975 }
976 
977 QueryInfo
978 NodeI::query(const Ice::Current&) const
979 {
980  Lock sync(*this);
981  QueryInfo info;
982  info.id = _id;
983  info.coord = _coord;
984  info.group = _group;
985  info.replica = _replicaProxy;
986  info.state = _state;
987  info.max = _max;
988 
989  for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
990  {
991  GroupInfo gi;
992  gi.id = p->id;
993  gi.llu = p->llu;
994  info.up.push_back(gi);
995  }
996 
997  return info;
998 }
999 
1000 void
1002 {
1003  Lock sync(*this);
1004 
1005  // Ignore the recovery if the node has already advanced a
1006  // generation.
1007  if (generation != -1 && generation != _generation)
1008  {
1009  return;
1010  }
1011 
1012  setState(NodeStateInactive);
1013  while (!_destroy && _updateCounter > 0)
1014  {
1015  wait();
1016  }
1017  if (_destroy)
1018  {
1019  return;
1020  }
1021 
1022  ostringstream os;
1023  os << _id << ":" << Ice::generateUUID();
1024  _group = os.str();
1025 
1026  _generation = -1;
1027  _coord = _id;
1028  _up.clear();
1029 
1030  if (_traceLevels->election > 0)
1031  {
1032  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1033  out << "node " << _id << ": creating new self-coordinated group " << _group;
1034  }
1035 
1036  // Reset the timer states.
1037  if (_mergeTask)
1038  {
1039  _timer->cancel(_mergeTask);
1040  _mergeTask = 0;
1041  }
1042  if (_timeoutTask)
1043  {
1044  _timer->cancel(_timeoutTask);
1045  _timeoutTask = 0;
1046  }
1047  if (!_checkTask)
1048  {
1049  _checkTask = new CheckTask(this);
1050  _timer->schedule(_checkTask, _electionTimeout);
1051  }
1052 }
1053 
1054 void
1056 {
1057  Lock sync(*this);
1058  assert(!_destroy);
1059 
1060  while (_updateCounter > 0)
1061  {
1062  wait();
1063  }
1064  _destroy = true;
1065  notifyAll();
1066 
1067  // Cancel the timers.
1068  if (_checkTask)
1069  {
1070  _timer->cancel(_checkTask);
1071  _checkTask = 0;
1072  }
1073 
1074  if (_timeoutTask)
1075  {
1076  _timer->cancel(_timeoutTask);
1077  _timeoutTask = 0;
1078  }
1079 
1080  if (_mergeTask)
1081  {
1082  _timer->cancel(_mergeTask);
1083  _mergeTask = 0;
1084  }
1085 }
1086 
1087 // A node should only receive an observer init call if the node is
1088 // reorganizing and its not the coordinator.
1089 void
1091 {
1092  Lock sync(*this);
1093  if (_state != NodeStateReorganization)
1094  {
1095  throw ObserverInconsistencyException("init cannot block when state != NodeStateReorganization");
1096  }
1097  if (_coord == _id)
1098  {
1099  throw ObserverInconsistencyException("init called on coordinator");
1100  }
1101 }
1102 
1103 // Notify the node that we're about to start an update.
1104 Ice::ObjectPrx
1105 NodeI::startUpdate(Ice::Long& generation, const char* file, int line)
1106 {
1107  bool majority = _observers->check();
1108 
1109  Lock sync(*this);
1110 
1111  // If we've actively replicating & lost the majority of our replicas then recover.
1112  if (!_coordinatorProxy && !_destroy && _state == NodeStateNormal && !majority)
1113  {
1114  recovery();
1115  }
1116 
1117  while (!_destroy && _state != NodeStateNormal)
1118  {
1119  wait();
1120  }
1121  if (_destroy)
1122  {
1123  throw Ice::UnknownException(file, line);
1124  }
1125  if (!_coordinatorProxy)
1126  {
1127  ++_updateCounter;
1128  }
1129  generation = _generation;
1130  return _coordinatorProxy;
1131 }
1132 
1133 bool
1134 NodeI::updateMaster(const char* /*file*/, int /*line*/)
1135 {
1136  bool majority = _observers->check();
1137 
1138  Lock sync(*this);
1139 
1140  // If the node is destroyed, or is not a coordinator then we're
1141  // done.
1142  if (_destroy || _coordinatorProxy)
1143  {
1144  return false;
1145  }
1146 
1147  // If we've lost the majority of our replicas then recover.
1148  if (_state == NodeStateNormal && !majority)
1149  {
1150  recovery();
1151  }
1152 
1153  // If we're not replicating then we're done.
1154  if (_state != NodeStateNormal)
1155  {
1156  return false;
1157  }
1158 
1159  // Otherwise adjust the update counter, and return success.
1160  ++_updateCounter;
1161  return true;
1162 }
1163 
1164 Ice::ObjectPrx
1165 NodeI::startCachedRead(Ice::Long& generation, const char* file, int line)
1166 {
1167  Lock sync(*this);
1168  while (!_destroy && _state != NodeStateNormal)
1169  {
1170  wait();
1171  }
1172  if (_destroy)
1173  {
1174  throw Ice::UnknownException(file, line);
1175  }
1176  generation = _generation;
1177  ++_updateCounter;
1178  return _coordinatorProxy;
1179 }
1180 
1181 void
1182 NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line)
1183 {
1184  Lock sync(*this);
1185  if (_destroy)
1186  {
1187  throw Ice::UnknownException(file, line);
1188  }
1189  if (_state != NodeStateNormal)
1190  {
1191  throw ObserverInconsistencyException("update called on inactive node");
1192  }
1193  if (!_coordinatorProxy)
1194  {
1195  throw ObserverInconsistencyException("update called on the master");
1196  }
1197  if (generation != _generation)
1198  {
1199  throw ObserverInconsistencyException("invalid generation");
1200  }
1201  ++_updateCounter;
1202 }
1203 
1204 void
1206 {
1207  Lock sync(*this);
1208  assert(!_destroy);
1209  --_updateCounter;
1210  assert(_updateCounter >= 0);
1211  if (_updateCounter == 0)
1212  {
1213  notifyAll();
1214  }
1215 }
1216 
1217 namespace
1218 {
1219  static string
1220  stateToString(NodeState s)
1221  {
1222  switch (s)
1223  {
1224  case NodeStateInactive:
1225  return "inactive";
1226  case NodeStateElection:
1227  return "election";
1229  return "reorganization";
1230  case NodeStateNormal:
1231  return "normal";
1232  }
1233  return "unknown";
1234  }
1235 }
1236 
1237 void
1238 NodeI::setState(NodeState s)
1239 {
1240  if (s != _state)
1241  {
1242  if (_traceLevels->election > 0)
1243  {
1244  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1245  out << "node " << _id << ": transition from " << stateToString(_state) << " to "
1246  << stateToString(s);
1247  }
1248  _state = s;
1249  if (_state == NodeStateNormal)
1250  {
1251  notifyAll();
1252  }
1253  }
1254 }
IceStormElection::GroupNodeInfo
Definition: Replica.h:20
IceStorm
Definition: DBTypes.ice:22
IceStormElection::GroupNodeInfo::operator<
bool operator<(const GroupNodeInfo &rhs) const
Definition: NodeI.cpp:95
IceStormElection::NodeI::merge
void merge(const std::set< int > &)
Definition: NodeI.cpp:404
IceStormElection::QueryInfo::coord
int coord
The nodes coordinator.
Definition: Election.ice:183
IceStormElection::Node::nodes
idempotent NodeInfoSeq nodes()
Get the replication group information.
IceStormElection::GroupInfo::llu
LogUpdate llu
The last known log update for this node.
Definition: Election.ice:172
IceStormElection::QueryInfo
Definition: Election.ice:177
IceStormElection::QueryInfo::up
GroupInfoSeq up
The sequence of nodes in this nodes group.
Definition: Election.ice:195
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:100
IceStormElection::Node::areYouCoordinator
idempotent bool areYouCoordinator()
Determine if this node is a coordinator.
IceStormElection::NodeStateElection
@ NodeStateElection
The node is electing a leader.
Definition: Election.ice:144
IceStormElection::NodeI::recovery
void recovery(Ice::Long=-1)
Definition: NodeI.cpp:1001
IceStormElection::NodeI::destroy
void destroy()
Definition: NodeI.cpp:1055
IceStormElection::NodeInfo::n
Node * n
The node proxy.
Definition: Election.ice:160
IceStormElection::QueryInfo::id
int id
The node id.
Definition: Election.ice:180
IceStormElection::NodeI::updateMaster
bool updateMaster(const char *, int)
Definition: NodeI.cpp:1134
IceStormElection::ObserverInconsistencyException
Thrown if an observer detects an inconsistency.
Definition: Election.ice:35
IceInternal::Handle< ::Ice::Properties >
IceStormElection::GroupNodeInfo::operator==
bool operator==(const GroupNodeInfo &rhs) const
Definition: NodeI.cpp:101
IceStormElection::NodeI::start
void start()
Definition: NodeI.cpp:189
IceStormElection::Node::sync
idempotent Object * sync()
Get the sync object for the replica hosted by this node.
IceStormElection::NodeI::startCachedRead
Ice::ObjectPrx startCachedRead(Ice::Long &, const char *, int)
Definition: NodeI.cpp:1165
IceStormElection::NodeStateInactive
@ NodeStateInactive
The node is inactive and awaiting an election.
Definition: Election.ice:142
IceStormElection::LogUpdate::generation
::Ice::Long generation
Definition: LLURecord.h:102
IceStormElection::QueryInfo::max
int max
The highest priority node that this node has seen.
Definition: Election.ice:198
IceStormElection::NodeState
NodeState
The node state.
Definition: Election.ice:139
IceStormElection
Definition: DBTypes.ice:17
IceStormElection::NodeI::timeout
void timeout()
Definition: NodeI.cpp:356
IceStormElection::QueryInfo::group
string group
The nodes group name.
Definition: Election.ice:186
IceStormElection::NodeI::check
void check()
Definition: NodeI.cpp:220
IceStormElection::LogUpdate::iteration
::Ice::Long iteration
Definition: LLURecord.h:103
IceStormElection::NodeI::ready
virtual void ready(int, const std::string &, const Ice::ObjectPrx &, int, Ice::Long, const Ice::Current &)
Definition: NodeI.cpp:835
IceStormElection::GroupInfo
The group info.
Definition: Election.ice:167
IceStormElection::NodeInfo
All nodes in the replication group.
Definition: Election.ice:155
IceStormElection::NodeStateReorganization
@ NodeStateReorganization
The replica group is reorganizing.
Definition: Election.ice:146
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:917
IceStormElection::GroupNodeInfo::id
const int id
Definition: Replica.h:32
max
T max(T t1, T t2)
Definition: gdiam.h:48
IceStormElection::NodeStateNormal
@ NodeStateNormal
The replica group is active & replicating.
Definition: Election.ice:148
q
#define q
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStormElection::NodeI::finishUpdate
void finishUpdate()
Definition: NodeI.cpp:1205
IceStormElection::GroupNodeInfo::observer
const Ice::ObjectPrx observer
Definition: Replica.h:34
IceStormElection::NodeInfo::id
int id
The identity of the node.
Definition: Election.ice:158
IceStormElection::NodeI::accept
virtual void accept(int, const std::string &, const Ice::IntSeq &, const Ice::ObjectPrx &, const LogUpdate &, int, const Ice::Current &)
Definition: NodeI.cpp:878
IceStormElection::NodeI::mergeContinue
void mergeContinue()
Definition: NodeI.cpp:514
TraceLevels.h
IceStormElection::GroupNodeInfo::llu
const LogUpdate llu
Definition: Replica.h:33
std
Definition: Application.h:66
IceStormElection::QueryInfo::state
NodeState state
The node state.
Definition: Election.ice:192
IceUtil::Handle< NodeI >
IceStormElection::NodeI::invitation
virtual void invitation(int, const std::string &, const Ice::Current &)
Definition: NodeI.cpp:707
IceStormElection::NodeInfoSeq
::std::vector< ::IceStormElection::NodeInfo > NodeInfoSeq
A sequence of node info.
Definition: Election.h:857
IceStormElection::NodeI::checkObserverInit
void checkObserverInit(Ice::Long)
Definition: NodeI.cpp:1090
armarx::viz::toString
const char * toString(InteractionFeedbackType type)
Definition: Interaction.h:27
IceStormElection::Node::query
idempotent QueryInfo query()
Get the query information for the given node.
Observers.h
IceStormElection::GroupInfo::id
int id
The identity of the node.
Definition: Election.ice:170
IceStormElection::GroupNodeInfo::GroupNodeInfo
GroupNodeInfo(int i)
Definition: NodeI.cpp:84
IceStormElection::QueryInfo::replica
Object * replica
The replica the node is managing.
Definition: Election.ice:189
NodeI.h
IceStormElection::NodeI::startUpdate
Ice::ObjectPrx startUpdate(Ice::Long &, const char *, int)
Definition: NodeI.cpp:1105
IceStormElection::NodeI::areYouThere
virtual bool areYouThere(const std::string &, int, const Ice::Current &) const
Definition: NodeI.cpp:950
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
IceStormElection::NodeI::NodeI
NodeI(const IceStorm::InstancePtr &, const ReplicaPtr &, const Ice::ObjectPrx &, int, const std::map< int, NodePrx > &)
Definition: NodeI.cpp:155
IceStormElection::NodeI::startObserverUpdate
void startObserverUpdate(Ice::Long, const char *, int)
Definition: NodeI.cpp:1182