/// ByConity/programs/dumper/Dumper.cpp


#include "DumpHelper.h"
#include <optional>
#include <algorithm>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Core/Settings.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/ITableFunction.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeTreeCNCHDataDumper.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/S3ObjectMetadata.h>
#include <Storages/StorageCloudMergeTree.h>
#include <Storages/StorageFactory.h>
#include <Storages/registerStorages.h>
#include <Disks/registerDisks.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/DiskLocal.h>
#include <Poco/Logger.h>
#include <Poco/NullChannel.h>
#include <Poco/String.h>
#include <Poco/Util/Application.h>
#include <Poco/Util/HelpFormatter.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ThreadStatus.h>
#include <Common/escapeForFileName.h>
// #include <Common/getFQDNOrHostName.h>
#include <common/ErrorHandlers.h>
#include <Common/Macros.h>
#include <common/scope_guard.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <iostream>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int SYNTAX_ERROR;
extern const int INVALID_PARTITION_VALUE;
extern const int METADATA_MISMATCH;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int TOO_MANY_PARTS;
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_UPDATE_COLUMN;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
extern const int BAD_TTL_EXPRESSION;
extern const int NOT_FOUND_EXPECTED_DATA_PART;
extern const int UNKNOWN_TABLE;
extern const int NO_SUCH_DATA_PART;
extern const int BAD_DATA_PART_NAME;
}
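/// Standalone dumper: attaches local MergeTree tables from their on-disk metadata,
/// rewrites them as CloudMergeTree and dumps their active parts to shared storage (HDFS).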
class ClickHouseDumper : public Poco::Util::Application
{
public:
using Poco::Util::Application::Application;
using CanDumpCallback = std::function<bool(String partition_id)>;
~ClickHouseDumper() override;
int main(const std::vector<String> & args) override;
void initialize(Poco::Util::Application & self) override;
void defineOptions(Poco::Util::OptionSet & options) override;
void handleHelp(const String &, const String &);
void handlePartition(const String &, const String &);
void handlePartitionlist(const String &, const String &);
void handleSkipPartitionlist(const String &, const String &);
void removeConfiguration(Poco::Util::LayeredConfiguration& cfg,
const String& prefix);
void initPath();
void initHDFS();
StoragePtr createStorageFromCreateQuery(
const String & create_table_query,
const String & database,
const String & table,
Context & context,
bool & is_atomic_database);
void processDatabase(const String & database);
void processTable(const String & database, const String & table,
const String & partition,
const std::vector<String>& partitionlist,
const std::vector<String>& skippartitionlist);
void initDataDiskPath(const String & escaped_database,
const String & escaped_table,
std::vector<String> & data_paths,
bool is_multi_disk);
void startLoadAndDumpPart(StorageCloudMergeTree & cloud,
const ActiveDataPartSet & part_names,
Snapshot & snapshot,
const std::shared_ptr<IDisk> & local_disk,
const std::shared_ptr<IDisk> & remote_disk);
void getUniqueTableActivePartsFromDisk(StorageCloudMergeTree & cloud,
Snapshot & snapshot,
const String & escaped_database,
const String & escaped_table,
std::vector<String> & data_paths,
std::map<String, ActiveDataPartSet> & active_parts_multi_disk,
bool is_multi_disk,
CanDumpCallback can_dump);
void loadAndDumpPart(StorageCloudMergeTree & cloud,
MergeTreeCNCHDataDumper & dumper,
const String & part_name,
const UInt64 version,
const std::shared_ptr<IDisk> & local_disk,
const std::shared_ptr<IDisk> & remote_disk);
private:
SharedContextHolder shared_context;
ContextMutablePtr global_context;
Settings settings;
Int64 current_shard_number {0};
LoggerPtr log{};
UniqueTableDumpHelper unique_table_dump_helper;
};
ClickHouseDumper::~ClickHouseDumper()
{
if (global_context)
global_context->shutdown();
}
void ClickHouseDumper::initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
// Turn off server logging to stderr
// if (!config().has("verbose"))
// {
// Poco::Logger::root().setLevel("none");
// Poco::Logger::root().setChannel(Poco::AutoPtr<Poco::NullChannel>(new Poco::NullChannel()));
// }
}
void ClickHouseDumper::defineOptions(Poco::Util::OptionSet & options)
{
Poco::Util::Application::defineOptions(options);
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("config-file", "C", "load configuration from a given file") //
.required(false)
.argument("<file>")
.binding("config-file"));
options.addOption(Poco::Util::Option("database", "", "database to be dumped") //
.required(true)
.argument("<database>")
.binding("database"));
options.addOption(Poco::Util::Option("table", "", "table to be dumped") //
.required(false)
.argument("<table>")
.binding("table"));
options.addOption(Poco::Util::Option("partition", "", "partition to be dumped") //
.required(false)
.argument("<partition>")
.binding("partition")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handlePartition)));
options.addOption(Poco::Util::Option("partition_list", "", "multi-partitions to be dumped") //
.required(false)
.argument("<partition_list>")
.binding("partition_list")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handlePartitionlist)));
options.addOption(Poco::Util::Option("skip_partition_list", "", "skip-partitions to be dumped") //
.required(false)
.argument("<skip_partition_list>")
.binding("skip_partition_list")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleSkipPartitionlist)));
options.addOption(Poco::Util::Option("hdfs_nnproxy", "", "") //
.required(true)
.argument("<psm>")
.binding("output_hdfs_nnproxy"));
options.addOption(Poco::Util::Option("output", "O", "output path on shared-storage, the final part path is under output/database/table/") //
.required(true)
.argument("<path>")
.binding("output"));
options.addOption(Poco::Util::Option("parallel", "P", "threads for dumping parts") //
.required(false)
.argument("<num_threads>")
.binding("parallel"));
options.addOption(Poco::Util::Option("overwrite", "R", "overwrite existed parts") //
.required(false)
.binding("overwrite"));
options.addOption(Poco::Util::Option("skip_corrupt_parts", "S", "skip corrupt parts") //
.required(false)
.binding("skip_corrupt_parts"));
options.addOption(Poco::Util::Option("skip_unkowning_settings", "", "skip dumper unknown settings") //
.required(false)
.binding("skip_unkowning_settings"));
options.addOption(Poco::Util::Option("multi_disk_path_list", "", "multi disk path list") //
.required(false)
.argument("<multi_disk_path_list>")
.binding("multi_disk_path_list"));
options.addOption(Poco::Util::Option("help", "", "produce this help message") //
.binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
}
void ClickHouseDumper::handleHelp(const String &, const String &)
{
Poco::Util::HelpFormatter helpFormatter(options());
String command_name = commandName();
if (!endsWith(command_name, "dumper"))
command_name += " dumper";
helpFormatter.setCommand(command_name);
helpFormatter.setHeader("Dump tables from internal ClickHouse to shared-storage");
helpFormatter.setUsage("--config-file <config-file> --database <db> --table <table> --partition <partition> --output <path>");
helpFormatter.format(std::cerr);
stopOptionsProcessing();
}
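/// --partition, --partition_list and --skip_partition_list are mutually exclusive;
/// each of the following callbacks rejects its option if one of the others is already set.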
void ClickHouseDumper::handlePartition(const String &, const String &)
{
String partitionlist = config().getString("partition_list","");
String skip_partition_list = config().getString("skip_partition_list","");
if(!partitionlist.empty() || !skip_partition_list.empty())
{
stopOptionsProcessing();
throw Poco::InvalidArgumentException("conflicting arguments: only one of --partition, --partition_list and --skip_partition_list may be specified");
}
}
void ClickHouseDumper::handlePartitionlist(const String &, const String &)
{
String partition = config().getString("partition","");
String skip_partition_list = config().getString("skip_partition_list","");
if(!partition.empty() || !skip_partition_list.empty())
{
stopOptionsProcessing();
throw Poco::InvalidArgumentException("conflicting arguments: only one of --partition, --partition_list and --skip_partition_list may be specified");
}
}
void ClickHouseDumper::handleSkipPartitionlist(const String &, const String &)
{
String partition = config().getString("partition","");
String partition_list = config().getString("partition_list","");
if(!partition.empty() || !partition_list.empty())
{
stopOptionsProcessing();
throw Poco::InvalidArgumentException("conflicting arguments: only one of --partition, --partition_list and --skip_partition_list may be specified");
}
}
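/// Parse the stored CREATE/ATTACH query, replace the table engine with CloudMergeTree
/// and instantiate the storage via StorageFactory so parts can be loaded from it.
/// Returns an empty StoragePtr for non-MergeTree tables.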
StoragePtr ClickHouseDumper::createStorageFromCreateQuery(
const String & create_table_query, const String & database, const String & table, Context & context, bool & is_atomic_database)
{
ParserCreateQuery p_create_query;
auto ast = parseQuery(p_create_query, create_table_query, global_context->getSettingsRef().max_query_size, global_context->getSettingsRef().max_parser_depth);
auto & create_query = ast->as<ASTCreateQuery &>();
if (String::npos == create_query.storage->engine->name.find("MergeTree"))
return {};
if (String::npos != create_query.storage->engine->name.find("HaUnique"))
unique_table_dump_helper.parseZKReplicaPathFromEngineArgs(create_query.storage, database, table, context);
create_query.database = std::move(database);
create_query.attach = true;
UUID uuid = create_query.uuid;
if (create_query.table == "_" && (uuid != UUIDHelpers::Nil))
{
is_atomic_database = true;
create_query.table = std::move(table);
}
if (uuid == UUIDHelpers::Nil)
create_query.uuid = UUIDHelpers::generateV4();
auto engine = std::make_shared<ASTFunction>();
engine->name = "CloudMergeTree";
engine->arguments = std::make_shared<ASTExpressionList>();
engine->arguments->children.push_back(std::make_shared<ASTIdentifier>(create_query.database));
engine->arguments->children.push_back(std::make_shared<ASTIdentifier>(create_query.table));
create_query.storage->set(create_query.storage->engine, engine);
ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(*create_query.columns_list->columns, global_context, create_query.attach);
ConstraintsDescription constraints = InterpreterCreateQuery::getConstraintsDescription(create_query.columns_list->constraints);
ForeignKeysDescription foreign_keys = InterpreterCreateQuery::getForeignKeysDescription(create_query.columns_list->foreign_keys);
UniqueNotEnforcedDescription unique = InterpreterCreateQuery::getUniqueNotEnforcedDescription(create_query.columns_list->unique);
bool skip_unknown_settings = config().has("skip_unkowning_settings");
return StorageFactory::instance().get(
create_query,
"",
Context::createCopy(global_context),
global_context,
columns,
constraints,
foreign_keys,
unique,
false /*has_force_restore_data_flag*/,
nullptr,
skip_unknown_settings);
}
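/// Recursively remove a configuration subtree; used to drop the server's own
/// storage_configuration before installing the dumper's default storage policy.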
void ClickHouseDumper::removeConfiguration(Poco::Util::LayeredConfiguration& cfg, const String& prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
cfg.keys(prefix, keys);
for (const String& key : keys)
{
removeConfiguration(cfg, prefix + "." + key);
}
cfg.remove(prefix);
}
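/// Set the local data path and build a default storage policy consisting of the
/// local default disk plus an HDFS disk pointing at the --output path.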
void ClickHouseDumper::initPath()
{
Poco::Util::LayeredConfiguration& cfg = config();
/// Setup path
String path = cfg.getString("path", ".");
Poco::trimInPlace(path);
if (path.empty())
throw Exception("Empty root path", ErrorCodes::BAD_ARGUMENTS);
if (path.back() != '/')
path += '/';
LOG_DEBUG(log, "ClickHouse default set data path {}", path);
global_context->setPath(path);
/// In case of empty path set paths to helpful directories
String cd = Poco::Path::current();
// context->setTemporaryPath(cd + "tmp");
global_context->setFlagsPath(cd + "flags");
global_context->setUserFilesPath(""); // user's files are everywhere
// Parse default local disk path
String default_local_disk_path = path;
String default_hdfs_disk_path = cfg.getString("output") + "/";
// Reset default storage policy configuration
removeConfiguration(cfg, "storage_configuration");
// Set default storage policy
{
cfg.setString("storage_configuration", "");
cfg.setString("storage_configuration.disks.default.path", "");
cfg.setString("storage_configuration.disks.default_hdfs.path",
default_hdfs_disk_path);
cfg.setString("storage_configuration.disks.default_hdfs.type", "hdfs");
String default_volume_cfg_prefix = "storage_configuration.policies.default.volumes";
cfg.setString(default_volume_cfg_prefix + ".local.default", "default");
cfg.setString(default_volume_cfg_prefix + ".local.disk", "default");
cfg.setString(default_volume_cfg_prefix + ".hdfs.default", "default_hdfs");
cfg.setString(default_volume_cfg_prefix + ".hdfs.disk", "default_hdfs");
}
/// Debug logging
LOG_DEBUG(log, "ClickHouse default data path {}", default_local_disk_path);
}
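/// Point libhdfs3 at the configured client config, build the HDFS connection params
/// from --hdfs_nnproxy, and register the default HDFS file system if an HDFS disk exists.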
void ClickHouseDumper::initHDFS()
{
/// Init HDFS3 client config path
String hdfs_config = config().getString("hdfs3_config", "");
if (!hdfs_config.empty())
{
setenv("LIBHDFS3_CONF", hdfs_config.c_str(), 1);
}
/// Options loaded from command line arguments use priority -100 in the LayeredConfiguration,
/// so construct the HDFS params directly from config() rather than from the config file.
HDFSConnectionParams hdfs_params = HDFSConnectionParams::parseFromMisusedNNProxyStr(
config().getString("output_hdfs_nnproxy", "nnproxy"), config().getString("hdfs_user", "clickhouse"));
global_context->setHdfsConnectionParams(hdfs_params);
/// register default hdfs file system
bool has_hdfs_disk = false;
for (const auto & [name, disk] : global_context->getDisksMap())
{
if (disk->getType() == DiskType::Type::ByteHDFS)
{
has_hdfs_disk = true;
}
}
if (has_hdfs_disk)
{
const int hdfs_max_fd_num = config().getInt("hdfs_max_fd_num", 100000);
const int hdfs_skip_fd_num = config().getInt("hdfs_skip_fd_num", 100);
const int hdfs_io_error_num_to_reconnect = config().getInt("hdfs_io_error_num_to_reconnect", 10);
registerDefaultHdfsFileSystem(hdfs_params, hdfs_max_fd_num, hdfs_skip_fd_num, hdfs_io_error_num_to_reconnect);
}
}
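/// Dump every table of the database by iterating over its *.sql metadata files.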
void ClickHouseDumper::processDatabase(const String & database)
{
String escaped_database = escapeForFileName(database);
String db_metadata_path = config().getString("path") + "/metadata/" + escaped_database + "/";
for (Poco::DirectoryIterator it(db_metadata_path); it != Poco::DirectoryIterator(); ++it)
{
auto & name = it.name();
if (!endsWith(name, ".sql"))
continue;
try
{
processTable(database, unescapeForFileName(name.substr(0, name.size() - 4 /* strip ".sql" */)), "", std::vector<String>(), std::vector<String>());
}
catch (...)
{
tryLogCurrentException(log);
}
}
}
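/// Collect the table's data directories: a single <path>/data/<db>/<table> path by default,
/// or one path per disk when --multi_disk_path_list is given.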
void ClickHouseDumper::initDataDiskPath(
const String & escaped_database,
const String & escaped_table,
std::vector<String> & data_paths,
bool is_multi_disk)
{
if (!is_multi_disk)
{
String data_path = config().getString("path") + "/data/" + escaped_database + "/" + escaped_table;
data_paths.push_back(data_path);
return;
}
String fuzzy_disk_names = config().getString("multi_disk_path_list");
std::vector<String> fuzzy_disk_lists = parseDescription(fuzzy_disk_names, 0, fuzzy_disk_names.length(), ',', 100 /* hard coded max files */);
std::vector<Strings> disks_list;
for (const auto & fuzzy_name : fuzzy_disk_lists)
{
disks_list.push_back(parseDescription(fuzzy_name, 0, fuzzy_name.length(), '|', 100));
}
for (const auto & vec_names : disks_list)
{
for (const auto & disk_name : vec_names)
{
String disk_path = disk_name + "data/" + escaped_database + "/" + escaped_table + "/";
data_paths.push_back(std::move(disk_path));
}
}
}
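/// For unique-key tables, verify that every part referenced by the manifest snapshot
/// exists on a local disk (throwing otherwise) and group the active parts by data path.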
void ClickHouseDumper::getUniqueTableActivePartsFromDisk(
StorageCloudMergeTree & cloud,
Snapshot & snapshot,
const String & escaped_database,
const String & escaped_table,
std::vector<String> & data_paths,
std::map<String, ActiveDataPartSet> & active_parts_multi_disk,
bool is_multi_disk,
CanDumpCallback can_dump)
{
ActiveDataPartSet part_names(cloud.format_version);
for (auto it = snapshot.begin(); it != snapshot.end(); it++)
{
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(it->first, &part_info, cloud.format_version))
throw Exception("Can't parse part name: " + it->first + " in Snapshot. ", ErrorCodes::BAD_DATA_PART_NAME);
if (!can_dump(part_info.partition_id))
continue;
if (is_multi_disk)
{
bool flag = false;
for (const auto & data_path : data_paths)
{
if (Poco::File(data_path + it->first).exists())
{
part_names.add(it->first);
flag = true;
break;
}
}
if (!flag)
throw Exception(
"Part: " + it->first + " in snapshot doesn't exist in dir , please check and retry. ",
ErrorCodes::NO_SUCH_DATA_PART);
}
else
{
DiskPtr local_disk = global_context->getStoragePolicy("default")->getAnyDisk();
String data_relative_path = "data/" + escaped_database + "/" + escaped_table + "/";
LOG_TRACE(log, "Unique Scan local disk path = {} data_relative_path = {}", local_disk->getPath(), data_relative_path);
if (local_disk->exists(data_relative_path + it->first))
part_names.add(it->first);
else
throw Exception(
"Part: " + it->first + " in snapshot doesn't exist in dir: " + data_relative_path + ", please check and retry. ",
ErrorCodes::NO_SUCH_DATA_PART);
}
}
LOG_DEBUG(log, "Found active data parts = {}", part_names.size());
for (const auto & data_path : data_paths)
{
ActiveDataPartSet local_disk_part_names(cloud.format_version);
for (const auto & part_name : part_names.getParts())
{
if (Poco::File(data_path + part_name).exists())
local_disk_part_names.add(part_name);
}
LOG_TRACE(log, "Unique data path = {} local_disk_part_names size = {}", data_path, local_disk_part_names.size());
active_parts_multi_disk.insert(std::make_pair(data_path, std::move(local_disk_part_names)));
}
}
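/// Dump a single table: recreate the storage from its metadata, collect the active parts
/// from the local disks (honoring the partition filters), upload them to the remote disk
/// and finally remove the temporary UUID directory on the remote side.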
void ClickHouseDumper::processTable(const String & database, const String & table,
const String & partition,
const std::vector<String>& partition_list,
const std::vector<String>& skip_partition_list)
{
String db_table = database + "." + table;
String escaped_database = escapeForFileName(database);
String escaped_table = escapeForFileName(table);
/// Create storage from metadata
String attach_query_path = config().getString("path") + "/metadata/" + escaped_database + "/" + escaped_table + ".sql";
LOG_TRACE(log, " attach_query_path is {}", attach_query_path);
if (!Poco::File(attach_query_path).exists())
throw Exception("Table " + db_table + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
ReadBufferFromFile in(attach_query_path, 1024);
String attach_query_str;
readStringUntilEOF(attach_query_str, in);
bool is_atomic_database = false;
auto storage = createStorageFromCreateQuery(attach_query_str, database, table, *global_context, is_atomic_database);
if (!storage)
{
LOG_ERROR(log, "{} is not MergeTree: {}", db_table, attach_query_str);
return;
}
auto & cloud = dynamic_cast<StorageCloudMergeTree &>(*storage);
Snapshot snapshot;
SCOPE_EXIT({
if (cloud.getInMemoryMetadataPtr()->hasUniqueKey())
unique_table_dump_helper.removeDumpVersionFromZk(*global_context);
});
/// Scan parts
bool is_multi_disk = config().has("multi_disk_path_list");
std::vector<String> data_paths;
if (is_multi_disk && is_atomic_database)
throw Exception("Table " + db_table + " is atomic database : " + attach_query_str, ErrorCodes::UNKNOWN_TABLE);
initDataDiskPath(escaped_database, escaped_table, data_paths, is_multi_disk);
/// Get unique table snapshot
if (cloud.getInMemoryMetadataPtr()->hasUniqueKey())
{
for (const auto & data_path : data_paths)
{
if (!Poco::File(data_path + "/manifest/").exists())
{
LOG_DEBUG(log, "skip not exists manifest {}", data_path);
continue;
}
snapshot = unique_table_dump_helper.getUniqueTableSnapshot(
escaped_database, escaped_table, data_path + "/manifest/", *global_context);
}
}
auto can_dump = [&](String partition_id) -> bool
{
if (!partition.empty() && partition_id != partition)
return false;
if (!partition_list.empty()
&& std::find(std::begin(partition_list), std::end(partition_list), partition_id) == std::end(partition_list))
return false;
if (!skip_partition_list.empty()
&& std::find(std::begin(skip_partition_list), std::end(skip_partition_list), partition_id) != std::end(skip_partition_list))
return false;
return true;
};
/// Scan the local disks and collect the active data parts per data path
std::map<String, ActiveDataPartSet> active_parts_multi_disk;
if (cloud.getInMemoryMetadataPtr()->hasUniqueKey())
{
getUniqueTableActivePartsFromDisk(cloud,
snapshot, escaped_database, escaped_table, data_paths, active_parts_multi_disk, is_multi_disk, can_dump);
}
else
{
ActiveDataPartSet part_names(cloud.format_version);
for (const auto & data_path : data_paths)
{
if (!Poco::File(data_path).exists())
{
LOG_TRACE(log, "skip not exists disk path = {}", data_path);
continue;
}
LOG_TRACE(log, "Scan local disk path = {}", data_path);
for (Poco::DirectoryIterator it(data_path); it != Poco::DirectoryIterator(); ++it)
{
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(it.name(), &part_info, cloud.format_version))
continue;
if(!can_dump(part_info.partition_id))
continue;
part_names.add(it.name());
}
}
for (const auto & data_path : data_paths)
{
ActiveDataPartSet local_disk_part_names(cloud.format_version);
for (const auto & part_name : part_names.getParts())
{
if (Poco::File(data_path + part_name).exists())
local_disk_part_names.add(part_name);
}
active_parts_multi_disk.insert(std::make_pair(data_path, std::move(local_disk_part_names)));
}
}
DiskPtr remote_disk = global_context->getStoragePolicy("cnch_default_hdfs")->getAnyDisk();
if (remote_disk->getType() == DiskType::Type::ByteS3)
throw Exception("Currently dump to " + DiskType::toString(remote_disk->getType()) + " doesn't supported.", ErrorCodes::UNKNOWN_TABLE);
for (auto & [data_path, part_names] : active_parts_multi_disk)
{
LOG_DEBUG(log, "Found in {} active parts = {}", data_path, part_names.size());
String tmp_path = "data/" + database + "/" + table + "/";
String disk_path = data_path.substr(0, data_path.length() - tmp_path.length());
std::shared_ptr<IDisk> local_disk = std::make_shared<DiskLocal>(data_path, disk_path, DB::DiskStats{});
startLoadAndDumpPart(cloud, part_names, snapshot, local_disk, remote_disk);
LOG_DEBUG(log, "Dumped {} parts under disk path {}", part_names.size(), local_disk->getPath());
}
/// Remove the temporary UUID dir on the remote disk
String remote_uuid_rel_dir = cloud.getRelativeDataPath(IStorage::StorageLocation::MAIN);
LOG_DEBUG(log, "start remove remote uuid dir {} remote disk path = {}", remote_uuid_rel_dir, remote_disk->getPath());
if(remote_disk->exists(remote_uuid_rel_dir))
remote_disk->removeRecursive(remote_uuid_rel_dir);
}
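/// Dump a set of parts to the remote disk, either sequentially or with a thread pool
/// of --parallel workers; the target directory on the remote disk is created if needed.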
void ClickHouseDumper::startLoadAndDumpPart(
StorageCloudMergeTree & cloud,
const ActiveDataPartSet & part_names,
Snapshot & snapshot,
const std::shared_ptr<IDisk> & local_disk,
const std::shared_ptr<IDisk> & remote_disk)
{
S3ObjectMetadata::PartGeneratorID part_generator_id(S3ObjectMetadata::PartGeneratorID::DUMPER,
UUIDHelpers::UUIDToString(UUIDHelpers::generateV4()));
size_t num_threads = std::min(part_names.size(), size_t(config().getUInt("parallel", 1)));
MergeTreeCNCHDataDumper dumper(cloud, part_generator_id);
String to_path = cloud.getDatabaseName() + '/' + cloud.getTableName() + '/';
if (cloud.getInMemoryMetadataPtr()->hasUniqueKey())
to_path += DeleteBitmapMeta::delete_files_dir;
if(!remote_disk->exists(to_path))
remote_disk->createDirectories(to_path);
if (num_threads <= 1)
{
for (auto & name : part_names.getParts())
{
loadAndDumpPart(cloud, dumper, name, snapshot[name], local_disk, remote_disk);
}
}
else
{
ThreadPool thread_pool(num_threads);
auto name_vec = part_names.getParts();
for (size_t i = 0; i < name_vec.size(); ++i)
{
thread_pool.scheduleOrThrowOnError([&, i]
{
loadAndDumpPart(cloud, dumper, name_vec[i], snapshot[name_vec[i]], local_disk, remote_disk);
});
}
thread_pool.wait();
}
}
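/// Load one local part, dump it in CNCH format to the remote disk and move it from the
/// temporary UUID path to <database>/<table>/. Existing parts are skipped unless
/// --overwrite is set; failures are tolerated when --skip_corrupt_parts is set.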
void ClickHouseDumper::loadAndDumpPart(
StorageCloudMergeTree & cloud,
MergeTreeCNCHDataDumper & dumper,
const String & part_name,
const UInt64 version,
const std::shared_ptr<IDisk> & local_disk,
const std::shared_ptr<IDisk> & remote_disk)
{
bool overwrite = config().has("overwrite");
const String to_path = '/' + cloud.getDatabaseName() + '/' + cloud.getTableName() + '/';
const String full_path = remote_disk->getPath() + to_path + part_name;
if(!overwrite && remote_disk->exists(full_path))
{
LOG_WARNING(log, "Part " + part_name + " already exists. Ignore it.");
return;
}
auto volume = std::make_shared<SingleDiskVolume>("volume_single", local_disk, 0);
const String relative_path = local_disk->getPath() + "/data/" + cloud.getDatabaseName() + "/" + cloud.getTableName() + "/" + part_name;
const String from_path = cloud.getRelativeDataPath(IStorage::StorageLocation::MAIN) + '/' + part_name;
auto part_info = MergeTreePartInfo::fromPartName(part_name, cloud.format_version);
MergeTreeMetaBase::MutableDataPartPtr local_part = std::make_shared<MergeTreeDataPartWide>(
cloud,
part_name,
part_info,
volume,
relative_path,
nullptr,
IStorage::StorageLocation::AUXILITY);
bool skip_corrupt_parts = config().has("skip_corrupt_parts");
try
{
LOG_TRACE(log, "Loading {}", part_name);
local_part->loadColumnsChecksumsIndexes(true, true);
local_part->low_priority = true;
if (cloud.getInMemoryMetadataPtr()->hasUniqueKey())
unique_table_dump_helper.generateUniqueIndexFileIfNeed(local_part, cloud, local_disk);
LOG_TRACE(log, "Dumping {}", local_part->name);
auto dumped_part = dumper.dumpTempPart(local_part, remote_disk, true);
if (cloud.getInMemoryMetadataPtr()->hasUniqueKey())
unique_table_dump_helper.loadAndDumpDeleteBitmap(cloud, local_part, version, local_disk);
dumped_part->is_temp = false;
dumped_part->renameTo(local_part->name, true);
if (remote_disk->exists(full_path))
{
LOG_WARNING(log, "remote disk {} already exists output path {}, start to remove it...", remote_disk->getPath(), full_path);
remote_disk->removeRecursive(full_path);
}
/// Move the dumped part on the remote disk from the UUID path to {database}/{table}/
remote_disk->moveDirectory(from_path, remote_disk->getPath() + to_path);
}
catch (const DB::Exception & e)
{
if (!skip_corrupt_parts)
throw;
LOG_ERROR(log, "Failed to load or dump the part: {}, exception: {}", part_name, e.what());
}
}
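/// Entry point: load the config, create the global context, register functions, disks
/// and storages, initialize paths and HDFS, then dump the requested database or table.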
int ClickHouseDumper::main(const std::vector<String> &)
{
ThreadStatus thread_status;
/// Load the config file if it exists
if (config().has("config-file") || Poco::File("config.xml").exists())
{
const auto config_path = config().getString("config-file", "config.xml");
ConfigProcessor config_processor(config_path, false, true);
// Mark this configuration as writable and PRIO_APPLICATION so we can modify its content in initPath()
config().add(config_processor.loadConfig().configuration.duplicate(), PRIO_APPLICATION, true, false);
}
logger().setLevel(config().getString("logger.level", "debug"));
log = getLogger(logger());
unique_table_dump_helper.setLog(log);
shared_context = DB::Context::createShared();
global_context = DB::Context::createGlobal(shared_context.get());
SCOPE_EXIT(global_context->shutdown());
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::LOCAL);
if (config().has("macros"))
{
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
/// try to parse current shard index from macros
const std::map<String, String> macro_map = global_context->getMacros()->getMacroMap();
if (macro_map.count("shard_num"))
current_shard_number = parse<Int64>(macro_map.at("shard_num"));
}
/// We will terminate process on error
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
registerFunctions();
registerAggregateFunctions();
registerDisks();
registerStorages();
initPath();
initHDFS();
String database = config().getString("database");
String table = config().getString("table", "");
String partition = config().getString("partition", "");
String partition_list = config().getString("partition_list","");
String skip_partition_list = config().getString("skip_partition_list","");
std::vector<String> partition_vec;
std::vector<String> skip_partition_vec;
if(!partition_list.empty())
boost::split(partition_vec, partition_list, boost::is_any_of(","));
if(!skip_partition_list.empty())
boost::split(skip_partition_vec, skip_partition_list, boost::is_any_of(","));
if (table.empty())
processDatabase(database);
else
processTable(database, table, partition, partition_vec, skip_partition_vec);
return {};
}
}
int mainEntryClickHouseDumper(int argc, char ** argv)
{
try
{
DB::ClickHouseDumper app;
app.init(argc, argv);
return app.run();
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}