PeriodicTask.h
Go to the documentation of this file.
1/*
2* This file is part of ArmarX.
3*
4* Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5*
6* ArmarX is free software; you can redistribute it and/or modify
7* it under the terms of the GNU General Public License version 2 as
8* published by the Free Software Foundation.
9*
10* ArmarX is distributed in the hope that it will be useful, but
11* WITHOUT ANY WARRANTY; without even the implied warranty of
12* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13* GNU General Public License for more details.
14*
15* You should have received a copy of the GNU General Public License
16* along with this program. If not, see <http://www.gnu.org/licenses/>.
17*
18* @package ArmarXCore::core
19* @author Kai Welke (welke at kit dot edu)
20* @date 2012
21* @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22* GNU General Public License
23*/
24
25#pragma once
26
27
28#include <time.h>
29
30#include <functional>
31#include <mutex>
32
33#include <IceUtil/Time.h>
34#include <IceUtil/Timer.h>
35
40
41#include "ThreadList.h"
42#include <cxxabi.h>
43
44namespace armarx
45{
46 /**
47 * \class PeriodicTask
48 * \ingroup Threads
49 * The periodic task executes one thread method repeatedly using the time period specified in the constructor.
50 */
51 template <class T>
53 virtual public IceUtil::TimerTask,
54 virtual public Logging,
55 virtual protected PeriodicTaskIceBase
56 {
57 public:
58 /**
59 * Typedef for the periodic method. Thread methods need to follow the template
60 * void methodName(void). Parameter passing is not used since methods are members
61 * of the class.
62 */
63 typedef void (T::*method_type)(void);
64
65 /**
66 * Shared pointer type for convenience.
67 */
69
70 /**
71 * Constructs a periodic task within the class parent which calls the peridoicFn in
72 * a new thread.
73 *
74 * @param parent the class wheter periodicFn is member
75 * @param peridicFn reference to the periodic function implementation
76 * @param periodMs period time in milliseconds
77 * @param assureMeanInterval If set to TRUE, the scheduler tries to
78 * compensate timeslot exceedings overtime and not only on the next interval
79 * @param name of this thread, that describes what it is doing
80 * @param forceSystemTime If set to false, the Timeserver will be used for timing (if available)
81 *
82 */
83 PeriodicTask(T* parent,
84 method_type periodicFn,
85 int periodMs,
86 bool assureMeanInterval = false,
87 std::string name = "",
88 bool forceSystemTime = true)
89 {
90 shutdown = false;
91 running = false;
92 functionExecuting = false;
93 this->assureMeanInterval = assureMeanInterval;
94 executionCounter = 0;
95 last_cpu_total_time = 0;
96 this->periodicFn = periodicFn;
97 this->parent = parent;
98
99 if (name.empty())
100 {
101 char* demangled = nullptr;
102 int status = -1;
103 demangled = abi::__cxa_demangle(typeid(T).name(), nullptr, nullptr, &status);
104 setName(demangled);
105 free(demangled);
106 }
107 else
108 {
109 setName(name);
110 }
111
112 intervalMs = periodMs;
113 interval = IceUtil::Time::milliSeconds(periodMs);
114 timer = new armarx::Timer(forceSystemTime);
115 delayWarningTolerance = IceUtil::Time::milliSeconds(5);
116
117 workloadList.resize(100, 0.f);
118 setTag("PeriodicTask" + this->name);
119
120 if (customThreadList)
121 {
122 customThreadList->addPeriodicTask(this);
123 }
124
125 appThreadList = ThreadList::getApplicationThreadList();
126
127 if (appThreadList)
128 {
129 appThreadList->addPeriodicTask(this);
130 }
131 }
132
133 /**
134 * Destructor stops the thread and waits for completion.
135 */
136 ~PeriodicTask() override
137 {
138 stop();
139 if (appThreadList)
140 {
141 appThreadList->removePeriodicTask(this);
142 }
143
144 if (customThreadList)
145 {
146 customThreadList->removePeriodicTask(this);
147 }
148 }
149
150 /**
151 * @brief Set the name identifying PeriodicTask instances.
152 * @param name string containing the name
153 */
154 void
155 setName(std::string name)
156 {
157 this->name = name;
158 }
159
160 /**
161 * @brief This method sets \p threadList as PeriodicTask::customThreadList and appends \p this to the list.
162 * @param threadList a pointer to the ThreadList instance.
163 */
164 void
166 {
167 this->customThreadList = threadList;
168
169 if (isRunning())
170 {
171 threadList->addPeriodicTask(this);
172 }
173 }
174
175 /**
176 * Starts the thread. Only has effect if the task has not been started. After completion of the task, it can be started again (see isRunning()).
177 */
178 void
180 {
181 std::unique_lock lock(dataMutex);
182 auto app = Application::getInstance();
183 if (app && app->getForbidThreadCreation())
184 {
185 throw LocalException()
186 << "Thread creation is now allowed in this application at the point in time! "
187 "Use Application::getInstance()->getThreadPool() instead.";
188 }
189 if (!running)
190 {
191 shutdown = false;
192 cycleStart = logTime = __now();
193 startTime = logTime.toMicroSeconds();
194 timeExceeded = false;
195 timer->schedule(this, IceUtil::Time::milliSeconds(0));
196 }
197
198 running = true;
199 }
200
201 /**
202 * @return The scheduling interval of the thread instance in milliseconds
203 */
204 IceUtil::Int64
206 {
207 return interval.toMilliSeconds();
208 }
209
210 /**
211 * @return Set the tolerance value for delay warnings. If the tasks takes longer than (scheduling interval + tolerance), a warning is issued.
212 * @param newToleranceInMilliSeconds The tolerance, in milliseconds.
213 */
214 void
215 setDelayWarningTolerance(int newToleranceInMilliSeconds = 5)
216 {
217 delayWarningTolerance = IceUtil::Time::milliSeconds(newToleranceInMilliSeconds);
218 }
219
220 /**
221 * @brief changeInterval changes the interval at which the task is executed.
222 * The execution starts the first time again after the duration specified
223 * in the parameter OR if the task is currently executed after this task
224 * + the interval specified - cycleDuration.
225 * @param intervalInMs new execution interval in milliseconds.
226 */
227 void
228 changeInterval(unsigned int intervalInMs)
229 {
230 std::unique_lock lock(dataMutex);
231
232 if (interval.toMilliSeconds() == intervalInMs)
233 {
234 return;
235 }
236
237 intervalMs = intervalInMs;
238 interval = IceUtil::Time::milliSeconds(intervalInMs);
239
240 __changeInterval();
241 }
242
243 /**
244 * Stops the thread. Only has effect if the task has been started.
245 *
246 *
247 */
248 void
250 {
251 std::unique_lock lock(dataMutex);
252
253 if (running)
254 {
255 shutdown = true;
256 lock.unlock();
257 // timer->cancel(this);
258 timer->destroy();
259 timer = new armarx::Timer(timer->getUseSystemTime());
260 }
261
262 running = false;
263 }
264
265 /**
266 * Retrieve running state of the thread
267 *
268 * @return whether thread is running
269 */
270 bool
271 isRunning() const
272 {
273 return running;
274 }
275
276 /**
277 * Return the execution state of the thread function.
278 *
279 * @return PeriodicTask::functionExecuting
280 */
281 bool
283 {
284 std::unique_lock lock(mutexFunctionExecuting);
285 return functionExecuting;
286 }
287
288 /**
289 * @return PeriodicTaskIceBase base pointer to the current PeriodicTask instance
290 */
291 const PeriodicTaskIceBase&
293 {
294 const PeriodicTaskIceBase* base = dynamic_cast<const PeriodicTaskIceBase*>(this);
295 return *base;
296 }
297
298 private:
299 void
300 runTimerTask() override
301 {
302 {
303 std::unique_lock lock(mutexFunctionExecuting);
304 functionExecuting = true;
305 }
306 IceUtil::Time idleDuration;
307 {
308 std::unique_lock lock(dataMutex);
309 threadId = LogSender::getThreadId();
310
311 idleDuration = __now() - cycleStart - cycleDuration;
312 cycleStart = __now();
313 lastCycleStartTime = cycleStart.toMicroSeconds();
314 }
315
316 // call periodic function
317 try
318 {
319 (parent->*periodicFn)();
320 }
321 catch (...)
322 {
324 }
325
326 {
327 std::unique_lock lock2(dataMutex);
328 std::unique_lock lock(mutexFunctionExecuting);
329 // measure execution time
330 cycleDuration = __now() - cycleStart;
331 lastCycleDuration = cycleDuration.toMicroSeconds();
332 __checkTimeslotUsage();
333 __getCPULoad(idleDuration);
334 __reschedule();
335 functionExecuting = false;
336 }
337 }
338
339 void
340 __changeInterval()
341 {
342 std::unique_lock lock(mutexFunctionExecuting);
343 startTime = __now().toMicroSeconds();
344 executionCounter = 0;
345
346 if (functionExecuting)
347 {
348 return;
349 }
350
351 timer->cancel(this);
352
353 timer->schedule(this, interval);
354 ARMARX_VERBOSE << "Execution interval changed to " << interval.toMilliSeconds()
355 << " ms." << std::endl;
356 }
357
358 void
359 __reschedule()
360 {
361 if (isRunning())
362 {
363 IceUtil::Time newInterval;
364 executionCounter++;
365
366 if (assureMeanInterval)
367 {
368 newInterval = IceUtil::Time::microSeconds(
369 startTime + interval.toMicroSeconds() * executionCounter) -
370 __now();
371 }
372 else
373 {
374 newInterval = interval - IceUtil::Time::microSeconds(lastCycleDuration);
375 }
376
377 if (newInterval.toMicroSeconds() < 0)
378 {
379 newInterval = IceUtil::Time::microSeconds(0);
380 }
381
382 if (!shutdown)
383 {
384 timer->schedule(this, newInterval);
385 }
386 }
387 }
388
389 void
390 __checkTimeslotUsage()
391 {
392 if (interval + delayWarningTolerance < cycleDuration)
393 {
394 timeExceeded = true;
395 executionTime = cycleDuration.toMilliSeconds();
396 }
397
398 float logInterval = 30.0f;
399
400 if (timeExceeded)
401 {
402 ARMARX_INFO << deactivateSpam(logInterval) << "periodic task '" << name << "' took "
403 << executionTime << " [ms] instead of the requested "
404 << interval.toMilliSeconds()
405 << " [ms] (this message is only posted every " << logInterval
406 << " seconds.)";
407 timeExceeded = false;
408 }
409 }
410
411 void
412 __getCPULoad(IceUtil::Time& idleDuration)
413 {
414 double threadCPUUsage = -1;
415 timespec usage;
416 timespec st;
417
418 if (clock_gettime(CLOCK_REALTIME, &st) != -1 &&
419 clock_gettime(CLOCK_THREAD_CPUTIME_ID, &usage) != -1)
420 {
421 double current_cpu_thread_time = static_cast<double>(usage.tv_sec) +
422 static_cast<double>(usage.tv_nsec) / 1'000'000'000;
423 double current_cpu_total_time = static_cast<double>(st.tv_sec) +
424 static_cast<double>(st.tv_nsec) / 1'000'000'000;
425
426 if (last_cpu_total_time != 0)
427 {
428 threadCPUUsage = (current_cpu_thread_time - last_cpu_thread_time) /
429 (current_cpu_total_time - last_cpu_total_time);
430 }
431
432 last_cpu_total_time = current_cpu_total_time;
433 last_cpu_thread_time = current_cpu_thread_time;
434 }
435 else
436 {
437 // store the workload of this task the simple way
438 threadCPUUsage = static_cast<double>(lastCycleDuration) /
439 (lastCycleDuration + idleDuration.toMicroSeconds());
440 }
441
442 if (threadCPUUsage > 0)
443 {
444 workloadList.push_back(threadCPUUsage);
445
446 if (workloadList.size() > 100.f * 1.5f)
447 {
448 workloadList.erase(workloadList.begin(), workloadList.begin() + 50);
449 }
450 }
451 }
452
453 IceUtil::Time
454 __now() const
455 {
456 if (timer->getUseSystemTime())
457 {
458 return IceUtil::Time::now();
459 }
460 else
461 {
462 return TimeUtil::GetTime();
463 }
464 }
465
466 armarx::TimerPtr timer;
467 T* parent;
468 method_type periodicFn;
469
470
471 IceUtil::Time interval;
472 IceUtil::Time cycleStart;
473 IceUtil::Time cycleDuration;
474 IceUtil::Time logTime;
475 IceUtil::Time delayWarningTolerance;
476 IceUtil::Int64 executionTime;
477 IceUtil::Int64 executionCounter;
478
479 // IceUtil::Time scheduleTime;
480 // IceUtil::Time scheduledInterval;
481 // IceUtil::Int64 realIntervalSum;
482 // IceUtil::Int64 scheduledIntervalSum;
483 double last_cpu_total_time;
484 double last_cpu_thread_time;
485
486 bool timeExceeded;
487 //! Avoids Drifting
488 bool assureMeanInterval;
489
490 mutable std::mutex mutexFunctionExecuting;
491 bool functionExecuting;
492 mutable std::mutex dataMutex;
493
494 ThreadListPtr customThreadList;
495 ThreadListPtr appThreadList; // prevent deletion of global applist
496 };
497
498
499} // namespace armarx
#define ARMARXCORE_IMPORT_EXPORT
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
static ApplicationPtr getInstance()
Retrieve shared pointer to the application object.
void setTag(const LogTag &tag)
Definition Logging.cpp:54
PeriodicTask(T *parent, method_type periodicFn, int periodMs, bool assureMeanInterval=false, std::string name="", bool forceSystemTime=true)
Constructs a periodic task within the class parent which calls the periodicFn in a new thread.
void changeInterval(unsigned int intervalInMs)
changeInterval changes the interval at which the task is executed.
~PeriodicTask() override
Destructor stops the thread and waits for completion.
void start()
Starts the thread.
const PeriodicTaskIceBase & getStatistics() const
void setThreadList(ThreadListPtr threadList)
This method sets threadList as PeriodicTask::customThreadList and appends this to the list.
void(T::* method_type)(void)
Typedef for the periodic method.
void setDelayWarningTolerance(int newToleranceInMilliSeconds=5)
IceUtil::Int64 getInterval() const
IceUtil::Handle< PeriodicTask< T > > pointer_type
Shared pointer type for convenience.
bool isFunctionExecuting() const
Return the execution state of the thread function.
static ThreadListPtr getApplicationThreadList()
getApplicationThreadList retrieves the ThreadList, that contains all TimerTasks and PeriodicTasks in ...
Timer implementation with TimeServer support.
Definition Timer.h:62
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
bool isRunning(Status status)
Returns whether the given task status describes a state where a path is planned.
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< Timer > TimerPtr
smart pointer for armarx::Timer
Definition Timer.h:162
IceInternal::Handle< ThreadList > ThreadListPtr
Definition RunningTask.h:39
void handleExceptions()
Interval< T > interval(T lo, T hi)