RunningTask.cpp
Go to the documentation of this file.
1#include "RunningTask.h"
2
3#include <condition_variable>
4#include <mutex>
5
8
9#include "ThreadList.h"
10
11namespace armarx
12{
13
15 {
16 //! mutex for the stop function
17 std::mutex stoppingMutex;
18 //! mutex for the stopped status
19 std::mutex stopMutex;
20 std::condition_variable stopCondition;
21 //! mutex for the finished status
22 std::mutex finishedMutex;
23 std::condition_variable finishedCondition;
24
26 bool threadJoined = false;
27 };
28
29 RunningTaskBase::RunningTaskBase(std::string const& name) : impl(new Impl)
30 {
31 this->running = false;
32 this->stopped = false;
33 this->finished = false;
34 this->workload = 0.0f;
35
36 setName(name);
37 }
38
43
// Sets the task's name.
// The member is addressed as RunningTaskIceBase::name because the parameter is
// also called `name` and would otherwise shadow it.
44 void
45 RunningTaskBase::setName(const std::string& name)
46 {
47 RunningTaskIceBase::name = name;
48 }
49
// Stores the given thread list as this task's custom thread list and, if the
// task is already running, registers the task with that list immediately.
// NOTE(review): the signature line is missing from this listing; the body and the
// member index suggest RunningTaskBase::setThreadList(ThreadListPtr threadList) - confirm.
50 void
52 {
53 impl->customThreadList = threadList;
54
// A task that is not running yet is added to the custom list later, from run().
55 if (isRunning())
56 {
57 impl->customThreadList->addRunningTask(this);
58 }
59 }
60
61 void
63 {
64 auto app = Application::getInstance();
65 if (app && app->getForbidThreadCreation())
66 {
67 throw LocalException()
68 << "Thread creation is now allowed in this application at the point in time! Use "
69 "Application::getInstance()->getThreadPool() instead.";
70 }
71
72 if (!running && !finished)
73 {
74 impl->threadJoined = false;
75 startTime = IceUtil::Time::now().toMicroSeconds();
76 IceUtil::Thread::start();
77
78 running = true;
79 }
80
81 if (!running && finished)
82 {
83 throw LocalException("Running Task '" + RunningTaskIceBase::name +
84 "' is already finished and cannot be started again.");
85 }
86 }
87
88 void
89 RunningTaskBase::stop(bool waitForJoin)
90 {
91 std::unique_lock stoppingMutexLock{impl->stoppingMutex, std::defer_lock};
92
93 if (waitForJoin)
94 {
95 impl->stoppingMutex.lock(); // thread is beeing stopped at the moment
96 }
97 else if (!stoppingMutexLock.try_lock())
98 {
99 return; // thread is already being stopped
100 }
101
102 try
103 {
104 if (running)
105 {
106 if (!stopped)
107 {
108 {
109 std::unique_lock lock(impl->stopMutex);
110 stopped = true;
111 impl->stopCondition.notify_all();
112 }
113 }
114 }
115
116 if (waitForJoin && !impl->threadJoined)
117 {
118 impl->threadJoined = true;
119 getThreadControl().join();
120 }
121
122 impl->stoppingMutex.unlock();
123 }
124 catch (IceUtil::ThreadSyscallException&
125 e) // happens for example when the thread is already erased
126 {
127 impl->stoppingMutex.unlock();
128 }
129 catch (IceUtil::ThreadNotStartedException& e)
130 {
131 impl->stoppingMutex.unlock();
132 }
133 catch (...) // make sure that the mutex is unlocked on all exceptions
134 {
135 ARMARX_INFO_S << "Got exception in RunningTask::stop()";
136 impl->stoppingMutex.unlock();
137 throw;
138 }
139 }
140
// Joins the task's thread unless it has been joined before; does not set the
// stopped flag.
// NOTE(review): the signature line is missing from this listing; the member index
// suggests RunningTaskBase::join() - confirm.
141 void
143 {
// Same mutex as stop(), so stop() and join() cannot both join the thread.
144 std::unique_lock lock(impl->stoppingMutex);
145 if (!impl->threadJoined)
146 {
147 impl->threadJoined = true;
148 getThreadControl().join();
149 }
150 }
151
// Returns the current value of the `running` flag.
// NOTE(review): the signature line is missing from this listing; the member index
// suggests RunningTaskBase::isRunning() const - confirm.
152 bool
154 {
155 return this->running;
156 }
157
// Returns the current value of the `finished` flag.
// NOTE(review): the signature line is missing from this listing; the member index
// suggests RunningTaskBase::isFinished() const - confirm.
158 bool
160 {
161 return this->finished;
162 }
163
// Blocks until the task has finished or a timed wait runs out.
// Returns true if the task was never started or has finished, and false if a
// timed wait on the finished condition timed out first.
// A timeoutMS of -1 waits without a timeout.
// NOTE(review): the signature line is missing from this listing; the member index
// suggests RunningTaskBase::waitForFinished(int timeoutMS) with a default of -1
// declared in the header - confirm.
164 bool
166 {
167 std::unique_lock lock(impl->finishedMutex);
168 if (!running && !finished) // the task never started
169 {
170 return true;
171 }
// Re-check the flag after every wake-up; a wake-up alone does not imply completion.
172 while (!isFinished())
173 {
174 if (timeoutMS == -1)
175 {
176 impl->finishedCondition.wait(lock);
177 }
// Each wait_for call is given the full timeoutMS again.
178 else if (impl->finishedCondition.wait_for(lock, std::chrono::milliseconds(timeoutMS)) ==
179 std::cv_status::timeout)
180 {
181 return false;
182 }
183 }
184 return true;
185 }
186
// Returns the `stopped` flag. As a side effect every call stores the current
// time (in microseconds) in lastFeedbackTime.
// NOTE(review): the signature line is missing from this listing; the member index
// suggests RunningTaskBase::isStopped() - confirm.
187 bool
189 {
190 lastFeedbackTime = IceUtil::Time::now().toMicroSeconds();
191 return stopped;
192 }
193
// Blocks on the stop condition until isStopped() returns true.
// NOTE(review): the signature line is missing from this listing; the member index
// suggests RunningTaskBase::waitForStop() - confirm.
194 void
196 {
197 std::unique_lock lock(impl->stopMutex);
198
// The flag is re-evaluated after every wake-up; isStopped() also refreshes
// lastFeedbackTime on each evaluation.
199 while (!isStopped())
200 {
201 impl->stopCondition.wait(lock);
202 }
203 }
204
// Returns a copy of the task's name (RunningTaskIceBase::name).
// NOTE(review): the signature line is missing from this listing; the member index
// suggests RunningTaskBase::getName() const - confirm.
205 std::string
207 {
208 return RunningTaskIceBase::name;
209 }
210
// Thread body of the task.
// Records the thread id, registers the task with the application thread list
// (and the custom list, if one is set), invokes the callback, then marks the
// task as finished, unregisters it and wakes all waiters.
211 void
212 RunningTaskBase::run()
213 {
214 threadId = LogSender::getThreadId();
215 ThreadList::getApplicationThreadList()->addRunningTask(this);
216
217 if (impl->customThreadList)
218 {
219 impl->customThreadList->addRunningTask(this);
220 }
221
222 try
223 {
224 if (callback)
225 {
226 callback();
227 }
228 else
229 {
// Thrown inside the try block, so it is handled by the catch-all below.
230 throw std::runtime_error("No callback defined in RunningTask");
231 }
232 }
233 catch (...)
234 {
// NOTE(review): one line of this handler (listing number 235) is not visible
// here; as shown, the exception is not rethrown and execution continues below.
236 }
237
// finishedMutex is held while the state flags change; waitForFinished() waits
// on finishedCondition under the same mutex.
238 std::unique_lock lock(impl->finishedMutex);
239 finished = true;
240 running = false;
241 ThreadList::getApplicationThreadList()->removeRunningTask(this);
242
243 if (impl->customThreadList)
244 {
245 impl->customThreadList->removeRunningTask(this);
246 }
247
// Wake threads blocked on either condition variable.
248 impl->stopCondition.notify_all();
249 impl->finishedCondition.notify_all();
250 }
251
252
253} // namespace armarx
static ApplicationPtr getInstance()
Retrieve shared pointer to the application object.
static long getThreadId()
~RunningTaskBase() override
Destructor stops the thread and waits for completion.
bool isStopped()
Retrieve whether stop() has been called.
bool waitForFinished(int timeoutMS=-1)
wait blocking for thread to be finished.
bool isFinished() const
Retrieve finished state of the thread.
void start()
Starts the thread.
void join()
Wait for the RunningTask to finish without telling it to finish.
std::string getName() const
void waitForStop()
Wait blocking for thread until stop() has been called.
void setName(const std::string &name)
void stop(bool waitForJoin=true)
Stops the thread.
void setThreadList(ThreadListPtr threadList)
bool isRunning() const
Retrieve running state of the thread.
RunningTaskBase(std::string const &name)
static ThreadListPtr getApplicationThreadList()
getApplicationThreadList retrieves the ThreadList, that contains all TimerTasks and PeriodicTasks in ...
#define ARMARX_INFO_S
Definition Logging.h:202
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceInternal::Handle< ThreadList > ThreadListPtr
Definition RunningTask.h:39
void handleExceptions()
std::condition_variable stopCondition
std::mutex stoppingMutex
mutex for the stop function
std::mutex stopMutex
mutex for the stopped status
std::condition_variable finishedCondition
std::mutex finishedMutex
mutex for the finished status