Added test cases for cursor.lastrowid and SODA collection truncation (and added

code to check for situation where SODA support is lacking and stop running the
test suite for SODA in that case).
This commit is contained in:
Anthony Tuininga 2020-05-18 16:34:58 -06:00
parent 20686a1fc9
commit d5144aa58f
4 changed files with 102 additions and 20 deletions

View File

@ -698,5 +698,57 @@ class TestCase(TestEnv.BaseTestCase):
result, = self.cursor.fetchone()
self.assertEqual(result, expectedResult)
def testLastRowid(self):
"test last rowid"
# no statement executed: no rowid
self.assertEqual(None, self.cursor.lastrowid)
# DDL statement executed: no rowid
self.cursor.execute("truncate table TestTempTable")
self.assertEqual(None, self.cursor.lastrowid)
# statement prepared: no rowid
self.cursor.prepare("insert into TestTempTable (IntCol) values (:1)")
self.assertEqual(None, self.cursor.lastrowid)
# multiple rows inserted: rowid of last row inserted
rows = [(n,) for n in range(225)]
self.cursor.executemany(None, rows)
rowid = self.cursor.lastrowid
self.cursor.execute("""
select rowid
from TestTempTable
where IntCol = :1""", rows[-1])
self.assertEqual(rowid, self.cursor.fetchone()[0])
# statement executed but no rows updated: no rowid
self.cursor.execute("delete from TestTempTable where 1 = 0")
self.assertEqual(None, self.cursor.lastrowid)
# stetement executed with one row updated: rowid of updated row
self.cursor.execute("""
update TestTempTable set
StringCol = 'Modified'
where IntCol = :1""", rows[-2])
rowid = self.cursor.lastrowid
self.cursor.execute("""
select rowid
from TestTempTable
where IntCol = :1""", rows[-2])
self.assertEqual(rowid, self.cursor.fetchone()[0])
# statement executed with many rows updated: rowid of last updated row
self.cursor.execute("""
update TestTempTable set
StringCol = 'Row ' || to_char(IntCol)
where IntCol = :1""", rows[-3])
rowid = self.cursor.lastrowid
self.cursor.execute("""
select StringCol
from TestTempTable
where rowid = :1""", [rowid])
self.assertEqual("Row %s" % rows[-3], self.cursor.fetchone()[0])
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -19,7 +19,7 @@ class TestCase(TestEnv.BaseTestCase):
def testInvalidJson(self):
"test inserting invalid JSON value into SODA collection"
invalidJson = "{testKey:testValue}"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoInvalidJSON")
doc = sodaDatabase.createDocument(invalidJson)
self.assertRaises(cx_Oracle.IntegrityError, coll.insertOne, doc)
@ -27,7 +27,7 @@ class TestCase(TestEnv.BaseTestCase):
def testInsertDocuments(self):
"test inserting documents into a SODA collection"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoInsertDocs")
coll.find().remove()
valuesToInsert = [
@ -49,7 +49,7 @@ class TestCase(TestEnv.BaseTestCase):
def testSkipDocuments(self):
"test skipping documents in a SODA collection"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoSkipDocs")
coll.find().remove()
valuesToInsert = [
@ -69,7 +69,7 @@ class TestCase(TestEnv.BaseTestCase):
def testReplaceDocument(self):
"test replace documents in SODA collection"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoReplaceDoc")
coll.find().remove()
content = {'name': 'John', 'address': {'city': 'Sydney'}}
@ -83,7 +83,7 @@ class TestCase(TestEnv.BaseTestCase):
def testSearchDocumentsWithContent(self):
"test search documents with content using $like and $regex"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoSearchDocContent")
coll.find().remove()
data = [
@ -120,7 +120,7 @@ class TestCase(TestEnv.BaseTestCase):
def testDocumentRemove(self):
"test removing documents"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoRemoveDocs")
coll.find().remove()
data = [
@ -156,7 +156,7 @@ class TestCase(TestEnv.BaseTestCase):
}
]
}
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoTestIndexes")
coll.find().remove()
self.connection.commit()
@ -170,7 +170,7 @@ class TestCase(TestEnv.BaseTestCase):
def testGetDocuments(self):
"test getting documents from Collection"
self.connection.autocommit = True
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoTestGetDocs")
coll.find().remove()
data = [
@ -188,7 +188,7 @@ class TestCase(TestEnv.BaseTestCase):
def testCursor(self):
"test fetching documents from a cursor"
self.connection.autocommit = True
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoFindViaCursor")
coll.find().remove()
data = [
@ -203,7 +203,7 @@ class TestCase(TestEnv.BaseTestCase):
def testMultipleDocumentRemove(self):
"test removing multiple documents using multiple keys"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoRemoveMultipleDocs")
coll.find().remove()
data = [
@ -224,7 +224,7 @@ class TestCase(TestEnv.BaseTestCase):
def testDocumentVersion(self):
"test using version to get documents and remove them"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoDocumentVersion")
coll.find().remove()
content = {'name': 'John', 'address': {'city': 'Bangalore'}}
@ -248,7 +248,7 @@ class TestCase(TestEnv.BaseTestCase):
def testGetCursor(self):
"test keys with GetCursor"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoKeysWithGetCursor")
coll.find().remove()
data = [
@ -268,13 +268,32 @@ class TestCase(TestEnv.BaseTestCase):
def testCreatedOn(self):
"test createdOn attribute of Document"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
coll = sodaDatabase.createCollection("cxoCreatedOn")
coll.find().remove()
data = {'name': 'John', 'address': {'city': 'Bangalore'}}
doc = coll.insertOneAndGet(data)
self.assertEqual(doc.createdOn, doc.lastModified)
def testSodaTruncate(self):
"test Soda truncate"
sodaDatabase = self.getSodaDatabase(minclient=(20,1))
coll = sodaDatabase.createCollection("cxoTruncateDocs")
coll.find().remove()
valuesToInsert = [
{ "name" : "George", "age" : 47 },
{ "name" : "Susan", "age" : 39 },
{ "name" : "John", "age" : 50 },
{ "name" : "Jill", "age" : 54 }
]
for value in valuesToInsert:
coll.insertOne(value)
self.connection.commit()
self.assertEqual(coll.find().count(), len(valuesToInsert))
coll.truncate()
self.assertEqual(coll.find().count(), 0)
coll.drop()
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -27,7 +27,7 @@ class TestCase(TestEnv.BaseTestCase):
def testCreateDocumentWithJson(self):
"test creating documents with JSON data"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
val = {"testKey1" : "testValue1", "testKey2" : "testValue2" }
strVal = json.dumps(val)
bytesVal = strVal.encode("UTF-8")
@ -42,7 +42,7 @@ class TestCase(TestEnv.BaseTestCase):
def testCreateDocumentWithRaw(self):
"test creating documents with raw data"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
val = b"<html/>"
key = "MyRawKey"
mediaType = "text/html"
@ -55,7 +55,7 @@ class TestCase(TestEnv.BaseTestCase):
def testGetCollectionNames(self):
"test getting collection names from the database"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
self.__dropExistingCollections(sodaDatabase)
self.assertEqual(sodaDatabase.getCollectionNames(), [])
names = ["zCol", "dCol", "sCol", "aCol", "gCol"]
@ -74,7 +74,7 @@ class TestCase(TestEnv.BaseTestCase):
def testOpenCollection(self):
"test opening a collection"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
self.__dropExistingCollections(sodaDatabase)
coll = sodaDatabase.openCollection("CollectionThatDoesNotExist")
self.assertEqual(coll, None)
@ -87,7 +87,7 @@ class TestCase(TestEnv.BaseTestCase):
"test SodaDatabase representation"
con1 = self.connection
con2 = TestEnv.GetConnection()
sodaDatabase1 = con1.getSodaDatabase()
sodaDatabase1 = self.getSodaDatabase()
sodaDatabase2 = con1.getSodaDatabase()
sodaDatabase3 = con2.getSodaDatabase()
self.assertEqual(str(sodaDatabase1), str(sodaDatabase2))
@ -95,7 +95,7 @@ class TestCase(TestEnv.BaseTestCase):
def testNegative(self):
"test negative cases for SODA database methods"
sodaDatabase = self.connection.getSodaDatabase()
sodaDatabase = self.getSodaDatabase()
self.assertRaises(TypeError, sodaDatabase.createCollection)
self.assertRaises(TypeError, sodaDatabase.createCollection, 1)
self.assertRaises(cx_Oracle.DatabaseError,
@ -104,4 +104,3 @@ class TestCase(TestEnv.BaseTestCase):
if __name__ == "__main__":
TestEnv.RunTestCases()

View File

@ -165,6 +165,18 @@ def GetClientVersion():
class BaseTestCase(unittest.TestCase):
def getSodaDatabase(self, minclient=(18, 3), minserver=(18, 0),
message="not supported with this client/server combination"):
client = cx_Oracle.clientversion()[:2]
if client < minclient:
self.skipTest(message)
server = tuple(int(s) for s in self.connection.version.split("."))[:2]
if server < minserver:
self.skipTest(message)
if server > (20, 1) and client < (20, 1):
self.skipTest(message)
return self.connection.getSodaDatabase()
def isOnOracleCloud(self, connection=None):
if connection is None:
connection = self.connection