Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/boostorg/capy
8 : //
9 :
10 : #include <boost/capy/ex/thread_pool.hpp>
11 : #include <boost/capy/detail/intrusive.hpp>
12 : #include <atomic>
13 : #include <condition_variable>
14 : #include <mutex>
15 : #include <thread>
16 : #include <vector>
17 :
18 : namespace boost {
19 : namespace capy {
20 :
21 : //------------------------------------------------------------------------------
22 :
23 : class thread_pool::impl
24 : {
25 : struct work : detail::intrusive_queue<work>::node
26 : {
27 : coro h_;
28 :
29 118 : explicit work(coro h) noexcept
30 118 : : h_(h)
31 : {
32 118 : }
33 :
34 118 : void run()
35 : {
36 118 : auto h = h_;
37 118 : delete this;
38 118 : h.resume();
39 118 : }
40 :
41 0 : void destroy()
42 : {
43 0 : delete this;
44 0 : }
45 : };
46 :
47 : std::mutex mutex_;
48 : std::condition_variable_any cv_;
49 : detail::intrusive_queue<work> q_;
50 : std::vector<std::thread> threads_;
51 : std::atomic<bool> stop_requested_{false};
52 : std::size_t num_threads_;
53 : std::once_flag start_flag_;
54 :
55 : public:
56 51 : ~impl()
57 : {
58 51 : stop();
59 82 : for (auto& t : threads_)
60 31 : if (t.joinable())
61 31 : t.join();
62 51 : threads_.clear();
63 :
64 51 : while(auto* w = q_.pop())
65 0 : w->destroy();
66 51 : }
67 :
68 : explicit
69 51 : impl(std::size_t num_threads)
70 51 : : num_threads_(num_threads)
71 : {
72 51 : if(num_threads_ == 0)
73 1 : num_threads_ = std::thread::hardware_concurrency();
74 51 : if(num_threads_ == 0)
75 0 : num_threads_ = 1;
76 51 : }
77 :
78 : void
79 118 : post(coro h)
80 : {
81 118 : ensure_started();
82 118 : auto* w = new work(h);
83 : {
84 118 : std::lock_guard<std::mutex> lock(mutex_);
85 118 : q_.push(w);
86 118 : }
87 118 : cv_.notify_one();
88 118 : }
89 :
90 : void
91 51 : stop() noexcept
92 : {
93 51 : stop_requested_.store(true);
94 51 : cv_.notify_all();
95 51 : }
96 :
97 : private:
98 : void
99 118 : ensure_started()
100 : {
101 118 : std::call_once(start_flag_, [this]{
102 19 : threads_.reserve(num_threads_);
103 50 : for(std::size_t i = 0; i < num_threads_; ++i)
104 62 : threads_.emplace_back([this]{ run(); });
105 19 : });
106 118 : }
107 :
108 : void
109 31 : run()
110 : {
111 : for(;;)
112 : {
113 149 : work* w = nullptr;
114 : {
115 149 : std::unique_lock<std::mutex> lock(mutex_);
116 361 : cv_.wait(lock, [this]{ return !q_.empty() || stop_requested_.load(); });
117 149 : if (stop_requested_.load() && q_.empty())
118 62 : return;
119 118 : w = q_.pop();
120 149 : }
121 118 : w->run();
122 118 : }
123 : }
124 : };
125 :
126 : //------------------------------------------------------------------------------
127 :
128 51 : thread_pool::
129 : ~thread_pool()
130 : {
131 51 : shutdown();
132 51 : destroy();
133 51 : delete impl_;
134 51 : }
135 :
136 51 : thread_pool::
137 51 : thread_pool(std::size_t num_threads)
138 51 : : impl_(new impl(num_threads))
139 : {
140 51 : }
141 :
142 : void
143 0 : thread_pool::
144 : stop() noexcept
145 : {
146 0 : impl_->stop();
147 0 : }
148 :
149 : //------------------------------------------------------------------------------
150 :
151 : void
152 118 : thread_pool::executor_type::
153 : post(coro h) const
154 : {
155 118 : pool_->impl_->post(h);
156 118 : }
157 :
158 : } // capy
159 : } // boost
|