mirror of https://github.com/ByConity/ByConity
Merge branch 'zema/cnch-dev-loadDataPartsInWorkerResource' into 'cnch-dev'
feat(clickhousech@m-4420282579): load data parts when getting table from worker resource

See merge request dp/ClickHouse!24914
parent 54d548fc36
commit df1e1b3968
@@ -7,6 +7,7 @@ profiles:
     enable_auto_query_forwarding: true
     cnch_data_retention_time_in_sec: 10
     s3_gc_batch_size: 10
+    enable_lazy_load_data_parts: 1
     point_lookup:
         max_threads: 1
         exchange_source_pipeline_threads: 1

@@ -6,6 +6,7 @@ profiles:
     exchange_timeout_ms: 300000
     cnch_data_retention_time_in_sec: 10
     s3_gc_batch_size: 10
+    enable_lazy_load_data_parts: 1
     point_lookup:
         max_threads: 1
         exchange_source_pipeline_threads: 1

@@ -395,6 +395,8 @@ brpc::CallId CnchWorkerClient::sendResources(
     request.set_timeout(recycle_timeout);
     if (!settings.session_timezone.value.empty())
         request.set_session_timezone(settings.session_timezone.value);
+    if (settings.enable_lazy_load_data_parts.value)
+        request.set_lazy_load_data_parts(true);

     bool require_worker_info = false;
     for (const auto & resource: resources_to_send)

@@ -178,18 +178,25 @@ void CnchWorkerResource::executeCacheableCreateQuery(
     insertCloudTable({res_table_id.getDatabaseName(), res_table_id.getTableName()}, res, context, /*throw_if_exists=*/ false);
 }

-StoragePtr CnchWorkerResource::getTable(const StorageID & table_id) const
+StoragePtr CnchWorkerResource::tryGetTable(const StorageID & table_id, bool load_data_parts) const
 {
     String tenant_db = formatTenantDatabaseName(table_id.getDatabaseName());
-    auto lock = getLock();
+    StoragePtr res = {};

-    auto it = cloud_tables.find({tenant_db, table_id.getTableName()});
-    if (it != cloud_tables.end())
     {
-        return it->second;
+        auto lock = getLock();
+        auto it = cloud_tables.find({tenant_db, table_id.getTableName()});
+        if (it != cloud_tables.end())
+            res = it->second;
     }

-    return {};
+    if (load_data_parts)
+    {
+        if (auto cloud_table = dynamic_pointer_cast<StorageCloudMergeTree>(res))
+            cloud_table->prepareDataPartsForRead();
+    }
+
+    return res;
 }

 DatabasePtr CnchWorkerResource::getDatabase(const String & database_name) const

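One design point worth noting in the rewritten function above: the lock now covers only the map lookup, so prepareDataPartsForRead(), which may perform remote reads, runs without the worker-resource lock held. A stripped-down sketch of that shape, with assumed names (mutex, tables, key) standing in for the real members:

    StoragePtr res;
    {
        std::lock_guard lock(mutex);          // short critical section: map lookup only
        if (auto it = tables.find(key); it != tables.end())
            res = it->second;
    }
    if (load_data_parts)
        if (auto cloud = dynamic_pointer_cast<StorageCloudMergeTree>(res))
            cloud->prepareDataPartsForRead(); // potentially slow; runs outside the lock
    return res;
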
@@ -46,7 +46,7 @@ public:
         const String & underlying_dictionary_tables,
         const ColumnsDescription & object_columns);

-    StoragePtr getTable(const StorageID & table_id) const;
+    StoragePtr tryGetTable(const StorageID & table_id, bool load_data_parts = true) const;
     DatabasePtr getDatabase(const String & database_name) const;
     bool isCnchTableInWorker(const StorageID & table_id) const;

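An illustrative caller-side sketch of the new signature (worker_resource and storage_id here are placeholders; the real call sites appear in the hunks below):

    // Default argument keeps existing call sites eager: parts are prepared on lookup.
    StoragePtr eager = worker_resource->tryGetTable(storage_id);

    // The sendResources path passes false, because it runs before any parts have been received.
    StoragePtr lazy = worker_resource->tryGetTable(storage_id, /*load_data_parts*/ false);
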
@@ -100,6 +100,7 @@ namespace ErrorCodes
     extern const int PREALLOCATE_QUERY_INTENT_NOT_FOUND;
     extern const int SESSION_NOT_FOUND;
     extern const int ABORTED;
+    extern const int WORKER_TABLE_NOT_FOUND;
 }

 CnchWorkerServiceImpl::CnchWorkerServiceImpl(ContextMutablePtr context_)

@@ -760,9 +761,19 @@ void CnchWorkerServiceImpl::sendResources(
         ProfileEvents::increment(ProfileEvents::QueryCreateTablesMicroseconds, create_timer.elapsedMicroseconds());
     }

+    bool lazy_load_parts = request->has_lazy_load_data_parts() && request->lazy_load_data_parts();
     for (const auto & data : request->data_parts())
     {
-        auto storage = DatabaseCatalog::instance().getTable({data.database(), data.table()}, query_context);
+        /// By default, calling getTable (from WorkerResource) will trigger loading data parts.
+        /// Here is the first time and happens before parts are ready. So don't trigger load data parts here.
+        StorageID storage_id = {data.database(), data.table()};
+        auto storage = worker_resource->tryGetTable(storage_id, /*load_data_parts*/false);
+        if (!storage)
+            throw Exception(ErrorCodes::WORKER_TABLE_NOT_FOUND, "Table {} not found in worker resource, it's a bug.", storage_id.getNameForLogs());
+
+        bool is_dict_table = false;
+        if (lazy_load_parts)
+            is_dict_table = !!dynamic_cast<StorageDictCloudMergeTree *>(storage.get());

         if (auto * cloud_merge_tree = dynamic_cast<StorageCloudMergeTree *>(storage.get()))
         {

@@ -797,24 +808,13 @@ void CnchWorkerServiceImpl::sendResources(
                }
            }

-            /// `loadDataParts` is an expensive action as it may involve remote read.
-            /// The worker rpc thread pool may be blocked when there are many `sendResources` requests.
-            /// Here we just pass the server_parts to storage. And it will do `loadDataParts` later (before reading).
-            /// One exception is StorageDictCloudMergeTree as it use a different read logic rather than StorageCloudMergeTree::read.
-            bool is_dict = false;
-            if (auto * cloud_dict = dynamic_cast<StorageDictCloudMergeTree *>(storage.get()))
-            {
-                cloud_dict->loadDataParts(server_parts);
-                is_dict = true;
-            }
-            else
-                cloud_merge_tree->receiveDataParts(std::move(server_parts));
+            cloud_merge_tree->receiveDataParts(std::move(server_parts));

            LOG_DEBUG(
                log,
-                "Received {} parts for table {}(txn_id: {}), disk_cache_mode {}, is_dict: {}",
+                "Received {} parts for table {}(txn_id: {}), disk_cache_mode {}, is_dict: {}, lazy_load_parts: {}",
                server_parts_size, cloud_merge_tree->getStorageID().getNameForLogs(),
-                request->txn_id(), request->disk_cache_mode(), is_dict);
+                request->txn_id(), request->disk_cache_mode(), is_dict_table, lazy_load_parts);
        }

        if (!data.virtual_parts().empty())

@@ -839,20 +839,13 @@ void CnchWorkerServiceImpl::sendResources(
                }
            }

-            bool is_dict = false;
-            if (auto * cloud_dict = dynamic_cast<StorageDictCloudMergeTree *>(storage.get()))
-            {
-                cloud_dict->loadDataParts(virtual_parts);
-                is_dict = true;
-            }
-            else
-                cloud_merge_tree->receiveVirtualDataParts(std::move(virtual_parts));
+            cloud_merge_tree->receiveVirtualDataParts(std::move(virtual_parts));

            LOG_DEBUG(
                log,
-                "Received {} virtual parts for table {}(txn_id: {}), disk_cache_mode {}, is_dict: {}",
+                "Received {} virtual parts for table {}(txn_id: {}), disk_cache_mode {}, is_dict: {}, lazy_load_parts: {}",
                virtual_parts_size, cloud_merge_tree->getStorageID().getNameForLogs(),
-                request->txn_id(), request->disk_cache_mode(), is_dict);
+                request->txn_id(), request->disk_cache_mode(), is_dict_table, lazy_load_parts);
        }

        std::set<Int64> required_bucket_numbers;

@@ -866,6 +859,15 @@ void CnchWorkerServiceImpl::sendResources(
                auto mutation_entry = CnchMergeTreeMutationEntry::parse(mutation_str);
                cloud_merge_tree->addMutationEntry(mutation_entry);
            }
+
+            /// prepareDataPartsForRead/loadDataParts is an expensive action as it may involve remote read.
+            /// The worker rpc thread pool may be blocked when there are many `sendResources` requests.
+            /// lazy_load_parts means the storage just receives server_parts in rpc. And it will call `prepareDataPartsForRead` later (before reading).
+            /// One exception is StorageDictCloudMergeTree as it use a different read logic rather than StorageCloudMergeTree::read.
+            if (!lazy_load_parts || is_dict_table)
+            {
+                cloud_merge_tree->prepareDataPartsForRead();
+            }
        }
        else if (auto * hive_table = dynamic_cast<StorageCloudHive *>(storage.get()))
        {

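To restate the gating added above as a compact sketch (the helper name is hypothetical and not part of the commit): parts are still prepared eagerly inside sendResources when the client did not request lazy loading, or when the table is a StorageDictCloudMergeTree, whose read path does not go through StorageCloudMergeTree::read and therefore cannot defer the load.

    // Hypothetical helper mirroring the `if (!lazy_load_parts || is_dict_table)` check above.
    static bool prepareEagerlyInSendResources(bool lazy_load_parts, bool is_dict_table)
    {
        return !lazy_load_parts || is_dict_table;
    }
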
@@ -855,6 +855,7 @@
     M(5046, DISK_CACHE_NOT_USED) \
     M(5047, WORKER_RESTARTED) \
     M(5048, WORKER_NODE_NOT_FOUND) \
+    M(5049, WORKER_TABLE_NOT_FOUND) \
     \
     M(5453, HDFS_FILE_SYSTEM_UNREGISTER) \
     M(5454, BAD_HDFS_META_FILE) \

@@ -1252,6 +1252,7 @@ enum PreloadLevelSettings : UInt64
     M(Bool, force_grouping_standard_compatibility, true, "Make GROUPING function to return 1 when argument is not used as an aggregation key", 0) \
     M(Bool, disable_optimize_final, true, "Disable optimize final command", 0) \
     M(Milliseconds, brpc_data_parts_timeout_ms, 30000, "Timeout for transmitting data parts in brpc", 0) \
+    M(Bool, enable_lazy_load_data_parts, false, "Trigger loadDataParts when worker actual use the cloud table, but not when worker receive resources. This significantly reduce the cost on worker brpc threads.", 0) \
     M(UInt64, scan_all_table_threshold, 20, "The upper limit to avoid scan all tables in some system tables, like tables and cnch_tables.", 0) \
     M(Seconds, cnch_txn_lock_expire_duration_seconds, 30, "Transaction lock expire duration.", 0) \
     M(Seconds, cnch_lock_manager_txn_checker_schedule_seconds, 30, "LockManager txn checker schedule seconds.", 0) \

@@ -4743,7 +4743,7 @@ StorageID Context::resolveStorageID(StorageID storage_id, StorageNamespace where
    {
        if (auto worker_resource = tryGetCnchWorkerResource())
        {
-            if (auto storage = worker_resource->getTable(storage_id))
+            if (auto storage = worker_resource->tryGetTable(storage_id))
                return storage->getStorageID();
        }
    }

@@ -296,7 +296,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
    {
        if (auto worker_resource = context_->tryGetCnchWorkerResource())
        {
-            if (auto table = worker_resource->getTable(table_id))
+            if (auto table = worker_resource->tryGetTable(table_id))
            {
                LOG_INFO(log, "got table {} from worker resource", table_id.getNameForLogs());
                return {nullptr, table};

@@ -204,7 +204,7 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)

            mutable_context->setCurrentTransaction(worker_txn);
        }
-        return mutable_context->getCnchWorkerResource()->getTable(query.table_id);
+        return mutable_context->getCnchWorkerResource()->tryGetTable(query.table_id);
    }
    else
    {

@@ -235,7 +235,7 @@ StoragePtr InterpreterInsertQuery::tryGetTableInWorkerResource(const StorageID &
    /// can only be found in worker_resource
    auto try_get_table_from_worker_resource = [&table_id](const auto & context) -> StoragePtr {
        if (auto worker_resource = context->tryGetCnchWorkerResource(); worker_resource)
-            return worker_resource->getTable(table_id);
+            return worker_resource->tryGetTable(table_id);
        else
            return nullptr;
    };

@@ -533,6 +533,7 @@ message SendResourcesReq
    // can coexist with `create_queries'
    repeated CacheableTableDefinition cacheable_create_queries = 10;
    optional string session_timezone = 11;
+    optional bool lazy_load_data_parts = 12;
 }

 message SendResourcesResp

@@ -79,7 +79,7 @@ IngestColumnBlockInputStream::IngestColumnBlockInputStream(
    context(std::move(local_context)),
    log(getLogger(target_storage->getStorageID().getNameForLogs() + " (IngestColumn)"))
 {
-    source_storage = context->tryGetCnchWorkerResource()->getTable(StorageID{command.from_database, command.from_table});
+    source_storage = context->tryGetCnchWorkerResource()->tryGetTable(StorageID{command.from_database, command.from_table});

    ingest_column_names = command.column_names;
    target_cloud_merge_tree = dynamic_cast<StorageCloudMergeTree *>(target_storage.get());

@@ -281,10 +281,10 @@ void MergeTreeCloudData::prepareVersionedPartsForRead(ContextPtr local_context,
    Stopwatch watch;

    std::lock_guard<std::mutex> lock(load_data_parts_mutex);
-    if (data_parts_loaded)
+    if (versioned_data_parts_loaded)
        return;

-    SCOPE_EXIT_SAFE(data_parts_loaded=true);
+    SCOPE_EXIT_SAFE(versioned_data_parts_loaded=true);

    std::unordered_map<String, ServerDataPartsWithDBM> server_parts_by_partition;
    std::vector<std::shared_ptr<MergeTreePartition>> partition_list;

@@ -93,6 +93,7 @@ protected:
    /// guard for loading received data_parts and virtual_data_parts.
    std::mutex load_data_parts_mutex;
    bool data_parts_loaded{false};
+    bool versioned_data_parts_loaded{false};
    MutableDataPartsVector received_data_parts;
    MutableDataPartsVector received_virtual_data_parts;

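The extra flag exists because plain data parts and versioned data parts are now loaded along separately guarded paths; reusing a single data_parts_loaded flag would let one path mark the other's work as already done. A minimal sketch of the resulting state, with member names taken from the hunk above and comments assumed:

    std::mutex load_data_parts_mutex;         // shared guard for both loading paths
    bool data_parts_loaded{false};            // set once prepareDataPartsForRead() has run
    bool versioned_data_parts_loaded{false};  // set once prepareVersionedPartsForRead() has run
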
@@ -114,8 +114,6 @@ void StorageCloudMergeTree::read(
 {
    if (data_version)
        prepareVersionedPartsForRead(local_context, query_info, column_names);
-    else
-        prepareDataPartsForRead();

    if (auto plan = MergeTreeDataSelectExecutor(*this).read(
            column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage))

@@ -106,7 +106,6 @@ void StorageSystemCnchPartsColumns::fillData(MutableColumns & res_columns, Conte
        throw Exception("Wrong storage type for parts columns request", ErrorCodes::BAD_ARGUMENTS);

    auto columns = storage->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
-    cloud->prepareDataPartsForRead();
    auto parts = cloud->getDataPartsVector();

    LOG_DEBUG(log, "Target CloudMergeTree got {} parts" , parts.size());

@@ -15,7 +15,7 @@ set enable_ab_index_optimization = 1, enable_partition_filter_push_down = 1;
 -- force_index_by_date
 select sum(events) from test.t40114_bitmap_index_force_index_by_date where arraySetCheck(int_vid, [1]) settings force_index_by_date = 1 format Null; -- { serverError 277 }
 select sum(events) from test.t40114_bitmap_index_force_index_by_date where date = '2024-08-01' and arraySetCheck(int_vid, [1]) settings force_index_by_date = 1 format Null;
-select sum(events) from test.t40114_bitmap_index_force_index_by_date where date in ('2024-08-01', '2023-07-01') and arraySetCheck(int_vid, [1]) settings force_index_by_date = 1 format Null;
+select sum(events) from test.t40114_bitmap_index_force_index_by_date where date in ('2024-08-01', '2023-07-01') and arraySetCheck(int_vid, [1]) settings force_index_by_date = 1 format Null; -- { serverError 277 }
 select sum(events) from test.t40114_bitmap_index_force_index_by_date where date >= '2023-08-01' and date <= '2024-08-01' and arraySetCheck(int_vid, [1]) settings force_index_by_date = 1 format Null;
 select sum(events) from test.t40114_bitmap_index_force_index_by_date where toStartOfYear(date) = '2024-08-01' and arraySetCheck(int_vid, [1]) settings force_index_by_date = 1 format Null;
