DiSMEC++
numa.cpp
// Copyright (c) 2021, Aalto University, developed by Erik Schultheis
// All rights reserved.
//
// SPDX-License-Identifier: MIT

#include "parallel/numa.h"
#include "spdlog/spdlog.h"
#include "spdlog/fmt/fmt.h"
#include "utils/throw_error.h"
#include "config.h"
#include <numa.h>
#include <numaif.h>
#include <pthread.h>
#include <sched.h>
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <thread>
#include <numeric>
#include <fstream>
namespace parallel = dismec::parallel;
using namespace parallel;

// https://stackoverflow.com/questions/61454437/programmatically-get-accurate-cpu-cache-hierarchy-information-on-linux

namespace {
    //! Returns the number of NUMA nodes in the system.
    int numa_node_count() {
        // `numa_max_node()` gives the highest index -- so to have the correct count, we need to add one
        return numa_max_node() + 1;
    }

    //! Returns the id of the NUMA node the calling thread is currently running on.
    int current_numa_node() {
        int cpu = sched_getcpu();
        return numa_node_of_cpu(cpu);
    }
32 
33  int lookup_numa_node(const void* ptr) {
34  int numa_node = -1;
35  get_mempolicy(&numa_node, nullptr, 0, const_cast<void*>(ptr), MPOL_F_NODE | MPOL_F_ADDR); // NOLINT(cppcoreguidelines-pro-type-const-cast)
36  return numa_node;
37  }

    //! Reads one line in the kernel's cpulist format from `source` and invokes `action` for every CPU listed.
    template<class F>
    void handle_cpu_set(std::fstream& source, F&& action) {
        std::string buffer;
        std::getline(source, buffer);
        bitmask* shared = numa_parse_cpustring_all(buffer.c_str());
        if(shared == nullptr) {
            // parsing failed; without this check we would dereference a null bitmask below
            spdlog::error("Could not parse cpu list '{}'", buffer);
            return;
        }
        for(int i = 0; i < numa_num_possible_cpus(); ++i) {
            if (numa_bitmask_isbitset(shared, i)) {
                action(cpu_id_t{i});
            }
        }
        numa_free_cpumask(shared);
    }
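    // For reference, the cpulist format is what the kernel exposes throughout sysfs:
    // for an input line "0-3,8-11", the loop above fires action(cpu_id_t{0}) ... action(cpu_id_t{3})
    // and action(cpu_id_t{8}) ... action(cpu_id_t{11}).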

    //! Invokes `action` for every hardware thread that shares a physical core with `core` (including `core` itself).
    template<class F>
    void for_each_sibling(cpu_id_t core, F&& action) {
        std::string file_name = fmt::format("/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list",
                                            core.to_index());
        std::fstream topology_file(file_name, std::fstream::in);
        if(!topology_file.is_open()) {
            spdlog::error("Could not open topology file '{}'", file_name);
            return;     // don't try to parse a stream that never opened
        }
        handle_cpu_set(topology_file, std::forward<F>(action));
    }
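    // Example: on a machine with 2-way SMT, /sys/devices/system/cpu/cpu0/topology/thread_siblings_list
    // typically reads something like "0,32" (the exact pairing is hardware-dependent), so the
    // action runs once for cpu 0 itself and once for its hyper-thread partner.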

    //! Invokes `action` for every CPU that shares at least one cache level with `core`; a CPU is visited once per shared level.
    template<class F>
    void for_each_shared_cache_sibling(cpu_id_t core, F&& action) {
        constexpr const int MAX_CACHE_LEVELS = 10;
        for(int index = 0; index < MAX_CACHE_LEVELS; ++index) {
            std::string file_name = fmt::format("/sys/devices/system/cpu/cpu{}/cache/index{}/shared_cpu_list",
                                                core.to_index(), index);
            std::fstream topology_file(file_name, std::fstream::in);
            if (!topology_file.is_open()) {
                return; // ok, we've handled the last cache
            }
            handle_cpu_set(topology_file, std::forward<F>(action));
        }
    }
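    // On a typical x86 system, index0/index1 are the per-core L1 data/instruction caches,
    // index2 the L2, and index3 the socket-wide L3. Two CPUs on the same core are therefore
    // visited for every level, while CPUs that only share the L3 are visited just once.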
}

//! Pins the calling thread to the NUMA node on which `data` resides.
void parallel::pin_to_data(const void* data) {
    int target_node = lookup_numa_node(data);
    errno = 0;
    if(numa_run_on_node(target_node) == -1) {
        spdlog::error("Error pinning thread {} to node {}: {}", pthread_self(), target_node, strerror(errno));
    }
}
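// A minimal usage sketch (`weights` and the worker body are hypothetical): a thread that mostly
// reads one large buffer can migrate itself to that buffer's NUMA node before starting work.
//
//     std::thread worker([&]{
//         parallel::pin_to_data(weights.data());  // run on the node that holds `weights`
//         run_inference(weights);                 // hypothetical workload
//     });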


NUMAReplicatorBase::NUMAReplicatorBase() : m_HasNUMA(numa_available() >= 0)
{
}

void NUMAReplicatorBase::make_copies() {
    if(m_HasNUMA) {
        m_Copies.clear();
        m_Copies.resize(numa_node_count());

        for(int i = 0; i < numa_node_count(); ++i) {
            if(numa_bitmask_isbitset(numa_all_nodes_ptr, i) == 0) {
                spdlog::warn("NUMA node {} is disabled, no local data copy was created", i);
                continue;
            }
            // temporarily make node `i` the preferred allocation target, so the clone produced
            // by the derived class is backed by node-local memory
            int current_preferred = numa_preferred();
            numa_set_preferred(i);

            m_Copies.at(i) = get_clone();
            numa_set_preferred(current_preferred);
        }
    }
}
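// Sketch of the intended contract (class and member names here are hypothetical, not part of
// DiSMEC++): get_clone() is invoked once per node while that node is the preferred allocation
// target, so a deep copy made inside it ends up node-local.
//
//     class VectorReplicator : public NUMAReplicatorBase {
//     public:
//         explicit VectorReplicator(std::vector<float> data) : m_Data(std::move(data)) {
//             make_copies();      // triggers one get_clone() call per NUMA node
//         }
//     private:
//         [[nodiscard]] std::any get_clone() const override {
//             return std::any{m_Data};    // copy is allocated on the currently preferred node
//         }
//         std::vector<float> m_Data;
//     };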

const std::any& NUMAReplicatorBase::access_local() const {
    if(m_HasNUMA) {
        return m_Copies.at(current_numa_node());
    } else {
        static std::any empty;
        return empty;
    }
}
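// Usage sketch (continuing the hypothetical VectorReplicator above): a worker thread fetches
// the copy living on its own NUMA node and unpacks it with std::any_cast:
//
//     const auto& local = std::any_cast<const std::vector<float>&>(replicator.access_local());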

namespace {
    //! Sets the CPU affinity of the calling thread to the single CPU `cpu`. Returns true on success.
    bool set_thread_affinity(cpu_id_t cpu) {
        // first, we pin the current thread to CPU `cpu`
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu.to_index(), &cpuset);
        int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

        if (rc != 0) {
            spdlog::error("Error fixing thread {} to core {}: {}\n", pthread_self(), cpu.to_index(), strerror(rc));
            return false;
        }
        return true;
    }

    //! Bookkeeping for one NUMA node: which CPUs it owns and how much work has been placed on each.
    class NodeData {
    public:
        explicit NodeData(int id) : m_ID(id) {
        }

        [[nodiscard]] numa_node_id_t get_id() const {
            return m_ID;
        }

        void add_cpu(cpu_id_t cpu_id) {
            m_CPUs.push_back(cpu_id);
            m_LoadIndicator.push_back(0);
        }

        [[nodiscard]] bool empty() const {
            return m_CPUs.empty();
        }

        [[nodiscard]] int num_threads() const {
            return NumThreads;
        }

        [[nodiscard]] int max_load() const {
            auto max_el = std::max_element(begin(m_LoadIndicator), end(m_LoadIndicator));
            return *max_el;
        }

        //! Picks the least-loaded CPU on this node, charges the placement costs, and returns that CPU's id.
        [[nodiscard]] cpu_id_t place_thread() {
            auto min_core = std::min_element(begin(m_LoadIndicator), end(m_LoadIndicator));
            long index = std::distance(begin(m_LoadIndicator), min_core);
            m_LoadIndicator[index] += COST_PLACE_THREAD;
            for_each_sibling(m_CPUs[index], [&](cpu_id_t sibling) {
                auto found = std::find(begin(m_CPUs), end(m_CPUs), sibling);
                if(found == end(m_CPUs)) {
                    return; // I guess this means that we are not allowed to run on the sibling, so this is fine
                }
                *(begin(m_LoadIndicator) + std::distance(begin(m_CPUs), found)) += COST_PLACE_HYPER_THREAD;
            });

            for_each_shared_cache_sibling(m_CPUs[index], [&](cpu_id_t sibling) {
                auto found = std::find(begin(m_CPUs), end(m_CPUs), sibling);
                if(found == end(m_CPUs)) {
                    return; // I guess this means that we are not allowed to run on the sibling, so this is fine
                }
                *(begin(m_LoadIndicator) + std::distance(begin(m_CPUs), found)) += 1;
            });

            ++NumThreads;
            return m_CPUs[index];
        }
    private:
        //! The id of this NUMA node.
        numa_node_id_t m_ID;

        //! Vector of CPU ids that are on this NUMA node.
        std::vector<cpu_id_t> m_CPUs;

        //! How much work have we placed on that CPU.
        std::vector<int> m_LoadIndicator;

        //! Number of threads placed on this node so far.
        int NumThreads = 0;
    };
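    // Worked example of the cost model: placing a thread on CPU c charges COST_PLACE_THREAD to c,
    // COST_PLACE_HYPER_THREAD to every CPU in c's thread_siblings_list (which also contains c
    // itself), and +1 per shared cache level to every CPU in the corresponding shared_cpu_list.
    // Assuming config.h chooses COST_PLACE_THREAD > COST_PLACE_HYPER_THREAD > 1, the next
    // place_thread() call prefers a completely idle physical core over the SMT sibling of an
    // already-busy one, and falls back to mere cache neighbours only after that.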

    //! Returns the list of available NUMA nodes, dropping nodes that have no usable CPU.
    std::vector<NodeData> get_available_nodes() {
        std::vector<NodeData> nodes_avail;
        // first, we check which CPU indices are available
        if(numa_available() >= 0) {
            for(int i = 0; i < numa_num_possible_nodes(); ++i) {
                nodes_avail.emplace_back(i);
            }
            for(int i = 0; i < numa_num_possible_cpus(); ++i) {
                if (numa_bitmask_isbitset(numa_all_cpus_ptr, i) != 0) {
                    // OK, CPU is available
                    int node = numa_node_of_cpu(i);
                    if(numa_bitmask_isbitset(numa_all_nodes_ptr, node) == 0) {
                        spdlog::warn("Node {} of CPU {} is not available.", node, i);
                    }
                    nodes_avail.at(node).add_cpu(cpu_id_t{i});
                }
            }
        } else {
            // if we don't have numa available, assume all CPUs are available and correspond to node 0
            nodes_avail.emplace_back(0);
            for(unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
                nodes_avail.at(0).add_cpu(cpu_id_t{static_cast<int>(i)});
            }
        }

        // remove all NUMA nodes for which we don't have any CPUs
        nodes_avail.erase(std::remove_if(begin(nodes_avail), end(nodes_avail),
                                         [](auto&& d){ return d.empty(); }),
                          end(nodes_avail));
        return nodes_avail;
    }
}

ThreadDistributor::ThreadDistributor(long num_threads, std::shared_ptr<spdlog::logger> logger) :
    m_Logger(std::move(logger))
{
    if(num_threads < 0) {
        THROW_EXCEPTION(std::invalid_argument, "Negative number of threads {} given!", num_threads)
    }
    std::vector<NodeData> nodes_avail = get_available_nodes();

    if(m_Logger) {
        m_Logger->info("Distributing {} threads to {} NUMA nodes.",
                       num_threads, nodes_avail.size());
    }
    while(m_TargetCPUs.size() < static_cast<std::size_t>(num_threads)) {
        for(auto& node : nodes_avail) {
            m_TargetCPUs.push_back(node.place_thread());
        }
    }

    if(m_Logger) {
        for (auto& node : nodes_avail) {
            m_Logger->info("Node {}: {} threads, load {}", node.get_id().to_index(), node.num_threads(), node.max_load());
        }
    }
}
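// Worked example of the placement loop above: with two usable nodes and num_threads == 5, target
// CPUs are reserved round-robin as node0, node1, node0, node1, node0, node1. Each pass over
// nodes_avail completes in full, so up to nodes_avail.size() - 1 extra entries beyond num_threads
// may be reserved; this is harmless, since pin_this_thread() only reads the entry at its own id.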

void ThreadDistributor::pin_this_thread(thread_id_t thread_id) {
    auto target_cpu = m_TargetCPUs.at(thread_id.to_index());
    if(!set_thread_affinity(target_cpu)) {
        throw std::runtime_error(fmt::format("Could not pin thread {} to CPU {}",
                                             pthread_self(), target_cpu.to_index()));
    }

    // if we successfully pinned a thread to a CPU, we also pin its memory allocations to the
    // corresponding NUMA node
    if(numa_available() >= 0) {
        numa_set_localalloc();
    }

    if(m_Logger) {
        m_Logger->info("Pinned thread {} ({}) to Core {} on Node {}",
                       thread_id.to_index(), pthread_self(),
                       target_cpu.to_index(),
                       numa_available() >= 0 ? current_numa_node() : -1);
    }
}
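// End-to-end usage sketch (the worker body is hypothetical, the rest uses the classes defined in
// this file): distribute four threads across the machine and pin each one before it starts work.
//
//     parallel::ThreadDistributor distributor(4, spdlog::default_logger());
//     std::vector<std::thread> pool;
//     for(int t = 0; t < 4; ++t) {
//         pool.emplace_back([&distributor, t]{
//             distributor.pin_this_thread(thread_id_t{t});  // also enables node-local allocation
//             do_work(t);                                   // hypothetical workload
//         });
//     }
//     for(auto& worker : pool) {
//         worker.join();
//     }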