TopicRecorder.cpp
Go to the documentation of this file.
1/*
2* This file is part of ArmarX.
3*
4* ArmarX is free software; you can redistribute it and/or modify
5* it under the terms of the GNU General Public License version 2 as
6* published by the Free Software Foundation.
7*
8* ArmarX is distributed in the hope that it will be useful, but
9* WITHOUT ANY WARRANTY; without even the implied warranty of
10* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11* GNU General Public License for more details.
12*
13* You should have received a copy of the GNU General Public License
14* along with this program. If not, see <http://www.gnu.org/licenses/>.
15*
16* @package ArmarX
17* @author Mirko Waechter( mirko.waechter at kit dot edu)
18* @date 2016
19* @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20* GNU General Public License
21*/
22#include "TopicRecorder.h"
23
24#include <fstream>
25
26#include <Ice/ObjectAdapter.h>
27#include <IceStorm/IceStorm.h> // for TopicPrx, TopicManagerPrx, etc
28
29#include <SimoxUtility/algorithm/string/string_tools.h>
30
34
35#include "DatabaseTopicWriter.h"
36#include "FileTopicWriter.h"
38
39namespace armarx
40{
41
46
52
// Wakes every thread that is currently waiting on idleCondition.
// NOTE(review): the line carrying this function's name and parameter list
// (listing line 54) is missing from this listing — confirm which method this is.
53 void
55 {
56 idleCondition.notify_all();
57 }
58
// Initialisation step: marks recording as disabled, reads the output file name
// from the "Outputfile" property and announces the "TopicRecorderListener" topic.
// NOTE(review): the signature line (listing line 60) is missing; presumably this
// is onInitComponent() — confirm.
59 void
61 {
// Recording stays off until it is switched on elsewhere.
62 isRecordingEnabled = false;
63 outputFilename = getProperty<std::string>("Outputfile").getValue();
64
// Only registers the topic name here; the proxy itself is not fetched in this block.
65 offeringTopic("TopicRecorderListener");
66 }
67
// Builds the list of topics to record, creates one GenericTopicSubscriber per
// topic, subscribes it, and finally starts recording when the "EnableRecording"
// property is set.
// NOTE(review): the signature line (listing line 69) and listing lines 71, 101
// and 105 are missing; presumably this is onConnectComponent() — confirm.
68 void
70 {
72
// Log the name of every topic the topic manager reports.
73 auto allTopics = getIceManager()->getTopicManager()->retrieveAll();
74 for (auto& elem : allTopics)
75 {
76 ARMARX_INFO << "Topic: " << elem.first;
77 }
78
// One common start timestamp is handed to every subscriber created below.
79 auto startTimestamp = TimeUtil::GetTime();
80 Ice::StringSeq topics;
// "*" selects every topic retrieved above; any other value is split at ','.
81 if (getProperty<std::string>("TopicsToLog").getValue() == "*")
82 {
83 for (auto& elem : allTopics)
84 {
85 topics.push_back(elem.first);
86 }
87 }
88 else
89 {
90 topics = Split(getProperty<std::string>("TopicsToLog").getValue(), ",");
91 }
// The loop variable is a copy because the body overwrites it with the bare topic name.
92 for (auto topic : topics)
93 {
// Stays at -1 when the entry has no ":<frequency>" suffix
// (presumably meaning "no frequency limit" — confirm in GenericTopicSubscriber).
94 float maxFrequency = -1.f;
95 if (Contains(topic, ":"))
96 {
// Entry has the form "<topic>:<frequency>"; element 1 is parsed with atof.
97 auto topicAndFreq = Split(topic, ":");
98 maxFrequency = atof(topicAndFreq.at(1).c_str());
99 topic = topicAndFreq.at(0);
100 }
// NOTE(review): the line that opens this statement (listing line 101) is missing;
// the constructed subscriber is referred to as `ptr` on the next line.
102 new GenericTopicSubscriber(this, topic, startTimestamp, maxFrequency));
// Add the subscriber to the object adapter under a generated UUID and subscribe
// the resulting proxy to the topic.
103 Ice::ObjectPrx prx = getObjectAdapter()->addWithUUID(ptr);
104 getIceManager()->subscribeTopic(prx, topic, true);
106 }
107
108
// Start immediately when requested, passing the "Duration" property as maxDuration.
109 if (getProperty<bool>("EnableRecording").getValue())
110 {
111 startRecording(getProperty<int>("Duration").getValue());
112 }
113 else
114 {
115 ARMARX_INFO << "automatic recording disabled.";
116 }
117 }
118
// Starts a recording: stores the maximum duration, determines the output path,
// evaluates the "StorageMode" property, resets the subscribers to startTime,
// launches the "WriterThread" task and notifies the recorder listener.
// NOTE(review): the signature line (listing line 120) and listing lines 124, 132,
// 139, 153, 157, 161, 164, 167 and 176 are missing; presumably this is
// startRecording(maxDuration, ...) — confirm.
119 void
121 {
// `mutex` is held for the whole call.
122 std::lock_guard<std::mutex> lock(mutex);
123
// NOTE(review): the condition guarding this block (listing line 124) is missing;
// judging by the warning text it is taken when a recording is already active.
125 {
126 ARMARX_WARNING << "topic recorder is already recording.";
127 return;
128 }
129
130
131 this->maxDuration = maxDuration;
// Optionally insert the current date and time between file stem and extension.
133 if (getProperty<bool>("TimestampedFilename").getValue())
134 {
135 std::string time = IceUtil::Time::now().toDateTime();
// '/' and ':' become '-', spaces become '_'.
136 time = simox::alg::replace_all(time, "/", "-");
137 time = simox::alg::replace_all(time, " ", "_");
138 time = simox::alg::replace_all(time, ":", "-");
// NOTE(review): the left-hand side of this expression (listing line 139) is missing.
140 outputfilePath.parent_path() /
141 (outputfilePath.stem().string() + "-" + time + outputfilePath.extension().string());
142 }
143
// Resolve a "~" or "$HOME" prefix in the path, then store the result back.
144 std::string outputfilePathStr = outputfilePath.string();
145 ArmarXDataPath::ResolveHomePath(outputfilePathStr);
146 outputfilePath = outputfilePathStr;
147 ARMARX_IMPORTANT << "Writing to file '" << outputfilePath.string() << "'";
148
149 //Check which storage mode to use and init the writer
150 std::string storageMode = getProperty<std::string>("StorageMode").getValue();
// std::string::compare returns 0 on equality, hence the negation.
// NOTE(review): the statements inside the three branches (listing lines 153, 157,
// 161 and 164) are missing from this listing.
151 if (!storageMode.compare("file"))
152 {
154 }
155 else if (!storageMode.compare("database"))
156 {
158 }
159 else
160 {
162 << "StorageMode " << storageMode
163 << " is not supported (database, file). Falling back to default 'database' mode.";
165 }
166
168
// Give every subscriber the start time and pull its queued data into a local
// queue that is discarded right away (presumably to drop samples received before
// the recording started — confirm).
169 for (auto kv : topicsSubscribers)
170 {
171 kv.second->setTime(startTime);
172 std::queue<TopicUtil::TopicData> data;
173 kv.second->getData(data);
174 }
175
// NOTE(review): the line that opens this statement (listing line 176) is missing;
// the arguments name TopicRecorderComponent::write as the function of "WriterThread".
177 this, &TopicRecorderComponent::write, "WriterThread");
178 queueTask->start();
179
180 isRecordingEnabled = true;
181
182 topicRecoderListener->onStartRecording();
183 }
184
// Stops a recording: wakes waiters on idleCondition, stops the writer task if it
// is running, releases the writer and clears the recording flag.
// NOTE(review): the signature line (listing line 186) and listing line 190 are
// missing; presumably this is stopRecording(...) — confirm.
185 void
187 {
// `mutex` is held for the whole call.
188 std::lock_guard<std::mutex> lock(mutex);
189
// NOTE(review): the condition guarding this block (listing line 190) is missing;
// judging by the warning text it is taken when no recording is active.
191 {
192 ARMARX_WARNING << "topic recorder is not recording.";
193 return;
194 }
195
196
197 idleCondition.notify_all();
198 if (queueTask->isRunning())
199 {
200 queueTask->stop();
201 }
202
203 ARMARX_IMPORTANT << "Writing log ";
// Drops this component's reference to the writer (presumably the writer finishes
// its output when it is destroyed — confirm in the writer implementations).
204 writer.reset();
205 ARMARX_IMPORTANT << "Wrote everything to file '" << outputfilePath.string() << "'";
206
207 isRecordingEnabled = false;
208 }
209
210 void
215
216 void
221
// Returns the fixed string "TopicRecorder".
// NOTE(review): the signature line (listing line 223) is missing; presumably this
// is getDefaultName() const — confirm.
222 std::string
224 {
225 return "TopicRecorder";
226 }
227
// Loop that runs until queueTask is stopped: collects the pending data of all
// subscribers, orders it by timestamp and hands each item to the writer. After
// the loop, an asynchronous shutdown is requested when the "Duration" property
// is greater than zero.
// NOTE(review): the signature line (listing line 229) and listing line 241 are
// missing; the task created elsewhere in this file names
// TopicRecorderComponent::write, so presumably this is that method — confirm.
228 void
230 {
// Orders items by ascending timestamp.
231 auto sortFunc = [](const TopicUtil::TopicData& l, const TopicUtil::TopicData& r)
232 { return (l.timestamp < r.timestamp); };
233
234 while (!queueTask->isStopped())
235 {
// A maxDuration of 0 skips the time limit check entirely.
236 if (maxDuration)
237 {
238 if ((startTime + IceUtil::Time::seconds(maxDuration)) < armarx::TimeUtil::GetTime())
239 {
240 ARMARX_INFO << "max recording duration reached.";
// NOTE(review): the statement that follows the log message (listing line 241)
// is missing from this listing.
242 }
243 }
244
// The lock lives until the end of this loop iteration, so it is also held
// during the usleep below.
245 std::unique_lock lock(queueMutex);
246
// Drain every subscriber into one vector.
247 std::vector<TopicUtil::TopicData> newDataOfAllTopics;
248 for (auto& elem : topicsSubscribers)
249 {
250 std::queue<TopicUtil::TopicData> data;
251 elem.second->getData(data);
252
253 while (!data.empty())
254 {
255 newDataOfAllTopics.push_back(data.front());
256 data.pop();
257 }
258 }
// Sort across all topics so items are written in timestamp order within this batch.
259 std::sort(newDataOfAllTopics.begin(), newDataOfAllTopics.end(), sortFunc);
260 for (auto& e : newDataOfAllTopics)
261 {
262 writer->write(e);
263 }
// Nothing arrived in this iteration: sleep 10000 microseconds before polling again.
264 if (newDataOfAllTopics.empty())
265 {
266 usleep(10000);
267 }
268 }
269
270 if (getProperty<int>("Duration").getValue() > 0)
271 {
272 getArmarXManager()->asyncShutdown();
273 }
274 }
275
276
277} // namespace armarx
constexpr T c
static void ResolveHomePath(std::string &path)
Resolves a path like ~/myfile.txt or $HOME/myfile.txt to /home/user/myfile.txt.
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition Component.cpp:90
Property< PropertyType > getProperty(const std::string &name)
void offeringTopic(const std::string &name)
Registers a topic for retrieval after initialization.
TopicProxyType getTopic(const std::string &name)
Returns a proxy of the specified topic.
Ice::ObjectAdapterPtr getObjectAdapter() const
Returns object's Ice adapter.
IceManagerPtr getIceManager() const
Returns the IceManager.
ArmarXManagerPtr getArmarXManager() const
Returns the ArmarX manager used to add and remove components.
static IceUtil::Time GetTime(TimeMode timeMode=TimeMode::VirtualTime)
Get the current time.
Definition TimeUtil.cpp:42
void onInitComponent() override
Pure virtual hook for the subclass.
std::condition_variable idleCondition
RunningTask< TopicRecorderComponent >::pointer_type queueTask
void startRecording(const int maxDuration, const Ice::Current &c=Ice::emptyCurrent) override
void onDisconnectComponent() override
Hook for subclass.
std::map< std::string, GenericTopicSubscriberPtr > topicsSubscribers
void stopRecording(const Ice::Current &c=Ice::emptyCurrent) override
TopicRecorderListenerInterfacePrx topicRecoderListener
void onConnectComponent() override
Pure virtual hook for the subclass.
PropertyDefinitionsPtr createPropertyDefinitions() override
std::filesystem::path outputfilePath
void onExitComponent() override
Hook for subclass.
std::string getDefaultName() const override
Retrieve default name of component.
TopicWriterInterfacePtr writer
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WARNING).
Definition Logging.h:190
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceInternal::Handle< GenericTopicSubscriber > GenericTopicSubscriberPtr
std::vector< std::string > Split(const std::string &source, const std::string &splitBy, bool trimElements=false, bool removeEmptyElements=false)
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
bool Contains(const ContainerType &container, const ElementType &searchElement)
Definition algorithm.h:330