Merge 'add-transaction-id-in-query-log' into 'cnch-dev'

feat(clickhousech@m-5383597034): add transaction id in query log and query exchange log

See merge request: !25980
This commit is contained in:
连薛超 2024-10-24 02:56:51 +00:00 committed by Fred Wang
parent 420834886d
commit 07e14115fc
10 changed files with 17 additions and 1 deletions

View File

@ -106,6 +106,7 @@ namespace ErrorCodes
void PlanSegmentExecutor::prepareSegmentInfo() const
{
query_log_element->client_info = context->getClientInfo();
query_log_element->txn_id = context->getCurrentTransactionID();
query_log_element->segment_id = plan_segment->getPlanSegmentId();
query_log_element->segment_parallel = plan_segment->getParallelSize();
query_log_element->segment_parallel_index = plan_segment_instance->info.parallel_id;
@ -167,6 +168,7 @@ PlanSegmentExecutor::~PlanSegmentExecutor() noexcept
for (const auto & [uuid, stat] : plan_segment_instance->info.source_task_stats)
{
QueryExchangeLogElement element;
element.txn_id = context->getCurrentTransactionID();
element.initial_query_id = context->getInitialQueryId();
element.parallel_index = plan_segment_instance->info.parallel_id;
element.read_segment = plan_segment->getPlanSegmentId();

View File

@ -29,6 +29,7 @@ namespace DB
NamesAndTypesList QueryExchangeLogElement::getNamesAndTypes()
{
return {
{"txn_id", std::make_shared<DataTypeUInt64>()},
{"initial_query_id", std::make_shared<DataTypeString>()},
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
@ -87,6 +88,7 @@ NamesAndAliases QueryExchangeLogElement::getNamesAndAliases()
void QueryExchangeLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(txn_id);
columns[i++]->insert(initial_query_id);
columns[i++]->insert(DateLUT::serverTimezoneInstance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);

View File

@ -15,6 +15,7 @@
#pragma once
#include <limits>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/SystemLog.h>
@ -27,6 +28,7 @@ namespace DB
{
struct QueryExchangeLogElement
{
UInt64 txn_id{std::numeric_limits<UInt64>::max()};
String initial_query_id{"-1"};
UInt64 exchange_id{std::numeric_limits<UInt64>::max()};
UInt64 partition_id{std::numeric_limits<UInt64>::max()};

View File

@ -147,7 +147,8 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
{"virtual_warehouse", std::make_shared<DataTypeString>()},
{"worker_group", std::make_shared<DataTypeString>()},
{"query_plan", std::make_shared<DataTypeString>()},
{"normalized_query_plan_hash", std::make_shared<DataTypeUInt64>()}};
{"normalized_query_plan_hash", std::make_shared<DataTypeUInt64>()},
{"txn_id", std::make_shared<DataTypeUInt64>()}};
}
NamesAndAliases QueryLogElement::getNamesAndAliases()
@ -350,6 +351,7 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(worker_group);
columns[i++]->insert(query_plan);
columns[i++]->insert(normalized_query_plan_hash);
columns[i++]->insert(txn_id);
}
void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableColumns & columns, size_t & i)

View File

@ -106,6 +106,7 @@ struct QueryLogElement
String worker_group;
String query_plan;
UInt64 normalized_query_plan_hash{};
UInt64 txn_id{};
static std::string name() { return "QueryLog"; }

View File

@ -635,6 +635,7 @@ static void onExceptionBeforeStart(
bool throw_root_cause = needThrowRootCauseError(context.get(), elem.exception_code, elem.exception);
elem.client_info = context->getClientInfo();
elem.txn_id = context->getCurrentTransactionID();
elem.partition_ids = context->getPartitionIds();
elem.log_comment = settings.log_comment;
@ -1542,6 +1543,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.client_info = client_info;
elem.partition_ids = context->getPartitionIds();
if (txn)
elem.txn_id = context->getCurrentTransactionID();
if (auto worker_group = context->tryGetCurrentWorkerGroup())

View File

@ -60,6 +60,7 @@ DiskPartitionWriter::~DiskPartitionWriter()
if (enable_disk_writer_metrics && query_exchange_log)
{
QueryExchangeLogElement element;
element.txn_id = context->getCurrentTransactionID();
element.initial_query_id = context->getInitialQueryId();
element.exchange_id = extended_key.key->exchange_id;
element.partition_id = extended_key.key->partition_id;

View File

@ -100,6 +100,7 @@ BrpcRemoteBroadcastReceiver::~BrpcRemoteBroadcastReceiver()
if (!enable_receiver_metrics || !query_exchange_log)
return;
QueryExchangeLogElement element;
element.txn_id = context->getCurrentTransactionID();
element.initial_query_id = initial_query_id;
element.exchange_id = trans_key->exchange_id;
element.partition_id = trans_key->partition_id;

View File

@ -75,6 +75,7 @@ BrpcRemoteBroadcastSender::~BrpcRemoteBroadcastSender()
if (enable_sender_metrics)
{
QueryExchangeLogElement element;
element.txn_id = context->getCurrentTransactionID();
const auto & key = trans_keys.front();
element.initial_query_id = context->getInitialQueryId();
element.exchange_id = key->exchange_id;

View File

@ -200,6 +200,7 @@ LocalBroadcastChannel::~LocalBroadcastChannel()
if ((enable_sender_metrics || enable_receiver_metrics) && query_exchange_log)
{
QueryExchangeLogElement element;
element.txn_id = context->getCurrentTransactionID();
element.initial_query_id = context->getInitialQueryId();
element.exchange_id = data_key->exchange_id;
element.partition_id = data_key->partition_id;