Merge 'log-resource-in-query-log' into 'cnch-dev'

fix(clickhousech@m-5381084553): init source task payload timing from visible parts

See merge request: !25940
Authored by 连薛超 on 2024-10-22 06:25:20 +00:00; committed by Fred Wang
parent a1315e2580
commit d7d1525bee
17 changed files with 203 additions and 44 deletions

View File

@@ -361,8 +361,8 @@ namespace
return visible_parts;
}
auto process_parts = [&](Vec & parts, size_t begin_pos, size_t end_pos, Vec & visible_parts_)
{
auto process_parts = [&](Vec & parts, size_t begin_pos, size_t end_pos, Vec & visible_parts_) {
/// NOTE! BSPScheduler relies on the same sort algorithm to estimate correct rows for table scan tasks
pdqsort(parts.begin() + begin_pos, parts.begin() + end_pos, PartComparator<Part>{});
/// One-pass algorithm to construct delta chains
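The NOTE above is the invariant the rest of this change relies on: server and worker must sort visible parts into exactly the same order. A minimal sketch of why an unstable sort is still deterministic here, assuming a simplified part type with a unique (name, commit_time) key; the real comparator is CnchPartsHelper::PartComparator and the real sort is pdqsort:

#include <algorithm>
#include <cstdint>
#include <string>
#include <tuple>
#include <vector>

/// Stand-in for ServerDataPart; only the comparator key matters here.
struct PartSketch
{
    std::string name;
    uint64_t commit_time;
};

struct PartComparatorSketch
{
    bool operator()(const PartSketch & l, const PartSketch & r) const
    {
        /// (name, commit_time) is unique per part, so the ordering has no ties.
        return std::tie(l.name, l.commit_time) < std::tie(r.name, r.commit_time);
    }
};

void sortVisibleParts(std::vector<PartSketch> & parts)
{
    /// std::sort stands in for pdqsort: both are unstable, and determinism
    /// comes from key uniqueness, not from the sorting algorithm.
    std::sort(parts.begin(), parts.end(), PartComparatorSketch{});
}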

View File

@@ -402,9 +402,6 @@ void CnchServerResource::sendResources(const ContextPtr & context, std::optional
if (all_resources.empty())
return;
if (resource_option)
initSourceTaskPayload(context, all_resources);
Stopwatch rpc_watch;
auto worker_group_status = context->getWorkerGroupStatusPtr();
auto handler = std::make_shared<ExceptionHandlerWithFailedInfo>();
@@ -671,6 +668,8 @@ void CnchServerResource::allocateResource(
if (auto it = assigned_map.find(host_ports.id); it != assigned_map.end())
{
assigned_parts = std::move(it->second);
if (resource_option)
initSourceTaskPayload(context, storage, host_ports, assigned_parts);
CnchPartsHelper::flattenPartsVector(assigned_parts);
LOG_TRACE(
log,
@@ -759,36 +758,18 @@ void CnchServerResource::allocateResource(
}
void CnchServerResource::initSourceTaskPayload(
const ContextPtr & context, std::unordered_map<HostWithPorts, std::vector<AssignedResource>> & all_resources)
const ContextPtr & context, StoragePtr storage, const HostWithPorts & host_with_ports, ServerDataPartsVector & visible_parts)
{
for (const auto & [host_ports, assigned_resource] : all_resources)
auto uuid = storage->getStorageID().uuid;
for (const auto & p : visible_parts)
{
for (const auto & r : assigned_resource)
{
auto uuid = r.storage->getStorageID().uuid;
bool reclustered = r.storage->isTableClustered(context);
for (const auto & p : r.server_parts)
{
auto bucket_number = getBucketNumberOrInvalid(p->part_model_wrapper->bucketNumber(), reclustered);
auto addr = AddressInfo(host_ports.getHost(), host_ports.getTCPPort(), "", "", host_ports.exchange_port);
source_task_payload[uuid][addr].part_num += 1;
source_task_payload[uuid][addr].rows += p->rowExistsCount();
source_task_payload[uuid][addr].buckets.insert(bucket_number);
}
if (log->trace())
{
for (const auto & [addr, payload] : source_task_payload[uuid])
{
LOG_TRACE(
log,
"Source task payload for {}.{} addr:{} is {}",
r.storage->getDatabaseName(),
r.storage->getTableName(),
addr.toShortString(),
payload.toString());
}
}
}
bool reclustered = storage->isTableClustered(context);
auto bucket_number = getBucketNumberOrInvalid(p->part_model_wrapper->bucketNumber(), reclustered);
auto addr = AddressInfo(host_with_ports.getHost(), host_with_ports.getTCPPort(), "", "", host_with_ports.exchange_port);
source_task_payload[uuid][addr].part_num += 1;
source_task_payload[uuid][addr].rows += p->rowExistsCount();
source_task_payload[uuid][addr].visible_parts.push_back(p);
source_task_payload[uuid][addr].buckets.insert(bucket_number);
}
}
}
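With this hunk the payload is no longer built in one pass over all_resources inside sendResources; it is built per storage and per worker in allocateResource, from the visible parts right after assignment and before flattening. A minimal sketch of the aggregation shape, with simplified stand-ins for UUID, AddressInfo, and ServerDataPart (the real map is source_task_payload[uuid][addr]):

#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

struct PartStub /// stand-in for ServerDataPart
{
    size_t row_count = 0;
    int64_t bucket = -1;
};

struct PayloadStub /// stand-in for SourceTaskPayload
{
    std::set<int64_t> buckets;
    std::vector<std::shared_ptr<const PartStub>> visible_parts;
    size_t rows = 0;
    size_t part_num = 0;
};

/// table uuid => (worker address => payload), mirroring source_task_payload.
using PayloadMap = std::map<std::string, std::map<std::string, PayloadStub>>;

void addAssignedPart(
    PayloadMap & payload_map, const std::string & uuid, const std::string & addr, std::shared_ptr<const PartStub> part)
{
    auto & payload = payload_map[uuid][addr];
    payload.part_num += 1;
    payload.rows += part->row_count;
    payload.visible_parts.push_back(part);
    payload.buckets.insert(part->bucket);
}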

View File

@@ -256,8 +256,8 @@ private:
std::lock_guard<std::mutex> &,
std::optional<ResourceOption> resource_option = std::nullopt);
void
initSourceTaskPayload(const ContextPtr & context, std::unordered_map<HostWithPorts, std::vector<AssignedResource>> & all_resources);
void initSourceTaskPayload(
const ContextPtr & context, StoragePtr storage, const HostWithPorts & host_with_ports, ServerDataPartsVector & visible_parts);
void sendCreateQueries(const ContextPtr & context);
void sendDataParts(const ContextPtr & context);

View File

@@ -5,7 +5,11 @@
#include <limits>
#include <memory>
#include <mutex>
#include <Catalog/DataModelPartWrapper_fwd.h>
#include <CloudServices/CnchPartsHelper.h>
#include <CloudServices/CnchServerResource.h>
#include <Interpreters/DistributedStages/SourceTask.h>
#include <MergeTreeCommon/assignCnchParts.h>
#include <bthread/mutex.h>
#include <Poco/Logger.h>
#include <Common/CurrentThread.h>
@@ -31,6 +35,7 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <pdqsort.h>
#include <CloudServices/CnchServerResource.h>
namespace ProfileEvents
@@ -711,6 +716,57 @@ void BSPScheduler::sendResources(PlanSegment * plan_segment_ptr)
}
}
std::unordered_map<UUID, SourceTaskStat> BSPScheduler::createSourceTaskStats(
PlanSegment * plan_segment_ptr, const SegmentTaskInstance & instance, const SourceTaskFilter & source_task_filter)
{
std::unordered_map<UUID, SourceTaskStat> source_task_stats;
const auto & source_task_payload_map = query_context->getCnchServerResource()->getSourceTaskPayload();
AddressInfo addr;
{
std::unique_lock<std::mutex> lock(node_selector_result_mutex);
addr = node_selector_result[instance.segment_id].worker_nodes[instance.parallel_index].address;
}
for (const auto & plan_segment_input : plan_segment_ptr->getPlanSegmentInputs())
{
auto storage_id = plan_segment_input->getStorageID();
if (storage_id && storage_id->hasUUID())
{
if (auto iter = source_task_payload_map.find(storage_id->uuid); iter != source_task_payload_map.end())
{
auto source_task_payload_iter = iter->second.find(addr);
auto [iiter, _] = source_task_stats.emplace(storage_id->uuid, SourceTaskStat(storage_id.value(), 0));
if (source_task_payload_iter != iter->second.end())
{
auto visible_parts = source_task_payload_iter->second.visible_parts;
/// pdqsort is unstable, but all visible parts have distinct ids under CnchPartsHelper::PartComparator,
/// so worker and server will arrive at the same order.
/// Refer to calcVisiblePartsImpl for more details.
pdqsort(visible_parts.begin(), visible_parts.end(), CnchPartsHelper::PartComparator<ServerDataPartPtr>{});
filterParts(visible_parts, source_task_filter);
for (const auto & part : visible_parts)
{
iiter->second.rows += part->rowExistsCount();
}
}
}
}
}
if (log->trace())
{
for (const auto & [_, stat] : source_task_stats)
{
LOG_TRACE(
log,
"SourceTaskStats(table:{}) of segment instance({}_{}) contains {} rows",
stat.storage_id.getFullTableName(),
instance.segment_id,
instance.parallel_index,
stat.rows);
}
}
return source_task_stats;
}
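createSourceTaskStats re-derives on the server exactly what each worker instance will scan: sort the worker's visible parts with the shared comparator, apply the instance's SourceTaskFilter, and sum the surviving rows. A minimal sketch, assuming a hypothetical index/count filter that keeps every count-th part of the sorted sequence starting at index; the actual slicing is whatever filterParts implements:

#include <algorithm>
#include <cstddef>
#include <vector>

struct PartStub
{
    size_t id = 0;        /// stand-in for the unique part identity
    size_t row_count = 0;
};

/// Hypothetical filter semantics (an assumption, not the real filterParts):
/// this instance keeps every `count`-th part starting at `index`. The invariant
/// that actually matters is that server and worker sort with the same
/// comparator before slicing, so both pick the same parts.
size_t estimateRows(std::vector<PartStub> parts, size_t index, size_t count)
{
    std::sort(parts.begin(), parts.end(), [](const PartStub & l, const PartStub & r) { return l.id < r.id; });
    size_t rows = 0;
    for (size_t i = index; i < parts.size(); i += count)
        rows += parts[i].row_count;
    return rows;
}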
void BSPScheduler::prepareTask(PlanSegment * plan_segment_ptr, NodeSelectorResult & selector_info, const SegmentTask & task)
{
// Register exchange for all outputs.
@@ -762,6 +818,10 @@ PlanSegmentExecutionInfo BSPScheduler::generateExecutionInfo(size_t task_id, siz
execution_info.source_task_filter.buckets = source_task_buckets[instance];
}
auto source_task_stats = createSourceTaskStats(dag_graph_ptr->getPlanSegmentPtr(task_id), instance, execution_info.source_task_filter);
if (!source_task_stats.empty())
execution_info.source_task_stats = source_task_stats;
PlanSegmentInstanceId instance_id = PlanSegmentInstanceId{static_cast<UInt32>(task_id), static_cast<UInt32>(index)};
{
std::unique_lock<std::mutex> lk(nodes_alloc_mutex);

View File

@@ -253,6 +253,8 @@ private:
void resendResource(const HostWithPorts & host_ports);
Protos::SendResourceRequestReq fillResourceRequestToProto(const ResourceRequest & req);
std::unordered_map<UUID, SourceTaskStat> createSourceTaskStats(
PlanSegment * plan_segment_ptr, const SegmentTaskInstance & instance, const SourceTaskFilter & source_task_filter);
// All batch tasks will be enqueued first. The scheduling logic pops the queue and schedules the popped tasks.
EventQueue queue{10000};

View File

@@ -29,9 +29,11 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/ProcessorProfile.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/QueryExchangeLog.h>
#include <Interpreters/RuntimeFilter/RuntimeFilterManager.h>
#include <Interpreters/executeQueryHelper.h>
#include <Interpreters/sendPlanSegment.h>
#include <MergeTreeCommon/assignCnchParts.h>
#include <Optimizer/Signature/PlanSegmentNormalizer.h>
#include <Optimizer/Signature/PlanSignature.h>
#include <Processors/Exchange/BroadcastExchangeSink.h>
@@ -158,6 +160,26 @@ PlanSegmentExecutor::~PlanSegmentExecutor() noexcept
if (auto query_log = context->getQueryLog())
query_log->add(*query_log_element);
}
if (context->getSettingsRef().log_query_exchange && context->getSettingsRef().bsp_mode)
{
if (auto query_exchange_log = context->getQueryExchangeLog())
{
for (const auto & [uuid, stat] : plan_segment_instance->info.source_task_stats)
{
QueryExchangeLogElement element;
element.initial_query_id = context->getInitialQueryId();
element.parallel_index = plan_segment_instance->info.parallel_id;
element.read_segment = plan_segment->getPlanSegmentId();
element.event_time
= std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
element.recv_rows = stat.rows;
element.type = fmt::format("table_scan_{}", stat.storage_id.getFullNameNotQuoted());
element.finish_code = query_log_element->type == QUERY_FINISH ? BroadcastStatusCode::ALL_SENDERS_DONE
: BroadcastStatusCode::RECV_UNKNOWN_ERROR;
query_exchange_log->add(element);
}
}
}
}
catch (...)
{

View File

@@ -68,6 +68,7 @@ struct PlanSegmentExecutionInfo
UInt32 attempt_id = std::numeric_limits<UInt32>::max();
std::unordered_map<UInt64, std::vector<PlanSegmentMultiPartitionSource>> sources;
UInt32 worker_epoch{0};
std::unordered_map<UUID, SourceTaskStat> source_task_stats;
};
struct PlanSegmentInstance

View File

@@ -24,9 +24,11 @@
#include <Interpreters/DistributedStages/PlanSegmentInstance.h>
#include <Interpreters/DistributedStages/PlanSegmentManagerRpcService.h>
#include <Interpreters/DistributedStages/PlanSegmentReport.h>
#include <Interpreters/DistributedStages/SourceTask.h>
#include <Interpreters/DistributedStages/executePlanSegment.h>
#include <Interpreters/NamedSession.h>
#include <Processors/Exchange/DataTrans/Brpc/ReadBufferFromBrpcBuf.h>
#include <Protos/RPCHelpers.h>
#include <Protos/cnch_worker_rpc.pb.h>
#include <Protos/plan_segment_manager.pb.h>
#include <brpc/controller.h>
@@ -631,6 +633,9 @@ void PlanSegmentManagerRpcService::submitPlanSegment(
}
}
for (const auto & iter : request->source_task_stats())
execution_info.source_task_stats.emplace(RPCHelpers::createUUID(iter.storage_id().uuid()), SourceTaskStat::fromProto(iter));
butil::IOBuf plan_segment_buf;
auto plan_segment_buf_size = cntl->request_attachment().cutn(&plan_segment_buf, request->plan_segment_buf_size());
if (plan_segment_buf_size != request->plan_segment_buf_size())

View File

@@ -1,5 +1,7 @@
#include <Interpreters/DistributedStages/SourceTask.h>
#include "common/types.h"
#include <Protos/RPCHelpers.h>
#include <Protos/plan_segment_manager.pb.h>
#include <common/types.h>
namespace DB
@@ -40,4 +42,18 @@ void SourceTaskFilter::fromProto(const Protos::SourceTaskFilter & proto)
}
}
Protos::SourceTaskStat SourceTaskStat::toProto() const
{
Protos::SourceTaskStat proto;
storage_id.toProto(*proto.mutable_storage_id());
proto.set_rows(rows);
return proto;
}
SourceTaskStat SourceTaskStat::fromProto(const Protos::SourceTaskStat & proto)
{
return {RPCHelpers::createStorageID(proto.storage_id()), proto.rows()};
}
} // namespace DB
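A minimal round-trip sketch for the new conversions, assuming the DB namespace and a simplified StorageID("db", "tbl") construction (the real StorageID also carries a UUID):

#include <Interpreters/DistributedStages/SourceTask.h>
#include <Interpreters/StorageID.h>

using namespace DB;

void roundTripSketch()
{
    SourceTaskStat stat(StorageID("db", "tbl"), /*rows_=*/42);
    Protos::SourceTaskStat proto = stat.toProto();
    SourceTaskStat restored = SourceTaskStat::fromProto(proto);
    /// restored.rows == 42 and restored.storage_id names the same table.
}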

View File

@@ -3,6 +3,9 @@
#include <map>
#include <set>
#include <string>
#include <Catalog/DataModelPartWrapper_fwd.h>
#include <Core/Types.h>
#include <Interpreters/StorageID.h>
#include <Protos/plan_segment_manager.pb.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/range/adaptor/transformed.hpp>
@@ -15,6 +18,7 @@ namespace DB
struct SourceTaskPayload
{
std::set<Int64> buckets;
ServerDataPartsVector visible_parts;
size_t rows = 0;
size_t part_num = 0;
String toString() const
@@ -27,6 +31,17 @@
}
};
struct SourceTaskStat
{
SourceTaskStat(StorageID storage_id_, size_t rows_) : storage_id(storage_id_), rows(rows_)
{
}
StorageID storage_id;
size_t rows;
Protos::SourceTaskStat toProto() const;
static SourceTaskStat fromProto(const Protos::SourceTaskStat & proto);
};
struct SourceTaskPayloadOnWorker
{
String worker_id;

View File

@@ -219,6 +219,9 @@ void executePlanSegmentRemotelyWithPreparedBuf(
}
}
for (const auto & iter : execution_info.source_task_stats)
*request.add_source_task_stats() = iter.second.toProto();
if (execution_info.worker_epoch > 0)
request.set_worker_epoch(execution_info.worker_epoch);

View File

@@ -516,6 +516,7 @@ NodeSelectorResult SourceNodeSelector::select(PlanSegment * plan_segment_ptr, Co
is_bucket_valid = is_bucket_valid && hasBucketScan(*plan_segment_ptr);
/// table => (worker => payload)
const auto & source_task_payload_map = query_context->getCnchServerResource()->getSourceTaskPayload();
for (const auto & plan_segment_input : plan_segment_ptr->getPlanSegmentInputs())
{
@@ -530,7 +531,15 @@
rows_count += p.rows;
auto & worker_payload = payload_on_workers[addr];
worker_payload.rows += p.rows;
worker_payload.part_num += 1;
worker_payload.part_num += p.part_num;
LOG_TRACE(
log,
"Payload on Worker({}) is rows:{} part_num:{} visible_part size:{} buckets size:{}",
addr.toShortString(),
p.rows,
p.part_num,
p.visible_parts.size(),
p.buckets.size());
if (is_bucket_valid)
{
for (auto bucket : p.buckets)
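The change in this hunk replaces worker_payload.part_num += 1 with += p.part_num, so a payload covering many parts is no longer counted as a single part. A minimal sketch of the per-worker accumulation, with hypothetical stub types standing in for AddressInfo and the payload structs:

#include <cstddef>
#include <map>
#include <string>

struct TablePayloadStub  { size_t rows = 0; size_t part_num = 0; }; /// one table's payload on one worker
struct WorkerPayloadStub { size_t rows = 0; size_t part_num = 0; }; /// running totals for one worker

void accumulate(std::map<std::string, WorkerPayloadStub> & payload_on_workers,
                const std::string & addr, const TablePayloadStub & p)
{
    auto & worker_payload = payload_on_workers[addr];
    worker_payload.rows += p.rows;
    worker_payload.part_num += p.part_num; /// was `+= 1` before this hunk, undercounting multi-part payloads
}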

View File

@@ -107,6 +107,7 @@ struct NodeSelectorResult
std::vector<size_t> indexes;
std::unordered_map<AddressInfo, size_t, AddressInfo::Hash> source_task_count_on_workers;
std::unordered_map<AddressInfo, std::vector<std::set<Int64>>, AddressInfo::Hash> buckets_on_workers;
std::unordered_map<AddressInfo, std::unordered_map<UUID, SourceTaskStat>, AddressInfo::Hash> worker_source_task_stats;
//input plansegment id => source address and partition ids, ordered by parallel index, used by bsp mode
std::map<PlanSegmentInstanceId, std::vector<PlanSegmentMultiPartitionSource>> sources;

View File

@@ -15,8 +15,8 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/SystemLog.h>
namespace ProfileEvents
{

View File

@@ -14,18 +14,23 @@
*/
#include <Catalog/Catalog.h>
#include <Catalog/DataModelPartWrapper.h>
#include <Catalog/DataModelPartWrapper_fwd.h>
#include <Interpreters/Context.h>
#include <Interpreters/WorkerGroupHandle.h>
#include <MergeTreeCommon/assignCnchParts.h>
#include <Storages/DataLakes/ScanInfo/ILakeScanInfo.h>
#include <Storages/MergeTree/DeleteBitmapMeta.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/RemoteFile/CnchFileCommon.h>
#include <Storages/RemoteFile/CnchFileSettings.h>
#include <Storages/MergeTree/DeleteBitmapMeta.h>
#include <Storages/DataLakes/ScanInfo/ILakeScanInfo.h>
#include <Catalog/DataModelPartWrapper.h>
#include "Common/HostWithPorts.h"
#include "common/types.h"
#include <Common/HostWithPorts.h>
#include <common/logger_useful.h>
#include "Catalog/DataModelPartWrapper_fwd.h"
#include "Interpreters/Context.h"
#include <common/types.h>
#include <sstream>
#include <unordered_map>
@@ -816,7 +821,31 @@ std::pair<ServerAssignmentMap, VirtualPartAssignmentMap> assignCnchHybridParts(
}
}
void filterParts(IMergeTreeDataPartsVector & parts, const SourceTaskFilter & filter)
template <class T>
struct GetBucketTrait
{
};
template <>
struct GetBucketTrait<ServerDataPart>
{
static Int64 getBucketNumber(const ServerDataPart & part)
{
return part.part_model().bucket_number();
}
};
template <>
struct GetBucketTrait<IMergeTreeDataPart>
{
static Int64 getBucketNumber(const IMergeTreeDataPart & part)
{
return part.bucket_number;
}
};
template <class T>
void filterParts(std::vector<std::shared_ptr<const T>> & parts, const SourceTaskFilter & filter)
{
if (filter.index && filter.count)
{
@@ -844,11 +873,15 @@ void filterParts(IMergeTreeDataPartsVector & parts, const SourceTaskFilter & fil
for (auto iter = parts.begin(); iter != parts.end();)
{
const auto & part = *iter;
if (!buckets.contains(part->bucket_number))
if (!buckets.contains(GetBucketTrait<T>::getBucketNumber(*part)))
iter = parts.erase(iter);
else
iter++;
}
}
}
template void
filterParts<Coordination::IMergeTreeDataPart>(Coordination::IMergeTreeDataPartsVector & parts, const SourceTaskFilter & filter);
template void filterParts<ServerDataPart>(ServerDataPartsVector & parts, const SourceTaskFilter & filter);
}
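The trait turns filterParts into a single template working over both part kinds: ServerDataPart keeps its bucket number inside the protobuf part model, while IMergeTreeDataPart stores it as a plain member. A minimal C++20 sketch of the same pattern with a stub part type:

#include <cstdint>
#include <memory>
#include <set>
#include <vector>

/// Stub part; the real specializations read part_model().bucket_number()
/// (ServerDataPart) and bucket_number (IMergeTreeDataPart).
struct FilePartStub
{
    int64_t bucket_number = -1;
};

template <class T>
struct GetBucketTraitStub;

template <>
struct GetBucketTraitStub<FilePartStub>
{
    static int64_t getBucketNumber(const FilePartStub & part) { return part.bucket_number; }
};

template <class T>
void keepOnlyBuckets(std::vector<std::shared_ptr<const T>> & parts, const std::set<int64_t> & buckets)
{
    for (auto iter = parts.begin(); iter != parts.end();)
    {
        if (!buckets.contains(GetBucketTraitStub<T>::getBucketNumber(**iter)))
            iter = parts.erase(iter);
        else
            ++iter;
    }
}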

View File

@@ -117,5 +117,10 @@ void mergeConsecutiveRanges(VirtualPartAssignmentMap & virtual_part_assignment);
ServerVirtualPartVector getVirtualPartVector(const ServerDataPartsVector & parts, std::map<int, std::unique_ptr<MarkRanges>> & parts_entry);
void filterParts(Coordination::IMergeTreeDataPartsVector & parts, const SourceTaskFilter & filter);
template <class T>
void filterParts(std::vector<std::shared_ptr<const T>> & parts, const SourceTaskFilter & filter);
extern template void
filterParts<Coordination::IMergeTreeDataPart>(Coordination::IMergeTreeDataPartsVector & parts, const SourceTaskFilter & filter);
extern template void filterParts<ServerDataPart>(ServerDataPartsVector & parts, const SourceTaskFilter & filter);
}

View File

@@ -134,6 +134,11 @@ message SourceTaskFilter {
repeated int64 buckets = 3;
}
message SourceTaskStat {
required StorageID storage_id = 1;
optional uint64 rows = 2;
}
message SubmitPlanSegmentRequest {
// major version number
required uint32 brpc_protocol_major_revision = 1;
@@ -156,6 +161,7 @@ message SubmitPlanSegmentRequest {
optional SourceTaskFilter source_task_filter = 11;
repeated PlanSegmentMultiPartitionSource sources = 12;
optional uint32 worker_epoch = 13;
repeated SourceTaskStat source_task_stats = 15;
}
message PlanSegmentHeader {
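A minimal sketch of filling the new repeated field from C++, assuming the standard accessors protoc generates for this message (the real code path builds each entry via SourceTaskStat::toProto and RPCHelpers):

#include <cstdint>
#include <Protos/plan_segment_manager.pb.h>

void attachStatsSketch(DB::Protos::SubmitPlanSegmentRequest & request, uint64_t rows)
{
    /// add_source_task_stats() and set_rows() are the generated accessors for
    /// `repeated SourceTaskStat source_task_stats = 15` and `optional uint64 rows = 2`.
    auto * stat = request.add_source_task_stats();
    stat->set_rows(rows);
    /// storage_id is a required field, so a real message must also fill
    /// stat->mutable_storage_id() before serializing.
}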