Subscriber.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <iterator>
11
12#include <Ice/LoggerUtil.h>
13#include <IceStorm/Instance.h>
14#include <IceStorm/NodeI.h>
15#include <IceStorm/Subscriber.h>
17#include <IceStorm/Util.h>
18#include <IceUtil/StringUtil.h>
19
20using namespace std;
21using namespace IceStorm;
22using namespace IceStormElection;
23
24//
25// Per Subscriber object.
26//
27namespace
28{
29
30 class PerSubscriberPublisherI : public Ice::BlobjectArray
31 {
32 public:
33 PerSubscriberPublisherI(const InstancePtr& instance) : _instance(instance)
34 {
35 }
36
37 void
38 setSubscriber(const SubscriberPtr& subscriber)
39 {
40 _subscriber = subscriber;
41 }
42
43 virtual bool
44 ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
45 vector<Ice::Byte>&,
46 const Ice::Current& current)
47 {
48 // Use cached reads.
49 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
50
51 EventDataPtr event =
52 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
53
54 //
55 // COMPILERBUG: gcc 4.0.1 doesn't like this.
56 //
57 //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
58 Ice::ByteSeq data(inParams.first, inParams.second);
59 event->data.swap(data);
60
62 e.push_back(event);
63 _subscriber->queue(false, e);
64 return true;
65 }
66
67 private:
68 const InstancePtr _instance;
69 /*const*/ SubscriberPtr _subscriber;
70 };
71
72 typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr;
73
75 toSubscriberState(Subscriber::SubscriberState s)
76 {
77 switch (s)
78 {
86 default:
87 assert(false);
89 }
90 }
91
92} // namespace
93
94// Each of the various Subscriber types.
95namespace
96{
97
98 class SubscriberBatch : public Subscriber
99 {
100 public:
101 SubscriberBatch(const InstancePtr&,
102 const SubscriberRecord&,
103 const Ice::ObjectPrx&,
104 int,
105 const Ice::ObjectPrx&);
106
107 virtual void flush();
108
109 void
110 exception(const Ice::Exception& ex)
111 {
112 error(false, ex);
113 }
114
115 void doFlush();
116 void sent(bool);
117
118 private:
119 const Ice::ObjectPrx _obj;
120 const IceUtil::Time _interval;
121 };
122
123 typedef IceUtil::Handle<SubscriberBatch> SubscriberBatchPtr;
124
125 class SubscriberOneway : public Subscriber
126 {
127 public:
128 SubscriberOneway(const InstancePtr&,
129 const SubscriberRecord&,
130 const Ice::ObjectPrx&,
131 int,
132 const Ice::ObjectPrx&);
133
134 virtual void flush();
135
136 void
137 exception(const Ice::Exception& ex)
138 {
139 error(true, ex);
140 }
141
142 void sent(bool);
143
144 private:
145 const Ice::ObjectPrx _obj;
146 };
147
148 typedef IceUtil::Handle<SubscriberOneway> SubscriberOnewayPtr;
149
150 class SubscriberTwoway : public Subscriber
151 {
152 public:
153 SubscriberTwoway(const InstancePtr&,
154 const SubscriberRecord&,
155 const Ice::ObjectPrx&,
156 int,
157 int,
158 const Ice::ObjectPrx&);
159
160 virtual void flush();
161
162 private:
163 const Ice::ObjectPrx _obj;
164 };
165
166 class SubscriberLink : public Subscriber
167 {
168 public:
169 SubscriberLink(const InstancePtr&, const SubscriberRecord&);
170
171 virtual void flush();
172
173 private:
174 const TopicLinkPrx _obj;
175 };
176
177 class FlushTimerTask : public IceUtil::TimerTask
178 {
179 public:
180 FlushTimerTask(const SubscriberBatchPtr& subscriber) : _subscriber(subscriber)
181 {
182 }
183
184 virtual void
185 runTimerTask()
186 {
187 _subscriber->doFlush();
188 }
189
190 private:
191 const SubscriberBatchPtr _subscriber;
192 };
193
194} // namespace
195
196SubscriberBatch::SubscriberBatch(const InstancePtr& instance,
197 const SubscriberRecord& rec,
198 const Ice::ObjectPrx& proxy,
199 int retryCount,
200 const Ice::ObjectPrx& obj) :
201 Subscriber(instance, rec, proxy, retryCount, 1), _obj(obj), _interval(instance->flushInterval())
202{
203 assert(retryCount == 0);
204}
205
206void
207SubscriberBatch::flush()
208{
209 if (_outstanding == 0)
210 {
211 ++_outstanding;
212 _instance->batchFlusher()->schedule(new FlushTimerTask(this), _interval);
213 }
214}
215
216void
217SubscriberBatch::doFlush()
218{
219 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
220
221 //
222 // If the subscriber isn't online we're done.
223 //
224 if (_state != SubscriberStateOnline)
225 {
226 return;
227 }
228
230 v.swap(_events);
231 assert(!v.empty());
232
233 if (_observer)
234 {
235 _outstandingCount = static_cast<Ice::Int>(v.size());
236 _observer->outstanding(_outstandingCount);
237 }
238
239 try
240 {
241 vector<Ice::Byte> dummy;
242 for (EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
243 {
244 _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
245 }
246
247 Ice::AsyncResultPtr result =
248 _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(
249 this, &SubscriberBatch::exception, &SubscriberBatch::sent));
250 if (result->sentSynchronously())
251 {
252 --_outstanding;
253 assert(_outstanding == 0);
254 if (_observer)
255 {
256 _observer->delivered(_outstandingCount);
257 }
258 }
259 }
260 catch (const Ice::Exception& ex)
261 {
262 error(false, ex);
263 return;
264 }
265
266 if (_events.empty() && _outstanding == 0 && _shutdown)
267 {
268 _lock.notify();
269 }
270
271 // This is significantly faster than the async version, but it can
272 // block the calling thread. Bad news!
273
274 //_obj->ice_flushBatchRequests();
275}
276
277void
278SubscriberBatch::sent(bool sentSynchronously)
279{
280 if (sentSynchronously)
281 {
282 return;
283 }
284
285 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
286
287 // Decrement the _outstanding count.
288 --_outstanding;
289 assert(_outstanding == 0);
290 if (_observer)
291 {
292 _observer->delivered(_outstandingCount);
293 }
294
295 if (_events.empty() && _outstanding == 0 && _shutdown)
296 {
297 _lock.notify();
298 }
299 else if (!_events.empty())
300 {
301 flush();
302 }
303}
304
305SubscriberOneway::SubscriberOneway(const InstancePtr& instance,
306 const SubscriberRecord& rec,
307 const Ice::ObjectPrx& proxy,
308 int retryCount,
309 const Ice::ObjectPrx& obj) :
310 Subscriber(instance, rec, proxy, retryCount, 5), _obj(obj)
311{
312 assert(retryCount == 0);
313}
314
315void
316SubscriberOneway::flush()
317{
318 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
319
320 //
321 // If the subscriber isn't online we're done.
322 //
323 if (_state != SubscriberStateOnline || _events.empty())
324 {
325 return;
326 }
327
328 // Send up to _maxOutstanding pending events.
329 while (_outstanding < _maxOutstanding && !_events.empty())
330 {
331 //
332 // Dequeue the head event, count one more outstanding AMI
333 // request.
334 //
335 EventDataPtr e = _events.front();
336 _events.erase(_events.begin());
337 if (_observer)
338 {
339 _observer->outstanding(1);
340 }
341
342 try
343 {
344 Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
345 e->op,
346 e->mode,
347 e->data,
348 e->context,
349 Ice::newCallback_Object_ice_invoke(
350 this, &SubscriberOneway::exception, &SubscriberOneway::sent));
351 if (!result->sentSynchronously())
352 {
353 ++_outstanding;
354 }
355 else if (_observer)
356 {
357 _observer->delivered(1);
358 }
359 }
360 catch (const Ice::Exception& ex)
361 {
362 error(true, ex);
363 return;
364 }
365 }
366
367 if (_events.empty() && _outstanding == 0 && _shutdown)
368 {
369 _lock.notify();
370 }
371}
372
373void
374SubscriberOneway::sent(bool sentSynchronously)
375{
376 if (sentSynchronously)
377 {
378 return;
379 }
380
381 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
382
383 // Decrement the _outstanding count.
384 --_outstanding;
385 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
386 if (_observer)
387 {
388 _observer->delivered(1);
389 }
390
391 if (_events.empty() && _outstanding == 0 && _shutdown)
392 {
393 _lock.notify();
394 }
395 else if (_outstanding <= 0 && !_events.empty())
396 {
397 flush();
398 }
399}
400
401SubscriberTwoway::SubscriberTwoway(const InstancePtr& instance,
402 const SubscriberRecord& rec,
403 const Ice::ObjectPrx& proxy,
404 int retryCount,
405 int maxOutstanding,
406 const Ice::ObjectPrx& obj) :
407 Subscriber(instance, rec, proxy, retryCount, maxOutstanding), _obj(obj)
408{
409}
410
411void
412SubscriberTwoway::flush()
413{
414 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
415
416 //
417 // If the subscriber isn't online we're done.
418 //
419 if (_state != SubscriberStateOnline || _events.empty())
420 {
421 return;
422 }
423
424 // Send up to _maxOutstanding pending events.
425 while (_outstanding < _maxOutstanding && !_events.empty())
426 {
427 //
428 // Dequeue the head event, count one more outstanding AMI
429 // request.
430 //
431 EventDataPtr e = _events.front();
432 _events.erase(_events.begin());
433 ++_outstanding;
434 if (_observer)
435 {
436 _observer->outstanding(1);
437 }
438
439 try
440 {
441 _obj->begin_ice_invoke(
442 e->op,
443 e->mode,
444 e->data,
445 e->context,
446 Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
447 }
448 catch (const Ice::Exception& ex)
449 {
450 error(true, ex);
451 return;
452 }
453 }
454}
455
456namespace
457{
458
459 SubscriberLink::SubscriberLink(const InstancePtr& instance, const SubscriberRecord& rec) :
460 Subscriber(instance, rec, 0, -1, 1),
461 _obj(TopicLinkPrx::uncheckedCast(
462 rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
463 {
464 }
465
466 void
467 SubscriberLink::flush()
468 {
469 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
470
471 if (_state != SubscriberStateOnline || _outstanding > 0)
472 {
473 return;
474 }
475
477 v.swap(_events);
478
479 EventDataSeq::iterator p = v.begin();
480 while (p != v.end())
481 {
482 if (_rec.cost != 0)
483 {
484 int cost = 0;
485 Ice::Context::const_iterator q = (*p)->context.find("cost");
486 if (q != (*p)->context.end())
487 {
488 cost = atoi(q->second.c_str());
489 }
490 if (cost > _rec.cost)
491 {
492 p = v.erase(p);
493 continue;
494 }
495 }
496 ++p;
497 }
498
499 if (!v.empty())
500 {
501 try
502 {
503 ++_outstanding;
504 if (_observer)
505 {
506 _outstandingCount = static_cast<Ice::Int>(v.size());
507 _observer->outstanding(_outstandingCount);
508 }
509 _obj->begin_forward(
510 v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
511 }
512 catch (const Ice::Exception& ex)
513 {
514 error(true, ex);
515 }
516 }
517 }
518
519} // namespace
520
523{
524 if (rec.link)
525 {
526 return new SubscriberLink(instance, rec);
527 }
528 else
529 {
530 PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
531 Ice::Identity perId;
532 perId.category = instance->instanceName();
533 perId.name = "topic." + rec.topicName + ".publish." +
534 instance->communicator()->identityToString(rec.obj->ice_getIdentity());
535 Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId);
536 TraceLevelsPtr traceLevels = instance->traceLevels();
537 SubscriberPtr subscriber;
538
539 try
540 {
541 int retryCount = 0;
542 QoS::const_iterator p = rec.theQoS.find("retryCount");
543 if (p != rec.theQoS.end())
544 {
545 retryCount = atoi(p->second.c_str());
546 }
547
548 string reliability;
549 p = rec.theQoS.find("reliability");
550 if (p != rec.theQoS.end())
551 {
552 reliability = p->second;
553 }
554 if (!reliability.empty() && reliability != "ordered")
555 {
556 throw BadQoS("invalid reliability: " + reliability);
557 }
558
559 //
560 // Override the timeout.
561 //
562 Ice::ObjectPrx newObj;
563 try
564 {
565 newObj = rec.obj->ice_timeout(instance->sendTimeout());
566 }
567 catch (const Ice::FixedProxyException&)
568 {
569 //
570 // In the event IceStorm is collocated this could be a
571 // fixed proxy in which case its not possible to set the
572 // timeout.
573 //
574 newObj = rec.obj;
575 }
576
577 p = rec.theQoS.find("locatorCacheTimeout");
578 if (p != rec.theQoS.end())
579 {
580 istringstream is(IceUtilInternal::trim(p->second));
581 int locatorCacheTimeout;
582 if (!(is >> locatorCacheTimeout) || !is.eof())
583 {
584 throw BadQoS("invalid locator cache timeout (numeric value required): " +
585 p->second);
586 }
587 newObj = newObj->ice_locatorCacheTimeout(locatorCacheTimeout);
588 }
589
590 p = rec.theQoS.find("connectionCached");
591 if (p != rec.theQoS.end())
592 {
593 istringstream is(IceUtilInternal::trim(p->second));
594 int connectionCached;
595 if (!(is >> connectionCached) || !is.eof())
596 {
597 throw BadQoS("invalid connection cached setting (numeric value required): " +
598 p->second);
599 }
600 newObj = newObj->ice_connectionCached(connectionCached > 0);
601 }
602
603 if (reliability == "ordered")
604 {
605 if (!newObj->ice_isTwoway())
606 {
607 throw BadQoS("ordered reliability requires a twoway proxy");
608 }
609 subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj);
610 }
611 else if (newObj->ice_isOneway() || newObj->ice_isDatagram())
612 {
613 if (retryCount > 0)
614 {
615 throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
616 }
617 subscriber = new SubscriberOneway(instance, rec, proxy, retryCount, newObj);
618 }
619 else if (newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
620 {
621 if (retryCount > 0)
622 {
623 throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
624 }
625 subscriber = new SubscriberBatch(instance, rec, proxy, retryCount, newObj);
626 }
627 else //if(newObj->ice_isTwoway())
628 {
629 assert(newObj->ice_isTwoway());
630 subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj);
631 }
632 per->setSubscriber(subscriber);
633 }
634 catch (const Ice::Exception&)
635 {
636 instance->publishAdapter()->remove(proxy->ice_getIdentity());
637 throw;
638 }
639
640 return subscriber;
641 }
642}
643
644Ice::ObjectPrx
646{
647 return _proxyReplica;
648}
649
650Ice::Identity
652{
653 return _rec.id;
654}
655
658{
659 return _rec;
660}
661
//
// Append events to this subscriber's send queue and trigger a flush.
//
// forwarded: when true and this subscriber is a link (_rec.link), the
//            events are not queued and true is returned.
// events:    the events to append, in order.
//
// When the queue has reached sendQueueSizeMax(), the RemoveSubscriber
// policy reports SendQueueSizeMaxReached through error() and returns
// false; otherwise the oldest queued event is dropped to make room.
// Returns false as well when the switch on _state reaches its
// "return false" branch, and true in all other cases.
//
662bool
663Subscriber::queue(bool forwarded, const EventDataSeq& events)
664{
665 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
666
667 // If this is a link subscriber and the set of events was
668 // forwarded from another IceStorm instance then do not queue the
669 // events.
670 if (forwarded && _rec.link)
671 {
672 return true;
673 }
674
675 switch (_state)
676 {
678 {
// Still inside the discard window (_next not reached): ignore the events.
679 if (IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
680 {
681 break;
682 }
683
684 //
685 // State transition to online.
686 //
688 // fall through
689 [[fallthrough]];
690 }
691
693 {
694 for (EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
695 {
// Queue is full: apply the configured policy before appending.
696 if (static_cast<int>(_events.size()) == _instance->sendQueueSizeMax())
697 {
698 if (_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
699 {
700 error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__));
701 return false;
702 }
703 else // DropEvents
704 {
705 _events.pop_front();
706 }
707 }
708 _events.push_back(*p);
709 }
710
711 if (_observer)
712 {
713 _observer->queued(static_cast<Ice::Int>(events.size()));
714 }
715 flush();
716 break;
717 }
719 return false;
720
722 break;
723 }
724
725 return true;
726}
727
728bool
730{
731 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
732 assert(_state >= SubscriberStateError);
734 {
736 return true;
737 }
738 return false;
739}
740
741void
743{
744 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
746 {
748 }
749}
750
751bool
753{
754 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
756}
757
758void
760{
761 //
762 // Clear the per-subscriber object if it exists.
763 //
764 if (_proxy)
765 {
766 try
767 {
768 _instance->publishAdapter()->remove(_proxy->ice_getIdentity());
769 }
770 catch (const Ice::NotRegisteredException&)
771 {
772 // Ignore
773 }
774 catch (const Ice::ObjectAdapterDeactivatedException&)
775 {
776 // Ignore
777 }
778 }
779
780 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
781 _observer.detach();
782}
783
//
// Record a delivery failure for this subscriber.
//
// dec: when true, the failed request had been counted in _outstanding
//      and the counter is decremented first.
// e:   the failure. ObjectNotExistException, NotRegisteredException and
//      SendQueueSizeMaxReached are treated as hard errors.
//
// A soft error within the retry budget (_retryCount == -1 or
// _currentRetry < _retryCount) is logged, _next is set to now plus the
// instance's discard interval and all queued events are dropped.
// Otherwise, if the subscriber is not yet in an error state, the queued
// events are dropped and an "errored out" trace is written. A thread
// waiting on _lock is notified when the subscriber is shutting down and
// no events remain.
//
784void
785Subscriber::error(bool dec, const Ice::Exception& e)
786{
787 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
788
789 if (dec)
790 {
791 // Decrement the _outstanding count.
792 --_outstanding;
793 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
794 }
795
796 //
797 // It's possible to be already in the error state if the queue maximum size
798 // has been reached or if an ObjectNotExistException occurred before.
799 //
801 {
802 if (_shutdown)
803 {
804 _lock.notify();
805 }
806 return;
807 }
808
809 // A hard error is an ObjectNotExistException, a
810 // NotRegisteredException or a SendQueueSizeMaxReached.
811 bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) ||
812 dynamic_cast<const Ice::NotRegisteredException*>(&e) ||
813 dynamic_cast<const IceStorm::SendQueueSizeMaxReached*>(&e);
814
815 //
816 // A twoway subscriber can queue multiple send events and
817 // therefore it's possible to get multiple error'd replies. Ignore
818 // replies if we're retrying and it's not yet time to process the
819 // next request.
820 //
821 IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
822 if (!hardError && _state == SubscriberStateOffline && now < _next)
823 {
824 return;
825 }
826
827 //
828 // If we're in our retry limits and the error isn't a hard failure
829 // (see hardError above) then we transition to an offline state.
830 // A _retryCount of -1 always satisfies the retry limit.
831 //
832 if (!hardError && (_retryCount == -1 || _currentRetry < _retryCount))
833 {
835
836 TraceLevelsPtr traceLevels = _instance->traceLevels();
// The first failure is reported as a warning; later retries are only traced.
837 if (_currentRetry == 0)
838 {
839 Ice::Warning warn(traceLevels->logger);
840 warn << traceLevels->subscriberCat << ":"
841 << _instance->communicator()->identityToString(_rec.id);
842 if (traceLevels->subscriber > 1)
843 {
844 warn << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
845 }
846 warn << " subscriber offline: " << e
847 << " discarding events: " << _instance->discardInterval()
848 << "s retryCount: " << _retryCount;
849 }
850 else
851 {
852 if (traceLevels->subscriber > 0)
853 {
854 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
855 out << _instance->communicator()->identityToString(_rec.id);
856 if (traceLevels->subscriber > 1)
857 {
858 out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
859 }
860 out << " subscriber offline: " << e
861 << " discarding events: " << _instance->discardInterval()
862 << "s retry: " << _currentRetry << "/" << _retryCount;
863 }
864 }
865
866 // Transition to offline state, increment the retry count and
867 // clear all queued events.
868 _next = now + _instance->discardInterval();
870 _events.clear();
872 }
873 // Errored out.
874 else if (_state < SubscriberStateError)
875 {
876 _events.clear();
878
879 TraceLevelsPtr traceLevels = _instance->traceLevels();
880 if (traceLevels->subscriber > 0)
881 {
882 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
883 out << _instance->communicator()->identityToString(_rec.id);
884 if (traceLevels->subscriber > 1)
885 {
886 out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
887 }
888 out << " subscriber errored out: " << e << " retry: " << _currentRetry << "/"
889 << _retryCount;
890 }
891 }
892
// Release a waiting shutdown once no events remain.
893 if (_shutdown && _events.empty())
894 {
895 _lock.notify();
896 }
897}
898
899void
900Subscriber::completed(const Ice::AsyncResultPtr& result)
901{
902 try
903 {
904 result->throwLocalException();
905
906 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
907
908 // Decrement the _outstanding count.
909 --_outstanding;
910 assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
911 if (_observer)
912 {
913 _observer->delivered(_outstandingCount);
914 }
915
916 //
917 // A successful response means we're no longer retrying, we're
918 // back active.
919 //
920 _currentRetry = 0;
921
922 if (_events.empty() && _outstanding == 0 && _shutdown)
923 {
924 _lock.notify();
925 }
926 else
927 {
928 flush();
929 }
930 }
931 catch (const Ice::LocalException& ex)
932 {
933 error(true, ex);
934 }
935}
936
937void
939{
940 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
941
942 _shutdown = true;
943 while (_outstanding > 0 && !_events.empty())
944 {
945 _lock.wait();
946 }
947
948 _observer.detach();
949}
950
951void
953{
954 IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
955 if (_instance->observer())
956 {
957 _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
958 _rec.topicName,
959 _rec.obj,
960 _rec.theQoS,
961 _rec.theTopic,
962 toSubscriberState(_state),
963 _observer.get()));
964 }
965}
966
968 const SubscriberRecord& rec,
969 const Ice::ObjectPrx& proxy,
970 int retryCount,
971 int maxOutstanding) :
972 _instance(instance),
973 _rec(rec),
974 _retryCount(retryCount),
975 _maxOutstanding(maxOutstanding),
976 _proxy(proxy),
978 _shutdown(false),
980 _outstanding(0),
983{
984 if (_proxy && _instance->publisherReplicaProxy())
985 {
986 const_cast<Ice::ObjectPrx&>(_proxyReplica) =
987 _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
988 }
989
990 if (_instance->observer())
991 {
992 _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
993 rec.topicName,
994 rec.obj,
995 rec.theQoS,
996 rec.theTopic,
997 toSubscriberState(_state),
998 0));
999 }
1000}
1001
1002namespace
1003{
1004
//
// Human-readable name of a subscriber state, used in the state
// transition trace written by the code below. A value that matches no
// label yields "???".
//
1005 string
1006 stateToString(Subscriber::SubscriberState state)
1007 {
1008 switch (state)
1009 {
1011 return "online";
1013 return "offline";
1015 return "error";
1017 return "reaped";
1018 default:
1019 return "???";
1020 }
1021 }
1022
1023} // namespace
1024
1025void
1027{
1028 if (state != _state)
1029 {
1030 TraceLevelsPtr traceLevels = _instance->traceLevels();
1031 if (traceLevels->subscriber > 1)
1032 {
1033 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
1034 out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
1035 << " transition from: " << stateToString(_state) << " to: " << stateToString(state);
1036 }
1037 _state = state;
1038
1039 if (_instance->observer())
1040 {
1041 _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
1042 _rec.topicName,
1043 _rec.obj,
1044 _rec.theQoS,
1045 _rec.theTopic,
1046 toSubscriberState(_state),
1047 _observer.get()));
1048 }
1049 }
1050}
1051
1052bool
1053IceStorm::operator==(const SubscriberPtr& subscriber, const Ice::Identity& id)
1054{
1055 return subscriber->id() == id;
1056}
1057
1058bool
1060{
1061 return &s1 == &s2;
1062}
1063
1064bool
1066{
1067 return &s1 != &s2;
1068}
1069
1070bool
1071IceStorm::operator<(const Subscriber& s1, const Subscriber& s2)
1072{
1073 return &s1 < &s2;
1074}
uint8_t data[1]
if(!yyvaluep)
Definition Grammar.cpp:645
bool queue(bool, const EventDataSeq &)
Subscriber(const InstancePtr &, const IceStorm::SubscriberRecord &, const Ice::ObjectPrx &, int, int)
const IceStorm::SubscriberRecord _rec
Definition Subscriber.h:74
IceUtil::Time _next
Definition Subscriber.h:91
const Ice::ObjectPrx _proxyReplica
Definition Subscriber.h:78
IceInternal::ObserverHelperT< IceStorm::Instrumentation::SubscriberObserver > _observer
Definition Subscriber.h:94
virtual void flush()=0
Ice::ObjectPrx proxy() const
IceStorm::SubscriberRecord record() const
SubscriberState _state
Definition Subscriber.h:84
const Ice::ObjectPrx _proxy
Definition Subscriber.h:77
void completed(const Ice::AsyncResultPtr &)
const int _maxOutstanding
Definition Subscriber.h:76
void error(bool, const Ice::Exception &)
const InstancePtr _instance
Definition Subscriber.h:73
EventDataSeq _events
Definition Subscriber.h:88
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
const int _retryCount
Definition Subscriber.h:75
Ice::Identity id() const
void setState(SubscriberState)
IceUtil::Monitor< IceUtil::RecMutex > _lock
Definition Subscriber.h:80
#define q
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition Util.cpp:51
@ SubscriberStateOnline
Online waiting to send events.
@ SubscriberStateOffline
Offline, retrying.
@ SubscriberStateError
Error state, awaiting to be destroyed.
IceUtil::Handle< Subscriber > SubscriberPtr
Definition Subscriber.h:26
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
bool operator!=(const IceStorm::Subscriber &, const IceStorm::Subscriber &)
::IceUtil::Handle<::IceStorm::EventData > EventDataPtr
bool operator<(const TopicLink &lhs, const TopicLink &rhs)
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicLink > TopicLinkPrx
IceUtil::Handle< Instance > InstancePtr
Definition Instance.h:128
bool operator==(const TopicLink &lhs, const TopicLink &rhs)
double v(double t, double v0, double a0, double j)
Definition CtrlUtil.h:39
const LogSender::manipulator flush
Definition LogSender.h:251
Used to store persistent information for persistent subscribers.