559 lines
25 KiB
Python
559 lines
25 KiB
Python
#------------------------------------------------------------------------------
|
|
# Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
|
|
#
|
|
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
|
|
#
|
|
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
|
|
# Canada. All rights reserved.
|
|
#------------------------------------------------------------------------------
|
|
|
|
"""
|
|
2400 - Module for testing session pools
|
|
"""
|
|
|
|
import threading
|
|
|
|
import cx_Oracle as oracledb
|
|
import test_env
|
|
|
|
class TestCase(test_env.BaseTestCase):
|
|
require_connection = False
|
|
|
|
def __connect_and_drop(self):
|
|
with self.pool.acquire() as connection:
|
|
cursor = connection.cursor()
|
|
cursor.execute("select count(*) from TestNumbers")
|
|
count, = cursor.fetchone()
|
|
self.assertEqual(count, 10)
|
|
|
|
def __connect_and_generate_error(self):
|
|
with self.pool.acquire() as connection:
|
|
cursor = connection.cursor()
|
|
self.assertRaises(oracledb.DatabaseError, cursor.execute,
|
|
"select 1 / 0 from dual")
|
|
|
|
def __callable_session_callback(self, conn, requested_tag):
|
|
self.session_called = True
|
|
|
|
supported_formats = {
|
|
"SIMPLE" : "'YYYY-MM-DD HH24:MI'",
|
|
"FULL" : "'YYYY-MM-DD HH24:MI:SS'"
|
|
}
|
|
|
|
supported_time_zones = {
|
|
"UTC" : "'UTC'",
|
|
"MST" : "'-07:00'"
|
|
}
|
|
|
|
supported_keys = {
|
|
"NLS_DATE_FORMAT" : supported_formats,
|
|
"TIME_ZONE" : supported_time_zones
|
|
}
|
|
if requested_tag is not None:
|
|
state_parts = []
|
|
for directive in requested_tag.split(";"):
|
|
parts = directive.split("=")
|
|
if len(parts) != 2:
|
|
raise ValueError("Tag must contain key=value pairs")
|
|
key, value = parts
|
|
value_dict = supported_keys.get(key)
|
|
if value_dict is None:
|
|
raise ValueError("Tag only supports keys: %s" % \
|
|
(", ".join(supported_keys)))
|
|
actual_value = value_dict.get(value)
|
|
if actual_value is None:
|
|
raise ValueError("Key %s only supports values: %s" % \
|
|
(key, ", ".join(value_dict)))
|
|
state_parts.append("%s = %s" % (key, actual_value))
|
|
sql = "alter session set %s" % " ".join(state_parts)
|
|
cursor = conn.cursor()
|
|
cursor.execute(sql)
|
|
conn.tag = requested_tag
|
|
|
|
def __perform_reconfigure_test(self, parameter_name, parameter_value,
|
|
min=3, max=30, increment=4, timeout=5,
|
|
wait_timeout=5000, stmtcachesize=25,
|
|
max_lifetime_session=1000,
|
|
max_sessions_per_shard=3, ping_interval=30,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT):
|
|
creation_args = dict(min=min, max=max, increment=increment,
|
|
timeout=timeout, wait_timeout=wait_timeout,
|
|
stmtcachesize=stmtcachesize,
|
|
max_lifetime_session=max_lifetime_session,
|
|
max_sessions_per_shard=max_sessions_per_shard,
|
|
ping_interval=ping_interval, getmode=getmode)
|
|
reconfigure_args = {}
|
|
reconfigure_args[parameter_name] = parameter_value
|
|
|
|
pool = test_env.get_pool(**creation_args)
|
|
connection = pool.acquire()
|
|
pool.reconfigure(**reconfigure_args)
|
|
actual_args = dict(min=pool.min, max=pool.max,
|
|
increment=pool.increment, timeout=pool.timeout,
|
|
wait_timeout=pool.wait_timeout,
|
|
stmtcachesize=pool.stmtcachesize,
|
|
max_lifetime_session=pool.max_lifetime_session,
|
|
max_sessions_per_shard=pool.max_sessions_per_shard,
|
|
ping_interval=pool.ping_interval,
|
|
getmode=pool.getmode)
|
|
expected_args = creation_args.copy()
|
|
expected_args.update(reconfigure_args)
|
|
self.assertEqual(actual_args, expected_args)
|
|
|
|
def __verify_connection(self, connection, expected_user,
|
|
expected_proxy_user=None):
|
|
cursor = connection.cursor()
|
|
cursor.execute("""
|
|
select
|
|
sys_context('userenv', 'session_user'),
|
|
sys_context('userenv', 'proxy_user')
|
|
from dual""")
|
|
actual_user, actual_proxy_user = cursor.fetchone()
|
|
self.assertEqual(actual_user, expected_user.upper())
|
|
self.assertEqual(actual_proxy_user,
|
|
expected_proxy_user and expected_proxy_user.upper())
|
|
|
|
def test_2400_pool(self):
|
|
"2400 - test that the pool is created and has the right attributes"
|
|
pool = test_env.get_pool(min=2, max=8, increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
self.assertEqual(pool.username, test_env.get_main_user(),
|
|
"user name differs")
|
|
self.assertEqual(pool.tnsentry, test_env.get_connect_string(),
|
|
"tnsentry differs")
|
|
self.assertEqual(pool.dsn, test_env.get_connect_string(),
|
|
"dsn differs")
|
|
self.assertEqual(pool.max, 8, "max differs")
|
|
self.assertEqual(pool.min, 2, "min differs")
|
|
self.assertEqual(pool.increment, 3, "increment differs")
|
|
self.assertEqual(pool.busy, 0, "busy not 0 at start")
|
|
connection_1 = pool.acquire()
|
|
self.assertEqual(pool.busy, 1, "busy not 1 after acquire")
|
|
connection_2 = pool.acquire()
|
|
self.assertEqual(pool.busy, 2, "busy not 2 after acquire")
|
|
self.assertEqual(pool.opened, 2, "opened differs")
|
|
connection_3 = pool.acquire()
|
|
self.assertEqual(pool.busy, 3, "busy not 3 after acquire")
|
|
pool.release(connection_3)
|
|
self.assertEqual(pool.busy, 2, "busy not 2 after release")
|
|
pool.release(connection_1)
|
|
pool.release(connection_2)
|
|
self.assertEqual(pool.busy, 0, "busy not 0 after release")
|
|
pool.getmode = oracledb.SPOOL_ATTRVAL_NOWAIT
|
|
self.assertEqual(pool.getmode, oracledb.SPOOL_ATTRVAL_NOWAIT)
|
|
if test_env.get_client_version() >= (12, 2):
|
|
pool.getmode = oracledb.SPOOL_ATTRVAL_TIMEDWAIT
|
|
self.assertEqual(pool.getmode, oracledb.SPOOL_ATTRVAL_TIMEDWAIT)
|
|
pool.stmtcachesize = 50
|
|
self.assertEqual(pool.stmtcachesize, 50)
|
|
pool.timeout = 10
|
|
self.assertEqual(pool.timeout, 10)
|
|
if test_env.get_client_version() >= (12, 1):
|
|
pool.max_lifetime_session = 10
|
|
self.assertEqual(pool.max_lifetime_session, 10)
|
|
|
|
def test_2401_proxy_auth(self):
|
|
"2401 - test that proxy authentication is possible"
|
|
pool = test_env.get_pool(min=2, max=8, increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
self.assertEqual(pool.homogeneous, True,
|
|
"homogeneous should be True by default")
|
|
self.assertRaises(oracledb.DatabaseError, pool.acquire,
|
|
user="missing_proxyuser")
|
|
pool = test_env.get_pool(min=2, max=8, increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT,
|
|
homogeneous=False)
|
|
msg = "homogeneous should be False after setting it in the constructor"
|
|
self.assertEqual(pool.homogeneous, False, msg)
|
|
connection = pool.acquire(user=test_env.get_proxy_user())
|
|
cursor = connection.cursor()
|
|
cursor.execute('select user from dual')
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(result, test_env.get_proxy_user().upper())
|
|
connection.close()
|
|
|
|
def test_2403_rollback_on_release(self):
|
|
"2403 - connection rolls back before released back to the pool"
|
|
pool = test_env.get_pool(getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
connection = pool.acquire()
|
|
cursor = connection.cursor()
|
|
cursor.execute("truncate table TestTempTable")
|
|
cursor.execute("insert into TestTempTable (IntCol) values (1)")
|
|
cursor.close()
|
|
pool.release(connection)
|
|
pool = test_env.get_pool(getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
connection = pool.acquire()
|
|
cursor = connection.cursor()
|
|
cursor.execute("select count(*) from TestTempTable")
|
|
count, = cursor.fetchone()
|
|
self.assertEqual(count, 0)
|
|
connection.close()
|
|
|
|
def test_2404_threading(self):
|
|
"2404 - test session pool with multiple threads"
|
|
self.pool = test_env.get_pool(min=5, max=20, increment=2,
|
|
threaded=True,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
threads = []
|
|
for i in range(20):
|
|
thread = threading.Thread(None, self.__connect_and_drop)
|
|
threads.append(thread)
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
def test_2405_threading_with_errors(self):
|
|
"2405 - test session pool with multiple threads (with errors)"
|
|
self.pool = test_env.get_pool(min=5, max=20, increment=2,
|
|
threaded=True,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
threads = []
|
|
for i in range(20):
|
|
thread = threading.Thread(None, self.__connect_and_generate_error)
|
|
threads.append(thread)
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join()
|
|
|
|
def test_2406_purity(self):
|
|
"2406 - test session pool with various types of purity"
|
|
pool = test_env.get_pool(min=1, max=8, increment=1,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
|
|
# get connection and set the action
|
|
action = "TEST_ACTION"
|
|
connection = pool.acquire()
|
|
connection.action = action
|
|
cursor = connection.cursor()
|
|
cursor.execute("select 1 from dual")
|
|
cursor.close()
|
|
pool.release(connection)
|
|
self.assertEqual(pool.opened, 1, "opened (1)")
|
|
|
|
# verify that the connection still has the action set on it
|
|
connection = pool.acquire()
|
|
cursor = connection.cursor()
|
|
cursor.execute("select sys_context('userenv', 'action') from dual")
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(result, action)
|
|
cursor.close()
|
|
pool.release(connection)
|
|
self.assertEqual(pool.opened, 1, "opened (2)")
|
|
|
|
# get a new connection with new purity (should not have state)
|
|
connection = pool.acquire(purity=oracledb.ATTR_PURITY_NEW)
|
|
cursor = connection.cursor()
|
|
cursor.execute("select sys_context('userenv', 'action') from dual")
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(result, None)
|
|
cursor.close()
|
|
pool.release(connection)
|
|
|
|
def test_2407_heterogeneous(self):
|
|
"2407 - test heterogeneous pool with user and password specified"
|
|
pool = test_env.get_pool(min=2, max=8, increment=3, homogeneous=False,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
self.assertEqual(pool.homogeneous, 0)
|
|
conn = pool.acquire()
|
|
self.__verify_connection(pool.acquire(), test_env.get_main_user())
|
|
conn.close()
|
|
conn = pool.acquire(test_env.get_main_user(),
|
|
test_env.get_main_password())
|
|
self.__verify_connection(conn, test_env.get_main_user())
|
|
conn.close()
|
|
conn = pool.acquire(test_env.get_proxy_user(),
|
|
test_env.get_proxy_password())
|
|
self.__verify_connection(conn, test_env.get_proxy_user())
|
|
conn.close()
|
|
user_str = "%s[%s]" % \
|
|
(test_env.get_main_user(), test_env.get_proxy_user())
|
|
conn = pool.acquire(user_str, test_env.get_main_password())
|
|
self.__verify_connection(conn, test_env.get_proxy_user(),
|
|
test_env.get_main_user())
|
|
conn.close()
|
|
|
|
def test_2408_heterogenous_without_user(self):
|
|
"2408 - test heterogeneous pool without user and password specified"
|
|
pool = test_env.get_pool(user="", password="", min=2, max=8,
|
|
increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT,
|
|
homogeneous=False)
|
|
conn = pool.acquire(test_env.get_main_user(),
|
|
test_env.get_main_password())
|
|
self.__verify_connection(conn, test_env.get_main_user())
|
|
conn.close()
|
|
conn = pool.acquire(test_env.get_proxy_user(),
|
|
test_env.get_proxy_password())
|
|
self.__verify_connection(conn, test_env.get_proxy_user())
|
|
conn.close()
|
|
user_str = "%s[%s]" % \
|
|
(test_env.get_main_user(), test_env.get_proxy_user())
|
|
conn = pool.acquire(user_str, test_env.get_main_password())
|
|
self.__verify_connection(conn, test_env.get_proxy_user(),
|
|
test_env.get_main_user())
|
|
|
|
def test_2409_heterogeneous_wrong_password(self):
|
|
"2409 - test heterogeneous pool with wrong password specified"
|
|
pool = test_env.get_pool(min=2, max=8, increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT,
|
|
homogeneous=False)
|
|
self.assertRaises(oracledb.DatabaseError, pool.acquire,
|
|
test_env.get_proxy_user(),
|
|
"this is the wrong password")
|
|
|
|
def test_2410_tagging_session(self):
|
|
"2410 - test tagging a session"
|
|
pool = test_env.get_pool(min=2, max=8, increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_NOWAIT)
|
|
|
|
tag_mst = "TIME_ZONE=MST"
|
|
tag_utc = "TIME_ZONE=UTC"
|
|
|
|
conn = pool.acquire()
|
|
self.assertEqual(conn.tag, None)
|
|
pool.release(conn, tag=tag_mst)
|
|
|
|
conn = pool.acquire()
|
|
self.assertEqual(conn.tag, None)
|
|
conn.tag = tag_utc
|
|
conn.close()
|
|
|
|
conn = pool.acquire(tag=tag_mst)
|
|
self.assertEqual(conn.tag, tag_mst)
|
|
conn.close()
|
|
|
|
conn = pool.acquire(tag=tag_utc)
|
|
self.assertEqual(conn.tag, tag_utc)
|
|
conn.close()
|
|
|
|
def test_2411_plsql_session_callbacks(self):
|
|
"2411 - test PL/SQL session callbacks"
|
|
if test_env.get_client_version() < (12, 2):
|
|
self.skipTest("PL/SQL session callbacks not supported before 12.2")
|
|
callback = "pkg_SessionCallback.TheCallback"
|
|
pool = test_env.get_pool(min=2, max=8, increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_NOWAIT,
|
|
session_callback=callback)
|
|
tags = [
|
|
"NLS_DATE_FORMAT=SIMPLE",
|
|
"NLS_DATE_FORMAT=FULL;TIME_ZONE=UTC",
|
|
"NLS_DATE_FORMAT=FULL;TIME_ZONE=MST"
|
|
]
|
|
actual_tags = [None, None, "NLS_DATE_FORMAT=FULL;TIME_ZONE=UTC"]
|
|
|
|
# truncate PL/SQL session callback log
|
|
conn = pool.acquire()
|
|
cursor = conn.cursor()
|
|
cursor.execute("truncate table PLSQLSessionCallbacks")
|
|
conn.close()
|
|
|
|
# request sessions with each of the first two tags
|
|
for tag in tags[:2]:
|
|
conn = pool.acquire(tag=tag)
|
|
conn.close()
|
|
|
|
# for the last tag, use the matchanytag flag
|
|
conn = pool.acquire(tag=tags[2], matchanytag=True)
|
|
conn.close()
|
|
|
|
# verify the PL/SQL session callback log is accurate
|
|
conn = pool.acquire()
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
select RequestedTag, ActualTag
|
|
from PLSQLSessionCallbacks
|
|
order by FixupTimestamp""")
|
|
results = cursor.fetchall()
|
|
expected_results = list(zip(tags, actual_tags))
|
|
self.assertEqual(results, expected_results)
|
|
conn.close()
|
|
|
|
def test_2412_tagging_invalid_key(self):
|
|
"2412 - testTagging with Invalid key"
|
|
pool = test_env.get_pool(getmode=oracledb.SPOOL_ATTRVAL_NOWAIT)
|
|
conn = pool.acquire()
|
|
self.assertRaises(TypeError, pool.release, conn, tag=12345)
|
|
if test_env.get_client_version() >= (12, 2):
|
|
self.assertRaises(oracledb.DatabaseError, pool.release, conn,
|
|
tag="INVALID_TAG")
|
|
|
|
def test_2413_close_and_drop_connection_from_pool(self):
|
|
"2413 - test dropping/closing a connection from the pool"
|
|
pool = test_env.get_pool(min=1, max=8, increment=1,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
conn = pool.acquire()
|
|
self.assertEqual(pool.busy, 1, "busy (1)")
|
|
self.assertEqual(pool.opened, 1, "opened (1)")
|
|
pool.drop(conn)
|
|
self.assertEqual(pool.busy, 0, "busy (2)")
|
|
self.assertEqual(pool.opened, 0, "opened (2)")
|
|
conn = pool.acquire()
|
|
self.assertEqual(pool.busy, 1, "busy (3)")
|
|
self.assertEqual(pool.opened, 1, "opened (3)")
|
|
conn.close()
|
|
self.assertEqual(pool.busy, 0, "busy (4)")
|
|
self.assertEqual(pool.opened, 1, "opened (4)")
|
|
|
|
def test_2414_create_new_pure_connection(self):
|
|
"2414 - test to ensure pure connections are being created correctly"
|
|
pool = test_env.get_pool(min=1, max=2, increment=1,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
connection_1 = pool.acquire()
|
|
connection_2 = pool.acquire()
|
|
self.assertEqual(pool.opened, 2, "opened (1)")
|
|
pool.release(connection_1)
|
|
pool.release(connection_2)
|
|
connection_3 = pool.acquire(purity=oracledb.ATTR_PURITY_NEW)
|
|
self.assertEqual(pool.opened, 2, "opened (2)")
|
|
pool.release(connection_3)
|
|
|
|
def test_2415_reconfigure_pool(self):
|
|
"2415 - test to ensure reconfigure() updates pool properties"
|
|
pool = test_env.get_pool(min=1, max=2, increment=1,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
self.assertEqual(pool.min, 1, "min (1)")
|
|
self.assertEqual(pool.max, 2, "max (2)")
|
|
self.assertEqual(pool.increment, 1, "increment (1)")
|
|
self.assertEqual(pool.getmode, oracledb.SPOOL_ATTRVAL_WAIT,
|
|
"getmode differs")
|
|
self.assertEqual(pool.timeout, 0, "timeout (0)")
|
|
self.assertEqual(pool.wait_timeout, 5000, "wait_timeout (5000)")
|
|
self.assertEqual(pool.max_lifetime_session, 0,
|
|
"max_lifetime_sessionmeout (0)")
|
|
self.assertEqual(pool.max_sessions_per_shard, 0,
|
|
"max_sessions_per_shard (0)")
|
|
self.assertEqual(pool.stmtcachesize, 20, "stmtcachesize (20)")
|
|
self.assertEqual(pool.ping_interval, 60, "ping_interval (60)")
|
|
|
|
pool.reconfigure(min=2, max=5, increment=2, timeout=30,
|
|
getmode=oracledb.SPOOL_ATTRVAL_TIMEDWAIT,
|
|
wait_timeout=3000, max_lifetime_session=20,
|
|
max_sessions_per_shard=2, stmtcachesize=30,
|
|
ping_interval=30)
|
|
self.assertEqual(pool.min, 2, "min (2)")
|
|
self.assertEqual(pool.max, 5, "max (5)")
|
|
self.assertEqual(pool.increment, 2, "increment (2)")
|
|
self.assertEqual(pool.getmode, oracledb.SPOOL_ATTRVAL_TIMEDWAIT,
|
|
"getmode differs")
|
|
self.assertEqual(pool.timeout, 30, "timeout (30)")
|
|
self.assertEqual(pool.wait_timeout, 3000, "wait_timeout (3000)")
|
|
self.assertEqual(pool.max_lifetime_session, 20,
|
|
"max_lifetime_sessionmeout (20)")
|
|
self.assertEqual(pool.max_sessions_per_shard, 2,
|
|
"max_sessions_per_shard (2)")
|
|
self.assertEqual(pool.stmtcachesize, 30, "stmtcachesize (30)")
|
|
self.assertEqual(pool.ping_interval, 30, "ping_interval (30)")
|
|
|
|
def test_2416_test_reconfigure_pool_with_missing_values(self):
|
|
"2416 - test the reconfigure values are changed and rest unchanged"
|
|
self.__perform_reconfigure_test("min", 5)
|
|
self.__perform_reconfigure_test("max", 20)
|
|
self.__perform_reconfigure_test("increment", 5)
|
|
self.__perform_reconfigure_test("timeout", 10)
|
|
self.__perform_reconfigure_test("wait_timeout", 8000)
|
|
self.__perform_reconfigure_test("stmtcachesize", 40)
|
|
self.__perform_reconfigure_test("max_lifetime_session", 2000)
|
|
self.__perform_reconfigure_test("max_sessions_per_shard", 5)
|
|
self.__perform_reconfigure_test("ping_interval", 50)
|
|
self.__perform_reconfigure_test("getmode",
|
|
oracledb.SPOOL_ATTRVAL_NOWAIT)
|
|
|
|
def test_2417_setting_each_pool_param(self):
|
|
"2417 - test to see if specified parameters are set during creation"
|
|
pool = test_env.get_pool(min=1, max=2, increment=1, timeout=10,
|
|
wait_timeout=10, max_lifetime_session=20,
|
|
max_sessions_per_shard=1, stmtcachesize=25,
|
|
ping_interval=25,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT)
|
|
self.assertEqual(pool.min, 1, "min (1)")
|
|
self.assertEqual(pool.max, 2, "max (2)")
|
|
self.assertEqual(pool.increment, 1, "increment (1)")
|
|
self.assertEqual(pool.getmode, oracledb.SPOOL_ATTRVAL_WAIT,
|
|
"getmode differs")
|
|
self.assertEqual(pool.timeout, 10, "timeout (10)")
|
|
self.assertEqual(pool.wait_timeout, 10, "wait_timeout (10)")
|
|
self.assertEqual(pool.max_lifetime_session, 20,
|
|
"max_lifetime_sessionmeout (20)")
|
|
self.assertEqual(pool.max_sessions_per_shard, 1,
|
|
"max_sessions_per_shard (1)")
|
|
self.assertEqual(pool.stmtcachesize, 25, "stmtcachesize (25)")
|
|
self.assertEqual(pool.ping_interval, 25, "ping_interval (25)")
|
|
|
|
def test_2418_deprecations(self):
|
|
"2418 - test to verify deprecations"
|
|
callback = "pkg_SessionCallback.TheCallback"
|
|
self.assertRaises(oracledb.ProgrammingError, test_env.get_pool,
|
|
min=1, max=2, increment=1, wait_timeout=10,
|
|
waitTimeout=10)
|
|
self.assertRaises(oracledb.ProgrammingError, test_env.get_pool,
|
|
min=1, max=2, increment=1, max_lifetime_session=20,
|
|
maxLifetimeSession=20)
|
|
self.assertRaises(oracledb.ProgrammingError, test_env.get_pool,
|
|
min=1, max=2, increment=1, max_sessions_per_shard=1,
|
|
maxSessionsPerShard=1)
|
|
self.assertRaises(oracledb.ProgrammingError, test_env.get_pool,
|
|
min=2, max=8, increment=3,
|
|
getmode=oracledb.SPOOL_ATTRVAL_NOWAIT,
|
|
session_callback=callback, sessionCallback=callback)
|
|
|
|
def test_2419_statement_cache_size(self):
|
|
"2419 - test to verify statement cache size is retained"
|
|
pool = test_env.get_pool(min=1, max=2, increment=1,
|
|
getmode=oracledb.SPOOL_ATTRVAL_WAIT,
|
|
stmtcachesize=25)
|
|
self.assertEqual(pool.stmtcachesize, 25, "stmtcachesize (25)")
|
|
pool.stmtcachesize = 35
|
|
self.assertEqual(pool.stmtcachesize, 35, "stmtcachesize (35)")
|
|
|
|
def test_2420_callable_session_callbacks(self):
|
|
"2420 - test that session callbacks are being called correctly"
|
|
pool = test_env.get_pool(min=2, max=5, increment=1,
|
|
session_callback=self.__callable_session_callback)
|
|
|
|
# new connection with a tag should invoke the session callback
|
|
with pool.acquire(tag="NLS_DATE_FORMAT=SIMPLE") as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("select to_char(2021-05-20) from dual")
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(self.session_called, True)
|
|
|
|
# acquiring a connection with the same tag should not invoke the
|
|
# session callback
|
|
self.session_called = False
|
|
with pool.acquire(tag="NLS_DATE_FORMAT=SIMPLE") as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("select to_char(2021-05-20) from dual")
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(self.session_called, False)
|
|
|
|
# acquiring a connection with a new tag should invoke the session
|
|
# callback
|
|
self.session_called = False
|
|
with pool.acquire(tag="NLS_DATE_FORMAT=FULL;TIME_ZONE=UTC") as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("select to_char(current_date) from dual")
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(self.session_called, True)
|
|
|
|
# acquiring a connection with a new tag and specifying that a
|
|
# connection with any tag can be acquired should invoke the session
|
|
# callback
|
|
self.session_called = False
|
|
with pool.acquire(tag="NLS_DATE_FORMAT=FULL;TIME_ZONE=MST", \
|
|
matchanytag=True) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("select to_char(current_date) from dual")
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(self.session_called, True)
|
|
|
|
# new session with no tag should invoke the session callback
|
|
self.session_called = False
|
|
with pool.acquire() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("select to_char(current_date) from dual")
|
|
result, = cursor.fetchone()
|
|
self.assertEqual(self.session_called, True)
|
|
|
|
if __name__ == "__main__":
|
|
test_env.run_test_cases()
|