优化代码,增加内置复杂对象断言工具。

This commit is contained in:
chenyongzhi 2024-04-30 15:47:10 +08:00
parent 1b970c8a0e
commit 32eac87831
29 changed files with 2051 additions and 1548 deletions

View File

@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Pipenv (apitest)" project-jdk-type="Python SDK" />
<component name="Black">
<option name="sdkName" value="Python 3.11 (apitest)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (apitest)" project-jdk-type="Python SDK" />
</project>

View File

@ -1,5 +1,5 @@
[[source]]
url = "https://pypi.org/simple"
url = "https://mirrors.aliyun.com/pypi/simple"
verify_ssl = true
name = "pypi"

1564
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -11,202 +11,145 @@ import ast
import time
from common import bif_functions
from common.crypto.encrypt_data import EncryptData
from common.database.mysql_client import MysqlClient
from common.utils.decorators import singleton, send_request_decorator
from common.utils.exceptions import *
from common.validation.extractor import Extractor
from common.validation.load_and_execute_script import LoadScript
from common.validation.validator import Validator
from config.field_constants import FieldNames
@singleton
class Action(Extractor, LoadScript, Validator, EncryptData):
def __init__(self, initialize_data=None, db_config=None):
super().__init__()
self.db_config = db_config
self.__variables = {}
self.set_environments(initialize_data)
self.set_bif_fun(bif_functions)
@send_request_decorator
def send_request(self, host, method, extract_request_data, scripts_dir, prepost_script):
"""发送请求"""
url, kwargs = self.prepare_request(extract_request_data, self.variables)
self.http_client(host, url, method, **kwargs)
def prepare_request(self, extract_request_data, item):
"""准备请求数据"""
item = self.replace_dependent_parameter(item)
url, query_str, request_data, headers, request_data_type, h_crypto, r_crypto = self.request_info(item)
headers, query_str, request_data = self.analysis_request(query_str, request_data, h_crypto, headers, r_crypto,
extract_request_data)
kwargs = {request_data_type: request_data, "headers": headers, "params": query_str}
return url, kwargs
def analysis_request(self, query_str, request_data, headers_crypto, headers, request_crypto, extract_request_data):
"""分析请求数据"""
headers, request_data = self.encrypts(headers_crypto, headers, request_crypto, request_data)
if extract_request_data:
for data in (query_str, request_data):
if data:
self.substitute_data(data, jp_dict=extract_request_data)
return headers, query_str, request_data
def exc_sql(self, item):
"""执行sql处理"""
sql, sql_params_dict = self.sql_info(item)
self.variables = item
sql = self.replace_dependent_parameter(sql)
if sql:
client = MysqlClient(self.db_config)
execute_sql_results = client.execute_sql(sql)
print(f"| 执行 sql 成功--> {execute_sql_results}")
if execute_sql_results and sql_params_dict:
self.substitute_data(execute_sql_results, jp_dict=sql_params_dict)
def analysis_response(self, sheet, iid, name, desc, regex, keys, deps, jp_dict):
"""分析响应结果并提取响应"""
try:
self.substitute_data(self.response_json, regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
msg = f"| 分析响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}"
ParameterExtractionError(msg, err)
def execute_validation(self, excel, sheet, iid, name, desc, expected):
"""执行断言校验"""
expected = self.replace_dependent_parameter(expected)
try:
self.run_validate(expected, self.response_json)
# 如果不填写断言,则自动断言状态码
if not expected:
from common.validation.comparators import eq
eq(200, self.response.status_code)
result = "PASS"
except Exception as e:
result = "FAIL"
error_info = f"| exception case:**{sheet}_{iid}_{name}_{desc},{self.assertions}"
AssertionFailedError(error_info, e)
raise e
finally:
print(f'| <span style="color:yellow">断言结果-->{self.assertions}</span>\n')
print("-" * 50 + "我是分割线" + "-" * 50)
response = self.response.text if self.response is not None else str(self.response)
excel.write_back(sheet_name=sheet, i=iid, response=response, result=result, assertions=str(self.assertions))
def execute_dynamic_code(self, item, code):
self.variables = item
if code is not None:
try:
ast_obj = ast.parse(code, mode='exec')
compiled = compile(ast_obj, '<string>', 'exec')
exec(compiled, {"pm": self})
except Exception as e:
ExecuteDynamiCodeError(code, e)
raise e
return self.variables
def execute_prepost_script(self, scripts_dir, prepost_script, method_name):
self.load_and_execute_script(scripts_dir, prepost_script, self, method_name)
@property
def variables(self, key=None):
return self.__variables if not key else self.__variables.get(key)
@variables.setter
def variables(self, item):
self.__variables = item
@staticmethod
def base_info(item):
"""
获取基础信息
"""
sheet = item.pop(FieldNames.SHEET)
item_id = item.pop(FieldNames.ITEM_ID)
condition = item.pop(FieldNames.RUN_CONDITION)
sleep_time = item.pop(FieldNames.SLEEP_TIME)
name = item.pop(FieldNames.NAME)
desc = item.pop(FieldNames.DESCRIPTION)
method = item.pop(FieldNames.METHOD)
expected = item.pop(FieldNames.EXPECTED)
prepost_script = f"prepost_script_{sheet}_{item_id}.py"
return sheet, item_id, condition, sleep_time, name, desc, method, expected, prepost_script
@staticmethod
def sql_info(item):
sql = item.pop(FieldNames.SQL)
sql_params_dict = item.pop(FieldNames.SQL_PARAMS_DICT)
return sql, sql_params_dict
@staticmethod
def extractor_info(item):
"""
获取提取参数的基本字段信息
Args:
item:
class Action(LoadScript):
def __init__(self, host='', initialize_data=None):
super().__init__()
self.__variables = {}
if not initialize_data:
initialize_data = {}
self.set_environments(initialize_data)
self.set_bif_fun(bif_functions)
self.host = host
Returns:
def base_info(self, item):
"""
获取基础信息
"""
# self.variable = deepcopy(item)
self.sheet = item.pop(self.SHEET)
self.item_id = item.pop(self.ITEM_ID)
self.condition = item.pop(self.RUN_CONDITION)
self.sleep_time = item.pop(self.SLEEP_TIME)
self.name = item.pop(self.NAME)
self.desc = item.pop(self.DESCRIPTION)
self.method = item.pop(self.METHOD)
self.prepost_script = f"prepost_script_{self.sheet}_{self.item_id}.py"
self.sql = self.replace_dependent_parameter(item.pop(self.SQL))
self.exc_sql = item.pop(self.SQL_PARAMS_DICT)
self.regex = item.pop(self.REGEX)
self.keys = item.pop(self.REGEX_PARAMS_LIST)
self.deps = item.pop(self.RETRIEVE_VALUE)
self.jp_dict = item.pop(self.JSON_PATH_DICT)
self.extract_request_data = item.pop(self.EXTRACT_REQUEST_DATA)
self.url = self.replace_dependent_parameter(item.pop(self.URL))
self.query_str = self.replace_dependent_parameter(item.pop(self.QUERY_STRING))
self.body = self.replace_dependent_parameter(item.pop(self.REQUEST_DATA))
self.headers = self.replace_dependent_parameter(item.pop(self.HEADERS))
self.headers_crypto = item.pop(self.HEADERS_CRYPTO) # 请求头加密方式
self.body_typ = item.pop(self.REQUEST_DATA_TYPE) if item.get(self.REQUEST_DATA_TYPE) else self.PARAMS
self.body_crypto = item.pop(self.REQUEST_DATA_CRYPTO) # 请求数据加密方式
self.setup_script = item.pop(self.SETUP_SCRIPT)
self.teardown_script = item.pop(self.TEARDOWN_SCRIPT)
self.expected = item.pop(self.EXPECTED)
"""
regex = item.pop(FieldNames.REGEX)
keys = item.pop(FieldNames.REGEX_PARAMS_LIST)
deps = item.pop(FieldNames.RETRIEVE_VALUE)
jp_dict = item.pop(FieldNames.JSON_PATH_DICT)
extract_request_data = item.pop(FieldNames.EXTRACT_REQUEST_DATA)
return regex, keys, deps, jp_dict, extract_request_data
@staticmethod
def request_info(item):
"""
请求数据
"""
url = item.pop(FieldNames.URL)
query_str = item.pop(FieldNames.QUERY_STRING)
request_data = item.pop(FieldNames.REQUEST_DATA)
headers = item.pop(FieldNames.HEADERS)
request_data_type = item.pop(FieldNames.REQUEST_DATA_TYPE) if item.get(
FieldNames.REQUEST_DATA_TYPE) else FieldNames.PARAMS
headers_crypto = item.pop(FieldNames.HEADERS_CRYPTO)
request_data_crypto = item.pop(FieldNames.REQUEST_DATA_CRYPTO)
return url, query_str, request_data, headers, request_data_type, headers_crypto, request_data_crypto
@staticmethod
def script(item):
setup_script = item.pop(FieldNames.SETUP_SCRIPT)
teardown_script = item.pop(FieldNames.TEARDOWN_SCRIPT)
return setup_script, teardown_script
@staticmethod
def is_run(condition):
is_run = condition
if not is_run or is_run.upper() != FieldNames.YES:
return True
return None
@staticmethod
def is_only_sql(method):
if method.upper() == FieldNames.SQL:
return True
return None
@staticmethod
def pause_execution(sleep_time):
if sleep_time:
try:
time.sleep(sleep_time)
except Exception as e:
raise InvalidSleepTimeError(f"{sleep_time}", e)
@send_request_decorator
def send_request(self):
"""发送请求"""
if not self.body_typ:
raise "当前测试用例没有填写请求参数类型!"
self.http_client()
def analysis_request(self):
"""分析请求数据"""
# 开始请求头、请求body加密
headers, body = self.encrypts(self.headers_crypto, self.headers, self.body_crypto, self.body)
if self.extract_request_data:
for data in (self.query_str, body):
if data:
self.substitute_data(data, jp_dict=self.extract_request_data)
kwargs = {self.body_typ: body, "headers": headers, "params": self.query_str}
self.processing_data(self.host, self.url, self.method, **kwargs)
if __name__ == '__main__':
print(Action())
def analysis_response(self):
"""分析响应结果并提取响应"""
try:
self.substitute_data(self.response_json, regex=self.regex, keys=self.keys, deps=self.deps,
jp_dict=self.jp_dict)
except Exception as err:
msg = f"| 分析响应失败:{self.sheet}_{self.iid}_{self.name}_{self.desc}"
f"\nregex={self.regex};"
f" \nkeys={self.keys};"
f"\ndeps={self.deps};"
f"\njp_dict={self.jp_dict}"
f"\n{err}"
ParameterExtractionError(msg, err)
def execute_validation(self, excel=None):
"""执行断言校验"""
expected = self.replace_dependent_parameter(self.expected)
result = None
try:
self.run_validate(expected, self.response_json)
# 如果不填写断言,则自动断言状态码
if not self.expected:
from common.validation.comparators import eq
eq(200, self.response.status_code)
result = "PASS"
except Exception as e:
result = "FAIL"
error_info = f"| exception case:**{self.sheet}_{self.item_id}_{self.name}_{self.desc},{self.assertions}"
AssertionFailedError(error_info, e)
raise e
finally:
print(f'| <span style="color:yellow">断言结果-->{self.assertions}</span>\n')
print("-" * 50 + "我是分割线" + "-" * 50)
response = self.response.text if self.response is not None else str(self.response)
if not excel:
excel.write_back(sheet_name=self.sheet, i=self.item_id, response=response, result=result,
assertions=str(self.assertions))
def execute_dynamic_code(self, code):
if code is not None:
try:
ast_obj = ast.parse(code, mode='exec')
compiled = compile(ast_obj, '<string>', 'exec')
exec(compiled, {"py": self})
except Exception as e:
ExecuteDynamiCodeError(code, e)
raise e
def execute_prepost_script(self, scripts_dir, prepost_script, method_name):
self.load_and_execute_script(scripts_dir, prepost_script, self, method_name)
def is_run(self):
if not self.condition or self.condition.upper() != 'NO':
return True
return None
def exec_sql(self, client):
"""执行sql处理"""
if self.sql:
execute_sql_results = client.execute_sql(self.sql)
print(f"| 执行 sql 成功--> {execute_sql_results}")
if execute_sql_results and self.exc_sql:
self.substitute_data(execute_sql_results, jp_dict=self.exc_sql)
def is_only_sql(self, client):
if self.method.upper() == self.SQL:
self.exec_sql(client)
return True
return None
def pause_execution(self):
if self.sleep_time:
try:
time.sleep(self.sleep_time)
except Exception as e:
raise InvalidSleepTimeError(f"{self.sleep_time}", e)

View File

@ -1,66 +1,67 @@
from functools import wraps
def _create_test_name(index,name, title):
"""
Create a new test name based on index and name.
:param index: Index for generating the test name.
:param name: Base name for the test.
:param title: Base title for the test.
:return: Generated test name.
"""
test_name = f"{name}_{index + 1:03}_{title}"
return test_name
def _create_test_name(index, name, title):
"""
Create a new test name based on index and name.
:param index: Index for generating the test name.
:param name: Base name for the test.
:param title: Base title for the test.
:return: Generated test name.
"""
test_name = f"{name}_{index + 1:03}_{title}"
return test_name
def _set_function_attributes(func, original_func, new_name, test_desc):
"""
Set attributes of a function.
:param func: The function to set attributes for.
:param original_func: The original function being wrapped.
:param new_name: New name for the function.
:param test_desc: New documentation for the function.
"""
func.__wrapped__ = original_func
func.__name__ = new_name
func.__doc__ = test_desc
"""
Set attributes of a function.
:param func: The function to set attributes for.
:param original_func: The original function being wrapped.
:param new_name: New name for the function.
:param test_desc: New documentation for the function.
"""
func.__wrapped__ = original_func
func.__name__ = new_name
func.__doc__ = test_desc
def _update_func(new_func_name, params, test_desc, func, *args, **kwargs):
"""
Create a wrapper function with updated attributes.
:param new_func_name: New name for the wrapper function.
:param params: Test parameters.
:param test_desc: Test description.
:param func: Original function to be wrapped.
:param args: Additional positional arguments for the function.
:param kwargs: Additional keyword arguments for the function.
:return: Wrapped function.
"""
@wraps(func)
def wrapper(self):
return func(self, params, *args, **kwargs)
_set_function_attributes(wrapper, func, new_func_name, test_desc)
return wrapper
"""
Create a wrapper function with updated attributes.
:param new_func_name: New name for the wrapper function.
:param params: Test parameters.
:param test_desc: Test description.
:param func: Original function to be wrapped.
:param args: Additional positional arguments for the function.
:param kwargs: Additional keyword arguments for the function.
:return: Wrapped function.
"""
@wraps(func)
def wrapper(self):
return func(self, params, *args, **kwargs)
_set_function_attributes(wrapper, func, new_func_name, test_desc)
return wrapper
def ddt(cls):
"""
:param cls: 测试类
:return:
"""
for func_name, func in list(cls.__dict__.items()):
if hasattr(func, "PARAMS"):
for index, case_data in enumerate(getattr(func, "PARAMS")):
name = str(case_data.get("Name", "缺少Name"))
test_desc = str(case_data.get("Description", "缺少Description"))
# new_test_name = _create_test_name(name)
new_test_name = _create_test_name(index, func_name, name)
func2 = _update_func(new_test_name, case_data, test_desc, func)
setattr(cls, new_test_name, func2)
else:
# Avoid name clashes
delattr(cls, func_name)
return cls
"""
:param cls: 测试类
:return:
"""
for func_name, func in list(cls.__dict__.items()):
if hasattr(func, "PARAMS"):
for index, case_data in enumerate(getattr(func, "PARAMS")):
print(case_data)
name = str(case_data.get("Name", "缺少Name"))
test_desc = str(case_data.get("Description", "缺少Description"))
# new_test_name = _create_test_name(name)
new_test_name = _create_test_name(index, func_name, name)
func2 = _update_func(new_test_name, case_data, test_desc, func)
setattr(cls, new_test_name, func2)
else:
# Avoid name clashes
delattr(cls, func_name)
return cls

View File

@ -14,50 +14,51 @@
class AnalysisJson:
def get_target_value(self, key, dic, tmp_list):
"""
:param key: 目标 key 的值
:param dic: json 数据
:param tmp_list: 用于储蓄获取到的数据
:return: 返回储蓄数据的 list
"""
if not isinstance(dic, dict) or not isinstance(tmp_list, list):
msg = "argv[1] not an dict or argv[-1] not an list "
return msg
if key in dic.keys():
tmp_list.append(dic[key]) # 传入数据如果存在 dict 则将字典遍历一次存入到 tmp_list 中
for value in dic.values(): # 传入数据不符合则对其value 值进行遍历
if isinstance(value, dict):
self.get_target_value(key, value, tmp_list) # 传入数据的value 是字典,则直接调用自身
elif isinstance(value, (list, tuple)):
self.get_value(key, value, tmp_list) # 传入数据的value值是列表或者元组则调用 get_value
else:
for value in dic.values(): # 遍历传入的字典值,value 为dic的值
if isinstance(value, dict): # 判断 value 是否字典
self.get_target_value(key, value, tmp_list) # 传入数据的值是字典,则直接调用自身
elif isinstance(value, (list, tuple)): # 判断 value 是否元组或者 list
self.get_value(key, value, tmp_list) # 传入数据的value 值是列表或者元组,则调用 get_value
return tmp_list
def get_value(self, key, val, tmp_list):
"""
:param key:目标 key 的值
:param val:
:param tmp_list:
:return:
"""
for sub_val in val:
if isinstance(sub_val, dict): # 判断 sub_val 的值是否是字典
self.get_target_value(key, sub_val, tmp_list) # 传入数据 value 值是字典 则调用get_target_value
elif isinstance(sub_val, (list, tuple)):
self.get_value(key, sub_val, tmp_list) # 传入数据的 value 值是列表或者元组,则调用自身
def get_target_value(self, key, dic, tmp_list):
"""
:param key: 目标 key 的值
:param dic: json 数据
:param tmp_list: 用于储蓄获取到的数据
:return: 返回储蓄数据的 list
"""
if not isinstance(dic, dict) or not isinstance(tmp_list, list):
msg = "argv[1] not an dict or argv[-1] not an list "
return msg
if key in dic.keys():
tmp_list.append(dic[key]) # 传入数据如果存在 dict 则将字典遍历一次存入到 tmp_list 中
for value in dic.values(): # 传入数据不符合则对其value 值进行遍历
if isinstance(value, dict):
self.get_target_value(key, value, tmp_list) # 传入数据的value 是字典,则直接调用自身
elif isinstance(value, (list, tuple)):
self.get_value(key, value, tmp_list) # 传入数据的value值是列表或者元组则调用 get_value
else:
for value in dic.values(): # 遍历传入的字典值,value 为dic的值
if isinstance(value, dict): # 判断 value 是否字典
self.get_target_value(key, value, tmp_list) # 传入数据的值是字典,则直接调用自身
elif isinstance(value, (list, tuple)): # 判断 value 是否元组或者 list
self.get_value(key, value, tmp_list) # 传入数据的value 值是列表或者元组,则调用 get_value
return tmp_list
def get_value(self, key, val, tmp_list):
"""
:param key:目标 key 的值
:param val:
:param tmp_list:
:return:
"""
for sub_val in val:
if isinstance(sub_val, dict): # 判断 sub_val 的值是否是字典
self.get_target_value(key, sub_val, tmp_list) # 传入数据 value 值是字典 则调用get_target_value
elif isinstance(sub_val, (list, tuple)):
self.get_value(key, sub_val, tmp_list) # 传入数据的 value 值是列表或者元组,则调用自身
if __name__ == '__main__':
test_dic = {'a': '1', 'g': '2', 'c': {
'd': [{'e': [{'f': [{'v': [{'g': '6'}, [{'g': '7'}, [{'g': 8}]]]}, 'm']}]}, 'h', {'g': [10, 12]}]}}
test_list = []
test = AnalysisJson()
resp = test.get_target_value("g", test_dic, test_list)
print(resp)
test_dic = {'a': '1', 'g': '2', 'c': {
'd': [{'e': [{'f': [{'v': [{'g': '6'}, [{'g': '7'}, [{'g': 8}]]]}, 'm']}]}, 'h', {'g': [10, 12]}]}}
test_list = []
test = AnalysisJson()
resp = test.get_target_value("g", test_dic, test_list)
print(resp)
[].extend(resp)

View File

@ -12,7 +12,7 @@ import os
import pymysql
def execute_sql_files(sql_path, db_config):
def execute_sql_files(sql_path, error_log_dir, db_config):
"""
批量执行sql语句
Args:
@ -23,32 +23,45 @@ def execute_sql_files(sql_path, db_config):
"""
connection = pymysql.connect(**db_config)
error_sql_list = []
try:
with connection.cursor() as cur:
# 获取指定目录下的所有SQL文件
sql_files = glob.glob(os.path.join(sql_path, "*.sql"))
print(f"current executing sql_files is {sql_files}\n")
for file in sql_files:
with open(file, "r", encoding="utf-8") as f:
sql_statements = f.read().strip().split(";")
for sql_statement in sql_statements:
if sql_statement:
cur.execute(sql_statement)
connection.commit()
base_name = os.path.basename(file)
error_log_file = os.path.join(error_log_dir, os.path.splitext(base_name)[0] + "_error.sql") # 收集错误sql文件
with open(error_log_file, 'w', encoding="utf-8") as error_log:
with open(file, "r", encoding="utf-8") as f:
# 按照分号拆分SQL语句
sql_statements = f.read().strip().split(";")
for sql_statement in sql_statements:
if sql_statement:
try:
cur.execute(sql_statement)
connection.commit()
print(f"-----> Successfully executed SQL: \n {sql_statement} <-----\n")
except Exception as e:
# 失败sql重写入错误sql收集文件
error_log.write(f"{sql_statement};\n")
print(f"Error executing SQL from {file}: {e};\n")
except Exception as e:
print(f'Error connecting to {sql_files}: {e}')
finally:
connection.close()
if __name__ == '__main__':
config = {
"host": "xxxx.xxx.xxx.xx",
"host": "....com",
"port": 3306,
"database": "db_name",
"user": "root",
"password": "xxxx"
"database": "",
"user": "",
"password": ""
}
execute_sql_files('./sql.sql', config)
error_sql_dir = r'D:\apiProject\apitest\error_sql_dir'
sql_files = r'D:\apiProject\apitest\sql_statements'
execute_sql_files(sql_files, error_sql_dir, config)

View File

@ -12,8 +12,6 @@ import json
import pymysql
# from DBUtils.PooledDB import PooledDB
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor
@ -40,7 +38,6 @@ class MysqlClient:
self.cursor = self.conn.cursor(DictCursor)
except Exception as e:
DatabaseExceptionError(self.db_base, e)
raise
def execute_sql(self, sql):
"""

View File

@ -24,94 +24,90 @@ from common.utils.exceptions import ResponseJsonConversionError
class HttpClient(LoadModulesFromFolder):
session = requests.Session()
def __init__(self):
super().__init__()
self.request = None
self.response = None
self.response_json = None
self.files = []
@request_retry_on_exception()
def http_client(self, host, url, method, **kwargs):
"""
发送 http 请求
@param host: 域名
@param url: 接口 __url
@param method: http 请求方法
@param kwargs: 接受 requests 原生的关键字参数
@return: 响应对象
"""
# 关闭 https 警告信息
urllib3.disable_warnings()
kwargs = self.processing_data(host, url, method, **kwargs)
# 发送请求
self.request = requests.Request(**kwargs)
self.response = self.session.send(self.request.prepare(), timeout=(15, 20), verify=True)
# 关闭文件
[i.close() for i in self.files if len(self.files) > 0]
self.post_response()
return self.response
def processing_data(self, host, url, method, **kwargs):
kwargs["method"] = method.lower()
if not url:
raise ValueError("URL cannot be None")
__url = f'{host}{url}' if not re.match(r"https?", url) else url
kwargs["url"] = __url
# 兼容处理 headers 参数为字符串类型的情况
if 'headers' in kwargs and isinstance(kwargs['headers'], str):
kwargs['headers'] = json.loads(kwargs['headers'])
# 兼容处理 json 参数为字符串类型的情况
if 'json' in kwargs and isinstance(kwargs['json'], str):
kwargs['json'] = json.loads(kwargs['json'])
kwargs['params'] = json.loads(kwargs['params']) if isinstance(kwargs['params'], str) and kwargs[
'params'] else None
# 处理 files 参数的情况
if 'files' in kwargs:
file_paths = kwargs['files']
if isinstance(file_paths, str):
file_paths = json.loads(file_paths)
files = []
file_utils = FileUtils()
for i, file_path in enumerate(file_paths):
file_type = mimetypes.guess_type(file_path)[0]
file_path_completion = file_utils.get_file_path(file_path)
f = open(file_path_completion, 'rb')
self.files.append(f)
files.append(
('file', (f'{file_path}', f, file_type))
)
kwargs['files'] = files
return kwargs
def post_response(self):
# 处理响应结果
self.update_environments("responseStatusCode", self.response.status_code)
self.update_environments("responseTime", round(self.response.elapsed.total_seconds() * 1000, 2))
try:
self.response_json = self.response.json()
except Exception as e:
ResponseJsonConversionError(self.response.text, str(e))
self.response_json = None
session = requests.Session()
def __init__(self):
super().__init__()
self.request = None
self.response = None
self.response_json = None
self.files = []
@request_retry_on_exception()
def http_client(self):
"""
发送 http 请求
"""
# 关闭 https 警告信息
urllib3.disable_warnings()
# 发送请求
prepared_request = requests.Request(**self.request).prepare()
self.response = self.session.send(prepared_request, timeout=(15, 20), verify=True)
self.post_response()
def processing_data(self, host, url, method, **kwargs):
"""
处理请求参数
:param host: 主机地址
:param url: 资源路径
:param method: 请求方法
:param kwargs: 额外的请求参数支持如headersjsonparamsfiles等关键字参数
:return:
"""
kwargs["method"] = method.lower()
if not url:
raise ValueError("URL cannot be None")
__url = f'{host}{url}' if not re.match(r"https?", url) else url
kwargs["url"] = __url
# 兼容处理 headers,json 参数为字符串类型的情况
for key in ['headers', 'json']:
if key in kwargs and isinstance(kwargs[key], str):
kwargs[key] = json.loads(kwargs[key])
kwargs['params'] = json.loads(kwargs['params']) if isinstance(kwargs['params'], str) and kwargs[
'params'] else kwargs.get('params')
# 处理 files 参数的情况
if 'files' in kwargs:
file_paths = kwargs['files']
if isinstance(file_paths, str):
file_paths = json.loads(file_paths)
files = []
file_utils = FileUtils()
for i, file_path in enumerate(file_paths):
file_type = mimetypes.guess_type(file_path)[0]
file_path_completion = file_utils.get_file_path(file_path)
f = open(file_path_completion, 'rb')
self.files.append(f)
files.append(
('file', (f'{file_path}', f, file_type))
)
kwargs['files'] = files
self.request = kwargs
def post_response(self):
# 处理响应结果
[i.close() for i in self.files if len(self.files) > 0] # 关闭文件
self.files.clear() # 清空文件列表
self.update_environments("response_status_code", self.response.status_code)
self.update_environments("response_time", round(self.response.elapsed.total_seconds() * 1000, 2))
self.update_environments("response", self.response.text)
try:
self.response_json = self.response.json()
except Exception as e:
ResponseJsonConversionError(self.response.text, str(e))
self.response_json = None
if __name__ == '__main__':
hst = 'http://localhost:3000'
ul = '/get'
meth = 'get'
kwarg = {
'headers': {},
'data': {},
"params":"",
# 'files': ['test.txt']
}
pyt = HttpClient()
pyt.http_client(hst, ul, meth, **kwarg)
hst = 'http://localhost:3000'
ul = '/get'
meth = 'get'
kwarg = {
'headers': {},
'data': {},
"params": "",
# 'files': ['test.txt']
}
pyt = HttpClient()
pyt.http_client(hst, ul, meth, **kwarg)

View File

@ -65,9 +65,10 @@ class SendEmail:
def send_mail(self, content, file_path=None):
"""发送邮件"""
try:
# s = smtplib.SMTP_SSL(self.host, self.port)
s = smtplib.SMTP(self.host, self.port)
s = smtplib.SMTP_SSL(self.host, self.port)
# s = smtplib.SMTP(self.host, self.port)
s.starttls() # 尝试启用TLS/SSL连接
s.set_debuglevel(True)
s.login(self.user, self.password)
s.sendmail(self.sender, self.receivers, self.content(content, file_path))
s.quit()
@ -78,4 +79,4 @@ class SendEmail:
if __name__ == '__main__':
s = SendEmail()
s.send_mail()
s.send_mail("abc")

View File

@ -16,194 +16,183 @@ import yaml
def singleton(cls):
"""
Args:
cls:Decorated class
Returns:
"""
instance = {}
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instance:
instance[cls] = cls(*args, **kwargs)
return instance[cls]
return get_instance
"""
Args:
cls:Decorated class
Returns:
"""
instance = {}
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instance:
instance[cls] = cls(*args, **kwargs)
return instance[cls]
return get_instance
def request_retry_on_exception(retries=2, delay=1.5):
"""Retry on Failed Requests"""
from common.utils.exceptions import RequestSendingError
def request_decorator(func):
e = None
@wraps(func)
def wrapper(*args, **kwargs):
nonlocal e
for i in range(retries):
# st = time.time()
try:
response = func(*args, **kwargs)
# print(f"| 代码耗时--> {time.time() - st}")
print(f"| 请求地址 --> {response.request.url}")
print(f"| 请求方法 --> {response.request.method}")
print(f"| 请求头 --> {response.request.headers}")
print(f"| 请求 body --> {response.request.body}")
print(f"| 接口状态--> {response.status_code}")
print(f"| 接口耗时--> {response.elapsed}")
try:
print(f"| 接口响应--> {response.json()}")
except:
print(f"| 接口响应--> {response.text}")
except Exception as error:
print(f"| 第{i + 1}次请求参数=【{args}__{kwargs}")
# print(f"| 代码耗时--> {time.time() - st}")
e = error
time.sleep(delay)
else:
return response
raise RequestSendingError(f"{args}__{kwargs}", e)
return wrapper
return request_decorator
"""Retry on Failed Requests"""
from common.utils.exceptions import RequestSendingError
def request_decorator(func):
@wraps(func)
def wrapper(self):
last_exception = None
for i in range(retries):
try:
func(self)
print(f"| 请求地址 --> {self.response.request.url}")
print(f"| 请求方法 --> {self.response.request.method}")
print(f"| 请求头 --> {self.response.request.headers}")
print(f"| 请求 body --> {self.response.request.body}")
print(f"| 接口状态--> {self.response.status_code}")
print(f"| 接口耗时--> {self.response.elapsed}")
try:
print(f"| 接口响应--> {self.response.json()}")
except ValueError:
print(f"| 接口响应--> {self.response.text}")
break
except Exception as error:
last_exception = error
print(f"| 第{i + 1}次请求参数=【{self.request}")
print(f"| 请求失败,原因: {error}")
time.sleep(delay)
if last_exception:
raise RequestSendingError(f"Failed to send request after {retries} retries.", last_exception)
return wrapper
return request_decorator
def list_data(datas):
"""
:param datas: Test data
:return:
"""
def wrapper(func):
setattr(func, "PARAMS", datas)
return func
return wrapper
"""
:param datas: Test data
:return:
"""
def wrapper(func):
setattr(func, "PARAMS", datas)
return func
return wrapper
def yaml_data(file_path):
"""
:param file_path:YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
setattr(func, "PARAMS", datas)
return func
return wrapper
"""
:param file_path:YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
setattr(func, "PARAMS", datas)
return func
return wrapper
def json_data(file_path):
"""
:param file_path: YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = json.load(f)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = json.load(f)
setattr(func, "PARAMS", datas)
return func
return wrapper
"""
:param file_path: YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = json.load(f)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = json.load(f)
setattr(func, "PARAMS", datas)
return func
return wrapper
import time
def run_count(count, interval, func, *args, **kwargs):
"""Run Count"""
for i in range(count):
try:
func(*args, **kwargs)
except Exception as e:
# print("====用例执行失败===", e)
# traceback.print_exc()
if i + 1 == count:
raise e
else:
print("==============开始第{}次重运行=============".format(i))
time.sleep(interval)
else:
break
"""Run Count"""
for i in range(count):
try:
func(*args, **kwargs)
except Exception as e:
# print("====用例执行失败===", e)
# traceback.print_exc()
if i + 1 == count:
raise e
else:
print("==============开始第{}次重运行=============".format(i))
time.sleep(interval)
else:
break
def rerun(count, interval=2):
"""
Decorator for rerunning a single test case; note that if using ddt, this method should be used before ddt
:param count: Number of retries on failure
:param interval: Interval time between each retry, default is three seconds
:return:
"""
def wrapper(func):
def decorator(*args, **kwargs):
run_count(count, interval, func, *args, **kwargs)
return decorator
return wrapper
"""
Decorator for rerunning a single test case; note that if using ddt, this method should be used before ddt
:param count: Number of retries on failure
:param interval: Interval time between each retry, default is three seconds
:return:
"""
def wrapper(func):
def decorator(*args, **kwargs):
run_count(count, interval, func, *args, **kwargs)
return decorator
return wrapper
def install_dependencies(func):
"""Checking and Installing Dependencies"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
print("---------------- Checking and Installing Dependencies ----------------")
subprocess.check_call(["pipenv", "install"])
print("---------------- Successfully Installed All Dependencies ----------------")
except Exception as e:
print(f"Failed to install dependencies: {str(e)}")
sys.exit(1)
else:
return func(*args, **kwargs)
return wrapper
"""Checking and Installing Dependencies"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
print("---------------- Checking and Installing Dependencies ----------------")
subprocess.check_call(["pipenv", "install"])
print("---------------- Successfully Installed All Dependencies ----------------")
except Exception as e:
print(f"Failed to install dependencies: {str(e)}")
sys.exit(1)
else:
return func(*args, **kwargs)
return wrapper
def send_request_decorator(func):
"""Decorator to handle the logic of sending requests."""
def decorator(self, host, method, extract_request_data, scripts_dir, prepost_script):
"""Handles setup, request execution, and teardown logic for sending requests.
"""Decorator to handle the logic of sending requests."""
Args:
self: The instance of the class.
host: The host for the request.
method: The HTTP method for the request.
extract_request_data: Data for extracting request details.
scripts_dir: The script dir.
prepost_script: The prepost script name.
def decorator(self):
"""Handles setup, request execution, and teardown logic for sending requests.
Returns:
The response from the request.
"""
setup_script, teardown_script = self.script(self.variables)
# 执行 excel 与 指定模块文件中的动态代码
self.execute_dynamic_code(self.variables, setup_script)
self.execute_prepost_script(scripts_dir, prepost_script, "setup")
response = func(self, host, method, extract_request_data, scripts_dir, prepost_script)
self.execute_dynamic_code(self.response, teardown_script)
self.execute_prepost_script(scripts_dir, prepost_script, "teardown")
return response
return decorator
Args:
self: The instance of the class.
Returns:
The response from the request.
"""
# 执行 excel 与 指定模块文件中的动态代码
self.analysis_request()
self.execute_dynamic_code(self.setup_script)
self.execute_prepost_script(self.scripts_dir, self.prepost_script, "setup")
func(self)
self.execute_dynamic_code(self.teardown_script)
self.execute_prepost_script(self.scripts_dir, self.prepost_script, "teardown")
return decorator

View File

@ -11,9 +11,10 @@
import re
from dataclasses import dataclass
from config.field_constants import FieldNames
@dataclass
class Environments:
class Environments(FieldNames):
environments = {}
PARAMETER_MATCHER = re.compile(r"{{\s*([^}\s]+)\s*}}(?:\[(\d+)\])?") # 匹配需要替换的参数
PARAMETER_PATTERN = re.compile(r"{{(.*?)}}") # 匹配参数模式 {{...}}

View File

@ -31,4 +31,5 @@ comparator_dict = {
'regex_search': 'regex_search:正则匹配(从字符串的任意位置匹配)',
'startswith': 'startswith:实际值是以期望值开始',
'endswith': 'endswith:实际值是以期望值结束',
'check': 'check: 实际复杂对象包含预期复杂对象'
}

View File

@ -11,206 +11,220 @@ import json
import re
def p_string(a, e):
def p_string(a, e, failed_info_ignore=None):
ta = str(type(a)).replace("<", "(").replace(">", ")")
te = str(type(e)).replace("<", "(").replace(">", ")")
return f"<span style='color:red' >预期:{a} -> {ta}, 实际:{e} -> {te}</span>"
return f"<span style='color:red' >实际: {a} -> {ta}, 预期: {e} -> {te}, 失败信息及忽略字段: {failed_info_ignore}</span>"
def eq(actual_value, expect_value):
def eq(actual_value, expect_value, ignore=None):
"""
实际值与期望值相等
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert actual_value == expect_value, p_string(actual_value, expect_value)
assert actual_value == expect_value, p_string(actual_value, expect_value, ignore)
def lt(actual_value, expect_value):
def lt(actual_value, expect_value, ignore=None):
"""
实际值小于期望值
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert actual_value < expect_value, p_string(actual_value, expect_value)
assert actual_value < expect_value, p_string(actual_value, expect_value, ignore)
def lte(actual_value, expect_value):
def lte(actual_value, expect_value, ignore=None):
"""
实际值小于或等于期望值
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert actual_value <= expect_value, p_string(actual_value, expect_value)
assert actual_value <= expect_value, p_string(actual_value, expect_value, ignore)
def gt(actual_value, expect_value):
def gt(actual_value, expect_value, ignore=None):
"""
实际值大于期望值
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert actual_value > expect_value, p_string(actual_value, expect_value)
assert actual_value > expect_value, p_string(actual_value, expect_value, ignore)
def gte(actual_value, expect_value):
def gte(actual_value, expect_value, ignore=None):
"""
实际值大于或等于期望值
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert actual_value >= expect_value, p_string(actual_value, expect_value)
assert actual_value >= expect_value, p_string(actual_value, expect_value, ignore)
def neq(actual_value, expect_value):
def neq(actual_value, expect_value, ignore=None):
"""
实际值与期望值不相等
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert actual_value != expect_value, p_string(actual_value, expect_value)
assert actual_value != expect_value, p_string(actual_value, expect_value, ignore)
def str_eq(actual_value, expect_value):
def str_eq(actual_value, expect_value, ignore=None):
"""
字符串实际值与期望值相同
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert str(actual_value) == str(
expect_value), p_string(actual_value, expect_value)
assert str(actual_value) == str(expect_value), p_string(actual_value, expect_value, ignore)
def length_eq(actual_value, expect_value):
def length_eq(actual_value, expect_value, ignore=None):
"""
实际值的长度等于期望长度
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value)
assert len(actual_value) == expect_value, p_string(actual_value, expect_value)
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value, ignore)
assert len(actual_value) == expect_value, p_string(actual_value, expect_value, ignore)
def length_gt(actual_value, expect_value):
def length_gt(actual_value, expect_value, ignore=None):
"""
实际值的长度大于期望长度
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value)
assert len(actual_value) > expect_value, p_string(actual_value, expect_value)
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value, ignore)
assert len(actual_value) > expect_value, p_string(actual_value, expect_value, ignore)
def length_gte(actual_value, expect_value):
def length_gte(actual_value, expect_value, ignore=None):
"""
实际值的长度大于或等于期望长度
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value)
assert len(actual_value) >= expect_value, p_string(actual_value, expect_value)
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value, ignore)
assert len(actual_value) >= expect_value, p_string(actual_value, expect_value, ignore)
def length_lt(actual_value, expect_value):
def length_lt(actual_value, expect_value, ignore=None):
"""
实际值的长度小于期望长度
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value)
assert len(actual_value) < expect_value, p_string(actual_value, expect_value)
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value, ignore)
assert len(actual_value) < expect_value, p_string(actual_value, expect_value, ignore)
def length_lte(actual_value, expect_value):
def length_lte(actual_value, expect_value, ignore=None):
"""
实际值的长度小于或等于期望长度
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value)
assert len(actual_value) <= expect_value, p_string(actual_value, expect_value)
assert isinstance(expect_value, (int,)), p_string(actual_value, expect_value, ignore)
assert len(actual_value) <= expect_value, p_string(actual_value, expect_value, ignore)
def contains(actual_value, expect_value):
def contains(actual_value, expect_value, ignore=None):
"""
期望值包含在实际值中
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert isinstance(actual_value, (list, tuple, dict, str, bytes)), p_string(actual_value, expect_value)
assert expect_value in actual_value, p_string(actual_value, expect_value)
assert isinstance(actual_value, (list, tuple, dict, str, bytes)), p_string(actual_value, expect_value, ignore)
assert expect_value in actual_value, p_string(actual_value, expect_value, ignore)
def contained_by(actual_value, expect_value):
def contained_by(actual_value, expect_value, ignore=None):
"""
实际值被包含在期望值中
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert isinstance(expect_value, (list, tuple, dict, str, bytes)), p_string(actual_value, expect_value)
assert actual_value in expect_value, p_string(actual_value, expect_value)
assert isinstance(expect_value, (list, tuple, dict, str, bytes)), p_string(actual_value, expect_value, ignore)
assert actual_value in expect_value, p_string(actual_value, expect_value, ignore)
def type_match(actual_value, expect_value):
def type_match(actual_value, expect_value, ignore=None):
"""
实际值的类型与期望值的类型相匹配
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
@ -229,13 +243,14 @@ def type_match(actual_value, expect_value):
else:
raise ValueError(name)
assert isinstance(actual_value, get_type(expect_value)), p_string(actual_value, expect_value)
assert isinstance(actual_value, get_type(expect_value)), p_string(actual_value, expect_value, ignore)
def regex_match(actual_value, expect_value):
def regex_match(actual_value, expect_value, ignore=None):
"""
正则匹配(从字符串的起始位置匹配)
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
@ -246,13 +261,14 @@ def regex_match(actual_value, expect_value):
actual_value = json.dumps(actual_value, ensure_ascii=False)
if not isinstance(expect_value, str):
expect_value = json.dumps(expect_value, ensure_ascii=False)
assert re.match(expect_value, actual_value), p_string(actual_value, expect_value)
assert re.match(expect_value, actual_value), p_string(actual_value, expect_value, ignore)
def regex_search(actual_value, expect_value):
def regex_search(actual_value, expect_value, ignore=None):
"""
正则匹配(从字符串的任意位置匹配)
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
@ -263,35 +279,334 @@ def regex_search(actual_value, expect_value):
actual_value = json.dumps(actual_value, ensure_ascii=False)
if not isinstance(expect_value, str):
expect_value = json.dumps(expect_value, ensure_ascii=False)
assert re.search(expect_value, actual_value), p_string(actual_value, expect_value)
assert re.search(expect_value, actual_value), p_string(actual_value, expect_value, ignore)
def startswith(actual_value, expect_value):
def startswith(actual_value, expect_value, ignore=None):
"""
实际值是以期望值开始
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert str(actual_value).startswith(str(expect_value)), p_string(actual_value, expect_value)
assert str(actual_value).startswith(str(expect_value)), p_string(actual_value, expect_value, ignore)
def endswith(actual_value, expect_value):
def endswith(actual_value, expect_value, ignore=None):
"""
实际值是以期望值结束
Args:
ignore:
actual_value: 实际值
expect_value: 期望值
Returns:
"""
assert str(actual_value).endswith(str(expect_value)), p_string(actual_value, expect_value)
assert str(actual_value).endswith(str(expect_value)), p_string(actual_value, expect_value, ignore)
def check(actual_value, expect_value, ignore=None):
"""
查两个复杂的对象是否一致支持忽略不进行比较的 key
Args:
actual_value: 实际结果对象
expect_value: 期望结果对象
ignore (list): 要忽略比较的 key 列表
Returns:
tuple:
- bool: 表示比较结果的布尔值True 表示一致False 表示不一致
- list: 包含失败信息的列表
"""
failed_info = []
check_rest = True
def check_data(actual, expect, ignore=None, path='', current_level_failed_info=None):
"""
递归比较两个对象
Args:
actual: 实际对象
expect: 期望对象
ignore (list): 要忽略比较的 key 列表
path (str): 当前比较路径
current_level_failed_info (list): 当前级别的失败信息列表
Returns:
tuple:
- bool: 表示比较结果的布尔值True 表示一致False 表示不一致
- list: 包含失败信息的列表
"""
if current_level_failed_info is None:
current_level_failed_info = []
ignore = ignore if ignore else []
dict_type = [dict]
array_type = [list, tuple, set]
actual_type = type(actual)
expect_type = type(expect)
if actual_type == expect_type:
check_ret = True
if actual_type in array_type:
is_homogeneous = all(isinstance(item, (str, int, float)) for item in actual) and all(
isinstance(item, (str, int, float)) for item in expect)
# 检查长度
actual_len = len(actual)
expect_len = len(expect)
if actual_len != expect_len:
current_level_failed_info.append(
{"status": "Mismatch", "message": f"预期值长度:{expect_len}, 不等于实际值长度:{actual_len}",
"path": path, "actual_value": actual, "expect_value": expect})
return False, current_level_failed_info
# 无序对比
if is_homogeneous:
set_actual = set(actual)
set_expect = set(expect)
if set_actual == set_expect:
return True, current_level_failed_info
else:
current_level_failed_info.append(
{"status": "Mismatch", "message": "预期值不等于实际值", "path": path,
"actual_value": actual, "expect_value": expect})
return False, current_level_failed_info
elif all(isinstance(item, dict) for item in actual) and all(
isinstance(item, dict) for item in expect):
# 循环递归
for act_item in actual:
match_found = False
for exp_item in expect:
ret, _ = check_data(act_item, exp_item, ignore=ignore, path=path,
current_level_failed_info=current_level_failed_info)
if ret:
expect.remove(exp_item)
match_found = True
break
if not match_found:
current_level_failed_info.append(
{"status": "Mismatch", "message": "预期值与实际不相等", "path": path,
"actual_value": act_item, "expect_value": None})
return False, current_level_failed_info
return True, current_level_failed_info
else:
# 开始循环递归,并移除对比过的元素
for act_item in actual:
match_found = False
for exp_item in expect:
ret, _ = check_data(act_item, exp_item, ignore=ignore, path=path,
current_level_failed_info=current_level_failed_info)
if ret:
expect.remove(exp_item)
match_found = True
break
if not match_found:
current_level_failed_info.append(
{"status": "Mismatch", "message": "预期值与实际不相等", "path": path,
"actual_value": act_item, "expect_value": None})
return False, current_level_failed_info
return True, current_level_failed_info
elif actual_type in dict_type:
common_keys = set(actual.keys()) & set(expect.keys())
for key in sorted(common_keys):
if key not in ignore:
ret, _ = check_data(actual[key], expect[key], ignore=ignore,
path="{}{}{}".format(path, '.' if path else '', key),
current_level_failed_info=current_level_failed_info)
check_ret = ret if not ret and check_ret else check_ret
extra_keys = set(expect.keys()) - common_keys
if extra_keys:
for key in extra_keys:
path_with_key = "{}{}{}".format(path, '.' if path else '', key)
if key not in ignore:
current_level_failed_info.append(
{"status": "Mismatch", "message": "value_not_equal", "path": path_with_key,
"actual_value": None, "expect_value": expect[key]})
check_ret = False
return check_ret, current_level_failed_info
else:
if actual != expect:
current_level_failed_info.append(
{"status": "Mismatch", "message": "value_not_equal", "path": path,
"actual_value": actual, "expect_value": expect})
return False, current_level_failed_info
else:
return True, current_level_failed_info
else:
current_level_failed_info.append(
{"status": "Mismatch", "message": "预期值类型不等于实际值类型", "path": path,
"actual_type": str(actual_type),
"expect_type": str(expect_type)})
return False, current_level_failed_info
current_level_failed_info = []
check_ret, _ = check_data(actual_value, expect_value, ignore=ignore,
current_level_failed_info=current_level_failed_info)
if not check_ret:
failed_info.extend(current_level_failed_info)
failed_info.append({"ignore_field": ignore})
check_rest = check_ret if not check_ret and check_rest else check_rest
assert check_rest is True, p_string(actual_value, expect_value, failed_info)
if __name__ == '__main__':
eq(1, "1")
lte(1.23, 400.3)
# 测试用例
test_case_list = [
# 简单数据匹配
("1", "1", None), # 预期true
(1, "1", None), # 预期true
(1, 1, None), # 预期true
(1.0, 1, None), # 预期true
# # # 字典匹配(无排除字段)
({'a': 1, 'b': 2}, {'a': 1, 'b': 2}, None), # 预期: 匹配成功
({'a': {'c': 1}, 'b': 2}, {'a': {'c': 1}, 'b': 2}, []), # 预期: 匹配成功
# # 字典不匹配(部分或全部键值对不一致)
({'a': 1, 'b': 2}, {'a': 1, 'b': 3}, None), # 预期: 不匹配
({'a': 1, 'b': 2}, {'a': 1, 'b': 3}, ["b"]), # 预期: 不匹配
({'a': {'c': 1}, 'b': 2}, {'a': {'c': 2}, 'b': 2}, []), # 预期: 不匹配
({'a': {'c': 1}, 'b': 2}, {'a': {'c': 2}, 'b': 2}, ['c']), # 预期: 匹配成功
#
# # 列表中字典匹配(无排除字段)
([{'a': 1, 'b': 3}, {'a': 1, 'b': 2}, {'a': 1, 'b': 3}, {'a': 1, 'b': 3}, {'a': 1, 'b': 3}],
[{'a': 1, 'b': 2}, {'a': 1, 'b': 3}, {'a': 1, 'b': 3}, {'a': 1, 'b': 3}, {'a': 1, 'b': 3}], None), # 预期: 匹配成功
# 嵌套结构匹配并排除指定字段
([{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [1, {'f': 2}]}, 'b': 2}],
[{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [2, {'f': 2}]}, 'b': 2}], ['c', 'e[0]']), # 预期: 不匹配
([{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [1, {'f': 2}]}, 'b': 2}],
[{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [2, {'f': 2}]}, 'b': 2}], ['c', 'e']), # 预期: 匹成功
([{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [1, {'f': 2}]}, 'b': 2}],
[{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [2, {'f': 2}]}, 'b': 2}], ['e']), # 预期: 匹配成功
([{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [1, {'f': 2}, 'xxx']}, 'b': 2}],
[{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [1, {'f': 2}, 'xxx']}, 'b': 2}], [1, 'xxxx']), # 预期: 匹配成功
([{'b': 2, 'a': 1, 'c': {'d': 3}}, {'a': {'b': 2, 'e': [1, {'f': 2}]}}],
[{'a': 1, 'b': 2, 'c': {'d': 3}}, {'a': {'e': [2, {'f': 2}]}, 'b': 2}], ['e', "b"]), # 预期: 匹配成功
({'childStock': True, 'childComposeList': [{'childSkuCode': '1684379802095', 'childSkuPriceId': None}],
'skuCode': '1684379802095'},
{'childStock': True, 'childComposeList': [
{'childSkuId': None, 'childSkuName': '', 'childSkuCode': '1684379802095', 'id': None,
'childSkuPriceId': None}], 'id': None, 'skuId': None},
['childSkuId', 'childSkuName', 'id', 'skuId', 'skuCode']), # 预期: 匹配成功
# # 简单类型列表匹配
([1, 2, 3], [1, 2, 3], None), # 预期: 匹配成功
(['a', 'b', 'c'], ['a', 'b', 'c'], None), # 预期: 匹配成功
([{'a': 'a'}, {'b': 'b'}, {'c': 'c'}], [{'b': 'b'}, {'a': 'a'}, {'c': 'c'}], None), # 预期: 匹配成功
# 简单类型列表不匹配
([1, 2, 3], [1, 2, 4], None), # 预期: 不匹配
([1, 2, 3], [1, 2, 4], [3, 4]), # 预期: 匹配成功
(['a', 'b', 'c'], ['a', 'b', 'd'], None), # 预期: 不匹配
(['a', 'b', 'c'], ['a', 'b', 'd'], ['c', 'd']), # 预期: 匹配成功
([3, 2, 1], [1, 2, 3], None), # 预期: 匹配成功
# 无序字段匹配
({'b': 2, 'a': {'c': 1}, }, {'a': {'c': 1}, 'b': 2}, []), # 预期: 匹配成功
({'b': 2, 'a': {'c': 2}}, {'a': {'c': 1}, 'b': 2}, ['c']), # 预期: 匹配成功
({'b': 2, 'a': {'c': 2}, 'd': [3, 2, 1]}, {'a': {'c': 1}, 'b': 2, 'd': [2, 1, 3]}, ['c']), # 预期: 匹配成功
([{"bindingCode": "MoonPromotion", "updateTime": "2013-12-16T23:11:14", "id": 7, "status": 1},
{"bindingCode": "84097&80462一并销售", "updateTime": "2013-12-16T23:11:13", "id": 1, "status": 1},
{"bindingCode": "Amway710Promotion", "updateTime": "2013-12-16T23:11:13", "id": 3, "status": 1},
{"bindingCode": "AtriSummer", "updateTime": "2013-12-16T23:11:13", "id": 5, "status": 1}],
[{"bindingCode": "MoonPromotion", "updateTime": "2013-12-16T23:11:14", "id": 7, "status": 1},
{"bindingCode": "AtriSummer", "updateTime": "2013-12-16T23:11:13", "id": 5, "status": 1},
{"bindingCode": "Amway710Promotion", "updateTime": "2013-12-16T23:11:13", "id": 3, "status": 1},
{"bindingCode": "84097&80462一并销售", "updateTime": "2013-12-16T23:11:13", "id": 1, "status": 1}], None),
(["AONV", "AO", "AOHD"], ["AO", "AONV", "AOHD"], None), # 预期:匹配成功
([], ["AO", "AONV", "AOHD"], None), # 预期:匹配失败,长度不同
([], [{"AO": "AO"}, {"AONV": "AONV"}, {"AOHD": "AOHD"}], None), # 预期:匹配失败,长度不同
([], [{"AO": "AO"}, {"AONV": "AONV"}, {"AOHD": "AOHD"}], ['AO', 'AONV', 'AOHD']), # 预期:匹配失败
([{"bindingCode": "222", "updateTime": "2023-11-17T16:20:52", "id": 200016, "status": True},
{"bindingCode": "87476&87477一并销售", "updateTime": "2022-11-21T18:42:39", "id": 200014, "status": True},
{"bindingCode": "WOKPromotion", "updateTime": "2022-11-21T18:25:36", "id": 200012, "status": True},
{"bindingCode": "不粘锅86808&10413&86841", "updateTime": "2021-11-14T00:55:56", "id": 48, "status": True},
{"bindingCode": "GooseneckKit", "updateTime": "2021-08-19T18:28:26", "id": 43, "status": True},
{"bindingCode": "GooseneckWaterPurifier", "updateTime": "2021-08-19T18:28:25", "id": 41, "status": True},
{"bindingCode": "110003", "updateTime": "2020-10-20T12:28:50", "id": 81, "status": True}],
[{"bindingCode": "222", "updateTime": "2023-11-17T16:20:52", "id": 200016, "status": 1},
{"bindingCode": "WOKPromotion", "updateTime": "2022-11-21T18:25:36", "id": 200012, "status": 1},
{"bindingCode": "不粘锅86808&10413&86841", "updateTime": "2021-11-14T00:55:56", "id": 48, "status": 1},
{"bindingCode": "87476&87477一并销售", "updateTime": "2022-11-21T18:42:39", "id": 200014, "status": 1},
{"bindingCode": "GooseneckKit", "updateTime": "2021-08-19T18:28:26", "id": 43, "status": 1},
{"bindingCode": "GooseneckWaterPurifier", "updateTime": "2021-08-19T18:28:25", "id": 41, "status": 1},
{"bindingCode": "110003", "updateTime": "2020-10-20T12:28:50", "id": 81, "status": 1}], ["status"]),
# 预期,匹配成功
([{"tagCode": "AO", "role": "", "productCode": "9752_base", "startTime": "2021-05-29T19:09:46",
"endTime": "2026-11-19T19:09:46", "skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"},
{"role": "ABO", "tagCode": "PROMOTIONS", "productCode": "9752_base", "startTime": "2024-01-29T15:09:16",
"endTime": "2024-03-29T15:09:16", "skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"}],
[{"tagCode": "AO", "role": "", "productCode": "9752_base", "startTime": "2021-05-29T19:09:46",
"endTime": "2026-11-19T19:09:46", "skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"},
{"tagCode": "PROMOTIONS", "role": "ABO", "productCode": "9752_base", "startTime": "2024-01-29T15:09:16",
"endTime": "2024-03-29T15:09:16", "skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"}], None),
# 预期,匹配成功
([{"tagCode": "PROMOTIONS", "role": "ABO", "productCode": "20508_base",
"endTime": "2024-04-03T11:12:18", "skuCode": "20508", "channelCode": "DEFAULT"}],
[{"tagCode": "PROMOTIONS", "role": "ABO", "productCode": "20508_base", "startTime": "2024-02-03T11:12:18",
"endTime": "2024-04-03T11:12:18", "skuCode": "20508", "channelCode": "DEFAULT"}], ["startTime"]), # 预期,匹配成功
([
{"tagCode": "AO", "role": "", "productCode": "9752_base", "startTime": "2021-05-29T19:09:46",
"skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"},
{"role": "ABO", "tagCode": "PROMOTIONS", "productCode": "9752_base", "startTime": "2024-01-29T15:09:16",
"endTime": "2024-03-29T15:09:16", "skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"}
],
[
{"tagCode": "AO", "role": "", "productCode": "9752_base", "startTime": "2021-05-29T19:09:46",
"endTime": "2026-11-19T19:09:46", "skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"},
{"tagCode": "PROMOTIONS", "role": "ABO", "productCode": "9752_base", "startTime": "2024-01-29T15:09:16",
"skuCode": "AUQMSU7iu4TgJLoJL", "channelCode": "DEFAULT"}
], ["endTime"]), # 预期,匹配成功
([{"micro_sort_value": 257, "sort_number_value": 10238, "skuCode": "41374"},
{"micro_sort_value": 257, "sort_number_value": 10238, "skuCode": "41375"},
{"micro_sort_value": 252, "sort_number_value": 10233, "skuCode": "999785"},
{"micro_sort_value": 245, "sort_number_value": 10223, "skuCode": "41131"},
{"micro_sort_value": 244, "sort_number_value": 10221, "skuCode": "41333"},
{"micro_sort_value": 243, "sort_number_value": 10220, "skuCode": "41211"},
{"micro_sort_value": 243, "sort_number_value": 10220, "skuCode": "41213"},
{"micro_sort_value": 242, "sort_number_value": 10218, "skuCode": "40822"},
{"micro_sort_value": 242, "sort_number_value": 10218, "skuCode": "40823"},
{"micro_sort_value": 242, "sort_number_value": 10218, "skuCode": "40824"}],
[{"micro_sort_value": 257, "sort_number_value": 10238, "skuCode": "41374"},
{"micro_sort_value": 257, "sort_number_value": 10238, "skuCode": "41375"},
{"micro_sort_value": 252, "sort_number_value": 10233, "skuCode": "999785"},
{"micro_sort_value": 245, "sort_number_value": 10223, "skuCode": "41131"},
{"micro_sort_value": 244, "sort_number_value": 10221, "skuCode": "41333"},
{"micro_sort_value": 243, "sort_number_value": 10220, "skuCode": "41211"},
{"micro_sort_value": 243, "sort_number_value": 10220, "skuCode": "41213"},
{"micro_sort_value": 242, "sort_number_value": 10218, "skuCode": "40822"},
{"micro_sort_value": 242, "sort_number_value": 10218, "skuCode": "40823"},
{"micro_sort_value": 242, "sort_number_value": 10218, "skuCode": "40824"}], None) # 预期,匹配成功
]
# 运行测试用例
for case in test_case_list:
exp, act, exclude_fields = case
try:
check(exp, act, exclude_fields)
print(f'Passed: {case}')
except Exception as e:
print(f'Failed: {case}, {e}')

View File

@ -75,8 +75,6 @@ class Extractor:
Returns:
"""
# logger.debug(f'正在执行数据提取:提取数据源内容:{resp_obj},{type(resp_obj)}')
# logger.debug('正在执行数据提取:提取表达式:{expr}'.format(expr=expr))
try:
result = jsonpath.jsonpath(resp_obj if isinstance(resp_obj, (dict, list)) else json.dumps(resp_obj), expr)
except Exception as e:
@ -89,11 +87,11 @@ class Extractor:
elif isinstance(result, list):
if len(result) == 1:
result = result[0]
# logger.info(f'提取成功,输出结果,提取表达式:{expr},提取结果:{result}')
return result
if __name__ == '__main__':
r_obg = {"data": ["key", 1, "val", 2]}
Extractor.extract_value_by_jsonpath(r_obg, "$.data[0]")
Extractor.extract_value_by_jsonpath(r_obg, 200)
res = Extractor.extract_value_by_jsonpath(r_obg, "$..*")
print(res)
# Extractor.extract_value_by_jsonpath(r_obg, 200)

