InstrumentationI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <Ice/Communicator.h>
11 #include <Ice/LocalException.h>
12 #include <Ice/LoggerUtil.h>
14 
15 using namespace std;
16 using namespace IceStorm;
17 using namespace IceStorm::Instrumentation;
18 using namespace IceMX;
19 
20 namespace
21 {
22 
23  class TopicHelper : public MetricsHelperT<TopicMetrics>
24  {
25  public:
26  class Attributes : public AttributeResolverT<TopicHelper>
27  {
28  public:
29  Attributes()
30  {
31  add("parent", &TopicHelper::getService);
32  add("id", &TopicHelper::getId);
33  add("topic", &TopicHelper::getId);
34  add("service", &TopicHelper::getService);
35  }
36  };
37 
38  static Attributes attributes;
39 
40  TopicHelper(const string& service, const string& name) : _service(service), _name(name)
41  {
42  }
43 
44  virtual string
45  operator()(const string& attribute) const
46  {
47  return attributes(this, attribute);
48  }
49 
50  const string&
51  getService() const
52  {
53  return _service;
54  }
55 
56  const string&
57  getId() const
58  {
59  return _name;
60  }
61 
62  private:
63  const string& _service;
64  const string& _name;
65  };
66 
67  TopicHelper::Attributes TopicHelper::attributes;
68 
69  class SubscriberHelper : public MetricsHelperT<SubscriberMetrics>
70  {
71  public:
72  class Attributes : public AttributeResolverT<SubscriberHelper>
73  {
74  public:
75  Attributes()
76  {
77  add("parent", &SubscriberHelper::getTopic);
78  add("id", &SubscriberHelper::getId);
79  add("topic", &SubscriberHelper::getTopic);
80  add("service", &SubscriberHelper::getService);
81 
82  add("identity", &SubscriberHelper::getIdentity);
83  add("facet", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getFacet);
84  add("encoding",
85  &SubscriberHelper::getProxy,
86  &IceProxy::Ice::Object::ice_getEncodingVersion);
87  add("mode", &SubscriberHelper::getMode);
88  add("proxy", &SubscriberHelper::getProxy);
89  add("link", &SubscriberHelper::_link);
90  add("state", &SubscriberHelper::getState);
91 
92  setDefault(&SubscriberHelper::resolve);
93  }
94  };
95 
96  static Attributes attributes;
97 
98  SubscriberHelper(const string& svc,
99  const string& topic,
100  const ::Ice::ObjectPrx& proxy,
101  const IceStorm::QoS& qos,
102  const IceStorm::TopicPrx& link,
103  SubscriberState state) :
104  _service(svc), _topic(topic), _proxy(proxy), _qos(qos), _link(link), _state(state)
105  {
106  }
107 
108  virtual string
109  operator()(const string& attribute) const
110  {
111  return attributes(this, attribute);
112  }
113 
114  string
115  resolve(const string& attribute) const
116  {
117  if (attribute.compare(0, 4, "qos.") == 0)
118  {
119  IceStorm::QoS::const_iterator p = _qos.find(attribute.substr(4));
120  if (p != _qos.end())
121  {
122  return p->second;
123  }
124  else
125  {
126  return "default";
127  }
128  }
129  throw invalid_argument(attribute);
130  }
131 
132  const string&
133  getService() const
134  {
135  return _service;
136  }
137 
138  const string&
139  getTopic() const
140  {
141  return _topic;
142  }
143 
144  string
145  getMode() const
146  {
147  if (_proxy->ice_isTwoway())
148  {
149  return "twoway";
150  }
151  else if (_proxy->ice_isOneway())
152  {
153  return "oneway";
154  }
155  else if (_proxy->ice_isBatchOneway())
156  {
157  return "batch-oneway";
158  }
159  else if (_proxy->ice_isDatagram())
160  {
161  return "datagram";
162  }
163  else if (_proxy->ice_isBatchDatagram())
164  {
165  return "batch-datagram";
166  }
167  else
168  {
169  return "unknown";
170  }
171  }
172 
173  const string&
174  getId() const
175  {
176  if (_id.empty())
177  {
178  try
179  {
180  _id = _proxy->ice_toString();
181  }
182  catch (const ::Ice::FixedProxyException&)
183  {
184  _id =
185  _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
186  }
187  }
188  return _id;
189  }
190 
191  const ::Ice::ObjectPrx&
192  getProxy() const
193  {
194  return _proxy;
195  }
196 
197  string
198  getState() const
199  {
200  switch (_state)
201  {
203  return "online";
205  return "offline";
207  return "error";
208  default:
209  assert(false);
210  return "";
211  }
212  }
213 
214  string
215  getIdentity() const
216  {
217  return _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
218  }
219 
220  private:
221  const string& _service;
222  const string& _topic;
223  const ::Ice::ObjectPrx& _proxy;
224  const IceStorm::QoS& _qos;
225  const IceStorm::TopicPrx _link;
226  const SubscriberState _state;
227  mutable string _id;
228  };
229 
230  SubscriberHelper::Attributes SubscriberHelper::attributes;
231 
232 } // namespace
233 
234 void
235 TopicObserverI::published()
236 {
237  forEach(inc(&TopicMetrics::published));
238 }
239 
240 void
241 TopicObserverI::forwarded()
242 {
243  forEach(inc(&TopicMetrics::forwarded));
244 }
245 
246 namespace
247 {
248 
249  struct QueuedUpdate
250  {
251  QueuedUpdate(int count) : count(count)
252  {
253  }
254 
255  void
256  operator()(const SubscriberMetricsPtr& v)
257  {
258  v->queued += count;
259  }
260 
261  int count;
262  };
263 
264 } // namespace
265 
266 void
267 SubscriberObserverI::queued(int count)
268 {
269  forEach(QueuedUpdate(count));
270 }
271 
272 namespace
273 {
274 
275  struct OutstandingUpdate
276  {
277  OutstandingUpdate(int count) : count(count)
278  {
279  }
280 
281  void
282  operator()(const SubscriberMetricsPtr& v)
283  {
284  if (v->queued > 0)
285  {
286  v->queued -= count;
287  }
288  v->outstanding += count;
289  }
290 
291  int count;
292  };
293 
294 } // namespace
295 
296 void
297 SubscriberObserverI::outstanding(int count)
298 {
299  forEach(OutstandingUpdate(count));
300 }
301 
302 namespace
303 {
304 
305  struct DeliveredUpdate
306  {
307  DeliveredUpdate(int count) : count(count)
308  {
309  }
310 
311  void
312  operator()(const SubscriberMetricsPtr& v)
313  {
314  if (v->outstanding > 0)
315  {
316  v->outstanding -= count;
317  }
318  v->delivered += count;
319  }
320 
321  int count;
322  };
323 
324 } // namespace
325 
326 void
327 SubscriberObserverI::delivered(int count)
328 {
329  forEach(DeliveredUpdate(count));
330 }
331 
332 TopicManagerObserverI::TopicManagerObserverI(const IceInternal::MetricsAdminIPtr& metrics) :
333  _metrics(metrics), _topics(metrics, "Topic"), _subscribers(metrics, "Subscriber")
334 {
335 }
336 
337 void
339 {
340  _topics.setUpdater(newUpdater(updater, &ObserverUpdater::updateTopicObservers));
341  _subscribers.setUpdater(newUpdater(updater, &ObserverUpdater::updateSubscriberObservers));
342 }
343 
346  const string& topic,
347  const TopicObserverPtr& old)
348 {
349  if (_topics.isEnabled())
350  {
351  try
352  {
353  return _topics.getObserver(TopicHelper(service, topic), old);
354  }
355  catch (const exception& ex)
356  {
357  ::Ice::Error error(_metrics->getLogger());
358  error << "unexpected exception trying to obtain observer:\n" << ex;
359  }
360  }
361  return 0;
362 }
363 
366  const string& topic,
367  const ::Ice::ObjectPrx& proxy,
368  const IceStorm::QoS& qos,
369  const IceStorm::TopicPrx& link,
370  SubscriberState state,
371  const SubscriberObserverPtr& old)
372 {
373  if (_subscribers.isEnabled())
374  {
375  try
376  {
377  return _subscribers.getObserver(SubscriberHelper(svc, topic, proxy, qos, link, state),
378  old);
379  }
380  catch (const exception& ex)
381  {
382  ::Ice::Error error(_metrics->getLogger());
383  error << "unexpected exception trying to obtain observer:\n" << ex;
384  }
385  }
386  return 0;
387 }
IceStorm
Definition: DBTypes.ice:22
IceStorm::TopicManagerObserverI::getTopicObserver
virtual IceStorm::Instrumentation::TopicObserverPtr getTopicObserver(const std::string &, const std::string &, const IceStorm::Instrumentation::TopicObserverPtr &)
Definition: InstrumentationI.cpp:345
IceStorm::TopicManagerObserverI::setObserverUpdater
virtual void setObserverUpdater(const IceStorm::Instrumentation::ObserverUpdaterPtr &)
Definition: InstrumentationI.cpp:338
IceStorm::TopicManagerObserverI::getSubscriberObserver
virtual IceStorm::Instrumentation::SubscriberObserverPtr getSubscriberObserver(const std::string &, const std::string &, const Ice::ObjectPrx &, const IceStorm::QoS &, const IceStorm::TopicPrx &, IceStorm::Instrumentation::SubscriberState, const IceStorm::Instrumentation::SubscriberObserverPtr &)
Definition: InstrumentationI.cpp:365
IceStorm::Instrumentation::SubscriberState
SubscriberState
Definition: Instrumentation.h:204
IceStorm::Instrumentation
Definition: Instrumentation.h:178
IceInternal::Handle
Definition: forward_declarations.h:8
InstrumentationI.h
armarx::ProxyType::topic
@ topic
IceStorm::Instrumentation::SubscriberStateOffline
@ SubscriberStateOffline
Offline, retrying.
Definition: Instrumentation.h:207
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
std
Definition: Application.h:66
IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic >
IceStorm::Instrumentation::SubscriberStateError
@ SubscriberStateError
Error state, awaiting to be destroyed.
Definition: Instrumentation.h:208
IceStorm::Instrumentation::SubscriberStateOnline
@ SubscriberStateOnline
Online waiting to send events.
Definition: Instrumentation.h:206