30 class TransientPublisherI :
public Ice::BlobjectArray
38 ice_invoke(
const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
40 const Ice::Current& current)
44 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
50 Ice::ByteSeq
data(inParams.first, inParams.second);
51 event->data.swap(
data);
55 _impl->publish(
false, v);
68 class TransientTopicLinkI :
public TopicLink
78 _impl->publish(
true, v);
89 const Ice::Identity&
id) :
90 _instance(instance), _name(name), _id(
id), _destroyed(false)
103 Ice::Identity linkid;
104 if (
id.category.empty())
106 pubid.category = _name;
107 pubid.name =
"publish";
108 linkid.category = _name;
109 linkid.name =
"link";
113 pubid.category = id.category;
114 pubid.name = _name +
".publish";
115 linkid.category = id.category;
116 linkid.name = _name +
".link";
119 _publisherPrx = _instance->publishAdapter()->add(
new TransientPublisherI(
this), pubid);
120 _linkPrx = TopicLinkPrx::uncheckedCast(
121 _instance->publishAdapter()->add(
new TransientTopicLinkI(
this), linkid));
139 return _publisherPrx;
146 return _publisherPrx;
155 if (traceLevels->topic > 0)
157 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
158 out << _name <<
": subscribe: null proxy";
160 throw InvalidSubscriber(
"subscriber is a null proxy");
162 Ice::Identity
id = obj->ice_getIdentity();
165 if (traceLevels->topic > 0)
167 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
168 out << _name <<
": subscribe: " << _instance->communicator()->identityToString(
id);
170 if (traceLevels->topic > 1)
173 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
175 if (p != qos.begin())
179 out <<
'[' << p->first <<
"," << p->second <<
']';
184 string reliability =
"oneway";
186 QoS::iterator p = qos.find(
"reliability");
189 reliability = p->second;
194 Ice::ObjectPrx newObj = obj;
195 if (reliability ==
"batch")
197 if (newObj->ice_isDatagram())
199 newObj = newObj->ice_batchDatagram();
203 newObj = newObj->ice_batchOneway();
206 else if (reliability ==
"twoway")
208 newObj = newObj->ice_twoway();
210 else if (reliability ==
"twoway ordered")
212 qos[
"reliability"] =
"ordered";
213 newObj = newObj->ice_twoway();
217 if (reliability !=
"oneway" && traceLevels->subscriber > 0)
219 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
220 out << reliability <<
" mode not understood.";
222 if (!newObj->ice_isDatagram())
224 newObj = newObj->ice_oneway();
237 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
238 if (p != _subscribers.end())
243 _subscribers.erase(p);
247 _subscribers.push_back(subscriber);
252 const Ice::ObjectPrx& obj,
258 if (traceLevels->topic > 0)
260 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
261 out << _name <<
": subscribe: null proxy";
263 throw InvalidSubscriber(
"subscriber is a null proxy");
265 Ice::Identity
id = obj->ice_getIdentity();
268 if (traceLevels->topic > 0)
270 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
272 <<
": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(
id);
274 if (traceLevels->topic > 1)
277 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
279 if (p != qos.begin())
297 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
298 if (p != _subscribers.end())
300 throw AlreadySubscribed();
304 _subscribers.push_back(subscriber);
306 return subscriber->proxy();
315 if (traceLevels->topic > 0)
317 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
318 out << _name <<
": unsubscribe: null proxy";
320 throw InvalidSubscriber(
"subscriber is a null proxy");
323 Ice::Identity
id = subscriber->ice_getIdentity();
325 if (traceLevels->topic > 0)
327 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
328 out << _name <<
": unsubscribe: " << _instance->communicator()->identityToString(
id);
329 if (traceLevels->topic > 1)
339 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
340 if (p != _subscribers.end())
343 _subscribers.erase(p);
361 if (traceLevels->topic > 0)
363 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
364 out << _name <<
": link "
365 << _instance->communicator()->identityToString(topic->ice_getIdentity()) <<
" cost "
371 Ice::Identity
id = topic->ice_getIdentity();
381 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.
id);
382 if (p != _subscribers.end())
391 _subscribers.push_back(subscriber);
400 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
403 Ice::Identity
id = topic->ice_getIdentity();
405 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(),
id);
406 if (p == _subscribers.end())
410 if (traceLevels->topic > 0)
412 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
413 out << _name <<
": unlink " << name <<
" failed - not linked";
422 if (traceLevels->topic > 0)
424 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
425 out << _name <<
" unlink " << _instance->communicator()->identityToString(
id);
431 p = find(_subscribers.begin(), _subscribers.end(),
id);
432 if (p != _subscribers.end())
435 _subscribers.erase(p);
444 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
448 if (record.
link && !(*p)->errored())
452 info.cost = record.
cost;
463 IceUtil::Mutex::Lock sync(*
this);
465 Ice::IdentitySeq subscribers;
466 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
469 subscribers.push_back((*p)->id());
481 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
486 if (traceLevels->topic > 0)
488 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
489 out << _name <<
": destroy";
494 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
495 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
497 catch (
const Ice::ObjectAdapterDeactivatedException&)
503 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
508 _subscribers.clear();
537 vector<SubscriberPtr> copy;
547 vector<Ice::Identity> e;
548 for (vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
550 if (!(*p)->queue(forwarded, events) && (*p)->reap())
552 e.push_back((*p)->id());
563 for (vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
579 vector<SubscriberPtr>::iterator
q = find(_subscribers.begin(), _subscribers.end(), *ep);
580 if (
q != _subscribers.end())
586 subscriber->destroy();
587 _subscribers.erase(
q);
599 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
idempotent TopicLink * getLinkProxy()
Retrieve a proxy to the TopicLink interface.
virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
void publish(bool, const EventDataSeq &)
virtual Ice::ObjectPrx getPublisher(const Ice::Current &) const
virtual std::string getName(const Ice::Current &) const
virtual void link(const TopicPrx &, Ice::Int, const Ice::Current &)
virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current &) const
virtual void subscribe(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
virtual void unlink(const TopicPrx &, const Ice::Current &)
virtual void destroy(const Ice::Current &)
virtual void reap(const Ice::IdentitySeq &, const Ice::Current &)
virtual Ice::IdentitySeq getSubscribers(const Ice::Current &) const
virtual void unsubscribe(const Ice::ObjectPrx &, const Ice::Current &)
TransientTopicImpl(const InstancePtr &, const std::string &, const Ice::Identity &)
virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current &) const
std::string identityToTopicName(const Ice::Identity &)
std::string describeEndpoints(const Ice::ObjectPrx &)
IceUtil::Handle< Subscriber > SubscriberPtr
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
IceUtil::Handle< TraceLevels > TraceLevelsPtr
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicInternal > TopicInternalPrx
::IceUtil::Handle<::IceStorm::EventData > EventDataPtr
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicLink > TopicLinkPrx
IceUtil::Handle< Instance > InstancePtr
IceUtil::Handle< TransientTopicImpl > TransientTopicImplPtr
double v(double t, double v0, double a0, double j)
Used to store persistent information for persistent subscribers.
::IceStorm::TopicPrx theTopic