19 m_NumThreads(num_threads), m_ChunkSize(chunk_size),
20 m_TimeLimit(std::numeric_limits<std::chrono::milliseconds::rep>::max()) {
35 return std::chrono::duration_cast<std::chrono::milliseconds>(arg);
40 using std::chrono::milliseconds;
41 using std::chrono::steady_clock;
44 if(num_threads <= 0) {
45 num_threads =
to_long(std::thread::hardware_concurrency());
47 if(num_threads > 2*std::thread::hardware_concurrency() + 1) {
48 spdlog::warn(
"You have specified many more threads ({}) than your hardware appears to support ({}). Number"
49 "of threads has been capped at hardware concurrency.",
50 num_threads, std::thread::hardware_concurrency());
51 num_threads =
static_cast<long>(std::thread::hardware_concurrency());
54 long num_tasks = tasks.
num_tasks() - start;
59 num_threads = std::min(num_threads, num_chunks);
61 std::atomic<std::size_t> cpu_time{0};
64 std::atomic<long> sub_counter{0};
67 if(!sub_counter.is_lock_free()) {
68 spdlog::warn(
"Counter implementation is not lock-free. This might result in degraded performance in case of many threads");
71 auto start_time = steady_clock::now();
73 std::vector<std::thread> workers;
74 workers.reserve(num_threads);
76 m_Logger->info(
"spawning {} threads to run {} tasks", num_threads, num_tasks);
81 for(
int thread = 0; thread < num_threads; ++thread) {
82 workers.emplace_back([&, thread_id=
thread_id_t(thread)]()
93 long search_pos = sub_counter++;
94 if(search_pos >= num_chunks) {
98 auto task_start_time = steady_clock::now();
100 long begin_task = search_pos *
m_ChunkSize + start;
101 long end_task = std::min((search_pos + 1) *
m_ChunkSize, (
long)num_tasks) + start;
104 tasks.
run_tasks(begin_task, end_task, thread_id);
107 cpu_time.fetch_add(
to_ms(steady_clock::now() - task_start_time).count());
113 for(
auto& t : workers) {
119 auto wall_time =
to_ms(steady_clock::now() - start_time);
122 if(sub_counter >= num_chunks) {
123 m_Logger->info(
"Threads finished after {}s (per thread {}s).", wall_time.count() / 1000,
124 cpu_time / 1000 / num_threads);
126 m_Logger->info(
"Computation timeout ({}s) reached after {} tasks ({}s -- {}s per thread)",
128 sub_counter, wall_time.count() / 1000, cpu_time / 1000 / num_threads);
134 spdlog::warn(
"The average time per chunk of work is only {}µs, consider increasing chunk size (currently {}) to "
138 return {sub_counter >= num_chunks, sub_counter *
m_ChunkSize + start,
139 std::chrono::duration_cast<std::chrono::seconds>(wall_time)};
144 if(begin == end - 1) {
145 m_Logger->trace(
"Starting task {}", begin);
147 m_Logger->trace(
"Starting tasks {}-{}", begin, end-1);
153 if(begin == end - 1) {
154 m_Logger->trace(
"Finished task {}", begin);
156 m_Logger->trace(
"Finished tasks {}-{}", begin, end-1);
161 if(time_limit.count() <= 0) {
162 m_TimeLimit = std::chrono::milliseconds(std::numeric_limits<std::chrono::milliseconds::rep>::max());
176 for(
long t = begin; t < end; ++t) {
179 std::this_thread::sleep_for(std::chrono::milliseconds(10));
195 auto res = runner.
run(task);
196 REQUIRE(res.IsFinished);
199 for(
int s = 0; s <
ssize(task.check); ++s) {
200 REQUIRE_MESSAGE(task.check[s] == 1,
"error at index " << s);
208 auto res = runner.
run(task, 5);
209 REQUIRE(res.IsFinished);
212 for(
int s = 0; s < 5; ++s) {
213 REQUIRE(task.check[s] == 0);
215 for(
int s = 5; s <
ssize(task.check); ++s) {
216 REQUIRE_MESSAGE(task.check[s] == 1,
"error at index " << s);
225 auto res = runner.run(task, 5);
226 REQUIRE_FALSE(res.IsFinished);
229 for(
int s = 5; s < res.NextTask; ++s) {
230 REQUIRE(task.check[s] == 1);
231 }
for(
int s = res.NextTask; s <
ssize(task.check); ++s) {
232 REQUIRE(task.check[s] == 0);
void set_logger(std::shared_ptr< spdlog::logger > logger)
Sets the logger object used for reporting. Set to nullptr for quiet mode.
std::shared_ptr< spdlog::logger > m_Logger
void log_start(long begin, long end)
void set_time_limit(std::chrono::milliseconds time_limit)
RunResult run(TaskGenerator &tasks, long start=0)
void set_chunk_size(long chunk_size)
ParallelRunner(long num_threads, long chunk_size=1)
std::chrono::milliseconds m_TimeLimit
void log_finished(long begin, long end)
Base class for all parallelized operations.
virtual void finalize()
Called after all threads have finished their tasks.
virtual void run_tasks(long begin, long end, thread_id_t thread_id)=0
virtual void prepare(long num_threads, long chunk_size)
Called to notify the TaskGenerator about the number of threads and the chunk size that will be used.
virtual long num_tasks() const =0
virtual void init_thread(thread_id_t thread_id)
Called once a thread has spun up, but before it runs its first task.
This class helps with distributing threads to the different CPU cores.
void pin_this_thread(thread_id_t thread_id)
Strong typedef for an int to signify a thread id.
Defines configuration variables.
Main namespace in which all types, classes, and functions are defined.
constexpr auto ssize(const C &c) -> std::common_type_t< std::ptrdiff_t, std::make_signed_t< decltype(c.size())>>
Signed-size free function (equivalent to std::ssize). Taken from https://en.cppreference.com/w/cpp/iterator/size
constexpr long to_long(T value)
Convert the given value to long, throwing an error if the conversion is not possible.
constexpr const int MIN_TIME_PER_CHUNK_MS
If the average time needed per chunk of work is less than this, a warning is displayed.
TEST_CASE("run parallel")
void run_tasks(long begin, long end, thread_id_t thread_id) override
long num_tasks() const override