19 _traceLevels(instance->traceLevels()), _majority(0)
33 if (_observers.size() >= _majority)
35 vector<ObserverInfo>::iterator p = _observers.begin();
36 while (p != _observers.end())
40 p->observer->ice_ping();
42 catch (
const Ice::Exception& ex)
44 if (_traceLevels->replication > 0)
46 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
47 out <<
"ice_ping failed: " << ex;
50 p = _observers.erase(p);
55 _reaped.push_back(
id);
56 _reapedMutex.unlock();
62 return _majority == 0 || _observers.size() >= _majority;
75 IceUtil::Mutex::Lock sync(_reapedMutex);
85 IceUtil::Mutex::Lock sync(_reapedMutex);
92 vector<ObserverInfo> observers;
94 for (set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
102 Ice::AsyncResultPtr result = observer->begin_init(llu, content);
103 observers.push_back(ObserverInfo(p->id, observer, result));
105 catch (
const Ice::Exception& ex)
107 if (_traceLevels->replication > 0)
109 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
110 out <<
"error calling init on " << p->id <<
", exception: " << ex;
116 for (vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p)
120 p->observer->end_init(p->result);
123 catch (
const Ice::Exception& ex)
125 if (_traceLevels->replication > 0)
127 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
128 out <<
"init on " << p->id <<
" failed with exception " << ex;
134 _observers.swap(observers);
141 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
143 p->result = p->observer->begin_createTopic(llu, name);
152 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
154 p->result = p->observer->begin_destroyTopic(llu,
id);
156 wait(
"destroyTopic");
163 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
165 p->result = p->observer->begin_addSubscriber(llu, name, rec);
167 wait(
"addSubscriber");
174 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
176 p->result = p->observer->begin_removeSubscriber(llu, name,
id);
178 wait(
"removeSubscriber");
182 Observers::wait(
const string& op)
184 vector<ObserverInfo>::iterator p = _observers.begin();
185 while (p != _observers.end())
189 p->result->waitForCompleted();
190 p->result->throwLocalException();
192 catch (
const Ice::Exception& ex)
194 if (_traceLevels->replication > 0)
196 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
197 out << op <<
": " << ex;
200 p = _observers.erase(p);
202 IceUtil::Mutex::Lock sync(_reapedMutex);
203 _reaped.push_back(
id);
209 if (_observers.size() < _majority)
214 throw Ice::UnknownException(__FILE__, __LINE__);