View File

@ -13,6 +13,8 @@ import importlib.util
import os
import sys
from common.validation.validator import Validator
sys.path.append('..')
sys.path.append('../utils')
@ -20,59 +22,59 @@ from common.utils.exceptions import ScriptNotFoundError, ScriptExecuteError
from common.file_handling.file_utils import FileUtils
class LoadScript:
def __init__(self):
super().__init__()
def load_script(self, script_path):
"""
加载脚本文件并返回模块对象
class LoadScript(Validator):
Args:
script_path (str): 脚本文件的路径
def __init__(self):
super().__init__()
Returns:
module: 脚本文件对应的模块对象
"""
try:
spec = importlib.util.spec_from_file_location(os.path.basename(script_path), script_path)
script_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(script_module)
return script_module
except Exception as e:
ScriptNotFoundError(script_path, e)
return
def load_and_execute_script(self, script_directory, script_name, request, method_name=None):
"""
加载并执行脚本文件中的指定方法
Args:
request: 请求获响应对象
script_directory (str): 脚本文件所在的目录
script_name (str): 脚本文件的名称
method_name (str): 要执行的方法的名称
"""
if method_name is None:
return request
file_list = FileUtils.get_files_in_folder(script_directory)
# 指定文件夹下是否存在这个测试用例脚本
if script_name in file_list:
script_path = FileUtils.get_file_path(script_name, script_directory)
script = self.load_script(script_path)
if script and hasattr(script, method_name):
try:
method = getattr(script, method_name)
return method(request)
except Exception as e:
ScriptExecuteError(script_path, e)
return request
def load_script(self, script_path):
"""
加载脚本文件并返回模块对象
Args:
script_path (str): 脚本文件的路径
Returns:
module: 脚本文件对应的模块对象
"""
try:
spec = importlib.util.spec_from_file_location(os.path.basename(script_path), script_path)
script_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(script_module)
return script_module
except Exception as e:
ScriptNotFoundError(script_path, e)
return
def load_and_execute_script(self, script_directory, script_name, request, method_name=None):
"""
加载并执行脚本文件中的指定方法
Args:
request: 请求获响应对象
script_directory (str): 脚本文件所在的目录
script_name (str): 脚本文件的名称
method_name (str): 要执行的方法的名称
"""
if method_name is None:
return request
file_list = FileUtils.get_files_in_folder(script_directory)
# 指定文件夹下是否存在这个测试用例脚本
if script_name in file_list:
script_path = FileUtils.get_file_path(script_name, script_directory)
script = self.load_script(script_path)
if script and hasattr(script, method_name):
try:
method = getattr(script, method_name)
return method(request)
except Exception as e:
ScriptExecuteError(script_path, e)
return request
if __name__ == '__main__':
from config.config import Config
SCRIPTS_DIR = Config.SCRIPTS_DIR
load_and_exe_s = LoadScript()
load_and_exe_s.load_and_execute_script(SCRIPTS_DIR, 'prepost_script_sheetname_id.py', 'setup', {"y": "z"})
from config.config import Config
SCRIPTS_DIR = Config.SCRIPTS_DIR
load_and_exe_s = LoadScript()
load_and_exe_s.load_and_execute_script(SCRIPTS_DIR, 'prepost_script_sheetname_id.py', 'setup', {"y": "z"})

