mirror of https://github.com/ByConity/ByConity
287 lines
12 KiB
C++
287 lines
12 KiB
C++
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <string>

#include <IO/S3Common.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>

#include <aws/s3/S3Client.h>
#include <aws/s3/model/Object.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartCopyRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>

#include <boost/algorithm/string.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>
#include <boost/program_options.hpp>

#include <Poco/AutoPtr.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/FormattingChannel.h>
#include <Poco/Logger.h>
#include <Poco/Path.h>
#include <Poco/PatternFormatter.h>
|
|
|
|
// Size threshold in bytes (5 * 1024^3 = 5 GiB). ListAndRenameObjects copies objects of at
// least this size with UploadPartCopy and smaller ones with a single CopyObject call.
#define MAX_OBJECT_SIZE 5368709120
|
|
|
|
// Lets the AWS SDK names used throughout this file (String, Vector, S3::...) be written unqualified.
using namespace Aws;
|
|
|
|
bool CopyObject(const String &fromBucket, const String &fromKey, const String &toBucket,
|
|
const String &toKey, const S3::S3Client & client) {
|
|
S3::Model::CopyObjectRequest request;
|
|
|
|
request.WithCopySource(fromBucket + "/" + fromKey)
|
|
.WithKey(toKey)
|
|
.WithBucket(toBucket);
|
|
|
|
S3::Model::CopyObjectOutcome outcome = client.CopyObject(request);
|
|
if (!outcome.IsSuccess()) {
|
|
const S3::S3Error &err = outcome.GetError();
|
|
LOG_WARNING(&Poco::Logger::get("RenameS3Tool"), "Error: CopyObject: {}: {}",
|
|
err.GetExceptionName() , err.GetMessage());
|
|
}
|
|
else {
|
|
LOG_INFO(&Poco::Logger::get("RenameS3Tool"), "Successfully copy {} in bucket {} to {} in bucket {}",
|
|
fromKey, fromBucket, toKey, toBucket);
|
|
}
|
|
|
|
return outcome.IsSuccess();
|
|
}
|
|
|
|
bool UploadPartCopy(const String &fromBucket, const String &fromKey, const String &toBucket,
|
|
const String &toKey, const int64_t & objectSize, const S3::S3Client & client) {
|
|
// Initiate the multipart upload.
|
|
S3::Model::CreateMultipartUploadRequest init_request;
|
|
init_request.WithBucket(toBucket).WithKey(toKey);
|
|
S3::Model::CreateMultipartUploadOutcome init_outcome = client.CreateMultipartUpload(init_request);
|
|
|
|
if (!init_outcome.IsSuccess()) {
|
|
const S3::S3Error &err = init_outcome.GetError();
|
|
std::cout << "Error: CopyObject: " << err.GetExceptionName() << err.GetMessage() << std::endl;
|
|
LOG_WARNING(&Poco::Logger::get("RenameS3Tool"), "Error: CopyObject: {}: {}",
|
|
err.GetExceptionName() , err.GetMessage());
|
|
return false;
|
|
}
|
|
|
|
// Copy the objec tusing 1GB parts.
|
|
int64_t part_size = 1024 * 1024 * 1024;
|
|
int64_t byte_position = 0;
|
|
int16_t part_num = 1;
|
|
S3::Model::CompletedMultipartUpload completed_multipart;
|
|
while (byte_position < objectSize) {
|
|
// The last part might be smaller than partSize, so check to make sure
|
|
// that lastByte isn't beyond the end of the object.
|
|
int64_t last_byte = std::min(byte_position + part_size - 1, objectSize - 1);
|
|
String source_range = "bytes=" + std::to_string(byte_position) + "-" + std::to_string(last_byte);
|
|
|
|
// Copy this part.
|
|
S3::Model::UploadPartCopyRequest copy_request;
|
|
copy_request.WithCopySource(fromBucket + "/" + fromKey)
|
|
.WithBucket(toBucket)
|
|
.WithKey(toKey)
|
|
.WithUploadId(init_outcome.GetResult().GetUploadId())
|
|
.WithCopySourceRange(source_range)
|
|
.WithPartNumber(part_num);
|
|
S3::Model::UploadPartCopyOutcome copy_outcome = client.UploadPartCopy(copy_request);
|
|
if (!copy_outcome.IsSuccess()) {
|
|
const S3::S3Error &err = init_outcome.GetError();
|
|
std::cout << "Error: CopyObject: " << err.GetExceptionName() << err.GetMessage() << std::endl;
|
|
LOG_WARNING(&Poco::Logger::get("RenameS3Tool"), "Error: CopyObject: {}: {}",
|
|
err.GetExceptionName() , err.GetMessage());
|
|
return false;
|
|
}
|
|
S3::Model::CompletedPart completed_part;
|
|
completed_part.WithPartNumber(part_num).WithETag(copy_outcome.GetResult().GetCopyPartResult().GetETag());
|
|
completed_multipart.AddParts(completed_part);
|
|
byte_position += part_size;
|
|
part_num++;
|
|
}
|
|
|
|
// Complete the upload request to concatenate all uploaded parts and make the copied object available.
|
|
S3::Model::CompleteMultipartUploadRequest complete_request;
|
|
complete_request.WithBucket(toBucket)
|
|
.WithKey(toKey)
|
|
.WithUploadId(init_outcome.GetResult().GetUploadId())
|
|
.WithMultipartUpload(completed_multipart);
|
|
auto complete_outcome = client.CompleteMultipartUpload(complete_request);
|
|
|
|
if (!complete_outcome.IsSuccess()) {
|
|
const S3::S3Error &err = complete_outcome.GetError();
|
|
std::cout << "Error: CopyObject: " << err.GetExceptionName() << err.GetMessage() << std::endl;
|
|
LOG_WARNING(&Poco::Logger::get("RenameS3Tool"), "Error: CopyObject: {}: {}",
|
|
err.GetExceptionName() , err.GetMessage());
|
|
}
|
|
else {
|
|
LOG_INFO(&Poco::Logger::get("RenameS3Tool"), "Successfully copy {} in bucket {} to {} in bucket {}",
|
|
fromKey, fromBucket, toKey, toBucket);
|
|
}
|
|
|
|
return complete_outcome.IsSuccess();
|
|
}
|
|
|
|
/**
 * @brief Deletes a single object with an S3 DeleteObject request.
 *
 * The result is logged through the "RenameS3Tool" logger: a warning carrying the S3
 * exception name and message on failure, an informational line on success.
 *
 * @param objectKey  key of the object to delete.
 * @param fromBucket bucket containing the object.
 * @param client     S3 client used to issue the request.
 * @return true if the DeleteObject request succeeded, false otherwise.
 */
