From af3173ffd939c85db74eaf4fd8a6ffb2744b445f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=B3=E5=93=B2?= Date: Fri, 18 Oct 2024 07:39:01 +0000 Subject: [PATCH] Merge branch 'cnch_dev_part_cache_ut' into 'cnch-dev' fix(clickhousech@m-5370514631): add more ut for PartCacheManager See merge request dp/ClickHouse!25682 --- src/Catalog/Catalog.cpp | 19 +- src/Catalog/Catalog.h | 2 +- src/Storages/PartCacheManager.cpp | 402 +++++++++--------- .../tests/gtest_part_storage_cache.cpp | 74 ++++ 4 files changed, 290 insertions(+), 207 deletions(-) diff --git a/src/Catalog/Catalog.cpp b/src/Catalog/Catalog.cpp index 5c47b3dcae..8b19902e66 100644 --- a/src/Catalog/Catalog.cpp +++ b/src/Catalog/Catalog.cpp @@ -1945,7 +1945,7 @@ namespace Catalog ServerDataPartsVector tmp_res; const auto & merge_tree_storage = dynamic_cast(*storage); Strings all_partitions = getPartitionIDsFromMetastore(storage); - auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, /*from_trash=*/ false); + auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, session_context, /*from_trash=*/ false); for (auto & ele : parts_model) { auto part_model_wrapper = createPartWrapperFromModel(merge_tree_storage, std::move(*(ele->model)), std::move(ele->name)); @@ -1989,7 +1989,7 @@ namespace Catalog partitions, [&](const Strings & required_partitions, const Strings & full_partitions) -> DataModelPartWrapperVector { miss_cache = true; - DataModelPartWithNameVector fetched = getDataPartsMetaFromMetastore(storage, required_partitions, full_partitions, TxnTimestamp{0}, /*from_trash=*/ false); + DataModelPartWithNameVector fetched = getDataPartsMetaFromMetastore(storage, required_partitions, full_partitions, TxnTimestamp{0}, session_context, /*from_trash=*/ false); DataModelPartWrapperVector ret; ret.reserve(fetched.size()); @@ -2285,7 +2285,7 @@ namespace Catalog Stopwatch watch; Strings all_partitions = getPartitionIDsFromMetastore(storage); - auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, /*from_trash=*/ true); + auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, &context, /*from_trash=*/ true); for (auto & ele : parts_model) { auto part_model_wrapper = createPartWrapperFromModel(*merge_tree, std::move(*(ele->model)), std::move(ele->name)); @@ -6230,7 +6230,12 @@ namespace Catalog } DataModelPartWithNameVector Catalog::getDataPartsMetaFromMetastore( - const ConstStoragePtr & storage, const Strings & required_partitions, const Strings & full_partitions, const TxnTimestamp & ts, bool from_trash) + const ConstStoragePtr & storage, + const Strings & required_partitions, + const Strings & full_partitions, + const TxnTimestamp & ts, + const Context * session_context, + bool from_trash) { String uuid = UUIDHelpers::UUIDToString(storage->getStorageUUID()); String part_meta_prefix = MetastoreProxy::dataPartPrefix(name_space, uuid); @@ -6250,7 +6255,11 @@ namespace Catalog }; - UInt32 time_out_ms = 1000 * (context.getSettingsRef().cnch_fetch_parts_timeout.totalSeconds()); + UInt32 time_out_ms; + if (session_context) + time_out_ms = 1000 * (session_context->getSettingsRef().cnch_fetch_parts_timeout.totalSeconds()); + else + time_out_ms = 1000 * context.getSettingsRef().cnch_fetch_parts_timeout.totalSeconds(); String meta_prefix = from_trash ? 
MetastoreProxy::trashItemsPrefix(name_space, uuid) + PART_STORE_PREFIX diff --git a/src/Catalog/Catalog.h b/src/Catalog/Catalog.h index e80bc6a9fc..27d42ba947 100644 --- a/src/Catalog/Catalog.h +++ b/src/Catalog/Catalog.h @@ -993,7 +993,7 @@ private: StoragePtr createTableFromDataModel(const Context & session_context, const Protos::DataModelTable & data_model); void detachOrAttachTable(const String & db, const String & name, const TxnTimestamp & ts, bool is_detach); DataModelPartWithNameVector getDataPartsMetaFromMetastore( - const ConstStoragePtr & storage, const Strings & required_partitions, const Strings & full_partitions, const TxnTimestamp & ts, bool from_trash = false); + const ConstStoragePtr & storage, const Strings & required_partitions, const Strings & full_partitions, const TxnTimestamp & ts, const Context * session_context, bool from_trash = false); DeleteBitmapMetaPtrVector getDeleteBitmapsInPartitionsImpl( const ConstStoragePtr & storage, const Strings & partitions, const TxnTimestamp & ts, bool from_trash = false, VisibilityLevel visibility = VisibilityLevel::Visible); DataModelDeleteBitmapPtrVector getDeleteBitmapsInPartitionsImpl( diff --git a/src/Storages/PartCacheManager.cpp b/src/Storages/PartCacheManager.cpp index f874cf506d..3814ee74b3 100644 --- a/src/Storages/PartCacheManager.cpp +++ b/src/Storages/PartCacheManager.cpp @@ -1249,10 +1249,10 @@ RetValueVec PartCacheManager::getDataInternal( } else { + std::map partition_parts; max_threads = std::min(max_threads, meta_partitions.size()); ExceptionHandler exception_handler; ThreadPool thread_pool(max_threads); - std::map partition_parts; for (auto & [partition_id, partition_info_ptr] : meta_partitions) { partition_parts[partition_id] = RetValueVec(); @@ -1437,136 +1437,57 @@ template < typename Adapter, typename LoadFunc, typename RetValueVec> - RetValueVec PartCacheManager::getDataByPartition( - const MergeTreeMetaBase & storage, - const TableMetaEntryPtr & meta_ptr, - const Strings & partitions, - LoadFunc & load_func, - const UInt64 & ts) +RetValueVec PartCacheManager::getDataByPartition( + const MergeTreeMetaBase & storage, + const TableMetaEntryPtr & meta_ptr, + const Strings & partitions, + LoadFunc & load_func, + const UInt64 & ts) +{ + String type; + CachePtr cache_ptr = nullptr; + + if constexpr (std::is_same_v) { - String type; - CachePtr cache_ptr = nullptr; + type = "parts"; + cache_ptr = part_cache_ptr; + } + else if constexpr (std::is_same_v) + { + type = "delete bitmaps"; + cache_ptr = delete_bitmap_cache_ptr; + } + else + { + static_assert(DependentFalse::value, "invalid template type for CachePtr"); + } - if constexpr (std::is_same_v) + LOG_DEBUG(getLogger("PartCacheManager"), "Get {} by partitions for table : {}", type, storage.getLogName()); + Stopwatch watch; + UUID uuid = storage.getStorageUUID(); + + const UInt64 current_loading_task_id = getLoadTaskID(); + auto meta_partitions = meta_ptr->getPartitions(partitions); + + auto process_partition = [&](const String & partition_id, const PartitionInfoPtr & partition_info_ptr, RetValueVec & parts) { + DataCacheStatus * cache_status = getCacheStatus(partition_info_ptr); + + while (true) { - type = "parts"; - cache_ptr = part_cache_ptr; - } - else if constexpr (std::is_same_v) - { - type = "delete bitmaps"; - cache_ptr = delete_bitmap_cache_ptr; - } - else - { - static_assert(DependentFalse::value, "invalid template type for CachePtr"); - } + /// stop if fetch part time exceeds the query max execution time. 
+ checkTimeLimit(watch); - LOG_DEBUG(getLogger("PartCacheManager"), "Get {} by partitions for table : {}", type, storage.getLogName()); - Stopwatch watch; - UUID uuid = storage.getStorageUUID(); - - const UInt64 current_loading_task_id = getLoadTaskID(); - auto meta_partitions = meta_ptr->getPartitions(partitions); - - auto process_partition = [&](const String & partition_id, const PartitionInfoPtr & partition_info_ptr, RetValueVec & parts) { - DataCacheStatus * cache_status = getCacheStatus(partition_info_ptr); - - while (true) + CacheStatusGuard cache_status_guard(uuid, current_loading_task_id, cache_ptr); + bool need_load_parts = false; { - /// stop if fetch part time exceeds the query max execution time. - checkTimeLimit(watch); - - CacheStatusGuard cache_status_guard(uuid, current_loading_task_id, cache_ptr); - bool need_load_parts = false; + if (cache_status->isLoaded()) { - if (cache_status->isLoaded()) + auto cached = cache_ptr->get({uuid, partition_id}); + if (cached) { - auto cached = cache_ptr->get({uuid, partition_id}); - if (cached) + for (auto it = cached->begin(); it != cached->end(); ++it) { - for (auto it = cached->begin(); it != cached->end(); ++it) - { - const auto & data_wrapper_ptr = *it; - if (this->isVisible(data_wrapper_ptr, ts)) - { - Adapter adapter(storage, data_wrapper_ptr); - parts.push_back(adapter.toData()); - //logPartsVector(storage, res); - } - } - - /// already get parts from cache, continue to next partition - return; - } - } - - auto partition_write_lock = partition_info_ptr->writeLock(); - /// Double check - if (!cache_status->isLoading()) - { - cache_status->setToLoading(current_loading_task_id); - cache_status_guard.addPartition(partition_info_ptr); - need_load_parts = true; - } - } - - /// Now cache status must be LOADING; - /// need to load parts from metastore - if (need_load_parts) - { - Vec fetched; - try - { - std::map> partition_to_parts; - fetched = load_func({partition_id}, {partition_id}); - - /// It happens that new parts have been inserted into cache during loading parts from bytekv, we need merge them to make - /// sure the cache contains all parts of the partition. - auto partition_write_lock = partition_info_ptr->writeLock(); - if (cache_status->isLoadingByCurrentThread(current_loading_task_id)) - { - auto cached = cache_ptr->get({uuid, partition_id}); - if (!cached) - { - /// directly insert all fetched parts into cache - cached = std::make_shared(); - for (auto & data_wrapper_ptr : fetched) - { - Adapter adapter(storage, data_wrapper_ptr); - cached->update(adapter.getName(), data_wrapper_ptr); - } - cache_ptr->insert({uuid, partition_id}, cached); - } - else - { - for (auto & data_wrapper_ptr : fetched) - { - Adapter adapter(storage, data_wrapper_ptr); - auto it = cached->find(adapter.getName()); - Adapter it_adapter(storage, *it); - /// do not update cache if the cached data is newer than bytekv. - if (it == cached->end() || it_adapter.getCommitTime() < adapter.getCommitTime()) - { - cached->update(adapter.getName(), data_wrapper_ptr); - } - } - /// Force LRU cache update status(weight/evict). - cache_ptr->insert({uuid, partition_id}, cached); - } - - cache_status->setToLoaded(); - } - - /// Release partition lock before construct ServerDataPart - partition_write_lock.reset(); - - /// Finish fetching parts, notify other waiting tasks if any. 
- meta_ptr->fetch_cv.notify_all(); - - for (auto & data_wrapper_ptr : fetched) - { - /// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts). + const auto & data_wrapper_ptr = *it; if (this->isVisible(data_wrapper_ptr, ts)) { Adapter adapter(storage, data_wrapper_ptr); @@ -1575,104 +1496,183 @@ template < } } - /// go to next partition; + /// already get parts from cache, continue to next partition return; } - catch (...) - { - throw; - } } - else /// other task is fetching parts now, just wait for the result - { - { - std::unique_lock lock(meta_ptr->fetch_mutex); - if (!meta_ptr->fetch_cv.wait_for( - lock, std::chrono::milliseconds(5000), [&cache_status]() { return cache_status->isLoaded(); })) - { - LOG_TRACE( - getLogger("PartCacheManager"), - "Wait timeout 5000ms for other thread loading table: {}, partition: {}", - storage.getStorageID().getNameForLogs(), - partition_id); - continue; - } - } - if (cache_status->isLoaded()) + auto partition_write_lock = partition_info_ptr->writeLock(); + /// Double check + if (!cache_status->isLoading()) + { + cache_status->setToLoading(current_loading_task_id); + cache_status_guard.addPartition(partition_info_ptr); + need_load_parts = true; + } + } + + /// Now cache status must be LOADING; + /// need to load parts from metastore + if (need_load_parts) + { + Vec fetched; + try + { + std::map> partition_to_parts; + fetched = load_func({partition_id}, {partition_id}); + + /// It happens that new parts have been inserted into cache during loading parts from bytekv, we need merge them to make + /// sure the cache contains all parts of the partition. + auto partition_write_lock = partition_info_ptr->writeLock(); + if (cache_status->isLoadingByCurrentThread(current_loading_task_id)) { auto cached = cache_ptr->get({uuid, partition_id}); if (!cached) { - throw Exception("Cannot get already loaded parts from cache. Its a logic error.", ErrorCodes::LOGICAL_ERROR); - } - for (auto it = cached->begin(); it != cached->end(); ++it) - { - const auto & part_wrapper_ptr = *it; - /// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts). - if (this->isVisible(part_wrapper_ptr, ts)) + /// directly insert all fetched parts into cache + cached = std::make_shared(); + for (auto & data_wrapper_ptr : fetched) { - Adapter adapter(storage, part_wrapper_ptr); - parts.push_back(adapter.toData()); - //logPartsVector(storage, res); + Adapter adapter(storage, data_wrapper_ptr); + cached->update(adapter.getName(), data_wrapper_ptr); } + cache_ptr->insert({uuid, partition_id}, cached); } - return; + else + { + for (auto & data_wrapper_ptr : fetched) + { + Adapter adapter(storage, data_wrapper_ptr); + auto it = cached->find(adapter.getName()); + Adapter it_adapter(storage, *it); + /// do not update cache if the cached data is newer than bytekv. + if (it == cached->end() || it_adapter.getCommitTime() < adapter.getCommitTime()) + { + cached->update(adapter.getName(), data_wrapper_ptr); + } + } + /// Force LRU cache update status(weight/evict). + cache_ptr->insert({uuid, partition_id}, cached); + } + + cache_status->setToLoaded(); } - // if cache status does not change to loaded, get parts of current partition again. + + /// Release partition lock before construct ServerDataPart + partition_write_lock.reset(); + + /// Finish fetching parts, notify other waiting tasks if any. 
+ meta_ptr->fetch_cv.notify_all(); + + for (auto & data_wrapper_ptr : fetched) + { + /// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts). + if (this->isVisible(data_wrapper_ptr, ts)) + { + Adapter adapter(storage, data_wrapper_ptr); + parts.push_back(adapter.toData()); + //logPartsVector(storage, res); + } + } + + /// go to next partition; + return; + } + catch (...) + { + throw; } } - }; - - RetValueVec res; - - size_t max_threads = getMaxThreads(); - if (meta_partitions.size() < 2 || max_threads < 2) - { - for (auto & [partition_id, partition_info_ptr] : meta_partitions) + else /// other task is fetching parts now, just wait for the result { - process_partition(partition_id, partition_info_ptr, res); + { + std::unique_lock lock(meta_ptr->fetch_mutex); + if (!meta_ptr->fetch_cv.wait_for( + lock, std::chrono::milliseconds(5000), [&cache_status]() { return cache_status->isLoaded(); })) + { + LOG_TRACE( + getLogger("PartCacheManager"), + "Wait timeout 5000ms for other thread loading table: {}, partition: {}", + storage.getStorageID().getNameForLogs(), + partition_id); + continue; + } + } + + if (cache_status->isLoaded()) + { + auto cached = cache_ptr->get({uuid, partition_id}); + if (!cached) + { + throw Exception("Cannot get already loaded parts from cache. Its a logic error.", ErrorCodes::LOGICAL_ERROR); + } + for (auto it = cached->begin(); it != cached->end(); ++it) + { + const auto & part_wrapper_ptr = *it; + /// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts). + if (this->isVisible(part_wrapper_ptr, ts)) + { + Adapter adapter(storage, part_wrapper_ptr); + parts.push_back(adapter.toData()); + //logPartsVector(storage, res); + } + } + return; + } + // if cache status does not change to loaded, get parts of current partition again. 
} } - else + }; + + RetValueVec res; + + size_t max_threads = getMaxThreads(); + if (meta_partitions.size() < 2 || max_threads < 2) + { + for (auto & [partition_id, partition_info_ptr] : meta_partitions) { - max_threads = std::min(max_threads, meta_partitions.size()); - ExceptionHandler exception_handler; - ThreadPool thread_pool(max_threads); - std::map partition_parts; - for (auto & [partition_id, partition_info_ptr] : meta_partitions) - { - partition_parts[partition_id] = RetValueVec(); - } - for (auto & [partition_id_, partition_info_ptr_] : meta_partitions) - { - thread_pool.scheduleOrThrowOnError(createExceptionHandledJob( - [&, partition_id = partition_id_, partition_info_ptr = partition_info_ptr_]() { - process_partition(partition_id, partition_info_ptr, partition_parts[partition_id]); - }, - exception_handler)); - } - - LOG_DEBUG( - getLogger("PartCacheManager"), - "Waiting for loading parts for table {} use {} threads.", - storage.getStorageID().getNameForLogs(), - max_threads); - thread_pool.wait(); - exception_handler.throwIfException(); - - size_t total_parts_number = 0; - for (auto & [partition_id, parts] : partition_parts) - total_parts_number += parts.size(); - res.reserve(total_parts_number); - for (auto & [partition_id, parts] : partition_parts) - { - res.insert(res.end(), std::make_move_iterator(parts.begin()), std::make_move_iterator(parts.end())); - } + process_partition(partition_id, partition_info_ptr, res); } - - return res; } + else + { + max_threads = std::min(max_threads, meta_partitions.size()); + ExceptionHandler exception_handler; + std::map partition_parts; + ThreadPool thread_pool(max_threads); + for (auto & [partition_id, partition_info_ptr] : meta_partitions) + { + partition_parts[partition_id] = RetValueVec(); + } + for (auto & [partition_id_, partition_info_ptr_] : meta_partitions) + { + thread_pool.scheduleOrThrowOnError(createExceptionHandledJob( + [&, partition_id = partition_id_, partition_info_ptr = partition_info_ptr_]() { + process_partition(partition_id, partition_info_ptr, partition_parts[partition_id]); + }, + exception_handler)); + } + + LOG_DEBUG( + getLogger("PartCacheManager"), + "Waiting for loading parts for table {} use {} threads.", + storage.getStorageID().getNameForLogs(), + max_threads); + thread_pool.wait(); + exception_handler.throwIfException(); + + size_t total_parts_number = 0; + for (auto & [partition_id, parts] : partition_parts) + total_parts_number += parts.size(); + res.reserve(total_parts_number); + for (auto & [partition_id, parts] : partition_parts) + { + res.insert(res.end(), std::make_move_iterator(parts.begin()), std::make_move_iterator(parts.end())); + } + } + + return res; +} void PartCacheManager::checkTimeLimit(Stopwatch & watch) { diff --git a/src/Storages/tests/gtest_part_storage_cache.cpp b/src/Storages/tests/gtest_part_storage_cache.cpp index ce0ef2da2d..df8fbd54de 100644 --- a/src/Storages/tests/gtest_part_storage_cache.cpp +++ b/src/Storages/tests/gtest_part_storage_cache.cpp @@ -13,6 +13,11 @@ using namespace std::chrono; using namespace DB; using namespace testing; +namespace DB::ErrorCodes +{ + extern const int TIMEOUT_EXCEEDED; +} + namespace CacheTestMock { @@ -387,6 +392,75 @@ TEST_F(CacheManagerTest, GetPartsFromCache) cache_manager->shutDown(); } +TEST_F(CacheManagerTest, GetPartsFromCacheWithMultiThreads) +{ + auto context = getContext().context; + auto settings = context->getSettingsRef(); + settings.catalog_enable_multiple_threads = true; + context->setSettings(settings); + + std::shared_ptr 
cache_manager = std::make_shared<PartCacheManager>(context, 0, true); + auto topology_version = PairInt64{1, 1}; + + // mock storage + String query = "create table gztest.test (id Int32) ENGINE=CnchMergeTree partition by id order by tuple()"; + StoragePtr storage = CacheTestMock::createTable(query, context); + String storage_uuid = UUIDHelpers::UUIDToString(storage->getStorageUUID()); + + // add table entry in cache manager and mock load partitions + cache_manager->mayUpdateTableMeta(*storage, topology_version); + auto entry = cache_manager->getTableMeta(storage->getStorageUUID()); + + // initialize partition infos + Strings partitions; + for (size_t i = 1000; i<1005; i++) + { + partitions.push_back(toString(i)); + entry->partitions.emplace(partitions.back(), std::make_shared(storage_uuid, nullptr, partitions.back(), RWLockImpl::create())); + } + + auto mock_timeout_loading = [&](const Strings &, const Strings &) -> DataModelPartWrapperVector { + throw Exception("Loading data reached timeout.", ErrorCodes::TIMEOUT_EXCEEDED); + }; + + EXPECT_EQ(entry->load_parts_by_partition, false); + // get data parts from cache with multiple threads. The load timeout exception will be thrown, after which subsequent loads + // fall back to loading partition by partition + try + { + cache_manager->getOrSetServerDataPartsInPartitions(*storage, partitions, mock_timeout_loading, TxnTimestamp::maxTS(), topology_version); + } + catch(Exception &e) + { + EXPECT_EQ(e.code(), ErrorCodes::TIMEOUT_EXCEEDED); + } + + // Fall back to load parts by partition + EXPECT_EQ(entry->load_parts_by_partition, true); + + auto mock_load = [&](const Strings & partition_not_cached, const Strings &) -> DataModelPartWrapperVector { + DataModelPartWrapperVector res; + for (const auto & partition_id : partition_not_cached) + { + auto loaded = CacheTestMock::createPartsBatch(partition_id, 10, storage); + res.insert(res.end(), loaded.begin(), loaded.end()); + } + return res; + }; + + ServerDataPartsVector parts_from_cache = cache_manager->getOrSetServerDataPartsInPartitions(*storage, partitions, mock_load, TxnTimestamp::maxTS(), topology_version); + EXPECT_EQ(parts_from_cache.size(), 50); + + // assert partition cache status Loaded + for (const auto & partition_id : partitions) + { + auto partition_info = entry->getPartitionInfo(partition_id); + EXPECT_EQ(partition_info->part_cache_status.isLoaded(), true); + } + + cache_manager->shutDown(); +} + /// Test the functionality of `updateTableNameInMetaEntry`. TEST_F(CacheManagerTest, RenameTable) {
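
The main functional change in Catalog.cpp is that getDataPartsMetaFromMetastore now takes an optional session context and derives the part-fetch timeout from the per-query settings when one is supplied, falling back to the catalog's own context otherwise. Below is a minimal, self-contained sketch of that fallback pattern; ContextLike, SettingsLike and resolveFetchTimeoutMs are hypothetical stand-ins for the real Context/Settings classes, which expose the value via getSettingsRef().cnch_fetch_parts_timeout.

#include <cstdint>

// Simplified stand-ins (assumed names, not the real DB::Context / Settings API).
struct SettingsLike
{
    uint64_t cnch_fetch_parts_timeout_seconds = 30; // catalog-wide default, in seconds
};

struct ContextLike
{
    SettingsLike settings;
    const SettingsLike & getSettingsRef() const { return settings; }
};

// Prefer the per-query (session) context when the caller passes one,
// otherwise fall back to the global context -- the same shape as the
// nullable `session_context` parameter added by this patch.
uint64_t resolveFetchTimeoutMs(const ContextLike & global_context, const ContextLike * session_context)
{
    const SettingsLike & settings
        = session_context ? session_context->getSettingsRef() : global_context.getSettingsRef();
    return 1000 * settings.cnch_fetch_parts_timeout_seconds;
}

int main()
{
    ContextLike global_ctx;                                    // default: 30s
    ContextLike session_ctx;
    session_ctx.settings.cnch_fetch_parts_timeout_seconds = 5; // per-query override

    // With a session context the query-level value wins; without one the global default applies.
    bool ok = resolveFetchTimeoutMs(global_ctx, &session_ctx) == 5000
        && resolveFetchTimeoutMs(global_ctx, nullptr) == 30000;
    return ok ? 0 : 1;
}

Passing a nullable pointer, and falling back to the member `context` when it is absent, keeps the existing Catalog call sites working while letting a query-level cnch_fetch_parts_timeout setting take effect on part fetches.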