StreamProviderI.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package
19  * @author
20  * @date
21  * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22  * GNU General Public License
23  */
24 #include "StreamProviderI.h"
25 
27 
28 #include <filesystem>
29 #include <VisionX/interface/core/ImageProviderInterface.h>
31 
33 
34 #include <Image/ImageProcessor.h>
35 
36 #include <iostream>
37 #include <cstdint>
38 
39 namespace Stream
40 {
41 
43  visionx::ImageProcessorPropertyDefinitions(prefix)
44  {
45  defineOptionalProperty<std::string>("imageProviderProxyName", "TestImageProvider", "Name of the image provider");
46  defineOptionalProperty<std::string>("imageStreamTopicName", "ImageStream", "Name of the image streaming topic");
47  defineOptionalProperty<std::string>("h264Preset", "veryfast", "Preset for the h264 codec");
48  defineOptionalProperty<std::string>("h264Profile", "Main", "profile for the h264 codec");
49  // defineOptionalProperty<std::string>("OfferedSources", "Camera,Test", "Names of sources that are to be offered. For Identification on Receiver Side. seperated by comma, space or tab");
50  // defineOptionalProperty<int>("ImageWidth", 640, "Image width");
51  // defineOptionalProperty<int>("ImageHeight", 480, "Image height");
52  defineOptionalProperty<float>("CompressionRate", 25, "QRC value of h264 codec");
53  defineOptionalProperty<CodecType>("Codec", eH264, "Codec used for compression")
54  .setCaseInsensitive(true)
55  .map("h264", eH264);
56  defineOptionalProperty<float>("Framerate", 25, "Framerate at which the input images are encoded. Slower or fasting pushing of input is possible. Frames will be skipped then. ");
57  }
58 
59 
61  {
62  convertCtx = nullptr;
63  encoder = nullptr;
64  // encodedImageBufferSize = 10000;
65  // encodedImageBuffer.set_capacity(encodedImageBufferSize);
66  frameCounter = 0;
67  srand(static_cast<unsigned int>(IceUtil::Time::now().toMicroSeconds()));
68  }
69 
70 
71  /// Is called once initialization of the ManagedIceObject is done.
73  {
74  fps = getProperty<float>("Framerate");
75  Logging::setTag("StreamProvider");
76  offeringTopicFromProperty("imageStreamTopicName");
77  // Start the timer task for the fps calcuation:
78  fpsCalculator = new armarx::PeriodicTask<StreamProviderI>(this, &StreamProviderI::calculateFps, 995, true, "FPSCalcThread");
79  fpsCalculator->start();
80 
81  imageProviderProxyName = getProperty<std::string>("imageProviderProxyName").getValue();
82  usingImageProvider(imageProviderProxyName);
83  setFramerate(fps);
84  // processTask = new armarx::PeriodicTask<StreamProviderI>(this, &StreamProviderI::processImage, 1000.0 / fps, true, "ImageEncoding", false);
85 
86  usingTopic("TopicRecorderListener");
87  }
88 
89 
90  /// Is called once all dependencies of the object have been resolved and Ice connection is established.
92  {
93  getTopicFromProperty(listener, "imageStreamTopicName");
94 
95  imageProviderProxy = getProxy<visionx::ImageProviderInterfacePrx>(imageProviderProxyName);
96  visionx::ImageProviderInfo imageProviderInfo = getImageProvider(imageProviderProxyName);
97 
98  imageFormat = imageProviderInfo.imageFormat;
99  if (imageFormat.type != visionx::eRgb)
100  {
101  throw armarx::LocalException("The StreamProvider supports only RGB at the moment.");
102  }
103  numberImages = imageProviderInfo.numberImages;
104  imgWidth = imageFormat.dimension.width;
105  imgHeight = imageFormat.dimension.height;
106  encodedImgHeight = imgHeight * numberImages;
107  imgType = imageFormat.type;
108 
109 
110  ppInputImages = new CByteImage*[size_t(numberImages)];
111  for (int i = 0; i < numberImages ; i++)
112  {
113  ppInputImages[i] = visionx::tools::createByteImage(imageProviderInfo);
114  }
115 
116 
117  pImageForEncoder = new CByteImage(imageProviderInfo.imageFormat.dimension.width,
118  imageProviderInfo.imageFormat.dimension.height * numberImages,
119  visionx::tools::convert(imageProviderInfo.imageFormat.type));
120 
121 
122  x264_param_default_preset(&param, getProperty<std::string>("h264Preset").getValue().c_str(), "zerolatency");
123  param.i_threads = 1;
124  param.i_width = imgWidth;
125  param.i_height = encodedImgHeight;
126  param.i_fps_num = uint32_t(fps);
127  param.i_fps_den = 1;
128  // Intra refres:
129  param.i_keyint_max = int(fps);
130  param.b_intra_refresh = 1;
131  //Rate control:
132  // param.rc.i_qp_constant = 51;
133  param.rc.i_rc_method = X264_RC_CRF;
134  param.rc.f_rf_constant = getProperty<float>("CompressionRate");
135  param.rc.f_rf_constant_max = param.rc.f_rf_constant * 1.4f;
136 
137  // param.rc.i_qp_constant = 18;
138  // param.rc.i_qp_min = 18;
139  // param.rc.i_qp_max = 18;
140 
141  //For streaming:
142  param.b_repeat_headers = 1;
143  param.b_annexb = 1;
144  if (x264_param_apply_profile(&param, getProperty<std::string>("h264Profile").getValue().c_str()) != 0)
145  {
146  ARMARX_WARNING << "Could not set '" << getProperty<std::string>("h264Profile").getValue() << "' profile for x264 codec";
147  }
148 
149 
150  startCapture();
151  }
152 
153  /// Is called if a dependency of the object got lost (crash, network error, stopped, ...)
155  {
156  stopCapture();
157 
158  std::cout << "exiting StreamProviderI" << std::endl;
159 
160  //stop the timer task for the fps calcuation:
161  fpsCalculator->stop();
162 
163 
164  if (ppInputImages)
165  {
166  for (int i = 0; i < numberImages ; i++)
167  {
168  delete ppInputImages[i];
169  }
170  delete[] ppInputImages;
171  }
172  }
173 
174  /// Is called once the component terminates.
176  {
177 
178 
179  }
180 
182  {
183  if (!capturing)
184  {
185  return;
186  }
187  if (!waitForImages())
188  {
189  ARMARX_VERBOSE << "No images from provider available.";
190  }
191  else
192  {
193  armarx::MetaInfoSizeBasePtr info;
194  getImages(imageProviderProxyName, ppInputImages, info);
195 
196 
197  for (int i = 0; i < numberImages; ++i)
198  {
199  int imageByteSize = ppInputImages[i]->width * ppInputImages[i]->height * ppInputImages[i]->bytesPerPixel;
200  memcpy(pImageForEncoder->pixels + i * imageByteSize, ppInputImages[i]->pixels, size_t(imageByteSize));
201  }
202  int srcstride = imgWidth * 3; // RGB stride is just 3*width
203  const uint8_t* pixels = pImageForEncoder->pixels;
204  sws_scale(convertCtx, &pixels, &srcstride, 0, encodedImgHeight, pic_in.img.plane, pic_in.img.i_stride);
205  x264_nal_t* nals;
206  int i_nals;
207  int frameSize = x264_encoder_encode(encoder, &nals, &i_nals, &pic_in, &pic_out);
208  if (frameSize >= 0)
209  {
210  ARMARX_DEBUG << deactivateSpam(1) << "encoded image: " << VAROUT(frameSize);
211 
212  frameCounter++;
213 
214  DataChunk chunk;
215  chunk.reserve(size_t(frameSize));
216  chunk.assign(nals->p_payload, nals->p_payload + frameSize);
217  listener->reportNewStreamData(chunk, info->timeProvided);
218  }
219  }
220  }
221 
222 
223 
224 
225  /**
226  * Retrieve default name of component
227  *
228  * @return default name of the component
229  */
231  {
232  return "StreamProvider";
233  }
234 
235 
236 
237  bool StreamProviderI::startCapture(const ::Ice::Current&)
238  {
239 
240  if (capturing)
241  {
242  stopCapture();
243 
244  }
245  encoder = x264_encoder_open(&param);
246 
247  x264_picture_alloc(&pic_in, X264_CSP_I420, imgWidth, encodedImgHeight);
248 
249  convertCtx = sws_getContext(imgWidth, encodedImgHeight, AV_PIX_FMT_RGB24, imgWidth, encodedImgHeight,
250  AV_PIX_FMT_YUV420P, SWS_FAST_BILINEAR, nullptr, nullptr, nullptr);
251 
252  capturing = true;
253 
254  return true;
255  }
256 
257  void StreamProviderI::stopCapture(const ::Ice::Current&)
258  {
259  capturing = false;
260 
261  if (encoder)
262  {
263  x264_picture_clean(&pic_in);
264  memset(reinterpret_cast<char*>(&pic_in), 0, sizeof(pic_in));
265  memset(reinterpret_cast<char*>(&pic_out), 0, sizeof(pic_out));
266 
267  x264_encoder_close(encoder);
268  encoder = nullptr;
269  }
270 
271  if (convertCtx)
272  {
273  sws_freeContext(convertCtx);
274  convertCtx = nullptr;
275  }
276  }
277 
278 
279 
280  CodecType StreamProviderI::getCodecType(const Ice::Current&)
281  {
282  return getProperty<CodecType>("Codec").getValue();
283  }
284 
285 
286 
287 
288 
289  void StreamProviderI::calculateFps()
290  {
291  // StreamSourceMap::iterator it = streamSources.begin();
292  // for(; it != streamSources.end(); it++)
293  // {
294  // it->second->fps = it->second->fetchedChunks;//0.2*fetchedChunks+0.8*fps; //calculate the new fps as weighted medium
295  // it->second->fps = it->second->fps > 10 ? (it->second->fps+2) : 10; //set the fps to a minimum of 10, increase the fps, so that slightly more images can be pushed in than pulled out
296  // armarx::ScopedLock lock(it->second->mutex);
297  // it->second->fetchedChunks = 0;
298  // }
299  }
300 
301 
302 
303 
304  void StreamProviderI::getImageInformation(int& imageWidth, int& imageHeight, int& imageType, const Ice::Current&)
305  {
306  imageWidth = imgWidth;
307  imageHeight = imgHeight;
308  imageType = imgType;
309  }
310 
311  int StreamProviderI::getNumberOfImages(const Ice::Current&)
312  {
313  return numberImages;
314  }
315 
316  void StreamProviderI::setCompressionRate(CompressionRate, const Ice::Current&)
317  {
318  ARMARX_WARNING << "Not yet implemented";
319  }
320 
321 
322  void StreamProviderI::onStartRecording(const Ice::Current&)
323  {
324  startCapture();
325  }
326 
327 }
ARMARX_VERBOSE
#define ARMARX_VERBOSE
Definition: Logging.h:180
Stream::StreamProviderI::getDefaultName
std::string getDefaultName() const override
Retrieve default name of component.
Definition: StreamProviderI.cpp:230
Stream::StreamProviderI::stopCapture
void stopCapture(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:257
visionx::ImageProviderInfo::numberImages
int numberImages
Number of images.
Definition: ImageProcessor.h:506
visionx
ArmarX headers.
Definition: OpenPoseStressTest.h:38
Stream::StreamProviderI::getImageInformation
void getImageInformation(int &imageWidth, int &imageHeight, int &imageType, const Ice::Current &c=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:304
Stream::StreamProviderI::onDisConnectImageProcessor
virtual void onDisConnectImageProcessor()
Is called if a dependency of the object got lost (crash, network error, stopped, ....
Definition: StreamProviderI.cpp:154
visionx::ImageProcessor::getImageProvider
ImageProviderInfo getImageProvider(std::string name, ImageType destinationImageType=eRgb, bool waitForProxy=false)
Select an ImageProvider.
Definition: ImageProcessor.cpp:152
armarx::Component::offeringTopicFromProperty
void offeringTopicFromProperty(const std::string &propertyName)
Offer a topic whose name is specified by the given property.
Definition: Component.cpp:154
Stream::StreamProviderI::onStartRecording
void onStartRecording(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:322
visionx::ImageProviderInfo::imageFormat
ImageFormatInfo imageFormat
Image format struct that contains all necessary image information.
Definition: ImageProcessor.h:496
visionx::tools::createByteImage
CByteImage * createByteImage(const ImageFormatInfo &imageFormat, const ImageType imageType)
Creates a ByteImage for the destination type specified in the given imageProviderInfo.
armarx::Component::getTopicFromProperty
TopicProxyType getTopicFromProperty(const std::string &propertyName)
Get a topic proxy whose name is specified by the given property.
Definition: Component.h:218
visionx::ImageProviderInfo
Definition: ImageProcessor.h:466
visionx::tools::convert
CByteImage::ImageType convert(const ImageType visionxImageType)
Converts a VisionX image type into an image type of IVT's ByteImage.
Definition: TypeMapping.cpp:95
Stream::StreamProviderI::onInitImageProcessor
void onInitImageProcessor() override
Is called once initialization of the ManagedIceObject is done.
Definition: StreamProviderI.cpp:72
Stream::StreamProviderI::startCapture
bool startCapture(const ::Ice::Current &=Ice::emptyCurrent) override
starts the capture for the given source
Definition: StreamProviderI.cpp:237
Stream::StreamProviderI::StreamProviderPropertyDefinitions::StreamProviderPropertyDefinitions
StreamProviderPropertyDefinitions(std::string prefix)
Definition: StreamProviderI.cpp:42
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:177
Stream::StreamProviderI::getCodecType
CodecType getCodecType(const Ice::Current &) override
Definition: StreamProviderI.cpp:280
Stream
Definition: StreamProviderI.cpp:39
visionx::ImageProcessor::usingImageProvider
void usingImageProvider(std::string name)
Registers a delayed topic subscription and a delayed provider proxy retrieval which all will be avail...
Definition: ImageProcessor.cpp:117
Stream::StreamProviderI::process
void process() override
Process the vision component.
Definition: StreamProviderI.cpp:181
visionx::ImageProcessor::getImages
int getImages(CByteImage **ppImages)
Poll images from provider.
Definition: ImageProcessor.cpp:351
StreamProviderI.h
visionx::ImageProcessor::setFramerate
void setFramerate(float fps)
Definition: ImageProcessor.cpp:682
Stream::StreamProviderI::onExitImageProcessor
void onExitImageProcessor() override
Is called once the component terminates.
Definition: StreamProviderI.cpp:175
armarx::ManagedIceObject::usingTopic
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Definition: ManagedIceObject.cpp:248
Stream::StreamProviderI::StreamProviderI
StreamProviderI()
Definition: StreamProviderI.cpp:60
VAROUT
#define VAROUT(x)
Definition: StringHelpers.h:182
ImageUtil.h
Stream::StreamProviderI::getNumberOfImages
int getNumberOfImages(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:311
armarx::Logging::deactivateSpam
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition: Logging.cpp:92
TypeMapping.h
Stream::StreamProviderI::onConnectImageProcessor
void onConnectImageProcessor() override
Is called once all dependencies of the object have been resolved and Ice connection is established.
Definition: StreamProviderI.cpp:91
armarx::PeriodicTask
Definition: ArmarXManager.h:70
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:186
Stream::StreamProviderI::setCompressionRate
void setCompressionRate(::Stream::CompressionRate=COMPRESSIONHIGH, const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:316
Exception.h
visionx::ImageProcessor::waitForImages
bool waitForImages(int milliseconds=1000)
Wait for new images.
Definition: ImageProcessor.cpp:275