7 #include "spdlog/spdlog.h"
8 #include "spdlog/fmt/fmt.h"
25 return numa_max_node() + 1;
29 int cpu = sched_getcpu();
30 return numa_node_of_cpu(cpu);
35 get_mempolicy(&numa_node,
nullptr, 0,
const_cast<void*
>(ptr), MPOL_F_NODE | MPOL_F_ADDR);
42 std::getline(source, buffer);
43 bitmask* shared = numa_parse_cpustring_all(buffer.c_str());
44 for(
int i = 0; i < numa_num_possible_cpus(); ++i) {
45 if (numa_bitmask_isbitset(shared, i)) {
49 numa_free_cpumask(shared);
54 std::string file_name = fmt::format(
"/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list",
56 std::fstream topology_file(file_name, std::fstream::in);
57 if(!topology_file.is_open()) {
58 spdlog::error(
"Could not open topology file '{}'", file_name);
65 constexpr
const int MAX_CACHE_LEVELS = 10;
66 for(
int index = 0; index < MAX_CACHE_LEVELS; ++index) {
67 std::string file_name = fmt::format(
"/sys/devices/system/cpu/cpu{}/cache/index{}/shared_cpu_list",
69 std::fstream topology_file(file_name, std::fstream::in);
70 if (!topology_file.is_open()) {
81 if(numa_run_on_node(target_node) == -1) {
82 spdlog::error(
"Error pinning thread {} to node {}: {}", pthread_self(), target_node, strerror(errno));
97 if(numa_bitmask_isbitset(numa_all_nodes_ptr, i) == 0) {
98 spdlog::warn(
"NUMA node {} is disabled, no local data copy was created", i);
101 int current_preferred = numa_preferred();
102 numa_set_preferred(i);
105 numa_set_preferred(current_preferred);
114 static std::any empty;
126 int rc = pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
129 spdlog::error(
"Error fixing thread {} to core {}: {}\n", pthread_self(), cpu.
to_index(), strerror(rc));
145 m_CPUs.push_back(cpu_id);
146 m_LoadIndicator.push_back(0);
150 return m_CPUs.empty();
158 auto max_el = std::max_element(begin(m_LoadIndicator), end(m_LoadIndicator));
163 auto min_core = std::min_element(begin(m_LoadIndicator), end(m_LoadIndicator));
164 long index = std::distance(begin(m_LoadIndicator), min_core);
167 auto found = std::find(begin(m_CPUs), end(m_CPUs), sibling);
168 if(found == end(m_CPUs)) {
175 auto found = std::find(begin(m_CPUs), end(m_CPUs), sibling);
176 if(found == end(m_CPUs)) {
179 *(begin(m_LoadIndicator) + std::distance(begin(m_CPUs), found)) += 1;
183 return m_CPUs[index];
207 std::vector<NodeData> nodes_avail;
209 if(numa_available() >= 0) {
210 for(
int i = 0; i < numa_num_possible_nodes(); ++i) {
211 nodes_avail.emplace_back(i);
213 for(
int i = 0; i < numa_num_possible_cpus(); ++i) {
214 if (numa_bitmask_isbitset(numa_all_cpus_ptr, i) != 0) {
216 int node = numa_node_of_cpu(i);
217 if(numa_bitmask_isbitset(numa_all_nodes_ptr, node) == 0) {
218 spdlog::warn(
"Node {} of CPU {} is not available.", node, i);
220 nodes_avail.at(node).add_cpu(
cpu_id_t{i});
226 nodes_avail.emplace_back(0);
227 for(
unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
228 nodes_avail.at(0).add_cpu(
cpu_id_t{i});
233 nodes_avail.erase(std::remove_if(begin(nodes_avail), end(nodes_avail),
234 [](
auto&& d){
return d.empty(); }),
241 m_Logger(std::move(logger))
243 if(num_threads < 0) {
244 THROW_EXCEPTION(std::invalid_argument,
"Negative number of threads {} given!", num_threads)
249 m_Logger->info(
"Distributing {} threads to {} NUMA nodes.",
250 num_threads, nodes_avail.size());
252 while(
m_TargetCPUs.size() <
static_cast<std::size_t
>(num_threads)) {
253 for(
auto& node : nodes_avail) {
259 for (
auto& node : nodes_avail) {
260 m_Logger->info(
"Node {}: {} threads, load {}", node.get_id().to_index(), node.num_threads(), node.max_load());
268 throw std::runtime_error(fmt::format(
"Could not pin thread {} to CPU {}",
269 pthread_self(), target_cpu.to_index()));
274 if(numa_available() >= 0) {
275 numa_set_localalloc();
279 m_Logger->info(
"Pinned thread {} ({}) to Core {} on Node {}",
280 thread_id.
to_index(), pthread_self(),
281 target_cpu.to_index(),
std::vector< cpu_id_t > m_CPUs
Vector of CPU ids that are on this NUMA node.
std::vector< int > m_LoadIndicator
How much work have we placed on that CPU.
void add_cpu(cpu_id_t cpu_id)
numa_node_id_t get_id() const
constexpr T to_index() const
Explicitly convert to an integer.
bool m_HasNUMA
Whether NUMA functions are available.
void make_copies()
Uses the get_clone() function of the implementing class to generate NUMA-local copies for each NUMA node.
virtual std::any get_clone() const =0
This function needs to be implemented by a derived class.
std::vector< std::any > m_Copies
This vector contains one copy of data for each NUMA node.
const std::any & access_local() const
void pin_this_thread(thread_id_t thread_id)
std::vector< cpu_id_t > m_TargetCPUs
List of CPUs to which the threads will be assigned.
ThreadDistributor(long num_threads, std::shared_ptr< spdlog::logger >={})
std::shared_ptr< spdlog::logger > m_Logger
Strong typedef for an int to signify a (core of a) cpu.
Strong typedef for an int to signify a numa domain.
Strong typedef for an int to signify a thread id.
Defines configuration variables.
void for_each_shared_cache_sibling(cpu_id_t core, F &&action)
int lookup_numa_node(const void *ptr)
void for_each_sibling(cpu_id_t core, F &&action)
std::vector< NodeData > get_available_nodes()
Returns the list of available NUMA nodes.
void handle_cpu_set(std::fstream &source, F &&action)
bool set_thread_affinity(cpu_id_t cpu)
constexpr const int COST_PLACE_THREAD
Load balancing cost for placing a thread on a core.
void pin_to_data(const void *data)
Pin the calling thread to the NUMA node on which data resides.
constexpr const int COST_PLACE_HYPER_THREAD
Load balancing cost for placing a thread on a SMT shared core.
#define THROW_EXCEPTION(exception_type,...)