pytest_ui_api_fw/common/yml/handle_checkdata.py

229 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
# Declare UTF-8 so that Chinese text is handled correctly
"""
@Author: guo
@File: handle_checkdata.py
"""
import pytest
from jsonpath import jsonpath
from common.util.handle_jsonfile import HandleJsonFile
from common.yml.get_yml_keys import GetYmlKeys
from config.get_conf_data import GetConfData
from common.util.handle_datetime import HandleDateTime
class HandleCheck:
    """Locate a target record in an API response using rules from YAML case data.

    The rules hold Python expressions that are evaluated with ``res`` bound to
    the response body. One expression yields the number of records; the others
    yield, per checked column, a sequence indexable by record position. The
    first record whose columns all match the expected values has the configured
    fields written to a JSON file. If no record matches, the running pytest
    case is failed.
    """

    def __init__(self):
        self.__ymlkeys = GetYmlKeys()
        self.__conf = GetConfData()
        self.__check_data = None
        self.__res = None
        self.__hjson = HandleJsonFile()
        self.__htime = HandleDateTime()

    def handle_checkdata(self, res, ymldata):
        """Run the data check described by *ymldata* against the response *res*.

        Args:
            res: The response body as a dict, or any object whose ``json()``
                method returns the body.
            ymldata: The YAML case data containing the API section, which in
                turn holds the "check enabled" switch and the check rules.

        Raises:
            KeyError: If a required key is missing from the YAML data.
        """
        apidata = ymldata[self.__ymlkeys.get_api_data_key()]
        check_value = apidata[self.__ymlkeys.ensure_check_key]
        # Looked up even when the check is disabled, so a malformed case is
        # reported either way.
        checkdata = apidata[self.__ymlkeys.get_check_data_key()]
        # Deliberately `== True` rather than truthiness: only True (or 1)
        # enables the check, so a YAML string such as "false" does not.
        if check_value == True:  # noqa: E712
            self.__res = res if isinstance(res, dict) else res.json()
            self.__check_data = checkdata
            self.__checkdata()

    def __evaluate(self, expression):
        """Evaluate a YAML-supplied Python expression with ``res`` in scope.

        ``res`` is injected into a *copy* of the module globals. This keeps
        module-level names (for example ``jsonpath``) available, lets nested
        scopes such as comprehensions see ``res``, and avoids mutating the
        module namespace.
        """
        # NOTE(security): eval() executes arbitrary code. The expressions must
        # only ever come from trusted, project-controlled YAML files.
        namespace = dict(globals())
        namespace["res"] = self.__res
        return eval(expression, namespace)

    @staticmethod
    def __values_match(expect, actual):
        """Return whether one expected value matches one actual value.

        Two lists are compared order-insensitively. A single-element list on
        either side matches the bare value on the other side. Everything else
        is compared with ``==``.
        """
        expect_is_list = isinstance(expect, list)
        actual_is_list = isinstance(actual, list)
        if expect_is_list and actual_is_list:
            return sorted(expect) == sorted(actual)
        if expect_is_list:
            return len(expect) == 1 and expect[0] == actual
        if actual_is_list:
            return len(actual) == 1 and expect == actual[0]
        return expect == actual

    def __row_matches(self, expect_data, actual_columns, row):
        """Return whether every checked column of record *row* matches."""
        return all(
            self.__values_match(expect_data[col], column[row])
            for col, column in enumerate(actual_columns)
        )

    def __checkdata(self):
        """Find the first matching record and save its configured fields.

        Fails the current pytest case when no record matches.
        """
        checkdata = self.__check_data
        # Total number of records to scan.
        rows = self.__get_amount()
        # The nested section that holds the actual comparison rules.
        rule = checkdata[self.__ymlkeys.get_check_data_key()]
        expect_data = rule[self.__ymlkeys.get_expect_data_key()]
        actual_data = rule[self.__ymlkeys.get_actual_data_key()]
        # Deliberately `== True`; see handle_checkdata.
        check_time = rule[self.__ymlkeys.get_ensure_checktime_key()] == True  # noqa: E712

        cur_time = cre_time = type_value = delta_value = None
        if check_time:
            timedelta_value = rule[self.__ymlkeys.get_time_delta_key()]
            cur_time = self.__htime.get_curtime_to_ss()
            # Creation time of every record, indexable by record position.
            cre_time = self.__evaluate(
                timedelta_value[self.__ymlkeys.get_create_time_key()]
            )
            # Unit of the time difference and the maximum allowed difference.
            type_value = timedelta_value[self.__ymlkeys.time_type_key]
            delta_value = timedelta_value[self.__ymlkeys.get_timedelta_value_key()]

        flag_index = None
        # Column expressions do not depend on the row, so they are evaluated
        # once, and only when the first row actually needs them.
        actual_columns = None
        for row in range(rows):
            if check_time and not self.__calc_delta(
                type_value, delta_value, cre_time[row], cur_time
            ):
                continue
            if actual_columns is None:
                actual_columns = [self.__evaluate(expr) for expr in actual_data]
            if self.__row_matches(expect_data, actual_columns, row):
                flag_index = row
                break

        if flag_index is not None:
            # Collect the requested fields of the matching record.
            save_keys_list = checkdata[self.__ymlkeys.get_save_keys_key()]
            query_keys = checkdata[self.__ymlkeys.get_query_keys_key()]
            check_value = {
                key: self.__evaluate(query_keys[key])[flag_index]
                for key in save_keys_list
            }
            # Persist the result.
            filepath = checkdata[self.__ymlkeys.get_save_jsonfile_path_key()]
            self.__hjson.write_dict_to_json_file(filepath, check_value)
        else:
            pytest.fail("在进行数据校验时,遍历了所有的数据,均未找到目标数据,故将case置为fail")

    def __calc_delta(self, timetype, delta_value, create_time, cur_time) -> bool:
        """Return whether the time between creation and now is within the limit.

        Args:
            timetype: Unit of the difference; must equal one of the second,
                minute, hour or day strings provided by the configuration.
            delta_value: Maximum allowed difference in that unit.
            create_time: Creation time of the record.
            cur_time: Current time.

        Raises:
            ValueError: If *timetype* is not one of the configured units.
        """
        second_str = self.__conf.get_second_str()
        minute_str = self.__conf.get_minute_str()
        hour_str = self.__conf.get_hour_str()
        day_str = self.__conf.get_day_str()
        if timetype == second_str:
            value = self.__htime.time_delta_ss(create_time, cur_time)
        elif timetype == minute_str:
            value = self.__htime.time_delta_mm(create_time, cur_time)
        elif timetype == hour_str:
            value = self.__htime.time_delta_hh(create_time, cur_time)
        elif timetype == day_str:
            value = self.__htime.time_delta_days(create_time, cur_time)
        else:
            # Previously this fell through with value=None and crashed on the
            # comparison below with an unhelpful TypeError.
            raise ValueError(
                f"Unsupported time delta type {timetype!r}; expected one of "
                f"{[second_str, minute_str, hour_str, day_str]!r}"
            )
        return value <= delta_value

    def __get_amount(self):
        """Return the total number of records, as computed by the YAML expression."""
        query_keys_value = self.__check_data[self.__ymlkeys.get_query_keys_key()]
        amount = query_keys_value[self.__ymlkeys.get_amount_key()]
        return self.__evaluate(amount)
if __name__ == '__main__':
    # Manual smoke run: load one YAML case and a previously saved JSON
    # response, then run the data check against them.
    import yaml

    yml_path = "../../ymlfile/qingjia/test_askforleave.yml"
    jsonfile_path = "../../tmp/askforleave.json"
    # Use a context manager so the file handle is always closed, and decode
    # explicitly as UTF-8 instead of relying on the platform default encoding.
    with open(yml_path, encoding="utf-8") as yml_file:
        ymldata = yaml.safe_load(yml_file)["test_query"]
    hjson = HandleJsonFile()
    res = hjson.read_json_file_to_dict(jsonfile_path)
    aa = HandleCheck()
    aa.handle_checkdata(res, ymldata)