TransientTopicI.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <algorithm>
11#include <list>
12
13#include <Ice/Ice.h>
14#include <IceStorm/Instance.h>
15#include <IceStorm/Subscriber.h>
18#include <IceStorm/Util.h>
19
20using namespace IceStorm;
21using namespace std;
22
23namespace
24{
25
26 //
27 // The servant has a 1-1 association with a topic. It is used to
28 // receive events from Publishers.
29 //
30 class TransientPublisherI : public Ice::BlobjectArray
31 {
32 public:
33 TransientPublisherI(const TransientTopicImplPtr& impl) : _impl(impl)
34 {
35 }
36
37 virtual bool
38 ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
39 Ice::ByteSeq&,
40 const Ice::Current& current)
41 {
42 // Use cached reads.
43 EventDataPtr event =
44 new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
45
46 //
47 // COMPILERBUG: gcc 4.0.1 doesn't like this.
48 //
49 //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
50 Ice::ByteSeq data(inParams.first, inParams.second);
51 event->data.swap(data);
52
54 v.push_back(event);
55 _impl->publish(false, v);
56
57 return true;
58 }
59
60 private:
61 const TransientTopicImplPtr _impl;
62 };
63
64 //
65 // The servant has a 1-1 association with a topic. It is used to
66 // receive events from linked Topics.
67 //
68 class TransientTopicLinkI : public TopicLink
69 {
70 public:
71 TransientTopicLinkI(const TransientTopicImplPtr& impl) : _impl(impl)
72 {
73 }
74
75 virtual void
76 forward(const EventDataSeq& v, const Ice::Current& /*current*/)
77 {
78 _impl->publish(true, v);
79 }
80
81 private:
82 const TransientTopicImplPtr _impl;
83 };
84
85} // namespace
86
88 const string& name,
89 const Ice::Identity& id) :
90 _instance(instance), _name(name), _id(id), _destroyed(false)
91{
92 //
93 // Create a servant per topic to receive event data. If the
94 // category is empty then we are in backwards compatibility
95 // mode. In this case the servant's identity is
96 // category=<topicname>, name=publish, otherwise the name is
97 // <instancename>/<topicname>.publish. The same applies to the
98 // link proxy.
99 //
100 // Activate the object and save a reference to give to publishers.
101 //
102 Ice::Identity pubid;
103 Ice::Identity linkid;
104 if (id.category.empty())
105 {
106 pubid.category = _name;
107 pubid.name = "publish";
108 linkid.category = _name;
109 linkid.name = "link";
110 }
111 else
112 {
113 pubid.category = id.category;
114 pubid.name = _name + ".publish";
115 linkid.category = id.category;
116 linkid.name = _name + ".link";
117 }
118
119 _publisherPrx = _instance->publishAdapter()->add(new TransientPublisherI(this), pubid);
120 _linkPrx = TopicLinkPrx::uncheckedCast(
121 _instance->publishAdapter()->add(new TransientTopicLinkI(this), linkid));
122}
123
127
128string
129TransientTopicImpl::getName(const Ice::Current&) const
130{
131 // Immutable
132 return _name;
133}
134
135Ice::ObjectPrx
136TransientTopicImpl::getPublisher(const Ice::Current&) const
137{
138 // Immutable
139 return _publisherPrx;
140}
141
142Ice::ObjectPrx
144{
145 // Immutable
146 return _publisherPrx;
147}
148
149void
150TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
151{
152 if (!obj)
153 {
154 TraceLevelsPtr traceLevels = _instance->traceLevels();
155 if (traceLevels->topic > 0)
156 {
157 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
158 out << _name << ": subscribe: null proxy";
159 }
160 throw InvalidSubscriber("subscriber is a null proxy");
161 }
162 Ice::Identity id = obj->ice_getIdentity();
163 TraceLevelsPtr traceLevels = _instance->traceLevels();
164 QoS qos = origQoS;
165 if (traceLevels->topic > 0)
166 {
167 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
168 out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
169
170 if (traceLevels->topic > 1)
171 {
172 out << " endpoints: " << IceStormInternal::describeEndpoints(obj) << " QoS: ";
173 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
174 {
175 if (p != qos.begin())
176 {
177 out << ',';
178 }
179 out << '[' << p->first << "," << p->second << ']';
180 }
181 }
182 }
183
184 string reliability = "oneway";
185 {
186 QoS::iterator p = qos.find("reliability");
187 if (p != qos.end())
188 {
189 reliability = p->second;
190 qos.erase(p);
191 }
192 }
193
194 Ice::ObjectPrx newObj = obj;
195 if (reliability == "batch")
196 {
197 if (newObj->ice_isDatagram())
198 {
199 newObj = newObj->ice_batchDatagram();
200 }
201 else
202 {
203 newObj = newObj->ice_batchOneway();
204 }
205 }
206 else if (reliability == "twoway")
207 {
208 newObj = newObj->ice_twoway();
209 }
210 else if (reliability == "twoway ordered")
211 {
212 qos["reliability"] = "ordered";
213 newObj = newObj->ice_twoway();
214 }
215 else // reliability == "oneway"
216 {
217 if (reliability != "oneway" && traceLevels->subscriber > 0)
218 {
219 Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
220 out << reliability << " mode not understood.";
221 }
222 if (!newObj->ice_isDatagram())
223 {
224 newObj = newObj->ice_oneway();
225 }
226 }
227
228 Lock sync(*this);
229 SubscriberRecord record;
230 record.id = id;
231 record.obj = newObj;
232 record.theQoS = qos;
233 record.topicName = _name;
234 record.link = false;
235 record.cost = 0;
236
237 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
238 if (p != _subscribers.end())
239 {
240 // If we already have this subscriber remove it from our
241 // subscriber list and remove it from the database.
242 (*p)->destroy();
243 _subscribers.erase(p);
244 }
245
246 SubscriberPtr subscriber = Subscriber::create(_instance, record);
247 _subscribers.push_back(subscriber);
248}
249
250Ice::ObjectPrx
252 const Ice::ObjectPrx& obj,
253 const Ice::Current&)
254{
255 if (!obj)
256 {
257 TraceLevelsPtr traceLevels = _instance->traceLevels();
258 if (traceLevels->topic > 0)
259 {
260 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
261 out << _name << ": subscribe: null proxy";
262 }
263 throw InvalidSubscriber("subscriber is a null proxy");
264 }
265 Ice::Identity id = obj->ice_getIdentity();
266
267 TraceLevelsPtr traceLevels = _instance->traceLevels();
268 if (traceLevels->topic > 0)
269 {
270 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
271 out << _name
272 << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
273
274 if (traceLevels->topic > 1)
275 {
276 out << " endpoints: " << IceStormInternal::describeEndpoints(obj) << " QoS: ";
277 for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
278 {
279 if (p != qos.begin())
280 {
281 out << ',';
282 }
283 }
284 }
285 }
286
287 Lock sync(*this);
288
289 SubscriberRecord record;
290 record.id = id;
291 record.obj = obj;
292 record.theQoS = qos;
293 record.topicName = _name;
294 record.link = false;
295 record.cost = 0;
296
297 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
298 if (p != _subscribers.end())
299 {
300 throw AlreadySubscribed();
301 }
302
303 SubscriberPtr subscriber = Subscriber::create(_instance, record);
304 _subscribers.push_back(subscriber);
305
306 return subscriber->proxy();
307}
308
309void
310TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
311{
312 TraceLevelsPtr traceLevels = _instance->traceLevels();
313 if (!subscriber)
314 {
315 if (traceLevels->topic > 0)
316 {
317 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
318 out << _name << ": unsubscribe: null proxy";
319 }
320 throw InvalidSubscriber("subscriber is a null proxy");
321 }
322
323 Ice::Identity id = subscriber->ice_getIdentity();
324
325 if (traceLevels->topic > 0)
326 {
327 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
328 out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
329 if (traceLevels->topic > 1)
330 {
331 out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
332 }
333 }
334
335 Lock sync(*this);
336 // First remove the subscriber from the subscribers list. Note
337 // that its possible that the subscriber isn't in the list, but is
338 // in the database if the subscriber was locally reaped.
339 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
340 if (p != _subscribers.end())
341 {
342 (*p)->destroy();
343 _subscribers.erase(p);
344 }
345}
346
349{
350 // immutable
351 return _linkPrx;
352}
353
354void
355TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
356{
357 TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
358 TopicLinkPrx link = internal->getLinkProxy();
359
360 TraceLevelsPtr traceLevels = _instance->traceLevels();
361 if (traceLevels->topic > 0)
362 {
363 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
364 out << _name << ": link "
365 << _instance->communicator()->identityToString(topic->ice_getIdentity()) << " cost "
366 << cost;
367 }
368
369 Lock sync(*this);
370
371 Ice::Identity id = topic->ice_getIdentity();
372
373 SubscriberRecord record;
374 record.id = id;
375 record.obj = link;
376 record.theTopic = topic;
377 record.topicName = _name;
378 record.link = true;
379 record.cost = cost;
380
381 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
382 if (p != _subscribers.end())
383 {
385 LinkExists ex;
386 ex.name = name;
387 throw ex;
388 }
389
390 SubscriberPtr subscriber = Subscriber::create(_instance, record);
391 _subscribers.push_back(subscriber);
392}
393
394void
395TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
396{
397 Lock sync(*this);
398 if (_destroyed)
399 {
400 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
401 }
402
403 Ice::Identity id = topic->ice_getIdentity();
404
405 vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
406 if (p == _subscribers.end())
407 {
409 TraceLevelsPtr traceLevels = _instance->traceLevels();
410 if (traceLevels->topic > 0)
411 {
412 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
413 out << _name << ": unlink " << name << " failed - not linked";
414 }
415
416 NoSuchLink ex;
417 ex.name = name;
418 throw ex;
419 }
420
421 TraceLevelsPtr traceLevels = _instance->traceLevels();
422 if (traceLevels->topic > 0)
423 {
424 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
425 out << _name << " unlink " << _instance->communicator()->identityToString(id);
426 }
427
428 // Remove the subscriber from the subscribers list. Note
429 // that its possible that the subscriber isn't in the list, but is
430 // in the database if the subscriber was locally reaped.
431 p = find(_subscribers.begin(), _subscribers.end(), id);
432 if (p != _subscribers.end())
433 {
434 (*p)->destroy();
435 _subscribers.erase(p);
436 }
437}
438
439LinkInfoSeq
440TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const
441{
442 Lock sync(*this);
443 LinkInfoSeq seq;
444 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
445 ++p)
446 {
447 SubscriberRecord record = (*p)->record();
448 if (record.link && !(*p)->errored())
449 {
450 LinkInfo info;
451 info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
452 info.cost = record.cost;
453 info.theTopic = record.theTopic;
454 seq.push_back(info);
455 }
456 }
457 return seq;
458}
459
460Ice::IdentitySeq
461TransientTopicImpl::getSubscribers(const Ice::Current&) const
462{
463 IceUtil::Mutex::Lock sync(*this);
464
465 Ice::IdentitySeq subscribers;
466 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
467 ++p)
468 {
469 subscribers.push_back((*p)->id());
470 }
471 return subscribers;
472}
473
474void
475TransientTopicImpl::destroy(const Ice::Current&)
476{
477 Lock sync(*this);
478
479 if (_destroyed)
480 {
481 throw Ice::ObjectNotExistException(__FILE__, __LINE__);
482 }
483 _destroyed = true;
484
485 TraceLevelsPtr traceLevels = _instance->traceLevels();
486 if (traceLevels->topic > 0)
487 {
488 Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
489 out << _name << ": destroy";
490 }
491
492 try
493 {
494 _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
495 _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
496 }
497 catch (const Ice::ObjectAdapterDeactivatedException&)
498 {
499 // Ignore -- this could occur on shutdown.
500 }
501
502 // Destroy all of the subscribers.
503 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
504 ++p)
505 {
506 (*p)->destroy();
507 }
508 _subscribers.clear();
509}
510
511void
512TransientTopicImpl::reap(const Ice::IdentitySeq&, const Ice::Current&)
513{
514}
515
516bool
518{
519 Lock sync(*this);
520 return _destroyed;
521}
522
523Ice::Identity
525{
526 // immutable
527 return _id;
528}
529
530void
531TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
532{
533 //
534 // Copy of the subscriber list so that event publishing can occur
535 // in parallel.
536 //
537 vector<SubscriberPtr> copy;
538 {
539 Lock sync(*this);
540 copy = _subscribers;
541 }
542
543 //
544 // Queue each event, gathering a list of those subscribers that
545 // must be reaped.
546 //
547 vector<Ice::Identity> e;
548 for (vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
549 {
550 if (!(*p)->queue(forwarded, events) && (*p)->reap())
551 {
552 e.push_back((*p)->id());
553 }
554 }
555
556 //
557 // Run through the error list removing those subscribers that are
558 // in error from the subscriber list.
559 //
560 if (!e.empty())
561 {
562 Lock sync(*this);
563 for (vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
564 {
565 //
566 // Its possible for the subscriber to already have been
567 // removed since the copy is iterated over outside of
568 // mutex protection.
569 //
570 // Note that although this could be quicker if we used a
571 // map, the most optimal case should be pushing around
572 // events not searching for a particular subscriber.
573 //
574 // The subscriber is immediately destroyed & removed from
575 // the _subscribers list. Add the subscriber to a list of
576 // error'd subscribers and remove it from the database on
577 // the next reap.
578 //
579 vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
580 if (q != _subscribers.end())
581 {
582 SubscriberPtr subscriber = *q;
583 //
584 // Destroy the subscriber.
585 //
586 subscriber->destroy();
587 _subscribers.erase(q);
588 }
589 }
590 }
591}
592
593void
595{
596 Lock sync(*this);
597
598 // Shutdown each subscriber. This waits for the event queues to drain.
599 for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
600 ++p)
601 {
602 (*p)->shutdown();
603 }
604}
uint8_t data[1]
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
idempotent TopicLink * getLinkProxy()
Retrieve a proxy to the TopicLink interface.
virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
void publish(bool, const EventDataSeq &)
virtual Ice::ObjectPrx getPublisher(const Ice::Current &) const
virtual std::string getName(const Ice::Current &) const
virtual void link(const TopicPrx &, Ice::Int, const Ice::Current &)
virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current &) const
virtual void subscribe(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
virtual void unlink(const TopicPrx &, const Ice::Current &)
virtual void destroy(const Ice::Current &)
virtual void reap(const Ice::IdentitySeq &, const Ice::Current &)
virtual Ice::IdentitySeq getSubscribers(const Ice::Current &) const
virtual void unsubscribe(const Ice::ObjectPrx &, const Ice::Current &)
TransientTopicImpl(const InstancePtr &, const std::string &, const Ice::Identity &)
virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current &) const
#define q
std::string identityToTopicName(const Ice::Identity &)
Definition Util.cpp:23
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition Util.cpp:51
IceUtil::Handle< Subscriber > SubscriberPtr
Definition Subscriber.h:26
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
Definition IceManager.h:70
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicInternal > TopicInternalPrx
::IceUtil::Handle<::IceStorm::EventData > EventDataPtr
::IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicLink > TopicLinkPrx
IceUtil::Handle< Instance > InstancePtr
Definition Instance.h:128
IceUtil::Handle< TransientTopicImpl > TransientTopicImplPtr
double v(double t, double v0, double a0, double j)
Definition CtrlUtil.h:39
Used to store persistent information for persistent subscribers.
::IceStorm::TopicPrx theTopic