Merge 'server-side-retry-profile-events' into 'cnch-dev'

feat(clickhousech@m-5348865586): add a server-side BSP retry metric

See merge request: !25278
This commit is contained in:
连薛超 2024-09-25 03:11:43 +00:00 committed by Fred Wang
parent af6bdb3fce
commit e631fe5263
6 changed files with 98 additions and 76 deletions

View File

@ -1216,6 +1216,7 @@
\
M(NumberOfMarkRangesBeforeBeMergedInPKFilter, "Number of mark ranges in primary index filtering before adjacent ranges be merged into more bigger ranges") \
M(PlanSegmentInstanceRetry, "How many times this plan segment has been retried, only valid under bsp mode") \
M(QueryBspRetryCount, "How many times the whole query has retried, only valid for server") \
\
M(OrcTotalStripes, "Total Stripes") \
M(OrcReadStripes, "Total Read Stripes") \

View File

@ -8,7 +8,9 @@
#include <CloudServices/CnchServerResource.h>
#include <bthread/mutex.h>
#include <Poco/Logger.h>
#include <Common/CurrentThread.h>
#include <Common/ErrorCodes.h>
#include <Common/ProfileEvents.h>
#include <common/logger_useful.h>
#include <Interpreters/DistributedStages/AddressInfo.h>
@ -31,6 +33,11 @@
#include <utility>
#include <CloudServices/CnchServerResource.h>
/// Profile events used by this translation unit; only declared here (extern).
namespace ProfileEvents
{
/// Incremented once each time BSPScheduler::retryTaskIfPossible returns true.
extern const Event QueryBspRetryCount;
}
namespace DB
{
@ -354,7 +361,7 @@ bool BSPScheduler::retryTaskIfPossible(size_t segment_id, UInt64 parallel_index,
{
std::unique_lock<std::mutex> lk(nodes_alloc_mutex);
attempt_id = segment_instance_attempts[instance_id];
if (attempt_id >= query_context->getSettingsRef().bsp_max_retry_num)
if (attempt_id > query_context->getSettingsRef().bsp_max_retry_num)
return false;
auto running_worker_maybe = segment_parallel_locations[segment_id][parallel_index];
if (!running_worker_maybe.has_value())
@ -435,6 +442,7 @@ bool BSPScheduler::retryTaskIfPossible(size_t segment_id, UInt64 parallel_index,
postEvent(std::make_shared<TriggerDispatchEvent>());
}
}
CurrentThread::getProfileEvents().increment(ProfileEvents::QueryBspRetryCount, 1);
return true;
}

View File

@ -20,19 +20,20 @@
*/
#include <memory>
#include <optional>
#include <Client/Connection.h>
#include <Interpreters/executeQueryHelper.h>
#include <Common/HistogramMetrics.h>
#include <Common/Config/VWCustomizedSettings.h>
#include <Common/SettingsChanges.h>
#include <Common/Exception.h>
#include <Common/HistogramMetrics.h>
#include <Common/HostWithPorts.h>
#include <Common/PODArray.h>
#include <Common/SettingsChanges.h>
#include <Common/ThreadProfileEvents.h>
#include <Common/formatReadable.h>
#include <Common/time.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/time.h>
#include <common/types.h>
#include <IO/LimitReadBuffer.h>
@ -496,6 +497,66 @@ static void logException(ContextPtr context, QueryLogElement & elem)
elem.stack_trace);
}
/// Common code for finish and exception callbacks
void logStatusInfo(
std::optional<bool> is_unlimited_query,
ContextPtr context,
QueryLogElement & element,
const QueryStatusInfo & info,
const ASTPtr query_ast)
{
DB::UInt64 query_time = info.elapsed_seconds * 1000000;
ProfileEvents::increment(ProfileEvents::QueryTimeMicroseconds, query_time);
if (query_ast->as<ASTSelectQuery>() || query_ast->as<ASTSelectWithUnionQuery>())
{
ProfileEvents::increment(ProfileEvents::SelectQueryTimeMicroseconds, query_time);
}
else if (query_ast->as<ASTInsertQuery>())
{
ProfileEvents::increment(ProfileEvents::InsertQueryTimeMicroseconds, query_time);
}
element.query_duration_ms = info.elapsed_seconds * 1000;
element.read_rows = info.read_rows;
element.read_bytes = info.read_bytes;
element.disk_cache_read_bytes = info.disk_cache_read_bytes;
element.written_rows = info.written_rows;
element.written_bytes = info.written_bytes;
element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
element.thread_ids = std::move(info.thread_ids);
element.profile_counters = std::move(info.profile_counters);
element.max_io_time_thread_name = std::move(info.max_io_time_thread_name);
element.max_io_time_thread_ms = info.max_io_time_thread_ms;
element.max_thread_io_profile_counters = std::move(info.max_io_thread_profile_counters);
if (element.max_thread_io_profile_counters)
{
auto max_io_ms
= element.max_thread_io_profile_counters->getIOReadTime(element.query_settings->remote_filesystem_read_prefetch) / 1000;
auto io_ms = max_io_ms < element.query_duration_ms ? max_io_ms : 0;
if (is_unlimited_query)
{
HistogramMetrics::increment(HistogramMetrics::UnlimitedQueryIOLatency, io_ms, Metrics::MetricType::Timer);
}
else
{
if (auto vw = context->tryGetCurrentVW())
{
HistogramMetrics::increment(HistogramMetrics::QueryIOLatency, io_ms, Metrics::MetricType::Timer, {{"vw", vw->getName()}});
}
else
{
HistogramMetrics::increment(HistogramMetrics::UnlimitedQueryIOLatency, io_ms, Metrics::MetricType::Timer);
}
}
}
}
static LabelledMetrics::MetricLabels markQueryProfileEventLabels(
ContextMutablePtr context,
ProcessListQueryType query_type = ProcessListQueryType::Default,
@ -585,9 +646,18 @@ static void onExceptionBeforeStart(
elem.worker_group = worker_group->getID();
}
QueryStatus * process_list_elem = context->getProcessListElement();
const Settings & current_settings = context->getSettingsRef();
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
if (process_list_elem)
{
QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);
logStatusInfo(is_unlimited_query, context, elem, info, ast);
}
if (settings.log_queries && elem.type >= settings.log_queries_min_type
&& !settings.log_queries_min_query_duration_ms.totalMilliseconds())
logQuery(context, elem);
@ -1083,6 +1153,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ProcessListQueryType query_type {ProcessListQueryType::Default};
std::optional<bool> is_unlimited_query;
/// Keep process_list_entry alive even when an exception happens before the query starts
ProcessList::EntryPtr process_list_entry;
try
{
@ -1121,7 +1193,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
checkASTSizeLimits(*ast, settings);
/// Put query to process list. But don't put SHOW PROCESSLIST query itself.
ProcessList::EntryPtr process_list_entry;
if (!internal && !ast->as<ASTShowProcesslistQuery>())
{
LOG_TRACE(getLogger("executeQuery"), "enqueue process list query :{}", query_for_logging);
@ -1515,69 +1586,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
logQuery(context, elem);
}
/// Common code for finish and exception callbacks.
/// Adds the elapsed query time to the query-time profile events, copies the statistics
/// held in `info` into the query log `element`, and, when per-thread IO counters are
/// available, reports an IO latency histogram metric.
auto status_info_to_query_log = [is_unlimited_query, context](QueryLogElement & element, const QueryStatusInfo & info, const ASTPtr query_ast) mutable {
    /// elapsed_seconds -> microseconds
    DB::UInt64 query_time = info.elapsed_seconds * 1000000;
    ProfileEvents::increment(ProfileEvents::QueryTimeMicroseconds, query_time);
    if (query_ast->as<ASTSelectQuery>() || query_ast->as<ASTSelectWithUnionQuery>())
    {
        ProfileEvents::increment(ProfileEvents::SelectQueryTimeMicroseconds, query_time);
    }
    else if (query_ast->as<ASTInsertQuery>())
    {
        ProfileEvents::increment(ProfileEvents::InsertQueryTimeMicroseconds, query_time);
    }

    /// elapsed_seconds -> milliseconds
    element.query_duration_ms = info.elapsed_seconds * 1000;

    element.read_rows = info.read_rows;
    element.read_bytes = info.read_bytes;
    element.disk_cache_read_bytes = info.disk_cache_read_bytes;

    element.written_rows = info.written_rows;
    element.written_bytes = info.written_bytes;

    /// Negative peak values are clamped to zero.
    element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;

    /// `info` is a const reference, so std::move on its members could only ever copy;
    /// plain assignments state what actually happens.
    element.thread_ids = info.thread_ids;
    element.profile_counters = info.profile_counters;
    element.max_io_time_thread_name = info.max_io_time_thread_name;
    element.max_io_time_thread_ms = info.max_io_time_thread_ms;
    element.max_thread_io_profile_counters = info.max_io_thread_profile_counters;

    if (element.max_thread_io_profile_counters)
    {
        auto max_io_ms = element.max_thread_io_profile_counters->getIOReadTime(element.query_settings->remote_filesystem_read_prefetch) / 1000;
        /// An IO time that is not below the query duration is reported as 0.
        auto io_ms = max_io_ms < element.query_duration_ms ? max_io_ms : 0;

        /// `is_unlimited_query` is a std::optional<bool>: test the contained value.
        /// Converting the optional itself to bool only tells whether a value is set,
        /// which would also be true for an explicit `false`.
        if (is_unlimited_query.value_or(false))
        {
            HistogramMetrics::increment(
                HistogramMetrics::UnlimitedQueryIOLatency,
                io_ms,
                Metrics::MetricType::Timer);
        }
        else if (auto vw = context->tryGetCurrentVW())
        {
            HistogramMetrics::increment(
                HistogramMetrics::QueryIOLatency,
                io_ms,
                Metrics::MetricType::Timer,
                {{"vw", vw->getName()}});
        }
        else
        {
            /// No current VW: fall back to the metric without a VW label.
            HistogramMetrics::increment(
                HistogramMetrics::UnlimitedQueryIOLatency,
                io_ms,
                Metrics::MetricType::Timer);
        }
    }
};
auto query_id = context->getCurrentQueryId();
/// Also make possible for caller to log successful query finish and exception during execution.
auto finish_callback
@ -1591,12 +1599,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
log_processors_profiles = settings.log_processors_profiles,
status_info_to_query_log,
query_id,
is_unlimited_query,
finish_current_transaction](
IBlockInputStream * stream_in,
IBlockOutputStream * stream_out,
QueryPipeline * query_pipeline) mutable {
IBlockInputStream * stream_in, IBlockOutputStream * stream_out, QueryPipeline * query_pipeline) mutable {
/// If active (write) use of the query cache is enabled and the query is eligible for result caching, then store the query
/// result buffered in the special-purpose cache processor (added on top of the pipeline) into the cache.
if (query_cache_usage == QueryCache::Usage::Write)
@ -1623,7 +1629,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
elem.graphviz = process_list_elem->getGraphviz();
status_info_to_query_log(elem, info, ast);
logStatusInfo(is_unlimited_query, context, elem, info, ast);
if (process_list_elem->isUnlimitedQuery())
@ -1800,7 +1806,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
quota(quota),
status_info_to_query_log,
query_id,
finish_current_transaction,
query_type,
@ -1831,7 +1836,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (process_list_elem)
{
QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);
status_info_to_query_log(elem, info, ast);
logStatusInfo(is_unlimited_query, context, elem, info, ast);
}
if (current_settings.calculate_text_stack_trace)

