10 #include <Ice/Communicator.h>
11 #include <Ice/LocalException.h>
12 #include <Ice/LoggerUtil.h>
18 using namespace IceMX;
23 class TopicHelper :
public MetricsHelperT<TopicMetrics>
26 class Attributes :
public AttributeResolverT<TopicHelper>
31 add(
"parent", &TopicHelper::getService);
32 add(
"id", &TopicHelper::getId);
33 add(
"topic", &TopicHelper::getId);
34 add(
"service", &TopicHelper::getService);
38 static Attributes attributes;
40 TopicHelper(
const string& service,
const string& name) : _service(service), _name(name)
45 operator()(
const string& attribute)
const
47 return attributes(
this, attribute);
63 const string& _service;
67 TopicHelper::Attributes TopicHelper::attributes;
69 class SubscriberHelper :
public MetricsHelperT<SubscriberMetrics>
72 class Attributes :
public AttributeResolverT<SubscriberHelper>
77 add(
"parent", &SubscriberHelper::getTopic);
78 add(
"id", &SubscriberHelper::getId);
79 add(
"topic", &SubscriberHelper::getTopic);
80 add(
"service", &SubscriberHelper::getService);
82 add(
"identity", &SubscriberHelper::getIdentity);
83 add(
"facet", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getFacet);
85 &SubscriberHelper::getProxy,
86 &IceProxy::Ice::Object::ice_getEncodingVersion);
87 add(
"mode", &SubscriberHelper::getMode);
88 add(
"proxy", &SubscriberHelper::getProxy);
89 add(
"link", &SubscriberHelper::_link);
90 add(
"state", &SubscriberHelper::getState);
92 setDefault(&SubscriberHelper::resolve);
96 static Attributes attributes;
98 SubscriberHelper(
const string& svc,
100 const ::Ice::ObjectPrx& proxy,
101 const IceStorm::QoS& qos,
104 _service(svc), _topic(
topic), _proxy(proxy), _qos(qos), _link(link), _state(state)
109 operator()(
const string& attribute)
const
111 return attributes(
this, attribute);
115 resolve(
const string& attribute)
const
117 if (attribute.compare(0, 4,
"qos.") == 0)
119 IceStorm::QoS::const_iterator p = _qos.find(attribute.substr(4));
129 throw invalid_argument(attribute);
147 if (_proxy->ice_isTwoway())
151 else if (_proxy->ice_isOneway())
155 else if (_proxy->ice_isBatchOneway())
157 return "batch-oneway";
159 else if (_proxy->ice_isDatagram())
163 else if (_proxy->ice_isBatchDatagram())
165 return "batch-datagram";
180 _id = _proxy->ice_toString();
182 catch (const ::Ice::FixedProxyException&)
185 _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
191 const ::Ice::ObjectPrx&
217 return _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
221 const string& _service;
222 const string& _topic;
223 const ::Ice::ObjectPrx& _proxy;
224 const IceStorm::QoS& _qos;
230 SubscriberHelper::Attributes SubscriberHelper::attributes;
235 TopicObserverI::published()
237 forEach(inc(&TopicMetrics::published));
241 TopicObserverI::forwarded()
243 forEach(inc(&TopicMetrics::forwarded));
251 QueuedUpdate(
int count) : count(count)
256 operator()(
const SubscriberMetricsPtr&
v)
267 SubscriberObserverI::queued(
int count)
269 forEach(QueuedUpdate(count));
275 struct OutstandingUpdate
277 OutstandingUpdate(
int count) : count(count)
282 operator()(
const SubscriberMetricsPtr&
v)
288 v->outstanding += count;
297 SubscriberObserverI::outstanding(
int count)
299 forEach(OutstandingUpdate(count));
305 struct DeliveredUpdate
307 DeliveredUpdate(
int count) : count(count)
312 operator()(
const SubscriberMetricsPtr&
v)
314 if (
v->outstanding > 0)
316 v->outstanding -= count;
318 v->delivered += count;
327 SubscriberObserverI::delivered(
int count)
329 forEach(DeliveredUpdate(count));
332 TopicManagerObserverI::TopicManagerObserverI(
const IceInternal::MetricsAdminIPtr& metrics) :
333 _metrics(metrics), _topics(metrics,
"Topic"), _subscribers(metrics,
"Subscriber")
340 _topics.setUpdater(newUpdater(updater, &ObserverUpdater::updateTopicObservers));
341 _subscribers.setUpdater(newUpdater(updater, &ObserverUpdater::updateSubscriberObservers));
349 if (_topics.isEnabled())
353 return _topics.getObserver(TopicHelper(service, topic), old);
355 catch (
const exception& ex)
357 ::Ice::Error error(_metrics->getLogger());
358 error <<
"unexpected exception trying to obtain observer:\n" << ex;
367 const ::Ice::ObjectPrx& proxy,
368 const IceStorm::QoS& qos,
373 if (_subscribers.isEnabled())
377 return _subscribers.getObserver(SubscriberHelper(svc, topic, proxy, qos, link, state),
380 catch (
const exception& ex)
382 ::Ice::Error error(_metrics->getLogger());
383 error <<
"unexpected exception trying to obtain observer:\n" << ex;