TopicReplayer.cpp
Go to the documentation of this file.
1 /*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package ArmarX
17 * @author Mirko Waechter( mirko.waechter at kit dot edu)
18 * @date 2016
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
22 
23 #include "TopicReplayer.h"
24 #include "TopicUtil.h"
25 #include "DatabaseTopicReader.h"
26 #include "FileTopicReader.h"
27 
31 
32 #include <IceStorm/IceStorm.h>
33 
34 #include <string>
35 
36 namespace armarx
37 {
39  {
    // Constructor body. The signature line is not visible in this listing;
    // the symbol index maps listing line 38 to TopicReplayer::TopicReplayer().
    // Empty: no statements are executed here.
40 
41  }
42 
44  {
    // Body of createPropertyDefinitions() according to the symbol index
    // (signature on listing line 43, not visible here).
    // NOTE(review): listing line 45 is missing from this view; presumably it
    // builds and returns the property definitions -- confirm against the real file.
46 
47  }
48 
50  {
    // Body of onInitComponent() according to the symbol index (signature on
    // listing line 49, not visible here).
    // Reads the "Loop" and "StorageMode" properties, creates the matching
    // topic reader for "RecordFile", and announces the topics this component
    // will publish on.
51  loop = getProperty<bool>("Loop").getValue();
52  //Check which storage mode to use and init the reader
53  std::string storageMode = getProperty<std::string>("StorageMode").getValue();
    // NOTE(review): this message says "file" even when the database reader is selected below.
54  ARMARX_INFO << "reading from file: " << getProperty<std::string>("RecordFile").getValue();
    // std::string::compare returns 0 on equality, so !compare(...) means "equal".
55  if (!storageMode.compare("file"))
56  {
57  replayer.reset(new FileTopicReader(getProperty<std::string>("RecordFile").getValue()));
58  }
59  else if (!storageMode.compare("database"))
60  {
61  replayer.reset(new DatabaseTopicReader(getProperty<std::string>("RecordFile").getValue()));
62  }
63  else
64  {
    // Unknown mode: warn and use the database reader as the fallback.
65  ARMARX_WARNING << "StorageMode " << storageMode << " is not supported (database, file). Falling back to default 'database' mode.";
66  replayer.reset(new DatabaseTopicReader(getProperty<std::string>("RecordFile").getValue()));
67  }
68 
    // NOTE(review): "TopicName" is passed without .getValue(), unlike the next
    // line; presumably the property object converts to std::string -- confirm.
69  offeringTopic(getProperty<std::string>("TopicName"));
70  offeringTopic(getProperty<std::string>("DebugObserverName").getValue());
71  }
72 
74  {
    // Body of onConnectComponent() according to the symbol index (signature on
    // listing line 73, not visible here).
    // Fetches the debug-observer and listener topic proxies, resets the replay
    // clock to speed 1.0, and starts the thread that runs play().
75  debugObserver = getTopic<DebugObserverInterfacePrx>(getProperty<std::string>("DebugObserverName").getValue());
76 
77  timeKeeper.reset();
78  timeKeeper.setSpeed(1.0);
79 
80  replayerListener = getTopic<TopicReplayerListenerInterfacePrx>(getProperty<std::string>("TopicName"));
81 
82  task = new RunningTask<TopicReplayer>(this, &TopicReplayer::play, "ReplayThread");
83  task->start();
    // With autoplay enabled, select every topic the reader reports and start replaying.
    // NOTE(review): this uses replayer->getReplayTopics() directly, so the
    // empty-list handling in getRecordedTopics() (which sets
    // replayingTopicsNotSupportedByFile) is not applied here -- confirm that
    // autoplay still replays recordings that report no topic list.
84  if (autoplay)
85  {
86  setReplayingTopics(replayer->getReplayTopics());
87  resumeReplay();
88  }
89 
90  }
91 
92  std::string TopicReplayer::getDefaultName() const
93  {
94  return "TopicReplayer";
95  }
96 
98  {
    // Body of play() according to the symbol index (signature on listing line
    // 97, not visible here). This is the function run by the "ReplayThread"
    // RunningTask. It reads recorded messages one by one, waits until the
    // replay clock reaches each message's timestamp, and re-publishes the
    // message on its original topic.
    // Cache of topic proxies, one per topic name, filled lazily below.
99  std::map<std::string, Ice::ObjectPrx> topics;
    // NOTE(review): `data` is declared on listing line 100, which is missing
    // from this view; it exposes topicName, timestamp, operationName and inParams.
101 
102  while (!task->isStopped())
103  {
    // presumably read() returns false when no further message is available -- confirm.
104  if (replayer->read(data))
105  {
106  std::string topic = data.topicName;
107  if (topics.count(topic) == 0)
108  {
109  Ice::ObjectPrx topicPrx = getTopic<Ice::ObjectPrx>(topic);
110  topics [topic] = topicPrx;
111  }
112  Ice::ObjectPrx t = topics[topic];
113  std::vector<Ice::Byte> dataBytesOut;
114 
    // Poll the replay clock every 100 microseconds until the message is due.
    // While the clock is stopped (see pauseReplay) this loop keeps waiting.
115  while (this->timeKeeper.getTime() < data.timestamp && !task->isStopped())
116  {
117  usleep(100);
118  }
    // Publish only topics selected via setReplayingTopics(), unless the
    // recording does not provide a topic list at all.
    // NOTE(review): if the wait above ended because the task was stopped,
    // the pending message is still invoked here -- confirm this is intended.
119  if (this->replayingTopicsNotSupportedByFile || replayingTopics.count(data.topicName))
120  {
121  t->ice_invoke(data.operationName, Ice::Normal, data.inParams, dataBytesOut);
122  }
123  }
124  else if (loop)
125  {
    // No message was read and looping is enabled: seek back to time 0.
126  if (!replayer->seekTo(IceUtil::Time::seconds(0.0)))
127  {
128  ARMARX_WARNING << "unable to rewind ";
    // NOTE(review): stop() is called from inside the task's own thread
    // function -- confirm RunningTask::stop() supports that.
129  task->stop();
130  timeKeeper.stop();
131 
132  }
133  else
134  {
135  ARMARX_INFO << "rewinding...";
136  timeKeeper.reset();
137 
    // Report the rewind on the debug channel named after this component.
138  StringVariantBaseMap debugValues;
139  debugValues["file"] = new Variant(getProperty<std::string>("RecordFile").getValue());
140  debugValues["status"] = new Variant("rewinding");
141  debugObserver->setDebugChannel(getName(), debugValues);
142  }
143  }
144  else
145  {
    // No message was read and looping is disabled: stop the task and the
    // clock, then notify the listener topic and the debug observer.
146  task->stop();
147  timeKeeper.stop();
148  replayerListener->onStopReply();
149 
150  StringVariantBaseMap debugValues;
151  debugValues["status"] = new Variant("stopped");
152  debugObserver->setDebugChannel(getName(), debugValues);
153 
    // In autoplay mode, request an asynchronous shutdown from the ArmarX manager.
154  if (autoplay)
155  {
156  getArmarXManager()->asyncShutdown();
157  }
158  }
159 
160  // StringVariantBaseMap debugValues;
161  // debugValues["replay_time"] = new Variant(timeKeeper.getTime());
162  // debugObserver->setDebugChannel(getName(), debugValues);
163  }
164  }
165 
167  {
    // Body of onDisconnectComponent() according to the symbol index (signature
    // on listing line 166, not visible here).
    // Stops the replay task if one has been created; `task` is only assigned
    // in onConnectComponent()/jumpToPosition(), hence the null check.
168  if (task)
169  {
170  task->stop();
171  }
172  }
173 
175  {
    // Body of pauseReplay() according to the symbol index (signature on listing
    // line 174, not visible here).
    // Stops the replay clock and reports status "paused" on the debug channel.
    // The replay task itself is not stopped here.
176  this->timeKeeper.stop();
177 
178  StringVariantBaseMap debugValues;
179  debugValues["status"] = new Variant("paused");
180  debugObserver->setDebugChannel(getName(), debugValues);
181  }
182 
184  {
    // Body of resumeReplay() according to the symbol index (signature on listing
    // line 183, not visible here).
    // Restarts the replay task if it has stopped, starts the replay clock, and
    // announces the start to the listener topic and the debug observer.
    // NOTE(review): jumpToPosition() creates a fresh RunningTask instead of
    // calling start() on a stopped one -- confirm a stopped task can be restarted.
185  if (task->isStopped())
186  {
187  task->start();
188  }
189  this->timeKeeper.start();
190  this->replayerListener->onStartReplay(getProperty<std::string>("RecordFile").getValue());
191 
192  StringVariantBaseMap debugValues;
193  debugValues["file"] = new Variant(getProperty<std::string>("RecordFile").getValue());
194  debugValues["status"] = new Variant("started");
195  debugObserver->setDebugChannel(getName(), debugValues);
196  }
197 
198  void TopicReplayer::setReplaySpeed(double factor)
199  {
200  this->timeKeeper.setSpeed((float) factor);
201  }
202 
204  {
    // Body of jumpToPosition(IceUtil::Time timestamp) according to the symbol
    // index (signature on listing line 203, not visible here).
    // Moves the replay clock and the reader to `timestamp`, then replaces the
    // replay task with a freshly started one.
205  ARMARX_INFO << "Jump called with TimeStamp: " << timestamp.toDuration();
    // Advance (or rewind) the clock by the difference to the target time.
206  this->timeKeeper.step(timestamp - this->timeKeeper.getTime());
    // NOTE(review): the result of seekTo() is ignored here, although play()
    // treats it as a success flag. The seek also happens before the running
    // task is stopped -- confirm the reader tolerates a concurrent read().
207  replayer->seekTo(timestamp);
208 
209  task->stop();
210  task = new RunningTask<TopicReplayer>(this, &TopicReplayer::play, "ReplayThread");
211  task->start();
212  }
213 
214  std::vector<std::string> TopicReplayer::getRecordedTopics()
215  {
216  std::vector<std::string> topics = replayer->getReplayTopics();
217  if (topics.empty())
218  {
219  replayingTopicsNotSupportedByFile = true;
220  }
221 
222  return topics;
223  }
224 
225  void TopicReplayer::setReplayingTopics(std::vector<std::string> topics)
226  {
227  this->replayingTopics = std::unordered_set<std::string>(topics.begin(), topics.end());
228  }
229 
231  {
    // Body of getReplayLength() const according to the symbol index (signature
    // on listing line 230, not visible here; it returns IceUtil::Time).
    // Forwards the length reported by the active topic reader.
232  return replayer->getReplayLength();
233  }
234 
236  {
    // Body of getCurrentTimePosition() const according to the symbol index
    // (signature on listing line 235, not visible here; it returns IceUtil::Time).
    // Returns the current time of the replay clock.
237  return timeKeeper.getTime();
238  }
239 
240  void TopicReplayer::setAutoplay(bool autoplay)
241  {
242  this->autoplay = autoplay;
243  }
244 
245  void TopicReplayer::setLoop(bool loop)
246  {
247  this->loop = loop;
248  }
249 } // namespace armarx
250 
armarx::Variant
The Variant class is described here: Variants.
Definition: Variant.h:224
armarx::TopicReplayer::setReplayingTopics
void setReplayingTopics(std::vector< std::string > topics)
Definition: TopicReplayer.cpp:225
armarx::TopicReplayer::resumeReplay
void resumeReplay()
Definition: TopicReplayer.cpp:183
DatabaseTopicReader.h
armarx::TopicReplayer::onDisconnectComponent
void onDisconnectComponent() override
Hook for subclass.
Definition: TopicReplayer.cpp:166
armarx::StringVariantBaseMap
std::map< std::string, VariantBasePtr > StringVariantBaseMap
Definition: ManagedIceObject.h:111
JSONObject.h
armarx::FileTopicReader
Definition: FileTopicReader.h:33
ArmarXManager.h
armarx::TopicReplayer::createPropertyDefinitions
PropertyDefinitionsPtr createPropertyDefinitions() override
Definition: TopicReplayer.cpp:43
armarx::TopicReplayer::TopicReplayer
TopicReplayer()
Definition: TopicReplayer.cpp:38
armarx::TopicReplayer::jumpToPosition
void jumpToPosition(IceUtil::Time timestamp)
Definition: TopicReplayer.cpp:203
armarx::TimeKeeper::stop
void stop()
stops the clock.
Definition: TimeKeeper.cpp:76
armarx::DatabaseTopicReader
Definition: DatabaseTopicReader.h:36
armarx::ManagedIceObject::getArmarXManager
ArmarXManagerPtr getArmarXManager() const
Returns the ArmarX manager used to add and remove components.
Definition: ManagedIceObject.cpp:348
armarx::TimeKeeper::step
void step(IceUtil::Time stepSize)
adds a timedelta to the current time
Definition: TimeKeeper.cpp:85
armarx::TopicReplayer::getDefaultName
std::string getDefaultName() const override
Retrieve default name of component.
Definition: TopicReplayer.cpp:92
armarx::RunningTask
Definition: ArmarXMultipleObjectsScheduler.h:35
armarx::TopicReplayer::pauseReplay
void pauseReplay()
Definition: TopicReplayer.cpp:174
armarx::TopicReplayer::setLoop
void setLoop(bool loop)
Definition: TopicReplayer.cpp:245
armarx::TopicReplayer::setReplaySpeed
void setReplaySpeed(double factor)
Definition: TopicReplayer.cpp:198
armarx::TopicUtil::TopicData
Definition: TopicUtil.h:34
armarx::TopicReplayer::getReplayLength
IceUtil::Time getReplayLength() const
Definition: TopicReplayer.cpp:230
armarx::TopicReplayer::getCurrentTimePosition
IceUtil::Time getCurrentTimePosition() const
Definition: TopicReplayer.cpp:235
FileTopicReader.h
armarx::TimeKeeper::setSpeed
void setSpeed(float newSpeed)
sets the speed factor of the clock
Definition: TimeKeeper.cpp:55
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
armarx::ProxyType::topic
@ topic
armarx::TopicReplayer::task
RunningTask< TopicReplayer >::pointer_type task
Definition: TopicReplayer.h:84
TopicUtil.h
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
armarx::TimeKeeper::reset
void reset()
resets the clock
Definition: TimeKeeper.cpp:48
armarx::TopicReplayerProperties
Definition: TopicReplayer.h:41
armarx::TimeKeeper::getTime
IceUtil::Time getTime() const
get the current time of this clock
Definition: TimeKeeper.cpp:35
armarx::Component::getConfigIdentifier
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition: Component.cpp:74
TimeUtil.h
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:174
armarx::ManagedIceObject::offeringTopic
void offeringTopic(const std::string &name)
Registers a topic for retrieval after initialization.
Definition: ManagedIceObject.cpp:290
IceUtil::Handle< class PropertyDefinitionContainer >
armarx::TopicReplayer::play
void play()
Definition: TopicReplayer.cpp:97
armarx::ManagedIceObject::getName
std::string getName() const
Retrieve name of object.
Definition: ManagedIceObject.cpp:107
armarx::TopicReplayer::getRecordedTopics
std::vector< std::string > getRecordedTopics()
Definition: TopicReplayer.cpp:214
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:186
armarx::TopicReplayer::onInitComponent
void onInitComponent() override
Pure virtual hook for the subclass.
Definition: TopicReplayer.cpp:49
TopicReplayer.h
armarx::TopicReplayer::setAutoplay
void setAutoplay(bool autoplay)
Definition: TopicReplayer.cpp:240
armarx::TopicReplayer::onConnectComponent
void onConnectComponent() override
Pure virtual hook for the subclass.
Definition: TopicReplayer.cpp:73
armarx::TimeKeeper::start
void start()
starts the clock
Definition: TimeKeeper.cpp:67
armarx
This file offers overloads of toIce() and fromIce() functions for STL container types.
Definition: ArmarXTimeserver.cpp:28