duppr_analysis/experiment_code/minor_revision/dbop.py

75 lines
1.7 KiB
Python

#coding:utf-8
import json
import MySQLdb
# Read the JSON settings file from the current working directory and keep
# only its "local_db" section, which holds the database connection settings.
with open(".config.json") as fp:
    config = json.load(fp)["local_db"]
import Queue
#########################################
### Helper functions for executing simple SQL statements
#########################################
# Shared pool of open MySQL connections (despite the name, it stores
# connection objects, not threads). It is an unbounded FIFO queue that is
# filled by init_pool(); get_conn() takes a connection out of it and
# put_conn() hands one back. Taking from an empty queue blocks.
THREAD_POOL = Queue.Queue()
def init_pool(pool_size):
    """Open ``pool_size`` MySQL connections and add them to ``THREAD_POOL``.

    Each connection is created from the ``db_host``, ``db_user``,
    ``db_passwd`` and ``db_name`` entries of the module-level ``config``,
    on port 3306 with the ``utf8mb4`` character set.

    :param pool_size: number of connections to create.
    """
    for _ in range(pool_size):
        settings = {
            "host": config["db_host"],
            "user": config["db_user"],
            "passwd": config["db_passwd"],
            "db": config["db_name"],
            "port": 3306,
            "charset": 'utf8mb4',
        }
        THREAD_POOL.put(MySQLdb.connect(**settings))
def get_conn():
    """Take a usable connection out of ``THREAD_POOL``.

    Blocks until the pool has a connection available. The connection is
    probed with ``ping()`` and a ``show processlist;`` query; if either
    fails, a replacement connection is opened with the same settings that
    ``init_pool`` uses and the stale one is closed.

    :return: an open MySQLdb connection. The caller is expected to hand it
        back with ``put_conn`` when done.
    :raises Exception: whatever ``MySQLdb.connect`` raises when the
        replacement connection cannot be opened. In that case the stale
        connection is put back so the pool keeps its size and a later call
        can retry the reconnect.
    """
    conn = THREAD_POOL.get()
    try:
        conn.ping()
        cursor = conn.cursor()
        try:
            cursor.execute("show processlist;")
        finally:
            # Close the probe cursor even when the probe query fails.
            cursor.close()
    except Exception:
        # The pooled connection is unusable: open a replacement first so
        # that a failed reconnect does not cost the pool a slot.
        try:
            fresh = MySQLdb.connect(host=config["db_host"],
                                    user=config["db_user"],
                                    passwd=config["db_passwd"],
                                    db=config["db_name"],
                                    port=3306, charset='utf8mb4')
        except Exception:
            THREAD_POOL.put(conn)
            raise
        try:
            conn.close()
        except Exception:
            # Best effort only: the stale connection may already be closed.
            pass
        conn = fresh
    return conn
def put_conn(conn):
    """Hand a connection back to ``THREAD_POOL``.

    :param conn: the connection object to return to the pool.
    """
    THREAD_POOL.put(conn)
def execute(sql_stat, params=None):
    """Run a single SQL statement and commit it.

    :param sql_stat: SQL text, optionally containing placeholders.
    :param params: values for the placeholders, or ``None``.
    :return: the value returned by ``cursor.execute``.
    :raises Exception: any error raised while executing or committing is
        propagated after the transaction has been rolled back.
    """
    conn = get_conn()
    try:
        cursor = conn.cursor()
        try:
            result = cursor.execute(sql_stat, params)
            conn.commit()
        except Exception:
            # Do not hand a half-finished transaction to the next user of
            # this pooled connection.
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        # Always return the connection; otherwise every failed statement
        # drains the pool by one and get_conn() eventually blocks forever.
        put_conn(conn)
    return result
def execute_patch(patchs):
    """Run a batch of SQL statements on one connection, then commit once.

    :param patchs: iterable of ``(sql, params)`` pairs; each item is read
        as ``p[0]`` (SQL text) and ``p[1]`` (placeholder values).
    :return: the value returned by ``cursor.execute`` for the last
        statement, or ``0`` when ``patchs`` is empty.
    :raises Exception: any error raised while executing or committing is
        propagated after the transaction has been rolled back.
    """
    # NOTE(review): the commit is placed after the loop (one transaction
    # for the whole batch) - confirm this matches the intended behaviour.
    conn = get_conn()
    # Defined up front so an empty batch returns 0 instead of failing with
    # UnboundLocalError at the return statement.
    result = 0
    try:
        cursor = conn.cursor()
        try:
            for p in patchs:
                result = cursor.execute(p[0], p[1])
            conn.commit()
        except Exception:
            # Do not hand a half-finished transaction to the next user of
            # this pooled connection.
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        # Always return the connection so a failing batch cannot drain
        # the pool.
        put_conn(conn)
    return result
def select_one(sql_stat, params, none_return_value=None):
    """Run a query and return its first row.

    :param sql_stat: SQL text, optionally containing placeholders.
    :param params: values for the placeholders.
    :param none_return_value: value to return when the query yields no row.
    :return: the row from ``cursor.fetchone()``, or ``none_return_value``
        when that row is ``None``.
    :raises Exception: any error raised while executing, fetching or
        committing is propagated after the transaction has been rolled back.
    """
    conn = get_conn()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql_stat, params)
            result = cursor.fetchone()
            # Kept from the original: commit after reading so the
            # connection does not go back to the pool mid-transaction.
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        # Always return the connection so a failing query cannot drain
        # the pool.
        put_conn(conn)
    if result is None:
        return none_return_value
    return result
def select_all(sql_stat, params=None):
    """Run a query and return all of its rows.

    :param sql_stat: SQL text, optionally containing placeholders.
    :param params: values for the placeholders, or ``None``.
    :return: the rows from ``cursor.fetchall()``.
    :raises Exception: any error raised while executing, fetching or
        committing is propagated after the transaction has been rolled back.
    """
    conn = get_conn()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql_stat, params)
            result = cursor.fetchall()
            # Kept from the original: commit after reading so the
            # connection does not go back to the pool mid-transaction.
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        # Always return the connection so a failing query cannot drain
        # the pool.
        put_conn(conn)
    return result