17 #include <Ice/LoggerUtil.h>
31 Ice::Error error(com->getLogger());
32 error <<
"LMDB error: " << ex;
// PublisherI: servant class derived from Ice::BlobjectArray. Its constructor
// stores the topic and instance it is given in _topic and _instance.
39 class PublisherI :
public Ice::BlobjectArray
44 _topic(
topic), _instance(instance)
// ice_invoke: receives the request payload as the byte range
// [inParams.first, inParams.second) together with the Ice::Current of the call.
49 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
51 const Ice::Current& current)
// Copy the payload bytes into a local sequence, then swap that sequence into
// event->data.
60 Ice::ByteSeq
data(inParams.first, inParams.second);
61 event->data.swap(
data);
// Pass `v` to the topic with the first publish() argument set to false.
// NOTE(review): the declarations of `event` and `v` are not visible in this
// excerpt; presumably `v` is the event sequence holding `event` - confirm.
65 _topic->publish(
false,
v);
85 _impl(impl), _instance(instance)
93 _impl->publish(
true,
v);
107 _impl(impl), _instance(instance)
// Returns the result of _impl->getName(). The Ice::Current argument is unnamed
// and unused.
111 virtual string getName(
const Ice::Current&)
const
115 return _impl->getName();
// Returns the proxy produced by _impl->getPublisher(). The Ice::Current
// argument is unnamed and unused.
118 virtual Ice::ObjectPrx getPublisher(
const Ice::Current&)
const
122 return _impl->getPublisher();
// Returns the proxy produced by _impl->getNonReplicatedPublisher(). The
// Ice::Current argument is unnamed and unused.
125 virtual Ice::ObjectPrx getNonReplicatedPublisher(
const Ice::Current&)
const
129 return _impl->getNonReplicatedPublisher();
// Subscribes `obj` with the given QoS and returns an Ice::ObjectPrx.
// The visible statements show two ways of serving the request: forwarding it
// to the proxy obtained from getMasterFor(), or calling the local _impl.
// NOTE(review): the conditions and any loop that choose between these paths
// are not visible in this excerpt - confirm against the full source.
132 virtual Ice::ObjectPrx subscribeAndGetPublisher(
const QoS& qos,
const Ice::ObjectPrx& obj,
133 const Ice::Current& current)
// Look up the master proxy for this request; `generation` is passed along and
// reused by the recovery calls below.
138 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
// Forward the subscription to the master.
143 return master->subscribeAndGetPublisher(qos, obj);
// A connection failure or a timeout is answered by asking the node to run
// recovery for the generation obtained above.
145 catch (
const Ice::ConnectFailedException&)
147 _instance->node()->recovery(generation);
150 catch (
const Ice::TimeoutException&)
152 _instance->node()->recovery(generation);
// Local path: let the topic implementation handle the subscription.
159 return _impl->subscribeAndGetPublisher(qos, obj);
// Removes the subscription of `subscriber`.
// The visible statements show the request either being forwarded to the proxy
// obtained from getMasterFor() or being handled by the local _impl.
// NOTE(review): the branching between these paths is not visible in this
// excerpt - confirm against the full source.
164 virtual void unsubscribe(
const Ice::ObjectPrx& subscriber,
const Ice::Current& current)
// Look up the master proxy for this request.
169 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
// Forward the call to the master.
174 master->unsubscribe(subscriber);
// A connection failure or a timeout triggers recovery(generation) on the node.
176 catch (
const Ice::ConnectFailedException&)
178 _instance->node()->recovery(generation);
181 catch (
const Ice::TimeoutException&)
183 _instance->node()->recovery(generation);
// Local path: unsubscribe through the topic implementation.
190 _impl->unsubscribe(subscriber);
200 return _impl->getLinkProxy();
// reap: receives a sequence of identities in `ids`; the Ice::Current argument
// is unnamed and unused.
203 virtual void reap(
const Ice::IdentitySeq& ids,
const Ice::Current& )
// Branch taken when node->updateMaster(...) returns false.
// NOTE(review): the body of this branch and the rest of the method are not
// visible in this excerpt.
206 if (!node->updateMaster(__FILE__, __LINE__))
// Links this topic to `topic` with the given `cost`.
// The visible statements show the request either being forwarded to the proxy
// obtained from getMasterFor() or being handled by the local _impl.
// NOTE(review): the branching between these paths is not visible in this
// excerpt - confirm against the full source.
214 virtual void link(
const TopicPrx& topic,
Ice::Int cost,
const Ice::Current& current)
// Look up the master proxy for this request.
219 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
// Forward the call to the master.
224 master->link(topic, cost);
// A connection failure or a timeout triggers recovery(generation) on the node.
226 catch (
const Ice::ConnectFailedException&)
228 _instance->node()->recovery(generation);
231 catch (
const Ice::TimeoutException&)
233 _instance->node()->recovery(generation);
// Local path: create the link through the topic implementation.
240 _impl->link(topic, cost);
// Removes the link from this topic to `topic`.
// The visible statements show the request either being forwarded to the proxy
// obtained from getMasterFor() or being handled by the local _impl.
// NOTE(review): the branching between these paths is not visible in this
// excerpt - confirm against the full source.
246 virtual void unlink(
const TopicPrx& topic,
const Ice::Current& current)
// Look up the master proxy for this request.
251 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
// Forward the call to the master.
256 master->unlink(topic);
// A connection failure or a timeout triggers recovery(generation) on the node.
258 catch (
const Ice::ConnectFailedException&)
260 _instance->node()->recovery(generation);
263 catch (
const Ice::TimeoutException&)
265 _instance->node()->recovery(generation);
// Local path: remove the link through the topic implementation.
272 _impl->unlink(topic);
// Returns the LinkInfoSeq produced by _impl->getLinkInfoSeq(). The
// Ice::Current argument is unnamed and unused.
278 virtual LinkInfoSeq getLinkInfoSeq(
const Ice::Current&)
const
282 return _impl->getLinkInfoSeq();
// Returns the identity sequence produced by _impl->getSubscribers(). The
// Ice::Current argument is unnamed and unused.
285 virtual Ice::IdentitySeq getSubscribers(
const Ice::Current&)
const
287 return _impl->getSubscribers();
// destroy: entry point taking only the Ice::Current of the call.
// NOTE(review): only the master lookup and the exception handlers are visible
// in this excerpt; the statements that perform the destruction are not shown.
290 virtual void destroy(
const Ice::Current& current)
// Look up the master proxy for this request.
295 TopicPrx master = getMasterFor(current, generation, __FILE__, __LINE__);
// A connection failure or a timeout triggers recovery(generation) on the node.
302 catch (
const Ice::ConnectFailedException&)
304 _instance->node()->recovery(generation);
307 catch (
const Ice::TimeoutException&)
309 _instance->node()->recovery(generation);
// Returns a TopicPrx addressing the master for the object targeted by `cur`,
// or a default-constructed TopicPrx when no master proxy was obtained.
//   cur        - current request; its identity (cur.id) is applied to the
//                master proxy.
//   generation - passed by non-const reference to startUpdate().
//                NOTE(review): presumably filled in by startUpdate - confirm.
//   file, line - call-site information handed on to startUpdate().
324 TopicPrx getMasterFor(
const Ice::Current& cur,
Ice::Long& generation,
const char* file,
int line)
const
// Starts out null.
// NOTE(review): lines between this declaration and the assignment below are
// not visible; the assignment may be conditional.
327 Ice::ObjectPrx master;
330 master = _instance->node()->startUpdate(generation, file, line);
// Re-target a non-null master proxy at the identity of the current request.
332 return (master) ? TopicPrx::uncheckedCast(master->ice_identity(cur.id)) :
TopicPrx();
// TopicImpl constructor (parameter list not visible in this excerpt).
// Visible work: takes the LLU and subscriber maps from the instance, creates
// the TopicI servant, derives the publisher and link identities, registers the
// PublisherI and TopicLinkI servants with the publish adapter, re-creates the
// subscribers from `subscribers`, and attaches a topic observer when the
// instance has one.
341 TopicImpl::TopicImpl(
350 _lluMap(_instance->lluMap()),
351 _subscriberMap(_instance->subscriberMap())
// Servant for this topic, constructed with this implementation and the instance.
359 _servant =
new TopicI(
this, instance);
// Empty category: use the topic name as the category and the fixed names
// "publish" and "link".
373 if (
id.category.empty())
375 pubid.category = _name;
376 pubid.name =
"publish";
377 linkid.category = _name;
378 linkid.name =
"link";
// Second naming scheme: keep id.category and prefix the names with the topic
// name. NOTE(review): presumably the else branch of the check above - the
// keyword itself is not visible here.
382 pubid.category =
id.category;
383 pubid.name = _name +
".publish";
384 linkid.category =
id.category;
385 linkid.name = _name +
".link";
// Register the publisher servant under pubid and keep the returned proxy.
388 _publisherPrx = _instance->publishAdapter()->add(
new PublisherI(
this, instance), pubid);
// Register the link servant under linkid and keep the proxy as a TopicLinkPrx.
389 _linkPrx = TopicLinkPrx::uncheckedCast(
390 _instance->publishAdapter()->add(
new TopicLinkI(
this, instance), linkid));
// Walk the subscriber records; each iteration traces " recreate " plus an
// identity when topic tracing is enabled and appends a subscriber to
// _subscribers. NOTE(review): how `subscriber` is built is not visible here.
395 for (SubscriberRecordSeq::const_iterator p = subscribers.begin(); p != subscribers.end(); ++p)
399 if (traceLevels->topic > 0)
401 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
402 out << _name <<
" recreate " << _instance->communicator()->identityToString(
id);
403 if (traceLevels->topic > 1)
416 _subscribers.push_back(subscriber);
// An Ice::Exception is reported as a warning that ends with " failed: " and
// the exception text.
418 catch (
const Ice::Exception& ex)
420 Ice::Warning out(traceLevels->logger);
421 out << _name <<
" recreate " << _instance->communicator()->identityToString(
id);
422 if (traceLevels->topic > 1)
426 out <<
" failed: " << ex;
// Attach a topic observer for (service name, topic name) when the instance has
// an observer; the last argument (previous observer) is 0 here.
430 if (_instance->observer())
432 _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, 0));
// __setNoDelete(false) appears twice in the visible lines.
// NOTE(review): the matching __setNoDelete(true) and the control flow that
// separates the two calls are not visible in this excerpt.
438 __setNoDelete(
false);
441 __setNoDelete(
false);
455 if (_instance->publisherReplicaProxy())
457 return _instance->publisherReplicaProxy()->ice_identity(_publisherPrx->ice_getIdentity());
459 return _publisherPrx;
467 if (!_publisherPrx->ice_getAdapterId().empty())
469 return _instance->publishAdapter()->createIndirectProxy(_publisherPrx->ice_getIdentity());
473 return _instance->publishAdapter()->createDirectProxy(_publisherPrx->ice_getIdentity());
483 for (vector<SubscriberPtr>::const_iterator p =
s.begin(); p !=
s.end(); ++p)
489 out << instance->communicator()->identityToString((*p)->id());
501 if (traceLevels->topic > 0)
503 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
504 out << _name <<
": subscribeAndGetPublisher: null proxy";
506 throw InvalidSubscriber(
"subscriber is a null proxy");
511 if (traceLevels->topic > 0)
513 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
514 out << _name <<
": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(
id);
516 if (traceLevels->topic > 1)
520 for (QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
522 if (p != qos.begin())
528 out <<
" subscriptions: ";
529 trace(out, _instance, _subscribers);
533 IceUtil::Mutex::Lock sync(_subscribersMutex);
543 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
544 if (p != _subscribers.end())
546 throw AlreadySubscribed();
558 key.
id = subscriber->id();
560 _subscriberMap.
put(txn, key, record);
568 logError(_instance->communicator(), ex);
572 _subscribers.push_back(subscriber);
574 _instance->observers()->addSubscriber(llu, _name, record);
576 return subscriber->proxy();
585 if (traceLevels->topic > 0)
587 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
588 out << _name <<
": unsubscribe: null proxy";
590 throw InvalidSubscriber(
"subscriber is a null proxy");
595 if (traceLevels->topic > 0)
597 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
598 out << _name <<
": unsubscribe: " << _instance->communicator()->identityToString(
id);
600 if (traceLevels->topic > 1)
603 trace(out, _instance, _subscribers);
607 IceUtil::Mutex::Lock sync(_subscribersMutex);
608 Ice::IdentitySeq ids;
610 removeSubscribers(ids);
617 if (_instance->publisherReplicaProxy())
619 return TopicLinkPrx::uncheckedCast(_instance->publisherReplicaProxy()->ice_identity(
620 _linkPrx->ice_getIdentity()));
632 if (traceLevels->topic > 0)
634 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
635 out << _name <<
": link " << _instance->communicator()->identityToString(topic->ice_getIdentity())
639 IceUtil::Mutex::Lock sync(_subscribersMutex);
651 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
652 if (p != _subscribers.end())
672 _subscriberMap.
put(txn, key, record);
680 logError(_instance->communicator(), ex);
684 _subscribers.push_back(subscriber);
686 _instance->observers()->addSubscriber(llu, _name, record);
692 IceUtil::Mutex::Lock sync(_subscribersMutex);
695 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
700 vector<SubscriberPtr>::const_iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
701 if (p == _subscribers.end())
705 if (traceLevels->topic > 0)
707 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
708 out << _name <<
": unlink " << name <<
" failed - not linked";
717 if (traceLevels->topic > 0)
719 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
720 out << _name <<
" unlink " << _instance->communicator()->identityToString(
id);
723 Ice::IdentitySeq ids;
725 removeSubscribers(ids);
731 IceUtil::Mutex::Lock sync(_subscribersMutex);
734 if (traceLevels->topic > 0)
736 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
737 out << _name <<
": reap ";
738 for (Ice::IdentitySeq::const_iterator p = ids.begin(); p != ids.end() ; ++p)
740 if (p != ids.begin())
744 out << _instance->communicator()->identityToString(*p);
748 removeSubscribers(ids);
754 IceUtil::Mutex::Lock sync(_subscribersMutex);
758 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
769 IceUtil::Mutex::Lock sync(_subscribersMutex);
772 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
775 if (record.
link && !(*p)->errored())
779 info.cost = record.
cost;
790 IceUtil::Mutex::Lock sync(_subscribersMutex);
792 Ice::IdentitySeq subscribers;
793 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
795 subscribers.push_back((*p)->id());
803 IceUtil::Mutex::Lock sync(_subscribersMutex);
807 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
812 if (traceLevels->topic > 0)
814 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
815 out << _name <<
": destroy";
820 _instance->observers()->destroyTopic(destroyInternal(llu,
true), _name);
828 IceUtil::Mutex::Lock sync(_subscribersMutex);
832 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
837 if (!(*p)->errored())
839 content.
records.push_back((*p)->record());
848 IceUtil::Mutex::Lock sync(_subscribersMutex);
856 vector<SubscriberPtr>::iterator p = _subscribers.begin();
857 while (p != _subscribers.end())
859 SubscriberRecordSeq::const_iterator
q;
860 for (
q = records.begin();
q != records.end(); ++
q)
862 if ((*p)->id() ==
q->id)
869 if (
q == records.end())
872 p = _subscribers.erase(p);
877 (*p)->resetIfReaped();
883 for (SubscriberRecordSeq::const_iterator p = records.begin(); p != records.end(); ++p)
885 vector<SubscriberPtr>::iterator
q;
886 for (
q = _subscribers.begin();
q != _subscribers.end(); ++
q)
888 if ((*q)->id() == p->id)
893 if (
q == _subscribers.end())
896 _subscribers.push_back(subscriber);
904 IceUtil::Mutex::Lock sync(_subscribersMutex);
920 if (_instance->topicReplicaProxy())
922 prx = _instance->topicReplicaProxy()->ice_identity(_id);
926 prx = _instance->topicAdapter()->createProxy(_id);
928 return TopicPrx::uncheckedCast(prx);
// TopicInternalReapCB: callback object derived from IceUtil::Shared. Its
// constructor stores the instance and generation it is given in _instance and
// _generation.
934 class TopicInternalReapCB :
public IceUtil::Shared
939 _instance(instance), _generation(generation)
// exception: invoked with the Ice::Exception `ex`. When topic tracing is
// enabled it logs the failure of the `reap' call on the master replica; the
// visible lines also show it asking the node to run recovery for the stored
// generation.
943 virtual void exception(
const Ice::Exception& ex)
946 if (traceLevels->topic > 0)
948 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
949 out <<
"exception when calling `reap' on the master replica: " << ex;
951 _instance->node()->recovery(_generation);
967 Ice::IdentitySeq
reap;
976 vector<SubscriberPtr>
copy;
978 IceUtil::Mutex::Lock sync(_subscribersMutex);
983 _observer->forwarded();
987 _observer->published();
997 for (vector<SubscriberPtr>::const_iterator p =
copy.begin(); p !=
copy.end(); ++p)
999 if (!(*p)->queue(forwarded, events) && (*p)->reap())
1001 reap.push_back((*p)->id());
1012 IceUtil::Mutex::Lock sync(_subscribersMutex);
1013 removeSubscribers(
reap);
1016 masterInternal = TopicInternalPrx::uncheckedCast(unlock.
getMaster()->ice_identity(_id));
1030 &TopicInternalReapCB::exception));
1036 IceUtil::Mutex::Lock sync(_subscribersMutex);
1039 if (traceLevels->topic > 0)
1041 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1042 out << _name <<
": add replica observer: " << _instance->communicator()->identityToString(record.
id);
1044 if (traceLevels->topic > 1)
1048 for (QoS::const_iterator p = record.
theQoS.begin(); p != record.
theQoS.end() ; ++p)
1050 if (p != record.
theQoS.begin())
1054 out <<
'[' << p->first <<
"," << p->second <<
']';
1060 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
1061 if (p != _subscribers.end())
1066 if (traceLevels->topic > 0)
1068 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1069 out << _instance->communicator()->identityToString(record.
id) <<
": already subscribed";
1081 key.
id = subscriber->id();
1083 _subscriberMap.
put(txn, key, record);
1092 logError(_instance->communicator(), ex);
1096 _subscribers.push_back(subscriber);
1103 if (traceLevels->topic > 0)
1105 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1106 out << _name <<
": remove replica observer: ";
1107 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1109 if (
id != ids.begin())
1113 out << _instance->communicator()->identityToString(*
id);
1118 IceUtil::Mutex::Lock sync(_subscribersMutex);
1125 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1131 _subscriberMap.
del(txn, key);
1140 logError(_instance->communicator(), ex);
1147 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1149 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *
id);
1150 if (p != _subscribers.end())
1153 _subscribers.erase(p);
1161 IceUtil::Mutex::Lock sync(_subscribersMutex);
1170 if (traceLevels->topic > 0)
1172 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
1173 out << _name <<
": destroyed";
1176 destroyInternal(llu,
false);
1188 IceUtil::Mutex::Lock sync(_subscribersMutex);
1189 if (_instance->observer())
1191 _observer.attach(_instance->observer()->getTopicObserver(_instance->serviceName(), _name, _observer.get()));
1198 IceUtil::Mutex::Lock sync(_subscribersMutex);
1199 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1201 (*p)->updateObserver();
// destroyInternal: takes a LogUpdate (`origLLU`) and a `master` flag.
// Visible work: deletes this topic's entries from _subscriberMap, removes the
// link and publisher servants from the publish adapter, registers the topic
// name with the topic reaper, clears _subscribers, and removes the topic's own
// identity from the topic adapter.
// NOTE(review): the return type, the use of `origLLU`/`master`, and the
// transaction handling are not visible in this excerpt.
1206 TopicImpl::destroyInternal(
const LogUpdate& origLLU,
bool master)
// Delete the entry found at `key`, if any.
1220 if (cursor.find(key))
1222 _subscriberMap.
del(txn, key);
// Continue with MDB_NEXT and delete every following entry whose topic matches
// key.topic; the loop stops at the first entry for a different topic or when
// the cursor returns no entry.
1226 while (cursor.get(k,
v, MDB_NEXT) && k.
topic == key.
topic)
1228 _subscriberMap.
del(txn, k);
// Report `ex` through logError. NOTE(review): the catch clause that declares
// `ex` is not visible here.
1247 logError(_instance->communicator(), ex);
// Unregister the link and publisher servants and hand the topic name to the
// topic reaper.
1251 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
1252 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
1253 _instance->topicReaper()->add(_name);
// Visit every subscriber (loop body not visible), then empty the list.
1256 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
1260 _subscribers.clear();
// Finally remove the topic servant itself from the topic adapter.
1262 _instance->topicAdapter()->remove(_id);
// removeSubscribers: removes the subscribers identified by `ids`.
// Visible work: a first pass deletes entries from _subscriberMap, a second
// pass erases the matching elements from _subscribers, and the observers are
// then notified with the topic name and the full `ids` sequence.
// NOTE(review): the return type, the construction of `key`, `txn` and `llu`,
// and the control flow between the passes are not visible in this excerpt.
1270 TopicImpl::removeSubscribers(
const Ice::IdentitySeq& ids)
// First pass: delete from the subscriber map; the result of del() is tested.
1280 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1286 if (_subscriberMap.
del(txn, key))
// Report `ex` through logError. NOTE(review): the catch clause that declares
// `ex` is not visible here.
1304 logError(_instance->communicator(), ex);
// Second pass: erase each identity that is present in _subscribers; identities
// that are not found are skipped.
1316 for (Ice::IdentitySeq::const_iterator
id = ids.begin();
id != ids.end(); ++
id)
1318 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), *
id);
1319 if (p != _subscribers.end())
1322 _subscribers.erase(p);
// Tell the observers which subscribers were removed from this topic.
1326 _instance->observers()->removeSubscriber(llu, _name, ids);