View File

@ -19,13 +19,13 @@ class LoadModulesFromFolder(DependentParameter):
def __init__(self):
super().__init__()
def load_modules_from_folder(self, folder_or_mnodule):
def load_modules_from_folder(self, folder_or_module):
"""
动态加载文件或模块
"""
if isinstance(folder_or_mnodule, str):
folder_path = folder_or_mnodule
if isinstance(folder_or_module, str):
folder_path = folder_or_module
if not os.path.exists(folder_path):
raise ValueError("Folder path does not exist.")
@ -42,14 +42,14 @@ class LoadModulesFromFolder(DependentParameter):
for name, obj in vars(module).items():
if callable(obj):
self.update_environments(name, obj)
elif isinstance(folder_or_mnodule, types.ModuleType):
module = folder_or_mnodule
elif isinstance(folder_or_module, types.ModuleType):
module = folder_or_module
module = importlib.reload(module)
for n, o in vars(module).items():
if callable(o):
self.update_environments(n, o)
else:
raise DynamicLoadingError(folder_or_mnodule,
raise DynamicLoadingError(folder_or_module,
"older_or_module should be either a folder path (str) or a module ("
"types.ModuleType).")

View File

@ -10,7 +10,6 @@
import types
from common.http_client.http_client import HttpClient
from common.validation import comparators
from common.validation import logger
@ -19,9 +18,9 @@ class Loaders(HttpClient):
super().__init__()
@logger.catch
def load_built_in_functions(self, model):
def load_built_in_functions(self, model) -> dict:
"""
加载bif_functions包中的内建方法
加载指定模块下的所有函数
Returns:
"""
built_in_functions = {}
@ -30,21 +29,6 @@ class Loaders(HttpClient):
built_in_functions[name] = item
return built_in_functions
@staticmethod
@logger.catch
def load_built_in_comparators() -> object:
"""
加载包中的内建比较器
Returns:
"""
built_in_comparators = {}
for name, item in vars(comparators).items():
if isinstance(item, types.FunctionType):
built_in_comparators[name] = item
return built_in_comparators
@logger.catch
def set_bif_fun(self, model):
"""
@ -57,17 +41,8 @@ class Loaders(HttpClient):
if __name__ == '__main__':
from common.bif_functions import bif_faker
import extensions
import encryption_rules
from encryption_rules import rules
# print()
loaders = Loaders()
res = loaders.load_built_in_functions(rules)
print(res)
# loaders.load_built_in_comparators()
# loaders.set_bif_fun(bif_faker)
# print(loaders.get_environments())
#
# loaders.set_bif_fun(extensions)
# print(loaders.get_environments())

View File

@ -8,125 +8,128 @@
# -------------------------------------------------------------------------------
import json
from common.validation import comparators
from common.crypto.encrypt_data import EncryptData
from common.utils.exceptions import InvalidParameterFormatError
from common.validation.comparator_dict import comparator_dict
from common.validation.extractor import Extractor
from common.validation.loaders import Loaders
class Validator(Loaders):
"""
校验器
主要功能
1格式化校验变量
2校验期望结果与实际结果与预期一致并返回校验结果
"""
validate_variables_list = []
assertions = []
def __init__(self):
super().__init__()
def uniform_validate(self, validate_variables):
"""
统一格式化测试用例的验证变量validate
Args:
validate_variables: 参数格式 listdict
示例
[{"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}]
or {"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}
# from common.validation.loaders import Loaders
Returns: 返回数据格式 list
示例
[{"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}]
"""
if isinstance(validate_variables, str):
validate_variables = json.loads(validate_variables)
if isinstance(validate_variables, list):
for item in validate_variables:
self.uniform_validate(item)
elif isinstance(validate_variables, dict):
if "check" in validate_variables.keys() and "expect" in validate_variables.keys():
# 如果验证mapping中不包含comparator时默认为{"comparator": "eq"}
check_item = validate_variables.get("check")
expect_value = validate_variables.get("expect")
comparator = validate_variables.get("comparator", "eq")
self.validate_variables_list.append({
"check": check_item,
"expect": expect_value,
"comparator": comparator
})
else:
InvalidParameterFormatError(validate_variables, "参数格式错误!")
def validate(self, resp=None):
"""
校验期望结果与实际结果与预期一致
Args:
resp: ResponseObject对象实例
class Validator(Extractor, EncryptData):
"""
校验器
主要功能
1格式化校验变量
2校验期望结果与实际结果与预期一致并返回校验结果
"""
validate_variables_list = []
assertions = []
Returns:
def __init__(self):
super().__init__()
"""
for validate_variable in self.validate_variables_list:
check_item = validate_variable['check']
expect_value = validate_variable['expect']
comparator = validate_variable['comparator']
if not str(check_item).startswith("$"):
actual_value = check_item
else:
actual_value = Extractor.extract_value_by_jsonpath(resp_obj=resp, expr=check_item)
try:
fun = self.load_built_in_comparators()[comparator]
fun(actual_value=actual_value, expect_value=expect_value)
validate_result = "通过"
except (AssertionError, TypeError) as e:
validate_result = "失败"
raise e
finally:
self.assertions.append({
'检查项': check_item,
'期望值': expect_value,
'实际值': actual_value,
'断言方法': comparator_dict.get(comparator),
"断言结果": validate_result
})
def run_validate(self, validate_variables, resp=None):
"""
统一格式化测试用例的验证变量validate然后校验期望结果与实际结果与预期一致
Args:
validate_variables:参数格式 listdict
resp:ResponseObject对象实例
def uniform_validate(self, validate_variables):
"""
统一格式化测试用例的验证变量validate
Args:
validate_variables: 参数格式 listdict
示例
[{"check":"result.user.name","comparator":"eq","expect":"kira"}]
or {"check":"result.user.name","comparator":"eq","expect":"kira"}
Returns:返回校验结果
Returns: 返回数据格式 list
示例
[{"check":"result.user.name","comparator":"eq","expect":"kira"}]
"""
if not validate_variables:
self.assertions = ['未填写预期结果默认断言HTTP请求状态码']
return
self.validate_variables_list.clear()
self.assertions.clear()
self.uniform_validate(validate_variables)
if not self.validate_variables_list:
raise InvalidParameterFormatError(self.validate_variables_list,
"uniform_validate 执行失败,无法进行 validate 校验")
self.validate(resp)
"""
if isinstance(validate_variables, str):
validate_variables = json.loads(validate_variables)
if isinstance(validate_variables, list):
for item in validate_variables:
self.uniform_validate(item)
elif isinstance(validate_variables, dict):
if "check" in validate_variables.keys() and "expect" in validate_variables.keys():
# 如果验证mapping中不包含comparator时默认为{"comparator": "eq"}
check_item = validate_variables.get("check")
expect_value = validate_variables.get("expect")
comparator = validate_variables.get("comparator", "eq")
ignore = validate_variables.get("ignore")
self.validate_variables_list.append({
"check": check_item,
"expect": expect_value,
"comparator": comparator,
"ignore": ignore
})
else:
InvalidParameterFormatError(validate_variables, "参数格式错误!")
def validate(self, resp=None):
"""
校验期望结果与实际结果与预期一致
Args:
resp: ResponseObject对象实例
Returns:
"""
for validate_variable in self.validate_variables_list:
check_item = validate_variable.get('check')
expect_value = validate_variable.get('expect')
comparator = validate_variable.get('comparator')
ignore = validate_variable.get("ignore")
if not str(check_item).startswith("$"):
actual_value = check_item
else:
actual_value = self.extract_value_by_jsonpath(resp_obj=resp, expr=check_item)
e = None
try:
fun = self.load_built_in_functions(comparators)[comparator]
fun(actual_value=actual_value, expect_value=expect_value, ignore=ignore)
validate_result = "通过"
except (AssertionError, TypeError) as err:
validate_result = "失败"
e = err
raise e
finally:
self.assertions.append(dict(检查项=check_item,
期望值=expect_value,
实际值=actual_value,
忽略对比字段=ignore,
断言方法=comparator,
断言结果=validate_result,
失败信息=e))
def run_validate(self, validate_variables, resp=None):
"""
统一格式化测试用例的验证变量validate然后校验期望结果与实际结果与预期一致
Args:
validate_variables:参数格式 listdict
resp:ResponseObject对象实例
Returns:返回校验结果
"""
if not validate_variables:
self.assertions = ['未填写预期结果默认断言HTTP请求状态码']
return
self.validate_variables_list.clear()
self.assertions.clear()
self.uniform_validate(validate_variables)
if not self.validate_variables_list:
raise InvalidParameterFormatError(self.validate_variables_list,
"uniform_validate 执行失败,无法进行 validate 校验")
self.validate(resp)
if __name__ == '__main__':
validate_variables1 = {"check": "$.result.user.name", "comparator": "eq", "expect": "chenyongzhi"}
validate_variables2 = [
{"check": "$.code", "comparator": "eq", "expect": "200"},
{"check": "result.user.name", "comparator": "eq", "expect": "chenyongzhi"}
]
resp_obj = {"code": "200", "result": {"user": {"name": "chenyongzhi"}}}
validator = Validator()
validator.run_validate(validate_variables1, resp_obj)
print(validator.assertions)
validator.run_validate(validate_variables2, resp_obj)
print(validator.assertions)
validator.run_validate(validate_variables2, resp_obj)
print(validator.assertions)
validate_variables1 = {"check": "$.result.user.name", "comparator": "eq", "expect": "chenyongzhi"}
resp_obj = {"code": "200", "result": {"user": {"name": "chenyongzhi"}}}
validate_variables2 = [
{"check": resp_obj, "comparator": "check", "expect": {"result": {"user": {"name": "chenyongzhi"}}}}
]
validator = Validator()
validator.run_validate(validate_variables2, resp_obj)
print(validator.assertions)

View File

@ -35,9 +35,9 @@ class Config:
# 邮件配置信息
MAIL_NOTICE = {
"host": "smtp.qq.com", # 邮件服务地址
"user": "xxxxx@qq.com", # 用户名
"password": "xxxxx", # 密码(部分邮箱为授权码)# 密码
"sender": "xxxxx@qq.com", # 发送人
"user": "262667641@qq.com", # 用户名
"password": "xnvmmcchcxghbgfi", # 密码(部分邮箱为授权码)# 密码
"sender": "262667641@qq.com", # 发送人
"port": 465, # smtp 端口号
"receivers": ['262667641@qq.com'] # 接收方的邮箱
}

View File

@ -46,3 +46,6 @@ class FieldNames:
INITIALIZE_DATA = "InitializeData"
HOST = "Host"
PATH = "Path"
def __init__(self):
super().__init__()

View File

@ -9,3 +9,6 @@
"""
a = {"key": "value", "c": ""}
print(a.get("c", ""))

5
debug/vs.py Normal file
View File

@ -0,0 +1,5 @@
from debug import x
from common.validation.loaders import Loaders
rs = Loaders().load_built_in_functions(x)
print(rs)

0
debug/x.py Normal file
View File

32
run.py
View File

@ -19,22 +19,22 @@ from common.utils.decorators import install_dependencies
@install_dependencies
def run():
test_report = Config.TEST_REPORT
test_case = unittest.defaultTestLoader.discover(Config.SCRIPT, pattern="test_*.py")
runner = TestRunner(test_case,
report_dir=test_report,
filename=Config.TEST_REPORT_FILE,
title="接口自动化测试报告",
templates=2,
tester="kira",
desc="自动化测试")
runner.run()
# # get_failed_test_cases = runner.get_failed_test_cases()
# 发送通知
# runner.email_notice()
runner.dingtalk_notice()
runner.weixin_notice()
test_report = Config.TEST_REPORT
test_case = unittest.defaultTestLoader.discover(Config.SCRIPT, pattern="test_*.py")
runner = TestRunner(test_case,
report_dir=test_report,
filename=Config.TEST_REPORT_FILE,
title="接口自动化测试报告",
templates=2,
tester="kira",
desc="自动化测试")
runner.run()
# # get_failed_test_cases = runner.get_failed_test_cases()
# 发送通知
# runner.email_notice()
# runner.dingtalk_notice()
# runner.weixin_notice()
if __name__ == '__main__':
run()
run()

View File

@ -1,26 +1,26 @@
def setup(request):
"""这是某某sheet 中第 xx条测试用例的前置脚本"""
"""前置脚本处理请求对象的逻辑代码"""
# 从request 对象获取数据信息
import json
request_obj = request.variables
print("前置脚本请求对象=", request_obj)
print("获取请求URL:", request_obj.get("Url"))
print("获取请求Header:", json.loads(request_obj.get("Headers")))
print("获取请求数据:", json.loads(request_obj.get("RequestData")))
request.update_environments("emailwww", "这个是你想要的值") # 设置环境变量
print(f"---->获取环境变量={request.get_environments('{{emailwww}}')}")
return request
def setup(py):
"""这是某某sheet 中第 xx条测试用例的前置脚本"""
"""前置脚本处理请求对象的逻辑代码"""
# 从request 对象获取数据信息
import json
request_obj = py.request
print("前置脚本请求对象=", request_obj)
print("获取请求URL:", request_obj.get("Url"))
print("获取请求Header:", json.loads(request_obj.get("Headers")))
print("获取请求数据:", json.loads(request_obj.get("RequestData")))
py.update_environments("emailwww", "这个是你想要的值") # 设置环境变量
print(f"---->获取环境变量={py.get_environments('{{emailwww}}')}")
return py
def teardown(response):
"""这是某某sheet 中第 xx条测试用例的前置脚本"""
"""后置脚本处理响应对象的逻辑代码"""
print("后置脚本响应对象=", str(response.variables).replace("<", "(").replace(">", ")"))
response_text = response.variables.text
response_json = response.variables.json()
print("响应对象转json", response_json)
print("响应对象转text", response_text)
response.update_environments("response_json", response_json) # 设置环境变量
print("获取环境变量={}".format(response.get_environments('{{response_json}}')))
return response
def teardown(py):
"""这是某某sheet 中第 xx条测试用例的前置脚本"""
"""后置脚本处理响应对象的逻辑代码"""
print("后置脚本响应对象=", str(py.response).replace("<", "(").replace(">", ")"))
response_text = py.response.text
response_json = py.response.json()
print("响应对象转json", response_json)
print("响应对象转text", response_text)
py.update_environments("response_json", response_json) # 设置环境变量
print("获取环境变量={}".format(py.get_environments('{{response_json}}')))
return py

View File

@ -1,10 +1,10 @@
def setup(request):
print(f"执行前置代码片段处理:{request}")
def setup(py):
print(f"执行前置代码片段处理:{py.request}")
"""处理请求对象的逻辑代码"""
return request
return py
def teardown(response):
print(f"执行后置代码片段处理:{response}")
def teardown(py):
print(f"执行后置代码片段处理:{py.response}")
"""处理响应对象的逻辑代码"""
return response
return py

Binary file not shown.

View File

@ -15,44 +15,51 @@ from common.file_handling.do_excel import DoExcel
from common.core.action import Action
from common.utils.decorators import list_data
from config.config import Config
from common.database.mysql_client import MysqlClient
test_file = Config.TEST_CASE # 获取 excel 文件路径
excel = DoExcel(test_file)
# 获取测试用例、数据库、初始化数据和主机
test_case, databases, initialize_data, host = excel.get_excel_init_and_cases()
@ddt
class TestProjectApi(unittest.TestCase):
maxDiff = None
action = Action(initialize_data, databases)
@classmethod
def setUpClass(cls) -> None:
cls.action.load_modules_from_folder(extensions)
def setUp(self) -> None:
pass
@list_data(test_case)
def test_api(self, item):
sheet, iid, condition, st, name, desc, method, expected, prepost_script = self.action.base_info(item)
if self.action.is_run(condition):
self.skipTest("这个测试用例听说泡面比较好吃,所以放弃执行了!!")
regex, keys, deps, jp_dict, ex_request_data = self.action.extractor_info(item)
self.action.pause_execution(st)
self.action.exc_sql(item)
if self.action.is_only_sql(method):
self.skipTest("这条测试用例被 SQL 吃了,所以放弃执行了!!")
self.action.send_request(host, method, ex_request_data, Config.SCRIPTS_DIR, prepost_script)
self.action.analysis_response(sheet, iid, name, desc, regex, keys, deps, jp_dict)
self.action.execute_validation(excel, sheet, iid, name, desc, expected)
@classmethod
def tearDownClass(cls) -> None:
excel.close_excel()
maxDiff = None
@classmethod
def setUpClass(cls) -> None:
cls.action = Action(host, initialize_data)
cls.action.load_modules_from_folder(extensions)
cls.action.client = MysqlClient(databases)
cls.action.scripts_dir = Config.SCRIPTS_DIR
def setUp(self) -> None:
pass
@list_data(test_case)
def test_api(self, item):
self.__class__.action.base_info(item)
if not self.__class__.action.is_run():
self.skipTest("这个测试用例听说泡面比较好吃,所以放弃执行了!!")
# 用例暂停
self.__class__.action.pause_execution()
# 单独执行 sql
if self.__class__.action.is_only_sql(self.__class__.action.client):
self.skipTest("这条测试用例被 SQL 吃了,所以只执行 sql 语句!!")
# 执行 sql 语句
self.__class__.action.exec_sql(self.__class__.action.client)
self.__class__.action.send_request()
# 断言响应及提取响应信息
self.__class__.action.analysis_response()
self.__class__.action.execute_validation(excel)
@classmethod
def tearDownClass(cls) -> None:
excel.close_excel()
if __name__ == '__main__':
unittest.main()
unittest.main()