TopicI.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <algorithm>
11
12#include <Ice/LoggerUtil.h>
13#include <IceStorm/Instance.h>
14#include <IceStorm/NodeI.h>
15#include <IceStorm/Observers.h>
16#include <IceStorm/Subscriber.h>
17#include <IceStorm/TopicI.h>
19#include <IceStorm/Util.h>
20
21using namespace std;
22using namespace IceStorm;
23using namespace IceStormElection;
24using namespace IceStormInternal;
25
26namespace
27{
28
29 void
30 logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
31 {
32 Ice::Error error(com->getLogger());
33 error << "LMDB error: " << ex;
34 }
35
36 //
37 // The servant has a 1-1 association with a topic. It is used to
38 // receive events from Publishers.
39 //
40 class PublisherI : public Ice::BlobjectArray
41 {
42 public:
43 PublisherI(const TopicImplPtr& topic, const PersistentInstancePtr& instance) :
44 _topic(topic), _instance(instance)
45 {
46 }
47
48 virtual bool
49 ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
50 Ice::ByteSeq&,
51 const Ice::Current& current)
52 {
53 // The publish call does a cached read.
54 EventDataPtr event =
55 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
56
57 //
58 // COMPILERBUG: gcc 4.0.1 doesn't like this.
59 //
60 //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
61 Ice::ByteSeq data(inParams.first, inParams.second);
62 event->data.swap(data);
63
65 v.push_back(event);
66 _topic->publish(false, v);
67
68 return true;
69 }
70
71 private:
72 const TopicImplPtr _topic;
73 const PersistentInstancePtr _instance;
74 };
75
76 //
77 // The servant has a 1-1 association with a topic. It is used to
78 // receive events from linked Topics.
79 //
80 class TopicLinkI : public TopicLink
81 {
82 public:
83 TopicLinkI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
84 _impl(impl), _instance(instance)
85 {
86 }
87
88 virtual void
89 forward(const EventDataSeq& v, const Ice::Current& /*current*/)
90 {
91 // The publish call does a cached read.
92 _impl->publish(true, v);
93 }
94
95 private:
96 const TopicImplPtr _impl;
97 const PersistentInstancePtr _instance;
98 };
99
100 class TopicI : public TopicInternal
101 {
102 public:
103 TopicI(const TopicImplPtr& impl, const PersistentInstancePtr& instance) :
104 _impl(impl), _instance(instance)
105 {
106 }
107
108 virtual string
109 getName(const Ice::Current&) const
110 {
111 // Use cached reads.
112 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
113 return _impl->getName();
114 }
115
116 virtual Ice::ObjectPrx
117 getPublisher(const Ice::Current&) const
118 {
119 // Use cached reads.
120 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
121 return _impl->getPublisher();
122 }
123
124 virtual Ice::ObjectPrx
125 getNonReplicatedPublisher(const Ice::Current&) const
126 {
127 // Use cached reads.
128 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
129 return _impl->getNonReplicatedPublisher();
130 }
131
132 virtual Ice::ObjectPrx
133 subscribeAndGetPublisher(const QoS& qos,
134 const Ice::ObjectPrx& obj,
135 const Ice::Current& current)
136 {
137 while (true)
138 {
139 Ice::Long generation = -1;
140 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
141 if (master)
142 {
143 try
144 {
145 return master->subscribeAndGetPublisher(qos, obj);
146 }
147 catch (const Ice::ConnectFailedException&)
148 {
149 _instance->node()->recovery(generation);
150 continue;
151 }
152 catch (const Ice::TimeoutException&)
153 {
154 _instance->node()->recovery(generation);
155 continue;
156 }
157 }
158 else
159 {
160 FinishUpdateHelper unlock(_instance->node());
161 return _impl->subscribeAndGetPublisher(qos, obj);
162 }
163 }
164 }
165
166 virtual void
167 unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current& current)
168 {
169 while (true)
170 {
171 Ice::Long generation = -1;
172 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
173 if (master)
174 {
175 try
176 {
177 master->unsubscribe(subscriber);
178 }
179 catch (const Ice::ConnectFailedException&)
180 {
181 _instance->node()->recovery(generation);
182 continue;
183 }
184 catch (const Ice::TimeoutException&)
185 {
186 _instance->node()->recovery(generation);
187 continue;
188 }
189 }
190 else
191 {
192 FinishUpdateHelper unlock(_instance->node());
193 _impl->unsubscribe(subscriber);
194 }
195 break;
196 }
197 }
198
199 virtual TopicLinkPrx
200 getLinkProxy(const Ice::Current&)
201 {
202 // Use cached reads.
203 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
204 return _impl->getLinkProxy();
205 }
206
207 virtual void
208 reap(const Ice::IdentitySeq& ids, const Ice::Current& /*current*/)
209 {
210 NodeIPtr node = _instance->node();
211 if (!node->updateMaster(__FILE__, __LINE__))
212 {
213 throw ReapWouldBlock();
214 }
215 FinishUpdateHelper unlock(node);
216 _impl->reap(ids);
217 }
218
219 virtual void
220 link(const TopicPrx& topic, Ice::Int cost, const Ice::Current& current)
221 {
222 while (true)
223 {
224 Ice::Long generation = -1;
225 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
226 if (master)
227 {
228 try
229 {
230 master->link(topic, cost);
231 }
232 catch (const Ice::ConnectFailedException&)
233 {
234 _instance->node()->recovery(generation);
235 continue;
236 }
237 catch (const Ice::TimeoutException&)
238 {
239 _instance->node()->recovery(generation);
240 continue;
241 }
242 }
243 else
244 {
245 FinishUpdateHelper unlock(_instance->node());
246 _impl->link(topic, cost);
247 }
248 break;
249 }
250 }
251
252 virtual void
253 unlink(const TopicPrx& topic, const Ice::Current& current)
254 {
255 while (true)
256 {
257 Ice::Long generation = -1;
258 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
259 if (master)
260 {
261 try
262 {
263 master->unlink(topic);
264 }
265 catch (const Ice::ConnectFailedException&)
266 {
267 _instance->node()->recovery(generation);
268 continue;
269 }
270 catch (const Ice::TimeoutException&)
271 {
272 _instance->node()->recovery(generation);
273 continue;
274 }
275 }
276 else
277 {
278 FinishUpdateHelper unlock(_instance->node());
279 _impl->unlink(topic);
280 }
281 break;
282 }
283 }
284
285 virtual LinkInfoSeq
286 getLinkInfoSeq(const Ice::Current&) const
287 {
288 // Use cached reads.
289 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
290 return _impl->getLinkInfoSeq();
291 }
292
293 virtual Ice::IdentitySeq
294 getSubscribers(const Ice::Current&) const
295 {
296 return _impl->getSubscribers();
297 }
298
299 virtual void
300 destroy(const Ice::Current& current)
301 {
302 while (true)
303 {
304 Ice::Long generation = -1;
305 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
306 if (master)
307 {
308 try
309 {
310 master->destroy();
311 }
312 catch (const Ice::ConnectFailedException&)
313 {
314 _instance->node()->recovery(generation);
315 continue;
316 }
317 catch (const Ice::TimeoutException&)
318 {
319 _instance->node()->recovery(generation);
320 continue;
321 }
322 }
323 else
324 {
325 FinishUpdateHelper unlock(_instance->node());
326 _impl->destroy();
327 }
328 break;
329 }
330 }
331
332 private:
334 getMasterFor(const Ice::Current& cur,
335 Ice::Long& generation,
336 const char* file,
337 int line) const
338 {
339 NodeIPtr node = _instance->node();
340 Ice::ObjectPrx master;
341 if (node)
342 {
343 master = _instance->node()->startUpdate(generation, file, line);
344 }
345 return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) : TopicPrx();
346 }
347
348 const TopicImplPtr _impl;
349 const PersistentInstancePtr _instance;
350 };
351
352} // namespace
353
355 const string& name,
356 const Ice::Identity& id,
357 const SubscriberRecordSeq& subscribers) :
358 _instance(instance),
359 _name(name),
360 _id(id),
361 _destroyed(false),
362 _lluMap(_instance->lluMap()),
363 _subscriberMap(_instance->subscriberMap())
364{
365 try
366 {
367 __setNoDelete(true);
368
369 // TODO: If we want to improve the performance of the
370 // non-replicated case we could allocate a null-topic impl here.
371 _servant = new TopicI(this, instance);
372
373 //
374 // Create a servant per topic to receive event data. If the
375 // category is empty then we are in backwards compatibility
376 // mode. In this case the servant's identity is
377 // category=<topicname>, name=publish, otherwise the name is
378 // <instancename>/<topicname>.publish. The same applies to the
379 // link proxy.
380 //
381 // Activate the object and save a reference to give to publishers.
382 //
383 Ice::Identity pubid;
384 Ice::Identity linkid;
385 if (id.category.empty())
386 {
387 pubid.category = _name;
388 pubid.name = "publish";
389 linkid.category = _name;
390 linkid.name = "link";
391 }
392 else
393 {
394 pubid.category = id.category;
395 pubid.name = _name + ".publish";
396 linkid.category = id.category;
397 linkid.name = _name + ".link";
398 }
399
400 _publisherPrx = _instance->publishAdapter()->add(new PublisherI(this, instance), pubid);
401 _linkPrx = TopicLinkPrx::uncheckedCast(
402 _instance->publishAdapter()->add(new TopicLinkI(this, instance), linkid));
403
404 //
405 // Re-establish subscribers.
406 //
407 for (SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end();
408 ++p)
409 {
410 Ice::Identity id = p->obj->ice_getIdentity();
411 TraceLevelsPtr traceLevels = _instance->traceLevels();
412 if (traceLevels->topic > 0)
413 {
414 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
415 out << _name << " recreate " << _instance->communicator()->identityToString(id);
416 if (traceLevels->topic > 1)
417 {
418 out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
419 }
420 }
421
422 try
423 {
424 //
425 // Create the subscriber object add it to the set of
426 // subscribers.
427 //
428 SubscriberPtr subscriber = Subscriber::create(_instance, *p);
429 _subscribers.push_back(subscriber);
430 }
431 catch (const Ice::Exception& ex)
432 {
433 Ice::Warning out(traceLevels->logger);
434 out << _name << " recreate " << _instance->communicator()->identityToString(id);
435 if (traceLevels->topic > 1)
436 {
437 out << " endpoints: " << IceStormInternal::describeEndpoints(p->obj);
438 }
439 out << " failed: " << ex;
440 }
441 }
442
443 if (_instance->observer())
444 {
445 _observer.attach(
446 _instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0));
447 }
448 }
449 catch (...)
450 {
451 shutdown();
452 __setNoDelete(false);
453 throw;
454 }
455 __setNoDelete(false);
456}
457
458string
460{
461 // Immutable
462 return _name;
463}
464
465Ice::ObjectPrx
467{
468 // Immutable
469 if (_instance->publisherReplicaProxy())
470 {
471 return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity());
472 }
473 return _publisherPrx;
474}
475
476Ice::ObjectPrx
478{
479 // If there is an adapter id configured then we're using icegrid
480 // so create an indirect proxy, otherwise create a direct proxy.
481 if (!_publisherPrx->ice_getAdapterId().empty())
482 {
483 return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity());
484 }
485 else
486 {
487 return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity());
488 }
489}
490
491namespace
492{
493 void
494 trace(Ice::Trace& out, const PersistentInstancePtr& instance, const vector<SubscriberPtr>& s)
495 {
496 out << '[';
497 for (vector<SubscriberPtr>::const_iterator p = s.begin(); p != s.end(); ++p)
498 {
499 if (p != s.begin())
500 {
501 out << ",";
502 }
503 out << instance->communicator()->identityToString((*p)->id());
504 }
505 out << "]";
506 }
507} // namespace
508
509Ice::ObjectPrx
510TopicImpl::subscribeAndGetPublisher(const QoS& qos, const Ice::ObjectPrx& obj)
511{
512 if (!obj)
513 {
514 TraceLevelsPtr traceLevels = _instance->traceLevels();
515 if (traceLevels->topic > 0)
516 {
517 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
518 out << _name << ": subscribeAndGetPublisher: null proxy";
519 }
520 throw InvalidSubscriber("subscriber is a null proxy");
521 }
522 Ice::Identity id = obj->ice_getIdentity();
523
524 TraceLevelsPtr traceLevels = _instance->traceLevels();
525 if (traceLevels->topic > 0)
526 {
527 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
528 out << _name
529 << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
530
531 if (traceLevels->topic > 1)
532 {
533 out << " endpoints: " << IceStormInternal::describeEndpoints(obj) << " QoS: ";
534 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
535 {
536 if (p != qos.begin())
537 {
538 out << ',';
539 }
540 }
541 out << " subscriptions: ";
542 trace(out, _instance, _subscribers);
543 }
544 }
545
546 IceUtil::Mutex::Lock sync(_subscribersMutex);
547
548 SubscriberRecord record;
549 record.id = id;
550 record.obj = obj;
551 record.theQoS = qos;
552 record.topicName = _name;
553 record.link = false;
554 record.cost = 0;
555
556 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
557 if (p != _subscribers.end())
558 {
559 throw AlreadySubscribed();
560 }
561
562 LogUpdate llu;
563
564 SubscriberPtr subscriber = Subscriber::create(_instance, record);
565 try
566 {
567 IceDB::ReadWriteTxn txn(_instance->dbEnv());
568
570 key.topic = _id;
571 key.id = subscriber->id();
572
573 _subscriberMap.put(txn, key, record);
574
575 llu = getIncrementedLLU(txn, _lluMap);
576
577 txn.commit();
578 }
579 catch (const IceDB::LMDBException& ex)
580 {
581 logError(_instance->communicator(), ex);
582 throw; // will become UnknownException in caller
583 }
584
585 _subscribers.push_back(subscriber);
586
587 _instance->observers()->addSubscriber(llu, _name, record);
588
589 return subscriber->proxy();
590}
591
592void
593TopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber)
594{
595 TraceLevelsPtr traceLevels = _instance->traceLevels();
596 if (!subscriber)
597 {
598 if (traceLevels->topic > 0)
599 {
600 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
601 out << _name << ": unsubscribe: null proxy";
602 }
603 throw InvalidSubscriber("subscriber is a null proxy");
604 }
605
606 Ice::Identity id = subscriber->ice_getIdentity();
607
608 if (traceLevels->topic > 0)
609 {
610 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
611 out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
612
613 if (traceLevels->topic > 1)
614 {
615 out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
616 trace(out, _instance, _subscribers);
617 }
618 }
619
620 IceUtil::Mutex::Lock sync(_subscribersMutex);
621 Ice::IdentitySeq ids;
622 ids.push_back(id);
623 removeSubscribers(ids);
624}
625
628{
629 // immutable
630 if (_instance->publisherReplicaProxy())
631 {
632 return TopicLinkPrx::uncheckedCast(
633 _instance->publisherReplicaProxy()->ice_identity(_linkPrx->ice_getIdentity()));
634 }
635 return _linkPrx;
636}
637
638void
639TopicImpl::link(const TopicPrx& topic, Ice::Int cost)
640{
641 TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
642 TopicLinkPrx link = internal->getLinkProxy();
643
644 TraceLevelsPtr traceLevels = _instance->traceLevels();
645 if (traceLevels->topic > 0)
646 {
647 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
648 out << _name << ": link "
649 << _instance->communicator()->identityToString(topic->ice_getIdentity()) << " cost "
650 << cost;
651 }
652
653 IceUtil::Mutex::Lock sync(_subscribersMutex);
654
655 Ice::Identity id = topic->ice_getIdentity();
656
657 SubscriberRecord record;
658 record.id = id;
659 record.obj = link;
660 record.theTopic = topic;
661 record.topicName = _name;
662 record.link = true;
663 record.cost = cost;
664
665 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
666 if (p != _subscribers.end())
667 {
669 LinkExists ex;
670 ex.name = name;
671 throw ex;
672 }
673
674 LogUpdate llu;
675
676 SubscriberPtr subscriber = Subscriber::create(_instance, record);
677
678 try
679 {
680 IceDB::ReadWriteTxn txn(_instance->dbEnv());
681
683 key.topic = _id;
684 key.id = id;
685
686 _subscriberMap.put(txn, key, record);
687
688 llu = getIncrementedLLU(txn, _lluMap);
689
690 txn.commit();
691 }
692 catch (const IceDB::LMDBException& ex)
693 {
694 logError(_instance->communicator(), ex);
695 throw; // will become UnknownException in caller
696 }
697
698 _subscribers.push_back(subscriber);
699
700 _instance->observers()->addSubscriber(llu, _name, record);
701}
702
703void
705{
706 IceUtil::Mutex::Lock sync(_subscribersMutex);
707 if (_destroyed)
708 {
709 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
710 }
711
712 Ice::Identity id = topic->ice_getIdentity();
713
714 vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(), id);
715 if (p == _subscribers.end())
716 {
718 TraceLevelsPtr traceLevels = _instance->traceLevels();
719 if (traceLevels->topic > 0)
720 {
721 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
722 out << _name << ": unlink " << name << " failed - not linked";
723 }
724
725 NoSuchLink ex;
726 ex.name = name;
727 throw ex;
728 }
729
730 TraceLevelsPtr traceLevels = _instance->traceLevels();
731 if (traceLevels->topic > 0)
732 {
733 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
734 out << _name << " unlink " << _instance->communicator()->identityToString(id);
735 }
736
737 Ice::IdentitySeq ids;
738 ids.push_back(id);
739 removeSubscribers(ids);
740}
741
742void
743TopicImpl::reap(const Ice::IdentitySeq& ids)
744{
745 IceUtil::Mutex::Lock sync(_subscribersMutex);
746
747 TraceLevelsPtr traceLevels = _instance->traceLevels();
748 if (traceLevels->topic > 0)
749 {
750 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
751 out << _name << ": reap ";
752 for (Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end(); ++p)
753 {
754 if (p != ids.begin())
755 {
756 out << ",";
757 }
758 out << _instance->communicator()->identityToString(*p);
759 }
760 }
761
762 removeSubscribers(ids);
763}
764
765void
767{
768 IceUtil::Mutex::Lock sync(_subscribersMutex);
769 _servant = 0;
770
771 // Shutdown each subscriber. This waits for the event queues to drain.
772 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
773 ++p)
774 {
775 (*p)->shutdown();
776 }
777
778 _observer.detach();
779}
780
781LinkInfoSeq
783{
784 IceUtil::Mutex::Lock sync(_subscribersMutex);
785
786 LinkInfoSeq seq;
787 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
788 ++p)
789 {
790 SubscriberRecord record = (*p)->record();
791 if (record.link && !(*p)->errored())
792 {
793 LinkInfo info;
794 info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
795 info.cost = record.cost;
796 info.theTopic = record.theTopic;
797 seq.push_back(info);
798 }
799 }
800 return seq;
801}
802
803Ice::IdentitySeq
805{
806 IceUtil::Mutex::Lock sync(_subscribersMutex);
807
808 Ice::IdentitySeq subscribers;
809 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
810 ++p)
811 {
812 subscribers.push_back((*p)->id());
813 }
814 return subscribers;
815}
816
817void
819{
820 IceUtil::Mutex::Lock sync(_subscribersMutex);
821
822 if (_destroyed)
823 {
824 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
825 }
826 _destroyed = true;
827
828 TraceLevelsPtr traceLevels = _instance->traceLevels();
829 if (traceLevels->topic > 0)
830 {
831 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
832 out << _name << ": destroy";
833 }
834
835 // destroyInternal clears out the topic content.
836 LogUpdate llu = {0, 0};
837 _instance->observers()->destroyTopic(destroyInternal(llu, true), _name);
838
839 _observer.detach();
840}
841
844{
845 IceUtil::Mutex::Lock sync(_subscribersMutex);
846
847 TopicContent content;
848 content.id = _id;
849 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
850 ++p)
851 {
852 // Don't return errored subscribers (subscribers that have
853 // errored out, but not reaped due to a failure with the
854 // master). This means we can avoid the reaping step later.
855 if (!(*p)->errored())
856 {
857 content.records.push_back((*p)->record());
858 }
859 }
860 return content;
861}
862
863void
865{
866 IceUtil::Mutex::Lock sync(_subscribersMutex);
867
868 // We do this with two scans. The first runs through the subscribers
869 // that we have and removes those not in the init list. The second
870 // runs through the init list and add the ones that don't
871 // exist.
872
873 {
874 vector<SubscriberPtr>::iterator p = _subscribers.begin();
875 while (p != _subscribers.end())
876 {
877 SubscriberRecordSeq::const_iterator q;
878 for (q = records.begin(); q != records.end(); ++q)
879 {
880 if ((*p)->id() == q->id)
881 {
882 break;
883 }
884 }
885 // The subscriber doesn't exist in the incoming subscriber
886 // set so destroy it.
887 if (q == records.end())
888 {
889 (*p)->destroy();
890 p = _subscribers.erase(p);
891 }
892 else
893 {
894 // Otherwise reset the reaped status if necessary.
895 (*p)->resetIfReaped();
896 ++p;
897 }
898 }
899 }
900
901 for (SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
902 {
903 vector<SubscriberPtr>::iterator q;
904 for (q = _subscribers.begin(); q != _subscribers.end(); ++q)
905 {
906 if ((*q)->id() == p->id)
907 {
908 break;
909 }
910 }
911 if (q == _subscribers.end())
912 {
913 SubscriberPtr subscriber = Subscriber::create(_instance, *p);
914 _subscribers.push_back(subscriber);
915 }
916 }
917}
918
919bool
921{
922 IceUtil::Mutex::Lock sync(_subscribersMutex);
923 return _destroyed;
924}
925
926Ice::Identity
928{
929 // immutable
930 return _id;
931}
932
935{
936 // immutable
937 Ice::ObjectPrx prx;
938 if (_instance->topicReplicaProxy())
939 {
940 prx = _instance->topicReplicaProxy()->ice_identity(_id);
941 }
942 else
943 {
944 prx = _instance->topicAdapter()->createProxy(_id);
945 }
946 return TopicPrx::uncheckedCast(prx);
947}
948
949namespace
950{
951
952 class TopicInternalReapCB : public IceUtil::Shared
953 {
954 public:
955 TopicInternalReapCB(const PersistentInstancePtr& instance, Ice::Long generation) :
956 _instance(instance), _generation(generation)
957 {
958 }
959
960 virtual void
961 exception(const Ice::Exception& ex)
962 {
963 TraceLevelsPtr traceLevels = _instance->traceLevels();
964 if (traceLevels->topic > 0)
965 {
966 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
967 out << "exception when calling `reap' on the master replica: " << ex;
968 }
969 _instance->node()->recovery(_generation);
970 }
971
972 private:
973 const PersistentInstancePtr _instance;
974 const Ice::Long _generation;
975 };
976
977} // namespace
978
979void
980TopicImpl::publish(bool forwarded, const EventDataSeq& events)
981{
982 TopicInternalPrx masterInternal;
983 Ice::Long generation = -1;
984 Ice::IdentitySeq reap;
985 {
986 // Use cached reads.
987 CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
988
989 //
990 // Copy of the subscriber list so that event publishing can occur
991 // in parallel.
992 //
993 vector<SubscriberPtr> copy;
994 {
995 IceUtil::Mutex::Lock sync(_subscribersMutex);
996 if (_observer)
997 {
998 if (forwarded)
999 {
1000 _observer->forwarded();
1001 }
1002 else
1003 {
1004 _observer->published();
1005 }
1006 }
1007 copy = _subscribers;
1008 }
1009
1010 //
1011 // Queue each event, gathering a list of those subscribers that
1012 // must be reaped.
1013 //
1014 for (vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
1015 {
1016 if (!(*p)->queue(forwarded, events) && (*p)->reap())
1017 {
1018 reap.push_back((*p)->id());
1019 }
1020 }
1021
1022 // If there are no subscribers in error then we're done.
1023 if (reap.empty())
1024 {
1025 return;
1026 }
1027 if (!unlock.getMaster())
1028 {
1029 IceUtil::Mutex::Lock sync(_subscribersMutex);
1030 removeSubscribers(reap);
1031 return;
1032 }
1033 masterInternal = TopicInternalPrx::uncheckedCast(unlock.getMaster()->ice_identity(_id));
1034 generation = unlock.generation();
1035 }
1036
1037 // Tell the master to reap this set of subscribers. This is an
1038 // AMI invocation so it shouldn't block the caller (in the
1039 // typical case) we do it outside of the mutex lock for
1040 // performance reasons.
1041 //
1042 // We must release the cached lock before calling this as the AMI
1043 // call may raise an exception in the caller (that is directly
1044 // call ice_exception) which calls recover() on the node which
1045 // would result in a deadlock since the node is locked.
1046 masterInternal->begin_reap(
1047 reap,
1048 newCallback_TopicInternal_reap(new TopicInternalReapCB(_instance, generation),
1049 &TopicInternalReapCB::exception));
1050}
1051
1052void
1054{
1055 IceUtil::Mutex::Lock sync(_subscribersMutex);
1056
1057 TraceLevelsPtr traceLevels = _instance->traceLevels();
1058 if (traceLevels->topic > 0)
1059 {
1060 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1061 out << _name
1062 << ": add replica observer: " << _instance->communicator()->identityToString(record.id);
1063
1064 if (traceLevels->topic > 1)
1065 {
1066 out << " endpoints: " << IceStormInternal::describeEndpoints(record.obj) << " QoS: ";
1067 for (QoS::const_iterator p = record.theQoS.begin(); p != record.theQoS.end(); ++p)
1068 {
1069 if (p != record.theQoS.begin())
1070 {
1071 out << ',';
1072 }
1073 out << '[' << p->first << "," << p->second << ']';
1074 }
1075 }
1076 out << " llu: " << llu.generation << "/" << llu.iteration;
1077 }
1078
1079 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
1080 if (p != _subscribers.end())
1081 {
1082 // If the subscriber is already in the database display a
1083 // diagnostic.
1084 TraceLevelsPtr traceLevels = _instance->traceLevels();
1085 if (traceLevels->topic > 0)
1086 {
1087 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1088 out << _instance->communicator()->identityToString(record.id) << ": already subscribed";
1089 }
1090 return;
1091 }
1092
1093 SubscriberPtr subscriber = Subscriber::create(_instance, record);
1094 try
1095 {
1096 IceDB::ReadWriteTxn txn(_instance->dbEnv());
1097
1099 key.topic = _id;
1100 key.id = subscriber->id();
1101
1102 _subscriberMap.put(txn, key, record);
1103
1104 // Update the LLU.
1105 _lluMap.put(txn, lluDbKey, llu);
1106
1107 txn.commit();
1108 }
1109 catch (const IceDB::LMDBException& ex)
1110 {
1111 logError(_instance->communicator(), ex);
1112 throw; // will become UnknownException in caller
1113 }
1114
1115 _subscribers.push_back(subscriber);
1116}
1117
1118void
1119TopicImpl::observerRemoveSubscriber(const LogUpdate& llu, const Ice::IdentitySeq& ids)
1120{
1121 TraceLevelsPtr traceLevels = _instance->traceLevels();
1122 if (traceLevels->topic > 0)
1123 {
1124 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1125 out << _name << ": remove replica observer: ";
1126 for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1127 {
1128 if (id != ids.begin())
1129 {
1130 out << ",";
1131 }
1132 out << _instance->communicator()->identityToString(*id);
1133 }
1134 out << " llu: " << llu.generation << "/" << llu.iteration;
1135 }
1136
1137 IceUtil::Mutex::Lock sync(_subscribersMutex);
1138
1139 // First remove from the database.
1140 try
1141 {
1142 IceDB::ReadWriteTxn txn(_instance->dbEnv());
1143
1144 for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1145 {
1147 key.topic = _id;
1148 key.id = *id;
1149
1150 _subscriberMap.del(txn, key);
1151 }
1152
1153 _lluMap.put(txn, lluDbKey, llu);
1154
1155 txn.commit();
1156 }
1157 catch (const IceDB::LMDBException& ex)
1158 {
1159 logError(_instance->communicator(), ex);
1160 throw; // will become UnknownException in caller
1161 }
1162
1163 // Then remove the subscriber from the subscribers list. If the
1164 // subscriber had a local failure and was removed from the
1165 // subscriber list it could already be gone. That's not a problem.
1166 for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1167 {
1168 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1169 if (p != _subscribers.end())
1170 {
1171 (*p)->destroy();
1172 _subscribers.erase(p);
1173 }
1174 }
1175}
1176
1177void
1179{
1180 IceUtil::Mutex::Lock sync(_subscribersMutex);
1181
1182 if (_destroyed)
1183 {
1184 return;
1185 }
1186 _destroyed = true;
1187
1188 TraceLevelsPtr traceLevels = _instance->traceLevels();
1189 if (traceLevels->topic > 0)
1190 {
1191 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1192 out << _name << ": destroyed";
1193 out << " llu: " << llu.generation << "/" << llu.iteration;
1194 }
1195 destroyInternal(llu, false);
1196}
1197
1198Ice::ObjectPtr
1200{
1201 return _servant;
1202}
1203
1204void
1206{
1207 IceUtil::Mutex::Lock sync(_subscribersMutex);
1208 if (_instance->observer())
1209 {
1210 _observer.attach(_instance->observer()->getTopicObserver(
1211 _instance->serviceName(), _name, _observer.get()));
1212 }
1213}
1214
1215void
1217{
1218 IceUtil::Mutex::Lock sync(_subscribersMutex);
1219 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
1220 ++p)
1221 {
1222 (*p)->updateObserver();
1223 }
1224}
1225
1227TopicImpl::destroyInternal(const LogUpdate& origLLU, bool master)
1228{
1229
1230 // Clear out the database records related to this topic.
1231 LogUpdate llu;
1232 try
1233 {
1234 IceDB::ReadWriteTxn txn(_instance->dbEnv());
1235
1236 // Erase all subscriber records and the topic record.
1238 key.topic = _id;
1239
1240 SubscriberMapRWCursor cursor(_subscriberMap, txn);
1241 if (cursor.find(key))
1242 {
1243 _subscriberMap.del(txn, key);
1244
1247 while (cursor.get(k, v, MDB_NEXT) && k.topic == key.topic)
1248 {
1249 _subscriberMap.del(txn, k);
1250 }
1251 }
1252
1253 // Update the LLU.
1254 if (master)
1255 {
1256 llu = getIncrementedLLU(txn, _lluMap);
1257 }
1258 else
1259 {
1260 llu = origLLU;
1261 _lluMap.put(txn, lluDbKey, llu);
1262 }
1263
1264 txn.commit();
1265 }
1266 catch (const IceDB::LMDBException& ex)
1267 {
1268 logError(_instance->communicator(), ex);
1269 throw; // will become UnknownException in caller
1270 }
1271
1272 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
1273 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
1274 _instance->topicReaper()->add(_name);
1275
1276 // Destroy each of the subscribers.
1277 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
1278 ++p)
1279 {
1280 (*p)->destroy();
1281 }
1282 _subscribers.clear();
1283
1284 _instance->topicAdapter()->remove(_id);
1285
1286 _servant = 0;
1287
1288 return llu;
1289}
1290
1291void
1292TopicImpl::removeSubscribers(const Ice::IdentitySeq& ids)
1293{
1294 // First update the database
1295
1296 LogUpdate llu;
1297 bool found = false;
1298 try
1299 {
1300 IceDB::ReadWriteTxn txn(_instance->dbEnv());
1301
1302 for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1303 {
1304 SubscriberRecordKey key;
1305 key.topic = _id;
1306 key.id = *id;
1307
1308 if (_subscriberMap.del(txn, key))
1309 {
1310 found = true;
1311 }
1312 }
1313
1314 if (found)
1315 {
1316 llu = getIncrementedLLU(txn, _lluMap);
1317 txn.commit();
1318 }
1319 else
1320 {
1321 txn.rollback();
1322 }
1323 }
1324 catch (const IceDB::LMDBException& ex)
1325 {
1326 logError(_instance->communicator(), ex);
1327 throw; // will become UnknownException in caller
1328 }
1329
1330 if (found)
1331 {
1332 // Then remove the subscriber from the subscribers list. Its
1333 // possible that some of these subscribers have already been
1334 // removed (consider, for example, a concurrent reap call from two
1335 // replicas on the same subscriber). To avoid sending unnecessary
1336 // observer updates keep track of the observers that are actually
1337 // removed.
1338 for (Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1339 {
1340 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *id);
1341 if (p != _subscribers.end())
1342 {
1343 (*p)->destroy();
1344 _subscribers.erase(p);
1345 }
1346 }
1347
1348 _instance->observers()->removeSubscriber(llu, _name, ids);
1349 }
1350}
uint8_t data[1]
bool del(const ReadWriteTxn &txn, const K &key, const D &data)
Definition IceDB.h:295
void commit()
Ice::Long generation() const
Definition NodeI.h:160
Ice::ObjectPrx getMaster() const
Definition NodeI.h:154
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
bool destroyed() const
Definition TopicI.cpp:920
LinkInfoSeq getLinkInfoSeq() const
Definition TopicI.cpp:782
void publish(bool, const EventDataSeq &)
Definition TopicI.cpp:980
std::string getName() const
Definition TopicI.cpp:459
void update(const SubscriberRecordSeq &)
Definition TopicI.cpp:864
void observerDestroyTopic(const IceStormElection::LogUpdate &)
Definition TopicI.cpp:1178
void unsubscribe(const Ice::ObjectPrx &)
Definition TopicI.cpp:593
Ice::IdentitySeq getSubscribers() const
Definition TopicI.cpp:804
void link(const TopicPrx &, Ice::Int)
Definition TopicI.cpp:639
TopicPrx proxy() const
Definition TopicI.cpp:934
Ice::ObjectPrx getNonReplicatedPublisher() const
Definition TopicI.cpp:477
Ice::ObjectPtr getServant() const
Definition TopicI.cpp:1199
void reap(const Ice::IdentitySeq &)
Definition TopicI.cpp:743
Ice::ObjectPrx getPublisher() const
Definition TopicI.cpp:466
TopicLinkPrx getLinkProxy()
Definition TopicI.cpp:627
TopicImpl(const PersistentInstancePtr &, const std::string &, const Ice::Identity &, const SubscriberRecordSeq &)
Definition TopicI.cpp:354
Ice::ObjectPrx subscribeAndGetPublisher(const QoS &, const Ice::ObjectPrx &)
Definition TopicI.cpp:510
void updateSubscriberObservers()
Definition TopicI.cpp:1216
void observerRemoveSubscriber(const IceStormElection::LogUpdate &, const Ice::IdentitySeq &)
Definition TopicI.cpp:1119
IceStormElection::TopicContent getContent() const
Definition TopicI.cpp:843
void unlink(const TopicPrx &)
Definition TopicI.cpp:704
void observerAddSubscriber(const IceStormElection::LogUpdate &, const SubscriberRecord &)
Definition TopicI.cpp:1053
Ice::Identity id() const
Definition TopicI.cpp:927
Internal operations for a topic.
#define q
IceUtil::Handle< NodeI > NodeIPtr
Definition Instance.h:36
std::string identityToTopicName(const Ice::Identity &)
Definition Util.cpp:23
IceStormElection::LogUpdate getIncrementedLLU(const IceDB::ReadWriteTxn &, IceStorm::LLUMap &)
Definition Util.cpp:96
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition Util.cpp:51
IceUtil::Handle< Subscriber > SubscriberPtr
Definition Subscriber.h:26
IceDB::ReadWriteCursor< SubscriberRecordKey, SubscriberRecord, IceDB::IceContext, Ice::OutputStream > SubscriberMapRWCursor
Definition Instance.h:132
Callback_TopicInternal_reapPtr newCallback_TopicInternal_reap(const IceUtil::Handle< T > &instance, void(T::*cb)(), void(T::*excb)(const ::Ice::Exception &), void(T::*sentcb)(bool)=0)
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
Definition IceManager.h:70
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
::std::vector<::IceStorm::SubscriberRecord > SubscriberRecordSeq
IceUtil::Handle< PersistentInstance > PersistentInstancePtr
Definition Instance.h:172
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicInternal > TopicInternalPrx
::IceUtil::Handle<::IceStorm::EventData > EventDataPtr
IceUtil::Handle< TopicImpl > TopicImplPtr
Definition TopicI.h:110
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicLink > TopicLinkPrx
const std::string lluDbKey
Definition Util.h:36
::IceInternal::Handle<::Ice::Communicator > CommunicatorPtr
Definition IceManager.h:49
double v(double t, double v0, double a0, double j)
Definition CtrlUtil.h:39
constexpr std::size_t find(string_view str, char_type c) noexcept
A struct used for marking the last log update.
Definition LLURecord.h:103
The contents of topic.
Definition Election.ice:24
IceStorm::SubscriberRecordSeq records
The topic subscribers.
Definition Election.ice:28
Ice::Identity id
The topic identity.
Definition Election.ice:26
The key for persistent subscribers, or topics.
Used to store persistent information for persistent subscribers.
::IceStorm::TopicPrx theTopic