// mirror of https://github.com/ByConity/ByConity
#include "DumpHelper.h"
|
|
|
|
#include <Core/Settings.h>
|
|
#include <IO/ReadHelpers.h>
|
|
#include <Interpreters/Context.h>
|
|
#include <Parsers/ASTCreateQuery.h>
|
|
#include <Parsers/ASTExpressionList.h>
|
|
#include <Parsers/ASTFunction.h>
|
|
#include <Parsers/ASTIdentifier.h>
|
|
// #include <Storages/MergeTree/MergeTreeDataPart.h>
|
|
#include <Storages/StorageCloudMergeTree.h>
|
|
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
|
|
#include <DataStreams/ExpressionBlockInputStream.h>
|
|
#include <DataStreams/MaterializingBlockInputStream.h>
|
|
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
|
|
#include <Poco/Logger.h>
|
|
#include <Poco/NullChannel.h>
|
|
#include <Poco/String.h>
|
|
#include <Common/Config/ConfigProcessor.h>
|
|
#include <Common/Exception.h>
|
|
#include <Common/StringUtils/StringUtils.h>
|
|
#include <Common/Macros.h>
|
|
#include <Common/ZooKeeper/ZooKeeper.h>
|
|
#include <rocksdb/db.h>
|
|
#include <Common/Coding.h>
|
|
#include <Storages/IndexFile/IndexFileWriter.h>
|
|
#include <Storages/IndexFile/FilterPolicy.h>
|
|
#include <Interpreters/sortBlock.h>
|
|
#include <IO/Operators.h>
|
|
// #include <IO/WriteBufferFromHDFS.h>
|
|
|
|
namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NO_REPLICA_NAME_GIVEN;
    extern const int DIRECTORY_DOESNT_EXIST;
    extern const int MANIFEST_OPERATION_ERROR;
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int NO_ZOOKEEPER;
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_OPEN_FILE;
}

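/// Relative path of a part's dumped delete bitmap file: <delete_files_dir><part_name>.bitmap.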
static String deleteBitmapFileRelativePath(const String & part_name)
{
    std::stringstream ss;
    ss << DeleteBitmapMeta::delete_files_dir << part_name << ".bitmap";
    return ss.str();
}

static String deleteMetaFileRelativePath(const String & part_name)
{
    std::stringstream ss;
    ss << DeleteBitmapMeta::delete_files_dir << part_name << ".meta";
    return ss.str();
}

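/// Path, relative to the table's main data path, used as the temporary dump target for a part's
/// delete bitmap (".bitmap") or delete meta (".meta") file.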
static String getDeleteFilesPathForDump(StorageCloudMergeTree & cloud, const DB::String & part_name, bool is_bitmap = true)
{
    DiskPtr disk = cloud.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk();
    std::stringstream ss;
    ss << cloud.getRelativeDataPath(IStorage::StorageLocation::MAIN) << "/" << DeleteBitmapMeta::delete_files_dir << part_name;

    if (is_bitmap)
        ss << ".bitmap";
    else
        ss << ".meta";
    return ss.str();
}

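/// Appends one entry per row of `block` to the temporary RocksDB index `temp_index`:
/// the key is the mem-comparable encoding of the unique key columns, the value is the
/// row id (first_rid + i) plus the row's version if a version column is configured.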
void UniqueTableDumpHelper::writeTempUniqueKeyIndex(Block & block, size_t first_rid, rocksdb::DB & temp_index, StorageCloudMergeTree & cloud)
{
    ColumnsWithTypeAndName key_columns;
    for (auto & col_name : cloud.getInMemoryMetadataPtr()->getUniqueKeyColumns())
        key_columns.emplace_back(block.getByName(col_name));

    rocksdb::WriteOptions opts;
    opts.disableWAL = true;

    ColumnPtr version_column;
    if (!unique_version_column.empty())
        version_column = block.getByName(unique_version_column).column;

    SerializationsMap serializations;
    for (auto & col : key_columns)
    {
        if (!serializations.count(col.name))
            serializations.emplace(col.name, col.type->getDefaultSerialization());
    }

    size_t rows = block.rows();
    for (size_t i = 0; i < rows; ++i)
    {
        WriteBufferFromOwnString key_buf;
        for (auto & col : key_columns)
            serializations[col.name]->serializeMemComparable(*col.column, i, key_buf);

        String value;
        auto rid = static_cast<UInt32>(first_rid + i);
        PutVarint32(&value, rid);
        if (version_column)
            PutFixed64(&value, version_column->getUInt(i));
        auto status = temp_index.Put(opts, key_buf.str(), value);
        if (!status.ok())
            throw Exception("Failed to add unique key : " + status.ToString(), ErrorCodes::LOGICAL_ERROR);
    }
}

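/// Builds the unique key index file (UKI_FILE_NAME) for a part of a unique table that lacks it:
/// reads the unique key (and optional version) columns from the part, writes the index file,
/// and finally updates and rewrites checksums.txt. When the part yields more than one block,
/// the rows are first spilled into a temporary RocksDB instance so keys come out in sorted order.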
void UniqueTableDumpHelper::generateUniqueIndexFileIfNeed(MergeTreeMetaBase::MutableDataPartPtr & part, StorageCloudMergeTree & cloud, const std::shared_ptr<IDisk> & local_disk)
{
    /// If the table has no unique key, or the part already contains the unique key index file,
    /// do nothing; otherwise generate the unique key index file.
    if (!cloud.getInMemoryMetadataPtr()->hasUniqueKey() || part->getChecksums()->files.count(UKI_FILE_NAME))
        return;

    LOG_DEBUG(log, "Part {} does not have {}, start to generate it.", part->name, UKI_FILE_NAME);

    String part_relative_path;
    String part_path;
    if (local_disk)
    {
        part_relative_path = "data/" + cloud.getDatabaseName() + '/' + cloud.getTableName() + "/" + part->name + "/";
        part_path = local_disk->getPath() + "data/" + cloud.getDatabaseName() + '/' + cloud.getTableName() + '/' + part->name + "/";
    }
    else
        throw Exception("Failed to get local disk, because local disk is nullptr", ErrorCodes::LOGICAL_ERROR);

    LOG_DEBUG(log, "Part part_relative_path = {}, part_path = {}", part_relative_path, part_path);

    String uki_path = part_path + UKI_FILE_NAME;

    /// 1. read unique key index required column from part
    Stopwatch timer;
    ExpressionActionsPtr unique_key_expr = cloud.getInMemoryMetadataPtr()->getUniqueKeyExpression();
    Names read_columns = unique_key_expr->getRequiredColumns();
    if (!unique_version_column.empty())
        read_columns.emplace_back(unique_version_column);

    std::unique_ptr<MergeTreeSequentialSource> source = std::make_unique<MergeTreeSequentialSource>(
        cloud,
        cloud.getStorageSnapshot(cloud.getInMemoryMetadataPtr(), nullptr),
        part,
        read_columns,
        /*direct_io=*/false,
        /*take_column_types_from_storage=*/true,
        /*quiet=*/false);
    QueryPipeline pipeline;
    pipeline.init(Pipe(std::move(source)));
    pipeline.setMaxThreads(1);
    BlockInputStreamPtr pipeline_input_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));

    auto input = std::make_shared<MaterializingBlockInputStream>(
        std::make_shared<ExpressionBlockInputStream>(
            pipeline_input_stream,
            unique_key_expr));

    input->readPrefix();
    rocksdb::DB * temp_unique_key_index = nullptr;
    Block unique_key_block;
    size_t rows_count = 0;
    while (Block block = input->read())
    {
        size_t rows = block.rows();
        if (temp_unique_key_index != nullptr)
        {
            writeTempUniqueKeyIndex(block, /*first_rid=*/rows_count, *temp_unique_key_index, cloud);
            rows_count += rows;
            continue;
        }
        if (unique_key_block.rows() == 0)
            unique_key_block = std::move(block);
        else
        {
            rocksdb::Options opts;
            opts.create_if_missing = true;
            opts.error_if_exists = true;
            opts.write_buffer_size = 16 << 20; /// 16MB
            String temp_unique_key_index_abs_path = part_path + "TEMP_unique_key_index";
            String temp_unique_key_index_relative_path = part_relative_path + "TEMP_unique_key_index";
            /// avoid files left over from the last execution failure
            if (local_disk->exists(temp_unique_key_index_relative_path))
            {
                LOG_WARNING(log, "local disk {} already has unique key index path {}, start to remove it...", local_disk->getPath(), temp_unique_key_index_relative_path);
                local_disk->removeRecursive(temp_unique_key_index_relative_path);
            }

            auto status = rocksdb::DB::Open(opts, temp_unique_key_index_abs_path, &temp_unique_key_index);
            if (!status.ok())
                throw Exception("Can't create temp unique key index at " + temp_unique_key_index_abs_path, ErrorCodes::LOGICAL_ERROR);

            writeTempUniqueKeyIndex(unique_key_block, /*first_rid=*/0, *temp_unique_key_index, cloud);
            rows_count += unique_key_block.rows();
            unique_key_block.clear();
            writeTempUniqueKeyIndex(block, /*first_rid=*/rows_count, *temp_unique_key_index, cloud);
            rows_count += rows;
        }
    }
    input->readSuffix();
    LOG_DEBUG(log, "Read unique key data cost {} ms.", timer.elapsedMilliseconds());

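    /// Emit the keys in unique-key order: either by iterating the temporary RocksDB (whose
    /// iteration is already key-ordered) or by sorting the single retained in-memory block.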
    /// 2. write unique key index file
    timer.restart();
    IndexFile::Options options;
    options.filter_policy.reset(IndexFile::NewBloomFilterPolicy(10));
    IndexFile::IndexFileWriter index_writer(options);
    size_t keys_count = 0;
    auto status = index_writer.Open(uki_path);
    if (!status.ok())
        throw Exception("Error while opening file " + uki_path + ": " + status.ToString(),
                        ErrorCodes::CANNOT_OPEN_FILE);
    if (temp_unique_key_index)
    {
        std::unique_ptr<rocksdb::Iterator> iter(temp_unique_key_index->NewIterator(rocksdb::ReadOptions()));
        for (iter->SeekToFirst(); iter->Valid(); iter->Next())
        {
            auto key = iter->key();
            auto val = iter->value();
            status = index_writer.Add(Slice(key.data(), key.size()), Slice(val.data(), val.size()));
            if (!status.ok())
                throw Exception("Error while adding key to " + uki_path + ": " + status.ToString(),
                                ErrorCodes::LOGICAL_ERROR);
            keys_count++;
        }
        if (!iter->status().ok())
            throw Exception("Error while scanning temp key index file " + uki_path + ": " + iter->status().ToString(),
                            ErrorCodes::LOGICAL_ERROR);
        iter.reset();
        try
        {
            auto res = temp_unique_key_index->Close();
            if (!res.ok())
                LOG_WARNING(log, "Failed to close temp_unique_key_index {}", res.ToString());
            delete temp_unique_key_index;
            temp_unique_key_index = nullptr;
        }
        catch (...)
        {
            LOG_WARNING(log, getCurrentExceptionMessage(false));
        }
    }
    else
    {
        assert(unique_key_block.rows() > 0);
        rows_count = unique_key_block.rows();
        SortDescription sort_description;
        sort_description.reserve(cloud.getInMemoryMetadataPtr()->getUniqueKeyColumns().size());
        for (auto & name : cloud.getInMemoryMetadataPtr()->getUniqueKeyColumns())
            sort_description.emplace_back(unique_key_block.getPositionByName(name), 1, 1);

        IColumn::Permutation * unique_key_perm_ptr = nullptr;
        IColumn::Permutation unique_key_perm;
        if (!isAlreadySorted(unique_key_block, sort_description))
        {
            stableGetPermutation(unique_key_block, sort_description, unique_key_perm);
            unique_key_perm_ptr = &unique_key_perm;
        }

        ColumnsWithTypeAndName key_columns;
        for (auto & col_name : cloud.getInMemoryMetadataPtr()->getUniqueKeyColumns())
            key_columns.emplace_back(unique_key_block.getByName(col_name));

        ColumnPtr version_column;
        if (!unique_version_column.empty())
            version_column = unique_key_block.getByName(unique_version_column).column;

        SerializationsMap serializations;
        for (auto & col : key_columns)
        {
            if (!serializations.count(col.name))
                serializations.emplace(col.name, col.type->getDefaultSerialization());
        }

        for (UInt32 rid = 0, size = unique_key_block.rows(); rid < size; ++rid)
        {
            /// `rid` is the rank in unique-key order; `idx` is the original row number inside the block.
            size_t idx = unique_key_perm_ptr ? unique_key_perm[rid] : rid;

            WriteBufferFromOwnString key_buf;
            for (auto & col : key_columns)
                serializations[col.name]->serializeMemComparable(*col.column, idx, key_buf);

            String value;
            PutVarint32(&value, static_cast<UInt32>(idx));
            if (version_column)
                PutFixed64(&value, version_column->getUInt(idx));

            status = index_writer.Add(key_buf.str(), value);
            if (!status.ok())
                throw Exception("Error while adding key to " + uki_path + ": " + status.ToString(),
                                ErrorCodes::LOGICAL_ERROR);
        }

        keys_count = unique_key_block.rows();
        unique_key_block.clear();
    }

    if (rows_count != keys_count)
        throw Exception("rows count " + toString(rows_count) + " doesn't match unique keys count " + toString(keys_count), ErrorCodes::LOGICAL_ERROR);

    IndexFile::IndexFileInfo file_info;
    status = index_writer.Finish(&file_info);
    if (!status.ok())
        throw Exception("Error while finishing file " + uki_path + ": " + status.ToString(), ErrorCodes::LOGICAL_ERROR);

    LOG_DEBUG(log, "Write unique key index file cost {} ms", timer.elapsedMilliseconds());

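    /// Record the new index file in the part's checksums and remember its key range, then swap
    /// checksums.txt in place via the .tmp / .tmp2 renames below.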
    /// 3. rewrite checksum
    if (file_info.file_size > 0)
    {
        part->getChecksums()->files[UKI_FILE_NAME].file_size = file_info.file_size;
        part->getChecksums()->files[UKI_FILE_NAME].file_hash = file_info.file_hash;
        part->min_unique_key = file_info.smallest_key;
        part->max_unique_key = file_info.largest_key;
    }
    else
        throw Exception("Write unique key index file size = 0, it's a bug!", ErrorCodes::LOGICAL_ERROR);


    {
        /// Write file with prepared_checksums
        String name = "checksums.txt";
        WriteBufferFromFile out_temp(part_path + name + ".tmp", 4096);
        part->getChecksums()->writeLocal(out_temp);
        Poco::File file_tmp{part_path + name};
        if (file_tmp.exists())
            file_tmp.renameTo(part_path + name + ".tmp2");

        Poco::File{part_path + name + ".tmp"}.renameTo(part_path + name);

        Poco::File file_remove{part_path + name + ".tmp2"};
        if (file_remove.exists())
            file_remove.remove();
    }
}

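/// Loads the part's local delete file ("delete.<version>") into a roaring bitmap and dumps it
/// to the main storage disk: small bitmaps are inlined into the ".meta" file, larger ones are
/// written to a separate ".bitmap" file referenced by the meta file.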
void UniqueTableDumpHelper::loadAndDumpDeleteBitmap(StorageCloudMergeTree & cloud, MergeTreeCloudData::DataPartPtr part, const UInt64 & version, const std::shared_ptr<IDisk> & local_disk)
{
    if (!cloud.getInMemoryMetadataPtr()->hasUniqueKey())
        return;

    /// Load delete file
    String part_path;
    if (local_disk)
        part_path = local_disk->getPath() + "data/" + cloud.getDatabaseName() + '/' + cloud.getTableName() + '/' + part->name + "/";
    else
        part_path = cloud.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getPath() + part->name + "/";
    String delete_file_path = part_path + "delete." + std::to_string(version);
    ReadBufferFromFile in(delete_file_path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(delete_file_path).getSize()));
    UInt8 format_version;
    readIntBinary(format_version, in);
    if (format_version != 1)
        throw Exception("Unknown delete file version: " + toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
    size_t buf_size;
    readIntBinary(buf_size, in);
    PODArray<char> buf(buf_size);
    in.read(buf.data(), buf_size);
    Roaring bitmap = Roaring::read(buf.data());

    /// Dump delete meta file and delete file if necessary
    DiskPtr disk = cloud.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk();
    bool need_bitmap = false;
    {
        const String files_rel_path = cloud.getRelativeDataPath(IStorage::StorageLocation::MAIN) + "/" + DeleteBitmapMeta::delete_files_dir;
        if (!disk->exists(files_rel_path))
            disk->createDirectories(files_rel_path);

        String meta_rel_path = getDeleteFilesPathForDump(cloud, part->name, false);
        std::unique_ptr<WriteBufferFromFileBase> meta_writer = disk->writeFile(meta_rel_path);
        writeIntBinary(DeleteBitmapMeta::delete_file_meta_format_version, *meta_writer);
        writeIntBinary(bitmap.cardinality(), *meta_writer);
        if (bitmap.cardinality() <= DeleteBitmapMeta::kInlineBitmapMaxCardinality)
        {
            // Write inline value
            String value;
            value.reserve(bitmap.cardinality() * sizeof(UInt32));
            for (auto it = bitmap.begin(); it != bitmap.end(); ++it)
                PutFixed32(&value, *it);
            writeStringBinary(value, *meta_writer);
        }
        else
        {
            need_bitmap = true;
            bitmap.runOptimize();
            size_t size = bitmap.getSizeInBytes();
            PODArray<char> write_buf(size);
            size = bitmap.write(write_buf.data());
            writeIntBinary(size, *meta_writer);
            {
                String bitmap_rel_path = getDeleteFilesPathForDump(cloud, part->name, true);
                std::unique_ptr<WriteBufferFromFileBase> bitmap_writer = disk->writeFile(bitmap_rel_path);
                bitmap_writer->write(write_buf.data(), size);
                /// It's necessary to call next() and sync() here; otherwise errors in
                /// WriteBufferFromHDFS::WriteBufferFromHDFSImpl::~WriteBufferFromHDFSImpl() would be
                /// swallowed, which may leave the file incomplete.
                bitmap_writer->next();
                bitmap_writer->sync();
            }
        }
        meta_writer->next();
        meta_writer->sync();
    }

    auto move_file_to_target = [disk](const String & from_path, const String & to_path)
    {
        disk->removeFileIfExists(to_path);
        disk->moveFile(from_path, to_path);
    };

    const String meta_to_path = cloud.getDatabaseName() + '/' + cloud.getTableName() + '/' + deleteMetaFileRelativePath(part->name);
    const String meta_from_path = getDeleteFilesPathForDump(cloud, part->name, false);
    move_file_to_target(meta_from_path, meta_to_path);
    if (need_bitmap)
    {
        const String bitmap_to_path = cloud.getDatabaseName() + '/' + cloud.getTableName() + '/' + deleteBitmapFileRelativePath(part->name);
        const String bitmap_from_path = getDeleteFilesPathForDump(cloud, part->name, true);
        move_file_to_target(bitmap_from_path, bitmap_to_path);
    }
}

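/// Parses the ZooKeeper path, replica name and optional version column from the engine arguments
/// of a replicated unique table, expands macros, and creates <replica_path>/dump_lsn in ZooKeeper.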
void UniqueTableDumpHelper::parseZKReplicaPathFromEngineArgs(
    const ASTStorage * storage, const String & /*database*/, const String & /*table*/, const Context & context)
{
    const auto & engine_args = storage->engine->arguments->getChildren();
    String zookeeper_path = "";
    String replica_name = "";
    const auto * ast_zk_path = engine_args[0]->as<ASTLiteral>();
    if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
        zookeeper_path = safeGet<String>(ast_zk_path->value);
    else
        throw Exception(
            "Parse zookeeper replica path failed for unique: Path in ZooKeeper must be a string literal", ErrorCodes::BAD_ARGUMENTS);

    const auto * ast_replica_name = engine_args[1]->as<ASTLiteral>();
    if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
        replica_name = safeGet<String>(ast_replica_name->value);
    else
        throw Exception("Parse zookeeper replica path failed for unique: Replica name must be a string literal", ErrorCodes::BAD_ARGUMENTS);

    if (replica_name.empty())
        throw Exception("Parse zookeeper replica path failed for unique: No replica name in config", ErrorCodes::NO_REPLICA_NAME_GIVEN);

    if (engine_args.size() > 2)
    {
        if (!engine_args.back()->as<ASTIdentifier>() && !engine_args.back()->as<ASTFunction>())
            throw Exception("Version column must be identifier or function expression", ErrorCodes::BAD_ARGUMENTS);

        String partition_key = storage->partition_by ? storage->partition_by->getColumnName() : "";
        if (partition_key == engine_args.back()->getColumnName())
        {
            /// When the partition expression is used as the version, skip setting the version column
            /// to avoid writing the version into the unique key index.
            unique_version_column = "";
        }
        else if (!tryGetIdentifierNameInto(engine_args.back(), unique_version_column))
            throw Exception("Version column name must be an unquoted string", ErrorCodes::BAD_ARGUMENTS);
    }

    /// Expand by macros
    LOG_DEBUG(log, "Parsed replica zookeeper path before expand: {}, replica_name = {}", zookeeper_path, replica_name);

    zookeeper_path = context.getMacros()->expand(zookeeper_path);
    replica_name = context.getMacros()->expand(replica_name);

    LOG_DEBUG(log, "Parsed replica zookeeper path after expand: {}, replica_name = {}", zookeeper_path, replica_name);

    replica_path = zookeeper_path + "/replicas/" + replica_name;
    auto zookeeper = context.getZooKeeper();
    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);

    zookeeper->createIfNotExists(replica_path + "/dump_lsn", "");
    LOG_DEBUG(log, "Parsed replica zookeeper path: {}", replica_path);
}

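/// Builds a part_name -> delete version snapshot of a unique table by opening its manifest DB
/// read-only, recording the commit version in ZooKeeper, then replaying the manifest log entries
/// after the last checkpoint on top of the checkpoint file contents.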
Snapshot UniqueTableDumpHelper::getUniqueTableSnapshot(
    const DB::String & database, const DB::String & table, const DB::String & dir, const Context & context)
{
    /// 1. open manifest in read-only mode
    rocksdb::Options options;
    String db_dir = dir + "db";
    rocksdb::DB * db = nullptr;
    auto status = rocksdb::DB::OpenForReadOnly(options, db_dir, &db);
    if (!status.ok())
        throw Exception("Can't open manifest DB at " + db_dir + " for unique table " + database + "." + table + ": " + status.ToString(), ErrorCodes::DIRECTORY_DOESNT_EXIST);

    /// 2. set dump_lsn on zookeeper
    auto keys = {
        rocksdb::Slice(ManifestStore::commit_version_key),
        rocksdb::Slice(ManifestStore::checkpoint_version_key)
    };
    Strings values(2);
    auto result = db->MultiGet(rocksdb::ReadOptions(), keys, &values);
    for (auto & st : result)
    {
        if (!st.ok())
            throw Exception("Can't read version from manifest DB for dump unique table " + database + "." + table + ": " + st.ToString(), ErrorCodes::MANIFEST_OPERATION_ERROR);
    }

    UInt64 commit_version = parse<UInt64>(values[0]);
    UInt64 checkpoint_version = parse<UInt64>(values[1]);
    if (commit_version < checkpoint_version)
        throw Exception("Can't get snapshot at " + toString(commit_version) + " because it's older than last checkpoint " + toString(checkpoint_version), ErrorCodes::LOGICAL_ERROR);

    saveDumpVersionToZk(commit_version, context);

    /// 3. get commit_version snapshot
    std::ostringstream ss;
    ss << dir << "/" << "checkpoint." << std::setfill('0') << std::setw(10) << checkpoint_version;
    Snapshot snapshot = manifest_store.readCheckpointFile(checkpoint_version, ss.str());
    String start_key = manifest_store.generateLogKey(checkpoint_version + 1);
    String end_key = manifest_store.generateLogKey(commit_version + 1);
    LOG_DEBUG(log, "Replay manifest log entries: start key = {}, end key = {}", start_key, end_key);
    rocksdb::ReadOptions opt;
    rocksdb::Slice upper_bound(end_key);
    opt.iterate_upper_bound = &upper_bound;
    std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(opt));
    for (iter->Seek(start_key); iter->Valid(); iter->Next())
    {
        ReadBufferFromMemory in(iter->value().data(), iter->value().size());
        ManifestLogEntry value;
        manifest_store.readManifest(value, in);
        if (value.type == ManifestLogEntry::MODIFY_SCHEMA)
            continue;
        if (value.type == ManifestLogEntry::SYNC_PARTS)
        {
            for (auto & part : value.added_parts)
                snapshot[part] = value.version;
            for (auto & part : value.updated_parts)
                snapshot[part] = value.version;
            for (auto & part : value.removed_parts)
                snapshot.erase(part);
        }
        if (value.type == ManifestLogEntry::DETACH_PARTS)
        {
            for (auto & part : value.removed_parts)
                snapshot.erase(part);
        }
    }
    if (!iter->status().ok())
        throw Exception("Failed to read manifest in [" + start_key + "," + end_key + ") : " + iter->status().ToString(), ErrorCodes::MANIFEST_OPERATION_ERROR);

    return snapshot;
}

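/// Persists the dump version as an ephemeral sequential node under <replica_path>/dump_lsn.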
void UniqueTableDumpHelper::saveDumpVersionToZk(const UInt64 & dump_version, const Context & context)
{
    auto zookeeper = context.getZooKeeper();
    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);

    String dump_lsn_path_prefix = replica_path + "/dump_lsn/dump_lsn-";
    dump_lsn_path = zookeeper->create(dump_lsn_path_prefix, toString(dump_version), zkutil::CreateMode::EphemeralSequential);
    LOG_DEBUG(log, "created dump lsn path: {}, value: {}", dump_lsn_path, dump_version);
}

void UniqueTableDumpHelper::removeDumpVersionFromZk(const Context & context)
{
    auto zookeeper = context.getZooKeeper();
    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);

    if (zookeeper->exists(dump_lsn_path))
    {
        zookeeper->remove(dump_lsn_path);
        LOG_DEBUG(log, "Dump lsn path: {} has been removed.", dump_lsn_path);
    }
    /// Try to remove base dump lsn path
    zookeeper->tryRemove(replica_path + "/dump_lsn");
}

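/// Reads a labeled name list from a manifest entry: the label, a count, then one name per line.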
void ManifestStore::readLabeledNames(ReadBuffer & in, const char * label, Strings & names)
{
    size_t num;
    in >> label >> num >> "\n";
    names.resize(num);
    for (size_t i = 0; i < num; ++i)
        in >> names[i] >> "\n";
}

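/// Deserializes one manifest log entry from its text representation: a fixed header
/// (format, version, prev_version, create_time, source_replica) followed by a type-specific body.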
void ManifestStore::readManifest(DB::ManifestLogEntry & log_entry, DB::ReadBuffer & in) const
{
    UInt8 format;
    in >> "format:" >> format >> "\n";
    if (format != log_entry.format_version)
        throw Exception("unknown manifest format version : " + toString(format), ErrorCodes::LOGICAL_ERROR);

    LocalDateTime create_time_dt;
    in >> "version:" >> log_entry.version >> "\n";
    in >> "prev_version:" >> log_entry.prev_version >> "\n";
    in >> "create_time:" >> create_time_dt >> "\n";
    log_entry.create_time = create_time_dt;
    in >> "source_replica:" >> log_entry.source_replica >> "\n";

    String type_str;
    in >> type_str >> "\n";
    if (type_str == "sync_parts")
    {
        log_entry.type = ManifestLogEntry::Type::SYNC_PARTS;
        readLabeledNames(in, "added:", log_entry.added_parts);
        readLabeledNames(in, "updated:", log_entry.updated_parts);
        readLabeledNames(in, "removed:", log_entry.removed_parts);
    }
    else if (type_str == "detach_parts")
    {
        log_entry.type = ManifestLogEntry::Type::DETACH_PARTS;
        readLabeledNames(in, "removed:", log_entry.removed_parts);
    }
    else if (type_str == "reset_manifest")
    {
        log_entry.type = ManifestLogEntry::Type::RESET_MANIFEST;
        in >> "target_version:" >> log_entry.target_version >> "\n";
    }
    else if (type_str == "checkpoint_manifest")
    {
        log_entry.type = ManifestLogEntry::Type::CHECKPOINT_MANIFEST;
        in >> "target_version:" >> log_entry.target_version >> "\n";
    }
    else if (type_str == "modify_schema")
    {
        log_entry.type = ManifestLogEntry::Type::MODIFY_SCHEMA;
    }
    else
    {
        throw Exception("Unknown manifest log_entry type: " + type_str, ErrorCodes::LOGICAL_ERROR);
    }
}

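/// Reads a checkpoint file into a Snapshot: one "<part_name>\t<delete_version>\n" line per part.
/// Version 0 means there is no checkpoint yet, so an empty snapshot is returned.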
Snapshot ManifestStore::readCheckpointFile(DB::UInt64 version, const String & path)
{
    Snapshot result;
    if (version == 0)
        return result;

    ReadBufferFromFile buf(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
    while (!buf.eof())
    {
        String partName;
        UInt64 delete_version;
        readString(partName, buf);
        assertChar('\t', buf);
        readIntText(delete_version, buf);
        assertChar('\n', buf);
        result[partName] = delete_version;
    }

    return result;
}

}