StreamProviderI.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package
19 * @author
20 * @date
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
24#include "StreamProviderI.h"
25
26#include <cstdint>
27#include <filesystem>
28#include <iostream>
29
31
32#include <VisionX/interface/core/ImageProviderInterface.h>
35
36#include <Image/ImageProcessor.h>
37
38namespace Stream
39{
40
// Fragment of the property-definitions constructor. The start of the signature and
// the opening lines of some defineOptionalProperty calls are missing from this listing.
// It declares the optional component properties that the rest of this file reads
// through getProperty<>().
// NOTE(review): presumably forwards `prefix` to ImageProcessorPropertyDefinitions — confirm in the header.
42 std::string prefix) :
44 {
// Name of the image provider to connect to (read in the init hook).
46 "imageProviderProxyName", "TestImageProvider", "Name of the image provider");
// Name of the topic on which the encoded stream is published.
48 "imageStreamTopicName", "ImageStream", "Name of the image streaming topic");
// Passed to x264_param_default_preset() / x264_param_apply_profile() in the connect hook.
49 defineOptionalProperty<std::string>("h264Preset", "veryfast", "Preset for the h264 codec");
50 defineOptionalProperty<std::string>("h264Profile", "Main", "profile for the h264 codec");
51 // defineOptionalProperty<std::string>("OfferedSources", "Camera,Test", "Names of sources that are to be offered. For Identification on Receiver Side. separated by comma, space or tab");
52 // defineOptionalProperty<int>("ImageWidth", 640, "Image width");
53 // defineOptionalProperty<int>("ImageHeight", 480, "Image height");
// Assigned to param.rc.f_rf_constant with X264_RC_CRF in the connect hook,
// i.e. it is used as the x264 constant-rate-factor value.
54 defineOptionalProperty<float>("CompressionRate", 25, "QRC value of h264 codec");
// Only "h264" is mapped; the lookup is case-insensitive.
55 defineOptionalProperty<CodecType>("Codec", eH264, "Codec used for compression")
56 .setCaseInsensitive(true)
57 .map("h264", eH264);
// Read as float in the init hook and used for the x264 fps numerator and keyint_max.
59 "Framerate",
60 25,
61 "Framerate at which the input images are encoded. Slower or fasting pushing of input "
62 "is possible. Frames will be skipped then. ");
63 }
64
// Constructor body (the signature line is missing from this listing).
// The scaler context and the encoder are created later in startCapture(),
// so both start out as nullptr; stopCapture() relies on that to skip cleanup.
66 {
67 convertCtx = nullptr;
68 encoder = nullptr;
69 // encodedImageBufferSize = 10000;
70 // encodedImageBuffer.set_capacity(encodedImageBufferSize);
71 frameCounter = 0;
// Seeds the C random generator with the current time in microseconds.
// NOTE(review): no rand() call is visible in this file — confirm the seed is still needed.
72 srand(static_cast<unsigned int>(IceUtil::Time::now().toMicroSeconds()));
73 }
74
75 /// Is called once initialization of the ManagedIceObject is done.
// Reads the configured frame rate, offers the stream topic, starts the fps
// periodic task and registers the image provider and the recorder topic.
// (The signature line is missing from this listing.)
76 void
78 {
79 fps = getProperty<float>("Framerate");
80 Logging::setTag("StreamProvider");
81 offeringTopicFromProperty("imageStreamTopicName");
82 // Start the timer task for the fps calculation.
// NOTE(review): 995 is the task period (presumably milliseconds — confirm against
// PeriodicTask); calculateFps() is currently an empty function in this file.
83 fpsCalculator = new armarx::PeriodicTask<StreamProviderI>(
84 this, &StreamProviderI::calculateFps, 995, true, "FPSCalcThread");
85 fpsCalculator->start();
86
87 imageProviderProxyName = getProperty<std::string>("imageProviderProxyName").getValue();
88 usingImageProvider(imageProviderProxyName);
89 setFramerate(fps);
90 // processTask = new armarx::PeriodicTask<StreamProviderI>(this, &StreamProviderI::processImage, 1000.0 / fps, true, "ImageEncoding", false);
91
// Subscribes to the recorder topic.
// NOTE(review): presumably this is what triggers onStartRecording() — confirm against the interface.
92 usingTopic("TopicRecorderListener");
93 }
94
95 /// Is called once all dependencies of the object have been resolved and Ice connection is established.
// Queries the image provider for its format, allocates the input and encoder
// image buffers, fills the x264 parameter set and finally starts capturing.
// (The signature line is missing from this listing.)
96 void
98 {
99 getTopicFromProperty(listener, "imageStreamTopicName");
100
101 imageProviderProxy = getProxy<visionx::ImageProviderInterfacePrx>(imageProviderProxyName);
102 visionx::ImageProviderInfo imageProviderInfo = getImageProvider(imageProviderProxyName);
103
104 imageFormat = imageProviderInfo.imageFormat;
// process() hard-codes a 3-bytes-per-pixel source stride, so only RGB input is accepted.
105 if (imageFormat.type != visionx::eRgb)
106 {
107 throw armarx::LocalException("The StreamProvider supports only RGB at the moment.");
108 }
109 numberImages = imageProviderInfo.numberImages;
110 imgWidth = imageFormat.dimension.width;
111 imgHeight = imageFormat.dimension.height;
// All provider images are encoded as one frame: same width, heights added up.
112 encodedImgHeight = imgHeight * numberImages;
113 imgType = imageFormat.type;
114
115
// One receive buffer per provider image.
// NOTE(review): allocated with raw new on every connect; released in the disconnect hook.
116 ppInputImages = new CByteImage*[size_t(numberImages)];
117 for (int i = 0; i < numberImages; i++)
118 {
119 ppInputImages[i] = visionx::tools::createByteImage(imageProviderInfo);
120 }
121
122
// Combined image (width x height*numberImages) that is handed to the colour conversion in process().
123 pImageForEncoder =
124 new CByteImage(imageProviderInfo.imageFormat.dimension.width,
125 imageProviderInfo.imageFormat.dimension.height * numberImages,
126 visionx::tools::convert(imageProviderInfo.imageFormat.type));
127
128
// NOTE(review): the return value of x264_param_default_preset is not checked,
// so an unknown preset name goes unnoticed — confirm whether that is intended.
129 x264_param_default_preset(
130 &param, getProperty<std::string>("h264Preset").getValue().c_str(), "zerolatency");
131 param.i_threads = 1;
132 param.i_width = imgWidth;
133 param.i_height = encodedImgHeight;
// fps is a float property; the fractional part is truncated here and for keyint_max below.
134 param.i_fps_num = uint32_t(fps);
135 param.i_fps_den = 1;
136 // Intra refresh:
137 param.i_keyint_max = int(fps);
138 param.b_intra_refresh = 1;
139 //Rate control:
140 // param.rc.i_qp_constant = 51;
141 param.rc.i_rc_method = X264_RC_CRF;
142 param.rc.f_rf_constant = getProperty<float>("CompressionRate");
143 param.rc.f_rf_constant_max = param.rc.f_rf_constant * 1.4f;
144
145 // param.rc.i_qp_constant = 18;
146 // param.rc.i_qp_min = 18;
147 // param.rc.i_qp_max = 18;
148
149 //For streaming:
150 param.b_repeat_headers = 1;
151 param.b_annexb = 1;
// A profile that cannot be applied is only reported; encoding continues with the current parameters.
152 if (x264_param_apply_profile(
153 &param, getProperty<std::string>("h264Profile").getValue().c_str()) != 0)
154 {
155 ARMARX_WARNING << "Could not set '"
156 << getProperty<std::string>("h264Profile").getValue()
157 << "' profile for x264 codec";
158 }
159
160
// Opens the encoder with the parameters prepared above.
161 startCapture();
162 }
163
164 /// Is called if a dependency of the object got lost (crash, network error, stopped, ...)
165 void
167 {
168 stopCapture();
169
170 std::cout << "exiting StreamProviderI" << std::endl;
171
172 //stop the timer task for the fps calcuation:
173 fpsCalculator->stop();
174
175
176 if (ppInputImages)
177 {
178 for (int i = 0; i < numberImages; i++)
179 {
180 delete ppInputImages[i];
181 }
182 delete[] ppInputImages;
183 }
184 }
185
186 /// Is called once the component terminates.
187 void
191
192 void
194 {
195 if (!capturing)
196 {
197 return;
198 }
199 if (!waitForImages())
200 {
201 ARMARX_VERBOSE << "No images from provider available.";
202 }
203 else
204 {
205 armarx::MetaInfoSizeBasePtr info;
206 getImages(imageProviderProxyName, ppInputImages, info);
207
208
209 for (int i = 0; i < numberImages; ++i)
210 {
211 int imageByteSize = ppInputImages[i]->width * ppInputImages[i]->height *
212 ppInputImages[i]->bytesPerPixel;
213 memcpy(pImageForEncoder->pixels + i * imageByteSize,
214 ppInputImages[i]->pixels,
215 size_t(imageByteSize));
216 }
217 int srcstride = imgWidth * 3; // RGB stride is just 3*width
218 const uint8_t* pixels = pImageForEncoder->pixels;
219 sws_scale(convertCtx,
220 &pixels,
221 &srcstride,
222 0,
223 encodedImgHeight,
224 pic_in.img.plane,
225 pic_in.img.i_stride);
226 x264_nal_t* nals;
227 int i_nals;
228 int frameSize = x264_encoder_encode(encoder, &nals, &i_nals, &pic_in, &pic_out);
229 if (frameSize >= 0)
230 {
231 ARMARX_DEBUG << deactivateSpam(1) << "encoded image: " << VAROUT(frameSize);
232
233 frameCounter++;
234
235 DataChunk chunk;
236 chunk.reserve(size_t(frameSize));
237 chunk.assign(nals->p_payload, nals->p_payload + frameSize);
238 listener->reportNewStreamData(chunk, info->timeProvided);
239 }
240 }
241 }
242
243 /**
244 * Retrieve default name of component
245 *
246 * @return default name of the component
247 */
// The signature line is missing from this listing; the member index at the end
// of the listing shows it as `std::string getDefaultName() const override`.
248 std::string
250 {
// Fixed name; the same string is used as the logging tag in the init hook.
251 return "StreamProvider";
252 }
253
254 bool
255 StreamProviderI::startCapture(const ::Ice::Current&)
256 {
257
258 if (capturing)
259 {
260 stopCapture();
261 }
262 encoder = x264_encoder_open(&param);
263
264 x264_picture_alloc(&pic_in, X264_CSP_I420, imgWidth, encodedImgHeight);
265
266 convertCtx = sws_getContext(imgWidth,
267 encodedImgHeight,
268 AV_PIX_FMT_RGB24,
269 imgWidth,
270 encodedImgHeight,
271 AV_PIX_FMT_YUV420P,
272 SWS_FAST_BILINEAR,
273 nullptr,
274 nullptr,
275 nullptr);
276
277 capturing = true;
278
279 return true;
280 }
281
282 void
283 StreamProviderI::stopCapture(const ::Ice::Current&)
284 {
285 capturing = false;
286
287 if (encoder)
288 {
289 x264_picture_clean(&pic_in);
290 memset(reinterpret_cast<char*>(&pic_in), 0, sizeof(pic_in));
291 memset(reinterpret_cast<char*>(&pic_out), 0, sizeof(pic_out));
292
293 x264_encoder_close(encoder);
294 encoder = nullptr;
295 }
296
297 if (convertCtx)
298 {
299 sws_freeContext(convertCtx);
300 convertCtx = nullptr;
301 }
302 }
303
304 CodecType
305 StreamProviderI::getCodecType(const Ice::Current&)
306 {
307 return getProperty<CodecType>("Codec").getValue();
308 }
309
310 void
311 StreamProviderI::calculateFps()
312 {
313 // StreamSourceMap::iterator it = streamSources.begin();
314 // for(; it != streamSources.end(); it++)
315 // {
316 // it->second->fps = it->second->fetchedChunks;//0.2*fetchedChunks+0.8*fps; //calculate the new fps as weighted medium
317 // it->second->fps = it->second->fps > 10 ? (it->second->fps+2) : 10; //set the fps to a minimum of 10, increase the fps, so that slightly more images can be pushed in than pulled out
318 // armarx::ScopedLock lock(it->second->mutex);
319 // it->second->fetchedChunks = 0;
320 // }
321 }
322
// Reports the format of a single provider image through the three out-parameters.
// (The first signature line, with the function name and `int& imageWidth`, is
// missing from this listing.)
323 void
325 int& imageHeight,
326 int& imageType,
327 const Ice::Current&)
328 {
329 imageWidth = imgWidth;
// Height of ONE provider image, not of the combined encoded frame
// (which is imgHeight * numberImages, see encodedImgHeight).
330 imageHeight = imgHeight;
331 imageType = imgType;
332 }
333
// Returns the number of images the provider delivers per frame, as obtained
// from the provider info in the connect hook.
// (The signature line is missing from this listing.)
334 int
336 {
337 return numberImages;
338 }
339
340 void
341 StreamProviderI::setCompressionRate(CompressionRate, const Ice::Current&)
342 {
343 ARMARX_WARNING << "Not yet implemented";
344 }
345
// Recording-start hook: restarts the capture. startCapture() stops a running
// capture first, so the encoder is re-opened from scratch.
// (The signature line is missing from this listing.)
346 void
348 {
349 startCapture();
350 }
351
352} // namespace Stream
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
#define VAROUT(x)
void onStartRecording(const ::Ice::Current &=Ice::emptyCurrent) override
bool startCapture(const ::Ice::Current &=Ice::emptyCurrent) override
starts the capture for the given source
void onConnectImageProcessor() override
Is called once all dependencies of the object have been resolved and Ice connection is established.
void setCompressionRate(::Stream::CompressionRate=COMPRESSIONHIGH, const ::Ice::Current &=Ice::emptyCurrent) override
void onExitImageProcessor() override
Is called once the component terminates.
int getNumberOfImages(const ::Ice::Current &=Ice::emptyCurrent) override
void process() override
Process the vision component.
CodecType getCodecType(const Ice::Current &) override
void onInitImageProcessor() override
Is called once initialization of the ManagedIceObject is done.
void stopCapture(const ::Ice::Current &=Ice::emptyCurrent) override
void getImageInformation(int &imageWidth, int &imageHeight, int &imageType, const Ice::Current &c=Ice::emptyCurrent) override
std::string getDefaultName() const override
Retrieve default name of component.
virtual void onDisConnectImageProcessor()
Is called if a dependency of the object got lost (crash, network error, stopped, ...).
TopicProxyType getTopicFromProperty(const std::string &propertyName)
Get a topic proxy whose name is specified by the given property.
Definition Component.h:221
void offeringTopicFromProperty(const std::string &propertyName)
Offer a topic whose name is specified by the given property.
Property< PropertyType > getProperty(const std::string &name)
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
The periodic task executes one thread method repeatedly using the time period specified in the constr...
std::string prefix
Prefix of the properties such as namespace, domain, component name, etc.
PropertyDefinition< PropertyType > & defineOptionalProperty(const std::string &name, PropertyType defaultValue, const std::string &description="", PropertyDefinitionBase::PropertyConstness constness=PropertyDefinitionBase::eConstant)
ImageProcessorPropertyDefinitions(std::string prefix)
void usingImageProvider(std::string name)
Registers a delayed topic subscription and a delayed provider proxy retrieval which all will be avail...
bool waitForImages(int milliseconds=1000)
Wait for new images.
ImageProviderInfo getImageProvider(std::string name, ImageType destinationImageType=eRgb, bool waitForProxy=false)
Select an ImageProvider.
int getImages(CByteImage **ppImages)
Poll images from provider.
int numberImages
Number of images.
ImageFormatInfo imageFormat
Image format struct that contains all necessary image information.
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
CByteImage * createByteImage(const ImageFormatInfo &imageFormat, const ImageType imageType)
Creates a ByteImage for the destination type specified in the given imageProviderInfo.
CByteImage::ImageType convert(const ImageType visionxImageType)
Converts a VisionX image type into an image type of IVT's ByteImage.
ArmarX headers.