Reworked test suite to eliminate the use of default passwords and to make the

individual test modules directly runnable (instead of using execfile() within
test.py).
This commit is contained in:
Anthony Tuininga 2019-01-31 10:18:04 -07:00
parent 81583c224d
commit 4ed95aad94
34 changed files with 1680 additions and 1386 deletions

View File

@ -1,14 +1,16 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
"""Module for testing AQ objects.""" """Module for testing AQ objects."""
import TestEnv
import cx_Oracle import cx_Oracle
import decimal import decimal
import threading import threading
class TestAQ(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
bookData = [ bookData = [
("Wings of Fire", "A.P.J. Abdul Kalam", ("Wings of Fire", "A.P.J. Abdul Kalam",
decimal.Decimal("15.75")), decimal.Decimal("15.75")),
@ -30,7 +32,7 @@ class TestAQ(BaseTestCase):
pass pass
def __deqInThread(self, results): def __deqInThread(self, results):
connection = self.getConnection() connection = TestEnv.GetConnection()
booksType = connection.gettype("UDT_BOOK") booksType = connection.gettype("UDT_BOOK")
book = booksType.newobject() book = booksType.newobject()
options = connection.deqoptions() options = connection.deqoptions()
@ -175,7 +177,7 @@ class TestAQ(BaseTestCase):
props = self.connection.msgproperties() props = self.connection.msgproperties()
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
deqOptions.wait = cx_Oracle.DEQ_NO_WAIT deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
@ -199,7 +201,7 @@ class TestAQ(BaseTestCase):
props = self.connection.msgproperties() props = self.connection.msgproperties()
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
deqOptions.visibility = cx_Oracle.DEQ_ON_COMMIT deqOptions.visibility = cx_Oracle.DEQ_ON_COMMIT
@ -224,7 +226,7 @@ class TestAQ(BaseTestCase):
props = self.connection.msgproperties() props = self.connection.msgproperties()
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.deliverymode = cx_Oracle.MSG_BUFFERED deqOptions.deliverymode = cx_Oracle.MSG_BUFFERED
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
@ -250,7 +252,7 @@ class TestAQ(BaseTestCase):
props = self.connection.msgproperties() props = self.connection.msgproperties()
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
@ -276,7 +278,7 @@ class TestAQ(BaseTestCase):
props = self.connection.msgproperties() props = self.connection.msgproperties()
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT_OR_BUFFERED deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT_OR_BUFFERED
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
@ -302,7 +304,7 @@ class TestAQ(BaseTestCase):
props = self.connection.msgproperties() props = self.connection.msgproperties()
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT deqOptions.deliverymode = cx_Oracle.MSG_PERSISTENT
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
@ -326,7 +328,7 @@ class TestAQ(BaseTestCase):
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
self.connection.commit() self.connection.commit()
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
@ -352,7 +354,7 @@ class TestAQ(BaseTestCase):
self.connection.enq("BOOKS", enqOptions, props, book) self.connection.enq("BOOKS", enqOptions, props, book)
self.connection.commit() self.connection.commit()
otherConnection = self.getConnection() otherConnection = TestEnv.GetConnection()
deqOptions = otherConnection.deqoptions() deqOptions = otherConnection.deqoptions()
deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE deqOptions.visibility = cx_Oracle.DEQ_IMMEDIATE
@ -364,3 +366,6 @@ class TestAQ(BaseTestCase):
otherPrice = book.PRICE otherPrice = book.PRICE
self.assertEqual(otherPrice, expectedPrice) self.assertEqual(otherPrice, expectedPrice)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,7 +9,11 @@
"""Module for testing boolean variables.""" """Module for testing boolean variables."""
class TestBooleanVar(BaseTestCase): import TestEnv
import cx_Oracle
class TestCase(TestEnv.BaseTestCase):
def testBindFalse(self): def testBindFalse(self):
"test binding in a False value" "test binding in a False value"
@ -42,3 +46,6 @@ class TestBooleanVar(BaseTestCase):
(True,)) (True,))
self.assertEqual(result, "TRUE") self.assertEqual(result, "TRUE")
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,15 +9,17 @@
"""Module for testing connections.""" """Module for testing connections."""
import TestEnv
import cx_Oracle
import random import random
import threading import threading
class TestConnection(TestCase): class TestCase(TestEnv.BaseTestCase):
def __ConnectAndDrop(self): def __ConnectAndDrop(self):
"""Connect to the database, perform a query and drop the connection.""" """Connect to the database, perform a query and drop the connection."""
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection(threaded=True)
self.tnsentry, threaded = True)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute(u"select count(*) from TestNumbers") cursor.execute(u"select count(*) from TestNumbers")
count, = cursor.fetchone() count, = cursor.fetchone()
@ -31,21 +33,22 @@ class TestConnection(TestCase):
self.assertEqual(result, value, "%s value mismatch" % attrName) self.assertEqual(result, value, "%s value mismatch" % attrName)
def setUp(self): def setUp(self):
self.username = USERNAME pass
self.password = PASSWORD
self.tnsentry = TNSENTRY def tearDown(self):
pass
def verifyArgs(self, connection): def verifyArgs(self, connection):
self.assertEqual(connection.username, self.username, self.assertEqual(connection.username, TestEnv.GetMainUser(),
"user name differs") "user name differs")
self.assertEqual(connection.tnsentry, self.tnsentry, self.assertEqual(connection.tnsentry, TestEnv.GetConnectString(),
"tnsentry differs") "tnsentry differs")
self.assertEqual(connection.dsn, self.tnsentry, "dsn differs") self.assertEqual(connection.dsn, TestEnv.GetConnectString(),
"dsn differs")
def testAllArgs(self): def testAllArgs(self):
"connection to database with user, password, TNS separate" "connection to database with user, password, TNS separate"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
self.verifyArgs(connection) self.verifyArgs(connection)
def testAppContext(self): def testAppContext(self):
@ -56,8 +59,7 @@ class TestConnection(TestCase):
( namespace, "ATTR2", "VALUE2" ), ( namespace, "ATTR2", "VALUE2" ),
( namespace, "ATTR3", "VALUE3" ) ( namespace, "ATTR3", "VALUE3" )
] ]
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection(appcontext=appContextEntries)
self.tnsentry, appcontext = appContextEntries)
cursor = connection.cursor() cursor = connection.cursor()
for namespace, name, value in appContextEntries: for namespace, name, value in appContextEntries:
cursor.execute("select sys_context(:1, :2) from dual", cursor.execute("select sys_context(:1, :2) from dual",
@ -67,14 +69,14 @@ class TestConnection(TestCase):
def testAppContextNegative(self): def testAppContextNegative(self):
"test invalid use of application context" "test invalid use of application context"
self.assertRaises(TypeError, cx_Oracle.connect, self.username, self.assertRaises(TypeError, cx_Oracle.connect, TestEnv.GetMainUser(),
self.password, self.tnsentry, TestEnv.GetMainPassword(), TestEnv.GetConnectString(),
appcontext = [('userenv', 'action')]) appcontext=[('userenv', 'action')])
def testAttributes(self): def testAttributes(self):
"test connection end-to-end tracing attributes" "test connection end-to-end tracing attributes"
connection = cx_Oracle.connect(USERNAME, PASSWORD, TNSENTRY) connection = TestEnv.GetConnection()
if CLIENT_VERSION >= (12, 1): if TestEnv.GetClientVersion() >= (12, 1):
self.__VerifyAttributes(connection, "dbop", "cx_OracleTest_DBOP", self.__VerifyAttributes(connection, "dbop", "cx_OracleTest_DBOP",
"select dbop_name from v$sql_monitor " "select dbop_name from v$sql_monitor "
"where sid = sys_context('userenv', 'sid')" "where sid = sys_context('userenv', 'sid')"
@ -92,9 +94,9 @@ class TestConnection(TestCase):
def testAutoCommit(self): def testAutoCommit(self):
"test use of autocommit" "test use of autocommit"
connection = cx_Oracle.connect(USERNAME, PASSWORD, TNSENTRY) connection = TestEnv.GetConnection()
cursor = connection.cursor() cursor = connection.cursor()
otherConnection = cx_Oracle.connect(USERNAME, PASSWORD, TNSENTRY) otherConnection = TestEnv.GetConnection()
otherCursor = otherConnection.cursor() otherCursor = otherConnection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
cursor.execute("insert into TestTempTable (IntCol) values (1)") cursor.execute("insert into TestTempTable (IntCol) values (1)")
@ -110,53 +112,57 @@ class TestConnection(TestCase):
def testBadConnectString(self): def testBadConnectString(self):
"connection to database with bad connect string" "connection to database with bad connect string"
self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect, self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username) TestEnv.GetMainUser())
self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect, self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username + u"@" + self.tnsentry) TestEnv.GetMainUser() + u"@" + TestEnv.GetConnectString())
self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect, self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username + "@" + self.tnsentry + "/" + self.password) TestEnv.GetMainUser() + "@" + \
TestEnv.GetConnectString() + "/" + TestEnv.GetMainPassword())
def testBadPassword(self): def testBadPassword(self):
"connection to database with bad password" "connection to database with bad password"
self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect, self.assertRaises(cx_Oracle.DatabaseError, cx_Oracle.connect,
self.username, self.password + u"X", self.tnsentry) TestEnv.GetMainUser(), TestEnv.GetMainPassword() + "X",
TestEnv.GetConnectString())
def testChangePassword(self): def testChangePassword(self):
"test changing password" "test changing password"
newPassword = "NEW_PASSWORD" newPassword = "NEW_PASSWORD"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry) connection.changepassword(TestEnv.GetMainPassword(), newPassword)
connection.changepassword(self.password, newPassword) cconnection = cx_Oracle.connect(TestEnv.GetMainUser(), newPassword,
cconnection = cx_Oracle.connect(self.username, newPassword, TestEnv.GetConnectString())
self.tnsentry) connection.changepassword(newPassword, TestEnv.GetMainPassword())
connection.changepassword(newPassword, self.password)
def testChangePasswordNegative(self): def testChangePasswordNegative(self):
"test changing password to an invalid value" "test changing password to an invalid value"
newPassword = "1" * 150 newPassword = "1" * 150
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
self.assertRaises(cx_Oracle.DatabaseError, connection.changepassword, self.assertRaises(cx_Oracle.DatabaseError, connection.changepassword,
self.password, newPassword) TestEnv.GetMainPassword(), newPassword)
def testEncodings(self): def testEncodings(self):
"connection with only encoding or nencoding specified should work" "connection with only encoding or nencoding specified should work"
connection = cx_Oracle.connect(self.username, self.password, connection = cx_Oracle.connect(TestEnv.GetMainUser(),
self.tnsentry) TestEnv.GetMainPassword(), TestEnv.GetConnectString())
encoding = connection.encoding encoding = connection.encoding
nencoding = connection.nencoding nencoding = connection.nencoding
connection = cx_Oracle.connect(self.username, self.password, altEncoding = "ISO-8859-1"
self.tnsentry, encoding = "UTF-8") connection = cx_Oracle.connect(TestEnv.GetMainUser(),
self.assertEqual(connection.encoding, "UTF-8") TestEnv.GetMainPassword(), TestEnv.GetConnectString(),
encoding=altEncoding)
self.assertEqual(connection.encoding, altEncoding)
self.assertEqual(connection.nencoding, nencoding) self.assertEqual(connection.nencoding, nencoding)
connection = cx_Oracle.connect(self.username, self.password, connection = cx_Oracle.connect(TestEnv.GetMainUser(),
self.tnsentry, nencoding = "UTF-8") TestEnv.GetMainPassword(), TestEnv.GetConnectString(),
nencoding=altEncoding)
self.assertEqual(connection.encoding, encoding) self.assertEqual(connection.encoding, encoding)
self.assertEqual(connection.nencoding, "UTF-8") self.assertEqual(connection.nencoding, altEncoding)
def testDifferentEncodings(self): def testDifferentEncodings(self):
connection = cx_Oracle.connect(self.username, self.password, connection = cx_Oracle.connect(TestEnv.GetMainUser(),
self.tnsentry, encoding = "UTF-8", nencoding = "UTF-16") TestEnv.GetMainPassword(), TestEnv.GetConnectString(),
encoding="UTF-8", nencoding="UTF-16")
value = u"\u03b4\u4e2a" value = u"\u03b4\u4e2a"
cursor = connection.cursor() cursor = connection.cursor()
ncharVar = cursor.var(cx_Oracle.NCHAR, 100) ncharVar = cursor.var(cx_Oracle.NCHAR, 100)
@ -167,15 +173,13 @@ class TestConnection(TestCase):
def testExceptionOnClose(self): def testExceptionOnClose(self):
"confirm an exception is raised after closing a connection" "confirm an exception is raised after closing a connection"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
connection.close() connection.close()
self.assertRaises(cx_Oracle.DatabaseError, connection.rollback) self.assertRaises(cx_Oracle.DatabaseError, connection.rollback)
def testConnectWithHandle(self): def testConnectWithHandle(self):
"test creating a connection using a handle" "test creating a connection using a handle"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
intValue = random.randint(1, 32768) intValue = random.randint(1, 32768)
@ -206,25 +210,23 @@ class TestConnection(TestCase):
self.assertEqual(result, formatString % args) self.assertEqual(result, formatString % args)
def testSingleArg(self): def testSingleArg(self):
"connection to database with user, password, TNS together" "connection to database with user, password, DSN together"
connection = cx_Oracle.connect("%s/%s@%s" % \ connection = cx_Oracle.connect("%s/%s@%s" % \
(self.username, self.password, self.tnsentry)) (TestEnv.GetMainUser(), TestEnv.GetMainPassword(),
TestEnv.GetConnectString()))
self.verifyArgs(connection) self.verifyArgs(connection)
def testVersion(self): def testVersion(self):
"connection version is a string" "connection version is a string"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
self.assertTrue(isinstance(connection.version, str)) self.assertTrue(isinstance(connection.version, str))
def testRollbackOnClose(self): def testRollbackOnClose(self):
"connection rolls back before close" "connection rolls back before close"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
otherConnection = cx_Oracle.connect(self.username, self.password, otherConnection = TestEnv.GetConnection()
self.tnsentry)
otherCursor = otherConnection.cursor() otherCursor = otherConnection.cursor()
otherCursor.execute("insert into TestTempTable (IntCol) values (1)") otherCursor.execute("insert into TestTempTable (IntCol) values (1)")
otherCursor.close() otherCursor.close()
@ -235,12 +237,10 @@ class TestConnection(TestCase):
def testRollbackOnDel(self): def testRollbackOnDel(self):
"connection rolls back before destruction" "connection rolls back before destruction"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
otherConnection = cx_Oracle.connect(self.username, self.password, otherConnection = TestEnv.GetConnection()
self.tnsentry)
otherCursor = otherConnection.cursor() otherCursor = otherConnection.cursor()
otherCursor.execute("insert into TestTempTable (IntCol) values (1)") otherCursor.execute("insert into TestTempTable (IntCol) values (1)")
del otherCursor del otherCursor
@ -261,16 +261,14 @@ class TestConnection(TestCase):
def testStringFormat(self): def testStringFormat(self):
"test string format of connection" "test string format of connection"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
expectedValue = "<cx_Oracle.Connection to %s@%s>" % \ expectedValue = "<cx_Oracle.Connection to %s@%s>" % \
(self.username, self.tnsentry) (TestEnv.GetMainUser(), TestEnv.GetConnectString())
self.assertEqual(str(connection), expectedValue) self.assertEqual(str(connection), expectedValue)
def testCtxMgrClose(self): def testCtxMgrClose(self):
"test context manager - close" "test context manager - close"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
with connection: with connection:
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
@ -278,8 +276,7 @@ class TestConnection(TestCase):
connection.commit() connection.commit()
cursor.execute("insert into TestTempTable (IntCol) values (2)") cursor.execute("insert into TestTempTable (IntCol) values (2)")
self.assertRaises(cx_Oracle.DatabaseError, connection.ping) self.assertRaises(cx_Oracle.DatabaseError, connection.ping)
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("select count(*) from TestTempTable") cursor.execute("select count(*) from TestTempTable")
count, = cursor.fetchone() count, = cursor.fetchone()
@ -287,13 +284,15 @@ class TestConnection(TestCase):
def testConnectionAttributes(self): def testConnectionAttributes(self):
"test connection attribute values" "test connection attribute values"
connection = cx_Oracle.connect(self.username, self.password, connection = cx_Oracle.connect(TestEnv.GetMainUser(),
self.tnsentry, encoding = "ASCII") TestEnv.GetMainPassword(), TestEnv.GetConnectString(),
encoding="ASCII")
self.assertEqual(connection.maxBytesPerCharacter, 1) self.assertEqual(connection.maxBytesPerCharacter, 1)
connection = cx_Oracle.connect(self.username, self.password, connection = cx_Oracle.connect(TestEnv.GetMainUser(),
self.tnsentry, encoding = "UTF-8") TestEnv.GetMainPassword(), TestEnv.GetConnectString(),
encoding="UTF-8")
self.assertEqual(connection.maxBytesPerCharacter, 4) self.assertEqual(connection.maxBytesPerCharacter, 4)
if CLIENT_VERSION >= (12, 1): if TestEnv.GetClientVersion() >= (12, 1):
self.assertEqual(connection.ltxid, b'') self.assertEqual(connection.ltxid, b'')
self.assertEqual(connection.current_schema, None) self.assertEqual(connection.current_schema, None)
connection.current_schema = "test_schema" connection.current_schema = "test_schema"
@ -310,12 +309,11 @@ class TestConnection(TestCase):
def testClosedConnectionAttributes(self): def testClosedConnectionAttributes(self):
"test closed connection attribute values" "test closed connection attribute values"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
connection.close() connection.close()
attrNames = ["current_schema", "edition", "external_name", attrNames = ["current_schema", "edition", "external_name",
"internal_name", "stmtcachesize"] "internal_name", "stmtcachesize"]
if CLIENT_VERSION >= (12, 1): if TestEnv.GetClientVersion() >= (12, 1):
attrNames.append("ltxid") attrNames.append("ltxid")
for name in attrNames: for name in attrNames:
self.assertRaises(cx_Oracle.DatabaseError, getattr, connection, self.assertRaises(cx_Oracle.DatabaseError, getattr, connection,
@ -323,14 +321,12 @@ class TestConnection(TestCase):
def testPing(self): def testPing(self):
"test connection ping" "test connection ping"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
connection.ping() connection.ping()
def testTransactionBegin(self): def testTransactionBegin(self):
"test begin, prepare, cancel transaction" "test begin, prepare, cancel transaction"
connection = cx_Oracle.connect(self.username, self.password, connection = TestEnv.GetConnection()
self.tnsentry)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
connection.begin(10, 'trxnId', 'branchId') connection.begin(10, 'trxnId', 'branchId')
@ -346,3 +342,6 @@ class TestConnection(TestCase):
count, = cursor.fetchone() count, = cursor.fetchone()
self.assertEqual(count, 0) self.assertEqual(count, 0)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -9,11 +9,13 @@
"""Module for testing cursor objects.""" """Module for testing cursor objects."""
import TestEnv
import cx_Oracle import cx_Oracle
import decimal import decimal
import sys import sys
class TestCursor(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def testCreateScrollableCursor(self): def testCreateScrollableCursor(self):
"""test creating a scrollable cursor""" """test creating a scrollable cursor"""
@ -567,7 +569,8 @@ class TestCursor(BaseTestCase):
def testStringFormat(self): def testStringFormat(self):
"""test string format of cursor""" """test string format of cursor"""
formatString = "<cx_Oracle.Cursor on <cx_Oracle.Connection to %s@%s>>" formatString = "<cx_Oracle.Cursor on <cx_Oracle.Connection to %s@%s>>"
expectedValue = formatString % (USERNAME, TNSENTRY) expectedValue = formatString % \
(TestEnv.GetMainUser(), TestEnv.GetConnectString())
self.assertEqual(str(self.cursor), expectedValue) self.assertEqual(str(self.cursor), expectedValue)
def testCursorFetchRaw(self): def testCursorFetchRaw(self):
@ -668,3 +671,6 @@ class TestCursor(BaseTestCase):
result, = self.cursor.fetchone() result, = self.cursor.fetchone()
self.assertEqual(result, expectedResult) self.assertEqual(result, expectedResult)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,9 +9,12 @@
"""Module for testing cursor variables.""" """Module for testing cursor variables."""
import TestEnv
import cx_Oracle
import sys import sys
class TestCursorVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def testBindCursor(self): def testBindCursor(self):
"test binding in a cursor" "test binding in a cursor"
@ -23,8 +26,8 @@ class TestCursorVar(BaseTestCase):
end;""", end;""",
cursor = cursor) cursor = cursor)
self.assertEqual(cursor.description, self.assertEqual(cursor.description,
[ ('STRINGVALUE', cx_Oracle.FIXED_CHAR, 1, CS_RATIO, None, [ ('STRINGVALUE', cx_Oracle.FIXED_CHAR, 1,
None, 1) ]) TestEnv.GetCharSetRatio(), None, None, 1) ])
self.assertEqual(cursor.fetchall(), [('X',)]) self.assertEqual(cursor.fetchall(), [('X',)])
def testBindCursorInPackage(self): def testBindCursorInPackage(self):
@ -34,8 +37,8 @@ class TestCursorVar(BaseTestCase):
self.cursor.callproc("pkg_TestRefCursors.TestOutCursor", (2, cursor)) self.cursor.callproc("pkg_TestRefCursors.TestOutCursor", (2, cursor))
self.assertEqual(cursor.description, self.assertEqual(cursor.description,
[ ('INTCOL', cx_Oracle.NUMBER, 10, None, 9, 0, 0), [ ('INTCOL', cx_Oracle.NUMBER, 10, None, 9, 0, 0),
('STRINGCOL', cx_Oracle.STRING, 20, 20 * CS_RATIO, None, ('STRINGCOL', cx_Oracle.STRING, 20, 20 *
None, 0) ]) TestEnv.GetCharSetRatio(), None, None, 0) ])
self.assertEqual(cursor.fetchall(), self.assertEqual(cursor.fetchall(),
[ (1, 'String 1'), (2, 'String 2') ]) [ (1, 'String 1'), (2, 'String 2') ])
@ -85,3 +88,6 @@ class TestCursorVar(BaseTestCase):
self.assertEqual(number, i) self.assertEqual(number, i)
self.assertEqual(cursor.fetchall(), [(i + 1,)]) self.assertEqual(cursor.fetchall(), [(i + 1,)])
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,12 +1,14 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
"""Module for testing DML returning clauses.""" """Module for testing DML returning clauses."""
import sys import TestEnv
class TestDMLReturning(BaseTestCase): import cx_Oracle
class TestCase(TestEnv.BaseTestCase):
def testInsert(self): def testInsert(self):
"test insert statement (single row) with DML returning" "test insert statement (single row) with DML returning"
@ -255,3 +257,6 @@ class TestDMLReturning(BaseTestCase):
self.cursor.execute(None, [4, intVar]) self.cursor.execute(None, [4, intVar])
self.assertEqual(intVar.getvalue(), []) self.assertEqual(intVar.getvalue(), [])
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,13 +9,16 @@
"""Module for testing date/time variables.""" """Module for testing date/time variables."""
import TestEnv
import cx_Oracle
import datetime import datetime
import time import time
class TestDateTimeVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def setUp(self): def setUp(self):
BaseTestCase.setUp(self) TestEnv.BaseTestCase.setUp(self)
self.rawData = [] self.rawData = []
self.dataByKey = {} self.dataByKey = {}
for i in range(1, 11): for i in range(1, 11):
@ -243,3 +246,6 @@ class TestDateTimeVar(BaseTestCase):
self.assertEqual(self.cursor.fetchone(), self.dataByKey[4]) self.assertEqual(self.cursor.fetchone(), self.dataByKey[4])
self.assertEqual(self.cursor.fetchone(), None) self.assertEqual(self.cursor.fetchone(), None)
if __name__ == "__main__":
TestEnv.RunTestCases()

27
test/DropTest.py Normal file
View File

@ -0,0 +1,27 @@
#------------------------------------------------------------------------------
# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# DropTest.py
#
# Drops the database objects used for the cx_Oracle test suite.
#------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import TestEnv
def DropTests(conn):
print("Dropping test schemas...")
TestEnv.RunSqlScript(conn, "DropTest",
main_user = TestEnv.GetMainUser(),
proxy_user = TestEnv.GetProxyUser())
if __name__ == "__main__":
conn = cx_Oracle.connect(TestEnv.GetSysdbaConnectString(),
mode = cx_Oracle.SYSDBA)
DropTests(conn)
print("Done.")

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,10 +9,12 @@
"""Module for testing error objects.""" """Module for testing error objects."""
import TestEnv
import cx_Oracle import cx_Oracle
import pickle import pickle
class TestError(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def testPickleError(self): def testPickleError(self):
"test picking/unpickling an error object" "test picking/unpickling an error object"
@ -38,3 +40,6 @@ class TestError(BaseTestCase):
self.assertTrue(newErrorObj.context == errorObj.context) self.assertTrue(newErrorObj.context == errorObj.context)
self.assertTrue(newErrorObj.isrecoverable == errorObj.isrecoverable) self.assertTrue(newErrorObj.isrecoverable == errorObj.isrecoverable)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,13 +9,16 @@
"""Module for testing features introduced in 12.1""" """Module for testing features introduced in 12.1"""
import TestEnv
import cx_Oracle
import datetime import datetime
import sys # import sys
if sys.version_info > (3,): # if sys.version_info > (3,):
long = int # long = int
class TestFeatures12_1(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def testArrayDMLRowCountsOff(self): def testArrayDMLRowCountsOff(self):
"test executing with arraydmlrowcounts mode disabled" "test executing with arraydmlrowcounts mode disabled"
@ -44,8 +47,7 @@ class TestFeatures12_1(BaseTestCase):
"values (:1,:2,:3)" "values (:1,:2,:3)"
self.cursor.executemany(sql, rows, arraydmlrowcounts = True) self.cursor.executemany(sql, rows, arraydmlrowcounts = True)
self.connection.commit() self.connection.commit()
self.assertEqual(self.cursor.getarraydmlrowcounts(), self.assertEqual(self.cursor.getarraydmlrowcounts(), [1, 1, 1, 1, 1])
[long(1), long(1), long(1), long(1), long(1)])
self.cursor.execute("select count(*) from TestArrayDML") self.cursor.execute("select count(*) from TestArrayDML")
count, = self.cursor.fetchone() count, = self.cursor.fetchone()
self.assertEqual(count, len(rows)) self.assertEqual(count, len(rows))
@ -265,8 +267,7 @@ class TestFeatures12_1(BaseTestCase):
sql = "insert into TestArrayDML (IntCol,StringCol) values (:1,:2)" sql = "insert into TestArrayDML (IntCol,StringCol) values (:1,:2)"
self.assertRaises(cx_Oracle.DatabaseError, self.cursor.executemany, self.assertRaises(cx_Oracle.DatabaseError, self.cursor.executemany,
sql, rows, arraydmlrowcounts = True) sql, rows, arraydmlrowcounts = True)
self.assertEqual(self.cursor.getarraydmlrowcounts(), self.assertEqual(self.cursor.getarraydmlrowcounts(), [1, 1])
[long(1), long(1)])
def testExecutingDelete(self): def testExecutingDelete(self):
"test executing delete statement with arraydmlrowcount mode" "test executing delete statement with arraydmlrowcount mode"
@ -285,8 +286,7 @@ class TestFeatures12_1(BaseTestCase):
rows = [ (200,), (300,), (400,) ] rows = [ (200,), (300,), (400,) ]
statement = "delete from TestArrayDML where IntCol2 = :1" statement = "delete from TestArrayDML where IntCol2 = :1"
self.cursor.executemany(statement, rows, arraydmlrowcounts = True) self.cursor.executemany(statement, rows, arraydmlrowcounts = True)
self.assertEqual(self.cursor.getarraydmlrowcounts(), self.assertEqual(self.cursor.getarraydmlrowcounts(), [1, 3, 2])
[long(1), long(3), long(2)])
def testExecutingUpdate(self): def testExecutingUpdate(self):
"test executing update statement with arraydmlrowcount mode" "test executing update statement with arraydmlrowcount mode"
@ -308,8 +308,7 @@ class TestFeatures12_1(BaseTestCase):
("Four", 400) ] ("Four", 400) ]
sql = "update TestArrayDML set StringCol = :1 where IntCol2 = :2" sql = "update TestArrayDML set StringCol = :1 where IntCol2 = :2"
self.cursor.executemany(sql, rows, arraydmlrowcounts = True) self.cursor.executemany(sql, rows, arraydmlrowcounts = True)
self.assertEqual(self.cursor.getarraydmlrowcounts(), self.assertEqual(self.cursor.getarraydmlrowcounts(), [1, 1, 3, 2])
[long(1), long(1), long(3), long(2)])
def testImplicitResults(self): def testImplicitResults(self):
"test getimplicitresults() returns the correct data" "test getimplicitresults() returns the correct data"
@ -356,17 +355,17 @@ class TestFeatures12_1(BaseTestCase):
"values (:1, :2, :3)" "values (:1, :2, :3)"
self.cursor.executemany(sql, rows, batcherrors = True, self.cursor.executemany(sql, rows, batcherrors = True,
arraydmlrowcounts = True) arraydmlrowcounts = True)
user = TestEnv.GetMainUser()
expectedErrors = [ expectedErrors = [
( 4, 1438, "ORA-01438: value larger than specified " \ ( 4, 1438, "ORA-01438: value larger than specified " \
"precision allowed for this column" ), "precision allowed for this column" ),
( 2, 1, "ORA-00001: unique constraint " \ ( 2, 1, "ORA-00001: unique constraint " \
"(CX_ORACLE.TESTARRAYDML_PK) violated") "(%s.TESTARRAYDML_PK) violated" % user.upper())
] ]
actualErrors = [(e.offset, e.code, e.message) \ actualErrors = [(e.offset, e.code, e.message) \
for e in self.cursor.getbatcherrors()] for e in self.cursor.getbatcherrors()]
self.assertEqual(actualErrors, expectedErrors) self.assertEqual(actualErrors, expectedErrors)
self.assertEqual(self.cursor.getarraydmlrowcounts(), self.assertEqual(self.cursor.getarraydmlrowcounts(), [1, 1, 0, 1, 0])
[long(1), long(1), long(0), long(1), long(0)])
def testBatchErrorFalse(self): def testBatchErrorFalse(self):
"test batcherrors mode set to False" "test batcherrors mode set to False"
@ -393,9 +392,10 @@ class TestFeatures12_1(BaseTestCase):
sql = "insert into TestArrayDML (IntCol, StringCol, IntCol2) " \ sql = "insert into TestArrayDML (IntCol, StringCol, IntCol2) " \
"values (:1, :2, :3)" "values (:1, :2, :3)"
self.cursor.executemany(sql, rows, batcherrors = True) self.cursor.executemany(sql, rows, batcherrors = True)
user = TestEnv.GetMainUser()
expectedErrors = [ expectedErrors = [
( 6, 1, "ORA-00001: unique constraint " \ ( 6, 1, "ORA-00001: unique constraint " \
"(CX_ORACLE.TESTARRAYDML_PK) violated") "(%s.TESTARRAYDML_PK) violated" % user.upper())
] ]
actualErrors = [(e.offset, e.code, e.message) \ actualErrors = [(e.offset, e.code, e.message) \
for e in self.cursor.getbatcherrors()] for e in self.cursor.getbatcherrors()]
@ -416,6 +416,9 @@ class TestFeatures12_1(BaseTestCase):
for e in self.cursor.getbatcherrors()] for e in self.cursor.getbatcherrors()]
self.assertEqual(actualErrors, expectedErrors) self.assertEqual(actualErrors, expectedErrors)
self.assertEqual(self.cursor.getarraydmlrowcounts(), self.assertEqual(self.cursor.getarraydmlrowcounts(),
[long(1), long(2), long(0), long(0), long(1)]) [1, 2, 0, 0, 1])
self.assertEqual(self.cursor.rowcount, 4) self.assertEqual(self.cursor.rowcount, 4)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,12 +9,15 @@
"""Module for testing interval variables.""" """Module for testing interval variables."""
import TestEnv
import cx_Oracle
import datetime import datetime
class TestIntervalVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def setUp(self): def setUp(self):
BaseTestCase.setUp(self) TestEnv.BaseTestCase.setUp(self)
self.rawData = [] self.rawData = []
self.dataByKey = {} self.dataByKey = {}
for i in range(1, 11): for i in range(1, 11):
@ -140,3 +143,6 @@ class TestIntervalVar(BaseTestCase):
self.assertEqual(self.cursor.fetchone(), self.dataByKey[4]) self.assertEqual(self.cursor.fetchone(), self.dataByKey[4])
self.assertEqual(self.cursor.fetchone(), None) self.assertEqual(self.cursor.fetchone(), None)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,9 +9,12 @@
"""Module for testing LOB (CLOB and BLOB) variables.""" """Module for testing LOB (CLOB and BLOB) variables."""
import TestEnv
import cx_Oracle
import sys import sys
class TestLobVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def __GetTempLobs(self, sid): def __GetTempLobs(self, sid):
cursor = self.connection.cursor() cursor = self.connection.cursor()
@ -221,7 +224,8 @@ class TestLobVar(BaseTestCase):
def testNCLOBDifferentEncodings(self): def testNCLOBDifferentEncodings(self):
"test binding and fetching NCLOB data (different encodings)" "test binding and fetching NCLOB data (different encodings)"
connection = cx_Oracle.connect(USERNAME, PASSWORD, TNSENTRY, connection = cx_Oracle.connect(TestEnv.GetMainUser(),
TestEnv.GetMainPassword(), TestEnv.GetConnectString(),
encoding = "UTF-8", nencoding = "UTF-16") encoding = "UTF-8", nencoding = "UTF-16")
value = u"\u03b4\u4e2a" value = u"\u03b4\u4e2a"
cursor = connection.cursor() cursor = connection.cursor()
@ -271,3 +275,6 @@ class TestLobVar(BaseTestCase):
nclobVar = self.cursor.var(cx_Oracle.NCLOB) nclobVar = self.cursor.var(cx_Oracle.NCLOB)
self.assertRaises(IndexError, nclobVar.setvalue, 1, "test char") self.assertRaises(IndexError, nclobVar.setvalue, 1, "test char")
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,9 +9,12 @@
"""Module for testing long and long raw variables.""" """Module for testing long and long raw variables."""
import TestEnv
import cx_Oracle
import sys import sys
class TestLongVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def __PerformTest(self, a_Type, a_InputType): def __PerformTest(self, a_Type, a_InputType):
self.cursor.execute("truncate table Test%ss" % a_Type) self.cursor.execute("truncate table Test%ss" % a_Type)
@ -98,3 +101,6 @@ class TestLongVar(BaseTestCase):
self.assertRaises(cx_Oracle.DatabaseError, self.cursor.execute, self.assertRaises(cx_Oracle.DatabaseError, self.cursor.execute,
"select * from TestLongRaws") "select * from TestLongRaws")
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,13 +1,16 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
"""Module for testing module methods.""" """Module for testing module methods."""
import TestEnv
import cx_Oracle
import datetime import datetime
import time import time
class TestModule(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def testDateFromTicks(self): def testDateFromTicks(self):
"test DateFromTicks()" "test DateFromTicks()"
@ -36,3 +39,6 @@ class TestModule(BaseTestCase):
self.assertRaises(cx_Oracle.NotSupportedError, cx_Oracle.TimeFromTicks, self.assertRaises(cx_Oracle.NotSupportedError, cx_Oracle.TimeFromTicks,
100) 100)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,10 +9,14 @@
"""Module for testing NCHAR variables.""" """Module for testing NCHAR variables."""
class TestNCharVar(BaseTestCase): import TestEnv
import cx_Oracle
class TestCase(TestEnv.BaseTestCase):
def setUp(self): def setUp(self):
BaseTestCase.setUp(self) TestEnv.BaseTestCase.setUp(self)
self.rawData = [] self.rawData = []
self.dataByKey = {} self.dataByKey = {}
for i in range(1, 11): for i in range(1, 11):
@ -234,3 +238,6 @@ class TestNCharVar(BaseTestCase):
self.assertEqual(self.cursor.fetchone(), self.dataByKey[4]) self.assertEqual(self.cursor.fetchone(), self.dataByKey[4])
self.assertEqual(self.cursor.fetchone(), None) self.assertEqual(self.cursor.fetchone(), None)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -9,14 +9,13 @@
"""Module for testing number variables.""" """Module for testing number variables."""
import TestEnv
import cx_Oracle import cx_Oracle
import decimal import decimal
import sys import sys
if sys.version_info > (3,): class TestCase(TestEnv.BaseTestCase):
long = int
class TestNumberVar(BaseTestCase):
def outputTypeHandlerNativeInt(self, cursor, name, defaultType, size, def outputTypeHandlerNativeInt(self, cursor, name, defaultType, size,
precision, scale): precision, scale):
@ -29,7 +28,7 @@ class TestNumberVar(BaseTestCase):
arraysize = cursor.arraysize) arraysize = cursor.arraysize)
def setUp(self): def setUp(self):
BaseTestCase.setUp(self) TestEnv.BaseTestCase.setUp(self)
self.rawData = [] self.rawData = []
self.dataByKey = {} self.dataByKey = {}
for i in range(1, 11): for i in range(1, 11):
@ -37,10 +36,10 @@ class TestNumberVar(BaseTestCase):
floatCol = i + i * 0.75 floatCol = i + i * 0.75
unconstrainedCol = i ** 3 + i * 0.5 unconstrainedCol = i ** 3 + i * 0.5
if i % 2: if i % 2:
nullableCol = long(143) ** i nullableCol = 143 ** i
else: else:
nullableCol = None nullableCol = None
dataTuple = (i, long(38) ** i, numberCol, floatCol, dataTuple = (i, 38 ** i, numberCol, floatCol,
unconstrainedCol, nullableCol) unconstrainedCol, nullableCol)
self.rawData.append(dataTuple) self.rawData.append(dataTuple)
self.dataByKey[i] = dataTuple self.dataByKey[i] = dataTuple
@ -78,14 +77,6 @@ class TestNumberVar(BaseTestCase):
value = 2) value = 2)
self.assertEqual(self.cursor.fetchall(), [self.dataByKey[2]]) self.assertEqual(self.cursor.fetchall(), [self.dataByKey[2]])
def testBindSmallLong(self):
"test binding in a small long integer"
self.cursor.execute("""
select * from TestNumbers
where IntCol = :value""",
value = long(3))
self.assertEqual(self.cursor.fetchall(), [self.dataByKey[3]])
def testBindLargeLongAsOracleNumber(self): def testBindLargeLongAsOracleNumber(self):
"test binding in a large long integer as Oracle number" "test binding in a large long integer as Oracle number"
valueVar = self.cursor.var(cx_Oracle.NUMBER) valueVar = self.cursor.var(cx_Oracle.NUMBER)
@ -309,7 +300,7 @@ class TestNumberVar(BaseTestCase):
from TestNumbers from TestNumbers
where IntCol = 9""") where IntCol = 9""")
col, = self.cursor.fetchone() col, = self.cursor.fetchone()
self.assertTrue(isinstance(col, long), "long integer not returned") self.assertEqual(col, 25004854810776297743)
def testReturnConstantFloat(self): def testReturnConstantFloat(self):
"test that fetching a floating point number returns such in Python" "test that fetching a floating point number returns such in Python"
@ -348,8 +339,10 @@ class TestNumberVar(BaseTestCase):
invalidErr, noRepErr, noRepErr, invalidErr, invalidErr, invalidErr, noRepErr, noRepErr, invalidErr, invalidErr,
invalidErr] invalidErr]
for inValue, error in zip(inValues, expectedErrors): for inValue, error in zip(inValues, expectedErrors):
self.assertRaisesRegexp(cx_Oracle.DatabaseError, error, method = self.assertRaisesRegex if sys.version_info[0] == 3 \
self.cursor.execute, "select :1 from dual", (inValue,)) else self.assertRaisesRegexp
method(cx_Oracle.DatabaseError, error, self.cursor.execute,
"select :1 from dual", (inValue,))
def testReturnFloatFromDivision(self): def testReturnFloatFromDivision(self):
"test that fetching the result of division returns a float" "test that fetching the result of division returns a float"
@ -392,3 +385,6 @@ class TestNumberVar(BaseTestCase):
fetchedValue, = self.cursor.fetchone() fetchedValue, = self.cursor.fetchone()
self.assertEqual(value, fetchedValue) self.assertEqual(value, fetchedValue)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,11 +9,13 @@
"""Module for testing object variables.""" """Module for testing object variables."""
import TestEnv
import cx_Oracle import cx_Oracle
import datetime import datetime
import decimal import decimal
class TestObjectVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def __GetObjectAsTuple(self, obj): def __GetObjectAsTuple(self, obj):
if obj.type.iscollection: if obj.type.iscollection:
@ -300,8 +302,9 @@ class TestObjectVar(BaseTestCase):
def testStringFormat(self): def testStringFormat(self):
"test object string format" "test object string format"
objType = self.connection.gettype("UDT_OBJECT") objType = self.connection.gettype("UDT_OBJECT")
user = TestEnv.GetMainUser()
self.assertEqual(str(objType), self.assertEqual(str(objType),
"<cx_Oracle.ObjectType CX_ORACLE.UDT_OBJECT>") "<cx_Oracle.ObjectType %s.UDT_OBJECT>" % user.upper())
self.assertEqual(str(objType.attributes[0]), self.assertEqual(str(objType.attributes[0]),
"<cx_Oracle.ObjectAttribute NUMBERVALUE>") "<cx_Oracle.ObjectAttribute NUMBERVALUE>")
@ -326,3 +329,6 @@ class TestObjectVar(BaseTestCase):
arrayObj.trim(1) arrayObj.trim(1)
self.assertEqual(self.__GetObjectAsTuple(arrayObj), []) self.assertEqual(self.__GetObjectAsTuple(arrayObj), [])
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,27 +1,47 @@
This directory contains the test suite for cx_Oracle. This directory contains the test suite for cx_Oracle.
The schemas and SQL objects that are referenced in the test suite can be 1. The schemas and SQL objects that are referenced in the test suite can be
created by running the SQL script sql/SetupTest.sql. The syntax is: created by running the Python script [SetupTest.py][1]. The script requires
SYSDBA privileges and will prompt for these credentials as well as the
names of the schemas that will be created, unless a number of environment
variables are set as documented in the Python script [TestEnv.py][2]. Run
the script using the following command:
sqlplus sys/syspassword@hostname/servicename as sysdba @sql/SetupTest.sql python SetupTest.py
The script will create users cx_Oracle and cx_Oracle_proxy. If you wish to Alternatively, the [SQL script][3] can be run directly via SQL\*Plus, which
change the names of the users or the name of the edition you can edit the file will always prompt for the names of the schemas that will be created. Run
sql/TestEnv.sql. You will also need to edit the file TestEnv.py or set the script using the following command:
environment variables as documented in TestEnv.py.
The test suite can be run without having cx_Oracle installed by issuing the sqlplus sys/syspassword@hostname/servicename @sql/SetupTest.sql
following command in the main directory:
python setup.py test 2. Run the test suite by issuing the following command in the top-level
directory of your cx_Oracle installation:
If cx_Oracle is already installed, you can also run it by issuing the following python setup.py test
command in this directory:
python test.py Alternatively, you can run the test suite directly within this directory:
After running the test suite, the schemas and SQL objects can be dropped by python test.py
running the SQL script sql/DropTest.sql. The syntax is
sqlplus sys/syspassword@hostname/servicename as sysdba @sql/DropTest.sql 3. After running the test suite, the schemas can be dropped by running the
Python script [DropTest.py][4]. The script requires SYSDBA privileges and
will prompt for these credentials as well as the names of the schemas
that will be dropped, unless a number of environment variables are set as
documented in the Python script [TestEnv.py][2]. Run the script using the
following command:
python DropTest.py
Alternatively, the [SQL script][5] can be run directly via SQL\*Plus, which
will always prompt for the names of the schemas that will be dropped. Run
the script using the following command:
sqlplus sys/syspassword@hostname/servicename @sql/DropTest.sql
[1]: https://github.com/oracle/python-cx_Oracle/blob/master/test/SetupTest.py
[2]: https://github.com/oracle/python-cx_Oracle/blob/master/test/TestEnv.py
[3]: https://github.com/oracle/python-cx_Oracle/blob/master/test/sql/SetupTest.sql
[4]: https://github.com/oracle/python-cx_Oracle/blob/master/test/DropTest.py
[5]: https://github.com/oracle/python-cx_Oracle/blob/master/test/sql/DropTest.sql

