Fixed bug when connections are acquired from the pool with a different

class than the pool itself.
This commit is contained in:
Anthony Tuininga 2023-07-20 13:40:53 -06:00
parent 90872e23fa
commit d77bb0fd9d
5 changed files with 118 additions and 53 deletions

View File

@ -35,6 +35,9 @@ Thin Mode Changes
by a 16-byte unique identifier converted to base64 encoding. by a 16-byte unique identifier converted to base64 encoding.
#) Fixed bug when a dynamically sized pool is created with an ``increment`` #) Fixed bug when a dynamically sized pool is created with an ``increment``
of zero and the pool needs to grow. of zero and the pool needs to grow.
#) Fixed bug affecting connection reuse when connections were acquired from
the connection pool with a ``cclass`` different to the one used to
create the pool.
#) Fixed bug when a connection is discarded from the pool during #) Fixed bug when a connection is discarded from the pool during
:meth:`ConnectionPool.acquire()` and the ping check fails due to the :meth:`ConnectionPool.acquire()` and the ping check fails due to the
connection being dead. connection being dead.

View File

@ -785,12 +785,13 @@ cdef class Description:
temp_parts.append(f"(SID={self.sid})") temp_parts.append(f"(SID={self.sid})")
if self.server_type is not None: if self.server_type is not None:
temp_parts.append(f"(SERVER={self.server_type})") temp_parts.append(f"(SERVER={self.server_type})")
if cid is not None:
temp_parts.append(f"(CID={cid})")
else:
if self.cclass is not None: if self.cclass is not None:
temp_parts.append(f"(POOL_CONNECTION_CLASS={self.cclass})") temp_parts.append(f"(POOL_CONNECTION_CLASS={self.cclass})")
if self.purity != 0: if self.purity != 0:
temp_parts.append(f"(POOL_PURITY={self.purity})") temp_parts.append(f"(POOL_PURITY={self.purity})")
if cid is not None:
temp_parts.append(f"(CID={cid})")
if self.connection_id is not None: if self.connection_id is not None:
temp_parts.append(f"(CONNECTION_ID={self.connection_id})") temp_parts.append(f"(CONNECTION_ID={self.connection_id})")
if temp_parts: if temp_parts:

View File

@ -68,6 +68,7 @@ cdef class ThinConnImpl(BaseConnImpl):
int _dbobject_type_cache_num int _dbobject_type_cache_num
bytes _combo_key bytes _combo_key
str _connection_id str _connection_id
bint _is_pool_extra
def __init__(self, str dsn, ConnectParamsImpl params): def __init__(self, str dsn, ConnectParamsImpl params):
if not HAS_CRYPTOGRAPHY: if not HAS_CRYPTOGRAPHY:

View File

