32 #include <IceStorm/IceStorm.h>
51 loop = getProperty<bool>(
"Loop").getValue();
53 std::string storageMode = getProperty<std::string>(
"StorageMode").getValue();
54 ARMARX_INFO <<
"reading from file: " << getProperty<std::string>(
"RecordFile").getValue();
55 if (!storageMode.compare(
"file"))
57 replayer.reset(
new FileTopicReader(getProperty<std::string>(
"RecordFile").getValue()));
59 else if (!storageMode.compare(
"database"))
65 ARMARX_WARNING <<
"StorageMode " << storageMode <<
" is not supported (database, file). Falling back to default 'database' mode.";
70 offeringTopic(getProperty<std::string>(
"DebugObserverName").getValue());
75 debugObserver = getTopic<DebugObserverInterfacePrx>(getProperty<std::string>(
"DebugObserverName").getValue());
80 replayerListener = getTopic<TopicReplayerListenerInterfacePrx>(getProperty<std::string>(
"TopicName"));
94 return "TopicReplayer";
99 std::map<std::string, Ice::ObjectPrx> topics;
102 while (!
task->isStopped())
104 if (replayer->read(
data))
107 if (topics.count(
topic) == 0)
109 Ice::ObjectPrx topicPrx = getTopic<Ice::ObjectPrx>(
topic);
110 topics [
topic] = topicPrx;
112 Ice::ObjectPrx t = topics[
topic];
113 std::vector<Ice::Byte> dataBytesOut;
115 while (this->timeKeeper.
getTime() <
data.timestamp && !
task->isStopped())
119 if (this->replayingTopicsNotSupportedByFile || replayingTopics.count(
data.topicName))
121 t->ice_invoke(
data.operationName, Ice::Normal,
data.inParams, dataBytesOut);
126 if (!replayer->seekTo(IceUtil::Time::seconds(0.0)))
139 debugValues[
"file"] =
new Variant(getProperty<std::string>(
"RecordFile").getValue());
140 debugValues[
"status"] =
new Variant(
"rewinding");
141 debugObserver->setDebugChannel(
getName(), debugValues);
148 replayerListener->onStopReply();
151 debugValues[
"status"] =
new Variant(
"stopped");
152 debugObserver->setDebugChannel(
getName(), debugValues);
176 this->timeKeeper.
stop();
179 debugValues[
"status"] =
new Variant(
"paused");
180 debugObserver->setDebugChannel(
getName(), debugValues);
185 if (
task->isStopped())
189 this->timeKeeper.
start();
190 this->replayerListener->onStartReplay(getProperty<std::string>(
"RecordFile").getValue());
193 debugValues[
"file"] =
new Variant(getProperty<std::string>(
"RecordFile").getValue());
194 debugValues[
"status"] =
new Variant(
"started");
195 debugObserver->setDebugChannel(
getName(), debugValues);
200 this->timeKeeper.
setSpeed((
float) factor);
205 ARMARX_INFO <<
"Jump called with TimeStamp: " << timestamp.toDuration();
206 this->timeKeeper.
step(timestamp - this->timeKeeper.
getTime());
207 replayer->seekTo(timestamp);
216 std::vector<std::string> topics = replayer->getReplayTopics();
219 replayingTopicsNotSupportedByFile =
true;
227 this->replayingTopics = std::unordered_set<std::string>(topics.begin(), topics.end());
232 return replayer->getReplayLength();
242 this->autoplay = autoplay;