View File

@ -1,11 +1,15 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
"""Module for testing Rowids""" """Module for testing Rowids"""
class Rowids(BaseTestCase): import TestEnv
import cx_Oracle
class TestCase(TestEnv.BaseTestCase):
def __TestSelectRowids(self, tableName): def __TestSelectRowids(self, tableName):
self.cursor.execute("select rowid, IntCol from %s""" % tableName) self.cursor.execute("select rowid, IntCol from %s""" % tableName)
@ -53,3 +57,6 @@ class Rowids(BaseTestCase):
self.assertEqual(len(rows), 1) self.assertEqual(len(rows), 1)
self.assertEqual(rows[0][0], intVal) self.assertEqual(rows[0][0], intVal)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,9 +9,12 @@
"""Module for testing session pools.""" """Module for testing session pools."""
import TestEnv
import cx_Oracle
import threading import threading
class TestConnection(TestCase): class TestCase(TestEnv.BaseTestCase):
def __ConnectAndDrop(self): def __ConnectAndDrop(self):
"""Connect to the database, perform a query and drop the connection.""" """Connect to the database, perform a query and drop the connection."""
@ -41,13 +44,21 @@ class TestConnection(TestCase):
self.assertEqual(actualProxyUser, self.assertEqual(actualProxyUser,
expectedProxyUser and expectedProxyUser.upper()) expectedProxyUser and expectedProxyUser.upper())
def setUp(self):
pass
def tearDown(self):
pass
def testPool(self): def testPool(self):
"""test that the pool is created and has the right attributes""" """test that the pool is created and has the right attributes"""
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3, pool = TestEnv.GetPool(min=2, max=8, increment=3,
encoding = ENCODING, nencoding = NENCODING) getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)
self.assertEqual(pool.username, USERNAME, "user name differs") self.assertEqual(pool.username, TestEnv.GetMainUser(),
self.assertEqual(pool.tnsentry, TNSENTRY, "tnsentry differs") "user name differs")
self.assertEqual(pool.dsn, TNSENTRY, "dsn differs") self.assertEqual(pool.tnsentry, TestEnv.GetConnectString(),
"tnsentry differs")
self.assertEqual(pool.dsn, TestEnv.GetConnectString(), "dsn differs")
self.assertEqual(pool.max, 8, "max differs") self.assertEqual(pool.max, 8, "max differs")
self.assertEqual(pool.min, 2, "min differs") self.assertEqual(pool.min, 2, "min differs")
self.assertEqual(pool.increment, 3, "increment differs") self.assertEqual(pool.increment, 3, "increment differs")
@ -68,46 +79,43 @@ class TestConnection(TestCase):
self.assertEqual(pool.busy, 1, "busy not 1 after del") self.assertEqual(pool.busy, 1, "busy not 1 after del")
pool.getmode = cx_Oracle.SPOOL_ATTRVAL_NOWAIT pool.getmode = cx_Oracle.SPOOL_ATTRVAL_NOWAIT
self.assertEqual(pool.getmode, cx_Oracle.SPOOL_ATTRVAL_NOWAIT) self.assertEqual(pool.getmode, cx_Oracle.SPOOL_ATTRVAL_NOWAIT)
if CLIENT_VERSION >= (12, 2): if TestEnv.GetClientVersion() >= (12, 2):
pool.getmode = cx_Oracle.SPOOL_ATTRVAL_TIMEDWAIT pool.getmode = cx_Oracle.SPOOL_ATTRVAL_TIMEDWAIT
self.assertEqual(pool.getmode, cx_Oracle.SPOOL_ATTRVAL_TIMEDWAIT) self.assertEqual(pool.getmode, cx_Oracle.SPOOL_ATTRVAL_TIMEDWAIT)
pool.stmtcachesize = 50 pool.stmtcachesize = 50
self.assertEqual(pool.stmtcachesize, 50) self.assertEqual(pool.stmtcachesize, 50)
pool.timeout = 10 pool.timeout = 10
self.assertEqual(pool.timeout, 10) self.assertEqual(pool.timeout, 10)
if CLIENT_VERSION >= (12, 1): if TestEnv.GetClientVersion() >= (12, 1):
pool.max_lifetime_session = 10 pool.max_lifetime_session = 10
self.assertEqual(pool.max_lifetime_session, 10) self.assertEqual(pool.max_lifetime_session, 10)
def testProxyAuth(self): def testProxyAuth(self):
"""test that proxy authentication is possible""" """test that proxy authentication is possible"""
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3, pool = TestEnv.GetPool(min=2, max=8, increment=3,
encoding = ENCODING, nencoding = NENCODING) getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)
self.assertEqual(pool.homogeneous, 1, self.assertEqual(pool.homogeneous, 1,
"homogeneous should be 1 by default") "homogeneous should be 1 by default")
self.assertRaises(cx_Oracle.ProgrammingError, pool.acquire, self.assertRaises(cx_Oracle.ProgrammingError, pool.acquire,
user = u"missing_proxyuser") user = u"missing_proxyuser")
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3, pool = TestEnv.GetPool(min=2, max=8, increment=3,
homogeneous = False, encoding = ENCODING, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT, homogeneous=False)
nencoding = NENCODING)
self.assertEqual(pool.homogeneous, 0, self.assertEqual(pool.homogeneous, 0,
"homogeneous should be 0 after setting it in the constructor") "homogeneous should be 0 after setting it in the constructor")
connection = pool.acquire(user = PROXY_USERNAME) connection = pool.acquire(user = TestEnv.GetProxyUser())
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute('select user from dual') cursor.execute('select user from dual')
result, = cursor.fetchone() result, = cursor.fetchone()
self.assertEqual(result, PROXY_USERNAME.upper()) self.assertEqual(result, TestEnv.GetProxyUser().upper())
def testRollbackOnDel(self): def testRollbackOnDel(self):
"connection rolls back before being destroyed" "connection rolls back before being destroyed"
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3, pool = TestEnv.GetPool()
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire() connection = pool.acquire()
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
cursor.execute("insert into TestTempTable (IntCol) values (1)") cursor.execute("insert into TestTempTable (IntCol) values (1)")
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3, pool = TestEnv.GetPool()
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire() connection = pool.acquire()
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("select count(*) from TestTempTable") cursor.execute("select count(*) from TestTempTable")
@ -116,16 +124,14 @@ class TestConnection(TestCase):
def testRollbackOnRelease(self): def testRollbackOnRelease(self):
"connection rolls back before released back to the pool" "connection rolls back before released back to the pool"
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3, pool = TestEnv.GetPool()
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire() connection = pool.acquire()
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("truncate table TestTempTable") cursor.execute("truncate table TestTempTable")
cursor.execute("insert into TestTempTable (IntCol) values (1)") cursor.execute("insert into TestTempTable (IntCol) values (1)")
cursor.close() cursor.close()
pool.release(connection) pool.release(connection)
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 1, 8, 3, pool = TestEnv.GetPool()
encoding = ENCODING, nencoding = NENCODING)
connection = pool.acquire() connection = pool.acquire()
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("select count(*) from TestTempTable") cursor.execute("select count(*) from TestTempTable")
@ -134,10 +140,8 @@ class TestConnection(TestCase):
def testThreading(self): def testThreading(self):
"""test session pool to database with multiple threads""" """test session pool to database with multiple threads"""
self.pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, self.pool = TestEnv.GetPool(min=5, max=20, increment=2, threaded=True,
min = 5, max = 20, increment = 2, threaded = True, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)
getmode = cx_Oracle.SPOOL_ATTRVAL_WAIT, encoding = ENCODING,
nencoding = NENCODING)
threads = [] threads = []
for i in range(20): for i in range(20):
thread = threading.Thread(None, self.__ConnectAndDrop) thread = threading.Thread(None, self.__ConnectAndDrop)
@ -148,10 +152,8 @@ class TestConnection(TestCase):
def testThreadingWithErrors(self): def testThreadingWithErrors(self):
"""test session pool to database with multiple threads (with errors)""" """test session pool to database with multiple threads (with errors)"""
self.pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, self.pool = TestEnv.GetPool(min=5, max=20, increment=2, threaded=True,
min = 5, max = 20, increment = 2, threaded = True, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)
getmode = cx_Oracle.SPOOL_ATTRVAL_WAIT, encoding = ENCODING,
nencoding = NENCODING)
threads = [] threads = []
for i in range(20): for i in range(20):
thread = threading.Thread(None, self.__ConnectAndGenerateError) thread = threading.Thread(None, self.__ConnectAndGenerateError)
@ -163,9 +165,8 @@ class TestConnection(TestCase):
def testPurity(self): def testPurity(self):
"""test session pool with various types of purity""" """test session pool with various types of purity"""
action = "TEST_ACTION" action = "TEST_ACTION"
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, min = 1, pool = TestEnv.GetPool(min=1, max=8, increment=1,
max = 8, increment = 1, encoding = ENCODING, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)
nencoding = NENCODING)
# get connection and set the action # get connection and set the action
connection = pool.acquire() connection = pool.acquire()
@ -199,42 +200,46 @@ class TestConnection(TestCase):
def testHeterogeneous(self): def testHeterogeneous(self):
"""test heterogeneous pool with user and password specified""" """test heterogeneous pool with user and password specified"""
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3, pool = TestEnv.GetPool(min=2, max=8, increment=3, homogeneous=False,
encoding = ENCODING, nencoding = NENCODING, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)
homogeneous = False)
self.assertEqual(pool.homogeneous, 0) self.assertEqual(pool.homogeneous, 0)
self.__VerifyConnection(pool.acquire(), USERNAME) self.__VerifyConnection(pool.acquire(), TestEnv.GetMainUser())
self.__VerifyConnection(pool.acquire(USERNAME, PASSWORD), USERNAME) self.__VerifyConnection(pool.acquire(TestEnv.GetMainUser(),
self.__VerifyConnection(pool.acquire(PROXY_USERNAME, PROXY_PASSWORD), TestEnv.GetMainPassword()), TestEnv.GetMainUser())
PROXY_USERNAME) self.__VerifyConnection(pool.acquire(TestEnv.GetProxyUser(),
userStr = "%s[%s]" % (USERNAME, PROXY_USERNAME) TestEnv.GetProxyPassword()), TestEnv.GetProxyUser())
self.__VerifyConnection(pool.acquire(userStr, PASSWORD), userStr = "%s[%s]" % (TestEnv.GetMainUser(), TestEnv.GetProxyUser())
PROXY_USERNAME, USERNAME) self.__VerifyConnection(pool.acquire(userStr,
TestEnv.GetMainPassword()), TestEnv.GetProxyUser(),
TestEnv.GetMainUser())
def testHeterogenousWithoutUser(self): def testHeterogenousWithoutUser(self):
"""test heterogeneous pool without user and password specified""" """test heterogeneous pool without user and password specified"""
pool = cx_Oracle.SessionPool("", "", TNSENTRY, 2, 8, 3, pool = TestEnv.GetPool(user="", password="", min=2, max=8, increment=3,
encoding = ENCODING, nencoding = NENCODING, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT, homogeneous=False)
homogeneous = False) self.__VerifyConnection(pool.acquire(TestEnv.GetMainUser(),
self.__VerifyConnection(pool.acquire(USERNAME, PASSWORD), USERNAME) TestEnv.GetMainPassword()), TestEnv.GetMainUser())
self.__VerifyConnection(pool.acquire(PROXY_USERNAME, PROXY_PASSWORD), self.__VerifyConnection(pool.acquire(TestEnv.GetProxyUser(),
PROXY_USERNAME) TestEnv.GetProxyPassword()), TestEnv.GetProxyUser())
userStr = "%s[%s]" % (USERNAME, PROXY_USERNAME) userStr = "%s[%s]" % (TestEnv.GetMainUser(), TestEnv.GetProxyUser())
self.__VerifyConnection(pool.acquire(userStr, PASSWORD), self.__VerifyConnection(pool.acquire(userStr,
PROXY_USERNAME, USERNAME) TestEnv.GetMainPassword()), TestEnv.GetProxyUser(),
TestEnv.GetMainUser())
def testHeterogenousWithoutPassword(self): def testHeterogenousWithoutPassword(self):
"""test heterogeneous pool without password""" """test heterogeneous pool without password"""
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3, pool = TestEnv.GetPool(min=2, max=8, increment=3,
encoding = ENCODING, nencoding = NENCODING, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT, homogeneous=False)
homogeneous = False) self.assertRaises(cx_Oracle.DatabaseError, pool.acquire,
self.assertRaises(cx_Oracle.DatabaseError, pool.acquire, USERNAME) TestEnv.GetMainUser())
def testHeterogeneousWrongPassword(self): def testHeterogeneousWrongPassword(self):
"""test heterogeneous pool with wrong password specified""" """test heterogeneous pool with wrong password specified"""
pool = cx_Oracle.SessionPool(USERNAME, PASSWORD, TNSENTRY, 2, 8, 3, pool = TestEnv.GetPool(min=2, max=8, increment=3,
encoding = ENCODING, nencoding = NENCODING, getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT, homogeneous=False)
homogeneous = False)
self.assertRaises(cx_Oracle.DatabaseError, pool.acquire, self.assertRaises(cx_Oracle.DatabaseError, pool.acquire,
PROXY_USERNAME, "this is the wrong password") TestEnv.GetProxyUser(), "this is the wrong password")
if __name__ == "__main__":
TestEnv.RunTestCases()

