TransientTopicManagerI.cpp
Go to the documentation of this file.
1// **********************************************************************
2//
3// Copyright (c) 2003-2017 ZeroC, Inc. All rights reserved.
4//
5// This copy of Ice is licensed to you under the terms described in the
6// ICE_LICENSE file included in this distribution.
7//
8// **********************************************************************
9
10#include <functional>
11
12#include <Ice/Ice.h>
13#include <Ice/SliceChecksums.h>
14#include <IceStorm/Instance.h>
15#include <IceStorm/Subscriber.h>
19
20using namespace IceStorm;
21using namespace std;
22
// Member-initializer list and body of a constructor whose signature line is
// absent from this listing. The visible code only stores the `instance`
// argument in _instance; the body is empty.
24 _instance(instance)
25{
26}
27
31
// Creates a new transient topic called `name` and returns a proxy to it.
//
// The whole call runs under the manager lock, and reap() is invoked first so
// that entries of already-destroyed topics are dropped before the name check.
//
// Throws TopicExists (with ex.name set to `name`) when `name` is already a key
// in _topics. Otherwise the topic identity is obtained from
// IceStormInternal::nameToIdentity, a new TransientTopicImpl servant is added
// to the topic adapter under that identity, and the servant is recorded in
// _topics under `name`.
33TransientTopicManagerImpl::create(const string& name, const Ice::Current&)
34{
35 Lock sync(*this);
36
37 reap();
38
39 if (_topics.find(name) != _topics.end())
40 {
41 TopicExists ex;
42 ex.name = name;
43 throw ex;
44 }
45
46 Ice::Identity id = IceStormInternal::nameToIdentity(_instance, name);
47
48 //
49 // Called by constructor or with 'this' mutex locked.
50 //
// NOTE(review): the comment above does not match the visible code -- the
// constructor body shown in this listing is empty and the lock is taken at
// the top of this function. It looks carried over from elsewhere; confirm.
// The trace message below is only emitted when traceLevels->topicMgr > 0.
51 TraceLevelsPtr traceLevels = _instance->traceLevels();
52 if (traceLevels->topicMgr > 0)
53 {
54 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
55 out << "creating new topic \"" << name
56 << "\". id: " << _instance->communicator()->identityToString(id);
57 }
58
59 //
60 // Create topic implementation
61 //
62 TransientTopicImplPtr topicImpl = new TransientTopicImpl(_instance, name, id);
63
64 //
65 // The identity is the name of the Topic.
66 //
// The servant is registered with the adapter before it is inserted into
// _topics, so the map entry is only created once add() has returned.
67 TopicPrx prx = TopicPrx::uncheckedCast(_instance->topicAdapter()->add(topicImpl, id));
68 _topics.insert(map<string, TransientTopicImplPtr>::value_type(name, topicImpl));
69 return prx;
70}
71
// Returns a proxy for the existing topic called `name`.
//
// Takes the manager lock, reaps destroyed topics, then looks `name` up in
// _topics. Throws NoSuchTopic (with ex.name set to `name`) when no entry is
// found. The proxy is created from the identity stored on the topic servant
// rather than being recomputed from the name (see the comment further down).
73TransientTopicManagerImpl::retrieve(const string& name, const Ice::Current&) const
74{
75 Lock sync(*this);
76
// NOTE(review): `This` is not declared in the lines visible here (listing
// line 77 is absent). Presumably it is a non-const alias of `this` so that
// the non-const reap() can be called from this const method -- confirm.
78 This->reap();
79
80 map<string, TransientTopicImplPtr>::const_iterator p = _topics.find(name);
81 if (p == _topics.end())
82 {
83 NoSuchTopic ex;
84 ex.name = name;
85 throw ex;
86 }
87
88 // Here we cannot just reconstruct the identity since the
89 // identity could be either instanceName/topic name, or if
90 // created with pre-3.2 IceStorm / topic name.
91 return TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()));
92}
93
// Returns a TopicDict with one entry per topic currently held in _topics,
// keyed by topic name and mapped to a proxy created from the identity stored
// on the topic servant.
//
// Runs under the manager lock and reaps destroyed topics before building the
// result.
// NOTE(review): the signature line (listing line 95) is absent from this
// listing; presumably this is retrieveAll(const Ice::Current&) const -- confirm.
// `This` is likewise not declared in the visible lines (listing line 99).
94TopicDict
96{
97 Lock sync(*this);
98
100 This->reap();
101
102 TopicDict all;
103 for (map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end();
104 ++p)
105 {
106 //
107 // Here we cannot just reconstruct the identity since the
108 // identity could be either "<instanceName>/topic.<topicname>"
109 // name, or if created with pre-3.2 IceStorm "/<topicname>".
110 //
111 all.insert(TopicDict::value_type(
112 p->first,
113 TopicPrx::uncheckedCast(_instance->topicAdapter()->createProxy(p->second->id()))));
114 }
115
116 return all;
117}
118
// Returns the dictionary produced by Ice::sliceChecksums(). No lock is taken
// and no member state is read.
// NOTE(review): the signature line (listing line 120) is absent from this
// listing; presumably getSliceChecksums(const Ice::Current&) const -- confirm.
119Ice::SliceChecksumDict
121{
122 return Ice::sliceChecksums();
123}
124
// NOTE(review): only the braces of this function survive in the listing. Its
// signature (listing lines 125-126) and its single statement (listing line
// 128) are absent, so its behavior cannot be described from the visible
// lines. Presumably this is getReplicaNode -- confirm against the real file.
127{
129}
130
// Drops topics that the topic reaper has reported.
//
// For every name returned by topicReaper()->consumeReapedTopics(): if the name
// is still a key in _topics and that topic reports destroyed(), its servant is
// removed from the topic adapter and its entry is erased from _topics. Names
// that are unknown, or whose topic is not destroyed, are skipped.
//
// Takes no lock itself; as the comment below states, callers must already hold
// the mutex.
// NOTE(review): the signature line (listing line 132) is absent from this
// listing; presumably TransientTopicManagerImpl::reap() -- confirm.
131void
133{
134 //
135 // Always called with mutex locked.
136 //
137 // Lock sync(*this);
138 //
139 vector<string> reaped = _instance->topicReaper()->consumeReapedTopics();
140 for (vector<string>::const_iterator p = reaped.begin(); p != reaped.end(); ++p)
141 {
142 map<string, TransientTopicImplPtr>::iterator i = _topics.find(*p);
143 if (i != _topics.end() && i->second->destroyed())
144 {
145 Ice::Identity id = i->second->id();
146 TraceLevelsPtr traceLevels = _instance->traceLevels();
147 if (traceLevels->topicMgr > 0)
148 {
149 Ice::Trace out(traceLevels->logger, traceLevels->topicMgrCat);
150 out << "Reaping " << i->first;
151 }
152
// A deactivated adapter is tolerated: the exception is swallowed and the
// map entry is still erased below.
153 try
154 {
155 _instance->topicAdapter()->remove(id);
156 }
157 catch (const Ice::ObjectAdapterDeactivatedException&)
158 {
159 // Ignore
160 }
161
162 _topics.erase(i);
163 }
164 }
165}
166
// Takes the manager lock and calls shutdown() on every topic held in _topics.
// The entries themselves are left in the map.
// NOTE(review): the signature line (listing line 168) is absent from this
// listing; presumably TransientTopicManagerImpl::shutdown() -- confirm.
167void
169{
170 Lock sync(*this);
171
172 for (map<string, TransientTopicImplPtr>::const_iterator p = _topics.begin(); p != _topics.end();
173 ++p)
174 {
175 p->second->shutdown();
176 }
177}
idempotent IceStormElection::Node * getReplicaNode()
Return the replica node proxy for this topic manager.
virtual TopicDict retrieveAll(const Ice::Current &) const
virtual TopicPrx create(const std::string &, const Ice::Current &)
virtual Ice::SliceChecksumDict getSliceChecksums(const Ice::Current &) const
virtual TopicPrx retrieve(const std::string &, const Ice::Current &) const
::IceInternal::ProxyHandle<::IceProxy::IceStormElection::Node > NodePrx
Definition Election.h:1210
Ice::Identity nameToIdentity(const IceStorm::InstancePtr &, const std::string &)
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
Definition IceManager.h:70
IceUtil::Handle< TraceLevels > TraceLevelsPtr
Definition Instance.h:44
IceUtil::Handle< Instance > InstancePtr
Definition Instance.h:128
IceUtil::Handle< TransientTopicImpl > TransientTopicImplPtr