Observers.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <IceStorm/Instance.h>
11 #include <IceStorm/Observers.h>
12 #include <IceStorm/TraceLevels.h>
13 
14 using namespace std;
15 using namespace IceStorm;
16 using namespace IceStormElection;
17 
18 Observers::Observers(const InstancePtr& instance) :
19  _traceLevels(instance->traceLevels()), _majority(0)
20 {
21 }
22 
23 void
24 Observers::setMajority(unsigned int majority)
25 {
26  _majority = majority;
27 }
28 
29 bool
31 {
32  Lock sync(*this);
33  if (_observers.size() >= _majority)
34  {
35  vector<ObserverInfo>::iterator p = _observers.begin();
36  while (p != _observers.end())
37  {
38  try
39  {
40  p->observer->ice_ping();
41  }
42  catch (const Ice::Exception& ex)
43  {
44  if (_traceLevels->replication > 0)
45  {
46  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
47  out << "ice_ping failed: " << ex;
48  }
49  int id = p->id;
50  p = _observers.erase(p);
51 
52  // COMPILERFIX: Just using following causes double unlock with C++Builder 2007
53  //IceUtil::Mutex::Lock sync(_reapedMutex);
54  _reapedMutex.lock();
55  _reaped.push_back(id);
56  _reapedMutex.unlock();
57  continue;
58  }
59  ++p;
60  }
61  }
62  return _majority == 0 || _observers.size() >= _majority;
63 }
64 
65 void
67 {
68  Lock sync(*this);
69  _observers.clear();
70 }
71 
72 void
73 Observers::getReapedSlaves(std::vector<int>& d)
74 {
75  IceUtil::Mutex::Lock sync(_reapedMutex);
76  d.swap(_reaped);
77 }
78 
79 void
80 Observers::init(const set<GroupNodeInfo>& slaves,
81  const LogUpdate& llu,
82  const TopicContentSeq& content)
83 {
84  {
85  IceUtil::Mutex::Lock sync(_reapedMutex);
86  _reaped.clear();
87  }
88 
89  Lock sync(*this);
90  _observers.clear();
91 
92  vector<ObserverInfo> observers;
93 
94  for (set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
95  {
96  try
97  {
98  assert(p->observer);
99 
100  ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer);
101 
102  Ice::AsyncResultPtr result = observer->begin_init(llu, content);
103  observers.push_back(ObserverInfo(p->id, observer, result));
104  }
105  catch (const Ice::Exception& ex)
106  {
107  if (_traceLevels->replication > 0)
108  {
109  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
110  out << "error calling init on " << p->id << ", exception: " << ex;
111  }
112  throw;
113  }
114  }
115 
116  for (vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p)
117  {
118  try
119  {
120  p->observer->end_init(p->result);
121  p->result = 0;
122  }
123  catch (const Ice::Exception& ex)
124  {
125  if (_traceLevels->replication > 0)
126  {
127  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
128  out << "init on " << p->id << " failed with exception " << ex;
129  }
130  throw;
131  }
132  }
133 
134  _observers.swap(observers);
135 }
136 
137 void
138 Observers::createTopic(const LogUpdate& llu, const string& name)
139 {
140  Lock sync(*this);
141  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
142  {
143  p->result = p->observer->begin_createTopic(llu, name);
144  }
145  wait("createTopic");
146 }
147 
148 void
149 Observers::destroyTopic(const LogUpdate& llu, const string& id)
150 {
151  Lock sync(*this);
152  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
153  {
154  p->result = p->observer->begin_destroyTopic(llu, id);
155  }
156  wait("destroyTopic");
157 }
158 
159 void
160 Observers::addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec)
161 {
162  Lock sync(*this);
163  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
164  {
165  p->result = p->observer->begin_addSubscriber(llu, name, rec);
166  }
167  wait("addSubscriber");
168 }
169 
170 void
171 Observers::removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id)
172 {
173  Lock sync(*this);
174  for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
175  {
176  p->result = p->observer->begin_removeSubscriber(llu, name, id);
177  }
178  wait("removeSubscriber");
179 }
180 
181 void
182 Observers::wait(const string& op)
183 {
184  vector<ObserverInfo>::iterator p = _observers.begin();
185  while (p != _observers.end())
186  {
187  try
188  {
189  p->result->waitForCompleted();
190  p->result->throwLocalException();
191  }
192  catch (const Ice::Exception& ex)
193  {
194  if (_traceLevels->replication > 0)
195  {
196  Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
197  out << op << ": " << ex;
198  }
199  int id = p->id;
200  p = _observers.erase(p);
201 
202  IceUtil::Mutex::Lock sync(_reapedMutex);
203  _reaped.push_back(id);
204  continue;
205  }
206  ++p;
207  }
208  // If we now no longer have the majority of observers we raise.
209  if (_observers.size() < _majority)
210  {
211  // TODO: Trace here?
212  //Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
213  //out << op;
214  throw Ice::UnknownException(__FILE__, __LINE__);
215  }
216 }
IceStorm
Definition: DBTypes.ice:22
IceStormElection::Observers::removeSubscriber
void removeSubscriber(const LogUpdate &, const std::string &, const Ice::IdentitySeq &)
Definition: Observers.cpp:171
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:102
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:233
IceStormElection::Observers::getReapedSlaves
void getReapedSlaves(std::vector< int > &)
Definition: Observers.cpp:73
IceStormElection::TopicContentSeq
::std::vector<::IceStormElection::TopicContent > TopicContentSeq
A sequence of topic content.
Definition: Election.h:1225
IceStormElection
Definition: DBTypes.ice:17
IceStormElection::Observers::check
bool check()
Definition: Observers.cpp:30
IceStormElection::Observers::clear
void clear()
Definition: Observers.cpp:66
IceStormElection::Observers::destroyTopic
void destroyTopic(const LogUpdate &, const std::string &)
Definition: Observers.cpp:149
IceStormElection::Observers::addSubscriber
void addSubscriber(const LogUpdate &, const std::string &, const IceStorm::SubscriberRecord &)
Definition: Observers.cpp:160
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStormElection::Observers::setMajority
void setMajority(unsigned int)
Definition: Observers.cpp:24
TraceLevels.h
std
Definition: Application.h:66
IceUtil::Handle
Definition: forward_declarations.h:30
IceStormElection::Observers::init
void init(const std::set< IceStormElection::GroupNodeInfo > &, const LogUpdate &, const TopicContentSeq &)
Definition: Observers.cpp:80
IceInternal::ProxyHandle<::IceProxy::IceStormElection::ReplicaObserver >
Observers.h
Instance.h
IceStormElection::Observers::createTopic
void createTopic(const LogUpdate &, const std::string &)
Definition: Observers.cpp:138