22 #include <condition_variable>
42 : num_tasks_(num_tasks)
47 void add(
size_t num_tasks = 1) noexcept { num_tasks_.fetch_add(num_tasks); }
51 void cross(
size_t num_tasks = 1)
53 num_tasks_.fetch_sub(num_tasks);
54 if (num_tasks_ <= 0) {
56 std::lock_guard<std::mutex> lk(mtx_);
63 bool empty() const noexcept {
return num_tasks_ <= 0; }
68 void wait(
size_t millis = 0)
70 std::this_thread::yield();
71 auto wake_up = [
this] {
return (num_tasks_ <= 0) || exception_ptr_; };
72 std::unique_lock<std::mutex> lk(mtx_);
74 cv_.wait(lk, wake_up);
76 cv_.wait_for(lk, std::chrono::milliseconds(millis), wake_up);
79 std::rethrow_exception(exception_ptr_);
85 void stop(std::exception_ptr eptr =
nullptr) noexcept
88 std::lock_guard<std::mutex> lk(mtx_);
91 num_tasks_ = std::numeric_limits<int>::min() / 2;
92 exception_ptr_ = eptr;
98 alignas(64) std::atomic_int num_tasks_{ 0 };
100 std::condition_variable cv_;
101 std::exception_ptr exception_ptr_{
nullptr };
112 explicit RingBuffer(
size_t capacity)
113 : buffer_{ std::unique_ptr<T[]>(
new T[capacity]) }
114 , capacity_{ capacity }
115 , mask_{ capacity - 1 }
119 size_t capacity()
const {
return capacity_; }
121 void set_entry(
size_t i, T val) { buffer_[i & mask_] = val; }
123 T get_entry(
size_t i)
const {
return buffer_[i & mask_]; }
125 RingBuffer<T>* enlarged_copy(
size_t bottom,
size_t top)
const
127 RingBuffer<T>* new_buffer =
new RingBuffer{ 2 * capacity_ };
128 for (
size_t i = top; i != bottom; ++i)
129 new_buffer->set_entry(i, this->get_entry(i));
134 std::unique_ptr<T[]> buffer_;
143 exchange(T& obj, T&& new_value) noexcept
145 T old_value = std::move(obj);
146 obj = std::forward<T>(new_value);
153 using Task = std::function<void()>;
158 TaskQueue(
size_t capacity = 256)
159 : buffer_{
new RingBuffer<Task*>(capacity) }
162 ~TaskQueue() noexcept
165 auto buf_ptr = buffer_.load();
166 for (
int i = top_; i < bottom_.load(m_relaxed); ++i)
167 delete buf_ptr->get_entry(i);
171 TaskQueue(TaskQueue
const& other) =
delete;
172 TaskQueue& operator=(TaskQueue
const& other) =
delete;
177 return (bottom_.load(m_relaxed) <= top_.load(m_relaxed));
182 bool try_push(Task&& task)
187 std::unique_lock<std::mutex> lk(mutex_, std::try_to_lock);
190 this->push_unsafe(std::forward<Task>(task));
197 void force_push(Task&& task)
201 std::lock_guard<std::mutex> lk(mutex_);
202 this->push_unsafe(std::forward<Task>(task));
208 void push_unsafe(Task&& task)
210 auto b = bottom_.load(m_relaxed);
211 auto t = top_.load(m_acquire);
212 RingBuffer<Task*>* buf_ptr = buffer_.load(m_relaxed);
214 if (
static_cast<int>(buf_ptr->capacity()) < (b - t) + 1) {
216 old_buffers_.emplace_back(
217 exchange(buf_ptr, buf_ptr->enlarged_copy(b, t)));
218 buffer_.store(buf_ptr, m_relaxed);
221 buf_ptr->set_entry(b,
new Task{ std::forward<Task>(task) });
222 bottom_.store(b + 1, m_release);
226 bool try_pop(Task& task)
228 auto t = top_.load(m_acquire);
229 std::atomic_thread_fence(m_seq_cst);
230 auto b = bottom_.load(m_acquire);
235 auto task_ptr = buffer_.load(m_acquire)->get_entry(t);
237 if (top_.compare_exchange_strong(t, t + 1, m_seq_cst, m_relaxed)) {
238 task = std::move(*task_ptr);
248 std::unique_lock<std::mutex> lk(mutex_);
249 cv_.wait(lk, [
this] {
return !this->empty() || stopped_; });
255 std::lock_guard<std::mutex> lk(mutex_);
262 alignas(64) std::atomic_int top_{ 0 };
263 alignas(64) std::atomic_int bottom_{ 0 };
264 alignas(64) std::atomic<RingBuffer<Task*>*> buffer_{
nullptr };
265 std::vector<std::unique_ptr<RingBuffer<Task*>>> old_buffers_;
267 std::condition_variable cv_;
// Set once the queue is stopped; read by the condition-variable wait
// predicate. Explicitly initialized: before C++20 a default-constructed
// std::atomic holds an indeterminate value, so reading it before stop()
// is called would be undefined behavior.
std::atomic<bool> stopped_{ false };
271 static constexpr std::memory_order m_relaxed = std::memory_order_relaxed;
272 static constexpr std::memory_order m_acquire = std::memory_order_acquire;
273 static constexpr std::memory_order m_release = std::memory_order_release;
274 static constexpr std::memory_order m_seq_cst = std::memory_order_seq_cst;
280 std::vector<TaskQueue> queues_;
282 alignas(64) std::atomic_size_t push_idx_{ 0 };
283 std::atomic_bool stopped_{
false };
284 std::atomic_size_t todo_list_{ 0 };
286 explicit TaskManager(
size_t num_queues)
287 : queues_{ std::vector<TaskQueue>(num_queues) }
288 , num_queues_{ num_queues }
291 template<
typename Task>
292 void push(Task&& task)
296 for (
size_t count = 0; count < num_queues_ * 20; count++) {
297 if (queues_[push_idx_++ % num_queues_].try_push(task))
300 queues_[push_idx_++ % num_queues_].force_push(task);
303 template<
typename Task>
304 bool try_pop(Task& task,
size_t worker_id = 0)
308 for (
size_t k = 0; k <= num_queues_; k++) {
309 if (queues_[(worker_id + k) % num_queues_].try_pop(task))
315 void wait_for_jobs(
size_t id) { queues_[id].wait(); }
319 for (
auto& q : queues_)
324 bool stopped() {
return stopped_; }
335 :
ThreadPool(std::thread::hardware_concurrency())
342 : task_manager_{ n_workers }
344 for (
size_t id = 0;
id < n_workers; ++id) {
345 workers_.emplace_back([
this,
id] {
346 std::function<void()> task;
347 while (!task_manager_.stopped()) {
348 task_manager_.wait_for_jobs(
id);
351 if (task_manager_.try_pop(task,
id))
352 this->execute_safely(task);
353 }
while (!todo_list_.
empty());
361 task_manager_.stop();
362 for (
auto& worker : workers_) {
363 if (worker.joinable())
383 template<
class Function,
class... Args>
384 void push(Function&& f, Args&&... args)
386 if (workers_.size() == 0)
390 std::bind(std::forward<Function>(f), std::forward<Args>(args)...));
398 template<
class Function,
class... Args>
399 auto async(Function&& f, Args&&... args)
400 -> std::future<decltype(f(args...))>
403 std::bind(std::forward<Function>(f), std::forward<Args>(args)...);
404 using pack_t = std::packaged_task<decltype(f(args...))()>;
405 auto task_ptr = std::make_shared<pack_t>(std::move(pack));
406 this->
push([task_ptr] { (*task_ptr)(); });
407 return task_ptr->get_future();
414 void execute_safely(std::function<
void()>& task)
420 todo_list_.
stop(std::current_exception());
421 task_manager_.stop();
425 detail::TaskManager task_manager_;
426 TodoList todo_list_{ 0 };
427 std::vector<std::thread> workers_;
435 template<
class Function,
class... Args>
437 push(Function&& f, Args&&... args)
440 std::forward<Args>(args)...);
448 template<
class Function,
class... Args>
450 async(Function&& f, Args&&... args) -> std::future<decltype(f(args...))>
453 std::forward<Args>(args)...);