Merge 'optimize_closeSession_lock_dev' into 'cnch-dev'

feat(clickhousech@m-5371472614): [TO cnch-dev] Optimize session lock in cleanThread when releasing sessions

See merge request: !25928
魏祥威 2024-10-23 03:31:26 +00:00 committed by Fred Wang
parent 3be231242e
commit 76f29c9ec1
4 changed files with 227 additions and 96 deletions
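
At its core, the change narrows the critical section in the session cleaner: instead of holding the sessions mutex across the whole cleanThread() loop and tracking pending closes in a separate close_times multimap, the cleaner now keeps a single container sorted by close_time and takes the lock only inside closeSessions(), releasing at most a bounded batch of expired, unreferenced sessions per pass. Below is a minimal, self-contained sketch of that locking pattern; SessionCleaner and the plain multimap are illustrative stand-ins, not code from this merge request.

#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>

class SessionCleaner
{
public:
    void cleanThread()
    {
        while (true)
        {
            auto interval = closeSessions();       // mutex is taken only inside, for one bounded batch
            std::unique_lock lock{mutex};
            if (cond.wait_for(lock, interval, [this] { return quit; }))
                break;                             // wait_for releases the lock while sleeping
        }
    }

private:
    std::chrono::steady_clock::duration closeSessions()
    {
        static constexpr auto close_interval = std::chrono::seconds(10);
        static constexpr size_t max_batch_clean_size = 100;

        std::unique_lock lock{mutex};              // short critical section, bounded amount of work
        const auto now = std::chrono::steady_clock::now();
        size_t released = 0;
        for (auto it = sessions.begin();
             it != sessions.end() && it->first <= now && released < max_batch_clean_size;
             ++released)
            it = sessions.erase(it);               // the real code additionally skips sessions still referenced elsewhere
        return close_interval;
    }

    std::mutex mutex;
    std::condition_variable cond;
    bool quit = false;
    std::multimap<std::chrono::steady_clock::time_point, int /*session id*/> sessions;  // ordered by close time
};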

View File

@ -217,7 +217,10 @@
M(IOSchRawRequests, "RawRequests in deadline scheduler") \
\
M(IOUringPendingEvents, "Number of io_uring SQEs waiting to be submitted") \
M(IOUringInFlightEvents, "Number of io_uring SQEs in flight")
M(IOUringInFlightEvents, "Number of io_uring SQEs in flight") \
\
M(ActiveCnchSession, "Number of active cnch sessions") \
M(ActiveHttpSession, "Number of active http sessions") \
namespace CurrentMetrics
{

View File

@ -15,13 +15,20 @@
#include <Interpreters/NamedSession.h>
#include <Common/setThreadName.h>
#include <CloudServices/CnchWorkerResource.h>
#include <Interpreters/Context.h>
#include <Transaction/TxnTimestamp.h>
#include <CloudServices/CnchWorkerResource.h>
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
#include <chrono>
namespace CurrentMetrics
{
extern const Metric ActiveCnchSession;
extern const Metric ActiveHttpSession;
}
namespace DB
{
@ -58,25 +65,26 @@ std::shared_ptr<NamedSession> NamedSessionsImpl<NamedSession>::acquireSession(
bool throw_if_not_found,
bool return_null_if_not_found)
{
Poco::Timestamp current_time;
std::unique_lock lock(mutex);
auto it = sessions.find(session_id);
if (it == sessions.end())
auto & sessions_by_key = sessions.template get<1>();
auto it = sessions_by_key.find(session_id);
bool session_exist = true;
if (it == sessions_by_key.end())
{
if (return_null_if_not_found)
{
return nullptr;
}
if (throw_if_not_found)
throw Exception("Session not found.", ErrorCodes::SESSION_NOT_FOUND);
it = sessions.insert(std::make_pair(session_id, std::make_shared<NamedSession>(session_id, context, timeout, *this))).first;
session_exist = false;
it = sessions_by_key.insert(std::make_shared<NamedSession>(session_id, context, timeout, *this)).first;
}
/// Use existing session.
const auto & session = it->second;
scheduleCloseSession(*session, lock);
const auto & session = *it;
/// For cnch, it's ok for a session to not be unique, e.g. in a union query the sub-queries will have the same transaction id,
/// therefore they share the same session on the worker.
@ -84,118 +92,116 @@ std::shared_ptr<NamedSession> NamedSessionsImpl<NamedSession>::acquireSession(
{
if (!session.unique())
throw Exception("Session is locked by a concurrent client.", ErrorCodes::SESSION_IS_LOCKED);
// If an existing http session is acquired again, try to extend its close time
if (session_exist)
{
size_t new_close_time = current_time.epochTime() + timeout;
if (session->close_time < new_close_time)
sessions.template get<1>().modify(it, [&new_close_time](auto & temp) { temp->close_time = new_close_time; });
}
}
return session;
}
template<typename NamedSession>
std::vector<std::pair<typename NamedSession::NamedSessionKey, std::shared_ptr<CnchWorkerResource>>> NamedSessionsImpl<NamedSession>::getAllWorkerResources() const
template <typename NamedSession>
std::vector<std::pair<typename NamedSession::NamedSessionKey, std::shared_ptr<CnchWorkerResource>>>
NamedSessionsImpl<NamedSession>::getAllWorkerResources() const
{
std::lock_guard lock(mutex);
std::vector<std::pair<Key, CnchWorkerResourcePtr>> res;
for (const auto & [key, session]: sessions)
std::lock_guard lock(mutex);
for (const auto & session : sessions)
{
if (auto resource = session->context->getCnchWorkerResource())
res.emplace_back(key, resource);
res.emplace_back(session->key, resource);
}
return res;
}
template<typename NamedSession>
void NamedSessionsImpl<NamedSession>::scheduleCloseSession(NamedSession & session, std::unique_lock<std::mutex> &)
{
/// Push it on a queue of sessions to close, on a position corresponding to the timeout.
/// (timeout is measured from current moment of time)
Poco::Timestamp current_time;
if (session.close_time < current_time.epochTime() + session.timeout)
{
session.close_time = current_time.epochTime() + session.timeout;
close_times.emplace(session.close_time, session.key);
}
}
template<typename NamedSession>
void NamedSessionsImpl<NamedSession>::cleanThread()
{
setThreadName("SessionCleaner");
std::unique_lock lock{mutex};
while (true)
{
auto interval = closeSessions(lock);
auto interval = closeSessions();
std::unique_lock lock{mutex};
if (cond.wait_for(lock, interval, [this]() -> bool { return quit; }))
break;
}
}
template<typename NamedSession>
std::chrono::steady_clock::duration NamedSessionsImpl<NamedSession>::closeSessions(std::unique_lock<std::mutex> & lock)
template <typename NamedSession>
std::chrono::steady_clock::duration NamedSessionsImpl<NamedSession>::closeSessions()
{
/// Schedule closeSessions() every 1 second by default.
static constexpr std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1);
auto log = getLogger("NamedSession");
if (close_times.empty())
return close_interval;
static constexpr std::chrono::steady_clock::duration close_interval = std::chrono::seconds(10);
static constexpr size_t max_batch_clean_size = 100;
Poco::Timestamp current_time;
std::vector<String> released_sessions;
while (!close_times.empty())
{
auto curr_session = *close_times.begin();
std::unique_lock lock{mutex};
auto & sessions_by_close = sessions.template get<0>();
auto session_iter = sessions_by_close.begin();
if (curr_session.first > static_cast<size_t>(current_time.epochTime()))
break;
auto session_iter = sessions.find(curr_session.second);
if (session_iter != sessions.end() && session_iter->second->close_time == curr_session.first)
while (session_iter != sessions_by_close.end()
&& (*session_iter)->close_time <= static_cast<size_t>(current_time.epochTime())
&& released_sessions.size() < max_batch_clean_size)
{
if (!session_iter->second.unique())
if ((*session_iter).unique())
{
/// Skip recycling and move it to close on the next interval.
session_iter->second->timeout = 1;
scheduleCloseSession(*session_iter->second, lock);
released_sessions.emplace_back((*session_iter)->getID());
session_iter = sessions_by_close.erase(session_iter);
}
else
{
LOG_DEBUG(log, "Release timed out session: {}", session_iter->second->getID());
sessions.erase(session_iter);
}
session_iter++;
}
close_times.erase(close_times.begin());
}
for (auto & session_id : released_sessions)
LOG_INFO(getLogger("NamedSessionImpl"), "release timed out session: {}", session_id);
return close_interval;
}
NamedSession::NamedSession(NamedSessionKey key_, ContextPtr context_, size_t timeout_, NamedSessions & parent_)
: key(key_), context(Context::createCopy(context_)), timeout(timeout_), parent(parent_)
{
CurrentMetrics::add(CurrentMetrics::ActiveHttpSession);
close_time = Poco::Timestamp().epochTime() + timeout;
}
NamedSession::~NamedSession()
{
CurrentMetrics::sub(CurrentMetrics::ActiveHttpSession);
}
void NamedSession::release()
{
parent.releaseSession(*this);
parent.tryUpdateSessionCloseTime(*this, Poco::Timestamp().epochTime() + timeout);
}
NamedCnchSession::NamedCnchSession(NamedSessionKey key_, ContextPtr context_, size_t timeout_, NamedCnchSessions & parent_)
: key(key_), context(Context::createCopy(context_)), timeout(timeout_), parent(parent_)
{
CurrentMetrics::add(CurrentMetrics::ActiveCnchSession);
context->worker_resource = std::make_shared<CnchWorkerResource>();
close_time = Poco::Timestamp().epochTime() + timeout;
}
NamedCnchSession::~NamedCnchSession()
{
CurrentMetrics::sub(CurrentMetrics::ActiveCnchSession);
}
void NamedCnchSession::release()
{
timeout = 0; /// schedule immediately
close_time = 0;
parent.releaseSession(*this);
LOG_DEBUG(getLogger("NamedCnchSession"), "Release CnchWorkerResource {}", key);
LOG_DEBUG(getLogger("NamedCnchSession"), "release CnchWorkerResource({})", key);
}
void NamedCnchSession::registerPlanSegmentsCount(size_t _plan_segments_count)
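
Note the ownership contract in the cleanup path above: the cleaner only erases a session when the container holds the last reference to it ((*session_iter).unique()), a CNCH session's release() removes itself from the container explicitly (with close_time reset to 0), and an HTTP session's release() merely pushes its close time forward and leaves removal to the cleaner. A toy illustration of the unique()-guarded check, standard library only and not taken from the diff:

#include <cassert>
#include <memory>
#include <vector>

int main()
{
    std::vector<std::shared_ptr<int>> container{std::make_shared<int>(42)};

    auto in_use = container.front();      // a client still holds the session
    assert(!container.front().unique());  // so a cleaner pass must skip it

    in_use.reset();                       // the client drops its reference
    assert(container.front().unique());   // only the container owns it now
    container.clear();                    // safe to destroy; the real destructor also decrements CurrentMetrics
}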

View File

@ -15,9 +15,13 @@
#pragma once
#include <Core/Types.h>
#include <Interpreters/Context_fwd.h>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index_container.hpp>
#include <Common/SipHash.h>
#include <Common/ThreadPool.h>
#include <Interpreters/Context_fwd.h>
#include <atomic>
#include <mutex>
@ -33,38 +37,57 @@ class NamedSessionsImpl
{
public:
using Key = typename NamedSession::NamedSessionKey;
using SessionKeyHash = typename NamedSession::SessionKeyHash;
using SessionPtr = std::shared_ptr<NamedSession>;
// Sessions can be indexed by either close_time or txn_id:
// we sort sessions by close_time to release timed-out sessions in order, and use txn_id to release a specific session.
using SessionContainer = boost::multi_index::multi_index_container<
SessionPtr,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<NamedSession, size_t, &NamedSession::close_time>>,
boost::multi_index::hashed_unique<boost::multi_index::member<NamedSession, Key, &NamedSession::key>>>>;
~NamedSessionsImpl();
/// Find existing session or create a new.
std::shared_ptr<NamedSession> acquireSession(
const Key & session_id,
ContextPtr context,
size_t timeout,
bool throw_if_not_found,
bool return_null_if_not_found = false);
const Key & session_id, ContextPtr context, size_t timeout, bool throw_if_not_found, bool return_null_if_not_found = false);
void releaseSession(NamedSession & session)
{
LOG_DEBUG(getLogger("NamedSessionImpl"), "release finished session: {}", session.getID());
std::unique_lock lock(mutex);
scheduleCloseSession(session, lock);
auto & sessions_by_key = sessions.template get<1>();
sessions_by_key.erase(session.key);
}
void tryUpdateSessionCloseTime(NamedSession & session, size_t new_close_time)
{
if (session.close_time < new_close_time)
{
std::unique_lock lock(mutex);
auto & sessions_by_key = sessions.template get<1>();
sessions_by_key.modify(
sessions_by_key.find(session.key), [&new_close_time](auto & temp) { temp->close_time = new_close_time; });
}
}
std::vector<std::pair<Key, std::shared_ptr<CnchWorkerResource>>> getAllWorkerResources() const;
private:
using Container = std::unordered_map<Key, std::shared_ptr<NamedSession>, SessionKeyHash>;
using CloseTimes = std::multimap<size_t, Key>;
Container sessions;
CloseTimes close_times;
// Used only for tests
size_t getCurrentActiveSession() const
{
std::unique_lock lock(mutex);
return sessions.size();
}
void scheduleCloseSession(NamedSession & session, std::unique_lock<std::mutex> &);
private:
SessionContainer sessions;
void cleanThread();
/// Close sessions that have expired. Returns how long to wait for the next session to expire, if no new sessions are added.
std::chrono::steady_clock::duration closeSessions(std::unique_lock<std::mutex> & lock);
/// Close sessions that have expired. ATTENTION: the mutex is acquired inside, so do not hold it when calling this method.
std::chrono::steady_clock::duration closeSessions();
mutable std::mutex mutex;
std::condition_variable cond;
@ -82,29 +105,25 @@ using NamedCnchSessions = NamedSessionsImpl<NamedCnchSession>;
struct NamedSession
{
/// User name and session identifier. Named sessions are local to users.
using NamedSessionKey = std::pair<String, String>;
struct NamedSessionKey
{
String session_id;
String user;
bool operator==(const NamedSessionKey & other) const { return session_id == other.session_id && user == other.user; }
};
NamedSessionKey key;
ContextMutablePtr context;
size_t timeout;
size_t timeout{0};
size_t close_time{0};
NamedSessionsImpl<NamedSession> & parent;
NamedSession(NamedSessionKey key_, ContextPtr context_, size_t timeout_, NamedSessions & parent_);
~NamedSession();
void release();
String getID() const { return key.first + "-" + key.second; }
class SessionKeyHash
{
public:
size_t operator()(const NamedSessionKey & session_key) const
{
SipHash hash;
hash.update(session_key.first);
hash.update(session_key.second);
return hash.get64();
}
};
String getID() const { return key.session_id + "-" + key.user; }
};
struct NamedCnchSession
@ -114,11 +133,12 @@ struct NamedCnchSession
NamedSessionKey key;
ContextMutablePtr context;
size_t timeout;
size_t timeout{0};
size_t close_time{0};
NamedSessionsImpl<NamedCnchSession> & parent;
NamedCnchSession(NamedSessionKey key_, ContextPtr context_, size_t timeout_, NamedCnchSessions & parent_);
~NamedCnchSession();
void release();
std::optional<std::atomic_size_t> plan_segments_count;
@ -128,4 +148,12 @@ struct NamedCnchSession
String getID() const { return std::to_string(key); }
};
inline std::size_t hash_value(const NamedSession::NamedSessionKey & session_key)
{
SipHash hash;
hash.update(session_key.session_id);
hash.update(session_key.user);
return hash.get64();
}
}
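
For reference, the header above replaces the old unordered_map plus close_times multimap with a single Boost.MultiIndex container: index 0 keeps sessions ordered by close_time so the cleaner can walk expired ones from the front, and index 1 is a hash index on the session key so acquire/release can locate a specific session in O(1); the free hash_value() overload is what boost::hash picks up via argument-dependent lookup for the custom key. The following standalone sketch shows the same container shape with a simplified stand-in Session; it assumes Boost is available and none of its names come from the repository.

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index_container.hpp>
#include <iostream>
#include <memory>
#include <string>

struct Session
{
    std::string key;
    size_t close_time;
};

using SessionPtr = std::shared_ptr<Session>;

// The element is a shared_ptr; Boost's member<> key extractor transparently dereferences it.
using Container = boost::multi_index::multi_index_container<
    SessionPtr,
    boost::multi_index::indexed_by<
        // index 0: ordered by close_time, expired sessions come first
        boost::multi_index::ordered_non_unique<boost::multi_index::member<Session, size_t, &Session::close_time>>,
        // index 1: hashed by key, O(1) lookup of a specific session
        boost::multi_index::hashed_unique<boost::multi_index::member<Session, std::string, &Session::key>>>>;

int main()
{
    Container sessions;
    sessions.insert(std::make_shared<Session>(Session{"a", 30}));
    sessions.insert(std::make_shared<Session>(Session{"b", 10}));
    sessions.insert(std::make_shared<Session>(Session{"c", 20}));

    for (const auto & session : sessions.get<0>())     // close_time order: b, c, a
        std::cout << session->key << ' ';
    std::cout << '\n';

    auto & by_key = sessions.get<1>();
    auto it = by_key.find("c");                         // hash lookup by key
    by_key.modify(it, [](SessionPtr & s) { s->close_time = 40; });  // repositions "c" in index 0
}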

View File

@ -0,0 +1,94 @@
#include <Interpreters/Context.h>
#include <Interpreters/NamedSession.h>
#include <gtest/gtest.h>
#include <Common/tests/gtest_global_context.h>
using namespace DB;
TEST(NamedSessionsTest, AcquireAndReleaseSession)
{
NamedCnchSessions sessions;
auto context = Context::createCopy(getContext().context);
auto session = sessions.acquireSession(1, context, 10, false, false);
// use_count is 2: the local shared_ptr plus the copy held in the sessions container
ASSERT_EQ(session.use_count(), 2);
auto session2 = sessions.acquireSession(2, context, 30, false, false);
auto session3 = sessions.acquireSession(3, context, 20, false, false);
ASSERT_EQ(sessions.getCurrentActiveSession(), 3);
session->release();
ASSERT_EQ(sessions.getCurrentActiveSession(), 2);
session2->release();
session3->release();
ASSERT_EQ(sessions.getCurrentActiveSession(), 0);
}
TEST(NamedSessionsTest, SessionContainerTest)
{
NamedCnchSessions parent;
NamedCnchSessions::SessionContainer sessions;
auto context = Context::createCopy(getContext().context);
sessions.insert(std::make_shared<NamedCnchSession>(1, context, 10, parent));
sessions.insert(std::make_shared<NamedCnchSession>(2, context, 30, parent));
sessions.insert(std::make_shared<NamedCnchSession>(3, context, 20, parent));
ASSERT_EQ(sessions.size(), 3);
// verify traversal in close_time order
auto & sessions_by_close = sessions.template get<0>();
auto session_iter = sessions_by_close.begin();
ASSERT_EQ((*session_iter)->timeout, 10);
session_iter++;
ASSERT_EQ((*session_iter)->timeout, 20);
// verify erase by iterator and the iterator position returned by erase()
session_iter = sessions_by_close.erase(session_iter);
ASSERT_EQ((*session_iter)->timeout, 30);
session_iter++;
ASSERT_TRUE(session_iter == sessions_by_close.end());
ASSERT_EQ(sessions.size(), 2);
// verify find by session key
auto & sessions_by_key = sessions.template get<1>();
auto session_iter_2 = sessions_by_key.find(1);
ASSERT_EQ((*session_iter_2)->timeout, 10);
// verify erase by session key
sessions_by_key.erase(1);
ASSERT_EQ(sessions.size(), 1);
}
TEST(NamedSessionsTest, SessionContainerUpdateTest)
{
NamedCnchSessions parent;
NamedCnchSessions::SessionContainer sessions;
auto context = Context::createCopy(getContext().context);
sessions.insert(std::make_shared<NamedCnchSession>(1, context, 10, parent));
sessions.insert(std::make_shared<NamedCnchSession>(2, context, 30, parent));
sessions.insert(std::make_shared<NamedCnchSession>(3, context, 20, parent));
// verify the sequence after updating close time
size_t timeout = 100;
auto & sessions_by_key = sessions.template get<1>();
sessions_by_key.modify(sessions_by_key.find(1), [&timeout](auto & local_session) {
local_session->timeout = timeout;
local_session->close_time = Poco::Timestamp().epochTime() + timeout;
});
timeout = 50;
sessions_by_key.modify(sessions_by_key.find(3), [&timeout](auto & local_session) {
local_session->timeout = timeout;
local_session->close_time = Poco::Timestamp().epochTime() + timeout;
});
auto & sessions_by_close = sessions.template get<0>();
auto session_iter = sessions_by_close.begin();
ASSERT_EQ((*session_iter)->timeout, 30);
session_iter++;
ASSERT_EQ((*session_iter)->timeout, 50);
session_iter++;
ASSERT_EQ((*session_iter)->timeout, 100);
session_iter++;
ASSERT_TRUE(session_iter == sessions_by_close.end());
}
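
A closing note on the modify() calls exercised by the last test: elements reached through a Boost.MultiIndex iterator are const, and even though close_time sits behind a shared_ptr and could technically be written straight through the pointer, doing so would leave the ordered-by-close_time index silently out of order; modify() runs the updater and then repositions the element in every index. Reusing the stand-in Container and Session from the sketch after the header diff (again purely illustrative):

Container sessions;
sessions.insert(std::make_shared<Session>(Session{"a", 10}));
sessions.insert(std::make_shared<Session>(Session{"b", 20}));
auto & by_key = sessions.get<1>();

// Wrong: writing through the shared_ptr bypasses the container, so the
// ordered index never learns that "a" should now sort after "b".
// (*by_key.find("a"))->close_time = 99;

// Right: modify() applies the update, then re-sorts the element in index 0.
by_key.modify(by_key.find("a"), [](SessionPtr & s) { s->close_time = 99; });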