DiSMEC++
runner.cpp
1 // Copyright (c) 2021, Aalto University, developed by Erik Schultheis
2 // All rights reserved.
3 //
4 // SPDX-License-Identifier: MIT
5 
6 #include <algorithm>
7 #include <thread>
8 #include <atomic>
9 #include "config.h"
10 #include "parallel/runner.h"
11 #include "parallel/task.h"
12 #include "parallel/numa.h"
13 #include "utils/conversion.h"
14 
15 using namespace dismec;
16 using namespace dismec::parallel;
17 
18 ParallelRunner::ParallelRunner(long num_threads, long chunk_size) :
19  m_NumThreads(num_threads), m_ChunkSize(chunk_size),
20  m_TimeLimit(std::numeric_limits<std::chrono::milliseconds::rep>::max()) {
21 
22 }
23 
24 void ParallelRunner::set_chunk_size(long chunk_size) {
25  m_ChunkSize = chunk_size;
26 }
27 
28 void ParallelRunner::set_logger(std::shared_ptr<spdlog::logger> logger) {
29  m_Logger = std::move(logger);
30 }
31 
32 namespace {
33  template<class T>
34  auto to_ms(T&& arg) {
35  return std::chrono::duration_cast<std::chrono::milliseconds>(arg);
36  }
37 }
38 
40  using std::chrono::milliseconds;
41  using std::chrono::steady_clock;
42 
43  long num_threads = m_NumThreads;
44  if(num_threads <= 0) {
45  num_threads = to_long(std::thread::hardware_concurrency());
46  }
47  if(num_threads > 2*std::thread::hardware_concurrency() + 1) {
48  spdlog::warn("You have specified many more threads ({}) than your hardware appears to support ({}). Number "
49  "of threads has been capped at hardware concurrency.",
50  num_threads, std::thread::hardware_concurrency());
51  num_threads = static_cast<long>(std::thread::hardware_concurrency());
52  }
53 
54  long num_tasks = tasks.num_tasks() - start;
55  long num_chunks = num_tasks / m_ChunkSize;
56  if(num_tasks % m_ChunkSize != 0) {
57  num_chunks += 1;
58  }
59  num_threads = std::min(num_threads, num_chunks);
60 
61  std::atomic<std::size_t> cpu_time{0};
62 
63  // we need an atomic counter to make sure that all sub-problems are touched exactly once
64  std::atomic<long> sub_counter{0};
65  // long sub_counter = start;
66  // nothing we can do if the counter isn't lock free, but notify the user.
67  if(!sub_counter.is_lock_free()) {
68  spdlog::warn("Counter implementation is not lock-free. This might result in degraded performance in case of many threads");
69  }
70 
71  auto start_time = steady_clock::now();
72 
73  std::vector<std::thread> workers;
74  workers.reserve(num_threads);
75  if(m_Logger)
76  m_Logger->info("spawning {} threads to run {} tasks", num_threads, num_tasks);
77 
78  tasks.prepare(num_threads, m_ChunkSize);
79  ThreadDistributor distribute(num_threads, m_Logger);
80 
81  for(int thread = 0; thread < num_threads; ++thread) {
82  workers.emplace_back([&, thread_id=thread_id_t(thread)]()
83  {
84  if(m_BindThreads) {
85  distribute.pin_this_thread(thread_id);
86  }
87 
88  tasks.init_thread(thread_id);
89 
90  while(to_ms(steady_clock::now() - start_time) < m_TimeLimit) {
91  // get a new sub-problem
92  // see also https://stackoverflow.com/questions/41206861/atomic-increment-and-return-counter
93  long search_pos = sub_counter++;
94  if(search_pos >= num_chunks) {
95  return;
96  }
97 
98  auto task_start_time = steady_clock::now();
99 
100  long begin_task = search_pos * m_ChunkSize + start;
101  long end_task = std::min((search_pos + 1) * m_ChunkSize, (long)num_tasks) + start;
102 
103  log_start(begin_task, end_task);
104  tasks.run_tasks(begin_task, end_task, thread_id);
105  log_finished(begin_task, end_task);
106 
107  cpu_time.fetch_add( to_ms(steady_clock::now() - task_start_time).count());
108  }
109  });
110  }
111 
112  // OK, now we just have to wait for the threads to finish
113  for(auto& t : workers) {
114  t.join();
115  }
116 
117  tasks.finalize();
118 
119  auto wall_time = to_ms(steady_clock::now() - start_time);
120 
121  if(m_Logger) {
122  if(sub_counter >= num_chunks) {
123  m_Logger->info("Threads finished after {}s (per thread {}s).", wall_time.count() / 1000,
124  cpu_time / 1000 / num_threads);
125  } else {
126  m_Logger->info("Computation timeout ({}s) reached after {} tasks ({}s -- {}s per thread)",
127  m_TimeLimit.count() / 1000,
128  sub_counter, wall_time.count() / 1000, cpu_time / 1000 / num_threads);
129  }
130  }
131 
132  // display a warning if threads need to get new work more than every 5 ms.
133  if((cpu_time * m_ChunkSize) / num_tasks < MIN_TIME_PER_CHUNK_MS) {
134  spdlog::warn("The average time per chunk of work is only {}µs, consider increasing chunk size (currently {}) to "
135  "reduce parallelization overhead.", (1000 * cpu_time * m_ChunkSize) / num_tasks, m_ChunkSize);
136  }
137 
138  return {sub_counter >= num_chunks, sub_counter * m_ChunkSize + start,
139  std::chrono::duration_cast<std::chrono::seconds>(wall_time)};
140 }
141 
142 void ParallelRunner::log_start(long begin, long end) {
143  if(!m_Logger) return;
144  if(begin == end - 1) {
145  m_Logger->trace("Starting task {}", begin);
146  } else {
147  m_Logger->trace("Starting tasks {}-{}", begin, end-1);
148  }
149 }
150 
151 void ParallelRunner::log_finished(long begin, long end) {
152  if(!m_Logger) return;
153  if(begin == end - 1) {
154  m_Logger->trace("Finished task {}", begin);
155  } else {
156  m_Logger->trace("Finished tasks {}-{}", begin, end-1);
157  }
158 }
159 
160 void ParallelRunner::set_time_limit(std::chrono::milliseconds time_limit) {
161  if(time_limit.count() <= 0) {
162  m_TimeLimit = std::chrono::milliseconds(std::numeric_limits<std::chrono::milliseconds::rep>::max());
163  } else {
164  m_TimeLimit = time_limit;
165  }
166 }
167 
168 #include "doctest.h"
169 
170 namespace {
171  struct DummyTask : TaskGenerator {
172  DummyTask() : check(10000, 0) {
173 
174  }
175  void run_tasks(long begin, long end, thread_id_t thread_id) override {
176  for(long t = begin; t < end; ++t) {
177  check.at(t) += 1;
178  if(do_work) {
179  std::this_thread::sleep_for(std::chrono::milliseconds(10));
180  }
181  }
182  }
183  [[nodiscard]] long num_tasks() const override {
184  return check.size();
185  }
186 
187  std::vector<int> check;
188  bool do_work=false;
189  };
190 }
191 
192 TEST_CASE("run parallel") {
193  ParallelRunner runner{-1};
194  DummyTask task;
195  auto res = runner.run(task);
196  REQUIRE(res.IsFinished);
197 
198  // make sure each task ran exactly once
199  for(int s = 0; s < ssize(task.check); ++s) {
200  REQUIRE_MESSAGE(task.check[s] == 1, "error at index " << s);
201  }
202 }
203 
204 TEST_CASE("run chunked parallel with start pos")
205 {
206  ParallelRunner runner{-1, 32};
207  DummyTask task;
208  auto res = runner.run(task, 5);
209  REQUIRE(res.IsFinished);
210 
211  // make sure that skipped tasks are not run, but all others are
212  for(int s = 0; s < 5; ++s) {
213  REQUIRE(task.check[s] == 0);
214  }
215  for(int s = 5; s < ssize(task.check); ++s) {
216  REQUIRE_MESSAGE(task.check[s] == 1, "error at index " << s);
217  }
218 }
219 
220 TEST_CASE("run parallel with timeout") {
221  ParallelRunner runner{-1, 16};
222  DummyTask task;
223  task.do_work = true;
224  runner.set_time_limit(std::chrono::milliseconds(50));
225  auto res = runner.run(task, 5);
226  REQUIRE_FALSE(res.IsFinished);
227 
228  // check that NextTask correctly identifies until where we have done our work
229  for(int s = 5; s < res.NextTask; ++s) {
230  REQUIRE(task.check[s] == 1);
231  }
232  for(int s = res.NextTask; s < ssize(task.check); ++s) {
233  REQUIRE(task.check[s] == 0);
234  }
235 }
236 
237 // TODO check chunks, starts etc
void set_logger(std::shared_ptr< spdlog::logger > logger)
sets the logger object that is used for reporting. Set to nullptr for quiet mode.
Definition: runner.cpp:28
std::shared_ptr< spdlog::logger > m_Logger
Definition: runner.h:53
void log_start(long begin, long end)
Definition: runner.cpp:142
void set_time_limit(std::chrono::milliseconds time_limit)
Definition: runner.cpp:160
RunResult run(TaskGenerator &tasks, long start=0)
Definition: runner.cpp:39
void set_chunk_size(long chunk_size)
Definition: runner.cpp:24
ParallelRunner(long num_threads, long chunk_size=1)
Definition: runner.cpp:18
std::chrono::milliseconds m_TimeLimit
Definition: runner.h:52
void log_finished(long begin, long end)
Definition: runner.cpp:151
Base class for all parallelized operations.
Definition: task.h:21
virtual void finalize()
Called after all threads have finished their tasks.
Definition: task.h:59
virtual void run_tasks(long begin, long end, thread_id_t thread_id)=0
virtual void prepare(long num_threads, long chunk_size)
Called to notify the TaskGenerator about the number of threads.
Definition: task.h:45
virtual long num_tasks() const =0
virtual void init_thread(thread_id_t thread_id)
Called once a thread has spun up, but before it runs its first task.
Definition: task.h:51
This class helps with distributing threads to the different CPU cores.
Definition: numa.h:118
void pin_this_thread(thread_id_t thread_id)
Definition: numa.cpp:265
Strong typedef for an int to signify a thread id.
Definition: thread_id.h:20
Defines configuration variables.
Main namespace in which all types, classes, and functions are defined.
Definition: app.h:15
constexpr auto ssize(const C &c) -> std::common_type_t< std::ptrdiff_t, std::make_signed_t< decltype(c.size())>>
signed size free function. Taken from https://en.cppreference.com/w/cpp/iterator/size
Definition: conversion.h:42
constexpr long to_long(T value)
Convert the given value to long, throwing an error if the conversion is not possible.
Definition: conversion.h:14
constexpr const int MIN_TIME_PER_CHUNK_MS
If the time needed per chunk of work is less than this, we display a warning.
Definition: config.h:28
TEST_CASE("run parallel")
Definition: runner.cpp:192
void run_tasks(long begin, long end, thread_id_t thread_id) override
Definition: runner.cpp:175
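
The test cases above already exercise the runner; as an additional illustration, here is a minimal usage sketch based only on the interfaces visible in this file. The SquareTask type, its data member, and the square_in_parallel() helper are hypothetical; only ParallelRunner, TaskGenerator, set_time_limit, run, and the RunResult fields IsFinished and NextTask used above are assumed.

#include <chrono>
#include <utility>
#include <vector>
#include "parallel/runner.h"
#include "parallel/task.h"

namespace {
    using namespace dismec::parallel;

    // Hypothetical task generator: squares each vector element, one element per task.
    struct SquareTask : TaskGenerator {
        explicit SquareTask(std::vector<double> values) : data(std::move(values)) {}

        void run_tasks(long begin, long end, thread_id_t /*thread_id*/) override {
            for(long t = begin; t < end; ++t) {
                data.at(t) *= data.at(t);
            }
        }
        [[nodiscard]] long num_tasks() const override {
            return static_cast<long>(data.size());
        }

        std::vector<double> data;
    };

    void square_in_parallel() {
        SquareTask task(std::vector<double>(1000, 2.0));
        ParallelRunner runner{-1, 32};                   // auto-detect thread count, 32 tasks per chunk
        runner.set_time_limit(std::chrono::minutes(5));  // converted to milliseconds implicitly
        auto result = runner.run(task);
        if(!result.IsFinished) {
            // the time limit was hit; resume from the first task that was not processed
            runner.run(task, result.NextTask);
        }
    }
}

Only the pure virtuals run_tasks and num_tasks must be overridden; prepare, init_thread, and finalize have default implementations in task.h.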