@ -36,12 +36,14 @@ cdef class ThinPoolImpl(BasePoolImpl):
list _free_used_conn_impls list _free_used_conn_impls
list _busy_conn_impls list _busy_conn_impls
list _conn_impls_to_drop list _conn_impls_to_drop
list _extra_conn_impls
uint32_t _getmode uint32_t _getmode
uint32_t _stmt_cache_size uint32_t _stmt_cache_size
uint32_t _timeout uint32_t _timeout
uint32_t _max_lifetime_session uint32_t _max_lifetime_session
uint32_t _num_waiters uint32_t _num_waiters
uint32_t _auth_mode uint32_t _auth_mode
uint32_t _num_to_create
int _ping_interval int _ping_interval
object _wait_timeout object _wait_timeout
object _bg_exc object _bg_exc
@ -72,6 +74,7 @@ cdef class ThinPoolImpl(BasePoolImpl):
self._free_used_conn_impls = [] self._free_used_conn_impls = []
self._busy_conn_impls = [] self._busy_conn_impls = []
self._conn_impls_to_drop = [] self._conn_impls_to_drop = []
self._extra_conn_impls = []
self._auth_mode = constants.AUTH_MODE_DEFAULT self._auth_mode = constants.AUTH_MODE_DEFAULT
self._open = True self._open = True
self._condition = threading.Condition() self._condition = threading.Condition()
@ -105,32 +108,34 @@ cdef class ThinPoolImpl(BasePoolImpl):
needed. The thread terminates automatically when the pool is closed. needed. The thread terminates automatically when the pool is closed.
""" """
cdef: cdef:
uint32_t num_conns, open_count uint32_t open_count, num_to_create
list conn_impls_to_drop list conn_impls_to_drop
bint wait bint wait
# create connections and close connections as needed # create connections and close connections as needed
while True: while True:
conn_impls_to_drop = [] conn_impls_to_drop = []
num_conns = 0
wait = True wait = True
# determine if there is any work to do # determine if there is any work to do
with self._condition: with self._condition:
open_count = self.get_open_count() open_count = self.get_open_count()
if self._open and \ if self._open and self._num_to_create == 0:
(self._num_waiters > 0 or open_count < self.min): if open_count < self.min:
num_conns = max(self.min - open_count, self._num_to_create = self.min - open_count
min(self.increment, self.max - open_count)) elif self._num_waiters > 0:
self._num_to_create = min(self.increment,
self.max - open_count)
if not self._open or self._bg_exc is None: if not self._open or self._bg_exc is None:
conn_impls_to_drop = self._conn_impls_to_drop conn_impls_to_drop = self._conn_impls_to_drop
self._conn_impls_to_drop = [] self._conn_impls_to_drop = []
# create connections, if needed # create connections, if needed
if num_conns > 0: if self._num_to_create > 0:
wait = False wait = False
self._create_conn_impls_helper(num_conns) num_to_create = self._num_to_create
if num_conns > 1: self._create_conn_impls_helper()
if num_to_create > 1:
self._check_timeout() self._check_timeout()
# close connections, if needed # close connections, if needed
@ -161,7 +166,7 @@ cdef class ThinPoolImpl(BasePoolImpl):
self._process_timeout) self._process_timeout)
self._timer_thread.start() self._timer_thread.start()
cdef int _create_conn_impls_helper(self, uint32_t num_conns) except -1: cdef int _create_conn_impls_helper(self) except -1:
""" """
Helper method which creates the specified number of connections. In Helper method which creates the specified number of connections. In
order to prevent the thread from dying, exceptions are captured and order to prevent the thread from dying, exceptions are captured and
@ -171,8 +176,7 @@ cdef class ThinPoolImpl(BasePoolImpl):
cdef: cdef:
ThinConnImpl conn_impl ThinConnImpl conn_impl
object exc object exc
uint32_t i while self._num_to_create > 0:
for i in range(num_conns):
conn_impl = None conn_impl = None
try: try:
conn_impl = self._create_conn_impl() conn_impl = self._create_conn_impl()
@ -182,6 +186,7 @@ cdef class ThinPoolImpl(BasePoolImpl):
if conn_impl is not None: if conn_impl is not None:
if self._open: if self._open:
self._free_new_conn_impls.append(conn_impl) self._free_new_conn_impls.append(conn_impl)
self._num_to_create -= 1
else: else:
conn_impl._force_close() conn_impl._force_close()
break break
@ -215,7 +220,7 @@ cdef class ThinPoolImpl(BasePoolImpl):
self._bg_exc = e self._bg_exc = e
cdef object _get_connection(self, bint wants_new, bint cclass_matches, cdef object _get_connection(self, bint wants_new, bint cclass_matches,
str cclass): str cclass, bint* must_reconnect):
""" """
Returns a connection from the pool if one is available. If one is not Returns a connection from the pool if one is available. If one is not
available and a new connection needs to be created, the value True is available and a new connection needs to be created, the value True is
@ -223,6 +228,7 @@ cdef class ThinPoolImpl(BasePoolImpl):
""" """
cdef: cdef:
ThinConnImpl conn_impl ThinConnImpl conn_impl
uint32_t open_count
object exc object exc
ssize_t i ssize_t i
@ -233,45 +239,51 @@ cdef class ThinPoolImpl(BasePoolImpl):
raise exc raise exc
# check for an available used connection (only permitted if a new # check for an available used connection (only permitted if a new
# connection is not required); in addition, if the cclass does not # connection is not required); in addition, ensure that the cclass
# match, a new connection will be forced if one cannot be found # matches
if not wants_new and self._free_used_conn_impls: if not wants_new and self._free_used_conn_impls:
if cclass_matches: for i, conn_impl in enumerate(reversed(self._free_used_conn_impls)):
conn_impl = self._free_used_conn_impls.pop() if cclass is None or conn_impl._cclass == cclass:
self._busy_conn_impls.append(conn_impl) i = len(self._free_used_conn_impls) - i - 1
return conn_impl
i = len(self._free_used_conn_impls) - 1
while i >= 0:
conn_impl = self._free_used_conn_impls[i]
if conn_impl._cclass == cclass:
self._free_used_conn_impls.pop(i) self._free_used_conn_impls.pop(i)
self._busy_conn_impls.append(conn_impl)
return conn_impl return conn_impl
i -= 1
# check for an available new connection (only permitted if the cclass # check for an available new connection (only permitted if the cclass
# matches) # matches)
if cclass_matches and self._free_new_conn_impls: if cclass_matches and self._free_new_conn_impls:
conn_impl = self._free_new_conn_impls.pop() return self._free_new_conn_impls.pop()
self._busy_conn_impls.append(conn_impl)
return conn_impl
# no connections are immediately available; if a brand new connection # no matching connections are available; if the pool is full, see if
# is desired, the cclass doesn't match, or the pool is full and a # any connections are available and if so, return one of them (but let
# getmode of FORCE has been specified, force the creation of a new # the caller know that it must be discarded and a new connection
# connection # created); if no connections are available to replace but the force
if wants_new or not cclass_matches \ # get flag is set, return True which signals that a new "extra"
or (self._force_get and self.get_open_count() >= self.max): # connection will be created; a new "extra" connection is also created
# if the connection class doesn't match, since the background thread
# will only create connections with the pool's connection class
open_count = self.get_open_count() + self._num_to_create
if open_count >= self.max:
if self._free_new_conn_impls:
must_reconnect[0] = True
return self._free_new_conn_impls.pop()
elif self._free_used_conn_impls:
must_reconnect[0] = True
return self._free_used_conn_impls.pop()
elif self._force_get:
return True
elif not cclass_matches:
return True return True
# wake up the background thread to create a connection # wake up the background thread to create a connection if the pool
# is not already full
if open_count < self.max:
with self._bg_thread_condition: with self._bg_thread_condition:
self._bg_thread_condition.notify() self._bg_thread_condition.notify()
cdef ThinConnImpl _acquire_helper(self, ConnectParamsImpl params): cdef ThinConnImpl _acquire_helper(self, ConnectParamsImpl params):
cdef: cdef:
ConnectParamsImpl creation_params = self.connect_params ConnectParamsImpl creation_params = self.connect_params
bint wants_new, cclass_matches bint wants_new, cclass_matches, must_reconnect = False
object result, predicate object result, predicate
str pool_cclass, cclass str pool_cclass, cclass
ThinConnImpl conn_impl ThinConnImpl conn_impl
@ -284,7 +296,7 @@ cdef class ThinPoolImpl(BasePoolImpl):
wants_new = (params._default_description.purity == PURITY_NEW) wants_new = (params._default_description.purity == PURITY_NEW)
cclass_matches = (cclass is None or cclass == pool_cclass) cclass_matches = (cclass is None or cclass == pool_cclass)
predicate = lambda: self._get_connection(wants_new, cclass_matches, predicate = lambda: self._get_connection(wants_new, cclass_matches,
cclass) cclass, &must_reconnect)
# get a connection from the pool; if one is not immediately available, # get a connection from the pool; if one is not immediately available,
# wait as long as requested for one to be made available. # wait as long as requested for one to be made available.
@ -298,19 +310,20 @@ cdef class ThinPoolImpl(BasePoolImpl):
if result is None: if result is None:
errors._raise_err(errors.ERR_POOL_NO_CONNECTION_AVAILABLE) errors._raise_err(errors.ERR_POOL_NO_CONNECTION_AVAILABLE)
if isinstance(result, ThinConnImpl): if isinstance(result, ThinConnImpl):
self._busy_conn_impls.append(result)
if not must_reconnect:
return result return result
# no connection was returned from the pool so a new connection needs to # no connection was returned from the pool so a new connection needs to
# be created # be created
conn_impl = self._create_conn_impl(params=params) conn_impl = self._create_conn_impl(params=params)
with self._condition: with self._condition:
if self.get_open_count() < self.max: if must_reconnect:
self._busy_conn_impls.append(conn_impl) self._busy_conn_impls.remove(result)
elif self._free_used_conn_impls:
self._drop_conn_impl(self._free_used_conn_impls.pop(0))
self._busy_conn_impls.append(conn_impl) self._busy_conn_impls.append(conn_impl)
else: else:
conn_impl._pool = None conn_impl._is_pool_extra = True
self._extra_conn_impls.append(conn_impl)
return conn_impl return conn_impl
def _process_timeout(self): def _process_timeout(self):
@ -328,20 +341,37 @@ cdef class ThinPoolImpl(BasePoolImpl):
""" """
Returns the connection to the pool. If the connection was closed for Returns the connection to the pool. If the connection was closed for
some reason it will be dropped; otherwise, it will be returned to the some reason it will be dropped; otherwise, it will be returned to the
list of connections available for further use. list of connections available for further use. If an "extra" connection
was created (because the pool has a mode of "force" get or because a
different connection class than that used by the pool was requested)
then it will be added to the pool or will replace an unused new
connection or will be discarded depending on the current pool size.
""" """
cdef: cdef:
bint is_open = conn_impl._protocol._socket is not None
ThinDbObjectTypeCache type_cache ThinDbObjectTypeCache type_cache
uint32_t open_count
int cache_num int cache_num
with self._condition: with self._condition:
if conn_impl._dbobject_type_cache_num > 0: if conn_impl._dbobject_type_cache_num > 0:
cache_num = conn_impl._dbobject_type_cache_num cache_num = conn_impl._dbobject_type_cache_num
type_cache = get_dbobject_type_cache(cache_num) type_cache = get_dbobject_type_cache(cache_num)
type_cache._clear_meta_cursor() type_cache._clear_meta_cursor()
if conn_impl._protocol._socket is not None: if conn_impl._is_pool_extra:
self._extra_conn_impls.remove(conn_impl)
conn_impl._is_pool_extra = False
open_count = self.get_open_count() + self._num_to_create
if is_open and open_count >= self.max:
if self._free_new_conn_impls and open_count == self.max:
self._drop_conn_impl(self._free_new_conn_impls.pop(0))
else:
self._drop_conn_impl(conn_impl)
is_open = False
else:
self._busy_conn_impls.remove(conn_impl)
if is_open:
self._free_used_conn_impls.append(conn_impl) self._free_used_conn_impls.append(conn_impl)
conn_impl._time_in_pool = time.monotonic() conn_impl._time_in_pool = time.monotonic()
self._busy_conn_impls.remove(conn_impl)
self._check_timeout() self._check_timeout()
self._condition.notify() self._condition.notify()
@ -436,7 +466,8 @@ cdef class ThinPoolImpl(BasePoolImpl):
self._open = False self._open = False
for lst in (self._free_used_conn_impls, for lst in (self._free_used_conn_impls,
self._free_new_conn_impls, self._free_new_conn_impls,
self._busy_conn_impls): self._busy_conn_impls,
self._extra_conn_impls):
self._conn_impls_to_drop.extend(lst) self._conn_impls_to_drop.extend(lst)
for conn_impl in lst: for conn_impl in lst:
conn_impl._pool = None conn_impl._pool = None

