10#include <Ice/Communicator.h>
11#include <Ice/LocalException.h>
12#include <Ice/LoggerUtil.h>
23 class TopicHelper :
public MetricsHelperT<TopicMetrics>
26 class Attributes :
public AttributeResolverT<TopicHelper>
31 add(
"parent", &TopicHelper::getService);
32 add(
"id", &TopicHelper::getId);
33 add(
"topic", &TopicHelper::getId);
34 add(
"service", &TopicHelper::getService);
38 static Attributes attributes;
40 TopicHelper(
const string& service,
const string& name) : _service(service), _name(name)
45 operator()(
const string& attribute)
const
47 return attributes(
this, attribute);
63 const string& _service;
67 TopicHelper::Attributes TopicHelper::attributes;
69 class SubscriberHelper :
public MetricsHelperT<SubscriberMetrics>
72 class Attributes :
public AttributeResolverT<SubscriberHelper>
77 add(
"parent", &SubscriberHelper::getTopic);
78 add(
"id", &SubscriberHelper::getId);
79 add(
"topic", &SubscriberHelper::getTopic);
80 add(
"service", &SubscriberHelper::getService);
82 add(
"identity", &SubscriberHelper::getIdentity);
83 add(
"facet", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getFacet);
85 &SubscriberHelper::getProxy,
86 &IceProxy::Ice::Object::ice_getEncodingVersion);
87 add(
"mode", &SubscriberHelper::getMode);
88 add(
"proxy", &SubscriberHelper::getProxy);
89 add(
"link", &SubscriberHelper::_link);
90 add(
"state", &SubscriberHelper::getState);
92 setDefault(&SubscriberHelper::resolve);
96 static Attributes attributes;
98 SubscriberHelper(
const string& svc,
100 const ::Ice::ObjectPrx& proxy,
101 const IceStorm::QoS& qos,
104 _service(svc), _topic(
topic), _proxy(proxy), _qos(qos), _link(link), _state(state)
109 operator()(
const string& attribute)
const
111 return attributes(
this, attribute);
115 resolve(
const string& attribute)
const
117 if (attribute.compare(0, 4,
"qos.") == 0)
119 IceStorm::QoS::const_iterator p = _qos.find(attribute.substr(4));
129 throw invalid_argument(attribute);
147 if (_proxy->ice_isTwoway())
151 else if (_proxy->ice_isOneway())
155 else if (_proxy->ice_isBatchOneway())
157 return "batch-oneway";
159 else if (_proxy->ice_isDatagram())
163 else if (_proxy->ice_isBatchDatagram())
165 return "batch-datagram";
180 _id = _proxy->ice_toString();
182 catch (const ::Ice::FixedProxyException&)
185 _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
191 const ::Ice::ObjectPrx&
217 return _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
221 const string& _service;
222 const string& _topic;
223 const ::Ice::ObjectPrx& _proxy;
224 const IceStorm::QoS& _qos;
230 SubscriberHelper::Attributes SubscriberHelper::attributes;
237 forEach(inc(&TopicMetrics::published));
243 forEach(inc(&TopicMetrics::forwarded));
251 QueuedUpdate(
int count) : count(count)
256 operator()(
const SubscriberMetricsPtr& v)
269 forEach(QueuedUpdate(count));
275 struct OutstandingUpdate
277 OutstandingUpdate(
int count) : count(count)
282 operator()(
const SubscriberMetricsPtr& v)
288 v->outstanding += count;
299 forEach(OutstandingUpdate(count));
305 struct DeliveredUpdate
307 DeliveredUpdate(
int count) : count(count)
312 operator()(
const SubscriberMetricsPtr& v)
314 if (v->outstanding > 0)
316 v->outstanding -= count;
318 v->delivered += count;
329 forEach(DeliveredUpdate(count));
333 _metrics(metrics), _topics(metrics,
"Topic"), _subscribers(metrics,
"Subscriber")
349 if (_topics.isEnabled())
353 return _topics.getObserver(TopicHelper(service, topic), old);
355 catch (
const exception& ex)
357 ::Ice::Error error(_metrics->getLogger());
358 error <<
"unexpected exception trying to obtain observer:\n" << ex;
367 const ::Ice::ObjectPrx& proxy,
368 const IceStorm::QoS& qos,
373 if (_subscribers.isEnabled())
377 return _subscribers.getObserver(SubscriberHelper(svc, topic, proxy, qos, link, state),
380 catch (
const exception& ex)
382 ::Ice::Error error(_metrics->getLogger());
383 error <<
"unexpected exception trying to obtain observer:\n" << ex;
virtual void updateTopicObservers()=0
virtual void updateSubscriberObservers()=0
virtual void outstanding(int)
Notification of a some events being sent.
virtual void delivered(int)
Notification of some events being delivered.
virtual void queued(int)
Notification of some events being queued.
TopicManagerObserverI(const IceInternal::MetricsAdminIPtr &)
virtual IceStorm::Instrumentation::SubscriberObserverPtr getSubscriberObserver(const std::string &, const std::string &, const Ice::ObjectPrx &, const IceStorm::QoS &, const IceStorm::TopicPrx &, IceStorm::Instrumentation::SubscriberState, const IceStorm::Instrumentation::SubscriberObserverPtr &)
virtual IceStorm::Instrumentation::TopicObserverPtr getTopicObserver(const std::string &, const std::string &, const IceStorm::Instrumentation::TopicObserverPtr &)
virtual void setObserverUpdater(const IceStorm::Instrumentation::ObserverUpdaterPtr &)
::IceInternal::Handle<::IceStorm::Instrumentation::ObserverUpdater > ObserverUpdaterPtr
::IceInternal::Handle<::IceStorm::Instrumentation::SubscriberObserver > SubscriberObserverPtr
@ SubscriberStateOnline
Online waiting to send events.
@ SubscriberStateOffline
Offline, retrying.
@ SubscriberStateError
Error state, awaiting to be destroyed.
::IceInternal::Handle<::IceStorm::Instrumentation::TopicObserver > TopicObserverPtr
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
double v(double t, double v0, double a0, double j)