TransientTopicManagerI.cpp
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4 //
5 // This copy of Ice is licensed to you under the terms described in the
6 // ICE_LICENSE file included in this distribution.
7 //
8 // **********************************************************************
9 
10 #include <functional>
11 
12 #include <Ice/Ice.h>
13 #include <Ice/SliceChecksums.h>
14 #include <IceStorm/Instance.h>
15 #include <IceStorm/Subscriber.h>
16 #include <IceStorm/TraceLevels.h>
19 
20 using namespace IceStorm;
21 using namespace std;
22 
24  _instance(instance)
25 {
26 }
27 
29 {
30 }
31 
33 TransientTopicManagerImpl::create(const string& name, const Ice::Current&)
34 {
35  Lock sync(*this);
36 
37  reap();
38 
39  if (_topics.find(name) != _topics.end())
40  {
41  TopicExists ex;
42  ex.name = name;
43  throw ex;
44  }
45 
46  Ice::Identity id = IceStormInternal::nameToIdentity(_instance, name);
47 
48  //
49  // Called by constructor or with 'this' mutex locked.
50  //
51  TraceLevelsPtr traceLevels = _instance->traceLevels();
52  if (traceLevels->topicMgr > 0)
53  {
54  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
55  out << "creating new topic \"" << name
56  << "\". id: " << _instance->communicator()->identityToString(id);
57  }
58 
59  //
60  // Create topic implementation
61  //
62  TransientTopicImplPtr topicImpl = new TransientTopicImpl(_instance, name, id);
63 
64  //
65  // The identity is the name of the Topic.
66  //
67  TopicPrx prx = TopicPrx::uncheckedCast(_instance->topicAdapter()->add(topicImpl, id));
68  _topics.insert(map<string, TransientTopicImplPtr>::value_type(name, topicImpl));
69  return prx;
70 }
71 
73 TransientTopicManagerImpl::retrieve(const string& name, const Ice::Current&) const
74 {
75  Lock sync(*this);
76 
77  TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this);
78  This->reap();
79 
80  map<string, TransientTopicImplPtr>::const_iterator p = _topics.find(name);
81  if (p == _topics.end())
82  {
83  NoSuchTopic ex;
84  ex.name = name;
85  throw ex;
86  }
87 
88  // Here we cannot just reconstruct the identity since the
89  // identity could be either instanceName/topic name, or if
90  // created with pre-3.2 IceStorm / topic name.
91  return TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()));
92 }
93 
94 TopicDict
95 TransientTopicManagerImpl::retrieveAll(const Ice::Current&) const
96 {
97  Lock sync(*this);
98 
99  TransientTopicManagerImpl* This = const_cast<TransientTopicManagerImpl*>(this);
100  This->reap();
101 
102  TopicDict all;
103  for (map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end();
104  ++p)
105  {
106  //
107  // Here we cannot just reconstruct the identity since the
108  // identity could be either "<instanceName>/topic.<topicname>"
109  // name, or if created with pre-3.2 IceStorm "/<topicname>".
110  //
111  all.insert(TopicDict::value_type(
112  p->first,
113  TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()))));
114  }
115 
116  return all;
117 }
118 
119 Ice::SliceChecksumDict
121 {
122  return Ice::sliceChecksums();
123 }
124 
127 {
128  return IceStormElection::NodePrx();
129 }
130 
131 void
133 {
134  //
135  // Always called with mutex locked.
136  //
137  // Lock sync(*this);
138  //
139  vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
140  for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
141  {
142  map<string, TransientTopicImplPtr>::iterator i = _topics.find(*p);
143  if (i != _topics.end() && i->second->destroyed())
144  {
145  Ice::Identity id = i->second->id();
146  TraceLevelsPtr traceLevels = _instance->traceLevels();
147  if (traceLevels->topicMgr > 0)
148  {
149  Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
150  out << "Reaping " << i->first;
151  }
152 
153  try
154  {
155  _instance->topicAdapter()->remove(id);
156  }
157  catch (const Ice::ObjectAdapterDeactivatedException&)
158  {
159  // Ignore
160  }
161 
162  _topics.erase(i);
163  }
164  }
165 }
166 
167 void
169 {
170  Lock sync(*this);
171 
172  for (map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end();
173  ++p)
174  {
175  p->second->shutdown();
176  }
177 }
IceStorm
Definition: DBTypes.ice:22
IceStorm::TransientTopicManagerImpl::retrieveAll
virtual TopicDict retrieveAll(const Ice::Current &) const
Definition: TransientTopicManagerI.cpp:95
IceStorm::TransientTopicManagerImpl::create
virtual TopicPrx create(const std::string &, const Ice::Current &)
Definition: TransientTopicManagerI.cpp:33
IceStormElection::NodePrx
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::Node > NodePrx
Definition: Election.h:1210
IceStorm::TransientTopicImpl
Definition: TransientTopicI.h:25
IceStorm::TransientTopicManagerImpl::~TransientTopicManagerImpl
~TransientTopicManagerImpl()
Definition: TransientTopicManagerI.cpp:28
GfxTL::Identity
void Identity(MatrixXX< N, N, T > *a)
Definition: MatrixXX.h:570
armarx::armem::client::query_fns::all
auto all()
Definition: query_fns.h:9
IceStorm::TopicManagerInternal::getReplicaNode
idempotent IceStormElection::Node * getReplicaNode()
Return the replica node proxy for this topic manager.
TransientTopicI.h
Subscriber.h
IceStorm::TransientTopicManagerImpl
Definition: TransientTopicManagerI.h:30
GfxTL::Trace
T Trace(const MatrixXX< N, N, T > &a)
Definition: MatrixXX.h:478
IceStormInternal::nameToIdentity
Ice::Identity nameToIdentity(const IceStorm::InstancePtr &, const std::string &)
TraceLevels.h
std
Definition: Application.h:66
IceUtil::Handle
Definition: forward_declarations.h:30
IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic >
TransientTopicManagerI.h
IceStorm::TransientTopicManagerImpl::reap
void reap()
Definition: TransientTopicManagerI.cpp:132
IceStorm::TransientTopicManagerImpl::shutdown
void shutdown()
Definition: TransientTopicManagerI.cpp:168
IceStorm::TransientTopicManagerImpl::getSliceChecksums
virtual Ice::SliceChecksumDict getSliceChecksums(const Ice::Current &) const
Definition: TransientTopicManagerI.cpp:120
IceStorm::TransientTopicManagerImpl::TransientTopicManagerImpl
TransientTopicManagerImpl(const InstancePtr &)
Definition: TransientTopicManagerI.cpp:23
Instance.h
IceStorm::TransientTopicManagerImpl::retrieve
virtual TopicPrx retrieve(const std::string &, const Ice::Current &) const
Definition: TransientTopicManagerI.cpp:73