InstrumentationI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
11 
12 #include <Ice/LocalException.h>
13 #include <Ice/Communicator.h>
14 #include <Ice/LoggerUtil.h>
15 
16 using namespace std;
17 using namespace IceStorm;
18 using namespace IceStorm::Instrumentation;
19 using namespace IceMX;
20 
21 namespace
22 {
23 
24  class TopicHelper : public MetricsHelperT<TopicMetrics>
25  {
26  public:
27 
28  class Attributes : public AttributeResolverT<TopicHelper>
29  {
30  public:
31 
32  Attributes()
33  {
34  add("parent", &TopicHelper::getService);
35  add("id", &TopicHelper::getId);
36  add("topic", &TopicHelper::getId);
37  add("service", &TopicHelper::getService);
38  }
39  };
40  static Attributes attributes;
41 
42  TopicHelper(const string& service, const string& name) : _service(service), _name(name)
43  {
44  }
45 
46  virtual string operator()(const string& attribute) const
47  {
48  return attributes(this, attribute);
49  }
50 
51  const string& getService() const
52  {
53  return _service;
54  }
55 
56  const string& getId() const
57  {
58  return _name;
59  }
60 
61  private:
62 
63  const string& _service;
64  const string& _name;
65  };
66 
67  TopicHelper::Attributes TopicHelper::attributes;
68 
69  class SubscriberHelper : public MetricsHelperT<SubscriberMetrics>
70  {
71  public:
72 
73  class Attributes : public AttributeResolverT<SubscriberHelper>
74  {
75  public:
76 
77  Attributes()
78  {
79  add("parent", &SubscriberHelper::getTopic);
80  add("id", &SubscriberHelper::getId);
81  add("topic", &SubscriberHelper::getTopic);
82  add("service", &SubscriberHelper::getService);
83 
84  add("identity", &SubscriberHelper::getIdentity);
85  add("facet", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getFacet);
86  add("encoding", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getEncodingVersion);
87  add("mode", &SubscriberHelper::getMode);
88  add("proxy", &SubscriberHelper::getProxy);
89  add("link", &SubscriberHelper::_link);
90  add("state", &SubscriberHelper::getState);
91 
92  setDefault(&SubscriberHelper::resolve);
93  }
94  };
95  static Attributes attributes;
96 
97  SubscriberHelper(const string& svc, const string& topic, const ::Ice::ObjectPrx& proxy, const IceStorm::QoS& qos,
98  const IceStorm::TopicPrx& link, SubscriberState state) :
99  _service(svc), _topic(topic), _proxy(proxy), _qos(qos), _link(link), _state(state)
100  {
101  }
102 
103  virtual string operator()(const string& attribute) const
104  {
105  return attributes(this, attribute);
106  }
107 
108  string resolve(const string& attribute) const
109  {
110  if (attribute.compare(0, 4, "qos.") == 0)
111  {
112  IceStorm::QoS::const_iterator p = _qos.find(attribute.substr(4));
113  if (p != _qos.end())
114  {
115  return p->second;
116  }
117  else
118  {
119  return "default";
120  }
121  }
122  throw invalid_argument(attribute);
123  }
124 
125  const string&
126  getService() const
127  {
128  return _service;
129  }
130 
131  const string&
132  getTopic() const
133  {
134  return _topic;
135  }
136 
137  string
138  getMode() const
139  {
140  if (_proxy->ice_isTwoway())
141  {
142  return "twoway";
143  }
144  else if (_proxy->ice_isOneway())
145  {
146  return "oneway";
147  }
148  else if (_proxy->ice_isBatchOneway())
149  {
150  return "batch-oneway";
151  }
152  else if (_proxy->ice_isDatagram())
153  {
154  return "datagram";
155  }
156  else if (_proxy->ice_isBatchDatagram())
157  {
158  return "batch-datagram";
159  }
160  else
161  {
162  return "unknown";
163  }
164  }
165 
166  const string&
167  getId() const
168  {
169  if (_id.empty())
170  {
171  try
172  {
173  _id = _proxy->ice_toString();
174  }
175  catch (const ::Ice::FixedProxyException&)
176  {
177  _id = _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
178  }
179  }
180  return _id;
181  }
182 
183  const ::Ice::ObjectPrx&
184  getProxy() const
185  {
186  return _proxy;
187  }
188 
189  string
190  getState() const
191  {
192  switch (_state)
193  {
195  return "online";
197  return "offline";
199  return "error";
200  default:
201  assert(false);
202  return "";
203  }
204  }
205 
206  string
207  getIdentity() const
208  {
209  return _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
210  }
211 
212  private:
213 
214  const string& _service;
215  const string& _topic;
216  const ::Ice::ObjectPrx& _proxy;
217  const IceStorm::QoS& _qos;
218  const IceStorm::TopicPrx _link;
219  const SubscriberState _state;
220  mutable string _id;
221  };
222 
223  SubscriberHelper::Attributes SubscriberHelper::attributes;
224 
225 }
226 
227 void
228 TopicObserverI::published()
229 {
230  forEach(inc(&TopicMetrics::published));
231 }
232 
233 void
234 TopicObserverI::forwarded()
235 {
236  forEach(inc(&TopicMetrics::forwarded));
237 }
238 
239 namespace
240 {
241 
242  struct QueuedUpdate
243  {
244  QueuedUpdate(int count) : count(count)
245  {
246  }
247 
248  void operator()(const SubscriberMetricsPtr& v)
249  {
250  v->queued += count;
251  }
252 
253  int count;
254  };
255 
256 }
257 void
258 SubscriberObserverI::queued(int count)
259 {
260  forEach(QueuedUpdate(count));
261 }
262 
263 namespace
264 {
265 
266  struct OutstandingUpdate
267  {
268  OutstandingUpdate(int count) : count(count)
269  {
270  }
271 
272  void operator()(const SubscriberMetricsPtr& v)
273  {
274  if (v->queued > 0)
275  {
276  v->queued -= count;
277  }
278  v->outstanding += count;
279  }
280 
281  int count;
282  };
283 
284 }
285 
286 void
287 SubscriberObserverI::outstanding(int count)
288 {
289  forEach(OutstandingUpdate(count));
290 }
291 
292 namespace
293 {
294 
295  struct DeliveredUpdate
296  {
297  DeliveredUpdate(int count) : count(count)
298  {
299  }
300 
301  void operator()(const SubscriberMetricsPtr& v)
302  {
303  if (v->outstanding > 0)
304  {
305  v->outstanding -= count;
306  }
307  v->delivered += count;
308  }
309 
310  int count;
311  };
312 
313 }
314 
315 void
316 SubscriberObserverI::delivered(int count)
317 {
318  forEach(DeliveredUpdate(count));
319 }
320 
321 TopicManagerObserverI::TopicManagerObserverI(const IceInternal::MetricsAdminIPtr& metrics) :
322  _metrics(metrics),
323  _topics(metrics, "Topic"),
324  _subscribers(metrics, "Subscriber")
325 {
326 }
327 
328 void
330 {
331  _topics.setUpdater(newUpdater(updater, &ObserverUpdater::updateTopicObservers));
332  _subscribers.setUpdater(newUpdater(updater, &ObserverUpdater::updateSubscriberObservers));
333 }
334 
336 TopicManagerObserverI::getTopicObserver(const string& service, const string& topic, const TopicObserverPtr& old)
337 {
338  if (_topics.isEnabled())
339  {
340  try
341  {
342  return _topics.getObserver(TopicHelper(service, topic), old);
343  }
344  catch (const exception& ex)
345  {
346  ::Ice::Error error(_metrics->getLogger());
347  error << "unexpected exception trying to obtain observer:\n" << ex;
348  }
349  }
350  return 0;
351 }
352 
355  const string& topic,
356  const ::Ice::ObjectPrx& proxy,
357  const IceStorm::QoS& qos,
358  const IceStorm::TopicPrx& link,
359  SubscriberState state,
360  const SubscriberObserverPtr& old)
361 {
362  if (_subscribers.isEnabled())
363  {
364  try
365  {
366  return _subscribers.getObserver(SubscriberHelper(svc, topic, proxy, qos, link, state), old);
367  }
368  catch (const exception& ex)
369  {
370  ::Ice::Error error(_metrics->getLogger());
371  error << "unexpected exception trying to obtain observer:\n" << ex;
372  }
373  }
374  return 0;
375 }
IceStorm
Definition: DBTypes.ice:22
IceStorm::TopicManagerObserverI::getTopicObserver
virtual IceStorm::Instrumentation::TopicObserverPtr getTopicObserver(const std::string &, const std::string &, const IceStorm::Instrumentation::TopicObserverPtr &)
Definition: InstrumentationI.cpp:336
IceStorm::TopicManagerObserverI::setObserverUpdater
virtual void setObserverUpdater(const IceStorm::Instrumentation::ObserverUpdaterPtr &)
Definition: InstrumentationI.cpp:329
IceStorm::TopicManagerObserverI::getSubscriberObserver
virtual IceStorm::Instrumentation::SubscriberObserverPtr getSubscriberObserver(const std::string &, const std::string &, const Ice::ObjectPrx &, const IceStorm::QoS &, const IceStorm::TopicPrx &, IceStorm::Instrumentation::SubscriberState, const IceStorm::Instrumentation::SubscriberObserverPtr &)
Definition: InstrumentationI.cpp:354
IceStorm::Instrumentation::SubscriberState
SubscriberState
Definition: Instrumentation.h:203
IceStorm::Instrumentation
Definition: Instrumentation.h:177
IceInternal::Handle
Definition: forward_declarations.h:8
InstrumentationI.h
armarx::ProxyType::topic
@ topic
IceStorm::Instrumentation::SubscriberStateOffline
@ SubscriberStateOffline
Offline, retrying.
Definition: Instrumentation.h:206
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
std
Definition: Application.h:66
IceInternal::ProxyHandle< ::IceProxy::IceStorm::Topic >
IceStorm::Instrumentation::SubscriberStateError
@ SubscriberStateError
Error state, awaiting to be destroyed.
Definition: Instrumentation.h:207
IceStorm::Instrumentation::SubscriberStateOnline
@ SubscriberStateOnline
Online waiting to send events.
Definition: Instrumentation.h:205