// Excerpt of TransientPublisherI, a servant class derived from Ice::BlobjectArray.
// Only scattered lines of the class are visible in this extract; the comments
// below describe just what those lines show.
31 class TransientPublisherI :
public Ice::BlobjectArray
// ice_invoke receives the encoded in-parameters as a pair of byte pointers
// (inParams.first / inParams.second) plus the Ice::Current for the call.
41 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
43 const Ice::Current& current)
// Copy the byte range into a local Ice::ByteSeq, then swap that buffer into
// event->data so the event takes the bytes without a second copy.
56 Ice::ByteSeq
data(inParams.first, inParams.second);
57 event->data.swap(
data);
// Hand `v` to the topic implementation with `false` as the first argument
// (presumably the "forwarded" flag -- confirm against publish()).
61 _impl->publish(
false,
v);
// Excerpt of TransientTopicLinkI, a servant class derived from TopicLink.
75 class TransientTopicLinkI :
public TopicLink
// Passes `v` to the topic implementation with `true` as the first argument
// (presumably marking the events as forwarded over a link -- confirm).
87 _impl->publish(
true,
v);
// Choose the identities under which the publisher and link servants are
// registered. When id.category is empty, the topic name (_name) becomes the
// category and the fixed names "publish" / "link" are used.
118 if (
id.category.empty())
120 pubid.category = _name;
121 pubid.name =
"publish";
122 linkid.category = _name;
123 linkid.name =
"link";
// Presumably the else branch (the `else` itself is not visible in this
// excerpt): keep id.category and derive the names from the topic name by
// appending ".publish" / ".link".
127 pubid.category =
id.category;
128 pubid.name = _name +
".publish";
129 linkid.category =
id.category;
130 linkid.name = _name +
".link";
// Add both servants to the publish adapter and keep the returned proxies.
// The link proxy is narrowed with uncheckedCast.
133 _publisherPrx = _instance->publishAdapter()->add(
new TransientPublisherI(
this), pubid);
134 _linkPrx = TopicLinkPrx::uncheckedCast(_instance->publishAdapter()->add(
new TransientTopicLinkI(
this), linkid));
// Two separate return statements (the enclosing function headers are not
// visible in this excerpt); both hand back the same stored publisher proxy.
152 return _publisherPrx;
159 return _publisherPrx;
// Subscribe path (function header not visible in this excerpt).
// Null-proxy rejection: optionally trace, then throw InvalidSubscriber.
// The null check that guards this is presumably on the lines not shown.
168 if (traceLevels->topic > 0)
170 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
171 out << _name <<
": subscribe: null proxy";
173 throw InvalidSubscriber(
"subscriber is a null proxy");
// At topic trace level > 0, log the subscriber identity; at level > 1 the
// QoS entries are also written as [key,value] pairs.
178 if (traceLevels->topic > 0)
180 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
181 out << _name <<
": subscribe: " << _instance->communicator()->identityToString(
id);
183 if (traceLevels->topic > 1)
187 for (QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
// Presumably emits a separator before every entry but the first
// (the statement under this `if` is not visible here).
189 if (p != qos.begin())
193 out <<
'[' << p->first <<
"," << p->second <<
']';
// Reliability defaults to "oneway" and is replaced by the "reliability" QoS
// entry's value (the check that the lookup succeeded is not visible here).
198 string reliability =
"oneway";
200 QoS::iterator p = qos.find(
"reliability");
203 reliability = p->second;
// Derive the proxy actually used for delivery from the requested mode:
//   "batch"          -> batch datagram if the proxy is a datagram, else batch oneway
//   "twoway"         -> twoway
//   "twoway ordered" -> twoway, and the QoS entry is rewritten to "ordered"
208 Ice::ObjectPrx newObj = obj;
209 if (reliability ==
"batch")
211 if (newObj->ice_isDatagram())
213 newObj = newObj->ice_batchDatagram();
217 newObj = newObj->ice_batchOneway();
220 else if (reliability ==
"twoway")
222 newObj = newObj->ice_twoway();
224 else if (reliability ==
"twoway ordered")
226 qos[
"reliability"] =
"ordered";
227 newObj = newObj->ice_twoway();
// Remaining case: any value other than "oneway" is reported as not understood
// (when subscriber tracing is on), and a non-datagram proxy is converted to
// oneway.
231 if (reliability !=
"oneway" && traceLevels->subscriber > 0)
233 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
234 out << reliability <<
" mode not understood.";
236 if (!newObj->ice_isDatagram())
238 newObj = newObj->ice_oneway();
// Look for an existing subscriber with the same id; if one is found it is
// erased from the list before the new subscriber is appended.
251 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
252 if (p != _subscribers.end())
257 _subscribers.erase(p);
261 _subscribers.push_back(subscriber);
// subscribeAndGetPublisher path (function header not visible in this excerpt).
// Null-proxy rejection: optionally trace, then throw InvalidSubscriber.
// NOTE(review): this trace says "subscribe" while the trace a few lines below
// says "subscribeAndGetPublisher" -- if both belong to the same function the
// message looks copy-pasted; confirm.
270 if (traceLevels->topic > 0)
272 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
273 out << _name <<
": subscribe: null proxy";
275 throw InvalidSubscriber(
"subscriber is a null proxy");
// At topic trace level > 0, log the subscriber identity; at level > 1 the
// QoS map is iterated as well (the output statements are not visible here).
280 if (traceLevels->topic > 0)
282 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
283 out << _name <<
": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(
id);
285 if (traceLevels->topic > 1)
289 for (QoS::const_iterator p = qos.begin(); p != qos.end() ; ++p)
291 if (p != qos.begin())
// A subscriber with the same id already in the list is an error here:
// AlreadySubscribed is thrown instead of replacing the entry.
310 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
311 if (p != _subscribers.end())
313 throw AlreadySubscribed();
// Otherwise record the new subscriber and return its proxy() to the caller.
317 _subscribers.push_back(subscriber);
319 return subscriber->proxy();
// Unsubscribe path (function header not visible in this excerpt).
// Null-proxy rejection: optionally trace, then throw InvalidSubscriber.
328 if (traceLevels->topic > 0)
330 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
331 out << _name <<
": unsubscribe: null proxy";
333 throw InvalidSubscriber(
"subscriber is a null proxy");
// At topic trace level > 0, log the identity being unsubscribed; additional
// output at level > 1 is on lines not visible here.
338 if (traceLevels->topic > 0)
340 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
341 out << _name <<
": unsubscribe: " << _instance->communicator()->identityToString(
id);
342 if (traceLevels->topic > 1)
// Remove the matching subscriber if it is present; no exception is visible
// in this excerpt for the not-found case.
352 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
353 if (p != _subscribers.end())
356 _subscribers.erase(p);
// Link path (function header not visible in this excerpt).
// At topic trace level > 0, log the identity of the topic being linked; the
// trace statement continues on a line that is not visible here.
374 if (traceLevels->topic > 0)
376 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
377 out << _name <<
": link " << _instance->communicator()->identityToString(topic->ice_getIdentity())
// Check whether a subscriber with record.id already exists (the action taken
// when it does is not visible here), then append the new subscriber.
393 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
394 if (p != _subscribers.end())
403 _subscribers.push_back(subscriber);
// Unlink path (function header not visible in this excerpt).
// Presumably raised when the topic has already been destroyed -- the guarding
// condition is not visible here; confirm.
412 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
// Look up the link by id; when it is missing, trace the failure (the
// exception thrown afterwards, if any, is not visible here).
417 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
418 if (p == _subscribers.end())
422 if (traceLevels->topic > 0)
424 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
425 out << _name <<
": unlink " << name <<
" failed - not linked";
// NOTE(review): this success trace uses " unlink " without the leading ':'
// that the failure trace above and the other topic traces use -- looks like
// an inconsistency in the log format.
434 if (traceLevels->topic > 0)
436 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
437 out << _name <<
" unlink " << _instance->communicator()->identityToString(
id);
// NOTE(review): the same lookup is repeated here although `p` was computed
// above with identical arguments; unless something on the non-visible lines
// modifies _subscribers in between, the second find appears redundant.
443 p = find(_subscribers.begin(), _subscribers.end(),
id);
444 if (p != _subscribers.end())
447 _subscribers.erase(p);
// Walk all subscribers (enclosing function header not visible in this excerpt).
// Only entries whose record is flagged as a link and whose subscriber is not
// in the errored state are considered; for those, the record's cost is copied
// into `info`.
456 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
459 if (record.
link && !(*p)->errored())
463 info.cost = record.
cost;
// Collect the identity of every current subscriber (enclosing function header
// not visible in this excerpt). The object's own mutex is held via the scoped
// lock while the list is read.
474 IceUtil::Mutex::Lock sync(*
this);
476 Ice::IdentitySeq subscribers;
477 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
479 subscribers.push_back((*p)->id());
// Destroy path (function header not visible in this excerpt).
// Presumably raised when the topic was already destroyed -- the guarding
// condition is not visible here; confirm.
491 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
496 if (traceLevels->topic > 0)
498 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
499 out << _name <<
": destroy";
// Unregister the link and publisher servants from the publish adapter.
// ObjectAdapterDeactivatedException is caught; the handler body is not
// visible in this excerpt.
504 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
505 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
507 catch (
const Ice::ObjectAdapterDeactivatedException&)
// Visit every subscriber (loop body not visible here), then empty the list.
513 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)
517 _subscribers.clear();
// Event delivery path (function header not visible in this excerpt).
// `copy` is a local vector of subscribers that the delivery loop iterates
// over; how it is filled is not visible here (presumably a snapshot of
// _subscribers -- confirm).
546 vector<SubscriberPtr>
copy;
// `e` gathers the ids of subscribers for which queue(forwarded, events)
// returned false and reap() returned true.
556 vector<Ice::Identity> e;
557 for (vector<SubscriberPtr>::const_iterator p =
copy.begin(); p !=
copy.end(); ++p)
559 if (!(*p)->queue(forwarded, events) && (*p)->reap())
561 e.push_back((*p)->id());
// Second pass: for each collected id, look the subscriber up again in
// _subscribers and, if it is still there, destroy it and erase the entry.
572 for (vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
588 vector<SubscriberPtr>::iterator
q = find(_subscribers.begin(), _subscribers.end(), *ep);
589 if (
q != _subscribers.end())
595 subscriber->destroy();
596 _subscribers.erase(
q);
// Iterates over every entry in _subscribers; the enclosing function header and
// the loop body are not visible in this excerpt.
608 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end(); ++p)