27 #include <IceStorm/IceStorm.h>
// Read the "Loop" property into `loop`.
52 loop = getProperty<bool>(
"Loop").getValue();
// Read the "StorageMode" property; it is compared against "file" and "database" below.
54 std::string storageMode = getProperty<std::string>(
"StorageMode").getValue();
// Log the "RecordFile" property value. This statement precedes the storage-mode check.
55 ARMARX_INFO <<
"reading from file: " << getProperty<std::string>(
"RecordFile").getValue();
// std::string::compare returns 0 on equality, so !compare(...) is true when storageMode == "file".
56 if (!storageMode.compare(
"file"))
// Point `replayer` at a new FileTopicReader constructed from the "RecordFile" property value.
58 replayer.reset(
new FileTopicReader(getProperty<std::string>(
"RecordFile").getValue()));
// Same equality test for "database"; the body of this branch is not visible here.
60 else if (!storageMode.compare(
"database"))
// Tail of a streamed message that reports the unrecognized StorageMode value.
// The start of this statement (its log macro/stream) is not visible here.
68 <<
"StorageMode " << storageMode
69 <<
" is not supported (database, file). Falling back to default 'database' mode.";
// Pass the "DebugObserverName" property value to offeringTopic().
// Presumably this declares that the component publishes on that topic — confirm against the framework docs.
75 offeringTopic(getProperty<std::string>(
"DebugObserverName").getValue());
// Fetch a DebugObserverInterfacePrx for the topic named by the "DebugObserverName"
// property and store it in `debugObserver`.
81 debugObserver = getTopic<DebugObserverInterfacePrx>(
82 getProperty<std::string>(
"DebugObserverName").getValue());
// Fetch a TopicReplayerListenerInterfacePrx for the "TopicName" property.
// The assignment target (if any) is on a line that is not visible here.
// NOTE(review): the other property reads visible in this file call .getValue();
// this one passes the result of getProperty<std::string>(...) directly — confirm
// that the implicit conversion is intended.
88 getTopic<TopicReplayerListenerInterfacePrx>(getProperty<std::string>(
"TopicName"));
// Returns the fixed string "TopicReplayer" (the enclosing function is not visible here).
102 return "TopicReplayer";
// Cache of proxies already obtained through getTopic, keyed by `topic`.
108 std::map<std::string, Ice::ObjectPrx> topics;
// Keep iterating until the task reports that it has been stopped.
111 while (!
task->isStopped())
// The result of replayer->read(data) decides whether the following code runs.
// Presumably true means a record was read into `data` — TODO confirm.
113 if (replayer->read(
data))
// First time this topic is seen: fetch its proxy once and remember it in the cache.
116 if (topics.count(
topic) == 0)
118 Ice::ObjectPrx topicPrx = getTopic<Ice::ObjectPrx>(
topic);
119 topics[
topic] = topicPrx;
// Look up the cached proxy for this topic.
121 Ice::ObjectPrx t = topics[
topic];
// Output buffer handed to ice_invoke below; its contents are not read in the visible code.
122 std::vector<Ice::Byte> dataBytesOut;
// Loops while the time keeper's current time is still before the record's timestamp
// and the task has not been stopped. The loop body is not visible here.
124 while (this->timeKeeper.
getTime() <
data.timestamp && !
task->isStopped())
// Only invoke when replayingTopicsNotSupportedByFile is set, or when data.topicName
// is contained in replayingTopics.
128 if (this->replayingTopicsNotSupportedByFile ||
129 replayingTopics.count(
data.topicName))
// Forward the recorded operation name and in-parameters to the topic proxy
// as an Ice::Normal dynamic invocation.
131 t->ice_invoke(
data.operationName, Ice::Normal,
data.inParams, dataBytesOut);
// Seek the reader back to time zero; the branch taken is the one where seekTo returns false.
136 if (!replayer->seekTo(IceUtil::Time::seconds(0.0)))
// Publish the record file name and a "rewinding" status on the debug channel
// named after this component (getName()).
148 debugValues[
"file"] =
149 new Variant(getProperty<std::string>(
"RecordFile").getValue());
150 debugValues[
"status"] =
new Variant(
"rewinding");
151 debugObserver->setDebugChannel(
getName(), debugValues);
// Call onStopReply() on the replayer listener.
158 replayerListener->onStopReply();
// Publish a "stopped" status on the same debug channel.
161 debugValues[
"status"] =
new Variant(
"stopped");
162 debugObserver->setDebugChannel(
getName(), debugValues);
// Stop the time keeper.
188 this->timeKeeper.
stop();
// Publish a "paused" status on the debug channel named after this component.
191 debugValues[
"status"] =
new Variant(
"paused");
192 debugObserver->setDebugChannel(
getName(), debugValues);
// Branch taken when the task is currently stopped; its body is not visible here.
198 if (
task->isStopped())
// Start the time keeper, then pass the "RecordFile" property value to the
// listener's onStartReplay().
202 this->timeKeeper.
start();
203 this->replayerListener->onStartReplay(getProperty<std::string>(
"RecordFile").getValue());
// Publish the record file name and a "started" status on the debug channel
// named after this component.
206 debugValues[
"file"] =
new Variant(getProperty<std::string>(
"RecordFile").getValue());
207 debugValues[
"status"] =
new Variant(
"started");
208 debugObserver->setDebugChannel(
getName(), debugValues);
// Hand `factor` to the time keeper's setSpeed(), converted to float with a C-style cast.
214 this->timeKeeper.
setSpeed((
float)factor);
// Log the requested timestamp (via its toDuration() representation).
220 ARMARX_INFO <<
"Jump called with TimeStamp: " << timestamp.toDuration();
// Step the time keeper by the difference between the requested timestamp and its current time.
221 this->timeKeeper.
step(timestamp - this->timeKeeper.
getTime());
// Seek the reader to the requested timestamp.
// NOTE(review): the return value of seekTo is discarded here, while the seek to
// time zero elsewhere in this file checks it — confirm a failed seek can be ignored.
222 replayer->seekTo(timestamp);
// Type on a line of its own; what it belongs to is on a line not visible here
// (presumably the return type of a function definition — TODO confirm).
229 std::vector<std::string>
// Ask the reader for its list of replay topics.
232 std::vector<std::string> topics = replayer->getReplayTopics();
// Set the flag that the replay loop checks before filtering by replayingTopics.
// The condition guarding this assignment is not visible here.
235 replayingTopicsNotSupportedByFile =
true;
// Replace replayingTopics with a set built from the elements of `topics`.
// NOTE(review): this `topics` may be a different variable from the local above
// (e.g. a parameter of another function) — the surrounding scope is not visible.
244 this->replayingTopics = std::unordered_set<std::string>(topics.begin(), topics.end());
// Return whatever the reader reports from getReplayLength().
250 return replayer->getReplayLength();
// Store the given `autoplay` value in the member of the same name.
262 this->autoplay = autoplay;