Subscriber.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <IceStorm/Subscriber.h>
11 #include <IceStorm/Instance.h>
12 #include <IceStorm/TraceLevels.h>
13 #include <IceStorm/NodeI.h>
14 #include <IceStorm/Util.h>
15 #include <Ice/LoggerUtil.h>
16 #include <IceUtil/StringUtil.h>
17 #include <iterator>
18 
19 using namespace std;
20 using namespace IceStorm;
21 using namespace IceStormElection;
22 
23 //
24 // Per Subscriber object.
25 //
26 namespace
27 {
28 
29  class PerSubscriberPublisherI : public Ice::BlobjectArray
30  {
31  public:
32 
33  PerSubscriberPublisherI(const InstancePtr& instance) :
34  _instance(instance)
35  {
36  }
37 
38  void
39  setSubscriber(const SubscriberPtr& subscriber)
40  {
41  _subscriber = subscriber;
42  }
43 
44  virtual bool
45  ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
46  vector<Ice::Byte>&,
47  const Ice::Current& current)
48  {
49  // Use cached reads.
50  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
51 
52  EventDataPtr event = new EventData(
53  current.operation,
54  current.mode,
55  Ice::ByteSeq(),
56  current.ctx);
57 
58  //
59  // COMPILERBUG: gcc 4.0.1 doesn't like this.
60  //
61  //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
62  Ice::ByteSeq data(inParams.first, inParams.second);
63  event->data.swap(data);
64 
65  EventDataSeq e;
66  e.push_back(event);
67  _subscriber->queue(false, e);
68  return true;
69  }
70 
71  private:
72 
73  const InstancePtr _instance;
74  /*const*/ SubscriberPtr _subscriber;
75  };
76  typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr;
77 
79  toSubscriberState(Subscriber::SubscriberState s)
80  {
81  switch (s)
82  {
88  case Subscriber::SubscriberStateReaped:
90  default:
91  assert(false);
93  }
94  }
95 
96 }
97 
98 // Each of the various Subscriber types.
99 namespace
100 {
101 
102  class SubscriberBatch : public Subscriber
103  {
104  public:
105 
106  SubscriberBatch(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&);
107 
108  virtual void flush();
109 
110  void exception(const Ice::Exception& ex)
111  {
112  error(false, ex);
113  }
114 
115  void doFlush();
116  void sent(bool);
117 
118  private:
119 
120  const Ice::ObjectPrx _obj;
121  const IceUtil::Time _interval;
122  };
123  typedef IceUtil::Handle<SubscriberBatch> SubscriberBatchPtr;
124 
125  class SubscriberOneway : public Subscriber
126  {
127  public:
128 
129  SubscriberOneway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&);
130 
131  virtual void flush();
132 
133  void exception(const Ice::Exception& ex)
134  {
135  error(true, ex);
136  }
137  void sent(bool);
138 
139  private:
140 
141  const Ice::ObjectPrx _obj;
142  };
143  typedef IceUtil::Handle<SubscriberOneway> SubscriberOnewayPtr;
144 
145  class SubscriberTwoway : public Subscriber
146  {
147  public:
148 
149  SubscriberTwoway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, int,
150  const Ice::ObjectPrx&);
151 
152  virtual void flush();
153 
154  private:
155 
156  const Ice::ObjectPrx _obj;
157  };
158 
159  class SubscriberLink : public Subscriber
160  {
161  public:
162 
163  SubscriberLink(const InstancePtr&, const SubscriberRecord&);
164 
165  virtual void flush();
166 
167  private:
168 
169  const TopicLinkPrx _obj;
170  };
171 
172  class FlushTimerTask : public IceUtil::TimerTask
173  {
174  public:
175 
176  FlushTimerTask(const SubscriberBatchPtr& subscriber) :
177  _subscriber(subscriber)
178  {
179  }
180 
181  virtual void
182  runTimerTask()
183  {
184  _subscriber->doFlush();
185  }
186 
187  private:
188 
189  const SubscriberBatchPtr _subscriber;
190  };
191 
192 }
193 
194 SubscriberBatch::SubscriberBatch(
195  const InstancePtr& instance,
196  const SubscriberRecord& rec,
197  const Ice::ObjectPrx& proxy,
198  int retryCount,
199  const Ice::ObjectPrx& obj) :
200  Subscriber(instance, rec, proxy, retryCount, 1),
201  _obj(obj),
202  _interval(instance->flushInterval())
203 {
204  assert(retryCount == 0);
205 }
206 
207 void
209 {
210  if (_outstanding == 0)
211  {
212  ++_outstanding;
213  _instance->batchFlusher()->schedule(new FlushTimerTask(this), _interval);
214  }
215 }
216 
217 void
218 SubscriberBatch::doFlush()
219 {
220  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
221 
222  //
223  // If the subscriber isn't online we're done.
224  //
225  if (_state != SubscriberStateOnline)
226  {
227  return;
228  }
229 
230  EventDataSeq v;
231  v.swap(_events);
232  assert(!v.empty());
233 
234  if (_observer)
235  {
236  _outstandingCount = static_cast<Ice::Int>(v.size());
237  _observer->outstanding(_outstandingCount);
238  }
239 
240  try
241  {
242  vector<Ice::Byte> dummy;
243  for (EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
244  {
245  _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
246  }
247 
248  Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests(
249  Ice::newCallback_Object_ice_flushBatchRequests(this,
250  &SubscriberBatch::exception,
251  &SubscriberBatch::sent));
252  if (result->sentSynchronously())
253  {
254  --_outstanding;
255  assert(_outstanding == 0);
256  if (_observer)
257  {
258  _observer->delivered(_outstandingCount);
259  }
260  }
261  }
262  catch (const Ice::Exception& ex)
263  {
264  error(false, ex);
265  return;
266  }
267 
268  if (_events.empty() && _outstanding == 0 && _shutdown)
269  {
270  _lock.notify();
271  }
272 
273  // This is significantly faster than the async version, but it can
274  // block the calling thread. Bad news!
275 
276  //_obj->ice_flushBatchRequests();
277 }
278 
279 void
280 SubscriberBatch::sent(bool sentSynchronously)
281 {
282  if (sentSynchronously)
283  {
284  return;
285  }
286 
287  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
288 
289  // Decrement the _outstanding count.
290  --_outstanding;
291  assert(_outstanding == 0);
292  if (_observer)
293  {
294  _observer->delivered(_outstandingCount);
295  }
296 
297  if (_events.empty() && _outstanding == 0 && _shutdown)
298  {
299  _lock.notify();
300  }
301  else if (!_events.empty())
302  {
303  flush();
304  }
305 
306 }
307 
308 SubscriberOneway::SubscriberOneway(
309  const InstancePtr& instance,
310  const SubscriberRecord& rec,
311  const Ice::ObjectPrx& proxy,
312  int retryCount,
313  const Ice::ObjectPrx& obj) :
314  Subscriber(instance, rec, proxy, retryCount, 5),
315  _obj(obj)
316 {
317  assert(retryCount == 0);
318 }
319 
320 void
322 {
323  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
324 
325  //
326  // If the subscriber isn't online we're done.
327  //
328  if (_state != SubscriberStateOnline || _events.empty())
329  {
330  return;
331  }
332 
333  // Send up to _maxOutstanding pending events.
334  while (_outstanding < _maxOutstanding && !_events.empty())
335  {
336  //
337  // Dequeue the head event, count one more outstanding AMI
338  // request.
339  //
340  EventDataPtr e = _events.front();
341  _events.erase(_events.begin());
342  if (_observer)
343  {
344  _observer->outstanding(1);
345  }
346 
347  try
348  {
349  Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
350  e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this,
351  &SubscriberOneway::exception,
352  &SubscriberOneway::sent));
353  if (!result->sentSynchronously())
354  {
355  ++_outstanding;
356  }
357  else if (_observer)
358  {
359  _observer->delivered(1);
360  }
361  }
362  catch (const Ice::Exception& ex)
363  {
364  error(true, ex);
365  return;
366  }
367  }
368 
369  if (_events.empty() && _outstanding == 0 && _shutdown)
370  {
371  _lock.notify();
372  }
373 }
374 
375 void
376 SubscriberOneway::sent(bool sentSynchronously)
377 {
378  if (sentSynchronously)
379  {
380  return;
381  }
382 
383  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
384 
385  // Decrement the _outstanding count.
386  --_outstanding;
387  assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
388  if (_observer)
389  {
390  _observer->delivered(1);
391  }
392 
393  if (_events.empty() && _outstanding == 0 && _shutdown)
394  {
395  _lock.notify();
396  }
397  else if (_outstanding <= 0 && !_events.empty())
398  {
399  flush();
400  }
401 }
402 
403 SubscriberTwoway::SubscriberTwoway(
404  const InstancePtr& instance,
405  const SubscriberRecord& rec,
406  const Ice::ObjectPrx& proxy,
407  int retryCount,
408  int maxOutstanding,
409  const Ice::ObjectPrx& obj) :
410  Subscriber(instance, rec, proxy, retryCount, maxOutstanding),
411  _obj(obj)
412 {
413 }
414 
415 void
417 {
418  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
419 
420  //
421  // If the subscriber isn't online we're done.
422  //
423  if (_state != SubscriberStateOnline || _events.empty())
424  {
425  return;
426  }
427 
428  // Send up to _maxOutstanding pending events.
429  while (_outstanding < _maxOutstanding && !_events.empty())
430  {
431  //
432  // Dequeue the head event, count one more outstanding AMI
433  // request.
434  //
435  EventDataPtr e = _events.front();
436  _events.erase(_events.begin());
437  ++_outstanding;
438  if (_observer)
439  {
440  _observer->outstanding(1);
441  }
442 
443  try
444  {
445  _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context,
446  Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
447  }
448  catch (const Ice::Exception& ex)
449  {
450  error(true, ex);
451  return;
452  }
453  }
454 }
455 
456 namespace
457 {
458 
459  SubscriberLink::SubscriberLink(
460  const InstancePtr& instance,
461  const SubscriberRecord& rec) :
462  Subscriber(instance, rec, 0, -1, 1),
463  _obj(TopicLinkPrx::uncheckedCast(rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
464  {
465  }
466 
467  void
469  {
470  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
471 
472  if (_state != SubscriberStateOnline || _outstanding > 0)
473  {
474  return;
475  }
476 
477  EventDataSeq v;
478  v.swap(_events);
479 
480  EventDataSeq::iterator p = v.begin();
481  while (p != v.end())
482  {
483  if (_rec.cost != 0)
484  {
485  int cost = 0;
486  Ice::Context::const_iterator q = (*p)->context.find("cost");
487  if (q != (*p)->context.end())
488  {
489  cost = atoi(q->second.c_str());
490  }
491  if (cost > _rec.cost)
492  {
493  p = v.erase(p);
494  continue;
495  }
496  }
497  ++p;
498  }
499 
500  if (!v.empty())
501  {
502  try
503  {
504  ++_outstanding;
505  if (_observer)
506  {
507  _outstandingCount = static_cast<Ice::Int>(v.size());
508  _observer->outstanding(_outstandingCount);
509  }
510  _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
511  }
512  catch (const Ice::Exception& ex)
513  {
514  error(true, ex);
515  }
516  }
517  }
518 
519 }
520 
522 Subscriber::create(
523  const InstancePtr& instance,
524  const SubscriberRecord& rec)
525 {
526  if (rec.link)
527  {
528  return new SubscriberLink(instance, rec);
529  }
530  else
531  {
532  PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
533  Ice::Identity perId;
534  perId.category = instance->instanceName();
535  perId.name = "topic." + rec.topicName + ".publish." +
536  instance->communicator()->identityToString(rec.obj->ice_getIdentity());
537  Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId);
538  TraceLevelsPtr traceLevels = instance->traceLevels();
539  SubscriberPtr subscriber;
540 
541  try
542  {
543  int retryCount = 0;
544  QoS::const_iterator p = rec.theQoS.find("retryCount");
545  if (p != rec.theQoS.end())
546  {
547  retryCount = atoi(p->second.c_str());
548  }
549 
550  string reliability;
551  p = rec.theQoS.find("reliability");
552  if (p != rec.theQoS.end())
553  {
554  reliability = p->second;
555  }
556  if (!reliability.empty() && reliability != "ordered")
557  {
558  throw BadQoS("invalid reliability: " + reliability);
559  }
560 
561  //
562  // Override the timeout.
563  //
564  Ice::ObjectPrx newObj;
565  try
566  {
567  newObj = rec.obj->ice_timeout(instance->sendTimeout());
568  }
569  catch (const Ice::FixedProxyException&)
570  {
571  //
572  // In the event IceStorm is collocated this could be a
573  // fixed proxy in which case its not possible to set the
574  // timeout.
575  //
576  newObj = rec.obj;
577  }
578 
579  p = rec.theQoS.find("locatorCacheTimeout");
580  if (p != rec.theQoS.end())
581  {
582  istringstream is(IceUtilInternal::trim(p->second));
583  int locatorCacheTimeout;
584  if (!(is >> locatorCacheTimeout) || !is.eof())
585  {
586  throw BadQoS("invalid locator cache timeout (numeric value required): " + p->second);
587  }
588  newObj = newObj->ice_locatorCacheTimeout(locatorCacheTimeout);
589  }
590 
591  p = rec.theQoS.find("connectionCached");
592  if (p != rec.theQoS.end())
593  {
594  istringstream is(IceUtilInternal::trim(p->second));
595  int connectionCached;
596  if (!(is >> connectionCached) || !is.eof())
597  {
598  throw BadQoS("invalid connection cached setting (numeric value required): " + p->second);
599  }
600  newObj = newObj->ice_connectionCached(connectionCached > 0);
601  }
602 
603  if (reliability == "ordered")
604  {
605  if (!newObj->ice_isTwoway())
606  {
607  throw BadQoS("ordered reliability requires a twoway proxy");
608  }
609  subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj);
610  }
611  else if (newObj->ice_isOneway() || newObj->ice_isDatagram())
612  {
613  if (retryCount > 0)
614  {
615  throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
616  }
617  subscriber = new SubscriberOneway(instance, rec, proxy, retryCount, newObj);
618  }
619  else if (newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
620  {
621  if (retryCount > 0)
622  {
623  throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
624  }
625  subscriber = new SubscriberBatch(instance, rec, proxy, retryCount, newObj);
626  }
627  else //if(newObj->ice_isTwoway())
628  {
629  assert(newObj->ice_isTwoway());
630  subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj);
631  }
632  per->setSubscriber(subscriber);
633  }
634  catch (const Ice::Exception&)
635  {
636  instance->publishAdapter()->remove(proxy->ice_getIdentity());
637  throw;
638  }
639 
640  return subscriber;
641  }
642 }
643 
644 Ice::ObjectPrx
645 Subscriber::proxy() const
646 {
647  return _proxyReplica;
648 }
649 
651 Subscriber::id() const
652 {
653  return _rec.id;
654 }
655 
657 Subscriber::record() const
658 {
659  return _rec;
660 }
661 
662 bool
663 Subscriber::queue(bool forwarded, const EventDataSeq& events)
664 {
665  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
666 
667  // If this is a link subscriber if the set of events were
668  // forwarded from another IceStorm instance then do not queue the
669  // events.
670  if (forwarded && _rec.link)
671  {
672  return true;
673  }
674 
675  switch (_state)
676  {
678  {
679  if (IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
680  {
681  break;
682  }
683 
684  //
685  // State transition to online.
686  //
687  setState(SubscriberStateOnline);
688  // fall through
689  [[fallthrough]];
690  }
691 
693  {
694  for (EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
695  {
696  if (static_cast<int>(_events.size()) == _instance->sendQueueSizeMax())
697  {
698  if (_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
699  {
700  error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__));
701  return false;
702  }
703  else // DropEvents
704  {
705  _events.pop_front();
706  }
707  }
708  _events.push_back(*p);
709  }
710 
711  if (_observer)
712  {
713  _observer->queued(static_cast<Ice::Int>(events.size()));
714  }
715  flush();
716  break;
717  }
719  return false;
720 
721  case SubscriberStateReaped:
722  break;
723  }
724 
725  return true;
726 }
727 
728 bool
729 Subscriber::reap()
730 {
731  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
732  assert(_state >= SubscriberStateError);
733  if (_state == SubscriberStateError)
734  {
735  setState(SubscriberStateReaped);
736  return true;
737  }
738  return false;
739 }
740 
741 void
742 Subscriber::resetIfReaped()
743 {
744  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
745  if (_state == SubscriberStateReaped)
746  {
747  setState(SubscriberStateError);
748  }
749 }
750 
751 bool
752 Subscriber::errored() const
753 {
754  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
755  return _state >= SubscriberStateError;
756 }
757 
758 void
759 Subscriber::destroy()
760 {
761  //
762  // Clear the per-subscriber object if it exists.
763  //
764  if (_proxy)
765  {
766  try
767  {
768  _instance->publishAdapter()->remove(_proxy->ice_getIdentity());
769  }
770  catch (const Ice::NotRegisteredException&)
771  {
772  // Ignore
773  }
774  catch (const Ice::ObjectAdapterDeactivatedException&)
775  {
776  // Ignore
777  }
778  }
779 
780  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
781  _observer.detach();
782 }
783 
784 void
785 Subscriber::error(bool dec, const Ice::Exception& e)
786 {
787  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
788 
789  if (dec)
790  {
791  // Decrement the _outstanding count.
792  --_outstanding;
793  assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
794  }
795 
796  //
797  // It's possible to be already in the error state if the queue maximum size
798  // has been reached or if an ObjectNotExistException occured before.
799  //
800  if (_state >= SubscriberStateError)
801  {
802  if (_shutdown)
803  {
804  _lock.notify();
805  }
806  return;
807  }
808 
809  // A hard error is an ObjectNotExistException or
810  // NotRegisteredException.
811  bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) ||
812  dynamic_cast<const Ice::NotRegisteredException*>(&e) ||
813  dynamic_cast<const IceStorm::SendQueueSizeMaxReached*>(&e);
814 
815  //
816  // A twoway subscriber can queue multiple send events and
817  // therefore its possible to get multiple error'd replies. Ignore
818  // replies if we're retrying and its not yet time to process the
819  // next request.
820  //
821  IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
822  if (!hardError && _state == SubscriberStateOffline && now < _next)
823  {
824  return;
825  }
826 
827  //
828  // If we're in our retry limits and the error isn't a hard failure
829  // (that is ObjectNotExistException or NotRegisteredException)
830  // then we transition to an offline state.
831  //
832  if (!hardError && (_retryCount == -1 || _currentRetry < _retryCount))
833  {
834  assert(_state < SubscriberStateError);
835 
836  TraceLevelsPtr traceLevels = _instance->traceLevels();
837  if (_currentRetry == 0)
838  {
839  Ice::Warning warn(traceLevels->logger);
840  warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_rec.id);
841  if (traceLevels->subscriber > 1)
842  {
843  warn << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
844  }
845  warn << " subscriber offline: " << e
846  << " discarding events: " << _instance->discardInterval() << "s retryCount: " << _retryCount;
847  }
848  else
849  {
850  if (traceLevels->subscriber > 0)
851  {
852  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
853  out << _instance->communicator()->identityToString(_rec.id);
854  if (traceLevels->subscriber > 1)
855  {
856  out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
857  }
858  out << " subscriber offline: " << e
859  << " discarding events: " << _instance->discardInterval() << "s retry: "
860  << _currentRetry << "/" << _retryCount;
861  }
862  }
863 
864  // Transition to offline state, increment the retry count and
865  // clear all queued events.
866  _next = now + _instance->discardInterval();
867  ++_currentRetry;
868  _events.clear();
869  setState(SubscriberStateOffline);
870  }
871  // Errored out.
872  else if (_state < SubscriberStateError)
873  {
874  _events.clear();
875  setState(SubscriberStateError);
876 
877  TraceLevelsPtr traceLevels = _instance->traceLevels();
878  if (traceLevels->subscriber > 0)
879  {
880  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
881  out << _instance->communicator()->identityToString(_rec.id);
882  if (traceLevels->subscriber > 1)
883  {
884  out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
885  }
886  out << " subscriber errored out: " << e
887  << " retry: " << _currentRetry << "/" << _retryCount;
888  }
889  }
890 
891  if (_shutdown && _events.empty())
892  {
893  _lock.notify();
894  }
895 }
896 
897 void
898 Subscriber::completed(const Ice::AsyncResultPtr& result)
899 {
900  try
901  {
902  result->throwLocalException();
903 
904  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
905 
906  // Decrement the _outstanding count.
907  --_outstanding;
908  assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
909  if (_observer)
910  {
911  _observer->delivered(_outstandingCount);
912  }
913 
914  //
915  // A successful response means we're no longer retrying, we're
916  // back active.
917  //
918  _currentRetry = 0;
919 
920  if (_events.empty() && _outstanding == 0 && _shutdown)
921  {
922  _lock.notify();
923  }
924  else
925  {
926  flush();
927  }
928  }
929  catch (const Ice::LocalException& ex)
930  {
931  error(true, ex);
932  }
933 }
934 
935 void
936 Subscriber::shutdown()
937 {
938  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
939 
940  _shutdown = true;
941  while (_outstanding > 0 && !_events.empty())
942  {
943  _lock.wait();
944  }
945 
946  _observer.detach();
947 }
948 
949 void
950 Subscriber::updateObserver()
951 {
952  IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
953  if (_instance->observer())
954  {
955  _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
956  _rec.topicName,
957  _rec.obj,
958  _rec.theQoS,
959  _rec.theTopic,
960  toSubscriberState(_state),
961  _observer.get()));
962  }
963 }
964 
965 Subscriber::Subscriber(
966  const InstancePtr& instance,
967  const SubscriberRecord& rec,
968  const Ice::ObjectPrx& proxy,
969  int retryCount,
970  int maxOutstanding) :
971  _instance(instance),
972  _rec(rec),
973  _retryCount(retryCount),
974  _maxOutstanding(maxOutstanding),
975  _proxy(proxy),
976  _proxyReplica(proxy),
977  _shutdown(false),
978  _state(SubscriberStateOnline),
979  _outstanding(0),
980  _outstandingCount(1),
981  _currentRetry(0)
982 {
983  if (_proxy && _instance->publisherReplicaProxy())
984  {
985  const_cast<Ice::ObjectPrx&>(_proxyReplica) =
986  _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
987  }
988 
989  if (_instance->observer())
990  {
991  _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
992  rec.topicName,
993  rec.obj,
994  rec.theQoS,
995  rec.theTopic,
996  toSubscriberState(_state),
997  0));
998  }
999 }
1000 
1001 namespace
1002 {
1003 
1004  string
1005  stateToString(Subscriber::SubscriberState state)
1006  {
1007  switch (state)
1008  {
1010  return "online";
1012  return "offline";
1014  return "error";
1016  return "reaped";
1017  default:
1018  return "???";
1019  }
1020  }
1021 
1022 }
1023 
1024 void
1026 {
1027  if (state != _state)
1028  {
1029  TraceLevelsPtr traceLevels = _instance->traceLevels();
1030  if (traceLevels->subscriber > 1)
1031  {
1032  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
1033  out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
1034  << " transition from: " << stateToString(_state) << " to: " << stateToString(state);
1035  }
1036  _state = state;
1037 
1038  if (_instance->observer())
1039  {
1040  _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
1041  _rec.topicName,
1042  _rec.obj,
1043  _rec.theQoS,
1044  _rec.theTopic,
1045  toSubscriberState(_state),
1046  _observer.get()));
1047  }
1048  }
1049 }
1050 
1051 bool
1052 IceStorm::operator==(const SubscriberPtr& subscriber, const Ice::Identity& id)
1053 {
1054  return subscriber->id() == id;
1055 }
1056 
1057 bool
1059 {
1060  return &s1 == &s2;
1061 }
1062 
1063 bool
1065 {
1066  return &s1 != &s2;
1067 }
1068 
1069 bool
1071 {
1072  return &s1 < &s2;
1073 }
IceStorm::Subscriber::SubscriberStateOnline
@ SubscriberStateOnline
Definition: Subscriber.h:56
IceStorm
Definition: DBTypes.ice:22
IceStorm::Subscriber
Definition: Subscriber.h:28
IceStorm::Subscriber::_state
SubscriberState _state
Definition: Subscriber.h:82
IceStorm::EventDataSeq
std::deque< ::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
Definition: IceStormInternal.h:463
IceStorm::Subscriber::setState
void setState(SubscriberState)
Definition: Subscriber.cpp:1025
IceStorm::operator!=
bool operator!=(const IceStorm::Subscriber &, const IceStorm::Subscriber &)
Definition: Subscriber.cpp:1064
IceStorm::operator==
bool operator==(const TopicLink &lhs, const TopicLink &rhs)
Definition: IceStormInternal.h:724
IceStorm::Subscriber::SubscriberStateReaped
@ SubscriberStateReaped
Definition: Subscriber.h:59
IceStorm::Instrumentation::SubscriberState
SubscriberState
Definition: Instrumentation.h:203
Util.h
IceStorm::Subscriber::_proxyReplica
const Ice::ObjectPrx _proxyReplica
Definition: Subscriber.h:76
IceStorm::SubscriberRecord::theQoS
::IceStorm::QoS theQoS
Definition: SubscriberRecord.h:220
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:214
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:523
IceStorm::Subscriber::SubscriberStateError
@ SubscriberStateError
Definition: Subscriber.h:58
IceStormElection
Definition: DBTypes.ice:17
armarx::flush
const LogSender::manipulator flush
Definition: LogSender.h:251
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
IceStorm::Subscriber::SubscriberState
SubscriberState
Definition: Subscriber.h:54
IceStorm::Instrumentation::SubscriberStateOffline
@ SubscriberStateOffline
Offline, retrying.
Definition: Instrumentation.h:206
IceStorm::EventData
The event data.
Definition: IceStormInternal.h:429
IceStorm::SubscriberRecord::obj
::Ice::ObjectPrx obj
Definition: SubscriberRecord.h:219
Subscriber.h
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStorm::SendQueueSizeMaxReached
Definition: IceStormInternal.h:451
IceStorm::Subscriber::_proxy
const Ice::ObjectPrx _proxy
Definition: Subscriber.h:75
IceStorm::operator<
bool operator<(const TopicLink &lhs, const TopicLink &rhs)
Definition: IceStormInternal.h:729
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::theTopic
::IceStorm::TopicPrx theTopic
Definition: SubscriberRecord.h:222
TraceLevels.h
std
Definition: Application.h:66
IceStormElection::CachedReadHelper
Definition: NodeI.h:127
IceUtil::Handle
Definition: forward_declarations.h:29
IceStorm::Subscriber::_rec
const IceStorm::SubscriberRecord _rec
Definition: Subscriber.h:72
IceInternal::ProxyHandle< ::IceProxy::IceStorm::TopicLink >
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:916
IceStorm::Subscriber::_instance
const InstancePtr _instance
Definition: Subscriber.h:71
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:218
NodeI.h
Instance.h
IceStorm::Subscriber::_observer
IceInternal::ObserverHelperT< IceStorm::Instrumentation::SubscriberObserver > _observer
Definition: Subscriber.h:92
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
IceStorm::Subscriber::SubscriberStateOffline
@ SubscriberStateOffline
Definition: Subscriber.h:57
IceStorm::Instrumentation::SubscriberStateError
@ SubscriberStateError
Error state, awaiting to be destroyed.
Definition: Instrumentation.h:207
IceStorm::SubscriberRecord::topicName
::std::string topicName
Definition: SubscriberRecord.h:216
IceStorm::Instrumentation::SubscriberStateOnline
@ SubscriberStateOnline
Online waiting to send events.
Definition: Instrumentation.h:205