TransientTopicManagerI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
12 #include <IceStorm/TraceLevels.h>
13 #include <IceStorm/Instance.h>
14 #include <IceStorm/Subscriber.h>
15 
16 #include <Ice/Ice.h>
17 
18 #include <Ice/SliceChecksums.h>
19 
20 #include <functional>
21 
22 using namespace IceStorm;
23 using namespace std;
24 
26  _instance(instance)
27 {
28 }
29 
31 {
32 }
33 
35 TransientTopicManagerImpl::create(const string& name, const Ice::Current&)
36 {
37  Lock sync(*this);
38 
39  reap();
40 
41  if (_topics.find(name) != _topics.end())
42  {
43  TopicExists ex;
44  ex.name = name;
45  throw ex;
46  }
47 
48  Ice::Identity id = IceStormInternal::nameToIdentity(_instance, name);
49 
50  //
51  // Called by constructor or with 'this' mutex locked.
52  //
53  TraceLevelsPtr traceLevels = _instance->traceLevels();
54  if (traceLevels->topicMgr > 0)
55  {
56  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
57  out << "creating new topic \"" << name << "\". id: "
58  << _instance->communicator()->identityToString(id);
59  }
60 
61  //
62  // Create topic implementation
63  //
64  TransientTopicImplPtr topicImpl = new TransientTopicImpl(_instance, name, id);
65 
66  //
67  // The identity is the name of the Topic.
68  //
69  TopicPrx prx = TopicPrx::uncheckedCast(_instance->topicAdapter()->add(topicImpl, id));
70  _topics.insert(map<string, TransientTopicImplPtr>::value_type(name, topicImpl));
71  return prx;
72 }
73 
75 TransientTopicManagerImpl::retrieve(const string& name, const Ice::Current&) const
76 {
77  Lock sync(*this);
78 
79  TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this);
80  This->reap();
81 
82  map<string, TransientTopicImplPtr>::const_iterator p = _topics.find(name);
83  if (p == _topics.end())
84  {
85  NoSuchTopic ex;
86  ex.name = name;
87  throw ex;
88  }
89 
90  // Here we cannot just reconstruct the identity since the
91  // identity could be either instanceName/topic name, or if
92  // created with pre-3.2 IceStorm / topic name.
93  return TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()));
94 }
95 
96 TopicDict
97 TransientTopicManagerImpl::retrieveAll(const Ice::Current&) const
98 {
99  Lock sync(*this);
100 
101  TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this);
102  This->reap();
103 
104  TopicDict all;
105  for (map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
106  {
107  //
108  // Here we cannot just reconstruct the identity since the
109  // identity could be either "<instanceName>/topic.<topicname>"
110  // name, or if created with pre-3.2 IceStorm "/<topicname>".
111  //
112  all.insert(TopicDict::value_type(
113  p->first, TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()))));
114  }
115 
116  return all;
117 }
118 Ice::SliceChecksumDict
120 {
121  return Ice::sliceChecksums();
122 }
123 
126 {
127  return IceStormElection::NodePrx();
128 }
129 
130 void
132 {
133  //
134  // Always called with mutex locked.
135  //
136  // Lock sync(*this);
137  //
138  vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
139  for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
140  {
141  map<string, TransientTopicImplPtr>::iterator i = _topics.find(*p);
142  if (i != _topics.end() && i->second->destroyed())
143  {
144  Ice::Identity id = i->second->id();
145  TraceLevelsPtr traceLevels = _instance->traceLevels();
146  if (traceLevels->topicMgr > 0)
147  {
148  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
149  out << "Reaping " << i->first;
150  }
151 
152  try
153  {
154  _instance->topicAdapter()->remove(id);
155  }
156  catch (const Ice::ObjectAdapterDeactivatedException&)
157  {
158  // Ignore
159  }
160 
161  _topics.erase(i);
162  }
163  }
164 }
165 
166 void
168 {
169  Lock sync(*this);
170 
171  for (map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end(); ++p)
172  {
173  p->second->shutdown();
174  }
175 }
IceStorm
Definition: DBTypes.ice:22
IceStorm::TransientTopicManagerImpl::retrieveAll
virtual TopicDict retrieveAll(const Ice::Current &) const
Definition: TransientTopicManagerI.cpp:97
IceStorm::TransientTopicManagerImpl::create
virtual TopicPrx create(const std::string &, const Ice::Current &)
Definition: TransientTopicManagerI.cpp:35
IceStormElection::NodePrx
::IceInternal::ProxyHandle< ::IceProxy::IceStormElection::Node > NodePrx
Definition: Election.h:804
IceStorm::TransientTopicImpl
Definition: TransientTopicI.h:25
IceStorm::TransientTopicManagerImpl::~TransientTopicManagerImpl
~TransientTopicManagerImpl()
Definition: TransientTopicManagerI.cpp:30
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:523
armarx::armem::client::query_fns::all
auto all()
Definition: query_fns.h:10
IceStorm::TopicManagerInternal::getReplicaNode
idempotent IceStormElection::Node * getReplicaNode()
Return the replica node proxy for this topic manager.
TransientTopicI.h
Subscriber.h
IceStorm::TransientTopicManagerImpl
Definition: TransientTopicManagerI.h:30
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:438
IceStormInternal::nameToIdentity
Ice::Identity nameToIdentity(const IceStorm::InstancePtr &, const std::string &)
TraceLevels.h
std
Definition: Application.h:66
IceUtil::Handle
Definition: forward_declarations.h:29
IceInternal::ProxyHandle< ::IceProxy::IceStorm::Topic >
TransientTopicManagerI.h
IceStorm::TransientTopicManagerImpl::reap
void reap()
Definition: TransientTopicManagerI.cpp:131
IceStorm::TransientTopicManagerImpl::shutdown
void shutdown()
Definition: TransientTopicManagerI.cpp:167
IceStorm::TransientTopicManagerImpl::getSliceChecksums
virtual Ice::SliceChecksumDict getSliceChecksums(const Ice::Current &) const
Definition: TransientTopicManagerI.cpp:119
IceStorm::TransientTopicManagerImpl::TransientTopicManagerImpl
TransientTopicManagerImpl(const InstancePtr &)
Definition: TransientTopicManagerI.cpp:25
Instance.h
IceStorm::TransientTopicManagerImpl::retrieve
virtual TopicPrx retrieve(const std::string &, const Ice::Current &) const
Definition: TransientTopicManagerI.cpp:75