34
test/SetupTest.py Normal file
View File

@ -0,0 +1,34 @@
#------------------------------------------------------------------------------
# Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# SetupTest.py
#
# Creates users and populates their schemas with the tables and packages
# necessary for the cx_Oracle test suite.
#------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import TestEnv
import DropTest
# connect as SYSDBA
conn = cx_Oracle.connect(TestEnv.GetSysdbaConnectString(),
mode = cx_Oracle.SYSDBA)
# drop existing users and editions, if applicable
DropTest.DropTests(conn)
# create test schemas
print("Creating test schemas...")
TestEnv.RunSqlScript(conn, "SetupTest",
main_user = TestEnv.GetMainUser(),
main_password = TestEnv.GetMainPassword(),
proxy_user = TestEnv.GetProxyUser(),
proxy_password = TestEnv.GetProxyPassword())
print("Done.")

View File

@ -1,10 +1,14 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
"""Module for testing Simple Oracle Document Access (SODA) Collections""" """Module for testing Simple Oracle Document Access (SODA) Collections"""
class TestSodaDocuments(BaseTestCase): import TestEnv
import cx_Oracle
class TestCase(TestEnv.BaseTestCase):
def __testSkip(self, coll, numToSkip, expectedContent): def __testSkip(self, coll, numToSkip, expectedContent):
filterSpec = {'$orderby': [{'path': 'name', 'order': 'desc'}]} filterSpec = {'$orderby': [{'path': 'name', 'order': 'desc'}]}
@ -271,3 +275,6 @@ class TestSodaDocuments(BaseTestCase):
doc = coll.insertOneAndGet(data) doc = coll.insertOneAndGet(data)
self.assertEqual(doc.createdOn, doc.lastModified) self.assertEqual(doc.createdOn, doc.lastModified)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,12 +1,15 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
"""Module for testing Simple Oracle Document Access (SODA) Database""" """Module for testing Simple Oracle Document Access (SODA) Database"""
import TestEnv
import cx_Oracle
import json import json
class TestSodaCollection(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def __dropExistingCollections(self, sodaDatabase): def __dropExistingCollections(self, sodaDatabase):
for name in sodaDatabase.getCollectionNames(): for name in sodaDatabase.getCollectionNames():
@ -83,7 +86,7 @@ class TestSodaCollection(BaseTestCase):
def testRepr(self): def testRepr(self):
"test SodaDatabase representation" "test SodaDatabase representation"
con1 = self.connection con1 = self.connection
con2 = self.getConnection() con2 = TestEnv.GetConnection()
sodaDatabase1 = con1.getSodaDatabase() sodaDatabase1 = con1.getSodaDatabase()
sodaDatabase2 = con1.getSodaDatabase() sodaDatabase2 = con1.getSodaDatabase()
sodaDatabase3 = con2.getSodaDatabase() sodaDatabase3 = con2.getSodaDatabase()
@ -99,3 +102,6 @@ class TestSodaCollection(BaseTestCase):
sodaDatabase.createCollection, None) sodaDatabase.createCollection, None)
self.assertRaises(TypeError, sodaDatabase.getCollectionNames, 1) self.assertRaises(TypeError, sodaDatabase.getCollectionNames, 1)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -10,14 +10,17 @@
"""Module for testing string variables.""" """Module for testing string variables."""
import TestEnv
import cx_Oracle
import datetime import datetime
import string import string
import random import random
class TestStringVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def setUp(self): def setUp(self):
BaseTestCase.setUp(self) TestEnv.BaseTestCase.setUp(self)
self.rawData = [] self.rawData = []
self.dataByKey = {} self.dataByKey = {}
for i in range(1, 11): for i in range(1, 11):
@ -293,12 +296,15 @@ class TestStringVar(BaseTestCase):
self.cursor.execute("select * from TestStrings") self.cursor.execute("select * from TestStrings")
self.assertEqual(self.cursor.description, self.assertEqual(self.cursor.description,
[ ('INTCOL', cx_Oracle.NUMBER, 10, None, 9, 0, 0), [ ('INTCOL', cx_Oracle.NUMBER, 10, None, 9, 0, 0),
('STRINGCOL', cx_Oracle.STRING, 20, 20 * CS_RATIO, None, ('STRINGCOL', cx_Oracle.STRING, 20,
20 * TestEnv.GetCharSetRatio(), None,
None, 0), None, 0),
('RAWCOL', cx_Oracle.BINARY, 30, 30, None, None, 0), ('RAWCOL', cx_Oracle.BINARY, 30, 30, None, None, 0),
('FIXEDCHARCOL', cx_Oracle.FIXED_CHAR, 40, 40 * CS_RATIO, ('FIXEDCHARCOL', cx_Oracle.FIXED_CHAR, 40,
40 * TestEnv.GetCharSetRatio(),
None, None, 0), None, None, 0),
('NULLABLECOL', cx_Oracle.STRING, 50, 50 * CS_RATIO, None, ('NULLABLECOL', cx_Oracle.STRING, 50,
50 * TestEnv.GetCharSetRatio(), None,
None, 1) ]) None, 1) ])
def testFetchAll(self): def testFetchAll(self):
@ -426,3 +432,6 @@ class TestStringVar(BaseTestCase):
actualValue, = self.cursor.fetchone() actualValue, = self.cursor.fetchone()
self.assertEqual(actualValue.strip(), xmlString) self.assertEqual(actualValue.strip(), xmlString)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,9 +1,12 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
"""Module for testing subscriptions.""" """Module for testing subscriptions."""
import TestEnv
import cx_Oracle
import threading import threading
class SubscriptionData(object): class SubscriptionData(object):
@ -31,7 +34,7 @@ class SubscriptionData(object):
self.condition.release() self.condition.release()
class Subscription(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def testSubscription(self): def testSubscription(self):
"test Subscription for insert, update, delete and truncate" "test Subscription for insert, update, delete and truncate"
@ -47,8 +50,7 @@ class Subscription(BaseTestCase):
# set up subscription # set up subscription
data = SubscriptionData(5) data = SubscriptionData(5)
connection = cx_Oracle.connect(USERNAME, PASSWORD, TNSENTRY, connection = TestEnv.GetConnection(threaded=True, events=True)
threaded = True, events = True)
sub = connection.subscribe(callback = data.CallbackHandler, sub = connection.subscribe(callback = data.CallbackHandler,
timeout = 10, qos = cx_Oracle.SUBSCR_QOS_ROWIDS) timeout = 10, qos = cx_Oracle.SUBSCR_QOS_ROWIDS)
sub.registerquery("select * from TestTempTable") sub.registerquery("select * from TestTempTable")
@ -95,6 +97,10 @@ class Subscription(BaseTestCase):
# test string format of subscription object is as expected # test string format of subscription object is as expected
fmt = "<cx_Oracle.Subscription on <cx_Oracle.Connection to %s@%s>>" fmt = "<cx_Oracle.Subscription on <cx_Oracle.Connection to %s@%s>>"
expectedValue = fmt % (USERNAME, TNSENTRY) expectedValue = fmt % \
(TestEnv.GetMainUser(), TestEnv.GetConnectString())
self.assertEqual(str(sub), expectedValue) self.assertEqual(str(sub), expectedValue)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -12,18 +12,16 @@
# applications should consider using External Authentication to # applications should consider using External Authentication to
# avoid hard coded credentials. # avoid hard coded credentials.
# #
# You can set values in environment variables to override the default values. # You can set values in environment variables to bypass having the test suite
# If the default values are not going to be used, however, the SQL script # request the information it requires.
# sql/TestEnv.sql will also need to be modified.
# #
# CX_ORACLE_TEST_MAIN_USER: user used for most samples # CX_ORACLE_TEST_MAIN_USER: user used for most samples
# CX_ORACLE_TEST_MAIN_PASSWORD: password of user used for most samples # CX_ORACLE_TEST_MAIN_PASSWORD: password of user used for most samples
# CX_ORACLE_TEST_PROXY_USER: user for testing proxy connections # CX_ORACLE_TEST_PROXY_USER: user for testing proxy connections
# CX_ORACLE_TEST_PROXY_PASSWORD: password of user for proxying # CX_ORACLE_TEST_PROXY_PASSWORD: password of user for proxying
# CX_ORACLE_TEST_CONNECT_STRING: connect string # CX_ORACLE_TEST_CONNECT_STRING: connect string
# CX_ORACLE_TEST_ENCODING: encoding for CHAR/VARCHAR2 data # CX_ORACLE_TEST_SYSDBA_USER: SYSDBA user for setting up test suite
# CX_ORACLE_TEST_NENCODING: encoding for NCHAR/NVARCHAR2 data # CX_ORACLE_TEST_SYSDBA_PASSWORD: SYSDBA password for setting up test suite
# CX_ORACLE_TEST_ARRAY_SIZE: array size to use for tests
# #
# CX_ORACLE_TEST_CONNECT_STRING can be set to an Easy Connect string, or a # CX_ORACLE_TEST_CONNECT_STRING can be set to an Easy Connect string, or a
# Net Service Name from a tnsnames.ora file or external naming service, # Net Service Name from a tnsnames.ora file or external naming service,
@ -43,30 +41,132 @@
# variable and put the file in $TNS_ADMIN/tnsnames.ora. # variable and put the file in $TNS_ADMIN/tnsnames.ora.
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
from __future__ import print_function
import cx_Oracle
import getpass
import os import os
import sys
import unittest
# default values # default values
DEFAULT_MAIN_USER = "cx_Oracle" DEFAULT_MAIN_USER = "pythontest"
DEFAULT_MAIN_PASSWORD = "welcome" DEFAULT_PROXY_USER = "pythontestproxy"
DEFAULT_PROXY_USER = "cx_Oracle_proxy"
DEFAULT_PROXY_PASSWORD = "welcome"
DEFAULT_CONNECT_STRING = "localhost/orclpdb" DEFAULT_CONNECT_STRING = "localhost/orclpdb"
DEFAULT_ENCODING = "UTF-8"
DEFAULT_NENCODING = "UTF-8"
DEFAULT_ARRAY_SIZE = 5
# values that will be used are the default values unless environment variables # dictionary containing all parameters; these are acquired as needed by the
# have been set as noted above # methods below (which should be used instead of consulting this dictionary
MAIN_USER = os.environ.get("CX_ORACLE_TEST_MAIN_USER", DEFAULT_MAIN_USER) # directly) and then stored so that a value is not requested more than once
MAIN_PASSWORD = os.environ.get("CX_ORACLE_TEST_MAIN_PASSWORD", PARAMETERS = {}
DEFAULT_MAIN_PASSWORD)
PROXY_USER = os.environ.get("CX_ORACLE_TEST_PROXY_USER", DEFAULT_PROXY_USER) def GetValue(name, label, defaultValue=""):
PROXY_PASSWORD = os.environ.get("CX_ORACLE_TEST_PROXY_PASSWORD", value = PARAMETERS.get(name)
DEFAULT_PROXY_PASSWORD) if value is not None:
CONNECT_STRING = os.environ.get("CX_ORACLE_TEST_CONNECT_STRING", return value
DEFAULT_CONNECT_STRING) envName = "CX_ORACLE_TEST_" + name
ENCODING = os.environ.get("CX_ORACLE_TEST_ENCODING", DEFAULT_ENCODING) value = os.environ.get(envName)
NENCODING = os.environ.get("CX_ORACLE_TEST_NENCODING", DEFAULT_NENCODING) if value is None:
ARRAY_SIZE = int(os.environ.get("CX_ORACLE_TEST_ARRAY_SIZE", if defaultValue:
DEFAULT_ARRAY_SIZE)) label += " [%s]" % defaultValue
label += ": "
if defaultValue:
value = input(label).strip()
else:
value = getpass.getpass(label)
if not value:
value = defaultValue
PARAMETERS[name] = value
return value
def GetMainUser():
return GetValue("MAIN_USER", "Main User Name", DEFAULT_MAIN_USER)
def GetMainPassword():
return GetValue("MAIN_PASSWORD", "Password for %s" % GetMainUser())
def GetProxyUser():
return GetValue("PROXY_USER", "Proxy User Name", DEFAULT_PROXY_USER)
def GetProxyPassword():
return GetValue("PROXY_PASSWORD", "Password for %s" % GetProxyUser())
def GetConnectString():
return GetValue("CONNECT_STRING", "Connect String", DEFAULT_CONNECT_STRING)
def GetCharSetRatio():
value = PARAMETERS.get("CS_RATIO")
if value is None:
connection = GetConnection()
cursor = connection.cursor()
cursor.execute("select 'X' from dual")
col, = cursor.description
value = col[3]
PARAMETERS["CS_RATIO"] = value
return value
def GetSysdbaConnectString():
sysdbaUser = GetValue("SYSDBA_USER", "SYSDBA user", "sys")
sysdbaPassword = GetValue("SYSDBA_PASSWORD",
"Password for %s" % sysdbaUser)
return "%s/%s@%s" % (sysdbaUser, sysdbaPassword, GetConnectString())
def RunSqlScript(conn, scriptName, **kwargs):
statementParts = []
cursor = conn.cursor()
replaceValues = [("&" + k + ".", v) for k, v in kwargs.items()] + \
[("&" + k, v) for k, v in kwargs.items()]
scriptDir = os.path.dirname(os.path.abspath(sys.argv[0]))
fileName = os.path.join(scriptDir, "sql", scriptName + "Exec.sql")
for line in open(fileName):
if line.strip() == "/":
statement = "".join(statementParts).strip()
if statement:
for searchValue, replaceValue in replaceValues:
statement = statement.replace(searchValue, replaceValue)
cursor.execute(statement)
statementParts = []
else:
statementParts.append(line)
cursor.execute("""
select name, type, line, position, text
from dba_errors
where owner = upper(:owner)
order by name, type, line, position""",
owner = GetMainUser())
prevName = prevObjType = None
for name, objType, lineNum, position, text in cursor:
if name != prevName or objType != prevObjType:
print("%s (%s)" % (name, objType))
prevName = name
prevObjType = objType
print(" %s/%s %s" % (lineNum, position, text))
def RunTestCases():
unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))
def GetConnection(**kwargs):
return cx_Oracle.connect(GetMainUser(), GetMainPassword(),
GetConnectString(), encoding="UTF-8", nencoding="UTF-8", **kwargs)
def GetPool(user=None, password=None, **kwargs):
if user is None:
user = GetMainUser()
if password is None:
password = GetMainPassword()
return cx_Oracle.SessionPool(user, password, GetConnectString(),
encoding="UTF-8", nencoding="UTF-8", **kwargs)
def GetClientVersion():
return cx_Oracle.clientversion()
class BaseTestCase(unittest.TestCase):
def setUp(self):
self.connection = GetConnection()
self.cursor = self.connection.cursor()
def tearDown(self):
self.connection.close()
del self.cursor
del self.connection

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -9,12 +9,15 @@
"""Module for testing timestamp variables.""" """Module for testing timestamp variables."""
import TestEnv
import cx_Oracle
import time import time
class TestTimestampVar(BaseTestCase): class TestCase(TestEnv.BaseTestCase):
def setUp(self): def setUp(self):
BaseTestCase.setUp(self) TestEnv.BaseTestCase.setUp(self)
self.rawData = [] self.rawData = []
self.dataByKey = {} self.dataByKey = {}
for i in range(1, 11): for i in range(1, 11):
@ -137,3 +140,6 @@ class TestTimestampVar(BaseTestCase):
self.assertEqual(self.cursor.fetchone(), self.dataByKey[4]) self.assertEqual(self.cursor.fetchone(), self.dataByKey[4])
self.assertEqual(self.cursor.fetchone(), None) self.assertEqual(self.cursor.fetchone(), None)
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -1,33 +1,27 @@
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. * Copyright 2019, Oracle and/or its affiliates. All rights reserved.
*---------------------------------------------------------------------------*/ *---------------------------------------------------------------------------*/
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* DropTest.sql * DropTest.sql
* Drops database objects used for testing. * Drops database objects used for cx_Oracle tests.
* *
* Run this like: * Run this like:
* sqlplus / as sysdba @DropTest * sqlplus sys/syspassword@hostname/servicename as sysdba @DropTest
*
* Note that the script TestEnv.sql should be modified if you would like to
* use something other than the default schemas and passwords.
*---------------------------------------------------------------------------*/ *---------------------------------------------------------------------------*/
whenever sqlerror exit failure whenever sqlerror exit failure
-- setup environment -- get parameters
@@TestEnv.sql set echo off termout on feedback off verify off
accept main_user char default pythontest -
prompt "Name of main schema [pythontest]: "
accept proxy_user char default pythontestproxy -
prompt "Name of proxy schema [pythontestproxy]: "
set feedback on
begin -- perform work
@@DropTestExec.sql
for r in exit
( select username
from dba_users
where username in (upper('&main_user'), upper('&proxy_user'))
) loop
execute immediate 'drop user ' || r.username || ' cascade';
end loop;
end;
/

