diff --git a/cpp/array_record_reader.cc b/cpp/array_record_reader.cc index 2794ec9..0950350 100644 --- a/cpp/array_record_reader.cc +++ b/cpp/array_record_reader.cc @@ -191,8 +191,8 @@ void ArrayRecordReaderBase::Initialize() { if (state_->pool) { max_parallelism = state_->pool->NumThreads(); if (state_->options.max_parallelism().has_value()) { - max_parallelism = - std::min(max_parallelism, state_->options.max_parallelism().value()); + max_parallelism = std::min( + max_parallelism, state_->options.max_parallelism().value()); } } state_->options.set_max_parallelism(max_parallelism); @@ -324,16 +324,16 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords( if (state_->chunk_offsets.empty()) { return absl::OkStatus(); } - uint64_t num_chunk_groups = - CeilOfRatio(state_->chunk_offsets.size(), state_->chunk_group_size); + uint64_t num_chunk_groups = CeilOfRatio(state_->chunk_offsets.size(), + state_->chunk_group_size); const auto reader = get_backing_reader(); auto status = ParallelForWithStatus<1>( Seq(num_chunk_groups), state_->pool, [&](size_t buf_idx) -> absl::Status { uint64_t chunk_idx_start = buf_idx * state_->chunk_group_size; // inclusive index, not the conventional exclusive index. uint64_t last_chunk_idx = - std::min((buf_idx + 1) * state_->chunk_group_size - 1, - state_->chunk_offsets.size() - 1); + std::min((buf_idx + 1) * state_->chunk_group_size - 1, + state_->chunk_offsets.size() - 1); uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) - state_->chunk_offsets[chunk_idx_start]; AR_ENDO_JOB( @@ -398,9 +398,10 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange( begin, end, NumRecords()); } uint64_t chunk_idx_begin = begin / state_->record_group_size; - uint64_t chunk_idx_end = CeilOfRatio(end, state_->record_group_size); + uint64_t chunk_idx_end = CeilOfRatio(end, state_->record_group_size); uint64_t num_chunks = chunk_idx_end - chunk_idx_begin; - uint64_t num_chunk_groups = CeilOfRatio(num_chunks, state_->chunk_group_size); + uint64_t num_chunk_groups = + CeilOfRatio(num_chunks, state_->chunk_group_size); const auto reader = get_backing_reader(); auto status = ParallelForWithStatus<1>( @@ -408,7 +409,7 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange( uint64_t chunk_idx_start = chunk_idx_begin + buf_idx * state_->chunk_group_size; // inclusive index, not the conventional exclusive index. - uint64_t last_chunk_idx = std::min( + uint64_t last_chunk_idx = std::min( chunk_idx_begin + (buf_idx + 1) * state_->chunk_group_size - 1, chunk_idx_end - 1); uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) - @@ -604,7 +605,7 @@ bool ArrayRecordReaderBase::SeekRecord(uint64_t record_index) { if (!ok()) { return false; } - state_->record_idx = std::min(record_index, state_->num_records); + state_->record_idx = std::min(record_index, state_->num_records); return true; } @@ -654,8 +655,9 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { std::vector decoders; decoders.reserve(state_->chunk_group_size); uint64_t chunk_start = buffer_idx * state_->chunk_group_size; - uint64_t chunk_end = std::min(state_->chunk_offsets.size(), - (buffer_idx + 1) * state_->chunk_group_size); + uint64_t chunk_end = + std::min(state_->chunk_offsets.size(), + (buffer_idx + 1) * state_->chunk_group_size); const auto reader = get_backing_reader(); for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) { uint64_t chunk_offset = state_->chunk_offsets[chunk_idx]; @@ -695,8 +697,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { chunk_offsets.reserve(state_->chunk_group_size); uint64_t chunk_start = buffer_to_add * state_->chunk_group_size; uint64_t chunk_end = - std::min(state_->chunk_offsets.size(), - (buffer_to_add + 1) * state_->chunk_group_size); + std::min(state_->chunk_offsets.size(), + (buffer_to_add + 1) * state_->chunk_group_size); for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) { chunk_offsets.push_back(state_->chunk_offsets[chunk_idx]); } diff --git a/cpp/shareable_dependency_test.cc b/cpp/shareable_dependency_test.cc index 2083dd7..2695ba5 100644 --- a/cpp/shareable_dependency_test.cc +++ b/cpp/shareable_dependency_test.cc @@ -15,6 +15,7 @@ limitations under the License. #include "cpp/shareable_dependency.h" +#include #include #include @@ -73,13 +74,14 @@ TEST_F(ShareableDependencyTest, SanityTest) { EXPECT_FALSE(new_main.IsUnique()); // NOLINT(bugprone-use-after-move) absl::Notification notification; - pool_->Schedule([refobj = main.Share(), ¬ification] { - notification.WaitForNotification(); - absl::SleepFor(absl::Milliseconds(10)); - EXPECT_EQ(refobj->value(), 1); - const auto second_ref = refobj; - refobj->add_value(1); - }); + pool_->Schedule( + [refobj = std::make_shared>(main.Share()), + ¬ification] { + notification.WaitForNotification(); + absl::SleepFor(absl::Milliseconds(10)); + EXPECT_EQ(refobj->get()->value(), 1); + refobj->get()->add_value(1); + }); EXPECT_FALSE(main.IsUnique()); notification.Notify(); auto& unique = main.WaitUntilUnique(); @@ -97,13 +99,14 @@ TEST_F(ShareableDependencyTest, SanityTestWithReset) { EXPECT_TRUE(main.IsUnique()); absl::Notification notification; - pool_->Schedule([refobj = main.Share(), ¬ification] { - notification.WaitForNotification(); - absl::SleepFor(absl::Milliseconds(10)); - EXPECT_EQ(refobj->value(), 1); - const auto second_ref = refobj; - refobj->add_value(1); - }); + pool_->Schedule( + [refobj = std::make_shared>(main.Share()), + ¬ification] { + notification.WaitForNotification(); + absl::SleepFor(absl::Milliseconds(10)); + EXPECT_EQ(refobj->get()->value(), 1); + refobj->get()->add_value(1); + }); EXPECT_FALSE(main.IsUnique()); notification.Notify(); auto& unique = main.WaitUntilUnique(); diff --git a/oss/build_whl.sh b/oss/build_whl.sh index 8267364..dbb4c64 100755 --- a/oss/build_whl.sh +++ b/oss/build_whl.sh @@ -29,9 +29,12 @@ function main() { write_to_bazelrc "build -c opt" write_to_bazelrc "build --cxxopt=-std=c++17" write_to_bazelrc "build --host_cxxopt=-std=c++17" - write_to_bazelrc "build --linkopt=\"-lrt -lm\"" write_to_bazelrc "build --experimental_repo_remote_exec" write_to_bazelrc "build --python_path=\"${PYTHON_BIN}\"" + PLATFORM="$(uname)" + if [[ "$PLATFORM" != "Darwin" ]]; then + write_to_bazelrc "build --linkopt=\"-lrt -lm\"" + fi if [ -n "${CROSSTOOL_TOP}" ]; then write_to_bazelrc "build --crosstool_top=${CROSSTOOL_TOP}" @@ -40,8 +43,8 @@ function main() { export USE_BAZEL_VERSION="${BAZEL_VERSION}" bazel clean - bazel build ... - bazel test --verbose_failures --test_output=errors ... + bazel build ... --action_env PYTHON_BIN_PATH="${PYTHON_BIN}" + bazel test --verbose_failures --test_output=errors ... --action_env PYTHON_BIN_PATH="${PYTHON_BIN}" DEST="/tmp/array_record/all_dist" # Create the directory, then do dirname on a non-existent file inside it to diff --git a/oss/runner_common.sh b/oss/runner_common.sh index a35f5c0..97794f9 100644 --- a/oss/runner_common.sh +++ b/oss/runner_common.sh @@ -49,6 +49,9 @@ function install_and_init_pyenv { if [[ ! -d $PYENV_ROOT ]]; then echo "Installing pyenv.." git clone https://github.com/pyenv/pyenv.git "$PYENV_ROOT" + pushd "$PYENV_ROOT" + git checkout "v2.4.21" + popd export PATH="/home/kbuilder/.local/bin:$PYENV_ROOT/bin:$PATH" eval "$(pyenv init --path)" fi @@ -56,17 +59,16 @@ function install_and_init_pyenv { echo "Python setup..." pyenv install -s "$PYENV_PYTHON_VERSION" pyenv global "$PYENV_PYTHON_VERSION" - PYTHON=$(pyenv which python) + export PYTHON_BIN=$(pyenv which python) } -function setup_env_vars_py310 { +function setup_env_vars_py { # This controls the python binary to use. - PYTHON=python3.10 - PYTHON_STR=python3.10 - PYTHON_MAJOR_VERSION=3 - PYTHON_MINOR_VERSION=10 + PYTHON_MAJOR_VERSION=$1 + PYTHON_MINOR_VERSION=$2 # This is for pyenv install. - PYENV_PYTHON_VERSION=3.10.13 + PYENV_PYTHON_VERSION=${PYTHON_MAJOR_VERSION}.${PYTHON_MINOR_VERSION} + PYTHON="python$PYENV_PYTHON_VERSION" } function update_bazel_macos { @@ -78,7 +80,17 @@ function update_bazel_macos { ./bazel-${BAZEL_VERSION}-installer-darwin-${ARCH}.sh --user rm -f ./bazel-${BAZEL_VERSION}-installer-darwin-${ARCH}.sh # Add new bazel installation to path - PATH="/Users/kbuilder/bin:$PATH" + export PATH="/Users/kbuilder/bin:$PATH" +} + +function install_ar_deps { + $PYTHON_BIN -m pip install -U \ + absl-py \ + build \ + etils[epath] \ + setuptools \ + twine \ + wheel; } function build_and_test_array_record_macos() { @@ -90,13 +102,19 @@ function build_and_test_array_record_macos() { update_bazel_macos ${BAZEL_VERSION} bazel --version - # Set up Pyenv. - setup_env_vars_py310 - install_and_init_pyenv + PYTHON_MAJOR_VERSION=3 + for PYTHON_MINOR_VERSION in 10 11 12 + do + # Set up Pyenv. + PYTHON_VERSION=${PYTHON_MAJOR_VERSION}.${PYTHON_MINOR_VERSION} + echo "Creating array_record wheel for Python Version $PYTHON_VERSION" + setup_env_vars_py $PYTHON_MAJOR_VERSION $PYTHON_MINOR_VERSION + install_and_init_pyenv + install_ar_deps - # Build and test ArrayRecord. - cd ${SOURCE_DIR} - bash ${SOURCE_DIR}/oss/build_whl.sh + # Build and test ArrayRecord. + bash ${SOURCE_DIR}/oss/build_whl.sh + done ls ${SOURCE_DIR}/all_dist/*.whl } \ No newline at end of file