// Review summary of this patch (four files touched). Everything below is derived only from the
// hunks that follow; the patch text itself is left untouched.
//
// src/CloudServices/CnchWorkerServiceImpl.cpp
//   * Declares the CurrentMetrics entries WorkerServicePoolTask and WorkerServicePoolPendingTask
//     as extern.
//   * Constructor: thread_pool is no longer built in the member-initializer list from
//     getNumberOfPhysicalCPUCores() * 4 / * 2 / * 8. The size is read from the config key
//     "worker_service_thread_pool_size" with a default of 0; a zero value falls back to
//     getNumberOfPhysicalCPUCores() * 4, and a value above 5000 is lowered to 5000. Both
//     adjustments are logged with LOG_INFO. The pool is then created through std::make_unique
//     with the arguments (size, size / 2, size * 2).
//   * The destructor and THREADPOOL_SCHEDULE now reach the pool through "->" instead of ".".
//   * THREADPOOL_SCHEDULE calls CurrentMetrics::add on WorkerServicePoolPendingTask before
//     scheduleOrThrowOnError and CurrentMetrics::sub on the same metric in its catch branch.
//   * The SUBMIT_THREADPOOL lambda calls CurrentMetrics::sub on WorkerServicePoolPendingTask as
//     its first statement and constructs a local CurrentMetrics::Increment for
//     WorkerServicePoolTask.
//
// src/CloudServices/CnchWorkerServiceImpl.h
//   * The thread_pool member changes from a ThreadPool value to a std::unique_ptr.
//
// src/CloudServices/QueryResourceUtils.h
//   * Adds one #include and, inside loadQueryResource, a local struct Stats with the counters
//     init_us, ddl, ddl_us, parts, vparts, lakeparts, files and load_us plus a toString() that
//     formats them; parts / vparts / lakeparts / files are left out of the string when zero.
//   * A value-initialized Stats object is filled in along the way: init_us from `watch` once the
//     session has been acquired and configured, ddl / ddl_us after create_timer.stop(),
//     parts / vparts / lakeparts / files next to the matching receive/load calls, and load_us
//     from a Stopwatch named load_timer that is declared before the data_parts loop.
//   * The "Prepared {} tables ..." message moves from LOG_INFO to LOG_DEBUG, and the final
//     LOG_INFO message becomes "Received all resources for {} in {} us: {}" with
//     stats.toString() appended.
//
// src/Common/CurrentMetrics.cpp
//   * Adds M(...) entries, with descriptions, for WorkerServicePoolTask and
//     WorkerServicePoolPendingTask.
//
// NOTE(review): line breaks look collapsed in this copy, and several template argument lists and
//   include targets appear to be missing (e.g. "std::make_unique(", "std::unique_ptr thread_pool",
//   "#include" with no header, "dynamic_cast(") -- presumably lost in extraction; confirm against
//   the original patch before applying.
// NOTE(review): Stats::toString() prints init_us, ddl_us and load_us with no unit suffix, while
//   the enclosing log message labels the total as "us".
// NOTE(review): a configured size of 1 makes the second pool argument (size / 2) equal to 0; how
//   the pool treats that value is not visible here -- confirm.
diff --git a/src/CloudServices/CnchWorkerServiceImpl.cpp b/src/CloudServices/CnchWorkerServiceImpl.cpp index 20d717047d..366371a326 100644 --- a/src/CloudServices/CnchWorkerServiceImpl.cpp +++ b/src/CloudServices/CnchWorkerServiceImpl.cpp @@ -84,6 +84,12 @@ namespace ProfileEvents extern const Event PreloadExecTotalOps; } +namespace CurrentMetrics +{ + extern const Metric WorkerServicePoolTask; + extern const Metric WorkerServicePoolPendingTask; +} + namespace DB { namespace ErrorCodes @@ -101,8 +107,19 @@ namespace ErrorCodes CnchWorkerServiceImpl::CnchWorkerServiceImpl(ContextMutablePtr context_) : WithMutableContext(context_->getGlobalContext()) , log(getLogger("CnchWorkerService")) - , thread_pool(getNumberOfPhysicalCPUCores() * 4, getNumberOfPhysicalCPUCores() * 2, getNumberOfPhysicalCPUCores() * 8) { + size_t thread_pool_size = getContext()->getConfigRef().getUInt("worker_service_thread_pool_size", 0); + if (!thread_pool_size) + { + thread_pool_size = getNumberOfPhysicalCPUCores() * 4; + LOG_INFO(log, "worker_service_thread_pool_size is not set, use default value {} based on available CPU cores", thread_pool_size); + } + if (thread_pool_size > 5000) + { + LOG_INFO(log, "Lowering worker_service_thread_pool_size to 5000, current value: {}", thread_pool_size); + thread_pool_size = 5000; + } + thread_pool = std::make_unique(thread_pool_size, thread_pool_size / 2, thread_pool_size * 2); } CnchWorkerServiceImpl::~CnchWorkerServiceImpl() @@ -110,7 +127,7 @@ CnchWorkerServiceImpl::~CnchWorkerServiceImpl() try { LOG_TRACE(log, "Waiting local thread pool finishing"); - thread_pool.wait(); + thread_pool->wait(); } catch (...) { @@ -121,10 +138,12 @@ CnchWorkerServiceImpl::~CnchWorkerServiceImpl() #define THREADPOOL_SCHEDULE(func) \ try \ { \ - thread_pool.scheduleOrThrowOnError(std::move(func)); \ + CurrentMetrics::add(CurrentMetrics::WorkerServicePoolPendingTask); \ + thread_pool->scheduleOrThrowOnError(std::move(func)); \ } \ catch (...) 
\ { \ + CurrentMetrics::sub(CurrentMetrics::WorkerServicePoolPendingTask); \ tryLogCurrentException(log, __PRETTY_FUNCTION__); \ RPCHelpers::handleException(response->mutable_exception()); \ done->Run(); \ @@ -132,6 +151,8 @@ CnchWorkerServiceImpl::~CnchWorkerServiceImpl() #define SUBMIT_THREADPOOL(...) \ auto _func = [=, this] { \ + CurrentMetrics::sub(CurrentMetrics::WorkerServicePoolPendingTask); \ + CurrentMetrics::Increment metric_increment{CurrentMetrics::WorkerServicePoolTask}; \ brpc::ClosureGuard done_guard(done); \ try \ { \ diff --git a/src/CloudServices/CnchWorkerServiceImpl.h b/src/CloudServices/CnchWorkerServiceImpl.h index f608a32b8f..35c46974df 100644 --- a/src/CloudServices/CnchWorkerServiceImpl.h +++ b/src/CloudServices/CnchWorkerServiceImpl.h @@ -246,7 +246,7 @@ private: // class PreloadHandler; // std::shared_ptr preload_handler; - ThreadPool thread_pool; + std::unique_ptr thread_pool; std::mutex backup_lock; std::unique_ptr backup_rpc_pool; diff --git a/src/CloudServices/QueryResourceUtils.h b/src/CloudServices/QueryResourceUtils.h index 551406dd29..9007468def 100644 --- a/src/CloudServices/QueryResourceUtils.h +++ b/src/CloudServices/QueryResourceUtils.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include #include #include @@ -34,12 +35,43 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte { static LoggerPtr log = getLogger("WorkerResource"); LOG_TRACE(log, "Receiving resources for Session: {}", query_resource.txn_id()); + + struct Stats + { + UInt64 init_us; + UInt64 ddl; + UInt64 ddl_us; + UInt64 parts; + UInt64 vparts; + UInt64 lakeparts; + UInt64 files; + UInt64 load_us; + String toString() const + { + WriteBufferFromOwnString buf; + buf << "init in " << init_us << ", " << ddl << " ddl in " << ddl_us; + buf << ", load "; + if (parts) + buf << parts << " parts "; + if (vparts) + buf << vparts << " vparts "; + if (lakeparts) + buf << lakeparts << " lakeparts "; + if (files) + buf << files << " 
files "; + buf << "in " << load_us; + return buf.str(); + } + }; + Stats stats{}; + Stopwatch watch; auto session = context->acquireNamedCnchSession(query_resource.txn_id(), query_resource.timeout(), false); auto session_context = session->context; session_context->setTemporaryTransaction(query_resource.txn_id(), query_resource.primary_txn_id()); if (query_resource.has_session_timezone()) session_context->setSetting("session_timezone", query_resource.session_timezone()); + stats.init_us = watch.elapsedMicroseconds(); CurrentThread::QueryScope query_scope(session_context); auto worker_resource = session_context->getCnchWorkerResource(); @@ -72,15 +104,13 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte object_columns); } create_timer.stop(); - LOG_INFO( - log, - "Prepared {} tables for session {} in {} us", - query_resource.create_queries_size() + query_resource.cacheable_create_queries_size(), - query_resource.txn_id(), - create_timer.elapsedMicroseconds()); - ProfileEvents::increment(ProfileEvents::QueryCreateTablesMicroseconds, create_timer.elapsedMicroseconds()); + stats.ddl = query_resource.create_queries_size() + query_resource.cacheable_create_queries_size(); + stats.ddl_us = create_timer.elapsedMicroseconds(); + LOG_DEBUG(log, "Prepared {} tables for {} in {} us", stats.ddl, query_resource.txn_id(), stats.ddl_us); + ProfileEvents::increment(ProfileEvents::QueryCreateTablesMicroseconds, stats.ddl_us); } + Stopwatch load_timer; bool lazy_load_parts = query_resource.has_lazy_load_data_parts() && query_resource.lazy_load_data_parts(); for (const auto & data : query_resource.data_parts()) { @@ -126,6 +156,7 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte } } + stats.parts += server_parts_size; cloud_merge_tree->receiveDataParts(std::move(server_parts)); LOG_DEBUG( @@ -161,17 +192,18 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte } } - 
cloud_merge_tree->receiveVirtualDataParts(std::move(virtual_parts)); + stats.vparts += virtual_parts_size; + cloud_merge_tree->receiveVirtualDataParts(std::move(virtual_parts)); - LOG_DEBUG( - log, - "Received {} virtual parts for table {}(txn_id: {}), disk_cache_mode {}, is_dict: {}, lazy_load_parts: {}", - virtual_parts_size, - cloud_merge_tree->getStorageID().getNameForLogs(), - query_resource.txn_id(), - query_resource.disk_cache_mode(), - is_dict_table, - lazy_load_parts); + LOG_DEBUG( + log, + "Received {} virtual parts for table {}(txn_id: {}), disk_cache_mode {}, is_dict: {}, lazy_load_parts: {}", + virtual_parts_size, + cloud_merge_tree->getStorageID().getNameForLogs(), + query_resource.txn_id(), + query_resource.disk_cache_mode(), + is_dict_table, + lazy_load_parts); } std::set required_bucket_numbers; @@ -200,11 +232,13 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte { auto settings = hive_table->getSettings(); auto lake_scan_infos = ILakeScanInfo::deserialize(data.lake_scan_info_parts(), context, storage->getInMemoryMetadataPtr(), *settings); + stats.lakeparts += lake_scan_infos.size(); hive_table->loadLakeScanInfos(lake_scan_infos); } else if (auto * cloud_file_table = dynamic_cast(storage.get())) { auto data_parts = createCnchFileDataParts(session_context, data.file_parts()); + stats.files += data_parts.size(); cloud_file_table->loadDataParts(data_parts); LOG_DEBUG( @@ -216,6 +250,7 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte else throw Exception("Unknown table engine: " + storage->getName(), ErrorCodes::UNKNOWN_TABLE); } + stats.load_us += load_timer.elapsedMicroseconds(); std::unordered_map udf_infos; for (const auto & udf_info : query_resource.udf_infos()) @@ -225,7 +260,7 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte } watch.stop(); - LOG_INFO(log, "Load all resources for session {} in {} us.", query_resource.txn_id(), 
watch.elapsedMicroseconds()); + LOG_INFO(log, "Received all resources for {} in {} us: {}", query_resource.txn_id(), watch.elapsedMicroseconds(), stats.toString()); ProfileEvents::increment(ProfileEvents::QueryLoadResourcesMicroseconds, watch.elapsedMicroseconds()); } diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index b8731d018c..1a2cca5a55 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -178,6 +178,9 @@ \ M(StorageMemoryRows, "Memory table input rows") \ M(StorageMemoryBytes, "Memory table input bytes") \ +\ + M(WorkerServicePoolTask, "Number of active tasks in worker service thread pool.") \ + M(WorkerServicePoolPendingTask, "Number of pending tasks in worker service thread pool.") \ \ M(CnchSDRequestsUpstream, "Number of Service Discovery requests to upstream") \ \