Added support for getting the message id of the AQ message which generated
a notification.
This commit is contained in:
parent
fedd95f78e
commit
4fa65095bf
|
@ -126,6 +126,11 @@ Message Objects
|
|||
This read-only attribute returns the name of the database that generated
|
||||
the notification.
|
||||
|
||||
.. attribute:: Message.msgid
|
||||
|
||||
This read-only attribute returns the message id of the AQ message which
|
||||
generated the notification. It will only be populated if the subscription
|
||||
was created with the namespace :data:`oracledb.SUBSCR_NAMESPACE_AQ`.
|
||||
|
||||
.. attribute:: Message.queries
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ Thin Mode Changes
|
|||
Thick Mode Changes
|
||||
++++++++++++++++++
|
||||
|
||||
#) Added support for getting the message id of the AQ message which generated
|
||||
a notification.
|
||||
#) Fixed the ability to use external authentication with connection pools.
|
||||
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ def process_messages(message):
|
|||
return
|
||||
print("Queue name:", message.queue_name)
|
||||
print("Consumer name:", message.consumer_name)
|
||||
print("Message id:", message.msgid)
|
||||
|
||||
connection = oracledb.connect(user=sample_env.get_main_user(),
|
||||
password=sample_env.get_main_password(),
|
||||
|
|
|
@ -480,6 +480,8 @@ cdef extern from "impl/thick/odpi/embed/dpi.c":
|
|||
uint32_t queueNameLength
|
||||
const char *consumerName
|
||||
uint32_t consumerNameLength
|
||||
const void *aqMsgId
|
||||
uint32_t aqMsgIdLength
|
||||
|
||||
ctypedef struct dpiSubscrMessageQuery:
|
||||
uint64_t id
|
||||
|
|
|
@ -99,6 +99,9 @@ cdef class ThickSubscrImpl(BaseSubscrImpl):
|
|||
if message.consumerName != NULL:
|
||||
py_message._consumer_name = \
|
||||
message.consumerName[:message.consumerNameLength].decode()
|
||||
if message.aqMsgId != NULL:
|
||||
msgid = <const char*> message.aqMsgId
|
||||
py_message._msgid = msgid[:message.aqMsgIdLength]
|
||||
if message.eventType == DPI_EVENT_OBJCHANGE:
|
||||
temp = py_message._tables
|
||||
for i in range(message.numTables):
|
||||
|
|
|
@ -168,6 +168,7 @@ class Message:
|
|||
self._tables = []
|
||||
self._txid = None
|
||||
self._type = 0
|
||||
self._msgid = None
|
||||
|
||||
|
||||
@property
|
||||
|
@ -193,6 +194,13 @@ class Message:
|
|||
"""
|
||||
return self._db_name
|
||||
|
||||
@property
|
||||
def msgid(self) -> bytes:
|
||||
"""
|
||||
Returns the message id of the AQ message that generated the notification.
|
||||
"""
|
||||
return self._msgid
|
||||
|
||||
@property
|
||||
def queries(self) -> list:
|
||||
"""
|
||||
|
|
|
@ -423,5 +423,29 @@ class TestCase(test_env.BaseTestCase):
|
|||
props1 = queue.deqone()
|
||||
self.assertTrue(props1 is None)
|
||||
|
||||
def test_2720_aq_notification(self):
|
||||
"2720 - verify msgid of aq message which spawned notification "
|
||||
self.__clear_books_queue()
|
||||
condition = threading.Condition()
|
||||
connection = test_env.get_connection(events=True)
|
||||
def notification_callback(message):
|
||||
self.cursor.execute("select msgid from book_queue_tab")
|
||||
actual_msgid, = self.cursor.fetchone()
|
||||
self.assertEqual(actual_msgid, message.msgid)
|
||||
with condition:
|
||||
condition.notify()
|
||||
sub = connection.subscribe(namespace=oracledb.SUBSCR_NAMESPACE_AQ,
|
||||
name=self.book_queue_name,
|
||||
callback=notification_callback, timeout=300)
|
||||
books_type = connection.gettype(self.book_type_name)
|
||||
queue = connection.queue(self.book_queue_name, books_type)
|
||||
book = books_type.newobject()
|
||||
book.TITLE, book.AUTHORS, book.PRICE = self.book_data[0]
|
||||
props = connection.msgproperties(payload=book)
|
||||
queue.enqone(props)
|
||||
connection.commit()
|
||||
with condition:
|
||||
condition.wait(5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_env.run_test_cases()
|
||||
|
|
Loading…
Reference in New Issue