229 lines
8.2 KiB
Python
229 lines
8.2 KiB
Python
#!/usr/bin/python3
|
||
# -*- coding: UTF-8 -*-
|
||
# 设置utf-8 显示中文
|
||
"""
|
||
@Author: guo
|
||
@File:handle_checkdata.py
|
||
"""
|
||
import pytest
|
||
from jsonpath import jsonpath
|
||
from common.util.handle_jsonfile import HandleJsonFile
|
||
from common.yml.get_yml_keys import GetYmlKeys
|
||
from config.get_conf_data import GetConfData
|
||
from common.util.handle_datetime import HandleDateTime
|
||
|
||
|
||
|
||
class HandleCheck:
    """Find a target record in an API response and save selected fields.

    The YAML case data describes, per column, an expected value and a Python
    expression string that is evaluated against the response (bound to the
    name ``res``) to obtain that column's actual values, one per record.
    The first record whose columns all match is the target; the fields listed
    under the "save keys" entry are then written to a JSON file.

    NOTE(review): the expression strings are executed with ``eval``. That is
    only acceptable while the YAML files are trusted, project-owned input.
    """

    def __init__(self):
        self.__ymlkeys = GetYmlKeys()      # names of the keys used in the YAML case data
        self.__conf = GetConfData()        # supplies the time-unit strings
        self.__check_data = None           # check section of the current case
        self.__res = None                  # response body of the current case
        self.__hjson = HandleJsonFile()    # writes the extracted values to disk
        self.__htime = HandleDateTime()    # current time and time-difference helpers

    def handle_checkdata(self, res, ymldata):
        """Run the data check for one case when the case enables it.

        Args:
            res: The response. A ``dict`` is used as is; any other object
                must provide a ``.json()`` method returning the body.
            ymldata: The YAML data of the case. It must contain the API-data
                section, which in turn holds the enable flag and the
                check-data section.

        Raises:
            KeyError: If a required key is missing from ``ymldata``.
        """
        apidata = ymldata[self.__ymlkeys.get_api_data_key()]
        check_value = apidata[self.__ymlkeys.ensure_check_key]
        checkdata = apidata[self.__ymlkeys.get_check_data_key()]

        # Deliberately ``== True`` rather than truthiness: a non-empty string
        # in the YAML must not switch the check on.
        if check_value == True:  # noqa: E712
            self.__res = res if isinstance(res, dict) else res.json()
            self.__check_data = checkdata
            self.__checkdata()

    def __checkdata(self):
        """Search the records for the target row and save its fields.

        Calls ``pytest.fail`` when no record matches every expected value
        (and, if enabled, the creation-time constraint).
        """
        # The eval'd YAML expressions refer to the response by the name
        # ``res``. It is published as a module global (not only a local) so
        # that it stays resolvable inside comprehensions or lambdas within
        # those expressions. NOTE(review): this leaks state between calls.
        global res
        res = self.__res
        checkdata = self.__check_data

        # Total number of records to scan.
        rows = self.__get_amount()

        # Inner section that holds the actual comparison settings.
        check_key = self.__ymlkeys.get_check_data_key()
        checkdata_second = checkdata[check_key]

        # Expected values, one entry per column.
        expect_key = self.__ymlkeys.get_expect_data_key()
        expect_data = checkdata_second[expect_key]

        # Expression strings producing the actual values, one per column.
        actual_key = self.__ymlkeys.get_actual_data_key()
        actual_data = checkdata_second[actual_key]

        # Whether the record's creation time must also be checked.
        ensure_check_key = self.__ymlkeys.get_ensure_checktime_key()
        ensure_check = checkdata_second[ensure_check_key]
        check_time = ensure_check == True  # noqa: E712

        cols = len(actual_data)

        cur_time = cre_time = type_value = delta_value = None
        if check_time:
            timedelta_key = self.__ymlkeys.get_time_delta_key()
            timedelta_value = checkdata_second[timedelta_key]

            cur_time = self.__htime.get_curtime_to_ss()

            # Creation times of the records, one per row.
            create_time_key = self.__ymlkeys.get_create_time_key()
            cre_time = eval(timedelta_value[create_time_key])

            # Unit of the time difference and the largest accepted value.
            type_key = self.__ymlkeys.time_type_key
            type_value = timedelta_value[type_key]
            delta_key = self.__ymlkeys.get_timedelta_value_key()
            delta_value = timedelta_value[delta_key]

        # Index of the target record; None until one is found.
        flag_index = None
        for row in range(rows):
            # A record that is too old can never be the target.
            if check_time and not self.__calc_delta(
                    type_value, delta_value, cre_time[row], cur_time):
                continue

            # Every column is evaluated (no short-circuit) so that a broken
            # expression is reported instead of being silently skipped.
            count = 0
            for col in range(cols):
                expect = expect_data[col]
                actual = eval(actual_data[col])[row]
                if self.__values_match(expect, actual):
                    count += 1

            if count == cols:
                flag_index = row
                break

        if flag_index is None:
            pytest.fail("在进行数据校验时,遍历了所有的数据,均未找到目标数据,故将case置为fail")

        # Collect the requested fields of the target record.
        save_keys_key = self.__ymlkeys.get_save_keys_key()
        save_keys_list = checkdata[save_keys_key]
        query_keys_key = self.__ymlkeys.get_query_keys_key()
        query_keys = checkdata[query_keys_key]
        check_value = {}
        for key in save_keys_list:
            check_value[key] = eval(query_keys[key])[flag_index]

        # Persist the result.
        filepath_key = self.__ymlkeys.get_save_jsonfile_path_key()
        filepath = checkdata[filepath_key]
        self.__hjson.write_dict_to_json_file(filepath, check_value)

    @staticmethod
    def __values_match(expect, actual):
        """Tell whether one expected value matches one actual value.

        Two lists match when they are equal after sorting. A list and a
        scalar match when the list has exactly one element equal to the
        scalar. Two scalars match when they are equal.
        """
        expect_is_list = isinstance(expect, list)
        actual_is_list = isinstance(actual, list)
        if expect_is_list and actual_is_list:
            return sorted(expect) == sorted(actual)
        if expect_is_list:
            return len(expect) == 1 and expect[0] == actual
        if actual_is_list:
            return len(actual) == 1 and expect == actual[0]
        return expect == actual

    def __calc_delta(self, timetype, delta_value, create_time, cur_time) -> bool:
        """Tell whether a record was created recently enough.

        Args:
            timetype: Unit of the difference; must equal one of the second,
                minute, hour or day strings supplied by the configuration.
            delta_value: Largest accepted difference, in that unit.
            create_time: Creation time of the record.
            cur_time: Current time.

        Returns:
            True when the difference computed by the matching
            ``HandleDateTime`` helper is less than or equal to
            ``delta_value``, otherwise False.

        Raises:
            ValueError: If ``timetype`` is none of the configured units.
        """
        second_str = self.__conf.get_second_str()
        minute_str = self.__conf.get_minute_str()
        hour_str = self.__conf.get_hour_str()
        day_str = self.__conf.get_day_str()

        if timetype == second_str:
            value = self.__htime.time_delta_ss(create_time, cur_time)
        elif timetype == minute_str:
            value = self.__htime.time_delta_mm(create_time, cur_time)
        elif timetype == hour_str:
            value = self.__htime.time_delta_hh(create_time, cur_time)
        elif timetype == day_str:
            value = self.__htime.time_delta_days(create_time, cur_time)
        else:
            # Previously this fell through to ``None <= delta_value`` and
            # failed with an unhelpful TypeError.
            raise ValueError(
                f"unsupported time unit {timetype!r}; expected one of "
                f"{[second_str, minute_str, hour_str, day_str]!r}"
            )

        return bool(value <= delta_value)

    def __get_amount(self):
        """Return the total number of records.

        The count comes from evaluating the "amount" expression string found
        under the query-keys entry of the check data; the caller passes the
        result to ``range``, so the expression must yield an integer.
        """
        res = self.__res  # noqa: F841 - referenced by name in the eval'd expression
        query_keys_key = self.__ymlkeys.get_query_keys_key()
        checkdata = self.__check_data
        query_keys_value = checkdata[query_keys_key]
        amount_key = self.__ymlkeys.get_amount_key()
        amount = query_keys_value[amount_key]
        return eval(amount)
|
||
|
||
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke run: replay a stored response against the "test_query"
    # case of the leave-request YAML file.
    import yaml

    yml_path = "../../ymlfile/qingjia/test_askforleave.yml"
    jsonfile_path = "../../tmp/askforleave.json"

    # Close the file deterministically and read it as UTF-8 instead of the
    # platform default encoding.
    with open(yml_path, encoding="utf-8") as yml_file:
        ymldata = yaml.safe_load(yml_file)["test_query"]

    hjson = HandleJsonFile()
    res = hjson.read_json_file_to_dict(jsonfile_path)
    aa = HandleCheck()
    aa.handle_checkdata(res, ymldata)
|
||
|