// Looks like one entry of a constructor's member-initializer list (the rest of
// the constructor is not visible here): _traceLevels is initialized from
// instance->traceLevels().
19 _traceLevels(instance->traceLevels()),
// NOTE(review): only selected lines of the enclosing function are shown (its
// header, braces and the `try` keyword are not visible), so the nesting
// described below is inferred from line order -- confirm against the full file.
//
// When at least _majority observers are registered, walk _observers and ping
// each one.
34 if (_observers.size() >= _majority)
36 vector<ObserverInfo>::iterator p = _observers.begin();
37 while (p != _observers.end())
41 p->observer->ice_ping();
// An Ice::Exception is traced only when the replication trace level is > 0.
43 catch (
const Ice::Exception& ex)
45 if (_traceLevels->replication > 0)
47 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
48 out <<
"ice_ping failed: " << ex;
// Drop the entry from _observers; vector::erase returns the iterator to the
// element that followed it, so iteration resumes from there.
51 p = _observers.erase(p);
// `id` is declared on a line that is not shown -- presumably the id of the
// entry just erased (TODO confirm). It is appended to _reaped.
56 _reaped.push_back(
id);
// NOTE(review): the lock() that pairs with this unlock() is not visible in this
// excerpt -- confirm it precedes the push_back above.
57 _reapedMutex.unlock();
// True when no majority is required (_majority == 0) or when enough observers
// are still registered.
63 return _majority == 0 || _observers.size() >= _majority;
// Takes _reapedMutex through a scoped IceUtil::Mutex::Lock object named `sync`.
// The code this lock protects is not visible in this excerpt.
76 IceUtil::Mutex::Lock sync(_reapedMutex);
// Takes _reapedMutex through a scoped IceUtil::Mutex::Lock object named `sync`.
// NOTE(review): this appears to belong to a different function than the lock at
// line 76, but the enclosing definitions are not shown -- confirm.
84 IceUtil::Mutex::Lock sync(_reapedMutex);
// NOTE(review): the enclosing function header, braces, `try` keywords and the
// declarations of `observer`, `llu`, `content` and `slaves` are not visible in
// this excerpt.
//
// Local list of observers being built; it replaces _observers via swap() at
// line 133.
91 vector<ObserverInfo> observers;
// First pass: for every entry in `slaves`, start init on its observer with
// begin_init and record (id, observer, returned AsyncResultPtr) in the local
// list.
93 for (set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
101 Ice::AsyncResultPtr result = observer->begin_init(llu, content);
102 observers.push_back(ObserverInfo(p->id, observer, result));
// An Ice::Exception here is traced only when the replication trace level is > 0.
104 catch (
const Ice::Exception& ex)
106 if (_traceLevels->replication > 0)
108 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
109 out <<
"error calling init on " << p->id <<
", exception: " << ex;
// Second pass: complete each pending call by passing the stored result to
// end_init.
115 for (vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p)
119 p->observer->end_init(p->result);
// Failures are traced under the same trace-level condition. Any further
// handling (e.g. rethrowing) is on lines not shown -- TODO confirm.
122 catch (
const Ice::Exception& ex)
124 if (_traceLevels->replication > 0)
126 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
127 out <<
"init on " << p->id <<
" failed with exception " << ex;
// Install the new list; the previous contents of _observers end up in the local
// `observers`.
133 _observers.swap(observers);
// For every entry in _observers, start createTopic via begin_createTopic(llu, name)
// and store the returned result in that entry's `result` field.
// NOTE(review): the enclosing function and whatever follows the loop are not
// visible in this excerpt.
140 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
142 p->result = p->observer->begin_createTopic(llu, name);
// For every entry in _observers, start destroyTopic via begin_destroyTopic(llu, id)
// and store the returned result in that entry's `result` field.
// NOTE(review): the enclosing function header is not visible in this excerpt.
151 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
153 p->result = p->observer->begin_destroyTopic(llu,
id);
// Then call wait() with the operation name "destroyTopic".
155 wait(
"destroyTopic");
// For every entry in _observers, start addSubscriber via
// begin_addSubscriber(llu, name, rec) and store the returned result in that
// entry's `result` field.
// NOTE(review): the enclosing function header is not visible in this excerpt.
162 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
164 p->result = p->observer->begin_addSubscriber(llu, name, rec);
// Then call wait() with the operation name "addSubscriber".
166 wait(
"addSubscriber");
// For every entry in _observers, start removeSubscriber via
// begin_removeSubscriber(llu, name, id) and store the returned result in that
// entry's `result` field.
// NOTE(review): the enclosing function header is not visible in this excerpt.
173 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
175 p->result = p->observer->begin_removeSubscriber(llu, name,
id);
// Then call wait() with the operation name "removeSubscriber".
177 wait(
"removeSubscriber");
// Observers::wait -- walks _observers and, for each entry, calls
// waitForCompleted() and then throwLocalException() on the stored result.
//
// @param op  Text written in front of the exception in the trace message; no
//            other use is visible in this excerpt.
//
// NOTE(review): the return type, braces, `try` keyword, iterator increment and
// the declaration of `id` are on lines not shown here.
181 Observers::wait(
const string& op)
183 vector<ObserverInfo>::iterator p = _observers.begin();
184 while (p != _observers.end())
188 p->result->waitForCompleted();
189 p->result->throwLocalException();
// An Ice::Exception is traced as "<op>: <exception>" only when the replication
// trace level is > 0.
191 catch (
const Ice::Exception& ex)
193 if (_traceLevels->replication > 0)
195 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
196 out << op <<
": " << ex;
// Drop the entry; vector::erase returns the iterator to the element that
// followed it, so iteration resumes from there.
199 p = _observers.erase(p);
// Under a scoped lock on _reapedMutex, append `id` to _reaped. `id` is
// presumably the id of the entry just erased -- TODO confirm against the
// missing declaration.
201 IceUtil::Mutex::Lock sync(_reapedMutex);
202 _reaped.push_back(
id);
// Fewer than _majority observers remain: presumably the throw below is the body
// of this `if` (braces not shown), raising Ice::UnknownException to the caller.
208 if (_observers.size() < _majority)
213 throw Ice::UnknownException(__FILE__, __LINE__);