bool DeleteObject(const String &objectKey, const String &fromBucket, const S3::S3Client & client)
{
    Poco::Logger * log = &Poco::Logger::get("RenameS3Tool");

    S3::Model::DeleteObjectRequest delete_request;
    delete_request.SetBucket(fromBucket);
    delete_request.SetKey(objectKey);

    const S3::Model::DeleteObjectOutcome result = client.DeleteObject(delete_request);

    if (result.IsSuccess())
    {
        LOG_INFO(log, "Successfully deleted the object {}.", objectKey);
        return true;
    }

    const S3::S3Error & failure = result.GetError();
    LOG_WARNING(log, "Error: DeleteObject: {}: {}",
                failure.GetExceptionName(), failure.GetMessage());
    return false;
}
|
|
|
|
bool ListAndRenameObjects(const String &fromBucket, const String &rootPrefix, const String& toBucket,
|
|
const bool needDelete, const bool checkUUid, const S3::S3Client &client, const int threadNum) {
|
|
S3::Model::ListObjectsRequest request;
|
|
request.WithBucket(fromBucket);
|
|
if (!rootPrefix.empty()) {
|
|
request.WithPrefix(rootPrefix);
|
|
}
|
|
|
|
boost::asio::thread_pool thread_pool(threadNum);
|
|
bool is_done = false;
|
|
while (!is_done) {
|
|
auto outcome = client.ListObjects(request);
|
|
if (!outcome.IsSuccess()) {
|
|
const S3::S3Error &err = outcome.GetError();
|
|
LOG_WARNING(&Poco::Logger::get("RenameS3Tool"), "Error: ListObjects: {}: {}",
|
|
err.GetExceptionName() , err.GetMessage());
|
|
return false;
|
|
}
|
|
else {
|
|
Vector<S3::Model::Object> objects = outcome.GetResult().GetContents();
|
|
for (S3::Model::Object &object: objects) {
|
|
// root_prefix/table_uuid/part_uuid/name
|
|
String from_key = object.GetKey();
|
|
Vector<String> split_parts;
|
|
boost::split(split_parts, from_key, boost::is_any_of("/"));
|
|
|
|
size_t index = 0;
|
|
while (index < split_parts.size()) {
|
|
if (rootPrefix.find(split_parts[index]) != std::string::npos) {
|
|
index++;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (split_parts.size() - index == 3) {
|
|
if (checkUUid) {
|
|
DB::UUID uuid;
|
|
DB::ReadBufferFromString table_buffer(split_parts[index]);
|
|
DB::ReadBufferFromString part_buffer(split_parts[index + 1]);
|
|
if (!DB::tryReadUUIDText(uuid, table_buffer) || !DB::tryReadUUIDText(uuid, part_buffer)) {
|
|
continue;
|
|
}
|
|
}
|
|
String to_key = rootPrefix + "/" + split_parts[index + 1] + "/" + split_parts[index + 2];
|
|
int64_t object_size = object.GetSize();
|
|
// submit copy task
|
|
boost::asio::post(thread_pool, [=, &client]() {
|
|
bool copy_result = false;
|
|
if (object_size >= MAX_OBJECT_SIZE) {
|
|
copy_result = UploadPartCopy(fromBucket, from_key, toBucket, to_key, object_size, client);
|
|
} else {
|
|
copy_result = CopyObject(fromBucket, from_key, toBucket, to_key, client);
|
|
}
|
|
if (copy_result && needDelete) {
|
|
DeleteObject(from_key, fromBucket, client);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
is_done = !outcome.GetResult().GetIsTruncated();
|
|
if (!is_done) {
|
|
request.SetMarker(objects.back().GetKey());
|
|
}
|
|
}
|
|
}
|
|
thread_pool.join();
|
|
return true;
|
|
}
|
|
|
|
// Short alias for the command-line option parsing library used by the entry point below.
namespace po = boost::program_options;
|
|
/**
|
|
* @brief This tool is used to delete table_uuid from the key of s3 objects. Only patterns like
|
|
* "root_prefix/table_uuid/part_uuid/key" will be renamed, other objects will keep like before.
|
|
*/
|
|
int mainEntryClickhouseS3RenameTool(int argc, char ** argv)
|
|
{
|
|
po::options_description desc("S3 rename tools is used to delete table_uuid from the key of s3 objects. Parameters");
|
|
|
|
desc.add_options()
|
|
("help", "produce help message")
|
|
("s3_ak_id", po::value<String>(), "access key id of S3")
|
|
("s3_ak_secret", po::value<String>(), "secret access key id of S3")
|
|
("s3_region", po::value<String>(), "region of S3")
|
|
("s3_endpoint", po::value<String>(), "endpoint of S3")
|
|
("from_bucket", po::value<String>(), "bucket name in which files need to be renamed")
|
|
("root_prefix", po::value<String>()->default_value(""), "files that need to be renamed start with. If not specified, all files in bucket will be renamed")
|
|
("to_bucket", po::value<String>(), "bucket name that files need to be moved to, default from_bucket")
|
|
("thread_number", po::value<int>()->default_value(1), "using how many threads, default 1")
|
|
("need_delete", po::value<bool>()->default_value(true), "whether delete origin file, default true")
|
|
("uuid_check", po::value<bool>()->default_value(true), "whether check uuid is valid or not, default true")
|
|
("enable_logging", po::value<bool>()->default_value(true),"Enable logging output")
|
|
("logging_level", po::value<String>()->default_value("information"), "logging level")
|
|
;
|
|
|
|
po::variables_map vm;
|
|
po::parsed_options parsed_opts = po::command_line_parser(argc, argv)
|
|
.options(desc)
|
|
.run();
|
|
|
|
po::store(parsed_opts, vm);
|
|
po::notify(vm);
|
|
|
|
if (vm.count("help")) {
|
|
std::cout << desc << "\n";
|
|
return 1;
|
|
}
|
|
|
|
if (!vm.count("s3_ak_id") || !vm.count("s3_ak_secret") || !vm.count("s3_endpoint")
|
|
|| !vm.count("from_bucket") || !vm.count("s3_region")) {
|
|
std::cerr << "Missing required field\n" << desc << std::endl;
|
|
return 1;
|
|
}
|
|
|
|
if (vm.count("enable_logging"))
|
|
{
|
|
Poco::AutoPtr<Poco::PatternFormatter> formatter(new Poco::PatternFormatter("%Y.%m.%d %H:%M:%S.%F <%p> %s: %t"));
|
|
Poco::AutoPtr<Poco::ConsoleChannel> console_chanel(new Poco::ConsoleChannel);
|
|
Poco::AutoPtr<Poco::FormattingChannel> channel(new Poco::FormattingChannel(formatter, console_chanel));
|
|
Poco::Logger::root().setLevel(vm["logging_level"].as<String>());
|
|
Poco::Logger::root().setChannel(channel);
|
|
}
|
|
|
|
DB::S3::S3Config s3_config(vm["s3_endpoint"].as<String>(), vm["s3_region"].as<String>(),
|
|
vm["from_bucket"].as<String>(), vm["s3_ak_id"].as<String>(),
|
|
vm["s3_ak_secret"].as<String>(), vm["root_prefix"].as<String>());
|
|
|
|
String to_bucket = vm["from_bucket"].as<String>();
|
|
if (vm.count("to_bucket")) {
|
|
to_bucket = vm["to_bucket"].as<String>();
|
|
}
|
|
|
|
ListAndRenameObjects(
|
|
vm["from_bucket"].as<String>(),
|
|
vm["root_prefix"].as<String>(),
|
|
to_bucket,
|
|
vm["need_delete"].as<bool>(),
|
|
vm["uuid_check"].as<bool>(),
|
|
*s3_config.create(),
|
|
vm["thread_number"].as<int>());
|
|
|
|
return 1;
|
|
}
|