NodeI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <Ice/Ice.h>
11 #include <IceStorm/NodeI.h>
12 #include <IceStorm/Observers.h>
13 #include <IceStorm/TraceLevels.h>
14 
15 using namespace IceStorm;
16 using namespace IceStormElection;
17 using namespace std;
18 
19 namespace
20 {
21 
22  class CheckTask : public IceUtil::TimerTask
23  {
24  const NodeIPtr _node;
25 
26  public:
27  CheckTask(const NodeIPtr& node) : _node(node)
28  {
29  }
30 
31  virtual void
32  runTimerTask()
33  {
34  _node->check();
35  }
36  };
37 
38  class MergeTask : public IceUtil::TimerTask
39  {
40  const NodeIPtr _node;
41  const set<int> _s;
42 
43  public:
44  MergeTask(const NodeIPtr& node, const set<int>& s) : _node(node), _s(s)
45  {
46  }
47 
48  virtual void
49  runTimerTask()
50  {
51  _node->merge(_s);
52  }
53  };
54 
55  class MergeContinueTask : public IceUtil::TimerTask
56  {
57  const NodeIPtr _node;
58 
59  public:
60  MergeContinueTask(const NodeIPtr& node) : _node(node)
61  {
62  }
63 
64  virtual void
65  runTimerTask()
66  {
67  _node->mergeContinue();
68  }
69  };
70 
71  class TimeoutTask : public IceUtil::TimerTask
72  {
73  const NodeIPtr _node;
74 
75  public:
76  TimeoutTask(const NodeIPtr& node) : _node(node)
77  {
78  }
79 
80  virtual void
81  runTimerTask()
82  {
83  _node->timeout();
84  }
85  };
86 
87 } // namespace
88 
89 namespace
90 {
91 
92  LogUpdate emptyLU = {0, 0};
93 
94 }
95 
96 GroupNodeInfo::GroupNodeInfo(int i) : id(i), llu(emptyLU)
97 {
98 }
99 
100 GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, const Ice::ObjectPrx& o) :
101  id(i), llu(l), observer(o)
102 {
103 }
104 
105 bool
107 {
108  return id < rhs.id;
109 }
110 
111 bool
113 {
114  return id == rhs.id;
115 }
116 
//
// COMPILER FIX: Clang using libc++ requires to define operator=
//
// The members are declared const (identity of a group member never
// changes once constructed), so assignment must cast the constness
// away to remain usable in standard containers with libc++.
//
#if defined(__clang__) && defined(_LIBCPP_VERSION)
GroupNodeInfo&
GroupNodeInfo::operator=(const GroupNodeInfo& other)
{
    const_cast<int&>(this->id) = other.id;
    const_cast<LogUpdate&>(this->llu) = other.llu;
    const_cast<Ice::ObjectPrx&>(this->observer) = other.observer;
    return *this;
}
#endif
131 
132 namespace
133 {
134  static IceUtil::Time
135  getTimeout(const string& key,
136  int def,
137  const Ice::PropertiesPtr& properties,
138  const TraceLevelsPtr& traceLevels)
139  {
140  int t = properties->getPropertyAsIntWithDefault(key, def);
141  if (t < 0)
142  {
143  Ice::Warning out(traceLevels->logger);
144  out << traceLevels->electionCat << ": " << key << " < 0; Adjusted to 1";
145  t = 1;
146  }
147  return IceUtil::Time::seconds(t);
148  }
149 
150  static string
151  toString(const set<int>& s)
152  {
153  ostringstream os;
154  os << "(";
155  for (set<int>::const_iterator p = s.begin(); p != s.end(); ++p)
156  {
157  if (p != s.begin())
158  {
159  os << ",";
160  }
161  os << *p;
162  }
163  os << ")";
164  return os.str();
165  }
166 
167 } // namespace
168 
169 NodeI::NodeI(const InstancePtr& instance,
170  const ReplicaPtr& replica,
171  const Ice::ObjectPrx& replicaProxy,
172  int id,
173  const map<int, NodePrx>& nodes) :
174  _timer(instance->timer()),
175  _traceLevels(instance->traceLevels()),
176  _observers(instance->observers()),
177  _replica(replica),
178  _replicaProxy(replicaProxy),
179  _id(id),
180  _nodes(nodes),
181  _state(NodeStateInactive),
182  _updateCounter(0),
183  _max(0),
184  _generation(-1),
185  _destroy(false)
186 {
187  map<int, NodePrx> oneway;
188  for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
189  {
190  oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
191  }
192  const_cast<map<int, NodePrx>&>(_nodesOneway) = oneway;
193 
194  Ice::PropertiesPtr properties = instance->communicator()->getProperties();
195  const_cast<IceUtil::Time&>(_masterTimeout) = getTimeout(
196  instance->serviceName() + ".Election.MasterTimeout", 10, properties, _traceLevels);
197  const_cast<IceUtil::Time&>(_electionTimeout) = getTimeout(
198  instance->serviceName() + ".Election.ElectionTimeout", 10, properties, _traceLevels);
199  const_cast<IceUtil::Time&>(_mergeTimeout) = getTimeout(
200  instance->serviceName() + ".Election.ResponseTimeout", 10, properties, _traceLevels);
201 }
202 
203 void
205 {
206  // As an optimization we want the initial election to occur as
207  // soon as possible.
208  //
209  // However, if we have the node trigger the election immediately
210  // upon startup then we'll have a clash with lower priority nodes
211  // starting an election denying a higher priority node the
212  // opportunity to start the election that results in it becoming
213  // the leader. Of course, things will eventually reach a stable
214  // state but it will take longer.
215  //
216  // As such as we schedule the initial election check inversely
217  // proportional to our priority.
218  //
219  // By setting _checkTask first we stop recovery() from setting it
220  // to the regular election interval.
221  //
222 
223  //
224  // We use this lock to ensure that recovery is called before CheckTask
225  // is scheduled, even if timeout is 0
226  //
227  Lock sync(*this);
228 
229  _checkTask = new CheckTask(this);
230  _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
231  recovery();
232 }
233 
234 void
236 {
237  {
238  Lock sync(*this);
239  if (_destroy)
240  {
241  return;
242  }
243  assert(!_mergeTask);
244 
245  if (_state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
246  {
247  assert(_checkTask);
248  _timer->schedule(_checkTask, _electionTimeout);
249  return;
250  }
251 
252  // Next get the set of nodes that were detected as unreachable
253  // from the replica and remove them from our slave list.
254  vector<int> dead;
255  _observers->getReapedSlaves(dead);
256  if (!dead.empty())
257  {
258  for (vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
259  {
260  set<GroupNodeInfo>::iterator q = _up.find(GroupNodeInfo(*p));
261  if (q != _up.end())
262  {
263  if (_traceLevels->election > 0)
264  {
265  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
266  out << "node " << _id << ": reaping slave " << *p;
267  }
268  _up.erase(q);
269  }
270  }
271 
272  // If we no longer have the majority of the nodes under our
273  // care then we need to stop our replica.
274  if (_up.size() < _nodes.size() / 2)
275  {
276  if (_traceLevels->election > 0)
277  {
278  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
279  out << "node " << _id << ": stopping replica";
280  }
281  // Clear _checkTask -- recovery() will reset the
282  // timer.
283  assert(_checkTask);
284  _checkTask = 0;
285  recovery();
286  return;
287  }
288  }
289  }
290 
291  // See if other groups exist for possible merge.
292  set<int> tmpset;
293  int max = -1;
294  for (map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
295  {
296  if (p->first == _id)
297  {
298  continue;
299  }
300  try
301  {
302  if (p->second->areYouCoordinator())
303  {
304  if (p->first > max)
305  {
306  max = p->first;
307  }
308  tmpset.insert(p->first);
309  }
310  }
311  catch (const Ice::Exception& ex)
312  {
313  if (_traceLevels->election > 0)
314  {
315  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
316  out << "node " << _id << ": call on node " << p->first << " failed: " << ex;
317  }
318  }
319  }
320 
321  Lock sync(*this);
322 
323  // If the node state has changed while the mutex has been released
324  // then bail. We don't schedule a re-check since we're either
325  // destroyed in which case we're going to terminate or the end of
326  // the election/reorg will re-schedule the check.
327  if (_destroy || _state == NodeStateElection || _state == NodeStateReorganization ||
328  _coord != _id)
329  {
330  _checkTask = 0;
331  return;
332  }
333 
334  // If we didn't find any coordinators then we're done. Reschedule
335  // the next check and terminate.
336  if (tmpset.empty())
337  {
338  assert(_checkTask);
339  _timer->schedule(_checkTask, _electionTimeout);
340  return;
341  }
342 
343  // _checkTask == 0 means that the check isn't scheduled.
344  _checkTask = 0;
345 
346  if (_traceLevels->election > 0)
347  {
348  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
349  out << "node " << _id << ": highest priority node count: " << max;
350  }
351 
352  IceUtil::Time delay = IceUtil::Time::seconds(0);
353  if (_id < max)
354  {
355  // Reschedule timer proportial to p-i.
356  delay = _mergeTimeout + _mergeTimeout * (max - _id);
357  if (_traceLevels->election > 0)
358  {
359  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
360  out << "node " << _id << ": scheduling merge in " << delay.toDuration() << " seconds";
361  }
362  }
363 
364  assert(!_mergeTask);
365  _mergeTask = new MergeTask(this, tmpset);
366  _timer->schedule(_mergeTask, delay);
367 }
368 
369 // Called if the node has not heard from the coordinator in some time.
370 void
372 {
373  int myCoord;
374  string myGroup;
375  {
376  Lock sync(*this);
377  // If we're destroyed or we are our own coordinator then we're
378  // done.
379  if (_destroy || _coord == _id)
380  {
381  return;
382  }
383  myCoord = _coord;
384  myGroup = _group;
385  }
386 
387  bool failed = false;
388  try
389  {
390  map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
391  assert(p != _nodes.end());
392  if (!p->second->areYouThere(myGroup, _id))
393  {
394  if (_traceLevels->election > 0)
395  {
396  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
397  out << "node " << _id << ": lost connection to coordinator " << myCoord
398  << ": areYouThere returned false";
399  }
400  failed = true;
401  }
402  }
403  catch (const Ice::Exception& ex)
404  {
405  if (_traceLevels->election > 0)
406  {
407  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
408  out << "node " << _id << ": lost connection to coordinator " << myCoord << ": " << ex;
409  }
410  failed = true;
411  }
412  if (failed)
413  {
414  recovery();
415  }
416 }
417 
418 void
419 NodeI::merge(const set<int>& coordinatorSet)
420 {
421  set<int> invited;
422  string gp;
423  {
424  Lock sync(*this);
425  _mergeTask = 0;
426 
427  // If the node is currently in an election, or reorganizing
428  // then we're done.
429  if (_state == NodeStateElection || _state == NodeStateReorganization)
430  {
431  return;
432  }
433 
434  // This state change prevents this node from accepting
435  // invitations while the merge is executing.
436  setState(NodeStateElection);
437 
438  // No more replica changes are permitted.
439  while (!_destroy && _updateCounter > 0)
440  {
441  wait();
442  }
443  if (_destroy)
444  {
445  return;
446  }
447 
448  ostringstream os;
449  os << _id << ":" << Ice::generateUUID();
450  _group = os.str();
451  gp = _group;
452 
453  _invitesAccepted.clear();
454  _invitesIssued.clear();
455 
456  // Construct a set of node ids to invite. This is the union of
457  // _up and set of coordinators gathered in the check stage.
458  invited = coordinatorSet;
459  for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
460  {
461  invited.insert(p->id);
462  }
463 
464  _coord = _id;
465  _up.clear();
466 
467  if (_traceLevels->election > 0)
468  {
469  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
470  out << "node " << _id << ": inviting " << toString(invited) << " to group " << _group;
471  }
472  }
473 
474  set<int>::iterator p = invited.begin();
475  while (p != invited.end())
476  {
477  try
478  {
479  if (_traceLevels->election > 0)
480  {
481  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
482  out << "node " << _id << ": inviting node " << *p << " to group " << gp;
483  }
484  map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
485  assert(node != _nodesOneway.end());
486  node->second->invitation(_id, gp);
487  ++p;
488  }
489  catch (const Ice::Exception&)
490  {
491  invited.erase(p++);
492  }
493  }
494 
495  // Now we wait for responses to our invitation.
496  {
497  Lock sync(*this);
498  if (_destroy)
499  {
500  return;
501  }
502 
503  // Add each of the invited nodes in the invites issed set.
504  _invitesIssued.insert(invited.begin(), invited.end());
505 
506  if (_traceLevels->election > 0)
507  {
508  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
509  out << "node " << _id << ": invites pending: " << toString(_invitesIssued);
510  }
511 
512  // Schedule the mergeContinueTask.
513  assert(_mergeContinueTask == 0);
514  _mergeContinueTask = new MergeContinueTask(this);
515 
516  // At this point we may have already accepted all of the
517  // invitations, if so then we want to schedule the
518  // mergeContinue immediately.
519  IceUtil::Time timeout = _mergeTimeout;
520  if (_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted)
521  {
522  timeout = IceUtil::Time::seconds(0);
523  }
524  _timer->schedule(_mergeContinueTask, timeout);
525  }
526 }
527 
528 void
530 {
531  string gp;
532  set<GroupNodeInfo> tmpSet;
533  {
534  Lock sync(*this);
535  if (_destroy)
536  {
537  return;
538  }
539 
540  // Copy variables for thread safety.
541  gp = _group;
542  tmpSet = _up;
543 
544  assert(_mergeContinueTask);
545  _mergeContinueTask = 0;
546 
547  // The node is now reorganizing.
548  assert(_state == NodeStateElection);
549  setState(NodeStateReorganization);
550 
551  if (_traceLevels->election > 0)
552  {
553  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
554  out << "node " << _id << ": coordinator for " << (tmpSet.size() + 1)
555  << " nodes (including myself)";
556  }
557 
558  // Now we need to decide whether we can start serving content. If
559  // we're on initial startup then we need all nodes to participate
560  // in the election. If we're running a subsequent election then we
561  // need a majority of the nodes to be active in order to start
562  // running.
563  unsigned int ingroup = static_cast<unsigned int>(tmpSet.size());
564  if ((_max != _nodes.size() && ingroup != _nodes.size() - 1) || ingroup < _nodes.size() / 2)
565  {
566  if (_traceLevels->election > 0)
567  {
568  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
569  out << "node " << _id << ": not enough nodes " << (ingroup + 1) << "/"
570  << _nodes.size() << " for replication to commence";
571  if (_max != _nodes.size())
572  {
573  out << " (require full participation for startup)";
574  }
575  }
576  recovery();
577  return;
578  }
579  }
580 
581  // Find out who has the highest available set of database
582  // updates.
583  int maxid = -1;
584  LogUpdate maxllu = {-1, 0};
585  for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
586  {
587  if (_traceLevels->election > 0)
588  {
589  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
590  out << "node id=" << p->id << " llu=" << p->llu.generation << "/" << p->llu.iteration;
591  }
592  if (p->llu.generation > maxllu.generation ||
593  (p->llu.generation == maxllu.generation && p->llu.iteration > maxllu.iteration))
594  {
595  maxid = p->id;
596  maxllu = p->llu;
597  }
598  }
599 
600  LogUpdate myLlu = _replica->getLastLogUpdate();
601  if (_traceLevels->election > 0)
602  {
603  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
604  out << "node id=" << _id << " llu=" << myLlu.generation << "/" << myLlu.iteration;
605  }
606 
607  // If its not us then we have to get the latest database data from
608  // the replica with the latest set.
609  //if(maxllu > _replica->getLastLogUpdate())
610  if (maxllu > myLlu)
611  {
612  if (_traceLevels->election > 0)
613  {
614  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
615  out << "node " << _id << ": syncing database state with node " << maxid;
616  }
617  try
618  {
619  map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
620  assert(node != _nodes.end());
621  _replica->sync(node->second->sync());
622  }
623  catch (const Ice::Exception& ex)
624  {
625  if (_traceLevels->election > 0)
626  {
627  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
628  out << "node " << _id << ": syncing database state with node " << maxid
629  << " failed: " << ex;
630  }
631  recovery();
632  return;
633  }
634  }
635  else
636  {
637  if (_traceLevels->election > 0)
638  {
639  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
640  out << "node " << _id << ": I have the latest database state.";
641  }
642  }
643 
644  // At this point we've ensured that we have the latest database
645  // state, as such we can set our _max flag.
646  unsigned int max = static_cast<unsigned int>(tmpSet.size()) + 1;
647  {
648  Lock sync(*this);
649  if (max > _max)
650  {
651  _max = max;
652  }
653  max = _max;
654  }
655 
656  // Prepare the LogUpdate for this generation.
657  maxllu.generation++;
658  maxllu.iteration = 0;
659 
660  try
661  {
662  // Tell the replica that it is now the master with the given
663  // set of slaves and llu generation.
664  _replica->initMaster(tmpSet, maxllu);
665  }
666  catch (const Ice::Exception& ex)
667  {
668  if (_traceLevels->election > 0)
669  {
670  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
671  out << "node " << _id << ": initMaster failed: " << ex;
672  }
673  recovery();
674  return;
675  }
676 
677  // Tell each node to go.
678  for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
679  {
680  try
681  {
682  map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
683  assert(node != _nodes.end());
684  node->second->ready(_id, gp, _replicaProxy, max, maxllu.generation);
685  }
686  catch (const Ice::Exception& ex)
687  {
688  if (_traceLevels->election > 0)
689  {
690  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
691  out << "node " << _id << ": error calling ready on " << p->id << " ex: " << ex;
692  }
693  recovery();
694  return;
695  }
696  }
697 
698  {
699  Lock sync(*this);
700  if (_destroy)
701  {
702  return;
703  }
704  if (_traceLevels->election > 0)
705  {
706  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
707  out << "node " << _id << ": reporting for duty in group " << _group
708  << " as coordinator. ";
709  out << "replication commencing with " << _up.size() + 1 << "/" << _nodes.size()
710  << " nodes with llu generation: " << maxllu.generation;
711  }
712  setState(NodeStateNormal);
713  _coordinatorProxy = 0;
714 
715  _generation = maxllu.generation;
716 
717  assert(!_checkTask);
718  _checkTask = new CheckTask(this);
719  _timer->schedule(_checkTask, _electionTimeout);
720  }
721 }
722 
723 void
724 NodeI::invitation(int j, const string& gn, const Ice::Current&)
725 {
726  if (_traceLevels->election > 0)
727  {
728  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
729  out << "node " << _id << ": invitation from " << j << " to group " << gn;
730  }
731 
732  // Verify that j exists in our node set.
733  if (_nodes.find(j) == _nodes.end())
734  {
735  Ice::Warning warn(_traceLevels->logger);
736  warn << _traceLevels->electionCat << ": ignoring invitation from unknown node " << j;
737  return;
738  }
739 
740  int tmpCoord = -1;
741  int max = -1;
742  set<GroupNodeInfo> tmpSet;
743  {
744  Lock sync(*this);
745  if (_destroy)
746  {
747  return;
748  }
749  // If we're in the election or reorg state a merge has already
750  // started, so ignore the invitation.
751  if (_state == NodeStateElection || _state == NodeStateReorganization)
752  {
753  if (_traceLevels->election > 0)
754  {
755  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
756  out << "node " << _id << ": invitation ignored";
757  }
758  return;
759  }
760 
761  //
762  // Upon receipt of an invitation we cancel any pending merge
763  // task.
764  //
765  if (_mergeTask)
766  {
767  // If the timer doesn't cancel it means that the timer has
768  // fired and the merge is currently in-progress in which
769  // case we should reject the invitation.
770  if (!_timer->cancel(_mergeTask))
771  {
772  // The merge task is cleared in the merge. This
773  // ensures two invitations cannot cause a race with
774  // the merge.
775  //_mergeTask = 0;
776  return;
777  }
778  _mergeTask = 0;
779  }
780 
781  // We're now joining with another group. If we are active we
782  // must stop serving as a master or slave.
783  setState(NodeStateElection);
784  while (!_destroy && _updateCounter > 0)
785  {
786  wait();
787  }
788  if (_destroy)
789  {
790  return;
791  }
792 
793  tmpCoord = _coord;
794  tmpSet = _up;
795 
796  _coord = j;
797  _group = gn;
798  max = _max;
799  }
800 
801  Ice::IntSeq forwardedInvites;
802  if (tmpCoord == _id) // Forward invitation to my old members.
803  {
804  for (set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
805  {
806  try
807  {
808  map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
809  assert(node != _nodesOneway.end());
810  node->second->invitation(j, gn);
811  forwardedInvites.push_back(p->id);
812  }
813  catch (const Ice::Exception&)
814  {
815  }
816  }
817  }
818 
819  // Set the state and timer before calling accept. This ensures
820  // that if ready is called directly after accept is called then
821  // everything is fine. Setting the state *after* calling accept
822  // can cause a race.
823  {
824  Lock sync(*this);
825  if (_destroy)
826  {
827  return;
828  }
829  assert(_state == NodeStateElection);
830  setState(NodeStateReorganization);
831  if (!_timeoutTask)
832  {
833  _timeoutTask = new TimeoutTask(this);
834  _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
835  }
836  }
837 
838  try
839  {
840  map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
841  assert(node != _nodesOneway.end());
842  node->second->accept(
843  _id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(), max);
844  }
845  catch (const Ice::Exception&)
846  {
847  recovery();
848  return;
849  }
850 }
851 
852 void
854  const string& gn,
855  const Ice::ObjectPrx& coordinator,
856  int max,
857  Ice::Long generation,
858  const Ice::Current&)
859 {
860  Lock sync(*this);
861  if (!_destroy && _state == NodeStateReorganization && _group == gn)
862  {
863  // The coordinator must be j (this was set in the invitation).
864  if (_coord != j)
865  {
866  Ice::Warning warn(_traceLevels->logger);
867  warn << _traceLevels->electionCat << ": ignoring ready call from replica node " << j
868  << " (real coordinator is " << _coord << ")";
869  return;
870  }
871 
872  // Here we've already validated j in the invite call
873  // (otherwise _group != gn).
874  if (_traceLevels->election > 0)
875  {
876  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
877  out << "node " << _id << ": reporting for duty in group " << gn << " with coordinator "
878  << j;
879  }
880 
881  if (static_cast<unsigned int>(max) > _max)
882  {
883  _max = max;
884  }
885  _generation = generation;
886 
887  // Activate the replica here since the replica is now ready
888  // for duty.
889  setState(NodeStateNormal);
890  _coordinatorProxy = coordinator;
891 
892  if (!_checkTask)
893  {
894  _checkTask = new CheckTask(this);
895  _timer->schedule(_checkTask, _electionTimeout);
896  }
897  }
898 }
899 
900 void
902  const string& gn,
903  const Ice::IntSeq& forwardedInvites,
904  const Ice::ObjectPrx& observer,
905  const LogUpdate& llu,
906  int max,
907  const Ice::Current&)
908 {
909  // Verify that j exists in our node set.
910  if (_nodes.find(j) == _nodes.end())
911  {
912  Ice::Warning warn(_traceLevels->logger);
913  warn << _traceLevels->electionCat << ": ignoring accept from unknown node " << j;
914  return;
915  }
916 
917  Lock sync(*this);
918  if (!_destroy && _state == NodeStateElection && _group == gn && _coord == _id)
919  {
920  _up.insert(GroupNodeInfo(j, llu, observer));
921 
922  if (static_cast<unsigned int>(max) > _max)
923  {
924  _max = max;
925  }
926 
927  if (_traceLevels->election > 0)
928  {
929  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
930  out << "node " << _id << ": accept " << j << " forward invites (";
931  for (Ice::IntSeq::const_iterator p = forwardedInvites.begin();
932  p != forwardedInvites.end();
933  ++p)
934  {
935  if (p != forwardedInvites.begin())
936  {
937  out << ",";
938  }
939  out << *p;
940  }
941  out << ") with llu " << llu.generation << "/" << llu.iteration << " into group " << gn
942  << " group size " << (_up.size() + 1);
943  }
944 
945  // Add each of the forwarded invites to the list of issued
946  // invitations. This doesn't use set_union since
947  // forwardedInvites may not be sorted.
948  _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
949  // We've accepted the invitation from node j.
950  _invitesAccepted.insert(j);
951 
952  if (_traceLevels->election > 0)
953  {
954  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
955  out << "node " << _id << ": invites pending: " << toString(_invitesIssued)
956  << " invites accepted: " << toString(_invitesAccepted);
957  }
958 
959  // If invitations have been accepted from all nodes and the
960  // merge task has already been scheduled then reschedule the
961  // merge continue immediately. Otherwise, we let the existing
962  // merge() schedule continue.
963  if ((_up.size() == _nodes.size() - 1 || _invitesIssued == _invitesAccepted) &&
964  _mergeContinueTask && _timer->cancel(_mergeContinueTask))
965  {
966  _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
967  }
968  }
969 }
970 
971 bool
972 NodeI::areYouCoordinator(const Ice::Current&) const
973 {
974  Lock sync(*this);
975  return _state != NodeStateElection && _state != NodeStateReorganization && _coord == _id;
976 }
977 
978 bool
979 NodeI::areYouThere(const string& gn, int j, const Ice::Current&) const
980 {
981  Lock sync(*this);
982  return _group == gn && _coord == _id && _up.find(GroupNodeInfo(j)) != _up.end();
983 }
984 
985 Ice::ObjectPrx
986 NodeI::sync(const Ice::Current&) const
987 {
988  return _replica->getSync();
989 }
990 
992 NodeI::nodes(const Ice::Current&) const
993 {
994  NodeInfoSeq seq;
995  for (map<int, NodePrx>::const_iterator q = _nodes.begin(); q != _nodes.end(); ++q)
996  {
997  NodeInfo ni;
998  ni.id = q->first;
999  ni.n = q->second;
1000  seq.push_back(ni);
1001  }
1002 
1003  return seq;
1004 }
1005 
1006 QueryInfo
1007 NodeI::query(const Ice::Current&) const
1008 {
1009  Lock sync(*this);
1010  QueryInfo info;
1011  info.id = _id;
1012  info.coord = _coord;
1013  info.group = _group;
1014  info.replica = _replicaProxy;
1015  info.state = _state;
1016  info.max = _max;
1017 
1018  for (set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
1019  {
1020  GroupInfo gi;
1021  gi.id = p->id;
1022  gi.llu = p->llu;
1023  info.up.push_back(gi);
1024  }
1025 
1026  return info;
1027 }
1028 
1029 void
1031 {
1032  Lock sync(*this);
1033 
1034  // Ignore the recovery if the node has already advanced a
1035  // generation.
1036  if (generation != -1 && generation != _generation)
1037  {
1038  return;
1039  }
1040 
1041  setState(NodeStateInactive);
1042  while (!_destroy && _updateCounter > 0)
1043  {
1044  wait();
1045  }
1046  if (_destroy)
1047  {
1048  return;
1049  }
1050 
1051  ostringstream os;
1052  os << _id << ":" << Ice::generateUUID();
1053  _group = os.str();
1054 
1055  _generation = -1;
1056  _coord = _id;
1057  _up.clear();
1058 
1059  if (_traceLevels->election > 0)
1060  {
1061  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1062  out << "node " << _id << ": creating new self-coordinated group " << _group;
1063  }
1064 
1065  // Reset the timer states.
1066  if (_mergeTask)
1067  {
1068  _timer->cancel(_mergeTask);
1069  _mergeTask = 0;
1070  }
1071  if (_timeoutTask)
1072  {
1073  _timer->cancel(_timeoutTask);
1074  _timeoutTask = 0;
1075  }
1076  if (!_checkTask)
1077  {
1078  _checkTask = new CheckTask(this);
1079  _timer->schedule(_checkTask, _electionTimeout);
1080  }
1081 }
1082 
1083 void
1085 {
1086  Lock sync(*this);
1087  assert(!_destroy);
1088 
1089  while (_updateCounter > 0)
1090  {
1091  wait();
1092  }
1093  _destroy = true;
1094  notifyAll();
1095 
1096  // Cancel the timers.
1097  if (_checkTask)
1098  {
1099  _timer->cancel(_checkTask);
1100  _checkTask = 0;
1101  }
1102 
1103  if (_timeoutTask)
1104  {
1105  _timer->cancel(_timeoutTask);
1106  _timeoutTask = 0;
1107  }
1108 
1109  if (_mergeTask)
1110  {
1111  _timer->cancel(_mergeTask);
1112  _mergeTask = 0;
1113  }
1114 }
1115 
1116 // A node should only receive an observer init call if the node is
1117 // reorganizing and its not the coordinator.
1118 void
1120 {
1121  Lock sync(*this);
1122  if (_state != NodeStateReorganization)
1123  {
1125  "init cannot block when state != NodeStateReorganization");
1126  }
1127  if (_coord == _id)
1128  {
1129  throw ObserverInconsistencyException("init called on coordinator");
1130  }
1131 }
1132 
1133 // Notify the node that we're about to start an update.
1134 Ice::ObjectPrx
1135 NodeI::startUpdate(Ice::Long& generation, const char* file, int line)
1136 {
1137  bool majority = _observers->check();
1138 
1139  Lock sync(*this);
1140 
1141  // If we've actively replicating & lost the majority of our replicas then recover.
1142  if (!_coordinatorProxy && !_destroy && _state == NodeStateNormal && !majority)
1143  {
1144  recovery();
1145  }
1146 
1147  while (!_destroy && _state != NodeStateNormal)
1148  {
1149  wait();
1150  }
1151  if (_destroy)
1152  {
1153  throw Ice::UnknownException(file, line);
1154  }
1155  if (!_coordinatorProxy)
1156  {
1157  ++_updateCounter;
1158  }
1159  generation = _generation;
1160  return _coordinatorProxy;
1161 }
1162 
1163 bool
1164 NodeI::updateMaster(const char* /*file*/, int /*line*/)
1165 {
1166  bool majority = _observers->check();
1167 
1168  Lock sync(*this);
1169 
1170  // If the node is destroyed, or is not a coordinator then we're
1171  // done.
1172  if (_destroy || _coordinatorProxy)
1173  {
1174  return false;
1175  }
1176 
1177  // If we've lost the majority of our replicas then recover.
1178  if (_state == NodeStateNormal && !majority)
1179  {
1180  recovery();
1181  }
1182 
1183  // If we're not replicating then we're done.
1184  if (_state != NodeStateNormal)
1185  {
1186  return false;
1187  }
1188 
1189  // Otherwise adjust the update counter, and return success.
1190  ++_updateCounter;
1191  return true;
1192 }
1193 
1194 Ice::ObjectPrx
1195 NodeI::startCachedRead(Ice::Long& generation, const char* file, int line)
1196 {
1197  Lock sync(*this);
1198  while (!_destroy && _state != NodeStateNormal)
1199  {
1200  wait();
1201  }
1202  if (_destroy)
1203  {
1204  throw Ice::UnknownException(file, line);
1205  }
1206  generation = _generation;
1207  ++_updateCounter;
1208  return _coordinatorProxy;
1209 }
1210 
1211 void
1212 NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line)
1213 {
1214  Lock sync(*this);
1215  if (_destroy)
1216  {
1217  throw Ice::UnknownException(file, line);
1218  }
1219  if (_state != NodeStateNormal)
1220  {
1221  throw ObserverInconsistencyException("update called on inactive node");
1222  }
1223  if (!_coordinatorProxy)
1224  {
1225  throw ObserverInconsistencyException("update called on the master");
1226  }
1227  if (generation != _generation)
1228  {
1229  throw ObserverInconsistencyException("invalid generation");
1230  }
1231  ++_updateCounter;
1232 }
1233 
1234 void
1236 {
1237  Lock sync(*this);
1238  assert(!_destroy);
1239  --_updateCounter;
1240  assert(_updateCounter >= 0);
1241  if (_updateCounter == 0)
1242  {
1243  notifyAll();
1244  }
1245 }
1246 
1247 namespace
1248 {
1249  static string
1250  stateToString(NodeState s)
1251  {
1252  switch (s)
1253  {
1254  case NodeStateInactive:
1255  return "inactive";
1256  case NodeStateElection:
1257  return "election";
1259  return "reorganization";
1260  case NodeStateNormal:
1261  return "normal";
1262  }
1263  return "unknown";
1264  }
1265 } // namespace
1266 
1267 void
1268 NodeI::setState(NodeState s)
1269 {
1270  if (s != _state)
1271  {
1272  if (_traceLevels->election > 0)
1273  {
1274  Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1275  out << "node " << _id << ": transition from " << stateToString(_state) << " to "
1276  << stateToString(s);
1277  }
1278  _state = s;
1279  if (_state == NodeStateNormal)
1280  {
1281  notifyAll();
1282  }
1283  }
1284 }
IceStormElection::GroupNodeInfo
Definition: Replica.h:21
IceStorm
Definition: DBTypes.ice:22
IceStormElection::GroupNodeInfo::operator<
bool operator<(const GroupNodeInfo &rhs) const
Definition: NodeI.cpp:106
IceStormElection::NodeI::merge
void merge(const std::set< int > &)
Definition: NodeI.cpp:419
IceStormElection::QueryInfo::coord
int coord
The node's coordinator.
Definition: Election.ice:183
IceStormElection::Node::nodes
idempotent NodeInfoSeq nodes()
Get the replication group information.
IceStormElection::GroupInfo::llu
LogUpdate llu
The last known log update for this node.
Definition: Election.ice:171
IceStormElection::QueryInfo
Definition: Election.ice:177
IceStormElection::QueryInfo::up
GroupInfoSeq up
The sequence of nodes in this node's group.
Definition: Election.ice:195
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:102
IceStormElection::Node::areYouCoordinator
idempotent bool areYouCoordinator()
Determine if this node is a coordinator.
IceStormElection::NodeStateElection
@ NodeStateElection
The node is electing a leader.
Definition: Election.ice:142
IceStormElection::NodeI::recovery
void recovery(Ice::Long=-1)
Definition: NodeI.cpp:1030
IceStormElection::NodeI::destroy
void destroy()
Definition: NodeI.cpp:1084
IceStormElection::NodeInfo::n
Node * n
The node proxy.
Definition: Election.ice:158
IceStormElection::QueryInfo::id
int id
The node id.
Definition: Election.ice:180
IceStormElection::NodeI::updateMaster
bool updateMaster(const char *, int)
Definition: NodeI.cpp:1164
IceStormElection::ObserverInconsistencyException
Thrown if an observer detects an inconsistency.
Definition: Election.ice:36
IceInternal::Handle<::Ice::Properties >
IceStormElection::GroupNodeInfo::operator==
bool operator==(const GroupNodeInfo &rhs) const
Definition: NodeI.cpp:112
IceStormElection::NodeI::start
void start()
Definition: NodeI.cpp:204
IceStormElection::Node::sync
idempotent Object * sync()
Get the sync object for the replica hosted by this node.
IceStormElection::NodeI::startCachedRead
Ice::ObjectPrx startCachedRead(Ice::Long &, const char *, int)
Definition: NodeI.cpp:1195
IceStormElection::NodeStateInactive
@ NodeStateInactive
The node is inactive and awaiting an election.
Definition: Election.ice:140
IceStormElection::LogUpdate::generation
::Ice::Long generation
Definition: LLURecord.h:104
IceStormElection::QueryInfo::max
int max
The highest priority node that this node has seen.
Definition: Election.ice:198
IceStormElection::NodeState
NodeState
The node state.
Definition: Election.ice:137
IceStormElection
Definition: DBTypes.ice:17
IceStormElection::NodeI::timeout
void timeout()
Definition: NodeI.cpp:371
IceStormElection::QueryInfo::group
string group
The node's group name.
Definition: Election.ice:186
IceStormElection::NodeI::check
void check()
Definition: NodeI.cpp:235
IceStormElection::LogUpdate::iteration
::Ice::Long iteration
Definition: LLURecord.h:105
IceStormElection::NodeI::ready
virtual void ready(int, const std::string &, const Ice::ObjectPrx &, int, Ice::Long, const Ice::Current &)
Definition: NodeI.cpp:853
IceStormElection::GroupInfo
The group info.
Definition: Election.ice:166
IceStormElection::NodeInfo
All nodes in the replication group.
Definition: Election.ice:153
IceStormElection::NodeStateReorganization
@ NodeStateReorganization
The replica group is reorganizing.
Definition: Election.ice:144
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
IceStormElection::GroupNodeInfo::id
const int id
Definition: Replica.h:33
max
T max(T t1, T t2)
Definition: gdiam.h:51
IceStormElection::NodeStateNormal
@ NodeStateNormal
The replica group is active & replicating.
Definition: Election.ice:146
q
#define q
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStormElection::NodeI::finishUpdate
void finishUpdate()
Definition: NodeI.cpp:1235
IceStormElection::GroupNodeInfo::observer
const Ice::ObjectPrx observer
Definition: Replica.h:35
IceStormElection::NodeInfo::id
int id
The identity of the node.
Definition: Election.ice:156
IceStormElection::NodeI::accept
virtual void accept(int, const std::string &, const Ice::IntSeq &, const Ice::ObjectPrx &, const LogUpdate &, int, const Ice::Current &)
Definition: NodeI.cpp:901
IceStormElection::NodeI::mergeContinue
void mergeContinue()
Definition: NodeI.cpp:529
TraceLevels.h
IceStormElection::GroupNodeInfo::llu
const LogUpdate llu
Definition: Replica.h:34
std
Definition: Application.h:66
IceStormElection::QueryInfo::state
NodeState state
The node state.
Definition: Election.ice:192
IceUtil::Handle< NodeI >
IceStormElection::NodeI::invitation
virtual void invitation(int, const std::string &, const Ice::Current &)
Definition: NodeI.cpp:724
IceStormElection::NodeInfoSeq
::std::vector<::IceStormElection::NodeInfo > NodeInfoSeq
A sequence of node info.
Definition: Election.h:1264
IceStormElection::NodeI::checkObserverInit
void checkObserverInit(Ice::Long)
Definition: NodeI.cpp:1119
armarx::viz::toString
const char * toString(InteractionFeedbackType type)
Definition: Interaction.h:28
IceStormElection::Node::query
idempotent QueryInfo query()
Get the query information for the given node.
Observers.h
IceStormElection::GroupInfo::id
int id
The identity of the node.
Definition: Election.ice:169
IceStormElection::GroupNodeInfo::GroupNodeInfo
GroupNodeInfo(int i)
Definition: NodeI.cpp:96
IceStormElection::QueryInfo::replica
Object * replica
The replica the node is managing.
Definition: Election.ice:189
NodeI.h
IceStormElection::NodeI::startUpdate
Ice::ObjectPrx startUpdate(Ice::Long &, const char *, int)
Definition: NodeI.cpp:1135
IceStormElection::NodeI::areYouThere
virtual bool areYouThere(const std::string &, int, const Ice::Current &) const
Definition: NodeI.cpp:979
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
IceStormElection::NodeI::NodeI
NodeI(const IceStorm::InstancePtr &, const ReplicaPtr &, const Ice::ObjectPrx &, int, const std::map< int, NodePrx > &)
Definition: NodeI.cpp:169
IceStormElection::NodeI::startObserverUpdate
void startObserverUpdate(Ice::Long, const char *, int)
Definition: NodeI.cpp:1212