12 #include <Ice/LoggerUtil.h>
32 Ice::Error error(com->getLogger());
33 error <<
"LMDB error: " << ex;
40 class PublisherI :
public Ice::BlobjectArray
44 _topic(
topic), _instance(instance)
49 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
51 const Ice::Current& current)
55 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
61 Ice::ByteSeq
data(inParams.first, inParams.second);
62 event->data.swap(
data);
66 _topic->publish(
false,
v);
84 _impl(impl), _instance(instance)
92 _impl->publish(
true,
v);
104 _impl(impl), _instance(instance)
109 getName(
const Ice::Current&)
const
113 return _impl->getName();
116 virtual Ice::ObjectPrx
117 getPublisher(
const Ice::Current&)
const
121 return _impl->getPublisher();
124 virtual Ice::ObjectPrx
125 getNonReplicatedPublisher(
const Ice::Current&)
const
129 return _impl->getNonReplicatedPublisher();
132 virtual Ice::ObjectPrx
133 subscribeAndGetPublisher(
const QoS& qos,
134 const Ice::ObjectPrx& obj,
135 const Ice::Current& current)
140 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
145 return master->subscribeAndGetPublisher(qos, obj);
147 catch (
const Ice::ConnectFailedException&)
149 _instance->node()->recovery(generation);
152 catch (
const Ice::TimeoutException&)
154 _instance->node()->recovery(generation);
161 return _impl->subscribeAndGetPublisher(qos, obj);
167 unsubscribe(
const Ice::ObjectPrx& subscriber,
const Ice::Current& current)
172 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
177 master->unsubscribe(subscriber);
179 catch (
const Ice::ConnectFailedException&)
181 _instance->node()->recovery(generation);
184 catch (
const Ice::TimeoutException&)
186 _instance->node()->recovery(generation);
193 _impl->unsubscribe(subscriber);
200 getLinkProxy(
const Ice::Current&)
204 return _impl->getLinkProxy();
208 reap(
const Ice::IdentitySeq& ids,
const Ice::Current& )
211 if (!node->updateMaster(__FILE__, __LINE__))
225 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
230 master->link(topic, cost);
232 catch (
const Ice::ConnectFailedException&)
234 _instance->node()->recovery(generation);
237 catch (
const Ice::TimeoutException&)
239 _instance->node()->recovery(generation);
246 _impl->link(topic, cost);
253 unlink(
const TopicPrx& topic,
const Ice::Current& current)
258 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
263 master->unlink(topic);
265 catch (
const Ice::ConnectFailedException&)
267 _instance->node()->recovery(generation);
270 catch (
const Ice::TimeoutException&)
272 _instance->node()->recovery(generation);
279 _impl->unlink(topic);
286 getLinkInfoSeq(
const Ice::Current&)
const
290 return _impl->getLinkInfoSeq();
293 virtual Ice::IdentitySeq
294 getSubscribers(
const Ice::Current&)
const
296 return _impl->getSubscribers();
300 destroy(
const Ice::Current& current)
305 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
312 catch (
const Ice::ConnectFailedException&)
314 _instance->node()->recovery(generation);
317 catch (
const Ice::TimeoutException&)
319 _instance->node()->recovery(generation);
334 getMasterFor(
const Ice::Current& cur,
340 Ice::ObjectPrx master;
343 master = _instance->node()->startUpdate(generation, file, line);
345 return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) :
TopicPrx();
362 _lluMap(_instance->lluMap()),
363 _subscriberMap(_instance->subscriberMap())
371 _servant =
new TopicI(
this, instance);
385 if (
id.category.empty())
387 pubid.category = _name;
388 pubid.name =
"publish";
389 linkid.category = _name;
390 linkid.name =
"link";
394 pubid.category =
id.category;
395 pubid.name = _name +
".publish";
396 linkid.category =
id.category;
397 linkid.name = _name +
".link";
400 _publisherPrx = _instance->publishAdapter()->add(
new PublisherI(
this, instance), pubid);
401 _linkPrx = TopicLinkPrx::uncheckedCast(
402 _instance->publishAdapter()->add(
new TopicLinkI(
this, instance), linkid));
407 for (SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end();
412 if (traceLevels->topic > 0)
414 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
415 out << _name <<
" recreate " << _instance->communicator()->identityToString(
id);
416 if (traceLevels->topic > 1)
429 _subscribers.push_back(subscriber);
431 catch (
const Ice::Exception& ex)
433 Ice::Warning out(traceLevels->logger);
434 out << _name <<
" recreate " << _instance->communicator()->identityToString(
id);
435 if (traceLevels->topic > 1)
439 out <<
" failed: " << ex;
443 if (_instance->observer())
446 _instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0));
452 __setNoDelete(
false);
455 __setNoDelete(
false);
469 if (_instance->publisherReplicaProxy())
471 return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity());
473 return _publisherPrx;
481 if (!_publisherPrx->ice_getAdapterId().empty())
483 return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity());
487 return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity());
497 for (vector<SubscriberPtr>::const_iterator p =
s.begin(); p !=
s.end(); ++p)
503 out << instance->communicator()->identityToString((*p)->id());
515 if (traceLevels->topic > 0)
517 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
518 out << _name <<
": subscribeAndGetPublisher: null proxy";
520 throw InvalidSubscriber(
"subscriber is a null proxy");
525 if (traceLevels->topic > 0)
527 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
529 <<
": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(
id);
531 if (traceLevels->topic > 1)
534 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
536 if (p != qos.begin())
541 out <<
" subscriptions: ";
542 trace(out, _instance, _subscribers);
546 IceUtil::Mutex::Lock sync(_subscribersMutex);
556 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
557 if (p != _subscribers.end())
559 throw AlreadySubscribed();
571 key.
id = subscriber->id();
573 _subscriberMap.
put(txn, key, record);
581 logError(_instance->communicator(), ex);
585 _subscribers.push_back(subscriber);
587 _instance->observers()->addSubscriber(llu, _name, record);
589 return subscriber->proxy();
598 if (traceLevels->topic > 0)
600 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
601 out << _name <<
": unsubscribe: null proxy";
603 throw InvalidSubscriber(
"subscriber is a null proxy");
608 if (traceLevels->topic > 0)
610 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
611 out << _name <<
": unsubscribe: " << _instance->communicator()->identityToString(
id);
613 if (traceLevels->topic > 1)
616 trace(out, _instance, _subscribers);
620 IceUtil::Mutex::Lock sync(_subscribersMutex);
621 Ice::IdentitySeq ids;
623 removeSubscribers(ids);
630 if (_instance->publisherReplicaProxy())
632 return TopicLinkPrx::uncheckedCast(
633 _instance->publisherReplicaProxy()->ice_identity(_linkPrx->ice_getIdentity()));
645 if (traceLevels->topic > 0)
647 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
648 out << _name <<
": link "
649 << _instance->communicator()->identityToString(topic->ice_getIdentity()) <<
" cost "
653 IceUtil::Mutex::Lock sync(_subscribersMutex);
665 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
666 if (p != _subscribers.end())
686 _subscriberMap.
put(txn, key, record);
694 logError(_instance->communicator(), ex);
698 _subscribers.push_back(subscriber);
700 _instance->observers()->addSubscriber(llu, _name, record);
706 IceUtil::Mutex::Lock sync(_subscribersMutex);
709 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
714 vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
715 if (p == _subscribers.end())
719 if (traceLevels->topic > 0)
721 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
722 out << _name <<
": unlink " << name <<
" failed - not linked";
731 if (traceLevels->topic > 0)
733 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
734 out << _name <<
" unlink " << _instance->communicator()->identityToString(
id);
737 Ice::IdentitySeq ids;
739 removeSubscribers(ids);
745 IceUtil::Mutex::Lock sync(_subscribersMutex);
748 if (traceLevels->topic > 0)
750 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
751 out << _name <<
": reap ";
752 for (Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end(); ++p)
754 if (p != ids.begin())
758 out << _instance->communicator()->identityToString(*p);
762 removeSubscribers(ids);
768 IceUtil::Mutex::Lock sync(_subscribersMutex);
772 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
784 IceUtil::Mutex::Lock sync(_subscribersMutex);
787 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
791 if (record.
link && !(*p)->errored())
795 info.cost = record.
cost;
806 IceUtil::Mutex::Lock sync(_subscribersMutex);
808 Ice::IdentitySeq subscribers;
809 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
812 subscribers.push_back((*p)->id());
820 IceUtil::Mutex::Lock sync(_subscribersMutex);
824 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
829 if (traceLevels->topic > 0)
831 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
832 out << _name <<
": destroy";
837 _instance->observers()->destroyTopic(destroyInternal(llu,
true), _name);
845 IceUtil::Mutex::Lock sync(_subscribersMutex);
849 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
855 if (!(*p)->errored())
857 content.
records.push_back((*p)->record());
866 IceUtil::Mutex::Lock sync(_subscribersMutex);
874 vector<SubscriberPtr>::iterator p = _subscribers.begin();
875 while (p != _subscribers.end())
877 SubscriberRecordSeq::const_iterator
q;
878 for (
q = records.begin();
q != records.end(); ++
q)
880 if ((*p)->id() ==
q->id)
887 if (
q == records.end())
890 p = _subscribers.erase(p);
895 (*p)->resetIfReaped();
901 for (SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
903 vector<SubscriberPtr>::iterator
q;
904 for (
q = _subscribers.begin();
q != _subscribers.end(); ++
q)
906 if ((*q)->id() == p->id)
911 if (
q == _subscribers.end())
914 _subscribers.push_back(subscriber);
922 IceUtil::Mutex::Lock sync(_subscribersMutex);
938 if (_instance->topicReplicaProxy())
940 prx = _instance->topicReplicaProxy()->ice_identity(_id);
944 prx = _instance->topicAdapter()->createProxy(_id);
946 return TopicPrx::uncheckedCast(prx);
952 class TopicInternalReapCB :
public IceUtil::Shared
956 _instance(instance), _generation(generation)
961 exception(
const Ice::Exception& ex)
964 if (traceLevels->topic > 0)
966 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
967 out <<
"exception when calling `reap' on the master replica: " << ex;
969 _instance->node()->recovery(_generation);
984 Ice::IdentitySeq
reap;
993 vector<SubscriberPtr>
copy;
995 IceUtil::Mutex::Lock sync(_subscribersMutex);
1000 _observer->forwarded();
1004 _observer->published();
1007 copy = _subscribers;
1014 for (vector<SubscriberPtr>::const_iterator p =
copy.begin(); p !=
copy.end(); ++p)
1016 if (!(*p)->queue(forwarded, events) && (*p)->reap())
1018 reap.push_back((*p)->id());
1029 IceUtil::Mutex::Lock sync(_subscribersMutex);
1030 removeSubscribers(
reap);
1033 masterInternal = TopicInternalPrx::uncheckedCast(unlock.
getMaster()->ice_identity(_id));
1046 masterInternal->begin_reap(
1049 &TopicInternalReapCB::exception));
1055 IceUtil::Mutex::Lock sync(_subscribersMutex);
1058 if (traceLevels->topic > 0)
1060 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1062 <<
": add replica observer: " << _instance->communicator()->identityToString(record.
id);
1064 if (traceLevels->topic > 1)
1067 for (QoS::const_iterator p = record.
theQoS.begin(); p != record.
theQoS.end(); ++p)
1069 if (p != record.
theQoS.begin())
1073 out <<
'[' << p->first <<
"," << p->second <<
']';
1079 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
1080 if (p != _subscribers.end())
1085 if (traceLevels->topic > 0)
1087 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1088 out << _instance->communicator()->identityToString(record.
id) <<
": already subscribed";
1100 key.
id = subscriber->id();
1102 _subscriberMap.
put(txn, key, record);
1111 logError(_instance->communicator(), ex);
1115 _subscribers.push_back(subscriber);
1122 if (traceLevels->topic > 0)
1124 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1125 out << _name <<
": remove replica observer: ";
1126 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1128 if (
id != ids.begin())
1132 out << _instance->communicator()->identityToString(*
id);
1137 IceUtil::Mutex::Lock sync(_subscribersMutex);
1144 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1150 _subscriberMap.
del(txn, key);
1159 logError(_instance->communicator(), ex);
1166 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1168 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *
id);
1169 if (p != _subscribers.end())
1172 _subscribers.erase(p);
1180 IceUtil::Mutex::Lock sync(_subscribersMutex);
1189 if (traceLevels->topic > 0)
1191 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1192 out << _name <<
": destroyed";
1195 destroyInternal(llu,
false);
1207 IceUtil::Mutex::Lock sync(_subscribersMutex);
1208 if (_instance->observer())
1210 _observer.attach(_instance->observer()->getTopicObserver(
1211 _instance->serviceName(), _name, _observer.get()));
1218 IceUtil::Mutex::Lock sync(_subscribersMutex);
1219 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
1222 (*p)->updateObserver();
1227 TopicImpl::destroyInternal(
const LogUpdate& origLLU,
bool master)
1241 if (cursor.find(key))
1243 _subscriberMap.
del(txn, key);
1247 while (cursor.get(k,
v, MDB_NEXT) && k.
topic == key.
topic)
1249 _subscriberMap.
del(txn, k);
1268 logError(_instance->communicator(), ex);
1272 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
1273 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
1274 _instance->topicReaper()->add(_name);
1277 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
1282 _subscribers.clear();
1284 _instance->topicAdapter()->remove(_id);
1292 TopicImpl::removeSubscribers(
const Ice::IdentitySeq& ids)
1302 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1308 if (_subscriberMap.
del(txn, key))
1326 logError(_instance->communicator(), ex);
1338 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1340 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *
id);
1341 if (p != _subscribers.end())
1344 _subscribers.erase(p);
1348 _instance->observers()->removeSubscriber(llu, _name, ids);