Subscriber.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <iterator>
11 
12 #include <Ice/LoggerUtil.h>
13 #include <IceStorm/Instance.h>
14 #include <IceStorm/NodeI.h>
15 #include <IceStorm/Subscriber.h>
16 #include <IceStorm/TraceLevels.h>
17 #include <IceStorm/Util.h>
18 #include <IceUtil/StringUtil.h>
19 
20 using namespace std;
21 using namespace IceStorm;
22 using namespace IceStormElection;
23 
24 //
25 // Per Subscriber object.
26 //
27 namespace
28 {
29 
30  class PerSubscriberPublisherI : public Ice::BlobjectArray
31  {
32  public:
33  PerSubscriberPublisherI(const InstancePtr& instance) : _instance(instance)
34  {
35  }
36 
37  void
38  setSubscriber(const SubscriberPtr& subscriber)
39  {
40  _subscriber = subscriber;
41  }
42 
43  virtual bool
44  ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
45  vector<Ice::Byte>&,
46  const Ice::Current& current)
47  {
48  // Use cached reads.
49  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
50 
51  EventDataPtr event =
52  new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
53 
54  //
55  // COMPILERBUG: gcc 4.0.1 doesn't like this.
56  //
57  //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
58  Ice::ByteSeq data(inParams.first, inParams.second);
59  event->data.swap(data);
60 
61  EventDataSeq e;
62  e.push_back(event);
63  _subscriber->queue(false, e);
64  return true;
65  }
66 
67  private:
68  const InstancePtr _instance;
69  /*const*/ SubscriberPtr _subscriber;
70  };
71 
72  typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr;
73 
75  toSubscriberState(Subscriber::SubscriberState s)
76  {
77  switch (s)
78  {
84  case Subscriber::SubscriberStateReaped:
86  default:
87  assert(false);
89  }
90  }
91 
92 } // namespace
93 
94 // Each of the various Subscriber types.
95 namespace
96 {
97 
98  class SubscriberBatch : public Subscriber
99  {
100  public:
101  SubscriberBatch(const InstancePtr&,
102  const SubscriberRecord&,
103  const Ice::ObjectPrx&,
104  int,
105  const Ice::ObjectPrx&);
106 
107  virtual void flush();
108 
109  void
110  exception(const Ice::Exception& ex)
111  {
112  error(false, ex);
113  }
114 
115  void doFlush();
116  void sent(bool);
117 
118  private:
119  const Ice::ObjectPrx _obj;
120  const IceUtil::Time _interval;
121  };
122 
123  typedef IceUtil::Handle<SubscriberBatch> SubscriberBatchPtr;
124 
125  class SubscriberOneway : public Subscriber
126  {
127  public:
128  SubscriberOneway(const InstancePtr&,
129  const SubscriberRecord&,
130  const Ice::ObjectPrx&,
131  int,
132  const Ice::ObjectPrx&);
133 
134  virtual void flush();
135 
136  void
137  exception(const Ice::Exception& ex)
138  {
139  error(true, ex);
140  }
141 
142  void sent(bool);
143 
144  private:
145  const Ice::ObjectPrx _obj;
146  };
147 
148  typedef IceUtil::Handle<SubscriberOneway> SubscriberOnewayPtr;
149 
150  class SubscriberTwoway : public Subscriber
151  {
152  public:
153  SubscriberTwoway(const InstancePtr&,
154  const SubscriberRecord&,
155  const Ice::ObjectPrx&,
156  int,
157  int,
158  const Ice::ObjectPrx&);
159 
160  virtual void flush();
161 
162  private:
163  const Ice::ObjectPrx _obj;
164  };
165 
166  class SubscriberLink : public Subscriber
167  {
168  public:
169  SubscriberLink(const InstancePtr&, const SubscriberRecord&);
170 
171  virtual void flush();
172 
173  private:
174  const TopicLinkPrx _obj;
175  };
176 
177  class FlushTimerTask : public IceUtil::TimerTask
178  {
179  public:
180  FlushTimerTask(const SubscriberBatchPtr& subscriber) : _subscriber(subscriber)
181  {
182  }
183 
184  virtual void
185  runTimerTask()
186  {
187  _subscriber->doFlush();
188  }
189 
190  private:
191  const SubscriberBatchPtr _subscriber;
192  };
193 
194 } // namespace
195 
196 SubscriberBatch::SubscriberBatch(const InstancePtr& instance,
197  const SubscriberRecord& rec,
198  const Ice::ObjectPrx& proxy,
199  int retryCount,
200  const Ice::ObjectPrx& obj) :
201  Subscriber(instance, rec, proxy, retryCount, 1), _obj(obj), _interval(instance->flushInterval())
202 {
203  assert(retryCount == 0);
204 }
205 
206 void
208 {
209  if (_outstanding == 0)
210  {
211  ++_outstanding;
212  _instance->batchFlusher()->schedule(new FlushTimerTask(this), _interval);
213  }
214 }
215 
216 void
217 SubscriberBatch::doFlush()
218 {
219  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
220 
221  //
222  // If the subscriber isn't online we're done.
223  //
224  if (_state != SubscriberStateOnline)
225  {
226  return;
227  }
228 
229  EventDataSeq v;
230  v.swap(_events);
231  assert(!v.empty());
232 
233  if (_observer)
234  {
235  _outstandingCount = static_cast<Ice::Int>(v.size());
236  _observer->outstanding(_outstandingCount);
237  }
238 
239  try
240  {
241  vector<Ice::Byte> dummy;
242  for (EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
243  {
244  _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
245  }
246 
247  Ice::AsyncResultPtr result =
248  _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(
249  this, &SubscriberBatch::exception, &SubscriberBatch::sent));
250  if (result->sentSynchronously())
251  {
252  --_outstanding;
253  assert(_outstanding == 0);
254  if (_observer)
255  {
256  _observer->delivered(_outstandingCount);
257  }
258  }
259  }
260  catch (const Ice::Exception& ex)
261  {
262  error(false, ex);
263  return;
264  }
265 
266  if (_events.empty() && _outstanding == 0 && _shutdown)
267  {
268  _lock.notify();
269  }
270 
271  // This is significantly faster than the async version, but it can
272  // block the calling thread. Bad news!
273 
274  //_obj->ice_flushBatchRequests();
275 }
276 
277 void
278 SubscriberBatch::sent(bool sentSynchronously)
279 {
280  if (sentSynchronously)
281  {
282  return;
283  }
284 
285  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
286 
287  // Decrement the _outstanding count.
288  --_outstanding;
289  assert(_outstanding == 0);
290  if (_observer)
291  {
292  _observer->delivered(_outstandingCount);
293  }
294 
295  if (_events.empty() && _outstanding == 0 && _shutdown)
296  {
297  _lock.notify();
298  }
299  else if (!_events.empty())
300  {
301  flush();
302  }
303 }
304 
305 SubscriberOneway::SubscriberOneway(const InstancePtr& instance,
306  const SubscriberRecord& rec,
307  const Ice::ObjectPrx& proxy,
308  int retryCount,
309  const Ice::ObjectPrx& obj) :
310  Subscriber(instance, rec, proxy, retryCount, 5), _obj(obj)
311 {
312  assert(retryCount == 0);
313 }
314 
315 void
317 {
318  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
319 
320  //
321  // If the subscriber isn't online we're done.
322  //
323  if (_state != SubscriberStateOnline || _events.empty())
324  {
325  return;
326  }
327 
328  // Send up to _maxOutstanding pending events.
329  while (_outstanding < _maxOutstanding && !_events.empty())
330  {
331  //
332  // Dequeue the head event, count one more outstanding AMI
333  // request.
334  //
335  EventDataPtr e = _events.front();
336  _events.erase(_events.begin());
337  if (_observer)
338  {
339  _observer->outstanding(1);
340  }
341 
342  try
343  {
344  Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
345  e->op,
346  e->mode,
347  e->data,
348  e->context,
349  Ice::newCallback_Object_ice_invoke(
350  this, &SubscriberOneway::exception, &SubscriberOneway::sent));
351  if (!result->sentSynchronously())
352  {
353  ++_outstanding;
354  }
355  else if (_observer)
356  {
357  _observer->delivered(1);
358  }
359  }
360  catch (const Ice::Exception& ex)
361  {
362  error(true, ex);
363  return;
364  }
365  }
366 
367  if (_events.empty() && _outstanding == 0 && _shutdown)
368  {
369  _lock.notify();
370  }
371 }
372 
373 void
374 SubscriberOneway::sent(bool sentSynchronously)
375 {
376  if (sentSynchronously)
377  {
378  return;
379  }
380 
381  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
382 
383  // Decrement the _outstanding count.
384  --_outstanding;
385  assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
386  if (_observer)
387  {
388  _observer->delivered(1);
389  }
390 
391  if (_events.empty() && _outstanding == 0 && _shutdown)
392  {
393  _lock.notify();
394  }
395  else if (_outstanding <= 0 && !_events.empty())
396  {
397  flush();
398  }
399 }
400 
401 SubscriberTwoway::SubscriberTwoway(const InstancePtr& instance,
402  const SubscriberRecord& rec,
403  const Ice::ObjectPrx& proxy,
404  int retryCount,
405  int maxOutstanding,
406  const Ice::ObjectPrx& obj) :
407  Subscriber(instance, rec, proxy, retryCount, maxOutstanding), _obj(obj)
408 {
409 }
410 
411 void
413 {
414  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
415 
416  //
417  // If the subscriber isn't online we're done.
418  //
419  if (_state != SubscriberStateOnline || _events.empty())
420  {
421  return;
422  }
423 
424  // Send up to _maxOutstanding pending events.
425  while (_outstanding < _maxOutstanding && !_events.empty())
426  {
427  //
428  // Dequeue the head event, count one more outstanding AMI
429  // request.
430  //
431  EventDataPtr e = _events.front();
432  _events.erase(_events.begin());
433  ++_outstanding;
434  if (_observer)
435  {
436  _observer->outstanding(1);
437  }
438 
439  try
440  {
441  _obj->begin_ice_invoke(
442  e->op,
443  e->mode,
444  e->data,
445  e->context,
446  Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
447  }
448  catch (const Ice::Exception& ex)
449  {
450  error(true, ex);
451  return;
452  }
453  }
454 }
455 
456 namespace
457 {
458 
459  SubscriberLink::SubscriberLink(const InstancePtr& instance, const SubscriberRecord& rec) :
460  Subscriber(instance, rec, 0, -1, 1),
461  _obj(TopicLinkPrx::uncheckedCast(
462  rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
463  {
464  }
465 
466  void
468  {
469  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
470 
471  if (_state != SubscriberStateOnline || _outstanding > 0)
472  {
473  return;
474  }
475 
476  EventDataSeq v;
477  v.swap(_events);
478 
479  EventDataSeq::iterator p = v.begin();
480  while (p != v.end())
481  {
482  if (_rec.cost != 0)
483  {
484  int cost = 0;
485  Ice::Context::const_iterator q = (*p)->context.find("cost");
486  if (q != (*p)->context.end())
487  {
488  cost = atoi(q->second.c_str());
489  }
490  if (cost > _rec.cost)
491  {
492  p = v.erase(p);
493  continue;
494  }
495  }
496  ++p;
497  }
498 
499  if (!v.empty())
500  {
501  try
502  {
503  ++_outstanding;
504  if (_observer)
505  {
506  _outstandingCount = static_cast<Ice::Int>(v.size());
507  _observer->outstanding(_outstandingCount);
508  }
509  _obj->begin_forward(
510  v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
511  }
512  catch (const Ice::Exception& ex)
513  {
514  error(true, ex);
515  }
516  }
517  }
518 
519 } // namespace
520 
522 Subscriber::create(const InstancePtr& instance, const SubscriberRecord& rec)
523 {
524  if (rec.link)
525  {
526  return new SubscriberLink(instance, rec);
527  }
528  else
529  {
530  PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
531  Ice::Identity perId;
532  perId.category = instance->instanceName();
533  perId.name = "topic." + rec.topicName + ".publish." +
534  instance->communicator()->identityToString(rec.obj->ice_getIdentity());
535  Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId);
536  TraceLevelsPtr traceLevels = instance->traceLevels();
537  SubscriberPtr subscriber;
538 
539  try
540  {
541  int retryCount = 0;
542  QoS::const_iterator p = rec.theQoS.find("retryCount");
543  if (p != rec.theQoS.end())
544  {
545  retryCount = atoi(p->second.c_str());
546  }
547 
548  string reliability;
549  p = rec.theQoS.find("reliability");
550  if (p != rec.theQoS.end())
551  {
552  reliability = p->second;
553  }
554  if (!reliability.empty() && reliability != "ordered")
555  {
556  throw BadQoS("invalid reliability: " + reliability);
557  }
558 
559  //
560  // Override the timeout.
561  //
562  Ice::ObjectPrx newObj;
563  try
564  {
565  newObj = rec.obj->ice_timeout(instance->sendTimeout());
566  }
567  catch (const Ice::FixedProxyException&)
568  {
569  //
570  // In the event IceStorm is collocated this could be a
571  // fixed proxy in which case its not possible to set the
572  // timeout.
573  //
574  newObj = rec.obj;
575  }
576 
577  p = rec.theQoS.find("locatorCacheTimeout");
578  if (p != rec.theQoS.end())
579  {
580  istringstream is(IceUtilInternal::trim(p->second));
581  int locatorCacheTimeout;
582  if (!(is >> locatorCacheTimeout) || !is.eof())
583  {
584  throw BadQoS("invalid locator cache timeout (numeric value required): " +
585  p->second);
586  }
587  newObj = newObj->ice_locatorCacheTimeout(locatorCacheTimeout);
588  }
589 
590  p = rec.theQoS.find("connectionCached");
591  if (p != rec.theQoS.end())
592  {
593  istringstream is(IceUtilInternal::trim(p->second));
594  int connectionCached;
595  if (!(is >> connectionCached) || !is.eof())
596  {
597  throw BadQoS("invalid connection cached setting (numeric value required): " +
598  p->second);
599  }
600  newObj = newObj->ice_connectionCached(connectionCached > 0);
601  }
602 
603  if (reliability == "ordered")
604  {
605  if (!newObj->ice_isTwoway())
606  {
607  throw BadQoS("ordered reliability requires a twoway proxy");
608  }
609  subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj);
610  }
611  else if (newObj->ice_isOneway() || newObj->ice_isDatagram())
612  {
613  if (retryCount > 0)
614  {
615  throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
616  }
617  subscriber = new SubscriberOneway(instance, rec, proxy, retryCount, newObj);
618  }
619  else if (newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
620  {
621  if (retryCount > 0)
622  {
623  throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
624  }
625  subscriber = new SubscriberBatch(instance, rec, proxy, retryCount, newObj);
626  }
627  else //if(newObj->ice_isTwoway())
628  {
629  assert(newObj->ice_isTwoway());
630  subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj);
631  }
632  per->setSubscriber(subscriber);
633  }
634  catch (const Ice::Exception&)
635  {
636  instance->publishAdapter()->remove(proxy->ice_getIdentity());
637  throw;
638  }
639 
640  return subscriber;
641  }
642 }
643 
644 Ice::ObjectPrx
645 Subscriber::proxy() const
646 {
647  return _proxyReplica;
648 }
649 
651 Subscriber::id() const
652 {
653  return _rec.id;
654 }
655 
657 Subscriber::record() const
658 {
659  return _rec;
660 }
661 
662 bool
663 Subscriber::queue(bool forwarded, const EventDataSeq& events)
664 {
665  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
666 
667  // If this is a link subscriber and the set of events was
668  // forwarded from another IceStorm instance, then do not queue the
669  // events.
670  if (forwarded && _rec.link)
671  {
672  return true;
673  }
674 
675  switch (_state)
676  {
678  {
679  if (IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
680  {
681  break;
682  }
683 
684  //
685  // State transition to online.
686  //
687  setState(SubscriberStateOnline);
688  // fall through
689  [[fallthrough]];
690  }
691 
693  {
694  for (EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
695  {
696  if (static_cast<int>(_events.size()) == _instance->sendQueueSizeMax())
697  {
698  if (_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
699  {
700  error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__));
701  return false;
702  }
703  else // DropEvents
704  {
705  _events.pop_front();
706  }
707  }
708  _events.push_back(*p);
709  }
710 
711  if (_observer)
712  {
713  _observer->queued(static_cast<Ice::Int>(events.size()));
714  }
715  flush();
716  break;
717  }
719  return false;
720 
721  case SubscriberStateReaped:
722  break;
723  }
724 
725  return true;
726 }
727 
728 bool
729 Subscriber::reap()
730 {
731  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
732  assert(_state >= SubscriberStateError);
733  if (_state == SubscriberStateError)
734  {
735  setState(SubscriberStateReaped);
736  return true;
737  }
738  return false;
739 }
740 
741 void
742 Subscriber::resetIfReaped()
743 {
744  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
745  if (_state == SubscriberStateReaped)
746  {
747  setState(SubscriberStateError);
748  }
749 }
750 
751 bool
752 Subscriber::errored() const
753 {
754  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
755  return _state >= SubscriberStateError;
756 }
757 
758 void
759 Subscriber::destroy()
760 {
761  //
762  // Clear the per-subscriber object if it exists.
763  //
764  if (_proxy)
765  {
766  try
767  {
768  _instance->publishAdapter()->remove(_proxy->ice_getIdentity());
769  }
770  catch (const Ice::NotRegisteredException&)
771  {
772  // Ignore
773  }
774  catch (const Ice::ObjectAdapterDeactivatedException&)
775  {
776  // Ignore
777  }
778  }
779 
780  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
781  _observer.detach();
782 }
783 
784 void
785 Subscriber::error(bool dec, const Ice::Exception& e)
786 {
787  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
788 
789  if (dec)
790  {
791  // Decrement the _outstanding count.
792  --_outstanding;
793  assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
794  }
795 
796  //
797  // It's possible to be already in the error state if the queue maximum size
798  // has been reached or if an ObjectNotExistException occured before.
799  //
800  if (_state >= SubscriberStateError)
801  {
802  if (_shutdown)
803  {
804  _lock.notify();
805  }
806  return;
807  }
808 
809  // A hard error is an ObjectNotExistException or
810  // NotRegisteredException.
811  bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) ||
812  dynamic_cast<const Ice::NotRegisteredException*>(&e) ||
813  dynamic_cast<const IceStorm::SendQueueSizeMaxReached*>(&e);
814 
815  //
816  // A twoway subscriber can queue multiple send events and
817  // therefore its possible to get multiple error'd replies. Ignore
818  // replies if we're retrying and its not yet time to process the
819  // next request.
820  //
821  IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
822  if (!hardError && _state == SubscriberStateOffline && now < _next)
823  {
824  return;
825  }
826 
827  //
828  // If we're in our retry limits and the error isn't a hard failure
829  // (that is ObjectNotExistException or NotRegisteredException)
830  // then we transition to an offline state.
831  //
832  if (!hardError && (_retryCount == -1 || _currentRetry < _retryCount))
833  {
834  assert(_state < SubscriberStateError);
835 
836  TraceLevelsPtr traceLevels = _instance->traceLevels();
837  if (_currentRetry == 0)
838  {
839  Ice::Warning warn(traceLevels->logger);
840  warn << traceLevels->subscriberCat << ":"
841  << _instance->communicator()->identityToString(_rec.id);
842  if (traceLevels->subscriber > 1)
843  {
844  warn << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
845  }
846  warn << " subscriber offline: " << e
847  << " discarding events: " << _instance->discardInterval()
848  << "s retryCount: " << _retryCount;
849  }
850  else
851  {
852  if (traceLevels->subscriber > 0)
853  {
854  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
855  out << _instance->communicator()->identityToString(_rec.id);
856  if (traceLevels->subscriber > 1)
857  {
858  out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
859  }
860  out << " subscriber offline: " << e
861  << " discarding events: " << _instance->discardInterval()
862  << "s retry: " << _currentRetry << "/" << _retryCount;
863  }
864  }
865 
866  // Transition to offline state, increment the retry count and
867  // clear all queued events.
868  _next = now + _instance->discardInterval();
869  ++_currentRetry;
870  _events.clear();
871  setState(SubscriberStateOffline);
872  }
873  // Errored out.
874  else if (_state < SubscriberStateError)
875  {
876  _events.clear();
877  setState(SubscriberStateError);
878 
879  TraceLevelsPtr traceLevels = _instance->traceLevels();
880  if (traceLevels->subscriber > 0)
881  {
882  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
883  out << _instance->communicator()->identityToString(_rec.id);
884  if (traceLevels->subscriber > 1)
885  {
886  out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
887  }
888  out << " subscriber errored out: " << e << " retry: " << _currentRetry << "/"
889  << _retryCount;
890  }
891  }
892 
893  if (_shutdown && _events.empty())
894  {
895  _lock.notify();
896  }
897 }
898 
899 void
900 Subscriber::completed(const Ice::AsyncResultPtr& result)
901 {
902  try
903  {
904  result->throwLocalException();
905 
906  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
907 
908  // Decrement the _outstanding count.
909  --_outstanding;
910  assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
911  if (_observer)
912  {
913  _observer->delivered(_outstandingCount);
914  }
915 
916  //
917  // A successful response means we're no longer retrying, we're
918  // back active.
919  //
920  _currentRetry = 0;
921 
922  if (_events.empty() && _outstanding == 0 && _shutdown)
923  {
924  _lock.notify();
925  }
926  else
927  {
928  flush();
929  }
930  }
931  catch (const Ice::LocalException& ex)
932  {
933  error(true, ex);
934  }
935 }
936 
937 void
938 Subscriber::shutdown()
939 {
940  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
941 
942  _shutdown = true;
943  while (_outstanding > 0 && !_events.empty())
944  {
945  _lock.wait();
946  }
947 
948  _observer.detach();
949 }
950 
951 void
952 Subscriber::updateObserver()
953 {
954  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
955  if (_instance->observer())
956  {
957  _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
958  _rec.topicName,
959  _rec.obj,
960  _rec.theQoS,
961  _rec.theTopic,
962  toSubscriberState(_state),
963  _observer.get()));
964  }
965 }
966 
967 Subscriber::Subscriber(const InstancePtr& instance,
968  const SubscriberRecord& rec,
969  const Ice::ObjectPrx& proxy,
970  int retryCount,
971  int maxOutstanding) :
972  _instance(instance),
973  _rec(rec),
974  _retryCount(retryCount),
975  _maxOutstanding(maxOutstanding),
976  _proxy(proxy),
977  _proxyReplica(proxy),
978  _shutdown(false),
979  _state(SubscriberStateOnline),
980  _outstanding(0),
981  _outstandingCount(1),
982  _currentRetry(0)
983 {
984  if (_proxy && _instance->publisherReplicaProxy())
985  {
986  const_cast<Ice::ObjectPrx&>(_proxyReplica) =
987  _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
988  }
989 
990  if (_instance->observer())
991  {
992  _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
993  rec.topicName,
994  rec.obj,
995  rec.theQoS,
996  rec.theTopic,
997  toSubscriberState(_state),
998  0));
999  }
1000 }
1001 
1002 namespace
1003 {
1004 
1005  string
1006  stateToString(Subscriber::SubscriberState state)
1007  {
1008  switch (state)
1009  {
1011  return "online";
1013  return "offline";
1015  return "error";
1017  return "reaped";
1018  default:
1019  return "???";
1020  }
1021  }
1022 
1023 } // namespace
1024 
1025 void
1027 {
1028  if (state != _state)
1029  {
1030  TraceLevelsPtr traceLevels = _instance->traceLevels();
1031  if (traceLevels->subscriber > 1)
1032  {
1033  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
1034  out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
1035  << " transition from: " << stateToString(_state) << " to: " << stateToString(state);
1036  }
1037  _state = state;
1038 
1039  if (_instance->observer())
1040  {
1041  _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
1042  _rec.topicName,
1043  _rec.obj,
1044  _rec.theQoS,
1045  _rec.theTopic,
1046  toSubscriberState(_state),
1047  _observer.get()));
1048  }
1049  }
1050 }
1051 
1052 bool
1053 IceStorm::operator==(const SubscriberPtr& subscriber, const Ice::Identity& id)
1054 {
1055  return subscriber->id() == id;
1056 }
1057 
1058 bool
1060 {
1061  return &s1 == &s2;
1062 }
1063 
1064 bool
1066 {
1067  return &s1 != &s2;
1068 }
1069 
1070 bool
1072 {
1073  return &s1 < &s2;
1074 }
IceStorm::Subscriber::SubscriberStateOnline
@ SubscriberStateOnline
Definition: Subscriber.h:55
IceStorm
Definition: DBTypes.ice:22
IceStorm::Subscriber
Definition: Subscriber.h:28
IceStorm::Subscriber::_state
SubscriberState _state
Definition: Subscriber.h:84
IceStorm::EventDataSeq
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
Definition: IceStormInternal.h:528
IceStorm::Subscriber::setState
void setState(SubscriberState)
Definition: Subscriber.cpp:1026
IceStorm::operator!=
bool operator!=(const IceStorm::Subscriber &, const IceStorm::Subscriber &)
Definition: Subscriber.cpp:1065
IceStorm::operator==
bool operator==(const TopicLink &lhs, const TopicLink &rhs)
Definition: IceStormInternal.h:867
IceStorm::Subscriber::SubscriberStateReaped
@ SubscriberStateReaped
Definition: Subscriber.h:58
IceStorm::Instrumentation::SubscriberState
SubscriberState
Definition: Instrumentation.h:204
Util.h
IceStorm::Subscriber::_proxyReplica
const Ice::ObjectPrx _proxyReplica
Definition: Subscriber.h:78
IceStorm::SubscriberRecord::theQoS
::IceStorm::QoS theQoS
Definition: SubscriberRecord.h:239
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:233
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
IceStorm::Subscriber::SubscriberStateError
@ SubscriberStateError
Definition: Subscriber.h:57
IceStormElection
Definition: DBTypes.ice:17
armarx::flush
const LogSender::manipulator flush
Definition: LogSender.h:251
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
IceStorm::Subscriber::SubscriberState
SubscriberState
Definition: Subscriber.h:53
IceStorm::Instrumentation::SubscriberStateOffline
@ SubscriberStateOffline
Offline, retrying.
Definition: Instrumentation.h:207
IceStorm::EventData
The event data.
Definition: IceStormInternal.h:494
IceStorm::SubscriberRecord::obj
::Ice::ObjectPrx obj
Definition: SubscriberRecord.h:238
Subscriber.h
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStorm::SendQueueSizeMaxReached
Definition: IceStormInternal.h:517
IceStorm::Subscriber::_proxy
const Ice::ObjectPrx _proxy
Definition: Subscriber.h:77
IceStorm::operator<
bool operator<(const TopicLink &lhs, const TopicLink &rhs)
Definition: IceStormInternal.h:873
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::theTopic
::IceStorm::TopicPrx theTopic
Definition: SubscriberRecord.h:241
TraceLevels.h
std
Definition: Application.h:66
IceStormElection::CachedReadHelper
Definition: NodeI.h:134
IceUtil::Handle
Definition: forward_declarations.h:30
IceStorm::Subscriber::_rec
const IceStorm::SubscriberRecord _rec
Definition: Subscriber.h:74
IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicLink >
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:917
IceStorm::Subscriber::_instance
const InstancePtr _instance
Definition: Subscriber.h:73
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:237
NodeI.h
Instance.h
IceStorm::Subscriber::_observer
IceInternal::ObserverHelperT< IceStorm::Instrumentation::SubscriberObserver > _observer
Definition: Subscriber.h:94
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
IceStorm::Subscriber::SubscriberStateOffline
@ SubscriberStateOffline
Definition: Subscriber.h:56
IceStorm::Instrumentation::SubscriberStateError
@ SubscriberStateError
Error state, awaiting to be destroyed.
Definition: Instrumentation.h:208
IceStorm::SubscriberRecord::topicName
::std::string topicName
Definition: SubscriberRecord.h:235
IceStorm::Instrumentation::SubscriberStateOnline
@ SubscriberStateOnline
Online waiting to send events.
Definition: Instrumentation.h:206