Merge branch 'cnch_dev_fix_expected_parts_number' into 'cnch-dev'

fix(clickhousech@m-4619293134): cnch dev fix some bad cases for expected parts number

See merge request dp/ClickHouse!25049
This commit is contained in:
石宇泽 2024-10-22 06:48:47 +00:00 committed by Fred Wang
parent d7d1525bee
commit f55a586c84
22 changed files with 425 additions and 296 deletions

View File

@ -149,6 +149,7 @@ ManipulationTaskRecord::~ManipulationTaskRecord()
{
if (!try_execute && !parts.empty())
{
UInt64 source_parts_rows = 0;
std::lock_guard lock(parent.currently_merging_mutating_parts_mutex);
for (auto & part : parts)
{
@ -159,6 +160,13 @@ ManipulationTaskRecord::~ManipulationTaskRecord()
parent.currently_merging_mutating_parts.erase(prev_part->name());
prev_part = prev_part->tryGetPreviousPart();
}
source_parts_rows += part->rowsCount();
}
if (!parts.empty())
{
auto it = parent.merging_mutating_tasks_rows.try_emplace(parts.front()->info().partition_id, 0, 0).first;
it->second.first -= std::min(it->second.first, 1UL);
it->second.second -= std::min(it->second.second, source_parts_rows);
}
}
@ -199,6 +207,7 @@ FutureManipulationTask::~FutureManipulationTask()
{
if (!try_execute && !parts.empty())
{
UInt64 source_parts_rows = 0;
std::lock_guard lock(parent.currently_merging_mutating_parts_mutex);
for (auto & part : parts)
{
@ -209,6 +218,13 @@ FutureManipulationTask::~FutureManipulationTask()
parent.currently_merging_mutating_parts.erase(prev_part->name());
prev_part = prev_part->tryGetPreviousPart();
}
source_parts_rows += part->rowsCount();
}
if (!parts.empty())
{
auto it = parent.merging_mutating_tasks_rows.try_emplace(parts.front()->info().partition_id, 0, 0).first;
it->second.first -= std::min(it->second.first, 1UL);
it->second.second -= std::min(it->second.second, source_parts_rows);
}
}
}
@ -229,6 +245,7 @@ FutureManipulationTask & FutureManipulationTask::tagSourceParts(ServerDataPartsV
if (!record->try_execute)
{
UInt64 source_parts_rows = 0;
std::lock_guard lock(parent.currently_merging_mutating_parts_mutex);
for (const auto & p : parts_)
@ -241,6 +258,13 @@ FutureManipulationTask & FutureManipulationTask::tagSourceParts(ServerDataPartsV
check_and_add(prev_part->name());
prev_part = prev_part->tryGetPreviousPart();
}
source_parts_rows += p->rowsCount();
}
if (!parts_.empty())
{
auto it = parent.merging_mutating_tasks_rows.try_emplace(parts_.front()->info().partition_id, 0, 0).first;
it->second.first++;
it->second.second += source_parts_rows;
}
}
@ -672,7 +696,7 @@ bool CnchMergeMutateThread::tryMergeParts(StoragePtr & istorage, StorageCnchMerg
void removeUnselectableParts(
ServerDataPartsVector & visible_parts,
NameSet & merging_mutating_parts_snapshot,
std::multimap<String, UInt64> & unselectable_part_rows,
std::unordered_map<String, std::pair<UInt64, UInt64> > & unselectable_part_rows,
UInt64 max_bytes,
UInt64 max_rows)
{
@ -681,13 +705,18 @@ void removeUnselectableParts(
visible_parts.begin(),
visible_parts.end(),
[&merging_mutating_parts_snapshot, &unselectable_part_rows, max_bytes, max_rows](const auto & p) {
if (merging_mutating_parts_snapshot.erase(p->name())
|| p->part_model().rows_count() >= max_rows * 0.9
|| p->part_model().size() >= max_bytes * 0.9)
if (merging_mutating_parts_snapshot.erase(p->name()))
return true;
if (p->part_model().rows_count() >= max_rows * 0.9 || p->part_model().size() >= max_bytes * 0.9)
{
unselectable_part_rows.emplace(p->info().partition_id, p->part_model().rows_count());
auto it = unselectable_part_rows.try_emplace(p->info().partition_id, 0, 0).first;
it->second.first++;
it->second.second += p->part_model().rows_count();
return true;
}
return false;
}),
visible_parts.end()
@ -813,7 +842,7 @@ bool CnchMergeMutateThread::trySelectPartsToMerge(StoragePtr & istorage, Storage
/// TODO: support checkpoints
/// Used to calculate total rows of each partition so we can prevent generating huge merge tasks.
std::multimap<String, UInt64> unselectable_part_rows;
auto unselectable_part_rows = copyCurrentlyMergingMutatingTasksRows();
auto max_bytes = std::min(
storage_settings->cnch_merge_max_total_bytes_to_merge.value,
@ -833,14 +862,6 @@ bool CnchMergeMutateThread::trySelectPartsToMerge(StoragePtr & istorage, Storage
return false;
}
}
else
{
for (const auto & p: visible_parts)
{
if (merging_mutating_parts_snapshot.count(p->name()) > 0)
unselectable_part_rows.emplace(p->info().partition_id, p->part_model().rows_count());
}
}
metrics.num_legal_visible_parts = visible_parts.size();
@ -1143,9 +1164,9 @@ String CnchMergeMutateThread::triggerPartMerge(
mutation_timestamps.reserve(mutation_entries.size());
for (const auto & [_, mutation_entry] : mutation_entries)
mutation_timestamps.emplace_back(mutation_entry.commit_time, mutation_entry.commands.changeSchema());
/// Used to calculate total rows of each partition so we can prevent generating huge merge tasks.
std::multimap<String, UInt64> unselectable_part_rows;
auto unselectable_part_rows = copyCurrentlyMergingMutatingTasksRows();
auto & storage = checkAndGetCnchTable(istorage);
auto storage_settings = storage.getSettings();
@ -1162,14 +1183,6 @@ String CnchMergeMutateThread::triggerPartMerge(
auto max_rows = storage_settings->cnch_merge_max_total_rows_to_merge.value;
removeUnselectableParts(visible_parts, merging_mutating_parts_snapshot, unselectable_part_rows, max_bytes, max_rows);
}
else
{
for (const auto & p: visible_parts)
{
if (merging_mutating_parts_snapshot.count(p->name()) > 0)
unselectable_part_rows.emplace(p->info().partition_id, p->part_model().rows_count());
}
}
/// Step 4: create merge predicate
auto can_merge_callback = getMergePred(merging_mutating_parts_snapshot, mutation_timestamps);

View File

@ -57,7 +57,7 @@ struct ManipulationTaskRecord
/// Set task_record's commit_start_time once it go into txn commit stage.
/// There are some other operations that may conflict with merge.
/// 1. DROP PARTITION - get the current max block id and generate a DropRange part.
/// 1. DROP PARTITION - get the current max block id and generate a DropRange part.
/// Need to cancel merge tasks before getting data parts.
/// 2. INGEST PARTITION - generate new content based on current source parts.
/// Need to cancel merge tasks and suspend the merge process before INGEST PARTITION finish.
@ -230,10 +230,18 @@ private:
return currently_merging_mutating_parts;
}
/// Returns a snapshot copy of merging_mutating_tasks_rows
/// (partition_id -> {future_parts_number, future_part_rows}), taken while
/// holding currently_merging_mutating_parts_mutex so callers can read the
/// counters afterwards without holding the lock.
std::unordered_map<String, std::pair<UInt64, UInt64> > copyCurrentlyMergingMutatingTasksRows()
{
std::lock_guard lock(currently_merging_mutating_parts_mutex);
return merging_mutating_tasks_rows;
}
Strings removeLockedPartition(const Strings & partitions);
std::mutex currently_merging_mutating_parts_mutex;
NameSet currently_merging_mutating_parts;
/// partition_id -> {future_parts_number, future_part_rows}
std::unordered_map<String, std::pair<UInt64, UInt64> > merging_mutating_tasks_rows;
std::condition_variable currently_synchronous_tasks_cv; /// for waitTasksFinish function
std::mutex currently_synchronous_tasks_mutex;

View File

@ -23,14 +23,18 @@
#include <Storages/MergeTree/MergeSelectorAdaptiveController.h>
#include <Storages/StorageCnchMergeTree.h>
#include <boost/functional/hash.hpp>
namespace DB
{
static void groupPartsByColumnsMutationsCommitTime(const ServerDataPartsVector & parts, std::vector<ServerDataPartsVector> & part_ranges);
ServerSelectPartsDecision selectPartsToMerge(
const MergeTreeMetaBase & data,
std::vector<ServerDataPartsVector> & res,
const ServerDataPartsVector & data_parts,
const std::multimap<String, UInt64> & unselectable_part_rows,
const std::unordered_map<String, std::pair<UInt64, UInt64> > & unselectable_part_rows,
ServerCanMergeCallback can_merge_callback,
const SelectPartsToMergeSettings & settings,
LoggerPtr log)
@ -50,11 +54,12 @@ ServerSelectPartsDecision selectPartsToMerge(
bool aggressive = settings.aggressive;
bool enable_batch_select = settings.enable_batch_select;
bool final = settings.final;
bool select_nonadjacent_parts_allowed = data_settings->cnch_merge_select_nonadjacent_parts.value;
// bool merge_with_ttl_allowed = settings.merge_with_ttl_allowed
time_t current_time = std::time(nullptr);
IMergeSelector::PartsRanges parts_ranges;
IMergeSelector<ServerDataPart>::PartsRanges parts_ranges;
/// StoragePolicyPtr storage_policy = data.getStoragePolicy(IStorage::StorageLocation::MAIN);
/// Volumes with stopped merges are extremely rare situation.
@ -79,93 +84,104 @@ ServerSelectPartsDecision selectPartsToMerge(
for (auto & bucket: buckets)
{
const String * prev_partition_id = nullptr;
/// Previous part only in boundaries of partition frame
const ServerDataPartPtr * prev_part = nullptr;
std::vector<ServerDataPartsVector> part_ranges_before_split;
if (select_nonadjacent_parts_allowed)
groupPartsByColumnsMutationsCommitTime(bucket.second, part_ranges_before_split);
else
part_ranges_before_split.emplace_back(std::move(bucket.second));
for (const auto & part : bucket.second)
for (const auto & range_before_split: part_ranges_before_split)
{
const String & partition_id = part->info().partition_id;
const String * prev_partition_id = nullptr;
/// Previous part only in boundaries of partition frame
const ServerDataPartPtr * prev_part = nullptr;
if (!prev_partition_id
|| partition_id != *prev_partition_id
|| (!parts_ranges.empty() && parts_ranges.back().size() >= max_parts_to_break))
for (const auto & part : range_before_split)
{
if (parts_ranges.empty() || !parts_ranges.back().empty())
parts_ranges.emplace_back();
const String & partition_id = part->info().partition_id;
/// New partition frame.
prev_partition_id = &partition_id;
prev_part = nullptr;
}
/// If select_nonadjacent_parts_allowed is true, DanceMergeSelector will reorder parts by rows
bool need_split_by_max_parts_to_break = !select_nonadjacent_parts_allowed
&& !parts_ranges.empty() && parts_ranges.back().size() >= max_parts_to_break;
/// Check predicate only for the first part in each range.
if (!prev_part)
{
/* Parts can be merged with themselves for TTL needs for example.
* So we have to check if this part is currently being inserted with quorum and so on and so forth.
* Obviously we have to check it manually only for the first part
* of each partition because it will be automatically checked for a pair of parts. */
if (!can_merge_callback(nullptr, part))
continue;
/// This part can be merged only with next parts (no prev part exists), so start
/// new interval if previous was not empty.
if (!parts_ranges.back().empty())
parts_ranges.emplace_back();
}
else
{
/// If we cannot merge with previous part we had to start new parts
/// interval (in the same partition)
if (!can_merge_callback(*prev_part, part))
if (!prev_partition_id || partition_id != *prev_partition_id || need_split_by_max_parts_to_break)
{
/// Now we have no previous part
if (parts_ranges.empty() || !parts_ranges.back().empty())
parts_ranges.emplace_back();
/// New partition frame.
prev_partition_id = &partition_id;
prev_part = nullptr;
}
/// Mustn't be empty
assert(!parts_ranges.back().empty());
/// Some parts cannot be merged with previous parts and also cannot be merged with themselves,
/// for example, merge is already assigned for such parts, or they participate in quorum inserts
/// and so on.
/// Also we don't start new interval here (maybe all next parts cannot be merged and we don't want to have empty interval)
/// Check predicate only for the first part in each range.
if (!prev_part)
{
/* Parts can be merged with themselves for TTL needs for example.
* So we have to check if this part is currently being inserted with quorum and so on and so forth.
* Obviously we have to check it manually only for the first part
* of each partition because it will be automatically checked for a pair of parts. */
if (!can_merge_callback(nullptr, part))
continue;
/// Starting new interval in the same partition
parts_ranges.emplace_back();
/// This part can be merged only with next parts (no prev part exists), so start
/// new interval if previous was not empty.
if (!parts_ranges.back().empty())
parts_ranges.emplace_back();
}
else
{
/// If we cannot merge with previous part we had to start new parts
/// interval (in the same partition)
if (!can_merge_callback(*prev_part, part))
{
/// Now we have no previous part
prev_part = nullptr;
/// Mustn't be empty
assert(!parts_ranges.back().empty());
/// Some parts cannot be merged with previous parts and also cannot be merged with themselves,
/// for example, merge is already assigned for such parts, or they participate in quorum inserts
/// and so on.
/// Also we don't start new interval here (maybe all next parts cannot be merged and we don't want to have empty interval)
if (!can_merge_callback(nullptr, part))
continue;
/// Starting new interval in the same partition
parts_ranges.emplace_back();
}
}
IMergeSelector<ServerDataPart>::Part part_info;
part_info.size = part->part_model().size();
time_t part_commit_time = TxnTimestamp(part->getCommitTime()).toSecond();
auto p_part = part->tryGetPreviousPart();
while (p_part)
{
++part_info.chain_depth;
part_info.size += p_part->part_model().size();
part_commit_time = TxnTimestamp(p_part->getCommitTime()).toSecond();
p_part = p_part->tryGetPreviousPart();
}
/// Consider the base part's age as the part chain's age,
/// so that the merge selector will give it a better score.
part_info.age = current_time > part_commit_time ? current_time - part_commit_time : 0;
part_info.rows = part->rowsCount();
part_info.level = part->info().level;
part_info.data = part.get();
/// TODO:
/// part_info.ttl_infos = &part->ttl_infos;
/// part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();
/// part_info.shall_participate_in_merges = has_volumes_with_disabled_merges ? part->shallParticipateInMerges(storage_policy) : true;
part_info.shall_participate_in_merges = true;
++parts_selected_precondition;
parts_ranges.back().emplace_back(part_info);
prev_part = &part;
}
IMergeSelector::Part part_info;
part_info.size = part->part_model().size();
time_t part_commit_time = TxnTimestamp(part->getCommitTime()).toSecond();
auto p_part = part->tryGetPreviousPart();
while (p_part)
{
++part_info.chain_depth;
part_info.size += p_part->part_model().size();
part_commit_time = TxnTimestamp(p_part->getCommitTime()).toSecond();
p_part = p_part->tryGetPreviousPart();
}
/// Consider the base part's age as the part chain's age,
/// so that the merge selector will give it a better score.
part_info.age = current_time > part_commit_time ? current_time - part_commit_time : 0;
part_info.rows = part->rowsCount();
part_info.level = part->info().level;
part_info.data = &part;
/// TODO:
/// part_info.ttl_infos = &part->ttl_infos;
/// part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();
/// part_info.shall_participate_in_merges = has_volumes_with_disabled_merges ? part->shallParticipateInMerges(storage_policy) : true;
part_info.shall_participate_in_merges = true;
++parts_selected_precondition;
parts_ranges.back().emplace_back(part_info);
prev_part = &part;
}
}
@ -179,7 +195,7 @@ ServerSelectPartsDecision selectPartsToMerge(
/*
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
IMergeSelector::PartsRange parts_to_merge;
IMergeSelector<ServerDataPart>::PartsRange parts_to_merge;
/// TTL delete is preferred to recompression
TTLDeleteMergeSelector delete_ttl_selector(
@ -238,7 +254,7 @@ ServerSelectPartsDecision selectPartsToMerge(
merge_settings.min_parts_to_merge_base = 1;
merge_settings.final = final;
merge_settings.max_age_for_single_part_chain = data_settings->merge_with_ttl_timeout;
merge_settings.select_nonadjacent_parts_allowed = data_settings->cnch_merge_select_nonadjacent_parts;
merge_settings.select_nonadjacent_parts_allowed = select_nonadjacent_parts_allowed;
auto merge_selector = std::make_unique<DanceMergeSelector>(merge_settings);
/// Using adaptive controller
@ -252,9 +268,13 @@ ServerSelectPartsDecision selectPartsToMerge(
if (expected_parts_number > 0)
{
UInt64 write_amplification_optimize_threshold = data_settings->cnch_merge_write_amplification_optimize_threshold.value;
if (log)
LOG_TRACE(log, "Using adaptive controller, expected_parts_number is {}", expected_parts_number);
auto adaptive_controller = std::make_shared<MergeSelectorAdaptiveController>(
data.isBucketTable(),
expected_parts_number,
write_amplification_optimize_threshold,
merge_settings.max_parts_to_merge_base.value);
adaptive_controller->init(bg_task_stats, parts_ranges, unselectable_part_rows);
merge_selector->setAdaptiveController(adaptive_controller);
@ -280,7 +300,7 @@ ServerSelectPartsDecision selectPartsToMerge(
{
if (log)
LOG_ERROR(log, "merge selector returned only one part to merge {}, skip this range.",
(*static_cast<const ServerDataPartPtr *>(range.front().data))->name());
static_cast<const ServerDataPart *>(range.front().data)->name());
continue;
}
// throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
@ -288,8 +308,8 @@ ServerSelectPartsDecision selectPartsToMerge(
auto & emplaced_parts = res.emplace_back();
emplaced_parts.reserve(range.size());
for (auto & part : range)
emplaced_parts.push_back(*static_cast<const ServerDataPartPtr *>(part.data));
emplaced_parts.push_back(static_cast<const ServerDataPart *>(part.data)->shared_from_this());
/// When selection of nonadjacent parts is enabled, the merge selector can sort parts by rows/size/age to get a
/// better selection. After selection, we need to sort parts again to get right result part name.
if (data_settings->cnch_merge_select_nonadjacent_parts.value)
@ -315,4 +335,23 @@ void groupPartsByBucketNumber(const MergeTreeMetaBase & data, std::unordered_map
}
}
/// Partitions `parts` into ranges such that all parts within one range share
/// the same (columns_commit_time, mutation_commit_time) pair; each resulting
/// group is appended to `part_ranges`. NOTE(review): presumably parts with
/// differing columns/mutation commit times must not be merged together, which
/// is why they are separated up front — confirm against the caller.
static void groupPartsByColumnsMutationsCommitTime(const ServerDataPartsVector & parts, std::vector<ServerDataPartsVector> & part_ranges)
{
/// Group key: {columns_commit_time, mutation_commit_time}.
using GroupKeyType = std::pair<UInt64, UInt64>;
/// boost::hash supplies a hash for std::pair, which std::hash does not provide.
std::unordered_map<GroupKeyType, ServerDataPartsVector, boost::hash<GroupKeyType> > grouped_ranges;
for (const auto & p: parts)
{
GroupKeyType key = std::make_pair(p->getColumnsCommitTime(), p->getMutationCommitTime());
auto it = grouped_ranges.try_emplace(key).first;
it->second.emplace_back(p);
}
/// Move each group's vector out of the map into the output; swap avoids copying.
for (auto & [_, range]: grouped_ranges)
{
part_ranges.emplace_back();
std::swap(range, part_ranges.back());
}
}
}

View File

@ -46,7 +46,7 @@ ServerSelectPartsDecision selectPartsToMerge(
const MergeTreeMetaBase & data,
std::vector<ServerDataPartsVector> & res,
const ServerDataPartsVector & data_parts,
const std::multimap<String, UInt64> & unselectable_part_rows,
const std::unordered_map<String, std::pair<UInt64, UInt64> > & unselectable_part_rows,
ServerCanMergeCallback can_merge_callback,
const SelectPartsToMergeSettings & settings,
LoggerPtr log);

View File

@ -27,8 +27,10 @@
namespace DB
{
class IMergeTreeDataPart;
/// Select all parts within partition (having at least two parts) with minimum total size.
class AllMergeSelector : public IMergeSelector
class AllMergeSelector : public IMergeSelector<IMergeTreeDataPart>
{
public:
/// Parameter max_total_size_to_merge is ignored.

View File

@ -53,14 +53,23 @@ void DanceMergeSelectorSettings::loadFromConfig(const Poco::Util::AbstractConfig
}
}
static void reorderPartsRange(IMergeSelector::PartsRange & parts)
static void reorderPartsRange(DanceMergeSelector::PartsRange & parts, UInt64 max_parts_to_sort)
{
/// Sort parts by size/rows/age to select smaller merge tasks
std::sort(parts.begin(), parts.end(), [](const IMergeSelector::Part & lhs, const IMergeSelector::Part & rhs)
{
time_t max_age = std::numeric_limits<time_t>::max();
DanceMergeSelector::PartsRange::iterator sort_end = parts.end();
auto cmp = [](const DanceMergeSelector::Part & lhs, const DanceMergeSelector::Part & rhs) {
constexpr time_t max_age = std::numeric_limits<time_t>::max();
return std::make_tuple(lhs.size, lhs.rows, max_age - lhs.age) < std::make_tuple(rhs.size, rhs.rows, max_age - rhs.age);
});
};
if (parts.size() > max_parts_to_sort * 2)
{
sort_end = parts.begin() + max_parts_to_sort;
std::nth_element(parts.begin(), sort_end, parts.end(), cmp);
}
/// Sort parts by size/rows/age to select smaller merge tasks
std::sort(parts.begin(), sort_end, cmp);
}
static double score(double count, double sum_size, double sum_size_fixed_cost, double count_exp)
@ -73,7 +82,7 @@ static double mapPiecewiseLinearToUnit(double value, double min, double max)
return value <= min ? 0 : (value >= max ? 1 : ((value - min) / (max - min)));
}
IMergeSelector::PartsRange DanceMergeSelector::select(PartsRanges & partitions, const size_t max_total_size_to_merge, MergeScheduler * merge_scheduler)
DanceMergeSelector::PartsRange DanceMergeSelector::select(PartsRanges & partitions, const size_t max_total_size_to_merge, MergeScheduler * merge_scheduler)
{
if (settings.enable_batch_select)
{
@ -87,14 +96,15 @@ IMergeSelector::PartsRange DanceMergeSelector::select(PartsRanges & partitions,
{
num_parts_of_partitions[getPartitionID(partition.front())] += partition.size();
if (settings.select_nonadjacent_parts_allowed)
reorderPartsRange(partition);
reorderPartsRange(partition, settings.max_parts_to_break.value + settings.max_parts_to_merge_base.value);
}
}
std::unordered_map<String, std::pair<UInt64, UInt64> > allowed_parts_rows;
for (const auto & partition : partitions)
{
if (partition.size() >= 2)
selectWithinPartition(partition, max_total_size_to_merge, merge_scheduler);
selectWithinPartition(partition, max_total_size_to_merge, allowed_parts_rows, merge_scheduler);
}
/// Because of using iterator in best_ranges, should not modify part ranges vector after selectWithPartition.
@ -114,7 +124,7 @@ IMergeSelector::PartsRange DanceMergeSelector::select(PartsRanges & partitions,
return {};
}
IMergeSelector::PartsRanges DanceMergeSelector::selectMulti(PartsRanges & partitions, const size_t max_total_size_to_merge, MergeScheduler * merge_scheduler)
DanceMergeSelector::PartsRanges DanceMergeSelector::selectMulti(PartsRanges & partitions, const size_t max_total_size_to_merge, MergeScheduler * merge_scheduler)
{
for (auto & partition : partitions)
{
@ -122,13 +132,14 @@ IMergeSelector::PartsRanges DanceMergeSelector::selectMulti(PartsRanges & partit
{
num_parts_of_partitions[getPartitionID(partition.front())] += partition.size();
if (settings.select_nonadjacent_parts_allowed)
reorderPartsRange(partition);
reorderPartsRange(partition, settings.max_parts_to_break.value + settings.max_parts_to_merge_base.value);
}
}
std::unordered_map<String, std::pair<UInt64, UInt64> > allowed_parts_rows;
for (const auto & partition : partitions)
{
selectWithinPartition(partition, max_total_size_to_merge, merge_scheduler);
selectWithinPartition(partition, max_total_size_to_merge, allowed_parts_rows, merge_scheduler);
}
/// Because of using iterator in best_ranges, should not modify part ranges vector after selectWithPartition.
@ -177,7 +188,8 @@ void DanceMergeSelector::selectRangesFromScoreTable(
size_t j,
size_t num_max_out,
size_t max_width,
std::vector<BestRangeWithScore> & out)
std::vector<BestRangeWithScore> & out,
size_t & allowed_parts)
{
if (i >= j || out.size() >= num_max_out)
return;
@ -207,13 +219,21 @@ void DanceMergeSelector::selectRangesFromScoreTable(
BestRangeWithScore range{};
range.update(min_score, parts.begin() + min_i, parts.begin() + min_j + 1);
size_t parts_in_range = std::distance(range.best_begin, range.best_end);
/// Already selected enough parts in current batch according to expected_parts_number. After merge, will generate a
/// new part and remove ${parts_in_range} parts.
if (parts_in_range > allowed_parts + 1)
return;
out.push_back(range);
allowed_parts = allowed_parts + 1 - parts_in_range;
if (min_i > i + 1)
selectRangesFromScoreTable(parts, score_table, i, min_i - 1, num_max_out, max_width, out);
selectRangesFromScoreTable(parts, score_table, i, min_i - 1, num_max_out, max_width, out, allowed_parts);
if (min_j < j - 1)
selectRangesFromScoreTable(parts, score_table, min_j + 1, j, num_max_out, max_width, out);
selectRangesFromScoreTable(parts, score_table, min_j + 1, j, num_max_out, max_width, out, allowed_parts);
}
/// Like std::min, but treat 0 as infinity
@ -222,7 +242,12 @@ static size_t minValueWithoutZero(size_t a, size_t b)
return a == 0 ? b : (b == 0 ? a : std::min(a, b));
}
void DanceMergeSelector::selectWithinPartition(const PartsRange & parts, const size_t max_total_size_to_merge, [[maybe_unused]] MergeScheduler * merge_scheduler)
void DanceMergeSelector::selectWithinPartition(
const PartsRange & parts,
const size_t max_total_size_to_merge,
/// partition_id -> {allowed_parts_in_all_ranges, allowed_rows_in_single_range}
std::unordered_map<String, std::pair<UInt64, UInt64> > & allowed_parts_rows,
[[maybe_unused]] MergeScheduler * merge_scheduler)
{
if (parts.empty())
return;
@ -246,10 +271,18 @@ void DanceMergeSelector::selectWithinPartition(const PartsRange & parts, const s
bool enable_batch_select = enable_batch_select_for_partition(partition_id);
double base = getModifiedBaseByController(partition_id);
const auto & [max_parts_, max_rows_] =
controller ? controller->getMaxPartsAndRows(partition_id) : std::make_pair(settings.max_parts_to_merge_base, 0);
size_t max_parts_to_merge = std::min(max_parts_, settings.max_parts_to_merge_base.value);
size_t max_rows_to_merge = minValueWithoutZero(max_rows_, settings.max_total_rows_to_merge);
auto [it, emplaced] = allowed_parts_rows.try_emplace(partition_id, std::numeric_limits<size_t>::max() - 1, 0);
if (controller && emplaced)
it->second = controller->getMaxPartsAndRows(partition_id);
/// If batch select is enabled, allowed_parts_in_all_ranges will limit source parts number in all part ranges of
/// current partition. For example, we have expected_parts_number = 10, and there are 11 parts in the current partition,
/// but split into 2 ranges by columns_commit_time; we can only select 1 task with 2 parts within these 2 ranges
/// to keep parts number not less than expected_parts_number.
size_t & allowed_parts_in_all_ranges = it->second.first;
size_t max_parts_to_merge = std::min(it->second.first + 1, settings.max_parts_to_merge_base.value);
size_t max_rows_to_merge = minValueWithoutZero(it->second.second, settings.max_total_rows_to_merge.value);
/// score_table[i][j] means begin with i and length is j --> range [i, i + j - 1]
std::vector<std::vector<double>> score_table;
@ -372,10 +405,10 @@ void DanceMergeSelector::selectWithinPartition(const PartsRange & parts, const s
size_t begin = best_range.best_begin - parts.begin();
size_t end = best_range.best_end - parts.begin() - 1;
if (begin > 1)
selectRangesFromScoreTable(parts, score_table, 0, begin - 1, num_expected_ranges, max_parts_to_merge, res_ranges);
selectRangesFromScoreTable(parts, score_table, 0, begin - 1, num_expected_ranges, max_parts_to_merge, res_ranges, allowed_parts_in_all_ranges);
if (end + 2 < max_end)
selectRangesFromScoreTable(parts, score_table, end + 1, max_end - 1, num_expected_ranges, max_parts_to_merge, res_ranges);
selectRangesFromScoreTable(parts, score_table, end + 1, max_end - 1, num_expected_ranges, max_parts_to_merge, res_ranges, allowed_parts_in_all_ranges);
}
for (const auto & range : res_ranges)
@ -435,16 +468,9 @@ bool DanceMergeSelector::allow(double base, double sum_size, double max_size, do
return (sum_size + range_size * settings.size_fixed_cost_to_add) / (max_size + settings.size_fixed_cost_to_add) >= lowered_base;
}
String DanceMergeSelector::getPartitionID(const Part & part)
{
if (get_partition_id)
return get_partition_id(part);
return part.getDataPartPtr()->info.partition_id;
}
double DanceMergeSelector::getModifiedBaseByController(const String & partition_id)
{
if (controller && controller->needControlWriteAmplification(partition_id))
if (controller && controller->needOptimizeWriteAmplification(partition_id))
{
double base = settings.min_parts_to_merge_base;
const auto & [wa, wa_min, wa_max] = controller->getWriteAmplification(partition_id);

View File

@ -15,6 +15,7 @@
#pragma once
#include <Catalog/DataModelPartWrapper.h>
#include <Core/BaseSettings.h>
#include <Storages/MergeTree/MergeSelector.h>
#include <Storages/MergeTree/MergeSelectorAdaptiveController.h>
@ -23,7 +24,7 @@
namespace DB
{
class MergeScheduler;
class MergeTreeMetaBase;
class ServerDataPart;
#define MERGE_MAX_PARTS_TO_BREAK 10000
@ -69,7 +70,7 @@ public:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
};
class DanceMergeSelector : public IMergeSelector
class DanceMergeSelector : public IMergeSelector<ServerDataPart>
{
public:
using Iterator = PartsRange::const_iterator;
@ -108,10 +109,14 @@ public:
}
};
void debugSetGetPartitionID(std::function<String (const Part &)> get_partition_id_) { get_partition_id = get_partition_id_; }
private:
void selectWithinPartition(const PartsRange & parts, const size_t max_total_size_to_merge, MergeScheduler * merge_scheduler = nullptr);
void selectWithinPartition(
const PartsRange & parts,
const size_t max_total_size_to_merge,
/// partition_id -> {allowed_parts_in_all_ranges, allowed_rows_in_single_range}
std::unordered_map<String, std::pair<UInt64, UInt64> > & allowed_parts_rows,
MergeScheduler * merge_scheduler = nullptr);
bool allow(double base, double sum_size, double max_size, double min_age, double range_size);
const Settings settings;
@ -125,11 +130,10 @@ private:
// const MergeSelectorAdaptiveController & controller;
std::shared_ptr<MergeSelectorAdaptiveController> controller;
std::function<String (const Part &)> get_partition_id{};
void selectRangesFromScoreTable(const PartsRange & parts, const std::vector<std::vector<double>> & score_table, size_t i, size_t j, size_t n,
size_t max_width, std::vector<BestRangeWithScore> & out, size_t & allowed_parts);
void selectRangesFromScoreTable(const PartsRange & parts, const std::vector<std::vector<double>> & score_table, size_t i, size_t j, size_t n, size_t max_width, std::vector<BestRangeWithScore> & out);
String getPartitionID(const Part & part);
String getPartitionID(const Part & part) { return part.getDataPartPtr()->info().partition_id; }
inline bool enable_batch_select_for_partition(const String & partition_id)
{

View File

@ -27,10 +27,12 @@
namespace DB
{
class IMergeTreeDataPart;
/** Select parts to merge based on its level.
* Select first range of parts of parts_to_merge length with minimum level.
*/
class LevelMergeSelector : public IMergeSelector
class LevelMergeSelector : public IMergeSelector<IMergeTreeDataPart>
{
public:
struct Settings

View File

@ -123,7 +123,7 @@ public:
void prepareCountMerges();
void prepareCountQueries();
PartSizeInfo countPartSize(const IMergeSelector::PartsRange & parts, size_t begin, size_t end);
PartSizeInfo countPartSize(const IMergeSelector<IMergeTreeDataPart>::PartsRange & parts, size_t begin, size_t end);
void getEstimatedBytes(size_t & bytes)
{
@ -151,7 +151,7 @@ public:
void prepare();
bool strategyOfTime();
bool strategyOfSize(const MergeScheduler::PartSizeInfo & part_size_info, bool only_check = false);
bool canMerge(const IMergeSelector::PartsRange & parts, size_t begin, size_t end);
bool canMerge(const IMergeSelector<IMergeTreeDataPart>::PartsRange & parts, size_t begin, size_t end);
static bool expiredUTCTime(const ContextPtr & context);
MergeTreeData::DataPartsVector getPartsForOptimize(const MergeTreeData::DataPartsVector & parts);

View File

@ -25,7 +25,6 @@
#include <ctime>
#include <vector>
#include <functional>
#include <Storages/MergeTree/IMergeTreeDataPart_fwd.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Parsers/IAST_fwd.h>
@ -48,6 +47,7 @@ class MergeScheduler;
*
* Number of parallel merges are controlled outside of scope of this interface.
*/
template<class DataPartType>
class IMergeSelector
{
public:
@ -81,9 +81,9 @@ public:
/// The depth of the part chain. Convert the part chain to a new base part when the chain is long.
size_t chain_depth = 0;
const MergeTreeDataPartPtr & getDataPartPtr() const
const DataPartType * getDataPartPtr() const
{
return *static_cast<const MergeTreeDataPartPtr *>(data);
return static_cast<const DataPartType *>(data);
}
};

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeSelectorAdaptiveController.h>
#include <Catalog/DataModelPartWrapper.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
@ -13,8 +14,8 @@ static double getAverageWithoutZero(double a, double b, double abs_zero = 1E-6)
void MergeSelectorAdaptiveController::init(
MergeTreeBgTaskStatisticsPtr & stats,
const IMergeSelector::PartsRanges & parts_ranges,
const std::multimap<String, UInt64> & unselectable_part_rows)
const IMergeSelector<ServerDataPart>::PartsRanges & parts_ranges,
const std::unordered_map<String, std::pair<UInt64, UInt64> > & unselectable_part_rows)
{
estimators.clear();
if (!now)
@ -23,7 +24,7 @@ void MergeSelectorAdaptiveController::init(
/// Currently only support write amplification control, which is not suitable for bucket table.
if (is_bucket_table || expected_parts < 1)
return;
/// Update current_parts, current_rows and smallest_part_rows
std::unordered_set<String> partition_ids;
for (const auto & parts_range: parts_ranges)
@ -53,16 +54,15 @@ void MergeSelectorAdaptiveController::init(
}
}
/// Update current_parts, current_rows and smallest_part_rows by unselectable parts rows
for (const auto & [p, r]: unselectable_part_rows)
/// Update current_parts, current_rows by unselectable parts rows
for (const auto & [p, pair]: unselectable_part_rows)
{
auto it = estimators.find(p);
if (it == estimators.end())
continue;
it->second.current_parts++;
it->second.current_rows += r;
it->second.smallest_part_rows = std::min(it->second.smallest_part_rows, r);
it->second.current_parts += pair.first;
it->second.current_rows += pair.second;
}
/// Update other infos in estimators
@ -75,7 +75,7 @@ void MergeSelectorAdaptiveController::init(
auto it = partition_stats_map.find(partition_id);
if (it == partition_stats_map.end())
continue;
auto & estimator_elem = estimators[partition_id];
if (auto last_hour_stats_optional = it->second.last_hour_stats.lastIntervalStats(now))
@ -107,15 +107,18 @@ void MergeSelectorAdaptiveController::init(
}
}
bool MergeSelectorAdaptiveController::needControlWriteAmplification(const String & partition_id) const
bool MergeSelectorAdaptiveController::needOptimizeWriteAmplification(const String & partition_id) const
{
if (wa_optimize_threshold == 0)
return false;
const auto & estimator_elem = getEstimatorElement(partition_id);
/// Write amplification may not be precise enough if there are only a few insertions.
if (is_bucket_table || expected_parts < 1 || !isRealTimePartition(estimator_elem) || !haveEnoughInfo(estimator_elem))
return false;
return estimator_elem.current_parts <= 4 * expected_parts && std::get<0>(estimator_elem.wa) > std::get<1>(estimator_elem.wa);
return estimator_elem.current_parts <= wa_optimize_threshold * expected_parts && std::get<0>(estimator_elem.wa) > std::get<1>(estimator_elem.wa);
}
/// write_amplification, wa_min, wa_max
@ -129,23 +132,16 @@ std::tuple<double, double, double> MergeSelectorAdaptiveController::getWriteAmpl
std::pair<size_t, size_t> MergeSelectorAdaptiveController::getMaxPartsAndRows(const String & partition_id) const
{
if (is_bucket_table || expected_parts < 1)
return std::make_pair(max_parts_to_merge, 0); /// Won't modify merge selector settings
return std::make_pair(std::numeric_limits<size_t>::max() - 1, 0); /// Won't modify merge selector settings
const auto & estimator_elem = getEstimatorElement(partition_id);
if (estimator_elem.current_parts <= expected_parts)
return std::make_pair(1, 1); /// Won't allow any merge
size_t max_parts = estimator_elem.current_parts - expected_parts + 1;
size_t max_parts = estimator_elem.current_parts - expected_parts;
size_t max_rows = estimator_elem.current_rows / expected_parts + estimator_elem.smallest_part_rows - 1;
return std::make_pair(max_parts, max_rows);
}
String MergeSelectorAdaptiveController::getPartitionID(const IMergeSelector::Part & part)
{
if (unlikely(get_partition_id))
return get_partition_id(part);
return part.getDataPartPtr()->info.partition_id;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <common/types.h>
#include <Catalog/DataModelPartWrapper.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/MergeSelector.h>
#include <Storages/MergeTree/MergeTreeBgTaskStatistics.h>
@ -8,25 +9,28 @@
namespace DB
{
class ServerDataPart;
/// AdaptiveController is a stateless object used to fine tune merge selector settings for each partition by
/// current state and historical statistics. For example, if merged bytes is much greater than inserted bytes,
/// which means write amplification of merge is very high, this information can be used to get a better merge
/// selection result.
/// selection result.
class MergeSelectorAdaptiveController
{
public:
MergeSelectorAdaptiveController(bool is_bucket_table_, UInt64 expected_parts_, UInt64 max_parts_to_merge_)
MergeSelectorAdaptiveController(bool is_bucket_table_, UInt64 expected_parts_, UInt64 wa_optimize_threshold_, UInt64 max_parts_to_merge_)
: is_bucket_table(is_bucket_table_)
, expected_parts(expected_parts_)
, wa_optimize_threshold(wa_optimize_threshold_)
, max_parts_to_merge(max_parts_to_merge_)
{ }
void init(
MergeTreeBgTaskStatisticsPtr & stats,
const IMergeSelector::PartsRanges & parts_ranges,
const std::multimap<String, UInt64> & unselectable_part_rows);
const IMergeSelector<ServerDataPart>::PartsRanges & parts_ranges,
const std::unordered_map<String, std::pair<UInt64, UInt64> > & unselectable_part_rows);
bool needControlWriteAmplification(const String & partition_id) const;
bool needOptimizeWriteAmplification(const String & partition_id) const;
/// write_amplification, wa_min, wa_max
std::tuple<double, double, double> getWriteAmplification(const String & partition_id) const;
@ -34,9 +38,7 @@ public:
/// max_parts, max_rows, 0 means unlimited
std::pair<size_t, size_t> getMaxPartsAndRows(const String & partition_id) const;
/// functions only used in ut
void debugSetGetPartitionID(std::function<String (const IMergeSelector::Part &)> get_partition_id_) { get_partition_id = get_partition_id_; }
void debugSetNow(time_t now_) { now = now_; }
void setCurrentTime(time_t now_) { now = now_; }
protected:
struct EstimatorElement
@ -57,7 +59,7 @@ protected:
bool isRealTimePartition(const EstimatorElement & estimator_elem) const
{
return estimator_elem.inserted_parts > 0 && estimator_elem.last_insert_time + 24 * 60 * 60 > now;
return estimator_elem.inserted_parts > 0 && estimator_elem.last_insert_time + 6 * 60 * 60 > now;
}
bool haveEnoughInfo(const EstimatorElement & estimator_elem) const
@ -65,7 +67,7 @@ protected:
return estimator_elem.inserted_parts >= 10;
}
String getPartitionID(const IMergeSelector::Part & part);
String getPartitionID(const IMergeSelector<ServerDataPart>::Part & part) { return part.getDataPartPtr()->info().partition_id; }
const EstimatorElement empty_estimator_elem;
const EstimatorElement & getEstimatorElement(const String & partition_id) const
@ -78,11 +80,10 @@ protected:
bool is_bucket_table;
UInt64 expected_parts;
UInt64 wa_optimize_threshold;
UInt64 max_parts_to_merge;
std::unordered_map<String, EstimatorElement> estimators;
time_t now{0};
std::function<String (const IMergeSelector::Part &)> get_partition_id{};
};
using MergeControllerPtr = std::shared_ptr<MergeSelectorAdaptiveController>;

View File

@ -119,7 +119,7 @@ SELECT uuid, partition_id,
sumIf(bytes, event_type='MergeParts') AS merged_bytes,
countIf(event_type='RemovePart' and rows>0) AS removed_parts
FROM system.server_part_log
WHERE table NOT LIKE '\%CHTMP' AND event_date >= today() - 7 AND {}
WHERE table NOT LIKE '\%CHTMP' AND event_date >= today() - 3 AND {}
GROUP BY uuid, partition_id, start_of_hour
HAVING inserted_parts > 0 OR merged_parts > 0 OR removed_parts > 0;
)""";

View File

@ -103,7 +103,7 @@ static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;
/// because between selecting parts to merge and doing merge, amount of free space could have decreased.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;
static MergeTreeData::DataPartsVector toDataPartsVector(const IMergeSelector::PartsRange & parts)
static MergeTreeData::DataPartsVector toDataPartsVector(const IMergeSelector<IMergeTreeDataPart>::PartsRange & parts)
{
MergeTreeData::DataPartsVector res;
res.reserve(parts.size());
@ -270,7 +270,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
time_t current_time = std::time(nullptr);
IMergeSelector::PartsRanges parts_ranges;
IMergeSelector<IMergeTreeDataPart>::PartsRanges parts_ranges;
StoragePolicyPtr storage_policy = data.getStoragePolicy(IStorage::StorageLocation::MAIN);
/// Volumes with stopped merges are extremely rare situation.
@ -335,7 +335,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
}
}
IMergeSelector::Part part_info;
IMergeSelector<IMergeTreeDataPart>::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.rows = part->rows_count;
part_info.age = current_time - part->modification_time;
@ -366,7 +366,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
return SelectPartsDecision::CANNOT_SELECT;
}
IMergeSelector::PartsRange parts_to_merge;
IMergeSelector<IMergeTreeDataPart>::PartsRange parts_to_merge;
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
@ -422,7 +422,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
MergeTreeData::DataPartsVector parts;
parts.reserve(parts_to_merge.size());
for (IMergeSelector::Part & part_info : parts_to_merge)
for (IMergeSelector<IMergeTreeDataPart>::Part & part_info : parts_to_merge)
{
const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
parts.push_back(part);
@ -458,7 +458,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeMulti(
time_t current_time = std::time(nullptr);
IMergeSelector::PartsRanges parts_ranges;
IMergeSelector<IMergeTreeDataPart>::PartsRanges parts_ranges;
StoragePolicyPtr storage_policy = data.getStoragePolicy(IStorage::StorageLocation::MAIN);
/// Volumes with stopped merges are extremely rare situation.
@ -523,7 +523,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeMulti(
}
}
IMergeSelector::Part part_info;
IMergeSelector<IMergeTreeDataPart>::Part part_info;
part_info.size = part->getBytesOnDisk();
part_info.rows = part->rows_count;
part_info.age = current_time - part->modification_time;
@ -556,7 +556,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeMulti(
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
IMergeSelector::PartsRange parts_to_merge;
IMergeSelector<IMergeTreeDataPart>::PartsRange parts_to_merge;
/// TTL delete is preferred to recompression
TTLDeleteMergeSelector delete_ttl_selector(
@ -592,30 +592,33 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeMulti(
return SelectPartsDecision::SELECTED;
}
std::unique_ptr<IMergeSelector> merge_selector;
std::unique_ptr<IMergeSelector<IMergeTreeDataPart>> merge_selector;
auto & config = data.getContext()->getConfigRef();
auto merge_selector_str = config.getString("merge_selector", "simple");
if (merge_selector_str == "dance")
{
DanceMergeSelector::Settings merge_settings;
merge_settings.loadFromConfig(config);
/// Override value from table settings
/// For CNCH compatibility, we use cnch_merge_max_parts_to_merge and max_parts_to_merge_at_once.
merge_settings.max_parts_to_merge_base = std::min(data_settings->cnch_merge_max_parts_to_merge, data_settings->max_parts_to_merge_at_once);
merge_settings.enable_batch_select = enable_batch_select;
if (aggressive)
merge_settings.min_parts_to_merge_base = 1;
/// make sure rowid could be represented in 4 bytes
if (metadata_snapshot->hasUniqueKey())
{
auto & max_rows = merge_settings.max_total_rows_to_merge;
if (!(0 < max_rows && max_rows <= std::numeric_limits<UInt32>::max()))
max_rows = std::numeric_limits<UInt32>::max();
}
merge_selector = std::make_unique<DanceMergeSelector>(merge_settings);
}
else
/// Disable dance merge selector for local merge tree tables. In CNCH, DanceMergeSelector can only used with ServerDataPart, so we simply disable it
/// for local merge tree tables which will select merge tasks with IMergeTreeDataPart.
// if (merge_selector_str == "dance")
// {
// DanceMergeSelector::Settings merge_settings;
// merge_settings.loadFromConfig(config);
// /// Override value from table settings
// /// For CNCH compatibility, we use cnch_merge_max_parts_to_merge and max_parts_to_merge_at_once.
// merge_settings.max_parts_to_merge_base = std::min(data_settings->cnch_merge_max_parts_to_merge, data_settings->max_parts_to_merge_at_once);
// merge_settings.enable_batch_select = enable_batch_select;
// if (aggressive)
// merge_settings.min_parts_to_merge_base = 1;
// /// make sure rowid could be represented in 4 bytes
// if (metadata_snapshot->hasUniqueKey())
// {
// auto & max_rows = merge_settings.max_total_rows_to_merge;
// if (!(0 < max_rows && max_rows <= std::numeric_limits<UInt32>::max()))
// max_rows = std::numeric_limits<UInt32>::max();
// }
// merge_selector = std::make_unique<DanceMergeSelector>(merge_settings);
// }
// else
{
SimpleMergeSelector::Settings merge_settings;
/// Override value from table settings

View File

@ -432,7 +432,8 @@ enum StealingCacheMode : UInt64
M(UInt64, cnch_merge_max_total_rows_to_merge, 50000000, "", 0) \
M(UInt64, cnch_merge_max_total_bytes_to_merge, 150ULL * 1024 * 1024 * 1024, "", 0) \
M(UInt64, cnch_merge_max_parts_to_merge, 100, "", 0) \
M(Int64, cnch_merge_expected_parts_number, 0, "Expected part numbers per partition, used to control merge selecting frequency and task size. 0 means using worker numbers in vw settings, negative value means disable this feature.", 0) \
M(Int64, cnch_merge_expected_parts_number, -1, "Expected part numbers per partition, used to control merge selecting frequency and task size. 0 means using worker numbers in vw settings, negative value means disable this feature.", 0) \
M(Float, cnch_merge_write_amplification_optimize_threshold, 4.0, "Threshold for optimizing merge write amplification, 0 means don't optimize. If positive, will optimize write amplification when parts number in current partition is less than cnch_merge_write_amplification_optimize_threshold * cnch_merge_expected_parts_number.", 0) \
M(UInt64, cnch_mutate_max_parts_to_mutate, 100, "", 0) \
M(UInt64, cnch_mutate_max_total_bytes_to_mutate, 50UL * 1024 * 1024 * 1024, "", 0) \
\

View File

@ -263,7 +263,7 @@ SimpleMergeSelector::selectMulti(PartsRanges & partitions, size_t max_total_size
// Fall back to default behavior when enable_batch_select is not turned on.
if (!settings.enable_batch_select)
{
return IMergeSelector::selectMulti(partitions, max_total_size_to_merge, merge_scheduler);
return IMergeSelector<IMergeTreeDataPart>::selectMulti(partitions, max_total_size_to_merge, merge_scheduler);
}
PartsRanges res;

View File

@ -105,8 +105,9 @@ namespace DB
{
class MergeScheduler;
class IMergeTreeDataPart;
class SimpleMergeSelector final : public IMergeSelector
class SimpleMergeSelector final : public IMergeSelector<IMergeTreeDataPart>
{
public:
struct Settings

View File

@ -37,12 +37,12 @@ const String & getPartitionIdForPart(const ITTLMergeSelector::Part & part_info)
}
IMergeSelector::PartsRange ITTLMergeSelector::select(
IMergeSelector<IMergeTreeDataPart>::PartsRange ITTLMergeSelector::select(
PartsRanges & parts_ranges,
const size_t max_total_size_to_merge,
[[maybe_unused]] MergeScheduler * merge_scheduler)
{
using Iterator = IMergeSelector::PartsRange::const_iterator;
using Iterator = IMergeSelector<IMergeTreeDataPart>::PartsRange::const_iterator;
Iterator best_begin;
ssize_t partition_to_merge_index = -1;
time_t partition_to_merge_min_ttl = 0;
@ -125,12 +125,12 @@ IMergeSelector::PartsRange ITTLMergeSelector::select(
return PartsRange(best_begin, best_end);
}
time_t TTLDeleteMergeSelector::getTTLForPart(const IMergeSelector::Part & part) const
time_t TTLDeleteMergeSelector::getTTLForPart(const IMergeSelector<IMergeTreeDataPart>::Part & part) const
{
return only_drop_parts ? part.ttl_infos->part_max_ttl : part.ttl_infos->part_min_ttl;
}
bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & part) const
bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector<IMergeTreeDataPart>::Part & part) const
{
/// N.B. Satisfied TTL means that TTL is NOT expired.
/// return true -- this part can not be selected
@ -148,12 +148,12 @@ bool TTLDeleteMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part &
return !part.shall_participate_in_merges;
}
time_t TTLRecompressMergeSelector::getTTLForPart(const IMergeSelector::Part & part) const
time_t TTLRecompressMergeSelector::getTTLForPart(const IMergeSelector<IMergeTreeDataPart>::Part & part) const
{
return part.ttl_infos->getMinimalMaxRecompressionTTL();
}
bool TTLRecompressMergeSelector::isTTLAlreadySatisfied(const IMergeSelector::Part & part) const
bool TTLRecompressMergeSelector::isTTLAlreadySatisfied(const IMergeSelector<IMergeTreeDataPart>::Part & part) const
{
/// N.B. Satisfied TTL means that TTL is NOT expired.
/// return true -- this part can not be selected

View File

@ -32,6 +32,7 @@ namespace DB
{
class MergeScheduler;
class IMergeTreeDataPart;
/** Merge selector, which is used to remove values with expired ttl.
* It selects parts to merge by greedy algorithm:
@ -39,7 +40,7 @@ class MergeScheduler;
* 2. Tries to find the longest range of parts with expired ttl, that includes part from step 1.
* Finally, merge selector updates TTL merge timer for the selected partition
*/
class ITTLMergeSelector : public IMergeSelector
class ITTLMergeSelector : public IMergeSelector<IMergeTreeDataPart>
{
public:
using PartitionIdToTTLs = std::map<String, time_t>;
@ -58,11 +59,11 @@ public:
/// Get TTL value for part, may depend on child type and some settings in
/// constructor.
virtual time_t getTTLForPart(const IMergeSelector::Part & part) const = 0;
virtual time_t getTTLForPart(const IMergeSelector<IMergeTreeDataPart>::Part & part) const = 0;
/// Sometimes we can check that TTL already satisfied using information
/// stored in part and don't assign merge for such part.
virtual bool isTTLAlreadySatisfied(const IMergeSelector::Part & part) const = 0;
virtual bool isTTLAlreadySatisfied(const IMergeSelector<IMergeTreeDataPart>::Part & part) const = 0;
protected:
time_t current_time;
@ -84,11 +85,11 @@ public:
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_)
, only_drop_parts(only_drop_parts_) {}
time_t getTTLForPart(const IMergeSelector::Part & part) const override;
time_t getTTLForPart(const IMergeSelector<IMergeTreeDataPart>::Part & part) const override;
/// Delete TTL should be checked only by TTL time, there are no other ways
/// to satisfy it.
bool isTTLAlreadySatisfied(const IMergeSelector::Part &) const override;
bool isTTLAlreadySatisfied(const IMergeSelector<IMergeTreeDataPart>::Part &) const override;
private:
bool only_drop_parts;
@ -105,12 +106,12 @@ public:
{}
/// Return part min recompression TTL.
time_t getTTLForPart(const IMergeSelector::Part & part) const override;
time_t getTTLForPart(const IMergeSelector<IMergeTreeDataPart>::Part & part) const override;
/// Checks that part's codec is not already equal to required codec
/// according to recompression TTL. It doesn't make sense to assign such
/// merge.
bool isTTLAlreadySatisfied(const IMergeSelector::Part & part) const override;
bool isTTLAlreadySatisfied(const IMergeSelector<IMergeTreeDataPart>::Part & part) const override;
private:
TTLDescriptions recompression_ttls;
};

View File

@ -38,13 +38,23 @@ struct TestAction
UInt64 ts; /// in milliseconds
TestActionType type;
IMergeSelector::PartsRange merge_parts{};
IMergeSelector::Part future_part{};
IMergeSelector<ServerDataPart>::PartsRange merge_parts{};
IMergeSelector<ServerDataPart>::Part future_part{};
/// need to order by ts ascending
bool operator < (const TestAction & rhs) const { return this->ts > rhs.ts; }
};
static std::shared_ptr<MergeTreePartInfo> part_info = std::make_shared<MergeTreePartInfo>();
static DataModelPartWrapperPtr global_data_model_part_wrapper = std::make_shared<DataModelPartWrapper>();
class MockDataPart: public ServerDataPart
{
public:
MockDataPart(UInt64 id_): ServerDataPart(global_data_model_part_wrapper), id(id_) {}
UInt64 id{0};
};
constexpr const UInt64 SECOND_TO_MS = 1000UL;
constexpr const UInt64 MINUTE_TO_MS = 60UL * SECOND_TO_MS;
@ -54,6 +64,7 @@ constexpr const UInt64 DAY_TO_MS = 24UL * HOUR_TO_MS;
constexpr const UInt64 SAMPLE_INTERVAL_MS = 1 * MINUTE_TO_MS;
const UUID STORAGE_UUID = UUIDHelpers::generateV4();
const StorageID STORAGE_ID("test_db", "test_table", STORAGE_UUID);
const String PARTITION_ID = "all";
const UInt64 LEVEL_0_PART_ROWS = 10000;
@ -61,9 +72,10 @@ const UInt64 LEVEL_0_PART_SIZE = 100 * 1024 * 1024; // 100MB
struct TestState
{
IMergeSelector::PartsRanges all_partitions{{}};
IMergeSelector::PartsRange future_merging_parts{};
IMergeSelector<ServerDataPart>::PartsRanges all_partitions{{}};
IMergeSelector<ServerDataPart>::PartsRange future_merging_parts{};
std::priority_queue<TestAction> actions_queue{};
std::unordered_map<UInt64, std::shared_ptr<MockDataPart> > all_parts{};
UInt64 milliseconds{0};
UInt64 seconds{0};
@ -85,7 +97,7 @@ struct TestState
void reset()
{
*this = TestState{};
bg_task_stats = std::make_shared<MergeTreeBgTaskStatistics>(STORAGE_UUID);
bg_task_stats = std::make_shared<MergeTreeBgTaskStatistics>(STORAGE_ID);
bg_task_stats->setInitializeState(MergeTreeBgTaskStatistics::InitializeState::InitializeSucceed);
}
@ -101,7 +113,7 @@ void updatePartsAge(UInt64 curr_ms, UInt64 last_ms)
}
}
String getPartitionID(const IMergeSelector::Part &)
String getPartitionID(const IMergeSelector<ServerDataPart>::Part &)
{
return PARTITION_ID;
}
@ -117,32 +129,35 @@ static size_t minValueWithoutZero(size_t a, size_t b)
return a == 0 ? b : (b == 0 ? a : std::min(a, b));
}
IMergeSelector::PartsRange mergeSelect(bool with_adaptive_controller)
IMergeSelector<ServerDataPart>::PartsRanges mergeSelect(bool with_adaptive_controller)
{
DanceMergeSelector::Settings settings;
settings.select_nonadjacent_parts_allowed = true;
settings.min_parts_to_merge_base = 3;
settings.min_parts_to_merge_base = 5;
DanceMergeSelector selector(settings);
selector.debugSetGetPartitionID(getPartitionID);
if (with_adaptive_controller)
{
std::unordered_map<String, std::vector<UInt64> > future_part_rows {{PARTITION_ID, {}}};
std::unordered_map<String, std::pair<UInt64, UInt64> > future_part_rows;
for (const auto & p: current_stat.future_merging_parts)
future_part_rows[PARTITION_ID].emplace_back(p.rows);
{
auto it = future_part_rows.try_emplace(PARTITION_ID, 0UL, 0UL).first;
it->second.first++;
it->second.second += p.rows;
}
auto adaptive_controller = std::make_shared<MergeSelectorAdaptiveController>(
/*is_bucket_table_*/ false,
params.num_workers,
/*wa_optimize_threshold*/ 4,
settings.max_parts_to_merge_base.value);
adaptive_controller->debugSetGetPartitionID(getPartitionID);
adaptive_controller->debugSetNow(current_stat.seconds);
adaptive_controller->setCurrentTime(current_stat.seconds);
adaptive_controller->init(current_stat.bg_task_stats, current_stat.all_partitions, future_part_rows);
/// Some trace logs with adaptive controller
if (params.enable_trace)
{
bool need_control = adaptive_controller->needControlWriteAmplification(PARTITION_ID);
bool need_control = adaptive_controller->needOptimizeWriteAmplification(PARTITION_ID);
const auto & [max_parts_, max_rows_] = adaptive_controller->getMaxPartsAndRows(PARTITION_ID);
size_t max_parts = std::min(max_parts_, settings.max_parts_to_merge_base.value);
size_t max_rows = minValueWithoutZero(max_rows_, settings.max_total_rows_to_merge);
@ -171,19 +186,22 @@ IMergeSelector::PartsRange mergeSelect(bool with_adaptive_controller)
selector.setAdaptiveController(adaptive_controller);
}
IMergeSelector::PartsRange selected_parts = selector.select(current_stat.all_partitions, 0);
IMergeSelector<ServerDataPart>::PartsRanges selected_ranges = selector.selectMulti(current_stat.all_partitions, 0);
/// Some trace logs with merge selector
if (params.enable_trace && with_adaptive_controller && !selected_parts.empty())
if (params.enable_trace && with_adaptive_controller && !selected_ranges.empty())
{
UInt64 total_rows = 0;
for (auto & p: selected_parts)
total_rows += p.rows;
for (const auto & selected_parts: selected_ranges)
{
UInt64 total_rows = 0;
for (auto & p: selected_parts)
total_rows += p.rows;
std::cout << "Selected merge with " << selected_parts.size() << " parts with " << total_rows << " rows.\n";
std::cout << "Selected merge with " << selected_parts.size() << " parts with " << total_rows << " rows.\n";
}
}
return selected_parts;
return selected_ranges;
}
void testImpl(UInt64 start_time, UInt64 final_time, bool have_inserts, bool with_adaptive_controller, UInt64 stop_early = 0)
@ -193,7 +211,7 @@ void testImpl(UInt64 start_time, UInt64 final_time, bool have_inserts, bool with
current_stat.actions_queue.push(TestAction{.ts = start_time, .type = ACTION_TYPE_MERGE_SELECT});
current_stat.actions_queue.push(TestAction{.ts = start_time, .type = ACTION_TYPE_INSERT});
}
UInt64 start_time_s = start_time / SECOND_TO_MS;
UInt64 last_sample_time = 0;
@ -223,13 +241,16 @@ void testImpl(UInt64 start_time, UInt64 final_time, bool have_inserts, bool with
if (!have_inserts)
break;
auto it = current_stat.all_parts.try_emplace(current_stat.block_id).first;
it->second = std::make_shared<MockDataPart>(current_stat.block_id++);
/// TODO(shiyuze): support random rows/size
IMergeSelector::Part part = IMergeSelector::Part{
IMergeSelector<ServerDataPart>::Part part = IMergeSelector<ServerDataPart>::Part{
.size = LEVEL_0_PART_SIZE,
.rows = LEVEL_0_PART_ROWS,
.age = 0,
.level = 0,
.data = reinterpret_cast<const void *>(current_stat.block_id++),
.data = reinterpret_cast<const void *>(it->second.get()),
};
current_stat.inserted++;
@ -244,36 +265,43 @@ void testImpl(UInt64 start_time, UInt64 final_time, bool have_inserts, bool with
bool selected = true;
if (current_stat.concurrent_merges < params.max_concurrent_merges)
{
auto selected_range = mergeSelect(with_adaptive_controller);
selected = !selected_range.empty();
if (!selected_range.empty())
auto selected_ranges = mergeSelect(with_adaptive_controller);
selected = !selected_ranges.empty();
for (const auto & selected_range: selected_ranges)
{
auto merge_completed_action = TestAction{
.ts = current_stat.milliseconds + selected_range.size() * params.part_merge_elapsed_ms,
.type = ACTION_TYPE_MERGE_COMPLETED,
.merge_parts = selected_range,
.future_part = {.size = 0, .rows = 0, .age = 0, .level = 0, .data = reinterpret_cast<const void *>(current_stat.block_id++), },
};
std::unordered_set<const void *> selected_datas;
for (auto & p: selected_range)
if (!selected_range.empty())
{
merge_completed_action.future_part.size += p.size;
merge_completed_action.future_part.rows += p.rows;
merge_completed_action.future_part.level = std::max(merge_completed_action.future_part.level, p.level + 1);
selected_datas.insert(p.data);
auto it = current_stat.all_parts.try_emplace(current_stat.block_id).first;
it->second = std::make_shared<MockDataPart>(current_stat.block_id++);
auto merge_completed_action = TestAction{
.ts = current_stat.milliseconds + selected_range.size() * params.part_merge_elapsed_ms,
.type = ACTION_TYPE_MERGE_COMPLETED,
.merge_parts = selected_range,
.future_part = {.size = 0, .rows = 0, .age = 0, .level = 0, .data = reinterpret_cast<const void *>(it->second.get()), },
};
std::unordered_set<const void *> selected_datas;
for (auto & p: selected_range)
{
merge_completed_action.future_part.size += p.size;
merge_completed_action.future_part.rows += p.rows;
merge_completed_action.future_part.level = std::max(merge_completed_action.future_part.level, p.level + 1);
selected_datas.insert(p.data);
current_stat.all_parts.erase(reinterpret_cast<const MockDataPart *>(p.data)->id);
}
current_stat.all_partitions[0].erase(
std::remove_if(current_stat.all_partitions[0].begin(), current_stat.all_partitions[0].end(),
[&](const IMergeSelector<ServerDataPart>::Part & p) { return selected_datas.count(p.data); }),
current_stat.all_partitions[0].end()
);
current_stat.future_merging_parts.emplace_back(merge_completed_action.future_part);
current_stat.actions_queue.push(std::move(merge_completed_action));
current_stat.concurrent_merges++;
current_stat.concurrent_merging_parts += selected_range.size();
}
current_stat.all_partitions[0].erase(
std::remove_if(current_stat.all_partitions[0].begin(), current_stat.all_partitions[0].end(),
[&](const IMergeSelector::Part & p) { return selected_datas.count(p.data); }),
current_stat.all_partitions[0].end()
);
current_stat.future_merging_parts.emplace_back(merge_completed_action.future_part);
current_stat.actions_queue.push(std::move(merge_completed_action));
current_stat.concurrent_merges++;
current_stat.concurrent_merging_parts += selected_range.size();
}
}
UInt64 select_interval_ms = selected ? params.select_interval_ms : params.select_interval_ms * 10;
@ -288,7 +316,7 @@ void testImpl(UInt64 start_time, UInt64 final_time, bool have_inserts, bool with
UInt64 num_future_parts_before = current_stat.future_merging_parts.size();
current_stat.future_merging_parts.erase(
std::remove_if(current_stat.future_merging_parts.begin(), current_stat.future_merging_parts.end(),
[&](const IMergeSelector::Part & p) { return p.data == part.data; }),
[&](const IMergeSelector<ServerDataPart>::Part & p) { return p.data == part.data; }),
current_stat.future_merging_parts.end()
);
if (current_stat.future_merging_parts.size() != num_future_parts_before - 1)
@ -360,7 +388,8 @@ void testHistoricalPartition(bool with_adaptive_controller)
UInt64 merged_bytes_before = current_stat.merged_bytes;
testImpl(params.real_time_test_ms, params.real_time_test_ms + params.historical_test_ms, false, with_adaptive_controller, 3000);
std::sort(current_stat.all_partitions[0].begin(), current_stat.all_partitions[0].end(), [](const IMergeSelector::Part & lhs, const IMergeSelector::Part & rhs)
std::sort(current_stat.all_partitions[0].begin(), current_stat.all_partitions[0].end(),
[](const IMergeSelector<ServerDataPart>::Part & lhs, const IMergeSelector<ServerDataPart>::Part & rhs)
{
return lhs.level > rhs.level;
});
@ -380,7 +409,7 @@ void testHistoricalPartition(bool with_adaptive_controller)
<< " Num merges: " << current_stat.merged - merged_before
<< " Tree depth: " << current_stat.max_level
<< " Final parts: " << current_stat.all_partitions[0].size() << " ( " << final_parts_ss.str() << " )"
<< " Standard deviation or rows: " << std::sqrt(variance(part_rows))
<< " Standard deviation of rows: " << std::sqrt(variance(part_rows))
<< "\n\n\n";
}
@ -398,6 +427,9 @@ int main(int argc, char ** argv)
return -1;
}
part_info->partition_id = PARTITION_ID;
global_data_model_part_wrapper->info = part_info;
params = TestParams{
.real_time_test_ms = 24 * HOUR_TO_MS,
.historical_test_ms = 3 * DAY_TO_MS,

View File

@ -14,8 +14,8 @@ int main(int, char **)
{
using namespace DB;
IMergeSelector::PartsRanges partitions(1);
IMergeSelector::PartsRange & parts = partitions.back();
IMergeSelector<IMergeTreeDataPart>::PartsRanges partitions(1);
IMergeSelector<IMergeTreeDataPart>::PartsRange & parts = partitions.back();
SimpleMergeSelector::Settings settings;
// settings.base = 2;
@ -37,7 +37,7 @@ int main(int, char **)
readText(size, in);
skipWhitespaceIfAny(in);
IMergeSelector::Part part;
IMergeSelector<IMergeTreeDataPart>::Part part;
part.size = size;
part.age = 0;
part.level = 0;
@ -53,7 +53,7 @@ int main(int, char **)
while (parts.size() > 1)
{
IMergeSelector::PartsRange selected_parts = selector.select(partitions, 0);
IMergeSelector<IMergeTreeDataPart>::PartsRange selected_parts = selector.select(partitions, 0);
if (selected_parts.empty())
{

View File

@ -19,8 +19,8 @@ int main(int, char **)
{
using namespace DB;
IMergeSelector::PartsRanges partitions(1);
IMergeSelector::PartsRange & parts = partitions.back();
IMergeSelector<IMergeTreeDataPart>::PartsRanges partitions(1);
IMergeSelector<IMergeTreeDataPart>::PartsRange & parts = partitions.back();
/* SimpleMergeSelector::Settings settings;
SimpleMergeSelector selector(settings);*/
@ -37,7 +37,7 @@ int main(int, char **)
while (!in.eof())
{
part_names.emplace_back();
IMergeSelector::Part part;
IMergeSelector<IMergeTreeDataPart>::Part part;
in >> part.size >> "\t" >> part.age >> "\t" >> part.level >> "\t" >> part_names.back() >> "\n";
part.data = part_names.back().data();
// part.level = 0;
@ -52,7 +52,7 @@ int main(int, char **)
while (parts.size() > 1)
{
IMergeSelector::PartsRange selected_parts = selector.select(partitions, 100ULL * 1024 * 1024 * 1024);
IMergeSelector<IMergeTreeDataPart>::PartsRange selected_parts = selector.select(partitions, 100ULL * 1024 * 1024 * 1024);
if (selected_parts.empty())
{