StreamProviderI.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of ArmarX.
3  *
4  * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5  *
6  * ArmarX is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 as
8  * published by the Free Software Foundation.
9  *
10  * ArmarX is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  * @package
19  * @author
20  * @date
21  * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22  * GNU General Public License
23  */
24 #include "StreamProviderI.h"
25 
26 #include <cstdint>
27 #include <filesystem>
28 #include <iostream>
29 
31 
32 #include <VisionX/interface/core/ImageProviderInterface.h>
35 
36 #include <Image/ImageProcessor.h>
37 
38 namespace Stream
39 {
40 
42  std::string prefix) :
43  visionx::ImageProcessorPropertyDefinitions(prefix)
44  {
45  defineOptionalProperty<std::string>(
46  "imageProviderProxyName", "TestImageProvider", "Name of the image provider");
47  defineOptionalProperty<std::string>(
48  "imageStreamTopicName", "ImageStream", "Name of the image streaming topic");
49  defineOptionalProperty<std::string>("h264Preset", "veryfast", "Preset for the h264 codec");
50  defineOptionalProperty<std::string>("h264Profile", "Main", "profile for the h264 codec");
51  // defineOptionalProperty<std::string>("OfferedSources", "Camera,Test", "Names of sources that are to be offered. For Identification on Receiver Side. seperated by comma, space or tab");
52  // defineOptionalProperty<int>("ImageWidth", 640, "Image width");
53  // defineOptionalProperty<int>("ImageHeight", 480, "Image height");
54  defineOptionalProperty<float>("CompressionRate", 25, "QRC value of h264 codec");
55  defineOptionalProperty<CodecType>("Codec", eH264, "Codec used for compression")
56  .setCaseInsensitive(true)
57  .map("h264", eH264);
58  defineOptionalProperty<float>(
59  "Framerate",
60  25,
61  "Framerate at which the input images are encoded. Slower or fasting pushing of input "
62  "is possible. Frames will be skipped then. ");
63  }
64 
66  {
67  convertCtx = nullptr;
68  encoder = nullptr;
69  // encodedImageBufferSize = 10000;
70  // encodedImageBuffer.set_capacity(encodedImageBufferSize);
71  frameCounter = 0;
72  srand(static_cast<unsigned int>(IceUtil::Time::now().toMicroSeconds()));
73  }
74 
75  /// Is called once initialization of the ManagedIceObject is done.
76  void
78  {
79  fps = getProperty<float>("Framerate");
80  Logging::setTag("StreamProvider");
81  offeringTopicFromProperty("imageStreamTopicName");
82  // Start the timer task for the fps calcuation:
83  fpsCalculator = new armarx::PeriodicTask<StreamProviderI>(
84  this, &StreamProviderI::calculateFps, 995, true, "FPSCalcThread");
85  fpsCalculator->start();
86 
87  imageProviderProxyName = getProperty<std::string>("imageProviderProxyName").getValue();
88  usingImageProvider(imageProviderProxyName);
89  setFramerate(fps);
90  // processTask = new armarx::PeriodicTask<StreamProviderI>(this, &StreamProviderI::processImage, 1000.0 / fps, true, "ImageEncoding", false);
91 
92  usingTopic("TopicRecorderListener");
93  }
94 
95  /// Is called once all dependencies of the object have been resolved and Ice connection is established.
96  void
98  {
99  getTopicFromProperty(listener, "imageStreamTopicName");
100 
101  imageProviderProxy = getProxy<visionx::ImageProviderInterfacePrx>(imageProviderProxyName);
102  visionx::ImageProviderInfo imageProviderInfo = getImageProvider(imageProviderProxyName);
103 
104  imageFormat = imageProviderInfo.imageFormat;
105  if (imageFormat.type != visionx::eRgb)
106  {
107  throw armarx::LocalException("The StreamProvider supports only RGB at the moment.");
108  }
109  numberImages = imageProviderInfo.numberImages;
110  imgWidth = imageFormat.dimension.width;
111  imgHeight = imageFormat.dimension.height;
112  encodedImgHeight = imgHeight * numberImages;
113  imgType = imageFormat.type;
114 
115 
116  ppInputImages = new CByteImage*[size_t(numberImages)];
117  for (int i = 0; i < numberImages; i++)
118  {
119  ppInputImages[i] = visionx::tools::createByteImage(imageProviderInfo);
120  }
121 
122 
123  pImageForEncoder =
124  new CByteImage(imageProviderInfo.imageFormat.dimension.width,
125  imageProviderInfo.imageFormat.dimension.height * numberImages,
126  visionx::tools::convert(imageProviderInfo.imageFormat.type));
127 
128 
129  x264_param_default_preset(
130  &param, getProperty<std::string>("h264Preset").getValue().c_str(), "zerolatency");
131  param.i_threads = 1;
132  param.i_width = imgWidth;
133  param.i_height = encodedImgHeight;
134  param.i_fps_num = uint32_t(fps);
135  param.i_fps_den = 1;
136  // Intra refres:
137  param.i_keyint_max = int(fps);
138  param.b_intra_refresh = 1;
139  //Rate control:
140  // param.rc.i_qp_constant = 51;
141  param.rc.i_rc_method = X264_RC_CRF;
142  param.rc.f_rf_constant = getProperty<float>("CompressionRate");
143  param.rc.f_rf_constant_max = param.rc.f_rf_constant * 1.4f;
144 
145  // param.rc.i_qp_constant = 18;
146  // param.rc.i_qp_min = 18;
147  // param.rc.i_qp_max = 18;
148 
149  //For streaming:
150  param.b_repeat_headers = 1;
151  param.b_annexb = 1;
152  if (x264_param_apply_profile(
153  &param, getProperty<std::string>("h264Profile").getValue().c_str()) != 0)
154  {
155  ARMARX_WARNING << "Could not set '"
156  << getProperty<std::string>("h264Profile").getValue()
157  << "' profile for x264 codec";
158  }
159 
160 
161  startCapture();
162  }
163 
164  /// Is called if a dependency of the object got lost (crash, network error, stopped, ...)
165  void
167  {
168  stopCapture();
169 
170  std::cout << "exiting StreamProviderI" << std::endl;
171 
172  //stop the timer task for the fps calcuation:
173  fpsCalculator->stop();
174 
175 
176  if (ppInputImages)
177  {
178  for (int i = 0; i < numberImages; i++)
179  {
180  delete ppInputImages[i];
181  }
182  delete[] ppInputImages;
183  }
184  }
185 
186  /// Is called once the component terminates.
187  void
189  {
190  }
191 
192  void
194  {
195  if (!capturing)
196  {
197  return;
198  }
199  if (!waitForImages())
200  {
201  ARMARX_VERBOSE << "No images from provider available.";
202  }
203  else
204  {
205  armarx::MetaInfoSizeBasePtr info;
206  getImages(imageProviderProxyName, ppInputImages, info);
207 
208 
209  for (int i = 0; i < numberImages; ++i)
210  {
211  int imageByteSize = ppInputImages[i]->width * ppInputImages[i]->height *
212  ppInputImages[i]->bytesPerPixel;
213  memcpy(pImageForEncoder->pixels + i * imageByteSize,
214  ppInputImages[i]->pixels,
215  size_t(imageByteSize));
216  }
217  int srcstride = imgWidth * 3; // RGB stride is just 3*width
218  const uint8_t* pixels = pImageForEncoder->pixels;
219  sws_scale(convertCtx,
220  &pixels,
221  &srcstride,
222  0,
223  encodedImgHeight,
224  pic_in.img.plane,
225  pic_in.img.i_stride);
226  x264_nal_t* nals;
227  int i_nals;
228  int frameSize = x264_encoder_encode(encoder, &nals, &i_nals, &pic_in, &pic_out);
229  if (frameSize >= 0)
230  {
231  ARMARX_DEBUG << deactivateSpam(1) << "encoded image: " << VAROUT(frameSize);
232 
233  frameCounter++;
234 
235  DataChunk chunk;
236  chunk.reserve(size_t(frameSize));
237  chunk.assign(nals->p_payload, nals->p_payload + frameSize);
238  listener->reportNewStreamData(chunk, info->timeProvided);
239  }
240  }
241  }
242 
243  /**
244  * Retrieve default name of component
245  *
246  * @return default name of the component
247  */
248  std::string
250  {
251  return "StreamProvider";
252  }
253 
254  bool
255  StreamProviderI::startCapture(const ::Ice::Current&)
256  {
257 
258  if (capturing)
259  {
260  stopCapture();
261  }
262  encoder = x264_encoder_open(&param);
263 
264  x264_picture_alloc(&pic_in, X264_CSP_I420, imgWidth, encodedImgHeight);
265 
266  convertCtx = sws_getContext(imgWidth,
267  encodedImgHeight,
268  AV_PIX_FMT_RGB24,
269  imgWidth,
270  encodedImgHeight,
271  AV_PIX_FMT_YUV420P,
272  SWS_FAST_BILINEAR,
273  nullptr,
274  nullptr,
275  nullptr);
276 
277  capturing = true;
278 
279  return true;
280  }
281 
282  void
283  StreamProviderI::stopCapture(const ::Ice::Current&)
284  {
285  capturing = false;
286 
287  if (encoder)
288  {
289  x264_picture_clean(&pic_in);
290  memset(reinterpret_cast<char*>(&pic_in), 0, sizeof(pic_in));
291  memset(reinterpret_cast<char*>(&pic_out), 0, sizeof(pic_out));
292 
293  x264_encoder_close(encoder);
294  encoder = nullptr;
295  }
296 
297  if (convertCtx)
298  {
299  sws_freeContext(convertCtx);
300  convertCtx = nullptr;
301  }
302  }
303 
304  CodecType
305  StreamProviderI::getCodecType(const Ice::Current&)
306  {
307  return getProperty<CodecType>("Codec").getValue();
308  }
309 
310  void
311  StreamProviderI::calculateFps()
312  {
313  // StreamSourceMap::iterator it = streamSources.begin();
314  // for(; it != streamSources.end(); it++)
315  // {
316  // it->second->fps = it->second->fetchedChunks;//0.2*fetchedChunks+0.8*fps; //calculate the new fps as weighted medium
317  // it->second->fps = it->second->fps > 10 ? (it->second->fps+2) : 10; //set the fps to a minimum of 10, increase the fps, so that slightly more images can be pushed in than pulled out
318  // armarx::ScopedLock lock(it->second->mutex);
319  // it->second->fetchedChunks = 0;
320  // }
321  }
322 
323  void
325  int& imageHeight,
326  int& imageType,
327  const Ice::Current&)
328  {
329  imageWidth = imgWidth;
330  imageHeight = imgHeight;
331  imageType = imgType;
332  }
333 
334  int
336  {
337  return numberImages;
338  }
339 
340  void
341  StreamProviderI::setCompressionRate(CompressionRate, const Ice::Current&)
342  {
343  ARMARX_WARNING << "Not yet implemented";
344  }
345 
346  void
348  {
349  startCapture();
350  }
351 
352 } // namespace Stream
ARMARX_VERBOSE
#define ARMARX_VERBOSE
Definition: Logging.h:187
Stream::StreamProviderI::getDefaultName
std::string getDefaultName() const override
Retrieve default name of component.
Definition: StreamProviderI.cpp:249
Stream::StreamProviderI::stopCapture
void stopCapture(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:283
visionx::ImageProviderInfo::numberImages
int numberImages
Number of images.
Definition: ImageProcessor.h:519
visionx
ArmarX headers.
Definition: OpenPoseStressTest.h:38
Stream::StreamProviderI::getImageInformation
void getImageInformation(int &imageWidth, int &imageHeight, int &imageType, const Ice::Current &c=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:324
Stream::StreamProviderI::onDisConnectImageProcessor
virtual void onDisConnectImageProcessor()
Is called if a dependency of the object got lost (crash, network error, stopped, ...).
Definition: StreamProviderI.cpp:166
visionx::ImageProcessor::getImageProvider
ImageProviderInfo getImageProvider(std::string name, ImageType destinationImageType=eRgb, bool waitForProxy=false)
Select an ImageProvider.
Definition: ImageProcessor.cpp:167
armarx::Component::offeringTopicFromProperty
void offeringTopicFromProperty(const std::string &propertyName)
Offer a topic whose name is specified by the given property.
Definition: Component.cpp:159
Stream::StreamProviderI::onStartRecording
void onStartRecording(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:347
visionx::ImageProviderInfo::imageFormat
ImageFormatInfo imageFormat
Image format struct that contains all necessary image information.
Definition: ImageProcessor.h:509
visionx::tools::createByteImage
CByteImage * createByteImage(const ImageFormatInfo &imageFormat, const ImageType imageType)
Creates a ByteImage for the destination type specified in the given imageProviderInfo.
armarx::Component::getTopicFromProperty
TopicProxyType getTopicFromProperty(const std::string &propertyName)
Get a topic proxy whose name is specified by the given property.
Definition: Component.h:221
visionx::ImageProviderInfo
Definition: ImageProcessor.h:479
visionx::tools::convert
CByteImage::ImageType convert(const ImageType visionxImageType)
Converts a VisionX image type into an image type of IVT's ByteImage.
Definition: TypeMapping.cpp:97
Stream::StreamProviderI::onInitImageProcessor
void onInitImageProcessor() override
Is called once initialization of the ManagedIceObject is done.
Definition: StreamProviderI.cpp:77
Stream::StreamProviderI::startCapture
bool startCapture(const ::Ice::Current &=Ice::emptyCurrent) override
starts the capture for the given source
Definition: StreamProviderI.cpp:255
Stream::StreamProviderI::StreamProviderPropertyDefinitions::StreamProviderPropertyDefinitions
StreamProviderPropertyDefinitions(std::string prefix)
Definition: StreamProviderI.cpp:41
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:184
Stream::StreamProviderI::getCodecType
CodecType getCodecType(const Ice::Current &) override
Definition: StreamProviderI.cpp:305
Stream
Definition: StreamProviderI.cpp:38
visionx::ImageProcessor::usingImageProvider
void usingImageProvider(std::string name)
Registers a delayed topic subscription and a delayed provider proxy retrieval which all will be avail...
Definition: ImageProcessor.cpp:128
Stream::StreamProviderI::process
void process() override
Process the vision component.
Definition: StreamProviderI.cpp:193
visionx::ImageProcessor::getImages
int getImages(CByteImage **ppImages)
Poll images from provider.
Definition: ImageProcessor.cpp:395
StreamProviderI.h
visionx::ImageProcessor::setFramerate
void setFramerate(float fps)
Definition: ImageProcessor.cpp:771
Stream::StreamProviderI::onExitImageProcessor
void onExitImageProcessor() override
Is called once the component terminates.
Definition: StreamProviderI.cpp:188
armarx::ManagedIceObject::usingTopic
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Definition: ManagedIceObject.cpp:254
Stream::StreamProviderI::StreamProviderI
StreamProviderI()
Definition: StreamProviderI.cpp:65
VAROUT
#define VAROUT(x)
Definition: StringHelpers.h:198
ImageUtil.h
Stream::StreamProviderI::getNumberOfImages
int getNumberOfImages(const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:335
armarx::Logging::deactivateSpam
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition: Logging.cpp:99
TypeMapping.h
Stream::StreamProviderI::onConnectImageProcessor
void onConnectImageProcessor() override
Is called once all dependencies of the object have been resolved and Ice connection is established.
Definition: StreamProviderI.cpp:97
armarx::PeriodicTask
Definition: ArmarXManager.h:70
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
Stream::StreamProviderI::setCompressionRate
void setCompressionRate(::Stream::CompressionRate=COMPRESSIONHIGH, const ::Ice::Current &=Ice::emptyCurrent) override
Definition: StreamProviderI.cpp:341
Exception.h
visionx::ImageProcessor::waitForImages
bool waitForImages(int milliseconds=1000)
Wait for new images.
Definition: ImageProcessor.cpp:309