25
test/sql/DropTestExec.sql Normal file
View File

@ -0,0 +1,25 @@
/*-----------------------------------------------------------------------------
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
*---------------------------------------------------------------------------*/
/*-----------------------------------------------------------------------------
* DropTestExec.sql
* This script performs the actual work of dropping the database schemas used
* by the cx_Oracle test suite. It is called by the DropTest.sql and
* SetupTest.sql scripts after acquiring the necessary parameters and also by
* the Python script DropTest.py.
*---------------------------------------------------------------------------*/
begin
for r in
( select username
from dba_users
where username in (upper('&main_user'), upper('&proxy_user'))
) loop
execute immediate 'drop user ' || r.username || ' cascade';
end loop;
end;
/

View File

@ -1,970 +1,31 @@
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. * Copyright 2019, Oracle and/or its affiliates. All rights reserved.
*
* Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
*
* Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
* Canada. All rights reserved.
*---------------------------------------------------------------------------*/ *---------------------------------------------------------------------------*/
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* SetupTest.sql * SetupTest.sql
* Creates users and populates their schemas with the tables and packages * Creates and populates schemas with the database objects used by the
* necessary for the cx_Oracle test suite. * cx_Oracle test suite.
* *
* Run this like: * Run this like:
* sqlplus / as sysdba @SetupTest * sqlplus sys/syspassword@hostname/servicename as sysdba @SetupTest
*
* Note that the script TestEnv.sql should be modified if you would like to use
* something other than the default configuration.
*---------------------------------------------------------------------------*/ *---------------------------------------------------------------------------*/
whenever sqlerror exit failure whenever sqlerror exit failure
-- drop existing users, if present -- get parameters
@@DropTest.sql set echo off termout on feedback off verify off
accept main_user char default pythontest -
alter session set nls_date_format = 'YYYY-MM-DD HH24:MI:SS'; prompt "Name of main schema [pythontest]: "
alter session set nls_numeric_characters='.,'; accept main_password char prompt "Password for &main_user: " HIDE
accept proxy_user char default pythontestproxy -
create user &main_user identified by &main_password prompt "Name of edition schema [pythontestproxy]: "
quota unlimited on users accept proxy_password char prompt "Password for &proxy_user: " HIDE
default tablespace users; set feedback on
create user &proxy_user identified by &proxy_password; -- perform work
alter user &proxy_user grant connect through &main_user; @@DropTestExec.sql
@@SetupTestExec.sql
grant create session to &proxy_user;
exit
grant
create session,
create table,
create procedure,
create type,
select any dictionary,
change notification
to &main_user;
grant execute on dbms_aqadm to &main_user;
grant execute on dbms_transform to &main_user;
begin
for r in
( select role
from dba_roles
where role in ('SODA_APP')
) loop
execute immediate 'grant ' || r.role || ' to &main_user';
end loop;
end;
/
-- create types
create type &main_user..udt_SubObject as object (
SubNumberValue number,
SubStringValue varchar2(60)
);
/
create type &main_user..udt_ObjectArray as
varray(10) of &main_user..udt_SubObject;
/
create type &main_user..udt_Object as object (
NumberValue number,
StringValue varchar2(60),
FixedCharValue char(10),
NStringValue nvarchar2(60),
NFixedCharValue nchar(10),
RawValue raw(16),
IntValue integer,
SmallIntValue smallint,
RealValue real,
DoublePrecisionValue double precision,
FloatValue float,
BinaryFloatValue binary_float,
BinaryDoubleValue binary_double,
DateValue date,
TimestampValue timestamp,
TimestampTZValue timestamp with time zone,
TimestampLTZValue timestamp with local time zone,
CLOBValue clob,
NCLOBValue nclob,
BLOBValue blob,
SubObjectValue &main_user..udt_SubObject,
SubObjectArray &main_user..udt_ObjectArray
);
/
create type &main_user..udt_Array as varray(10) of number;
/
create or replace type &main_user..udt_Building as object (
BuildingId number(9),
NumFloors number(3),
Description varchar2(60),
DateBuilt date
);
/
create or replace type &main_user..udt_Book as object (
Title varchar2(100),
Authors varchar2(100),
Price number(5,2)
);
/
-- create tables
create table &main_user..TestNumbers (
IntCol number(9) not null,
LongIntCol number(16) not null,
NumberCol number(9, 2) not null,
FloatCol float not null,
UnconstrainedCol number not null,
NullableCol number(38)
);
create table &main_user..TestStrings (
IntCol number(9) not null,
StringCol varchar2(20) not null,
RawCol raw(30) not null,
FixedCharCol char(40) not null,
NullableCol varchar2(50)
);
create table &main_user..TestUnicodes (
IntCol number(9) not null,
UnicodeCol nvarchar2(20) not null,
FixedUnicodeCol nchar(40) not null,
NullableCol nvarchar2(50)
);
create table &main_user..TestDates (
IntCol number(9) not null,
DateCol date not null,
NullableCol date
);
create table &main_user..TestCLOBs (
IntCol number(9) not null,
CLOBCol clob not null
);
create table &main_user..TestNCLOBs (
IntCol number(9) not null,
NCLOBCol nclob not null
);
create table &main_user..TestBLOBs (
IntCol number(9) not null,
BLOBCol blob not null
);
create table &main_user..TestXML (
IntCol number(9) not null,
XMLCol xmltype not null
);
create table &main_user..TestLongs (
IntCol number(9) not null,
LongCol long not null
);
create table &main_user..TestLongRaws (
IntCol number(9) not null,
LongRawCol long raw not null
);
create table &main_user..TestTempTable (
IntCol number(9) not null,
StringCol varchar2(400),
NumberCol number(25,2),
constraint TestTempTable_pk primary key (IntCol)
);
create table &main_user..TestArrayDML (
IntCol number(9) not null,
StringCol varchar2(100),
IntCol2 number(3),
constraint TestArrayDML_pk primary key (IntCol)
);
create table &main_user..TestObjects (
IntCol number(9) not null,
ObjectCol &main_user..udt_Object,
ArrayCol &main_user..udt_Array
);
create table &main_user..TestTimestamps (
IntCol number(9) not null,
TimestampCol timestamp not null,
NullableCol timestamp
);
create table &main_user..TestIntervals (
IntCol number(9) not null,
IntervalCol interval day to second not null,
NullableCol interval day to second
);
create table &main_user..TestUniversalRowids (
IntCol number(9) not null,
StringCol varchar2(250) not null,
DateCol date not null,
constraint TestUniversalRowids_pk primary key (IntCol, StringCol, DateCol)
) organization index;
create table &main_user..TestBuildings (
BuildingId number(9) not null,
BuildingObj &main_user..udt_Building not null
);
create table &main_user..TestRowids (
IntCol number(9) not null,
RowidCol rowid,
URowidCol urowid
);
-- create queue table and queues for testing advanced queuing
begin
dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE',
'&main_user..UDT_BOOK');
dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE');
dbms_aqadm.start_queue('&main_user..BOOKS');
end;
/
-- create transformations
begin
dbms_transform.create_transformation('&main_user', 'transform1',
'&main_user', 'UDT_BOOK', '&main_user', 'UDT_BOOK',
'&main_user..UDT_BOOK(source.user_data.TITLE, ' ||
'source.user_data.AUTHORS, source.user_data.PRICE + 5)');
dbms_transform.create_transformation('&main_user', 'transform2',
'&main_user', 'UDT_BOOK', '&main_user', 'UDT_BOOK',
'&main_user..UDT_BOOK(source.user_data.TITLE, ' ||
'source.user_data.AUTHORS, source.user_data.PRICE + 10)');
end;
/
-- populate tables
begin
for i in 1..10 loop
insert into &main_user..TestNumbers
values (i, power(38, i), i + i * 0.25, i + i * .75, i * i * i + i *.5,
decode(mod(i, 2), 0, null, power(143, i)));
end loop;
end;
/
declare
t_RawValue raw(30);
function ConvertHexDigit(a_Value number) return varchar2 is
begin
if a_Value between 0 and 9 then
return to_char(a_Value);
end if;
return chr(ascii('A') + a_Value - 10);
end;
function ConvertToHex(a_Value varchar2) return varchar2 is
t_HexValue varchar2(60);
t_Digit number;
begin
for i in 1..length(a_Value) loop
t_Digit := ascii(substr(a_Value, i, 1));
t_HexValue := t_HexValue ||
ConvertHexDigit(trunc(t_Digit / 16)) ||
ConvertHexDigit(mod(t_Digit, 16));
end loop;
return t_HexValue;
end;
begin
for i in 1..10 loop
t_RawValue := hextoraw(ConvertToHex('Raw ' || to_char(i)));
insert into &main_user..TestStrings
values (i, 'String ' || to_char(i), t_RawValue,
'Fixed Char ' || to_char(i),
decode(mod(i, 2), 0, null, 'Nullable ' || to_char(i)));
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestUnicodes
values (i, 'Unicode ' || unistr('\3042') || ' ' || to_char(i),
'Fixed Unicode ' || to_char(i),
decode(mod(i, 2), 0, null, unistr('Nullable ') || to_char(i)));
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestDates
values (i, to_date(20021209, 'YYYYMMDD') + i + i * .1,
decode(mod(i, 2), 0, null,
to_date(20021209, 'YYYYMMDD') + i + i + i * .15));
end loop;
end;
/
begin
for i in 1..100 loop
insert into &main_user..TestXML
values (i, '<?xml version="1.0"?><records>' ||
dbms_random.string('x', 1024) || '</records>');
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestTimestamps
values (i, to_timestamp('20021209', 'YYYYMMDD') +
to_dsinterval(to_char(i) || ' 00:00:' || to_char(i * 2) ||
'.' || to_char(i * 50)),
decode(mod(i, 2), 0, to_timestamp(null, 'YYYYMMDD'),
to_timestamp('20021209', 'YYYYMMDD') +
to_dsinterval(to_char(i + 1) || ' 00:00:' ||
to_char(i * 3) || '.' || to_char(i * 125))));
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestIntervals
values (i, to_dsinterval(to_char(i) || ' ' || to_char(i) || ':' ||
to_char(i * 2) || ':' || to_char(i * 3)),
decode(mod(i, 2), 0, to_dsinterval(null),
to_dsinterval(to_char(i + 5) || ' ' || to_char(i + 2) || ':' ||
to_char(i * 2 + 5) || ':' || to_char(i * 3 + 5))));
end loop;
end;
/
insert into &main_user..TestObjects values (1,
&main_user..udt_Object(1, 'First row', 'First', 'N First Row', 'N First',
'52617720446174612031', 2, 5, 12.125, 0.5, 12.5, 25.25, 50.125,
to_date(20070306, 'YYYYMMDD'),
to_timestamp('20080912 16:40:00', 'YYYYMMDD HH24:MI:SS'),
to_timestamp_tz('20091013 17:50:00 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
to_timestamp_tz('20101114 18:55:00 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
'Short CLOB value', 'Short NCLOB Value',
utl_raw.cast_to_raw('Short BLOB value'),
&main_user..udt_SubObject(11, 'Sub object 1'),
&main_user..udt_ObjectArray(
&main_user..udt_SubObject(5, 'first element'),
&main_user..udt_SubObject(6, 'second element'))),
&main_user..udt_Array(5, 10, null, 20));
insert into &main_user..TestObjects values (2, null,
&main_user..udt_Array(3, null, 9, 12, 15));
insert into &main_user..TestObjects values (3,
&main_user..udt_Object(3, 'Third row', 'Third', 'N Third Row', 'N Third',
'52617720446174612033', 4, 10, 6.5, 0.75, 43.25, 86.5, 192.125,
to_date(20070621, 'YYYYMMDD'),
to_timestamp('20071213 07:30:45', 'YYYYMMDD HH24:MI:SS'),
to_timestamp_tz('20170621 23:18:45 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
to_timestamp_tz('20170721 08:27:13 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
'Another short CLOB value', 'Another short NCLOB Value',
utl_raw.cast_to_raw('Yet another short BLOB value'),
&main_user..udt_SubObject(13, 'Sub object 3'),
&main_user..udt_ObjectArray(
&main_user..udt_SubObject(10, 'element #1'),
&main_user..udt_SubObject(20, 'element #2'),
&main_user..udt_SubObject(30, 'element #3'),
&main_user..udt_SubObject(40, 'element #4'))), null);
commit;
-- create procedures for testing callproc()
create procedure &main_user..proc_Test (
a_InValue varchar2,
a_InOutValue in out number,
a_OutValue out number
) as
begin
a_InOutValue := a_InOutValue * length(a_InValue);
a_OutValue := length(a_InValue);
end;
/
create procedure &main_user..proc_TestNoArgs as
begin
null;
end;
/
-- create functions for testing callfunc()
create function &main_user..func_Test (
a_String varchar2,
a_ExtraAmount number
) return number as
begin
return length(a_String) + a_ExtraAmount;
end;
/
create function &main_user..func_TestNoArgs
return number as
begin
return 712;
end;
/
-- create packages
create or replace package &main_user..pkg_TestStringArrays as
type udt_StringList is table of varchar2(100) index by binary_integer;
function TestInArrays (
a_StartingLength number,
a_Array udt_StringList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_StringList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_StringList
);
procedure TestIndexBy (
a_Array out nocopy udt_StringList
);
end;
/
create or replace package body &main_user..pkg_TestStringArrays as
function TestInArrays (
a_StartingLength number,
a_Array udt_StringList
) return number is
t_Length number;
begin
t_Length := a_StartingLength;
for i in 1..a_Array.count loop
t_Length := t_Length + length(a_Array(i));
end loop;
return t_Length;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_StringList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := 'Converted element # ' ||
to_char(i) || ' originally had length ' ||
to_char(length(a_Array(i)));
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_StringList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := 'Test out element # ' || to_char(i);
end loop;
end;
procedure TestIndexBy (
a_Array out nocopy udt_StringList
) is
begin
a_Array(-1048576) := 'First element';
a_Array(-576) := 'Second element';
a_Array(284) := 'Third element';
a_Array(8388608) := 'Fourth element';
end;
end;
/
create or replace package &main_user..pkg_TestUnicodeArrays as
type udt_UnicodeList is table of nvarchar2(100) index by binary_integer;
function TestInArrays (
a_StartingLength number,
a_Array udt_UnicodeList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_UnicodeList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_UnicodeList
);
end;
/
create or replace package body &main_user..pkg_TestUnicodeArrays as
function TestInArrays (
a_StartingLength number,
a_Array udt_UnicodeList
) return number is
t_Length number;
begin
t_Length := a_StartingLength;
for i in 1..a_Array.count loop
t_Length := t_Length + length(a_Array(i));
end loop;
return t_Length;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_UnicodeList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := unistr('Converted element ' || unistr('\3042') ||
' # ') || to_char(i) || ' originally had length ' ||
to_char(length(a_Array(i)));
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_UnicodeList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := unistr('Test out element ') || unistr('\3042') ||
' # ' || to_char(i);
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestNumberArrays as
type udt_NumberList is table of number index by binary_integer;
function TestInArrays (
a_StartingValue number,
a_Array udt_NumberList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_NumberList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_NumberList
);
end;
/
create or replace package body &main_user..pkg_TestNumberArrays as
function TestInArrays (
a_StartingValue number,
a_Array udt_NumberList
) return number is
t_Value number;
begin
t_Value := a_StartingValue;
for i in 1..a_Array.count loop
t_Value := t_Value + a_Array(i);
end loop;
return t_Value;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_NumberList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := a_Array(i) * 10;
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_NumberList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := i * 100;
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestDateArrays as
type udt_DateList is table of date index by binary_integer;
function TestInArrays (
a_StartingValue number,
a_BaseDate date,
a_Array udt_DateList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_DateList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_DateList
);
end;
/
create or replace package body &main_user..pkg_TestDateArrays as
function TestInArrays (
a_StartingValue number,
a_BaseDate date,
a_Array udt_DateList
) return number is
t_Value number;
begin
t_Value := a_StartingValue;
for i in 1..a_Array.count loop
t_Value := t_Value + a_Array(i) - a_BaseDate;
end loop;
return t_Value;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_DateList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := a_Array(i) + 7;
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_DateList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := to_date(20021212, 'YYYYMMDD') + i * 1.2;
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestRefCursors as
procedure TestOutCursor (
a_MaxIntValue number,
a_Cursor out sys_refcursor
);
function TestInCursor (
a_Cursor sys_refcursor
) return varchar2;
end;
/
create or replace package body &main_user..pkg_TestRefCursors as
procedure TestOutCursor (
a_MaxIntValue number,
a_Cursor out sys_refcursor
) is
begin
open a_Cursor for
select
IntCol,
StringCol
from TestStrings
where IntCol <= a_MaxIntValue
order by IntCol;
end;
function TestInCursor (
a_Cursor sys_refcursor
) return varchar2 is
t_String varchar2(100);
begin
fetch a_Cursor into t_String;
return t_String || ' (Modified)';
end;
end;
/
create or replace package &main_user..pkg_TestBooleans as
type udt_BooleanList is table of boolean index by binary_integer;
function GetStringRep (
a_Value boolean
) return varchar2;
function IsLessThan10 (
a_Value number
) return boolean;
function TestInArrays (
a_Value udt_BooleanList
) return number;
procedure TestOutArrays (
a_NumElements number,
a_Value out nocopy udt_BooleanList
);
end;
/
create or replace package body &main_user..pkg_TestBooleans as
function GetStringRep (
a_Value boolean
) return varchar2 is
begin
if a_Value is null then
return 'NULL';
elsif a_Value then
return 'TRUE';
end if;
return 'FALSE';
end;
function IsLessThan10 (
a_Value number
) return boolean is
begin
return a_Value < 10;
end;
function TestInArrays (
a_Value udt_BooleanList
) return number is
t_Result pls_integer;
begin
t_Result := 0;
for i in 1..a_Value.count loop
if a_Value(i) then
t_Result := t_Result + 1;
end if;
end loop;
return t_Result;
end;
procedure TestOutArrays (
a_NumElements number,
a_Value out nocopy udt_BooleanList
) is
begin
for i in 1..a_NumElements loop
a_Value(i) := (mod(i, 2) = 1);
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestBindObject as
function GetStringRep (
a_Object udt_Object
) return varchar2;
procedure BindObjectOut (
a_NumberValue number,
a_StringValue varchar2,
a_Object out nocopy udt_Object
);
end;
/
create or replace package body &main_user..pkg_TestBindObject as
function GetStringRep (
a_Object udt_SubObject
) return varchar2 is
begin
if a_Object is null then
return 'null';
end if;
return 'udt_SubObject(' ||
nvl(to_char(a_Object.SubNumberValue), 'null') || ', ' ||
case when a_Object.SubStringValue is null then 'null'
else '''' || a_Object.SubStringValue || '''' end || ')';
end;
function GetStringRep (
a_Array udt_ObjectArray
) return varchar2 is
t_StringRep varchar2(4000);
begin
if a_Array is null then
return 'null';
end if;
t_StringRep := 'udt_ObjectArray(';
for i in 1..a_Array.count loop
if i > 1 then
t_StringRep := t_StringRep || ', ';
end if;
t_StringRep := t_StringRep || GetStringRep(a_Array(i));
end loop;
return t_StringRep || ')';
end;
function GetStringRep (
a_Object udt_Object
) return varchar2 is
begin
if a_Object is null then
return 'null';
end if;
return 'udt_Object(' ||
nvl(to_char(a_Object.NumberValue), 'null') || ', ' ||
case when a_Object.StringValue is null then 'null'
else '''' || a_Object.StringValue || '''' end || ', ' ||
case when a_Object.FixedCharValue is null then 'null'
else '''' || a_Object.FixedCharValue || '''' end || ', ' ||
case when a_Object.DateValue is null then 'null'
else 'to_date(''' ||
to_char(a_Object.DateValue, 'YYYY-MM-DD') ||
''', ''YYYY-MM-DD'')' end || ', ' ||
case when a_Object.TimestampValue is null then 'null'
else 'to_timestamp(''' || to_char(a_Object.TimestampValue,
'YYYY-MM-DD HH24:MI:SS') ||
''', ''YYYY-MM-DD HH24:MI:SS'')' end || ', ' ||
GetStringRep(a_Object.SubObjectValue) || ', ' ||
GetStringRep(a_Object.SubObjectArray) || ')';
end;
procedure BindObjectOut (
a_NumberValue number,
a_StringValue varchar2,
a_Object out nocopy udt_Object
) is
begin
a_Object := udt_Object(a_NumberValue, a_StringValue, null, null, null,
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null);
end;
end;
/
create or replace package &main_user..pkg_TestRecords as
type udt_Record is record (
NumberValue number,
StringValue varchar2(30),
DateValue date,
TimestampValue timestamp,
BooleanValue boolean
);
type udt_RecordArray is table of udt_Record index by binary_integer;
function GetStringRep (
a_Value udt_Record
) return varchar2;
procedure TestOut (
a_Value out nocopy udt_Record
);
function TestInArrays (
a_Value udt_RecordArray
) return varchar2;
end;
/
create or replace package body &main_user..pkg_TestRecords as
function GetStringRep (
a_Value udt_Record
) return varchar2 is
begin
return 'udt_Record(' ||
nvl(to_char(a_Value.NumberValue), 'null') || ', ' ||
case when a_Value.StringValue is null then 'null'
else '''' || a_Value.StringValue || '''' end || ', ' ||
case when a_Value.DateValue is null then 'null'
else 'to_date(''' ||
to_char(a_Value.DateValue, 'YYYY-MM-DD') ||
''', ''YYYY-MM-DD'')' end || ', ' ||
case when a_Value.TimestampValue is null then 'null'
else 'to_timestamp(''' || to_char(a_Value.TimestampValue,
'YYYY-MM-DD HH24:MI:SS') ||
''', ''YYYY-MM-DD HH24:MI:SS'')' end || ', ' ||
case when a_Value.BooleanValue is null then 'null'
when a_Value.BooleanValue then 'true'
else 'false' end || ')';
end;
procedure TestOut (
a_Value out nocopy udt_Record
) is
begin
a_Value.NumberValue := 25;
a_Value.StringValue := 'String in record';
a_Value.DateValue := to_date(20160216, 'YYYYMMDD');
a_Value.TimestampValue := to_timestamp('20160216 18:23:55',
'YYYYMMDD HH24:MI:SS');
a_Value.BooleanValue := true;
end;
function TestInArrays (
a_Value udt_RecordArray
) return varchar2 is
t_Result varchar2(4000);
begin
for i in 0..a_Value.count - 1 loop
if t_Result is not null then
t_Result := t_Result || '; ';
end if;
t_Result := t_Result || GetStringRep(a_Value(i));
end loop;
return t_Result;
end;
end;
/

992
test/sql/SetupTestExec.sql Normal file
View File

@ -0,0 +1,992 @@
/*-----------------------------------------------------------------------------
* Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
*
* Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
*
* Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
* Canada. All rights reserved.
*---------------------------------------------------------------------------*/
/*-----------------------------------------------------------------------------
* SetupTestExec.sql
* This script performs the actual work of creating and populating the
* schemas with the database objects used by the cx_Oracle test suite. It is
* called by the SetupTest.sql file after acquiring the necessary parameters
* and also by the Python script SetupTest.py.
*---------------------------------------------------------------------------*/
alter session set nls_date_format = 'YYYY-MM-DD HH24:MI:SS'
/
alter session set nls_numeric_characters='.,'
/
create user &main_user identified by &main_password
quota unlimited on users
default tablespace users
/
create user &proxy_user identified by &proxy_password
/
alter user &proxy_user grant connect through &main_user
/
grant create session to &proxy_user
/
grant
create session,
create table,
create procedure,
create type,
select any dictionary,
change notification
to &main_user
/
grant execute on dbms_aqadm to &main_user
/
grant execute on dbms_transform to &main_user
/
begin
for r in
( select role
from dba_roles
where role in ('SODA_APP')
) loop
execute immediate 'grant ' || r.role || ' to &main_user';
end loop;
end;
/
-- create types
create type &main_user..udt_SubObject as object (
SubNumberValue number,
SubStringValue varchar2(60)
);
/
create type &main_user..udt_ObjectArray as
varray(10) of &main_user..udt_SubObject;
/
create type &main_user..udt_Object as object (
NumberValue number,
StringValue varchar2(60),
FixedCharValue char(10),
NStringValue nvarchar2(60),
NFixedCharValue nchar(10),
RawValue raw(16),
IntValue integer,
SmallIntValue smallint,
RealValue real,
DoublePrecisionValue double precision,
FloatValue float,
BinaryFloatValue binary_float,
BinaryDoubleValue binary_double,
DateValue date,
TimestampValue timestamp,
TimestampTZValue timestamp with time zone,
TimestampLTZValue timestamp with local time zone,
CLOBValue clob,
NCLOBValue nclob,
BLOBValue blob,
SubObjectValue &main_user..udt_SubObject,
SubObjectArray &main_user..udt_ObjectArray
);
/
create type &main_user..udt_Array as varray(10) of number;
/
create or replace type &main_user..udt_Building as object (
BuildingId number(9),
NumFloors number(3),
Description varchar2(60),
DateBuilt date
);
/
create or replace type &main_user..udt_Book as object (
Title varchar2(100),
Authors varchar2(100),
Price number(5,2)
);
/
-- create tables
create table &main_user..TestNumbers (
IntCol number(9) not null,
LongIntCol number(16) not null,
NumberCol number(9, 2) not null,
FloatCol float not null,
UnconstrainedCol number not null,
NullableCol number(38)
)
/
create table &main_user..TestStrings (
IntCol number(9) not null,
StringCol varchar2(20) not null,
RawCol raw(30) not null,
FixedCharCol char(40) not null,
NullableCol varchar2(50)
)
/
create table &main_user..TestUnicodes (
IntCol number(9) not null,
UnicodeCol nvarchar2(20) not null,
FixedUnicodeCol nchar(40) not null,
NullableCol nvarchar2(50)
)
/
create table &main_user..TestDates (
IntCol number(9) not null,
DateCol date not null,
NullableCol date
)
/
create table &main_user..TestCLOBs (
IntCol number(9) not null,
CLOBCol clob not null
)
/
create table &main_user..TestNCLOBs (
IntCol number(9) not null,
NCLOBCol nclob not null
)
/
create table &main_user..TestBLOBs (
IntCol number(9) not null,
BLOBCol blob not null
)
/
create table &main_user..TestXML (
IntCol number(9) not null,
XMLCol xmltype not null
)
/
create table &main_user..TestLongs (
IntCol number(9) not null,
LongCol long not null
)
/
create table &main_user..TestLongRaws (
IntCol number(9) not null,
LongRawCol long raw not null
)
/
create table &main_user..TestTempTable (
IntCol number(9) not null,
StringCol varchar2(400),
NumberCol number(25,2),
constraint TestTempTable_pk primary key (IntCol)
)
/
create table &main_user..TestArrayDML (
IntCol number(9) not null,
StringCol varchar2(100),
IntCol2 number(3),
constraint TestArrayDML_pk primary key (IntCol)
)
/
create table &main_user..TestObjects (
IntCol number(9) not null,
ObjectCol &main_user..udt_Object,
ArrayCol &main_user..udt_Array
)
/
create table &main_user..TestTimestamps (
IntCol number(9) not null,
TimestampCol timestamp not null,
NullableCol timestamp
)
/
create table &main_user..TestIntervals (
IntCol number(9) not null,
IntervalCol interval day to second not null,
NullableCol interval day to second
)
/
create table &main_user..TestUniversalRowids (
IntCol number(9) not null,
StringCol varchar2(250) not null,
DateCol date not null,
constraint TestUniversalRowids_pk primary key (IntCol, StringCol, DateCol)
) organization index
/
create table &main_user..TestBuildings (
BuildingId number(9) not null,
BuildingObj &main_user..udt_Building not null
)
/
create table &main_user..TestRowids (
IntCol number(9) not null,
RowidCol rowid,
URowidCol urowid
)
/
-- create queue table and queues for testing advanced queuing
begin
dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE',
'&main_user..UDT_BOOK');
dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE');
dbms_aqadm.start_queue('&main_user..BOOKS');
end;
/
-- create transformations
begin
dbms_transform.create_transformation('&main_user', 'transform1',
'&main_user', 'UDT_BOOK', '&main_user', 'UDT_BOOK',
'&main_user..UDT_BOOK(source.user_data.TITLE, ' ||
'source.user_data.AUTHORS, source.user_data.PRICE + 5)');
dbms_transform.create_transformation('&main_user', 'transform2',
'&main_user', 'UDT_BOOK', '&main_user', 'UDT_BOOK',
'&main_user..UDT_BOOK(source.user_data.TITLE, ' ||
'source.user_data.AUTHORS, source.user_data.PRICE + 10)');
end;
/
-- populate tables
begin
for i in 1..10 loop
insert into &main_user..TestNumbers
values (i, power(38, i), i + i * 0.25, i + i * .75, i * i * i + i *.5,
decode(mod(i, 2), 0, null, power(143, i)));
end loop;
end;
/
declare
t_RawValue raw(30);
function ConvertHexDigit(a_Value number) return varchar2 is
begin
if a_Value between 0 and 9 then
return to_char(a_Value);
end if;
return chr(ascii('A') + a_Value - 10);
end;
function ConvertToHex(a_Value varchar2) return varchar2 is
t_HexValue varchar2(60);
t_Digit number;
begin
for i in 1..length(a_Value) loop
t_Digit := ascii(substr(a_Value, i, 1));
t_HexValue := t_HexValue ||
ConvertHexDigit(trunc(t_Digit / 16)) ||
ConvertHexDigit(mod(t_Digit, 16));
end loop;
return t_HexValue;
end;
begin
for i in 1..10 loop
t_RawValue := hextoraw(ConvertToHex('Raw ' || to_char(i)));
insert into &main_user..TestStrings
values (i, 'String ' || to_char(i), t_RawValue,
'Fixed Char ' || to_char(i),
decode(mod(i, 2), 0, null, 'Nullable ' || to_char(i)));
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestUnicodes
values (i, 'Unicode ' || unistr('\3042') || ' ' || to_char(i),
'Fixed Unicode ' || to_char(i),
decode(mod(i, 2), 0, null, unistr('Nullable ') || to_char(i)));
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestDates
values (i, to_date(20021209, 'YYYYMMDD') + i + i * .1,
decode(mod(i, 2), 0, null,
to_date(20021209, 'YYYYMMDD') + i + i + i * .15));
end loop;
end;
/
begin
for i in 1..100 loop
insert into &main_user..TestXML
values (i, '<?xml version="1.0"?><records>' ||
dbms_random.string('x', 1024) || '</records>');
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestTimestamps
values (i, to_timestamp('20021209', 'YYYYMMDD') +
to_dsinterval(to_char(i) || ' 00:00:' || to_char(i * 2) ||
'.' || to_char(i * 50)),
decode(mod(i, 2), 0, to_timestamp(null, 'YYYYMMDD'),
to_timestamp('20021209', 'YYYYMMDD') +
to_dsinterval(to_char(i + 1) || ' 00:00:' ||
to_char(i * 3) || '.' || to_char(i * 125))));
end loop;
end;
/
begin
for i in 1..10 loop
insert into &main_user..TestIntervals
values (i, to_dsinterval(to_char(i) || ' ' || to_char(i) || ':' ||
to_char(i * 2) || ':' || to_char(i * 3)),
decode(mod(i, 2), 0, to_dsinterval(null),
to_dsinterval(to_char(i + 5) || ' ' || to_char(i + 2) || ':' ||
to_char(i * 2 + 5) || ':' || to_char(i * 3 + 5))));
end loop;
end;
/
insert into &main_user..TestObjects values (1,
&main_user..udt_Object(1, 'First row', 'First', 'N First Row', 'N First',
'52617720446174612031', 2, 5, 12.125, 0.5, 12.5, 25.25, 50.125,
to_date(20070306, 'YYYYMMDD'),
to_timestamp('20080912 16:40:00', 'YYYYMMDD HH24:MI:SS'),
to_timestamp_tz('20091013 17:50:00 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
to_timestamp_tz('20101114 18:55:00 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
'Short CLOB value', 'Short NCLOB Value',
utl_raw.cast_to_raw('Short BLOB value'),
&main_user..udt_SubObject(11, 'Sub object 1'),
&main_user..udt_ObjectArray(
&main_user..udt_SubObject(5, 'first element'),
&main_user..udt_SubObject(6, 'second element'))),
&main_user..udt_Array(5, 10, null, 20))
/
insert into &main_user..TestObjects values (2, null,
&main_user..udt_Array(3, null, 9, 12, 15))
/
insert into &main_user..TestObjects values (3,
&main_user..udt_Object(3, 'Third row', 'Third', 'N Third Row', 'N Third',
'52617720446174612033', 4, 10, 6.5, 0.75, 43.25, 86.5, 192.125,
to_date(20070621, 'YYYYMMDD'),
to_timestamp('20071213 07:30:45', 'YYYYMMDD HH24:MI:SS'),
to_timestamp_tz('20170621 23:18:45 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
to_timestamp_tz('20170721 08:27:13 00:00',
'YYYYMMDD HH24:MI:SS TZH:TZM'),
'Another short CLOB value', 'Another short NCLOB Value',
utl_raw.cast_to_raw('Yet another short BLOB value'),
&main_user..udt_SubObject(13, 'Sub object 3'),
&main_user..udt_ObjectArray(
&main_user..udt_SubObject(10, 'element #1'),
&main_user..udt_SubObject(20, 'element #2'),
&main_user..udt_SubObject(30, 'element #3'),
&main_user..udt_SubObject(40, 'element #4'))), null)
/
commit
/
-- create procedures for testing callproc()
create procedure &main_user..proc_Test (
a_InValue varchar2,
a_InOutValue in out number,
a_OutValue out number
) as
begin
a_InOutValue := a_InOutValue * length(a_InValue);
a_OutValue := length(a_InValue);
end;
/
create procedure &main_user..proc_TestNoArgs as
begin
null;
end;
/
-- create functions for testing callfunc()
create function &main_user..func_Test (
a_String varchar2,
a_ExtraAmount number
) return number as
begin
return length(a_String) + a_ExtraAmount;
end;
/
create function &main_user..func_TestNoArgs
return number as
begin
return 712;
end;
/
-- create packages
create or replace package &main_user..pkg_TestStringArrays as
type udt_StringList is table of varchar2(100) index by binary_integer;
function TestInArrays (
a_StartingLength number,
a_Array udt_StringList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_StringList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_StringList
);
procedure TestIndexBy (
a_Array out nocopy udt_StringList
);
end;
/
create or replace package body &main_user..pkg_TestStringArrays as
function TestInArrays (
a_StartingLength number,
a_Array udt_StringList
) return number is
t_Length number;
begin
t_Length := a_StartingLength;
for i in 1..a_Array.count loop
t_Length := t_Length + length(a_Array(i));
end loop;
return t_Length;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_StringList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := 'Converted element # ' ||
to_char(i) || ' originally had length ' ||
to_char(length(a_Array(i)));
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_StringList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := 'Test out element # ' || to_char(i);
end loop;
end;
procedure TestIndexBy (
a_Array out nocopy udt_StringList
) is
begin
a_Array(-1048576) := 'First element';
a_Array(-576) := 'Second element';
a_Array(284) := 'Third element';
a_Array(8388608) := 'Fourth element';
end;
end;
/
create or replace package &main_user..pkg_TestUnicodeArrays as
type udt_UnicodeList is table of nvarchar2(100) index by binary_integer;
function TestInArrays (
a_StartingLength number,
a_Array udt_UnicodeList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_UnicodeList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_UnicodeList
);
end;
/
create or replace package body &main_user..pkg_TestUnicodeArrays as
function TestInArrays (
a_StartingLength number,
a_Array udt_UnicodeList
) return number is
t_Length number;
begin
t_Length := a_StartingLength;
for i in 1..a_Array.count loop
t_Length := t_Length + length(a_Array(i));
end loop;
return t_Length;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_UnicodeList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := unistr('Converted element ' || unistr('\3042') ||
' # ') || to_char(i) || ' originally had length ' ||
to_char(length(a_Array(i)));
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_UnicodeList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := unistr('Test out element ') || unistr('\3042') ||
' # ' || to_char(i);
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestNumberArrays as
type udt_NumberList is table of number index by binary_integer;
function TestInArrays (
a_StartingValue number,
a_Array udt_NumberList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_NumberList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_NumberList
);
end;
/
create or replace package body &main_user..pkg_TestNumberArrays as
function TestInArrays (
a_StartingValue number,
a_Array udt_NumberList
) return number is
t_Value number;
begin
t_Value := a_StartingValue;
for i in 1..a_Array.count loop
t_Value := t_Value + a_Array(i);
end loop;
return t_Value;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_NumberList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := a_Array(i) * 10;
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_NumberList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := i * 100;
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestDateArrays as
type udt_DateList is table of date index by binary_integer;
function TestInArrays (
a_StartingValue number,
a_BaseDate date,
a_Array udt_DateList
) return number;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out nocopy udt_DateList
);
procedure TestOutArrays (
a_NumElems number,
a_Array out nocopy udt_DateList
);
end;
/
create or replace package body &main_user..pkg_TestDateArrays as
function TestInArrays (
a_StartingValue number,
a_BaseDate date,
a_Array udt_DateList
) return number is
t_Value number;
begin
t_Value := a_StartingValue;
for i in 1..a_Array.count loop
t_Value := t_Value + a_Array(i) - a_BaseDate;
end loop;
return t_Value;
end;
procedure TestInOutArrays (
a_NumElems number,
a_Array in out udt_DateList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := a_Array(i) + 7;
end loop;
end;
procedure TestOutArrays (
a_NumElems number,
a_Array out udt_DateList
) is
begin
for i in 1..a_NumElems loop
a_Array(i) := to_date(20021212, 'YYYYMMDD') + i * 1.2;
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestRefCursors as
procedure TestOutCursor (
a_MaxIntValue number,
a_Cursor out sys_refcursor
);
function TestInCursor (
a_Cursor sys_refcursor
) return varchar2;
end;
/
create or replace package body &main_user..pkg_TestRefCursors as
procedure TestOutCursor (
a_MaxIntValue number,
a_Cursor out sys_refcursor
) is
begin
open a_Cursor for
select
IntCol,
StringCol
from TestStrings
where IntCol <= a_MaxIntValue
order by IntCol;
end;
function TestInCursor (
a_Cursor sys_refcursor
) return varchar2 is
t_String varchar2(100);
begin
fetch a_Cursor into t_String;
return t_String || ' (Modified)';
end;
end;
/
create or replace package &main_user..pkg_TestBooleans as
type udt_BooleanList is table of boolean index by binary_integer;
function GetStringRep (
a_Value boolean
) return varchar2;
function IsLessThan10 (
a_Value number
) return boolean;
function TestInArrays (
a_Value udt_BooleanList
) return number;
procedure TestOutArrays (
a_NumElements number,
a_Value out nocopy udt_BooleanList
);
end;
/
create or replace package body &main_user..pkg_TestBooleans as
function GetStringRep (
a_Value boolean
) return varchar2 is
begin
if a_Value is null then
return 'NULL';
elsif a_Value then
return 'TRUE';
end if;
return 'FALSE';
end;
function IsLessThan10 (
a_Value number
) return boolean is
begin
return a_Value < 10;
end;
function TestInArrays (
a_Value udt_BooleanList
) return number is
t_Result pls_integer;
begin
t_Result := 0;
for i in 1..a_Value.count loop
if a_Value(i) then
t_Result := t_Result + 1;
end if;
end loop;
return t_Result;
end;
procedure TestOutArrays (
a_NumElements number,
a_Value out nocopy udt_BooleanList
) is
begin
for i in 1..a_NumElements loop
a_Value(i) := (mod(i, 2) = 1);
end loop;
end;
end;
/
create or replace package &main_user..pkg_TestBindObject as
function GetStringRep (
a_Object udt_Object
) return varchar2;
procedure BindObjectOut (
a_NumberValue number,
a_StringValue varchar2,
a_Object out nocopy udt_Object
);
end;
/
create or replace package body &main_user..pkg_TestBindObject as
function GetStringRep (
a_Object udt_SubObject
) return varchar2 is
begin
if a_Object is null then
return 'null';
end if;
return 'udt_SubObject(' ||
nvl(to_char(a_Object.SubNumberValue), 'null') || ', ' ||
case when a_Object.SubStringValue is null then 'null'
else '''' || a_Object.SubStringValue || '''' end || ')';
end;
function GetStringRep (
a_Array udt_ObjectArray
) return varchar2 is
t_StringRep varchar2(4000);
begin
if a_Array is null then
return 'null';
end if;
t_StringRep := 'udt_ObjectArray(';
for i in 1..a_Array.count loop
if i > 1 then
t_StringRep := t_StringRep || ', ';
end if;
t_StringRep := t_StringRep || GetStringRep(a_Array(i));
end loop;
return t_StringRep || ')';
end;
function GetStringRep (
a_Object udt_Object
) return varchar2 is
begin
if a_Object is null then
return 'null';
end if;
return 'udt_Object(' ||
nvl(to_char(a_Object.NumberValue), 'null') || ', ' ||
case when a_Object.StringValue is null then 'null'
else '''' || a_Object.StringValue || '''' end || ', ' ||
case when a_Object.FixedCharValue is null then 'null'
else '''' || a_Object.FixedCharValue || '''' end || ', ' ||
case when a_Object.DateValue is null then 'null'
else 'to_date(''' ||
to_char(a_Object.DateValue, 'YYYY-MM-DD') ||
''', ''YYYY-MM-DD'')' end || ', ' ||
case when a_Object.TimestampValue is null then 'null'
else 'to_timestamp(''' || to_char(a_Object.TimestampValue,
'YYYY-MM-DD HH24:MI:SS') ||
''', ''YYYY-MM-DD HH24:MI:SS'')' end || ', ' ||
GetStringRep(a_Object.SubObjectValue) || ', ' ||
GetStringRep(a_Object.SubObjectArray) || ')';
end;
procedure BindObjectOut (
a_NumberValue number,
a_StringValue varchar2,
a_Object out nocopy udt_Object
) is
begin
a_Object := udt_Object(a_NumberValue, a_StringValue, null, null, null,
null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null);
end;
end;
/
create or replace package &main_user..pkg_TestRecords as
type udt_Record is record (
NumberValue number,
StringValue varchar2(30),
DateValue date,
TimestampValue timestamp,
BooleanValue boolean
);
type udt_RecordArray is table of udt_Record index by binary_integer;
function GetStringRep (
a_Value udt_Record
) return varchar2;
procedure TestOut (
a_Value out nocopy udt_Record
);
function TestInArrays (
a_Value udt_RecordArray
) return varchar2;
end;
/
create or replace package body &main_user..pkg_TestRecords as
function GetStringRep (
a_Value udt_Record
) return varchar2 is
begin
return 'udt_Record(' ||
nvl(to_char(a_Value.NumberValue), 'null') || ', ' ||
case when a_Value.StringValue is null then 'null'
else '''' || a_Value.StringValue || '''' end || ', ' ||
case when a_Value.DateValue is null then 'null'
else 'to_date(''' ||
to_char(a_Value.DateValue, 'YYYY-MM-DD') ||
''', ''YYYY-MM-DD'')' end || ', ' ||
case when a_Value.TimestampValue is null then 'null'
else 'to_timestamp(''' || to_char(a_Value.TimestampValue,
'YYYY-MM-DD HH24:MI:SS') ||
''', ''YYYY-MM-DD HH24:MI:SS'')' end || ', ' ||
case when a_Value.BooleanValue is null then 'null'
when a_Value.BooleanValue then 'true'
else 'false' end || ')';
end;
procedure TestOut (
a_Value out nocopy udt_Record
) is
begin
a_Value.NumberValue := 25;
a_Value.StringValue := 'String in record';
a_Value.DateValue := to_date(20160216, 'YYYYMMDD');
a_Value.TimestampValue := to_timestamp('20160216 18:23:55',
'YYYYMMDD HH24:MI:SS');
a_Value.BooleanValue := true;
end;
function TestInArrays (
a_Value udt_RecordArray
) return varchar2 is
t_Result varchar2(4000);
begin
for i in 0..a_Value.count - 1 loop
if t_Result is not null then
t_Result := t_Result || '; ';
end if;
t_Result := t_Result || GetStringRep(a_Value(i));
end loop;
return t_Result;
end;
end;
/

View File

@ -1,29 +0,0 @@
/*-----------------------------------------------------------------------------
* Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
*---------------------------------------------------------------------------*/
/*-----------------------------------------------------------------------------
* TestEnv.sql
* Sets up configuration for the SetupTest.sql and DropTest.sql scripts.
* Change the values below if you would like to use something other than the
* default values. Note that the environment variables noted below will also
* need to be set, or the Python script TestEnv.py will need to be changed if
* non-default values are used.
*---------------------------------------------------------------------------*/
set echo off termout on feedback off verify off
define main_user = "cx_Oracle" -- $CX_ORACLE_TEST_MAIN_USER
define main_password = "welcome" -- $CX_ORACLE_TEST_MAIN_PASSWORD
define proxy_user = "cx_Oracle_proxy" -- $CX_ORACLE_TEST_PROXY_USER
define proxy_password = "welcome" -- $CX_ORACLE_TEST_PROXY_PASSWORD
prompt ************************************************************************
prompt CONFIGURATION
prompt ************************************************************************
prompt Main Schema: &main_user
prompt Proxy Schema: &proxy_user
prompt
set echo on verify on feedback on

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -12,114 +12,61 @@
from __future__ import print_function from __future__ import print_function
import cx_Oracle import cx_Oracle
import imp
import os import os
import sys import sys
import TestEnv import TestEnv
import unittest import unittest
inSetup = (os.path.basename(sys.argv[0]).lower() == "setup.py") # display version of cx_Oracle and Oracle client for which tests are being run
print("Running tests for cx_Oracle version", cx_Oracle.version, print("Running tests for cx_Oracle version", cx_Oracle.version,
"built at", cx_Oracle.buildtime) "built at", cx_Oracle.buildtime)
print("File:", cx_Oracle.__file__) print("File:", cx_Oracle.__file__)
print("Client Version:", ".".join(str(i) for i in cx_Oracle.clientversion())) print("Client Version:", ".".join(str(i) for i in cx_Oracle.clientversion()))
sys.stdout.flush() sys.stdout.flush()
connection = cx_Oracle.connect(TestEnv.MAIN_USER, TestEnv.MAIN_PASSWORD, # verify that we can connect to the database and display database version
TestEnv.CONNECT_STRING, encoding = TestEnv.ENCODING, connection = TestEnv.GetConnection()
nencoding = TestEnv.NENCODING)
print("Server Version:", connection.version) print("Server Version:", connection.version)
sys.stdout.flush() sys.stdout.flush()
if len(sys.argv) > 1 and not inSetup: # define test cases to run
moduleNames = [os.path.splitext(v)[0] for v in sys.argv[1:]] moduleNames = [
else: "Module",
moduleNames = [ "Connection",
"Module", "Cursor",
"Connection", "CursorVar",
"Cursor", "DateTimeVar",
"CursorVar", "DMLReturning",
"DateTimeVar", "Error",
"DMLReturning", "IntervalVar",
"Error", "LobVar",
"IntervalVar", "LongVar",
"LobVar", "NCharVar",
"LongVar", "NumberVar",
"NCharVar", "ObjectVar",
"NumberVar", "SessionPool",
"ObjectVar", "StringVar",
"SessionPool", "TimestampVar",
"StringVar", "AQ",
"TimestampVar", "Rowid",
"AQ", "Subscription"
"Rowid", ]
"Subscription" clientVersion = cx_Oracle.clientversion()
] if clientVersion[:2] >= (12, 1):
clientVersion = cx_Oracle.clientversion() moduleNames.append("BooleanVar")
if clientVersion[:2] >= (12, 1): moduleNames.append("Features12_1")
moduleNames.append("BooleanVar") if clientVersion[:2] >= (18, 3):
moduleNames.append("Features12_1") moduleNames.append("SodaDatabase")
if clientVersion[:2] >= (18, 3): moduleNames.append("SodaCollection")
moduleNames.append("SodaDatabase")
moduleNames.append("SodaCollection")
class BaseTestCase(unittest.TestCase):
def getConnection(self, **kwargs):
import cx_Oracle
import TestEnv
return cx_Oracle.connect(TestEnv.MAIN_USER, TestEnv.MAIN_PASSWORD,
TestEnv.CONNECT_STRING, encoding = TestEnv.ENCODING,
nencoding = TestEnv.NENCODING, **kwargs)
def setUp(self):
import TestEnv
self.connection = self.getConnection()
self.cursor = self.connection.cursor()
self.cursor.arraysize = TestEnv.ARRAY_SIZE
def tearDown(self):
del self.cursor
del self.connection
# determine character set ratio in use in order to determine the buffer size
# that will be reported in cursor.description; this depends on the database
# character set and the client character set
cursor = connection.cursor()
cursor.execute("select 'X' from dual")
col, = cursor.description
csratio = col[3]
# run all test cases
failures = []
loader = unittest.TestLoader() loader = unittest.TestLoader()
runner = unittest.TextTestRunner(verbosity = 2) runner = unittest.TextTestRunner(verbosity = 2)
failures = []
for name in moduleNames: for name in moduleNames:
fileName = name + ".py"
print() print()
print("Running tests in", fileName) print("Running tests in", name)
if inSetup: tests = loader.loadTestsFromName(name + ".TestCase")
fileName = os.path.join("test", fileName)
module = imp.new_module(name)
setattr(module, "CLIENT_VERSION", cx_Oracle.clientversion())
setattr(module, "USERNAME", TestEnv.MAIN_USER)
setattr(module, "PASSWORD", TestEnv.MAIN_PASSWORD)
setattr(module, "PROXY_USERNAME", TestEnv.PROXY_USER)
setattr(module, "PROXY_PASSWORD", TestEnv.PROXY_PASSWORD)
setattr(module, "TNSENTRY", TestEnv.CONNECT_STRING)
setattr(module, "ENCODING", TestEnv.ENCODING)
setattr(module, "NENCODING", TestEnv.NENCODING)
setattr(module, "ARRAY_SIZE", TestEnv.ARRAY_SIZE)
setattr(module, "CS_RATIO", csratio)
setattr(module, "TestCase", unittest.TestCase)
setattr(module, "BaseTestCase", BaseTestCase)
setattr(module, "cx_Oracle", cx_Oracle)
if sys.version_info[0] >= 3:
exec(open(fileName, encoding = "UTF-8").read(), module.__dict__)
else:
execfile(fileName, module.__dict__)
tests = loader.loadTestsFromModule(module)
result = runner.run(tests) result = runner.run(tests)
if not result.wasSuccessful(): if not result.wasSuccessful():
failures.append(name) failures.append(name)

View File

@ -1,5 +1,5 @@
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
# Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. # Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
# #
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. # Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
# #
@ -20,7 +20,8 @@ import TestEnv
class TestSuite(dbapi20.DatabaseAPI20Test): class TestSuite(dbapi20.DatabaseAPI20Test):
connect_args = (TestEnv.USERNAME, TestEnv.PASSWORD, TestEnv.TNSENTRY) connect_args = (TestEnv.GetMainUser(), TestEnv.GetMainPassword(),
TestEnv.GetConnectString())
driver = cx_Oracle driver = cx_Oracle
# not implemented; see cx_Oracle specific test suite instead # not implemented; see cx_Oracle specific test suite instead
@ -52,6 +53,6 @@ class TestSuite(dbapi20.DatabaseAPI20Test):
pass pass
if __name__ == "__main__": if __name__ == "__main__":
print("Testing cx_Oracle version", cx_Oracle.version) print("Testing cx_Oracle version", cx_Oracle.__version__)
unittest.main() TestEnv.RunTestCases()