TopicRecorder.cpp
Go to the documentation of this file.
1 /*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package ArmarX
17 * @author Mirko Waechter( mirko.waechter at kit dot edu)
18 * @date 2016
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
22 #include "DatabaseTopicWriter.h"
23 #include "FileTopicWriter.h"
25 #include "GenericTopicSubscriber.h"
26 #include "TopicRecorder.h"
27 #include <Ice/ObjectAdapter.h>
28 #include <fstream>
29 #include <SimoxUtility/algorithm/string/string_tools.h>
31 #include <IceStorm/IceStorm.h> // for TopicPrx, TopicManagerPrx, etc
32 
34 namespace armarx
35 {
36 
38  {
39  writer.reset();
40  }
41 
43  {
45 
46  }
47 
49  {
50  idleCondition.notify_all();
51  }
52 
53 
55  {
56  isRecordingEnabled = false;
57  outputFilename = getProperty<std::string>("Outputfile").getValue();
58 
59  offeringTopic("TopicRecorderListener");
60  }
61 
62 
64  {
65  topicRecoderListener = getTopic<TopicRecorderListenerInterfacePrx>("TopicRecorderListener");
66 
67  auto allTopics = getIceManager()->getTopicManager()->retrieveAll();
68  for (auto& elem : allTopics)
69  {
70  ARMARX_INFO << "Topic: " << elem.first;
71  }
72 
73  auto startTimestamp = TimeUtil::GetTime();
74  Ice::StringSeq topics;
75  if (getProperty<std::string>("TopicsToLog").getValue() == "*")
76  {
77  for (auto& elem : allTopics)
78  {
79  topics.push_back(elem.first);
80  }
81  }
82  else
83  {
84  topics = Split(getProperty<std::string>("TopicsToLog").getValue(), ",");
85  }
86  for (auto topic : topics)
87  {
88  float maxFrequency = -1.f;
89  if (Contains(topic, ":"))
90  {
91  auto topicAndFreq = Split(topic, ":");
92  maxFrequency = atof(topicAndFreq.at(1).c_str());
93  topic = topicAndFreq.at(0);
94  }
95  GenericTopicSubscriberPtr ptr(new GenericTopicSubscriber(this, topic, startTimestamp, maxFrequency));
96  Ice::ObjectPrx prx = getObjectAdapter()->addWithUUID(ptr);
97  getIceManager()->subscribeTopic(prx, topic, true);
98  topicsSubscribers[topic] = ptr;
99  }
100 
101 
102  if (getProperty<bool>("EnableRecording").getValue())
103  {
104  startRecording(getProperty<int>("Duration").getValue());
105  }
106  else
107  {
108  ARMARX_INFO << "automatic recording disabled.";
109  }
110  }
111 
112 
113 
114  void armarx::TopicRecorderComponent::startRecording(const int maxDuration, const Ice::Current& c)
115  {
116  std::lock_guard<std::mutex> lock(mutex);
117 
118  if (isRecordingEnabled)
119  {
120  ARMARX_WARNING << "topic recorder is already recording.";
121  return;
122  }
123 
124 
125  this->maxDuration = maxDuration;
126  outputfilePath = outputFilename;
127  if (getProperty<bool>("TimestampedFilename").getValue())
128  {
129  std::string time = IceUtil::Time::now().toDateTime();
130  time = simox::alg::replace_all(time, "/", "-");
131  time = simox::alg::replace_all(time, " ", "_");
132  time = simox::alg::replace_all(time, ":", "-");
133  outputfilePath = outputfilePath.parent_path() / (outputfilePath.stem().string() + "-" + time + outputfilePath.extension().string());
134  }
135 
136  std::string outputfilePathStr = outputfilePath.string();
137  ArmarXDataPath::ResolveHomePath(outputfilePathStr);
138  outputfilePath = outputfilePathStr;
139  ARMARX_IMPORTANT << "Writing to file '" << outputfilePath.string() << "'";
140 
141  //Check which storage mode to use and init the writer
142  std::string storageMode = getProperty<std::string>("StorageMode").getValue();
143  if (!storageMode.compare("file"))
144  {
145  writer.reset(new FileTopicWriter(outputfilePath));
146  }
147  else if (!storageMode.compare("database"))
148  {
149  writer.reset(new DatabaseTopicWriter(outputfilePath));
150  }
151  else
152  {
153  ARMARX_WARNING << "StorageMode " << storageMode << " is not supported (database, file). Falling back to default 'database' mode.";
154  writer.reset(new DatabaseTopicWriter(outputfilePath));
155  }
156 
157  startTime = armarx::TimeUtil::GetTime();
158 
159  for (auto kv : topicsSubscribers)
160  {
161  kv.second->setTime(startTime);
162  std::queue<TopicUtil::TopicData> data;
163  kv.second->getData(data);
164  }
165 
166  queueTask = new RunningTask<TopicRecorderComponent>(this, &TopicRecorderComponent::write, "WriterThread");
167  queueTask->start();
168 
169  isRecordingEnabled = true;
170 
171  topicRecoderListener->onStartRecording();
172  }
173 
174 
176  {
177  std::lock_guard<std::mutex> lock(mutex);
178 
179  if (!isRecordingEnabled)
180  {
181  ARMARX_WARNING << "topic recorder is not recording.";
182  return;
183  }
184 
185 
186  idleCondition.notify_all();
187  if (queueTask->isRunning())
188  {
189  queueTask->stop();
190  }
191 
192  ARMARX_IMPORTANT << "Writing log ";
193  writer.reset();
194  ARMARX_IMPORTANT << "Wrote everything to file '" << outputfilePath.string() << "'";
195 
196  isRecordingEnabled = false;
197  }
198 
199 
200 
202  {
203  stopRecording();
204  }
205 
207  {
208  topicsSubscribers.clear();
209  }
210 
212  {
213  return "TopicRecorder";
214  }
215 
217  {
218  auto sortFunc = [](const TopicUtil::TopicData & l, const TopicUtil::TopicData & r)
219  {
220  return (l.timestamp < r.timestamp);
221  };
222 
223  while (!queueTask->isStopped())
224  {
225  if (maxDuration)
226  {
227  if ((startTime + IceUtil::Time::seconds(maxDuration)) < armarx::TimeUtil::GetTime())
228  {
229  ARMARX_INFO << "max recording duration reached.";
230  stopRecording();
231  }
232  }
233 
234  std::unique_lock lock(queueMutex);
235 
236  std::vector<TopicUtil::TopicData> newDataOfAllTopics;
237  for (auto& elem : topicsSubscribers)
238  {
239  std::queue<TopicUtil::TopicData> data;
240  elem.second->getData(data);
241 
242  while (!data.empty())
243  {
244  newDataOfAllTopics.push_back(data.front());
245  data.pop();
246  }
247  }
248  std::sort(newDataOfAllTopics.begin(), newDataOfAllTopics.end(),
249  sortFunc);
250  for (auto& e : newDataOfAllTopics)
251  {
252  writer->write(e);
253  }
254  if (newDataOfAllTopics.empty())
255  {
256  usleep(10000);
257  }
258  }
259 
260  if (getProperty<int>("Duration").getValue() > 0)
261  {
262  getArmarXManager()->asyncShutdown();
263  }
264  }
265 
266 
267 
268 
269 } // namespace armarx
270 
271 
272 
273 
274 
armarx::ManagedIceObject::getIceManager
IceManagerPtr getIceManager() const
Returns the IceManager.
Definition: ManagedIceObject.cpp:353
armarx::TopicRecorderComponent::writer
TopicWriterInterfacePtr writer
Definition: TopicRecorder.h:133
ARMARX_IMPORTANT
#define ARMARX_IMPORTANT
Definition: Logging.h:183
armarx::TopicRecorderComponent::queueMutex
std::mutex queueMutex
Definition: TopicRecorder.h:139
armarx::TopicRecorderComponent::outputFilename
std::string outputFilename
Definition: TopicRecorder.h:143
armarx::ArmarXDataPath::ResolveHomePath
static void ResolveHomePath(std::string &path)
Resolves a path like ~/myfile.txt or $HOME/myfile.txt to /home/user/myfile.txt.
Definition: ArmarXDataPath.cpp:521
armarx::ManagedIceObject::getObjectAdapter
Ice::ObjectAdapterPtr getObjectAdapter() const
Returns object's Ice adapter.
Definition: ManagedIceObject.cpp:140
armarx::TopicRecorderComponent::maxDuration
int maxDuration
Definition: TopicRecorder.h:145
ArmarXManager.h
armarx::TopicRecorderComponent::topicRecoderListener
TopicRecorderListenerInterfacePrx topicRecoderListener
Definition: TopicRecorder.h:151
armarx::TopicRecorderComponent::onExitComponent
void onExitComponent() override
Hook for subclass.
Definition: TopicRecorder.cpp:206
armarx::Contains
bool Contains(const ContainerType &container, const ElementType &searchElement)
Definition: algorithm.h:295
armarx::GenericTopicSubscriber
Definition: GenericTopicSubscriber.h:38
armarx::TopicRecorderComponent::stopRecording
void stopRecording(const Ice::Current &c=Ice::emptyCurrent) override
Definition: TopicRecorder.cpp:175
armarx::Split
std::vector< std::string > Split(const std::string &source, const std::string &splitBy, bool trimElements=false, bool removeEmptyElements=false)
Definition: StringHelperTemplates.h:35
armarx::ManagedIceObject::getArmarXManager
ArmarXManagerPtr getArmarXManager() const
Returns the ArmarX manager used to add and remove components.
Definition: ManagedIceObject.cpp:348
armarx::TopicRecorderComponent::topicsSubscribers
std::map< std::string, GenericTopicSubscriberPtr > topicsSubscribers
Definition: TopicRecorder.h:138
armarx::TopicRecorderComponent::startRecording
void startRecording(const int maxDuration, const Ice::Current &c=Ice::emptyCurrent) override
Definition: TopicRecorder.cpp:114
c
constexpr T c
Definition: UnscentedKalmanFilterTest.cpp:43
armarx::TopicRecorderComponent::startTime
IceUtil::Time startTime
Definition: TopicRecorder.h:135
armarx::RunningTask
Definition: ArmarXMultipleObjectsScheduler.h:35
armarx::DatabaseTopicWriter
Definition: DatabaseTopicWriter.h:31
armarx::TopicRecorderComponent::isRecordingEnabled
bool isRecordingEnabled
Definition: TopicRecorder.h:147
IceInternal::Handle
Definition: forward_declarations.h:8
armarx::TopicUtil::TopicData
Definition: TopicUtil.h:34
FileTopicWriter.h
GenericTopicSubscriber.h
armarx::FileTopicWriter
Definition: FileTopicWriter.h:33
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
TopicRecorder.h
armarx::TopicRecorderComponent::write
void write()
Definition: TopicRecorder.cpp:216
armarx::ProxyType::topic
@ topic
armarx::TopicRecorderComponent::idleCondition
std::condition_variable idleCondition
Definition: TopicRecorder.h:140
armarx::TopicRecorderComponent::getDefaultName
std::string getDefaultName() const override
Retrieve default name of component.
Definition: TopicRecorder.cpp:211
armarx::TopicRecorderComponent::onConnectComponent
void onConnectComponent() override
Pure virtual hook for the subclass.
Definition: TopicRecorder.cpp:63
armarx::TimeUtil::GetTime
static IceUtil::Time GetTime(TimeMode timeMode=TimeMode::VirtualTime)
Get the current time.
Definition: TimeUtil.cpp:42
armarx::TopicUtil::TopicData::timestamp
IceUtil::Time timestamp
Definition: TopicUtil.h:43
armarx::Component::getConfigIdentifier
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition: Component.cpp:74
armarx::TopicRecorderComponent::onDisconnectComponent
void onDisconnectComponent() override
Hook for subclass.
Definition: TopicRecorder.cpp:201
TimeUtil.h
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:174
armarx::ManagedIceObject::offeringTopic
void offeringTopic(const std::string &name)
Registers a topic for retrieval after initialization.
Definition: ManagedIceObject.cpp:290
armarx::TopicRecorderComponent::onInitComponent
void onInitComponent() override
Pure virtual hook for the subclass.
Definition: TopicRecorder.cpp:54
IceUtil::Handle< class PropertyDefinitionContainer >
armarx::TopicRecorderComponent::createPropertyDefinitions
PropertyDefinitionsPtr createPropertyDefinitions() override
Definition: TopicRecorder.cpp:42
armarx::TopicRecorderComponent::wakeUp
void wakeUp()
Definition: TopicRecorder.cpp:48
armarx::TopicRecorderComponent::queueTask
RunningTask< TopicRecorderComponent >::pointer_type queueTask
Definition: TopicRecorder.h:137
DatabaseTopicWriter.h
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:186
ArmarXDataPath.h
armarx::TopicRecorderProperties
Definition: TopicRecorder.h:40
armarx
This file offers overloads of toIce() and fromIce() functions for STL container types.
Definition: ArmarXTimeserver.cpp:28
armarx::TopicRecorderComponent::TopicRecorderComponent
TopicRecorderComponent()
Definition: TopicRecorder.cpp:37