ThreadPool.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2017, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package ArmarX
19 * @author Mirko Waechter( mirko.waechter at kit dot edu)
20 * @date 2018
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
24
25#include "ThreadPool.h"
26
29
30// This was transitively included from some other STL header earlier,
31// but not anymore since GCC 12. This change will make building boost
32// asio fail now. See also https://github.com/doxygen/doxygen/issues/8909
33// for a similar issue in Doxygen.
34#include <utility>
35
36#include <boost/asio.hpp>
37#include <boost/thread.hpp>
38
39namespace armarx
40{
42 {
43 Impl(std::size_t pool_size, bool queueTasks) :
45 {
46 }
47
48 boost::asio::io_service io_service;
49 boost::asio::io_service::work work;
50 boost::thread_group threads;
51 std::size_t available;
52 std::mutex mutex;
54 };
55
56 ThreadPool::ThreadPool(std::size_t pool_size, bool queueTasks) :
57 impl(new Impl(pool_size, queueTasks))
58 {
59 for (std::size_t i = 0; i < pool_size; ++i)
60 {
61 impl->threads.create_thread(
62 boost::bind(&boost::asio::io_service::run, &impl->io_service));
63 }
64 }
65
67 {
68 // Force all threads to return from io_service::run().
69 impl->io_service.stop();
70
71 // Suppress all exceptions.
72 try
73 {
74 impl->threads.join_all();
75 }
76 catch (const std::exception&)
77 {
78 }
79 }
80
82 ThreadPool::runTask(std::function<void()> task)
83 {
84 std::unique_lock lock(impl->mutex);
85
86 // If no threads are available, then return.
87 if (!impl->queueTasks && impl->available == 0)
88 {
89 return {};
90 }
91
92 // Decrement count, indicating thread is no longer available.
93 if (!impl->queueTasks)
94 {
95 --impl->available;
96 }
97 auto promise = std::make_shared<std::promise<void>>();
98 // Post a wrapped task into the queue.
99 impl->io_service.post(
100 [this, task, promise]
101 {
102 wrap_task(task);
103 promise->set_value();
104 });
105 return Handle{promise->get_future()};
106 }
107
108 int
110 {
111 std::unique_lock lock(impl->mutex);
112 if (!impl->queueTasks)
113 {
114 return impl->available;
115 }
116 else
117 {
118 return -1;
119 }
120 }
121
122 void
123 ThreadPool::wrap_task(std::function<void()> task)
124 {
125 // Run the user supplied task.
126 try
127 {
128 task();
129 }
130 // Suppress all exceptions.
131 catch (const std::exception&)
132 {
133 std::unique_lock lock(impl->mutex);
134 // Task has ended, so increment count of available threads.
135 if (!impl->queueTasks)
136 {
137 ++impl->available;
138 }
140 }
141
142 // Task has finished, so increment count of available threads.
143 std::unique_lock lock(impl->mutex);
144 if (!impl->queueTasks)
145 {
146 ++impl->available;
147 }
148 }
149
150 ThreadPool::Handle::Handle() : mutex(new std::mutex)
151 {
152 }
153
154 ThreadPool::Handle::Handle(std::shared_future<void> functionFinished) :
155 mutex(new std::mutex), functionFinished(functionFinished)
156 {
157 }
158
160 {
161 if (!joined && isValid() && !detached)
162 {
163 throw LocalException()
164 << "You did not join the thread pool handle before the handle was deleted!";
165 }
166 }
167
168 bool
170 {
171 return functionFinished.valid();
172 }
173
174 void
176 {
178 std::lock_guard<decltype(*mutex)> lock(*mutex);
179
180 if (!isValid())
181 {
182 return;
183 }
184 if (detached)
185 {
186 throw LocalException() << "You cannot join a detached thread!";
187 }
188 functionFinished.get();
189 joined = true;
190 }
191
192 void
194 {
196 std::lock_guard<decltype(*mutex)> lock(*mutex);
197
198 if (!isValid())
199 {
200 return;
201 }
202 if (joined)
203 {
204 throw LocalException() << "You cannot detach a joined thread!";
205 }
206 detached = true;
207 }
208
209 const std::shared_future<void>&
211 {
212 return functionFinished;
213 }
214
215 bool
217 {
218 return joined;
219 }
220
221 bool
223 {
224 return detached;
225 }
226
227} // namespace armarx
const std::shared_future< void > & getFuture() const
~Handle() noexcept(false)
Handle runTask(std::function< void()> task)
Adds a task to the thread pool if a thread is currently available.
ThreadPool(std::size_t pool_size, bool queueTasks=false)
Constructor.
int getAvailableTaskCount() const
If queuing is disabled, returns the number of available threads.
~ThreadPool()
Destructor.
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
This file offers overloads of toIce() and fromIce() functions for STL container types.
void handleExceptions()
boost::asio::io_service::work work
boost::thread_group threads
Impl(std::size_t pool_size, bool queueTasks)
boost::asio::io_service io_service