30 class TransientPublisherI :
public Ice::BlobjectArray
38 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
40 const Ice::Current& current)
44 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
50 Ice::ByteSeq
data(inParams.first, inParams.second);
51 event->data.swap(
data);
55 _impl->publish(
false,
v);
68 class TransientTopicLinkI :
public TopicLink
78 _impl->publish(
true,
v);
90 _instance(instance), _name(name), _id(id), _destroyed(false)
104 if (
id.category.empty())
106 pubid.category = _name;
107 pubid.name =
"publish";
108 linkid.category = _name;
109 linkid.name =
"link";
113 pubid.category =
id.category;
114 pubid.name = _name +
".publish";
115 linkid.category =
id.category;
116 linkid.name = _name +
".link";
119 _publisherPrx = _instance->publishAdapter()->add(
new TransientPublisherI(
this), pubid);
120 _linkPrx = TopicLinkPrx::uncheckedCast(
121 _instance->publishAdapter()->add(
new TransientTopicLinkI(
this), linkid));
139 return _publisherPrx;
146 return _publisherPrx;
155 if (traceLevels->topic > 0)
157 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
158 out << _name <<
": subscribe: null proxy";
160 throw InvalidSubscriber(
"subscriber is a null proxy");
165 if (traceLevels->topic > 0)
167 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
168 out << _name <<
": subscribe: " << _instance->communicator()->identityToString(
id);
170 if (traceLevels->topic > 1)
173 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
175 if (p != qos.begin())
179 out <<
'[' << p->first <<
"," << p->second <<
']';
184 string reliability =
"oneway";
186 QoS::iterator p = qos.find(
"reliability");
189 reliability = p->second;
194 Ice::ObjectPrx newObj = obj;
195 if (reliability ==
"batch")
197 if (newObj->ice_isDatagram())
199 newObj = newObj->ice_batchDatagram();
203 newObj = newObj->ice_batchOneway();
206 else if (reliability ==
"twoway")
208 newObj = newObj->ice_twoway();
210 else if (reliability ==
"twoway ordered")
212 qos[
"reliability"] =
"ordered";
213 newObj = newObj->ice_twoway();
217 if (reliability !=
"oneway" && traceLevels->subscriber > 0)
219 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
220 out << reliability <<
" mode not understood.";
222 if (!newObj->ice_isDatagram())
224 newObj = newObj->ice_oneway();
237 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
238 if (p != _subscribers.end())
243 _subscribers.erase(p);
247 _subscribers.push_back(subscriber);
252 const Ice::ObjectPrx& obj,
258 if (traceLevels->topic > 0)
260 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
261 out << _name <<
": subscribe: null proxy";
263 throw InvalidSubscriber(
"subscriber is a null proxy");
268 if (traceLevels->topic > 0)
270 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
272 <<
": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(
id);
274 if (traceLevels->topic > 1)
277 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
279 if (p != qos.begin())
297 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
298 if (p != _subscribers.end())
300 throw AlreadySubscribed();
304 _subscribers.push_back(subscriber);
306 return subscriber->proxy();
315 if (traceLevels->topic > 0)
317 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
318 out << _name <<
": unsubscribe: null proxy";
320 throw InvalidSubscriber(
"subscriber is a null proxy");
325 if (traceLevels->topic > 0)
327 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
328 out << _name <<
": unsubscribe: " << _instance->communicator()->identityToString(
id);
329 if (traceLevels->topic > 1)
339 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
340 if (p != _subscribers.end())
343 _subscribers.erase(p);
361 if (traceLevels->topic > 0)
363 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
364 out << _name <<
": link "
365 << _instance->communicator()->identityToString(topic->ice_getIdentity()) <<
" cost "
381 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
382 if (p != _subscribers.end())
391 _subscribers.push_back(subscriber);
400 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
405 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
406 if (p == _subscribers.end())
410 if (traceLevels->topic > 0)
412 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
413 out << _name <<
": unlink " << name <<
" failed - not linked";
422 if (traceLevels->topic > 0)
424 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
425 out << _name <<
" unlink " << _instance->communicator()->identityToString(
id);
431 p = find(_subscribers.begin(), _subscribers.end(),
id);
432 if (p != _subscribers.end())
435 _subscribers.erase(p);
444 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
448 if (record.
link && !(*p)->errored())
452 info.cost = record.
cost;
463 IceUtil::Mutex::Lock sync(*
this);
465 Ice::IdentitySeq subscribers;
466 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
469 subscribers.push_back((*p)->id());
481 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
486 if (traceLevels->topic > 0)
488 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
489 out << _name <<
": destroy";
494 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
495 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
497 catch (
const Ice::ObjectAdapterDeactivatedException&)
503 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
508 _subscribers.clear();
537 vector<SubscriberPtr>
copy;
547 vector<Ice::Identity> e;
548 for (vector<SubscriberPtr>::const_iterator p =
copy.begin(); p !=
copy.end(); ++p)
550 if (!(*p)->queue(forwarded, events) && (*p)->reap())
552 e.push_back((*p)->id());
563 for (vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
579 vector<SubscriberPtr>::iterator
q = find(_subscribers.begin(), _subscribers.end(), *ep);
580 if (
q != _subscribers.end())
586 subscriber->destroy();
587 _subscribers.erase(
q);
599 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();