Merge 'config-worker-service-pool-cnch-dev' into 'cnch-dev'

feat(clickhousech@m-5285085311): [cp cnch-dev] add worker config worker_service_thread_pool_size

See merge request: !25997
This commit is contained in:
高大月 2024-10-23 08:52:41 +00:00 committed by Fred Wang
parent 10552bf704
commit 420834886d
4 changed files with 81 additions and 22 deletions

View File

@ -84,6 +84,12 @@ namespace ProfileEvents
extern const Event PreloadExecTotalOps;
}
namespace CurrentMetrics
{
extern const Metric WorkerServicePoolTask;
extern const Metric WorkerServicePoolPendingTask;
}
namespace DB
{
namespace ErrorCodes
@ -101,8 +107,19 @@ namespace ErrorCodes
CnchWorkerServiceImpl::CnchWorkerServiceImpl(ContextMutablePtr context_)
: WithMutableContext(context_->getGlobalContext())
, log(getLogger("CnchWorkerService"))
, thread_pool(getNumberOfPhysicalCPUCores() * 4, getNumberOfPhysicalCPUCores() * 2, getNumberOfPhysicalCPUCores() * 8)
{
size_t thread_pool_size = getContext()->getConfigRef().getUInt("worker_service_thread_pool_size", 0);
if (!thread_pool_size)
{
thread_pool_size = getNumberOfPhysicalCPUCores() * 4;
LOG_INFO(log, "worker_service_thread_pool_size is not set, use default value {} based on available CPU cores", thread_pool_size);
}
if (thread_pool_size > 5000)
{
LOG_INFO(log, "Lowering worker_service_thread_pool_size to 5000, current value: {}", thread_pool_size);
thread_pool_size = 5000;
}
thread_pool = std::make_unique<ThreadPool>(thread_pool_size, thread_pool_size / 2, thread_pool_size * 2);
}
CnchWorkerServiceImpl::~CnchWorkerServiceImpl()
@ -110,7 +127,7 @@ CnchWorkerServiceImpl::~CnchWorkerServiceImpl()
try
{
LOG_TRACE(log, "Waiting local thread pool finishing");
thread_pool.wait();
thread_pool->wait();
}
catch (...)
{
@ -121,10 +138,12 @@ CnchWorkerServiceImpl::~CnchWorkerServiceImpl()
#define THREADPOOL_SCHEDULE(func) \
try \
{ \
thread_pool.scheduleOrThrowOnError(std::move(func)); \
CurrentMetrics::add(CurrentMetrics::WorkerServicePoolPendingTask); \
thread_pool->scheduleOrThrowOnError(std::move(func)); \
} \
catch (...) \
{ \
CurrentMetrics::sub(CurrentMetrics::WorkerServicePoolPendingTask); \
tryLogCurrentException(log, __PRETTY_FUNCTION__); \
RPCHelpers::handleException(response->mutable_exception()); \
done->Run(); \
@ -132,6 +151,8 @@ CnchWorkerServiceImpl::~CnchWorkerServiceImpl()
#define SUBMIT_THREADPOOL(...) \
auto _func = [=, this] { \
CurrentMetrics::sub(CurrentMetrics::WorkerServicePoolPendingTask); \
CurrentMetrics::Increment metric_increment{CurrentMetrics::WorkerServicePoolTask}; \
brpc::ClosureGuard done_guard(done); \
try \
{ \

View File

@ -246,7 +246,7 @@ private:
// class PreloadHandler;
// std::shared_ptr<PreloadHandler> preload_handler;
ThreadPool thread_pool;
std::unique_ptr<ThreadPool> thread_pool;
std::mutex backup_lock;
std::unique_ptr<ThreadPool> backup_rpc_pool;

View File

@ -1,6 +1,7 @@
#pragma once
#include <CloudServices/CnchServerResource.h>
#include <CloudServices/CnchWorkerResource.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/WorkerStatusManager.h>
#include <Protos/cnch_worker_rpc.pb.h>
#include <Storages/ColumnsDescription.h>
@ -34,12 +35,43 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte
{
static LoggerPtr log = getLogger("WorkerResource");
LOG_TRACE(log, "Receiving resources for Session: {}", query_resource.txn_id());
/// Per-phase counters/timings gathered while loading query resources for a
/// session, folded into the final "Received all resources" summary log line.
/// All *_us fields are microseconds (populated from Stopwatch::elapsedMicroseconds()).
struct Stats
{
UInt64 init_us; // session acquisition + settings (e.g. session_timezone) setup time
UInt64 ddl; // tables prepared: create_queries + cacheable_create_queries
UInt64 ddl_us; // time spent preparing those tables
UInt64 parts; // server data parts received (merge-tree path)
UInt64 vparts; // virtual data parts received
UInt64 lakeparts; // lake scan infos loaded (hive/lake table path)
UInt64 files; // file data parts loaded (cloud-file table path)
UInt64 load_us; // total part/file loading time
/// Renders a compact human-readable summary; count sections (parts/vparts/
/// lakeparts/files) are emitted only when non-zero.
/// NOTE(review): the trailing "in <load_us>" prints no unit suffix — the value
/// is microseconds; the enclosing log line supplies the "us" context.
String toString() const
{
WriteBufferFromOwnString buf;
buf << "init in " << init_us << ", " << ddl << " ddl in " << ddl_us;
buf << ", load ";
if (parts)
buf << parts << " parts ";
if (vparts)
buf << vparts << " vparts ";
if (lakeparts)
buf << lakeparts << " lakeparts ";
if (files)
buf << files << " files ";
buf << "in " << load_us;
return buf.str();
}
};
Stats stats{};
Stopwatch watch;
auto session = context->acquireNamedCnchSession(query_resource.txn_id(), query_resource.timeout(), false);
auto session_context = session->context;
session_context->setTemporaryTransaction(query_resource.txn_id(), query_resource.primary_txn_id());
if (query_resource.has_session_timezone())
session_context->setSetting("session_timezone", query_resource.session_timezone());
stats.init_us = watch.elapsedMicroseconds();
CurrentThread::QueryScope query_scope(session_context);
auto worker_resource = session_context->getCnchWorkerResource();
@ -72,15 +104,13 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte
object_columns);
}
create_timer.stop();
LOG_INFO(
log,
"Prepared {} tables for session {} in {} us",
query_resource.create_queries_size() + query_resource.cacheable_create_queries_size(),
query_resource.txn_id(),
create_timer.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::QueryCreateTablesMicroseconds, create_timer.elapsedMicroseconds());
stats.ddl = query_resource.create_queries_size() + query_resource.cacheable_create_queries_size();
stats.ddl_us = create_timer.elapsedMicroseconds();
LOG_DEBUG(log, "Prepared {} tables for {} in {} us", stats.ddl, query_resource.txn_id(), stats.ddl_us);
ProfileEvents::increment(ProfileEvents::QueryCreateTablesMicroseconds, stats.ddl_us);
}
Stopwatch load_timer;
bool lazy_load_parts = query_resource.has_lazy_load_data_parts() && query_resource.lazy_load_data_parts();
for (const auto & data : query_resource.data_parts())
{
@ -126,6 +156,7 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte
}
}
stats.parts += server_parts_size;
cloud_merge_tree->receiveDataParts(std::move(server_parts));
LOG_DEBUG(
@ -161,6 +192,7 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte
}
}
stats.vparts += virtual_parts_size;
cloud_merge_tree->receiveVirtualDataParts(std::move(virtual_parts));
LOG_DEBUG(
@ -200,11 +232,13 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte
{
auto settings = hive_table->getSettings();
auto lake_scan_infos = ILakeScanInfo::deserialize(data.lake_scan_info_parts(), context, storage->getInMemoryMetadataPtr(), *settings);
stats.lakeparts += lake_scan_infos.size();
hive_table->loadLakeScanInfos(lake_scan_infos);
}
else if (auto * cloud_file_table = dynamic_cast<IStorageCloudFile *>(storage.get()))
{
auto data_parts = createCnchFileDataParts(session_context, data.file_parts());
stats.files += data_parts.size();
cloud_file_table->loadDataParts(data_parts);
LOG_DEBUG(
@ -216,6 +250,7 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte
else
throw Exception("Unknown table engine: " + storage->getName(), ErrorCodes::UNKNOWN_TABLE);
}
stats.load_us += load_timer.elapsedMicroseconds();
std::unordered_map<String, UInt64> udf_infos;
for (const auto & udf_info : query_resource.udf_infos())
@ -225,7 +260,7 @@ static void loadQueryResource(const T & query_resource, const ContextPtr & conte
}
watch.stop();
LOG_INFO(log, "Load all resources for session {} in {} us.", query_resource.txn_id(), watch.elapsedMicroseconds());
LOG_INFO(log, "Received all resources for {} in {} us: {}", query_resource.txn_id(), watch.elapsedMicroseconds(), stats.toString());
ProfileEvents::increment(ProfileEvents::QueryLoadResourcesMicroseconds, watch.elapsedMicroseconds());
}

View File

@ -178,6 +178,9 @@
\
M(StorageMemoryRows, "Memory table input rows") \
M(StorageMemoryBytes, "Memory table input bytes") \
\
M(WorkerServicePoolTask, "Number of active tasks in worker service thread pool.") \
M(WorkerServicePoolPendingTask, "Number of pending tasks in worker service thread pool.") \
\
M(CnchSDRequestsUpstream, "Number of Service Discovery requests to upstream") \
\