12 #include <Ice/LoggerUtil.h>
18 #include <IceUtil/StringUtil.h>
30 class PerSubscriberPublisherI :
public Ice::BlobjectArray
33 PerSubscriberPublisherI(
const InstancePtr& instance) : _instance(instance)
40 _subscriber = subscriber;
44 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
46 const Ice::Current& current)
52 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
58 Ice::ByteSeq
data(inParams.first, inParams.second);
59 event->data.swap(
data);
63 _subscriber->queue(
false, e);
84 case Subscriber::SubscriberStateReaped:
103 const Ice::ObjectPrx&,
105 const Ice::ObjectPrx&);
107 virtual void flush();
110 exception(
const Ice::Exception& ex)
119 const Ice::ObjectPrx _obj;
130 const Ice::ObjectPrx&,
132 const Ice::ObjectPrx&);
134 virtual void flush();
137 exception(
const Ice::Exception& ex)
145 const Ice::ObjectPrx _obj;
155 const Ice::ObjectPrx&,
158 const Ice::ObjectPrx&);
160 virtual void flush();
163 const Ice::ObjectPrx _obj;
171 virtual void flush();
177 class FlushTimerTask :
public IceUtil::TimerTask
180 FlushTimerTask(
const SubscriberBatchPtr& subscriber) : _subscriber(subscriber)
187 _subscriber->doFlush();
191 const SubscriberBatchPtr _subscriber;
196 SubscriberBatch::SubscriberBatch(
const InstancePtr& instance,
198 const Ice::ObjectPrx& proxy,
200 const Ice::ObjectPrx& obj) :
201 Subscriber(instance, rec, proxy, retryCount, 1), _obj(obj), _interval(instance->flushInterval())
203 assert(retryCount == 0);
209 if (_outstanding == 0)
212 _instance->batchFlusher()->schedule(
new FlushTimerTask(
this), _interval);
217 SubscriberBatch::doFlush()
219 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
235 _outstandingCount =
static_cast<Ice::Int>(
v.size());
236 _observer->outstanding(_outstandingCount);
241 vector<Ice::Byte> dummy;
242 for (EventDataSeq::const_iterator p =
v.begin(); p !=
v.end(); ++p)
244 _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
247 Ice::AsyncResultPtr result =
248 _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(
249 this, &SubscriberBatch::exception, &SubscriberBatch::sent));
250 if (result->sentSynchronously())
253 assert(_outstanding == 0);
256 _observer->delivered(_outstandingCount);
260 catch (
const Ice::Exception& ex)
266 if (_events.empty() && _outstanding == 0 && _shutdown)
278 SubscriberBatch::sent(
bool sentSynchronously)
280 if (sentSynchronously)
285 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
289 assert(_outstanding == 0);
292 _observer->delivered(_outstandingCount);
295 if (_events.empty() && _outstanding == 0 && _shutdown)
299 else if (!_events.empty())
305 SubscriberOneway::SubscriberOneway(
const InstancePtr& instance,
307 const Ice::ObjectPrx& proxy,
309 const Ice::ObjectPrx& obj) :
310 Subscriber(instance, rec, proxy, retryCount, 5), _obj(obj)
312 assert(retryCount == 0);
318 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
329 while (_outstanding < _maxOutstanding && !_events.empty())
336 _events.erase(_events.begin());
339 _observer->outstanding(1);
344 Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
349 Ice::newCallback_Object_ice_invoke(
350 this, &SubscriberOneway::exception, &SubscriberOneway::sent));
351 if (!result->sentSynchronously())
357 _observer->delivered(1);
360 catch (
const Ice::Exception& ex)
367 if (_events.empty() && _outstanding == 0 && _shutdown)
374 SubscriberOneway::sent(
bool sentSynchronously)
376 if (sentSynchronously)
381 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
385 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
388 _observer->delivered(1);
391 if (_events.empty() && _outstanding == 0 && _shutdown)
395 else if (_outstanding <= 0 && !_events.empty())
401 SubscriberTwoway::SubscriberTwoway(
const InstancePtr& instance,
403 const Ice::ObjectPrx& proxy,
406 const Ice::ObjectPrx& obj) :
407 Subscriber(instance, rec, proxy, retryCount, maxOutstanding), _obj(obj)
414 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
425 while (_outstanding < _maxOutstanding && !_events.empty())
432 _events.erase(_events.begin());
436 _observer->outstanding(1);
441 _obj->begin_ice_invoke(
446 Ice::newCallback(
static_cast<Subscriber*
>(
this), &Subscriber::completed));
448 catch (
const Ice::Exception& ex)
462 rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
469 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
479 EventDataSeq::iterator p =
v.begin();
485 Ice::Context::const_iterator
q = (*p)->context.find(
"cost");
486 if (
q != (*p)->context.end())
488 cost = atoi(
q->second.c_str());
490 if (cost > _rec.cost)
506 _outstandingCount =
static_cast<Ice::Int>(
v.size());
507 _observer->outstanding(_outstandingCount);
510 v, Ice::newCallback(
static_cast<Subscriber*
>(
this), &Subscriber::completed));
512 catch (
const Ice::Exception& ex)
526 return new SubscriberLink(instance, rec);
530 PerSubscriberPublisherIPtr per =
new PerSubscriberPublisherI(instance);
532 perId.category = instance->instanceName();
533 perId.name =
"topic." + rec.
topicName +
".publish." +
534 instance->communicator()->identityToString(rec.
obj->ice_getIdentity());
535 Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId);
542 QoS::const_iterator p = rec.
theQoS.find(
"retryCount");
543 if (p != rec.
theQoS.end())
545 retryCount = atoi(p->second.c_str());
549 p = rec.
theQoS.find(
"reliability");
550 if (p != rec.
theQoS.end())
552 reliability = p->second;
554 if (!reliability.empty() && reliability !=
"ordered")
556 throw BadQoS(
"invalid reliability: " + reliability);
562 Ice::ObjectPrx newObj;
565 newObj = rec.
obj->ice_timeout(instance->sendTimeout());
567 catch (
const Ice::FixedProxyException&)
577 p = rec.
theQoS.find(
"locatorCacheTimeout");
578 if (p != rec.
theQoS.end())
580 istringstream is(IceUtilInternal::trim(p->second));
581 int locatorCacheTimeout;
582 if (!(is >> locatorCacheTimeout) || !is.eof())
584 throw BadQoS(
"invalid locator cache timeout (numeric value required): " +
587 newObj = newObj->ice_locatorCacheTimeout(locatorCacheTimeout);
590 p = rec.
theQoS.find(
"connectionCached");
591 if (p != rec.
theQoS.end())
593 istringstream is(IceUtilInternal::trim(p->second));
594 int connectionCached;
595 if (!(is >> connectionCached) || !is.eof())
597 throw BadQoS(
"invalid connection cached setting (numeric value required): " +
600 newObj = newObj->ice_connectionCached(connectionCached > 0);
603 if (reliability ==
"ordered")
605 if (!newObj->ice_isTwoway())
607 throw BadQoS(
"ordered reliability requires a twoway proxy");
609 subscriber =
new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj);
611 else if (newObj->ice_isOneway() || newObj->ice_isDatagram())
615 throw BadQoS(
"non-zero retryCount QoS requires a twoway proxy");
617 subscriber =
new SubscriberOneway(instance, rec, proxy, retryCount, newObj);
619 else if (newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
623 throw BadQoS(
"non-zero retryCount QoS requires a twoway proxy");
625 subscriber =
new SubscriberBatch(instance, rec, proxy, retryCount, newObj);
629 assert(newObj->ice_isTwoway());
630 subscriber =
new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj);
632 per->setSubscriber(subscriber);
634 catch (
const Ice::Exception&)
636 instance->publishAdapter()->remove(proxy->ice_getIdentity());
645 Subscriber::proxy()
const
647 return _proxyReplica;
651 Subscriber::id()
const
657 Subscriber::record()
const
665 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
670 if (forwarded && _rec.link)
679 if (IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
694 for (EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
696 if (
static_cast<int>(_events.size()) == _instance->sendQueueSizeMax())
698 if (_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
708 _events.push_back(*p);
713 _observer->queued(
static_cast<Ice::Int>(events.size()));
721 case SubscriberStateReaped:
731 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
735 setState(SubscriberStateReaped);
742 Subscriber::resetIfReaped()
744 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
745 if (_state == SubscriberStateReaped)
752 Subscriber::errored()
const
754 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
759 Subscriber::destroy()
768 _instance->publishAdapter()->remove(_proxy->ice_getIdentity());
770 catch (
const Ice::NotRegisteredException&)
774 catch (
const Ice::ObjectAdapterDeactivatedException&)
780 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
785 Subscriber::error(
bool dec,
const Ice::Exception& e)
787 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
793 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
811 bool hardError =
dynamic_cast<const Ice::ObjectNotExistException*
>(&e) ||
812 dynamic_cast<const Ice::NotRegisteredException*
>(&e) ||
821 IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
832 if (!hardError && (_retryCount == -1 || _currentRetry < _retryCount))
837 if (_currentRetry == 0)
839 Ice::Warning warn(traceLevels->logger);
840 warn << traceLevels->subscriberCat <<
":"
841 << _instance->communicator()->identityToString(_rec.id);
842 if (traceLevels->subscriber > 1)
846 warn <<
" subscriber offline: " << e
847 <<
" discarding events: " << _instance->discardInterval()
848 <<
"s retryCount: " << _retryCount;
852 if (traceLevels->subscriber > 0)
854 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
855 out << _instance->communicator()->identityToString(_rec.id);
856 if (traceLevels->subscriber > 1)
860 out <<
" subscriber offline: " << e
861 <<
" discarding events: " << _instance->discardInterval()
862 <<
"s retry: " << _currentRetry <<
"/" << _retryCount;
868 _next = now + _instance->discardInterval();
880 if (traceLevels->subscriber > 0)
882 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
883 out << _instance->communicator()->identityToString(_rec.id);
884 if (traceLevels->subscriber > 1)
888 out <<
" subscriber errored out: " << e <<
" retry: " << _currentRetry <<
"/"
893 if (_shutdown && _events.empty())
900 Subscriber::completed(
const Ice::AsyncResultPtr& result)
904 result->throwLocalException();
906 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
910 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
913 _observer->delivered(_outstandingCount);
922 if (_events.empty() && _outstanding == 0 && _shutdown)
931 catch (
const Ice::LocalException& ex)
938 Subscriber::shutdown()
940 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
943 while (_outstanding > 0 && !_events.empty())
952 Subscriber::updateObserver()
954 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
955 if (_instance->observer())
957 _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
962 toSubscriberState(_state),
969 const Ice::ObjectPrx& proxy,
971 int maxOutstanding) :
974 _retryCount(retryCount),
975 _maxOutstanding(maxOutstanding),
977 _proxyReplica(proxy),
981 _outstandingCount(1),
987 _instance->publisherReplicaProxy()->ice_identity(
_proxy->ice_getIdentity());
997 toSubscriberState(
_state),
1031 if (traceLevels->subscriber > 1)
1033 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
1035 <<
" transition from: " << stateToString(
_state) <<
" to: " << stateToString(state);
1046 toSubscriberState(
_state),
1055 return subscriber->id() == id;