Skip to content

Commit

Permalink
[TMP] StorageMergeTreeWrapper::saveMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Sep 2, 2024
1 parent fa2471a commit 190e062
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 23 deletions.
20 changes: 6 additions & 14 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
extractPartitionValues(write_settings.partition_settings.partition_dir, partition_values);
}

bool SparkMergeTreeWriter::useLocalStorage() const
{
return dataWrapper->useLocalStorage();
}

void SparkMergeTreeWriter::write(const DB::Block & block)
{
auto new_block = removeColumnSuffix(block);
Expand Down Expand Up @@ -155,17 +150,17 @@ void SparkMergeTreeWriter::finalize()
finalizeMerge();

dataWrapper->commitPartToRemoteStorageIfNeeded(new_parts.unsafeGet(), context->getReadSettings(), context->getWriteSettings());
saveMetadata();
dataWrapper->saveMetadata(new_parts.unsafeGet(), context);
}

void SparkMergeTreeWriter::saveMetadata()
void StorageMergeTreeWrapper::saveMetadata(const std::deque<DB::MergeTreeDataPartPtr> & parts, const DB::ContextPtr & context)
{
if (!dataWrapper->isRemoteStorage)
if (!isRemoteStorage)
return;

for (const auto & merge_tree_data_part : new_parts.unsafeGet())
for (const auto & merge_tree_data_part : parts)
{
auto part = dataWrapper->dest_storage().loadDataPartsWithNames({merge_tree_data_part->name});
auto part = dest_storage().loadDataPartsWithNames({merge_tree_data_part->name});
if (part.empty())
{
LOG_WARNING(
Expand All @@ -176,10 +171,7 @@ void SparkMergeTreeWriter::saveMetadata()
}

saveFileStatus(
dataWrapper->dest_storage(),
context,
merge_tree_data_part->name,
const_cast<IDataPartStorage &>(part.at(0)->getDataPartStorage()));
dest_storage(), context, merge_tree_data_part->name, const_cast<IDataPartStorage &>(part.at(0)->getDataPartStorage()));
}
}

Expand Down
13 changes: 4 additions & 9 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,21 @@ class StorageMergeTreeWrapper
protected:
CustomStorageMergeTreePtr merge_tree = nullptr;
bool isRemoteStorage;
bool localStorageFirst;
DB::StorageMetadataPtr metadata_snapshot;
DB::Block header;

public:
virtual ~StorageMergeTreeWrapper() = default;
explicit StorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_, bool useLocalStorage)
explicit StorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_)
: merge_tree(data_)
, isRemoteStorage(isRemoteStorage_)
, localStorageFirst(useLocalStorage)
, metadata_snapshot(merge_tree->getInMemoryMetadataPtr())
, header(metadata_snapshot->getSampleBlock())
{
}
static StorageMergeTreeWrapperPtr
create(const MergeTreeTable & merge_tree_table, bool insert_with_local_storage, const DB::ContextMutablePtr & context);

bool useLocalStorage() const { return localStorageFirst; };

virtual CustomStorageMergeTree & dest_storage() { return *merge_tree; }
virtual CustomStorageMergeTreePtr temp_storage() { return nullptr; }
CustomStorageMergeTreePtr data() const { return merge_tree; }
Expand All @@ -87,13 +83,14 @@ class StorageMergeTreeWrapper
const std::deque<DB::MergeTreeDataPartPtr> & parts, const ReadSettings & read_settings, const WriteSettings & write_settings)
{
}
void saveMetadata(const std::deque<DB::MergeTreeDataPartPtr> & parts, const DB::ContextPtr & context);
};

class DirectStorageMergeTreeWrapper : public StorageMergeTreeWrapper
{
public:
explicit DirectStorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, bool isRemoteStorage_)
: StorageMergeTreeWrapper(data_, isRemoteStorage_, false)
: StorageMergeTreeWrapper(data_, isRemoteStorage_)
{
}
};
Expand All @@ -104,7 +101,7 @@ class CopyToRemoteStorageMergeTreeWrapper : public StorageMergeTreeWrapper

public:
explicit CopyToRemoteStorageMergeTreeWrapper(const CustomStorageMergeTreePtr & data_, const CustomStorageMergeTreePtr & org_)
: StorageMergeTreeWrapper(data_, true, true), org_storage(org_)
: StorageMergeTreeWrapper(data_, true), org_storage(org_)
{
assert(merge_tree != org_storage);
}
Expand Down Expand Up @@ -135,11 +132,9 @@ class SparkMergeTreeWriter
void checkAndMerge(bool force = false);
void safeEmplaceBackPart(DB::MergeTreeDataPartPtr);
void safeAddPart(DB::MergeTreeDataPartPtr);
void saveMetadata();
void finalizeMerge();
bool chunkToPart(Chunk && plan_chunk);
bool blockToPart(Block & block);
bool useLocalStorage() const;

const GlutenMergeTreeWriteSettings write_settings;
DB::ContextPtr context;
Expand Down

0 comments on commit 190e062

Please sign in to comment.