TopicReplayer.cpp
Go to the documentation of this file.
1 /*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package ArmarX
17 * @author Mirko Waechter( mirko.waechter at kit dot edu)
18 * @date 2016
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
22 
23 #include "TopicReplayer.h"
24 
25 #include <string>
26 
27 #include <IceStorm/IceStorm.h>
28 
32 
33 #include "DatabaseTopicReader.h"
34 #include "FileTopicReader.h"
35 #include "TopicUtil.h"
36 
37 namespace armarx
38 {
40  {
41  }
42 
45  {
47  }
48 
49  void
51  {
52  loop = getProperty<bool>("Loop").getValue();
53  //Check which storage mode to use and init the reader
54  std::string storageMode = getProperty<std::string>("StorageMode").getValue();
55  ARMARX_INFO << "reading from file: " << getProperty<std::string>("RecordFile").getValue();
56  if (!storageMode.compare("file"))
57  {
58  replayer.reset(new FileTopicReader(getProperty<std::string>("RecordFile").getValue()));
59  }
60  else if (!storageMode.compare("database"))
61  {
62  replayer.reset(
63  new DatabaseTopicReader(getProperty<std::string>("RecordFile").getValue()));
64  }
65  else
66  {
68  << "StorageMode " << storageMode
69  << " is not supported (database, file). Falling back to default 'database' mode.";
70  replayer.reset(
71  new DatabaseTopicReader(getProperty<std::string>("RecordFile").getValue()));
72  }
73 
74  offeringTopic(getProperty<std::string>("TopicName"));
75  offeringTopic(getProperty<std::string>("DebugObserverName").getValue());
76  }
77 
78  void
80  {
81  debugObserver = getTopic<DebugObserverInterfacePrx>(
82  getProperty<std::string>("DebugObserverName").getValue());
83 
84  timeKeeper.reset();
85  timeKeeper.setSpeed(1.0);
86 
87  replayerListener =
88  getTopic<TopicReplayerListenerInterfacePrx>(getProperty<std::string>("TopicName"));
89 
90  task = new RunningTask<TopicReplayer>(this, &TopicReplayer::play, "ReplayThread");
91  task->start();
92  if (autoplay)
93  {
94  setReplayingTopics(replayer->getReplayTopics());
95  resumeReplay();
96  }
97  }
98 
99  std::string
101  {
102  return "TopicReplayer";
103  }
104 
105  void
107  {
108  std::map<std::string, Ice::ObjectPrx> topics;
110 
111  while (!task->isStopped())
112  {
113  if (replayer->read(data))
114  {
115  std::string topic = data.topicName;
116  if (topics.count(topic) == 0)
117  {
118  Ice::ObjectPrx topicPrx = getTopic<Ice::ObjectPrx>(topic);
119  topics[topic] = topicPrx;
120  }
121  Ice::ObjectPrx t = topics[topic];
122  std::vector<Ice::Byte> dataBytesOut;
123 
124  while (this->timeKeeper.getTime() < data.timestamp && !task->isStopped())
125  {
126  usleep(100);
127  }
128  if (this->replayingTopicsNotSupportedByFile ||
129  replayingTopics.count(data.topicName))
130  {
131  t->ice_invoke(data.operationName, Ice::Normal, data.inParams, dataBytesOut);
132  }
133  }
134  else if (loop)
135  {
136  if (!replayer->seekTo(IceUtil::Time::seconds(0.0)))
137  {
138  ARMARX_WARNING << "unable to rewind ";
139  task->stop();
140  timeKeeper.stop();
141  }
142  else
143  {
144  ARMARX_INFO << "rewinding...";
145  timeKeeper.reset();
146 
147  StringVariantBaseMap debugValues;
148  debugValues["file"] =
149  new Variant(getProperty<std::string>("RecordFile").getValue());
150  debugValues["status"] = new Variant("rewinding");
151  debugObserver->setDebugChannel(getName(), debugValues);
152  }
153  }
154  else
155  {
156  task->stop();
157  timeKeeper.stop();
158  replayerListener->onStopReply();
159 
160  StringVariantBaseMap debugValues;
161  debugValues["status"] = new Variant("stopped");
162  debugObserver->setDebugChannel(getName(), debugValues);
163 
164  if (autoplay)
165  {
166  getArmarXManager()->asyncShutdown();
167  }
168  }
169 
170  // StringVariantBaseMap debugValues;
171  // debugValues["replay_time"] = new Variant(timeKeeper.getTime());
172  // debugObserver->setDebugChannel(getName(), debugValues);
173  }
174  }
175 
176  void
178  {
179  if (task)
180  {
181  task->stop();
182  }
183  }
184 
185  void
187  {
188  this->timeKeeper.stop();
189 
190  StringVariantBaseMap debugValues;
191  debugValues["status"] = new Variant("paused");
192  debugObserver->setDebugChannel(getName(), debugValues);
193  }
194 
195  void
197  {
198  if (task->isStopped())
199  {
200  task->start();
201  }
202  this->timeKeeper.start();
203  this->replayerListener->onStartReplay(getProperty<std::string>("RecordFile").getValue());
204 
205  StringVariantBaseMap debugValues;
206  debugValues["file"] = new Variant(getProperty<std::string>("RecordFile").getValue());
207  debugValues["status"] = new Variant("started");
208  debugObserver->setDebugChannel(getName(), debugValues);
209  }
210 
211  void
213  {
214  this->timeKeeper.setSpeed((float)factor);
215  }
216 
217  void
219  {
220  ARMARX_INFO << "Jump called with TimeStamp: " << timestamp.toDuration();
221  this->timeKeeper.step(timestamp - this->timeKeeper.getTime());
222  replayer->seekTo(timestamp);
223 
224  task->stop();
225  task = new RunningTask<TopicReplayer>(this, &TopicReplayer::play, "ReplayThread");
226  task->start();
227  }
228 
229  std::vector<std::string>
231  {
232  std::vector<std::string> topics = replayer->getReplayTopics();
233  if (topics.empty())
234  {
235  replayingTopicsNotSupportedByFile = true;
236  }
237 
238  return topics;
239  }
240 
241  void
242  TopicReplayer::setReplayingTopics(std::vector<std::string> topics)
243  {
244  this->replayingTopics = std::unordered_set<std::string>(topics.begin(), topics.end());
245  }
246 
249  {
250  return replayer->getReplayLength();
251  }
252 
255  {
256  return timeKeeper.getTime();
257  }
258 
259  void
261  {
262  this->autoplay = autoplay;
263  }
264 
265  void
267  {
268  this->loop = loop;
269  }
270 } // namespace armarx
armarx::Variant
The Variant class is described here: Variants.
Definition: Variant.h:223
armarx::TopicReplayer::setReplayingTopics
void setReplayingTopics(std::vector< std::string > topics)
Definition: TopicReplayer.cpp:242
armarx::TopicReplayer::resumeReplay
void resumeReplay()
Definition: TopicReplayer.cpp:196
DatabaseTopicReader.h
armarx::TopicReplayer::onDisconnectComponent
void onDisconnectComponent() override
Hook for subclass.
Definition: TopicReplayer.cpp:177
armarx::StringVariantBaseMap
std::map< std::string, VariantBasePtr > StringVariantBaseMap
Definition: ManagedIceObject.h:110
JSONObject.h
armarx::FileTopicReader
Definition: FileTopicReader.h:33
ArmarXManager.h
armarx::TopicReplayer::createPropertyDefinitions
PropertyDefinitionsPtr createPropertyDefinitions() override
Definition: TopicReplayer.cpp:44
armarx::TopicReplayer::TopicReplayer
TopicReplayer()
Definition: TopicReplayer.cpp:39
armarx::TopicReplayer::jumpToPosition
void jumpToPosition(IceUtil::Time timestamp)
Definition: TopicReplayer.cpp:218
armarx::TimeKeeper::stop
void stop()
stops the clock.
Definition: TimeKeeper.cpp:82
armarx::DatabaseTopicReader
Definition: DatabaseTopicReader.h:35
armarx::ManagedIceObject::getArmarXManager
ArmarXManagerPtr getArmarXManager() const
Returns the ArmarX manager used to add and remove components.
Definition: ManagedIceObject.cpp:360
armarx::TimeKeeper::step
void step(IceUtil::Time stepSize)
adds a timedelta to the current time
Definition: TimeKeeper.cpp:92
armarx::TopicReplayer::getDefaultName
std::string getDefaultName() const override
Retrieve default name of component.
Definition: TopicReplayer.cpp:100
armarx::RunningTask
Definition: ArmarXMultipleObjectsScheduler.h:36
armarx::TopicReplayer::pauseReplay
void pauseReplay()
Definition: TopicReplayer.cpp:186
armarx::TopicReplayer::setLoop
void setLoop(bool loop)
Definition: TopicReplayer.cpp:266
armarx::TopicReplayer::setReplaySpeed
void setReplaySpeed(double factor)
Definition: TopicReplayer.cpp:212
armarx::TopicUtil::TopicData
Definition: TopicUtil.h:34
armarx::TopicReplayer::getReplayLength
IceUtil::Time getReplayLength() const
Definition: TopicReplayer.cpp:248
armarx::TopicReplayer::getCurrentTimePosition
IceUtil::Time getCurrentTimePosition() const
Definition: TopicReplayer.cpp:254
FileTopicReader.h
armarx::TimeKeeper::setSpeed
void setSpeed(float newSpeed)
sets the speed factor of the clock
Definition: TimeKeeper.cpp:58
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
armarx::ProxyType::topic
@ topic
armarx::TopicReplayer::task
RunningTask< TopicReplayer >::pointer_type task
Definition: TopicReplayer.h:87
TopicUtil.h
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
armarx::TimeKeeper::reset
void reset()
resets the clock
Definition: TimeKeeper.cpp:50
armarx::TopicReplayerProperties
Definition: TopicReplayer.h:38
armarx::TimeKeeper::getTime
IceUtil::Time getTime() const
get the current time of this clock
Definition: TimeKeeper.cpp:36
armarx::Component::getConfigIdentifier
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition: Component.cpp:79
TimeUtil.h
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:181
armarx::ManagedIceObject::offeringTopic
void offeringTopic(const std::string &name)
Registers a topic for retrival after initialization.
Definition: ManagedIceObject.cpp:300
IceUtil::Handle
Definition: forward_declarations.h:30
armarx::TopicReplayer::play
void play()
Definition: TopicReplayer.cpp:106
armarx::ManagedIceObject::getName
std::string getName() const
Retrieve name of object.
Definition: ManagedIceObject.cpp:108
armarx::TopicReplayer::getRecordedTopics
std::vector< std::string > getRecordedTopics()
Definition: TopicReplayer.cpp:230
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::TopicReplayer::onInitComponent
void onInitComponent() override
Pure virtual hook for the subclass.
Definition: TopicReplayer.cpp:50
TopicReplayer.h
armarx::TopicReplayer::setAutoplay
void setAutoplay(bool autoplay)
Definition: TopicReplayer.cpp:260
armarx::TopicReplayer::onConnectComponent
void onConnectComponent() override
Pure virtual hook for the subclass.
Definition: TopicReplayer.cpp:79
armarx::TimeKeeper::start
void start()
starts the clock
Definition: TimeKeeper.cpp:72
armarx
This file offers overloads of toIce() and fromIce() functions for STL container types.
Definition: ArmarXTimeserver.cpp:27