View File

@ -247,7 +247,7 @@ TEST(GtestScheduler, RetryTestCase1)
std::unordered_map<std::string, DB::Field> settings{
{"bsp_mode", 1},
{"distributed_max_parallel_size", parallel_size},
{"bsp_max_retry_num", 3},
{"bsp_max_retry_num", 2},
{"enable_disk_shuffle_partition_coalescing", 0}};
auto scheduler_context = createSchedulerTestContext(parallel_size, settings);
auto bsp_scheduler = scheduler_context.bsp_scheduler;

View File

@ -0,0 +1,7 @@
-- Checks that the QueryBspRetryCount profile event is written to system.query_log for a failing bsp_mode query.
-- Start from a clean state. NOTE(review): 10726_t2 is dropped here but never created in this script,
-- and 10726_t1 is not dropped at the end; confirm whether cleanup is expected.
drop table if exists 10726_t1;
drop table if exists 10726_t2;
-- Target table, filled with a single row.
create table 10726_t1 (a Int32, b Int32) ENGINE=CnchMergeTree() ORDER BY a settings cnch_merge_max_total_rows_to_merge=1;
INSERT INTO 10726_t1 (a,b) VALUES (0,1);
-- Query with bsp_mode=1, max_bytes_to_read_leaf=1 and bsp_max_retry_num=1; the trailing hint expects server error 307.
select sum(a) from 10726_t1 settings enable_optimizer=1,log_queries=1,bsp_mode=1,max_bytes_to_read_leaf=1,bsp_max_retry_num=1; --{serverError 307};
-- Flush logs before reading system.query_log.
system flush logs;
-- Read the event from the most recent matching query_log row with type > 1.
-- The LIKE pattern repeats the text of the failing query above, so the two must be kept in sync.
select ProfileEvents['QueryBspRetryCount'] FROM system.query_log where query LIKE '%select sum(a) from 10726_t1 settings enable_optimizer=1,log_queries=1,bsp_mode=1,max_bytes_to_read_leaf=1,bsp_max_retry_num=1%' AND type > 1 ORDER BY event_time DESC LIMIT 1;