#include "spdlog/spdlog.h"
#include "spdlog/fmt/fmt.h"

        // number of NUMA nodes: highest node id known to libnuma, plus one
        return numa_max_node() + 1;
 
        // NUMA node of the core the calling thread is currently running on
        int cpu = sched_getcpu();
        return numa_node_of_cpu(cpu);
 
        // MPOL_F_NODE | MPOL_F_ADDR makes get_mempolicy() report the NUMA node that
        // backs the page containing ptr, instead of querying the memory policy
        get_mempolicy(&numa_node, nullptr, 0, const_cast<void*>(ptr), MPOL_F_NODE | MPOL_F_ADDR);
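The fragment above shows only the raw system call. For reference, a self-contained address-to-node lookup built around it could look like the sketch below; the wrapper name and error handling are illustrative assumptions, not the project's exact code.

// Sketch (assumption): stand-alone address-to-node lookup via get_mempolicy().
// Requires libnuma (<numaif.h>) and linking with -lnuma.
#include <numaif.h>

int lookup_node_of_address(const void* ptr) {
    int numa_node = -1;
    // With MPOL_F_NODE | MPOL_F_ADDR, get_mempolicy() writes the id of the node
    // backing the page that contains ptr; it fails if the page is not mapped yet.
    if (get_mempolicy(&numa_node, nullptr, 0, const_cast<void*>(ptr),
                      MPOL_F_NODE | MPOL_F_ADDR) != 0) {
        return -1;
    }
    return numa_node;
}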
 
        std::getline(source, buffer);

        // interpret the line as a Linux cpulist (e.g. "0-3,8-11") and invoke the
        // action for every CPU whose bit is set in the parsed mask
        bitmask* shared = numa_parse_cpustring_all(buffer.c_str());
        for(int i = 0; i < numa_num_possible_cpus(); ++i) {
            if (numa_bitmask_isbitset(shared, i)) {

        numa_free_cpumask(shared);
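Pieced together, this helper reads one cpulist line from a stream, parses it with numa_parse_cpustring_all(), and calls the supplied action for every CPU that is set. A minimal self-contained sketch of the same pattern, with illustrative names:

// Sketch (assumption): visit every CPU named in a Linux cpulist string.
// Requires libnuma (<numa.h>) and linking with -lnuma.
#include <numa.h>
#include <cstdio>

template <typename F>
void for_each_cpu_in_list(const char* cpu_list, F&& action) {
    bitmask* mask = numa_parse_cpustring_all(cpu_list);
    if (mask == nullptr) {
        return;                    // parse error: nothing to visit
    }
    for (int i = 0; i < numa_num_possible_cpus(); ++i) {
        if (numa_bitmask_isbitset(mask, i)) {
            action(i);             // CPU i is part of the parsed set
        }
    }
    numa_free_cpumask(mask);
}

// usage: for_each_cpu_in_list("0-3,8", [](int cpu) { std::printf("cpu %d\n", cpu); });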
 
        std::string file_name = fmt::format("/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list",
                                            core.to_index());

        std::fstream topology_file(file_name, std::fstream::in);
        if(!topology_file.is_open()) {
            spdlog::error("Could not open topology file '{}'", file_name);
 
        constexpr const int MAX_CACHE_LEVELS = 10;
        for(int index = 0; index < MAX_CACHE_LEVELS; ++index) {
            std::string file_name = fmt::format("/sys/devices/system/cpu/cpu{}/cache/index{}/shared_cpu_list",
                                                core.to_index(), index);

            std::fstream topology_file(file_name, std::fstream::in);
            if (!topology_file.is_open()) {
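Both topology helpers follow the same pattern: build the sysfs path for a core, open it, and hand the contained cpulist line to the CPU-set handler. A hedged, self-contained sketch of that pattern (the sysfs paths are real, the function and parameter names are illustrative):

// Sketch (assumption): read the SMT sibling list of one CPU from sysfs and pass
// the cpulist line on, e.g. to a parser like for_each_cpu_in_list() above.
#include <fstream>
#include <string>
#include "spdlog/fmt/fmt.h"

template <typename F>
void visit_smt_sibling_list(int cpu, F&& action) {
    // sysfs publishes the siblings as a cpulist string such as "2,18"
    std::string file_name = fmt::format(
        "/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list", cpu);
    std::ifstream topology_file(file_name);
    if (!topology_file.is_open()) {
        return;                    // no topology information for this CPU
    }
    std::string buffer;
    std::getline(topology_file, buffer);
    action(buffer);                // hand the raw cpulist line to the caller
}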
 
    if(numa_run_on_node(target_node) == -1) {
        spdlog::error("Error pinning thread {} to node {}: {}", pthread_self(), target_node, strerror(errno));
 
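The reference below notes that pin_to_data() pins the calling thread to the NUMA node on which data resides. Combining the address lookup and numa_run_on_node() fragments, a hedged stand-alone version could look as follows (names and error handling are assumptions):

// Sketch (assumption): restrict the calling thread to the node that owns `data`.
#include <numa.h>
#include <numaif.h>
#include <cerrno>
#include <cstdio>
#include <cstring>

void pin_caller_to_data(const void* data) {
    if (numa_available() < 0) {
        return;                                   // no NUMA support on this system
    }
    int target_node = -1;
    if (get_mempolicy(&target_node, nullptr, 0, const_cast<void*>(data),
                      MPOL_F_NODE | MPOL_F_ADDR) != 0) {
        return;                                   // address not mapped to a node yet
    }
    // numa_run_on_node() limits scheduling of the calling thread to that node's CPUs
    if (numa_run_on_node(target_node) == -1) {
        std::fprintf(stderr, "could not pin to node %d: %s\n",
                     target_node, std::strerror(errno));
    }
}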
            if(numa_bitmask_isbitset(numa_all_nodes_ptr, i) == 0) {
                spdlog::warn("NUMA node {} is disabled, no local data copy was created", i);

            // temporarily prefer allocations on node i so that the clone created next
            // is placed in that node's memory, then restore the previous preference
            int current_preferred = numa_preferred();
            numa_set_preferred(i);

            numa_set_preferred(current_preferred);
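This preferred-node dance is the core of make_copies() (documented below): libnuma's preferred policy steers the allocations made while a node's clone is created onto that node. A hedged, minimal illustration of the same trick with a plain std::vector payload (names and the payload type are assumptions):

// Sketch (assumption): create one copy of some data per NUMA node, each copy
// allocated preferentially from that node's memory.
#include <numa.h>
#include <any>
#include <vector>

std::vector<std::any> make_node_local_copies(const std::vector<double>& source) {
    std::vector<std::any> copies;
    if (numa_available() < 0) {
        copies.emplace_back(source);              // single copy without NUMA support
        return copies;
    }
    for (int node = 0; node <= numa_max_node(); ++node) {
        if (numa_bitmask_isbitset(numa_all_nodes_ptr, node) == 0) {
            copies.emplace_back();                // node disabled: leave an empty slot
            continue;
        }
        int old_preferred = numa_preferred();     // remember the current preference
        numa_set_preferred(node);                 // new allocations should land on `node`
        copies.emplace_back(source);              // this copy is allocated under that policy
        numa_set_preferred(old_preferred);        // restore the previous preference
    }
    return copies;
}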
 
        // fallback object handed out when no NUMA-local copy is available
        static std::any empty;
 
    int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

            // pthread_setaffinity_np() reports failure via its return value, not errno
            spdlog::error("Error fixing thread {} to core {}: {}\n", pthread_self(), cpu.to_index(), strerror(rc));
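The documented set_thread_affinity(cpu_id_t cpu) helper presumably wraps exactly this call. A hedged stand-alone equivalent for a plain integer core id:

// Sketch (assumption): pin the calling thread to a single core.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE            // CPU_SET and pthread_setaffinity_np are GNU extensions
#endif
#include <pthread.h>
#include <sched.h>
#include <cstdio>
#include <cstring>

bool pin_caller_to_core(int core) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);         // start from an empty CPU set
    CPU_SET(core, &cpuset);    // allow exactly one core
    int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (rc != 0) {
        // failure is reported through the return value; errno is not set
        std::fprintf(stderr, "affinity error: %s\n", std::strerror(rc));
        return false;
    }
    return true;
}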
 
            m_CPUs.push_back(cpu_id);
            m_LoadIndicator.push_back(0);

            return m_CPUs.empty();
 
            auto max_el = std::max_element(begin(m_LoadIndicator), end(m_LoadIndicator));

            // choose the core on this node with the smallest recorded load
            auto min_core = std::min_element(begin(m_LoadIndicator), end(m_LoadIndicator));
            long index = std::distance(begin(m_LoadIndicator), min_core);

                auto found = std::find(begin(m_CPUs), end(m_CPUs), sibling);
                if(found == end(m_CPUs)) {

                auto found = std::find(begin(m_CPUs), end(m_CPUs), sibling);
                if(found == end(m_CPUs)) {

                // also charge load to the core at the sibling's position
                *(begin(m_LoadIndicator) + std::distance(begin(m_CPUs), found)) += 1;

            return m_CPUs[index];
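These fragments implement the per-node load balancing that COST_PLACE_THREAD and COST_PLACE_HYPER_THREAD (documented below) feed into: the least-loaded core is chosen, and cores that share SMT or cache resources with it are charged additional load so later threads prefer other cores. A hedged sketch of that idea, detached from the project's NodeData class (constants and names are illustrative):

// Sketch (assumption): choose the least-loaded core on a node and charge its
// SMT siblings, so that later placements avoid the shared core.
#include <algorithm>
#include <cstddef>
#include <vector>

constexpr int COST_PLACE_THREAD = 10;        // illustrative values only
constexpr int COST_PLACE_HYPER_THREAD = 5;

int place_thread(const std::vector<int>& cpus,                       // core ids on this node
                 std::vector<int>& load,                             // accumulated load per core
                 const std::vector<std::vector<int>>& smt_siblings)  // siblings per core
{
    // pick the core with the smallest accumulated load (assumes non-empty vectors)
    auto min_core = std::min_element(load.begin(), load.end());
    auto index = static_cast<std::size_t>(std::distance(load.begin(), min_core));
    load[index] += COST_PLACE_THREAD;
    for (int sibling : smt_siblings[index]) {
        auto found = std::find(cpus.begin(), cpus.end(), sibling);
        if (found != cpus.end()) {
            load[static_cast<std::size_t>(std::distance(cpus.begin(), found))]
                += COST_PLACE_HYPER_THREAD;
        }
    }
    return cpus[index];
}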
 
        std::vector<NodeData> nodes_avail;

        if(numa_available() >= 0) {
            for(int i = 0; i < numa_num_possible_nodes(); ++i) {
                nodes_avail.emplace_back(i);

            // assign every available CPU to the NUMA node it belongs to
            for(int i = 0; i < numa_num_possible_cpus(); ++i) {
                if (numa_bitmask_isbitset(numa_all_cpus_ptr, i) != 0) {

                    int node = numa_node_of_cpu(i);
                    if(numa_bitmask_isbitset(numa_all_nodes_ptr, node) == 0) {
                        spdlog::warn("Node {} of CPU {} is not available.", node, i);

                    nodes_avail.at(node).add_cpu(cpu_id_t{i});

            // fallback without NUMA support: one pseudo-node that contains all CPUs
            nodes_avail.emplace_back(0);
            for(unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
                nodes_avail.at(0).add_cpu(cpu_id_t{i});

        // drop nodes that ended up without any usable CPUs
        nodes_avail.erase(std::remove_if(begin(nodes_avail), end(nodes_avail),
                                         [](auto&& d){ return d.empty(); }),
                          end(nodes_avail));
 
        m_Logger(std::move(logger))

    if(num_threads < 0) {
        THROW_EXCEPTION(std::invalid_argument, "Negative number of threads {} given!", num_threads)

        m_Logger->info("Distributing {} threads to {} NUMA nodes.",
                       num_threads, nodes_avail.size());

    // keep cycling over the available nodes, placing one thread at a time,
    // until every requested thread has a target CPU
    while(m_TargetCPUs.size() < static_cast<std::size_t>(num_threads)) {
        for(auto& node : nodes_avail) {

        for (auto& node : nodes_avail) {
            m_Logger->info("Node {}: {} threads, load {}", node.get_id().to_index(), node.num_threads(), node.max_load());
 
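Taken together, the two loops above spell out the distribution strategy: keep cycling over the available nodes, asking each node for one more target CPU, until as many target CPUs exist as requested threads, then log the per-node result. A hedged sketch with a simplified node type (the project's NodeData interface is only partly visible here):

// Sketch (assumption): round-robin thread distribution over NUMA nodes.
#include <cstddef>
#include <vector>

struct SimpleNode {
    std::vector<int> cpus;       // cores belonging to this node
    std::size_t next = 0;        // next core to hand out
    int place_thread() { int c = cpus[next % cpus.size()]; ++next; return c; }
};

// assumes `nodes` is non-empty and every node has at least one core
std::vector<int> distribute_threads(long num_threads, std::vector<SimpleNode>& nodes) {
    std::vector<int> target_cpus;
    while (target_cpus.size() < static_cast<std::size_t>(num_threads)) {
        for (auto& node : nodes) {
            if (target_cpus.size() == static_cast<std::size_t>(num_threads)) {
                break;           // enough threads placed
            }
            target_cpus.push_back(node.place_thread());
        }
    }
    return target_cpus;
}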
        throw std::runtime_error(fmt::format("Could not pin thread {} to CPU {}",
                                             pthread_self(), target_cpu.to_index()));
 
    if(numa_available() >= 0) {
        // after pinning, prefer memory allocations local to the thread's new node
        numa_set_localalloc();

        m_Logger->info("Pinned thread {} ({}) to Core {} on Node {}",
                       thread_id.to_index(), pthread_self(),
                       target_cpu.to_index(),
std::vector<cpu_id_t> m_CPUs
    Vector of CPU ids that are on this NUMA node.
std::vector<int> m_LoadIndicator
    How much work we have placed on each of those CPUs.
void add_cpu(cpu_id_t cpu_id)
numa_node_id_t get_id() const
constexpr T to_index() const
    Explicitly convert to an integer.
bool m_HasNUMA
    Whether NUMA functions are available.
void make_copies()
    Uses the get_clone() function of the implementing class to generate NUMA-local copies for each NUMA node.
virtual std::any get_clone() const = 0
    This function needs to be implemented by a derived class.
std::vector<std::any> m_Copies
    This vector contains one copy of the data for each NUMA node.
const std::any& access_local() const
void pin_this_thread(thread_id_t thread_id)
std::vector<cpu_id_t> m_TargetCPUs
    List of CPUs to which the threads will be assigned.
ThreadDistributor(long num_threads, std::shared_ptr<spdlog::logger> = {})
std::shared_ptr<spdlog::logger> m_Logger
cpu_id_t
    Strong typedef for an int to signify a (core of a) CPU.
numa_node_id_t
    Strong typedef for an int to signify a NUMA domain.
thread_id_t
    Strong typedef for an int to signify a thread id.
Defines configuration variables.
void for_each_shared_cache_sibling(cpu_id_t core, F&& action)
int lookup_numa_node(const void* ptr)
void for_each_sibling(cpu_id_t core, F&& action)
std::vector<NodeData> get_available_nodes()
    Returns the list of available NUMA nodes.
void handle_cpu_set(std::fstream& source, F&& action)
bool set_thread_affinity(cpu_id_t cpu)
constexpr const int COST_PLACE_THREAD
    Load balancing cost for placing a thread on a core.
void pin_to_data(const void* data)
    Pin the calling thread to the NUMA node on which data resides.
constexpr const int COST_PLACE_HYPER_THREAD
    Load balancing cost for placing a thread on an SMT-shared core.
#define THROW_EXCEPTION(exception_type, ...)
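Taken as a whole, the reference suggests the following usage pattern: construct a ThreadDistributor for the desired number of threads, then have each worker call pin_this_thread() with its own id before doing NUMA-sensitive work. Because only part of the interface is visible here, the sketch below is an assumption about how the pieces fit together (it needs the project's headers), not verified example code:

// Sketch (assumption): distribute and pin a pool of worker threads.
#include <thread>
#include <vector>

void run_workers(long num_threads) {
    ThreadDistributor distributor(num_threads);   // plans one target CPU per thread
    std::vector<std::thread> workers;
    for (long i = 0; i < num_threads; ++i) {
        workers.emplace_back([&distributor, i]() {
            // fix this worker to its planned core (and NUMA node) before it starts working
            distributor.pin_this_thread(thread_id_t{static_cast<int>(i)});
            // ... NUMA-local work goes here ...
        });
    }
    for (auto& worker : workers) {
        worker.join();
    }
}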