Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal Change #136

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions cpp/array_record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ void ArrayRecordReaderBase::Initialize() {
if (state_->pool) {
max_parallelism = state_->pool->NumThreads();
if (state_->options.max_parallelism().has_value()) {
max_parallelism =
std::min(max_parallelism, state_->options.max_parallelism().value());
max_parallelism = std::min<size_t>(
max_parallelism, state_->options.max_parallelism().value());
}
}
state_->options.set_max_parallelism(max_parallelism);
Expand Down Expand Up @@ -324,16 +324,16 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecords(
if (state_->chunk_offsets.empty()) {
return absl::OkStatus();
}
uint64_t num_chunk_groups =
CeilOfRatio(state_->chunk_offsets.size(), state_->chunk_group_size);
uint64_t num_chunk_groups = CeilOfRatio<size_t>(state_->chunk_offsets.size(),
state_->chunk_group_size);
const auto reader = get_backing_reader();
auto status = ParallelForWithStatus<1>(
Seq(num_chunk_groups), state_->pool, [&](size_t buf_idx) -> absl::Status {
uint64_t chunk_idx_start = buf_idx * state_->chunk_group_size;
// inclusive index, not the conventional exclusive index.
uint64_t last_chunk_idx =
std::min((buf_idx + 1) * state_->chunk_group_size - 1,
state_->chunk_offsets.size() - 1);
std::min<size_t>((buf_idx + 1) * state_->chunk_group_size - 1,
state_->chunk_offsets.size() - 1);
uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) -
state_->chunk_offsets[chunk_idx_start];
AR_ENDO_JOB(
Expand Down Expand Up @@ -398,17 +398,18 @@ absl::Status ArrayRecordReaderBase::ParallelReadRecordsInRange(
begin, end, NumRecords());
}
uint64_t chunk_idx_begin = begin / state_->record_group_size;
uint64_t chunk_idx_end = CeilOfRatio(end, state_->record_group_size);
uint64_t chunk_idx_end = CeilOfRatio<size_t>(end, state_->record_group_size);
uint64_t num_chunks = chunk_idx_end - chunk_idx_begin;
uint64_t num_chunk_groups = CeilOfRatio(num_chunks, state_->chunk_group_size);
uint64_t num_chunk_groups =
CeilOfRatio<size_t>(num_chunks, state_->chunk_group_size);

const auto reader = get_backing_reader();
auto status = ParallelForWithStatus<1>(
Seq(num_chunk_groups), state_->pool, [&](size_t buf_idx) -> absl::Status {
uint64_t chunk_idx_start =
chunk_idx_begin + buf_idx * state_->chunk_group_size;
// inclusive index, not the conventional exclusive index.
uint64_t last_chunk_idx = std::min(
uint64_t last_chunk_idx = std::min<size_t>(
chunk_idx_begin + (buf_idx + 1) * state_->chunk_group_size - 1,
chunk_idx_end - 1);
uint64_t buf_len = state_->ChunkEndOffset(last_chunk_idx) -
Expand Down Expand Up @@ -604,7 +605,7 @@ bool ArrayRecordReaderBase::SeekRecord(uint64_t record_index) {
if (!ok()) {
return false;
}
state_->record_idx = std::min(record_index, state_->num_records);
state_->record_idx = std::min<size_t>(record_index, state_->num_records);
return true;
}

Expand Down Expand Up @@ -654,8 +655,9 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
std::vector<ChunkDecoder> decoders;
decoders.reserve(state_->chunk_group_size);
uint64_t chunk_start = buffer_idx * state_->chunk_group_size;
uint64_t chunk_end = std::min(state_->chunk_offsets.size(),
(buffer_idx + 1) * state_->chunk_group_size);
uint64_t chunk_end =
std::min<size_t>(state_->chunk_offsets.size(),
(buffer_idx + 1) * state_->chunk_group_size);
const auto reader = get_backing_reader();
for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
uint64_t chunk_offset = state_->chunk_offsets[chunk_idx];
Expand Down Expand Up @@ -695,8 +697,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
chunk_offsets.reserve(state_->chunk_group_size);
uint64_t chunk_start = buffer_to_add * state_->chunk_group_size;
uint64_t chunk_end =
std::min(state_->chunk_offsets.size(),
(buffer_to_add + 1) * state_->chunk_group_size);
std::min<size_t>(state_->chunk_offsets.size(),
(buffer_to_add + 1) * state_->chunk_group_size);
for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
chunk_offsets.push_back(state_->chunk_offsets[chunk_idx]);
}
Expand Down
31 changes: 17 additions & 14 deletions cpp/shareable_dependency_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License.

#include "cpp/shareable_dependency.h"

#include <memory>
#include <optional>
#include <utility>

Expand Down Expand Up @@ -73,13 +74,14 @@ TEST_F(ShareableDependencyTest, SanityTest) {
EXPECT_FALSE(new_main.IsUnique()); // NOLINT(bugprone-use-after-move)

absl::Notification notification;
pool_->Schedule([refobj = main.Share(), &notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->value(), 1);
const auto second_ref = refobj;
refobj->add_value(1);
});
pool_->Schedule(
[refobj = std::make_shared<DependencyShare<FooBase*>>(main.Share()),
&notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->get()->value(), 1);
refobj->get()->add_value(1);
});
EXPECT_FALSE(main.IsUnique());
notification.Notify();
auto& unique = main.WaitUntilUnique();
Expand All @@ -97,13 +99,14 @@ TEST_F(ShareableDependencyTest, SanityTestWithReset) {
EXPECT_TRUE(main.IsUnique());

absl::Notification notification;
pool_->Schedule([refobj = main.Share(), &notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->value(), 1);
const auto second_ref = refobj;
refobj->add_value(1);
});
pool_->Schedule(
[refobj = std::make_shared<DependencyShare<FooBase*>>(main.Share()),
&notification] {
notification.WaitForNotification();
absl::SleepFor(absl::Milliseconds(10));
EXPECT_EQ(refobj->get()->value(), 1);
refobj->get()->add_value(1);
});
EXPECT_FALSE(main.IsUnique());
notification.Notify();
auto& unique = main.WaitUntilUnique();
Expand Down
9 changes: 6 additions & 3 deletions oss/build_whl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ function main() {
write_to_bazelrc "build -c opt"
write_to_bazelrc "build --cxxopt=-std=c++17"
write_to_bazelrc "build --host_cxxopt=-std=c++17"
write_to_bazelrc "build --linkopt=\"-lrt -lm\""
write_to_bazelrc "build --experimental_repo_remote_exec"
write_to_bazelrc "build --python_path=\"${PYTHON_BIN}\""
PLATFORM="$(uname)"
if [[ "$PLATFORM" != "Darwin" ]]; then
write_to_bazelrc "build --linkopt=\"-lrt -lm\""
fi

if [ -n "${CROSSTOOL_TOP}" ]; then
write_to_bazelrc "build --crosstool_top=${CROSSTOOL_TOP}"
Expand All @@ -40,8 +43,8 @@ function main() {

export USE_BAZEL_VERSION="${BAZEL_VERSION}"
bazel clean
bazel build ...
bazel test --verbose_failures --test_output=errors ...
bazel build ... --action_env PYTHON_BIN_PATH="${PYTHON_BIN}"
bazel test --verbose_failures --test_output=errors ... --action_env PYTHON_BIN_PATH="${PYTHON_BIN}"

DEST="/tmp/array_record/all_dist"
# Create the directory, then do dirname on a non-existent file inside it to
Expand Down
46 changes: 32 additions & 14 deletions oss/runner_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,26 @@ function install_and_init_pyenv {
if [[ ! -d $PYENV_ROOT ]]; then
echo "Installing pyenv.."
git clone https://github.com/pyenv/pyenv.git "$PYENV_ROOT"
pushd "$PYENV_ROOT"
git checkout "v2.4.21"
popd
export PATH="/home/kbuilder/.local/bin:$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init --path)"
fi

echo "Python setup..."
pyenv install -s "$PYENV_PYTHON_VERSION"
pyenv global "$PYENV_PYTHON_VERSION"
PYTHON=$(pyenv which python)
export PYTHON_BIN=$(pyenv which python)
}

function setup_env_vars_py310 {
function setup_env_vars_py {
# This controls the python binary to use.
PYTHON=python3.10
PYTHON_STR=python3.10
PYTHON_MAJOR_VERSION=3
PYTHON_MINOR_VERSION=10
PYTHON_MAJOR_VERSION=$1
PYTHON_MINOR_VERSION=$2
# This is for pyenv install.
PYENV_PYTHON_VERSION=3.10.13
PYENV_PYTHON_VERSION=${PYTHON_MAJOR_VERSION}.${PYTHON_MINOR_VERSION}
PYTHON="python$PYENV_PYTHON_VERSION"
}

function update_bazel_macos {
Expand All @@ -78,7 +80,17 @@ function update_bazel_macos {
./bazel-${BAZEL_VERSION}-installer-darwin-${ARCH}.sh --user
rm -f ./bazel-${BAZEL_VERSION}-installer-darwin-${ARCH}.sh
# Add new bazel installation to path
PATH="/Users/kbuilder/bin:$PATH"
export PATH="/Users/kbuilder/bin:$PATH"
}

function install_ar_deps {
$PYTHON_BIN -m pip install -U \
absl-py \
build \
etils[epath] \
setuptools \
twine \
wheel;
}

function build_and_test_array_record_macos() {
Expand All @@ -90,13 +102,19 @@ function build_and_test_array_record_macos() {
update_bazel_macos ${BAZEL_VERSION}
bazel --version

# Set up Pyenv.
setup_env_vars_py310
install_and_init_pyenv
PYTHON_MAJOR_VERSION=3
for PYTHON_MINOR_VERSION in 10 11 12
do
# Set up Pyenv.
PYTHON_VERSION=${PYTHON_MAJOR_VERSION}.${PYTHON_MINOR_VERSION}
echo "Creating array_record wheel for Python Version $PYTHON_VERSION"
setup_env_vars_py $PYTHON_MAJOR_VERSION $PYTHON_MINOR_VERSION
install_and_init_pyenv
install_ar_deps

# Build and test ArrayRecord.
cd ${SOURCE_DIR}
bash ${SOURCE_DIR}/oss/build_whl.sh
# Build and test ArrayRecord.
bash ${SOURCE_DIR}/oss/build_whl.sh
done

ls ${SOURCE_DIR}/all_dist/*.whl
}
Loading