TransientTopicI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <algorithm>
11 #include <list>
12 
13 #include <Ice/Ice.h>
14 #include <IceStorm/Instance.h>
15 #include <IceStorm/Subscriber.h>
16 #include <IceStorm/TraceLevels.h>
18 #include <IceStorm/Util.h>
19 
20 using namespace IceStorm;
21 using namespace std;
22 
23 namespace
24 {
25 
26  //
27  // The servant has a 1-1 association with a topic. It is used to
28  // receive events from Publishers.
29  //
30  class TransientPublisherI : public Ice::BlobjectArray
31  {
32  public:
33  TransientPublisherI(const TransientTopicImplPtr& impl) : _impl(impl)
34  {
35  }
36 
37  virtual bool
38  ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
39  Ice::ByteSeq&,
40  const Ice::Current& current)
41  {
42  // Use cached reads.
43  EventDataPtr event =
44  new EventData(current.operation, current.mode, Ice::ByteSeq(), current.ctx);
45 
46  //
47  // COMPILERBUG: gcc 4.0.1 doesn't like this.
48  //
49  //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
50  Ice::ByteSeq data(inParams.first, inParams.second);
51  event->data.swap(data);
52 
54  v.push_back(event);
55  _impl->publish(false, v);
56 
57  return true;
58  }
59 
60  private:
61  const TransientTopicImplPtr _impl;
62  };
63 
64  //
65  // The servant has a 1-1 association with a topic. It is used to
66  // receive events from linked Topics.
67  //
68  class TransientTopicLinkI : public TopicLink
69  {
70  public:
71  TransientTopicLinkI(const TransientTopicImplPtr& impl) : _impl(impl)
72  {
73  }
74 
75  virtual void
76  forward(const EventDataSeq& v, const Ice::Current& /*current*/)
77  {
78  _impl->publish(true, v);
79  }
80 
81  private:
82  const TransientTopicImplPtr _impl;
83  };
84 
85 } // namespace
86 
88  const string& name,
89  const Ice::Identity& id) :
90  _instance(instance), _name(name), _id(id), _destroyed(false)
91 {
92  //
93  // Create a servant per topic to receive event data. If the
94  // category is empty then we are in backwards compatibility
95  // mode. In this case the servant's identity is
96  // category=<topicname>, name=publish, otherwise the name is
97  // <instancename>/<topicname>.publish. The same applies to the
98  // link proxy.
99  //
100  // Activate the object and save a reference to give to publishers.
101  //
102  Ice::Identity pubid;
103  Ice::Identity linkid;
104  if (id.category.empty())
105  {
106  pubid.category = _name;
107  pubid.name = "publish";
108  linkid.category = _name;
109  linkid.name = "link";
110  }
111  else
112  {
113  pubid.category = id.category;
114  pubid.name = _name + ".publish";
115  linkid.category = id.category;
116  linkid.name = _name + ".link";
117  }
118 
119  _publisherPrx = _instance->publishAdapter()->add(new TransientPublisherI(this), pubid);
120  _linkPrx = TopicLinkPrx::uncheckedCast(
121  _instance->publishAdapter()->add(new TransientTopicLinkI(this), linkid));
122 }
123 
125 {
126 }
127 
128 string
129 TransientTopicImpl::getName(const Ice::Current&) const
130 {
131  // Immutable
132  return _name;
133 }
134 
135 Ice::ObjectPrx
136 TransientTopicImpl::getPublisher(const Ice::Current&) const
137 {
138  // Immutable
139  return _publisherPrx;
140 }
141 
142 Ice::ObjectPrx
144 {
145  // Immutable
146  return _publisherPrx;
147 }
148 
149 void
150 TransientTopicImpl::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
151 {
152  if (!obj)
153  {
154  TraceLevelsPtr traceLevels = _instance->traceLevels();
155  if (traceLevels->topic > 0)
156  {
157  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
158  out << _name << ": subscribe: null proxy";
159  }
160  throw InvalidSubscriber("subscriber is a null proxy");
161  }
162  Ice::Identity id = obj->ice_getIdentity();
163  TraceLevelsPtr traceLevels = _instance->traceLevels();
164  QoS qos = origQoS;
165  if (traceLevels->topic > 0)
166  {
167  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
168  out << _name << ": subscribe: " << _instance->communicator()->identityToString(id);
169 
170  if (traceLevels->topic > 1)
171  {
172  out << " endpoints: " << IceStormInternal::describeEndpoints(obj) << " QoS: ";
173  for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
174  {
175  if (p != qos.begin())
176  {
177  out << ',';
178  }
179  out << '[' << p->first << "," << p->second << ']';
180  }
181  }
182  }
183 
184  string reliability = "oneway";
185  {
186  QoS::iterator p = qos.find("reliability");
187  if (p != qos.end())
188  {
189  reliability = p->second;
190  qos.erase(p);
191  }
192  }
193 
194  Ice::ObjectPrx newObj = obj;
195  if (reliability == "batch")
196  {
197  if (newObj->ice_isDatagram())
198  {
199  newObj = newObj->ice_batchDatagram();
200  }
201  else
202  {
203  newObj = newObj->ice_batchOneway();
204  }
205  }
206  else if (reliability == "twoway")
207  {
208  newObj = newObj->ice_twoway();
209  }
210  else if (reliability == "twoway ordered")
211  {
212  qos["reliability"] = "ordered";
213  newObj = newObj->ice_twoway();
214  }
215  else // reliability == "oneway"
216  {
217  if (reliability != "oneway" && traceLevels->subscriber > 0)
218  {
219  Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
220  out << reliability << " mode not understood.";
221  }
222  if (!newObj->ice_isDatagram())
223  {
224  newObj = newObj->ice_oneway();
225  }
226  }
227 
228  Lock sync(*this);
229  SubscriberRecord record;
230  record.id = id;
231  record.obj = newObj;
232  record.theQoS = qos;
233  record.topicName = _name;
234  record.link = false;
235  record.cost = 0;
236 
237  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
238  if (p != _subscribers.end())
239  {
240  // If we already have this subscriber remove it from our
241  // subscriber list and remove it from the database.
242  (*p)->destroy();
243  _subscribers.erase(p);
244  }
245 
246  SubscriberPtr subscriber = Subscriber::create(_instance, record);
247  _subscribers.push_back(subscriber);
248 }
249 
250 Ice::ObjectPrx
252  const Ice::ObjectPrx& obj,
253  const Ice::Current&)
254 {
255  if (!obj)
256  {
257  TraceLevelsPtr traceLevels = _instance->traceLevels();
258  if (traceLevels->topic > 0)
259  {
260  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
261  out << _name << ": subscribe: null proxy";
262  }
263  throw InvalidSubscriber("subscriber is a null proxy");
264  }
265  Ice::Identity id = obj->ice_getIdentity();
266 
267  TraceLevelsPtr traceLevels = _instance->traceLevels();
268  if (traceLevels->topic > 0)
269  {
270  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
271  out << _name
272  << ": subscribeAndGetPublisher: " << _instance->communicator()->identityToString(id);
273 
274  if (traceLevels->topic > 1)
275  {
276  out << " endpoints: " << IceStormInternal::describeEndpoints(obj) << " QoS: ";
277  for (QoS::const_iterator p = qos.begin(); p != qos.end(); ++p)
278  {
279  if (p != qos.begin())
280  {
281  out << ',';
282  }
283  }
284  }
285  }
286 
287  Lock sync(*this);
288 
289  SubscriberRecord record;
290  record.id = id;
291  record.obj = obj;
292  record.theQoS = qos;
293  record.topicName = _name;
294  record.link = false;
295  record.cost = 0;
296 
297  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
298  if (p != _subscribers.end())
299  {
300  throw AlreadySubscribed();
301  }
302 
303  SubscriberPtr subscriber = Subscriber::create(_instance, record);
304  _subscribers.push_back(subscriber);
305 
306  return subscriber->proxy();
307 }
308 
309 void
310 TransientTopicImpl::unsubscribe(const Ice::ObjectPrx& subscriber, const Ice::Current&)
311 {
312  TraceLevelsPtr traceLevels = _instance->traceLevels();
313  if (!subscriber)
314  {
315  if (traceLevels->topic > 0)
316  {
317  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
318  out << _name << ": unsubscribe: null proxy";
319  }
320  throw InvalidSubscriber("subscriber is a null proxy");
321  }
322 
323  Ice::Identity id = subscriber->ice_getIdentity();
324 
325  if (traceLevels->topic > 0)
326  {
327  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
328  out << _name << ": unsubscribe: " << _instance->communicator()->identityToString(id);
329  if (traceLevels->topic > 1)
330  {
331  out << " endpoints: " << IceStormInternal::describeEndpoints(subscriber);
332  }
333  }
334 
335  Lock sync(*this);
336  // First remove the subscriber from the subscribers list. Note
337  // that its possible that the subscriber isn't in the list, but is
338  // in the database if the subscriber was locally reaped.
339  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
340  if (p != _subscribers.end())
341  {
342  (*p)->destroy();
343  _subscribers.erase(p);
344  }
345 }
346 
349 {
350  // immutable
351  return _linkPrx;
352 }
353 
354 void
355 TransientTopicImpl::link(const TopicPrx& topic, Ice::Int cost, const Ice::Current&)
356 {
357  TopicInternalPrx internal = TopicInternalPrx::uncheckedCast(topic);
358  TopicLinkPrx link = internal->getLinkProxy();
359 
360  TraceLevelsPtr traceLevels = _instance->traceLevels();
361  if (traceLevels->topic > 0)
362  {
363  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
364  out << _name << ": link "
365  << _instance->communicator()->identityToString(topic->ice_getIdentity()) << " cost "
366  << cost;
367  }
368 
369  Lock sync(*this);
370 
371  Ice::Identity id = topic->ice_getIdentity();
372 
373  SubscriberRecord record;
374  record.id = id;
375  record.obj = link;
376  record.theTopic = topic;
377  record.topicName = _name;
378  record.link = true;
379  record.cost = cost;
380 
381  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), record.id);
382  if (p != _subscribers.end())
383  {
384  string name = IceStormInternal::identityToTopicName(id);
385  LinkExists ex;
386  ex.name = name;
387  throw ex;
388  }
389 
390  SubscriberPtr subscriber = Subscriber::create(_instance, record);
391  _subscribers.push_back(subscriber);
392 }
393 
394 void
395 TransientTopicImpl::unlink(const TopicPrx& topic, const Ice::Current&)
396 {
397  Lock sync(*this);
398  if (_destroyed)
399  {
400  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
401  }
402 
403  Ice::Identity id = topic->ice_getIdentity();
404 
405  vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
406  if (p == _subscribers.end())
407  {
408  string name = IceStormInternal::identityToTopicName(id);
409  TraceLevelsPtr traceLevels = _instance->traceLevels();
410  if (traceLevels->topic > 0)
411  {
412  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
413  out << _name << ": unlink " << name << " failed - not linked";
414  }
415 
416  NoSuchLink ex;
417  ex.name = name;
418  throw ex;
419  }
420 
421  TraceLevelsPtr traceLevels = _instance->traceLevels();
422  if (traceLevels->topic > 0)
423  {
424  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
425  out << _name << " unlink " << _instance->communicator()->identityToString(id);
426  }
427 
428  // Remove the subscriber from the subscribers list. Note
429  // that its possible that the subscriber isn't in the list, but is
430  // in the database if the subscriber was locally reaped.
431  p = find(_subscribers.begin(), _subscribers.end(), id);
432  if (p != _subscribers.end())
433  {
434  (*p)->destroy();
435  _subscribers.erase(p);
436  }
437 }
438 
439 LinkInfoSeq
440 TransientTopicImpl::getLinkInfoSeq(const Ice::Current&) const
441 {
442  Lock sync(*this);
443  LinkInfoSeq seq;
444  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
445  ++p)
446  {
447  SubscriberRecord record = (*p)->record();
448  if (record.link && !(*p)->errored())
449  {
450  LinkInfo info;
451  info.name = IceStormInternal::identityToTopicName(record.theTopic->ice_getIdentity());
452  info.cost = record.cost;
453  info.theTopic = record.theTopic;
454  seq.push_back(info);
455  }
456  }
457  return seq;
458 }
459 
460 Ice::IdentitySeq
461 TransientTopicImpl::getSubscribers(const Ice::Current&) const
462 {
463  IceUtil::Mutex::Lock sync(*this);
464 
465  Ice::IdentitySeq subscribers;
466  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
467  ++p)
468  {
469  subscribers.push_back((*p)->id());
470  }
471  return subscribers;
472 }
473 
474 void
475 TransientTopicImpl::destroy(const Ice::Current&)
476 {
477  Lock sync(*this);
478 
479  if (_destroyed)
480  {
481  throw Ice::ObjectNotExistException(__FILE__, __LINE__);
482  }
483  _destroyed = true;
484 
485  TraceLevelsPtr traceLevels = _instance->traceLevels();
486  if (traceLevels->topic > 0)
487  {
488  Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
489  out << _name << ": destroy";
490  }
491 
492  try
493  {
494  _instance->publishAdapter()->remove(_linkPrx->ice_getIdentity());
495  _instance->publishAdapter()->remove(_publisherPrx->ice_getIdentity());
496  }
497  catch (const Ice::ObjectAdapterDeactivatedException&)
498  {
499  // Ignore -- this could occur on shutdown.
500  }
501 
502  // Destroy all of the subscribers.
503  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
504  ++p)
505  {
506  (*p)->destroy();
507  }
508  _subscribers.clear();
509 }
510 
511 void
512 TransientTopicImpl::reap(const Ice::IdentitySeq&, const Ice::Current&)
513 {
514 }
515 
516 bool
518 {
519  Lock sync(*this);
520  return _destroyed;
521 }
522 
525 {
526  // immutable
527  return _id;
528 }
529 
530 void
531 TransientTopicImpl::publish(bool forwarded, const EventDataSeq& events)
532 {
533  //
534  // Copy of the subscriber list so that event publishing can occur
535  // in parallel.
536  //
537  vector<SubscriberPtr> copy;
538  {
539  Lock sync(*this);
540  copy = _subscribers;
541  }
542 
543  //
544  // Queue each event, gathering a list of those subscribers that
545  // must be reaped.
546  //
547  vector<Ice::Identity> e;
548  for (vector<SubscriberPtr>::const_iterator p = copy.begin(); p != copy.end(); ++p)
549  {
550  if (!(*p)->queue(forwarded, events) && (*p)->reap())
551  {
552  e.push_back((*p)->id());
553  }
554  }
555 
556  //
557  // Run through the error list removing those subscribers that are
558  // in error from the subscriber list.
559  //
560  if (!e.empty())
561  {
562  Lock sync(*this);
563  for (vector<Ice::Identity>::const_iterator ep = e.begin(); ep != e.end(); ++ep)
564  {
565  //
566  // Its possible for the subscriber to already have been
567  // removed since the copy is iterated over outside of
568  // mutex protection.
569  //
570  // Note that although this could be quicker if we used a
571  // map, the most optimal case should be pushing around
572  // events not searching for a particular subscriber.
573  //
574  // The subscriber is immediately destroyed & removed from
575  // the _subscribers list. Add the subscriber to a list of
576  // error'd subscribers and remove it from the database on
577  // the next reap.
578  //
579  vector<SubscriberPtr>::iterator q = find(_subscribers.begin(), _subscribers.end(), *ep);
580  if (q != _subscribers.end())
581  {
582  SubscriberPtr subscriber = *q;
583  //
584  // Destroy the subscriber.
585  //
586  subscriber->destroy();
587  _subscribers.erase(q);
588  }
589  }
590  }
591 }
592 
593 void
595 {
596  Lock sync(*this);
597 
598  // Shutdown each subscriber. This waits for the event queues to drain.
599  for (vector<SubscriberPtr>::const_iterator p = _subscribers.begin(); p != _subscribers.end();
600  ++p)
601  {
602  (*p)->shutdown();
603  }
604 }
IceStorm::TransientTopicImpl::getSubscribers
virtual Ice::IdentitySeq getSubscribers(const Ice::Current &) const
Definition: TransientTopicI.cpp:461
IceStorm
Definition: DBTypes.ice:22
IceStorm::TransientTopicImpl::getPublisher
virtual Ice::ObjectPrx getPublisher(const Ice::Current &) const
Definition: TransientTopicI.cpp:136
IceStorm::TransientTopicImpl::TransientTopicImpl
TransientTopicImpl(const InstancePtr &, const std::string &, const Ice::Identity &)
Definition: TransientTopicI.cpp:87
IceStorm::EventDataSeq
std::deque<::IceStorm::EventDataPtr > EventDataSeq
A sequence of EventData.
Definition: IceStormInternal.h:528
IceStorm::TransientTopicImpl::unlink
virtual void unlink(const TopicPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:395
IceStorm::TransientTopicImpl::unsubscribe
virtual void unsubscribe(const Ice::ObjectPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:310
IceStorm::TopicInternal::getLinkProxy
idempotent TopicLink * getLinkProxy()
Retrieve a proxy to the TopicLink interface.
IceStorm::TransientTopicImpl::getName
virtual std::string getName(const Ice::Current &) const
Definition: TransientTopicI.cpp:129
IceStorm::Subscriber::create
static SubscriberPtr create(const InstancePtr &, const IceStorm::SubscriberRecord &)
Definition: Subscriber.cpp:522
IceStorm::TransientTopicImpl::subscribe
virtual void subscribe(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:150
Util.h
IceStorm::SubscriberRecord::theQoS
::IceStorm::QoS theQoS
Definition: SubscriberRecord.h:239
IceStorm::SubscriberRecord
Used to store persistent information for persistent subscribers.
Definition: SubscriberRecord.h:233
IceStorm::TransientTopicImpl::reap
virtual void reap(const Ice::IdentitySeq &, const Ice::Current &)
Definition: TransientTopicI.cpp:512
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
copy
Use of this software is granted under one of the following two to be chosen freely by the user Boost Software License Version Marcin Kalicinski Permission is hereby free of to any person or organization obtaining a copy of the software and accompanying documentation covered by this and transmit the and to prepare derivative works of the and to permit third parties to whom the Software is furnished to do all subject to the including the above license this restriction and the following must be included in all copies of the in whole or in and all derivative works of the unless such copies or derivative works are solely in the form of machine executable object code generated by a source language processor THE SOFTWARE IS PROVIDED AS WITHOUT WARRANTY OF ANY EXPRESS OR INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF FITNESS FOR A PARTICULAR TITLE AND NON INFRINGEMENT IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER WHETHER IN TORT OR ARISING OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE The MIT Marcin Kalicinski Permission is hereby free of to any person obtaining a copy of this software and associated documentation to deal in the Software without including without limitation the rights to copy
Definition: license.txt:39
TransientTopicI.h
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
IceStorm::TransientTopicImpl::id
Ice::Identity id() const
Definition: TransientTopicI.cpp:524
IceStorm::TransientTopicImpl::subscribeAndGetPublisher
virtual Ice::ObjectPrx subscribeAndGetPublisher(const QoS &, const Ice::ObjectPrx &, const Ice::Current &)
Definition: TransientTopicI.cpp:251
IceStorm::TransientTopicImpl::~TransientTopicImpl
~TransientTopicImpl()
Definition: TransientTopicI.cpp:124
IceStorm::EventData
The event data.
Definition: IceStormInternal.h:494
IceStorm::SubscriberRecord::obj
::Ice::ObjectPrx obj
Definition: SubscriberRecord.h:238
IceStorm::TransientTopicImpl::publish
void publish(bool, const EventDataSeq &)
Definition: TransientTopicI.cpp:531
Subscriber.h
IceStorm::TransientTopicImpl::destroyed
bool destroyed() const
Definition: TransientTopicI.cpp:517
q
#define q
IceStormInternal::describeEndpoints
std::string describeEndpoints(const Ice::ObjectPrx &)
Definition: Util.cpp:51
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStorm::TransientTopicImpl::shutdown
void shutdown()
Definition: TransientTopicI.cpp:594
IceStorm::TransientTopicImpl::getLinkInfoSeq
virtual LinkInfoSeq getLinkInfoSeq(const Ice::Current &) const
Definition: TransientTopicI.cpp:440
armarx::ctrlutil::v
double v(double t, double v0, double a0, double j)
Definition: CtrlUtil.h:39
IceStorm::SubscriberRecord::cost
::Ice::Int cost
Definition: SubscriberRecord.h:240
IceStorm::SubscriberRecord::theTopic
::IceStorm::TopicPrx theTopic
Definition: SubscriberRecord.h:241
TraceLevels.h
IceStormInternal::identityToTopicName
std::string identityToTopicName(const Ice::Identity &)
Definition: Util.cpp:23
std
Definition: Application.h:66
IceUtil::Handle
Definition: forward_declarations.h:30
IceInternal::ProxyHandle<::IceProxy::IceStorm::TopicLink >
IceStorm::TransientTopicImpl::link
virtual void link(const TopicPrx &, Ice::Int, const Ice::Current &)
Definition: TransientTopicI.cpp:355
armarx::VariantType::Int
const VariantTypeId Int
Definition: Variant.h:917
IceStorm::SubscriberRecord::id
::Ice::Identity id
Definition: SubscriberRecord.h:236
IceStorm::TransientTopicImpl::getNonReplicatedPublisher
virtual Ice::ObjectPrx getNonReplicatedPublisher(const Ice::Current &) const
Definition: TransientTopicI.cpp:143
IceStorm::SubscriberRecord::link
bool link
Definition: SubscriberRecord.h:237
IceStorm::TransientTopicImpl::destroy
virtual void destroy(const Ice::Current &)
Definition: TransientTopicI.cpp:475
Instance.h
IceStorm::SubscriberRecord::topicName
::std::string topicName
Definition: SubscriberRecord.h:235