Added preliminary test cases for subscriptions.

This commit is contained in:
Anthony Tuininga 2017-12-07 15:54:11 -07:00
parent 1fe14c66dc
commit a8d9b74c3a
2 changed files with 93 additions and 1 deletions

91
test/Subscription.py Normal file
View File

@ -0,0 +1,91 @@
#------------------------------------------------------------------------------
# Copyright 2017, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
"""Module for testing subscriptions."""
import threading
class SubscriptionData(object):
def __init__(self, numMessagesExpected):
self.condition = threading.Condition()
self.numMessagesExpected = numMessagesExpected
self.numMessagesReceived = 0
self.tableOperations = []
self.rowOperations = []
self.rowids = []
def CallbackHandler(self, message):
if message.type != cx_Oracle.EVENT_DEREG:
table, = message.tables
self.tableOperations.append(table.operation)
for row in table.rows:
self.rowOperations.append(row.operation)
self.rowids.append(row.rowid)
self.numMessagesReceived += 1
if message.type == cx_Oracle.EVENT_DEREG or \
self.numMessagesReceived == self.numMessagesExpected:
self.condition.acquire()
self.condition.notify()
self.condition.release()
class Subscription(BaseTestCase):
def testSubscription(self):
"test Subscription for insert, update, delete and truncate"
self.cursor.execute("truncate table TestTempTable")
# expected values
tableOperations = [ cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_UPDATE,
cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_DELETE,
cx_Oracle.OPCODE_ALTER | cx_Oracle.OPCODE_ALLROWS ]
rowOperations = [ cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_UPDATE,
cx_Oracle.OPCODE_INSERT, cx_Oracle.OPCODE_DELETE ]
rowids = []
# set up subscription
data = SubscriptionData(5)
connection = cx_Oracle.connect(USERNAME, PASSWORD, TNSENTRY,
threaded = True, events = True)
sub = connection.subscribe(callback = data.CallbackHandler,
timeout = 10, qos = cx_Oracle.SUBSCR_QOS_ROWIDS)
sub.registerquery("select * from TestTempTable")
connection.autocommit = True
cursor = connection.cursor()
# insert statement
cursor.execute("insert into TestTempTable values (1, 'test')")
cursor.execute("select rowid from TestTempTable where IntCol = 1")
rowids.extend(r for r, in cursor)
# update statement
cursor.execute("""
update TestTempTable set
StringCol = 'update'
where IntCol = 1""")
cursor.execute("select rowid from TestTempTable where IntCol = 1")
rowids.extend(r for r, in cursor)
# second insert statement
cursor.execute("insert into TestTempTable values (2, 'test2')")
cursor.execute("select rowid from TestTempTable where IntCol = 2")
rowids.extend(r for r, in cursor)
# delete statement
cursor.execute("delete TestTempTable where IntCol = 2")
rowids.append(rowids[-1])
# truncate table
cursor.execute("truncate table TestTempTable")
# wait for all messages to be sent
data.condition.acquire()
data.condition.wait(10)
# verify the correct messages were sent
self.assertEqual(data.tableOperations, tableOperations)
self.assertEqual(data.rowOperations, rowOperations)
self.assertEqual(data.rowids, rowids)

View File

@ -52,7 +52,8 @@ else:
"StringVar",
"TimestampVar",
"AQ",
"Rowid"
"Rowid",
"Subscription"
]
clientVersion = cx_Oracle.clientversion()
if clientVersion[:2] >= (12, 1):