12 #include <Ice/LocalException.h>
13 #include <Ice/Communicator.h>
14 #include <Ice/LoggerUtil.h>
19 using namespace IceMX;
//
// Metrics helper for a single topic. Attribute names are resolved to
// values through the nested Attributes resolver (see operator()).
//
24 class TopicHelper :
public MetricsHelperT<TopicMetrics>
//
// Attribute table: each add() binds an attribute name to a TopicHelper
// getter. Note that "parent" and "service" both map to getService(), and
// "id" and "topic" both map to getId().
//
28 class Attributes :
public AttributeResolverT<TopicHelper>
34 add(
"parent", &TopicHelper::getService);
35 add(
"id", &TopicHelper::getId);
36 add(
"topic", &TopicHelper::getId);
37 add(
"service", &TopicHelper::getService);
// Single resolver instance shared by every TopicHelper.
40 static Attributes attributes;
//
// The constructor binds _service directly to the `service` argument.
// NOTE(review): _service is declared as a reference member below, so the
// helper must not outlive the string passed in -- confirm callers only
// build short-lived helpers.
//
42 TopicHelper(
const string& service,
const string& name) : _service(service), _name(name)
// Resolves `attribute` for this helper by delegating to the shared table.
46 virtual string operator()(
const string& attribute)
const
48 return attributes(
this, attribute);
51 const string& getService()
const
56 const string& getId()
const
// Reference member, not a copy (see the constructor note above).
63 const string& _service;
// Out-of-class definition of the static attribute table.
67 TopicHelper::Attributes TopicHelper::attributes;
//
// Metrics helper for a single subscriber. Attribute names are resolved
// through the nested Attributes resolver; names it does not know fall back
// to resolve() (registered with setDefault below).
//
69 class SubscriberHelper :
public MetricsHelperT<SubscriberMetrics>
//
// Attribute table. "parent" and "topic" both map to getTopic(). "facet" and
// "encoding" are two-step bindings: getProxy() first, then the named proxy
// member function. "link" is bound to the data member _link rather than to
// a getter.
//
73 class Attributes :
public AttributeResolverT<SubscriberHelper>
79 add(
"parent", &SubscriberHelper::getTopic);
80 add(
"id", &SubscriberHelper::getId);
81 add(
"topic", &SubscriberHelper::getTopic);
82 add(
"service", &SubscriberHelper::getService);
84 add(
"identity", &SubscriberHelper::getIdentity);
85 add(
"facet", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getFacet);
86 add(
"encoding", &SubscriberHelper::getProxy, &IceProxy::Ice::Object::ice_getEncodingVersion);
87 add(
"mode", &SubscriberHelper::getMode);
88 add(
"proxy", &SubscriberHelper::getProxy);
89 add(
"link", &SubscriberHelper::_link);
90 add(
"state", &SubscriberHelper::getState);
// Fallback used for attribute names not registered above.
92 setDefault(&SubscriberHelper::resolve);
// Single resolver instance shared by every SubscriberHelper.
95 static Attributes attributes;
//
// The constructor binds each member to the corresponding argument.
// NOTE(review): the initializer list uses `link` and `state`, whose
// parameter declarations are not visible in this excerpt.
//
97 SubscriberHelper(
const string& svc,
const string& topic, const ::Ice::ObjectPrx& proxy,
const IceStorm::QoS& qos,
99 _service(svc), _topic(
topic), _proxy(proxy), _qos(qos), _link(link), _state(state)
// Resolves `attribute` for this helper by delegating to the shared table.
103 virtual string operator()(
const string& attribute)
const
105 return attributes(
this, attribute);
//
// Default resolver. For names starting with "qos." the remainder of the
// name (after the 4-character prefix) is looked up in _qos. Throws
// std::invalid_argument carrying the attribute name; the exact conditions
// that lead to the throw are not visible in this excerpt.
//
108 string resolve(
const string& attribute)
const
110 if (attribute.compare(0, 4,
"qos.") == 0)
112 IceStorm::QoS::const_iterator p = _qos.find(attribute.substr(4));
122 throw invalid_argument(attribute);
//
// Invocation-mode chain: tests the proxy for twoway, oneway, batch-oneway,
// datagram and batch-datagram in that order. Only the "batch-oneway" and
// "batch-datagram" result strings are visible here.
//
140 if (_proxy->ice_isTwoway())
144 else if (_proxy->ice_isOneway())
148 else if (_proxy->ice_isBatchOneway())
150 return "batch-oneway";
152 else if (_proxy->ice_isDatagram())
156 else if (_proxy->ice_isBatchDatagram())
158 return "batch-datagram";
//
// _id is set to the stringified proxy; the FixedProxyException handler
// falls back to the stringified identity instead.
//
173 _id = _proxy->ice_toString();
175 catch (const ::Ice::FixedProxyException&)
177 _id = _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
183 const ::Ice::ObjectPrx&
// Stringifies the proxy's identity using the proxy's own communicator.
209 return _proxy->ice_getCommunicator()->identityToString(_proxy->ice_getIdentity());
//
// NOTE(review): these four members are references, not copies, so the
// helper must not outlive the constructor arguments -- confirm callers
// only build short-lived helpers.
//
214 const string& _service;
215 const string& _topic;
216 const ::Ice::ObjectPrx& _proxy;
217 const IceStorm::QoS& _qos;
// Out-of-class definition of the static attribute table.
223 SubscriberHelper::Attributes SubscriberHelper::attributes;
//
// Applies inc(&TopicMetrics::published) through forEach.
// Presumably this increments the `published` counter on each attached
// metrics object -- confirm against inc() and forEach().
//
228 TopicObserverI::published()
230 forEach(inc(&TopicMetrics::published));
//
// Applies inc(&TopicMetrics::forwarded) through forEach.
// Presumably this increments the `forwarded` counter on each attached
// metrics object -- confirm against inc() and forEach().
//
234 TopicObserverI::forwarded()
236 forEach(inc(&TopicMetrics::forwarded));
//
// Functor that stores `count` at construction and is invoked with a
// SubscriberMetricsPtr. The update performed inside operator() is not
// visible in this excerpt.
//
244 QueuedUpdate(
int count) : count(count)
248 void operator()(
const SubscriberMetricsPtr&
v)
//
// Runs a QueuedUpdate functor, built from `count`, through forEach.
//
258 SubscriberObserverI::queued(
int count)
260 forEach(QueuedUpdate(count));
//
// Functor that adds the stored `count` to the `outstanding` field of the
// metrics object it is invoked with.
//
266 struct OutstandingUpdate
268 OutstandingUpdate(
int count) : count(count)
272 void operator()(
const SubscriberMetricsPtr&
v)
278 v->outstanding += count;
//
// Runs an OutstandingUpdate functor, built from `count`, through forEach.
//
287 SubscriberObserverI::outstanding(
int count)
289 forEach(OutstandingUpdate(count));
//
// Functor that moves `count` events from `outstanding` to `delivered` on
// the metrics object it is invoked with.
//
295 struct DeliveredUpdate
297 DeliveredUpdate(
int count) : count(count)
301 void operator()(
const SubscriberMetricsPtr&
v)
//
// NOTE(review): the guard only checks outstanding > 0, not
// outstanding >= count, so the subtraction can drive `outstanding`
// negative when count exceeds it -- confirm whether clamping is wanted.
//
303 if (
v->outstanding > 0)
305 v->outstanding -= count;
307 v->delivered += count;
//
// Runs a DeliveredUpdate functor, built from `count`, through forEach.
//
316 SubscriberObserverI::delivered(
int count)
318 forEach(DeliveredUpdate(count));
//
// Constructs the two observer factories from the same metrics admin
// object: _topics under the name "Topic", _subscribers under "Subscriber".
//
321 TopicManagerObserverI::TopicManagerObserverI(
const IceInternal::MetricsAdminIPtr& metrics) :
323 _topics(metrics,
"Topic"),
324 _subscribers(metrics,
"Subscriber")
//
// Registers an updater on each factory, both built from `updater`: topics
// use ObserverUpdater::updateTopicObservers, subscribers use
// ObserverUpdater::updateSubscriberObservers. The enclosing function
// header is not visible in this excerpt.
//
331 _topics.setUpdater(newUpdater(updater, &ObserverUpdater::updateTopicObservers));
332 _subscribers.setUpdater(newUpdater(updater, &ObserverUpdater::updateSubscriberObservers));
//
// The topic factory is consulted only when _topics.isEnabled(). The
// observer is requested with a temporary TopicHelper(service, topic) and
// the previous observer `old`. The handler below logs the exception
// through an Ice::Error on the metrics logger; what is returned afterwards
// is not visible in this excerpt.
//
338 if (_topics.isEnabled())
342 return _topics.getObserver(TopicHelper(service, topic), old);
344 catch (
const exception& ex)
346 ::Ice::Error error(_metrics->getLogger());
347 error <<
"unexpected exception trying to obtain observer:\n" << ex;
//
// The subscriber factory is consulted only when _subscribers.isEnabled().
// The observer is requested with a temporary SubscriberHelper built from
// the call's arguments and the previous observer `old`. The handler below
// logs the exception through an Ice::Error on the metrics logger; the rest
// of the signature and the fall-through return are not visible in this
// excerpt.
//
356 const ::Ice::ObjectPrx& proxy,
357 const IceStorm::QoS& qos,
362 if (_subscribers.isEnabled())
366 return _subscribers.getObserver(SubscriberHelper(svc, topic, proxy, qos, link, state), old);
368 catch (
const exception& ex)
370 ::Ice::Error error(_metrics->getLogger());
371 error <<
"unexpected exception trying to obtain observer:\n" << ex;