mirror of https://github.com/ByConity/ByConity
Merge branch 'zema/cnch-dev-fixRenameSystemLog' into 'cnch-dev'
fix(clickhousech@m-12240767): rename system table when config values change See merge request dp/ClickHouse!25804
This commit is contained in:
parent
28c6b5d7e5
commit
387e338f9e
|
@ -121,7 +121,6 @@ std::shared_ptr<TSystemLog> createSystemLog(
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config)
|
SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config)
|
||||||
{
|
{
|
||||||
bool on_server = global_context->getServerType() == ServerType::cnch_server;
|
bool on_server = global_context->getServerType() == ServerType::cnch_server;
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
#include <common/types.h>
|
#include <common/types.h>
|
||||||
#include <Core/Defines.h>
|
#include <Core/Defines.h>
|
||||||
#include <Storages/IStorage.h>
|
#include <Storages/IStorage.h>
|
||||||
|
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||||
#include <Common/Stopwatch.h>
|
#include <Common/Stopwatch.h>
|
||||||
#include <Parsers/ASTCreateQuery.h>
|
#include <Parsers/ASTCreateQuery.h>
|
||||||
#include <Parsers/parseQuery.h>
|
#include <Parsers/parseQuery.h>
|
||||||
|
@ -42,6 +43,7 @@
|
||||||
#include <Parsers/formatAST.h>
|
#include <Parsers/formatAST.h>
|
||||||
#include <Parsers/ASTIndexDeclaration.h>
|
#include <Parsers/ASTIndexDeclaration.h>
|
||||||
#include <Parsers/ASTInsertQuery.h>
|
#include <Parsers/ASTInsertQuery.h>
|
||||||
|
#include <Parsers/ASTFunction.h>
|
||||||
#include <Interpreters/InterpreterCreateQuery.h>
|
#include <Interpreters/InterpreterCreateQuery.h>
|
||||||
#include <Interpreters/InterpreterRenameQuery.h>
|
#include <Interpreters/InterpreterRenameQuery.h>
|
||||||
#include <Interpreters/InterpreterInsertQuery.h>
|
#include <Interpreters/InterpreterInsertQuery.h>
|
||||||
|
@ -126,7 +128,6 @@ class ISystemLog
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
virtual String getName() = 0;
|
virtual String getName() = 0;
|
||||||
virtual ASTPtr getCreateTableQuery(ParserSettingsImpl dt) = 0;
|
|
||||||
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
|
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
|
||||||
virtual void flush(bool force = false) = 0;
|
virtual void flush(bool force = false) = 0;
|
||||||
virtual void prepareTable() = 0;
|
virtual void prepareTable() = 0;
|
||||||
|
@ -223,8 +224,9 @@ public:
|
||||||
void shutdown() override
|
void shutdown() override
|
||||||
{
|
{
|
||||||
stopFlushThread();
|
stopFlushThread();
|
||||||
if (table)
|
auto t = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
|
||||||
table->flushAndShutdown();
|
if (t)
|
||||||
|
t->flushAndShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
String getName() override
|
String getName() override
|
||||||
|
@ -232,7 +234,8 @@ public:
|
||||||
return LogElement::name();
|
return LogElement::name();
|
||||||
}
|
}
|
||||||
|
|
||||||
ASTPtr getCreateTableQuery(ParserSettingsImpl dt) override;
|
ASTPtr getCreateTableQuery(ParserSettingsImpl dt);
|
||||||
|
ASTPtr getCreateTableQueryClean();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
/// use raw logger here because TextLog requires logger->setLevel() capability
|
/// use raw logger here because TextLog requires logger->setLevel() capability
|
||||||
|
@ -243,6 +246,8 @@ protected:
|
||||||
const StorageID table_id;
|
const StorageID table_id;
|
||||||
const String storage_def;
|
const String storage_def;
|
||||||
StoragePtr table;
|
StoragePtr table;
|
||||||
|
String create_query;
|
||||||
|
String old_create_query;
|
||||||
bool is_prepared = false;
|
bool is_prepared = false;
|
||||||
const size_t flush_interval_milliseconds;
|
const size_t flush_interval_milliseconds;
|
||||||
ThreadFromGlobalPool saving_thread;
|
ThreadFromGlobalPool saving_thread;
|
||||||
|
@ -296,6 +301,14 @@ SystemLog<LogElement>::SystemLog(
|
||||||
{
|
{
|
||||||
assert((database_name_ == DatabaseCatalog::SYSTEM_DATABASE) || (database_name_ == CNCH_SYSTEM_LOG_DB_NAME));
|
assert((database_name_ == DatabaseCatalog::SYSTEM_DATABASE) || (database_name_ == CNCH_SYSTEM_LOG_DB_NAME));
|
||||||
log = getRawLogger("SystemLog (" + database_name_ + "." + table_name_ + ")");
|
log = getRawLogger("SystemLog (" + database_name_ + "." + table_name_ + ")");
|
||||||
|
|
||||||
|
// Initialize `create_query` with dialect_type.
|
||||||
|
auto query_context = Context::createCopy(context);
|
||||||
|
auto dialect_type = ParserSettings::valueOf(query_context->getSettingsRef());
|
||||||
|
|
||||||
|
/// NOTE: CnchSystemLog has no storage_def when created. And it has a different logic of comparing table schema, see CnchSystemLog::prepareTable().
|
||||||
|
if (!storage_def.empty())
|
||||||
|
create_query = serializeAST(*getCreateTableQuery(dialect_type));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -573,6 +586,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
template <typename LogElement>
|
template <typename LogElement>
|
||||||
void SystemLog<LogElement>::prepareTable()
|
void SystemLog<LogElement>::prepareTable()
|
||||||
{
|
{
|
||||||
|
@ -585,15 +599,14 @@ void SystemLog<LogElement>::prepareTable()
|
||||||
|
|
||||||
if (table)
|
if (table)
|
||||||
{
|
{
|
||||||
auto metadata_columns = table->getInMemoryMetadataPtr()->getColumns();
|
if (old_create_query.empty())
|
||||||
auto old_query = InterpreterCreateQuery::formatColumns(metadata_columns, dialect_type);
|
{
|
||||||
|
old_create_query = serializeAST(*getCreateTableQueryClean());
|
||||||
|
if (old_create_query.empty())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name));
|
||||||
|
}
|
||||||
|
|
||||||
auto ordinary_columns = LogElement::getNamesAndTypes();
|
if (old_create_query != create_query)
|
||||||
auto alias_columns = LogElement::getNamesAndAliases();
|
|
||||||
auto current_query = InterpreterCreateQuery::formatColumns(
|
|
||||||
ordinary_columns, alias_columns, ParserSettings::valueOf(query_context->getSettingsRef()));
|
|
||||||
|
|
||||||
if (old_query->getTreeHash() != current_query->getTreeHash())
|
|
||||||
{
|
{
|
||||||
/// Rename the existing table.
|
/// Rename the existing table.
|
||||||
int suffix = 0;
|
int suffix = 0;
|
||||||
|
@ -619,9 +632,11 @@ void SystemLog<LogElement>::prepareTable()
|
||||||
|
|
||||||
LOG_DEBUG(
|
LOG_DEBUG(
|
||||||
log,
|
log,
|
||||||
"Existing table {} for system log has obsolete or different structure. Renaming it to {}",
|
"Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.",
|
||||||
description,
|
description,
|
||||||
backQuoteIfNeed(to.table));
|
backQuoteIfNeed(to.table),
|
||||||
|
old_create_query,
|
||||||
|
create_query);
|
||||||
|
|
||||||
query_context->makeQueryContext();
|
query_context->makeQueryContext();
|
||||||
InterpreterRenameQuery(rename, query_context).execute();
|
InterpreterRenameQuery(rename, query_context).execute();
|
||||||
|
@ -637,15 +652,16 @@ void SystemLog<LogElement>::prepareTable()
|
||||||
{
|
{
|
||||||
/// Create the table.
|
/// Create the table.
|
||||||
LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name());
|
LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name());
|
||||||
auto create = getCreateTableQuery(ParserSettings::valueOf(query_context->getSettingsRef()));
|
auto create_query_ast = getCreateTableQuery(dialect_type);
|
||||||
|
|
||||||
query_context->makeQueryContext();
|
query_context->makeQueryContext();
|
||||||
|
|
||||||
InterpreterCreateQuery interpreter(create, query_context);
|
InterpreterCreateQuery interpreter(create_query_ast, query_context);
|
||||||
interpreter.setInternal(true);
|
interpreter.setInternal(true);
|
||||||
interpreter.execute();
|
interpreter.execute();
|
||||||
|
|
||||||
table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
table = DatabaseCatalog::instance().getTable(table_id, getContext());
|
||||||
|
|
||||||
|
old_create_query.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
is_prepared = true;
|
is_prepared = true;
|
||||||
|
@ -668,13 +684,39 @@ ASTPtr SystemLog<LogElement>::getCreateTableQuery(ParserSettingsImpl dt)
|
||||||
|
|
||||||
ParserStorage storage_parser(dt);
|
ParserStorage storage_parser(dt);
|
||||||
ASTPtr storage_ast = parseQuery(
|
ASTPtr storage_ast = parseQuery(
|
||||||
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
|
storage_parser,
|
||||||
"Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
storage_def.data(),
|
||||||
|
storage_def.data() + storage_def.size(),
|
||||||
|
"Storage to create table for " + LogElement::name(),
|
||||||
|
0,
|
||||||
|
DBMS_DEFAULT_MAX_PARSER_DEPTH);
|
||||||
create->set(create->storage, storage_ast);
|
create->set(create->storage, storage_ast);
|
||||||
|
|
||||||
|
// Write additional (default) settings for (only) MergeTree engine to make it possible
|
||||||
|
// to compare ASTs and recreate tables on settings changes.
|
||||||
|
const auto & engine = create->storage->engine->as<ASTFunction &>();
|
||||||
|
if (endsWith(engine.name, "MergeTree"))
|
||||||
|
{
|
||||||
|
auto storage_settings = std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
|
||||||
|
storage_settings->loadFromQuery(*create->storage);
|
||||||
|
}
|
||||||
|
|
||||||
return create;
|
return create;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns CREATE TABLE query, but with removed UUID.
|
||||||
|
// This way it can be used to compared with the `SystemLog::getCreateTableQuery`.
|
||||||
|
template <typename LogElement>
|
||||||
|
ASTPtr SystemLog<LogElement>::getCreateTableQueryClean()
|
||||||
|
{
|
||||||
|
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name, getContext());
|
||||||
|
ASTPtr old_ast = database->getCreateTableQuery(table_id.table_name, getContext());
|
||||||
|
auto & old_create_query_ast = old_ast->as<ASTCreateQuery &>();
|
||||||
|
/// Reset UUID
|
||||||
|
old_create_query_ast.uuid = UUIDHelpers::Nil;
|
||||||
|
return old_ast;
|
||||||
|
}
|
||||||
|
|
||||||
template<typename TLogElement>
|
template<typename TLogElement>
|
||||||
class CnchSystemLog : public SystemLog<TLogElement>
|
class CnchSystemLog : public SystemLog<TLogElement>
|
||||||
{
|
{
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
# pylint: disable=line-too-long
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
# pylint: disable=redefined-outer-name
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
node = cluster.add_instance('node_default', stay_alive=True)
|
||||||
|
|
||||||
|
@pytest.fixture(scope='module', autouse=True)
|
||||||
|
def start_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
yield cluster
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_system_logs_recreate():
|
||||||
|
system_logs = [
|
||||||
|
# enabled by default
|
||||||
|
'query_log',
|
||||||
|
'query_thread_log',
|
||||||
|
'part_log',
|
||||||
|
'trace_log',
|
||||||
|
'metric_log',
|
||||||
|
]
|
||||||
|
|
||||||
|
node.query('SYSTEM FLUSH LOGS')
|
||||||
|
for table in system_logs:
|
||||||
|
assert 'ENGINE = MergeTree' in node.query(f'SHOW CREATE TABLE system.{table}')
|
||||||
|
assert 'ENGINE = Null' not in node.query(f'SHOW CREATE TABLE system.{table}')
|
||||||
|
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 1
|
||||||
|
|
||||||
|
# NOTE: we use zzz- prefix to make it the last file,
|
||||||
|
# so that it will be applied last.
|
||||||
|
for table in system_logs:
|
||||||
|
node.exec_in_container(['bash', '-c', f"""echo "
|
||||||
|
<clickhouse>
|
||||||
|
<{table}>
|
||||||
|
<engine>ENGINE = Null</engine>
|
||||||
|
<partition_by remove='remove'/>
|
||||||
|
</{table}>
|
||||||
|
</clickhouse>
|
||||||
|
" > /etc/clickhouse-server/config.d/zzz-override-{table}.xml
|
||||||
|
"""])
|
||||||
|
|
||||||
|
node.restart_clickhouse()
|
||||||
|
node.query('SYSTEM FLUSH LOGS')
|
||||||
|
for table in system_logs:
|
||||||
|
assert 'ENGINE = MergeTree' not in node.query(f'SHOW CREATE TABLE system.{table}')
|
||||||
|
assert 'ENGINE = Null' in node.query(f'SHOW CREATE TABLE system.{table}')
|
||||||
|
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 2
|
||||||
|
|
||||||
|
for table in system_logs:
|
||||||
|
node.exec_in_container(['rm', f'/etc/clickhouse-server/config.d/zzz-override-{table}.xml'])
|
||||||
|
|
||||||
|
node.restart_clickhouse()
|
||||||
|
node.query('SYSTEM FLUSH LOGS')
|
||||||
|
for table in system_logs:
|
||||||
|
assert 'ENGINE = MergeTree' in node.query(f'SHOW CREATE TABLE system.{table}')
|
||||||
|
assert 'ENGINE = Null' not in node.query(f'SHOW CREATE TABLE system.{table}')
|
||||||
|
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 3
|
||||||
|
|
||||||
|
node.query('SYSTEM FLUSH LOGS')
|
||||||
|
# Ensure that there was no superfluous RENAME's
|
||||||
|
# IOW that the table created only when the structure is indeed different.
|
||||||
|
for table in system_logs:
|
||||||
|
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 3
|
Loading…
Reference in New Issue