12#include <Ice/LoggerUtil.h>
18#include <IceUtil/StringUtil.h>
30 class PerSubscriberPublisherI :
public Ice::BlobjectArray
33 PerSubscriberPublisherI(
const InstancePtr& instance) : _instance(instance)
40 _subscriber = subscriber;
44 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
46 const Ice::Current& current)
49 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
52 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
58 Ice::ByteSeq
data(inParams.first, inParams.second);
59 event->data.swap(
data);
63 _subscriber->queue(
false, e);
102 const SubscriberRecord&,
103 const Ice::ObjectPrx&,
105 const Ice::ObjectPrx&);
107 virtual void flush();
110 exception(
const Ice::Exception& ex)
119 const Ice::ObjectPrx _obj;
120 const IceUtil::Time _interval;
129 const SubscriberRecord&,
130 const Ice::ObjectPrx&,
132 const Ice::ObjectPrx&);
134 virtual void flush();
137 exception(
const Ice::Exception& ex)
145 const Ice::ObjectPrx _obj;
154 const SubscriberRecord&,
155 const Ice::ObjectPrx&,
158 const Ice::ObjectPrx&);
160 virtual void flush();
163 const Ice::ObjectPrx _obj;
169 SubscriberLink(
const InstancePtr&,
const SubscriberRecord&);
171 virtual void flush();
177 class FlushTimerTask :
public IceUtil::TimerTask
180 FlushTimerTask(
const SubscriberBatchPtr& subscriber) : _subscriber(subscriber)
187 _subscriber->doFlush();
191 const SubscriberBatchPtr _subscriber;
196SubscriberBatch::SubscriberBatch(
const InstancePtr& instance,
198 const Ice::ObjectPrx& proxy,
200 const Ice::ObjectPrx& obj) :
201 Subscriber(instance, rec, proxy, retryCount, 1), _obj(obj), _interval(instance->flushInterval())
203 assert(retryCount == 0);
207SubscriberBatch::flush()
209 if (_outstanding == 0)
212 _instance->batchFlusher()->schedule(
new FlushTimerTask(
this), _interval);
217SubscriberBatch::doFlush()
219 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
224 if (_state != SubscriberStateOnline)
235 _outstandingCount =
static_cast<Ice::Int
>(
v.size());
236 _observer->outstanding(_outstandingCount);
241 vector<Ice::Byte> dummy;
242 for (EventDataSeq::const_iterator p =
v.begin(); p !=
v.end(); ++p)
244 _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
247 Ice::AsyncResultPtr result =
248 _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(
249 this, &SubscriberBatch::exception, &SubscriberBatch::sent));
250 if (result->sentSynchronously())
253 assert(_outstanding == 0);
256 _observer->delivered(_outstandingCount);
260 catch (
const Ice::Exception& ex)
266 if (_events.empty() && _outstanding == 0 && _shutdown)
278SubscriberBatch::sent(
bool sentSynchronously)
280 if (sentSynchronously)
285 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
289 assert(_outstanding == 0);
292 _observer->delivered(_outstandingCount);
295 if (_events.empty() && _outstanding == 0 && _shutdown)
299 else if (!_events.empty())
305SubscriberOneway::SubscriberOneway(
const InstancePtr& instance,
306 const SubscriberRecord& rec,
307 const Ice::ObjectPrx& proxy,
309 const Ice::ObjectPrx& obj) :
310 Subscriber(instance, rec, proxy, retryCount, 5), _obj(obj)
312 assert(retryCount == 0);
316SubscriberOneway::flush()
318 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
323 if (_state != SubscriberStateOnline || _events.empty())
329 while (_outstanding < _maxOutstanding && !_events.empty())
336 _events.erase(_events.begin());
339 _observer->outstanding(1);
344 Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
349 Ice::newCallback_Object_ice_invoke(
350 this, &SubscriberOneway::exception, &SubscriberOneway::sent));
351 if (!result->sentSynchronously())
357 _observer->delivered(1);
360 catch (
const Ice::Exception& ex)
367 if (_events.empty() && _outstanding == 0 && _shutdown)
374SubscriberOneway::sent(
bool sentSynchronously)
376 if (sentSynchronously)
381 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
385 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
388 _observer->delivered(1);
391 if (_events.empty() && _outstanding == 0 && _shutdown)
395 else if (_outstanding <= 0 && !_events.empty())
401SubscriberTwoway::SubscriberTwoway(
const InstancePtr& instance,
402 const SubscriberRecord& rec,
403 const Ice::ObjectPrx& proxy,
406 const Ice::ObjectPrx& obj) :
407 Subscriber(instance, rec, proxy, retryCount, maxOutstanding), _obj(obj)
412SubscriberTwoway::flush()
414 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
419 if (_state != SubscriberStateOnline || _events.empty())
425 while (_outstanding < _maxOutstanding && !_events.empty())
432 _events.erase(_events.begin());
436 _observer->outstanding(1);
441 _obj->begin_ice_invoke(
448 catch (
const Ice::Exception& ex)
459 SubscriberLink::SubscriberLink(
const InstancePtr& instance,
const SubscriberRecord& rec) :
460 Subscriber(instance, rec, 0, -1, 1),
462 rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
467 SubscriberLink::flush()
469 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
471 if (_state != SubscriberStateOnline || _outstanding > 0)
479 EventDataSeq::iterator p =
v.begin();
485 Ice::Context::const_iterator
q = (*p)->context.find(
"cost");
486 if (
q != (*p)->context.end())
488 cost = atoi(
q->second.c_str());
490 if (cost > _rec.cost)
506 _outstandingCount =
static_cast<Ice::Int
>(
v.size());
507 _observer->outstanding(_outstandingCount);
512 catch (
const Ice::Exception& ex)
526 return new SubscriberLink(instance, rec);
530 PerSubscriberPublisherIPtr per =
new PerSubscriberPublisherI(instance);
532 perId.category = instance->instanceName();
533 perId.name =
"topic." + rec.
topicName +
".publish." +
534 instance->communicator()->identityToString(rec.
obj->ice_getIdentity());
535 Ice::ObjectPrx
proxy = instance->publishAdapter()->add(per, perId);
542 QoS::const_iterator p = rec.
theQoS.find(
"retryCount");
543 if (p != rec.
theQoS.end())
545 retryCount = atoi(p->second.c_str());
549 p = rec.
theQoS.find(
"reliability");
550 if (p != rec.
theQoS.end())
552 reliability = p->second;
554 if (!reliability.empty() && reliability !=
"ordered")
556 throw BadQoS(
"invalid reliability: " + reliability);
562 Ice::ObjectPrx newObj;
565 newObj = rec.
obj->ice_timeout(instance->sendTimeout());
567 catch (
const Ice::FixedProxyException&)
577 p = rec.
theQoS.find(
"locatorCacheTimeout");
578 if (p != rec.
theQoS.end())
580 istringstream is(IceUtilInternal::trim(p->second));
581 int locatorCacheTimeout;
582 if (!(is >> locatorCacheTimeout) || !is.eof())
584 throw BadQoS(
"invalid locator cache timeout (numeric value required): " +
587 newObj = newObj->ice_locatorCacheTimeout(locatorCacheTimeout);
590 p = rec.
theQoS.find(
"connectionCached");
591 if (p != rec.
theQoS.end())
593 istringstream is(IceUtilInternal::trim(p->second));
594 int connectionCached;
595 if (!(is >> connectionCached) || !is.eof())
597 throw BadQoS(
"invalid connection cached setting (numeric value required): " +
600 newObj = newObj->ice_connectionCached(connectionCached > 0);
603 if (reliability ==
"ordered")
605 if (!newObj->ice_isTwoway())
607 throw BadQoS(
"ordered reliability requires a twoway proxy");
609 subscriber =
new SubscriberTwoway(instance, rec,
proxy, retryCount, 1, newObj);
611 else if (newObj->ice_isOneway() || newObj->ice_isDatagram())
615 throw BadQoS(
"non-zero retryCount QoS requires a twoway proxy");
617 subscriber =
new SubscriberOneway(instance, rec,
proxy, retryCount, newObj);
619 else if (newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
623 throw BadQoS(
"non-zero retryCount QoS requires a twoway proxy");
625 subscriber =
new SubscriberBatch(instance, rec,
proxy, retryCount, newObj);
629 assert(newObj->ice_isTwoway());
630 subscriber =
new SubscriberTwoway(instance, rec,
proxy, retryCount, 5, newObj);
632 per->setSubscriber(subscriber);
634 catch (
const Ice::Exception&)
636 instance->publishAdapter()->remove(
proxy->ice_getIdentity());
665 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
670 if (forwarded &&
_rec.link)
679 if (IceUtil::Time::now(IceUtil::Time::Monotonic) <
_next)
694 for (EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
713 _observer->queued(
static_cast<Ice::Int
>(events.size()));
731 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
744 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
754 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
770 catch (
const Ice::NotRegisteredException&)
774 catch (
const Ice::ObjectAdapterDeactivatedException&)
780 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
787 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
811 bool hardError =
dynamic_cast<const Ice::ObjectNotExistException*
>(&e) ||
812 dynamic_cast<const Ice::NotRegisteredException*
>(&e) ||
821 IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
839 Ice::Warning warn(traceLevels->logger);
840 warn << traceLevels->subscriberCat <<
":"
842 if (traceLevels->subscriber > 1)
846 warn <<
" subscriber offline: " << e
847 <<
" discarding events: " <<
_instance->discardInterval()
852 if (traceLevels->subscriber > 0)
854 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
856 if (traceLevels->subscriber > 1)
860 out <<
" subscriber offline: " << e
861 <<
" discarding events: " <<
_instance->discardInterval()
880 if (traceLevels->subscriber > 0)
882 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
884 if (traceLevels->subscriber > 1)
888 out <<
" subscriber errored out: " << e <<
" retry: " <<
_currentRetry <<
"/"
904 result->throwLocalException();
906 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
931 catch (
const Ice::LocalException& ex)
940 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
954 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(
_lock);
962 toSubscriberState(
_state),
969 const Ice::ObjectPrx&
proxy,
971 int maxOutstanding) :
986 const_cast<Ice::ObjectPrx&>(_proxyReplica) =
987 _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
992 _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
997 toSubscriberState(_state),
1031 if (traceLevels->subscriber > 1)
1033 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
1035 <<
" transition from: " << stateToString(
_state) <<
" to: " << stateToString(state);
1046 toSubscriberState(
_state),
1055 return subscriber->id() == id;
bool queue(bool, const EventDataSeq &)
Subscriber(const InstancePtr &, const IceStorm::SubscriberRecord &, const Ice::ObjectPrx &, int, int)
const IceStorm::SubscriberRecord _rec
const Ice::ObjectPrx _proxyReplica
IceInternal::ObserverHelperT< IceStorm::Instrumentation::SubscriberObserver > _observer
Ice::ObjectPrx proxy() const
IceStorm::SubscriberRecord record() const
const Ice::ObjectPrx _proxy
void completed(const Ice::AsyncResultPtr &)
const int _maxOutstanding
void error(bool, const Ice::Exception &)
const InstancePtr _instance
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
void setState(SubscriberState)
IceUtil::Monitor< IceUtil::RecMutex > _lock
std::string describeEndpoints(const Ice::ObjectPrx &)
@ SubscriberStateOnline
Online waiting to send events.
@ SubscriberStateOffline
Offline, retrying.
@ SubscriberStateError
Error state, awaiting to be destroyed.
IceUtil::Handle< Subscriber > SubscriberPtr
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
IceUtil::Handle< TraceLevels > TraceLevelsPtr
bool operator!=(const IceStorm::Subscriber &, const IceStorm::Subscriber &)
::IceUtil::Handle<::IceStorm::EventData > EventDataPtr
bool operator<(const TopicLink &lhs, const TopicLink &rhs)
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicLink > TopicLinkPrx
IceUtil::Handle< Instance > InstancePtr
bool operator==(const TopicLink &lhs, const TopicLink &rhs)
double v(double t, double v0, double a0, double j)
const LogSender::manipulator flush
Used to store persistent information for persistent subscribers.