From a97ddf550b1496765fbce3135b51f195ab2c278b Mon Sep 17 00:00:00 2001 From: lixy9474 Date: Mon, 14 Aug 2023 16:02:29 +0800 Subject: [PATCH] [Embedding] Rename api calls. Signed-off-by: lixy9474 --- .../core/framework/embedding/config.proto | 6 +- .../counter_filter_descriptor_impl.h | 3 + .../framework/embedding/dense_hash_map_kv.h | 15 +- .../embedding/embedding_memory_pool.h | 12 +- .../framework/embedding/embedding_var.cu.cc | 144 ---- .../embedding/embedding_var_ckpt_data.cc | 13 +- .../embedding/embedding_var_ckpt_data.h | 4 + .../embedding/embedding_var_dump_iterator.h | 4 +- .../embedding/feature_descriptor_impl.h | 1 - .../embedding/globalstep_shrink_policy.h | 18 +- .../framework/embedding/gpu_hash_map_kv.h | 20 +- .../embedding/hbm_dram_ssd_storage.h | 4 +- .../framework/embedding/hbm_dram_storage.h | 4 +- .../hbm_multi_tier_feature_descriptor.h | 1 + .../embedding/hbm_storage_iterator.h | 7 +- .../core/framework/embedding/kv_interface.h | 29 +- .../embedding/l2weight_shrink_policy.h | 19 +- .../core/framework/embedding/layout_creator.h | 104 --- .../core/framework/embedding/shrink_policy.h | 21 +- .../framework/embedding/single_tier_storage.h | 237 +++---- .../core/framework/embedding/ssd_hash_kv.h | 112 +-- tensorflow/core/framework/embedding/storage.h | 170 +++-- .../core/framework/embedding/storage_config.h | 30 +- .../framework/embedding/storage_factory.h | 42 +- .../core/framework/embedding/value_ptr.h | 647 ------------------ tensorflow/core/kernels/BUILD | 5 +- .../kernels/embedding_variable_memory_test.cc | 20 +- .../kernels/embedding_variable_ops_test.cc | 632 ++++------------- .../embedding_variable_performance_test.cc | 25 +- .../core/kernels/embedding_variable_test.h | 43 +- .../group_embedding_lookup_ops_test.cc | 4 +- .../core/kernels/incr_save_restore_ops.h | 4 +- .../core/kernels/kv_variable_lookup_ops.cc | 4 +- tensorflow/core/kernels/kv_variable_ops.h | 1 + tensorflow/core/kernels/save_restore_tensor.h | 1 - 35 files changed, 511 insertions(+), 1895 deletions(-) delete mode 100644 tensorflow/core/framework/embedding/layout_creator.h delete mode 100644 tensorflow/core/framework/embedding/value_ptr.h diff --git a/tensorflow/core/framework/embedding/config.proto b/tensorflow/core/framework/embedding/config.proto index a8535347020..424fc5e1a38 100644 --- a/tensorflow/core/framework/embedding/config.proto +++ b/tensorflow/core/framework/embedding/config.proto @@ -50,11 +50,7 @@ enum EmbeddingVariableType { enum ValuePtrStatus { OK = 0; IS_DELETED = 1; -} - -enum ValuePosition { - IN_DRAM = 0; - NOT_IN_DRAM = 1; + NOT_IN_DRAM = 2; } enum IsSetInitialized { diff --git a/tensorflow/core/framework/embedding/counter_filter_descriptor_impl.h b/tensorflow/core/framework/embedding/counter_filter_descriptor_impl.h index 47894c04c4e..e51166a2895 100644 --- a/tensorflow/core/framework/embedding/counter_filter_descriptor_impl.h +++ b/tensorflow/core/framework/embedding/counter_filter_descriptor_impl.h @@ -106,6 +106,9 @@ class CounterFilterDescriptorImpl: public FeatureDescriptorImpl { void* Admit(void* val) override { if (!IsAdmit(val)) { return feat_desc_impl_->Allocate(); + } else { + LOG(FATAL)<<"Only unadmited feature could be admited."; + return nullptr; } } diff --git a/tensorflow/core/framework/embedding/dense_hash_map_kv.h b/tensorflow/core/framework/embedding/dense_hash_map_kv.h index 92baf037721..ffaf2e335dc 100644 --- a/tensorflow/core/framework/embedding/dense_hash_map_kv.h +++ b/tensorflow/core/framework/embedding/dense_hash_map_kv.h @@ -23,9 +23,6 @@ limitations under the License. #include "tensorflow/core/framework/embedding/kv_interface.h" namespace tensorflow { -template -class ValuePtr; - namespace embedding { template @@ -45,7 +42,7 @@ class DenseHashMap : public KVInterface { delete []hash_map_; } - Status Lookup(K key, ValuePtr** value_ptr) override { + Status Lookup(K key, void** value_ptr) override { int64 l_id = std::abs(key)%partition_num_; spin_rd_lock l(hash_map_[l_id].mu); auto iter = hash_map_[l_id].hash_map.find(key); @@ -70,7 +67,7 @@ class DenseHashMap : public KVInterface { } } - Status Insert(K key, const ValuePtr* value_ptr) override { + Status Insert(K key, const void* value_ptr) override { int64 l_id = std::abs(key)%partition_num_; spin_wr_lock l(hash_map_[l_id].mu); auto iter = hash_map_[l_id].hash_map.find(key); @@ -80,8 +77,8 @@ class DenseHashMap : public KVInterface { "already exists Key: ", key, " in DenseHashMap."); } else { auto iter = hash_map_[l_id].hash_map.insert( - std::move(std::pair*>(key, - const_cast*>(value_ptr)))); + std::move(std::pair(key, + const_cast(value_ptr)))); return Status::OK(); } } @@ -109,7 +106,7 @@ class DenseHashMap : public KVInterface { } Status GetSnapshot(std::vector* key_list, - std::vector* >* value_ptr_list) override { + std::vector* value_ptr_list) override { dense_hash_map hash_map_dump[partition_num_]; for (int i = 0; i< partition_num_; i++) { spin_rd_lock l(hash_map_[i].mu); @@ -132,7 +129,7 @@ class DenseHashMap : public KVInterface { const int partition_num_ = 1000; struct dense_hash_map { mutable easy_spinrwlock_t mu = EASY_SPINRWLOCK_INITIALIZER; - google::dense_hash_map* > hash_map; + google::dense_hash_map hash_map; }; dense_hash_map* hash_map_; }; diff --git a/tensorflow/core/framework/embedding/embedding_memory_pool.h b/tensorflow/core/framework/embedding/embedding_memory_pool.h index 27b31ce1ed7..ef175151b00 100644 --- a/tensorflow/core/framework/embedding/embedding_memory_pool.h +++ b/tensorflow/core/framework/embedding/embedding_memory_pool.h @@ -18,9 +18,6 @@ limitations under the License. #include namespace tensorflow { -template -class ValuePtr; - namespace embedding { template class EmbeddingMemoryPool { @@ -50,7 +47,7 @@ class EmbeddingMemoryPool { return ptr; } - void Deallocate(std::vector*> value_ptrs) { + void Deallocate(std::vector value_ptrs) { int64 prev_size = value_ptrs_queue_.size(); for (auto it : value_ptrs) { value_ptrs_queue_.emplace_back(it); @@ -59,9 +56,8 @@ class EmbeddingMemoryPool { int64 n = value_ptrs_queue_.size() - embs_per_block_; n = std::min(prev_size, n); for (int64 i = 0; i < n; i++) { - ValuePtr* val = value_ptrs_queue_.front(); - free_ptr_queue_.emplace_back(val->GetValue(0, 0)); - delete val; + void* val = value_ptrs_queue_.front(); + free_ptr_queue_.emplace_back((V*)val); value_ptrs_queue_.pop_front(); } } @@ -88,7 +84,7 @@ class EmbeddingMemoryPool { int64 embs_per_block_; Allocator* alloc_; std::deque free_ptr_queue_; - std::deque*> value_ptrs_queue_; + std::deque value_ptrs_queue_; std::vector block_list_; }; } //embedding diff --git a/tensorflow/core/framework/embedding/embedding_var.cu.cc b/tensorflow/core/framework/embedding/embedding_var.cu.cc index 0c0be83ec1d..f7162fd2c22 100644 --- a/tensorflow/core/framework/embedding/embedding_var.cu.cc +++ b/tensorflow/core/framework/embedding/embedding_var.cu.cc @@ -42,71 +42,6 @@ void SyncWithEventMgr(se::Stream* stream, while(!is_kernel_finish) {} } -template -void EmbeddingVar::SetDefaultValueOfNewFeatures( - const K* keys, int64 size, const std::list& init_cursor, - V** memcpy_address, se::Stream* compute_stream, EventMgr* event_mgr, - const Eigen::GpuDevice& gpu_device) { - if (init_cursor.size() > 0) { - int64 total = init_cursor.size(); - V** value_address = nullptr; - value_address = TypedAllocator::Allocate(cpu_allocator(), total * 2, - AllocationAttributes()); - V** default_value_address = value_address + total; - V** dev_value_address = nullptr; - dev_value_address = - TypedAllocator::Allocate(alloc_, total * 2, AllocationAttributes()); - V** dev_default_value_address = dev_value_address + total; - int64 i = 0; - auto it = init_cursor.cbegin(); - for (; it != init_cursor.cend(); ++it, ++i) { - ValuePtr* value_ptr = - reinterpret_cast*>(memcpy_address[*it]); - value_address[i] = - *((V**)((char*)(value_ptr->GetPtr()) + sizeof(FixedLengthHeader))) + - storage_->GetOffset(emb_config_.emb_index); - default_value_address[i] = - default_value_ + - (keys[i] % emb_config_.default_value_dim) % value_len_; - } - DeviceMemoryBase gpu_dst_ptr(dev_value_address, total * 2 * sizeof(V*)); - compute_stream->ThenMemcpy(&gpu_dst_ptr, value_address, - total * 2 * sizeof(V*)); - int block_dim = 128; - TF_CHECK_OK(GpuLaunchKernel( - embedding::CopyEmbedding, - (total * value_len_ + block_dim - 1) / block_dim, - block_dim, 0, gpu_device.stream(), dev_default_value_address, - dev_value_address, value_len_, total)); - SyncWithEventMgr(compute_stream, event_mgr); - // Set init meta of ValuePtrs - for (auto it = init_cursor.cbegin(); it != init_cursor.cend(); ++it) { - ValuePtr* value_ptr = - reinterpret_cast*>(memcpy_address[*it]); - value_ptr->SetInitialized(emb_config_.emb_index); - memcpy_address[*it] = value_ptr->GetValue( - emb_config_.emb_index, - storage_->GetOffset(emb_config_.emb_index)); - } - TypedAllocator::Deallocate(alloc_, dev_value_address, total * 2); - TypedAllocator::Deallocate(cpu_allocator(), value_address, total * 2); - } -} - -#define REGISTER_KERNELS(ktype, vtype) \ - template void EmbeddingVar::SetDefaultValueOfNewFeatures( \ - const ktype*, int64, const std::list&, vtype**, \ - se::Stream*, EventMgr*, const Eigen::GpuDevice& gpu_device); -#define REGISTER_KERNELS_ALL(type) \ - REGISTER_KERNELS(int32, type); \ - REGISTER_KERNELS(int64, type) -#define REGISTER_KERNELS_CPU(type) REGISTER_KERNELS_ALL(type) -TF_CALL_FLOAT_TYPES(REGISTER_KERNELS_CPU) -#undef REGISTER_KERNELS_CPU - -#undef REGISTER_KERNELS_ALL -#undef REGISTER_KERNELS - template void EmbeddingVar::CopyEmbeddingsToBuffer( V* val_base, int64 size, V** memcpy_address, @@ -136,85 +71,6 @@ void EmbeddingVar::CopyEmbeddingsToBuffer( TF_CALL_FLOAT_TYPES(REGISTER_KERNELS_CPU) #undef REGISTER_KERNELS_CPU -#undef REGISTER_KERNELS_ALL -#undef REGISTER_KERNELS - -template -void EmbeddingVar::CopyEmbeddingsFromCPUToGPU( - const K* keys, const std::list& copyback_cursor, V** memcpy_address, - se::Stream* compute_stream, EventMgr* event_mgr, - const Eigen::GpuDevice& gpu_device, - const DeviceBase::CpuWorkerThreads* worker_threads, - int64* output_value_ptrs) { - if (copyback_cursor.size() > 0) { - int64 total = copyback_cursor.size(); - size_t value_len = emb_config_.total_num(storage_->GetAllocLen()); - V* memcpy_buffer_gpu = nullptr; - ValuePtr** gpu_value_ptrs = new ValuePtr*[total]; - memcpy_buffer_gpu = (V*)alloc_->AllocateRaw(Allocator::kAllocatorAlignment, - total * value_len * sizeof(V)); - storage_->CopyEmbeddingsFromCPUToGPU( - total, keys, copyback_cursor, memcpy_address, value_len, gpu_value_ptrs, - memcpy_buffer_gpu, compute_stream, event_mgr, worker_threads); - - V** value_address = (V**)cpu_allocator()->AllocateRaw( - Allocator::kAllocatorAlignment, sizeof(V*) * total); - V** dev_value_address = (V**)alloc_->AllocateRaw(Allocator::kAllocatorAlignment, - sizeof(V*) * total); - std::vector copyback_keys(total); - int64 i = 0; - auto it = copyback_cursor.cbegin(); - for (; it != copyback_cursor.cend(); ++it, ++i) { - bool init; - // Get the curosr - int64 cursor = *it & 0x0fffffffffffffff; - gpu_value_ptrs[i]->SetInitialized(emb_config_.emb_index); - memcpy_address[cursor] = LookupOrCreateEmb(gpu_value_ptrs[i], init); - value_address[i] = memcpy_address[cursor]; - copyback_keys[i] = keys[cursor]; - } - DeviceMemoryBase gpu_dst_ptr(dev_value_address, total * sizeof(V*)); - compute_stream->ThenMemcpy(&gpu_dst_ptr, value_address, total * sizeof(V*)); - - int block_dim = 128; - TF_CHECK_OK(GpuLaunchKernel( - embedding::BatchUnpack, (total + block_dim - 1) / block_dim * value_len, - block_dim, 0, gpu_device.stream(), dev_value_address, memcpy_buffer_gpu, - value_len, total)); - - auto do_insert = [this, copyback_keys, gpu_value_ptrs, value_len]( - int64 start, int64 limit) { - for (int64 i = start; i < limit; i++) - storage_->Insert(copyback_keys[i], gpu_value_ptrs[i]); - }; - Shard(worker_threads->num_threads, worker_threads->workers, - copyback_keys.size(), 100000, do_insert); - if (output_value_ptrs != nullptr) { - auto it = copyback_cursor.cbegin(); - for (int64 i = 0; it != copyback_cursor.cend(); ++it, ++i) { - int64 cursor = *it & 0x0fffffffffffffff; - output_value_ptrs[cursor] = (int64)gpu_value_ptrs[i]; - } - } - SyncWithEventMgr(compute_stream, event_mgr); - - alloc_->DeallocateRaw(dev_value_address); - alloc_->DeallocateRaw(memcpy_buffer_gpu); - cpu_allocator()->DeallocateRaw(value_address); - delete[] gpu_value_ptrs; - } -} -#define REGISTER_KERNELS(ktype, vtype) \ - template void EmbeddingVar::CopyEmbeddingsFromCPUToGPU( \ - const ktype*, const std::list&, vtype**, se::Stream*, EventMgr*, \ - const Eigen::GpuDevice&, const DeviceBase::CpuWorkerThreads*, int64*); -#define REGISTER_KERNELS_ALL(type) \ - REGISTER_KERNELS(int32, type); \ - REGISTER_KERNELS(int64, type) -#define REGISTER_KERNELS_CPU(type) REGISTER_KERNELS_ALL(type) -TF_CALL_FLOAT_TYPES(REGISTER_KERNELS_CPU) -#undef REGISTER_KERNELS_CPU - #undef REGISTER_KERNELS_ALL #undef REGISTER_KERNELS } // namespace tensorflow diff --git a/tensorflow/core/framework/embedding/embedding_var_ckpt_data.cc b/tensorflow/core/framework/embedding/embedding_var_ckpt_data.cc index 8165371d6d8..7dddf714b6b 100644 --- a/tensorflow/core/framework/embedding/embedding_var_ckpt_data.cc +++ b/tensorflow/core/framework/embedding/embedding_var_ckpt_data.cc @@ -21,16 +21,17 @@ namespace tensorflow { namespace embedding { template void EmbeddingVarCkptData::Emplace( - K key, ValuePtr* value_ptr, + K key, void* value_ptr, const EmbeddingConfig& emb_config, - V* default_value, int64 value_offset, + V* default_value, + FeatureDescriptor* feat_desc, bool is_save_freq, bool is_save_version, bool save_unfiltered_features) { if((int64)value_ptr == ValuePtrStatus::IS_DELETED) return; - bool is_in_dram = ((int64)value_ptr >> 49 == 0); + bool is_in_dram = ((int64)value_ptr >> kDramFlagOffset == 0); bool is_admit = feat_desc->IsAdmit(value_ptr); if (is_admit) { @@ -38,7 +39,7 @@ void EmbeddingVarCkptData::Emplace( if (!is_in_dram) { value_ptr_vec_.emplace_back((V*)ValuePtrStatus::NOT_IN_DRAM); - value_ptr = (void*)((int64)value_ptr & 0x1ffffffffffff); + value_ptr = (void*)((int64)value_ptr & ((1L << kDramFlagOffset) - 1)); } else if (feat_desc->GetEmbedding(value_ptr, 0) == nullptr) { value_ptr_vec_.emplace_back(default_value); } else { @@ -71,8 +72,8 @@ void EmbeddingVarCkptData::Emplace( } #define REGISTER_KERNELS(ktype, vtype) \ template void EmbeddingVarCkptData::Emplace( \ - ktype, ValuePtr*, const EmbeddingConfig&, \ - vtype*, int64, bool, bool, bool); + ktype, void*, const EmbeddingConfig&, \ + vtype*, FeatureDescriptor*, bool, bool, bool); #define REGISTER_KERNELS_ALL_INDEX(type) \ REGISTER_KERNELS(int32, type) \ REGISTER_KERNELS(int64, type) diff --git a/tensorflow/core/framework/embedding/embedding_var_ckpt_data.h b/tensorflow/core/framework/embedding/embedding_var_ckpt_data.h index 5e7630d144e..10bf0d0e43b 100644 --- a/tensorflow/core/framework/embedding/embedding_var_ckpt_data.h +++ b/tensorflow/core/framework/embedding/embedding_var_ckpt_data.h @@ -19,6 +19,10 @@ limitations under the License. #include "tensorflow/core/framework/embedding/embedding_var_dump_iterator.h" namespace tensorflow { class BundleWriter; +namespace { + const int kSavedPartitionNum = 1000; + const int kDramFlagOffset = 49; +} namespace embedding { template diff --git a/tensorflow/core/framework/embedding/embedding_var_dump_iterator.h b/tensorflow/core/framework/embedding/embedding_var_dump_iterator.h index 84c823a90dc..4c052b43c7e 100644 --- a/tensorflow/core/framework/embedding/embedding_var_dump_iterator.h +++ b/tensorflow/core/framework/embedding/embedding_var_dump_iterator.h @@ -57,7 +57,7 @@ class EV2dVectorDataDumpIterator: public DumpIterator { value_len_(value_len), col_idx_(0) { if (!valueptr_list.empty()) { - if ((int64)*curr_iter_ == ValuePosition::NOT_IN_DRAM) { + if ((int64)*curr_iter_ == ValuePtrStatus::NOT_IN_DRAM) { curr_ptr_ = val_iter_->Next(); } else { curr_ptr_ = *curr_iter_; @@ -75,7 +75,7 @@ class EV2dVectorDataDumpIterator: public DumpIterator { curr_iter_++; col_idx_ = 0; if (curr_iter_ != end_iter_) { - if ((int64)*curr_iter_ == ValuePosition::NOT_IN_DRAM) { + if ((int64)*curr_iter_ == ValuePtrStatus::NOT_IN_DRAM) { curr_ptr_ = val_iter_->Next(); } else { curr_ptr_ = *curr_iter_; diff --git a/tensorflow/core/framework/embedding/feature_descriptor_impl.h b/tensorflow/core/framework/embedding/feature_descriptor_impl.h index 90800da706e..6996d22f447 100644 --- a/tensorflow/core/framework/embedding/feature_descriptor_impl.h +++ b/tensorflow/core/framework/embedding/feature_descriptor_impl.h @@ -18,7 +18,6 @@ limitations under the License. #if GOOGLE_CUDA #define EIGEN_USE_GPU -#include "tensorflow/core/kernels/gpu_device_array.h" #include "tensorflow/core/platform/stream_executor.h" #endif // GOOGLE_CUDA diff --git a/tensorflow/core/framework/embedding/globalstep_shrink_policy.h b/tensorflow/core/framework/embedding/globalstep_shrink_policy.h index a2af6a2430a..b0950eff22d 100644 --- a/tensorflow/core/framework/embedding/globalstep_shrink_policy.h +++ b/tensorflow/core/framework/embedding/globalstep_shrink_policy.h @@ -18,25 +18,21 @@ limitations under the License. #include "tensorflow/core/framework/embedding/shrink_policy.h" namespace tensorflow { - -template -class ValuePtr; - namespace embedding { template class GlobalStepShrinkPolicy : public ShrinkPolicy { public: GlobalStepShrinkPolicy(int64 steps_to_live, - Allocator* alloc, + FeatureDescriptor* feat_desc, KVInterface* kv) : steps_to_live_(steps_to_live), kv_(kv), - ShrinkPolicy(alloc) {} + ShrinkPolicy(feat_desc) {} TF_DISALLOW_COPY_AND_ASSIGN(GlobalStepShrinkPolicy); void Shrink(std::vector& key_list, - std::vector*>& value_list, + std::vector& value_list, const ShrinkArgs& shrink_args) override { ShrinkPolicy::ReleaseValuePtrs(); FilterToDelete(shrink_args.global_step, @@ -46,16 +42,16 @@ class GlobalStepShrinkPolicy : public ShrinkPolicy { private: void FilterToDelete(int64 global_step, std::vector& key_list, - std::vector*>& value_list) { + std::vector& value_list) { for (int64 i = 0; i < key_list.size(); ++i) { - int64 version = value_list[i]->GetStep(); + int64 version = ShrinkPolicy::feat_desc_->GetVersion(value_list[i]); if (version == -1) { - value_list[i]->SetStep(global_step); + ShrinkPolicy::feat_desc_->UpdateVersion(value_list[i], global_step); } else { if (global_step - version > steps_to_live_) { kv_->Remove(key_list[i]); ShrinkPolicy::EmplacePointer(value_list[i]); - value_list[i] = (ValuePtr*)ValuePtrStatus::IS_DELETED; + value_list[i] = (void*)ValuePtrStatus::IS_DELETED; } } } diff --git a/tensorflow/core/framework/embedding/gpu_hash_map_kv.h b/tensorflow/core/framework/embedding/gpu_hash_map_kv.h index 1dd90d63a6e..fc4a2506313 100644 --- a/tensorflow/core/framework/embedding/gpu_hash_map_kv.h +++ b/tensorflow/core/framework/embedding/gpu_hash_map_kv.h @@ -204,29 +204,29 @@ class GPUHashMapKV : public KVInterface { } Status BatchLookupOrCreate(const K* keys, size_t n, - ValuePtr** value_ptrs) override { + void** value_ptrs) override { return Status::OK(); } - Status Lookup(K key, ValuePtr** value_ptr) override { + Status Lookup(K key, void** value_ptr) override { return Status::OK(); } Status Contains(K key) override { return Status::OK(); } - Status Insert(K key, const ValuePtr* value_ptr) override { + Status Insert(K key, const void* value_ptr) override { return Status::OK(); } Status Remove(K key) override { return Status::OK(); } Status BatchLookup(const K* keys, size_t size, - ValuePtr** value_ptrs) override { + void** value_ptrs) override { return Status::OK(); } Status BatchInsert(const std::vector& keys, - const std::vector*>& value_ptrs) override { + const std::vector& value_ptrs) override { return Status::OK(); } @@ -235,22 +235,20 @@ class GPUHashMapKV : public KVInterface { } Status BatchCommit(const std::vector& keys, - const std::vector*>& value_ptrs) override { + const std::vector& value_ptrs) override { return Status::OK(); } int64 Size() const override { return 0; } - void SetTotalDims(int total_dims) override {} + void FreeValuePtr(void* value_ptr) override {} - void FreeValuePtr(ValuePtr* value_ptr) override {} - - Status Commit(K key, const ValuePtr* value_ptr) override { + Status Commit(K key, const void* value_ptr) override { return Status::OK(); } Status GetSnapshot(std::vector* key_list, - std::vector*>* value_ptr_list) override { + std::vector* value_ptr_list) override { return Status::OK(); } diff --git a/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h b/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h index 2003197c3de..1056f4bbd78 100644 --- a/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h +++ b/tensorflow/core/framework/embedding/hbm_dram_ssd_storage.h @@ -244,8 +244,10 @@ class HbmDramSsdStorage : public MultiTierStorage { } for (auto value_ptr: value_ptr_list) { - if ((int64)value_ptr >> kDramFlagOffset == 1) + if ((int64)value_ptr >> kDramFlagOffset == 1) { + value_ptr = (void*)((int64)value_ptr & ((1L << kDramFlagOffset) - 1)); cpu_allocator()->DeallocateRaw(value_ptr); + } } ssd_->Save(tensor_name, prefix, writer, emb_config, diff --git a/tensorflow/core/framework/embedding/hbm_dram_storage.h b/tensorflow/core/framework/embedding/hbm_dram_storage.h index 7647663cdd0..d058d95f05b 100644 --- a/tensorflow/core/framework/embedding/hbm_dram_storage.h +++ b/tensorflow/core/framework/embedding/hbm_dram_storage.h @@ -219,8 +219,10 @@ class HbmDramStorage : public MultiTierStorage { } for (auto value_ptr: value_ptr_list) { - if ((int64)value_ptr >> kDramFlagOffset == 1) + if ((int64)value_ptr >> kDramFlagOffset == 1) { + value_ptr = (void*)((int64)value_ptr & ((1L << kDramFlagOffset) - 1)); cpu_allocator()->DeallocateRaw(value_ptr); + } } return Status::OK(); } diff --git a/tensorflow/core/framework/embedding/hbm_multi_tier_feature_descriptor.h b/tensorflow/core/framework/embedding/hbm_multi_tier_feature_descriptor.h index 517df06852f..a3603a61550 100644 --- a/tensorflow/core/framework/embedding/hbm_multi_tier_feature_descriptor.h +++ b/tensorflow/core/framework/embedding/hbm_multi_tier_feature_descriptor.h @@ -17,6 +17,7 @@ limitations under the License. #include "tensorflow/core/util/env_var.h" #include "tensorflow/core/framework/embedding/feature_descriptor_impl.h" #include "tensorflow/core/framework/embedding/embedding_memory_pool.h" +#include "tensorflow/core/platform/mutex.h" namespace tensorflow { namespace embedding { diff --git a/tensorflow/core/framework/embedding/hbm_storage_iterator.h b/tensorflow/core/framework/embedding/hbm_storage_iterator.h index 36d331e74aa..31dc4459a13 100644 --- a/tensorflow/core/framework/embedding/hbm_storage_iterator.h +++ b/tensorflow/core/framework/embedding/hbm_storage_iterator.h @@ -28,10 +28,11 @@ class HbmValueIterator: public ValueIterator { public: HbmValueIterator( const std::vector& key_list, - const std::vector*>& value_ptr_list, + const std::vector& value_ptr_list, int64 emb_index, int64 value_len, - Allocator* alloc) + Allocator* alloc, + FeatureDescriptor* feat_desc) : value_len_(value_len), alloc_(alloc) { int64 emb_offset = value_len_ * emb_index; @@ -40,7 +41,7 @@ class HbmValueIterator: public ValueIterator { for (int part_id = 0; part_id < kSavedPartitionNum; part_id++) { if (key_list[i] % kSavedPartitionNum == part_id) { value_parts_vec[part_id].emplace_back( - value_ptr_list[i]->GetValue(emb_index, emb_offset)); + feat_desc->GetEmbedding(value_ptr_list[i], emb_index)); break; } } diff --git a/tensorflow/core/framework/embedding/kv_interface.h b/tensorflow/core/framework/embedding/kv_interface.h index 5d1f20b581a..3659187c825 100644 --- a/tensorflow/core/framework/embedding/kv_interface.h +++ b/tensorflow/core/framework/embedding/kv_interface.h @@ -17,6 +17,7 @@ limitations under the License. #define TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_KV_INTERFACE_H_ #include "tensorflow/core/framework/device_base.h" +#include "tensorflow/core/framework/embedding/feature_descriptor.h" #include "tensorflow/core/lib/core/status.h" namespace tensorflow { @@ -24,9 +25,6 @@ namespace { const char* kInferenceMode = "INFERENCE_MODE"; } -template -class ValuePtr; - template class GPUHashTable; @@ -43,19 +41,19 @@ template class KVInterface { public: virtual ~KVInterface() {} - virtual Status Lookup(K key, ValuePtr** value_ptr) = 0; + virtual Status Lookup(K key, void** value_ptr) = 0; virtual Status Contains(K key) = 0; - virtual Status Insert(K key, const ValuePtr* value_ptr) = 0; + virtual Status Insert(K key, const void* value_ptr) = 0; virtual Status Remove(K key) = 0; virtual Status BatchLookup(const K* keys, size_t size, - ValuePtr** value_ptrs) { + void** value_ptrs) { return Status(error::Code::UNIMPLEMENTED, "Unimplemented for BatchLookup in KVInterface."); } // KV Batch Insert virtual Status BatchInsert(const std::vector& keys, - const std::vector*>& value_ptrs) { + const std::vector& value_ptrs) { return Status(error::Code::UNIMPLEMENTED, "Unimplemented for BatchInsert in KVInterface."); } @@ -66,27 +64,30 @@ class KVInterface { } virtual Status BatchLookupOrCreate(const K* keys, size_t size, - ValuePtr** value_ptrs) { + void** value_ptrs) { return Status(error::Code::UNIMPLEMENTED, "Unimplemented for BatchLookupOrInsert in KVInterface."); } + virtual void UpdateValuePtr(K key, void* new_value_ptr, + void* old_value_ptr) { + LOG(FATAL)<<"Unimplemented for UpdateValuePtr in KVInterface."; + } + virtual Status BatchCommit(const std::vector& keys, - const std::vector*>& value_ptrs) = 0; + const std::vector& value_ptrs) = 0; // KV Size virtual int64 Size() const = 0; - virtual void SetTotalDims(int total_dims) {} - - virtual void FreeValuePtr(ValuePtr* value_ptr) {} + virtual void FreeValuePtr(void* value_ptr) {} - virtual Status Commit(K key, const ValuePtr* value_ptr) { + virtual Status Commit(K key, const void* value_ptr) { return Status::OK(); } virtual Status GetSnapshot(std::vector* key_list, - std::vector*>* value_ptr_list) = 0; + std::vector* value_ptr_list) = 0; virtual std::string DebugString() const = 0; diff --git a/tensorflow/core/framework/embedding/l2weight_shrink_policy.h b/tensorflow/core/framework/embedding/l2weight_shrink_policy.h index 2af6b58f94b..9b0ea8aba3f 100644 --- a/tensorflow/core/framework/embedding/l2weight_shrink_policy.h +++ b/tensorflow/core/framework/embedding/l2weight_shrink_policy.h @@ -19,28 +19,23 @@ limitations under the License. namespace tensorflow { -template -class ValuePtr; - namespace embedding { template class L2WeightShrinkPolicy : public ShrinkPolicy { public: L2WeightShrinkPolicy(float l2_weight_threshold, int64 index, - int64 offset, - Allocator* alloc, + FeatureDescriptor* feat_desc, KVInterface* kv) : index_(index), - offset_(offset), kv_(kv), l2_weight_threshold_(l2_weight_threshold), - ShrinkPolicy(alloc) {} + ShrinkPolicy(feat_desc) {} TF_DISALLOW_COPY_AND_ASSIGN(L2WeightShrinkPolicy); void Shrink(std::vector& key_list, - std::vector*>& value_list, + std::vector& value_list, const ShrinkArgs& shrink_args) override { ShrinkPolicy::ReleaseValuePtrs(); FilterToDelete(shrink_args.value_len, @@ -50,9 +45,9 @@ class L2WeightShrinkPolicy : public ShrinkPolicy { private: void FilterToDelete(int64 value_len, std::vector& key_list, - std::vector*>& value_list) { + std::vector& value_list) { for (int64 i = 0; i < key_list.size(); ++i) { - V* val = value_list[i]->GetValue(index_, offset_); + V* val = ShrinkPolicy::feat_desc_->GetEmbedding(value_list[i], index_); if (val != nullptr) { V l2_weight = (V)0.0; for (int64 j = 0; j < value_len; j++) { @@ -61,7 +56,7 @@ class L2WeightShrinkPolicy : public ShrinkPolicy { l2_weight *= (V)0.5; if (l2_weight < (V)l2_weight_threshold_) { kv_->Remove(key_list[i]); - value_list[i] = (ValuePtr*)ValuePtrStatus::IS_DELETED; + value_list[i] = (void*)ValuePtrStatus::IS_DELETED; ShrinkPolicy::EmplacePointer(value_list[i]); } } @@ -70,7 +65,7 @@ class L2WeightShrinkPolicy : public ShrinkPolicy { private: int64 index_; - int64 offset_; + //int64 offset_; KVInterface* kv_; float l2_weight_threshold_; }; diff --git a/tensorflow/core/framework/embedding/layout_creator.h b/tensorflow/core/framework/embedding/layout_creator.h deleted file mode 100644 index 07d50451bf0..00000000000 --- a/tensorflow/core/framework/embedding/layout_creator.h +++ /dev/null @@ -1,104 +0,0 @@ -/* Copyright 2022 The DeepRec Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -======================================================================*/ -#ifndef TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_LAYOUT_CREATOR_H_ -#define TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_LAYOUT_CREATOR_H_ - -#include "tensorflow/core/framework/embedding/cache.h" -#include "tensorflow/core/framework/embedding/config.pb.h" -#include "tensorflow/core/framework/embedding/storage_config.h" -#include "tensorflow/core/lib/core/status.h" - -namespace tensorflow { -template -class ValuePtr; - -namespace embedding { -template -class LayoutCreator { - public: - virtual ValuePtr* Create(Allocator* alloc, size_t size) = 0; -}; - -template -class NormalLayoutCreator : public LayoutCreator { - public: - ValuePtr* Create(Allocator* alloc, size_t size) override { - return new NormalValuePtr(alloc, size); - } -}; - -template -class LightLayoutCreator : public LayoutCreator { - public: - ValuePtr* Create(Allocator* alloc, size_t size) override { - return new LightValuePtr(alloc, size); - } -}; - -template -class NormalContiguousLayoutCreator : public LayoutCreator { - public: - ValuePtr* Create(Allocator* alloc, size_t size) override { - return new NormalContiguousValuePtr(alloc, size); - } -}; - -template -class NormalContiguousGPULayoutCreator : public LayoutCreator { - public: - ValuePtr* Create(Allocator* alloc, size_t size) override { - return new NormalGPUValuePtr(alloc, size); - } -}; - -template -class CompactLayoutCreator : public LayoutCreator { - public: - ValuePtr* Create(Allocator* alloc, size_t size) override { - return new CompactValuePtr(alloc, size); - } -}; - -class LayoutCreatorFactory { - public: - template - static LayoutCreator* Create(const StorageConfig& sc) { - switch (sc.layout_type) { - case LayoutType::NORMAL: - static NormalLayoutCreator normal_creator; - return &normal_creator; - case LayoutType::LIGHT: - static LightLayoutCreator light_creator; - return &light_creator; - case LayoutType::NORMAL_CONTIGUOUS: - static NormalContiguousLayoutCreator normal_contiguous_creator; - return &normal_contiguous_creator; - case LayoutType::NORMAL_CONTIGUOUS_GPU: - static NormalContiguousGPULayoutCreator - normal_contiguous_gpu_creator; - return &normal_contiguous_gpu_creator; - case LayoutType::COMPACT: - static CompactLayoutCreator compact_creator; - return &compact_creator; - default: - static NormalLayoutCreator default_creator; - return &default_creator; - } - } -}; -} // embedding -} // tensorflow - -#endif // TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_LAYOUT_CREATOR_H_ diff --git a/tensorflow/core/framework/embedding/shrink_policy.h b/tensorflow/core/framework/embedding/shrink_policy.h index ea063a113a3..a8d0d9ada75 100644 --- a/tensorflow/core/framework/embedding/shrink_policy.h +++ b/tensorflow/core/framework/embedding/shrink_policy.h @@ -15,14 +15,11 @@ limitations under the License. #ifndef TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_SHRINK_POLICY_H_ #define TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_SHRINK_POLICY_H_ +#include "tensorflow/core/framework/embedding/feature_descriptor.h" #include "tensorflow/core/framework/embedding/kv_interface.h" #include "tensorflow/core/lib/core/status.h" namespace tensorflow { - -template -class ValuePtr; - class Allocator; namespace embedding { @@ -40,31 +37,29 @@ struct ShrinkArgs { template class ShrinkPolicy { public: - ShrinkPolicy(Allocator* alloc): alloc_(alloc) {} + ShrinkPolicy(FeatureDescriptor* feat_desc): feat_desc_(feat_desc) {} virtual ~ShrinkPolicy() {} TF_DISALLOW_COPY_AND_ASSIGN(ShrinkPolicy); virtual void Shrink(std::vector& key_list, - std::vector*>& value_list, + std::vector& value_list, const ShrinkArgs& shrink_args) = 0; protected: - void EmplacePointer(ValuePtr* value_ptr) { + void EmplacePointer(void* value_ptr) { to_delete_.emplace_back(value_ptr); } void ReleaseValuePtrs() { for (auto it : to_delete_) { - it->Destroy(alloc_); - delete it; + feat_desc_->Deallocate(it); } to_delete_.clear(); } protected: - std::vector*> to_delete_; - private: - Allocator* alloc_; + std::vector to_delete_; + FeatureDescriptor* feat_desc_; }; template @@ -74,7 +69,7 @@ class NonShrinkPolicy: public ShrinkPolicy { TF_DISALLOW_COPY_AND_ASSIGN(NonShrinkPolicy); void Shrink(std::vector& key_list, - std::vector*>& value_list, + std::vector& value_list, const ShrinkArgs& shrink_args) override {} }; } // embedding diff --git a/tensorflow/core/framework/embedding/single_tier_storage.h b/tensorflow/core/framework/embedding/single_tier_storage.h index f9de65df588..be08afd7f50 100644 --- a/tensorflow/core/framework/embedding/single_tier_storage.h +++ b/tensorflow/core/framework/embedding/single_tier_storage.h @@ -24,7 +24,6 @@ limitations under the License. #endif // GOOGLE_CUDA #include "tensorflow/core/framework/embedding/kv_interface.h" #include "tensorflow/core/framework/embedding/l2weight_shrink_policy.h" -#include "tensorflow/core/framework/embedding/layout_creator.h" #include "tensorflow/core/framework/embedding/leveldb_kv.h" #include "tensorflow/core/framework/embedding/ssd_hash_kv.h" #include "tensorflow/core/framework/embedding/storage_config.h" @@ -32,9 +31,6 @@ limitations under the License. #include "tensorflow/core/lib/core/status.h" namespace tensorflow { -template -class ValuePtr; - template class EmbeddingVar; @@ -62,24 +58,22 @@ class HbmDramSsdStorage; template class SingleTierStorage : public Storage { public: - SingleTierStorage(const StorageConfig& sc, Allocator* alloc, - KVInterface* kv, LayoutCreator* lc) - : kv_(kv), alloc_(alloc), layout_creator_(lc), + SingleTierStorage(const StorageConfig& sc, + KVInterface* kv, FeatureDescriptor* feat_desc) + : kv_(kv), feat_desc_(feat_desc), Storage(sc) { if (sc.embedding_config.steps_to_live != 0) { shrink_policy_ = new GlobalStepShrinkPolicy( sc.embedding_config.steps_to_live, - alloc_, + feat_desc_, kv_); } else if (sc.embedding_config.l2_weight_threshold != -1.0) { shrink_policy_ = new L2WeightShrinkPolicy( sc.embedding_config.l2_weight_threshold, sc.embedding_config.primary_emb_index, - Storage::GetOffset( - sc.embedding_config.primary_emb_index), - alloc_, + feat_desc_, kv_); } else { shrink_policy_ = new NonShrinkPolicy(); @@ -89,11 +83,10 @@ class SingleTierStorage : public Storage { ~SingleTierStorage() override { mutex_lock l(Storage::mu_); std::vector key_list; - std::vector*> value_ptr_list; + std::vector value_ptr_list; kv_->GetSnapshot(&key_list, &value_ptr_list); for (auto value_ptr : value_ptr_list) { - value_ptr->Destroy(alloc_); - delete value_ptr; + feat_desc_->Deallocate(value_ptr); } delete kv_; delete shrink_policy_; @@ -101,7 +94,7 @@ class SingleTierStorage : public Storage { TF_DISALLOW_COPY_AND_ASSIGN(SingleTierStorage); - Status Get(K key, ValuePtr** value_ptr) override { + Status Get(K key, void** value_ptr) override { return kv_->Lookup(key, value_ptr); } @@ -109,47 +102,45 @@ class SingleTierStorage : public Storage { return kv_->Contains(key); } - virtual void Insert(K key, ValuePtr** value_ptr, - size_t alloc_len, bool to_dram = false) override { + virtual void CreateAndInsert(K key, void** value_ptr, + bool to_dram=false) override { do { - *value_ptr = layout_creator_->Create(alloc_, alloc_len); + *value_ptr = feat_desc_->Allocate(); Status s = kv_->Insert(key, *value_ptr); if (s.ok()) { break; } else { - (*value_ptr)->Destroy(alloc_); - delete *value_ptr; + feat_desc_->Deallocate(*value_ptr); } } while (!(kv_->Lookup(key, value_ptr)).ok()); } - virtual void Insert(K key, ValuePtr* value_ptr) override { - LOG(FATAL)<<"Unsupport Insert(K, ValuePtr*) in SingleTireStorage."; + virtual void Insert(K key, void** value_ptr) override { + do { + Status s = kv_->Insert(key, *value_ptr); + if (s.ok()) { + break; + } else { + feat_desc_->Deallocate(*value_ptr); + } + } while (!(kv_->Lookup(key, value_ptr)).ok()); } - Status GetOrCreate(K key, ValuePtr** value_ptr, - size_t size) override { + Status GetOrCreate(K key, void** value_ptr) override { Status s = kv_->Lookup(key, value_ptr); if (s.ok()) { return s; } - *value_ptr = layout_creator_->Create(alloc_, size); + *value_ptr = feat_desc_->Allocate(); s = kv_->Insert(key, *value_ptr); if (s.ok()) { return s; } // Insert Failed, key already exist - (*value_ptr)->Destroy(alloc_); - delete *value_ptr; + feat_desc_->Deallocate(*value_ptr); return kv_->Lookup(key, value_ptr); } - - Status GetOrCreate(K key, ValuePtr** value_ptr, - size_t size, CopyBackFlag &need_copyback) override { - need_copyback = NOT_COPYBACK; - return GetOrCreate(key, value_ptr, size); - } Status Remove(K key) override { return kv_->Remove(key); @@ -180,7 +171,7 @@ class SingleTierStorage : public Storage { int total, const K* keys, const std::list& copyback_cursor, V** memcpy_address, size_t value_len, - ValuePtr **gpu_value_ptrs, + void **gpu_value_ptrs, V* memcpy_buffer_gpu, se::Stream* compute_stream, EventMgr* event_mgr, @@ -198,13 +189,13 @@ class SingleTierStorage : public Storage { } virtual Status BatchCommit(const std::vector& keys, - const std::vector*>& value_ptrs) override { + const std::vector& value_ptrs) override { LOG(FATAL) << "Unsupport BatchCommit in Storage: " << typeid(this).name(); return Status::OK(); } - virtual Status Commit(K keys, const ValuePtr* value_ptr) { + virtual Status Commit(K keys, const void* value_ptr) { LOG(FATAL) << "Unsupport Commit in Storage: " << typeid(this).name(); return Status::OK(); @@ -222,19 +213,12 @@ class SingleTierStorage : public Storage { return; } - void AllocateMemoryForNewFeatures( - const std::vector*>& value_ptr_list) override { - return; - } - - void AllocateMemoryForNewFeatures( - ValuePtr** value_ptr_list, - int64 num_of_value_ptrs) override { - return; - } + virtual void Import(K key, V* value, + int64 freq, int64 version, + int emb_index) override {} Status GetSnapshot(std::vector* key_list, - std::vector*>* value_ptr_list) override { + std::vector* value_ptr_list) override { mutex_lock l(Storage::mu_); return kv_->GetSnapshot(key_list, value_ptr_list); } @@ -247,7 +231,7 @@ class SingleTierStorage : public Storage { ShrinkArgs& shrink_args, int64 value_len, V* default_value) override { - std::vector*> value_ptr_list; + std::vector value_ptr_list; std::vector key_list_tmp; TF_CHECK_OK(kv_->GetSnapshot( &key_list_tmp, &value_ptr_list)); @@ -255,30 +239,16 @@ class SingleTierStorage : public Storage { if (emb_config.is_primary()) { Shrink(key_list_tmp, value_ptr_list, shrink_args, value_len); } - TF_CHECK_OK((Storage::SaveToCheckpoint( tensor_name, writer, emb_config, value_len, default_value, key_list_tmp, - value_ptr_list))); + value_ptr_list, + SingleTierStorage::feat_desc_))); return Status::OK(); } - void SetAllocLen(int64 value_len, int slot_num) override { - while (Storage::flag_.test_and_set(std::memory_order_acquire)); - // The start address of every slot should be aligned to 16 bytes, - // otherwise a coredump will happen in the ApplyOp. - Storage::alloc_len_ = Storage::ComputeAllocLen(value_len); - - int64 temp = Storage::alloc_len_ * slot_num; - if (temp > Storage::total_dims_) { - Storage::total_dims_ = temp; - SetTotalDims(Storage::total_dims_); - } - Storage::flag_.clear(std::memory_order_release); - } - bool IsMultiLevel() override { return false; } @@ -299,16 +269,22 @@ class SingleTierStorage : public Storage { LOG(FATAL) << "Unsupport Schedule in SingleTierStorage."; } + void UpdateValuePtr(K key, void* new_value_ptr, + void* old_value_ptr) override { + kv_->UpdateValuePtr(key, new_value_ptr, old_value_ptr); + } + protected: - virtual void SetTotalDims(int64 total_dims) = 0; + virtual void* CreateValuePtr() { + return feat_desc_->Allocate(); + } - virtual ValuePtr* CreateValuePtr(int64 size) { - return layout_creator_->Create(alloc_, size); + virtual void DestroyValuePtr(void* value_ptr) { + feat_desc_->Deallocate(value_ptr); } - virtual void DestroyValuePtr(ValuePtr* value_ptr) { - value_ptr->Destroy(alloc_); - delete value_ptr; + FeatureDescriptor* feature_descriptor() { + return feat_desc_; } protected: virtual Status RestoreFeatures(int64 key_num, int bucket_num, int64 partition_id, @@ -324,7 +300,7 @@ class SingleTierStorage : public Storage { } virtual void Shrink(std::vector& key_list, - std::vector*>& value_ptr_list, + std::vector& value_ptr_list, ShrinkArgs& shrink_args, int64 value_len) { mutex_lock l(Storage::mu_); @@ -339,31 +315,40 @@ class SingleTierStorage : public Storage { KVInterface* kv_; ShrinkPolicy* shrink_policy_; Allocator* alloc_; - LayoutCreator* layout_creator_; + FeatureDescriptor* feat_desc_; }; template class DramStorage : public SingleTierStorage { public: - DramStorage(const StorageConfig& sc, Allocator* alloc, - LayoutCreator* lc, - KVInterface* kv) - : SingleTierStorage(sc, alloc, kv, lc) {} + DramStorage(const StorageConfig& sc, + FeatureDescriptor* feat_desc) + : SingleTierStorage(sc, new LocklessHashMap(feat_desc), feat_desc) {} ~DramStorage() override {} Status BatchCommit(const std::vector& keys, - const std::vector*>& value_ptrs) { + const std::vector& value_ptrs) { return SingleTierStorage::kv_->BatchCommit(keys, value_ptrs); } - Status TryInsert(K key, ValuePtr* value_ptr) { + Status TryInsert(K key, void* value_ptr) { return SingleTierStorage::kv_->Insert(key, value_ptr); } - Status Commit(K keys, const ValuePtr* value_ptr) override{ + Status Commit(K keys, const void* value_ptr) override{ return SingleTierStorage::kv_->Commit(keys, value_ptr); } + + void Import(K key, V* value, + int64 freq, int64 version, + int emb_index) override { + void* value_ptr = SingleTierStorage::feat_desc_->Allocate(freq); + SingleTierStorage::Insert(key, &value_ptr); + SingleTierStorage::feat_desc_->SetValue(value_ptr, emb_index, value); + SingleTierStorage::feat_desc_->SetFreq(value_ptr, freq); + SingleTierStorage::feat_desc_->UpdateVersion(value_ptr, version); + } TF_DISALLOW_COPY_AND_ASSIGN(DramStorage); public: @@ -375,12 +360,8 @@ class DramStorage : public SingleTierStorage { friend class HbmDramSsdStorage; #endif protected: - void SetTotalDims(int64 total_dims) override { - SingleTierStorage::kv_->SetTotalDims(total_dims); - } - void Shrink(std::vector& key_list, - std::vector*>& value_ptr_list, + std::vector& value_ptr_list, ShrinkArgs& shrink_args, int64 value_len) override { SingleTierStorage::Shrink( @@ -395,9 +376,10 @@ class DramStorage : public SingleTierStorage { template class HbmStorage : public SingleTierStorage { public: - HbmStorage(const StorageConfig& sc, Allocator* alloc, - LayoutCreator* lc) : SingleTierStorage( - sc, alloc, new GPUHashMapKV(sc.embedding_config, alloc), lc) { + HbmStorage(const StorageConfig& sc, Allocator* gpu_allocator, + FeatureDescriptor* feat_desc) : SingleTierStorage( + sc, new GPUHashMapKV( + sc.embedding_config, gpu_allocator), feat_desc) { } ~HbmStorage() override {} @@ -488,48 +470,27 @@ class HbmStorage : public SingleTierStorage { gpu_kv->Import(key_import, value_import, device, emb_config); return Status::OK(); } - - void SetTotalDims(int64 total_dims) override {} }; template class HbmStorageWithCpuKv: public SingleTierStorage { public: - HbmStorageWithCpuKv(const StorageConfig& sc, Allocator* alloc, - LayoutCreator* lc) : SingleTierStorage( - sc, alloc, new LocklessHashMap(), lc) { + HbmStorageWithCpuKv(const StorageConfig& sc, + FeatureDescriptor* feat_desc) : SingleTierStorage( + sc, new LocklessHashMap(feat_desc), feat_desc) { } ~HbmStorageWithCpuKv() override {} - void Insert(K key, ValuePtr* value_ptr) override { - do { - Status s = SingleTierStorage::kv_->Insert(key, value_ptr); - if (s.ok()) { - break; - } else { - value_ptr->Destroy(SingleTierStorage::alloc_); - delete value_ptr; - } - } while (!(SingleTierStorage::kv_->Lookup(key, &value_ptr)).ok()); - } - - void Insert(K key, ValuePtr** value_ptr, - size_t alloc_len, bool to_dram = false) override { - SingleTierStorage::Insert(key, value_ptr, alloc_len, to_dram); - } - - Status TryInsert(K key, ValuePtr* value_ptr) { + Status TryInsert(K key, void* value_ptr) { return SingleTierStorage::kv_->Insert(key, value_ptr); } public: friend class HbmDramStorage; friend class HbmDramSsdStorage; protected: - void SetTotalDims(int64 total_dims) override {} - void Shrink(std::vector& key_list, - std::vector*>& value_ptr_list, + std::vector& value_ptr_list, ShrinkArgs& shrink_args, int64 value_len) override { SingleTierStorage::Shrink( @@ -544,28 +505,25 @@ class HbmStorageWithCpuKv: public SingleTierStorage { template class PmemMemkindStorage : public SingleTierStorage { public: - PmemMemkindStorage(const StorageConfig& sc, Allocator* alloc, - LayoutCreator* lc) : SingleTierStorage( - sc, alloc, new LocklessHashMap(), lc) { + PmemMemkindStorage(const StorageConfig& sc, + FeatureDescriptor* feat_desc) : SingleTierStorage( + sc, new LocklessHashMap(feat_desc), feat_desc) { } ~PmemMemkindStorage() override {} TF_DISALLOW_COPY_AND_ASSIGN(PmemMemkindStorage); - - protected: - void SetTotalDims(int64 total_dims) override {} }; template class PmemLibpmemStorage : public SingleTierStorage { public: - PmemLibpmemStorage(const StorageConfig& sc, Allocator* alloc, - LayoutCreator* lc) : SingleTierStorage( - sc, alloc, new LocklessHashMap(), lc) { + PmemLibpmemStorage(const StorageConfig& sc, + FeatureDescriptor* feat_desc) : SingleTierStorage( + sc, new LocklessHashMap(feat_desc), feat_desc) { } ~PmemLibpmemStorage() override {} - Status Commit(K keys, const ValuePtr* value_ptr) { + Status Commit(K keys, const void* value_ptr) { return SingleTierStorage::kv_->Commit(keys, value_ptr); } @@ -573,10 +531,8 @@ class PmemLibpmemStorage : public SingleTierStorage { protected: friend class DramPmemStorage; - void SetTotalDims(int64 total_dims) override {} - void Shrink(std::vector& key_list, - std::vector*>& value_ptr_list, + std::vector& value_ptr_list, ShrinkArgs& shrink_args, int64 value_len) override { SingleTierStorage::Shrink( @@ -590,15 +546,15 @@ class PmemLibpmemStorage : public SingleTierStorage { template class LevelDBStore : public SingleTierStorage { public: - LevelDBStore(const StorageConfig& sc, Allocator* alloc, - LayoutCreator* lc) : SingleTierStorage( - sc, alloc, new LevelDBKV(sc.path), lc) { + LevelDBStore(const StorageConfig& sc, + FeatureDescriptor* feat_desc) : SingleTierStorage( + sc, new LevelDBKV(sc.path, feat_desc), feat_desc) { } ~LevelDBStore() override {} TF_DISALLOW_COPY_AND_ASSIGN(LevelDBStore); - Status Commit(K keys, const ValuePtr* value_ptr) { + Status Commit(K keys, const void* value_ptr) { return SingleTierStorage::kv_->Commit(keys, value_ptr); } @@ -608,29 +564,25 @@ class LevelDBStore : public SingleTierStorage { LevelDBKV* leveldb_kv = reinterpret_cast*>(SingleTierStorage::kv_); return new DBValueIterator( - key_list, emb_index, value_len, leveldb_kv); + key_list, emb_index, value_len, + leveldb_kv, SingleTierStorage::feat_desc_); } public: friend class DramLevelDBStore; - - protected: - void SetTotalDims(int64 total_dims) override { - SingleTierStorage::kv_->SetTotalDims(total_dims); - } }; template class SsdHashStorage : public SingleTierStorage { public: - SsdHashStorage(const StorageConfig& sc, Allocator* alloc, - LayoutCreator* lc) : SingleTierStorage( - sc, alloc, new SSDHashKV(sc.path, alloc), lc) { + SsdHashStorage(const StorageConfig& sc, + FeatureDescriptor* feat_desc) : SingleTierStorage( + sc, new SSDHashKV(sc.path, feat_desc), feat_desc) { } ~SsdHashStorage() override {} TF_DISALLOW_COPY_AND_ASSIGN(SsdHashStorage); - Status Commit(K keys, const ValuePtr* value_ptr) { + Status Commit(K keys, const void* value_ptr) { return SingleTierStorage::kv_->Commit(keys, value_ptr); } @@ -691,8 +643,9 @@ class SsdHashStorage : public SingleTierStorage { #endif protected: - void SetTotalDims(int64 total_dims) override { - SingleTierStorage::kv_->SetTotalDims(total_dims); + void Init() override { + dynamic_cast*>( + SingleTierStorage::kv_)->Init(); } }; } // embedding diff --git a/tensorflow/core/framework/embedding/ssd_hash_kv.h b/tensorflow/core/framework/embedding/ssd_hash_kv.h index 8040421233e..f51c6904a50 100644 --- a/tensorflow/core/framework/embedding/ssd_hash_kv.h +++ b/tensorflow/core/framework/embedding/ssd_hash_kv.h @@ -25,17 +25,12 @@ limitations under the License. #include "tensorflow/core/framework/embedding/ssd_record_descriptor.h" #include "tensorflow/core/framework/embedding/emb_file_creator.h" #include "tensorflow/core/framework/embedding/kv_interface.h" -#include "tensorflow/core/framework/embedding/value_ptr.h" #include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/lib/core/threadpool.h" #include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/util/env_var.h" namespace tensorflow { - -template -class ValuePtr; - namespace embedding { class EmbPosition { public: @@ -115,55 +110,6 @@ class SSDIterator { } } - virtual void Key(char* val, int64 dim) { - int64 f_id = file_id_vec_[curr_file_]; - memcpy((char*)val, &((file_map_[f_id])[curr_vec_].first), dim); - } - - virtual void Value(char* val, int64 dim, int64 value_offset) { - int64 f_id = file_id_vec_[curr_file_]; - EmbPosition* posi = (file_map_[f_id])[curr_vec_].second; - if (posi->flushed_) { - emb_files_[posi->version_]-> - ReadWithMemcpy(val, dim, - posi->offset_ + value_offset + sizeof(FixedLengthHeader)); - } else { - memcpy(val, write_buffer_ + posi->buffer_offset_ + - value_offset + sizeof(FixedLengthHeader), dim); - } - } - - virtual void Freq(char* val, int64 dim) { - int64 f_id = file_id_vec_[curr_file_]; - EmbPosition* posi = (file_map_[f_id])[curr_vec_].second; - if (posi->flushed_) { - emb_files_[posi->version_]-> - ReadWithMemcpy(val, sizeof(FixedLengthHeader), - posi->offset_); - } else { - memcpy(val, write_buffer_ + posi->buffer_offset_, - sizeof(FixedLengthHeader)); - } - *((int64*)val) = - reinterpret_cast(val)->GetFreqCounter(); - } - - virtual void Version(char* val, int64 dim) { - int64 f_id = file_id_vec_[curr_file_]; - EmbPosition* posi = (file_map_[f_id])[curr_vec_].second; - - if (posi->flushed_) { - emb_files_[posi->version_]-> - ReadWithMemcpy(val, sizeof(FixedLengthHeader), - posi->offset_); - } else { - memcpy(val, write_buffer_ + posi->buffer_offset_, - sizeof(FixedLengthHeader)); - } - *((int64*)val) = - reinterpret_cast(val)->GetGlobalStep(); - } - virtual K Key() { int64 f_id = file_id_vec_[curr_file_]; return (file_map_[f_id])[curr_vec_].first; @@ -192,8 +138,9 @@ class SSDIterator { template class SSDHashKV : public KVInterface { public: - explicit SSDHashKV(const std::string& path, Allocator* alloc) - : alloc_(alloc) { + explicit SSDHashKV(const std::string& path, + FeatureDescriptor* feat_desc) + : feat_desc_(feat_desc) { path_ = io::JoinPath( path, "ssd_kv_" + std::to_string(Env::Default()->NowMicros()) + "_"); hash_map_.max_load_factor(0.8); @@ -205,9 +152,6 @@ class SSDHashKV : public KVInterface { evict_file_set_.set_counternum(16); evict_file_set_.set_deleted_key(DELETED_KEY); - new_value_ptr_fn_ = [this](size_t size) { - return new NormalContiguousValuePtr(alloc_, size); - }; is_async_compaction_ = true; TF_CHECK_OK(ReadBoolFromEnvVar("TF_SSDHASH_ASYNC_COMPACTION", true, &is_async_compaction_)); @@ -224,7 +168,7 @@ class SSDHashKV : public KVInterface { "Use Sync Compactor in SSDHashKV of Multi-tier Embedding Storage!"; compaction_fn_ = [this](){Compaction();}; check_buffer_fn_ = [this](){CheckBuffer();}; - save_kv_fn_ = [this](K key, const ValuePtr* value_ptr, + save_kv_fn_ = [this](K key, const void* value_ptr, bool is_compaction=false) { SaveKV(key, value_ptr, is_compaction); }; @@ -233,7 +177,7 @@ class SSDHashKV : public KVInterface { "Use Async Compactor in SSDHashKV of Multi-tier Embedding Storage!"; compaction_fn_ = [](){}; check_buffer_fn_ = [this](){CheckBufferAsync();}; - save_kv_fn_ = [this](K key, const ValuePtr* value_ptr, + save_kv_fn_ = [this](K key, const void* value_ptr, bool is_compaction=false) { SaveKVAsync(key, value_ptr, is_compaction); }; @@ -244,9 +188,8 @@ class SSDHashKV : public KVInterface { } } - void SetTotalDims(int total_dims) override { - total_dims_ = total_dims; - val_len_ = sizeof(FixedLengthHeader) + total_dims_ * sizeof(V); + void Init() { + val_len_ = feat_desc_->data_bytes(); max_app_count_ = BUFFER_SIZE / val_len_; write_buffer_ = new char[BUFFER_SIZE]; unsigned int max_key_count = 1 + int(BUFFER_SIZE / val_len_); @@ -334,18 +277,18 @@ class SSDHashKV : public KVInterface { return Status::OK(); } - Status Lookup(K key, ValuePtr** value_ptr) override { + Status Lookup(K key, void** value_ptr) override { auto iter = hash_map_.find_wait_free(key); if (iter.first == EMPTY_KEY) { return errors::NotFound("Unable to find Key: ", key, " in SSDHashKV."); } else { - ValuePtr* val = new_value_ptr_fn_(total_dims_); + void* val = feat_desc_->Allocate(); EmbPosition* posi = iter.second; if (posi->flushed_) { - emb_files_[posi->version_]->Read((char*)(val->GetPtr()), + emb_files_[posi->version_]->Read((char*)val, val_len_, posi->offset_); } else { - memcpy((char*)val->GetPtr(), + memcpy((char*)val, write_buffer_ + posi->buffer_offset_, val_len_); } *value_ptr = val; @@ -363,17 +306,17 @@ class SSDHashKV : public KVInterface { } } - Status Insert(K key, const ValuePtr* value_ptr) override { + Status Insert(K key, const void* value_ptr) override { return Status::OK(); } Status BatchInsert(const std::vector& keys, - const std::vector*>& value_ptrs) override { + const std::vector& value_ptrs) override { return BatchCommit(keys, value_ptrs); } Status BatchCommit(const std::vector& keys, - const std::vector*>& value_ptrs) override { + const std::vector& value_ptrs) override { compaction_fn_(); __sync_fetch_and_add(&total_app_count_, keys.size()); for (int i = 0; i < keys.size(); i++) { @@ -384,7 +327,7 @@ class SSDHashKV : public KVInterface { return Status::OK(); } - Status Commit(K key, const ValuePtr* value_ptr) override { + Status Commit(K key, const void* value_ptr) override { compaction_fn_(); __sync_fetch_and_add(&total_app_count_, 1); check_buffer_fn_(); @@ -402,7 +345,7 @@ class SSDHashKV : public KVInterface { } Status GetSnapshot(std::vector* key_list, - std::vector*>* value_ptr_list) override { + std::vector* value_ptr_list) override { return Status::OK(); } @@ -467,8 +410,8 @@ class SSDHashKV : public KVInterface { int64 Size() const override { return hash_map_.size_lockless(); } - void FreeValuePtr(ValuePtr* value_ptr) override { - delete value_ptr; + void FreeValuePtr(void* value_ptr) override { + feat_desc_->Deallocate(value_ptr); } private: @@ -555,10 +498,10 @@ class SSDHashKV : public KVInterface { } void AppendToWriteBuffer(size_t curr_buffer_offset, K key, - const ValuePtr* value_ptr) { + const void* value_ptr) { current_offset_ += val_len_; memcpy(write_buffer_ + curr_buffer_offset, - (char*)value_ptr->GetPtr(), val_len_); + (char*)value_ptr, val_len_); key_buffer_[buffer_cur_] = key; ++buffer_cur_; } @@ -582,7 +525,7 @@ class SSDHashKV : public KVInterface { return flag; } - void SaveKV(K key, const ValuePtr* value_ptr, + void SaveKV(K key, const void* value_ptr, bool is_compaction = false) { size_t curr_buffer_offset = buffer_cur_ * val_len_; EmbPosition* ep = new EmbPosition(current_offset_, current_version_, @@ -608,7 +551,7 @@ class SSDHashKV : public KVInterface { } } - void SaveKVAsync(K key, const ValuePtr* value_ptr, + void SaveKVAsync(K key, const void* value_ptr, bool is_compaction = false) { size_t curr_buffer_offset = buffer_cur_ * val_len_; EmbPosition* ep = new EmbPosition(current_offset_, evict_version_, @@ -681,21 +624,21 @@ class SSDHashKV : public KVInterface { } void MoveToNewFile() { - ValuePtr* val = new_value_ptr_fn_(total_dims_); + void* val = feat_desc_->Allocate(); for (auto it : evict_file_map_) { EmbFile* file = emb_files_[it.first]; total_app_count_ -= file->InvalidCount(); file->MapForRead(); for (auto it_vec : it.second) { EmbPosition* posi = it_vec.second; - file->ReadWithMemcpy((char*)(val->GetPtr()), val_len_, + file->ReadWithMemcpy((char*)val, val_len_, posi->offset_); CheckBuffer(); SaveKV(it_vec.first, val, true); } file->UnmapForRead(); } - delete val; + feat_desc_->Deallocate(val); } void MoveToNewFileAsync() { @@ -825,11 +768,10 @@ class SSDHashKV : public KVInterface { char* write_buffer_ = nullptr; K* key_buffer_ = nullptr; bool is_async_compaction_; - Allocator* alloc_ = nullptr; + FeatureDescriptor* feat_desc_; int total_dims_; std::string path_; - std::function*(size_t)> new_value_ptr_fn_; typedef google::dense_hash_map_lockless LockLessHashMap; LockLessHashMap hash_map_; @@ -857,7 +799,7 @@ class SSDHashKV : public KVInterface { std::function compaction_fn_; std::function check_buffer_fn_; - std::function*, bool)> save_kv_fn_; + std::function save_kv_fn_; EmbFileCreator* emb_file_creator_ = nullptr; }; template diff --git a/tensorflow/core/framework/embedding/storage.h b/tensorflow/core/framework/embedding/storage.h index bb949183492..1ffb435054b 100644 --- a/tensorflow/core/framework/embedding/storage.h +++ b/tensorflow/core/framework/embedding/storage.h @@ -40,9 +40,6 @@ using GPUDevice = Eigen::GpuDevice; template class CheckpointLoader; -template -class ValuePtr; - template class EmbeddingVar; @@ -57,9 +54,6 @@ class BundleReader; template struct EmbeddingVarContext; -namespace { - const int kSavedPartitionNum = 1000; -} namespace embedding { template @@ -67,42 +61,40 @@ class Storage { friend class CheckpointLoader; public: explicit Storage(const StorageConfig& storage_config) - : storage_config_(storage_config) {} + : storage_config_(storage_config) { + initialize_value_.resize(storage_config.embedding_config.slot_num + 1); + } virtual ~Storage() {} TF_DISALLOW_COPY_AND_ASSIGN(Storage); - virtual Status Get(K key, ValuePtr** value_ptr) = 0; + virtual Status Get(K key, void** value_ptr) = 0; #if GOOGLE_CUDA virtual void BatchGet(const EmbeddingVarContext& ctx, const K* key, - ValuePtr** value_ptr_list, - int64 num_of_keys, - int64 value_len) {} + void** value_ptr_list, + int64 num_of_keys) {} virtual void BatchGetOrCreate( const EmbeddingVarContext& ctx, const K* key, - ValuePtr** value_ptr_list, + void** value_ptr_list, int64 num_of_keys, int64 value_len, std::vector>& not_found_cursor_list) {} #endif //GOOGLE_CUDA virtual Status Contains(K key) = 0; - virtual void Insert(K key, ValuePtr** value_ptr, - size_t alloc_len, bool to_dram = false) = 0; - virtual void Insert(K key, ValuePtr* value_ptr) = 0; - virtual void SetAllocLen(int64 value_len, int slot_num) = 0; + virtual void CreateAndInsert(K key, void** value_ptr, + bool to_dram=false) = 0; + virtual void Insert(K key, void** value_ptr) = 0; + virtual void Init() {} virtual void SetValueLen(int64 value_len) {} - virtual Status GetOrCreate(K key, ValuePtr** value_ptr, - size_t size) = 0; - virtual Status GetOrCreate(K key, ValuePtr** value_ptr, - size_t size, CopyBackFlag &need_copyback) = 0; + virtual Status GetOrCreate(K key, void** value_ptr) = 0; virtual int LookupTier(K key) const = 0; virtual Status Remove(K key) = 0; virtual int64 Size() const = 0; virtual int64 Size(int level) const = 0; virtual Status GetSnapshot(std::vector* key_list, - std::vector*>* value_ptr_list) = 0; + std::vector* value_ptr_list) = 0; virtual Status Save( const string& tensor_name, const string& prefix, @@ -113,7 +105,7 @@ class Storage { V* default_value) = 0; virtual Status BatchCommit(const std::vector& keys, - const std::vector*>& value_ptrs) = 0; + const std::vector& value_ptrs) = 0; virtual Status Eviction(K* evict_ids, int64 evict_size) = 0; @@ -121,7 +113,7 @@ class Storage { int total, const K* keys, const std::list& copyback_cursor, V** memcpy_address, size_t value_len, - ValuePtr **gpu_value_ptrs, + void **gpu_value_ptrs, V* memcpy_buffer_gpu, se::Stream* compute_stream, EventMgr* event_mgr, @@ -149,25 +141,11 @@ class Storage { Allocator* alloc, int64 value_len, int64 block_size) = 0; - virtual void AllocateMemoryForNewFeatures( - const std::vector*>& value_ptr_list) = 0; - virtual void AllocateMemoryForNewFeatures( - ValuePtr** value_ptr_list, int64 num_of_value_ptrs) = 0; inline mutex* get_mutex() { return &mu_; } inline int64 GetAllocLen() { return alloc_len_; } inline int64 GetOffset(int64 index) { return alloc_len_ * index; } inline int64 GetTotalDims() { return total_dims_; } - inline int64 ComputeAllocLen(int64 value_len) { - if (LayoutType::COMPACT == storage_config_.layout_type) { - return value_len; - } else { - return (value_len * sizeof(V) % 16 == 0) - ? value_len - : value_len + (16 - (sizeof(V) * value_len) % 16) / sizeof(V); - } - } - inline LayoutType GetLayoutType() { return storage_config_.layout_type; } inline embedding::StorageType GetStorageType() { return storage_config_.type; } inline std::string GetStoragePath() { return storage_config_.path; } inline embedding::CacheStrategy @@ -183,7 +161,7 @@ class Storage { } inline void Insert(const std::vector& keys, - ValuePtr** value_ptrs) { + void** value_ptrs) { for (size_t i = 0; i < keys.size(); i++) { Insert(keys[i], value_ptrs[i]); } @@ -211,6 +189,13 @@ class Storage { reset_version, reader); restorer.RestoreCkpt(emb_config, device); }; + + virtual void UpdateValuePtr(K key, void* new_value_ptr, + void* old_value_ptr) = 0; + + virtual void Import(K key, V* value, + int64 freq, int64 version, + int emb_index) = 0; protected: virtual Status RestoreFeatures(int64 key_num, int bucket_num, int64 partition_id, @@ -227,12 +212,7 @@ class Storage { const std::string& ssd_emb_file_name, EmbeddingVar* ev, RestoreSSDBuffer& restore_buff) { - int64 alloc_len = Storage::ComputeAllocLen(value_len); - auto* alloc = ev->GetAllocator(); for (int64 i = 0; i < restore_buff.num_of_keys; i++) { - ValuePtr* value_ptr = nullptr; - ev->LookupOrCreateKey(restore_buff.key_list_buf[i], &value_ptr); - value_ptr->SetInitialized(emb_index); int64 file_id = restore_buff.key_file_id_list_buf[i]; int64 key_offset = restore_buff.key_offset_list_buf[i]; // Read data from embedding files on SSD. Data are stored in @@ -240,32 +220,29 @@ class Storage { std::stringstream ss; ss << ssd_emb_file_name << "/" << file_id << ".emb"; int fd = open(ss.str().data(), O_RDONLY); + EmbeddingConfig& emb_config = storage_config_.embedding_config; + FeatureDescriptor normal_feat_desc( + emb_config.block_num, emb_config.slot_num + 1, + ev_allocator(), StorageType::DRAM, true, + true, {false, 0}); + void* value_ptr = normal_feat_desc.Allocate(); char* file_addr = (char*)mmap(nullptr, - sizeof(FixedLengthHeader) + - alloc_len * sizeof(V) * (emb_slot_num + 1) + + normal_feat_desc.data_bytes() + key_offset, PROT_READ, MAP_PRIVATE, fd, 0); - - NormalContiguousValuePtr tmp_value_ptr(alloc, - alloc_len * (emb_slot_num + 1)); - void* ptr = tmp_value_ptr.GetPtr(); - memcpy(ptr, file_addr + key_offset, - sizeof(FixedLengthHeader) + - alloc_len * sizeof(V) * (emb_slot_num + 1)); + memcpy(value_ptr, file_addr + key_offset, + normal_feat_desc.data_bytes()); munmap(file_addr, - sizeof(FixedLengthHeader) + - alloc_len * sizeof(V) * (emb_slot_num + 1) + + normal_feat_desc.data_bytes() + key_offset); close(fd); // Copy Data to ValuePtr, data of slots are set by primary here. - for (int j = 0; j < emb_slot_num + 1; j++) { - V* value = tmp_value_ptr.GetValue(j, alloc_len * j); - if (value != nullptr) { - value_ptr->GetOrAllocate(alloc, value_len, value, j, alloc_len * j); - } - } - value_ptr->SetFreq(tmp_value_ptr.GetFreq()); - value_ptr->SetStep(tmp_value_ptr.GetStep()); + int64 import_freq = normal_feat_desc.GetFreq(value_ptr); + int64 import_version = normal_feat_desc.GetVersion(value_ptr); + V* value = normal_feat_desc.GetEmbedding(value_ptr, emb_index); + Import(restore_buff.key_list_buf[i], value, + import_freq, import_version, emb_index); + normal_feat_desc.Deallocate(value_ptr); } return Status::OK(); } @@ -273,10 +250,11 @@ class Storage { private: void GeneratePartitionedCkptData( const std::vector& key_list, - const std::vector*>& value_ptr_list, + const std::vector& value_ptr_list, EmbeddingVarCkptData* partitioned_ckpt_data, const EmbeddingConfig& emb_config, - V* default_value) { + V* default_value, + FeatureDescriptor* feat_desc) { std::vector> ev_ckpt_data_parts(kSavedPartitionNum); @@ -293,7 +271,43 @@ class Storage { ev_ckpt_data_parts[part_id].Emplace( key_list[i], value_ptr_list[i], emb_config, default_value, - GetOffset(emb_config.emb_index), + feat_desc, + is_save_freq, + is_save_version, + save_unfiltered_features); + break; + } + } + } + + partitioned_ckpt_data->SetWithPartition(ev_ckpt_data_parts); + } + + void GeneratePartitionedCkptData( + const std::vector& key_list, + const std::vector& value_ptr_list, + EmbeddingVarCkptData* partitioned_ckpt_data, + const EmbeddingConfig& emb_config, + V* default_value, + const std::vector*>& feat_desc) { + std::vector> + ev_ckpt_data_parts(kSavedPartitionNum); + + bool save_unfiltered_features = true; + TF_CHECK_OK(ReadBoolFromEnvVar( + "TF_EV_SAVE_FILTERED_FEATURES", true, &save_unfiltered_features)); + + bool is_save_freq = emb_config.is_save_freq(); + bool is_save_version = emb_config.is_save_version(); + + for (int64 i = 0; i < key_list.size(); i++) { + for (int part_id = 0; part_id < kSavedPartitionNum; part_id++) { + if (key_list[i] % kSavedPartitionNum == part_id) { + int feat_desc_type = (int64)value_ptr_list[i] >> kDramFlagOffset; + ev_ckpt_data_parts[part_id].Emplace( + key_list[i], value_ptr_list[i], + emb_config, default_value, + feat_desc[feat_desc_type], is_save_freq, is_save_version, save_unfiltered_features); @@ -333,12 +347,33 @@ class Storage { int64 value_len, V* default_value, const std::vector& key_list, - const std::vector*>& value_ptr_list, + const std::vector& value_ptr_list, + FeatureDescriptor* feat_desc, + ValueIterator* value_iter = nullptr) { + EmbeddingVarCkptData partitioned_ckpt_data; + GeneratePartitionedCkptData(key_list, value_ptr_list, + &partitioned_ckpt_data, emb_config, + default_value, feat_desc); + Status s = + partitioned_ckpt_data.ExportToCkpt( + tensor_name, writer, value_len, value_iter); + return Status::OK(); + } + + Status SaveToCheckpoint( + const string& tensor_name, + BundleWriter* writer, + const EmbeddingConfig& emb_config, + int64 value_len, + V* default_value, + const std::vector& key_list, + const std::vector& value_ptr_list, + const std::vector*>& feat_desc, ValueIterator* value_iter = nullptr) { EmbeddingVarCkptData partitioned_ckpt_data; GeneratePartitionedCkptData(key_list, value_ptr_list, &partitioned_ckpt_data, emb_config, - default_value); + default_value, feat_desc); Status s = partitioned_ckpt_data.ExportToCkpt( tensor_name, writer, value_len, value_iter); @@ -366,6 +401,7 @@ class Storage { mutex mu_; std::atomic_flag flag_ = ATOMIC_FLAG_INIT; + std::vector initialize_value_; }; } // embedding } // tensorflow diff --git a/tensorflow/core/framework/embedding/storage_config.h b/tensorflow/core/framework/embedding/storage_config.h index 85e44879dcb..23babc9ef08 100644 --- a/tensorflow/core/framework/embedding/storage_config.h +++ b/tensorflow/core/framework/embedding/storage_config.h @@ -17,13 +17,11 @@ limitations under the License. #include "tensorflow/core/framework/embedding/cache.h" #include "tensorflow/core/framework/embedding/embedding_config.h" -#include "tensorflow/core/framework/embedding/value_ptr.h" namespace tensorflow { namespace embedding { struct StorageConfig { StorageConfig() : type(StorageType::DEFAULT), path(""), - layout_type(LayoutType::NORMAL), cache_strategy(CacheStrategy::LFU) { size = {1<<30,1<<30,1<<30,1<<30}; } @@ -31,32 +29,14 @@ struct StorageConfig { StorageConfig(StorageType t, const std::string& p, const std::vector& s, - const std::string& layout, const EmbeddingConfig& ec, const CacheStrategy cache_strategy_ = CacheStrategy::LFU) - : type(t), - path(p), - embedding_config(ec), - cache_strategy(cache_strategy_) { - if ("normal" == layout) { - layout_type = LayoutType::NORMAL; - } else if ("light" == layout) { - layout_type = LayoutType::LIGHT; - } else if ("normal_contiguous" == layout){ - layout_type = LayoutType::NORMAL_CONTIGUOUS; - } else if ("normal_contiguous_gpu" == layout){ - layout_type = LayoutType::NORMAL_CONTIGUOUS_GPU; - } else if ("compact" == layout){ - layout_type = LayoutType::COMPACT; - } else { - LOG(WARNING) << "Unknown layout: " - << layout << ", use LayoutType::NORMAL by default."; - layout_type = LayoutType::NORMAL; - } - size = s; - } + : type(t), + path(p), + size(s), + embedding_config(ec), + cache_strategy(cache_strategy_) {} StorageType type; - LayoutType layout_type; std::string path; std::vector size; CacheStrategy cache_strategy; diff --git a/tensorflow/core/framework/embedding/storage_factory.h b/tensorflow/core/framework/embedding/storage_factory.h index 10d2d52b83f..c585b058470 100644 --- a/tensorflow/core/framework/embedding/storage_factory.h +++ b/tensorflow/core/framework/embedding/storage_factory.h @@ -16,7 +16,6 @@ limitations under the License. #define TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_STORAGE_FACTORY_H_ #include "tensorflow/core/framework/embedding/config.pb.h" -#include "tensorflow/core/framework/embedding/layout_creator.h" #include "tensorflow/core/framework/embedding/dram_leveldb_storage.h" #include "tensorflow/core/framework/embedding/dram_pmem_storage.h" #include "tensorflow/core/framework/embedding/dram_ssd_storage.h" @@ -34,50 +33,41 @@ class StorageFactory { public: template static Storage* Create(const StorageConfig& sc, - Allocator* gpu_allocator, const string& name) { - auto layout_creator = LayoutCreatorFactory::Create(sc); - + Allocator* gpu_allocator, FeatureDescriptor* feat_desc, + const string& name) { switch (sc.type) { case StorageType::DRAM: - return new DramStorage(sc, ev_allocator(), - layout_creator, new LocklessHashMap()); + return new DramStorage(sc, feat_desc); case StorageType::PMEM_MEMKIND: - return new PmemMemkindStorage(sc, pmem_allocator(), - layout_creator); + feat_desc->SetAllocator(pmem_allocator()); + return new PmemMemkindStorage(sc, feat_desc); case StorageType::PMEM_LIBPMEM: - return new PmemLibpmemStorage(sc, - experimental_pmem_allocator(sc.path, sc.size[0]), - layout_creator); + feat_desc->SetAllocator( + experimental_pmem_allocator(sc.path, sc.size[0])); + return new PmemLibpmemStorage(sc, feat_desc); case StorageType::DRAM_PMEM: - return new DramPmemStorage(sc, ev_allocator(), - experimental_pmem_allocator(sc.path, sc.size[0]), - layout_creator, name); + return new DramPmemStorage(sc, + feat_desc, name); case StorageType::LEVELDB: case StorageType::DRAM_LEVELDB: - return new DramLevelDBStore(sc, ev_allocator(), - layout_creator, name); + return new DramLevelDBStore(sc, feat_desc, name); case StorageType::SSDHASH: case StorageType::DRAM_SSDHASH: - return new DramSsdHashStorage(sc, ev_allocator(), - layout_creator, name); + return new DramSsdHashStorage(sc, feat_desc, name); case StorageType::HBM: #if GOOGLE_CUDA - return new HbmStorage(sc, gpu_allocator, - layout_creator); + return new HbmStorage(sc, gpu_allocator, feat_desc); #endif // GOOGLE_CUDA case StorageType::HBM_DRAM: #if GOOGLE_CUDA - return new HbmDramStorage(sc, gpu_allocator, - ev_allocator(), layout_creator, name); + return new HbmDramStorage(sc, gpu_allocator, feat_desc, name); #endif // GOOGLE_CUDA case StorageType::HBM_DRAM_SSDHASH: #if GOOGLE_CUDA - return new HbmDramSsdStorage(sc, gpu_allocator, - ev_allocator(), layout_creator, name); + return new HbmDramSsdStorage(sc, gpu_allocator, feat_desc, name); #endif // GOOGLE_CUDA default: - return new DramStorage(sc, ev_allocator(), - layout_creator, new LocklessHashMap()); + return new DramStorage(sc, feat_desc); } } }; diff --git a/tensorflow/core/framework/embedding/value_ptr.h b/tensorflow/core/framework/embedding/value_ptr.h deleted file mode 100644 index ca7d234ed61..00000000000 --- a/tensorflow/core/framework/embedding/value_ptr.h +++ /dev/null @@ -1,647 +0,0 @@ -#ifndef TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_VALUE_PTR_H_ -#define TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_VALUE_PTR_H_ - -#include -#include -#include -#include - -#include "tensorflow/core/framework/typed_allocator.h" -#if GOOGLE_CUDA -#include -#endif // GOOGLE_CUDA - -namespace tensorflow { - -enum class LayoutType { - LIGHT, - NORMAL, - LEVELDB, - NORMAL_CONTIGUOUS, - NORMAL_CONTIGUOUS_GPU, - COMPACT, -}; - -namespace { -constexpr int COLUMN_BITSET_BYTES = 5; -constexpr int COLUMN_BITSET_SIZE = COLUMN_BITSET_BYTES * 8; - -struct MetaHeader { - unsigned char embed_num; - unsigned char value_type; - unsigned char header_size; - unsigned char column_bitset[COLUMN_BITSET_BYTES]; - - static const int kEmbeddingNumStartIndex = 0; - static const int kValueTypeStartIndex = - kEmbeddingNumStartIndex + sizeof(char); - static const int kHeaderSizeStartIndex = - kValueTypeStartIndex + sizeof(char); - static const int kColumnBitsetIndex = - kHeaderSizeStartIndex + sizeof(char); - - inline unsigned int GetEmbeddingNum() { - return (unsigned int) embed_num; - } - - inline void SetEmbeddingNum(size_t s) { - embed_num = (unsigned char)s; - } - - inline std::bitset GetColumnBitset() { - unsigned long meta = ((unsigned long*)this)[0]; - std::bitset bs(meta >> (8 * kColumnBitsetIndex)); - return bs; - } - - inline void SetColumnBitset(const std::bitset& bs, - unsigned int embnum) { - ((unsigned long*)(this))[0] = - (bs.to_ulong() << (8 * kColumnBitsetIndex)) | - (header_size << (8 * kHeaderSizeStartIndex)) | - (value_type << (8 * kValueTypeStartIndex)) | - (embnum << (8 * kEmbeddingNumStartIndex)); - } - - inline unsigned int GetHeaderSize() { - return (unsigned int) header_size; - } - - inline void SetHeaderSize(size_t size) { - header_size = (unsigned char)size; - } - - inline void SetLayoutType(LayoutType vt) { - value_type = (unsigned char)vt; - } - - inline LayoutType GetLayoutType() { - return (LayoutType)value_type; - } -}; - -struct LightHeader { -/*__________________________________________________________________________________________ - | | | | | embedding | slot | - | number of | valueptr | header | each bit a V* | V* | V* | - | embedding | type | size | 1 valid | actually pointer | actually pointer |... - | columns | | | 0 no-valid | by alloctor | by alloctor | - | (8 bits) | (8 bits) | (8 bits) | (40 bits) | (8 bytes) | (8 bytes) | - -------------------------------------------------------------------------------------------- -*/ - MetaHeader meta; - LightHeader() { - memset(this, 0, sizeof(LightHeader)); - meta.SetLayoutType(LayoutType::LIGHT); - meta.SetHeaderSize(sizeof(LightHeader) / sizeof(int64)); - } -}; - -struct NormalHeader { -/*_________________________________________________________________________________________________________________________ - | | | | | | | embedding | slot | - | number of | valueptr | header | each bit a V* | global step | freq counter | V* | V* | - | embedding | type | size | 1 valid | | | actually pointer | actually pointer |... - | columns | | | 0 no-valid | int64 | int64 | by alloctor | by alloctor | - | (8 bits) | (8 bits) | (8 bits) | (40 bits) | (8 bytes) | (8 bytes) | (8 bytes) | (8 bytes) | - -------------------------------------------------------------------------------------------------------------------------- - */ - MetaHeader meta; - int64 global_step; - int64 freq_counter; - - NormalHeader() { - memset(this, 0, sizeof(NormalHeader)); - meta.SetLayoutType(LayoutType::NORMAL); - meta.SetHeaderSize(sizeof(NormalHeader) / sizeof(int64)); - SetGlobalStep(-1); - } - - inline int64 GetGlobalStep() { - return global_step; - } - - inline void SetGlobalStep(int64 gs) { - global_step = gs; - } - - inline int64 GetFreqCounter() { - return freq_counter; - } - - inline void SetFreqCounter(int64 fc) { - freq_counter = fc; - } - - inline void AddFreq() { - __sync_bool_compare_and_swap(&freq_counter, - freq_counter, freq_counter + 1); - } - - inline void AddFreq(int64 count) { - __sync_bool_compare_and_swap(&freq_counter, - freq_counter, freq_counter + count); - } -}; - -struct FixedLengthHeader { -/*_________________________________________________________________________________ - | | | embeddings | - | slotflag + global step | freq counter | V | - | | | actually value | - | int64 | int64 | by alloctor | - | (8 bytes) | (8 bytes) | (4 * slot_num * emb_dim bytes) | - --------------------------------------------------------------------------------- -*/ - int64 global_step; - int64 freq_counter; - - FixedLengthHeader() { - memset(this, 0, sizeof(FixedLengthHeader)); - SetGlobalStep(-1); - } - - inline int64 GetGlobalStep() { - return global_step & 0x0000ffffffffffff; - } - - inline void SetGlobalStep(int64 gs) { - int64 temp = global_step; - temp &= 0xffff000000000000; - gs &= 0x0000ffffffffffff; - temp |= gs; - global_step = temp; - } - - inline void SetInitialized(int64 emb_index) { - int64 temp = 1; - temp = temp << (48 + emb_index); - global_step |= temp; - } - - inline int64 GetFreqCounter() { - return freq_counter; - } - - inline void SetFreqCounter(int64 fc) { - freq_counter = fc; - } - - inline void AddFreq() { - __sync_bool_compare_and_swap(&freq_counter, - freq_counter, freq_counter + 1); - } - - inline void AddFreq(int64 count) { - __sync_bool_compare_and_swap(&freq_counter, - freq_counter, freq_counter + count); - } -}; -} // namespace - -template -class ValuePtr { - public: - virtual ~ValuePtr() {} - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset) = 0; - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset, bool &need_initialize) = 0; - - // simple getter for V* and version - virtual V* GetValue(int emb_index, int offset) = 0; - - virtual void Destroy(Allocator* allocator) = 0; - - virtual void* GetPtr() const = 0; - - // Global Step - virtual int64 GetStep() { - LOG(FATAL) << "Unsupport GlobalStep in subclass of ValuePtrBase"; - return 0; - } - - virtual void SetStep(int64 gs) {} - - // Frequency Counter - virtual int64 GetFreq() { - LOG(FATAL) << "Unsupport FreqCounter in subclass of ValuePtrBase"; - return 0; - } - - virtual void SetFreq(int64 freq) {} - - virtual void AddFreq() { - LOG(FATAL) << "Unsupport FreqCounter in subclass of ValuePtrBase"; - } - - virtual void AddFreq(int64 count) { - LOG(FATAL) << "Unsupport FreqCounter in subclass of ValuePtrBase"; - } - - virtual void SetValue(V val, size_t size) { - LOG(FATAL) << "Unsupport SetValue in subclass of ValuePtrBase"; - } - - virtual void SetInitialized(int64 emb_index) { - LOG(FATAL) << "Unsupport SetInitialized in subclass of ValuePtrBase"; - } - - virtual bool SetPtr(V* ptr) { - LOG(FATAL) << "Unsupport SetInitialized in subclass of ValuePtrBase"; - return false; - } - -}; - -template -class LooseValuePtr : public ValuePtr { - public: - virtual ~LooseValuePtr() {} - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset) { - MetaHeader* meta = (MetaHeader*)ptr_; - unsigned int embnum = (unsigned int)meta->embed_num; - auto metadata = meta->GetColumnBitset(); - - if (!metadata.test(emb_index)) { - while(this->flag_.test_and_set(std::memory_order_acquire)); - metadata = meta->GetColumnBitset(); - if (metadata.test(emb_index)) { - this->flag_.clear(std::memory_order_release); - return ((V**)((int64*)ptr_ + - (unsigned int)meta->header_size))[emb_index]; - } - embnum++ ; - int64 alloc_value_len = value_len; - V* tensor_val = (V*)allocator->AllocateRaw( - Allocator::kAllocatorAlignment, sizeof(V) * alloc_value_len); - memcpy(tensor_val, default_v, sizeof(V) * value_len); - ((V**)((int64*)ptr_ + meta->GetHeaderSize()))[emb_index] = tensor_val; - - metadata.set(emb_index); - // NOTE:if we use ((unsigned long*)((char*)ptr_ + 1))[0] = metadata.to_ulong(); - // the ptr_ will be occaionally modified from 0x7f18700912a0 to 0x700912a0 - // must use ((V**)ptr_ + 1 + 1)[emb_index] = tensor_val; to avoid - meta->SetColumnBitset(metadata, embnum); - this->flag_.clear(std::memory_order_release); - return tensor_val; - } else { - return ((V**)((int64*)ptr_ + meta->GetHeaderSize()))[emb_index]; - } - } - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset, bool &need_initialize) { - return nullptr; - } - - // simple getter for V* and version - virtual V* GetValue(int emb_index, int offset) { - MetaHeader* meta = (MetaHeader*)ptr_; - auto metadata = meta->GetColumnBitset(); - if (metadata.test(emb_index)) { - return ((V**)((int64*)ptr_ + meta->GetHeaderSize()))[emb_index]; - } else { - return nullptr; - } - } - - virtual void Destroy(Allocator* allocator) { - MetaHeader* meta = (MetaHeader*)ptr_; - unsigned int embnum = (unsigned int)meta->embed_num; - auto metadata = meta->GetColumnBitset(); - for (int i = 0; i< embnum; i++) { - if (metadata.test(i)) { - V* val = ((V**)((int64*)ptr_ + meta->GetHeaderSize()))[i]; - if (val != nullptr) { - allocator->DeallocateRaw(val); - } - } - } - } - - virtual void* GetPtr() const { - return ptr_; - } - - protected: - void* ptr_; - std::atomic_flag flag_ = ATOMIC_FLAG_INIT; -}; - -template -class LightValuePtr : public LooseValuePtr { - public: - LightValuePtr(Allocator* allocator, size_t size) { - this->ptr_ = (void*)malloc( - sizeof(LightHeader) + sizeof(int64) * size); - memset(static_cast(this->ptr_) + sizeof(LightHeader), 0, sizeof(int64) * size); - new ((char*)this->ptr_) LightHeader(); - } - - ~LightValuePtr() { - free(this->ptr_); - } -}; - -template -class NormalValuePtr : public LooseValuePtr { - public: - NormalValuePtr(Allocator* allocator, size_t size) { - this->ptr_ = (void*) malloc(sizeof(NormalHeader) + sizeof(int64) * size); - memset(static_cast(this->ptr_) + sizeof(NormalHeader), 0, sizeof(int64) * size); - new ((char*)this->ptr_) NormalHeader(); - } - - ~NormalValuePtr() { - free(this->ptr_); - } - - int64 GetStep() { - return ((NormalHeader*)this->ptr_)->GetGlobalStep(); - } - - void SetStep(int64 gs) { - ((NormalHeader*)this->ptr_)->SetGlobalStep(gs); - } - - int64 GetFreq() { - return ((NormalHeader*)this->ptr_)->GetFreqCounter(); - } - - void SetFreq(int64 freq) { - ((NormalHeader*)this->ptr_)->SetFreqCounter(freq); - } - - void AddFreq() { - return ((NormalHeader*)this->ptr_)->AddFreq(); - } - - void AddFreq(int64 count) override { - return ((NormalHeader*)this->ptr_)->AddFreq(count); - } -}; - -template -class NormalContiguousValuePtr : public LooseValuePtr { - public: - NormalContiguousValuePtr(Allocator* allocator, size_t size) { - this->ptr_ = allocator->AllocateRaw(Allocator::kAllocatorAlignment, - sizeof(FixedLengthHeader) + sizeof(V) * size); - memset(static_cast(this->ptr_) + sizeof(FixedLengthHeader), 0, sizeof(V) * size); - new ((char*)this->ptr_) FixedLengthHeader(); - } - - ~NormalContiguousValuePtr() { - } - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset) override { - int8 meta = *((int8*)((char*)this->ptr_ + 6)); - std::bitset<8> bs(meta); - if (!bs.test(emb_index)) { - while(this->flag_.test_and_set(std::memory_order_acquire)); - if (bs.test(emb_index)) { - return ((V*)this->ptr_ + sizeof(FixedLengthHeader) / - sizeof(V) + offset); - } - V* tensor_val = - ((V*)this->ptr_ + sizeof(FixedLengthHeader) / sizeof(V) + offset); - memcpy(tensor_val, default_v, sizeof(V) * value_len); - int8* m = (int8*)((char*)this->ptr_ + 6); - *m |= (1 << emb_index); - this->flag_.clear(std::memory_order_release); - return tensor_val; - } else { - return (V*)this->ptr_ + sizeof(FixedLengthHeader) / - sizeof(V) + offset; - } - } - - virtual V* GetValue(int emb_index, int offset) { - int8 meta = *((int8*)((char*)this->ptr_ + 6)); - std::bitset<8> bs(meta); - if (bs.test(emb_index)) { - return ((V*)this->ptr_ + sizeof(FixedLengthHeader) / - sizeof(V) + offset); - } else { - return nullptr; - } - } - - virtual void Destroy(Allocator* allocator) { - allocator->DeallocateRaw(this->ptr_); - } - - int64 GetStep() { - return ((FixedLengthHeader*)this->ptr_)->GetGlobalStep(); - } - - void SetStep(int64 gs) { - ((FixedLengthHeader*)this->ptr_)->SetGlobalStep(gs); - } - - int64 GetFreq() { - return ((FixedLengthHeader*)this->ptr_)->GetFreqCounter(); - } - - void SetFreq(int64 freq) { - ((FixedLengthHeader*)this->ptr_)->SetFreqCounter(freq); - } - - void AddFreq() { - ((FixedLengthHeader*)this->ptr_)->AddFreq(); - } - - void AddFreq(int64 count) override { - ((FixedLengthHeader*)this->ptr_)->AddFreq(count); - } - - void SetValue(V val, size_t size) { - for (int i = 0; i < size; ++i) { - *((V*)this->ptr_ + sizeof(FixedLengthHeader) / sizeof(V) + i) = val; - } - } -}; - -template -class NormalGPUValuePtr : public LooseValuePtr { - public: - NormalGPUValuePtr(Allocator* allocator, size_t size) { - this->ptr_ = (void*) malloc(sizeof(FixedLengthHeader) + sizeof(V *)); - *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) = nullptr; - new ((char*)this->ptr_) FixedLengthHeader(); - } - - ~NormalGPUValuePtr() { - free(this->ptr_); - } - -#if GOOGLE_CUDA - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset) override { - int8 meta = *((int8*)((char*)this->ptr_ + 6)); - std::bitset<8> bs(meta); - if (!bs.test(emb_index)) { - while(this->flag_.test_and_set(std::memory_order_acquire)); - if (bs.test(emb_index)) { - return *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) + offset; - } - V* tensor_val = - *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) + offset; - cudaMemcpy(tensor_val, default_v, value_len * sizeof(V), - cudaMemcpyDeviceToDevice); - int8* m = (int8*)((char*)this->ptr_ + 6); - *m |= (1 << emb_index); - this->flag_.clear(std::memory_order_release); - } - return *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) + offset; - } -#endif // GOOGLE_CUDA - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset, - bool &need_initialize) override { - int8 meta = *((int8*)((char*)this->ptr_ + 6)); - std::bitset<8> bs(meta); - if (!bs.test(emb_index)) { - while(this->flag_.test_and_set(std::memory_order_acquire)); - if (bs.test(emb_index)) { - return *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) + offset; - } - need_initialize = 1; - this->flag_.clear(std::memory_order_release); - return reinterpret_cast(this); - } - return *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) + offset; - } - - // simple getter for V* and version - virtual V* GetValue(int emb_index, int offset) { - int8 meta = *((int8*)((char*)this->ptr_ + 6)); - std::bitset<8> bs(meta); - if (bs.test(emb_index)) { - return *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) + offset; - } else { - return nullptr; - } - } - - virtual void Destroy(Allocator* allocator) { - return; - } - - int64 GetStep() { - return ((FixedLengthHeader*)this->ptr_)->GetGlobalStep(); - } - - void SetStep(int64 gs) { - ((FixedLengthHeader*)this->ptr_)->SetGlobalStep(gs); - } - - int64 GetFreq() { - return ((FixedLengthHeader*)this->ptr_)->GetFreqCounter(); - } - - void SetFreq(int64 freq) { - ((FixedLengthHeader*)this->ptr_)->SetFreqCounter(freq); - } - - void AddFreq() { - ((FixedLengthHeader*)this->ptr_)->AddFreq(); - } - - void AddFreq(int64 count) override { - ((FixedLengthHeader*)this->ptr_)->AddFreq(count); - } - - bool SetPtr(V* ptr) { - while(this->flag_.test_and_set(std::memory_order_acquire)); - V* value_ptr = *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)); - if (value_ptr == nullptr) { - *(V**)((char *)this->ptr_ + sizeof(FixedLengthHeader)) = ptr; - this->flag_.clear(std::memory_order_release); - return true; - } else { - this->flag_.clear(std::memory_order_release); - return false; - } - } - - void SetInitialized(int64 emb_index) { - while(this->flag_.test_and_set(std::memory_order_acquire)); - ((FixedLengthHeader*)this->ptr_)->SetInitialized(emb_index); - this->flag_.clear(std::memory_order_release); - } - -}; - -template -class CompactValuePtr : public ValuePtr { - public: - CompactValuePtr(Allocator* allocator, size_t size) { - memset(static_cast(this->ptr_), 0, sizeof(V) * size + sizeof(int64)); - } - - ~CompactValuePtr() { - } - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset) override { - int8 meta = *((int8*)((char*)this->ptr_ + 6)); - std::bitset<8> bs(meta); - if (!bs.test(emb_index)) { - while(this->flag_.test_and_set(std::memory_order_acquire)); - if (bs.test(emb_index)) { - return ((V*)this->ptr_ + sizeof(int64) / - sizeof(V) + offset); - } - V* tensor_val = - ((V*)this->ptr_ + sizeof(int64) / sizeof(V) + offset); - memcpy(tensor_val, default_v, sizeof(V) * value_len); - int8* m = (int8*)((char*)this->ptr_ + 6); - *m |= (1 << emb_index); - this->flag_.clear(std::memory_order_release); - return tensor_val; - } else { - return (V*)this->ptr_ + sizeof(int64) / - sizeof(V) + offset; - } - } - - virtual V* GetOrAllocate(Allocator* allocator, int64 value_len, - const V* default_v, int emb_index, int offset, bool &need_initialize) { - return nullptr; - } - - virtual V* GetValue(int emb_index, int offset) { - int8 meta = *((int8*)((char*)this->ptr_ + 6)); - std::bitset<8> bs(meta); - if (bs.test(emb_index)) { - return ((V*)this->ptr_ + sizeof(int64) / - sizeof(V) + offset); - } else { - return nullptr; - } - } - - virtual void Destroy(Allocator* allocator) { - allocator->DeallocateRaw(this->ptr_); - } - - virtual void* GetPtr() const { - return (void*)ptr_; - } - - private: - char ptr_[23]; - std::atomic_flag flag_ = ATOMIC_FLAG_INIT; -}; - -} // namespace tensorflow - -#endif // TENSORFLOW_CORE_FRAMEWORK_EMBEDDING_VALUE_PTR_H_ diff --git a/tensorflow/core/kernels/BUILD b/tensorflow/core/kernels/BUILD index 115e3c4bae6..0c08c30c30a 100644 --- a/tensorflow/core/kernels/BUILD +++ b/tensorflow/core/kernels/BUILD @@ -439,7 +439,8 @@ tf_cc_test( tf_cuda_cc_test( name = "embedding_variable_ops_test", - srcs = ["embedding_variable_ops_test.cc"], + srcs = ["embedding_variable_ops_test.cc", + "embedding_variable_test.h"], extra_copts = ["-fexceptions", "-g"], deps = [ ":io", @@ -6497,7 +6498,7 @@ tf_kernel_library( "training_ali_ops_gpu.h", "training_ali_ops.h" ], - copts = tf_copts(), + copts = tf_copts() + ["-g"], deps = [ ":bounds_check", ":training_op_helpers", diff --git a/tensorflow/core/kernels/embedding_variable_memory_test.cc b/tensorflow/core/kernels/embedding_variable_memory_test.cc index 7ec6b1cf109..393e9a9754b 100644 --- a/tensorflow/core/kernels/embedding_variable_memory_test.cc +++ b/tensorflow/core/kernels/embedding_variable_memory_test.cc @@ -19,17 +19,22 @@ namespace embedding { float PerfMemory(Tensor& default_value, const std::vector& id_list, int value_size, int64 default_value_dim, - int64 filter_freq = 0) { + int64 filter_freq = 0, int64 steps_to_live = 0, + int64 record_freq = false) { auto ev = CreateEmbeddingVar(value_size, default_value, - default_value_dim, filter_freq); - ValuePtr* value_ptr = nullptr; + default_value_dim, filter_freq, + steps_to_live, -1.0, + embedding::StorageType::DRAM, + {1024, 1024, 1024, 1024}, + record_freq); + void* value_ptr = nullptr; bool is_filter = false; double start_mem, end_mem; start_mem = getResident() * getpagesize(); for (int i = 0; i < id_list.size(); i++) { ev->LookupOrCreateKey(id_list[i], &value_ptr, &is_filter, false); if (is_filter) - ev->flat(value_ptr, id_list[i]); + ev->flat(value_ptr); } end_mem = getResident() * getpagesize(); double used_mb = (end_mem - start_mem)/1000000; @@ -58,7 +63,7 @@ TEST(EmbeddingVariabelMemoryTest, TestMemory) { float used_mb = PerfMemory(default_value, id_list, value_size, default_value_dim); float theoritical_mb = - 50 + num_of_ids * (32 + 32 + value_size * sizeof(float))/ 1000000; + 50 + num_of_ids * (value_size * sizeof(float)) / 1000000; EXPECT_TRUE((used_mb > theoritical_mb * 0.99) && (used_mb < theoritical_mb * 1.01)); @@ -68,9 +73,10 @@ TEST(EmbeddingVariabelMemoryTest, TestMemory) { used_mb = PerfMemory(default_value, id_list, value_size, default_value_dim, filter_freq); theoritical_mb = - 50 + num_of_ids * (32 + 32 + 16 + value_size * sizeof(float)/2)/ 1000000; + 50 + num_of_ids * (8 + value_size * sizeof(float) / 2 + + 4/*memory for ids_list*/) / 1000000; EXPECT_TRUE((used_mb > theoritical_mb * 0.99) && - (used_mb < theoritical_mb * 1.01)); + (used_mb < theoritical_mb * 1.02)); } } //namespace embedding } //namespace tensorflow diff --git a/tensorflow/core/kernels/embedding_variable_ops_test.cc b/tensorflow/core/kernels/embedding_variable_ops_test.cc index 4839c171708..e30381fef07 100644 --- a/tensorflow/core/kernels/embedding_variable_ops_test.cc +++ b/tensorflow/core/kernels/embedding_variable_ops_test.cc @@ -21,6 +21,7 @@ #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/framework/types.h" #include "tensorflow/core/framework/types.pb.h" +#include "tensorflow/core/kernels/embedding_variable_test.h" #include "tensorflow/core/kernels/ops_testutil.h" #include "tensorflow/core/kernels/ops_util.h" #include "tensorflow/core/lib/io/path.h" @@ -48,18 +49,6 @@ namespace { const int THREADNUM = 16; const int64 max = 2147483647; -template -class TestableEmbeddingVar : public EmbeddingVar { - public: - TestableEmbeddingVar(const string& name, - embedding::Storage* storage, - EmbeddingConfig emb_cfg = EmbeddingConfig(), - Allocator* alloc = nullptr) : EmbeddingVar( - name, storage, emb_cfg, alloc) {} - - using EmbeddingVar::GetFilter; -}; - struct ProcMemory { long size; // total program size long resident; // resident set size @@ -123,11 +112,7 @@ TEST(EmbeddingVariableTest, TestEmptyEV) { Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 9.0)); { - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, EmbeddingConfig(), cpu_allocator()); - variable->Init(value, 1); + auto variable = CreateEmbeddingVar(value_size, value, 1); LOG(INFO) << "size:" << variable->Size(); Tensor part_offset_tensor(DT_INT32, TensorShape({kSavedPartitionNum + 1})); @@ -191,19 +176,14 @@ TEST(EmbeddingVariableTest, TestEVExportSmallLockless) { int64 value_size = 8; Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 9.0)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddigVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, EmbeddingConfig(0, 0, 1, 1, "", 5), - cpu_allocator()); - variable->Init(value, 1); + auto variable = CreateEmbeddingVar(value_size, value, 1, 0, 5); Tensor part_offset_tensor(DT_INT32, TensorShape({kSavedPartitionNum + 1})); for (int64 i = 0; i < 5; i++) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; variable->LookupOrCreateKey(i, &value_ptr); - typename TTypes::Flat vflat = variable->flat(value_ptr, i); + typename TTypes::Flat vflat = variable->flat(value_ptr); vflat(i) = 5.0; } @@ -269,20 +249,15 @@ TEST(EmbeddingVariableTest, TestEVExportLargeLockless) { Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 9.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, EmbeddingConfig(0, 0, 1, 1, "", 5), - cpu_allocator()); - variable->Init(value, 1); + auto variable = CreateEmbeddingVar(value_size, value, 1, 0, 5); Tensor part_offset_tensor(DT_INT32, TensorShape({kSavedPartitionNum + 1})); int64 ev_size = 10048576; for (int64 i = 0; i < ev_size; i++) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; variable->LookupOrCreateKey(i, &value_ptr); - typename TTypes::Flat vflat = variable->flat(value_ptr, i); + typename TTypes::Flat vflat = variable->flat(value_ptr); } LOG(INFO) << "size:" << variable->Size(); @@ -344,9 +319,9 @@ TEST(EmbeddingVariableTest, TestEVExportLargeLockless) { void multi_insertion(EmbeddingVar* variable, int64 value_size){ for (long j = 0; j < 5; j++) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; variable->LookupOrCreateKey(j, &value_ptr); - typename TTypes::Flat vflat = variable->flat(value_ptr, j); + typename TTypes::Flat vflat = variable->flat(value_ptr); } } @@ -355,12 +330,7 @@ TEST(EmbeddingVariableTest, TestMultiInsertion) { Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 9.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, EmbeddingConfig(), cpu_allocator()); - - variable->Init(value, 1); + auto variable = CreateEmbeddingVar(value_size, value, 1); std::vector insert_threads(THREADNUM); for (size_t i = 0 ; i < THREADNUM; i++) { @@ -375,54 +345,45 @@ TEST(EmbeddingVariableTest, TestMultiInsertion) { void InsertAndLookup(EmbeddingVar* variable, int64 *keys, long ReadLoops, int value_size){ - float *default_value_fake = (float *)malloc((value_size)*sizeof(float)); - for (int j = 0; j < value_size; j++) { - default_value_fake[j] = -1.0; - } for (long j = 0; j < ReadLoops; j++) { - float *val = (float *)malloc((value_size+1)*sizeof(float)); - float *default_value = (float *)malloc((value_size)*sizeof(float)); - for (int k = 0; k < value_size; k++) { - default_value[k] = (float)keys[j]; - } - variable->LookupOrCreate(keys[j], val, default_value); - variable->LookupOrCreate(keys[j], val, default_value_fake); - ASSERT_EQ(default_value[0] , val[0]); - free(val); - free(default_value); + void* val = nullptr; + void* val_1 = nullptr; + bool is_filter = true; + variable->LookupOrCreateKey(keys[j], &val, &is_filter, false); + variable->LookupOrCreateKey(keys[j], &val_1, &is_filter, false); + ASSERT_EQ(val, val_1); } - free(default_value_fake); } void MultiBloomFilter(EmbeddingVar* var, int value_size, int64 i) { for (long j = 0; j < 1; j++) { - float *val = (float *)malloc((value_size+1)*sizeof(float)); - var->LookupOrCreate(i+1, val, nullptr); + void* val = nullptr; + bool is_filter = true; + var->LookupOrCreateKey(i+1, &val, &is_filter, false); } } TEST(EmbeddingVariableTest, TestBloomFilter) { int value_size = 10; Tensor value(DT_FLOAT, TensorShape({value_size})); - test::FillValues(&value, std::vector(value_size, 10.0)); - float* fill_v = (float*)malloc(value_size * sizeof(float)); - - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new EmbeddingVar("EmbeddingVar", - storage, - EmbeddingConfig(0, 0, 1, 1, "", 5, 3, 99999, -1.0, "normal", 10, 0.01), - cpu_allocator()); - - var->Init(value, 1); - - float *val = (float *)malloc((value_size+1)*sizeof(float)); - float *default_value = (float *)malloc((value_size+1)*sizeof(float)); - var->LookupOrCreate(1, val, default_value); - var->LookupOrCreate(1, val, default_value); - var->LookupOrCreate(1, val, default_value); - var->LookupOrCreate(1, val, default_value); - var->LookupOrCreate(2, val, default_value); + std::vector default_value = + {0.0 ,1.0 ,2.0 ,3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0}; + test::FillValues(&value, default_value); + + auto var = CreateEmbeddingVar(value_size, value, + 1, 3, 5, -1.0, + embedding::StorageType::DRAM, + {1024, 1024, 1024, 1024}, + false, 10, 0.01); + + //float *val = (float *)malloc((value_size+1)*sizeof(float)); + void* val = nullptr; + bool is_filter = true; + var->LookupOrCreateKey(1, &val, &is_filter, false); + var->LookupOrCreateKey(1, &val, &is_filter, false); + var->LookupOrCreateKey(1, &val, &is_filter, false); + var->LookupOrCreateKey(1, &val, &is_filter, false); + var->LookupOrCreateKey(2, &val, &is_filter, false); std::vector keylist; std::vector valuelist; @@ -437,14 +398,11 @@ TEST(EmbeddingVariableTest, TestBloomCounterInt64) { Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 10.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new TestableEmbeddingVar("EmbeddingVar", - storage, - EmbeddingConfig(0, 0, 1, 1, "", 5, 3, 99999, -1.0, - "normal", 10, 0.01, DT_UINT64), cpu_allocator()); - - var->Init(value, 1); + auto var = CreateEmbeddingVar(value_size, value, + 1, 3, 5, -1.0, + embedding::StorageType::DRAM, + {1024, 1024, 1024, 1024}, + false, 10, 0.01, DT_UINT64); float *val = (float *)malloc((value_size+1)*sizeof(float)); @@ -509,14 +467,11 @@ TEST(EmbeddingVariableTest, TestBloomCounterInt32) { test::FillValues(&value, std::vector(value_size, 10.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new TestableEmbeddingVar("EmbeddingVar", - storage, - EmbeddingConfig(0, 0, 1, 1, "", 5, 3, 99999, -1.0, - "normal", 10, 0.01, DT_UINT32), cpu_allocator()); - - var->Init(value, 1); + auto var = CreateEmbeddingVar(value_size, value, + 1, 3, 5, -1.0, + embedding::StorageType::DRAM, + {1024, 1024, 1024, 1024}, + false, 10, 0.01, DT_UINT32); float *val = (float *)malloc((value_size+1)*sizeof(float)); @@ -581,14 +536,11 @@ TEST(EmbeddingVariableTest, TestBloomCounterInt16) { test::FillValues(&value, std::vector(value_size, 10.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new TestableEmbeddingVar("EmbeddingVar", - storage, - EmbeddingConfig(0, 0, 1, 1, "", 5, 3, 99999, -1.0, - "normal_contiguous", 10, 0.01, DT_UINT16), cpu_allocator()); - - var->Init(value, 1); + auto var = CreateEmbeddingVar(value_size, value, + 1, 3, 5, -1.0, + embedding::StorageType::DRAM, + {1024, 1024, 1024, 1024}, + false, 10, 0.01, DT_UINT16); float *val = (float *)malloc((value_size+1)*sizeof(float)); @@ -654,14 +606,11 @@ TEST(EmbeddingVariableTest, TestBloomCounterInt8) { test::FillValues(&value, std::vector(value_size, 10.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new TestableEmbeddingVar("EmbeddingVar", - storage, - EmbeddingConfig(0, 0, 1, 1, "", 5, 3, 99999, -1.0, - "normal_contiguous", 10, 0.01, DT_UINT8), cpu_allocator()); - - var->Init(value, 1); + auto var = CreateEmbeddingVar(value_size, value, + 1, 3, 5, -1.0, + embedding::StorageType::DRAM, + {1024, 1024, 1024, 1024}, + false, 10, 0.01, DT_UINT8); float *val = (float *)malloc((value_size+1)*sizeof(float)); @@ -725,12 +674,7 @@ TEST(EmbeddingVariableTest, TestInsertAndLookup) { int64 value_size = 128; Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 10)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, EmbeddingConfig(), cpu_allocator()); - - variable->Init(value, 1); + auto variable = CreateEmbeddingVar(value_size, value, 1); int64 InsertLoops = 1000; bool* flag = (bool *)malloc(sizeof(bool)*max); @@ -765,8 +709,9 @@ TEST(EmbeddingVariableTest, TestInsertAndLookup) { } void MultiFilter(EmbeddingVar* variable, int value_size) { - float *val = (float *)malloc((value_size+1)*sizeof(float)); - variable->LookupOrCreate(20, val, nullptr); + bool is_filter = true; + void* val; + variable->LookupOrCreateKey(20, &val, &is_filter, false); } TEST(EmbeddingVariableTest, TestFeatureFilterParallel) { @@ -774,14 +719,8 @@ TEST(EmbeddingVariableTest, TestFeatureFilterParallel) { Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 10.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new EmbeddingVar("EmbeddingVar", - storage, - EmbeddingConfig(0, 0, 1, 1, "", 5, 7), - cpu_allocator()); - - var->Init(value, 1); + auto var = CreateEmbeddingVar(value_size, value, 1, 7, 5); + float *val = (float *)malloc((value_size+1)*sizeof(float)); int thread_num = 5; std::vector insert_threads(thread_num); @@ -792,20 +731,16 @@ TEST(EmbeddingVariableTest, TestFeatureFilterParallel) { t.join(); } - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; var->LookupOrCreateKey(20, &value_ptr); - ASSERT_EQ(value_ptr->GetFreq(), thread_num); + ASSERT_EQ(var->GetFreq(20), thread_num); } EmbeddingVar* InitEV_Lockless(int64 value_size) { Tensor value(DT_INT64, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 10)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, EmbeddingConfig(), cpu_allocator()); + auto variable = CreateEmbeddingVar(value_size, value, 1); - variable->Init(value, 1); return variable; } @@ -813,7 +748,7 @@ void MultiLookup(EmbeddingVar* variable, int64 InsertLoop, int thread_num, int i) { for (int64 j = i * InsertLoop/thread_num; j < (i+1)*InsertLoop/thread_num; j++) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; variable->LookupOrCreateKey(j, &value_ptr); } } @@ -829,9 +764,9 @@ void BM_MULTIREAD_LOCKLESS(int iters, int thread_num) { float* fill_v = (float*)malloc(value_size * sizeof(float)); for (int64 i = 0; i < InsertLoop; i++){ - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; variable->LookupOrCreateKey(i, &value_ptr); - typename TTypes::Flat vflat = variable->flat(value_ptr, i); + typename TTypes::Flat vflat = variable->flat(value_ptr); } testing::StartTiming(); @@ -848,58 +783,6 @@ void BM_MULTIREAD_LOCKLESS(int iters, int thread_num) { } -void hybrid_process(EmbeddingVar* variable, - int64* keys, int64 InsertLoop, int thread_num, - int64 i, int64 value_size) { - float *val = (float *)malloc(sizeof(float)*(value_size + 1)); - for (int64 j = i * InsertLoop/thread_num; - j < (i+1) * InsertLoop/thread_num; j++) { - variable->LookupOrCreate(keys[j], val, nullptr); - } -} - -void BM_HYBRID_LOCKLESS(int iters, int thread_num) { - testing::StopTiming(); - testing::UseRealTime(); - - int64 value_size = 128; - auto variable = InitEV_Lockless(value_size); - int64 InsertLoop = 1000000; - - srand((unsigned)time(NULL)); - int64 *keys = (int64 *)malloc(sizeof(int64)*InsertLoop); - - for (int64 i = 0; i < InsertLoop; i++) { - keys[i] = rand() % 1000; - } - - testing::StartTiming(); - while (iters--) { - std::vector insert_threads(thread_num); - for (size_t i = 0 ; i < thread_num; i++) { - insert_threads[i] = std::thread(hybrid_process, - variable, keys, InsertLoop, thread_num, i, value_size); - } - for (auto &t : insert_threads) { - t.join(); - } - } -} - -BENCHMARK(BM_MULTIREAD_LOCKLESS) - ->Arg(1) - ->Arg(2) - ->Arg(4) - ->Arg(8) - ->Arg(16); - -BENCHMARK(BM_HYBRID_LOCKLESS) - ->Arg(1) - ->Arg(2) - ->Arg(4) - ->Arg(8) - ->Arg(16); - TEST(EmbeddingVariableTest, TestAllocate) { int value_len = 8; @@ -923,23 +806,13 @@ TEST(EmbeddingVariableTest, TestEVStorageType_DRAM) { Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 9.0)); float* fill_v = (float*)malloc(value_size * sizeof(float)); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, - EmbeddingConfig(/*emb_index = */0, /*primary_emb_index = */0, - /*block_num = */1, /*slot_num = */1, - /*name = */"", /*steps_to_live = */0, - /*filter_freq = */0, /*max_freq = */999999, - /*l2_weight_threshold = */-1.0, /*layout = */"normal", - /*max_element_size = */0, /*false_positive_probability = */-1.0, - /*counter_type = */DT_UINT64), - cpu_allocator()); - variable->Init(value, 1); + auto variable = CreateEmbeddingVar(value_size, value, 1); int64 ev_size = 100; for (int64 i = 0; i < ev_size; i++) { - variable->LookupOrCreate(i, fill_v, nullptr); + void* val = nullptr; + bool is_filter = true; + variable->LookupOrCreateKey(i, &val, &is_filter, false); } LOG(INFO) << "size:" << variable->Size(); @@ -947,59 +820,20 @@ TEST(EmbeddingVariableTest, TestEVStorageType_DRAM) { void t1(KVInterface* hashmap) { for (int i = 0; i< 100; ++i) { - hashmap->Insert(i, new NormalValuePtr(ev_allocator(), 100)); + hashmap->Insert(i, nullptr); } } TEST(EmbeddingVariableTest, TestRemoveLockless) { - KVInterface* hashmap = new LocklessHashMap(); - ASSERT_EQ(hashmap->Size(), 0); - LOG(INFO) << "hashmap size: " << hashmap->Size(); - auto t = std::thread(t1, hashmap); - t.join(); - LOG(INFO) << "hashmap size: " << hashmap->Size(); - ASSERT_EQ(hashmap->Size(), 100); - TF_CHECK_OK(hashmap->Remove(1)); - TF_CHECK_OK(hashmap->Remove(2)); - ASSERT_EQ(hashmap->Size(), 98); - LOG(INFO) << "2 size:" << hashmap->Size(); -} - -TEST(EmbeddingVariableTest, TestBatchCommitofDBKV) { - int64 value_size = 4; + auto feat_desc = new embedding::FeatureDescriptor( + 1, 1, ev_allocator(), embedding::StorageType::DRAM, + false, false, {false, 0}); KVInterface* hashmap = - new LevelDBKV(testing::TmpDir()); - hashmap->SetTotalDims(value_size); - - for (int64 i = 0; i < 6; ++i) { - const ValuePtr* tmp = - new NormalContiguousValuePtr(ev_allocator(), value_size); - hashmap->Commit(i, tmp); - } - - for(int64 i = 0; i < 6; i++) { - ValuePtr* tmp = nullptr; - Status s = hashmap->Lookup(i, &tmp); - ASSERT_EQ(s.ok(), true); - } -} - -void InsertAndCommit(KVInterface* hashmap) { - for (int64 i = 0; i< 100; ++i) { - const ValuePtr* tmp = - new NormalContiguousValuePtr(ev_allocator(), 100); - hashmap->Insert(i, tmp); - hashmap->Commit(i, tmp); - } -} - -TEST(EmbeddingVariableTest, TestSizeDBKV) { - KVInterface* hashmap = - new LevelDBKV(testing::TmpDir()); - hashmap->SetTotalDims(100); + new LocklessHashMap(feat_desc); + feat_desc->InitSlotInfo(0, 100, {nullptr, 1}); ASSERT_EQ(hashmap->Size(), 0); LOG(INFO) << "hashmap size: " << hashmap->Size(); - auto t = std::thread(InsertAndCommit, hashmap); + auto t = std::thread(t1, hashmap); t.join(); LOG(INFO) << "hashmap size: " << hashmap->Size(); ASSERT_EQ(hashmap->Size(), 100); @@ -1190,213 +1024,6 @@ TEST(EmbeddingVariableTest, TestLFUCache) { } } -TEST(EmbeddingVariableTest, TestCacheRestore) { - setenv("TF_SSDHASH_ASYNC_COMPACTION", "false", 1); - int64 value_size = 4; - Tensor value(DT_FLOAT, TensorShape({value_size})); - test::FillValues(&value, std::vector(value_size, 9.0)); - float* fill_v = (float*)malloc(value_size * sizeof(float)); - std::vector size; - size.emplace_back(64); - auto emb_config = EmbeddingConfig( - /*emb_index = */0, /*primary_emb_index = */0, - /*block_num = */1, /*slot_num = */0, - /*name = */"", /*steps_to_live = */0, - /*filter_freq = */0, /*max_freq = */999999, - /*l2_weight_threshold = */-1.0, /*layout = */"normal_contiguous", - /*max_element_size = */0, /*false_positive_probability = */-1.0, - /*counter_type = */DT_UINT64); - auto storage= embedding::StorageFactory::Create( - embedding::StorageConfig(embedding::DRAM_SSDHASH, - testing::TmpDir(), - size, "normal_contiguous", - emb_config), - cpu_allocator(), - "EmbeddingVar"); - auto variable = new EmbeddingVar("EmbeddingVar", - storage, emb_config, cpu_allocator()); - variable->Init(value, 1); - variable->InitCache(CacheStrategy::LFU); - - Tensor part_offset_tensor(DT_INT32, TensorShape({kSavedPartitionNum + 1})); - - int64 ev_size = 7; - int64 cache_size = 3; - for (int64 i = 1; i < cache_size; i++) { - ValuePtr* value_ptr = nullptr; - variable->LookupOrCreateKey(i, &value_ptr); - typename TTypes::Flat vflat = variable->flat(value_ptr, i); - value_ptr->AddFreq(2); - } - for (int64 i = cache_size; i < ev_size; i++) { - ValuePtr* value_ptr = nullptr; - variable->LookupOrCreateKey(i, &value_ptr); - typename TTypes::Flat vflat = variable->flat(value_ptr, i); - value_ptr->AddFreq(1); - } - - LOG(INFO) << "size:" << variable->Size(); - - BundleWriter writer(Env::Default(), Prefix("foo")); - embedding::ShrinkArgs shrink_args; - shrink_args.global_step = 1; - variable->Save("var/part_0", Prefix("foo"), &writer, shrink_args); - TF_ASSERT_OK(writer.Finish()); - variable->Unref(); - - auto imported_storage= embedding::StorageFactory::Create( - embedding::StorageConfig(embedding::DRAM_SSDHASH, - testing::TmpDir(), - size, "normal_contiguous", - emb_config), - cpu_allocator(), - "EmbeddingVar1"); - auto imported_variable = new EmbeddingVar("EmbeddingVar1", - imported_storage, emb_config, cpu_allocator()); - imported_variable->Init(value, 1); - imported_variable->InitCache(CacheStrategy::LFU); - - BundleReader reader(Env::Default(), Prefix("foo")); - std::string name_string("var"); - imported_variable->Restore(name_string, Prefix("foo"), 0, 1, false, &reader, false); - - ASSERT_EQ(imported_storage->Size(0), ev_size - cache_size); - ASSERT_EQ(imported_storage->Size(1), 2); - delete imported_storage; -} - -void t1_gpu(KVInterface* hashmap) { - for (int i = 0; i< 100; ++i) { - hashmap->Insert(i, new NormalGPUValuePtr(ev_allocator(), 100)); - } -} - -#if GOOGLE_CUDA -TEST(EmbeddingVariableTest,TestRemoveLocklessCPU) { - SessionOptions sops; - std::unique_ptr device = - DeviceFactory::NewDevice(DEVICE_GPU, sops, "/job:a/replica:0/task:0"); - Allocator* gpu_allocator = GPUProcessState::singleton()->GetGPUAllocator( - GPUOptions(), TfGpuId(0), 1 << 26); - KVInterface* hashmap = - new LocklessHashMapCPU(gpu_allocator); - ASSERT_EQ(hashmap->Size(), 0); - LOG(INFO) << "hashmap size: " << hashmap->Size(); - auto t = std::thread(t1, hashmap); - t.join(); - LOG(INFO) << "hashmap size: " << hashmap->Size(); - ASSERT_EQ(hashmap->Size(), 100); - TF_CHECK_OK(hashmap->Remove(1)); - TF_CHECK_OK(hashmap->Remove(2)); - ASSERT_EQ(hashmap->Size(), 98); - LOG(INFO) << "2 size:" << hashmap->Size(); -} -#endif // GOOGLE_CUDA - -/*void CommitGPU(KVInterface* hashmap) { - for (int64 i = 0; i< 100; ++i) { - ValuePtr* tmp= new NormalGPUValuePtr(ev_allocator(), 100); - hashmap->Commit(i, tmp); - } -} - -TEST(EmbeddingVariableTest, TestCommitHashMapCPU) { - KVInterface* hashmap = new LocklessHashMapCPU(); - hashmap->SetTotalDims(100); - ASSERT_EQ(hashmap->Size(), 0); - LOG(INFO) << "hashmap size: " << hashmap->Size(); - auto t = std::thread(CommitGPU, hashmap); - t.join(); - LOG(INFO) << "hashmap size: " << hashmap->Size(); - ASSERT_EQ(hashmap->Size(), 100); - TF_CHECK_OK(hashmap->Remove(1)); - TF_CHECK_OK(hashmap->Remove(2)); - ASSERT_EQ(hashmap->Size(), 98); - LOG(INFO) << "2 size:" << hashmap->Size(); -} - -TEST(EmbeddingVariableTest, TestGPUValuePtr) { - int ev_list_size = 32; - ValuePtr* ptr_ = new NormalGPUValuePtr(ev_allocator(), ev_list_size); - float* address = *(float **)((char *)ptr_->GetPtr() + sizeof(FixedLengthHeader)); - float host_data[ev_list_size]; - float initial_data[ev_list_size]; - for(int i = 0;i < ev_list_size;++i){ - initial_data[i] = 10; - } - for(int i = 0;i < ev_list_size;++i){ - LOG(INFO) << i << " " << initial_data[i]; - } - cudaMemcpy(address, initial_data, ev_list_size * sizeof(float), cudaMemcpyHostToDevice); - cudaMemcpy(host_data, address, ev_list_size * sizeof(float), cudaMemcpyDeviceToHost); - for(int i = 0;i < ev_list_size;++i){ - LOG(INFO) << i << " " << host_data[i]; - } -}//Forbidden, due to no gpu allocator at that time - -TEST(EmbeddingVariableTest, TestCommitValue) { - int ev_list_size = 32; - ValuePtr* ptr_ = new NormalGPUValuePtr(ev_allocator(),ev_list_size); - float* address = *(float **)((char *)ptr_->GetPtr() + sizeof(FixedLengthHeader)); - float initial_data[ev_list_size]; - for(int i = 0;i < ev_list_size;++i){ - initial_data[i] = 10; - } - cudaMemcpy(address, initial_data, ev_list_size * sizeof(float), cudaMemcpyHostToDevice); - KVInterface* hashmap = new LocklessHashMapCPU(); - hashmap->SetTotalDims(ev_list_size); - hashmap->Commit(1, ptr_); - ValuePtr* check; - hashmap->Lookup(1,&check); - LOG(INFO) << "hashmap size: " << hashmap->Size(); - float* tmp = (float *)((char *)check->GetPtr() + sizeof(FixedLengthHeader)); - - for(int i = 0;i < ev_list_size;++i){ - LOG(INFO) << i << " " << tmp[i]; - //ASSERT_EQ(tmp[i], 10); - }// -} - -TEST(EmbeddingVariableTest, TestBatchCommitofLocklessHashMapCPU) { - KVInterface* hashmap = new LocklessHashMapCPU(); - const int EmbeddingSize = 16; - const int BatchSize = 16; - - hashmap->SetTotalDims(EmbeddingSize); - std::vector*> value_ptr_list; - std::vector key_list; - - for(int64 i = 0; i < BatchSize; i++) { - key_list.emplace_back(i); - ValuePtr* ptr_ = new NormalGPUValuePtr(EmbeddingSize); - float* address = *(float **)((char *)ptr_->GetPtr() + sizeof(FixedLengthHeader)); - float initial_data[EmbeddingSize]; - for(int j = 0;j < EmbeddingSize;++j){ - initial_data[j] = i; - //LOG(INFO) << "initial[" << i << "][" << j << "]=" << initial_data[j]; - } - cudaMemcpy(address, initial_data, EmbeddingSize * sizeof(float), cudaMemcpyHostToDevice); - value_ptr_list.emplace_back(ptr_); - }//initialize V on GPU - - timespec start,end; - clock_gettime(CLOCK_MONOTONIC, &start); - hashmap->BatchCommit(key_list, value_ptr_list); - clock_gettime(CLOCK_MONOTONIC, &end); - std::cout << "time: " << ((double)(end.tv_sec - start.tv_sec)*1000000000 + end.tv_nsec - start.tv_nsec)/1000000 << "ms" << std::endl; - - for(int64 i = 0; i < BatchSize; i++) { - ValuePtr* check; - hashmap->Lookup(i,&check); - float* tmp = (float *)((char *)check->GetPtr() + sizeof(FixedLengthHeader)); - for(int j = 0;j < EmbeddingSize;++j){ - LOG(INFO) << "batch[" << i << "][" << j << "]=" << tmp[j]; - //ASSERT_EQ(tmp[j], i); - } - }//compare value after BatchCommit -} -*/ - const int total_size = 1024 * 8; const int th_num = 1; const int malloc_size = total_size / th_num; @@ -1466,17 +1093,11 @@ TEST(EmbeddingVariableTest, TestCPUGPUMalloc) { auto mem_pool = new EmbeddingMemoryPool(gpu_allocator, 256, 1024); float* ptr_1 = mem_pool->Allocate(); float* ptr_2 = mem_pool->Allocate(); - ValuePtr* value_ptr1 = new NormalGPUValuePtr(gpu_allocator, 256); - ValuePtr* value_ptr2 = new NormalGPUValuePtr(gpu_allocator, 256); - value_ptr1->SetPtr(ptr_1); - value_ptr2->SetPtr(ptr_2); - value_ptr1->SetInitialized(0); - value_ptr2->SetInitialized(0); - std::vector*> value_ptrs; - value_ptrs.emplace_back(value_ptr1); + std::vector value_ptrs; + value_ptrs.emplace_back(ptr_1); mem_pool->Deallocate(value_ptrs); value_ptrs.clear(); - value_ptrs.emplace_back(value_ptr2); + value_ptrs.emplace_back(ptr_2); mem_pool->Deallocate(value_ptrs); float* ptr_3 = mem_pool->Allocate(); ASSERT_EQ(ptr_1, ptr_3); @@ -1539,16 +1160,16 @@ TEST(EmbeddingVariableTest, TestEVMallocFree) { void SingleCommit(KVInterface* hashmap, std::vector keys, int bias) { - std::vector*> value_ptrs; + std::vector value_ptrs; for (int64 i = 0; i < keys.size(); ++i) { - ValuePtr* tmp = - new NormalContiguousValuePtr(cpu_allocator(), 124); - tmp->SetValue(float(keys[i] + bias), 124); + void* tmp = cpu_allocator()->AllocateRaw(0, 124 * sizeof(float) + 16); + for (int j = 0; j < 124; j++) { + ((float*)tmp)[j] = keys[i] + bias; + } value_ptrs.push_back(tmp); } ASSERT_EQ(keys.size(), value_ptrs.size()); uint64 start = Env::Default()->NowNanos(); - for (int64 i = 0; i < keys.size(); i++) { hashmap->Commit(keys[i], value_ptrs[i]); } @@ -1558,9 +1179,13 @@ void SingleCommit(KVInterface* hashmap, void TestCompaction() { std::string temp_dir = testing::TmpDir(); + auto feat_desc = new embedding::FeatureDescriptor( + 1, 1, ev_allocator(), embedding::StorageType::DRAM_SSDHASH, + true, true, {false, 0}); auto hashmap = new SSDHashKV( - temp_dir, cpu_allocator()); - hashmap->SetTotalDims(124); + temp_dir, feat_desc); + feat_desc->InitSlotInfo(0, 124, {nullptr, 1}); + hashmap->Init(); ASSERT_EQ(hashmap->Size(), 0); std::vector ids; for (int i = 0; i < 262144; i++) { @@ -1576,12 +1201,12 @@ void TestCompaction() { t1.join(); ids.clear(); sleep(1); - ValuePtr* val = nullptr; + void* val = nullptr; for (int i = 131073; i < 262144; i++) { hashmap->Lookup(i, &val); - float* v = (float*)val->GetPtr(); + float* v = (float*)val; for (int j = 0; j < 124; j++){ - ASSERT_EQ(v[4+j], i+3); + ASSERT_EQ(v[j], i+3); } } for (int i = 131073; i < 262144; i++) { @@ -1596,16 +1221,16 @@ void TestCompaction() { sleep(1); for (int i = 0; i < 131073; i++) { hashmap->Lookup(i, &val); - float* v = (float*)val->GetPtr(); + float* v = (float*)val; for (int j = 0; j < 124; j++){ - ASSERT_EQ(v[4+j], i + 1); + ASSERT_EQ(v[j], i + 1); } } for (int i = 131073; i < 262144; i++) { hashmap->Lookup(i, &val); - float* v = (float*)val->GetPtr(); + float* v = (float*)val; for (int j = 0; j < 124; j++){ - ASSERT_EQ(v[4+j], i + 2); + ASSERT_EQ(v[j], i + 2); } } delete hashmap; @@ -1622,10 +1247,14 @@ TEST(KVInterfaceTest, TestSSDKVSyncCompaction) { } void TestReadEmbFile() { + auto feat_desc = new embedding::FeatureDescriptor( + 1, 1, ev_allocator(), embedding::StorageType::DRAM_SSDHASH, + true, true, {false, 0}); std::string temp_dir = testing::TmpDir(); auto hashmap = new SSDHashKV( - temp_dir, cpu_allocator()); - hashmap->SetTotalDims(124); + temp_dir, feat_desc); + feat_desc->InitSlotInfo(0, 124, {nullptr, 1}); + hashmap->Init(); ASSERT_EQ(hashmap->Size(), 0); std::vector ids; for (int i = 0; i < 262145; i++) { @@ -1634,12 +1263,12 @@ void TestReadEmbFile() { SingleCommit(hashmap, ids, 3); sleep(1); ids.clear(); - ValuePtr* val = nullptr; + void* val = nullptr; for (int i = 0; i < 262144; i++) { hashmap->Lookup(i, &val); - float* v = (float*)val->GetPtr(); + float* v = (float*)val; for (int j = 0; j < 124; j++){ - ASSERT_EQ(v[4+j], i+3); + ASSERT_EQ(v[j], i+3); } } delete hashmap; @@ -1666,9 +1295,10 @@ TEST(KVInterfaceTest, TestDirectIoFile) { void InsertKey(EmbeddingVar* variable, int value_size) { float *val = (float *)malloc((value_size+1)*sizeof(float)); for (int64 i = 0; i < 100000000; i++) { - variable->LookupOrCreate(20, val, nullptr); + void* val = nullptr; + bool is_filter = true; + variable->LookupOrCreateKey(20, &val, &is_filter, false); } - LOG(INFO)<<"Finish Insert"; } void RemoveKey(EmbeddingVar* variable) { @@ -1676,29 +1306,13 @@ void RemoveKey(EmbeddingVar* variable) { sleep(1); variable->storage()->Remove(20); } - LOG(INFO)<<"Remove thread finish"; } TEST(EmbeddingVariableTest, TestLookupRemoveConcurrency) { int value_size = 10; Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 10.0)); - auto emb_config = EmbeddingConfig( - /*emb_index = */0, /*primary_emb_index = */0, - /*block_num = */1, /*slot_num = */0, - /*name = */"", /*steps_to_live = */0, - /*filter_freq = */2, /*max_freq = */999999, - /*l2_weight_threshold = */-1.0, /*layout = */"normal", - /*max_element_size = */0, /*false_positive_probability = */-1.0, - /*counter_type = */DT_UINT64); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new EmbeddingVar("EmbeddingVar", - storage, - emb_config, - cpu_allocator()); - - var->Init(value, 1); + auto var = CreateEmbeddingVar(value_size, value, 1); int thread_num = 5; std::vector insert_threads(thread_num); for (size_t i = 0 ; i < thread_num - 1; i++) { @@ -1714,21 +1328,7 @@ TEST(EmbeddingVariableTest, TestInsertAndGetSnapshot) { int value_size = 10; Tensor value(DT_FLOAT, TensorShape({value_size})); test::FillValues(&value, std::vector(value_size, 10.0)); - auto emb_config = EmbeddingConfig( - /*emb_index = */0, /*primary_emb_index = */0, - /*block_num = */1, /*slot_num = */0, - /*name = */"", /*steps_to_live = */0, - /*filter_freq = */0, /*max_freq = */999999, - /*l2_weight_threshold = */-1.0, /*layout = */"normal", - /*max_element_size = */0, /*false_positive_probability = */-1.0, - /*counter_type = */DT_UINT64); - auto storage = embedding::StorageFactory::Create( - embedding::StorageConfig(), cpu_allocator(), "EmbeddingVar"); - auto var = new EmbeddingVar("EmbeddingVar", - storage, - emb_config, - cpu_allocator()); - var->Init(value, 1); + auto var = CreateEmbeddingVar(value_size, value, 1); float* set_value = (float*)malloc(value_size * sizeof(float)); //Insertion for (int i = 0; i < 100; i++) { diff --git a/tensorflow/core/kernels/embedding_variable_performance_test.cc b/tensorflow/core/kernels/embedding_variable_performance_test.cc index 9b01e35840b..16f4a894858 100644 --- a/tensorflow/core/kernels/embedding_variable_performance_test.cc +++ b/tensorflow/core/kernels/embedding_variable_performance_test.cc @@ -90,14 +90,21 @@ void GenerateSkewInput(int num_of_ids, float skew_factor, void thread_lookup_or_create( EmbeddingVar* ev, const int64* input_batch, + float* default_value, + int default_value_dim, float** outputs, int value_size, int start, int end) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; bool is_filter = false; for (int i = start; i < end; i++) { ev->LookupOrCreateKey(input_batch[i], &value_ptr, &is_filter, false); - auto val = ev->flat(value_ptr, input_batch[i]); - memcpy(outputs[i], &val(0), sizeof(float) * value_size); + if (is_filter) { + auto val = ev->flat(value_ptr); + memcpy(outputs[i], &val(0), sizeof(float) * value_size); + } else { + int default_value_index = input_batch[i] % default_value_dim; + memcpy(outputs[i], default_value + default_value_index * value_size, sizeof(float) * value_size); + } } } @@ -138,6 +145,8 @@ double PerfLookupOrCreate( for (int i = 0; i < num_thread; i++) { worker_threads[i] = std::thread(thread_lookup_or_create, ev, input_batches[k].data(), + default_value_matrix.data(), + default_value_dim, outputs.data(), value_size, thread_task_range[i].first, thread_task_range[i].second); @@ -201,11 +210,11 @@ void thread_lookup( const int64* input_batch, float** outputs, int value_size, int start, int end) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; bool is_filter = false; for (int i = start; i < end; i++) { ev->LookupKey(input_batch[i], &value_ptr); - auto val = ev->flat(value_ptr, input_batch[i]); + auto val = ev->flat(value_ptr); memcpy(outputs[i], &val(0), sizeof(float) * value_size); } } @@ -293,7 +302,7 @@ TEST(EmbeddingVariablePerformanceTest, TestLookup) { } } auto ev = CreateEmbeddingVar(value_size, default_value, default_value_dim); - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; bool is_filter = false; for (int i = 0; i < hot_ids_list.size(); i++) { ev->LookupOrCreateKey(hot_ids_list[i], &value_ptr, &is_filter, false); @@ -339,13 +348,13 @@ void PerfSave(Tensor& default_value, value_size, default_value, default_value_dim, 0, steps_to_live, l2_weight_threshold); - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; bool is_filter = false; srand((unsigned)time(NULL)); for (int i = 0; i < id_list.size(); i++) { ev->LookupOrCreateKey(id_list[i], &value_ptr, &is_filter, false); - ev->flat(value_ptr, id_list[i]); + ev->flat(value_ptr); int64 global_step = rand() % 100; ev->UpdateVersion(value_ptr, global_step); } diff --git a/tensorflow/core/kernels/embedding_variable_test.h b/tensorflow/core/kernels/embedding_variable_test.h index d06304fb78a..07c34764fb0 100644 --- a/tensorflow/core/kernels/embedding_variable_test.h +++ b/tensorflow/core/kernels/embedding_variable_test.h @@ -107,35 +107,42 @@ EmbeddingVar* CreateEmbeddingVar( int value_size, Tensor& default_value, int64 default_value_dim, int64 filter_freq = 0, int64 steps_to_live = 0, - float l2_weight_threshold=-1.0) { - std::string layout_type = "light"; - if (filter_freq != 0) { - layout_type = "normal"; - } - - if (steps_to_live != 0) { - if (layout_type == "light") { - layout_type = "normal_contiguous"; - } - } + float l2_weight_threshold=-1.0, + embedding::StorageType storage_type = embedding::StorageType::DRAM, + std::vector storage_size = {1024*1024*1024, + 1024*1024*1024, + 1024*1024*1024, + 1024*1024*1024}, + bool record_freq = false, + int64 max_element_size = 0, + float false_positive_probability = -1.0, + DataType counter_type = DT_UINT64) { auto embedding_config = EmbeddingConfig( - 0, 0, 1, 0, "emb_var", steps_to_live, - filter_freq, 999999, l2_weight_threshold, layout_type, - 0, -1.0, DT_UINT64, default_value_dim, - 0.0, false, false, false); + 0, 0, 1, 0, "emb_var", steps_to_live, + filter_freq, 999999, l2_weight_threshold, + max_element_size, false_positive_probability, + counter_type, default_value_dim, + 0.0, record_freq, false, false); + auto feat_desc = new embedding::FeatureDescriptor( + 1, 1, ev_allocator(), storage_type, + record_freq, + embedding_config.is_save_version(), + {embedding_config.is_counter_filter(), filter_freq}); auto storage = embedding::StorageFactory::Create( embedding::StorageConfig( - embedding::StorageType::DRAM, "", - {1024, 1024, 1024, 1024}, layout_type, + storage_type, "", + storage_size, embedding_config), cpu_allocator(), + feat_desc, "emb_var"); auto ev = new EmbeddingVar( "emb_var", storage, embedding_config, - cpu_allocator()); + cpu_allocator(), + feat_desc); ev->Init(default_value, default_value_dim); return ev; } diff --git a/tensorflow/core/kernels/group_embedding/group_embedding_lookup_ops_test.cc b/tensorflow/core/kernels/group_embedding/group_embedding_lookup_ops_test.cc index 55dd40176a8..2f07e2ef537 100644 --- a/tensorflow/core/kernels/group_embedding/group_embedding_lookup_ops_test.cc +++ b/tensorflow/core/kernels/group_embedding/group_embedding_lookup_ops_test.cc @@ -774,7 +774,7 @@ class GroupEmbeddingVariableForWardOpTest : public OpsTestBase { embedding_var->Init(value, 1); for (int64 j = 0; j < nnz; ++j) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; Status s = embedding_var->LookupOrCreateKey(sp_values_vec[j], &value_ptr); typename TTypes::Flat vflat = embedding_var->flat(value_ptr); @@ -958,7 +958,7 @@ class GroupEmbeddingVariableBackWardOpTest : public OpsTestBase { embedding_var->Init(value, 1); for (int64 j = 0; j < nnz; ++j) { - ValuePtr* value_ptr = nullptr; + void* value_ptr = nullptr; Status s = embedding_var->LookupOrCreateKey(sp_values_vec[j], &value_ptr); typename TTypes::Flat vflat = embedding_var->flat(value_ptr); diff --git a/tensorflow/core/kernels/incr_save_restore_ops.h b/tensorflow/core/kernels/incr_save_restore_ops.h index 0582697ad16..d84838ae413 100644 --- a/tensorflow/core/kernels/incr_save_restore_ops.h +++ b/tensorflow/core/kernels/incr_save_restore_ops.h @@ -225,9 +225,9 @@ class IncrEVValueDumpIterator : public DumpIterator { keys_idx_++; col_idx_ = 0; } - ValuePtr* value_ptr = NULL; + void* value_ptr = NULL; TF_CHECK_OK(emb_var_->LookupOrCreateKey(*keys_iter_, &value_ptr)); - return emb_var_->flat(value_ptr, *keys_iter_)(col_idx_++); + return emb_var_->flat(value_ptr)(col_idx_++); } private: diff --git a/tensorflow/core/kernels/kv_variable_lookup_ops.cc b/tensorflow/core/kernels/kv_variable_lookup_ops.cc index c69aec8ebb9..7e40dfff7ac 100644 --- a/tensorflow/core/kernels/kv_variable_lookup_ops.cc +++ b/tensorflow/core/kernels/kv_variable_lookup_ops.cc @@ -121,7 +121,7 @@ class KvResourceLookupIDOp : public OpKernel { const int64 indices_size = static_cast(indices_flat.dimension(0)); EmbeddingVarContext ev_ctx(c); ev->GetOrCreateKey(ev_ctx, indices, - reinterpret_cast**>(out_base), + reinterpret_cast(out_base), indices_size); } } @@ -203,7 +203,7 @@ class KvResourceCollectEmbeddingOp : public OpKernel { const size_t slice_bytes = slice_elems * sizeof(TValue); EmbeddingVarContext ev_ctx(c); ev->GatherEmbeddings(ev_ctx, indices, - (ValuePtr**)pointer.data(), + (void**)pointer.data(), out_base, N); } } diff --git a/tensorflow/core/kernels/kv_variable_ops.h b/tensorflow/core/kernels/kv_variable_ops.h index 8e3572443ba..3202e6d12bf 100644 --- a/tensorflow/core/kernels/kv_variable_ops.h +++ b/tensorflow/core/kernels/kv_variable_ops.h @@ -18,6 +18,7 @@ limitations under the License. #include "tensorflow/core/framework/allocator.h" #include "tensorflow/core/framework/bounds_check.h" +#include "tensorflow/core/framework/embedding/cache_factory.h" #include "tensorflow/core/framework/embedding/embedding_var.h" #include "tensorflow/core/framework/embedding/kv_interface.h" #include "tensorflow/core/framework/op_kernel.h" diff --git a/tensorflow/core/kernels/save_restore_tensor.h b/tensorflow/core/kernels/save_restore_tensor.h index 4f69ebe3fb5..da58e17e1bb 100644 --- a/tensorflow/core/kernels/save_restore_tensor.h +++ b/tensorflow/core/kernels/save_restore_tensor.h @@ -23,7 +23,6 @@ limitations under the License. #include "tensorflow/core/framework/hash_table/hash_table.h" #include "tensorflow/core/framework/hash_table/bloom_filter_strategy.h" #include "tensorflow/core/framework/embedding/kv_interface.h" -#include "tensorflow/core/framework/embedding/value_ptr.h" namespace tensorflow {