103 lines
4.2 KiB
Python
103 lines
4.2 KiB
Python
# -*- coding: utf-8 -*-
|
||
# -------------------------------
|
||
# @文件:response_assert.py
|
||
# @时间:2024/5/9 上午9:53
|
||
# @作者:caiweichao
|
||
# @功能描述:接口返回结果校验
|
||
# -------------------------------
|
||
from util.auto_test.api_test.case_model import ApiTestCase
|
||
from util.db.connect_mysql import ConnectMysql
|
||
from util.basic.analysis_json import analysis_json
|
||
from util.basic.log import Log
|
||
|
||
|
||
def response_assert(testcase: ApiTestCase, response):
|
||
for data in zip(testcase.check, testcase.result):
|
||
|
||
if "all_response" in data[0]:
|
||
Log.info("开始执行 all_response 接口全量返回值校验逻辑")
|
||
# 后续版本更新
|
||
pass
|
||
|
||
|
||
class ResponseAssert:
|
||
def __init__(self, testcase: ApiTestCase, response):
|
||
self.testcase = testcase
|
||
self.response = response
|
||
|
||
def assert_data(self):
|
||
for data in zip(self.testcase.check, self.testcase.result):
|
||
try:
|
||
if "sql" in data[0]:
|
||
if self.check_response_sql(data):
|
||
Log.info("sql 校验通过")
|
||
else:
|
||
return False
|
||
elif "exist" in data[0]:
|
||
if self.check_response_exist(data):
|
||
Log.info("key存在校验通过")
|
||
else:
|
||
return False
|
||
elif "not_exist" in data[0]:
|
||
if self.check_response_not_exist(data):
|
||
Log.info("key不存在校验通过")
|
||
else:
|
||
return False
|
||
elif "json" in data[0]:
|
||
if self.check_response_jsonpath(data):
|
||
Log.info("jsonpath 校验通过")
|
||
else:
|
||
return False
|
||
else:
|
||
Log.info(f"{data[0]} 不存在对应的校验逻辑")
|
||
except Exception:
|
||
raise Exception('校验异常请检查!')
|
||
return True
|
||
|
||
def check_response_exist(self, data):
|
||
Log.info("------------------ 开始执行 判断 key 是否存在的逻辑 ------------------")
|
||
if data[0].get("exist") in self.response:
|
||
Log.info(f"key:{data[0].get('exist')}在返回结果中存在")
|
||
return True
|
||
else:
|
||
Log.info(f"key:{data[0].get('exist')}在返回结果不存在")
|
||
return False
|
||
|
||
def check_response_not_exist(self, data):
|
||
Log.info("------------------ 开始执行 判断 key 不存在的逻辑 ------------------")
|
||
if data[0].get("not_exist") in self.response:
|
||
Log.info(f"key:{data[0].get('not_exist')}在返回结果中存在")
|
||
return False
|
||
else:
|
||
Log.info(f"key:{data[0].get('not_exist')}在返回结果不存在")
|
||
return True
|
||
|
||
def check_response_jsonpath(self, data):
|
||
Log.info("------------------ 开始执行 jsonpath 校验逻辑 ------------------")
|
||
value = analysis_json(self.response, data[0].get("json"))
|
||
if value == data[1]:
|
||
Log.info(f" {value} == {data[1]} ---校验通过")
|
||
return True
|
||
else:
|
||
Log.info(f"{value} == {data[1]} ---校验失败(如果日志打印出来的结果一致请检查数据类型)")
|
||
Log.info(f"{value} jsonpath解析的值类型为:{type(value)}")
|
||
Log.info(f"{data[1]} 类型为:{type(data[1])}")
|
||
return False
|
||
|
||
def check_response_sql(self, data):
|
||
Log.info("------------------ 开始执行 sql 校验逻辑 ------------------")
|
||
# 获取数据库连接
|
||
mysql_name = data[0].get("sql")[0]
|
||
sql = data[0].get("sql")[1]
|
||
with ConnectMysql(mysql_name) as db:
|
||
query_results = db.fetch_one(sql)
|
||
Log.info("开始比对 sql 的值和字段校验的值")
|
||
for i in range(len(query_results)):
|
||
if query_results[i] == data[1][i]:
|
||
Log.info(f"{query_results[i]} == {data[1][i]} ---校验通过")
|
||
else:
|
||
Log.info(f"{query_results[i]} == {data[1][i]} ---校验失败(如果日志打印出来的结果一致请检查数据类型)")
|
||
Log.info(f"{query_results[i]} 类型为:{type(query_results[i])}")
|
||
return False
|
||
return True
|