Merge branch 'cnch_dev_part_cache_ut' into 'cnch-dev'

fix(clickhousech@m-5370514631): add more ut for PartCacheManager

See merge request dp/ClickHouse!25682
This commit is contained in:
关哲 2024-10-18 07:39:01 +00:00 committed by Fred Wang
parent 387e338f9e
commit af3173ffd9
4 changed files with 290 additions and 207 deletions

View File

@ -1945,7 +1945,7 @@ namespace Catalog
ServerDataPartsVector tmp_res;
const auto & merge_tree_storage = dynamic_cast<const MergeTreeMetaBase &>(*storage);
Strings all_partitions = getPartitionIDsFromMetastore(storage);
auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, /*from_trash=*/ false);
auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, session_context, /*from_trash=*/ false);
for (auto & ele : parts_model)
{
auto part_model_wrapper = createPartWrapperFromModel(merge_tree_storage, std::move(*(ele->model)), std::move(ele->name));
@ -1989,7 +1989,7 @@ namespace Catalog
partitions,
[&](const Strings & required_partitions, const Strings & full_partitions) -> DataModelPartWrapperVector {
miss_cache = true;
DataModelPartWithNameVector fetched = getDataPartsMetaFromMetastore(storage, required_partitions, full_partitions, TxnTimestamp{0}, /*from_trash=*/ false);
DataModelPartWithNameVector fetched = getDataPartsMetaFromMetastore(storage, required_partitions, full_partitions, TxnTimestamp{0}, session_context, /*from_trash=*/ false);
DataModelPartWrapperVector ret;
ret.reserve(fetched.size());
@ -2285,7 +2285,7 @@ namespace Catalog
Stopwatch watch;
Strings all_partitions = getPartitionIDsFromMetastore(storage);
auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, /*from_trash=*/ true);
auto parts_model = getDataPartsMetaFromMetastore(storage, partitions, all_partitions, ts, &context, /*from_trash=*/ true);
for (auto & ele : parts_model)
{
auto part_model_wrapper = createPartWrapperFromModel(*merge_tree, std::move(*(ele->model)), std::move(ele->name));
@ -6230,7 +6230,12 @@ namespace Catalog
}
DataModelPartWithNameVector Catalog::getDataPartsMetaFromMetastore(
const ConstStoragePtr & storage, const Strings & required_partitions, const Strings & full_partitions, const TxnTimestamp & ts, bool from_trash)
const ConstStoragePtr & storage,
const Strings & required_partitions,
const Strings & full_partitions,
const TxnTimestamp & ts,
const Context * session_context,
bool from_trash)
{
String uuid = UUIDHelpers::UUIDToString(storage->getStorageUUID());
String part_meta_prefix = MetastoreProxy::dataPartPrefix(name_space, uuid);
@ -6250,7 +6255,11 @@ namespace Catalog
};
UInt32 time_out_ms = 1000 * (context.getSettingsRef().cnch_fetch_parts_timeout.totalSeconds());
UInt32 time_out_ms;
if (session_context)
time_out_ms = 1000 * (session_context->getSettingsRef().cnch_fetch_parts_timeout.totalSeconds());
else
time_out_ms = 1000 * context.getSettingsRef().cnch_fetch_parts_timeout.totalSeconds();
String meta_prefix = from_trash
? MetastoreProxy::trashItemsPrefix(name_space, uuid) + PART_STORE_PREFIX

View File

