15 #include <Ice/LoggerUtil.h>
16 #include <IceUtil/StringUtil.h>
29 class PerSubscriberPublisherI :
public Ice::BlobjectArray
33 PerSubscriberPublisherI(
const InstancePtr& instance) :
41 _subscriber = subscriber;
45 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
47 const Ice::Current& current)
62 Ice::ByteSeq
data(inParams.first, inParams.second);
63 event->data.swap(
data);
67 _subscriber->queue(
false, e);
88 case Subscriber::SubscriberStateReaped:
108 virtual void flush();
110 void exception(
const Ice::Exception& ex)
120 const Ice::ObjectPrx _obj;
131 virtual void flush();
133 void exception(
const Ice::Exception& ex)
141 const Ice::ObjectPrx _obj;
150 const Ice::ObjectPrx&);
152 virtual void flush();
156 const Ice::ObjectPrx _obj;
165 virtual void flush();
172 class FlushTimerTask :
public IceUtil::TimerTask
176 FlushTimerTask(
const SubscriberBatchPtr& subscriber) :
177 _subscriber(subscriber)
184 _subscriber->doFlush();
189 const SubscriberBatchPtr _subscriber;
194 SubscriberBatch::SubscriberBatch(
197 const Ice::ObjectPrx& proxy,
199 const Ice::ObjectPrx& obj) :
200 Subscriber(instance, rec, proxy, retryCount, 1),
202 _interval(instance->flushInterval())
204 assert(retryCount == 0);
210 if (_outstanding == 0)
213 _instance->batchFlusher()->schedule(
new FlushTimerTask(
this), _interval);
218 SubscriberBatch::doFlush()
220 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
236 _outstandingCount =
static_cast<Ice::Int>(
v.size());
237 _observer->outstanding(_outstandingCount);
242 vector<Ice::Byte> dummy;
243 for (EventDataSeq::const_iterator p =
v.begin(); p !=
v.end(); ++p)
245 _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
248 Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests(
249 Ice::newCallback_Object_ice_flushBatchRequests(
this,
250 &SubscriberBatch::exception,
251 &SubscriberBatch::sent));
252 if (result->sentSynchronously())
255 assert(_outstanding == 0);
258 _observer->delivered(_outstandingCount);
262 catch (
const Ice::Exception& ex)
268 if (_events.empty() && _outstanding == 0 && _shutdown)
280 SubscriberBatch::sent(
bool sentSynchronously)
282 if (sentSynchronously)
287 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
291 assert(_outstanding == 0);
294 _observer->delivered(_outstandingCount);
297 if (_events.empty() && _outstanding == 0 && _shutdown)
301 else if (!_events.empty())
308 SubscriberOneway::SubscriberOneway(
311 const Ice::ObjectPrx& proxy,
313 const Ice::ObjectPrx& obj) :
314 Subscriber(instance, rec, proxy, retryCount, 5),
317 assert(retryCount == 0);
323 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
334 while (_outstanding < _maxOutstanding && !_events.empty())
341 _events.erase(_events.begin());
344 _observer->outstanding(1);
349 Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
350 e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(
this,
351 &SubscriberOneway::exception,
352 &SubscriberOneway::sent));
353 if (!result->sentSynchronously())
359 _observer->delivered(1);
362 catch (
const Ice::Exception& ex)
369 if (_events.empty() && _outstanding == 0 && _shutdown)
376 SubscriberOneway::sent(
bool sentSynchronously)
378 if (sentSynchronously)
383 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
387 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
390 _observer->delivered(1);
393 if (_events.empty() && _outstanding == 0 && _shutdown)
397 else if (_outstanding <= 0 && !_events.empty())
403 SubscriberTwoway::SubscriberTwoway(
406 const Ice::ObjectPrx& proxy,
409 const Ice::ObjectPrx& obj) :
410 Subscriber(instance, rec, proxy, retryCount, maxOutstanding),
418 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
429 while (_outstanding < _maxOutstanding && !_events.empty())
436 _events.erase(_events.begin());
440 _observer->outstanding(1);
445 _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context,
446 Ice::newCallback(
static_cast<Subscriber*
>(
this), &Subscriber::completed));
448 catch (
const Ice::Exception& ex)
459 SubscriberLink::SubscriberLink(
463 _obj(
TopicLinkPrx::uncheckedCast(rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
470 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
480 EventDataSeq::iterator p =
v.begin();
486 Ice::Context::const_iterator
q = (*p)->context.find(
"cost");
487 if (
q != (*p)->context.end())
489 cost = atoi(
q->second.c_str());
491 if (cost > _rec.cost)
507 _outstandingCount =
static_cast<Ice::Int>(
v.size());
508 _observer->outstanding(_outstandingCount);
510 _obj->begin_forward(
v, Ice::newCallback(
static_cast<Subscriber*
>(
this), &Subscriber::completed));
512 catch (
const Ice::Exception& ex)
528 return new SubscriberLink(instance, rec);
532 PerSubscriberPublisherIPtr per =
new PerSubscriberPublisherI(instance);
534 perId.category = instance->instanceName();
535 perId.name =
"topic." + rec.
topicName +
".publish." +
536 instance->communicator()->identityToString(rec.
obj->ice_getIdentity());
537 Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId);
544 QoS::const_iterator p = rec.
theQoS.find(
"retryCount");
545 if (p != rec.
theQoS.end())
547 retryCount = atoi(p->second.c_str());
551 p = rec.
theQoS.find(
"reliability");
552 if (p != rec.
theQoS.end())
554 reliability = p->second;
556 if (!reliability.empty() && reliability !=
"ordered")
558 throw BadQoS(
"invalid reliability: " + reliability);
564 Ice::ObjectPrx newObj;
567 newObj = rec.
obj->ice_timeout(instance->sendTimeout());
569 catch (
const Ice::FixedProxyException&)
579 p = rec.
theQoS.find(
"locatorCacheTimeout");
580 if (p != rec.
theQoS.end())
582 istringstream is(IceUtilInternal::trim(p->second));
583 int locatorCacheTimeout;
584 if (!(is >> locatorCacheTimeout) || !is.eof())
586 throw BadQoS(
"invalid locator cache timeout (numeric value required): " + p->second);
588 newObj = newObj->ice_locatorCacheTimeout(locatorCacheTimeout);
591 p = rec.
theQoS.find(
"connectionCached");
592 if (p != rec.
theQoS.end())
594 istringstream is(IceUtilInternal::trim(p->second));
595 int connectionCached;
596 if (!(is >> connectionCached) || !is.eof())
598 throw BadQoS(
"invalid connection cached setting (numeric value required): " + p->second);
600 newObj = newObj->ice_connectionCached(connectionCached > 0);
603 if (reliability ==
"ordered")
605 if (!newObj->ice_isTwoway())
607 throw BadQoS(
"ordered reliability requires a twoway proxy");
609 subscriber =
new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj);
611 else if (newObj->ice_isOneway() || newObj->ice_isDatagram())
615 throw BadQoS(
"non-zero retryCount QoS requires a twoway proxy");
617 subscriber =
new SubscriberOneway(instance, rec, proxy, retryCount, newObj);
619 else if (newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
623 throw BadQoS(
"non-zero retryCount QoS requires a twoway proxy");
625 subscriber =
new SubscriberBatch(instance, rec, proxy, retryCount, newObj);
629 assert(newObj->ice_isTwoway());
630 subscriber =
new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj);
632 per->setSubscriber(subscriber);
634 catch (
const Ice::Exception&)
636 instance->publishAdapter()->remove(proxy->ice_getIdentity());
645 Subscriber::proxy()
const
647 return _proxyReplica;
651 Subscriber::id()
const
657 Subscriber::record()
const
665 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
670 if (forwarded && _rec.link)
679 if (IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
694 for (EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
696 if (
static_cast<int>(_events.size()) == _instance->sendQueueSizeMax())
698 if (_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
708 _events.push_back(*p);
713 _observer->queued(
static_cast<Ice::Int>(events.size()));
721 case SubscriberStateReaped:
731 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
735 setState(SubscriberStateReaped);
742 Subscriber::resetIfReaped()
744 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
745 if (_state == SubscriberStateReaped)
752 Subscriber::errored()
const
754 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
759 Subscriber::destroy()
768 _instance->publishAdapter()->remove(_proxy->ice_getIdentity());
770 catch (
const Ice::NotRegisteredException&)
774 catch (
const Ice::ObjectAdapterDeactivatedException&)
780 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
785 Subscriber::error(
bool dec,
const Ice::Exception& e)
787 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
793 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
811 bool hardError =
dynamic_cast<const Ice::ObjectNotExistException*
>(&e) ||
812 dynamic_cast<const Ice::NotRegisteredException*
>(&e) ||
821 IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
832 if (!hardError && (_retryCount == -1 || _currentRetry < _retryCount))
837 if (_currentRetry == 0)
839 Ice::Warning warn(traceLevels->logger);
840 warn << traceLevels->subscriberCat <<
":" << _instance->communicator()->identityToString(_rec.id);
841 if (traceLevels->subscriber > 1)
845 warn <<
" subscriber offline: " << e
846 <<
" discarding events: " << _instance->discardInterval() <<
"s retryCount: " << _retryCount;
850 if (traceLevels->subscriber > 0)
852 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
853 out << _instance->communicator()->identityToString(_rec.id);
854 if (traceLevels->subscriber > 1)
858 out <<
" subscriber offline: " << e
859 <<
" discarding events: " << _instance->discardInterval() <<
"s retry: "
860 << _currentRetry <<
"/" << _retryCount;
866 _next = now + _instance->discardInterval();
878 if (traceLevels->subscriber > 0)
880 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
881 out << _instance->communicator()->identityToString(_rec.id);
882 if (traceLevels->subscriber > 1)
886 out <<
" subscriber errored out: " << e
887 <<
" retry: " << _currentRetry <<
"/" << _retryCount;
891 if (_shutdown && _events.empty())
898 Subscriber::completed(
const Ice::AsyncResultPtr& result)
902 result->throwLocalException();
904 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
908 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
911 _observer->delivered(_outstandingCount);
920 if (_events.empty() && _outstanding == 0 && _shutdown)
929 catch (
const Ice::LocalException& ex)
936 Subscriber::shutdown()
938 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
941 while (_outstanding > 0 && !_events.empty())
950 Subscriber::updateObserver()
952 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
953 if (_instance->observer())
955 _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
960 toSubscriberState(_state),
965 Subscriber::Subscriber(
968 const Ice::ObjectPrx& proxy,
970 int maxOutstanding) :
973 _retryCount(retryCount),
974 _maxOutstanding(maxOutstanding),
976 _proxyReplica(proxy),
980 _outstandingCount(1),
986 _instance->publisherReplicaProxy()->ice_identity(
_proxy->ice_getIdentity());
996 toSubscriberState(
_state),
1030 if (traceLevels->subscriber > 1)
1032 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
1034 <<
" transition from: " << stateToString(
_state) <<
" to: " << stateToString(state);
1045 toSubscriberState(
_state),
1054 return subscriber->id() == id;