Improvement to samples.

This commit is contained in:
Anthony Tuininga 2022-06-08 11:14:24 -06:00
parent 74a2616c22
commit e2485af06e
50 changed files with 1578 additions and 927 deletions

View File

@ -32,6 +32,7 @@ oracledb 1.0.1 (TBD)
fetched as floats when oracledb.defaults.fetch_lobs was set to `False`.
#) Thin: ensure that errors that occur during fetch are detected consistently.
#) Thin: fixed issue when fetching null values in implicit results.
#) Improved samples and documentation.
oracledb 1.0.0 (May 2022)

View File

@ -41,7 +41,7 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
# define constants used throughout the script; adjust as desired
# client context attributes to be set
APP_CTX_NAMESPACE = "CLIENTCONTEXT"
APP_CTX_ENTRIES = [
( APP_CTX_NAMESPACE, "ATTR1", "VALUE1" ),
@ -49,10 +49,15 @@ APP_CTX_ENTRIES = [
( APP_CTX_NAMESPACE, "ATTR3", "VALUE3" )
]
connection = oracledb.connect(sample_env.get_main_connect_string(),
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(),
appcontext=APP_CTX_ENTRIES)
cursor = connection.cursor()
for namespace, name, value in APP_CTX_ENTRIES:
cursor.execute("select sys_context(:1, :2) from dual", (namespace, name))
with connection.cursor() as cursor:
for namespace, name, value in APP_CTX_ENTRIES:
cursor.execute("select sys_context(:1, :2) from dual",
(namespace, name))
value, = cursor.fetchone()
print("Value of context key", name, "is", value)

View File

@ -26,9 +26,8 @@
# aq_notification.py
#
# Demonstrates using advanced queuing notification. Once this script is
# running, use another session to enqueue a few messages to the
# "DEMO_BOOK_QUEUE" queue. This is most easily accomplished by running the
# object_aq.py sample.
# running, run object_aq.py in another terminal to to enqueue a few messages to
# the "DEMO_BOOK_QUEUE" queue.
#------------------------------------------------------------------------------
import time
@ -51,8 +50,11 @@ def process_messages(message):
print("Queue name:", message.queue_name)
print("Consumer name:", message.consumer_name)
connection = oracledb.connect(sample_env.get_main_connect_string(),
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(),
events=True)
sub = connection.subscribe(namespace=oracledb.SUBSCR_NAMESPACE_AQ,
name="DEMO_BOOK_QUEUE", callback=process_messages,
timeout=300)

View File

@ -38,33 +38,36 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# show the number of rows for each parent ID as a means of verifying the
# output from the delete statement
for parent_id, count in cursor.execute("""
with connection.cursor() as cursor:
# show the number of rows for each parent ID as a means of verifying the
# output from the delete statement
for parent_id, count in cursor.execute("""
select ParentId, count(*)
from ChildTable
group by ParentId
order by ParentId"""):
print("Parent ID:", parent_id, "has", int(count), "rows.")
print()
print()
# delete the following parent IDs only
parent_ids_to_delete = [20, 30, 50]
# delete the following parent IDs only
parent_ids_to_delete = [20, 30, 50]
print("Deleting Parent IDs:", parent_ids_to_delete)
print()
print("Deleting Parent IDs:", parent_ids_to_delete)
print()
# enable array DML row counts for each iteration executed in executemany()
cursor.executemany("""
# enable array DML row counts for each iteration executed in executemany()
cursor.executemany("""
delete from ChildTable
where ParentId = :1""",
[(i,) for i in parent_ids_to_delete],
arraydmlrowcounts = True)
# display the number of rows deleted for each parent ID
row_counts = cursor.getarraydmlrowcounts()
for parent_id, count in zip(parent_ids_to_delete, row_counts):
# display the number of rows deleted for each parent ID
row_counts = cursor.getarraydmlrowcounts()
for parent_id, count in zip(parent_ids_to_delete, row_counts):
print("Parent ID:", parent_id, "deleted", count, "rows.")

View File

@ -40,11 +40,21 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# define data to insert
data_to_insert = [
with connection.cursor() as cursor:
# retrieve the number of rows in the table
cursor.execute("""
select count(*)
from ChildTable""")
count, = cursor.fetchone()
print("Number of rows in child table:", int(count))
# define data to insert
data_to_insert = [
(1016, 10, 'Child B of Parent 10'),
(1017, 10, 'Child C of Parent 10'),
(1018, 20, 'Child D of Parent 20'),
@ -53,56 +63,52 @@ data_to_insert = [
(1020, 30, 'Child D of Parent 40'),
(1021, 60, 'Child A of Parent 60'), # parent does not exist
(1022, 40, 'Child F of Parent 40'),
]
]
print("Number of rows to insert:", len(data_to_insert))
# retrieve the number of rows in the table
cursor.execute("""
select count(*)
from ChildTable""")
count, = cursor.fetchone()
print("number of rows in child table:", int(count))
print("number of rows to insert:", len(data_to_insert))
# old method: executemany() with data errors results in stoppage after the
# first error takes place; the row count is updated to show how many rows
# actually succeeded
try:
# old method: executemany() with data errors results in stoppage after the
# first error takes place; the row count is updated to show how many rows
# actually succeeded
try:
cursor.executemany("insert into ChildTable values (:1, :2, :3)",
data_to_insert)
except oracledb.DatabaseError as e:
except oracledb.DatabaseError as e:
error, = e.args
print("FAILED with error:", error.message)
print("number of rows which succeeded:", cursor.rowcount)
print("Failure with error:", error.message)
print("Number of rows successfully inserted:", cursor.rowcount)
# demonstrate that the row count is accurate
cursor.execute("""
# demonstrate that the row count is accurate
cursor.execute("""
select count(*)
from ChildTable""")
count, = cursor.fetchone()
print("number of rows in child table after failed insert:", int(count))
count, = cursor.fetchone()
print("Number of rows in child table after failed insert:", int(count))
# roll back so we can perform the same work using the new method
connection.rollback()
# roll back so we can perform the same work using the new method
connection.rollback()
# new method: executemany() with batch errors enabled (and array DML row counts
# also enabled) results in no immediate error being raised
cursor.executemany("insert into ChildTable values (:1, :2, :3)",
data_to_insert, batcherrors=True, arraydmlrowcounts=True)
# new method: executemany() with batch errors enabled (and array DML row
# counts also enabled) results in no immediate error being raised
cursor.executemany("insert into ChildTable values (:1, :2, :3)",
data_to_insert, batcherrors=True,
arraydmlrowcounts=True)
# where errors have taken place, the row count is 0; otherwise it is 1
row_counts = cursor.getarraydmlrowcounts()
print("Array DML row counts:", row_counts)
# display the errors that have taken place
errors = cursor.getbatcherrors()
print("number of errors which took place:", len(errors))
for error in errors:
# display the errors that have taken place
errors = cursor.getbatcherrors()
print("Number of rows with bad values:", len(errors))
for error in errors:
print("Error", error.message.rstrip(), "at row offset", error.offset)
# demonstrate that all of the rows without errors have been successfully
# inserted
cursor.execute("""
# arraydmlrowcounts also shows rows with invalid data: they have a row
# count of 0; otherwise 1 is shown
row_counts = cursor.getarraydmlrowcounts()
print("Array DML row counts:", row_counts)
# demonstrate that all of the rows without errors have been successfully
# inserted
cursor.execute("""
select count(*)
from ChildTable""")
count, = cursor.fetchone()
print("number of rows in child table after successful insert:", int(count))
count, = cursor.fetchone()
print("Number of rows in child table after insert with batcherrors "
"enabled:", int(count))

View File

@ -25,7 +25,7 @@
#------------------------------------------------------------------------------
# bind_insert.py
#
# Demonstrates how to insert a row into a table using bind variables.
# Demonstrates how to insert rows into a table using bind variables.
#------------------------------------------------------------------------------
import oracledb
@ -35,7 +35,9 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
#------------------------------------------------------------------------------
# "Bind by position"
@ -51,13 +53,14 @@ rows = [
(7, "Seventh")
]
cursor = connection.cursor()
with connection.cursor() as cursor:
# predefine maximum string size to avoid data scans and memory reallocations;
# the None value indicates that the default processing can take place
cursor.setinputsizes(None, 20)
# predefine the maximum string size to avoid data scans and memory
# reallocations. The value 'None' indicates that the default processing
# can take place
cursor.setinputsizes(None, 20)
cursor.executemany("insert into mytab(id, data) values (:1, :2)", rows)
cursor.executemany("insert into mytab(id, data) values (:1, :2)", rows)
#------------------------------------------------------------------------------
# "Bind by name"
@ -66,15 +69,17 @@ cursor.executemany("insert into mytab(id, data) values (:1, :2)", rows)
rows = [
{"d": "Eighth", "i": 8},
{"d": "Ninth", "i": 9},
{"d": "Tenth", "i": 10}
{"d": "Tenth", "i": 10},
{"i": 11} # Insert a NULL value
]
cursor = connection.cursor()
with connection.cursor() as cursor:
# Predefine maximum string size to avoid data scans and memory reallocations
cursor.setinputsizes(d=20)
# Predefine maximum string size to avoid data scans and memory
# reallocations
cursor.setinputsizes(d=20)
cursor.executemany("insert into mytab(id, data) values (:i, :d)", rows)
cursor.executemany("insert into mytab(id, data) values (:i, :d)", rows)
#------------------------------------------------------------------------------
# Inserting a single bind still needs tuples
@ -85,15 +90,16 @@ rows = [
("Twelth",)
]
cursor = connection.cursor()
cursor.executemany("insert into mytab(id, data) values (11, :1)", rows)
with connection.cursor() as cursor:
cursor.executemany("insert into mytab(id, data) values (12, :1)", rows)
# Don't commit - this lets the demo be run multiple times
# connection.commit()
#------------------------------------------------------------------------------
# Now query the results back
#------------------------------------------------------------------------------
# Don't commit - this lets the demo be run multiple times
# connection.commit()
for row in cursor.execute('select * from mytab'):
with connection.cursor() as cursor:
for row in cursor.execute("select * from mytab order by id"):
print(row)

View File

@ -25,12 +25,13 @@
#------------------------------------------------------------------------------
# bind_query.py
#
# Demonstrates how to perform a simple query limiting the rows retrieved using
# a bind variable. Since the query that is executed is identical, no additional
# parsing is required, thereby reducing overhead and increasing performance. It
# also permits data to be bound without having to be concerned about escaping
# special characters or SQL injection attacks.
#------------------------------------------------------------------------------
# Demonstrates the use of bind variables in queries. Binding is important for
# scalability and security. Since the text of a query that is re-executed is
# unchanged, no additional parsing is required, thereby reducing overhead and
# increasing performance. It also permits data to be bound without having to be
# concerned about escaping special characters, or be concerned about SQL
# injection attacks.
##------------------------------------------------------------------------------
import oracledb
import sample_env
@ -39,16 +40,110 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
sql = 'select * from SampleQueryTab where id = :bvid'
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
print("Query results with id = 4")
for row in cursor.execute(sql, bvid = 4):
print(row)
print()
# Bind by position with lists
with connection.cursor() as cursor:
print("Query results with id = 1")
for row in cursor.execute(sql, bvid = 1):
print("1. Bind by position: single value list")
sql = 'select * from SampleQueryTab where id = :bvid'
for row in cursor.execute(sql, [1]):
print(row)
print()
print()
print("2. Bind by position: multiple values")
sql = 'select * from SampleQueryTab where id = :bvid and 123 = :otherbind'
for row in cursor.execute(sql, [2, 123]):
print(row)
print()
# With bind-by-position, the order of the data in the bind list matches the
# order of the placeholders used in the SQL statement. The bind list data
# order is not associated by the name of the bind variable placeholders in
# the SQL statement, even though those names are ":1" and ":2".
print("3. Bind by position: multiple values with numeric placeholder names")
sql = 'select * from SampleQueryTab where id = :2 and 456 = :1'
for row in cursor.execute(sql, [3, 456]):
print(row)
print()
# With bind-by-position, repeated use of bind placeholder names in the SQL
# statement requires the input list data to be repeated.
print("4. Bind by position: multiple values with a repeated placeholder")
sql = 'select * from SampleQueryTab where id = :2 and 3 = :2'
for row in cursor.execute(sql, [3, 3]):
print(row)
print()
# Bind by position with tuples
with connection.cursor() as cursor:
print("5. Bind by position with single value tuple")
sql = 'select * from SampleQueryTab where id = :bvid'
for row in cursor.execute(sql, (4,)):
print(row)
print()
print("6. Bind by position with a multiple value tuple")
sql = 'select * from SampleQueryTab where id = :bvid and 789 = :otherbind'
for row in cursor.execute(sql, (4,789)):
print(row)
print()
# Bind by name with a dictionary
with connection.cursor() as cursor:
print("7. Bind by name with a dictionary")
sql = 'select * from SampleQueryTab where id = :bvid'
for row in cursor.execute(sql, {"bvid": 4}):
print(row)
print()
# With bind-by-name, repeated use of bind placeholder names in the SQL
# statement lets you supply the data once.
print("8. Bind by name with multiple value dict and repeated placeholders")
sql = 'select * from SampleQueryTab where id = :bvid and 4 = :bvid'
for row in cursor.execute(sql, {"bvid": 4}):
print(row)
print()
# Bind by name with parameters. The execute() parameter names match the bind
# variable placeholder names.
with connection.cursor() as cursor:
print("9. Bind by name using parameters")
sql = 'select * from SampleQueryTab where id = :bvid'
for row in cursor.execute(sql, bvid=5):
print(row)
print()
print("10. Bind by name using multiple parameters")
sql = 'select * from SampleQueryTab where id = :bvid and 101 = :otherbind'
for row in cursor.execute(sql, bvid=5, otherbind=101):
print(row)
print()
# With bind-by-name, repeated use of bind placeholder names in the SQL
# statement lets you supply the data once.
print("11. Bind by name: multiple values with repeated placeholder names")
sql = 'select * from SampleQueryTab where id = :bvid and 6 = :bvid'
for row in cursor.execute(sql, bvid=6):
print(row)
print()
# Rexcuting a query with different data values
with connection.cursor() as cursor:
sql = 'select * from SampleQueryTab where id = :bvid'
print("12. Query results with id = 7")
for row in cursor.execute(sql, [4]):
print(row)
print()
print("13. Rexcuted query results with id = 1")
for row in cursor.execute(sql, [1]):
print(row)
print()

View File

@ -57,40 +57,44 @@ PAYLOAD_DATA = [
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
# connect to database
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# create queue
queue = connection.queue(QUEUE_NAME)
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# create a queue
with connection.cursor() as cursor:
queue = connection.queue(QUEUE_NAME)
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
while queue.deqone():
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
while queue.deqone():
pass
# enqueue a few messages
print("Enqueuing messages...")
batch_size = 6
data_to_enqueue = PAYLOAD_DATA
while data_to_enqueue:
with connection.cursor() as cursor:
print("Enqueuing messages...")
batch_size = 6
data_to_enqueue = PAYLOAD_DATA
while data_to_enqueue:
batch_data = data_to_enqueue[:batch_size]
data_to_enqueue = data_to_enqueue[batch_size:]
messages = [connection.msgproperties(payload=d) for d in batch_data]
for data in batch_data:
print(data)
queue.enqmany(messages)
connection.commit()
connection.commit()
# dequeue the messages
print("\nDequeuing messages...")
batch_size = 8
while True:
with connection.cursor() as cursor:
print("\nDequeuing messages...")
batch_size = 8
while True:
messages = queue.deqmany(batch_size)
if not messages:
break
for props in messages:
print(props.payload.decode())
connection.commit()
print("\nDone.")
connection.commit()
print("\nDone.")

View File

@ -39,26 +39,30 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
connection.call_timeout = 2000
print("Call timeout set at", connection.call_timeout, "milliseconds...")
cursor = connection.cursor()
cursor.execute("select sysdate from dual")
today, = cursor.fetchone()
print("Fetch of current date before timeout:", today)
with connection.cursor() as cursor:
# dbms_session.sleep() replaces dbms_lock.sleep() from Oracle Database 18c
sleep_proc_name = "dbms_session.sleep" \
cursor.execute("select sysdate from dual")
today, = cursor.fetchone()
print("Fetch of current date before timeout:", today)
# dbms_session.sleep() replaces dbms_lock.sleep() from Oracle Database 18c
sleep_proc_name = "dbms_session.sleep" \
if int(connection.version.split(".")[0]) >= 18 \
else "dbms_lock.sleep"
print("Sleeping...should time out...")
try:
print("Sleeping...should time out...")
try:
cursor.callproc(sleep_proc_name, (3,))
except oracledb.DatabaseError as e:
except oracledb.DatabaseError as e:
print("ERROR:", e)
cursor.execute("select sysdate from dual")
today, = cursor.fetchone()
print("Fetch of current date after timeout:", today)
cursor.execute("select sysdate from dual")
today, = cursor.fetchone()
print("Fetch of current date after timeout:", today)

View File

@ -33,7 +33,7 @@
# features help protect against dead connections, and also aid use of Oracle
# Database features such as FAN and Application Continuity.
#
# Install Flask with:
# To run this sample, install Flask with:
# pip install --upgrade Flask
#
# The default route will display a welcome message:
@ -44,6 +44,7 @@
#
# To insert new a user 'fred' you can call:
# http://127.0.0.1:8080/post/fred
#
#------------------------------------------------------------------------------
import os
@ -117,8 +118,7 @@ def create_schema():
id number generated by default as identity,
username varchar2(40))';
execute immediate
'insert into demo (username) values (''chris'')';
insert into demo (username) values ('chris');
commit;
end;""")

View File

@ -70,8 +70,11 @@ def callback(message):
print("-" * 60)
print("=" * 60)
connection = oracledb.connect(sample_env.get_main_connect_string(),
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(),
events=True)
qos = oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS
sub = connection.subscribe(callback=callback, timeout=1800, qos=qos)
print("Subscription:", sub)

View File

@ -23,12 +23,12 @@
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# cqn2.py
# cqn_pool.py
#
# Demonstrates using continuous query notification in Python, a feature that is
# available in Oracle 11g and later. Once this script is running, use another
# session to insert, update or delete rows from the table TestTempTable and you
# will see the notification of that change.
# available in Oracle Database 11g and later. Once this script is running, use
# another session to insert, update or delete rows from the table TestTempTable
# and you will see the notification of that change.
#
# This script differs from cqn.py in that it shows how a connection can be
# acquired from a session pool and used to query the changes that have been
@ -82,8 +82,9 @@ def callback(message):
pool = oracledb.create_pool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(), min=2, max=5,
increment=1, events=True)
dsn=sample_env.get_connect_string(),
min=1, max=4, increment=1, events=True)
with pool.acquire() as connection:
qos = oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_ROWIDS
sub = connection.subscribe(callback=callback, timeout=1800, qos=qos)

View File

@ -67,8 +67,11 @@ def callback(message):
print("-" * 60)
print("=" * 60)
connection = oracledb.connect(sample_env.get_main_connect_string(),
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string(),
events=True)
sub = connection.subscribe(callback=callback, timeout=1800,
qos=oracledb.SUBSCR_QOS_ROWIDS)
print("Subscription:", sub)

View File

@ -36,30 +36,33 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# enable DBMS_OUTPUT
cursor.callproc("dbms_output.enable")
with connection.cursor() as cursor:
# execute some PL/SQL that generates output with DBMS_OUTPUT.PUT_LINE
cursor.execute("""
# enable DBMS_OUTPUT
cursor.callproc("dbms_output.enable")
# execute some PL/SQL that generates output with DBMS_OUTPUT.PUT_LINE
cursor.execute("""
begin
dbms_output.put_line('This is the oracledb manual');
dbms_output.put_line('This is some text');
dbms_output.put_line('');
dbms_output.put_line('Demonstrating use of DBMS_OUTPUT');
end;""")
# tune this size for your application
chunk_size = 10
# tune this size for your application
chunk_size = 10
# create variables to hold the output
lines_var = cursor.arrayvar(str, chunk_size)
num_lines_var = cursor.var(int)
num_lines_var.setvalue(0, chunk_size)
# create variables to hold the output
lines_var = cursor.arrayvar(str, chunk_size)
num_lines_var = cursor.var(int)
num_lines_var.setvalue(0, chunk_size)
# fetch the text that was added by PL/SQL
while True:
# fetch the text that was added by PL/SQL
while True:
cursor.callproc("dbms_output.get_lines", (lines_var, num_lines_var))
num_lines = num_lines_var.getvalue()
lines = lines_var.getvalue()[:num_lines]

View File

@ -41,27 +41,32 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
# truncate table first so that script can be rerun
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
print("Truncating table...")
cursor.execute("truncate table TestTempTable")
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# populate table with a few rows
for i in range(5):
with connection.cursor() as cursor:
# truncate table first so that script can be rerun
print("Truncating table...")
cursor.execute("truncate table TestTempTable")
# populate table with a few rows
for i in range(5):
data = (i + 1, "Test String #%d" % (i + 1))
print("Adding row", data)
cursor.execute("insert into TestTempTable values (:1, :2)", data)
# now delete them and use DML returning to return the data that was inserted
int_col = cursor.var(int)
string_col = cursor.var(str)
print("Deleting data with DML returning...")
cursor.execute("""
# now delete them and use DML returning to return the data that was
# deleted
int_col = cursor.var(int)
string_col = cursor.var(str)
print("Deleting data with DML returning...")
cursor.execute("""
delete from TestTempTable
returning IntCol, StringCol into :int_col, :string_col""",
int_col=int_col,
string_col=string_col)
print("Data returned:")
for int_val, string_val in zip(int_col.getvalue(), string_col.getvalue()):
print("Data returned:")
for int_val, string_val in zip(int_col.getvalue(), string_col.getvalue()):
print(tuple([int_val, string_val]))

View File

@ -1,64 +0,0 @@
#------------------------------------------------------------------------------
# Copyright (c) 2016, 2022, Oracle and/or its affiliates.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta,
# Canada. All rights reserved.
#
# This software is dual-licensed to you under the Universal Permissive License
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
# 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
# either license.
#
# If you elect to accept the software under the Apache License, Version 2.0,
# the following applies:
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# drcp.py
#
# Demonstrates the use of Database Resident Connection Pooling (DRCP) which
# provides a connection pool in the database server, thereby reducing the cost
# of creating and tearing down client connections. The pool can be started and
# stopped in the database by issuing the following commands in SQL*Plus:
#
# exec dbms_connection_pool.start_pool()
# exec dbms_connection_pool.stop_pool()
#
# Statistics regarding the pool can be acquired from the following query:
#
# select * from v$cpool_cc_stats;
#
# There is no difference in how a connection is used once it has been
# established.
#
# DRCP has most benefit when used in conjunction with a local connection pool,
# see the python-oracledb documentation.
#------------------------------------------------------------------------------
import oracledb
import sample_env
# determine whether to use python-oracledb thin mode or thick mode
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
conn = oracledb.connect(sample_env.get_drcp_connect_string(),
cclass="PYCLASS", purity=oracledb.ATTR_PURITY_SELF)
cursor = conn.cursor()
print("Performing query using DRCP...")
for row in cursor.execute("select * from TestNumbers order by IntCol"):
print(row)

218
samples/drcp_pool.py Normal file
View File

@ -0,0 +1,218 @@
#------------------------------------------------------------------------------
# Copyright (c) 2022, Oracle and/or its affiliates.
#
# This software is dual-licensed to you under the Universal Permissive License
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
# 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
# either license.
#
# If you elect to accept the software under the Apache License, Version 2.0,
# the following applies:
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#------------------------------------------------------------------------------
#------------------------------------------------------------------------------
# drcp_pool.py
#
# Demonstrates the use of Database Resident Connection Pooling (DRCP)
# connection pooling using a Flask web application. This sample is similar to
# connection_pool.py
#
# DRCP can be used with standalone connections from connect(), but it is often
# used together with a python-oracledb connection pool created with
# create_pool(), as shown here.
#
# DRCP provides a connection pool in the database server. The pool reduces the
# cost of creating and tearing down database server processs. This pool of
# server processes can be shared across application processs, allowing for
# resource sharing.
#
# There is no difference in how a connection is used once it has been
# established.
#
# To use DRCP, the connection string should request the database use a pooled
# server. For example, "localhost/orclpdb:pool". It is best practice for
# connections to specify a connection class and server purity when creating
# a pool
#
# For on premise databases, the DRCP pool can be started and stopped in the
# database by issuing the following commands in SQL*Plus:
#
# exec dbms_connection_pool.start_pool()
# exec dbms_connection_pool.stop_pool()
#
# For multitenant databases, DRCP management needs to be done the root ("CDB")
# database unless the database initialization parameter ENABLE_PER_PDB_DRCP is
# TRUE.
#
# Oracle Autonomous Databases already have DRCP enabled.
#
# Statistics on DRCP usage are recorded in various data dictionary views, for
# example in V$CPOOL_CC_STATS.
#
# See the python-oracledb documentation for more information on DRCP.
#
# To run this sample, install Flask with:
# pip install --upgrade Flask
#
# The default route will display a welcome message:
# http://127.0.0.1:8080/
#
# To find a username you can pass an id, for example 1:
# http://127.0.0.1:8080/user/1
#
# To insert new a user 'fred' you can call:
# http://127.0.0.1:8080/post/fred
#
# Multi-user load can be simulated with a testing tool such as 'ab':
#
# ab -n 1000 -c 4 http://127.0.0.1:8080/user/1
#
# Then you can query the data dictionary:
#
# select cclass_name, num_requests, num_hits,
# num_misses, num_waits, num_authentications
# from v$cpool_cc_stats;
#
# Output will be like:
#
# CCLASS_NAME NUM_REQUESTS NUM_HITS NUM_MISSES NUM_WAITS NUM_AUTHENTICATIONS
# ---------------- ------------ -------- ---------- --------- -------------------
# PYTHONDEMO.MYAPP 1001 997 4 0 4
#
#------------------------------------------------------------------------------
from flask import Flask
import os
import oracledb
import sample_env
# Port to listen on
port = int(os.environ.get('PORT', '8080'))
# determine whether to use python-oracledb thin mode or thick mode
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
#------------------------------------------------------------------------------
# start_pool(): starts the connection pool
def start_pool():
# Generally a fixed-size pool is recommended, i.e. pool_min=pool_max. Here
# the pool contains 4 connections, which will allow 4 concurrent users.
pool_min = 4
pool_max = 4
pool_inc = 0
pool = oracledb.create_pool(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_drcp_connect_string(),
min=pool_min, max=pool_max, increment=pool_inc,
session_callback=init_session,
cclass="MYAPP",
purity=oracledb.ATTR_PURITY_SELF)
return pool
# init_session(): a 'session callback' to efficiently set any initial state
# that each connection should have.
#
# This particular demo doesn't use dates, so sessionCallback could be omitted,
# but it does show the kinds of settings many apps would use.
#
# If you have multiple SQL statements, then call them all in a PL/SQL anonymous
# block with BEGIN/END so you only use execute() once. This is shown later in
# create_schema().
#
def init_session(connection, requestedTag_ignored):
with connection.cursor() as cursor:
cursor.execute("""
alter session set
time_zone = 'UTC'
nls_date_format = 'YYYY-MM-DD HH24:MI'""")
#------------------------------------------------------------------------------
# create_schema(): drop and create the demo table, and add a row
def create_schema():
with pool.acquire() as connection:
with connection.cursor() as cursor:
cursor.execute("""
begin
begin
execute immediate 'drop table demo';
exception when others then
if sqlcode <> -942 then
raise;
end if;
end;
execute immediate 'create table demo (
id number generated by default as identity,
username varchar2(40))';
insert into demo (username) values ('chris');
commit;
end;""")
#------------------------------------------------------------------------------
app = Flask(__name__)
# Display a welcome message on the 'home' page
@app.route('/')
def index():
return "Welcome to the demo app"
# Add a new username
#
# The new user's id is generated by the database and returned in the OUT bind
# variable 'idbv'.
@app.route('/post/<string:username>')
def post(username):
with pool.acquire() as connection:
with connection.cursor() as cursor:
connection.autocommit = True
idbv = cursor.var(int)
cursor.execute("""
insert into demo (username)
values (:unbv)
returning id into :idbv""", [username, idbv])
return f'Inserted {username} with id {idbv.getvalue()[0]}'
# Show the username for a given id
@app.route('/user/<int:id>')
def show_username(id):
with pool.acquire() as connection:
with connection.cursor() as cursor:
cursor.execute("select username from demo where id = :idbv", [id])
r = cursor.fetchone()
return r[0] if r is not None else "Unknown user id"
#------------------------------------------------------------------------------
if __name__ == '__main__':
# Start a pool of connections
pool = start_pool()
# Create a demo table
create_schema()
# Start a webserver
app.run(port=port)

View File

@ -57,15 +57,18 @@ class Cursor(oracledb.Cursor):
return result
# create new subclassed connection and cursor
connection = Connection(sample_env.get_main_connect_string())
cursor = connection.cursor()
# create a new subclassed connection and cursor
connection = Connection(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# the names are now available directly for each query executed
for row in cursor.execute("select ParentId, Description from ParentTable"):
with connection.cursor() as cursor:
# the names are now available directly for each query executed
for row in cursor.execute("select ParentId, Description from ParentTable"):
print(row.PARENTID, "->", row.DESCRIPTION)
print()
print()
for row in cursor.execute("select ChildId, Description from ChildTable"):
for row in cursor.execute("select ChildId, Description from ChildTable"):
print(row.CHILDID, "->", row.DESCRIPTION)
print()
print()

View File

@ -42,11 +42,14 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# use PL/SQL block to return two cursors
cursor.execute("""
with connection.cursor() as cursor:
# A PL/SQL block that returns two cursors
cursor.execute("""
declare
c1 sys_refcursor;
c2 sys_refcursor;
@ -64,8 +67,8 @@ cursor.execute("""
end;""")
# display results
for ix, result_set in enumerate(cursor.getimplicitresults()):
# display results
for ix, result_set in enumerate(cursor.getimplicitresults()):
print("Result Set #" + str(ix + 1))
for row in result_set:
print(row)

View File

@ -40,8 +40,11 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# create and populate Oracle objects
connection = oracledb.connect(sample_env.get_main_connect_string())
type_obj = connection.gettype("MDSYS.SDO_GEOMETRY")
element_info_type_obj = connection.gettype("MDSYS.SDO_ELEM_INFO_ARRAY")
ordinate_type_obj = connection.gettype("MDSYS.SDO_ORDINATE_ARRAY")
@ -53,25 +56,27 @@ obj.SDO_ORDINATES = ordinate_type_obj.newobject()
obj.SDO_ORDINATES.extend([1, 1, 5, 7])
print("Created object", obj)
# create table, if necessary
cursor = connection.cursor()
cursor.execute("""
select count(*)
from user_tables
where table_name = 'TESTGEOMETRY'""")
count, = cursor.fetchone()
if count == 0:
print("Creating table...")
cursor.execute("""
create table TestGeometry (
IntCol number(9) not null,
Geometry MDSYS.SDO_GEOMETRY not null
)""")
with connection.cursor() as cursor:
# remove all existing rows and then add a new one
print("Removing any existing rows...")
cursor.execute("delete from TestGeometry")
print("Adding row to table...")
cursor.execute("insert into TestGeometry values (1, :obj)", obj=obj)
connection.commit()
print("Success!")
# create sample table
cursor.execute("""
begin
begin
execute immediate 'drop table TestGeometry';
exception
when others then
if sqlcode <> -942 then
raise;
end if;
end;
execute immediate 'create table TestGeometry (
IntCol number(9) not null,
Geometry MDSYS.SDO_GEOMETRY)';
end;""")
print("Adding row to table...")
cursor.execute("insert into TestGeometry values (1, :objbv)", objbv=obj)
connection.commit()
print("Success!")

View File

@ -46,7 +46,10 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
if not connection.thin:
client_version = oracledb.clientversion()[0]
db_version = int(connection.version.split(".")[0])
@ -55,46 +58,46 @@ db_version = int(connection.version.split(".")[0])
if db_version < 12:
sys.exit("This example requires Oracle Database 12.1.0.2 or later")
cursor = connection.cursor()
# Insert JSON data
with connection.cursor() as cursor:
data = dict(name="Rod", dept="Sales", location="Germany")
inssql = "insert into CustomersAsBlob values (:1, :2)"
data = dict(name="Rod", dept="Sales", location="Germany")
inssql = "insert into CustomersAsBlob values (:1, :2)"
if not connection.thin and client_version >= 21 and db_version >= 21:
if not connection.thin and client_version >= 21 and db_version >= 21:
# Take advantage of direct binding
cursor.setinputsizes(None, oracledb.DB_TYPE_JSON)
cursor.execute(inssql, [1, data])
else:
else:
# Insert the data as a JSON string
cursor.execute(inssql, [1, json.dumps(data)])
# Select JSON data
with connection.cursor() as cursor:
sql = "SELECT c.json_data FROM CustomersAsBlob c"
for j, in cursor.execute(sql):
sql = "SELECT c.json_data FROM CustomersAsBlob c"
for j, in cursor.execute(sql):
print(json.loads(j.read()))
# Using JSON_VALUE to extract a value from a JSON column
# Using JSON_VALUE to extract a value from a JSON column
sql = """SELECT JSON_VALUE(json_data, '$.location')
sql = """SELECT JSON_VALUE(json_data, '$.location')
FROM CustomersAsBlob
OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY"""
for r in cursor.execute(sql):
for r in cursor.execute(sql):
print(r)
# Using dot-notation to extract a value from a JSON (BLOB storage) column
# Using dot-notation to extract a value from a JSON (BLOB storage) column
sql = """SELECT c.json_data.location
sql = """SELECT c.json_data.location
FROM CustomersAsBlob c
OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY"""
for j, in cursor.execute(sql):
for j, in cursor.execute(sql):
print(j)
# Using JSON_OBJECT to extract relational data as JSON
# Using JSON_OBJECT to extract relational data as JSON
sql = """SELECT JSON_OBJECT('key' IS d.dummy) dummy
sql = """SELECT JSON_OBJECT('key' IS d.dummy) dummy
FROM dual d"""
for r in cursor.execute(sql):
for r in cursor.execute(sql):
print(r)

View File

@ -44,63 +44,65 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
client_version = oracledb.clientversion()[0]
db_version = int(connection.version.split(".")[0])
# this script only works with Oracle Database 21
if db_version < 21:
sys.exit("This example requires Oracle Database 21.1 or later. "
"Try json_blob.py")
cursor = connection.cursor()
# Insert JSON data
with connection.cursor() as cursor:
data = dict(name="Rod", dept="Sales", location="Germany")
inssql = "insert into CustomersAsJson values (:1, :2)"
if client_version >= 21:
data = dict(name="Rod", dept="Sales", location="Germany")
inssql = "insert into CustomersAsJson values (:1, :2)"
if client_version >= 21:
# Take advantage of direct binding
cursor.setinputsizes(None, oracledb.DB_TYPE_JSON)
cursor.execute(inssql, [1, data])
else:
else:
# Insert the data as a JSON string
cursor.execute(inssql, [1, json.dumps(data)])
# Select JSON data
with connection.cursor() as cursor:
sql = "SELECT c.json_data FROM CustomersAsJson c"
if client_version >= 21:
sql = "select c.json_data from CustomersAsJson c"
if client_version >= 21:
for j, in cursor.execute(sql):
print(j)
else:
else:
for j, in cursor.execute(sql):
print(json.loads(j.read()))
# Using JSON_VALUE to extract a value from a JSON column
# Using JSON_VALUE to extract a value from a JSON column
sql = """SELECT JSON_VALUE(json_data, '$.location')
FROM CustomersAsJson
OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY"""
for r in cursor.execute(sql):
sql = """select json_value(json_data, '$.location')
from CustomersAsJson
offset 0 rows fetch next 1 rows only"""
for r in cursor.execute(sql):
print(r)
# Using dot-notation to extract a value from a JSON column
# Using dot-notation to extract a value from a JSON column
sql = """SELECT c.json_data.location
FROM CustomersAsJson c
OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY"""
if client_version >= 21:
sql = """select c.json_data.location
from CustomersAsJson c
offset 0 rows fetch next 1 rows only"""
if client_version >= 21:
for j, in cursor.execute(sql):
print(j)
else:
else:
for j, in cursor.execute(sql):
print(json.loads(j.read()))
# Using JSON_OBJECT to extract relational data as JSON
# Using JSON_OBJECT to extract relational data as JSON
sql = """SELECT JSON_OBJECT('key' IS d.dummy) dummy
FROM dual d"""
for r in cursor.execute(sql):
sql = """select json_object('key' is d.dummy) dummy
from dual d"""
for r in cursor.execute(sql):
print(r)

View File

@ -48,10 +48,13 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
conn = oracledb.connect(sample_env.get_main_connect_string())
if not conn.thin:
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
if not connection.thin:
client_version = oracledb.clientversion()[0]
db_version = int(conn.version.split(".")[0])
db_version = int(connection.version.split(".")[0])
# Minimum database vesion is 21
if db_version < 21:
@ -68,29 +71,27 @@ def type_handler(cursor, name, default_type, size, precision, scale):
return cursor.var(default_type, arraysize=cursor.arraysize,
outconverter=lambda v: json.loads(v.read()))
cursor = conn.cursor()
# Insert JSON data into a JSON column
data = [
with connection.cursor() as cursor:
data = [
(1, dict(name="Rod", dept="Sales", location="Germany")),
(2, dict(name="George", dept="Marketing", location="Bangalore")),
(3, dict(name="Sam", dept="Sales", location="Mumbai")),
(4, dict(name="Jill", dept="Marketing", location="Germany"))
]
insert_sql = "insert into CustomersAsJson values (:1, :2)"
if not conn.thin and client_version >= 21:
]
insert_sql = "insert into CustomersAsJson values (:1, :2)"
if not connection.thin and client_version >= 21:
# Take advantage of direct binding
cursor.setinputsizes(None, oracledb.DB_TYPE_JSON)
cursor.executemany(insert_sql, data)
else:
else:
# Insert the data as a JSON string
cursor.executemany(insert_sql, [(i, json.dumps(j)) for i, j in data])
# Select JSON data from a JSON column
if conn.thin or client_version < 21:
with connection.cursor() as cursor:
if connection.thin or client_version < 21:
cursor.outputtypehandler = type_handler
for row in cursor.execute("select * from CustomersAsJson"):
for row in cursor.execute("select * from CustomersAsJson"):
print(row)

View File

@ -35,45 +35,48 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
row1 = [1, "First"]
row2 = [2, "Second"]
with connection.cursor() as cursor:
# insert a couple of rows and retain the rowid of each
cursor = connection.cursor()
cursor.execute("insert into mytab (id, data) values (:1, :2)", row1)
rowid1 = cursor.lastrowid
print("Row 1:", row1)
print("Rowid 1:", rowid1)
print()
# insert a couple of rows and retain the rowid of each
row1 = [1, "First"]
row2 = [2, "Second"]
cursor.execute("insert into mytab (id, data) values (:1, :2)", row2)
rowid2 = cursor.lastrowid
print("Row 2:", row2)
print("Rowid 2:", rowid2)
print()
cursor.execute("insert into mytab (id, data) values (:1, :2)", row1)
rowid1 = cursor.lastrowid
print("Row 1:", row1)
print("Rowid 1:", rowid1)
print()
# the row can be fetched with the rowid that was retained
cursor.execute("select id, data from mytab where rowid = :1", [rowid1])
print("Row 1:", cursor.fetchone())
cursor.execute("select id, data from mytab where rowid = :1", [rowid2])
print("Row 2:", cursor.fetchone())
print()
cursor.execute("insert into mytab (id, data) values (:1, :2)", row2)
rowid2 = cursor.lastrowid
print("Row 2:", row2)
print("Rowid 2:", rowid2)
print()
# updating multiple rows only returns the rowid of the last updated row
cursor.execute("update mytab set data = data || ' (Modified)'")
cursor.execute("select id, data from mytab where rowid = :1",
# the row can be fetched with the rowid that was returned
cursor.execute("select id, data from mytab where rowid = :1", [rowid1])
print("Row 1:", cursor.fetchone())
cursor.execute("select id, data from mytab where rowid = :1", [rowid2])
print("Row 2:", cursor.fetchone())
print()
# updating multiple rows only returns the rowid of the last updated row
cursor.execute("update mytab set data = data || ' (Modified)'")
cursor.execute("select id, data from mytab where rowid = :1",
[cursor.lastrowid])
print("Last updated row:", cursor.fetchone())
print("Last updated row:", cursor.fetchone())
# deleting multiple rows only returns the rowid of the last deleted row
cursor.execute("delete from mytab")
print("Rowid of last deleted row:", cursor.lastrowid)
# deleting multiple rows only returns the rowid of the last deleted row
cursor.execute("delete from mytab")
print("Rowid of last deleted row:", cursor.lastrowid)
# deleting no rows results in a value of None
cursor.execute("delete from mytab")
print("Rowid when no rows are deleted:", cursor.lastrowid)
# deleting no rows results in a value of None
cursor.execute("delete from mytab")
print("Rowid when no rows are deleted:", cursor.lastrowid)
# Don't commit - this lets us run the demo multiple times
#connection.commit()
# Don't commit - this lets us run the demo multiple times
#connection.commit()

View File

@ -49,41 +49,45 @@ PAYLOAD_DATA = [
]
# connect to database
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# create queue
# create a queue
queue = connection.queue(QUEUE_NAME)
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# enqueue a few messages
print("Enqueuing messages...")
for data in PAYLOAD_DATA:
with connection.cursor() as cursor:
print("Enqueuing messages...")
for data in PAYLOAD_DATA:
print(data)
queue.enqone(connection.msgproperties(payload=data))
connection.commit()
print()
connection.commit()
print()
# dequeue the messages for consumer A
print("Dequeuing the messages for consumer A...")
queue.deqoptions.consumername = "SUBSCRIBER_A"
while True:
with connection.cursor() as cursor:
print("Dequeuing the messages for consumer A...")
queue.deqoptions.consumername = "SUBSCRIBER_A"
while True:
props = queue.deqone()
if not props:
break
print(props.payload.decode())
connection.commit()
print()
connection.commit()
print()
# dequeue the message for consumer B
print("Dequeuing the messages for consumer B...")
queue.deqoptions.consumername = "SUBSCRIBER_B"
while True:
with connection.cursor() as cursor:
print("Dequeuing the messages for consumer B...")
queue.deqoptions.consumername = "SUBSCRIBER_B"
while True:
props = queue.deqone()
if not props:
break
print(props.payload.decode())
connection.commit()
connection.commit()
print("\nDone.")

View File

@ -52,10 +52,11 @@ BOOK_DATA = [
]
# connect to database
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# create queue
# create a queue
books_type = connection.gettype(BOOK_TYPE_NAME)
queue = connection.queue(QUEUE_NAME, payload_type=books_type)
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT

View File

@ -37,31 +37,34 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# create new empty object of the correct type
# note the use of a PL/SQL type defined in a package
# create a new empty object of the correct type.
# note the use of a PL/SQL type that is defined in a package
type_obj = connection.gettype("PKG_DEMO.UDT_STRINGLIST")
obj = type_obj.newobject()
# call the stored procedure which will populate the object
cursor = connection.cursor()
cursor.callproc("pkg_Demo.DemoCollectionOut", (obj,))
with connection.cursor() as cursor:
# show the indexes that are used by the collection
print("Indexes and values of collection:")
ix = obj.first()
while ix is not None:
cursor.callproc("pkg_Demo.DemoCollectionOut", (obj,))
# show the indexes that are used by the collection
print("Indexes and values of collection:")
ix = obj.first()
while ix is not None:
print(ix, "->", obj.getelement(ix))
ix = obj.next(ix)
print()
print()
# show the values as a simple list
print("Values of collection as list:")
print(obj.aslist())
print()
# show the values as a simple list
print("Values of collection as list:")
print(obj.aslist())
print()
# show the values as a simple dictionary
print("Values of collection as dictionary:")
print(obj.asdict())
print()
# show the values as a simple dictionary
print("Values of collection as dictionary:")
print(obj.asdict())
print()

View File

@ -35,8 +35,11 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
cursor = connection.cursor()
res = cursor.callfunc('myfunc', int, ('abc', 2))
print(res)
with connection.cursor() as cursor:
# The second parameter is the expected return type of the PL/SQL function
res = cursor.callfunc('myfunc', int, ('abc', 2))
print(res)

View File

@ -36,9 +36,11 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
cursor = connection.cursor()
myvar = cursor.var(int)
cursor.callproc('myproc', (123, myvar))
print(myvar.getvalue())
with connection.cursor() as cursor:
myvar = cursor.var(int)
cursor.callproc('myproc', (123, myvar))
print(myvar.getvalue())

View File

@ -25,7 +25,7 @@
#------------------------------------------------------------------------------
# plsql_record.py
#
# Demonstrates how to bind (in and out) a PL/SQL record.
# Demonstrates how to bind (IN and OUT) a PL/SQL record.
#
# This feature is only available in Oracle Database 12.1 and higher.
#------------------------------------------------------------------------------
@ -38,7 +38,9 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# create new object of the correct type
# note the use of a PL/SQL record defined in a package
@ -57,13 +59,14 @@ print("DATEVALUE ->", obj.DATEVALUE)
print("BOOLEANVALUE ->", obj.BOOLEANVALUE)
print()
# call the stored procedure which will modify the object
cursor = connection.cursor()
cursor.callproc("pkg_Demo.DemoRecordsInOut", (obj,))
with connection.cursor() as cursor:
# show the modified values
print("NUMBERVALUE ->", obj.NUMBERVALUE)
print("STRINGVALUE ->", obj.STRINGVALUE)
print("DATEVALUE ->", obj.DATEVALUE)
print("BOOLEANVALUE ->", obj.BOOLEANVALUE)
print()
# call the stored procedure which will modify the object
cursor.callproc("pkg_Demo.DemoRecordsInOut", (obj,))
# show the modified values
print("NUMBERVALUE ->", obj.NUMBERVALUE)
print("STRINGVALUE ->", obj.STRINGVALUE)
print("DATEVALUE ->", obj.DATEVALUE)
print("BOOLEANVALUE ->", obj.BOOLEANVALUE)
print()

View File

@ -25,7 +25,7 @@
#------------------------------------------------------------------------------
# query.py
#
# Demonstrates how to perform a query in different ways.
# Demonstrates different ways of fetching rows from a query.
#------------------------------------------------------------------------------
import oracledb
@ -35,36 +35,44 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
sql = """
select * from SampleQueryTab
sql = """select * from SampleQueryTab
where id < 6
order by id"""
print("Get all rows via iterator")
cursor = connection.cursor()
for result in cursor.execute(sql):
with connection.cursor() as cursor:
print("Get all rows via an iterator")
for result in cursor.execute(sql):
print(result)
print()
print()
print("Query one row at a time")
cursor.execute(sql)
row = cursor.fetchone()
print(row)
row = cursor.fetchone()
print(row)
print()
print("Fetch many rows")
cursor.execute(sql)
res = cursor.fetchmany(3)
print(res)
print()
print("Fetch each row as a Dictionary")
cursor.execute(sql)
columns = [col[0] for col in cursor.description]
cursor.rowfactory = lambda *args: dict(zip(columns, args))
for row in cursor:
print("Query one row at a time")
cursor.execute(sql)
row = cursor.fetchone()
print(row)
row = cursor.fetchone()
print(row)
print()
print("Fetch many rows")
cursor.execute(sql)
res = cursor.fetchmany(3)
print(res)
print()
print("Fetch all rows")
cursor.execute(sql)
res = cursor.fetchall()
print(res)
print()
print("Fetch each row as a Dictionary")
cursor.execute(sql)
columns = [col[0] for col in cursor.description]
cursor.rowfactory = lambda *args: dict(zip(columns, args))
for row in cursor:
print(row)

View File

@ -25,9 +25,14 @@
#------------------------------------------------------------------------------
# query_arraysize.py
#
# Demonstrates how to alter the array size and prefetch rows value on a cursor
# in order to reduce the number of network round trips and overhead required to
# fetch all of the rows from a large table.
# Demonstrates how to alter the array size and prefetch rows values in order to
# tune the performance of fetching data from the database. Increasing these
# values can reduce the number of network round trips and overhead required to
# fetch all of the rows from a large table. The value affect internal buffers
# and do not affect how, or when, rows are returned to your application.
#
# The best values need to be determined by tuning in your production
# environment.
#------------------------------------------------------------------------------
import time
@ -39,16 +44,40 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
start = time.time()
# Global values can be set to override the defaults used when a cursor is
# created
oracledb.defaults.prefetchrows = 200 # default is 2
oracledb.defaults.arraysize = 200 # default is 100
cursor = connection.cursor()
cursor.prefetchrows = 1000
cursor.arraysize = 1000
cursor.execute('select * from bigtab')
res = cursor.fetchall()
# print(res) # uncomment to display the query results
with connection.cursor() as cursor:
elapsed = (time.time() - start)
print("Retrieved", len(res), "rows in", elapsed, "seconds")
# Example 1
start = time.time()
cursor.execute('select * from bigtab')
res = cursor.fetchall()
elapsed = (time.time() - start)
print("Prefetchrows:", cursor.prefetchrows, "Arraysize:", cursor.arraysize)
print("Retrieved", len(res), "rows in", elapsed, "seconds")
# Example 2
start = time.time()
# values can be set per-cursor
cursor.prefetchrows = 1000
cursor.arraysize = 1000
cursor.execute('select * from bigtab')
res = cursor.fetchall()
# print(res) # uncomment to display the query results
elapsed = (time.time() - start)
print("Prefetchrows:", cursor.prefetchrows, "Arraysize:", cursor.arraysize)
print("Retrieved", len(res), "rows in", elapsed, "seconds")

View File

@ -44,26 +44,28 @@ def return_strings_as_bytes(cursor, name, default_type, size, precision,
if default_type == oracledb.DB_TYPE_VARCHAR:
return cursor.var(str, arraysize=cursor.arraysize, bypass_decode=True)
with oracledb.connect(sample_env.get_main_connect_string()) as conn:
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# truncate table and populate with our data of choice
with conn.cursor() as cursor:
# truncate table and populate with our data of choice
with connection.cursor() as cursor:
cursor.execute("truncate table TestTempTable")
cursor.execute("insert into TestTempTable values (1, :val)",
val=STRING_VAL)
conn.commit()
connection.commit()
# fetch the data normally and show that it is returned as a string
with conn.cursor() as cursor:
# fetch the data normally and show that it is returned as a string
with connection.cursor() as cursor:
cursor.execute("select IntCol, StringCol from TestTempTable")
print("Data fetched using normal technique:")
for row in cursor:
print(row)
print()
# fetch the data, bypassing the decode and show that it is returned as
# bytes
with conn.cursor() as cursor:
# fetch the data, bypassing the decode and show that it is returned as
# bytes
with connection.cursor() as cursor:
cursor.outputtypehandler = return_strings_as_bytes
cursor.execute("select IntCol, StringCol from TestTempTable")
print("Data fetched using bypass decode technique:")

View File

@ -48,33 +48,39 @@ PAYLOAD_DATA = [
"The fourth and final message"
]
# connect to database
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# create queue
queue = connection.queue(QUEUE_NAME)
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# create a queue
with connection.cursor() as cursor:
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
while queue.deqone():
queue = connection.queue(QUEUE_NAME)
queue.deqoptions.wait = oracledb.DEQ_NO_WAIT
queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
while queue.deqone():
pass
# enqueue a few messages
print("Enqueuing messages...")
for data in PAYLOAD_DATA:
with connection.cursor() as cursor:
for data in PAYLOAD_DATA:
print(data)
queue.enqone(connection.msgproperties(payload=data))
connection.commit()
connection.commit()
# dequeue the messages
print("\nDequeuing messages...")
while True:
with connection.cursor() as cursor:
while True:
props = queue.deqone()
if not props:
break
print(props.payload.decode())
connection.commit()
print("\nDone.")
connection.commit()
print("\nDone.")

View File

@ -25,9 +25,11 @@
#------------------------------------------------------------------------------
# ref_cursor.py
#
# Demonstrates the use of REF cursors.
# Demonstrates the use of REF CURSORS.
#------------------------------------------------------------------------------
import time
import oracledb
import sample_env
@ -35,46 +37,70 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
ref_cursor = connection.cursor()
cursor.callproc("myrefcursorproc", (2, 6, ref_cursor))
print("Rows between 2 and 6:")
for row in ref_cursor:
with connection.cursor() as cursor:
ref_cursor = connection.cursor()
cursor.callproc("myrefcursorproc", (2, 6, ref_cursor))
print("Rows between 2 and 6:")
for row in ref_cursor:
print(row)
print()
print()
ref_cursor = connection.cursor()
cursor.callproc("myrefcursorproc", (8, 9, ref_cursor))
print("Rows between 8 and 9:")
for row in ref_cursor:
ref_cursor = connection.cursor()
cursor.callproc("myrefcursorproc", (8, 9, ref_cursor))
print("Rows between 8 and 9:")
for row in ref_cursor:
print(row)
print()
print()
#------------------------------------------------------------------------------
# Setting prefetchrows and arraysize of a REF cursor can improve performance
# when fetching a large number of rows (Tuned Fetch)
#------------------------------------------------------------------------------
#--------------------------------------------------------------------------
# Setting prefetchrows and arraysize of a REF CURSOR can improve
# performance when fetching a large number of rows by reducing network
# round-trips.
#--------------------------------------------------------------------------
# Truncate the table used for this demo
cursor.execute("truncate table TestTempTable")
# Truncate the table used for this demo
cursor.execute("truncate table TestTempTable")
# Populate the table with a large number of rows
num_rows = 50000
sql = "insert into TestTempTable (IntCol) values (:1)"
data = [(n + 1,) for n in range(num_rows)]
cursor.executemany(sql, data)
# Populate the table with a large number of rows
num_rows = 50000
sql = "insert into TestTempTable (IntCol) values (:1)"
data = [(n + 1,) for n in range(num_rows)]
cursor.executemany(sql, data)
# Set the arraysize and prefetch rows of the REF cursor
ref_cursor = connection.cursor()
ref_cursor.prefetchrows = 1000
ref_cursor.arraysize = 1000
# Perform an untuned fetch
ref_cursor = connection.cursor()
# Perform the tuned fetch
sum_rows = 0
cursor.callproc("myrefcursorproc2", [ref_cursor])
print("Sum of IntCol for", num_rows, "rows:")
for row in ref_cursor:
print("ref_cursor.prefetchrows =", ref_cursor.prefetchrows,
"ref_cursor.arraysize =", ref_cursor.arraysize)
start = time.time()
sum_rows = 0
cursor.callproc("myrefcursorproc2", [ref_cursor])
for row in ref_cursor:
sum_rows += row[0]
print(sum_rows)
elapsed = (time.time() - start)
print("Sum of IntCol for", num_rows, "rows is ", sum_rows,
"in", elapsed, "seconds")
print()
# Repeat the call but increase the internal arraysize and prefetch row
# buffers for the REF CURSOR to tune the number of round-trips to the
# database
ref_cursor = connection.cursor()
ref_cursor.prefetchrows = 1000
ref_cursor.arraysize = 1000
print("ref_cursor.prefetchrows =", ref_cursor.prefetchrows,
"ref_cursor.arraysize =", ref_cursor.arraysize)
start = time.time()
sum_rows = 0
cursor.callproc("myrefcursorproc2", [ref_cursor])
for row in ref_cursor:
sum_rows += row[0]
elapsed = (time.time() - start)
print("Sum of IntCol for", num_rows, "rows is ", sum_rows,
"in", elapsed, "seconds")

View File

@ -47,40 +47,43 @@ if not sample_env.get_is_thin():
# indicate that LOBS should not be fetched
oracledb.defaults.fetch_lobs = False
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# add some data to the tables
print("Populating tables with data...")
cursor.execute("truncate table TestClobs")
cursor.execute("truncate table TestBlobs")
long_string = ""
for i in range(10):
with connection.cursor() as cursor:
# add some data to the tables
print("Populating tables with data...")
cursor.execute("truncate table TestClobs")
cursor.execute("truncate table TestBlobs")
long_string = ""
for i in range(10):
char = chr(ord('A') + i)
long_string += char * 25000
cursor.execute("insert into TestClobs values (:1, :2)",
(i + 1, "STRING " + long_string))
cursor.execute("insert into TestBlobs values (:1, :2)",
(i + 1, long_string.encode("ascii")))
connection.commit()
connection.commit()
# fetch the data and show the results
print("CLOBS returned as strings")
cursor.execute("""
# fetch the data and show the results
print("CLOBS returned as strings")
cursor.execute("""
select
IntCol,
ClobCol
from TestClobs
order by IntCol""")
for int_col, value in cursor:
for int_col, value in cursor:
print("Row:", int_col, "string of length", len(value))
print()
print("BLOBS returned as bytes")
cursor.execute("""
print()
print("BLOBS returned as bytes")
cursor.execute("""
select
IntCol,
BlobCol
from TestBlobs
order by IntCol""")
for int_col, value in cursor:
for int_col, value in cursor:
print("Row:", int_col, "string of length", value and len(value) or 0)

View File

@ -37,15 +37,18 @@ import decimal
import oracledb
import sample_env
# indicate that numbers should be fetched as decimals
oracledb.defaults.fetch_decimals = True
# determine whether to use python-oracledb thin mode or thick mode
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
# indicate that numbers should be fetched as decimals
oracledb.defaults.fetch_decimals = True
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
cursor.execute("select * from TestNumbers")
for row in cursor:
with connection.cursor() as cursor:
cursor.execute("select * from TestNumbers")
for row in cursor:
print("Row:", row)

View File

@ -49,32 +49,41 @@ class Test:
self.b = b
self.c = c
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# change this to False if you want to create the table yourself using SQL*Plus
# and then populate it with the data of your choice
if True:
with connection.cursor() as cursor:
# create sample data
cursor.execute("""
select count(*)
from user_tables
where table_name = 'TESTINSTANCES'""")
count, = cursor.fetchone()
if count:
cursor.execute("drop table TestInstances")
cursor.execute("""
create table TestInstances (
begin
begin
execute immediate 'drop table TestInstances';
exception
when others then
if sqlcode <> -942 then
raise;
end if;
end;
execute immediate 'create table TestInstances (
a varchar2(60) not null,
b number(9) not null,
c date not null
)""")
cursor.execute("insert into TestInstances values ('First', 5, sysdate)")
cursor.execute("insert into TestInstances values ('Second', 25, sysdate)")
connection.commit()
c date not null)';
# retrieve the data and display it
cursor.execute("select * from TestInstances")
cursor.rowfactory = Test
print("Rows:")
for row in cursor:
execute immediate
'insert into TestInstances values (''First'', 5, sysdate)';
execute immediate
'insert into TestInstances values (''Second'', 25, sysdate)';
commit;
end;""")
# retrieve the data and display it
cursor.execute("select * from TestInstances")
cursor.rowfactory = Test
print("Rows:")
for row in cursor:
print("a = %s, b = %s, c = %s" % (row.a, row.b, row.c))

View File

@ -0,0 +1,84 @@
# NAME
#
# Dockerfile
#
# PURPOSE
#
# Creates a container with the Python python-oracledb samples and a running
# Oracle Database so python-oracledb can be evaluated.
#
# Python-oracledb is the Python database driver for Oracle Database. See
# https://oracle.github.io/python-oracledb/
#
# USAGE
#
# Get an Oracle Database container (see
# https://hub.docker.com/r/gvenzl/oracle-xe):
#
# podman pull docker.io/gvenzl/oracle-xe:21-slim
#
# Create a container with the database, Python, python-oracledb and the
# samples. Choose a password for the sample schemas and pass it as an
# argument:
#
# podman build -t pyo --build-arg PYO_PASSWORD=a_secret .
#
# Start the container, which creates the database. Choose a password for the
# privileged database users and pass it as a variable:
#
# podman run -d --name pyo -p 1521:1521 -it -e ORACLE_PASSWORD=a_secret pyo
#
# Log into the container:
#
# podman exec -it pyo bash
#
# At the first login, create the sample schema:
#
# python setup.py
#
# Run samples like:
#
# python bind_insert.py
#
# The database will persist across container shutdowns, but will be deleted
# when the container is deleted.
FROM docker.io/gvenzl/oracle-xe:21-slim
USER root
RUN microdnf module disable python36 && \
microdnf module enable python39 && \
microdnf install python39 python39-pip python39-setuptools python39-wheel vim && \
microdnf clean all
WORKDIR /samples/
COPY setup.py setup.py
RUN curl -LO https://github.com/oracle/python-oracledb/archive/refs/tags/v1.0.0.zip && \
unzip v1.0.0.zip && mv python-oracledb-1.0.0/samples/* . && \
/bin/rm -rf python-oracledb-1.0.0 samples v1.0.0.zip && \
cat create_schema.py >> /samples/setup.py && chown -R oracle.oinstall /samples/
USER oracle
RUN python3 -m pip install oracledb --user
ARG PYO_PASSWORD
ENV PYO_SAMPLES_MAIN_USER=pythondemo
ENV PYO_SAMPLES_MAIN_PASSWORD=${PYO_PASSWORD}
ENV PYO_SAMPLES_EDITION_USER=pythoneditions
ENV PYO_SAMPLES_EDITION_PASSWORD=${PYO_PASSWORD}
ENV PYO_SAMPLES_EDITION_NAME=python_e1
ENV PYO_SAMPLES_CONNECT_STRING="localhost/xepdb1"
ENV PYO_SAMPLES_DRCP_CONNECT_STRING="localhost/xepdb1:pooled"
ENV PYO_SAMPLES_ADMIN_USER=system
# Run the samples using the default python-oracledb 'Thin' mode, if possible
ENV PYO_SAMPLES_DRIVER_MODE="thin"
# The privileged user password is set in setup.py from the "podman run"
# environment variable ORACLE_PASSWORD
#ENV PYO_SAMPLES_ADMIN_PASSWORD=

View File

@ -0,0 +1,55 @@
# python-oracledb Samples in a Container
This Dockerfile creates a container with python-oracledb samples and a running
Oracle Database.
It has been tested in an Oracle Linux 8 environment using 'podman', but
'docker' should work too.
## Usage
- Get an Oracle Database container (see
https://hub.docker.com/r/gvenzl/oracle-xe):
```
podman pull docker.io/gvenzl/oracle-xe:21-slim
```
- Create a container with the database, Python, python-oracledb and the
samples. Choose a password for the sample schemas and pass it as an argument:
```
podman build -t pyo --build-arg PYO_PASSWORD=a_secret .
```
- Start the container, which creates the database. Choose a password for the
privileged database users and pass it as a variable:
```
podman run -d --name pyo -p 1521:1521 -it -e ORACLE_PASSWORD=a_secret_password pyo
```
- Log into the container:
```
podman exec -it pyo bash
```
- At the first login, create the sample schema:
```
python setup.py
```
The schema used can be seen in `sql/create_schema.sql`
- In the container, run samples like:
```
python bind_insert.py
```
Use `vim` to edit files, if required.
The database will persist across container shutdowns, but will be deleted when
the container is deleted.

View File

@ -0,0 +1,53 @@
#! /usr/bin/env python3.9
#
# NAME
#
# setup.py
#
# PURPOSE
#
# Creates the python-oracledb sample schema after waiting for the database to
# open.
#
# USAGE
#
# ./setup.py
import oracledb
import os
import time
pw = os.environ.get("ORACLE_PASSWORD")
os.environ["PYO_SAMPLES_ADMIN_PASSWORD"] = pw
c = None
for i in range(30):
try:
c = oracledb.connect(user="system",
password=pw,
dsn="localhost/xepdb1",
tcp_connect_timeout=5)
break
except (OSError, oracledb.Error) as e:
print("Waiting for database to open")
time.sleep(5)
if c:
print("PDB is open")
else:
print("PDB did not open in allocated time")
print("Try again in a few minutes")
exit()
# Connect to the CDB to start DRCP because enable_per_pdb_drcp is FALSE by
# default
print("Starting DRCP pool")
with oracledb.connect(user="sys",
password=pw,
dsn="localhost/XE",
mode=oracledb.AUTH_MODE_SYSDBA) as connection:
with connection.cursor() as cursor:
cursor.callproc("dbms_connection_pool.start_pool")
# create_schema.py will be appended here by the Dockerfile

View File

@ -27,45 +27,60 @@
# applications should consider using External Authentication to avoid hard
# coded credentials.
#
# You can set values in environment variables to bypass the sample requesting
# the information it requires.
# The samples will prompt for credentials and schema information unless the
# following environment variables are set:
#
# PYO_SAMPLES_MAIN_USER: user used for most samples
# PYO_SAMPLES_MAIN_PASSWORD: password of user used for most samples
# PYO_SAMPLES_EDITION_USER: user for editioning
# PYO_SAMPLES_EDITION_PASSWORD: password of user for editioning
# PYO_SAMPLES_EDITION_NAME: name of edition for editioning
# PYO_SAMPLES_CONNECT_STRING: connect string
# PYO_SAMPLES_DRCP_CONNECT_STRING: DRCP connect string
# PYO_SAMPLES_ADMIN_USER: admin user for setting up samples
# PYO_SAMPLES_ADMIN_PASSWORD: admin password for setting up samples
# PYO_SAMPLES_DRIVER_MODE: python-oracledb mode (thick or thin) to use
# PYO_SAMPLES_ORACLE_CLIENT_PATH: Oracle Client or Instant Client library dir
# PYO_SAMPLES_ADMIN_USER: privileged administrative user for setting up samples
# PYO_SAMPLES_ADMIN_PASSWORD: password of PYO_SAMPLES_ADMIN_USER
# PYO_SAMPLES_CONNECT_STRING: database connection string
# PYO_SAMPLES_DRCP_CONNECT_STRING: database connecttion string for DRCP
# PYO_SAMPLES_MAIN_USER: user to be created. Used for most samples
# PYO_SAMPLES_MAIN_PASSWORD: password for PYO_SAMPLES_MAIN_USER
# PYO_SAMPLES_EDITION_USER: user to be created for editiong samples
# PYO_SAMPLES_EDITION_PASSWORD: password of PYO_SAMPLES_EDITION_USER
# PYO_SAMPLES_EDITION_NAME: name of edition for editioning samples
# PYO_SAMPLES_DRIVER_MODE: python-oracledb mode (Thick or thin) to use
#
# On Windows set PYO_SAMPLES_ORACLE_CLIENT_PATH if Oracle libraries are not in
# PATH. On macOS set the variable to the Instant Client directory. On Linux
# do not set the variable; instead set LD_LIBRARY_PATH or configure ldconfig
# before running Python.
# - On Windows set PYO_SAMPLES_ORACLE_CLIENT_PATH if Oracle libraries are not
# in PATH. On macOS set the variable to the Instant Client directory. On
# Linux do not set the variable; instead set LD_LIBRARY_PATH or configure
# ldconfig before running Python.
#
# PYO_SAMPLES_CONNECT_STRING can be set to an Easy Connect string, or a
# Net Service Name from a tnsnames.ora file or external naming service,
# or it can be the name of a local Oracle database instance.
# - PYO_SAMPLES_ADMIN_USER should be the administrative user ADMIN for cloud
# databases and SYSTEM for on premises databases.
#
# If using Instant Client, then an Easy Connect string is generally
# appropriate. The syntax is:
# - PYO_SAMPLES_CONNECT_STRING is the connection string for your database. It
# can be set to an Easy Connect string or to a Net Service Name from a
# tnsnames.ora file or external naming service.
#
# [//]host_name[:port][/service_name][:server_type][/instance_name]
# The Easy Connect string is generally easiest. The basic syntax is:
#
# host_name[:port][/service_name][:server_type]
#
# Commonly just the host_name and service_name are needed
# e.g. "localhost/orclpdb1" or "localhost/XEPDB1"
# e.g. "localhost/orclpdb" or "localhost/XEPDB1".
#
# If using a tnsnames.ora file, the file can be in a default
# location such as $ORACLE_HOME/network/admin/tnsnames.ora or
# /etc/tnsnames.ora. Alternatively set the TNS_ADMIN environment
# variable and put the file in $TNS_ADMIN/tnsnames.ora.
# If PYO_SAMPLES_CONNECT_STRING is an aliases from tnsnames.ora file, then
# set the TNS_ADMIN environment variable and put the file in
# $TNS_ADMIN/tnsnames.ora. Alternatively for python-oracledb Thick mode, the
# file will be automatically used if it is in a location such as
# instantclient_XX_Y/network/admin/tnsnames.ora,
# $ORACLE_HOME/network/admin/tnsnames.ora or /etc/tnsnames.ora.
#
# - PYO_SAMPLES_DRCP_CONNECT_STRING should be a connect string requesting a
# pooled server, for example "localhost/orclpdb:pooled".
#
# - PYO_SAMPLES_MAIN_USER and PYO_SAMPLES_EDITION_USER are names of users that
# will be created and used for running samples. Choose names that do not
# exist because the schemas will be dropped and then recreated.
#
# - PYO_SAMPLES_EDITION_NAME can be set to a new name to be used for Edition
# Based Redefinition examples.
#
# - PYO_SAMPLES_DRIVER_MODE should be "thin" or "thick". It is used by samples
# that can run in both python-oracledb modes.
#
# The administrative user for cloud databases is ADMIN and the administrative
# user for on premises databases is SYSTEM.
#------------------------------------------------------------------------------
import getpass
@ -88,8 +103,7 @@ def get_value(name, label, default_value=None, password=False):
value = PARAMETERS.get(name)
if value is not None:
return value
env_name = "PYO_SAMPLES_" + name
value = os.environ.get(env_name)
value = os.environ.get(name)
if value is None:
if default_value is not None:
label += " [%s]" % default_value
@ -104,54 +118,55 @@ def get_value(name, label, default_value=None, password=False):
return value
def get_main_user():
return get_value("MAIN_USER", "Main User Name", DEFAULT_MAIN_USER)
return get_value("PYO_SAMPLES_MAIN_USER", "Main User Name",
DEFAULT_MAIN_USER)
def get_main_password():
return get_value("MAIN_PASSWORD", "Password for %s" % get_main_user(),
password=True)
return get_value("PYO_SAMPLES_MAIN_PASSWORD",
f"Password for {get_main_user()}", password=True)
def get_edition_user():
return get_value("EDITION_USER", "Edition User Name", DEFAULT_EDITION_USER)
return get_value("PYO_SAMPLES_EDITION_USER", "Edition User Name",
DEFAULT_EDITION_USER)
def get_edition_password():
return get_value("EDITION_PASSWORD",
"Password for %s" % get_edition_user(), password=True)
return get_value("PYO_SAMPLES_EDITION_PASSWORD",
f"Password for {get_edition_user()}", password=True)
def get_edition_name():
return get_value("EDITION_NAME", "Edition Name", DEFAULT_EDITION_NAME)
return get_value("PYO_SAMPLES_EDITION_NAME", "Edition Name",
DEFAULT_EDITION_NAME)
def get_connect_string():
return get_value("CONNECT_STRING", "Connect String",
return get_value("PYO_SAMPLES_CONNECT_STRING", "Connect String",
DEFAULT_CONNECT_STRING)
def get_main_connect_string(password=None):
if password is None:
password = get_main_password()
return "%s/%s@%s" % (get_main_user(), password, get_connect_string())
def get_drcp_connect_string():
return get_value("PYO_SAMPLES_DRCP_CONNECT_STRING", "DRCP Connect String",
DEFAULT_DRCP_CONNECT_STRING)
def get_driver_mode():
return get_value("DRIVER_MODE", "Driver mode (thin|thick)", "thin")
return get_value("PYO_SAMPLES_DRIVER_MODE", "Driver mode (thin|thick)",
"thin")
def get_is_thin():
return get_driver_mode() == "thin"
def get_drcp_connect_string():
connect_string = get_value("DRCP_CONNECT_STRING", "DRCP Connect String",
DEFAULT_DRCP_CONNECT_STRING)
return "%s/%s@%s" % (get_main_user(), get_main_password(), connect_string)
def get_edition_connect_string():
return "%s/%s@%s" % \
(get_edition_user(), get_edition_password(), get_connect_string())
def get_admin_connect_string():
admin_user = get_value("ADMIN_USER", "Administrative user", "admin")
admin_password = get_value("ADMIN_PASSWORD", f"Password for {admin_user}", password=True)
admin_user = get_value("PYO_SAMPLES_ADMIN_USER", "Administrative user",
"admin")
admin_password = get_value("PYO_SAMPLES_ADMIN_PASSWORD",
f"Password for {admin_user}", password=True)
return "%s/%s@%s" % (admin_user, admin_password, get_connect_string())
def get_oracle_client():
if sys.platform in ("darwin", "win32"):
return get_value("ORACLE_CLIENT_PATH", "Oracle Instant Client Path")
return get_value("PYO_SAMPLES_ORACLE_CLIENT_PATH",
"Oracle Instant Client Path")
def run_sql_script(conn, script_name, **kwargs):
statement_parts = []

View File

@ -41,52 +41,53 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# show all of the rows available in the table
cursor = connection.cursor()
cursor.execute("select * from TestStrings order by IntCol")
print("ALL ROWS")
for row in cursor:
with connection.cursor() as cursor:
cursor.execute("select * from TestStrings order by IntCol")
print("ALL ROWS")
for row in cursor:
print(row)
print()
print()
# create a scrollable cursor
cursor = connection.cursor(scrollable = True)
with connection.cursor(scrollable = True) as cursor:
# set array size smaller than the default (100) to force scrolling by the
# database; otherwise, scrolling occurs directly within the buffers
cursor.arraysize = 3
cursor.execute("select * from TestStrings order by IntCol")
# set array size smaller than the default (100) to force scrolling by the
# database; otherwise, scrolling occurs directly within the buffers
cursor.arraysize = 3
cursor.execute("select * from TestStrings order by IntCol")
# scroll to last row in the result set; the first parameter is not needed
# and is ignored)
cursor.scroll(mode = "last")
print("LAST ROW")
print(cursor.fetchone())
print()
# scroll to last row in the result set; the first parameter is not needed and
# is ignored)
cursor.scroll(mode = "last")
print("LAST ROW")
print(cursor.fetchone())
print()
# scroll to the first row in the result set; the first parameter not needed
# and is ignored
cursor.scroll(mode = "first")
print("FIRST ROW")
print(cursor.fetchone())
print()
# scroll to the first row in the result set; the first parameter not needed and
# is ignored
cursor.scroll(mode = "first")
print("FIRST ROW")
print(cursor.fetchone())
print()
# scroll to an absolute row number
cursor.scroll(5, mode = "absolute")
print("ROW 5")
print(cursor.fetchone())
print()
# scroll to an absolute row number
cursor.scroll(5, mode = "absolute")
print("ROW 5")
print(cursor.fetchone())
print()
# scroll forward six rows (the mode parameter defaults to relative)
cursor.scroll(3)
print("SKIP 3 ROWS")
print(cursor.fetchone())
print()
# scroll forward six rows (the mode parameter defaults to relative)
cursor.scroll(3)
print("SKIP 3 ROWS")
print(cursor.fetchone())
print()
# scroll backward four rows (the mode parameter defaults to relative)
cursor.scroll(-4)
print("SKIP BACK 4 ROWS")
print(cursor.fetchone())
print()
# scroll backward four rows (the mode parameter defaults to relative)
cursor.scroll(-4)
print("SKIP BACK 4 ROWS")
print(cursor.fetchone())
print()

View File

@ -38,7 +38,9 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# The general recommendation for simple SODA usage is to enable autocommit
connection.autocommit = True

View File

@ -38,7 +38,9 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# the general recommendation for simple SODA usage is to enable autocommit
connection.autocommit = True

View File

@ -53,7 +53,10 @@ import sample_env
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
# create Oracle connection and cursor objects
connection = oracledb.connect(sample_env.get_main_connect_string())
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
cursor = connection.cursor()
# enable autocommit to avoid the additional round trip to the database to

View File

@ -37,16 +37,16 @@ import sample_env
if not sample_env.get_is_thin():
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
# sample subclassed connection which overrides the constructor (so no
# sample subclassed Connection which overrides the constructor (so no
# parameters are required) and the cursor() method (so that the subclassed
# cursor is returned instead of the default cursor implementation)
class Connection(oracledb.Connection):
def __init__(self):
connect_string = sample_env.get_main_connect_string()
print("CONNECT to database")
super().__init__(connect_string)
print("CONNECT", sample_env.get_connect_string())
super().__init__(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
def cursor(self):
return Cursor(self)
@ -63,15 +63,16 @@ class Cursor(oracledb.Cursor):
return super().execute(statement, args)
def fetchone(self):
print("FETCH ONE")
print("FETCHONE")
return super().fetchone()
# create instances of the subclassed connection and cursor
# create instances of the subclassed Connection and cursor
connection = Connection()
cursor = connection.cursor()
# demonstrate that the subclassed connection and cursor are being used
cursor.execute("select count(*) from ChildTable where ParentId = :1", (30,))
count, = cursor.fetchone()
print("COUNT:", int(count))
with connection.cursor() as cursor:
# demonstrate that the subclassed connection and cursor are being used
cursor.execute("select count(*) from ChildTable where ParentId = :1", (30,))
count, = cursor.fetchone()
print("COUNT:", int(count))

View File

@ -84,36 +84,39 @@ def output_type_handler(cursor, name, default_type, size, precision, scale):
return cursor.var(default_type, arraysize=cursor.arraysize,
outconverter=Building.from_json)
conn = oracledb.connect(sample_env.get_main_connect_string())
cur = conn.cursor()
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
buildings = [
with connection.cursor() as cursor:
buildings = [
Building(1, "The First Building", 5),
Building(2, "The Second Building", 87),
Building(3, "The Third Building", 12)
]
]
# Insert building data (python object) as a JSON string
cur.inputtypehandler = input_type_handler
for building in buildings:
cur.execute("insert into BuildingsAsJsonStrings values (:1, :2)",
# Insert building data (python object) as a JSON string
cursor.inputtypehandler = input_type_handler
for building in buildings:
cursor.execute("insert into BuildingsAsJsonStrings values (:1, :2)",
(building.building_id, building))
# fetch the building data as a JSON string
print("NO OUTPUT TYPE HANDLER:")
for row in cur.execute("""
# fetch the building data as a JSON string
print("NO OUTPUT TYPE HANDLER:")
for row in cursor.execute("""
select * from BuildingsAsJsonStrings
order by BuildingId"""):
print(row)
print()
print()
cur = conn.cursor()
with connection.cursor() as cursor:
# fetch the building data as python objects
cur.outputtypehandler = output_type_handler
print("WITH OUTPUT TYPE HANDLER:")
for row in cur.execute("""
# fetch the building data as python objects
cursor.outputtypehandler = output_type_handler
print("WITH OUTPUT TYPE HANDLER:")
for row in cursor.execute("""
select * from BuildingsAsJsonStrings
order by BuildingId"""):
print(row)
print()
print()

View File

@ -44,8 +44,11 @@ import sample_env
# this script is currently only supported in python-oracledb thick mode
oracledb.init_oracle_client(lib_dir=sample_env.get_oracle_client())
con = oracledb.connect(sample_env.get_main_connect_string())
obj_type = con.gettype("UDT_BUILDING")
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
obj_type = connection.gettype("UDT_BUILDING")
class Building:
@ -89,20 +92,26 @@ buildings = [
Building(3, "The Third Building", 12, datetime.date(2005, 6, 19)),
]
cur = con.cursor()
cur.inputtypehandler = input_type_handler
for building in buildings:
cur.execute("insert into BuildingsAsObjects values (:1, :2)",
with connection.cursor() as cursor:
cursor.inputtypehandler = input_type_handler
for building in buildings:
cursor.execute("insert into BuildingsAsObjects values (:1, :2)",
(building.building_id, building))
print("NO OUTPUT TYPE HANDLER:")
for row in cur.execute("select * from BuildingsAsObjects order by BuildingId"):
print("NO OUTPUT TYPE HANDLER:")
for row in cursor.execute("""
select *
from BuildingsAsObjects
order by BuildingId"""):
print(row)
print()
print()
cur = con.cursor()
cur.outputtypehandler = output_type_handler
print("WITH OUTPUT TYPE HANDLER:")
for row in cur.execute("select * from BuildingsAsObjects order by BuildingId"):
with connection.cursor() as cursor:
cursor.outputtypehandler = output_type_handler
print("WITH OUTPUT TYPE HANDLER:")
for row in cursor.execute("""
select *
from BuildingsAsObjects
order by BuildingId"""):
print(row)
print()
print()

View File

@ -50,31 +50,37 @@ DATA = [
(3, "A" * 250, datetime.datetime(2017, 4, 6))
]
# truncate table so sample can be rerun
connection = oracledb.connect(sample_env.get_main_connect_string())
cursor = connection.cursor()
print("Truncating table...")
cursor.execute("truncate table TestUniversalRowids")
connection = oracledb.connect(user=sample_env.get_main_user(),
password=sample_env.get_main_password(),
dsn=sample_env.get_connect_string())
# populate table with a few rows
print("Populating table...")
for row in DATA:
with connection.cursor() as cursor:
# truncate table so sample can be rerun
print("Truncating table...")
cursor.execute("truncate table TestUniversalRowids")
# populate table with a few rows
print("Populating table...")
for row in DATA:
print("Inserting", row)
cursor.execute("insert into TestUniversalRowids values (:1, :2, :3)", row)
connection.commit()
cursor.execute("insert into TestUniversalRowids values (:1, :2, :3)",
row)
connection.commit()
# fetch the rowids from the table
rowids = [r for r, in cursor.execute("select rowid from TestUniversalRowids")]
# fetch the rowids from the table
cursor.execute("select rowid from TestUniversalRowids")
rowids = [r for r, in cursor]
# fetch each of the rows given the rowid
for rowid in rowids:
# fetch each of the rows given the rowid
for rowid in rowids:
print("-" * 79)
print("Rowid:", rowid)
cursor.execute("""
select IntCol, StringCol, DateCol
from TestUniversalRowids
where rowid = :rid""",
rid = rowid)
{"rid": rowid})
int_col, string_col, dateCol = cursor.fetchone()
print("IntCol:", int_col)
print("StringCol:", string_col)