
Commit: more refactor
eseiler committed Oct 7, 2023
1 parent 5686388 commit 7c2a7a3
Showing 4 changed files with 42 additions and 71 deletions.
3 changes: 3 additions & 0 deletions src/display_layout/display_layout.cpp
@@ -55,6 +55,9 @@ void init_options(sharg::parser & parser, config & cfg)
.description = "The output. ",
.required = true,
.validator = sharg::output_file_validator{}});
parser.add_option(
cfg.threads,
sharg::config{.short_id = '\0', .long_id = "threads", .description = "The number of threads to use."});
}

void parse(sharg::parser & parser)
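
The option added above follows the usual sharg pattern. A minimal, self-contained sketch of the same wiring, with a bounds check added for illustration; the sharg::arithmetic_range_validator line is an assumption, the commit itself adds no validator:

// Sketch only (not part of the commit): the new --threads option, plus a
// hypothetical range check so users cannot request 0 threads.
#include <cstdint>

#include <sharg/parser.hpp>
#include <sharg/validators.hpp>

struct config
{
    uint8_t threads{1u};
};

void init_options(sharg::parser & parser, config & cfg)
{
    parser.add_option(cfg.threads,
                      sharg::config{.short_id = '\0',
                                    .long_id = "threads",
                                    .description = "The number of threads to use.",
                                    .validator = sharg::arithmetic_range_validator{1, 255}});
}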
5 changes: 2 additions & 3 deletions src/display_layout/general.cpp
@@ -6,7 +6,6 @@
// ---------------------------------------------------------------------------------------------------

#include <iostream>
#include <set>

#include <robin_hood.h>

@@ -56,15 +55,15 @@ int execute(config const & cfg)
std::ifstream layout_file{cfg.input};

if (!layout_file.good() || !layout_file.is_open())
throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"}; // GCOVR_EXCL_LINE
throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"};

auto [filenames, chopper_config, hibf_layout] = chopper::layout::read_layout_file(layout_file);
auto const & hibf_config = chopper_config.hibf_config;

std::ofstream output_stream{cfg.output};

if (!output_stream.good() || !output_stream.is_open())
throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"}; // GCOVR_EXCL_LINE
throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"};

// Fetch all file sizes such that sorting by file size doesn't have to access the filesystem too often.
// n = filenames.size()
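
The comment above points at a simple caching pattern: stat every file once, then sort on the cached sizes so the comparator never touches the filesystem. A minimal sketch with illustrative names (indices_by_size is not part of the codebase):

// Cache file sizes once, then sort indices by the cached values.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <numeric>
#include <vector>

std::vector<size_t> indices_by_size(std::vector<std::filesystem::path> const & filenames)
{
    std::vector<std::uintmax_t> sizes(filenames.size());
    for (size_t i = 0; i < filenames.size(); ++i)
        sizes[i] = std::filesystem::file_size(filenames[i]); // one stat per file

    std::vector<size_t> indices(filenames.size());
    std::iota(indices.begin(), indices.end(), size_t{});
    std::sort(indices.begin(),
              indices.end(),
              [&sizes](size_t lhs, size_t rhs) { return sizes[lhs] > sizes[rhs]; });
    return indices; // largest file first
}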
1 change: 1 addition & 0 deletions src/display_layout/shared.hpp
@@ -18,6 +18,7 @@ struct config
{
std::filesystem::path input{};
std::filesystem::path output{};
uint8_t threads{1u};
};

void execute_general(config const & cfg);
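
One caveat on the uint8_t threads{1u} member above, a general C++ fact rather than anything the commit addresses: uint8_t is a character type on common platforms, so the option is capped at 255 and streams as a char. A minimal illustration:

// uint8_t streams as a character; cast before printing a thread count.
#include <cstdint>
#include <iostream>

int main()
{
    uint8_t threads{56u};
    std::cout << threads << '\n';                        // prints '8' (ASCII 56)
    std::cout << static_cast<unsigned>(threads) << '\n'; // prints 56
}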
104 changes: 36 additions & 68 deletions src/display_layout/sizes.cpp
@@ -5,19 +5,15 @@
// shipped with this file and also available at: https://github.com/seqan/chopper/blob/main/LICENSE.md
// ---------------------------------------------------------------------------------------------------

#include <cassert>
#include <iostream>
#include <set>

#include <robin_hood.h>

#include <sharg/detail/to_string.hpp>
#include <sharg/exceptions.hpp>
#include <sharg/parser.hpp>

#include <seqan3/io/sequence_file/all.hpp>
#include <seqan3/search/views/minimiser_hash.hpp>

#include <chopper/adjust_seed.hpp>
#include <chopper/layout/hibf_statistics.hpp>
#include <chopper/layout/input.hpp>

@@ -55,21 +51,16 @@ size_t compute_ibf_size(robin_hood::unordered_flat_set<uint64_t> & parent_kmers,
size_t const number_of_bins,
seqan::hibf::layout::graph::node const & ibf_node,
seqan::hibf::build::build_data & data,
size_t current_hibf_level)
size_t const current_hibf_level)
{
size_t const kmers_per_bin{static_cast<size_t>(std::ceil(static_cast<double>(kmers.size()) / number_of_bins))};
double const bin_bits{
static_cast<double>(seqan::hibf::build::bin_size_in_bits({.fpr = data.config.maximum_false_positive_rate,
.hash_count = data.config.number_of_hash_functions,
.elements = kmers_per_bin}))};
seqan::hibf::bin_size const bin_size{
static_cast<size_t>(std::ceil(bin_bits * data.fpr_correction[number_of_bins]))};
seqan::hibf::bin_count const bin_count{ibf_node.number_of_technical_bins};

size_t const bin_words = (bin_count.value + 63) >> 6; // = ceil(bins/64)
size_t const technical_bins = bin_words << 6; // = bin_words * 64
size_t const kmers_per_bin = std::ceil(static_cast<double>(kmers.size()) / number_of_bins);
size_t const bin_size =
std::ceil(seqan::hibf::build::bin_size_in_bits({.fpr = data.config.maximum_false_positive_rate,
.hash_count = data.config.number_of_hash_functions,
.elements = kmers_per_bin})
* data.fpr_correction[number_of_bins]);

size_t const ibf_size = technical_bins * bin_size.value;
size_t const ibf_size = ibf_node.number_of_technical_bins * bin_size;

if (current_hibf_level > 0 /* not top level */)
update_parent_kmers(parent_kmers, kmers);
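
For readers following the arithmetic above: bin_size_in_bits implements the classic Bloom filter sizing rule m = ceil(-h * n / ln(1 - p^(1/h))) for n elements, h hash functions, and false-positive rate p, and the result is scaled by the layout's fpr_correction for split bins. The refactor also drops the old rounding of the bin count up to a multiple of 64 (bin_words << 6) and multiplies by number_of_technical_bins directly. A self-contained sketch of the sizing rule; the actual seqan::hibf internals may differ in detail:

// Standard Bloom filter sizing: bits m such that n elements hashed with
// h functions stay below false-positive rate p. Conceptually mirrors
// bin_size_in_bits; the real seqan::hibf implementation may differ.
#include <cmath>
#include <cstddef>

size_t bloom_filter_bits(double p, size_t h, size_t n)
{
    double const numerator = -static_cast<double>(h) * static_cast<double>(n);
    double const denominator = std::log(1.0 - std::pow(p, 1.0 / static_cast<double>(h)));
    return static_cast<size_t>(std::ceil(numerator / denominator));
}
// Example: bloom_filter_bits(0.05, 2, 1000) is about 7900 bits (~7.9 bits per element).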
@@ -81,7 +72,7 @@ size_t hierarchical_stats(stats & stats_data,
robin_hood::unordered_flat_set<uint64_t> & parent_kmers,
seqan::hibf::layout::graph::node const & current_node,
seqan::hibf::build::build_data & data,
size_t current_hibf_level)
size_t const current_hibf_level)
{
size_t const ibf_pos{data.request_ibf_idx()};
std::vector<size_t> size_waste_per_tb(current_node.number_of_technical_bins);
@@ -117,18 +108,12 @@ size_t hierarchical_stats(stats & stats_data,
kmers.clear(); // reduce memory peak

// parse all other children (merged bins) of the current ibf
auto loop_over_children = [&]()
if (!current_node.children.empty())
{
if (current_node.children.empty())
return;

std::vector<seqan::hibf::layout::graph::node> children = current_node.children; // copy for threads

size_t const number_of_mutex = (current_node.number_of_technical_bins + 63) / 64;
std::vector<std::mutex> local_ibf_mutex(number_of_mutex);
std::mutex merge_kmer_mutex{};

size_t number_of_threads{};
std::vector<size_t> indices(children.size());
std::vector<size_t> indices(current_node.children.size());
std::iota(indices.begin(), indices.end(), size_t{});

// We do not want to process the favourite child. It has already been processed prior.
@@ -150,27 +135,21 @@ size_t hierarchical_stats(stats & stats_data,
#pragma omp parallel for schedule(dynamic, 1) num_threads(number_of_threads)
for (auto && index : indices)
{
auto & child = children[index];
auto & child = current_node.children[index];

robin_hood::unordered_flat_set<uint64_t> kmers{};
hierarchical_stats(stats_data, kmers, child, data, current_hibf_level + 1);
auto parent_bin_index = child.parent_bin_index;
hierarchical_stats(stats_data, kmers, child, data, current_hibf_level + 1u);

if (current_hibf_level > 0 /* not top level */)
{
size_t const mutex_id{parent_bin_index / 64};
std::lock_guard<std::mutex> guard{local_ibf_mutex[mutex_id]};

if (current_hibf_level > 0 /* not top level */)
update_parent_kmers(parent_kmers, kmers);

assert(amount_of_max_bin_kmers >= kmers.size());
size_waste_per_tb[parent_bin_index] =
amount_of_max_bin_kmers - std::min(amount_of_max_bin_kmers, kmers.size());
std::lock_guard<std::mutex> guard{merge_kmer_mutex};
update_parent_kmers(parent_kmers, kmers);
}
}
};

loop_over_children();
size_waste_per_tb[child.parent_bin_index] =
amount_of_max_bin_kmers - std::min(amount_of_max_bin_kmers, kmers.size());
}
}

// If max bin was a merged bin, process all remaining records, otherwise the first one has already been processed
size_t const start{(current_node.favourite_child_idx.has_value()) ? 0u : 1u};
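
The refactor above replaces the per-64-bins mutex array with a single merge_kmer_mutex and iterates current_node.children directly instead of copying it for the threads. The underlying pattern, stripped to its essentials with illustrative names (requires compiling with OpenMP, e.g. -fopenmp):

// Parallel loop where per-iteration work is lock-free and only the rare
// merge into shared state takes a mutex.
#include <mutex>
#include <numeric>
#include <vector>

void process_all(size_t child_count, int thread_count)
{
    std::vector<size_t> indices(child_count);
    std::iota(indices.begin(), indices.end(), size_t{});

    std::mutex merge_mutex{};
    size_t shared_sum{};

#pragma omp parallel for schedule(dynamic, 1) num_threads(thread_count)
    for (size_t i = 0; i < indices.size(); ++i)
    {
        size_t const local = indices[i] * indices[i]; // thread-local work

        std::lock_guard<std::mutex> guard{merge_mutex}; // serialise the merge
        shared_sum += local;
    }
}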
@@ -180,22 +159,9 @@ size_t hierarchical_stats(stats & stats_data,

compute_kmers(kmers, data, record);

size_t const kmers_per_tb = kmers.size() / record.number_of_technical_bins;
size_t const kmers_per_tb = kmers.size() / record.number_of_technical_bins + 1u;
size_t const diff = amount_of_max_bin_kmers - std::min(amount_of_max_bin_kmers, kmers_per_tb);

if (diff > amount_of_max_bin_kmers)
throw std::runtime_error{"Wrong? diff:" + std::to_string(diff)
+ " amount_of_max_bin_kmers:" + std::to_string(amount_of_max_bin_kmers)};

// size_t const kmers_per_tb_corrected = kmers_per_tb - (0.03 * kmers_per_tb);
// if (amount_of_max_bin_kmers < kmers_per_tb_corrected)
// {
// throw std::runtime_error{"amount_of_max_bin_kmers:" + std::to_string(amount_of_max_bin_kmers)
// + " was smaller than current size: " + std::to_string(kmers.size()) + "/"
// + std::to_string(record.number_of_technical_bins) + "="
// + std::to_string(kmers.size() / record.number_of_technical_bins)};
// }

assert(size_waste_per_tb.size() >= record.storage_TB_id + record.number_of_technical_bins);
for (size_t i = record.storage_TB_id; i < record.storage_TB_id + record.number_of_technical_bins; ++i)
size_waste_per_tb[i] = diff;
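
The a - std::min(a, b) construction used for diff above is the usual guard against unsigned wrap-around; in isolation:

// size_t subtraction wraps if the subtrahend is larger, so clamp first.
#include <algorithm>
#include <cstddef>

size_t saturating_sub(size_t a, size_t b)
{
    return a - std::min(a, b); // 0 when b >= a, never wraps
}
// saturating_sub(10, 3) == 7, saturating_sub(3, 10) == 0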
@@ -208,12 +174,12 @@ size_t hierarchical_stats(stats & stats_data,

stats_data.ibf_sizes[ibf_pos] = ibf_size;
stats_data.ibf_levels[ibf_pos] = current_hibf_level;

// compute load factor
size_t const waste = std::accumulate(size_waste_per_tb.begin(), size_waste_per_tb.end(), size_t{});
size_t const total = amount_of_max_bin_kmers * size_waste_per_tb.size();

if (waste > total)
throw std::runtime_error{"Wrong? waste:" + std::to_string(waste) + " total:" + std::to_string(total)};
assert(waste <= total);

stats_data.ibf_load_factor[ibf_pos] = (total - waste) / static_cast<double>(total);
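
A worked example of the load-factor formula above, with made-up numbers:

// capacity = max-bin k-mers * number of technical bins; load = capacity - waste.
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

double load_factor(std::vector<size_t> const & size_waste_per_tb, size_t max_bin_kmers)
{
    size_t const waste = std::accumulate(size_waste_per_tb.begin(), size_waste_per_tb.end(), size_t{});
    size_t const total = max_bin_kmers * size_waste_per_tb.size();
    assert(waste <= total);
    return (total - waste) / static_cast<double>(total);
}
// Four bins whose max bin holds 100 k-mers, waste {0, 20, 50, 30}:
// load_factor({0, 20, 50, 30}, 100) == (400 - 100) / 400.0 == 0.75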

@@ -233,9 +199,10 @@ void execute_general_stats(config const & cfg)
std::ifstream layout_file{cfg.input};

if (!layout_file.good() || !layout_file.is_open())
throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"}; // GCOVR_EXCL_LINE
throw std::logic_error{"Could not open file " + cfg.input.string() + " for reading"};

auto [filenames, chopper_config, hibf_layout] = chopper::layout::read_layout_file(layout_file);
chopper_config.hibf_config.threads = cfg.threads;

auto input_lambda = [&filenames, &chopper_config](size_t const user_bin_id, seqan::hibf::insert_iterator it)
{
@@ -257,14 +224,14 @@ void execute_general_stats(config const & cfg)
std::ofstream output_stream{cfg.output};

if (!output_stream.good() || !output_stream.is_open())
throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"}; // GCOVR_EXCL_LINE
throw std::logic_error{"Could not open file " + cfg.output.string() + " for reading"};

size_t const number_of_ibfs = hibf_layout.max_bins.size() + 1u;

stats stats_data;
stats_data.ibf_sizes.resize(number_of_ibfs);
stats_data.ibf_levels.resize(number_of_ibfs);
stats_data.ibf_load_factor.resize(number_of_ibfs, 1.0);
stats_data.ibf_load_factor.resize(number_of_ibfs);

seqan::hibf::build::build_data data{.config = hibf_config, .ibf_graph = {hibf_layout}};

@@ -280,9 +247,9 @@ void execute_general_stats(config const & cfg)

// accumulate statistics per level
size_t const number_of_levels = std::ranges::max(stats_data.ibf_levels) + 1u;
std::vector<size_t> size_per_level(number_of_levels, 0u);
std::vector<double> load_factor_per_level(number_of_levels, 0.0);
std::vector<size_t> num_ibfs_per_level(number_of_levels, 0u);
std::vector<size_t> size_per_level(number_of_levels);
std::vector<double> load_factor_per_level(number_of_levels);
std::vector<size_t> num_ibfs_per_level(number_of_levels);

for (size_t i = 0; i < stats_data.ibf_sizes.size(); ++i)
{
Expand All @@ -295,11 +262,12 @@ void execute_general_stats(config const & cfg)
for (size_t i = 0; i < number_of_levels; ++i)
load_factor_per_level[i] /= num_ibfs_per_level[i];

output_stream << "#Levels: " << number_of_levels << '\n';
output_stream << "LEVEL-IDX\tSIZE-IN-BITS\tNUMBER-OF-IBFS\tLOAD-FACTOR-IN-%\n";
output_stream << std::fixed << std::setprecision(2);
output_stream << "# Levels: " << number_of_levels << '\n';
output_stream << "LEVEL\tBIT_SIZE\tIBFS\tLOAD_FACTOR\n";
for (size_t i = 0; i < number_of_levels; ++i)
output_stream << i << '\t' << size_per_level[i] << '\t' << num_ibfs_per_level[i] << '\t'
<< load_factor_per_level[i] << '\n';
<< load_factor_per_level[i] * 100 << '\n';
}
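
With the new formatting above (std::fixed, std::setprecision(2), load factor reported as a percentage), the emitted table would look like the following; the numbers are made up for illustration and the columns are aligned here for readability (the real output is tab-separated):

# Levels: 2
LEVEL  BIT_SIZE  IBFS  LOAD_FACTOR
0      4194304   1     97.50
1      1048576   3     84.33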

void execute_sizes(config const & cfg)

