Observers.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <IceStorm/Instance.h>
11#include <IceStorm/Observers.h>
13
14using namespace std;
15using namespace IceStorm;
16using namespace IceStormElection;
17
19 _traceLevels(instance->traceLevels()), _majority(0)
20{
21}
22
23void
24Observers::setMajority(unsigned int majority)
25{
26 _majority = majority;
27}
28
29bool
31{
32 Lock sync(*this);
33 if (_observers.size() >= _majority)
34 {
35 vector<ObserverInfo>::iterator p = _observers.begin();
36 while (p != _observers.end())
37 {
38 try
39 {
40 p->observer->ice_ping();
41 }
42 catch (const Ice::Exception& ex)
43 {
44 if (_traceLevels->replication > 0)
45 {
46 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
47 out << "ice_ping failed: " << ex;
48 }
49 int id = p->id;
50 p = _observers.erase(p);
51
52 // COMPILERFIX: Just using following causes double unlock with C++Builder 2007
53 //IceUtil::Mutex::Lock sync(_reapedMutex);
54 _reapedMutex.lock();
55 _reaped.push_back(id);
56 _reapedMutex.unlock();
57 continue;
58 }
59 ++p;
60 }
61 }
62 return _majority == 0 || _observers.size() >= _majority;
63}
64
65void
67{
68 Lock sync(*this);
69 _observers.clear();
70}
71
72void
73Observers::getReapedSlaves(std::vector<int>& d)
74{
75 IceUtil::Mutex::Lock sync(_reapedMutex);
76 d.swap(_reaped);
77}
78
79void
80Observers::init(const set<GroupNodeInfo>& slaves,
81 const LogUpdate& llu,
82 const TopicContentSeq& content)
83{
84 {
85 IceUtil::Mutex::Lock sync(_reapedMutex);
86 _reaped.clear();
87 }
88
89 Lock sync(*this);
90 _observers.clear();
91
92 vector<ObserverInfo> observers;
93
94 for (set<GroupNodeInfo>::const_iterator p = slaves.begin(); p != slaves.end(); ++p)
95 {
96 try
97 {
98 assert(p->observer);
99
100 ReplicaObserverPrx observer = ReplicaObserverPrx::uncheckedCast(p->observer);
101
102 Ice::AsyncResultPtr result = observer->begin_init(llu, content);
103 observers.push_back(ObserverInfo(p->id, observer, result));
104 }
105 catch (const Ice::Exception& ex)
106 {
107 if (_traceLevels->replication > 0)
108 {
109 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
110 out << "error calling init on " << p->id << ", exception: " << ex;
111 }
112 throw;
113 }
114 }
115
116 for (vector<ObserverInfo>::iterator p = observers.begin(); p != observers.end(); ++p)
117 {
118 try
119 {
120 p->observer->end_init(p->result);
121 p->result = 0;
122 }
123 catch (const Ice::Exception& ex)
124 {
125 if (_traceLevels->replication > 0)
126 {
127 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
128 out << "init on " << p->id << " failed with exception " << ex;
129 }
130 throw;
131 }
132 }
133
134 _observers.swap(observers);
135}
136
137void
138Observers::createTopic(const LogUpdate& llu, const string& name)
139{
140 Lock sync(*this);
141 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
142 {
143 p->result = p->observer->begin_createTopic(llu, name);
144 }
145 wait("createTopic");
146}
147
148void
149Observers::destroyTopic(const LogUpdate& llu, const string& id)
150{
151 Lock sync(*this);
152 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
153 {
154 p->result = p->observer->begin_destroyTopic(llu, id);
155 }
156 wait("destroyTopic");
157}
158
159void
160Observers::addSubscriber(const LogUpdate& llu, const string& name, const SubscriberRecord& rec)
161{
162 Lock sync(*this);
163 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
164 {
165 p->result = p->observer->begin_addSubscriber(llu, name, rec);
166 }
167 wait("addSubscriber");
168}
169
170void
171Observers::removeSubscriber(const LogUpdate& llu, const string& name, const Ice::IdentitySeq& id)
172{
173 Lock sync(*this);
174 for (vector<ObserverInfo>::iterator p = _observers.begin(); p != _observers.end(); ++p)
175 {
176 p->result = p->observer->begin_removeSubscriber(llu, name, id);
177 }
178 wait("removeSubscriber");
179}
180
181void
182Observers::wait(const string& op)
183{
184 vector<ObserverInfo>::iterator p = _observers.begin();
185 while (p != _observers.end())
186 {
187 try
188 {
189 p->result->waitForCompleted();
190 p->result->throwLocalException();
191 }
192 catch (const Ice::Exception& ex)
193 {
194 if (_traceLevels->replication > 0)
195 {
196 Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
197 out << op << ": " << ex;
198 }
199 int id = p->id;
200 p = _observers.erase(p);
201
202 IceUtil::Mutex::Lock sync(_reapedMutex);
203 _reaped.push_back(id);
204 continue;
205 }
206 ++p;
207 }
208 // If we now no longer have the majority of observers we raise.
209 if (_observers.size() < _majority)
210 {
211 // TODO: Trace here?
212 //Ice::Trace out(_traceLevels->logger, _traceLevels->replicationCat);
213 //out << op;
214 throw Ice::UnknownException(__FILE__, __LINE__);
215 }
216}
void getReapedSlaves(std::vector< int > &)
Definition Observers.cpp:73
void createTopic(const LogUpdate &, const std::string &)
void setMajority(unsigned int)
Definition Observers.cpp:24
void init(const std::set< IceStormElection::GroupNodeInfo > &, const LogUpdate &, const TopicContentSeq &)
Definition Observers.cpp:80
void addSubscriber(const LogUpdate &, const std::string &, const IceStorm::SubscriberRecord &)
Observers(const IceStorm::InstancePtr &)
Definition Observers.cpp:18
void removeSubscriber(const LogUpdate &, const std::string &, const Ice::IdentitySeq &)
void destroyTopic(const LogUpdate &, const std::string &)
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::ReplicaObserver > ReplicaObserverPrx
Definition Election.h:1195
::std::vector<::IceStormElection::TopicContent > TopicContentSeq
A sequence of topic content.
Definition Election.h:1225
IceUtil::Handle< Instance > InstancePtr
Definition Instance.h:128
A struct used for marking the last log update.
Definition LLURecord.h:103
Used to store persistent information for persistent subscribers.