diff --git a/src/display_layout/display_layout.cpp b/src/display_layout/display_layout.cpp index 43fec673..dfc210df 100644 --- a/src/display_layout/display_layout.cpp +++ b/src/display_layout/display_layout.cpp @@ -55,6 +55,9 @@ void init_options(sharg::parser & parser, config & cfg) .description = "The output. ", .required = true, .validator = sharg::output_file_validator{}}); + parser.add_option( + cfg.threads, + sharg::config{.short_id = '\0', .long_id = "threads", .description = "The number of threads to use."}); } void parse(sharg::parser & parser) diff --git a/src/display_layout/general.cpp b/src/display_layout/general.cpp index 94133985..42656cbd 100644 --- a/src/display_layout/general.cpp +++ b/src/display_layout/general.cpp @@ -6,7 +6,6 @@ // --------------------------------------------------------------------------------------------------- #include -#include #include @@ -56,7 +55,7 @@ int execute(config const & cfg) std::ifstream layout_file{cfg.input}; if (!layout_file.good() || !layout_file.is_open()) - throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"}; // GCOVR_EXCL_LINE + throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"}; auto [filenames, chopper_config, hibf_layout] = chopper::layout::read_layout_file(layout_file); auto const & hibf_config = chopper_config.hibf_config; @@ -64,7 +63,7 @@ int execute(config const & cfg) std::ofstream output_stream{cfg.output}; if (!output_stream.good() || !output_stream.is_open()) - throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"}; // GCOVR_EXCL_LINE + throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"}; // Fetch all file sizes such that sorting by file size doesn't have to access the filesystem too often. // n = filenames.size() diff --git a/src/display_layout/shared.hpp b/src/display_layout/shared.hpp index 26713e2a..55122c1d 100644 --- a/src/display_layout/shared.hpp +++ b/src/display_layout/shared.hpp @@ -18,6 +18,7 @@ struct config { std::filesystem::path input{}; std::filesystem::path output{}; + uint8_t threads{1u}; }; void execute_general(config const & cfg); diff --git a/src/display_layout/sizes.cpp b/src/display_layout/sizes.cpp index 7e0e603e..a90df1c8 100644 --- a/src/display_layout/sizes.cpp +++ b/src/display_layout/sizes.cpp @@ -5,8 +5,8 @@ // shipped with this file and also available at: https://github.com/seqan/chopper/blob/main/LICENSE.md // --------------------------------------------------------------------------------------------------- +#include #include -#include #include @@ -14,10 +14,6 @@ #include #include -#include -#include - -#include #include #include @@ -55,21 +51,16 @@ size_t compute_ibf_size(robin_hood::unordered_flat_set & parent_kmers, size_t const number_of_bins, seqan::hibf::layout::graph::node const & ibf_node, seqan::hibf::build::build_data & data, - size_t current_hibf_level) + size_t const current_hibf_level) { - size_t const kmers_per_bin{static_cast(std::ceil(static_cast(kmers.size()) / number_of_bins))}; - double const bin_bits{ - static_cast(seqan::hibf::build::bin_size_in_bits({.fpr = data.config.maximum_false_positive_rate, - .hash_count = data.config.number_of_hash_functions, - .elements = kmers_per_bin}))}; - seqan::hibf::bin_size const bin_size{ - static_cast(std::ceil(bin_bits * data.fpr_correction[number_of_bins]))}; - seqan::hibf::bin_count const bin_count{ibf_node.number_of_technical_bins}; - - size_t const bin_words = (bin_count.value + 63) >> 6; // = ceil(bins/64) - size_t const technical_bins = bin_words << 6; // = bin_words * 64 + size_t const kmers_per_bin = std::ceil(static_cast(kmers.size()) / number_of_bins); + size_t const bin_size = + std::ceil(seqan::hibf::build::bin_size_in_bits({.fpr = data.config.maximum_false_positive_rate, + .hash_count = data.config.number_of_hash_functions, + .elements = kmers_per_bin}) + * data.fpr_correction[number_of_bins]); - size_t const ibf_size = technical_bins * bin_size.value; + size_t const ibf_size = ibf_node.number_of_technical_bins * bin_size; if (current_hibf_level > 0 /* not top level */) update_parent_kmers(parent_kmers, kmers); @@ -81,7 +72,7 @@ size_t hierarchical_stats(stats & stats_data, robin_hood::unordered_flat_set & parent_kmers, seqan::hibf::layout::graph::node const & current_node, seqan::hibf::build::build_data & data, - size_t current_hibf_level) + size_t const current_hibf_level) { size_t const ibf_pos{data.request_ibf_idx()}; std::vector size_waste_per_tb(current_node.number_of_technical_bins); @@ -117,18 +108,12 @@ size_t hierarchical_stats(stats & stats_data, kmers.clear(); // reduce memory peak // parse all other children (merged bins) of the current ibf - auto loop_over_children = [&]() + if (!current_node.children.empty()) { - if (current_node.children.empty()) - return; - - std::vector children = current_node.children; // copy for threads - - size_t const number_of_mutex = (current_node.number_of_technical_bins + 63) / 64; - std::vector local_ibf_mutex(number_of_mutex); + std::mutex merge_kmer_mutex{}; size_t number_of_threads{}; - std::vector indices(children.size()); + std::vector indices(current_node.children.size()); std::iota(indices.begin(), indices.end(), size_t{}); // We do not want to process the favourite child. It has already been processed prior. @@ -150,27 +135,21 @@ size_t hierarchical_stats(stats & stats_data, #pragma omp parallel for schedule(dynamic, 1) num_threads(number_of_threads) for (auto && index : indices) { - auto & child = children[index]; + auto & child = current_node.children[index]; robin_hood::unordered_flat_set kmers{}; - hierarchical_stats(stats_data, kmers, child, data, current_hibf_level + 1); - auto parent_bin_index = child.parent_bin_index; + hierarchical_stats(stats_data, kmers, child, data, current_hibf_level + 1u); + if (current_hibf_level > 0 /* not top level */) { - size_t const mutex_id{parent_bin_index / 64}; - std::lock_guard guard{local_ibf_mutex[mutex_id]}; - - if (current_hibf_level > 0 /* not top level */) - update_parent_kmers(parent_kmers, kmers); - - assert(amount_of_max_bin_kmers >= kmers.size()); - size_waste_per_tb[parent_bin_index] = - amount_of_max_bin_kmers - std::min(amount_of_max_bin_kmers, kmers.size()); + std::lock_guard guard{merge_kmer_mutex}; + update_parent_kmers(parent_kmers, kmers); } - } - }; - loop_over_children(); + size_waste_per_tb[child.parent_bin_index] = + amount_of_max_bin_kmers - std::min(amount_of_max_bin_kmers, kmers.size()); + } + } // If max bin was a merged bin, process all remaining records, otherwise the first one has already been processed size_t const start{(current_node.favourite_child_idx.has_value()) ? 0u : 1u}; @@ -180,22 +159,9 @@ size_t hierarchical_stats(stats & stats_data, compute_kmers(kmers, data, record); - size_t const kmers_per_tb = kmers.size() / record.number_of_technical_bins; + size_t const kmers_per_tb = kmers.size() / record.number_of_technical_bins + 1u; size_t const diff = amount_of_max_bin_kmers - std::min(amount_of_max_bin_kmers, kmers_per_tb); - if (diff > amount_of_max_bin_kmers) - throw std::runtime_error{"Wrong? diff:" + std::to_string(diff) - + " amount_of_max_bin_kmers:" + std::to_string(amount_of_max_bin_kmers)}; - - // size_t const kmers_per_tb_corrected = kmers_per_tb - (0.03 * kmers_per_tb); - // if (amount_of_max_bin_kmers < kmers_per_tb_corrected) - // { - // throw std::runtime_error{"amount_of_max_bin_kmers:" + std::to_string(amount_of_max_bin_kmers) - // + " was smaller than current size: " + std::to_string(kmers.size()) + "/" - // + std::to_string(record.number_of_technical_bins) + "=" - // + std::to_string(kmers.size() / record.number_of_technical_bins)}; - // } - assert(size_waste_per_tb.size() >= record.storage_TB_id + record.number_of_technical_bins); for (size_t i = record.storage_TB_id; i < record.storage_TB_id + record.number_of_technical_bins; ++i) size_waste_per_tb[i] = diff; @@ -208,12 +174,12 @@ size_t hierarchical_stats(stats & stats_data, stats_data.ibf_sizes[ibf_pos] = ibf_size; stats_data.ibf_levels[ibf_pos] = current_hibf_level; + // compute load factor size_t const waste = std::accumulate(size_waste_per_tb.begin(), size_waste_per_tb.end(), size_t{}); size_t const total = amount_of_max_bin_kmers * size_waste_per_tb.size(); - if (waste > total) - throw std::runtime_error{"Wrong? waste:" + std::to_string(waste) + " total:" + std::to_string(total)}; + assert(waste <= total); stats_data.ibf_load_factor[ibf_pos] = (total - waste) / static_cast(total); @@ -233,9 +199,10 @@ void execute_general_stats(config const & cfg) std::ifstream layout_file{cfg.input}; if (!layout_file.good() || !layout_file.is_open()) - throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"}; // GCOVR_EXCL_LINE + throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"}; auto [filenames, chopper_config, hibf_layout] = chopper::layout::read_layout_file(layout_file); + chopper_config.hibf_config.threads = cfg.threads; auto input_lambda = [&filenames, &chopper_config](size_t const user_bin_id, seqan::hibf::insert_iterator it) { @@ -257,14 +224,14 @@ void execute_general_stats(config const & cfg) std::ofstream output_stream{cfg.output}; if (!output_stream.good() || !output_stream.is_open()) - throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"}; // GCOVR_EXCL_LINE + throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"}; size_t const number_of_ibfs = hibf_layout.max_bins.size() + 1u; stats stats_data; stats_data.ibf_sizes.resize(number_of_ibfs); stats_data.ibf_levels.resize(number_of_ibfs); - stats_data.ibf_load_factor.resize(number_of_ibfs, 1.0); + stats_data.ibf_load_factor.resize(number_of_ibfs); seqan::hibf::build::build_data data{.config = hibf_config, .ibf_graph = {hibf_layout}}; @@ -280,9 +247,9 @@ void execute_general_stats(config const & cfg) // accumulate statistics per level size_t const number_of_levels = std::ranges::max(stats_data.ibf_levels) + 1u; - std::vector size_per_level(number_of_levels, 0u); - std::vector load_factor_per_level(number_of_levels, 0.0); - std::vector num_ibfs_per_level(number_of_levels, 0u); + std::vector size_per_level(number_of_levels); + std::vector load_factor_per_level(number_of_levels); + std::vector num_ibfs_per_level(number_of_levels); for (size_t i = 0; i < stats_data.ibf_sizes.size(); ++i) { @@ -295,11 +262,12 @@ void execute_general_stats(config const & cfg) for (size_t i = 0; i < number_of_levels; ++i) load_factor_per_level[i] /= num_ibfs_per_level[i]; - output_stream << "#Levels: " << number_of_levels << '\n'; - output_stream << "LEVEL-IDX\tSIZE-IN-BITS\tNUMBER-OF-IBFS\tLOAD-FACTOR-IN-%\n"; + output_stream << std::fixed << std::setprecision(2); + output_stream << "# Levels: " << number_of_levels << '\n'; + output_stream << "LEVEL\tBIT_SIZE\tIBFS\tLOAD_FACTOR\n"; for (size_t i = 0; i < number_of_levels; ++i) output_stream << i << '\t' << size_per_level[i] << '\t' << num_ibfs_per_level[i] << '\t' - << load_factor_per_level[i] << '\n'; + << load_factor_per_level[i] * 100 << '\n'; } void execute_sizes(config const & cfg)