RunningTask.cpp
Go to the documentation of this file.
1 #include "RunningTask.h"
2 
3 #include "ThreadList.h"
4 
7 
8 #include <condition_variable>
9 #include <mutex>
10 
11 namespace armarx
12 {
13 
15  {
16  //! mutex for the stop function
17  std::mutex stoppingMutex;
18  //! mutex for the stopped status
19  std::mutex stopMutex;
20  std::condition_variable stopCondition;
21  //! mutex for the finished status
22  std::mutex finishedMutex;
23  std::condition_variable finishedCondition;
24 
26  bool threadJoined = false;
27  };
28 
29  RunningTaskBase::RunningTaskBase(std::string const& name)
30  : impl(new Impl)
31  {
32  this->running = false;
33  this->stopped = false;
34  this->finished = false;
35  this->workload = 0.0f;
36 
37  setName(name);
38  }
39 
41  {
42  stop();
43  }
44 
45  void RunningTaskBase::setName(const std::string& name)
46  {
47  RunningTaskIceBase::name = name;
48  }
49 
51  {
52  impl->customThreadList = threadList;
53 
54  if (isRunning())
55  {
56  impl->customThreadList->addRunningTask(this);
57  }
58  }
59 
61  {
62  auto app = Application::getInstance();
63  if (app && app->getForbidThreadCreation())
64  {
65  throw LocalException() << "Thread creation is now allowed in this application at the point in time! Use Application::getInstance()->getThreadPool() instead.";
66  }
67 
68  if (!running && !finished)
69  {
70  impl->threadJoined = false;
71  startTime = IceUtil::Time::now().toMicroSeconds();
72  IceUtil::Thread::start();
73 
74  running = true;
75  }
76 
77  if (!running && finished)
78  {
79  throw LocalException("Running Task '" + RunningTaskIceBase::name + "' is already finished and cannot be started again.");
80  }
81  }
82 
83  void RunningTaskBase::stop(bool waitForJoin)
84  {
85  std::unique_lock stoppingMutexLock{impl->stoppingMutex, std::defer_lock};
86 
87  if (waitForJoin)
88  {
89  impl->stoppingMutex.lock(); // thread is beeing stopped at the moment
90  }
91  else if (!stoppingMutexLock.try_lock())
92  {
93  return; // thread is already being stopped
94  }
95 
96  try
97  {
98  if (running)
99  {
100  if (!stopped)
101  {
102  {
103  std::unique_lock lock(impl->stopMutex);
104  stopped = true;
105  impl->stopCondition.notify_all();
106  }
107  }
108  }
109 
110  if (waitForJoin && !impl->threadJoined)
111  {
112  impl->threadJoined = true;
113  getThreadControl().join();
114  }
115 
116  impl->stoppingMutex.unlock();
117 
118  }
119  catch (IceUtil::ThreadSyscallException& e) // happens for example when the thread is already erased
120  {
121  impl->stoppingMutex.unlock();
122  }
123  catch (IceUtil::ThreadNotStartedException& e)
124  {
125  impl->stoppingMutex.unlock();
126  }
127  catch (...) // make sure that the mutex is unlocked on all exceptions
128  {
129  ARMARX_INFO_S << "Got exception in RunningTask::stop()";
130  impl->stoppingMutex.unlock();
131  throw;
132  }
133 
134  }
135 
137  {
138  std::unique_lock lock(impl->stoppingMutex);
139  if (!impl->threadJoined)
140  {
141  impl->threadJoined = true;
142  getThreadControl().join();
143  }
144  }
145 
147  {
148  return this->running;
149  }
150 
152  {
153  return this->finished;
154  }
155 
157  {
158  std::unique_lock lock(impl->finishedMutex);
159  if (!running && !finished) // the task never started
160  {
161  return true;
162  }
163  while (!isFinished())
164  {
165  if (timeoutMS == -1)
166  {
167  impl->finishedCondition.wait(lock);
168  }
169  else if (impl->finishedCondition.wait_for(lock, std::chrono::milliseconds(timeoutMS)) == std::cv_status::timeout)
170  {
171  return false;
172  }
173 
174  }
175  return true;
176  }
177 
179  {
180  lastFeedbackTime = IceUtil::Time::now().toMicroSeconds();
181  return stopped;
182  }
183 
185  {
186  std::unique_lock lock(impl->stopMutex);
187 
188  while (!isStopped())
189  {
190  impl->stopCondition.wait(lock);
191  }
192 
193  }
194 
195  std::string RunningTaskBase::getName() const
196  {
197  return RunningTaskIceBase::name;
198  }
199 
200  void RunningTaskBase::run()
201  {
202  threadId = LogSender::getThreadId();
203  ThreadList::getApplicationThreadList()->addRunningTask(this);
204 
205  if (impl->customThreadList)
206  {
207  impl->customThreadList->addRunningTask(this);
208  }
209 
210  try
211  {
212  if (callback)
213  {
214  callback();
215  }
216  else
217  {
218  throw std::runtime_error("No callback defined in RunningTask");
219  }
220  }
221  catch (...)
222  {
224  }
225 
226  std::unique_lock lock(impl->finishedMutex);
227  finished = true;
228  running = false;
229  ThreadList::getApplicationThreadList()->removeRunningTask(this);
230 
231  if (impl->customThreadList)
232  {
233  impl->customThreadList->removeRunningTask(this);
234  }
235 
236  impl->stopCondition.notify_all();
237  impl->finishedCondition.notify_all();
238  }
239 
240 
241 }
armarx::RunningTaskBase::Impl::finishedCondition
std::condition_variable finishedCondition
Definition: RunningTask.cpp:23
armarx::RunningTaskBase::setThreadList
void setThreadList(ThreadListPtr threadList)
Definition: RunningTask.cpp:50
armarx::RunningTaskBase::Impl::stoppingMutex
std::mutex stoppingMutex
mutex for the stop function
Definition: RunningTask.cpp:17
armarx::RunningTaskBase::start
void start()
Starts the thread.
Definition: RunningTask.cpp:60
armarx::RunningTaskBase::waitForStop
void waitForStop()
Wait blocking for thread until stop() has been called.
Definition: RunningTask.cpp:184
LocalException.h
armarx::LogSender::getThreadId
static long getThreadId()
Definition: LogSender.cpp:447
armarx::RunningTaskBase::Impl::finishedMutex
std::mutex finishedMutex
mutex for the finished status
Definition: RunningTask.cpp:22
armarx::control::common::MPStatus::finished
@ finished
armarx::RunningTaskBase::isStopped
bool isStopped()
Retrieve whether stop() has been called.
Definition: RunningTask.cpp:178
armarx::ThreadList::getApplicationThreadList
static ThreadListPtr getApplicationThreadList()
getApplicationThreadList retrieves the ThreadList, that contains all TimerTasks and PeriodicTasks in ...
Definition: ThreadList.cpp:189
RunningTask.h
armarx::RunningTaskBase::getName
std::string getName() const
Definition: RunningTask.cpp:195
armarx::RunningTaskBase::Impl::threadJoined
bool threadJoined
Definition: RunningTask.cpp:26
armarx::RunningTaskBase::waitForFinished
bool waitForFinished(int timeoutMS=-1)
wait blocking for thread to be finished.
Definition: RunningTask.cpp:156
armarx::RunningTaskBase::isFinished
bool isFinished() const
Retrieve finished state of the thread.
Definition: RunningTask.cpp:151
armarx::RunningTaskBase::callback
CallbackT callback
Definition: RunningTask.h:48
IceInternal::Handle< ThreadList >
armarx::RunningTaskBase::~RunningTaskBase
~RunningTaskBase() override
Destructor stops the thread and waits for completion.
Definition: RunningTask.cpp:40
ThreadList.h
armarx::RunningTaskBase::Impl::stopCondition
std::condition_variable stopCondition
Definition: RunningTask.cpp:20
armarx::RunningTaskBase::Impl::customThreadList
ThreadListPtr customThreadList
Definition: RunningTask.cpp:25
armarx::RunningTaskBase::Impl
Definition: RunningTask.cpp:14
armarx::Application::getInstance
static ApplicationPtr getInstance()
Retrieve shared pointer to the application object.
Definition: Application.cpp:289
armarx::RunningTaskBase::setName
void setName(const std::string &name)
Definition: RunningTask.cpp:45
armarx::RunningTaskBase::stop
void stop(bool waitForJoin=true)
Stops the thread.
Definition: RunningTask.cpp:83
armarx::RunningTaskBase::join
void join()
Wait for the RunningTask to finish without telling it to finish.
Definition: RunningTask.cpp:136
armarx::RunningTaskBase::isRunning
bool isRunning() const
Retrieve running state of the thread.
Definition: RunningTask.cpp:146
armarx::RunningTaskBase::RunningTaskBase
RunningTaskBase(std::string const &name)
Definition: RunningTask.cpp:29
ARMARX_INFO_S
#define ARMARX_INFO_S
Definition: Logging.h:195
armarx::RunningTaskBase::Impl::stopMutex
std::mutex stopMutex
mutex for the stopped status
Definition: RunningTask.cpp:19
armarx::handleExceptions
void handleExceptions()
Definition: Exception.cpp:141
armarx
This file offers overloads of toIce() and fromIce() functions for STL container types.
Definition: ArmarXTimeserver.cpp:28
Application.h
armarx::control::common::MPStatus::running
@ running