From 89530bfd61a0fea3bced48cabbcd17f138747da4 Mon Sep 17 00:00:00 2001 From: Tao He Date: Wed, 13 Dec 2023 14:27:40 +0800 Subject: [PATCH] Implement concurrent memcpy for building Python objects into vineyard (#1646) Remove the problematic `.buffer` property (as it cannot bind the lifetime of the underlying blob to the memoryview object) and add concurrent support for memcpy for faster object building. Fixes #1631 Signed-off-by: Tao He --- python/core.cc | 115 +++++++----------- python/pybind11_docs.cc | 24 +--- python/pybind11_docs.h | 4 - python/pybind11_utils.cc | 25 ++-- python/pybind11_utils.h | 12 +- python/vineyard/data/arrow.py | 14 +-- python/vineyard/data/benchmarks/__init__.py | 17 +++ .../vineyard/data/benchmarks/test_tensor.py | 86 +++++++++++++ .../drivers/io/adaptors/deserializer.py | 2 +- src/common/memory/memcpy.h | 38 ++++++ test/concurrent_memcpy_test.cc | 65 ++++++++++ test/runner.py | 3 +- 12 files changed, 282 insertions(+), 123 deletions(-) create mode 100644 python/vineyard/data/benchmarks/__init__.py create mode 100644 python/vineyard/data/benchmarks/test_tensor.py create mode 100644 test/concurrent_memcpy_test.cc diff --git a/python/core.cc b/python/core.cc index 34e1af10..1c1ad3b4 100644 --- a/python/core.cc +++ b/python/core.cc @@ -22,6 +22,7 @@ limitations under the License. 
#include "client/ds/object_meta.h" #include "client/ds/remote_blob.h" #include "client/rpc_client.h" +#include "common/memory/memcpy.h" #include "common/util/json.h" #include "common/util/status.h" #include "common/util/uuid.h" @@ -481,18 +482,6 @@ void bind_blobs(py::module& mod) { "address", [](Blob* self) { return reinterpret_cast(self->data()); }, doc::Blob_address) - .def_property_readonly( - "buffer", - [](Blob& blob) -> py::object { - auto buffer = blob.Buffer(); - if (buffer == nullptr) { - return py::none(); - } else { - return py::memoryview::from_memory( - const_cast(buffer->data()), buffer->size(), true); - } - }, - doc::Blob_buffer) .def_buffer([](Blob& blob) -> py::buffer_info { return py::buffer_info(const_cast(blob.data()), sizeof(int8_t), py::format_descriptor::format(), 1, @@ -550,21 +539,29 @@ void bind_blobs(py::module& mod) { .def( "copy", [](BlobWriter* self, size_t const offset, uintptr_t ptr, - size_t const size) { - std::memcpy(self->data() + offset, reinterpret_cast(ptr), - size); - }, - "offset"_a, "address"_a, "size"_a, doc::BlobBuilder_copy) + size_t const size, + size_t const concurrency = memory::default_memcpy_concurrency) { + memory::concurrent_memcpy(self->data() + offset, + reinterpret_cast(ptr), size, + concurrency); + }, + "offset"_a, "address"_a, "size"_a, + py::arg("concurrency") = memory::default_memcpy_concurrency, + doc::BlobBuilder_copy) .def( "copy", - [](BlobWriter* self, size_t offset, py::buffer const& buffer) { + [](BlobWriter* self, size_t offset, py::buffer const& buffer, + size_t const concurrency = memory::default_memcpy_concurrency) { throw_on_error(copy_memoryview(buffer.ptr(), self->data(), - self->size(), offset)); + self->size(), offset, concurrency)); }, - "offset"_a, "buffer"_a) + "offset"_a, "buffer"_a, + py::arg("concurrency") = memory::default_memcpy_concurrency, + doc::BlobBuilder_copy) .def( "copy", - [](BlobWriter* self, size_t offset, py::bytes const& bs) { + [](BlobWriter* self, size_t offset, 
py::bytes const& bs, + size_t const concurrency = memory::default_memcpy_concurrency) { char* buffer = nullptr; ssize_t length = 0; if (PYBIND11_BYTES_AS_STRING_AND_SIZE(bs.ptr(), &buffer, &length)) { @@ -577,27 +574,18 @@ void bind_blobs(py::module& mod) { "', but the buffer size is '" + std::to_string(length) + "'")); } - std::memcpy(self->data() + offset, buffer, length); + memory::concurrent_memcpy(self->data() + offset, buffer, length, + concurrency); }, - "offset"_a, "bytes"_a) + "offset"_a, "bytes"_a, + py::arg("concurrency") = memory::default_memcpy_concurrency, + doc::BlobBuilder_copy) .def_property_readonly( "address", [](BlobWriter* self) { return reinterpret_cast(self->data()); }, doc::BlobBuilder_address) - .def_property_readonly( - "buffer", - [](BlobWriter& blob) -> py::object { - auto buffer = blob.Buffer(); - if (buffer == nullptr) { - return py::none(); - } else { - return py::memoryview::from_memory(buffer->mutable_data(), - buffer->size(), false); - } - }, - doc::BlobBuilder_buffer) .def_buffer([](BlobWriter& blob) -> py::buffer_info { return py::buffer_info(blob.data(), sizeof(int8_t), py::format_descriptor::format(), 1, @@ -641,18 +629,6 @@ void bind_blobs(py::module& mod) { return reinterpret_cast(self->data()); }, doc::RemoteBlob_address) - .def_property_readonly( - "buffer", - [](RemoteBlob& blob) -> py::object { - auto buffer = blob.Buffer(); - if (buffer == nullptr) { - return py::none(); - } else { - return py::memoryview::from_memory( - const_cast(buffer->data()), buffer->size(), true); - } - }, - doc::RemoteBlob_buffer) .def_buffer([](RemoteBlob& blob) -> py::buffer_info { return py::buffer_info(const_cast(blob.data()), sizeof(int8_t), py::format_descriptor::format(), 1, @@ -733,21 +709,29 @@ void bind_blobs(py::module& mod) { .def( "copy", [](RemoteBlobWriter* self, size_t const offset, uintptr_t ptr, - size_t const size) { - std::memcpy(self->data() + offset, reinterpret_cast(ptr), - size); - }, - "offset"_a, "address"_a, "size"_a, 
doc::RemoteBlobBuilder_copy) + size_t const size, + size_t const concurrency = memory::default_memcpy_concurrency) { + memory::concurrent_memcpy(self->data() + offset, + reinterpret_cast(ptr), size, + concurrency); + }, + "offset"_a, "address"_a, "size"_a, + py::arg("concurrency") = memory::default_memcpy_concurrency, + doc::RemoteBlobBuilder_copy) .def( "copy", - [](RemoteBlobWriter* self, size_t offset, py::buffer const& buffer) { + [](RemoteBlobWriter* self, size_t offset, py::buffer const& buffer, + size_t const concurrency = memory::default_memcpy_concurrency) { throw_on_error(copy_memoryview(buffer.ptr(), self->data(), - self->size(), offset)); + self->size(), offset, concurrency)); }, - "offset"_a, "buffer"_a) + "offset"_a, "buffer"_a, + py::arg("concurrency") = memory::default_memcpy_concurrency, + doc::RemoteBlobBuilder_copy) .def( "copy", - [](RemoteBlobWriter* self, size_t offset, py::bytes const& bs) { + [](RemoteBlobWriter* self, size_t offset, py::bytes const& bs, + size_t const concurrency = memory::default_memcpy_concurrency) { char* buffer = nullptr; ssize_t length = 0; if (PYBIND11_BYTES_AS_STRING_AND_SIZE(bs.ptr(), &buffer, &length)) { @@ -760,27 +744,18 @@ void bind_blobs(py::module& mod) { "', but the buffer size is '" + std::to_string(length) + "'")); } - std::memcpy(self->data() + offset, buffer, length); + memory::concurrent_memcpy(self->data() + offset, buffer, length, + concurrency); }, - "offset"_a, "bytes"_a) + "offset"_a, "bytes"_a, + py::arg("concurrency") = memory::default_memcpy_concurrency, + doc::RemoteBlobBuilder_copy) .def_property_readonly( "address", [](RemoteBlobWriter* self) { return reinterpret_cast(self->data()); }, doc::RemoteBlobBuilder_address) - .def_property_readonly( - "buffer", - [](RemoteBlobWriter& blob) -> py::object { - auto buffer = blob.Buffer(); - if (buffer == nullptr) { - return py::none(); - } else { - return py::memoryview::from_memory(buffer->mutable_data(), - buffer->size(), false); - } - }, - 
doc::RemoteBlobBuilder_buffer) .def_buffer([](RemoteBlobWriter& blob) -> py::buffer_info { return py::buffer_info(blob.data(), sizeof(int8_t), py::format_descriptor::format(), 1, diff --git a/python/pybind11_docs.cc b/python/pybind11_docs.cc index c9ca54bf..6d203252 100644 --- a/python/pybind11_docs.cc +++ b/python/pybind11_docs.cc @@ -339,11 +339,6 @@ const char* Blob_address = R"doc( The memory address value of this blob. )doc"; -const char* Blob_buffer = R"doc( -The readonly buffer behind this blob. The result buffer has type -:code:`memoryview`. -)doc"; - const char* BlobBuilder = R"doc( :class:`BlobBuilder` is the builder for creating a finally immutable blob in vineyard server. @@ -373,7 +368,7 @@ Shrink the blob builder to the given size if it is not sealed yet. )doc"; const char* BlobBuilder_copy = R"doc( -.. method:: copy(self, offset: int, ptr: int, size: int) +.. method:: copy(self, offset: int, ptr: int, size: int, concurrency: int = 6) :noindex: Copy the given address to the given offset. @@ -383,11 +378,6 @@ const char* BlobBuilder_address = R"doc( The memory address value of this blob builder. )doc"; -const char* BlobBuilder_buffer = R"doc( -The writeable buffer behind this blob builder. The result buffer has type -:code:`memoryview`, and it is a mutable one. -)doc"; - const char* RemoteBlob = R"doc( :class:`RemoteBlob` is a holder for :class:`Blob` in cases like the :class:`Blob` doesn't exist in the local vineyardd instance but the client still want th access @@ -414,11 +404,6 @@ const char* RemoteBlob_address = R"doc( The memory address value of this blob. )doc"; -const char* RemoteBlob_buffer = R"doc( -The readonly buffer behind this blob. The result buffer has type -:code:`memoryview`. -)doc"; - const char* RemoteBlobBuilder = R"doc( :class:`RemoteBlobBuilder` is the builder for creating a finally immutable blob in vineyard server over a RPC client. @@ -476,7 +461,7 @@ Abort the blob builder if it is not sealed yet. 
)doc"; const char* RemoteBlobBuilder_copy = R"doc( -.. method:: copy(self, offset: int, ptr: int, size: int) +.. method:: copy(self, offset: int, ptr: int, size: int, concurrency: int = 6) :noindex: Copy the given address to the given offset. @@ -486,11 +471,6 @@ const char* RemoteBlobBuilder_address = R"doc( The memory address value of this blob builder. )doc"; -const char* RemoteBlobBuilder_buffer = R"doc( -The writeable buffer behind this blob builder. The result buffer has type -:code:`memoryview`, and it is a mutable one. -)doc"; - const char* InstanceStatus = R"doc( :class:`InstanceStatus` represents the status of connected vineyard instance, including the instance identity, memory statistics and workloads on this instance. diff --git a/python/pybind11_docs.h b/python/pybind11_docs.h index 0a9a6b05..82d08db1 100644 --- a/python/pybind11_docs.h +++ b/python/pybind11_docs.h @@ -58,7 +58,6 @@ extern const char* Blob_is_empty; extern const char* Blob_empty; extern const char* Blob__len__; extern const char* Blob_address; -extern const char* Blob_buffer; extern const char* BlobBuilder; extern const char* BlobBuilder_id; @@ -67,7 +66,6 @@ extern const char* BlobBuilder_abort; extern const char* BlobBuilder_shrink; extern const char* BlobBuilder_copy; extern const char* BlobBuilder_address; -extern const char* BlobBuilder_buffer; extern const char* RemoteBlob; extern const char* RemoteBlob_id; @@ -75,14 +73,12 @@ extern const char* RemoteBlob_instance_id; extern const char* RemoteBlob_is_empty; extern const char* RemoteBlob__len__; extern const char* RemoteBlob_address; -extern const char* RemoteBlob_buffer; extern const char* RemoteBlobBuilder; extern const char* RemoteBlobBuilder_size; extern const char* RemoteBlobBuilder_abort; extern const char* RemoteBlobBuilder_copy; extern const char* RemoteBlobBuilder_address; -extern const char* RemoteBlobBuilder_buffer; extern const char* InstanceStatus; extern const char* InstanceStatus_instance_id; diff --git 
a/python/pybind11_utils.cc b/python/pybind11_utils.cc index eb0e1cb6..53b31b43 100644 --- a/python/pybind11_utils.cc +++ b/python/pybind11_utils.cc @@ -18,7 +18,6 @@ limitations under the License. #include #include #include -#include #include #include "common/memory/memcpy.h" @@ -133,11 +132,13 @@ void bind_utils(py::module& mod) { mod.def( "memory_copy", [](py::buffer const dst, size_t offset, py::buffer const src, - size_t const size) { - throw_on_error( - copy_memoryview_to_memoryview(src.ptr(), dst.ptr(), size, offset)); + size_t const size, + size_t const concurrency = memory::default_memcpy_concurrency) { + throw_on_error(copy_memoryview_to_memoryview(src.ptr(), dst.ptr(), size, + offset, concurrency)); }, - "dst"_a, "offset"_a, "src"_a, py::arg("size") = 0 /* not checked */); + "dst"_a, "offset"_a, "src"_a, py::arg("size") = 0 /* not checked */, + py::arg("concurrency") = memory::default_memcpy_concurrency); PyModule_AddFunctions(mod.ptr(), vineyard_utils_methods); } @@ -177,7 +178,7 @@ class PyBufferGetter { } // namespace detail Status copy_memoryview(PyObject* src, void* dst, size_t const size, - size_t const offset) { + size_t const offset, size_t const concurrency) { detail::PyBufferGetter src_buffer(src); if (!src_buffer.has_buffer()) { return Status::AssertionFailed( @@ -201,15 +202,17 @@ Status copy_memoryview(PyObject* src, void* dst, size_t const size, { py::gil_scoped_release release; // memcpy - memory::inline_memcpy(reinterpret_cast(dst) + offset, - src_buffer.data(), src_buffer.size()); + memory::concurrent_memcpy(reinterpret_cast(dst) + offset, + src_buffer.data(), src_buffer.size(), + concurrency); } return Status::OK(); } Status copy_memoryview_to_memoryview(PyObject* src, PyObject* dst, - size_t const size, size_t const offset) { + size_t const size, size_t const offset, + size_t const concurrency) { detail::PyBufferGetter src_buffer(src); if (!src_buffer.has_buffer()) { return Status::AssertionFailed( @@ -256,8 +259,8 @@ Status 
copy_memoryview_to_memoryview(PyObject* src, PyObject* dst, { py::gil_scoped_release release; // memcpy - memory::inline_memcpy(dst_buffer.data() + offset, src_buffer.data(), - src_buffer.size()); + memory::concurrent_memcpy(dst_buffer.data() + offset, src_buffer.data(), + src_buffer.size(), concurrency); } return Status::OK(); diff --git a/python/pybind11_utils.h b/python/pybind11_utils.h index e7a25c64..b653c371 100644 --- a/python/pybind11_utils.h +++ b/python/pybind11_utils.h @@ -22,6 +22,7 @@ limitations under the License. #include #include +#include "common/memory/memcpy.h" #include "common/util/json.h" #include "common/util/status.h" #include "common/util/uuid.h" @@ -107,17 +108,18 @@ void throw_on_error(Status const& status); * * @size: capacity of the dst memory block. */ -Status copy_memoryview(PyObject* src, void* dst, size_t const size, - size_t const offset = 0); +Status copy_memoryview( + PyObject* src, void* dst, size_t const size, size_t const offset = 0, + size_t const concurrency = memory::default_memcpy_concurrency); /** * Copy a memoryview/buffer to a dst pointer. * * @size: capacity of the dst memoryview. 
*/ -Status copy_memoryview_to_memoryview(PyObject* src, PyObject* dst, - size_t const size, - size_t const offset = 0); +Status copy_memoryview_to_memoryview( + PyObject* src, PyObject* dst, size_t const size, size_t const offset = 0, + size_t const concurrency = memory::default_memcpy_concurrency); namespace detail { py::object from_json(const json& value); diff --git a/python/vineyard/data/arrow.py b/python/vineyard/data/arrow.py index e62f2295..a0c49e34 100644 --- a/python/vineyard/data/arrow.py +++ b/python/vineyard/data/arrow.py @@ -32,6 +32,7 @@ from vineyard._C import IPCClient from vineyard._C import Object from vineyard._C import ObjectMeta +from vineyard._C import RemoteBlob from vineyard.core.builder import BuilderContext from vineyard.core.resolver import ResolverContext from vineyard.data.utils import build_buffer @@ -48,16 +49,11 @@ def buffer_builder(client, buffer: Union[bytes, memoryview], builder: BuilderCon return build_buffer(client, address, size, builder) -def as_arrow_buffer(blob: Blob): - if isinstance(blob, Blob): - buffer = blob.buffer +def as_arrow_buffer(blob: Union[Blob, RemoteBlob]): + if isinstance(blob, (Blob, RemoteBlob)) and not blob.is_empty: + buffer = memoryview(blob) else: - if not blob.is_empty: - buffer = memoryview(blob) - else: - buffer = memoryview(b'') - if buffer is None: - return pa.py_buffer(bytearray()) + buffer = memoryview(b'') return pa.py_buffer(buffer) diff --git a/python/vineyard/data/benchmarks/__init__.py b/python/vineyard/data/benchmarks/__init__.py new file mode 100644 index 00000000..5e2dfecb --- /dev/null +++ b/python/vineyard/data/benchmarks/__init__.py @@ -0,0 +1,17 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2020-2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/python/vineyard/data/benchmarks/test_tensor.py b/python/vineyard/data/benchmarks/test_tensor.py new file mode 100644 index 00000000..64438d17 --- /dev/null +++ b/python/vineyard/data/benchmarks/test_tensor.py @@ -0,0 +1,86 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2020-2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import numpy as np +import pyarrow as pa + +try: + import pyarrow.plasma as plasma +except ImportError: + plasma = None + +import pytest +import pytest_cases + +from vineyard.conftest import vineyard_client +from vineyard.conftest import vineyard_rpc_client +from vineyard.core import default_builder_context +from vineyard.core import default_resolver_context +from vineyard.data import register_builtin_types + +register_builtin_types(default_builder_context, default_resolver_context) + + +@pytest.fixture(scope="module", autouse=True) +def plasma_client(): + if plasma is None: + pytest.skip("plasma is not installed, pyarrow<=11 is required") + + with plasma.start_plasma_store(plasma_store_memory=1024 * 1024 * 1024 * 8) as ( + plasma_socket, + plasma_proc, + ): + plasma_client = plasma.connect(plasma_socket) + yield plasma_client + plasma_client.disconnect() + plasma_proc.kill() + + +@pytest.mark.skipif( + plasma is None, reason="plasma is not installed, pyarrow<=11 is required" +) +@pytest_cases.parametrize( + "client,nbytes", + [ + (vineyard_client, '256'), + (vineyard_client, '256KB'), + (vineyard_client, '256MB'), + (vineyard_client, '1GB'), + (vineyard_client, '4GB'), + (plasma_client, '256'), + (plasma_client, '256KB'), + (plasma_client, '256MB'), + (plasma_client, '1GB'), + (plasma_client, '4GB'), + ], +) +def test_bench_numpy_ndarray(benchmark, client, nbytes): + shape = { + '256': (64,), + '256KB': (64, 1024), + '256MB': (64, 1024, 1024), + '1GB': (256, 1024, 1024), + '4GB': (1024, 1024, 1024), + }[nbytes] + data = np.random.rand(*shape).astype(np.float32) + + def bench_numpy_ndarray(client, data): + object_id = client.put(data) + client.delete([object_id]) + + benchmark(bench_numpy_ndarray, client, data) diff --git a/python/vineyard/drivers/io/adaptors/deserializer.py b/python/vineyard/drivers/io/adaptors/deserializer.py index b0e2d9e1..9bfbfd8d 100644 --- a/python/vineyard/drivers/io/adaptors/deserializer.py +++ 
b/python/vineyard/drivers/io/adaptors/deserializer.py
@@ -69,7 +69,7 @@ def copy_bytestream_to_blob(client, bs: ByteStream, blob: BlobBuilder):
 serialization_options = json.loads(serialization_options) offset = 0 reader = bs.open_reader(client) - buffer = blob.buffer + buffer = memoryview(blob) raw_buffer = io.BytesIO() while True: try:
diff --git a/src/common/memory/memcpy.h b/src/common/memory/memcpy.h
index f964bba9..9988e22d 100644
--- a/src/common/memory/memcpy.h
+++ b/src/common/memory/memcpy.h
@@ -19,7 +19,10 @@ #define SRC_COMMON_MEMORY_MEMCPY_H_ #include +#include #include +#include +#include namespace vineyard {
@@ -250,6 +253,58 @@ static inline void * inline_memcpy(void * __restrict dst_, const void * __restri
 // clang-format on
+// use the same default concurrency as apache-arrow.
+static constexpr size_t default_memcpy_concurrency = 6;
+
+/**
+ * Copy `size` bytes from `src_` to `dst_`, splitting large copies across up
+ * to `concurrency` threads.
+ *
+ * Copies smaller than 4MB, copies whose source and destination ranges overlap
+ * or touch, and calls with `concurrency <= 1` run on the calling thread via
+ * `inline_memcpy`. Otherwise the range is cut into 4MB-aligned chunks, one
+ * thread is started per chunk, and every thread is joined before returning.
+ *
+ * @return `dst_`.
+ */
+static inline void* concurrent_memcpy(
+    void* __restrict dst_, const void* __restrict src_, size_t size,
+    const size_t concurrency = default_memcpy_concurrency) {
+  static constexpr size_t concurrent_memcpy_threshold = 1024 * 1024 * 4;
+  static constexpr size_t alignment = 1024 * 1024 * 4;
+
+  char* const dst = static_cast<char*>(dst_);
+  const char* const src = static_cast<const char*>(src_);
+  const bool overlapped =
+      (dst >= src && dst <= src + size) || (src >= dst && src <= dst + size);
+  // A zero concurrency would divide by zero below and a single worker gains
+  // nothing from an extra thread, so both degrade to a serial copy.
+  if (size < concurrent_memcpy_threshold || overlapped || concurrency <= 1) {
+    inline_memcpy(dst_, src_, size);
+    return dst_;
+  }
+
+  // Round the per-thread share up before aligning it: with a floor division,
+  // a quotient that is already aligned leaves the last `size % concurrency`
+  // bytes outside of every chunk, and a quotient of zero copies nothing.
+  const size_t share = size / concurrency + (size % concurrency != 0 ? 1 : 0);
+  const size_t chunk_size = (share + alignment - 1) & ~(alignment - 1);
+  const size_t chunks = (size + chunk_size - 1) / chunk_size;
+
+  std::vector<std::thread> threads;
+  threads.reserve(chunks);
+  for (size_t i = 0; i < chunks; ++i) {
+    const size_t begin = i * chunk_size;
+    const size_t chunk = std::min(chunk_size, size - begin);
+    threads.emplace_back(
+        [=]() { inline_memcpy(dst + begin, src + begin, chunk); });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  return dst_;
+}
+
 } // namespace memory } // namespace vineyard
diff --git a/test/concurrent_memcpy_test.cc b/test/concurrent_memcpy_test.cc
new file mode 100644
index 00000000..4df50465
--- /dev/null
+++ b/test/concurrent_memcpy_test.cc
@@ -0,0 +1,75 @@
+/** Copyright 2020-2023 Alibaba Group Holding Limited.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "common/memory/memcpy.h"
+#include "common/util/logging.h"
+
+using namespace vineyard;  // NOLINT(build/namespaces)
+
+// Copy prefixes of a `size`-byte buffer with every concurrency in [0, 8] and
+// check that each destination prefix matches the source byte for byte.
+void testing_memcpy(const size_t size) {
+  LOG(INFO) << "Testing memcpy with size: " << size;
+  std::unique_ptr<char[]> src(new char[size]);
+  std::unique_ptr<char[]> dst(new char[size]);
+  for (size_t i = 0; i < size; ++i) {
+    src[i] = i % 256;
+  }
+
+  std::vector<size_t> sizes_to_test = {
+      size, size - 1, size - 3, size - 10, size - 1024, size - 1024 * 1024,
+  };
+
+  for (size_t size_to_test : sizes_to_test) {
+    // a subtraction above wrapped around: the buffer is smaller than the delta
+    if (size_to_test > size) {
+      continue;
+    }
+    // concurrency 0 must degrade to a serial copy instead of dividing by zero
+    for (size_t concurrency = 0; concurrency <= 8; ++concurrency) {
+      memset(dst.get(), 0, size_to_test);
+      memory::concurrent_memcpy(dst.get(), src.get(), size_to_test,
+                                concurrency);
+      CHECK_EQ(0, memcmp(dst.get(), src.get(), size_to_test));
+    }
+  }
+  LOG(INFO) << "Passed memcpy test with size: " << size;
+}
+
+int main(int argc, char** argv) {
+  if (argc < 1) {
+    printf("usage ./concurrent_memcpy_test");
+    return 1;
+  }
+
+  // A size one byte above a multiple of the 4MB chunk alignment: splitting it
+  // over two threads gives an already-aligned quotient with a one-byte
+  // remainder, which must still be copied.
+  testing_memcpy(1024 * 1024 * 8 + 1);
+
+  for (size_t sz = 1024 * 1024 * 8; sz < 1024 * 1024 * 1024;
+       sz += 1024 * 1024 * 256) {
+    testing_memcpy(sz);
+  }
+
+  LOG(INFO) << "Passed concurrent memcpy tests...";
+
+  return 0;
+}
diff --git a/test/runner.py 
b/test/runner.py index d5ff8229..5148226b 100755 --- a/test/runner.py +++ b/test/runner.py @@ -206,7 +206,7 @@ def make_metadata_settings(meta, endpoint, prefix): def start_vineyardd( metadata_settings, allocator_settings, - size=3 * 1024 * 1024 * 1024, + size=8 * 1024 * 1024 * 1024, default_ipc_socket=VINEYARD_CI_IPC_SOCKET, idx=None, spill_path="", @@ -433,6 +433,7 @@ def run_vineyard_cpp_tests(meta, allocator, endpoints, tests): # run_test('allocator_test') run_test(tests, 'arrow_data_structure_test') run_test(tests, 'clear_test') + run_test(tests, 'concurrent_memcpy_test') run_test(tests, 'custom_vector_test') run_test(tests, 'dataframe_test') run_test(tests, 'delete_test')