Merge branch 'enhance_cross' into 'cnch-dev'

feat(clickhousech@m-5288512525): refactor cross & support table-level for cross

See merge request dp/ClickHouse!24919
任强 2024-09-14 06:15:01 +00:00 committed by fredwang
parent 47df57f28d
commit 8eca43b58d
39 changed files with 781 additions and 83 deletions

View File

@ -0,0 +1,35 @@
version: "3"
services:
hdfs-namenode:
image: hub.byted.org/bytehouse/hdfs:3.3
command: bash /etc/hadoop/conf/init_users_cross.sh
entrypoint: /entrypoint.sh
environment:
- HADOOP_CONF_DIR=/etc/hadoop/conf
ports:
- "127.0.0.1:9881:9870"
- "9999:9000"
volumes:
- ./hdfs:/etc/hadoop/conf:ro
- /var/log/hadoop
- /data1
- /data2
hdfs-datanode:
# depends_on:
# - hdfs-namenode
image: hub.byted.org/bytehouse/hdfs:3.3
command: datanode
environment:
- HADOOP_CONF_DIR=/etc/hadoop/conf
volumes:
- ./hdfs:/etc/hadoop/conf:ro
- /var/log/hadoop
- /data1
- /data2
volumes:
hdfs-namenode:
external: false
hdfs-datanode:
external: false

View File

@ -65,7 +65,7 @@ void sendCopyTasksToWorker(BackupTaskPtr & backup_task, const BackupCopyTasks &
DiskPtr source_disk = context->getDisk(copy_task.source_disk());
DiskPtr destination_disk = context->getDisk(copy_task.destination_disk());
if (source_disk->getType() == DiskType::Type::ByteS3 && destination_disk->getType() == DiskType::Type::ByteS3)
if (source_disk->getInnerType() == DiskType::Type::ByteS3 && destination_disk->getInnerType() == DiskType::Type::ByteS3)
s3_tasks.push_back(std::move(copy_task));
else
hdfs_tasks.push_back(std::move(copy_task));

View File

@ -152,7 +152,7 @@ void BackupEntriesCollector::collectTableEntries(
// Calculate relative path to disk
String relative_path;
switch (remote_disk->getType())
switch (remote_disk->getInnerType())
{
case DiskType::Type::ByteS3: {
UUID part_id = RPCHelpers::createUUID(part_model.part_id());

View File

@ -419,7 +419,7 @@ Strings getTableMetaFileNameFromBackupDisk(const DiskPtr & backup_disk, const St
Strings table_metadata_filenames;
backup_disk->listFiles(backup_dir + "metadata/" + escapeForFileName(database_name) + "/", table_metadata_filenames);
if (backup_disk->getType() == DiskType::Type::ByteS3)
if (backup_disk->getInnerType() == DiskType::Type::ByteS3)
for (String & table_metadata_filename : table_metadata_filenames)
table_metadata_filename = fs::path(table_metadata_filename).filename();

View File

@ -237,7 +237,7 @@ DumpedData CnchDataWriter::dumpCnchParts(
// Assign part id here, since we need to record it into undo buffer before dump part to filesystem
String relative_path = part_name;
if (disk->getType() == DiskType::Type::ByteS3)
if (disk->getInnerType() == DiskType::Type::ByteS3)
{
UUID part_id = CnchDataWriter::newPartID(part->info, txn_id.toUInt64());
part->uuid = part_id;
@ -257,7 +257,7 @@ DumpedData CnchDataWriter::dumpCnchParts(
// Assign part id here, since we need to record it into undo buffer before dump part to filesystem
String relative_path = part_name;
if (disk->getType() == DiskType::Type::ByteS3)
if (disk->getInnerType() == DiskType::Type::ByteS3)
{
UUID part_id = CnchDataWriter::newPartID(staged_part->info, txn_id.toUInt64());
staged_part->uuid = part_id;

View File

@ -1003,6 +1003,30 @@
M(ReadBufferFromRpcStreamFileConnectFailed, "remote rpc file data connect failed count") \
M(ReadBufferFromRpcStreamFileConnectMs, "remote rpc file data connect ms") \
\
M(CloudFSConnect, "The number of CloudFS client connect calls") \
M(CloudFSConnectMicroseconds, "The time elapsed connecting the CloudFS client") \
M(CloudFSDisConnect, "The number of CloudFS client disconnect calls") \
M(CloudFSDisConnectMicroseconds, "The time elapsed disconnecting the CloudFS client") \
M(ReadBufferFromCloudFSRead, "The count of ReadBufferFromCloudFS read ops") \
M(ReadBufferFromCloudFSReadMicroseconds, "The time spent by ReadBufferFromCloudFS reading from the client") \
M(ReadBufferFromCloudFSFailed, "The count of failed ReadBufferFromCloudFS read ops") \
M(ReadBufferFromCloudFSReadBytes, "Bytes read by ReadBufferFromCloudFS from the client") \
M(ReadBufferFromCloudFSSessionGetMicroseconds, "The time spent by the ReadBufferFromCloudFS client getting a session") \
M(ReadBufferFromCloudFSOpenFileMicroseconds, "The time spent by the ReadBufferFromCloudFS client opening files") \
M(ReadBufferFromCloudFSOpenFile, "The count of files opened by the ReadBufferFromCloudFS client") \
M(ReadBufferFromCloudFSCloseFileMicroseconds, "The time spent by the ReadBufferFromCloudFS client closing files") \
M(ReadBufferFromCloudFSCloseFile, "The count of files closed by the ReadBufferFromCloudFS client") \
\
M(WriteBufferFromCloudFSWrite, "The number of write ops performed by WriteBufferFromCFS") \
M(WriteBufferFromCloudFSWriteBytes, "Bytes written by WriteBufferFromCFS through the client") \
M(WriteBufferFromCloudFSWriteMicroseconds, "The time elapsed by WriteBufferFromCFS writing through the client") \
M(WriteBufferFromCloudFSSync, "The number of sync ops performed by WriteBufferFromCFS") \
M(WriteBufferFromCloudFSSyncMicroseconds, "The time elapsed by WriteBufferFromCFS syncing files") \
M(WriteBufferFromCloudFSFailed, "The number of failed ops in WriteBufferFromCFS") \
M(WriteBufferFromCloudFSOpenFile, "The number of files WriteBufferFromCFS tries to open") \
M(WriteBufferFromCloudFSCloseFile, "The number of files WriteBufferFromCFS tries to close") \
M(WriteBufferFromCloudFSCloseFileMicroseconds, "The time elapsed by WriteBufferFromCFS closing files") \
\
M(IOSchedulerOpenFileMicro, "Time used in open file when using io scheduler") \
M(IOSchedulerScheduleMicro, "Time used in schedule io request") \
M(IOSchedulerSubmittedUserRequests, "Number of submitted user request from user") \
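For reference, a minimal sketch of how counters like these are typically bumped on the CloudFS read path, assuming the usual ProfileEvents/Stopwatch pattern used elsewhere in the codebase (the surrounding read routine is illustrative, not part of this MR):

ProfileEvents::increment(ProfileEvents::ReadBufferFromCloudFSRead);
Stopwatch watch;
size_t bytes_read = 0;
try
{
    bytes_read = 0; /// placeholder for the actual read from the CloudFS client
}
catch (...)
{
    ProfileEvents::increment(ProfileEvents::ReadBufferFromCloudFSFailed);
    throw;
}
ProfileEvents::increment(ProfileEvents::ReadBufferFromCloudFSReadBytes, bytes_read);
ProfileEvents::increment(ProfileEvents::ReadBufferFromCloudFSReadMicroseconds, watch.elapsedMicroseconds());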

View File

@ -0,0 +1,405 @@
#include <Common/config.h>
#if USE_CLOUDFS
#include <Disks/CloudFS/DiskCloudFS.h>
#include <Disks/DiskFactory.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <IO/CloudFS/ReadBufferFromCFS.h>
#include <IO/CloudFS/WriteBufferFromCFS.h>
#include <IO/ReadBufferFromFileWithNexusFS.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int INCORRECT_DISK_INDEX;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
class DiskByteCFSDirectoryIterator : public IDiskDirectoryIterator
{
public:
DiskByteCFSDirectoryIterator(CFSFileSystem & cfs_fs_, const String & disk_path, const String & dir_path)
: cfs_fs(cfs_fs_), idx(0)
{
base_path = std::filesystem::path(disk_path) / dir_path;
cfs_fs.list(base_path, file_names, file_sizes);
}
virtual void next() override { ++idx; }
virtual bool isValid() const override { return idx < file_names.size(); }
virtual String path() const override { return base_path / name(); }
virtual String name() const override
{
if (idx >= file_names.size())
{
throw Exception("Trying to get file name while iterator reach eof", ErrorCodes::BAD_ARGUMENTS);
}
return file_names[idx];
}
size_t size() const override { return file_sizes.at(idx); }
private:
CFSFileSystem & cfs_fs;
std::filesystem::path base_path;
size_t idx;
std::vector<String> file_names;
std::vector<size_t> file_sizes;
};
DiskPtr DiskCloudFSReservation::getDisk(size_t i) const
{
if (i != 0)
{
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
}
return disk;
}
inline String DiskCloudFS::absolutePath(const String & relative_path) const
{
return fs::path(disk_path) / relative_path;
}
/// XXX: is this check necessary?
void DiskCloudFS::assertCFSAvailable() const
{
if (cfs == nullptr || !cfs->isAvailable())
throw Exception("CloudFS disk is not available", ErrorCodes::NETWORK_ERROR);
}
UInt64 DiskCloudFS::getID() const
{
return static_cast<UInt64>(std::hash<String>{}(DiskType::toString(getType())) ^ std::hash<String>{}(getPath()));
}
ReservationPtr DiskCloudFS::reserve(UInt64 bytes)
{
return std::make_unique<DiskCloudFSReservation>(static_pointer_cast<DiskCloudFS>(shared_from_this()), bytes);
}
DiskStats DiskCloudFS::getTotalSpace([[maybe_unused]]bool with_keep_free) const
{
return ufs_disk->getTotalSpace(with_keep_free);
}
DiskStats DiskCloudFS::getAvailableSpace() const
{
return ufs_disk->getAvailableSpace();
}
DiskStats DiskCloudFS::getUnreservedSpace() const
{
return ufs_disk->getUnreservedSpace();
}
bool DiskCloudFS::exists(const String& path) const
{
assertCFSAvailable();
return cfs_fs.exists(absolutePath(path));
}
bool DiskCloudFS::isFile(const String& path) const
{
assertCFSAvailable();
return cfs_fs.isFile(absolutePath(path));
}
bool DiskCloudFS::isDirectory(const String & path) const
{
assertCFSAvailable();
return cfs_fs.isDirectory(absolutePath(path));
}
size_t DiskCloudFS::getFileSize(const String & path) const
{
assertCFSAvailable();
return cfs_fs.getFileSize(absolutePath(path));
}
void DiskCloudFS::createDirectory(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.createDirectory(absolutePath(path));
}
void DiskCloudFS::createDirectories(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.createDirectories(absolutePath(path));
}
void DiskCloudFS::clearDirectory(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
std::vector<String> file_names;
std::vector<size_t> file_sizes;
cfs_fs.list(absolutePath(path), file_names, file_sizes);
for (const String & file_name : file_names)
{
cfs_fs.remove(fs::path(disk_path) / path / file_name);
}
}
void DiskCloudFS::moveDirectory(const String & from_path, const String & to_path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.renameTo(absolutePath(from_path), absolutePath(to_path));
}
DiskDirectoryIteratorPtr DiskCloudFS::iterateDirectory(const String & path)
{
assertCFSAvailable();
return std::make_unique<DiskByteCFSDirectoryIterator>(cfs_fs, disk_path, path);
}
void DiskCloudFS::createFile(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.createFile(absolutePath(path));
}
void DiskCloudFS::moveFile(const String & from_path, const String & to_path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.renameTo(absolutePath(from_path), absolutePath(to_path));
}
void DiskCloudFS::replaceFile(const String & from_path, const String & to_path)
{
assertNotReadonly();
assertCFSAvailable();
String from_abs_path = absolutePath(from_path);
String to_abs_path = absolutePath(to_path);
if (cfs_fs.exists(to_abs_path))
{
String origin_backup_file = to_abs_path + ".old";
cfs_fs.renameTo(to_abs_path, origin_backup_file);
}
cfs_fs.renameTo(from_abs_path, to_abs_path);
}
void DiskCloudFS::listFiles(const String & path, std::vector<String> & file_names)
{
assertCFSAvailable();
std::vector<size_t> file_sizes;
cfs_fs.list(absolutePath(path), file_names, file_sizes);
}
std::unique_ptr<ReadBufferFromFileBase> DiskCloudFS::readFile(const String & path, const ReadSettings& settings) const
{
if (unlikely(settings.remote_fs_read_failed_injection != 0))
{
if (settings.remote_fs_read_failed_injection == -1)
throw Exception("remote_fs_read_failed_injection is enabled and return error immediately", ErrorCodes::LOGICAL_ERROR);
else
{
LOG_TRACE(log, "remote_fs_read_failed_injection is enabled and will sleep {}ms", settings.remote_fs_read_failed_injection);
std::this_thread::sleep_for(std::chrono::milliseconds(settings.remote_fs_read_failed_injection));
}
}
String file_path = absolutePath(path);
std::unique_ptr<ReadBufferFromFileBase> impl;
/// The CloudFS switch is controlled by a table-level setting (see MergeTreeSettings.h::enable_cloudfs),
/// which is checked when getStoragePolicy() is called,
/// so we don't need to check it again here; we only need to check whether CloudFS is available.
/// XXX: is there any other scenario where we need to check ReadSettings::enable_cloudfs?
if (cfs->isAvailable()/* && settings.enable_cloudfs */)
{
CloudFSPtr cfs_ptr = dynamic_pointer_cast<CloudFS>(cfs);
if (!cfs_ptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "expect CloudFSPtr to create ReadBufferFromCFS");
impl = std::make_unique<ReadBufferFromCFS>(cfs_ptr,
file_path,
settings.remote_fs_prefetch,
settings.remote_fs_buffer_size);
}
else
{
LOG_WARNING(log, "CloudFS is not available, fallback to ufs disk {} instead", ufs_disk->getName());
impl = ufs_disk->readFile(file_path, settings);
}
if (settings.enable_nexus_fs)
{
auto nexus_fs = Context::getGlobalContextInstance()->getNexusFS();
if (nexus_fs)
impl = std::make_unique<ReadBufferFromFileWithNexusFS>(nexus_fs->getSegmentSize(), std::move(impl), *nexus_fs);
}
if (settings.remote_fs_prefetch)
{
auto global_context = Context::getGlobalContextInstance();
auto reader = global_context->getThreadPoolReader();
return std::make_unique<AsynchronousBoundedReadBuffer>(std::move(impl), *reader, settings);
}
return impl;
}
std::unique_ptr<WriteBufferFromFileBase> DiskCloudFS::writeFile(const String & path, const WriteSettings& settings)
{
assertNotReadonly();
if (unlikely(settings.remote_fs_write_failed_injection != 0))
{
if (settings.remote_fs_write_failed_injection == -1)
throw Exception("remote_fs_write_failed_injection is enabled and return error immediately", ErrorCodes::LOGICAL_ERROR);
else
{
LOG_TRACE(log, "remote_fs_write_failed_injection is enabled and will sleep {}ms", settings.remote_fs_write_failed_injection);
std::this_thread::sleep_for(std::chrono::milliseconds(settings.remote_fs_write_failed_injection));
}
}
/// Same as in the readFile() API; see the comment above.
if (cfs->isAvailable()/* && settings.enable_cloudfs */)
{
int write_mode = settings.mode == WriteMode::Append ? (O_APPEND | O_WRONLY) : O_CREAT;
CloudFSPtr cfs_ptr = dynamic_pointer_cast<CloudFS>(cfs);
if (!cfs_ptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "expect CloudFSPtr to create WriteBufferFromCFS");
return std::make_unique<WriteBufferFromCFS>(cfs_ptr, absolutePath(path),
settings.buffer_size, write_mode);
}
else
{
LOG_WARNING(log, "CloudFS is not available, fallback to ufs disk {} instead", ufs_disk->getName());
return ufs_disk->writeFile(absolutePath(path), settings);
}
}
void DiskCloudFS::removeFile(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.remove(absolutePath(path), false);
}
void DiskCloudFS::removeFileIfExists(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
String abs_path = absolutePath(path);
if (cfs_fs.exists(abs_path))
cfs_fs.remove(abs_path, false);
}
void DiskCloudFS::removeDirectory(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.remove(absolutePath(path), false);
}
void DiskCloudFS::removeRecursive(const String & path)
{
assertNotReadonly();
assertCFSAvailable();
cfs_fs.remove(absolutePath(path), true);
}
void DiskCloudFS::removePart(const String & path)
{
assertCFSAvailable();
try
{
removeRecursive(path);
}
catch (Poco::FileException &e)
{
/// We don't know if this exception is caused by a non-existent path,
/// so we need to determine it manually
if (!exists(path)) {
/// the part has already been deleted, exit
return;
}
throw e;
}
}
void DiskCloudFS::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
assertCFSAvailable();
cfs_fs.setLastModifiedInSeconds(absolutePath(path), timestamp.epochTime());
}
Poco::Timestamp DiskCloudFS::getLastModified(const String & path)
{
assertCFSAvailable();
auto seconds = cfs_fs.getLastModifiedInSeconds(absolutePath(path));
return Poco::Timestamp(seconds * 1000 * 1000);
}
void DiskCloudFS::setReadOnly(const String & path)
{
assertCFSAvailable();
cfs_fs.setWriteable(absolutePath(path), false);
}
void DiskCloudFS::createHardLink(const String &, const String &)
{
throw Exception("createHardLink is not supported by DiskCloudFS", ErrorCodes::NOT_IMPLEMENTED);
}
bool DiskCloudFS::load(const String & path) const
{
assertCFSAvailable();
cfs_fs.load(absolutePath(path));
return true;
}
void registerDiskCloudFS(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context_,
const DisksMap & disk_map) -> DiskPtr
{
String ufs_disk_name = config.getString(config_prefix + ".ufs_disk");
if (ufs_disk_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ufs_disk is required for CloudFS config");
auto find = disk_map.find(ufs_disk_name);
if (find == disk_map.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot find ufs_disk {} while initializing DiskCloudFS {}", ufs_disk_name, name);
DiskPtr ufs_disk = find->second;
if (ufs_disk->getType() != DiskType::Type::ByteHDFS && ufs_disk->getType() != DiskType::Type::HDFS)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only ByteHDFS or HDFS is supported for CloudFS now");
// initialize cfs
std::shared_ptr<AbstractCloudFS> cfs = nullptr;
/// bool cfs_enable = config.getBool("cloudfs.enable", false);
std::unique_ptr<CfsConf> cfs_conf = CfsConf::parseFromConfig(
config,
context_,
config_prefix,
context_->getHdfsConnectionParams().formatPath(ufs_disk->getPath()).toString());
cfs = std::make_shared<CloudFS>(std::move(cfs_conf));
return std::make_shared<DiskCloudFS>(name, ufs_disk, cfs);
};
factory.registerDiskType("cfs", creator);
}
} /// end namespace DB
#endif
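As a hedged usage sketch (the disk name and file path below are assumptions, not taken from this MR): callers obtain the CloudFS disk like any other disk and read through it, while the implementation above transparently falls back to the UFS disk when CloudFS is unavailable.

DiskPtr disk = context->getDisk("cloudfs_0");                               /// assumed name of a disk with type "cfs"
auto in = disk->readFile("data/part_0_0_0/checksums.txt", ReadSettings{});  /// illustrative relative path
String content;
readStringUntilEOF(content, *in); /// served via CloudFS, or via the UFS disk on fallback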

View File

@ -0,0 +1,128 @@
#pragma once
#include <Common/config.h>
#if USE_CLOUDFS
#include <Disks/IDisk.h>
#include <IO/AbstractCloudFS.h>
#include <IO/CloudFS/CFSFileSystem.h>
namespace DB
{
class DiskCloudFSReservation;
class DiskCloudFS final : public IDisk
{
public:
DiskCloudFS(
const String & disk_name_,
const DiskPtr & ufs_disk_,
const std::shared_ptr<AbstractCloudFS> & cfs_)
: disk_name(disk_name_)
, ufs_disk(ufs_disk_)
, cfs(cfs_)
, cfs_fs(cfs)
{
disk_path = ufs_disk->getPath();
setDiskWritable();
}
const String & getPath() const override { return disk_path; }
const String & getName() const override { return disk_name; }
DiskType::Type getType() const override { return DiskType::Type::CLOUDFS; }
DiskType::Type getInnerType() const override { return ufs_disk->getType(); }
UInt64 getID() const override;
ReservationPtr reserve(UInt64 bytes) override;
DiskStats getTotalSpace(bool with_keep_free = false) const override;
DiskStats getAvailableSpace() const override;
DiskStats getUnreservedSpace() const override;
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings& settings) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
const WriteSettings& settings) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
void removePart(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
bool load(const String & path) const override;
private:
inline String absolutePath(const String& relative_path) const;
void assertCFSAvailable() const;
std::string disk_name;
std::string disk_path;
/// std::string ufs_base_path;
/// HDFSConnectionParams hdfs_params;
DiskPtr ufs_disk;
std::shared_ptr<AbstractCloudFS> cfs;
CFSFileSystem cfs_fs;
LoggerPtr log = getLogger("DiskCloudFS");
};
class DiskCloudFSReservation: public IReservation
{
public:
DiskCloudFSReservation(std::shared_ptr<DiskCloudFS> disk_, UInt64 size_): disk(disk_), size(size_) {}
virtual UInt64 getSize() const override
{
return size;
}
virtual DiskPtr getDisk(size_t i) const override;
virtual Disks getDisks() const override
{
return {disk};
}
virtual void update(UInt64 new_size) override
{
size = new_size;
}
private:
DiskPtr disk;
UInt64 size;
};
}
#endif

View File

@ -270,11 +270,8 @@ std::unique_ptr<ReadBufferFromFileBase> DiskByteS3::readFile(const String & path
auto reader = global_context->getThreadPoolReader();
return std::make_unique<AsynchronousBoundedReadBuffer>(std::move(impl), *reader, modified_settings);
}
else
{
return std::make_unique<ReadBufferFromS3>(s3_util.getClient(),
s3_util.getBucket(), object_key, modified_settings, 3, false);
}
return impl;
}
}
@ -293,20 +290,18 @@ std::unique_ptr<WriteBufferFromFileBase> DiskByteS3::writeFile(const String & pa
}
}
{
return std::make_unique<WriteBufferFromS3>(
s3_util.getClient(),
s3_util.getBucket(),
std::filesystem::path(root_prefix) / path,
max_single_part_upload_size,
min_upload_part_size,
settings.file_meta,
settings.buffer_size,
false,
false,
8,
true);
}
return std::make_unique<WriteBufferFromS3>(
s3_util.getClient(),
s3_util.getBucket(),
std::filesystem::path(root_prefix) / path,
max_single_part_upload_size,
min_upload_part_size,
settings.file_meta,
settings.buffer_size,
false,
false,
8,
true);
}
void DiskByteS3::removeFile(const String& path)
@ -366,13 +361,14 @@ static void checkRemoveAccess(IDisk & disk) { disk.removeFile("test_acl"); }
void registerDiskByteS3(DiskFactory& factory)
{
auto creator = [](const String& name,
const Poco::Util::AbstractConfiguration& config, const String& config_prefix,
ContextPtr) -> DiskPtr {
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
[[maybe_unused]]ContextPtr ctx,
const DisksMap & /* disk_map */) -> DiskPtr {
S3::S3Config s3_cfg(config, config_prefix);
std::shared_ptr<Aws::S3::S3Client> client = s3_cfg.create();
// initialize cfs
auto s3disk = std::make_shared<DiskByteS3>(name, s3_cfg.root_prefix, s3_cfg.bucket, client,
s3_cfg.min_upload_part_size, s3_cfg.max_single_part_upload_size);

View File

@ -42,8 +42,8 @@ public:
friend class DiskByteS3Reservation;
DiskByteS3(const String& name_, const String& root_prefix_, const String& bucket_,
const std::shared_ptr<Aws::S3::S3Client>& client_,
const UInt64 min_upload_part_size_, const UInt64 max_single_part_upload_size_);
const std::shared_ptr<Aws::S3::S3Client>& client_, const UInt64 min_upload_part_size_,
const UInt64 max_single_part_upload_size_);
virtual const String & getName() const override { return name; }

View File

@ -24,7 +24,8 @@ DiskPtr DiskFactory::create(
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context) const
ContextPtr context,
const DisksMap & disks_map) const
{
const auto disk_type = config.getString(config_prefix + ".type", "local");
@ -33,7 +34,7 @@ DiskPtr DiskFactory::create(
throw Exception{"DiskFactory: the disk '" + name + "' has unknown disk type: " + disk_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG};
const auto & disk_creator = found->second;
return disk_creator(name, config, config_prefix, context);
return disk_creator(name, config, config_prefix, context, disks_map);
}
}

View File

@ -14,6 +14,7 @@
namespace DB
{
using DisksMap = std::map<String, DiskPtr>;
/**
* Disk factory. Responsible for creating new disk objects.
*/
@ -24,7 +25,8 @@ public:
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context)>;
ContextPtr context,
const DisksMap & disks_map)>;
static DiskFactory & instance();
@ -34,7 +36,8 @@ public:
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context) const;
ContextPtr context,
const DisksMap & disks_map) const;
private:
using DiskTypeRegistry = std::unordered_map<String, Creator>;
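The extra DisksMap parameter is what lets a creator resolve previously created disks by name, as registerDiskCloudFS does above for its ufs_disk. A minimal sketch of that lookup pattern; the "wrapped" type name and the base_disk config key are illustrative:

auto creator = [](const String & name,
                  const Poco::Util::AbstractConfiguration & config,
                  const String & config_prefix,
                  ContextPtr /*context*/,
                  const DisksMap & disk_map) -> DiskPtr
{
    String base_disk_name = config.getString(config_prefix + ".base_disk");
    auto it = disk_map.find(base_disk_name);
    if (it == disk_map.end())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find base_disk {} for disk {}", base_disk_name, name);
    return it->second; /// a real wrapper disk would decorate it->second here
};
factory.registerDiskType("wrapped", creator);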

View File

@ -32,7 +32,7 @@ String getDiskNameForPathId(const VolumePtr& volume, UInt32 path_id)
{
return volume->getDefaultDisk()->getName();
}
switch (volume->getDisk()->getType())
switch (volume->getDisk()->getInnerType())
{
case DiskType::Type::ByteHDFS: return "HDFS/" + std::to_string(path_id);
case DiskType::Type::ByteS3: return "S3/" + std::to_string(path_id);

View File

@ -425,7 +425,8 @@ void registerDiskLocal(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context) -> DiskPtr {
ContextPtr context,
const DisksMap & /* disk_map */) -> DiskPtr {
String path = config.getString(config_prefix + ".path", "");
if (name == "default")
{

View File

@ -478,7 +478,8 @@ void registerDiskMemory(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & /*config*/,
const String & /*config_prefix*/,
ContextPtr /*context*/) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
ContextPtr /*context*/,
const DisksMap & /* disk_map */) -> DiskPtr { return std::make_shared<DiskMemory>(name); };
factory.registerDiskType("memory", creator);
}

View File

@ -59,6 +59,9 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
constexpr auto default_disk_name = "default";
bool has_default_disk = false;
#if USE_CLOUDFS
Strings cloudfs_disks;
#endif
for (const auto & disk_name : keys)
{
if (!std::all_of(disk_name.begin(), disk_name.end(), isWordCharASCII))
@ -68,9 +71,25 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
has_default_disk = true;
auto disk_config_prefix = config_prefix + "." + disk_name;
#if USE_CLOUDFS
if (config.getString(disk_config_prefix + ".type", "") == "cfs")
{
cloudfs_disks.emplace_back(disk_name);
continue;
}
#endif
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks));
}
/// DiskCloudFS depends on its ufs disk, thus it must be initialized after all other disks.
#if USE_CLOUDFS
for (auto & disk_name : cloudfs_disks)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks));
}
#endif
if (!has_default_disk)
disks.emplace(default_disk_name, std::make_shared<DiskLocal>(default_disk_name, context->getPath(), DiskStats{}));
@ -118,7 +137,7 @@ DiskSelectorPtr DiskSelector::updateFromConfig(
if (result->getDisksMap().count(disk_name) == 0)
{
auto disk_config_prefix = config_prefix + "." + disk_name;
result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context));
result->addToDiskMap(disk_name, factory.create(disk_name, config, disk_config_prefix, context, result->getDisksMap()));
}
else
{

View File

@ -38,7 +38,8 @@ struct DiskType
S3 = 2,
HDFS = 3,
ByteHDFS = 4,
ByteS3 = 5
ByteS3 = 5,
CLOUDFS = 6,
};
static String toString(Type disk_type)
{
@ -56,6 +57,8 @@ struct DiskType
return "bytehdfs";
case Type::ByteS3:
return "bytes3";
case Type::CLOUDFS:
return "cloudfs";
}
__builtin_unreachable();
}

View File

@ -31,7 +31,7 @@
#include <IO/WSReadBufferFromFS.h>
#include <Storages/HDFS/ReadBufferFromByteHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include "IO/HDFSRemoteFSReader.h"
#include <fmt/core.h>
namespace DB
{
@ -89,8 +89,14 @@ private:
};
/// TODO: use HDFSCommon to replace HDFSFileSystem
DiskByteHDFS::DiskByteHDFS(const String & disk_name_, const String & hdfs_base_path_, const HDFSConnectionParams & hdfs_params_)
: disk_name(disk_name_), disk_path(hdfs_base_path_), hdfs_params(hdfs_params_), hdfs_fs(hdfs_params_, 10000, 100, 0)
DiskByteHDFS::DiskByteHDFS(
const String & disk_name_,
const String & hdfs_base_path_,
const HDFSConnectionParams & hdfs_params_)
: disk_name(disk_name_)
, disk_path(hdfs_base_path_)
, hdfs_params(hdfs_params_)
, hdfs_fs(hdfs_params_, 10000, 100, 0)
{
pread_reader_opts = std::make_shared<HDFSRemoteFSReaderOpts>(hdfs_params, true);
read_reader_opts = std::make_shared<HDFSRemoteFSReaderOpts>(hdfs_params, false);
@ -184,6 +190,7 @@ void DiskByteHDFS::replaceFile(const String & from_path, const String & to_path)
assertNotReadonly();
String from_abs_path = absolutePath(from_path);
String to_abs_path = absolutePath(to_path);
if (hdfs_fs.exists(to_abs_path))
{
String origin_backup_file = to_abs_path + ".old";
@ -276,11 +283,9 @@ std::unique_ptr<WriteBufferFromFileBase> DiskByteHDFS::writeFile(const String &
}
}
{
int write_mode = settings.mode == WriteMode::Append ? (O_APPEND | O_WRONLY) : O_WRONLY;
return std::make_unique<WriteBufferFromHDFS>(absolutePath(path), hdfs_params,
settings.buffer_size, write_mode);
}
int write_mode = settings.mode == WriteMode::Append ? (O_APPEND | O_WRONLY) : O_WRONLY;
return std::make_unique<WriteBufferFromHDFS>(absolutePath(path), hdfs_params,
settings.buffer_size, write_mode);
}
void DiskByteHDFS::removeFile(const String & path)
@ -293,6 +298,7 @@ void DiskByteHDFS::removeFileIfExists(const String & path)
{
assertNotReadonly();
String abs_path = absolutePath(path);
if (hdfs_fs.exists(abs_path))
{
hdfs_fs.remove(abs_path, false);
@ -365,7 +371,8 @@ void registerDiskByteHDFS(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context_) -> DiskPtr {
ContextPtr context_,
const DisksMap & /* disk_map */) -> DiskPtr {
String path = config.getString(config_prefix + ".path");
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

View File

@ -32,8 +32,10 @@ class DiskByteHDFSReservation;
class DiskByteHDFS final : public IDisk
{
public:
DiskByteHDFS(const String& disk_name_, const String& hdfs_base_path_,
const HDFSConnectionParams& hdfs_params_);
DiskByteHDFS(
const String & disk_name_,
const String & hdfs_base_path_,
const HDFSConnectionParams & hdfs_params_);
virtual const String& getName() const override;

View File

@ -191,7 +191,8 @@ void registerDiskHDFS(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context_) -> DiskPtr
ContextPtr context_,
const DisksMap & /* disk_map */) -> DiskPtr
{
fs::path disk = fs::path(context_->getPath()) / "disks" / name;
fs::create_directories(disk);

View File

@ -289,6 +289,10 @@ public:
/// Return disk type - "local", "s3", etc.
virtual DiskType::Type getType() const = 0;
/// Return the disk type of the underlying UFS disk, which matters for CloudFS
/// XXX: may need a more generic way to get the real disk type
virtual DiskType::Type getInnerType() const { return getType(); }
/// Invoked when Global Context is shutdown.
virtual void shutdown() {}
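The intended call-site pattern is sketched below: a wrapper disk such as DiskCloudFS reports CLOUDFS from getType() but forwards the UFS type from getInnerType(), so storage code keeps dispatching on the real backend. The dispatch function itself is illustrative, not part of this MR:

void dispatchOnBackend(const DiskPtr & disk)
{
    switch (disk->getInnerType()) /// CLOUDFS unwraps to ByteHDFS / ByteS3 here
    {
        case DiskType::Type::ByteS3:
            /// S3-style object layout
            break;
        case DiskType::Type::ByteHDFS:
            /// HDFS-style directory layout
            break;
        default:
            throw Exception("Unsupported disk type " + DiskType::toString(disk->getInnerType()), ErrorCodes::LOGICAL_ERROR);
    }
}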

View File

@ -200,7 +200,8 @@ void registerDiskS3(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context) -> DiskPtr {
ContextPtr context,
const DisksMap & /* disk_map */) -> DiskPtr {
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
if (uri.key.back() != '/')
throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);

View File

@ -474,4 +474,13 @@ StoragePolicyPtr StoragePolicySelector::get(const String & name) const
return it->second;
}
StoragePolicyPtr StoragePolicySelector::tryGet(const String & name) const
{
auto it = policies.find(name);
if (it == policies.end())
return nullptr;
return it->second;
}
}
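tryGet() mirrors get() but returns nullptr instead of throwing when the policy is missing, which is what the CloudFS policy fallback in this MR relies on. A minimal sketch, where selector stands for a StoragePolicySelector instance and the policy names are illustrative:

StoragePolicyPtr policy = selector.tryGet("default_with_cloudfs");
if (!policy)
    policy = selector.get("default"); /// get() still throws if "default" is also unknown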

View File

@ -150,6 +150,7 @@ public:
/// Policy by name
StoragePolicyPtr get(const String & name) const;
StoragePolicyPtr tryGet(const String & name) const;
/// All policies
const StoragePoliciesMap & getPoliciesMap() const { return policies; }

View File

@ -43,6 +43,9 @@ void registerDiskHDFS(DiskFactory & factory);
void registerDiskByteHDFS(DiskFactory & factory);
#endif
#if USE_CLOUDFS
void registerDiskCloudFS(DiskFactory & factory);
#endif
void registerDisks()
{
@ -60,6 +63,10 @@ void registerDisks()
registerDiskHDFS(factory);
registerDiskByteHDFS(factory);
#endif
#if USE_CLOUDFS
registerDiskCloudFS(factory);
#endif
}
}

View File

@ -4208,6 +4208,15 @@ StoragePolicyPtr Context::getStoragePolicy(const String & name) const
return policy_selector->get(name);
}
StoragePolicyPtr Context::tryGetStoragePolicy(const String & name) const
{
std::lock_guard lock(shared->storage_policies_mutex);
auto policy_selector = getStoragePolicySelector(lock);
return policy_selector->tryGet(name);
}
DisksMap Context::getDisksMap() const
{

View File

@ -1399,6 +1399,7 @@ public:
/// Provides storage politics schemes
StoragePolicyPtr getStoragePolicy(const String & name) const;
StoragePolicyPtr tryGetStoragePolicy(const String & name) const;
/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;

View File

@ -305,7 +305,7 @@ bool executeGlobalGC(const Protos::DataModelTable & table, const Context & conte
LOG_DEBUG(log, "Remove data path for table {}", storage_id.getNameForLogs());
StoragePolicyPtr remote_storage_policy = mergetree->getStoragePolicy(IStorage::StorageLocation::MAIN);
DiskType::Type remote_disk_type = remote_storage_policy->getAnyDisk()->getType();
DiskType::Type remote_disk_type = remote_storage_policy->getAnyDisk()->getInnerType();
switch (remote_disk_type)
{
/// delete data directory of the table from hdfs

View File

@ -184,6 +184,18 @@ StoragePolicyPtr MergeTreeMetaBase::getStoragePolicy(StorageLocation location) c
throw Exception("Get auxility storage policy is not supported",
ErrorCodes::LOGICAL_ERROR);
}
/// This logic is only for StorageMergeTree now, thus we only need to check cloudfs
// if (getSettings()->enable_cloudfs || getContext()->getSettingsRef().enable_cloudfs)
// {
// const String & storage_policy_name = getSettings()->storage_policy.value + CLOUDFS_STORAGE_POLICY_SUFFIX;
// auto policy = getContext()->tryGetStoragePolicy(storage_policy_name);
// if (policy)
// return policy;
// else
// LOG_WARNING(log, "Storage Policy {} is not found and will fallback to use ufs storage policy", storage_policy_name);
// }
return getContext()->getStoragePolicy(getSettings()->storage_policy);
}

View File

@ -42,6 +42,7 @@ class MergeTreeMetaBase : public IStorage, public WithMutableContext, public Mer
public:
constexpr static auto FORMAT_VERSION_FILE_NAME = "format_version.txt";
constexpr static auto DETACHED_DIR_NAME = "detached";
constexpr static auto CLOUDFS_STORAGE_POLICY_SUFFIX = "_with_cloudfs";
SourceTaskFilter source_task_filter;

View File

@ -112,7 +112,7 @@ createPartFromModelCommon(const MergeTreeMetaBase & storage, const Protos::DataM
DiskPtr remote_disk = getDiskForPathId(storage.getStoragePolicy(IStorage::StorageLocation::MAIN), path_id);
auto mock_volume = std::make_shared<SingleDiskVolume>("volume_mock", remote_disk, 0);
UUID part_id = UUIDHelpers::Nil;
switch(remote_disk->getType())
switch(remote_disk->getInnerType())
{
case DiskType::Type::ByteS3:
{
@ -129,7 +129,7 @@ createPartFromModelCommon(const MergeTreeMetaBase & storage, const Protos::DataM
}
default:
throw Exception(fmt::format("Unsupported disk type {} in createPartFromModelCommon",
DiskType::toString(remote_disk->getType())), ErrorCodes::LOGICAL_ERROR);
DiskType::toString(remote_disk->getInnerType())), ErrorCodes::LOGICAL_ERROR);
}
auto part = std::make_shared<MergeTreeDataPartCNCH>(storage, part_name, *info,
mock_volume, relative_path, nullptr, part_id);
@ -137,7 +137,7 @@ createPartFromModelCommon(const MergeTreeMetaBase & storage, const Protos::DataM
if (part_model.has_staging_txn_id())
{
part->staging_txn_id = part_model.staging_txn_id();
if (remote_disk->getType() == DiskType::Type::ByteHDFS)
if (remote_disk->getInnerType() == DiskType::Type::ByteHDFS)
{
/// this part shares the same relative path with the corresponding staged part
MergeTreePartInfo staged_part_info = part->info;

View File

@ -103,7 +103,7 @@ DiskPtr getDiskFromURI(const String & sd_url, const ContextPtr & context, const
// configuration->setString(config_prefix + ".skip_access_check",false);
configuration->setBool(config_prefix + ".skip_access_check", true);
configuration->setBool(config_prefix + ".is_virtual_hosted_style", context->getSettingsRef().s3_use_virtual_hosted_style || settings.s3_use_virtual_hosted_style);
return DiskFactory::instance().create("hive_s3_disk", *configuration, config_prefix, context);
return DiskFactory::instance().create("hive_s3_disk", *configuration, config_prefix, context, {});
}
#endif

View File

@ -260,7 +260,7 @@ void CnchAttachProcessor::exec()
for (const auto & part : prepared_parts.second)
partitions_filter.emplace(part->info.partition_id);
DiskType::Type disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getType();
DiskType::Type disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getInnerType();
// For attach/replace partition from src_table in copy and S3, just use commitParts treating parts as new insert ones
if(disk_type == DiskType::Type::ByteS3 && (!enable_copy_for_partition_operation || command.from_table.empty()))
commitPartsFromS3(prepared_parts, staged_part_names);
@ -480,7 +480,7 @@ CnchAttachProcessor::PartsFromSources CnchAttachProcessor::collectPartsFromTable
{
LOG_DEBUG(logger, fmt::format("Collect parts from table {} with filter {}", tbl.getLogName(), filter.toString()));
DiskType::Type remote_disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getType();
DiskType::Type remote_disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getInnerType();
switch (remote_disk_type)
{
case DiskType::Type::ByteHDFS:
@ -548,7 +548,7 @@ CnchAttachProcessor::collectPartsFromPath(const String & path, const AttachFilte
{
LOG_DEBUG(logger, fmt::format("Collect parts from path {} with filter {}", path, filter.toString()));
DiskType::Type remote_disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getType();
DiskType::Type remote_disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getInnerType();
switch (remote_disk_type)
{
case DiskType::Type::ByteHDFS:
@ -1292,7 +1292,7 @@ CnchAttachProcessor::prepareParts(const PartsFromSources & parts_from_sources, A
/// remote_disk is only used for copy
auto target_disk = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk();
DiskType::Type target_disk_type = target_disk->getType();
DiskType::Type target_disk_type = target_disk->getInnerType();
TxnTimestamp txn_id = query_ctx->getCurrentTransaction()->getTransactionID();
switch (target_disk_type)
@ -1572,7 +1572,7 @@ void CnchAttachProcessor::genPartsDeleteMark(PartsWithHistory & parts_to_write)
}
auto txn_id = query_ctx->getCurrentTransactionID();
DiskType::Type disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getType();
DiskType::Type disk_type = target_tbl.getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getInnerType();
S3ObjectMetadata::PartGeneratorID part_generator_id(S3ObjectMetadata::PartGeneratorID::TRANSACTION, txn_id.toString());
MergeTreeCNCHDataDumper dumper(target_tbl, part_generator_id);
for (auto && temp_part : target_tbl.createDropRangesFromParts(query_ctx, parts_to_drop, query_ctx->getCurrentTransaction()))

View File

@ -185,12 +185,13 @@ MutableMergeTreeDataPartCNCHPtr MergeTreeCNCHDataDumper::dumpTempPart(
VolumeSingleDiskPtr volume = std::make_shared<SingleDiskVolume>("temp_volume", disk);
MutableMergeTreeDataPartCNCHPtr new_part = nullptr;
new_part_info.storage_type = disk->getType();
new_part_info.storage_type = disk->getInnerType();
String part_name = new_part_info.getPartName();
LOG_DEBUG(log, "Disk type : " + DiskType::toString(disk->getType()));
LOG_DEBUG(log, "Disk type : " + DiskType::toString(disk->getType())
+ (disk->getType() == DiskType::Type::CLOUDFS ? " with inner type : " + DiskType::toString(disk->getInnerType()) : ""));
switch (disk->getType())
switch (disk->getInnerType())
{
case DiskType::Type::ByteHDFS: {
String relative_path
@ -324,8 +325,6 @@ MutableMergeTreeDataPartCNCHPtr MergeTreeCNCHDataDumper::dumpTempPart(
write_settings.mode = WriteMode::Create;
write_settings.file_meta.insert(std::pair<String, String>(S3ObjectMetadata::PART_GENERATOR_ID, generator_id.str()));
write_settings.remote_fs_write_failed_injection = data.getContext()->getSettings().remote_fs_write_failed_injection;
// if (data.getSettings()->enable_cloudfs.changed)
// write_settings.enable_cloudfs = data.getSettings()->enable_cloudfs;
auto data_out = disk->writeFile(data_file_rel_path, write_settings);
SCOPE_EXIT({

View File

@ -521,7 +521,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
// Skip disk for byconity remote storage
if (disk->getType() != DiskType::Type::ByteHDFS && disk->getType() != DiskType::Type::ByteS3
if (disk->getInnerType() != DiskType::Type::ByteHDFS && disk->getInnerType() != DiskType::Type::ByteS3
&& defined_disk_names.count(disk_name) == 0 && disk->exists(getRelativeDataPath(IStorage::StorageLocation::MAIN)))
{
for (const auto it = disk->iterateDirectory(getRelativeDataPath(IStorage::StorageLocation::MAIN)); it->isValid(); it->next())
@ -2555,9 +2555,9 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
original_active_part->force_keep_shared_data = false;
if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3)
if (original_active_part->volume->getDisk()->getInnerType() == DiskType::Type::S3)
{
if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3
if (part_copy->volume->getDisk()->getInnerType() == DiskType::Type::S3
&& original_active_part->getUniqueId() == part_copy->getUniqueId())
{
/// May be when several volumes use the same S3 storage

View File

@ -484,7 +484,7 @@ enum StealingCacheMode : UInt64
/** Obsolete settings. Kept for backward compatibility only. */ \
\
M(Bool, enable_local_disk_cache, true, "Enable local disk cache", 0) \
M(Bool, enable_cloudfs, false, "CROSS feature for table level setting", 0) \
M(Bool, enable_cloudfs, false, "Enable cloudfs; disabled by default", 0) \
M(Bool, enable_nexus_fs, false, "Enable local NexusFS", 0) \
/*keep enable_preload_parts for compitable*/\
M(Bool, enable_preload_parts, false, "Enable preload parts", 0) \

View File

@ -276,10 +276,23 @@ MutationCommands StorageCloudMergeTree::getFirstAlterMutationCommandsForPart(con
StoragePolicyPtr StorageCloudMergeTree::getStoragePolicy(StorageLocation location) const
{
String policy_name = (location == StorageLocation::MAIN ?
getSettings()->storage_policy :
getContext()->getCnchAuxilityPolicyName());
return getContext()->getStoragePolicy(policy_name);
if (location == StorageLocation::MAIN)
{
if (getSettings()->enable_cloudfs)
{
/// This will be used for compatibility with old versions of Cross (bundled with DiskByteHDFS)
const String & storage_policy_name = getSettings()->storage_policy.value + CLOUDFS_STORAGE_POLICY_SUFFIX;
auto policy = getContext()->tryGetStoragePolicy(storage_policy_name);
if (policy)
return policy;
else
LOG_WARNING(log, "Storage Policy {} is not found and will fallback to use ufs storage policy", storage_policy_name);
}
return getContext()->getStoragePolicy(getSettings()->storage_policy);
}
else
return getContext()->getStoragePolicy(getContext()->getCnchAuxilityPolicyName());
}
const String& StorageCloudMergeTree::getRelativeDataPath(StorageLocation location) const

View File

@ -1195,7 +1195,7 @@ MinimumDataParts StorageCnchMergeTree::getBackupPartsFromDisk(
Strings backup_part_names;
// HDFS will only return part_name, while S3 will return the fullpath of object.
backup_disk->listFiles(parts_path_in_backup, backup_part_names);
if (backup_disk->getType() == DiskType::Type::ByteS3)
if (backup_disk->getInnerType() == DiskType::Type::ByteS3)
{
for (String & backup_part_name : backup_part_names)
{
@ -1284,7 +1284,7 @@ void StorageCnchMergeTree::restoreDataFromBackup(
String relative_part_path;
// Part's full path relative to disk. For HDFS, need to add table_uuid as directory.
String full_part_path;
switch (table_disk->getType())
switch (table_disk->getInnerType())
{
case DiskType::Type::ByteS3: {
relative_part_path = UUIDHelpers::UUIDToString(new_part_uuid);
@ -2557,7 +2557,7 @@ void StorageCnchMergeTree::checkAlterSettings(const AlterCommands & commands) co
"insertion_label_ttl",
"enable_local_disk_cache",
/// "enable_cloudfs", // table level setting for cloudfs has not been supported well
"enable_cloudfs", // table level setting for cloudfs
"enable_nexus_fs",
"enable_preload_parts",
"enable_parts_sync_preload",
@ -2858,7 +2858,7 @@ void StorageCnchMergeTree::dropPartsImpl(
{
auto metadata_snapshot = getInMemoryMetadataPtr();
DiskType::Type remote_storage_type = getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getType();
DiskType::Type remote_storage_type = getStoragePolicy(IStorage::StorageLocation::MAIN)->getAnyDisk()->getInnerType();
bool enable_copy_for_partition_operation = local_context->getSettingsRef().cnch_enable_copy_for_partition_operation;
if (enable_copy_for_partition_operation)
@ -3203,8 +3203,23 @@ LocalDeleteBitmaps StorageCnchMergeTree::createDeleteBitmapTombstones(const IMut
StoragePolicyPtr StorageCnchMergeTree::getStoragePolicy(StorageLocation location) const
{
String policy_name = (location == StorageLocation::MAIN ? getSettings()->storage_policy : getContext()->getCnchAuxilityPolicyName());
return getContext()->getStoragePolicy(policy_name);
if (location == StorageLocation::MAIN)
{
/// XXX: should we add a setting to control whether to enable fallback logic?
if (getSettings()->enable_cloudfs)
{
const String & storage_policy_name = getSettings()->storage_policy.value + CLOUDFS_STORAGE_POLICY_SUFFIX;
auto policy = getContext()->tryGetStoragePolicy(storage_policy_name);
if (policy)
return policy;
else
LOG_WARNING(log, "Storage Policy {} is not found and will fallback to use ufs storage policy", storage_policy_name);
}
return getContext()->getStoragePolicy(getSettings()->storage_policy);
}
else
return getContext()->getStoragePolicy(getContext()->getCnchAuxilityPolicyName());
}
const String & StorageCnchMergeTree::getRelativeDataPath(StorageLocation location) const

View File

@ -156,7 +156,7 @@ void UndoResource::clean(Catalog::Catalog & , [[maybe_unused]]MergeTreeMetaBase
if (disk->exists(rel_path))
{
if ((type() == UndoResourceType::Part || type() == UndoResourceType::StagedPart)
&& disk->getType() == DiskType::Type::ByteS3)
&& disk->getInnerType() == DiskType::Type::ByteS3)
{
if (auto s3_disk = std::dynamic_pointer_cast<DiskByteS3>(disk); s3_disk != nullptr)
{