Observers.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <IceStorm/Observers.h>
11 #include <IceStorm/Instance.h>
12 #include <IceStorm/TraceLevels.h>
13 
14 using namespace std;
15 using namespace IceStorm;
16 using namespace IceStormElection;
17 
18 Observers::Observers(const InstancePtr& instance) :
19  _traceLevels(instance->traceLevels()),
20  _majority(0)
21 {
22 }
23 
24 void
25 Observers::setMajority(unsigned int majority)
26 {
27  _majority = majority;
28 }
29 
30 bool
32 {
33  Lock sync(*this);
34  if (_observers.size() >= _majority)
35  {
36  vector<ObserverInfo>::iterator p = _observers.begin();
37  while (p != _observers.end())
38  {
39  try
40  {
41  p->observer->ice_ping();
42  }
43  catch (const Ice::Exception& ex)
44  {
45  if (_traceLevels->replication > 0)
46  {
47  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
48  out << "ice_ping failed: " << ex;
49  }
50  int id = p->id;
51  p = _observers.erase(p);
52 
53  // COMPILERFIX: Just using following causes double unlock with C++Builder 2007
54  //IceUtil::Mutex::Lock sync(_reapedMutex);
55  _reapedMutex.lock();
56  _reaped.push_back(id);
57  _reapedMutex.unlock();
58  continue;
59  }
60  ++p;
61  }
62  }
63  return _majority == 0 || _observers.size() >= _majority;
64 }
65 
66 void
68 {
69  Lock sync(*this);
70  _observers.clear();
71 }
72 
73 void
74 Observers::getReapedSlaves(std::vector<int>& d)
75 {
76  IceUtil::Mutex::Lock sync(_reapedMutex);
77  d.swap(_reaped);
78 }
79 
80 void
81 Observers::init(const set<GroupNodeInfo>& slaves, const LogUpdate& llu, const TopicContentSeq& content)
82 {
83  {
84  IceUtil::Mutex::Lock sync(_reapedMutex);
85  _reaped.clear();
86  }
87 
88  Lock sync(*this);
89  _observers.clear();
90 
91  vector<ObserverInfo> observers;
92 
93  for (set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
94  {
95  try
96  {
97  assert(p->observer);
98 
99  ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer);
100 
101  Ice::AsyncResultPtr result = observer->begin_init(llu, content);
102  observers.push_back(ObserverInfo(p->id, observer, result));
103  }
104  catch (const Ice::Exception& ex)
105  {
106  if (_traceLevels->replication > 0)
107  {
108  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
109  out << "error calling init on " << p->id << ", exception: " << ex;
110  }
111  throw;
112  }
113  }
114 
115  for (vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p)
116  {
117  try
118  {
119  p->observer->end_init(p->result);
120  p->result = 0;
121  }
122  catch (const Ice::Exception& ex)
123  {
124  if (_traceLevels->replication > 0)
125  {
126  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
127  out << "init on " << p->id << " failed with exception " << ex;
128  }
129  throw;
130  }
131  }
132 
133  _observers.swap(observers);
134 }
135 
136 void
137 Observers::createTopic(const LogUpdate& llu, const string& name)
138 {
139  Lock sync(*this);
140  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
141  {
142  p->result = p->observer->begin_createTopic(llu, name);
143  }
144  wait("createTopic");
145 }
146 
147 void
148 Observers::destroyTopic(const LogUpdate& llu, const string& id)
149 {
150  Lock sync(*this);
151  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
152  {
153  p->result = p->observer->begin_destroyTopic(llu, id);
154  }
155  wait("destroyTopic");
156 }
157 
158 void
159 Observers::addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec)
160 {
161  Lock sync(*this);
162  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
163  {
164  p->result = p->observer->begin_addSubscriber(llu, name, rec);
165  }
166  wait("addSubscriber");
167 }
168 
169 void
170 Observers::removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id)
171 {
172  Lock sync(*this);
173  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
174  {
175  p->result = p->observer->begin_removeSubscriber(llu, name, id);
176  }
177  wait("removeSubscriber");
178 }
179 
180 void
181 Observers::wait(const string& op)
182 {
183  vector<ObserverInfo>::iterator p = _observers.begin();
184  while (p != _observers.end())
185  {
186  try
187  {
188  p->result->waitForCompleted();
189  p->result->throwLocalException();
190  }
191  catch (const Ice::Exception& ex)
192  {
193  if (_traceLevels->replication > 0)
194  {
195  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
196  out << op << ": " << ex;
197  }
198  int id = p->id;
199  p = _observers.erase(p);
200 
201  IceUtil::Mutex::Lock sync(_reapedMutex);
202  _reaped.push_back(id);
203  continue;
204  }
205  ++p;
206  }
207  // If we now no longer have the majority of observers we raise.
208  if (_observers.size() < _majority)
209  {
210  // TODO: Trace here?
211  //Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
212  //out << op;
213  throw Ice::UnknownException(__FILE__, __LINE__);
214  }
215 }
IceStorm
Definition: DBTypes.ice:22
IceStormElection::Observers::removeSubscriber
void removeSubscriber(const LogUpdate &, const std::string &, const Ice::IdentitySeq &)
Definition: Observers.cpp:170
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:100
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:214
IceStormElection::Observers::getReapedSlaves
void getReapedSlaves(std::vector< int > &)
Definition: Observers.cpp:74
IceStormElection::TopicContentSeq
::std::vector< ::IceStormElection::TopicContent > TopicContentSeq
A sequence of topic content.
Definition: Election.h:819
IceStormElection
Definition: DBTypes.ice:17
IceStormElection::Observers::check
bool check()
Definition: Observers.cpp:31
IceStormElection::Observers::clear
void clear()
Definition: Observers.cpp:67
IceStormElection::Observers::destroyTopic
void destroyTopic(const LogUpdate &, const std::string &)
Definition: Observers.cpp:148
IceStormElection::Observers::addSubscriber
void addSubscriber(const LogUpdate &, const std::string &, const IceStorm::SubscriberRecord &)
Definition: Observers.cpp:159
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStormElection::Observers::setMajority
void setMajority(unsigned int)
Definition: Observers.cpp:25
TraceLevels.h
std
Definition: Application.h:66
IceUtil::Handle
Definition: forward_declarations.h:29
IceStormElection::Observers::init
void init(const std::set< IceStormElection::GroupNodeInfo > &, const LogUpdate &, const TopicContentSeq &)
Definition: Observers.cpp:81
IceInternal::ProxyHandle< ::IceProxy::IceStormElection::ReplicaObserver >
Observers.h
Instance.h
IceStormElection::Observers::createTopic
void createTopic(const LogUpdate &, const std::string &)
Definition: Observers.cpp:137