TopicReplayer.cpp
Go to the documentation of this file.
1/*
2* This file is part of ArmarX.
3*
4* ArmarX is free software; you can redistribute it and/or modify
5* it under the terms of the GNU General Public License version 2 as
6* published by the Free Software Foundation.
7*
8* ArmarX is distributed in the hope that it will be useful, but
9* WITHOUT ANY WARRANTY; without even the implied warranty of
10* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11* GNU General Public License for more details.
12*
13* You should have received a copy of the GNU General Public License
14* along with this program. If not, see <http://www.gnu.org/licenses/>.
15*
16* @package ArmarX
17* @author Mirko Waechter( mirko.waechter at kit dot edu)
18* @date 2016
19* @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20* GNU General Public License
21*/
22
23#include "TopicReplayer.h"
24
25#include <string>
26
27#include <IceStorm/IceStorm.h>
28
32
33#include "DatabaseTopicReader.h"
34#include "FileTopicReader.h"
35#include "TopicUtil.h"
36
37namespace armarx
38{
42
48
49 void
51 {
52 loop = getProperty<bool>("Loop").getValue();
53 //Check which storage mode to use and init the reader
54 std::string storageMode = getProperty<std::string>("StorageMode").getValue();
55 ARMARX_INFO << "reading from file: " << getProperty<std::string>("RecordFile").getValue();
56 if (!storageMode.compare("file"))
57 {
58 replayer.reset(new FileTopicReader(getProperty<std::string>("RecordFile").getValue()));
59 }
60 else if (!storageMode.compare("database"))
61 {
62 replayer.reset(
63 new DatabaseTopicReader(getProperty<std::string>("RecordFile").getValue()));
64 }
65 else
66 {
68 << "StorageMode " << storageMode
69 << " is not supported (database, file). Falling back to default 'database' mode.";
70 replayer.reset(
71 new DatabaseTopicReader(getProperty<std::string>("RecordFile").getValue()));
72 }
73
75 offeringTopic(getProperty<std::string>("DebugObserverName").getValue());
76 }
77
78 void
80 {
82 getProperty<std::string>("DebugObserverName").getValue());
83
84 timeKeeper.reset();
85 timeKeeper.setSpeed(1.0);
86
87 replayerListener =
89
90 task = new RunningTask<TopicReplayer>(this, &TopicReplayer::play, "ReplayThread");
91 task->start();
92 if (autoplay)
93 {
94 setReplayingTopics(replayer->getReplayTopics());
96 }
97 }
98
99 std::string
101 {
102 return "TopicReplayer";
103 }
104
105 void
107 {
108 std::map<std::string, Ice::ObjectPrx> topics;
110
111 while (!task->isStopped())
112 {
113 if (replayer->read(data))
114 {
115 std::string topic = data.topicName;
116 if (topics.count(topic) == 0)
117 {
118 Ice::ObjectPrx topicPrx = getTopic<Ice::ObjectPrx>(topic);
119 topics[topic] = topicPrx;
120 }
121 Ice::ObjectPrx t = topics[topic];
122 std::vector<Ice::Byte> dataBytesOut;
123
124 while (this->timeKeeper.getTime() < data.timestamp && !task->isStopped())
125 {
126 usleep(100);
127 }
128 if (this->replayingTopicsNotSupportedByFile ||
129 replayingTopics.count(data.topicName))
130 {
131 t->ice_invoke(data.operationName, Ice::Normal, data.inParams, dataBytesOut);
132 }
133 }
134 else if (loop)
135 {
136 if (!replayer->seekTo(IceUtil::Time::seconds(0.0)))
137 {
138 ARMARX_WARNING << "unable to rewind ";
139 task->stop();
140 timeKeeper.stop();
141 }
142 else
143 {
144 ARMARX_INFO << "rewinding...";
145 timeKeeper.reset();
146
147 StringVariantBaseMap debugValues;
148 debugValues["file"] =
149 new Variant(getProperty<std::string>("RecordFile").getValue());
150 debugValues["status"] = new Variant("rewinding");
151 debugObserver->setDebugChannel(getName(), debugValues);
152 }
153 }
154 else
155 {
156 task->stop();
157 timeKeeper.stop();
158 replayerListener->onStopReply();
159
160 StringVariantBaseMap debugValues;
161 debugValues["status"] = new Variant("stopped");
162 debugObserver->setDebugChannel(getName(), debugValues);
163
164 if (autoplay)
165 {
166 getArmarXManager()->asyncShutdown();
167 }
168 }
169
170 // StringVariantBaseMap debugValues;
171 // debugValues["replay_time"] = new Variant(timeKeeper.getTime());
172 // debugObserver->setDebugChannel(getName(), debugValues);
173 }
174 }
175
176 void
178 {
179 if (task)
180 {
181 task->stop();
182 }
183 }
184
185 void
187 {
188 this->timeKeeper.stop();
189
190 StringVariantBaseMap debugValues;
191 debugValues["status"] = new Variant("paused");
192 debugObserver->setDebugChannel(getName(), debugValues);
193 }
194
195 void
197 {
198 if (task->isStopped())
199 {
200 task->start();
201 }
202 this->timeKeeper.start();
203 this->replayerListener->onStartReplay(getProperty<std::string>("RecordFile").getValue());
204
205 StringVariantBaseMap debugValues;
206 debugValues["file"] = new Variant(getProperty<std::string>("RecordFile").getValue());
207 debugValues["status"] = new Variant("started");
208 debugObserver->setDebugChannel(getName(), debugValues);
209 }
210
211 void
213 {
214 this->timeKeeper.setSpeed((float)factor);
215 }
216
217 void
219 {
220 ARMARX_INFO << "Jump called with TimeStamp: " << timestamp.toDuration();
221 this->timeKeeper.step(timestamp - this->timeKeeper.getTime());
222 replayer->seekTo(timestamp);
223
224 task->stop();
225 task = new RunningTask<TopicReplayer>(this, &TopicReplayer::play, "ReplayThread");
226 task->start();
227 }
228
229 std::vector<std::string>
231 {
232 std::vector<std::string> topics = replayer->getReplayTopics();
233 if (topics.empty())
234 {
235 replayingTopicsNotSupportedByFile = true;
236 }
237
238 return topics;
239 }
240
241 void
242 TopicReplayer::setReplayingTopics(std::vector<std::string> topics)
243 {
244 this->replayingTopics = std::unordered_set<std::string>(topics.begin(), topics.end());
245 }
246
247 IceUtil::Time
249 {
250 return replayer->getReplayLength();
251 }
252
253 IceUtil::Time
255 {
256 return timeKeeper.getTime();
257 }
258
259 void
261 {
262 this->autoplay = autoplay;
263 }
264
265 void
267 {
268 this->loop = loop;
269 }
270} // namespace armarx
std::string timestamp()
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition Component.cpp:90
Property< PropertyType > getProperty(const std::string &name)
void offeringTopic(const std::string &name)
Registers a topic for retrival after initialization.
TopicProxyType getTopic(const std::string &name)
Returns a proxy of the specified topic.
std::string getName() const
Retrieve name of object.
ArmarXManagerPtr getArmarXManager() const
Returns the ArmarX manager used to add and remove components.
void onInitComponent() override
Pure virtual hook for the subclass.
IceUtil::Time getCurrentTimePosition() const
RunningTask< TopicReplayer >::pointer_type task
void onDisconnectComponent() override
Hook for subclass.
void setReplaySpeed(double factor)
void setAutoplay(bool autoplay)
std::vector< std::string > getRecordedTopics()
IceUtil::Time getReplayLength() const
void setLoop(bool loop)
void onConnectComponent() override
Pure virtual hook for the subclass.
PropertyDefinitionsPtr createPropertyDefinitions() override
void setReplayingTopics(std::vector< std::string > topics)
void jumpToPosition(IceUtil::Time timestamp)
std::string getDefaultName() const override
Retrieve default name of component.
The Variant class is described here: Variants.
Definition Variant.h:224
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
This file offers overloads of toIce() and fromIce() functions for STL container types.
std::map< std::string, VariantBasePtr > StringVariantBaseMap
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.