增加直接在python文件中写前后置脚本以及未填写断言,则自动断言响应状态码

This commit is contained in:
aaronchenyongzhi 2023-09-12 19:21:47 +08:00
parent 243f1f362c
commit 09bcbd2fd0
15 changed files with 1089 additions and 992 deletions

File diff suppressed because one or more lines are too long

View File

@ -23,182 +23,190 @@ from config.field_constants import FieldNames
@singleton
class Action(Extractor, LoadScript, Validator, EncryptData):
def __init__(self, initialize_data=None, db_config=None):
super().__init__()
self.db_config = db_config
self.__variables = {}
self.set_environments(initialize_data)
self.set_bif_fun(bif_functions)
def __init__(self, initialize_data=None, db_config=None):
super().__init__()
self.db_config = db_config
self.__variables = {}
self.set_environments(initialize_data)
self.set_bif_fun(bif_functions)
@send_request_decorator
def send_request(self, host, method, extract_request_data, scripts_dir, prepost_script):
"""发送请求"""
url, kwargs = self.prepare_request(extract_request_data, self.variables)
self.http_client(host, url, method, **kwargs)
def prepare_request(self, extract_request_data, item):
"""准备请求数据"""
item = self.replace_dependent_parameter(item)
url, query_str, request_data, headers, request_data_type, h_crypto, r_crypto = self.request_info(item)
headers, query_str, request_data = self.analysis_request(query_str, request_data, h_crypto, headers, r_crypto,
extract_request_data)
kwargs = {request_data_type: request_data, "headers": headers, "params": query_str}
return url, kwargs
def analysis_request(self, query_str, request_data, headers_crypto, headers, request_crypto, extract_request_data):
"""分析请求数据"""
headers, request_data = self.encrypts(headers_crypto, headers, request_crypto, request_data)
if extract_request_data:
for data in (query_str, request_data):
if data:
self.substitute_data(data, jp_dict=extract_request_data)
return headers, query_str, request_data
def exc_sql(self, item):
"""执行sql处理"""
sql, sql_params_dict = self.sql_info(item)
self.variables = item
sql = self.replace_dependent_parameter(sql)
if sql:
client = MysqlClient(self.db_config)
execute_sql_results = client.execute_sql(sql)
print(f"| 执行 sql 成功--> {execute_sql_results}")
if execute_sql_results and sql_params_dict:
self.substitute_data(execute_sql_results, jp_dict=sql_params_dict)
def analysis_response(self, sheet, iid, name, desc, regex, keys, deps, jp_dict):
"""分析响应结果并提取响应"""
try:
self.substitute_data(self.response_json, regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
msg = f"| 分析响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}"
ParameterExtractionError(msg, err)
def execute_validation(self, excel, sheet, iid, name, desc, expected):
"""执行断言校验"""
expected = self.replace_dependent_parameter(expected)
try:
res = self.run_validate(expected, self.response_json)
# 如果不填写断言,则自动断言状态码
if res is None:
from common.validation.comparators import eq
eq(200, self.response.status_code)
result = "PASS"
except Exception as e:
result = "FAIL"
error_info = f"| exception case:**{sheet}_{iid}_{name}_{desc},{self.assertions}"
AssertionFailedError(error_info, e)
raise e
finally:
print(f'| <span style="color:yellow">断言结果-->{self.assertions}</span>\n')
print("-" * 50 + "我是分割线" + "-" * 50)
response = self.response.text if self.response is not None else str(self.response)
excel.write_back(sheet_name=sheet, i=iid, response=response, result=result, assertions=str(self.assertions))
def execute_dynamic_code(self, item, code):
self.variables = item
if code is not None:
try:
ast_obj = ast.parse(code, mode='exec')
compiled = compile(ast_obj, '<string>', 'exec')
exec(compiled, {"pm": self})
except Exception as e:
ExecuteDynamiCodeError(code, e)
raise e
return self.variables
def execute_prepost_script(self, scripts_dir, prepost_script, method_name):
self.load_and_execute_script(scripts_dir, prepost_script, self, method_name)
@property
def variables(self, key=None):
return self.__variables if not key else self.__variables.get(key)
@variables.setter
def variables(self, item):
self.__variables = item
@staticmethod
def base_info(item):
"""
获取基础信息
"""
sheet = item.pop(FieldNames.SHEET)
item_id = item.pop(FieldNames.ITEM_ID)
condition = item.pop(FieldNames.RUN_CONDITION)
sleep_time = item.pop(FieldNames.SLEEP_TIME)
name = item.pop(FieldNames.NAME)
desc = item.pop(FieldNames.DESCRIPTION)
method = item.pop(FieldNames.METHOD)
expected = item.pop(FieldNames.EXPECTED)
prepost_script = f"prepost_script_{sheet}_{item_id}.py"
return sheet, item_id, condition, sleep_time, name, desc, method, expected, prepost_script
@staticmethod
def sql_info(item):
sql = item.pop(FieldNames.SQL)
sql_params_dict = item.pop(FieldNames.SQL_PARAMS_DICT)
return sql, sql_params_dict
@staticmethod
def extractor_info(item):
"""
获取提取参数的基本字段信息
Args:
item:
@send_request_decorator
def send_request(self, host, method, extract_request_data):
"""发送请求"""
url, kwargs = self.prepare_request(extract_request_data, self.variables)
self.http_client(host, url, method, **kwargs)
Returns:
def prepare_request(self, extract_request_data, item):
"""准备请求数据"""
item = self.replace_dependent_parameter(item)
url, query_str, request_data, headers, request_data_type, h_crypto, r_crypto = self.request_info(item)
headers, query_str, request_data = self.analysis_request(query_str, request_data, h_crypto, headers, r_crypto,
extract_request_data)
kwargs = {request_data_type: request_data, "headers": headers, "params": query_str}
return url, kwargs
def analysis_request(self, query_str, request_data, headers_crypto, headers, request_crypto, extract_request_data):
"""分析请求数据"""
headers, request_data = self.encrypts(headers_crypto, headers, request_crypto, request_data)
if extract_request_data:
for data in (query_str, request_data):
if data:
self.substitute_data(data, jp_dict=extract_request_data)
return headers, query_str, request_data
def exc_sql(self, item):
"""执行sql处理"""
sql, sql_params_dict = self.sql_info(item)
self.variables = item
sql = self.replace_dependent_parameter(sql)
if sql:
client = MysqlClient(self.db_config)
execute_sql_results = client.execute_sql(sql)
print(f"| 执行 sql 成功--> {execute_sql_results}")
if execute_sql_results and sql_params_dict:
self.substitute_data(execute_sql_results, jp_dict=sql_params_dict)
def analysis_response(self, sheet, iid, name, desc, regex, keys, deps, jp_dict):
"""分析响应结果并提取响应"""
try:
self.substitute_data(self.response_json, regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
msg = f"| 分析响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}"
ParameterExtractionError(msg, err)
def execute_validation(self, excel, sheet, iid, name, desc, expected):
"""执行断言校验"""
expected = self.replace_dependent_parameter(expected)
try:
self.run_validate(expected, self.response_json)
result = "PASS"
except Exception as e:
result = "FAIL"
error_info = f"| exception case:**{sheet}_{iid}_{name}_{desc},{self.assertions}"
AssertionFailedError(error_info, e)
raise e
finally:
print(f'| <span style="color:yellow">断言结果-->{self.assertions}</span>\n')
print("-" * 50 + "我是分割线" + "-" * 50)
response = self.response.text if self.response is not None else str(self.response)
excel.write_back(sheet_name=sheet, i=iid, response=response, result=result, assertions=str(self.assertions))
def execute_dynamic_code(self, item, code):
self.variables = item
if code is not None:
try:
ast_obj = ast.parse(code, mode='exec')
compiled = compile(ast_obj, '<string>', 'exec')
exec(compiled, {"pm": self})
except Exception as e:
ExecuteDynamiCodeError(code, e)
raise e
return self.variables
@property
def variables(self, key=None):
return self.__variables if not key else self.__variables.get(key)
@variables.setter
def variables(self, item):
self.__variables = item
@staticmethod
def base_info(item):
"""
获取基础信息
"""
sheet = item.pop(FieldNames.SHEET)
item_id = item.pop(FieldNames.ITEM_ID)
condition = item.pop(FieldNames.RUN_CONDITION)
sleep_time = item.pop(FieldNames.SLEEP_TIME)
name = item.pop(FieldNames.NAME)
desc = item.pop(FieldNames.DESCRIPTION)
method = item.pop(FieldNames.METHOD)
expected = item.pop(FieldNames.EXPECTED)
return sheet, item_id, condition, sleep_time, name, desc, method, expected
@staticmethod
def sql_info(item):
sql = item.pop(FieldNames.SQL)
sql_params_dict = item.pop(FieldNames.SQL_PARAMS_DICT)
return sql, sql_params_dict
@staticmethod
def extractor_info(item):
"""
获取提取参数的基本字段信息
Args:
item:
Returns:
"""
regex = item.pop(FieldNames.REGEX)
keys = item.pop(FieldNames.REGEX_PARAMS_LIST)
deps = item.pop(FieldNames.RETRIEVE_VALUE)
jp_dict = item.pop(FieldNames.JSON_PATH_DICT)
extract_request_data = item.pop(FieldNames.EXTRACT_REQUEST_DATA)
return regex, keys, deps, jp_dict, extract_request_data
@staticmethod
def request_info(item):
"""
请求数据
"""
url = item.pop(FieldNames.URL)
query_str = item.pop(FieldNames.QUERY_STRING)
request_data = item.pop(FieldNames.REQUEST_DATA)
headers = item.pop(FieldNames.HEADERS)
request_data_type = item.pop(FieldNames.REQUEST_DATA_TYPE) if item.get(
FieldNames.REQUEST_DATA_TYPE) else FieldNames.PARAMS
headers_crypto = item.pop(FieldNames.HEADERS_CRYPTO)
request_data_crypto = item.pop(FieldNames.REQUEST_DATA_CRYPTO)
return url, query_str, request_data, headers, request_data_type, headers_crypto, request_data_crypto
@staticmethod
def script(item):
setup_script = item.pop(FieldNames.SETUP_SCRIPT)
teardown_script = item.pop(FieldNames.TEARDOWN_SCRIPT)
return setup_script, teardown_script
@staticmethod
def is_run(condition):
is_run = condition
if not is_run or is_run.upper() != FieldNames.YES:
return True
return None
@staticmethod
def is_only_sql(method):
if method.upper() == FieldNames.SQL:
return True
return None
@staticmethod
def pause_execution(sleep_time):
if sleep_time:
try:
time.sleep(sleep_time)
except Exception as e:
raise InvalidSleepTimeError(f"{sleep_time}", e)
"""
regex = item.pop(FieldNames.REGEX)
keys = item.pop(FieldNames.REGEX_PARAMS_LIST)
deps = item.pop(FieldNames.RETRIEVE_VALUE)
jp_dict = item.pop(FieldNames.JSON_PATH_DICT)
extract_request_data = item.pop(FieldNames.EXTRACT_REQUEST_DATA)
return regex, keys, deps, jp_dict, extract_request_data
@staticmethod
def request_info(item):
"""
请求数据
"""
url = item.pop(FieldNames.URL)
query_str = item.pop(FieldNames.QUERY_STRING)
request_data = item.pop(FieldNames.REQUEST_DATA)
headers = item.pop(FieldNames.HEADERS)
request_data_type = item.pop(FieldNames.REQUEST_DATA_TYPE) if item.get(
FieldNames.REQUEST_DATA_TYPE) else FieldNames.PARAMS
headers_crypto = item.pop(FieldNames.HEADERS_CRYPTO)
request_data_crypto = item.pop(FieldNames.REQUEST_DATA_CRYPTO)
return url, query_str, request_data, headers, request_data_type, headers_crypto, request_data_crypto
@staticmethod
def script(item):
setup_script = item.pop(FieldNames.SETUP_SCRIPT)
teardown_script = item.pop(FieldNames.TEARDOWN_SCRIPT)
return setup_script, teardown_script
@staticmethod
def is_run(condition):
is_run = condition
if not is_run or is_run.upper() != FieldNames.YES:
return True
return None
@staticmethod
def is_only_sql(method):
if method.upper() == FieldNames.SQL:
return True
return None
@staticmethod
def pause_execution(sleep_time):
if sleep_time:
try:
time.sleep(sleep_time)
except Exception as e:
raise InvalidSleepTimeError(f"{sleep_time}", e)
if __name__ == '__main__':
print(Action())
print(Action())

View File

@ -40,7 +40,7 @@ class MysqlClient:
self.cursor = self.conn.cursor(DictCursor)
except Exception as e:
DatabaseExceptionError(self.db_base, e)
# raise
raise
def execute_sql(self, sql):
"""
@ -117,7 +117,6 @@ class MysqlClient:
try:
self.cursor.execute(sql_)
self.result[sql_name] = self.cursor.fetchall()
except Exception as err:
DatabaseExceptionError(sql_, err)
raise err
@ -127,15 +126,15 @@ if __name__ == '__main__':
sql_2 = {
"select":
{
# "select_one": "select username,password as pwd from lea.user where username ='luoshunwen003';"
# "slect_1": "select name,age from test_ea;"
}
}
database_2 = {
"host": "localhost",
"port": 3306,
"database": "lea",
"database": "test",
"user": "root",
"password": "admin"
"password": ""
}
res = MysqlClient(database_2).execute_sql(sql_2)
print("数据执行结果", res)

View File

@ -18,165 +18,165 @@ from config.config import Config
class FileUtils:
@staticmethod
def get_all_path(open_file_path):
"""
递归获取目录下所有的文件的路径
Args:
open_file_path: 指定目录路径
@staticmethod
def get_all_path(open_file_path):
"""
递归获取目录下所有的文件的路径
Args:
open_file_path: 指定目录路径
Returns:
包含所有文件路径的列表
"""
path_list = []
for root, dirs, files in os.walk(open_file_path):
path_list.extend([os.path.join(root, file) for file in files])
return path_list
@staticmethod
def get_files_in_folder(folder_path):
"""
获取指定文件夹内的所有文件
Args:
folder_path: 指定文件夹路径
Returns:
包含所有文件路径的列表
"""
path_list = []
for root, dirs, files in os.walk(open_file_path):
path_list.extend([os.path.join(root, file) for file in files])
return path_list
@staticmethod
def get_files_in_folder(folder_path):
"""
获取指定文件夹内的所有文件
Args:
folder_path: 指定文件夹路径
Returns:
包含所有文件名的列表
"""
if not os.path.isdir(folder_path):
raise ValueError("Invalid folder path")
file_list = []
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
file_list.append(filename)
return file_list
@staticmethod
def get_file_path(file_name):
"""根据文件名获取指定目录下的文件路径"""
for root, dirs, files in os.walk(Config.TEST_FILES):
for file in files:
if file == file_name:
return os.path.join(root, file)
return
@staticmethod
def get_folders_in_path(dir_path):
"""
获取指定路径下的所有文件夹
Args:
dir_path: 指定路径
Returns:
包含所有文件名的列表
"""
if not os.path.isdir(folder_path):
raise ValueError("Invalid folder path")
file_list = []
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
file_list.append(filename)
return file_list
@staticmethod
def get_file_path(file_name, dir_path=Config.TEST_FILES):
"""根据文件名获取指定目录下的文件路径"""
for root, dirs, files in os.walk(dir_path):
for file in files:
if file == file_name:
return os.path.join(root, file)
return
@staticmethod
def get_folders_in_path(dir_path):
"""
获取指定路径下的所有文件夹
Args:
dir_path: 指定路径
Returns:
包含所有文件夹名的列表
"""
if not os.path.isdir(dir_path):
raise ValueError("Invalid directory path")
folder_list = []
for foldername in os.listdir(dir_path):
folder_path = os.path.join(dir_path, foldername)
if os.path.isdir(folder_path):
folder_list.append(foldername)
return folder_list
@staticmethod
def read_file(file_path):
"""
读取文件内容
Args:
file_path: 文件路径
Returns:
包含所有文件夹名的列表
"""
if not os.path.isdir(dir_path):
raise ValueError("Invalid directory path")
folder_list = []
for foldername in os.listdir(dir_path):
folder_path = os.path.join(dir_path, foldername)
if os.path.isdir(folder_path):
folder_list.append(foldername)
return folder_list
@staticmethod
def read_file(file_path):
"""
读取文件内容
Args:
file_path: 文件路径
Returns:
文件内容的字符串
"""
if not os.path.isfile(file_path):
raise ValueError("Invalid file path")
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
@staticmethod
def read_json_file(file_path):
"""
读取 JSON 文件
Args:
file_path: JSON 文件路径
Returns:
文件内容的字符串
"""
if not os.path.isfile(file_path):
raise ValueError("Invalid file path")
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
@staticmethod
def read_json_file(file_path):
"""
读取 JSON 文件
Args:
file_path: JSON 文件路径
Returns:
解析后的 JSON 数据
"""
content = FileUtils.read_file(file_path)
try:
return json.loads(content)
except json.JSONDecodeError as e:
raise ValueError("Invalid JSON file: {}".format(e))
@staticmethod
def read_yaml_file(file_path):
"""
读取 YAML 文件
Args:
file_path: YAML 文件路径
Returns:
解析后的 JSON 数据
"""
content = FileUtils.read_file(file_path)
try:
return json.loads(content)
except json.JSONDecodeError as e:
raise ValueError("Invalid JSON file: {}".format(e))
@staticmethod
def read_yaml_file(file_path):
"""
读取 YAML 文件
Args:
file_path: YAML 文件路径
Returns:
解析后的 YAML 数据
"""
with open(file_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
@staticmethod
def get_value_from_dict(data, key_path):
"""
从嵌套字典中获取指定键路径的值
Args:
data: 嵌套字典
key_path: 键路径可以是用点分隔的字符串或字符串列表
Returns:
解析后的 YAML 数据
"""
with open(file_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
@staticmethod
def get_value_from_dict(data, key_path):
"""
从嵌套字典中获取指定键路径的值
Args:
data: 嵌套字典
key_path: 键路径可以是用点分隔的字符串或字符串列表
Returns:
指定键路径的值如果路径不存在则返回 None
"""
if isinstance(key_path, str):
key_path = key_path.split('.')
for key in key_path:
if isinstance(data, dict) and key in data:
data = data[key]
else:
return None
return data
@staticmethod
def read_config_data(file_path, section, option):
"""
读取配置文件中的数据
Args:
file_path: 配置文件路径
section: 文件中的 section
option: 文件中的 option
Returns:
指定键路径的值如果路径不存在则返回 None
"""
if isinstance(key_path, str):
key_path = key_path.split('.')
for key in key_path:
if isinstance(data, dict) and key in data:
data = data[key]
else:
return None
return data
@staticmethod
def read_config_data(file_path, section, option):
"""
读取配置文件中的数据
Args:
file_path: 配置文件路径
section: 文件中的 section
option: 文件中的 option
Returns:
配置文件中指定数据的值
"""
cf = RawConfigParser()
cf.read(file_path, encoding="UTF-8")
return eval(cf.get(section, option))
@staticmethod
def read_json_data(file_path):
"""
读取 JSON 文件中的数据
Args:
file_path: JSON 文件路径
Returns:
配置文件中指定数据的值
"""
cf = RawConfigParser()
cf.read(file_path, encoding="UTF-8")
return eval(cf.get(section, option))
@staticmethod
def read_json_data(file_path):
"""
读取 JSON 文件中的数据
Args:
file_path: JSON 文件路径
Returns:
JSON 文件中的数据
"""
with open(file_path, "r", encoding="utf-8") as fb:
return json.load(fb)
Returns:
JSON 文件中的数据
"""
with open(file_path, "r", encoding="utf-8") as fb:
return json.load(fb)
if __name__ == '__main__':
fu = FileUtils()
# rest = fu.read_yaml_file(r'../../config.yaml')
# print(rest)
fu = FileUtils()
# rest = fu.read_yaml_file(r'../../config.yaml')
# print(rest)

View File

@ -16,186 +16,191 @@ import yaml
def singleton(cls):
"""
Args:
cls:Decorated class
Returns:
"""
instance = {}
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instance:
instance[cls] = cls(*args, **kwargs)
return instance[cls]
return get_instance
"""
Args:
cls:Decorated class
Returns:
"""
instance = {}
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instance:
instance[cls] = cls(*args, **kwargs)
return instance[cls]
return get_instance
def request_retry_on_exception(retries=2, delay=1.5):
"""Retry on Failed Requests"""
from common.utils.exceptions import RequestSendingError
def request_decorator(func):
e = None
@wraps(func)
def wrapper(*args, **kwargs):
nonlocal e
for i in range(retries):
# st = time.time()
try:
response = func(*args, **kwargs)
# print(f"| 代码耗时--> {time.time() - st}")
print(f"| 请求地址 --> {response.request.url}")
print(f"| 请求方法 --> {response.request.method}")
print(f"| 请求头 --> {response.request.headers}")
print(f"| 请求 body --> {response.request.body}")
print(f"| 接口状态--> {response.status_code}")
print(f"| 接口耗时--> {response.elapsed}")
print(f"| 接口响应--> {response.text}")
except Exception as error:
print(f"| 第{i + 1}次请求参数=【{args}__{kwargs}")
# print(f"| 代码耗时--> {time.time() - st}")
e = error
time.sleep(delay)
else:
return response
raise RequestSendingError(f"{args}__{kwargs}", e)
return wrapper
return request_decorator
"""Retry on Failed Requests"""
from common.utils.exceptions import RequestSendingError
def request_decorator(func):
e = None
@wraps(func)
def wrapper(*args, **kwargs):
nonlocal e
for i in range(retries):
# st = time.time()
try:
response = func(*args, **kwargs)
# print(f"| 代码耗时--> {time.time() - st}")
print(f"| 请求地址 --> {response.request.url}")
print(f"| 请求方法 --> {response.request.method}")
print(f"| 请求头 --> {response.request.headers}")
print(f"| 请求 body --> {response.request.body}")
print(f"| 接口状态--> {response.status_code}")
print(f"| 接口耗时--> {response.elapsed}")
print(f"| 接口响应--> {response.text}")
except Exception as error:
print(f"| 第{i + 1}次请求参数=【{args}__{kwargs}")
# print(f"| 代码耗时--> {time.time() - st}")
e = error
time.sleep(delay)
else:
return response
raise RequestSendingError(f"{args}__{kwargs}", e)
return wrapper
return request_decorator
def list_data(datas):
"""
:param datas: Test data
:return:
"""
def wrapper(func):
setattr(func, "PARAMS", datas)
return func
return wrapper
"""
:param datas: Test data
:return:
"""
def wrapper(func):
setattr(func, "PARAMS", datas)
return func
return wrapper
def yaml_data(file_path):
"""
:param file_path:YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
setattr(func, "PARAMS", datas)
return func
return wrapper
"""
:param file_path:YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
setattr(func, "PARAMS", datas)
return func
return wrapper
def json_data(file_path):
"""
:param file_path: YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = json.load(f)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = json.load(f)
setattr(func, "PARAMS", datas)
return func
return wrapper
"""
:param file_path: YAML file path
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = json.load(f)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = json.load(f)
setattr(func, "PARAMS", datas)
return func
return wrapper
import time
def run_count(count, interval, func, *args, **kwargs):
"""Run Count"""
for i in range(count):
try:
func(*args, **kwargs)
except Exception as e:
# print("====用例执行失败===", e)
# traceback.print_exc()
if i + 1 == count:
raise e
else:
print("==============开始第{}次重运行=============".format(i))
time.sleep(interval)
else:
break
"""Run Count"""
for i in range(count):
try:
func(*args, **kwargs)
except Exception as e:
# print("====用例执行失败===", e)
# traceback.print_exc()
if i + 1 == count:
raise e
else:
print("==============开始第{}次重运行=============".format(i))
time.sleep(interval)
else:
break
def rerun(count, interval=2):
"""
Decorator for rerunning a single test case; note that if using ddt, this method should be used before ddt
:param count: Number of retries on failure
:param interval: Interval time between each retry, default is three seconds
:return:
"""
def wrapper(func):
def decorator(*args, **kwargs):
run_count(count, interval, func, *args, **kwargs)
return decorator
return wrapper
"""
Decorator for rerunning a single test case; note that if using ddt, this method should be used before ddt
:param count: Number of retries on failure
:param interval: Interval time between each retry, default is three seconds
:return:
"""
def wrapper(func):
def decorator(*args, **kwargs):
run_count(count, interval, func, *args, **kwargs)
return decorator
return wrapper
def install_dependencies(func):
"""Checking and Installing Dependencies"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
print("---------------- Checking and Installing Dependencies ----------------")
subprocess.check_call(["pipenv", "install"])
print("---------------- Successfully Installed All Dependencies ----------------")
except Exception as e:
print(f"Failed to install dependencies: {str(e)}")
sys.exit(1)
else:
return func(*args, **kwargs)
return wrapper
"""Checking and Installing Dependencies"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
print("---------------- Checking and Installing Dependencies ----------------")
subprocess.check_call(["pipenv", "install"])
print("---------------- Successfully Installed All Dependencies ----------------")
except Exception as e:
print(f"Failed to install dependencies: {str(e)}")
sys.exit(1)
else:
return func(*args, **kwargs)
return wrapper
def send_request_decorator(func):
"""Decorator to handle the logic of sending requests."""
"""Decorator to handle the logic of sending requests."""
def decorator(self, host, method, extract_request_data, scripts_dir, prepost_script):
"""Handles setup, request execution, and teardown logic for sending requests.
def decorator(self, host, method, extract_request_data):
"""Handles setup, request execution, and teardown logic for sending requests.
Args:
self: The instance of the class.
host: The host for the request.
method: The HTTP method for the request.
extract_request_data: Data for extracting request details.
scripts_dir: The script dir.
prepost_script: The prepost script name.
Args:
self: The instance of the class.
host: The host for the request.
method: The HTTP method for the request.
extract_request_data: Data for extracting request details.
Returns:
The response from the request.
"""
setup_script, teardown_script = self.script(self.variables)
self.execute_dynamic_code(self.variables, setup_script)
response = func(self, host, method, extract_request_data)
self.execute_dynamic_code(self.response, teardown_script)
return response
return decorator
Returns:
The response from the request.
"""
setup_script, teardown_script = self.script(self.variables)
# 执行 excel 与 指定模块文件中的动态代码
self.execute_dynamic_code(self.variables, setup_script)
self.execute_prepost_script(scripts_dir, prepost_script, "setup")
response = func(self, host, method, extract_request_data, scripts_dir, prepost_script)
self.execute_dynamic_code(self.response, teardown_script)
self.execute_prepost_script(scripts_dir, prepost_script, "teardown")
return response
return decorator

View File

@ -13,133 +13,144 @@ logger = MyLogger()
class MyBaseException(Exception):
def __init__(self, msg):
self.msg = msg
self.logger = logger
def __str__(self):
return self.msg
def __init__(self, msg):
self.msg = msg
self.logger = logger
def __str__(self):
return self.msg
class RequestSendingError(MyBaseException):
"""请求异常"""
ERROR_CODE = 1001
def __init__(self, request_info, reason):
msg = f"请求异常request_info={request_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""请求异常"""
ERROR_CODE = 1001
def __init__(self, request_info, reason):
msg = f"请求异常request_info={request_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class DatabaseExceptionError(MyBaseException):
"""数据库异常"""
ERROR_CODE = 1002
def __init__(self, operation_info, reason):
msg = f"数据库异常:操作信息={operation_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""数据库异常"""
ERROR_CODE = 1002
def __init__(self, operation_info, reason):
msg = f"数据库异常:操作信息={operation_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ParameterExtractionError(MyBaseException):
"""参数提取异常"""
ERROR_CODE = 1003
def __init__(self, parameter_info, reason):
msg = f"参数提取异常:参数信息={parameter_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""参数提取异常"""
ERROR_CODE = 1003
def __init__(self, parameter_info, reason):
msg = f"参数提取异常:参数信息={parameter_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ParameterReplacementError(MyBaseException):
"""参数替换异常"""
ERROR_CODE = 1004
def __init__(self, parameter_info, reason):
msg = f"参数替换异常:参数名称={parameter_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""参数替换异常"""
ERROR_CODE = 1004
def __init__(self, parameter_info, reason):
msg = f"参数替换异常:参数名称={parameter_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class AssertionFailedError(MyBaseException):
"""断言异常"""
ERROR_CODE = 1005
def __init__(self, assertion, reason):
msg = f"执行断言失败:断言信息={assertion}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""断言异常"""
ERROR_CODE = 1005
def __init__(self, assertion, reason):
msg = f"执行断言失败:断言信息={assertion}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ExecuteDynamiCodeError(MyBaseException):
"""执行动态代码异常"""
ERROR_CODE = 1006
def __init__(self, code_info, reason):
msg = f"执行动态代码异常:动态代码信息={code_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""执行动态代码异常"""
ERROR_CODE = 1006
def __init__(self, code_info, reason):
msg = f"执行动态代码异常:动态代码信息={code_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class InvalidSleepTimeError(MyBaseException):
"""无效的暂停时间异常"""
ERROR_CODE = 1007
def __init__(self, sleep_time, reason):
msg = f"无效的暂停时间sleep_time={sleep_time},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""无效的暂停时间异常"""
ERROR_CODE = 1007
def __init__(self, sleep_time, reason):
msg = f"无效的暂停时间sleep_time={sleep_time},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ScriptNotFoundError(MyBaseException):
"""脚本不存在异常"""
ERROR_CODE = 1008
"""脚本不存在异常"""
ERROR_CODE = 1008
def __init__(self, script_info, reason):
msg = f"脚本不存在异常script_info={script_info},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
def __init__(self, script_info, reason):
msg = f"脚本不存在异常script_info={script_info},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ScriptExecuteError(MyBaseException):
"""脚本执行存在异常"""
ERROR_CODE = 10013
def __init__(self, script_info, reason):
msg = f"脚本执行异常script_info={script_info},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class InvalidParameterFormatError(MyBaseException):
"""无效的参数格式异常"""
ERROR_CODE = 1009
def __init__(self, parameter_info, reason):
msg = f"无效的参数格式异常parameter_info={parameter_info},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""无效的参数格式异常"""
ERROR_CODE = 1009
def __init__(self, parameter_info, reason):
msg = f"无效的参数格式异常parameter_info={parameter_info},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ResponseJsonConversionError(MyBaseException):
"""响应内容转换为 JSON 格式异常"""
ERROR_CODE = 1010
def __init__(self, response_text, reason):
msg = f"响应内容转换为 JSON 格式异常:响应内容={response_text}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""响应内容转换为 JSON 格式异常"""
ERROR_CODE = 1010
def __init__(self, response_text, reason):
msg = f"响应内容转换为 JSON 格式异常:响应内容={response_text}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class DynamicLoadingError(MyBaseException):
"""动态加载模块或文件异常"""
ERROR_CODE = 1011
def __init__(self, code_info, reason):
msg = f"动态加载模块或文件发生异常:动态模块或文件={code_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
"""动态加载模块或文件异常"""
ERROR_CODE = 1011
def __init__(self, code_info, reason):
msg = f"动态加载模块或文件发生异常:动态模块或文件={code_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class EncryptionError(MyBaseException):
"""加密失败异常"""
ERROR_CODE = 1012
"""加密失败异常"""
ERROR_CODE = 1012
def __init__(self, method_name, error_message):
msg = f"加密失败异常: 加密方法={method_name} 原因={error_message}"
super().__init__(msg)
self.logger.error(msg)
def __init__(self, method_name, error_message):
msg = f"加密失败异常: 加密方法={method_name} 原因={error_message}"
super().__init__(msg)
self.logger.error(msg)
if __name__ == '__main__':
raise EncryptionError("111","222")
raise EncryptionError("111", "222")

View File

@ -13,59 +13,66 @@ import importlib.util
import os
import sys
from common.utils.exceptions import ScriptNotFoundError
sys.path.append('..')
sys.path.append('../utils')
from common.utils.exceptions import ScriptNotFoundError, ScriptExecuteError
from common.file_handling.file_utils import FileUtils
class LoadScript:
def __init__(self):
super().__init__()
def load_script(self, script_path):
"""
加载脚本文件并返回模块对象
def __init__(self):
super().__init__()
Args:
script_path (str): 脚本文件的路径
def load_script(self, script_path):
"""
加载脚本文件并返回模块对象
Args:
script_path (str): 脚本文件的路径
Returns:
module: 脚本文件对应的模块对象
"""
try:
spec = importlib.util.spec_from_file_location(os.path.basename(script_path), script_path)
script_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(script_module)
return script_module
except Exception as e:
raise ScriptNotFoundError(script_path, e)
def load_and_execute_script(self, script_directory, script_name, method_name, request):
"""
加载并执行脚本文件中的指定方法
Args:
request: 请求数据
script_directory (str): 脚本文件所在的目录
script_name (str): 脚本文件的名称
method_name (str): 要执行的方法的名称
"""
script_path = os.path.join(script_directory, script_name)
try:
script = self.load_script(script_path)
if hasattr(script, method_name):
method = getattr(script, method_name)
return method(request)
except Exception as e:
ScriptNotFoundError(script_path, e)
return request
Returns:
module: 脚本文件对应的模块对象
"""
try:
spec = importlib.util.spec_from_file_location(os.path.basename(script_path), script_path)
script_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(script_module)
return script_module
except Exception as e:
ScriptNotFoundError(script_path, e)
return
def load_and_execute_script(self, script_directory, script_name, request, method_name=None):
"""
加载并执行脚本文件中的指定方法
Args:
request: 请求获响应对象
script_directory (str): 脚本文件所在的目录
script_name (str): 脚本文件的名称
method_name (str): 要执行的方法的名称
"""
if method_name is None:
return request
file_list = FileUtils.get_files_in_folder(script_directory)
# 指定文件夹下是否存在这个测试用例脚本
if script_name in file_list:
script_path = FileUtils.get_file_path(script_name, script_directory)
script = self.load_script(script_path)
if script and hasattr(script, method_name):
try:
method = getattr(script, method_name)
return method(request)
except Exception as e:
ScriptExecuteError(script_path, e)
return request
if __name__ == '__main__':
from config.config import Config
SCRIPTS_DIR = Config.SCRIPTS_DIR
load_and_exe_s = LoadScript()
load_and_exe_s.load_and_execute_script(SCRIPTS_DIR, 'request_script_sheetname_id.py', 'setup', {"y": "z"})
from config.config import Config
SCRIPTS_DIR = Config.SCRIPTS_DIR
load_and_exe_s = LoadScript()
load_and_exe_s.load_and_execute_script(SCRIPTS_DIR, 'prepost_script_sheetname_id.py', 'setup', {"y": "z"})

View File

@ -15,118 +15,118 @@ from common.validation.loaders import Loaders
class Validator(Loaders):
"""
校验器
主要功能
1格式化校验变量
2校验期望结果与实际结果与预期一致并返回校验结果
"""
validate_variables_list = []
assertions = []
"""
校验器
主要功能
1格式化校验变量
2校验期望结果与实际结果与预期一致并返回校验结果
"""
validate_variables_list = []
assertions = []
def __init__(self):
super().__init__()
def uniform_validate(self, validate_variables):
"""
统一格式化测试用例的验证变量validate
Args:
validate_variables: 参数格式 listdict
示例
[{"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}]
or {"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}
def __init__(self):
super().__init__()
Returns: 返回数据格式 list
示例
[{"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}]
def uniform_validate(self, validate_variables):
"""
统一格式化测试用例的验证变量validate
Args:
validate_variables: 参数格式 listdict
示例
[{"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}]
or {"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}
"""
if isinstance(validate_variables, str):
validate_variables = json.loads(validate_variables)
if isinstance(validate_variables, list):
for item in validate_variables:
self.uniform_validate(item)
elif isinstance(validate_variables, dict):
if "check" in validate_variables.keys() and "expect" in validate_variables.keys():
# 如果验证mapping中不包含comparator时默认为{"comparator": "eq"}
check_item = validate_variables.get("check")
expect_value = validate_variables.get("expect")
comparator = validate_variables.get("comparator", "eq")
self.validate_variables_list.append({
"check": check_item,
"expect": expect_value,
"comparator": comparator
})
else:
InvalidParameterFormatError(validate_variables, "参数格式错误!")
def validate(self, resp=None):
"""
校验期望结果与实际结果与预期一致
Args:
resp: ResponseObject对象实例
Returns: 返回数据格式 list
示例
[{"check":"result.user.name","comparator":"eq","expect":"chenyongzhi"}]
Returns:
"""
if isinstance(validate_variables, str):
validate_variables = json.loads(validate_variables)
if isinstance(validate_variables, list):
for item in validate_variables:
self.uniform_validate(item)
elif isinstance(validate_variables, dict):
if "check" in validate_variables.keys() and "expect" in validate_variables.keys():
# 如果验证mapping中不包含comparator时默认为{"comparator": "eq"}
check_item = validate_variables.get("check")
expect_value = validate_variables.get("expect")
comparator = validate_variables.get("comparator", "eq")
self.validate_variables_list.append({
"check": check_item,
"expect": expect_value,
"comparator": comparator
})
else:
InvalidParameterFormatError(validate_variables, "参数格式错误!")
"""
for validate_variable in self.validate_variables_list:
check_item = validate_variable['check']
expect_value = validate_variable['expect']
comparator = validate_variable['comparator']
if not str(check_item).startswith("$"):
actual_value = check_item
else:
actual_value = Extractor.extract_value_by_jsonpath(resp_obj=resp, expr=check_item)
try:
fun = self.load_built_in_comparators()[comparator]
fun(actual_value=actual_value, expect_value=expect_value)
validate_result = "通过"
except (AssertionError, TypeError) as e:
validate_result = "失败"
raise e
finally:
self.assertions.append({
'检查项': check_item,
'期望值': expect_value,
'实际值': actual_value,
'断言方法': comparator_dict.get(comparator),
"断言结果": validate_result
})
def run_validate(self, validate_variables, resp=None):
"""
统一格式化测试用例的验证变量validate然后校验期望结果与实际结果与预期一致
Args:
validate_variables:参数格式 listdict
resp:ResponseObject对象实例
def validate(self, resp=None):
"""
校验期望结果与实际结果与预期一致
Args:
resp: ResponseObject对象实例
Returns:返回校验结果
Returns:
"""
for validate_variable in self.validate_variables_list:
check_item = validate_variable['check']
expect_value = validate_variable['expect']
comparator = validate_variable['comparator']
if not str(check_item).startswith("$"):
actual_value = check_item
else:
actual_value = Extractor.extract_value_by_jsonpath(resp_obj=resp, expr=check_item)
try:
fun = self.load_built_in_comparators()[comparator]
fun(actual_value=actual_value, expect_value=expect_value)
validate_result = "通过"
except (AssertionError, TypeError) as e:
validate_result = "失败"
raise e
finally:
self.assertions.append({
'检查项': check_item,
'期望值': expect_value,
'实际值': actual_value,
'断言方法': comparator_dict.get(comparator),
"断言结果": validate_result
})
def run_validate(self, validate_variables, resp=None):
"""
统一格式化测试用例的验证变量validate然后校验期望结果与实际结果与预期一致
Args:
validate_variables:参数格式 listdict
resp:ResponseObject对象实例
Returns:返回校验结果
"""
if not validate_variables:
self.assertions = '未填写断言信息,默认断言通过!!!'
return
self.validate_variables_list.clear()
self.assertions.clear()
self.uniform_validate(validate_variables)
if not self.validate_variables_list:
raise InvalidParameterFormatError(self.validate_variables_list,
"uniform_validate 执行失败,无法进行 validate 校验")
self.validate(resp)
"""
if not validate_variables:
self.assertions = '未填写预期结果默认断言HTTP请求状态码'
return
self.validate_variables_list.clear()
self.assertions.clear()
self.uniform_validate(validate_variables)
if not self.validate_variables_list:
raise InvalidParameterFormatError(self.validate_variables_list,
"uniform_validate 执行失败,无法进行 validate 校验")
self.validate(resp)
if __name__ == '__main__':
validate_variables1 = {"check": "$.result.user.name", "comparator": "eq", "expect": "chenyongzhi"}
validate_variables2 = [
{"check": "$.code", "comparator": "eq", "expect": "200"},
{"check": "result.user.name", "comparator": "eq", "expect": "chenyongzhi"}
]
resp_obj = {"code": "200", "result": {"user": {"name": "chenyongzhi"}}}
validator = Validator()
validator.run_validate(validate_variables1, resp_obj)
print(validator.assertions)
validator.run_validate(validate_variables2, resp_obj)
print(validator.assertions)
validator.run_validate(validate_variables2, resp_obj)
print(validator.assertions)
validate_variables1 = {"check": "$.result.user.name", "comparator": "eq", "expect": "chenyongzhi"}
validate_variables2 = [
{"check": "$.code", "comparator": "eq", "expect": "200"},
{"check": "result.user.name", "comparator": "eq", "expect": "chenyongzhi"}
]
resp_obj = {"code": "200", "result": {"user": {"name": "chenyongzhi"}}}
validator = Validator()
validator.run_validate(validate_variables1, resp_obj)
print(validator.assertions)
validator.run_validate(validate_variables2, resp_obj)
print(validator.assertions)
validator.run_validate(validate_variables2, resp_obj)
print(validator.assertions)

View File

@ -3,21 +3,21 @@ import json
def setup(pm):
print("pm---------------->", pm.variables)
# request_data = pm.get_variables() # 获取得到请求数据
"""
request_data 的值: {'Url': '/login',
'Headers': '{"Content-Type": "application/json"}',
'Query Str': None,
'Request Data Type': 'params',
'Request Data': '{"account": "{{account}}", "password": "{{passwd}}"}',
'Expected': None, 'Response': '', 'Assertion': '', 'Error Log': ''
}
"""
request = pm.variables
email = json.loads(request.get("Request Data")).get("email")
pm.update_environments("email", email) # 设置环境变量
print("---->pm.get_environments", pm.get_environments("{{email}}"))
print("pm---------------->", pm.variables)
# request_data = pm.variables # 获取得到请求数据
"""
request_data 的值: {'Url': '/login',
'Headers': '{"Content-Type": "application/json"}',
'Query Str': None,
'Request Data Type': 'params',
'Request Data': '{"account": "{{account}}", "password": "{{passwd}}"}',
'Expected': None, 'Response': '', 'Assertion': '', 'Error Log': ''
}
"""
request = pm.variables
email = json.loads(request.get("Request Data")).get("email")
pm.update_environments("email", email) # 设置环境变量
print("---->pm.get_environments", pm.get_environments("{{email}}"))
setup(pm)
@ -25,19 +25,19 @@ setup(pm)
# 后置脚本代码
def tear_down(pm):
# vars_data = pm.get_environments("{{变量名称}}") # 获取环境变量
response = pm.variables # 获取得到响应结果对象
response.json()
# print(f"请求地址 --> {response.request.url}")
# print(f"请求头 --> {response.request.headers}")
# print(f"请求 body --> {response.request.body}")
# print(f"接口状态--> {response.status_code}")
# print(f"接口耗时--> {response.elapsed}")
# print(f"接口响应--> {response.text}")
token = response.json()['token']
pm.update_environments("token", token) # 重新设置环境变量
# print("---->pm.get_environments", pm.get_environments("{{BSP_TOKEN_NEWS}}"))
# print("---->pm.get_variables", pm.get_variables())
# vars_data = pm.get_environments("{{变量名称}}") # 获取环境变量
response = pm.variables # 获取得到响应结果对象
response.json()
# print(f"请求地址 --> {response.request.url}")
# print(f"请求头 --> {response.request.headers}")
# print(f"请求 body --> {response.request.body}")
# print(f"接口状态--> {response.status_code}")
# print(f"接口耗时--> {response.elapsed}")
# print(f"接口响应--> {response.text}")
token = response.json()['token']
pm.update_environments("token", token) # 重新设置环境变量
# print("---->pm.get_environments", pm.get_environments("{{BSP_TOKEN_NEWS}}"))
# print("---->pm.get_variables", pm.get_variables())
tear_down(pm)

File diff suppressed because one or more lines are too long

4
run.py
View File

@ -26,8 +26,8 @@ def run():
# # get_failed_test_cases = runner.get_failed_test_cases()
# 发送通知
# runner.email_notice()
runner.dingtalk_notice()
runner.weixin_notice()
# runner.dingtalk_notice()
# runner.weixin_notice()
if __name__ == '__main__':

View File

@ -0,0 +1,26 @@
def setup(request):
"""这是某某sheet 中第 xx条测试用例的前置脚本"""
"""前置脚本处理请求对象的逻辑代码"""
# 从request 对象获取数据信息
import json
request_obj = request.variables
print("前置脚本请求对象=", request_obj)
print("获取请求URL:", request_obj.get("Url"))
print("获取请求Header:", json.loads(request_obj.get("Headers")))
print("获取请求数据:", json.loads(request_obj.get("RequestData")))
request.update_environments("emailwww", "这个是你想要的值") # 设置环境变量
print(f"---->获取环境变量={request.get_environments('{{emailwww}}')}")
return request
def teardown(response):
"""这是某某sheet 中第 xx条测试用例的前置脚本"""
"""后置脚本处理响应对象的逻辑代码"""
print("后置脚本响应对象=", str(response.variables).replace("<", "(").replace(">", ")"))
response_text = response.variables.text
response_json = response.variables.json()
print("响应对象转json", response_json)
print("响应对象转text", response_text)
response.update_environments("response_json", response_json) # 设置环境变量
print("获取环境变量={}".format(response.get_environments('{{response_json}}')))
return response

View File

@ -1,23 +0,0 @@
def setup(request):
"""前置脚本处理请求对象的逻辑代码"""
# rquest 对象输出如下:
"""{'Run': 'YES', 'Time': 1,
'Url': '/auth/loginByNotBip',
'Headers': '{"Content-Type": "application/json"}',
'Params': None,
'Request Data': '{"account": "{{account}}", "password": "{{passwd}}"}',
'Expected': None,
'Response': '',
'Assertion': '',
'Error log': '',
None: None}"""
# 获取请求参数
request.get("Request Data") # 获取到的结果是一个json字符串使用需要转字典
return request
def teardown(response):
# print(f"执行后置代码片段处理:{response.json}")
"""后置脚本处理响应对象的逻辑代码"""
return response

Binary file not shown.

View File

@ -24,37 +24,35 @@ test_case, databases, initialize_data, host = excel.get_excel_init_and_cases()
@ddt
class TestProjectApi(unittest.TestCase):
maxDiff = None
action = Action(initialize_data, databases)
@classmethod
def setUpClass(cls) -> None:
cls.action.load_modules_from_folder(extensions)
def setUp(self) -> None:
pass
@list_data(test_case)
def test_api(self, item):
sheet, iid, condition, st, name, desc, method, expected = self.action.base_info(item)
if self.action.is_run(condition):
self.skipTest("这个测试用例听说泡面比较好吃,所以放弃执行了!!")
regex, keys, deps, jp_dict, ex_request_data = self.action.extractor_info(item)
self.action.pause_execution(st)
self.action.exc_sql(item)
if self.action.is_only_sql(method):
self.skipTest("这条测试用例被 SQL 吃了,所以放弃执行了!!")
# prepost_script = f"prepost_script_{sheet}_{iid}.py"
# item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
self.action.send_request(host, method, ex_request_data)
self.action.analysis_response(sheet, iid, name, desc, regex, keys, deps, jp_dict)
self.action.execute_validation(excel, sheet, iid, name, desc, expected)
@classmethod
def tearDownClass(cls) -> None:
excel.close_excel()
maxDiff = None
action = Action(initialize_data, databases)
@classmethod
def setUpClass(cls) -> None:
cls.action.load_modules_from_folder(extensions)
def setUp(self) -> None:
pass
@list_data(test_case)
def test_api(self, item):
sheet, iid, condition, st, name, desc, method, expected, prepost_script = self.action.base_info(item)
if self.action.is_run(condition):
self.skipTest("这个测试用例听说泡面比较好吃,所以放弃执行了!!")
regex, keys, deps, jp_dict, ex_request_data = self.action.extractor_info(item)
self.action.pause_execution(st)
self.action.exc_sql(item)
if self.action.is_only_sql(method):
self.skipTest("这条测试用例被 SQL 吃了,所以放弃执行了!!")
self.action.send_request(host, method, ex_request_data, Config.SCRIPTS_DIR, prepost_script)
self.action.analysis_response(sheet, iid, name, desc, regex, keys, deps, jp_dict)
self.action.execute_validation(excel, sheet, iid, name, desc, expected)
@classmethod
def tearDownClass(cls) -> None:
excel.close_excel()
if __name__ == '__main__':
unittest.main()
unittest.main()