From 661adb324748180c99d1ff605b94c1e8203f2aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=BC=80=E5=AE=87?= Date: Mon, 21 Oct 2024 08:08:19 +0000 Subject: [PATCH] Merge 'fky@cnch-dev@fix@parts_info' into 'cnch-dev' fix(clickhousech@m-5358052205): precise rows count for parts_info recalculation. See merge request: !25737 --- src/Catalog/Catalog.cpp | 102 +++----- src/Catalog/Catalog.h | 11 +- src/Catalog/DataModelPartWrapper.h | 2 + src/CloudServices/CnchPartsHelper.cpp | 32 ++- src/CloudServices/CnchPartsHelper.h | 6 +- .../tests/gtest_calc_visibility.cpp | 241 ++++++++++++++++++ src/Protos/data_models.proto | 2 + src/Storages/CnchTablePartitionMetrics.cpp | 101 ++++++++ src/Storages/CnchTablePartitionMetrics.h | 21 +- src/Storages/PartCacheManager.cpp | 30 +++ .../System/StorageSystemCnchPartsInfo.cpp | 2 + .../StorageSystemCnchPartsInfoLocal.cpp | 14 + .../50010_parts_info.reference | 16 ++ .../4_cnch_stateless/50010_parts_info.sql | 167 ++++++++++++ ...0011_parts_info_for_unique_table.reference | 16 ++ .../50011_parts_info_for_unique_table.sql | 169 ++++++++++++ ...arts_info_last_modification_time.reference | 11 + ...0011_parts_info_last_modification_time.sql | 133 ++++++++++ .../50013_parts_info_with_drop_table.sql | 6 +- ...14_parts_info_with_dropped_parts.reference | 16 ++ .../50014_parts_info_with_dropped_parts.sql | 184 +++++++++++++ tests/queries/skip_list.json | 8 +- 22 files changed, 1205 insertions(+), 85 deletions(-) create mode 100644 src/CloudServices/tests/gtest_calc_visibility.cpp create mode 100644 tests/queries/4_cnch_stateless/50010_parts_info.reference create mode 100644 tests/queries/4_cnch_stateless/50010_parts_info.sql create mode 100644 tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.reference create mode 100644 tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.sql create mode 100644 tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.reference create mode 100644 
tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.sql create mode 100644 tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.reference create mode 100644 tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.sql diff --git a/src/Catalog/Catalog.cpp b/src/Catalog/Catalog.cpp index 8b19902e66..396a6feed2 100644 --- a/src/Catalog/Catalog.cpp +++ b/src/Catalog/Catalog.cpp @@ -1873,10 +1873,12 @@ namespace Catalog const TxnTimestamp & ts, const Context * session_context, const VisibilityLevel visibility, - const std::set & bucket_numbers) + const std::set & bucket_numbers, + const bool disable_cache) { ServerDataPartsWithDBM res; - res.first = getServerDataPartsInPartitions(storage, partitions, ts, session_context, VisibilityLevel::All, bucket_numbers); + res.first + = getServerDataPartsInPartitions(storage, partitions, ts, session_context, VisibilityLevel::All, bucket_numbers, disable_cache); if (res.first.empty()) return res; @@ -1884,7 +1886,13 @@ namespace Catalog bool is_unique_table = storage->getInMemoryMetadataPtr()->hasUniqueKey(); if (is_unique_table) res.second = getDeleteBitmapsInPartitions( - storage, {partitions.begin(), partitions.end()}, ts, /*session_context=*/nullptr, VisibilityLevel::All, bucket_numbers); + storage, + {partitions.begin(), partitions.end()}, + ts, + /*session_context=*/nullptr, + VisibilityLevel::All, + bucket_numbers, + disable_cache); /// Make sure they use the same records of transactions list. 
if (ts && visibility != VisibilityLevel::All) @@ -1933,7 +1941,8 @@ namespace Catalog const TxnTimestamp & ts, const Context * session_context, const VisibilityLevel visibility, - const std::set & bucket_numbers) + const std::set & bucket_numbers, + const bool disable_cache) { ServerDataPartsVector res; String source; @@ -1973,7 +1982,7 @@ namespace Catalog context.getServerType() == ServerType::cnch_server && isLocalServer(host_with_rpc, std::to_string(context.getRPCPort()))) { - bool can_use_cache = canUseCache(storage, session_context); + bool can_use_cache = canUseCache(storage, session_context, disable_cache); if (!can_use_cache) { @@ -2090,7 +2099,8 @@ namespace Catalog const TxnTimestamp & ts, const Context * session_context, const VisibilityLevel visibility, - const std::set & bucket_numbers) + const std::set & bucket_numbers, + const bool disable_cache) { DeleteBitmapMetaPtrVector res; String source; @@ -2133,7 +2143,7 @@ namespace Catalog context.getServerType() == ServerType::cnch_server && isLocalServer(host_with_rpc, std::to_string(context.getRPCPort()))) { - bool can_use_cache = canUseCache(storage, session_context); + bool can_use_cache = canUseCache(storage, session_context, disable_cache); can_use_cache &= !context.getConfigRef().getBool("disable_delete_bitmap_cache", false); if (!can_use_cache) @@ -2723,8 +2733,10 @@ namespace Catalog meta_proxy->setNonHostUpdateTimeStamp(name_space, UUIDHelpers::UUIDToString(storage->getStorageID().uuid), current_pts); } - bool Catalog::canUseCache(const ConstStoragePtr & storage, const Context * session_context) + bool Catalog::canUseCache(const ConstStoragePtr & storage, const Context * session_context, const bool disable_cache) { + if (disable_cache) + return false; if (!context.getPartCacheManager()) return false; if (context.getSettingsRef().server_write_ha) @@ -5650,42 +5662,9 @@ namespace Catalog } PartitionMetrics::PartitionMetricsStore Catalog::getPartitionMetricsStoreFromMetastore( - const String & 
table_uuid, const String & partition_id, size_t max_commit_time, std::function need_abort) + const String & table_uuid, const String & partition_id, size_t max_commit_time, std::function) { - auto calculate_metrics_by_partition = [&](ServerDataPartsVector & parts) { - PartitionMetricsStorePtr res = std::make_shared(); - - for (auto & part : parts) - { - if (unlikely(need_abort())) - { - LOG_WARNING(log, "getPartitionMetricsStoreFromMetastore is aborted by caller."); - break; - } - - /// For those blocks only have deleted part, just ignore them because the covered part may be already removed by GC. - /// But we should still calculate it's `last_modification_time`. - res->updateLastModificationTime(part->part_model()); - } - - std::sort(parts.begin(), parts.end(), CnchPartsHelper::PartComparator{}); - auto visible_parts = CnchPartsHelper::calcVisibleParts(parts, false); - - for (auto & part : visible_parts) - { - if (unlikely(need_abort())) - { - LOG_WARNING(log, "getPartitionMetricsStoreFromMetastore is aborted by caller."); - break; - } - - res->update(part->part_model()); - } - - return res; - }; - PartitionMetrics::PartitionMetricsStore ret; runWithMetricSupport( [&] { @@ -5697,36 +5676,25 @@ namespace Catalog /// Get latest table version. StoragePtr storage = getTableByUUID(context, table_uuid, max_commit_time); - const auto & merge_tree_storage = dynamic_cast(*storage); + const auto & merge_tree_storage = dynamic_cast(*storage); - IMetaStore::IteratorPtr it = meta_proxy->getPartsInRange(name_space, table_uuid, partition_id); + /// Do not use cached parts, because this is not a user query. 
+ ServerDataPartsWithDBM parts_with_dbm = getServerDataPartsInPartitionsWithDBM( + storage, {partition_id}, max_commit_time, nullptr, VisibilityLevel::Committed, {}, true); + LOG_TRACE( + log, + "getPartitionMetricsStoreFromMetastore for table {} partition {} get parts: {}, bitmaps: {}", + table_uuid, + partition_id, + parts_with_dbm.first.size(), + parts_with_dbm.second.size()); - ServerDataPartsVector parts; - while (it->next()) + if (parts_with_dbm.first.empty()) { - if (unlikely(need_abort())) - { - LOG_WARNING(log, "getPartitionMetricsStoreFromMetastore is aborted by caller."); - break; - } - Protos::DataModelPart part_model; - part_model.ParseFromString(it->value()); - - /// Skip the Uncommitted parts or the parts that - /// cannot be seen by the time `max_commit_time`. - if (part_model.commit_time() == 0 || part_model.commit_time() > max_commit_time) - { - LOG_TRACE(log, "Skip parts: {}, max_commit_time: {}", part_model.ShortDebugString(), max_commit_time); - continue; - } - - parts.emplace_back(std::make_shared(createPartWrapperFromModel(merge_tree_storage, std::move(part_model)))); + return; } - if (!parts.empty()) - { - ret = *calculate_metrics_by_partition(parts); - } + ret = PartitionMetrics::PartitionMetricsStore(parts_with_dbm, merge_tree_storage); }, ProfileEvents::GetPartitionMetricsFromMetastoreSuccess, ProfileEvents::GetPartitionMetricsFromMetastoreFailed); diff --git a/src/Catalog/Catalog.h b/src/Catalog/Catalog.h index 27d42ba947..70deb2dfdd 100644 --- a/src/Catalog/Catalog.h +++ b/src/Catalog/Catalog.h @@ -313,7 +313,8 @@ public: const TxnTimestamp & ts, const Context * session_context, VisibilityLevel visibility = VisibilityLevel::Visible, - const std::set & bucket_numbers = {}); + const std::set & bucket_numbers = {}, + bool disable_cache = false); /// @param bucket_numbers If empty fetch all bucket_numbers, otherwise fetch the given bucket_numbers. 
ServerDataPartsVector getServerDataPartsInPartitions( @@ -322,7 +323,8 @@ public: const TxnTimestamp & ts, const Context * session_context, VisibilityLevel visibility = VisibilityLevel::Visible, - const std::set & bucket_numbers = {}); + const std::set & bucket_numbers = {}, + bool disable_cache = false); ServerDataPartsWithDBM getTrashedPartsInPartitionsWithDBM(const ConstStoragePtr & storage, const Strings & partitions, const TxnTimestamp & ts); @@ -354,7 +356,8 @@ public: const TxnTimestamp & ts, const Context * session_context = nullptr, VisibilityLevel visibility = VisibilityLevel::Visible, - const std::set & bucket_numbers = {}); + const std::set & bucket_numbers = {}, + bool disable_cache = false); DeleteBitmapMetaPtrVector getDeleteBitmapsInPartitionsFromMetastore( const ConstStoragePtr & storage, const Strings & partitions, const TxnTimestamp & ts, VisibilityLevel visibility = VisibilityLevel::Visible); DeleteBitmapMetaPtrVector getTrashedDeleteBitmapsInPartitions( @@ -1016,7 +1019,7 @@ private: void mayUpdateUHUT(const StoragePtr & storage); - bool canUseCache(const ConstStoragePtr & storage, const Context * session_context); + bool canUseCache(const ConstStoragePtr & storage, const Context * session_context, bool disable_cache); void finishCommitInBatch( const StoragePtr & storage, diff --git a/src/Catalog/DataModelPartWrapper.h b/src/Catalog/DataModelPartWrapper.h index b012951082..937ef84ebf 100644 --- a/src/Catalog/DataModelPartWrapper.h +++ b/src/Catalog/DataModelPartWrapper.h @@ -76,6 +76,8 @@ class DataPartInterface { public: virtual bool isServerDataPart() const = 0; + DataPartInterface() = default; + DataPartInterface(const DataPartInterface &) = default; virtual ~DataPartInterface() = default; }; diff --git a/src/CloudServices/CnchPartsHelper.cpp b/src/CloudServices/CnchPartsHelper.cpp index 7205a30eda..8632a38766 100644 --- a/src/CloudServices/CnchPartsHelper.cpp +++ b/src/CloudServices/CnchPartsHelper.cpp @@ -245,7 +245,7 @@ namespace if 
(Operation::isRangeTombstone(prev)) { /// Sort will place range tombstones consecutively at the beginning of each partition. - /// We'll record theri boundaries during iteration and process them when reaching partition end + /// We'll record their boundaries during iteration and process them when reaching partition end if (range_tombstone_beg_it == end) range_tombstone_beg_it = prev_it; range_tombstone_end_it = std::next(prev_it); @@ -336,7 +336,8 @@ namespace bool skip_drop_ranges, Vec * visible_alone_drop_ranges, Vec * invisible_dropped_parts, - LoggingOption logging) + LoggingOption logging, + Vec * invisible_parts = nullptr) { using Part = typename Vec::value_type; @@ -348,7 +349,10 @@ namespace if (all_parts.size() == 1) { if (skip_drop_ranges && all_parts.front()->get_deleted()) - ; /// do nothing + { + if (invisible_parts) + *invisible_parts = all_parts; + } else visible_parts = all_parts; @@ -379,6 +383,9 @@ namespace /// i) curr_part is also a DROP RANGE mark, and must be the bigger one if ((*curr_it)->get_info().level == MergeTreePartInfo::MAX_LEVEL) { + if (invisible_parts) + invisible_parts->push_back(*prev_it); + if (invisible_dropped_parts) invisible_dropped_parts->push_back(*prev_it); @@ -395,6 +402,9 @@ namespace /// ii) curr_part is marked as dropped by prev_part else if ((*curr_it)->get_info().max_block <= prev_part->get_info().max_block) { + if (invisible_parts) + invisible_parts->push_back(*curr_it); + if (invisible_dropped_parts) invisible_dropped_parts->push_back(*curr_it); @@ -412,7 +422,10 @@ namespace /// c) different partition if (skip_drop_ranges) - ; /// do nothing + { + if (invisible_parts) + invisible_parts->push_back(prev_part); + } else visible_parts_.push_back(prev_part); @@ -430,7 +443,11 @@ namespace else { if (skip_drop_ranges && prev_part->get_deleted()) - ; /// do nothing + { + /// do nothing + if (invisible_parts) + invisible_parts->push_back(prev_part); + } else visible_parts_.push_back(prev_part); @@ -550,9 +567,10 @@ 
MergeTreeDataPartsVector calcVisibleParts(MergeTreeDataPartsVector & all_parts, return calcVisiblePartsImpl(all_parts, flatten, /* skip_drop_ranges */ true, nullptr, nullptr, logging); } -ServerDataPartsVector calcVisibleParts(ServerDataPartsVector & all_parts, bool flatten, LoggingOption logging) +ServerDataPartsVector calcVisibleParts(ServerDataPartsVector & all_parts, bool flatten, LoggingOption logging, ServerDataPartsVector * invisible_parts) { - return calcVisiblePartsImpl(all_parts, flatten, /* skip_drop_ranges */ true, nullptr, nullptr, logging); + return calcVisiblePartsImpl( + all_parts, flatten, /* skip_drop_ranges */ true, nullptr, nullptr, logging, invisible_parts); } MergeTreeDataPartsCNCHVector calcVisibleParts(MergeTreeDataPartsCNCHVector & all_parts, bool flatten, LoggingOption logging) diff --git a/src/CloudServices/CnchPartsHelper.h b/src/CloudServices/CnchPartsHelper.h index 45d2244d6a..721199b295 100644 --- a/src/CloudServices/CnchPartsHelper.h +++ b/src/CloudServices/CnchPartsHelper.h @@ -114,7 +114,11 @@ IMergeTreeDataPartsVector toIMergeTreeDataPartsVector(const MergeTreeDataPartsCN MergeTreeDataPartsCNCHVector toMergeTreeDataPartsCNCHVector(const IMergeTreeDataPartsVector & vec); MergeTreeDataPartsVector calcVisibleParts(MergeTreeDataPartsVector & all_parts, bool flatten, LoggingOption logging = DisableLogging); -ServerDataPartsVector calcVisibleParts(ServerDataPartsVector & all_parts, bool flatten, LoggingOption logging = DisableLogging); +ServerDataPartsVector calcVisibleParts( + ServerDataPartsVector & all_parts, + bool flatten, + LoggingOption logging = DisableLogging, + ServerDataPartsVector * invisible_parts = nullptr); MergeTreeDataPartsCNCHVector calcVisibleParts(MergeTreeDataPartsCNCHVector & all_parts, bool flatten, LoggingOption logging = DisableLogging); IMergeTreeDataPartsVector calcVisibleParts(IMergeTreeDataPartsVector& all_parts, bool collect_on_chain, bool skip_drop_ranges, IMergeTreeDataPartsVector* 
visible_alone_drop_ranges, diff --git a/src/CloudServices/tests/gtest_calc_visibility.cpp b/src/CloudServices/tests/gtest_calc_visibility.cpp new file mode 100644 index 0000000000..0984ea0f1c --- /dev/null +++ b/src/CloudServices/tests/gtest_calc_visibility.cpp @@ -0,0 +1,241 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace GTEST_Parts_Helper { + +class CalcVisibility: public ::testing::Test +{ +protected: + static void SetUpTestSuite() + { + tryRegisterStorages(); + tryRegisterDisks(); + getContext().resetStoragePolicy(); + } +}; + +using namespace DB; + +DataModelPartPtr +createPart(const String & partition_id, UInt64 min_block, UInt64 max_block, UInt64 level, UInt64 hint_mutation = 0) +{ + DataModelPartPtr part_model = std::make_shared(); + Protos::DataModelPartInfo * info_model = part_model->mutable_part_info(); + + info_model->set_partition_id(partition_id); + info_model->set_min_block(min_block); + info_model->set_max_block(max_block); + info_model->set_level(level); + info_model->set_hint_mutation(hint_mutation); + + part_model->set_rows_count(0); + part_model->set_partition_minmax("xxxx"); + part_model->set_marks_count(0); + part_model->set_size(818); + + return part_model; +} + +DB::ServerDataPartPtr createServerDataPart( + StoragePtr storage, const String & partition_id, size_t min_block, size_t max_block, size_t level, bool deleted, size_t commit_time, size_t hint_mutation) +{ + auto part_model = createPart(partition_id, min_block, max_block, level, hint_mutation); + part_model->set_deleted(deleted); + part_model->set_commit_time(commit_time); + + const auto & merge_tree = dynamic_cast(*storage); + + auto part = createPartWrapperFromModel(merge_tree, Protos::DataModelPart(*part_model)); + part->part_model->set_commit_time(commit_time); + auto ret = std::make_shared(DB::ServerDataPart(part)); + return ret; +} + +void checkParts(ServerDataPartsVector parts, ServerDataPartsVector expected) 
+{ + sort(parts.begin(), parts.end(), [](const auto & lhs, const auto & rhs) { return lhs->name() < rhs->name(); }); + sort(expected.begin(), expected.end(), [](const auto & lhs, const auto & rhs) { return lhs->name() < rhs->name(); }); + if (parts.size() != expected.size()) + { + std::cout << "given: " << std::endl; + + for (const auto & part : parts) + { + std::cout << part->name() << " deleted? " << toString(part->deleted()) << " commit_time: " << toString(part->get_commit_time()) + << " previous? " << part->get_info().hint_mutation << std::endl; + } + + std::cout << "expected: " << std::endl; + + for (const auto & part : expected) + { + std::cout << part->name() << " deleted? " << toString(part->deleted()) << std::endl; + } + } + + EXPECT_EQ(parts.size(), expected.size()); + for (size_t i = 0; i < parts.size(); i++) { + EXPECT_EQ(parts[i]->name(), expected[i]->name()); + } +} + + +void flattenParts(ServerDataPartsVector & parts) { + for (int i = parts.size() - 1; i >= 0; i--) + { + auto cur = parts[i]; + while ((cur = cur->tryGetPreviousPart())) + { + parts.push_back(cur); + } + } + + sort(parts.begin(), parts.end(), [](const auto & lhs, const auto & rhs) { return lhs->name() < rhs->name(); }); +} + +TEST_F(CalcVisibility, Basic) +{ + String query = "create table db.test UUID '61f0c404-5cb3-11e7-907b-a6006ad3dba0' (id Int32) ENGINE=CnchMergeTree order by id"; + StoragePtr storage = DB::createStorageFromQuery(query, getContext().context); + + auto merge_tree_meta_base = std::dynamic_pointer_cast(storage); + ASSERT_TRUE(merge_tree_meta_base != nullptr); + + auto p = [&](String partition_id, + size_t min_block, + size_t max_block, + size_t level, + bool deleted, + size_t commit_time, + size_t hint_mutation = 0) { + return createServerDataPart(storage, partition_id, min_block, max_block, level, deleted, commit_time, hint_mutation); + }; + + { + std::cout << "parts" << std::endl; + // P <- Partial + // P + // P + ServerDataPartsVector origin = { + p("20230101", 1, 
1, 0, false, 111), + /// Partial part has higher level and commit_time. + p("20230101", 1, 1, 1, false, 123, 111), + p("20230101", 2, 2, 0, false, 222), + p("20240101", 1, 1, 0, false, 123), + }; + + + ServerDataPartsVector invisible; + ServerDataPartsVector visible; + visible = calcVisibleParts(origin, false, CnchPartsHelper::LoggingOption::DisableLogging, &invisible); + + flattenParts(visible); + flattenParts(invisible); + + checkParts(visible, origin); + checkParts(invisible, {}); + } + + { + std::cout << "drop range" << std::endl; + // P ◄─┬─ DropRange + // P ◄─┤ + // P ◄─┘ + // P + // --- + // DropRange + ServerDataPartsVector origin = { + p("20230101", 1, 1, 0, false, 112), + p("20230101", 2, 2, 0, false, 222), + p("20230101", 4, 8, 3, false, 234), + p("20230101", 0, 10, MergeTreePartInfo::MAX_LEVEL, true, 235), + p("20230101", 11, 11, 1, false, 236), + p("20240101", 0, 10, MergeTreePartInfo::MAX_LEVEL, true, 235), + }; + + ServerDataPartsVector invisible; + ServerDataPartsVector visible; + visible = calcVisibleParts(origin, false, CnchPartsHelper::LoggingOption::DisableLogging, &invisible); + + flattenParts(visible); + flattenParts(invisible); + + checkParts(visible, { + p("20230101", 11, 11, 1, false, 235), + }); + checkParts(invisible, { + p("20230101", 1, 1, 0, false, 112), + p("20230101", 2, 2, 0, false, 222), + p("20230101", 4, 8, 3, false, 234), + p("20230101", 0, 10, MergeTreePartInfo::MAX_LEVEL, true, 235), + p("20240101", 0, 10, MergeTreePartInfo::MAX_LEVEL, true, 235), + }); + } + + { + std::cout << "dropped part" << std::endl; + // P ◄─ Dropped + // P ◄─ Dropped + ServerDataPartsVector origin = { + p("20230101", 1, 1, 0, false, 111), + p("20230101", 1, 1, 1, true, 222), + p("20230101", 2, 2, 0, false, 111), + p("20230101", 2, 2, 1, true, 222), + }; + + ServerDataPartsVector invisible; + ServerDataPartsVector visible; + visible = calcVisibleParts(origin, false, CnchPartsHelper::LoggingOption::DisableLogging, &invisible); + + 
flattenParts(invisible); + flattenParts(visible); + + checkParts(visible, {}); + checkParts(invisible, origin); + } + + + { + std::cout << "dropped part with merge" << std::endl; + // P ◄─ Dropped ◄─┬─ P + // P ◄─ Dropped ◄─┘ + ServerDataPartsVector origin = { + p("20230101", 1, 1, 0, false, 111), + p("20230101", 1, 1, 1, true, 222), + p("20230101", 2, 2, 0, false, 111), + p("20230101", 2, 2, 1, true, 222), + p("20230101", 1, 2, 1, false, 222), + }; + + ServerDataPartsVector invisible; + ServerDataPartsVector visible; + visible = calcVisibleParts(origin, false, CnchPartsHelper::LoggingOption::DisableLogging, &invisible); + + flattenParts(visible); + flattenParts(invisible); + checkParts( + visible, + { + p( "20230101", 1, 2, 1, false, 222), + }); + checkParts( + invisible, + { + p("20230101", 1, 1, 0, false, 111), + p("20230101", 1, 1, 1, true, 222), + p("20230101", 2, 2, 0, false, 111), + p("20230101", 2, 2, 1, true, 222), + }); + } + +} + +} diff --git a/src/Protos/data_models.proto b/src/Protos/data_models.proto index b91d1e0429..301e0c7fcd 100644 --- a/src/Protos/data_models.proto +++ b/src/Protos/data_models.proto @@ -777,6 +777,8 @@ message PartitionPartsMetricsSnapshot { required uint64 last_update_time = 8; required uint64 last_snapshot_time = 9; optional uint64 last_modification_time = 10; + optional int64 dropped_parts_size = 11; + optional int64 dropped_parts_number = 12; } // Key: diff --git a/src/Storages/CnchTablePartitionMetrics.cpp b/src/Storages/CnchTablePartitionMetrics.cpp index 600d4370e4..0afeea28c8 100644 --- a/src/Storages/CnchTablePartitionMetrics.cpp +++ b/src/Storages/CnchTablePartitionMetrics.cpp @@ -1,7 +1,9 @@ #include #include #include +#include #include +#include namespace DB { @@ -529,6 +531,8 @@ PartitionMetrics::PartitionMetricsStore::PartitionMetricsStore(const Protos::Par total_parts_size = snapshot.total_parts_size(); total_rows_count = snapshot.total_rows_count(); total_parts_number = snapshot.total_parts_number(); + 
dropped_parts_size = snapshot.dropped_parts_size(); + dropped_parts_number = snapshot.dropped_parts_number(); has_bucket_number_neg_one = snapshot.hash_bucket_number_neg_one(); is_single_table_definition_hash = snapshot.is_single_table_definition_hash(); table_definition_hash = snapshot.table_definition_hash(); @@ -543,6 +547,8 @@ Protos::PartitionPartsMetricsSnapshot PartitionMetrics::PartitionMetricsStore::t res.set_total_parts_size(total_parts_size); res.set_total_rows_count(total_rows_count); res.set_total_parts_number(total_parts_number); + res.set_dropped_parts_size(dropped_parts_size); + res.set_dropped_parts_number(dropped_parts_number); res.set_hash_bucket_number_neg_one(has_bucket_number_neg_one); res.set_is_single_table_definition_hash(is_single_table_definition_hash); res.set_table_definition_hash(table_definition_hash); @@ -558,6 +564,8 @@ PartitionMetrics::PartitionMetricsStore PartitionMetrics::PartitionMetricsStore: res.total_parts_size = this->total_parts_size + rhs.total_parts_size; res.total_rows_count = this->total_rows_count + rhs.total_rows_count; res.total_parts_number = this->total_parts_number + rhs.total_parts_number; + res.dropped_parts_size = this->dropped_parts_size + rhs.dropped_parts_size; + res.dropped_parts_number = this->dropped_parts_number + rhs.dropped_parts_number; res.has_bucket_number_neg_one = this->has_bucket_number_neg_one || rhs.has_bucket_number_neg_one; res.is_single_table_definition_hash = this->is_single_table_definition_hash && rhs.is_single_table_definition_hash; // TODO: verify with GuanZhe. @@ -616,6 +624,9 @@ void PartitionMetrics::PartitionMetricsStore::update(const Protos::DataModelPart total_rows_count -= (part_model.has_covered_parts_rows() ? part_model.covered_parts_rows() : part_model.rows_count()); total_parts_size -= (part_model.has_covered_parts_size() ? part_model.covered_parts_size() : part_model.size()); total_parts_number -= (part_model.has_covered_parts_count() ? 
part_model.covered_parts_count() : 1); + dropped_parts_size + += (part_model.has_covered_parts_size() ? (part_model.covered_parts_size() + part_model.size()) : part_model.size()); + dropped_parts_number += (part_model.has_covered_parts_count() ? (part_model.covered_parts_count() + 1) : 2); } else { @@ -678,4 +689,94 @@ bool PartitionMetrics::PartitionMetricsStore::matches(const PartitionMetricsStor return total_parts_size == rhs.total_parts_size && total_parts_number == rhs.total_parts_number && total_rows_count == rhs.total_rows_count && last_modification_time == rhs.last_modification_time; } +PartitionMetrics::PartitionMetricsStore::PartitionMetricsStore( + ServerDataPartsWithDBM & parts_with_dbm, const StorageCnchMergeTree & storage) +{ + if (parts_with_dbm.first.empty()) + return; + + + /// 1. Use all parts to calculate last_update_time and last_modification_time. + for (const auto & part : parts_with_dbm.first) + { + auto cur = part; + while (cur) + { + updateLastModificationTime(cur->part_model()); + last_update_time = std::max(last_update_time, cur->part_model().commit_time()); + cur = cur->tryGetPreviousPart(); + } + } + + /// 2. Divide parts into two groups: using and dropped. + ServerDataPartsWithDBM using_parts_with_dbm; + ServerDataPartsWithDBM dropped_parts_with_dbm; + + /// 3. Prepare for rows count. + using_parts_with_dbm.first = CnchPartsHelper::calcVisibleParts( + parts_with_dbm.first, false, CnchPartsHelper::DisableLogging, &dropped_parts_with_dbm.first); + CnchPartsHelper::calcVisibleDeleteBitmaps(parts_with_dbm.second, using_parts_with_dbm.second); + + /// 4. For each (user-visible) parts, update total_parts_size, total_parts_number + /// has_bucket_number_neg_one, is_single_table_definition_hash, is_deleted + /// and table_definition_hash. 
+ for (const auto & part : using_parts_with_dbm.first) + { + auto cur = part; + while (cur) { + total_parts_size += cur->size(); + total_parts_number += 1; + if (part->part_model().bucket_number() == -1) + has_bucket_number_neg_one = true; + if (table_definition_hash != 0 && table_definition_hash != part->part_model().table_definition_hash()) + is_single_table_definition_hash = false; + table_definition_hash = cur->part_model().table_definition_hash(); + is_deleted = false; + + cur = cur->tryGetPreviousPart(); + } + } + + /// 5. For dropped_parts_size and dropped_parts_number, we use dropped_parts_with_dbm. + for (const auto & part : dropped_parts_with_dbm.first) + { + auto cur = part; + while (cur) { + dropped_parts_size += cur->size(); + dropped_parts_number += 1; + cur = cur->tryGetPreviousPart(); + } + } + + /// 6. Pick rows count logic from StorageCnchMergeTree.cpp + if (storage.getInMemoryMetadataPtr()->hasUniqueKey()) + storage.getDeleteBitmapMetaForServerParts(using_parts_with_dbm.first, using_parts_with_dbm.second); + + for (const auto & part : using_parts_with_dbm.first) + total_rows_count += part->rowsCount() - part->deletedRowsCount(storage); +} +void PartitionMetrics::PartitionMetricsStore::removeDroppedPart(const Protos::DataModelPart & part_model) +{ + dropped_parts_size -= part_model.size(); + dropped_parts_number -= 1; +} +void PartitionMetrics::removeDroppedPart(const Protos::DataModelPart & part_model) +{ + std::unique_lock write_lock(mutex); + if (old_store.has_value()) + { + if (part_model.commit_time() > old_store.value().first) + { + new_store.removeDroppedPart(part_model); + } + else + { + old_store.value().second.removeDroppedPart(part_model); + } + } + else + { + new_store.removeDroppedPart(part_model); + } +} } diff --git a/src/Storages/CnchTablePartitionMetrics.h b/src/Storages/CnchTablePartitionMetrics.h index f44af2ec7f..bf868abd18 100644 --- a/src/Storages/CnchTablePartitionMetrics.h +++ b/src/Storages/CnchTablePartitionMetrics.h @@ 
-5,7 +5,6 @@ #pragma once #include -#include #include #include #include @@ -20,6 +19,13 @@ namespace DB { +namespace Catalog +{ + struct TrashItems; +} + +class StorageCnchMergeTree; + /** * @class TableMetrics * @brief Table level metrics for trashed items. @@ -57,6 +63,10 @@ public: String toString() const { return toSnapshot().DebugString(); } bool validateMetrics() const; + /// We have two phases of GC. For dropped_parts, + /// phase-one GC will add some value to statistics, + /// meanwhile phase-two GC will substract it from statistics. + /// So `positive` means whether the value is added or substracted. void update(const ServerDataPartPtr & part, size_t ts, bool positive = true); void update(const DeleteBitmapMetaPtr & bitmap, size_t ts, bool positive = true); void update(const Protos::DataModelPart & part, size_t ts, bool positive = true); @@ -106,9 +116,13 @@ public: */ struct PartitionMetricsStore { + /// Parts that visible to users. int64_t total_parts_size{0}; int64_t total_parts_number{0}; int64_t total_rows_count{0}; + /// Parts that is in phase-one GC. + int64_t dropped_parts_size{0}; + int64_t dropped_parts_number{0}; // Will be true if there is one part that has bucket_number == -1 bool has_bucket_number_neg_one{false}; // False if there are multiple table_definition_hash in this partition @@ -123,6 +137,7 @@ public: PartitionMetricsStore() = default; explicit PartitionMetricsStore(const Protos::PartitionPartsMetricsSnapshot & snapshot); + explicit PartitionMetricsStore(ServerDataPartsWithDBM & parts_with_dbm, const StorageCnchMergeTree & storage); PartitionMetricsStore operator+(const PartitionMetricsStore & rhs) const; PartitionMetricsStore & operator+=(const PartitionMetricsStore & rhs); @@ -132,11 +147,15 @@ public: void updateLastModificationTime(const Protos::DataModelPart & part_model); void update(const Protos::DataModelPart & part_model); + /// Called by phase-one GC. 
+ void removeDroppedPart(const Protos::DataModelPart & part_model); bool validateMetrics() const; bool matches(const PartitionMetricsStore & rhs) const; }; bool validateMetrics(); + /// Called by phase-one GC. + void removeDroppedPart(const Protos::DataModelPart & part_model); bool isRecalculating() { return recalculating.load(); } void update(const Protos::DataModelPart & part_model); diff --git a/src/Storages/PartCacheManager.cpp b/src/Storages/PartCacheManager.cpp index 3814ee74b3..bf81898af0 100644 --- a/src/Storages/PartCacheManager.cpp +++ b/src/Storages/PartCacheManager.cpp @@ -1875,8 +1875,38 @@ std::shared_ptr PartCacheManager::getTrashItemsInfoMetrics(const I } void PartCacheManager::updateTrashItemsMetrics(const UUID & table_uuid, const Catalog::TrashItems & items, bool positive) { + /// Since currently all items are from the same partition + /// (and even it is not that, they are very likely from consecutive partitions), + /// we can cache the partition_info to avoid querying the table meta too often. + std::optional> cached_partition_info; + if (auto table_meta_ptr = getTableMeta(table_uuid)) + { table_partition_metrics.updateMetrics(items, table_meta_ptr->trash_item_metrics, positive); + + /// Phase one GC. 
+ if (positive) + { + for (const auto & part : items.data_parts) + { + const String & partition_id = part->info().partition_id; + if (!cached_partition_info.has_value() || cached_partition_info.value().first != partition_id) + { + cached_partition_info = std::make_pair(partition_id, table_meta_ptr->getPartitionInfo(partition_id)); + } + + if (cached_partition_info.has_value()) + { + auto & partition_info = cached_partition_info.value().second; + + if (partition_info->metrics_ptr) + { + partition_info->metrics_ptr->removeDroppedPart(part->part_model()); + } + } + } + } + } } bool PartCacheManager::forceRecalculate(StoragePtr table) { diff --git a/src/Storages/System/StorageSystemCnchPartsInfo.cpp b/src/Storages/System/StorageSystemCnchPartsInfo.cpp index 3215da6162..8c0e181292 100644 --- a/src/Storages/System/StorageSystemCnchPartsInfo.cpp +++ b/src/Storages/System/StorageSystemCnchPartsInfo.cpp @@ -52,6 +52,8 @@ StorageSystemCnchPartsInfo::StorageSystemCnchPartsInfo(const StorageID & table_i {"total_parts_number", std::make_shared()}, {"total_parts_size", std::make_shared()}, {"total_rows_count", std::make_shared()}, + {"dropped_parts_number", std::make_shared()}, + {"dropped_parts_size", std::make_shared()}, {"ready_state", std::move(ready_state)}, /// Boolean {"recalculating", std::make_shared()}, diff --git a/src/Storages/System/StorageSystemCnchPartsInfoLocal.cpp b/src/Storages/System/StorageSystemCnchPartsInfoLocal.cpp index 9f678caeed..15c208fd47 100644 --- a/src/Storages/System/StorageSystemCnchPartsInfoLocal.cpp +++ b/src/Storages/System/StorageSystemCnchPartsInfoLocal.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include namespace DB @@ -55,6 +56,8 @@ StorageSystemCnchPartsInfoLocal::StorageSystemCnchPartsInfoLocal(const StorageID {"total_parts_number", std::make_shared()}, {"total_parts_size", std::make_shared()}, {"total_rows_count", std::make_shared()}, + {"dropped_parts_number", std::make_shared()}, + {"dropped_parts_size", 
std::make_shared()}, {"ready_state", std::move(ready_state)}, /// Boolean {"recalculating", std::make_shared()}, @@ -273,7 +276,18 @@ Pipe StorageSystemCnchPartsInfoLocal::read( res_columns[dest_index++]->insert(0); if (columns_mask[src_index++]) res_columns[dest_index++]->insert(0); + LOG_TRACE( + getLogger("PartsInfoLocal"), + "database: {}, table: {}, partition: {}, Actual metrics is {}", + entry->database, + entry->table, + metric->partition, + metrics_data.toString()); } + if (columns_mask[src_index++]) + res_columns[dest_index++]->insert(metrics_data.dropped_parts_number >= 0 ? metrics_data.dropped_parts_number : 0); + if (columns_mask[src_index++]) + res_columns[dest_index++]->insert(metrics_data.dropped_parts_size >= 0 ? metrics_data.dropped_parts_size : 0); if (columns_mask[src_index++]) { ReadyState state = ReadyState::Unloaded; diff --git a/tests/queries/4_cnch_stateless/50010_parts_info.reference b/tests/queries/4_cnch_stateless/50010_parts_info.reference new file mode 100644 index 0000000000..acc5399c19 --- /dev/null +++ b/tests/queries/4_cnch_stateless/50010_parts_info.reference @@ -0,0 +1,16 @@ +Test drop range. +1 +2 +1 +2 +1 +2 +0 0 0 +Test droped part. +1 +1 +1 +1 +1 +1 +0 0 0 diff --git a/tests/queries/4_cnch_stateless/50010_parts_info.sql b/tests/queries/4_cnch_stateless/50010_parts_info.sql new file mode 100644 index 0000000000..66b26ffe73 --- /dev/null +++ b/tests/queries/4_cnch_stateless/50010_parts_info.sql @@ -0,0 +1,167 @@ +select 'Test drop range.'; + +DROP TABLE IF EXISTS pi; + +CREATE TABLE pi +( + `name` String, + `country` String, + `asdf` Int +) +ENGINE = CnchMergeTree +PARTITION BY name +ORDER BY name; + +insert into pi values ('a', 'a', 1); +insert into pi values ('a', 'a', 2); + +-- Parts' numbers and size should match. 
+select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN asdf; + +-- Make sure that mutation is finished +SYSTEM START MERGES pi; +SYSTEM STOP MERGES pi; +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN country; + +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. 
+select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +TRUNCATE TABLE pi; + +select total_parts_number, total_rows_count, total_parts_size from +system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +DROP TABLE IF EXISTS pi; + +--------- + +select 'Test droped part.'; + +DROP TABLE IF EXISTS pi; + +CREATE TABLE pi +( + `name` String, + `country` String, + `asdf` Int +) +ENGINE = CnchMergeTree +PARTITION BY name +ORDER BY name; + +insert into pi values ('a', 'a', 1); + + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN asdf; + +-- Make sure that mutation is finished +SYSTEM START MERGES pi; +SYSTEM STOP MERGES pi; +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN country; + +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + + +-- Parts' numbers and size should match. 
+select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +TRUNCATE TABLE pi; + +select total_parts_number, total_rows_count, total_parts_size from +system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +DROP TABLE IF EXISTS pi; diff --git a/tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.reference b/tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.reference new file mode 100644 index 0000000000..7f29a299aa --- /dev/null +++ b/tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.reference @@ -0,0 +1,16 @@ +Test drop range. +1 +2 +1 +1 +1 +1 +0 0 0 +Test droped part. +1 +1 +1 +1 +1 +1 +0 0 0 diff --git a/tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.sql b/tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.sql new file mode 100644 index 0000000000..22375e9fc9 --- /dev/null +++ b/tests/queries/4_cnch_stateless/50011_parts_info_for_unique_table.sql @@ -0,0 +1,169 @@ +select 'Test drop range.'; + +DROP TABLE IF EXISTS pi; + +CREATE TABLE pi +( + `name` String, + `country` String, + `asdf` Int +) +ENGINE = CnchMergeTree +PARTITION BY name +ORDER BY name +UNIQUE KEY name; + +insert into pi values ('a', 'a', 1); +insert into pi values ('a', 'a', 2); + +-- Parts' numbers and size should match. 
+select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN asdf; + +-- Make sure that mutation is finished +SYSTEM START MERGES pi; +SYSTEM STOP MERGES pi; +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN country; + +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. 
+select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +TRUNCATE TABLE pi; + +select total_parts_number, total_rows_count, total_parts_size from +system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +DROP TABLE IF EXISTS pi; + +--------- + +select 'Test droped part.'; + +DROP TABLE IF EXISTS pi; + +CREATE TABLE pi +( + `name` String, + `country` String, + `asdf` Int +) +ENGINE = CnchMergeTree +PARTITION BY name +ORDER BY name +UNIQUE KEY name; + +insert into pi values ('a', 'a', 1); + + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN asdf; + +-- Make sure that mutation is finished +SYSTEM START MERGES pi; +SYSTEM STOP MERGES pi; +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + +-- Parts' numbers and size should match. +select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +-- Action: mutation. +ALTER TABLE pi DROP COLUMN country; + +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + + +-- Parts' numbers and size should match. 
+select equals( + ( + select count(), sum(bytes_on_disk) from system.cnch_parts + where database = currentDatabase() and table = 'pi' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info + where database = currentDatabase() and table = 'pi' + ) +); + +-- Row numbers should match. +select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +TRUNCATE TABLE pi; + +select total_parts_number, total_rows_count, total_parts_size from +system.cnch_parts_info where database = currentDatabase() and table = 'pi'; + +DROP TABLE IF EXISTS pi; diff --git a/tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.reference b/tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.reference new file mode 100644 index 0000000000..6702293297 --- /dev/null +++ b/tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.reference @@ -0,0 +1,11 @@ +Insert, alter and drop table. +1 +1 +1 +1 +Merge would not affect the last_modification_time. +1 +1 +Recalculate would not affect the correctness. +1 +1 diff --git a/tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.sql b/tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.sql new file mode 100644 index 0000000000..9526cba224 --- /dev/null +++ b/tests/queries/4_cnch_stateless/50011_parts_info_last_modification_time.sql @@ -0,0 +1,133 @@ +-- This test assumes that GC will not remove dropped parts very often.
+ +SELECT 'Insert, alter and drop table.'; + +DROP TABLE IF EXISTS pi; + +CREATE TABLE pi +( + `name` String, + `country` String, + `asdf` Int +) +ENGINE = CnchMergeTree +PARTITION BY name +ORDER BY name; + +-- Insert + +INSERT INTO pi VALUES ('a', 'a', 1); + +SELECT equals( + ( + SELECT max(commit_time) FROM system.cnch_parts WHERE database = currentDatabase() AND table = 'pi' + ), + ( + SELECT last_modification_time FROM system.cnch_parts_info WHERE database = currentDatabase() AND table = 'pi' + ) +); + +-- Alter +ALTER TABLE pi DROP COLUMN asdf; + +SYSTEM START MERGES pi; +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; +SELECT number + sleepEachRow(3) from numbers(3) FORMAT Null; + +SELECT equals( + ( + SELECT count() FROM system.cnch_parts WHERE database = currentDatabase() AND table = 'pi' + ), + ( + SELECT 2 + ) +); + +SELECT equals( + ( + SELECT max(commit_time) FROM system.cnch_parts WHERE database = currentDatabase() AND table = 'pi' + ), + ( + SELECT last_modification_time FROM system.cnch_parts_info WHERE database = currentDatabase() AND table = 'pi' + ) +); + +-- Drop +TRUNCATE TABLE pi; + +SELECT equals( + ( + SELECT max(commit_time) FROM system.cnch_parts WHERE database = currentDatabase() AND table = 'pi' + ), + ( + SELECT last_modification_time FROM system.cnch_parts_info WHERE database = currentDatabase() AND table = 'pi' + ) +); + +--- +SELECT 'Merge would not affect the last_modification_time.'; + +DROP TABLE IF EXISTS pi; + +CREATE TABLE pi +( + `name` String, + `country` String, + `asdf` Int +) +ENGINE = CnchMergeTree +PARTITION BY name +ORDER BY name; + +INSERT INTO pi VALUES ('a', 'a', 1); +INSERT INTO pi VALUES ('a', 'a', 1); +INSERT INTO pi VALUES ('a', 'a', 1); + +SYSTEM STOP MERGES pi; +SYSTEM START MERGES pi; +OPTIMIZE TABLE pi SETTINGS mutations_sync = 1; + +SELECT equals( + ( + SELECT total_parts_number, total_rows_count FROM system.cnch_parts_info WHERE database = currentDatabase() AND table = 'pi' + ), + ( + SELECT 1, 3 + ) +); + 
+SELECT equals( + ( + SELECT max(commit_time) FROM system.cnch_parts WHERE database = currentDatabase() AND table = 'pi' and name like '%\_0\_%' + ), + ( + SELECT last_modification_time FROM system.cnch_parts_info WHERE database = currentDatabase() AND table = 'pi' + ) +); + +--- +SELECT 'Recalculate would not affect the correctness.'; + +TRUNCATE TABLE pi; + +SELECT equals( + ( + SELECT max(commit_time) FROM system.cnch_parts WHERE database = currentDatabase() AND table = 'pi' + ), + ( + SELECT last_modification_time FROM system.cnch_parts_info WHERE database = currentDatabase() AND table = 'pi' + ) +); + +SYSTEM RECALCULATE METRICS FOR pi; + +SELECT number + sleepEachRow(3) from numbers(5) FORMAT Null; + +SELECT equals( + ( + SELECT max(commit_time) FROM system.cnch_parts WHERE database = currentDatabase() AND table = 'pi' + ), + ( + SELECT last_modification_time FROM system.cnch_parts_info WHERE database = currentDatabase() AND table = 'pi' + ) +); diff --git a/tests/queries/4_cnch_stateless/50013_parts_info_with_drop_table.sql b/tests/queries/4_cnch_stateless/50013_parts_info_with_drop_table.sql index ae1101081f..33dd7db921 100644 --- a/tests/queries/4_cnch_stateless/50013_parts_info_with_drop_table.sql +++ b/tests/queries/4_cnch_stateless/50013_parts_info_with_drop_table.sql @@ -14,7 +14,7 @@ SELECT lessOrEquals ( ( SELECT count() FROM system.cnch_parts_info - WHERE database = currentDatabase(0) AND table = 'pi' + WHERE database = currentDatabase() AND table = 'pi' ) , 1 @@ -36,7 +36,7 @@ SELECT lessOrEquals ( ( SELECT count() FROM system.cnch_parts_info - WHERE database = currentDatabase(0) AND table = 'pi' + WHERE database = currentDatabase() AND table = 'pi' ) , 1 @@ -58,7 +58,7 @@ SELECT lessOrEquals ( ( SELECT count() FROM system.cnch_parts_info - WHERE database = currentDatabase(0) AND table = 'pi' + WHERE database = currentDatabase() AND table = 'pi' ) , 1 diff --git a/tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.reference 
b/tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.reference new file mode 100644 index 0000000000..7c1f73a14d --- /dev/null +++ b/tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.reference @@ -0,0 +1,16 @@ +Alter table should not cover the base part. +1 +1 +Rows count should be accurate after recalculating metrics. +1 +Alter table should not cover the base part. +1 +1 +1 +Rows count should be accurate after recalculating metrics. +1 +Truncate table and dropped part. +1 +1 +0 0 0 +1 diff --git a/tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.sql b/tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.sql new file mode 100644 index 0000000000..a12d78f1cd --- /dev/null +++ b/tests/queries/4_cnch_stateless/50014_parts_info_with_dropped_parts.sql @@ -0,0 +1,184 @@ +SET disable_optimize_final = 0; +select 'Alter table should not cover the base part.'; + +CREATE TABLE test (a int, b int, c int, d int) ENGINE = CnchMergeTree() ORDER BY d; +system start merges test; + +alter table test modify setting old_parts_lifetime=10000; -- phase-one gc + +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 
100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; + +ALTER TABLE test drop column a; +ALTER TABLE test drop column b; + +OPTIMIZE TABLE test FINAL; + +SELECT sleepEachRow(3) FROM system.numbers LIMIT 20 FORMAT Null; + +select equals( + ( + select count(), sum(bytes) from system.cnch_parts where database = currentDatabase() and table = 'test' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +select equals( + ( + select sum(rows) from system.cnch_parts where database = currentDatabase() and table = 'test' and part_type = 1 + ), + ( + select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +select 'Rows count should be accurate after recalculating metrics.'; +SYSTEM RECALCULATE METRICS FOR test; +SELECT sleepEachRow(3) FROM system.numbers LIMIT 6 FORMAT Null; + +select equals( + ( + select count() from test + ), + ( + select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +DROP TABLE test; + +select 'Alter table should not cover the base part.'; + +CREATE TABLE test (a int, b int, c int, d int) ENGINE = CnchMergeTree() ORDER BY d UNIQUE KEY d % 400; +system start merges test; + +alter table test modify setting old_parts_lifetime=10000; -- phase-one gc + +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test 
SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 100; + +ALTER TABLE test drop column a; +ALTER TABLE test drop column b; + +OPTIMIZE TABLE test FINAL; + +SELECT sleepEachRow(3) FROM system.numbers LIMIT 20 FORMAT Null; + +select equals( + ( + select count(), sum(bytes) from system.cnch_parts where database = currentDatabase() and table = 'test' and part_type <= 2 + ), + ( + select total_parts_number, total_parts_size from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +select equals( + ( + select sum(rows) from system.cnch_parts where database = currentDatabase() and table = 'test' and part_type = 1 + ), + ( + select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +select equals( + ( + select count(), sum(bytes) from system.cnch_parts where database = currentDatabase() and table = 'test' and part_type >= 3 + ), + ( + select dropped_parts_number, dropped_parts_size from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +select 'Rows count should be accurate after recalculating metrics.'; +SYSTEM RECALCULATE METRICS FOR test; +SELECT sleepEachRow(3) FROM system.numbers LIMIT 6 FORMAT Null; + +select equals( + ( + select count() from test + ), + ( + select total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +DROP TABLE test; + +select 
'Truncate table and dropped part.'; + +CREATE TABLE test (a int, b int, c int, d int) ENGINE = CnchMergeTree() ORDER BY d; +system stop merges test; + +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; + +TRUNCATE TABLE test; + +select equals( + ( + select count(), sum(bytes) from system.cnch_parts where database = currentDatabase() and table = 'test' and part_type >= 3 + ), + ( + select dropped_parts_number, dropped_parts_size from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; +insert into test SELECT * FROM generateRandom('a int, b int, c int, d int') LIMIT 10; + +TRUNCATE TABLE test; + +select equals( + ( + select count(), sum(bytes) from system.cnch_parts where database = currentDatabase() and table = 'test' and part_type >= 3 + ), + ( + select dropped_parts_number, dropped_parts_size from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); + +system recalculate metrics for test; +SELECT sleepEachRow(3) FROM system.numbers LIMIT 6 FORMAT Null; + +select total_parts_number, total_parts_size, 
total_rows_count from system.cnch_parts_info where database = currentDatabase() and table = 'test'; + +select equals( + ( + select count(), sum(bytes) from system.cnch_parts where database = currentDatabase() and table = 'test' + ), + ( + select dropped_parts_number, dropped_parts_size from system.cnch_parts_info where database = currentDatabase() and table = 'test' + ) +); diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index c67adc42d7..771c6fe2ba 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -653,11 +653,15 @@ /// ---- Need to forward `SYSTEM GC` to host server. "10202_part_gc", "10200_table_snapshots", - "50010_parts_info", - "50011_parts_info_for_unique_table", /// Need flush system logs on host server "02981_vertical_merges_memory_usage", "02981_vertical_merges_lc_memory_usage", + /// Parts info related. + "50010_parts_info", + "50011_parts_info_for_unique_table", + "50011_parts_info_last_modification_time", + "50013_parts_info_with_drop_table", + "50014_parts_info_with_dropped_parts", /// Code: 57 -- Table xxx already exists. "40023_mv_with_topn_filtering", /// Code: 36 -- Dictionary xxx not found