View File

@ -754,5 +754,34 @@ class TestCase(test_env.BaseTestCase):
self.assertEqual(pool.increment, 0) self.assertEqual(pool.increment, 0)
conn = pool.acquire() conn = pool.acquire()
def test_2435_acquire_with_different_cclass(self):
"2435 - verify that connection with different cclass is reused"
cclass = "cclass2435"
pool = test_env.get_pool(min=1, max=1)
# ignore the first acquire which, depending on the speed with which the
# minimum connections are created, may create a connection that is
# discarded; instead, use the second acquire which should remain in the
# pool
with pool.acquire(cclass=cclass) as conn:
pass
with pool.acquire(cclass=cclass) as conn:
with conn.cursor() as cursor:
cursor.execute("""
select
dbms_debug_jdwp.current_session_id || ',' ||
dbms_debug_jdwp.current_session_serial
from dual""")
sid_serial, = cursor.fetchone()
with pool.acquire(cclass=cclass) as conn:
with conn.cursor() as cursor:
cursor.execute("""
select
dbms_debug_jdwp.current_session_id || ',' ||
dbms_debug_jdwp.current_session_serial
from dual""")
next_sid_serial, = cursor.fetchone()
self.assertEqual(next_sid_serial, sid_serial)
self.assertEqual(pool.opened, 1)
if __name__ == "__main__": if __name__ == "__main__":
test_env.run_test_cases() test_env.run_test_cases()