TopicI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <IceStorm/TopicI.h>
11 #include <IceStorm/Instance.h>
12 #include <IceStorm/Subscriber.h>
13 #include <IceStorm/TraceLevels.h>
14 #include <IceStorm/NodeI.h>
15 #include <IceStorm/Observers.h>
16 #include <IceStorm/Util.h>
17 #include <Ice/LoggerUtil.h>
18 #include <algorithm>
19 
20 using namespace std;
21 using namespace IceStorm;
22 using namespace IceStormElection;
23 using namespace IceStormInternal;
24 
25 namespace
26 {
27 
28  void
29  logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
30  {
31  Ice::Error error(com->getLogger());
32  error << "LMDB error: " << ex;
33  }
34 
35  //
36  // The servant has a 1-1 association with a topic. It is used to
37  // receive events from Publishers.
38  //
39  class PublisherI : public Ice::BlobjectArray
40  {
41  public:
42 
43  PublisherI(const TopicImplPtr& topic, const PersistentInstancePtr& instance) :
44  _topic(topic), _instance(instance)
45  {
46  }
47 
48  virtual bool
49  ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
50  Ice::ByteSeq&,
51  const Ice::Current& current)
52  {
53  // The publish call does a cached read.
54  EventDataPtr event = new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
55 
56  //
57  // COMPILERBUG: gcc 4.0.1 doesn't like this.
58  //
59  //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
60  Ice::ByteSeq data(inParams.first, inParams.second);
61  event->data.swap(data);
62 
64  v.push_back(event);
65  _topic->publish(false, v);
66 
67  return true;
68  }
69 
70  private:
71 
72  const TopicImplPtr _topic;
73  const PersistentInstancePtr _instance;
74  };
75 
76  //
77  // The servant has a 1-1 association with a topic. It is used to
78  // receive events from linked Topics.
79  //
80  class TopicLinkI : public TopicLink
81  {
82  public:
83 
84  TopicLinkI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
85  _impl(impl), _instance(instance)
86  {
87  }
88 
89  virtual void
90  forward(const EventDataSeq& v, const Ice::Current& /*current*/)
91  {
92  // The publish call does a cached read.
93  _impl->publish(true, v);
94  }
95 
96  private:
97 
98  const TopicImplPtr _impl;
99  const PersistentInstancePtr _instance;
100  };
101 
102  class TopicI : public TopicInternal
103  {
104  public:
105 
106  TopicI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
107  _impl(impl), _instance(instance)
108  {
109  }
110 
111  virtual string getName(const Ice::Current&) const
112  {
113  // Use cached reads.
114  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
115  return _impl->getName();
116  }
117 
118  virtual Ice::ObjectPrx getPublisher(const Ice::Current&) const
119  {
120  // Use cached reads.
121  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
122  return _impl->getPublisher();
123  }
124 
125  virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current&) const
126  {
127  // Use cached reads.
128  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
129  return _impl->getNonReplicatedPublisher();
130  }
131 
132  virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj,
133  const Ice::Current& current)
134  {
135  while (true)
136  {
137  Ice::Long generation = -1;
138  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
139  if (master)
140  {
141  try
142  {
143  return master->subscribeAndGetPublisher(qos, obj);
144  }
145  catch (const Ice::ConnectFailedException&)
146  {
147  _instance->node()->recovery(generation);
148  continue;
149  }
150  catch (const Ice::TimeoutException&)
151  {
152  _instance->node()->recovery(generation);
153  continue;
154  }
155  }
156  else
157  {
158  FinishUpdateHelper unlock(_instance->node());
159  return _impl->subscribeAndGetPublisher(qos, obj);
160  }
161  }
162  }
163 
164  virtual void unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current)
165  {
166  while (true)
167  {
168  Ice::Long generation = -1;
169  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
170  if (master)
171  {
172  try
173  {
174  master->unsubscribe(subscriber);
175  }
176  catch (const Ice::ConnectFailedException&)
177  {
178  _instance->node()->recovery(generation);
179  continue;
180  }
181  catch (const Ice::TimeoutException&)
182  {
183  _instance->node()->recovery(generation);
184  continue;
185  }
186  }
187  else
188  {
189  FinishUpdateHelper unlock(_instance->node());
190  _impl->unsubscribe(subscriber);
191  }
192  break;
193  }
194  }
195 
196  virtual TopicLinkPrx getLinkProxy(const Ice::Current&)
197  {
198  // Use cached reads.
199  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
200  return _impl->getLinkProxy();
201  }
202 
203  virtual void reap(const Ice::IdentitySeq& ids, const Ice::Current& /*current*/)
204  {
205  NodeIPtr node = _instance->node();
206  if (!node->updateMaster(__FILE__, __LINE__))
207  {
208  throw ReapWouldBlock();
209  }
210  FinishUpdateHelper unlock(node);
211  _impl->reap(ids);
212  }
213 
214  virtual void link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current)
215  {
216  while (true)
217  {
218  Ice::Long generation = -1;
219  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
220  if (master)
221  {
222  try
223  {
224  master->link(topic, cost);
225  }
226  catch (const Ice::ConnectFailedException&)
227  {
228  _instance->node()->recovery(generation);
229  continue;
230  }
231  catch (const Ice::TimeoutException&)
232  {
233  _instance->node()->recovery(generation);
234  continue;
235  }
236  }
237  else
238  {
239  FinishUpdateHelper unlock(_instance->node());
240  _impl->link(topic, cost);
241  }
242  break;
243  }
244  }
245 
246  virtual void unlink(const TopicPrx& topic, const Ice::Current& current)
247  {
248  while (true)
249  {
250  Ice::Long generation = -1;
251  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
252  if (master)
253  {
254  try
255  {
256  master->unlink(topic);
257  }
258  catch (const Ice::ConnectFailedException&)
259  {
260  _instance->node()->recovery(generation);
261  continue;
262  }
263  catch (const Ice::TimeoutException&)
264  {
265  _instance->node()->recovery(generation);
266  continue;
267  }
268  }
269  else
270  {
271  FinishUpdateHelper unlock(_instance->node());
272  _impl->unlink(topic);
273  }
274  break;
275  }
276  }
277 
278  virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current&) const
279  {
280  // Use cached reads.
281  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
282  return _impl->getLinkInfoSeq();
283  }
284 
285  virtual Ice::IdentitySeq getSubscribers(const Ice::Current&) const
286  {
287  return _impl->getSubscribers();
288  }
289 
290  virtual void destroy(const Ice::Current& current)
291  {
292  while (true)
293  {
294  Ice::Long generation = -1;
295  TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
296  if (master)
297  {
298  try
299  {
300  master->destroy();
301  }
302  catch (const Ice::ConnectFailedException&)
303  {
304  _instance->node()->recovery(generation);
305  continue;
306  }
307  catch (const Ice::TimeoutException&)
308  {
309  _instance->node()->recovery(generation);
310  continue;
311  }
312  }
313  else
314  {
315  FinishUpdateHelper unlock(_instance->node());
316  _impl->destroy();
317  }
318  break;
319  }
320  }
321 
322  private:
323 
324  TopicPrx getMasterFor(const Ice::Current& cur, Ice::Long& generation, const char* file, int line) const
325  {
326  NodeIPtr node = _instance->node();
327  Ice::ObjectPrx master;
328  if (node)
329  {
330  master = _instance->node()->startUpdate(generation, file, line);
331  }
332  return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx();
333  }
334 
335  const TopicImplPtr _impl;
336  const PersistentInstancePtr _instance;
337  };
338 
339 }
340 
341 TopicImpl::TopicImpl(
342  const PersistentInstancePtr& instance,
343  const string& name,
344  const Ice::Identity& id,
345  const SubscriberRecordSeq& subscribers) :
346  _instance(instance),
347  _name(name),
348  _id(id),
349  _destroyed(false),
350  _lluMap(_instance->lluMap()),
351  _subscriberMap(_instance->subscriberMap())
352 {
353  try
354  {
355  __setNoDelete(true);
356 
357  // TODO: If we want to improve the performance of the
358  // non-replicated case we could allocate a null-topic impl here.
359  _servant = new TopicI(this, instance);
360 
361  //
362  // Create a servant per topic to receive event data. If the
363  // category is empty then we are in backwards compatibility
364  // mode. In this case the servant's identity is
365  // category=<topicname>, name=publish, otherwise the name is
366  // <instancename>/<topicname>.publish. The same applies to the
367  // link proxy.
368  //
369  // Activate the object and save a reference to give to publishers.
370  //
371  Ice::Identity pubid;
372  Ice::Identity linkid;
373  if (id.category.empty())
374  {
375  pubid.category = _name;
376  pubid.name = "publish";
377  linkid.category = _name;
378  linkid.name = "link";
379  }
380  else
381  {
382  pubid.category = id.category;
383  pubid.name = _name + ".publish";
384  linkid.category = id.category;
385  linkid.name = _name + ".link";
386  }
387 
388  _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid);
389  _linkPrx = TopicLinkPrx::uncheckedCast(
390  _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid));
391 
392  //
393  // Re-establish subscribers.
394  //
395  for (SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
396  {
397  Ice::Identity id = p->obj->ice_getIdentity();
398  TraceLevelsPtr traceLevels = _instance->traceLevels();
399  if (traceLevels->topic > 0)
400  {
401  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
402  out << _name << " recreate " << _instance->communicator()->identityToString(id);
403  if (traceLevels->topic > 1)
404  {
405  out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
406  }
407  }
408 
409  try
410  {
411  //
412  // Create the subscriber object add it to the set of
413  // subscribers.
414  //
415  SubscriberPtr subscriber = Subscriber::create(_instance, *p);
416  _subscribers.push_back(subscriber);
417  }
418  catch (const Ice::Exception& ex)
419  {
420  Ice::Warning out(traceLevels->logger);
421  out << _name << " recreate " << _instance->communicator()->identityToString(id);
422  if (traceLevels->topic > 1)
423  {
424  out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
425  }
426  out << " failed: " << ex;
427  }
428  }
429 
430  if (_instance->observer())
431  {
432  _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0));
433  }
434  }
435  catch (...)
436  {
437  shutdown();
438  __setNoDelete(false);
439  throw;
440  }
441  __setNoDelete(false);
442 }
443 
444 string
446 {
447  // Immutable
448  return _name;
449 }
450 
451 Ice::ObjectPrx
453 {
454  // Immutable
455  if (_instance->publisherReplicaProxy())
456  {
457  return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity());
458  }
459  return _publisherPrx;
460 }
461 
462 Ice::ObjectPrx
464 {
465  // If there is an adapter id configured then we're using icegrid
466  // so create an indirect proxy, otherwise create a direct proxy.
467  if (!_publisherPrx->ice_getAdapterId().empty())
468  {
469  return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity());
470  }
471  else
472  {
473  return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity());
474  }
475 }
476 
477 namespace
478 {
479  void
480  trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<SubscriberPtr>& s)
481  {
482  out << '[';
483  for (vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p)
484  {
485  if (p != s.begin())
486  {
487  out << ",";
488  }
489  out << instance->communicator()->identityToString((*p)->id());
490  }
491  out << "]";
492  }
493 }
494 
495 Ice::ObjectPrx
496 TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
497 {
498  if (!obj)
499  {
500  TraceLevelsPtr traceLevels = _instance->traceLevels();
501  if (traceLevels->topic > 0)
502  {
503  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
504  out << _name << ": subscribeAndGetPublisher: null proxy";
505  }
506  throw InvalidSubscriber("subscriber is a null proxy");
507  }
508  Ice::Identity id = obj->ice_getIdentity();
509 
510  TraceLevelsPtr traceLevels = _instance->traceLevels();
511  if (traceLevels->topic > 0)
512  {
513  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
514  out << _name << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
515 
516  if (traceLevels->topic > 1)
517  {
518  out << " endpoints: " << IceStormInternal::describeEndpoints(obj)
519  << " QoS: ";
520  for (QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
521  {
522  if (p != qos.begin())
523  {
524  out << ',';
525  }
526 
527  }
528  out << " subscriptions: ";
529  trace(out, _instance, _subscribers);
530  }
531  }
532 
533  IceUtil::Mutex::Lock sync(_subscribersMutex);
534 
535  SubscriberRecord record;
536  record.id = id;
537  record.obj = obj;
538  record.theQoS = qos;
539  record.topicName = _name;
540  record.link = false;
541  record.cost = 0;
542 
543  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
544  if (p != _subscribers.end())
545  {
546  throw AlreadySubscribed();
547  }
548 
549  LogUpdate llu;
550 
551  SubscriberPtr subscriber = Subscriber::create(_instance, record);
552  try
553  {
554  IceDB::ReadWriteTxn txn(_instance->dbEnv());
555 
557  key.topic = _id;
558  key.id = subscriber->id();
559 
560  _subscriberMap.put(txn, key, record);
561 
562  llu = getIncrementedLLU(txn, _lluMap);
563 
564  txn.commit();
565  }
566  catch (const IceDB::LMDBException& ex)
567  {
568  logError(_instance->communicator(), ex);
569  throw; // will become UnknownException in caller
570  }
571 
572  _subscribers.push_back(subscriber);
573 
574  _instance->observers()->addSubscriber(llu, _name, record);
575 
576  return subscriber->proxy();
577 }
578 
579 void
580 TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
581 {
582  TraceLevelsPtr traceLevels = _instance->traceLevels();
583  if (!subscriber)
584  {
585  if (traceLevels->topic > 0)
586  {
587  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
588  out << _name << ": unsubscribe: null proxy";
589  }
590  throw InvalidSubscriber("subscriber is a null proxy");
591  }
592 
593  Ice::Identity id = subscriber->ice_getIdentity();
594 
595  if (traceLevels->topic > 0)
596  {
597  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
598  out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
599 
600  if (traceLevels->topic > 1)
601  {
602  out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
603  trace(out, _instance, _subscribers);
604  }
605  }
606 
607  IceUtil::Mutex::Lock sync(_subscribersMutex);
608  Ice::IdentitySeq ids;
609  ids.push_back(id);
610  removeSubscribers(ids);
611 }
612 
615 {
616  // immutable
617  if (_instance->publisherReplicaProxy())
618  {
619  return TopicLinkPrx::uncheckedCast(_instance->publisherReplicaProxy()->ice_identity(
620  _linkPrx->ice_getIdentity()));
621  }
622  return _linkPrx;
623 }
624 
625 void
626 TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
627 {
628  TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
629  TopicLinkPrx link = internal->getLinkProxy();
630 
631  TraceLevelsPtr traceLevels = _instance->traceLevels();
632  if (traceLevels->topic > 0)
633  {
634  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
635  out << _name << ": link " << _instance->communicator()->identityToString(topic->ice_getIdentity())
636  << " cost " << cost;
637  }
638 
639  IceUtil::Mutex::Lock sync(_subscribersMutex);
640 
641  Ice::Identity id = topic->ice_getIdentity();
642 
643  SubscriberRecord record;
644  record.id = id;
645  record.obj = link;
646  record.theTopic = topic;
647  record.topicName = _name;
648  record.link = true;
649  record.cost = cost;
650 
651  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
652  if (p != _subscribers.end())
653  {
654  string name = IceStormInternal::identityToTopicName(id);
655  LinkExists ex;
656  ex.name = name;
657  throw ex;
658  }
659 
660  LogUpdate llu;
661 
662  SubscriberPtr subscriber = Subscriber::create(_instance, record);
663 
664  try
665  {
666  IceDB::ReadWriteTxn txn(_instance->dbEnv());
667 
669  key.topic = _id;
670  key.id = id;
671 
672  _subscriberMap.put(txn, key, record);
673 
674  llu = getIncrementedLLU(txn, _lluMap);
675 
676  txn.commit();
677  }
678  catch (const IceDB::LMDBException& ex)
679  {
680  logError(_instance->communicator(), ex);
681  throw; // will become UnknownException in caller
682  }
683 
684  _subscribers.push_back(subscriber);
685 
686  _instance->observers()->addSubscriber(llu, _name, record);
687 }
688 
689 void
691 {
692  IceUtil::Mutex::Lock sync(_subscribersMutex);
693  if (_destroyed)
694  {
695  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
696  }
697 
698  Ice::Identity id = topic->ice_getIdentity();
699 
700  vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
701  if (p == _subscribers.end())
702  {
703  string name = IceStormInternal::identityToTopicName(id);
704  TraceLevelsPtr traceLevels = _instance->traceLevels();
705  if (traceLevels->topic > 0)
706  {
707  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
708  out << _name << ": unlink " << name << " failed - not linked";
709  }
710 
711  NoSuchLink ex;
712  ex.name = name;
713  throw ex;
714  }
715 
716  TraceLevelsPtr traceLevels = _instance->traceLevels();
717  if (traceLevels->topic > 0)
718  {
719  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
720  out << _name << " unlink " << _instance->communicator()->identityToString(id);
721  }
722 
723  Ice::IdentitySeq ids;
724  ids.push_back(id);
725  removeSubscribers(ids);
726 }
727 
728 void
729 TopicImpl::reap(const Ice::IdentitySeq& ids)
730 {
731  IceUtil::Mutex::Lock sync(_subscribersMutex);
732 
733  TraceLevelsPtr traceLevels = _instance->traceLevels();
734  if (traceLevels->topic > 0)
735  {
736  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
737  out << _name << ": reap ";
738  for (Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end() ; ++p)
739  {
740  if (p != ids.begin())
741  {
742  out << ",";
743  }
744  out << _instance->communicator()->identityToString(*p);
745  }
746  }
747 
748  removeSubscribers(ids);
749 }
750 
751 void
753 {
754  IceUtil::Mutex::Lock sync(_subscribersMutex);
755  _servant = 0;
756 
757  // Shutdown each subscriber. This waits for the event queues to drain.
758  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
759  {
760  (*p)->shutdown();
761  }
762 
763  _observer.detach();
764 }
765 
766 LinkInfoSeq
768 {
769  IceUtil::Mutex::Lock sync(_subscribersMutex);
770 
771  LinkInfoSeq seq;
772  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
773  {
774  SubscriberRecord record = (*p)->record();
775  if (record.link && !(*p)->errored())
776  {
777  LinkInfo info;
778  info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
779  info.cost = record.cost;
780  info.theTopic = record.theTopic;
781  seq.push_back(info);
782  }
783  }
784  return seq;
785 }
786 
787 Ice::IdentitySeq
789 {
790  IceUtil::Mutex::Lock sync(_subscribersMutex);
791 
792  Ice::IdentitySeq subscribers;
793  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
794  {
795  subscribers.push_back((*p)->id());
796  }
797  return subscribers;
798 }
799 
800 void
802 {
803  IceUtil::Mutex::Lock sync(_subscribersMutex);
804 
805  if (_destroyed)
806  {
807  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
808  }
809  _destroyed = true;
810 
811  TraceLevelsPtr traceLevels = _instance->traceLevels();
812  if (traceLevels->topic > 0)
813  {
814  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
815  out << _name << ": destroy";
816  }
817 
818  // destroyInternal clears out the topic content.
819  LogUpdate llu = {0, 0};
820  _instance->observers()->destroyTopic(destroyInternal(llu, true), _name);
821 
822  _observer.detach();
823 }
824 
827 {
828  IceUtil::Mutex::Lock sync(_subscribersMutex);
829 
830  TopicContent content;
831  content.id = _id;
832  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
833  {
834  // Don't return errored subscribers (subscribers that have
835  // errored out, but not reaped due to a failure with the
836  // master). This means we can avoid the reaping step later.
837  if (!(*p)->errored())
838  {
839  content.records.push_back((*p)->record());
840  }
841  }
842  return content;
843 }
844 
845 void
847 {
848  IceUtil::Mutex::Lock sync(_subscribersMutex);
849 
850  // We do this with two scans. The first runs through the subscribers
851  // that we have and removes those not in the init list. The second
852  // runs through the init list and add the ones that don't
853  // exist.
854 
855  {
856  vector<SubscriberPtr>::iterator p = _subscribers.begin();
857  while (p != _subscribers.end())
858  {
859  SubscriberRecordSeq::const_iterator q;
860  for (q = records.begin(); q != records.end(); ++q)
861  {
862  if ((*p)->id() == q->id)
863  {
864  break;
865  }
866  }
867  // The subscriber doesn't exist in the incoming subscriber
868  // set so destroy it.
869  if (q == records.end())
870  {
871  (*p)->destroy();
872  p = _subscribers.erase(p);
873  }
874  else
875  {
876  // Otherwise reset the reaped status if necessary.
877  (*p)->resetIfReaped();
878  ++p;
879  }
880  }
881  }
882 
883  for (SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
884  {
885  vector<SubscriberPtr>::iterator q;
886  for (q = _subscribers.begin(); q != _subscribers.end(); ++q)
887  {
888  if ((*q)->id() == p->id)
889  {
890  break;
891  }
892  }
893  if (q == _subscribers.end())
894  {
895  SubscriberPtr subscriber = Subscriber::create(_instance, *p);
896  _subscribers.push_back(subscriber);
897  }
898  }
899 }
900 
901 bool
903 {
904  IceUtil::Mutex::Lock sync(_subscribersMutex);
905  return _destroyed;
906 }
907 
910 {
911  // immutable
912  return _id;
913 }
914 
915 TopicPrx
917 {
918  // immutable
919  Ice::ObjectPrx prx;
920  if (_instance->topicReplicaProxy())
921  {
922  prx = _instance->topicReplicaProxy()->ice_identity(_id);
923  }
924  else
925  {
926  prx = _instance->topicAdapter()->createProxy(_id);
927  }
928  return TopicPrx::uncheckedCast(prx);
929 }
930 
931 namespace
932 {
933 
934  class TopicInternalReapCB : public IceUtil::Shared
935  {
936  public:
937 
938  TopicInternalReapCB(const PersistentInstancePtr& instance, Ice::Long generation) :
939  _instance(instance), _generation(generation)
940  {
941  }
942 
943  virtual void exception(const Ice::Exception& ex)
944  {
945  TraceLevelsPtr traceLevels = _instance->traceLevels();
946  if (traceLevels->topic > 0)
947  {
948  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
949  out << "exception when calling `reap' on the master replica: " << ex;
950  }
951  _instance->node()->recovery(_generation);
952  }
953 
954  private:
955 
956  const PersistentInstancePtr _instance;
957  const Ice::Long _generation;
958  };
959 
960 }
961 
962 void
963 TopicImpl::publish(bool forwarded, const EventDataSeq& events)
964 {
965  TopicInternalPrx masterInternal;
966  Ice::Long generation = -1;
967  Ice::IdentitySeq reap;
968  {
969  // Use cached reads.
970  CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
971 
972  //
973  // Copy of the subscriber list so that event publishing can occur
974  // in parallel.
975  //
976  vector<SubscriberPtr> copy;
977  {
978  IceUtil::Mutex::Lock sync(_subscribersMutex);
979  if (_observer)
980  {
981  if (forwarded)
982  {
983  _observer->forwarded();
984  }
985  else
986  {
987  _observer->published();
988  }
989  }
990  copy = _subscribers;
991  }
992 
993  //
994  // Queue each event, gathering a list of those subscribers that
995  // must be reaped.
996  //
997  for (vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
998  {
999  if (!(*p)->queue(forwarded, events) && (*p)->reap())
1000  {
1001  reap.push_back((*p)->id());
1002  }
1003  }
1004 
1005  // If there are no subscribers in error then we're done.
1006  if (reap.empty())
1007  {
1008  return;
1009  }
1010  if (!unlock.getMaster())
1011  {
1012  IceUtil::Mutex::Lock sync(_subscribersMutex);
1013  removeSubscribers(reap);
1014  return;
1015  }
1016  masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id));
1017  generation = unlock.generation();
1018  }
1019 
1020  // Tell the master to reap this set of subscribers. This is an
1021  // AMI invocation so it shouldn't block the caller (in the
1022  // typical case) we do it outside of the mutex lock for
1023  // performance reasons.
1024  //
1025  // We must release the cached lock before calling this as the AMI
1026  // call may raise an exception in the caller (that is directly
1027  // call ice_exception) which calls recover() on the node which
1028  // would result in a deadlock since the node is locked.
1029  masterInternal->begin_reap(reap, newCallback_TopicInternal_reap(new TopicInternalReapCB(_instance, generation),
1030  &TopicInternalReapCB::exception));
1031 }
1032 
1033 void
1035 {
1036  IceUtil::Mutex::Lock sync(_subscribersMutex);
1037 
1038  TraceLevelsPtr traceLevels = _instance->traceLevels();
1039  if (traceLevels->topic > 0)
1040  {
1041  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1042  out << _name << ": add replica observer: " << _instance->communicator()->identityToString(record.id);
1043 
1044  if (traceLevels->topic > 1)
1045  {
1046  out << " endpoints: " << IceStormInternal::describeEndpoints(record.obj)
1047  << " QoS: ";
1048  for (QoS::const_iterator p = record.theQoS.begin(); p != record.theQoS.end() ; ++p)
1049  {
1050  if (p != record.theQoS.begin())
1051  {
1052  out << ',';
1053  }
1054  out << '[' << p->first << "," << p->second << ']';
1055  }
1056  }
1057  out << " llu: " << llu.generation << "/" << llu.iteration;
1058  }
1059 
1060  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
1061  if (p != _subscribers.end())
1062  {
1063  // If the subscriber is already in the database display a
1064  // diagnostic.
1065  TraceLevelsPtr traceLevels = _instance->traceLevels();
1066  if (traceLevels->topic > 0)
1067  {
1068  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1069  out << _instance->communicator()->identityToString(record.id) << ": already subscribed";
1070  }
1071  return;
1072  }
1073 
1074  SubscriberPtr subscriber = Subscriber::create(_instance, record);
1075  try
1076  {
1077  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1078 
1079  SubscriberRecordKey key;
1080  key.topic = _id;
1081  key.id = subscriber->id();
1082 
1083  _subscriberMap.put(txn, key, record);
1084 
1085  // Update the LLU.
1086  _lluMap.put(txn, lluDbKey, llu);
1087 
1088  txn.commit();
1089  }
1090  catch (const IceDB::LMDBException& ex)
1091  {
1092  logError(_instance->communicator(), ex);
1093  throw; // will become UnknownException in caller
1094  }
1095 
1096  _subscribers.push_back(subscriber);
1097 }
1098 
1099 void
1100 TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids)
1101 {
1102  TraceLevelsPtr traceLevels = _instance->traceLevels();
1103  if (traceLevels->topic > 0)
1104  {
1105  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1106  out << _name << ": remove replica observer: ";
1107  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1108  {
1109  if (id != ids.begin())
1110  {
1111  out << ",";
1112  }
1113  out << _instance->communicator()->identityToString(*id);
1114  }
1115  out << " llu: " << llu.generation << "/" << llu.iteration;
1116  }
1117 
1118  IceUtil::Mutex::Lock sync(_subscribersMutex);
1119 
1120  // First remove from the database.
1121  try
1122  {
1123  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1124 
1125  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1126  {
1127  SubscriberRecordKey key;
1128  key.topic = _id;
1129  key.id = *id;
1130 
1131  _subscriberMap.del(txn, key);
1132  }
1133 
1134  _lluMap.put(txn, lluDbKey, llu);
1135 
1136  txn.commit();
1137  }
1138  catch (const IceDB::LMDBException& ex)
1139  {
1140  logError(_instance->communicator(), ex);
1141  throw; // will become UnknownException in caller
1142  }
1143 
1144  // Then remove the subscriber from the subscribers list. If the
1145  // subscriber had a local failure and was removed from the
1146  // subscriber list it could already be gone. That's not a problem.
1147  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1148  {
1149  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1150  if (p != _subscribers.end())
1151  {
1152  (*p)->destroy();
1153  _subscribers.erase(p);
1154  }
1155  }
1156 }
1157 
1158 void
1160 {
1161  IceUtil::Mutex::Lock sync(_subscribersMutex);
1162 
1163  if (_destroyed)
1164  {
1165  return;
1166  }
1167  _destroyed = true;
1168 
1169  TraceLevelsPtr traceLevels = _instance->traceLevels();
1170  if (traceLevels->topic > 0)
1171  {
1172  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1173  out << _name << ": destroyed";
1174  out << " llu: " << llu.generation << "/" << llu.iteration;
1175  }
1176  destroyInternal(llu, false);
1177 }
1178 
1181 {
1182  return _servant;
1183 }
1184 
1185 void
1187 {
1188  IceUtil::Mutex::Lock sync(_subscribersMutex);
1189  if (_instance->observer())
1190  {
1191  _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, _observer.get()));
1192  }
1193 }
1194 
1195 void
1197 {
1198  IceUtil::Mutex::Lock sync(_subscribersMutex);
1199  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1200  {
1201  (*p)->updateObserver();
1202  }
1203 }
1204 
1205 LogUpdate
1206 TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
1207 {
1208 
1209  // Clear out the database records related to this topic.
1210  LogUpdate llu;
1211  try
1212  {
1213  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1214 
1215  // Erase all subscriber records and the topic record.
1216  SubscriberRecordKey key;
1217  key.topic = _id;
1218 
1219  SubscriberMapRWCursor cursor(_subscriberMap, txn);
1220  if (cursor.find(key))
1221  {
1222  _subscriberMap.del(txn, key);
1223 
1226  while (cursor.get(k, v, MDB_NEXT) && k.topic == key.topic)
1227  {
1228  _subscriberMap.del(txn, k);
1229  }
1230  }
1231 
1232  // Update the LLU.
1233  if (master)
1234  {
1235  llu = getIncrementedLLU(txn, _lluMap);
1236  }
1237  else
1238  {
1239  llu = origLLU;
1240  _lluMap.put(txn, lluDbKey, llu);
1241  }
1242 
1243  txn.commit();
1244  }
1245  catch (const IceDB::LMDBException& ex)
1246  {
1247  logError(_instance->communicator(), ex);
1248  throw; // will become UnknownException in caller
1249  }
1250 
1251  _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
1252  _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
1253  _instance->topicReaper()->add(_name);
1254 
1255  // Destroy each of the subscribers.
1256  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1257  {
1258  (*p)->destroy();
1259  }
1260  _subscribers.clear();
1261 
1262  _instance->topicAdapter()->remove(_id);
1263 
1264  _servant = 0;
1265 
1266  return llu;
1267 }
1268 
1269 void
1270 TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
1271 {
1272  // First update the database
1273 
1274  LogUpdate llu;
1275  bool found = false;
1276  try
1277  {
1278  IceDB::ReadWriteTxn txn(_instance->dbEnv());
1279 
1280  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1281  {
1282  SubscriberRecordKey key;
1283  key.topic = _id;
1284  key.id = *id;
1285 
1286  if (_subscriberMap.del(txn, key))
1287  {
1288  found = true;
1289  }
1290  }
1291 
1292  if (found)
1293  {
1294  llu = getIncrementedLLU(txn, _lluMap);
1295  txn.commit();
1296  }
1297  else
1298  {
1299  txn.rollback();
1300  }
1301  }
1302  catch (const IceDB::LMDBException& ex)
1303  {
1304  logError(_instance->communicator(), ex);
1305  throw; // will become UnknownException in caller
1306  }
1307 
1308  if (found)
1309  {
1310  // Then remove the subscriber from the subscribers list. Its
1311  // possible that some of these subscribers have already been
1312  // removed (consider, for example, a concurrent reap call from two
1313  // replicas on the same subscriber). To avoid sending unnecessary
1314  // observer updates keep track of the observers that are actually
1315  // removed.
1316  for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1317  {
1318  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1319  if (p != _subscribers.end())
1320  {
1321  (*p)->destroy();
1322  _subscribers.erase(p);
1323  }
1324  }
1325 
1326  _instance->observers()->removeSubscriber(llu, _name, ids);
1327  }
1328 }
IceStorm::SubscriberRecordSeq
::std::vector< ::IceStorm::SubscriberRecord > SubscriberRecordSeq
Definition: SubscriberRecord.h:225
IceStorm::TopicImpl::destroy
void destroy()
Definition: TopicI.cpp:801
IceStorm::TopicImpl::getLinkInfoSeq
LinkInfoSeq getLinkInfoSeq() const
Definition: TopicI.cpp:767
IceStorm
Definition: DBTypes.ice:22
IceDB::ReadWriteTxn
Definition: IceDB.h:206
IceStorm::SubscriberRecordKey::topic
::Ice::Identity topic
Definition: SubscriberRecord.h:151
IceStorm::TopicImpl::getName
std::string getName() const
Definition: TopicI.cpp:445
IceStorm::TopicImpl::observerAddSubscriber
void observerAddSubscriber(const IceStormElection::LogUpdate &, const SubscriberRecord &)
Definition: TopicI.cpp:1034
TopicI.h
IceDB::Txn::commit
void commit()
IceStorm::SubscriberRecordKey
The key for persistent subscribers, or topics.
Definition: SubscriberRecord.h:149
IceStorm::EventDataSeq
std::deque< ::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
Definition: IceStormInternal.h:463
IceStorm::TopicImpl::getServant
Ice::ObjectPtr getServant() const
Definition: TopicI.cpp:1180
IceStorm::TopicImpl::shutdown
void shutdown()
Definition: TopicI.cpp:752
IceStorm::ReapWouldBlock
Thrown if the reap call would block.
Definition: IceStormInternal.h:465
IceStormElection::LogUpdate
A struct used for marking the last log update.
Definition: LLURecord.h:100
IceStorm::TopicImpl::destroyed
bool destroyed() const
Definition: TopicI.cpp:902
IceStorm::TopicImpl::updateObserver
void updateObserver()
Definition: TopicI.cpp:1186
IceStormElection::CachedReadHelper::getMaster
Ice::ObjectPrx getMaster() const
Definition: NodeI.h:149
IceStorm::Subscriber::create
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
Definition: Subscriber.cpp:522
IceStorm::TopicImpl::unsubscribe
void unsubscribe(const Ice::ObjectPrx &)
Definition: TopicI.cpp:580
IceStorm::TopicImpl::update
void update(const SubscriberRecordSeq &)
Definition: TopicI.cpp:846
IceStorm::TopicImpl::unlink
void unlink(const TopicPrx &)
Definition: TopicI.cpp:690
IceStorm::newCallback_TopicInternal_reap
Callback_TopicInternal_reapPtr newCallback_TopicInternal_reap(const IceUtil::Handle< T > &instance, void(T::*cb)(), void(T::*excb)(const ::Ice::Exception &), void(T::*sentcb)(bool)=0)
Definition: IceStormInternal.h:1083
Util.h
IceStorm::SubscriberRecord::theQoS
::IceStorm::QoS theQoS
Definition: SubscriberRecord.h:220
IceInternal::Handle< ::Ice::Communicator >
IceStorm::TopicImpl::getPublisher
Ice::ObjectPrx getPublisher() const
Definition: TopicI.cpp:452
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:214
IceDB::Dbi::put
void put(const ReadWriteTxn &txn, const K &key, const D &data, unsigned int flags=0)
Definition: IceDB.h:273
IceStorm::TopicImpl::reap
void reap(const Ice::IdentitySeq &)
Definition: TopicI.cpp:729
IceStorm::TopicInternal
Internal operations for a topic.
Definition: IceStormInternal.h:734
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:523
IceStormElection::TopicContent::id
Ice::Identity id
The topic identity.
Definition: Election.ice:26
IceStormElection::TopicContent
The contents of topic.
Definition: Election.ice:23
IceStormElection::LogUpdate::generation
::Ice::Long generation
Definition: LLURecord.h:102
copy
Use of this software is granted under one of the following two to be chosen freely by the user Boost Software License Version Marcin Kalicinski Permission is hereby free of to any person or organization obtaining a copy of the software and accompanying documentation covered by this and transmit the and to prepare derivative works of the and to permit third parties to whom the Software is furnished to do all subject to the including the above license this restriction and the following must be included in all copies of the in whole or in and all derivative works of the unless such copies or derivative works are solely in the form of machine executable object code generated by a source language processor THE SOFTWARE IS PROVIDED AS WITHOUT WARRANTY OF ANY EXPRESS OR INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF FITNESS FOR A PARTICULAR TITLE AND NON INFRINGEMENT IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER WHETHER IN TORT OR ARISING OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE The MIT Marcin Kalicinski Permission is hereby free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to copy
Definition: license.txt:39
IceStormElection
Definition: DBTypes.ice:17
IceStormElection::LogUpdate::iteration
::Ice::Long iteration
Definition: LLURecord.h:103
IceStorm::TopicImpl::updateSubscriberObservers
void updateSubscriberObservers()
Definition: TopicI.cpp:1196
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
armarx::ProxyType::topic
@ topic
IceStorm::lluDbKey
const std::string lluDbKey
Definition: Util.h:31
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:917
IceDB::ReadWriteCursor
Definition: IceDB.h:448
IceStorm::TopicImpl::proxy
TopicPrx proxy() const
Definition: TopicI.cpp:916
IceStorm::EventData
The event data.
Definition: IceStormInternal.h:429
IceStorm::SubscriberRecord::obj
::Ice::ObjectPrx obj
Definition: SubscriberRecord.h:219
Subscriber.h
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
IceStorm::TopicImpl::publish
void publish(bool, const EventDataSeq &)
Definition: TopicI.cpp:963
IceStorm::TopicImpl::observerRemoveSubscriber
void observerRemoveSubscriber(const IceStormElection::LogUpdate &, const Ice::IdentitySeq &)
Definition: TopicI.cpp:1100
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStorm::TopicImpl::getLinkProxy
TopicLinkPrx getLinkProxy()
Definition: TopicI.cpp:614
IceStormElection::CachedReadHelper::generation
Ice::Long generation() const
Definition: NodeI.h:155
IceStormInternal::getIncrementedLLU
IceStormElection::LogUpdate getIncrementedLLU(const IceDB::ReadWriteTxn &, IceStorm::LLUMap &)
Definition: Util.cpp:94
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::cost
::Ice::Int cost
Definition: SubscriberRecord.h:221
IceStorm::SubscriberRecord::theTopic
::IceStorm::TopicPrx theTopic
Definition: SubscriberRecord.h:222
TraceLevels.h
IceStormInternal::identityToTopicName
std::string identityToTopicName(const Ice::Identity &)
Definition: Util.cpp:23
std
Definition: Application.h:66
IceStormElection::CachedReadHelper
Definition: NodeI.h:127
IceStormInternal
Definition: Instance.cpp:27
IceUtil::Handle
Definition: forward_declarations.h:29
IceStorm::SubscriberRecordKey::id
::Ice::Identity id
Definition: SubscriberRecord.h:152
IceStorm::TopicImpl::id
Ice::Identity id() const
Definition: TopicI.cpp:909
IceInternal::ProxyHandle< ::IceProxy::IceStorm::Topic >
Observers.h
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:916
IceStorm::TopicImpl::observerDestroyTopic
void observerDestroyTopic(const IceStormElection::LogUpdate &)
Definition: TopicI.cpp:1159
IceStorm::SubscriberRecord::id
::Ice::Identity id
Definition: SubscriberRecord.h:217
IceDB::Dbi::del
bool del(const ReadWriteTxn &txn, const K &key, const D &data)
Definition: IceDB.h:305
IceStorm::TopicImpl::subscribeAndGetPublisher
Ice::ObjectPrx subscribeAndGetPublisher(const QoS &, const Ice::ObjectPrx &)
Definition: TopicI.cpp:496
armarx::aron::type::ObjectPtr
std::shared_ptr< Object > ObjectPtr
Definition: Object.h:36
IceStorm::TopicImpl::link
void link(const TopicPrx &, Ice::Int)
Definition: TopicI.cpp:626
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:218
IceStorm::TopicImpl::getNonReplicatedPublisher
Ice::ObjectPrx getNonReplicatedPublisher() const
Definition: TopicI.cpp:463
NodeI.h
IceStorm::TopicImpl::getSubscribers
Ice::IdentitySeq getSubscribers() const
Definition: TopicI.cpp:788
Instance.h
IceDB::LMDBException
Definition: IceDB.h:51
IceStormElection::TopicContent::records
IceStorm::SubscriberRecordSeq records
The topic subscribers.
Definition: Election.ice:28
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
IceInternal::trace
void trace(const char *, const ::Ice::OutputStream &, const ::Ice::LoggerPtr &, const TraceLevelsPtr &)
IceStorm::SubscriberRecord::topicName
::std::string topicName
Definition: SubscriberRecord.h:216
IceStorm::TopicImpl::getContent
IceStormElection::TopicContent getContent() const
Definition: TopicI.cpp:826
IceStormElection::FinishUpdateHelper
Definition: NodeI.h:105