@ -993,7 +993,7 @@ private:
StoragePtr createTableFromDataModel(const Context & session_context, const Protos::DataModelTable & data_model);
void detachOrAttachTable(const String & db, const String & name, const TxnTimestamp & ts, bool is_detach);
DataModelPartWithNameVector getDataPartsMetaFromMetastore(
const ConstStoragePtr & storage, const Strings & required_partitions, const Strings & full_partitions, const TxnTimestamp & ts, bool from_trash = false);
const ConstStoragePtr & storage, const Strings & required_partitions, const Strings & full_partitions, const TxnTimestamp & ts, const Context * session_context, bool from_trash = false);
DeleteBitmapMetaPtrVector getDeleteBitmapsInPartitionsImpl(
const ConstStoragePtr & storage, const Strings & partitions, const TxnTimestamp & ts, bool from_trash = false, VisibilityLevel visibility = VisibilityLevel::Visible);
DataModelDeleteBitmapPtrVector getDeleteBitmapsInPartitionsImpl(

View File

@ -1249,10 +1249,10 @@ RetValueVec PartCacheManager::getDataInternal(
}
else
{
std::map<String, RetValueVec> partition_parts;
max_threads = std::min(max_threads, meta_partitions.size());
ExceptionHandler exception_handler;
ThreadPool thread_pool(max_threads);
std::map<String, RetValueVec> partition_parts;
for (auto & [partition_id, partition_info_ptr] : meta_partitions)
{
partition_parts[partition_id] = RetValueVec();
@ -1437,136 +1437,57 @@ template <
typename Adapter,
typename LoadFunc,
typename RetValueVec>
RetValueVec PartCacheManager::getDataByPartition(
const MergeTreeMetaBase & storage,
const TableMetaEntryPtr & meta_ptr,
const Strings & partitions,
LoadFunc & load_func,
const UInt64 & ts)
RetValueVec PartCacheManager::getDataByPartition(
const MergeTreeMetaBase & storage,
const TableMetaEntryPtr & meta_ptr,
const Strings & partitions,
LoadFunc & load_func,
const UInt64 & ts)
{
String type;
CachePtr cache_ptr = nullptr;
if constexpr (std::is_same_v<CachePtr, CnchDataPartCachePtr>)
{
String type;
CachePtr cache_ptr = nullptr;
type = "parts";
cache_ptr = part_cache_ptr;
}
else if constexpr (std::is_same_v<CachePtr, CnchDeleteBitmapCachePtr>)
{
type = "delete bitmaps";
cache_ptr = delete_bitmap_cache_ptr;
}
else
{
static_assert(DependentFalse<CachePtr>::value, "invalid template type for CachePtr");
}
if constexpr (std::is_same_v<CachePtr, CnchDataPartCachePtr>)
LOG_DEBUG(getLogger("PartCacheManager"), "Get {} by partitions for table : {}", type, storage.getLogName());
Stopwatch watch;
UUID uuid = storage.getStorageUUID();
const UInt64 current_loading_task_id = getLoadTaskID();
auto meta_partitions = meta_ptr->getPartitions(partitions);
auto process_partition = [&](const String & partition_id, const PartitionInfoPtr & partition_info_ptr, RetValueVec & parts) {
DataCacheStatus * cache_status = getCacheStatus<CachePtr>(partition_info_ptr);
while (true)
{
type = "parts";
cache_ptr = part_cache_ptr;
}
else if constexpr (std::is_same_v<CachePtr, CnchDeleteBitmapCachePtr>)
{
type = "delete bitmaps";
cache_ptr = delete_bitmap_cache_ptr;
}
else
{
static_assert(DependentFalse<CachePtr>::value, "invalid template type for CachePtr");
}
/// stop if fetch part time exceeds the query max execution time.
checkTimeLimit(watch);
LOG_DEBUG(getLogger("PartCacheManager"), "Get {} by partitions for table : {}", type, storage.getLogName());
Stopwatch watch;
UUID uuid = storage.getStorageUUID();
const UInt64 current_loading_task_id = getLoadTaskID();
auto meta_partitions = meta_ptr->getPartitions(partitions);
auto process_partition = [&](const String & partition_id, const PartitionInfoPtr & partition_info_ptr, RetValueVec & parts) {
DataCacheStatus * cache_status = getCacheStatus<CachePtr>(partition_info_ptr);
while (true)
CacheStatusGuard<CachePtr> cache_status_guard(uuid, current_loading_task_id, cache_ptr);
bool need_load_parts = false;
{
/// stop if fetch part time exceeds the query max execution time.
checkTimeLimit(watch);
CacheStatusGuard<CachePtr> cache_status_guard(uuid, current_loading_task_id, cache_ptr);
bool need_load_parts = false;
if (cache_status->isLoaded())
{
if (cache_status->isLoaded())
auto cached = cache_ptr->get({uuid, partition_id});
if (cached)
{
auto cached = cache_ptr->get({uuid, partition_id});
if (cached)
for (auto it = cached->begin(); it != cached->end(); ++it)
{
for (auto it = cached->begin(); it != cached->end(); ++it)
{
const auto & data_wrapper_ptr = *it;
if (this->isVisible(data_wrapper_ptr, ts))
{
Adapter adapter(storage, data_wrapper_ptr);
parts.push_back(adapter.toData());
//logPartsVector(storage, res);
}
}
/// already get parts from cache, continue to next partition
return;
}
}
auto partition_write_lock = partition_info_ptr->writeLock();
/// Double check
if (!cache_status->isLoading())
{
cache_status->setToLoading(current_loading_task_id);
cache_status_guard.addPartition(partition_info_ptr);
need_load_parts = true;
}
}
/// Now cache status must be LOADING;
/// need to load parts from metastore
if (need_load_parts)
{
Vec<FetchedValue> fetched;
try
{
std::map<String, Vec<FetchedValue>> partition_to_parts;
fetched = load_func({partition_id}, {partition_id});
/// It happens that new parts have been inserted into cache during loading parts from bytekv, we need merge them to make
/// sure the cache contains all parts of the partition.
auto partition_write_lock = partition_info_ptr->writeLock();
if (cache_status->isLoadingByCurrentThread(current_loading_task_id))
{
auto cached = cache_ptr->get({uuid, partition_id});
if (!cached)
{
/// directly insert all fetched parts into cache
cached = std::make_shared<CacheValueMap>();
for (auto & data_wrapper_ptr : fetched)
{
Adapter adapter(storage, data_wrapper_ptr);
cached->update(adapter.getName(), data_wrapper_ptr);
}
cache_ptr->insert({uuid, partition_id}, cached);
}
else
{
for (auto & data_wrapper_ptr : fetched)
{
Adapter adapter(storage, data_wrapper_ptr);
auto it = cached->find(adapter.getName());
Adapter it_adapter(storage, *it);
/// do not update cache if the cached data is newer than bytekv.
if (it == cached->end() || it_adapter.getCommitTime() < adapter.getCommitTime())
{
cached->update(adapter.getName(), data_wrapper_ptr);
}
}
/// Force LRU cache update status(weight/evict).
cache_ptr->insert({uuid, partition_id}, cached);
}
cache_status->setToLoaded();
}
/// Release partition lock before construct ServerDataPart
partition_write_lock.reset();
/// Finish fetching parts, notify other waiting tasks if any.
meta_ptr->fetch_cv.notify_all();
for (auto & data_wrapper_ptr : fetched)
{
/// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts).
const auto & data_wrapper_ptr = *it;
if (this->isVisible(data_wrapper_ptr, ts))
{
Adapter adapter(storage, data_wrapper_ptr);
@ -1575,104 +1496,183 @@ template <
}
}
/// go to next partition;
/// already get parts from cache, continue to next partition
return;
}
catch (...)
{
throw;
}
}
else /// other task is fetching parts now, just wait for the result
{
{
std::unique_lock<std::mutex> lock(meta_ptr->fetch_mutex);
if (!meta_ptr->fetch_cv.wait_for(
lock, std::chrono::milliseconds(5000), [&cache_status]() { return cache_status->isLoaded(); }))
{
LOG_TRACE(
getLogger("PartCacheManager"),
"Wait timeout 5000ms for other thread loading table: {}, partition: {}",
storage.getStorageID().getNameForLogs(),
partition_id);
continue;
}
}
if (cache_status->isLoaded())
auto partition_write_lock = partition_info_ptr->writeLock();
/// Double check
if (!cache_status->isLoading())
{
cache_status->setToLoading(current_loading_task_id);
cache_status_guard.addPartition(partition_info_ptr);
need_load_parts = true;
}
}
/// Now cache status must be LOADING;
/// need to load parts from metastore
if (need_load_parts)
{
Vec<FetchedValue> fetched;
try
{
std::map<String, Vec<FetchedValue>> partition_to_parts;
fetched = load_func({partition_id}, {partition_id});
/// It happens that new parts have been inserted into cache during loading parts from bytekv, we need merge them to make
/// sure the cache contains all parts of the partition.
auto partition_write_lock = partition_info_ptr->writeLock();
if (cache_status->isLoadingByCurrentThread(current_loading_task_id))
{
auto cached = cache_ptr->get({uuid, partition_id});
if (!cached)
{
throw Exception("Cannot get already loaded parts from cache. Its a logic error.", ErrorCodes::LOGICAL_ERROR);
}
for (auto it = cached->begin(); it != cached->end(); ++it)
{
const auto & part_wrapper_ptr = *it;
/// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts).
if (this->isVisible(part_wrapper_ptr, ts))
/// directly insert all fetched parts into cache
cached = std::make_shared<CacheValueMap>();
for (auto & data_wrapper_ptr : fetched)
{
Adapter adapter(storage, part_wrapper_ptr);
parts.push_back(adapter.toData());
//logPartsVector(storage, res);
Adapter adapter(storage, data_wrapper_ptr);
cached->update(adapter.getName(), data_wrapper_ptr);
}
cache_ptr->insert({uuid, partition_id}, cached);
}
return;
else
{
for (auto & data_wrapper_ptr : fetched)
{
Adapter adapter(storage, data_wrapper_ptr);
auto it = cached->find(adapter.getName());
Adapter it_adapter(storage, *it);
/// do not update cache if the cached data is newer than bytekv.
if (it == cached->end() || it_adapter.getCommitTime() < adapter.getCommitTime())
{
cached->update(adapter.getName(), data_wrapper_ptr);
}
}
/// Force LRU cache update status(weight/evict).
cache_ptr->insert({uuid, partition_id}, cached);
}
cache_status->setToLoaded();
}
// if cache status does not change to loaded, get parts of current partition again.
/// Release partition lock before construct ServerDataPart
partition_write_lock.reset();
/// Finish fetching parts, notify other waiting tasks if any.
meta_ptr->fetch_cv.notify_all();
for (auto & data_wrapper_ptr : fetched)
{
/// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts).
if (this->isVisible(data_wrapper_ptr, ts))
{
Adapter adapter(storage, data_wrapper_ptr);
parts.push_back(adapter.toData());
//logPartsVector(storage, res);
}
}
/// go to next partition;
return;
}
catch (...)
{
throw;
}
}
};
RetValueVec res;
size_t max_threads = getMaxThreads();
if (meta_partitions.size() < 2 || max_threads < 2)
{
for (auto & [partition_id, partition_info_ptr] : meta_partitions)
else /// other task is fetching parts now, just wait for the result
{
process_partition(partition_id, partition_info_ptr, res);
{
std::unique_lock<std::mutex> lock(meta_ptr->fetch_mutex);
if (!meta_ptr->fetch_cv.wait_for(
lock, std::chrono::milliseconds(5000), [&cache_status]() { return cache_status->isLoaded(); }))
{
LOG_TRACE(
getLogger("PartCacheManager"),
"Wait timeout 5000ms for other thread loading table: {}, partition: {}",
storage.getStorageID().getNameForLogs(),
partition_id);
continue;
}
}
if (cache_status->isLoaded())
{
auto cached = cache_ptr->get({uuid, partition_id});
if (!cached)
{
throw Exception("Cannot get already loaded parts from cache. Its a logic error.", ErrorCodes::LOGICAL_ERROR);
}
for (auto it = cached->begin(); it != cached->end(); ++it)
{
const auto & part_wrapper_ptr = *it;
/// Only filter the parts when both commit_time and txnid are smaller or equal to ts (txnid is helpful for intermediate parts).
if (this->isVisible(part_wrapper_ptr, ts))
{
Adapter adapter(storage, part_wrapper_ptr);
parts.push_back(adapter.toData());
//logPartsVector(storage, res);
}
}
return;
}
// if cache status does not change to loaded, get parts of current partition again.
}
}
else
};
RetValueVec res;
size_t max_threads = getMaxThreads();
if (meta_partitions.size() < 2 || max_threads < 2)
{
for (auto & [partition_id, partition_info_ptr] : meta_partitions)
{
max_threads = std::min(max_threads, meta_partitions.size());
ExceptionHandler exception_handler;
ThreadPool thread_pool(max_threads);
std::map<String, RetValueVec> partition_parts;
for (auto & [partition_id, partition_info_ptr] : meta_partitions)
{
partition_parts[partition_id] = RetValueVec();
}
for (auto & [partition_id_, partition_info_ptr_] : meta_partitions)
{
thread_pool.scheduleOrThrowOnError(createExceptionHandledJob(
[&, partition_id = partition_id_, partition_info_ptr = partition_info_ptr_]() {
process_partition(partition_id, partition_info_ptr, partition_parts[partition_id]);
},
exception_handler));
}
LOG_DEBUG(
getLogger("PartCacheManager"),
"Waiting for loading parts for table {} use {} threads.",
storage.getStorageID().getNameForLogs(),
max_threads);
thread_pool.wait();
exception_handler.throwIfException();
size_t total_parts_number = 0;
for (auto & [partition_id, parts] : partition_parts)
total_parts_number += parts.size();
res.reserve(total_parts_number);
for (auto & [partition_id, parts] : partition_parts)
{
res.insert(res.end(), std::make_move_iterator(parts.begin()), std::make_move_iterator(parts.end()));
}
process_partition(partition_id, partition_info_ptr, res);
}
return res;
}
else
{
max_threads = std::min(max_threads, meta_partitions.size());
ExceptionHandler exception_handler;
std::map<String, RetValueVec> partition_parts;
ThreadPool thread_pool(max_threads);
for (auto & [partition_id, partition_info_ptr] : meta_partitions)
{
partition_parts[partition_id] = RetValueVec();
}
for (auto & [partition_id_, partition_info_ptr_] : meta_partitions)
{
thread_pool.scheduleOrThrowOnError(createExceptionHandledJob(
[&, partition_id = partition_id_, partition_info_ptr = partition_info_ptr_]() {
process_partition(partition_id, partition_info_ptr, partition_parts[partition_id]);
},
exception_handler));
}
LOG_DEBUG(
getLogger("PartCacheManager"),
"Waiting for loading parts for table {} use {} threads.",
storage.getStorageID().getNameForLogs(),
max_threads);
thread_pool.wait();
exception_handler.throwIfException();
size_t total_parts_number = 0;
for (auto & [partition_id, parts] : partition_parts)
total_parts_number += parts.size();
res.reserve(total_parts_number);
for (auto & [partition_id, parts] : partition_parts)
{
res.insert(res.end(), std::make_move_iterator(parts.begin()), std::make_move_iterator(parts.end()));
}
}
return res;
}
void PartCacheManager::checkTimeLimit(Stopwatch & watch)
{

View File

@ -13,6 +13,11 @@ using namespace std::chrono;
using namespace DB;
using namespace testing;
namespace DB::ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
namespace CacheTestMock
{
@ -387,6 +392,75 @@ TEST_F(CacheManagerTest, GetPartsFromCache)
cache_manager->shutDown();
}
TEST_F(CacheManagerTest, GetPartsFromCacheWithMultiThreads)
{
auto context = getContext().context;
auto settings = context->getSettingsRef();
settings.catalog_enable_multiple_threads = true;
context->setSettings(settings);
std::shared_ptr<PartCacheManager> cache_manager = std::make_shared<PartCacheManager>(context, 0, true);
auto topology_version = PairInt64{1, 1};
// mock storage
String query = "create table gztest.test (id Int32) ENGINE=CnchMergeTree partition by id order by tuple()";
StoragePtr storage = CacheTestMock::createTable(query, context);
String storage_uuid = UUIDHelpers::UUIDToString(storage->getStorageUUID());
// add table entry in cache manager and mock load partitions
cache_manager->mayUpdateTableMeta(*storage, topology_version);
auto entry = cache_manager->getTableMeta(storage->getStorageUUID());
// initialize partition infos
Strings partitions;
for (size_t i = 1000; i<1005; i++)
{
partitions.push_back(toString(i));
entry->partitions.emplace(partitions.back(), std::make_shared<CnchPartitionInfo>(storage_uuid, nullptr, partitions.back(), RWLockImpl::create()));
}
auto mock_timeout_loading = [&](const Strings &, const Strings &) -> DataModelPartWrapperVector {
throw Exception("Loading data reached timeout.", ErrorCodes::TIMEOUT_EXCEEDED);
};
EXPECT_EQ(entry->load_parts_by_partition, false);
// get data parts from cache with multiple threads. Load timeout exception will be thrown and after which the following loading will
// load by partition
try
{
cache_manager->getOrSetServerDataPartsInPartitions(*storage, partitions, mock_timeout_loading, TxnTimestamp::maxTS(), topology_version);
}
catch(Exception &e)
{
EXPECT_EQ(e.code(), ErrorCodes::TIMEOUT_EXCEEDED);
}
// Fall back to load parts by partition
EXPECT_EQ(entry->load_parts_by_partition, true);
auto mock_load = [&](const Strings & partition_not_cached, const Strings &) -> DataModelPartWrapperVector {
DataModelPartWrapperVector res;
for (const auto & partition_id : partition_not_cached)
{
auto loaded = CacheTestMock::createPartsBatch(partition_id, 10, storage);
res.insert(res.end(), loaded.begin(), loaded.end());
}
return res;
};
ServerDataPartsVector parts_from_cache = cache_manager->getOrSetServerDataPartsInPartitions(*storage, partitions, mock_load, TxnTimestamp::maxTS(), topology_version);
EXPECT_EQ(parts_from_cache.size(), 50);
// assert partition cache status Loaded
for (const auto & partition_id : partitions)
{
auto partition_info = entry->getPartitionInfo(partition_id);
EXPECT_EQ(partition_info->part_cache_status.isLoaded(), true);
}
cache_manager->shutDown();
}
/// Test the functionality of `updateTableNameInMetaEntry`.
TEST_F(CacheManagerTest, RenameTable)
{