InstrumentationI.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <Ice/Communicator.h>
11#include <Ice/LocalException.h>
12#include <Ice/LoggerUtil.h>
14
15using namespace std;
16using namespace IceStorm;
17using namespace IceStorm::Instrumentation;
18using namespace IceMX;
19
20namespace
21{
22
23 class TopicHelper : public MetricsHelperT<TopicMetrics>
24 {
25 public:
26 class Attributes : public AttributeResolverT<TopicHelper>
27 {
28 public:
29 Attributes()
30 {
31 add("parent", &TopicHelper::getService);
32 add("id", &TopicHelper::getId);
33 add("topic", &TopicHelper::getId);
34 add("service", &TopicHelper::getService);
35 }
36 };
37
38 static Attributes attributes;
39
40 TopicHelper(const string& service, const string& name) : _service(service), _name(name)
41 {
42 }
43
44 virtual string
45 operator()(const string& attribute) const
46 {
47 return attributes(this, attribute);
48 }
49
50 const string&
51 getService() const
52 {
53 return _service;
54 }
55
56 const string&
57 getId() const
58 {
59 return _name;
60 }
61
62 private:
63 const string& _service;
64 const string& _name;
65 };
66
67 TopicHelper::Attributes TopicHelper::attributes;
68
69 class SubscriberHelper : public MetricsHelperT<SubscriberMetrics>
70 {
71 public:
72 class Attributes : public AttributeResolverT<SubscriberHelper>
73 {
74 public:
75 Attributes()
76 {
77 add("parent", &SubscriberHelper::getTopic);
78 add("id", &SubscriberHelper::getId);
79 add("topic", &SubscriberHelper::getTopic);
80 add("service", &SubscriberHelper::getService);
81
82 add("identity", &SubscriberHelper::getIdentity);
83 add("facet", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getFacet);
84 add("encoding",
85 &SubscriberHelper::getProxy,
86 &IceProxy::Ice::Object::ice_getEncodingVersion);
87 add("mode", &SubscriberHelper::getMode);
88 add("proxy", &SubscriberHelper::getProxy);
89 add("link", &SubscriberHelper::_link);
90 add("state", &SubscriberHelper::getState);
91
92 setDefault(&SubscriberHelper::resolve);
93 }
94 };
95
96 static Attributes attributes;
97
98 SubscriberHelper(const string& svc,
99 const string& topic,
100 const ::Ice::ObjectPrx& proxy,
101 const IceStorm::QoS& qos,
102 const IceStorm::TopicPrx& link,
103 SubscriberState state) :
104 _service(svc), _topic(topic), _proxy(proxy), _qos(qos), _link(link), _state(state)
105 {
106 }
107
108 virtual string
109 operator()(const string& attribute) const
110 {
111 return attributes(this, attribute);
112 }
113
114 string
115 resolve(const string& attribute) const
116 {
117 if (attribute.compare(0, 4, "qos.") == 0)
118 {
119 IceStorm::QoS::const_iterator p = _qos.find(attribute.substr(4));
120 if (p != _qos.end())
121 {
122 return p->second;
123 }
124 else
125 {
126 return "default";
127 }
128 }
129 throw invalid_argument(attribute);
130 }
131
132 const string&
133 getService() const
134 {
135 return _service;
136 }
137
138 const string&
139 getTopic() const
140 {
141 return _topic;
142 }
143
144 string
145 getMode() const
146 {
147 if (_proxy->ice_isTwoway())
148 {
149 return "twoway";
150 }
151 else if (_proxy->ice_isOneway())
152 {
153 return "oneway";
154 }
155 else if (_proxy->ice_isBatchOneway())
156 {
157 return "batch-oneway";
158 }
159 else if (_proxy->ice_isDatagram())
160 {
161 return "datagram";
162 }
163 else if (_proxy->ice_isBatchDatagram())
164 {
165 return "batch-datagram";
166 }
167 else
168 {
169 return "unknown";
170 }
171 }
172
173 const string&
174 getId() const
175 {
176 if (_id.empty())
177 {
178 try
179 {
180 _id = _proxy->ice_toString();
181 }
182 catch (const ::Ice::FixedProxyException&)
183 {
184 _id =
185 _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
186 }
187 }
188 return _id;
189 }
190
191 const ::Ice::ObjectPrx&
192 getProxy() const
193 {
194 return _proxy;
195 }
196
197 string
198 getState() const
199 {
200 switch (_state)
201 {
203 return "online";
205 return "offline";
207 return "error";
208 default:
209 assert(false);
210 return "";
211 }
212 }
213
214 string
215 getIdentity() const
216 {
217 return _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
218 }
219
220 private:
221 const string& _service;
222 const string& _topic;
223 const ::Ice::ObjectPrx& _proxy;
224 const IceStorm::QoS& _qos;
225 const IceStorm::TopicPrx _link;
226 const SubscriberState _state;
227 mutable string _id;
228 };
229
230 SubscriberHelper::Attributes SubscriberHelper::attributes;
231
232} // namespace
233
// NOTE(review): the declarator line of this void function (listing line 235)
// is missing from this copy of the file, so its name and owning class are not
// visible here — restore it from the header before building.
// The body passes inc(&TopicMetrics::published) to forEach; presumably this
// bumps the `published` counter on each attached TopicMetrics object — confirm
// against the definitions of forEach and inc.
234void
236{
237 forEach(inc(&TopicMetrics::published));
238}
239
// NOTE(review): the declarator line of this void function (listing line 241)
// is missing from this copy of the file, so its name and owning class are not
// visible here — restore it from the header before building.
// The body passes inc(&TopicMetrics::forwarded) to forEach; presumably this
// bumps the `forwarded` counter on each attached TopicMetrics object — confirm
// against the definitions of forEach and inc.
240void
242{
243 forEach(inc(&TopicMetrics::forwarded));
244}
245
246namespace
247{
248
249 struct QueuedUpdate
250 {
251 QueuedUpdate(int count) : count(count)
252 {
253 }
254
255 void
256 operator()(const SubscriberMetricsPtr& v)
257 {
258 v->queued += count;
259 }
260
261 int count;
262 };
263
264} // namespace
265
// NOTE(review): the declarator line of this void function (listing line 267)
// is missing from this copy of the file; `count` must be declared by that
// missing parameter list. Restore the line from the header before building.
// The body passes a QueuedUpdate built from `count` to forEach.
266void
268{
269 forEach(QueuedUpdate(count));
270}
271
272namespace
273{
274
275 struct OutstandingUpdate
276 {
277 OutstandingUpdate(int count) : count(count)
278 {
279 }
280
281 void
282 operator()(const SubscriberMetricsPtr& v)
283 {
284 if (v->queued > 0)
285 {
286 v->queued -= count;
287 }
288 v->outstanding += count;
289 }
290
291 int count;
292 };
293
294} // namespace
295
// NOTE(review): the declarator line of this void function (listing line 297)
// is missing from this copy of the file; `count` must be declared by that
// missing parameter list. Restore the line from the header before building.
// The body passes an OutstandingUpdate built from `count` to forEach.
296void
298{
299 forEach(OutstandingUpdate(count));
300}
301
302namespace
303{
304
305 struct DeliveredUpdate
306 {
307 DeliveredUpdate(int count) : count(count)
308 {
309 }
310
311 void
312 operator()(const SubscriberMetricsPtr& v)
313 {
314 if (v->outstanding > 0)
315 {
316 v->outstanding -= count;
317 }
318 v->delivered += count;
319 }
320
321 int count;
322 };
323
324} // namespace
325
// NOTE(review): the declarator line of this void function (listing line 327)
// is missing from this copy of the file; `count` must be declared by that
// missing parameter list. Restore the line from the header before building.
// The body passes a DeliveredUpdate built from `count` to forEach.
326void
328{
329 forEach(DeliveredUpdate(count));
330}
331
332TopicManagerObserverI::TopicManagerObserverI(const IceInternal::MetricsAdminIPtr& metrics) :
333 _metrics(metrics), _topics(metrics, "Topic"), _subscribers(metrics, "Subscriber")
334{
335}
336
337void
339{
340 _topics.setUpdater(newUpdater(updater, &ObserverUpdater::updateTopicObservers));
341 _subscribers.setUpdater(newUpdater(updater, &ObserverUpdater::updateSubscriberObservers));
342}
343
346 const string& topic,
347 const TopicObserverPtr& old)
348{
349 if (_topics.isEnabled())
350 {
351 try
352 {
353 return _topics.getObserver(TopicHelper(service, topic), old);
354 }
355 catch (const exception& ex)
356 {
357 ::Ice::Error error(_metrics->getLogger());
358 error << "unexpected exception trying to obtain observer:\n" << ex;
359 }
360 }
361 return 0;
362}
363
366 const string& topic,
367 const ::Ice::ObjectPrx& proxy,
368 const IceStorm::QoS& qos,
369 const IceStorm::TopicPrx& link,
370 SubscriberState state,
371 const SubscriberObserverPtr& old)
372{
373 if (_subscribers.isEnabled())
374 {
375 try
376 {
377 return _subscribers.getObserver(SubscriberHelper(svc, topic, proxy, qos, link, state),
378 old);
379 }
380 catch (const exception& ex)
381 {
382 ::Ice::Error error(_metrics->getLogger());
383 error << "unexpected exception trying to obtain observer:\n" << ex;
384 }
385 }
386 return 0;
387}
virtual void outstanding(int)
Notification of some events being sent.
virtual void delivered(int)
Notification of some events being delivered.
virtual void queued(int)
Notification of some events being queued.
TopicManagerObserverI(const IceInternal::MetricsAdminIPtr &)
virtual IceStorm::Instrumentation::SubscriberObserverPtr getSubscriberObserver(const std::string &, const std::string &, const Ice::ObjectPrx &, const IceStorm::QoS &, const IceStorm::TopicPrx &, IceStorm::Instrumentation::SubscriberState, const IceStorm::Instrumentation::SubscriberObserverPtr &)
virtual IceStorm::Instrumentation::TopicObserverPtr getTopicObserver(const std::string &, const std::string &, const IceStorm::Instrumentation::TopicObserverPtr &)
virtual void setObserverUpdater(const IceStorm::Instrumentation::ObserverUpdaterPtr &)
::IceInternal::Handle<::IceStorm::Instrumentation::ObserverUpdater > ObserverUpdaterPtr
::IceInternal::Handle<::IceStorm::Instrumentation::SubscriberObserver > SubscriberObserverPtr
@ SubscriberStateOnline
Online waiting to send events.
@ SubscriberStateOffline
Offline, retrying.
@ SubscriberStateError
Error state, awaiting to be destroyed.
::IceInternal::Handle<::IceStorm::Instrumentation::TopicObserver > TopicObserverPtr
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
Definition IceManager.h:70
double v(double t, double v0, double a0, double j)
Definition CtrlUtil.h:39