优化action方法以及去掉一些敏感信息

This commit is contained in:
chenyongzhiaaron 2023-07-05 15:44:24 +08:00
parent e2bfdff465
commit 4e4dd079be
28 changed files with 399 additions and 492 deletions

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="dataSourceStorageLocal" created-in="PY-231.9011.38">
<component name="dataSourceStorageLocal" created-in="PY-231.9161.41">
<data-source name="@localhost" uuid="49b6f686-3676-4df5-9645-cd7a2fe91d80">
<database-info product="MySQL" version="8.0.26" jdbc-version="4.2" driver-name="MySQL Connector/J" driver-version="mysql-connector-java-8.0.25 (Revision: 08be9e9b4cba6aa115f9b27b215887af40b159e0)" dbms="MYSQL" exact-version="8.0.26" exact-driver-version="8.0">
<extra-name-characters>#@</extra-name-characters>

View File

@ -34,3 +34,75 @@
2023-07-05 09:52:15 | ERROR | key:{{get_timestamp()}},在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:05:00 | ERROR | key:<re.Match object; span=(461, 468), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:05:02 | ERROR | key:<re.Match object; span=(153, 166), match='{{BSP_TOKEN}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:05:05 | ERROR | key:<re.Match object; span=(166, 179), match='{{BSP_TOKEN}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:07:57 | ERROR | key:<re.Match object; span=(463, 470), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:07:57 | ERROR | 异常用例: 安全纯净大屏_2_登录_非BIP用户登录
'NoneType' object has no attribute 'json'
2023-07-05 15:07:59 | ERROR | key:<re.Match object; span=(409, 416), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:07:59 | ERROR | 异常用例: 安全纯净大屏_4_劳务基础配置_绑定TV
'NoneType' object has no attribute 'json'
2023-07-05 15:08:02 | ERROR | key:<re.Match object; span=(360, 367), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:08:02 | ERROR | 被提取对象非字典、非字符串、非列表不执行jsonpath提取被提取对象: None
2023-07-05 15:08:02 | ERROR | 异常用例: 安全纯净大屏_5_劳务基础配置_查询配置
'NoneType' object has no attribute 'json'
2023-07-05 15:12:26 | ERROR | key:<re.Match object; span=(459, 466), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:12:26 | ERROR | 异常用例: 安全纯净大屏_2_登录_非BIP用户登录
'NoneType' object has no attribute 'json'
2023-07-05 15:12:28 | ERROR | key:<re.Match object; span=(405, 412), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:12:29 | ERROR | 异常用例: 安全纯净大屏_4_劳务基础配置_绑定TV
'NoneType' object has no attribute 'json'
2023-07-05 15:12:32 | ERROR | key:<re.Match object; span=(356, 363), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:12:32 | ERROR | 被提取对象非字典、非字符串、非列表不执行jsonpath提取被提取对象: None
2023-07-05 15:12:32 | ERROR | 异常用例: 安全纯净大屏_5_劳务基础配置_查询配置
'NoneType' object has no attribute 'json'
2023-07-05 15:15:53 | ERROR | key:<re.Match object; span=(465, 472), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:15:53 | ERROR | 异常用例: 安全纯净大屏_2_登录_非BIP用户登录
'NoneType' object has no attribute 'json'
2023-07-05 15:15:55 | ERROR | key:<re.Match object; span=(411, 418), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:15:55 | ERROR | 异常用例: 安全纯净大屏_4_劳务基础配置_绑定TV
'NoneType' object has no attribute 'json'
2023-07-05 15:15:58 | ERROR | key:<re.Match object; span=(362, 369), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:15:58 | ERROR | 被提取对象非字典、非字符串、非列表不执行jsonpath提取被提取对象: None
2023-07-05 15:15:59 | ERROR | 异常用例: 安全纯净大屏_5_劳务基础配置_查询配置
'NoneType' object has no attribute 'json'
2023-07-05 15:17:24 | ERROR | key:<re.Match object; span=(465, 472), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:17:24 | ERROR | 异常用例: 安全纯净大屏_2_登录_非BIP用户登录
'NoneType' object has no attribute 'json'
2023-07-05 15:17:26 | ERROR | key:<re.Match object; span=(411, 418), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:17:26 | ERROR | 异常用例: 安全纯净大屏_4_劳务基础配置_绑定TV
'NoneType' object has no attribute 'json'
2023-07-05 15:17:29 | ERROR | key:<re.Match object; span=(362, 369), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:17:29 | ERROR | 被提取对象非字典、非字符串、非列表不执行jsonpath提取被提取对象: None
2023-07-05 15:17:29 | ERROR | 异常用例: 安全纯净大屏_5_劳务基础配置_查询配置
'NoneType' object has no attribute 'json'
2023-07-05 15:17:52 | ERROR | key:<re.Match object; span=(465, 472), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:17:54 | ERROR | key:<re.Match object; span=(457, 464), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:17:57 | ERROR | key:<re.Match object; span=(408, 415), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:17:57 | ERROR | 被提取对象非字典、非字符串、非列表不执行jsonpath提取被提取对象: None
2023-07-05 15:31:17 | ERROR | key:<re.Match object; span=(463, 470), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:31:19 | ERROR | key:<re.Match object; span=(455, 462), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:31:22 | ERROR | key:<re.Match object; span=(406, 413), match='{{999}}'>,在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常
2023-07-05 15:31:22 | ERROR | 被提取对象非字典、非字符串、非列表不执行jsonpath提取被提取对象: None

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.1 KiB

View File

@ -18,27 +18,44 @@ from common.validation.validator import Validator
@singleton
class Action(Extractor, LoadScript, Validator):
def __init__(self):
super().__init__()
self.encrypt = EncryptData()
self.vars = {}
def execute_dynamic_code(self, code):
try:
ast_obj = ast.parse(code, mode='exec')
compiled = compile(ast_obj, '<string>', 'exec')
exec(compiled, {"action": self})
print("exec dynamic code success")
return self.vars
except SyntaxError as e:
error_message = f'Syntax error in dynamic code: {e}'
self._handle_error(error_message)
except Exception as e:
error_message = f"Error executing dynamic code: {e}"
self._handle_error(error_message)
def _handle_error(self, error_message):
print(f'Error occurred: {error_message}')
def send_request(self, host, url, method, **kwargs):
self.http_client(host, url, method, **kwargs)
def __init__(self, initialize_data=None, bif_functions=None):
super().__init__()
self.encrypt = EncryptData()
self.__vars = {}
self.set_variable(initialize_data)
self.set_bif_fun(bif_functions)
def execute_dynamic_code(self, item, code):
self.set_vars(item)
try:
ast_obj = ast.parse(code, mode='exec')
compiled = compile(ast_obj, '<string>', 'exec')
exec(compiled, {"action": self})
except SyntaxError as e:
error_message = f'Syntax error in dynamic code: {e}'
self._handle_error(error_message)
except Exception as e:
error_message = f"Error executing dynamic code: {e}"
self._handle_error(error_message)
finally:
return self.__vars
def _handle_error(self, error_message):
print(f'Error occurred: {error_message}')
def set_vars(self, item):
self.__vars = item
def update_vars(self, key, value):
self.__vars[f"{{{{{key}}}}}"] = value
def get_vars(self, key=None):
"""获取依赖表 或 依赖表中key对应的值"""
return self.__vars if not key else self.__vars.get(key)
def analysis_request(self, request_data, jp_dict, headers_crypto, headers, request_crypto):
# 提取请求参数信息
self.substitute_data(request_data, jp_dict=jp_dict)
# 请求头及body加密或者加签
headers, request_data = self.encrypt.encrypts(headers_crypto, headers, request_crypto, request_data)
return headers, request_data

View File

@ -15,122 +15,125 @@ from common.file_handling import logger
@singleton
class DoExcel:
def __init__(self, file_name):
self.file_name = file_name
self.wb = load_workbook(self.file_name)
self.init_sheet = self.wb["init"]
# def __enter__(self):
# self.wb = load_workbook(self.file_name)
# self.init_sheet = self.wb['init']
# return self
#
# def __exit__(self, exc_type, exc_val, exc_tb):
# self.wb.save(self.file_name)
# self.wb.close()
def do_excel_yield(self):
"""
读取excel数据
Returns:
"""
sheets = eval(self.get_excel_init().get("sheets"))
for sheet_name in sheets:
sheet = self.wb[sheet_name]
max_row = sheet.max_row
max_column = sheet.max_column
first_header = []
for i in range(1, max_column + 1):
first_header.append(sheet.cell(1, i).value)
for i in range(2, max_row + 1):
sub_data = {}
for k in range(1, max_column + 1):
sub_data[first_header[k - 1]] = sheet.cell(i, k).value
sub_data["sheet"] = sheet_name
yield sub_data
@logger.log_decorator()
def write_back(self, sheet_name, i, **kwargs):
"""
Args:
sheet_name:sheet 名称
i:用例 Id
response_value: 响应结果
test_result: 测试结果
assert_log: 报错结果
Returns:
"""
response_value = kwargs.get("response")
test_result = kwargs.get("test_result")
assert_log = kwargs.get("assert_log")
sheet = self.wb[sheet_name]
sheet.cell(i + 1, 22).value = response_value
sheet.cell(i + 1, 23).value = test_result
sheet.cell(i + 1, 24).value = assert_log
self.wb.save(self.file_name)
@logger.log_decorator()
def clear_date(self):
"""
执行清空单元格数据
Returns:
"""
sheets = eval(self.get_excel_init().get("sheets"))
for sheet_name in sheets:
sheet = self.wb[sheet_name]
max_row = sheet.max_row # 获取最大行
for i in range(2, max_row + 1):
sheet.cell(i, 22).value = ""
sheet.cell(i, 23).value = ""
sheet.cell(i, 24).value = ""
self.wb.save(self.file_name)
return f"清空指定 {sheets} 中的单元格成功"
@logger.log_decorator()
def get_excel_init(self):
"""
获取 excel sheet 名称为 init 中的基础数据
Returns:init 表中的所有数据
"""
max_row = self.init_sheet.max_row # 获取最大行
max_column = self.init_sheet.max_column # 获取最大列
first_head = [] # 存储标题的 list
for i in range(max_column):
first_head.append(self.init_sheet.cell(1, i + 1).value)
init = {}
for k in range(2, max_row + 1):
for i, v in enumerate(first_head):
init[v] = self.init_sheet.cell(k, i + 1).value
if init.get("run").upper() == "YES":
break
return init
def get_excel_init_and_cases(self):
"""
Returns:初始化数据及测试用例组成的元组
"""
try:
self.clear_date()
test_case = self.do_excel_yield()
init_data = self.get_excel_init()
except Exception as e:
raise e
return init_data, test_case
def __init__(self, file_name):
self.file_name = file_name
self.wb = load_workbook(self.file_name)
self.init_sheet = self.wb["init"]
# def __enter__(self):
# self.wb = load_workbook(self.file_name)
# self.init_sheet = self.wb['init']
# return self
#
# def __exit__(self, exc_type, exc_val, exc_tb):
# self.wb.save(self.file_name)
# self.wb.close()
def do_excel_yield(self):
"""
读取excel数据
Returns:
"""
sheets = eval(self.get_excel_init().get("sheets"))
for sheet_name in sheets:
sheet = self.wb[sheet_name]
max_row = sheet.max_row
max_column = sheet.max_column
first_header = []
for i in range(1, max_column + 1):
first_header.append(sheet.cell(1, i).value)
for i in range(2, max_row + 1):
sub_data = {}
for k in range(1, max_column + 1):
sub_data[first_header[k - 1]] = sheet.cell(i, k).value
sub_data["sheet"] = sheet_name
yield sub_data
@logger.log_decorator()
def write_back(self, sheet_name, i, **kwargs):
"""
Args:
sheet_name:sheet 名称
i:序号
response_value: 响应结果
test_result: 测试结果
assert_log: 报错结果
Returns:
"""
response_value = kwargs.get("response")
test_result = kwargs.get("test_result")
assert_log = kwargs.get("assert_log")
sheet = self.wb[sheet_name]
sheet.cell(i + 1, 24).value = response_value
sheet.cell(i + 1, 25).value = test_result
sheet.cell(i + 1, 26).value = assert_log
self.wb.save(self.file_name)
@logger.log_decorator()
def clear_date(self):
"""
执行清空单元格数据
Returns:
"""
sheets = eval(self.get_excel_init().get("sheets"))
for sheet_name in sheets:
sheet = self.wb[sheet_name]
max_row = sheet.max_row # 获取最大行
for i in range(2, max_row + 1):
sheet.cell(i, 22).value = ""
sheet.cell(i, 23).value = ""
sheet.cell(i, 24).value = ""
self.wb.save(self.file_name)
return f"清空指定 {sheets} 中的单元格成功"
@logger.log_decorator()
def get_excel_init(self):
"""
获取 excel sheet 名称为 init 中的基础数据
Returns:init 表中的所有数据
"""
max_row = self.init_sheet.max_row # 获取最大行
max_column = self.init_sheet.max_column # 获取最大列
first_head = [] # 存储标题的 list
for i in range(max_column):
first_head.append(self.init_sheet.cell(1, i + 1).value)
init = {}
for k in range(2, max_row + 1):
for i, v in enumerate(first_head):
init[v] = self.init_sheet.cell(k, i + 1).value
if init.get("run").upper() == "YES":
break
return init
def get_excel_init_and_cases(self):
"""
Returns:初始化数据及测试用例组成的元组
"""
try:
self.clear_date()
test_case = self.do_excel_yield()
init_data = self.get_excel_init()
except Exception as e:
raise e
return init_data, test_case
def close_excel(self):
self.wb.close()
if __name__ == '__main__':
from common.config import Config
file_n = Config.test_case
excel = DoExcel(file_n)
# excel.get_excel_init()
# excel.do_excel()
# excel.clear_date()
# excel.do_excel()
from common.config import Config
file_n = Config.test_case
excel = DoExcel(file_n)
# excel.get_excel_init()
# excel.do_excel()
# excel.clear_date()
# excel.do_excel()

View File

@ -1,34 +0,0 @@
class Hooks:
def __init__(self):
self.before_request_funcs = [] # 存放 before_request 钩子函数的列表
self.after_request_funcs = [] # 存放 after_request 钩子函数的列表
def before_request(self, func):
"""
注册 before_request 钩子函数将其添加到 before_request_funcs 列表中
"""
self.before_request_funcs.append(func)
return func
def after_request(self, func):
"""
注册 after_request 钩子函数将其添加到 after_request_funcs 列表中
"""
self.after_request_funcs.append(func)
return func
def run_before_request_funcs(self, request):
"""
依次执行 before_request 钩子函数处理请求参数并返回处理后的请求对象
"""
for func in self.before_request_funcs:
request = func(request)
return request
def run_after_request_funcs(self, response):
"""
依次执行 after_request 钩子函数处理响应结果并返回处理后的响应对象
"""
for func in self.after_request_funcs:
response = func(response)
return response

View File

@ -50,7 +50,7 @@ class Loaders(Pyt):
# Returns:
#
# """
# for name, item in vars(model).items():
# for name, item in __vars(model).items():
# if isinstance(item, types.FunctionType):
# Variables.update_variable(f"{name}()", item)

View File

@ -1,9 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: __init__.py.py
@time: 2023/6/13 16:09
@desc:
"""

View File

@ -1,64 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: hooks.py
@time: 2023/6/16 16:52
@desc:
"""
import requests
class Hooks:
def __init__(self):
self.before_request_funcs = []
self.after_request_funcs = []
def before_request(self, func):
"""
注册 before_request 钩子函数
"""
self.before_request_funcs.append(func)
return func
def after_request(self, func):
"""
注册 after_request 钩子函数
"""
self.after_request_funcs.append(func)
return func
def execute_hooks(self, url, method, **kwargs):
"""
执行所有的钩子函数
"""
for func in self.before_request_funcs:
kwargs = func(url, method, **kwargs)
response = requests.request(method, url, **kwargs)
for func in self.after_request_funcs:
response = func(response)
return response
hooks = Hooks()
def before_decorator(func):
def wrapper(*args, **kwargs):
# hooks.execute_hooks(*args, **kwargs)
return func(*args, **kwargs)
return wrapper
def after_decorator(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
# hooks.execute_hooks(*args, **kwargs)
return result
return wrapper

View File

@ -1,59 +0,0 @@
# !/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: prepost_script.py
@time: 2023/6/16 16:58
@desc:
"""
from common.action import Action
from temp.hooks import hooks
@hooks.before_request
def add_authentication_headers(url, method, **kwargs):
"""
添加认证头信息
"""
print("------开始执行前置操作-----")
headers = kwargs.get('headers', {})
headers["Authorization"] = "Bearer " + "这是token"
kwargs['headers'] = headers
return kwargs
@hooks.before_request
def handle_dependent_parameters(url, method, **kwargs):
"""
处理依赖参数
"""
print("------开始执行后置操作-----")
payload = kwargs.get('json', {})
payload["title"] = payload.get("title")
kwargs['json'] = payload
return kwargs
# if __name__ == '__main__':
# action = Action()
# code = 'def setup(action):\n print(action.vars)\n print(action.get_variable())'
# ast_obj = ast.parse(code, mode='exec')
# compiled = compile(ast_obj, '<string>', 'exec')
# print("======>", compiled)
# result = exec(compiled, {"action": action})
if __name__ == '__main__':
import ast
action = Action()
func_str = '\ndef setup(action):\n print("action.vars",action.vars)\n print("action.get_cariable()",action.get_variable())\n \nsetup(action)'
# 解析代码字符串为抽象语法树
ast_obj = ast.parse(func_str, mode='exec')
# 编译抽象语法树
compiled = compile(ast_obj, '<string>', 'exec')
# 执行编译后的代码
exec(compiled, {"action": action})

View File

@ -24,190 +24,171 @@ host = init_data.get('host', "") + init_data.get("path", "")
@ddt
class TestProjectApi(unittest.TestCase):
maxDiff = None
action = Action()
@classmethod
def setUpClass(cls) -> None:
cls.action.set_variable(initialize_data) # 加载初始化数据
cls.action.set_bif_fun(bif_functions) # 加载内置方法
@data(*test_case)
def test_api(self, item):
sheet, iid, condition, st, name, desc, headers_crypto, request_crypto, method = self.__base_info(item)
regex, keys, deps, jp_dict, extract_request_data = self.__extractor_info(item)
if self.__is_run(condition):
return
self.__pause_execution(st)
sql, sql_params_dict = self.__sql_info(item)
# 首先执行sql替换,将sql替换为正确的sql语句
sql = self.action.replace_dependent_parameter(sql)
self.__exc_sql(sql, sql_params_dict, method)
# 拼接动态代码段文件
# prepost_script = f"prepost_script_{sheet}_{iid}.py"
# item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
# 替换 URL, PARAMETERS, HEADER,期望值
item = self.action.replace_dependent_parameter(item)
url, query_str, request_data, headers, expected, request_data_type, setup_script, teardown_script = self.__request_info(
item)
self.action.vars.update({"url": url, "query_str": query_str, "request_data": request_data, "headers": headers})
# 执行前置动态代码
self.action.execute_dynamic_code(setup_script)
url = self.action.vars.get("url")
query_str = self.action.vars.get("query_str")
request_data = self.action.vars.get("request_data")
headers = self.action.vars.get("headers")
# 提取请求参数信息
self.action.substitute_data(request_data, jp_dict=jp_dict)
# 请求头及请求body加密或者加签
headers, request_data = self.action.encrypt.encrypts(headers_crypto, headers, request_crypto, request_data)
result_tuple = None
result = "PASS"
response = None
try:
# 执行请求操作
kwargs = {request_data_type: request_data, 'headers': headers, "params": query_str}
response = self.action.send_request(host, url, method, **kwargs)
# response = self.action.http_client(host, url, method, **kwargs)
# 执行后置代码片段
self.action.execute_dynamic_code(teardown_script)
# self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "teardown", response)
result_tuple = self.action.run_validate(expected, response.json()) # 执行断言 返回结果元组
self.assertNotIn("FAIL", result_tuple, "FAIL 存在结果元组中")
try:
# 提取响应
self.action.substitute_data(response.json(), regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
logger.error(f"提取响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}")
except Exception as e:
result = "FAIL"
logger.error(f'异常用例: {sheet}_{iid}_{name}_{desc}\n{e}')
raise e
finally:
response = response.text if response is not None else str(response)
assert_log = str(result_tuple)
# 响应结果及测试结果回写 excel
excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result, assert_log=assert_log)
@classmethod
def tearDownClass(cls) -> None:
logger.info(f"所有用例执行完毕")
@staticmethod
def __base_info(item):
"""
获取基础信息
Args:
item:
Returns:
"""
sheet = item.pop("sheet")
item_id = item.pop("Id")
condition = item.pop("Run")
sleep_time = item.pop("Time")
name = item.pop("Name")
desc = item.pop("Description")
headers_crypto = item.pop("Headers Crypto")
request_data_crypto = item.pop("Request Data Crypto")
method = item.pop("Method")
return sheet, item_id, condition, sleep_time, name, desc, headers_crypto, request_data_crypto, method
@staticmethod
def __sql_info(item):
sql = item.pop("SQL")
sql_params_dict = item.pop("Sql Params Dict")
return sql, sql_params_dict
@staticmethod
def __extractor_info(item):
"""
获取提取参数的基本字段信息
Args:
item:
Returns:
"""
regex = item.pop("Regex")
keys = item.pop("Regex Params List")
deps = item.pop("Retrieve Value")
jp_dict = item.pop("Jsonpath")
extract_request_data = item.pop("Extract Request Data")
return regex, keys, deps, jp_dict, extract_request_data
@staticmethod
def __request_info(item):
"""
请求数据
Args:
item:
Returns:
"""
url = item.pop("Url")
query_str = item.pop("Query Str")
request_data = item.pop("Request Data")
headers = item.pop("Headers")
expected = item.pop("Expected")
request_data_type = item.pop("Request Data Type") if item.get("Request Data Type") else 'params'
setup_script = item.pop("Setup Script")
teardown_script = item.pop("Teardown Script")
return url, query_str, request_data, headers, expected, request_data_type, setup_script, teardown_script
@staticmethod
def __is_run(condition):
is_run = condition
if not is_run or is_run.upper() != "YES":
return True
@staticmethod
def __pause_execution(sleep_time):
if sleep_time:
try:
time.sleep(sleep_time)
except Exception as e:
logger.error("暂时时间必须是数字")
raise e
def __exc_sql(self, sql, sql_params_dict, method):
if sql:
try:
execute_sql_results = mysql.do_mysql(sql)
if execute_sql_results and sql_params_dict:
self.action.extract_request_data(execute_sql_results, jp_dict=sql_params_dict)
if method == "SQL" and mysql:
return None
except Exception as e:
logger.error(f'执行 sql 失败:{sql},异常信息:{e}')
raise e
return sql
maxDiff = None
action = Action(initialize_data, bif_functions)
@classmethod
def setUpClass(cls) -> None:
pass
#
@data(*test_case)
def test_api(self, item):
sheet, iid, condition, st, name, desc, h_crypto, r_crypto, method = self.__base_info(item)
regex, keys, deps, jp_dict, extract_request_data = self.__extractor_info(item)
setup_script, teardown_script = self.script(item)
if self.__is_run(condition):
return
self.__pause_execution(st)
# 首执行 sql
self.__exc_sql(item, method)
# 执行动态代码
item = self.action.execute_dynamic_code(item, setup_script)
# prepost_script = f"prepost_script_{sheet}_{iid}.py"
# item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
# 修正参数
item = self.action.replace_dependent_parameter(item)
url, query_str, request_data, headers, expected, request_data_type = self.__request_info(item)
# 分析请求参数信息
headers, request_data = self.action.analysis_request(request_data, jp_dict, h_crypto, headers, r_crypto)
result_tuple = None
result = "PASS"
response = None
try:
# 执行请求操作
kwargs = {request_data_type: request_data, 'headers': headers, "params": query_str}
response = self.action.http_client(host, url, method, **kwargs)
# 执行后置代码片段
self.action.execute_dynamic_code(response, teardown_script)
# 执行断言 返回结果元组
result_tuple = self.action.run_validate(expected, response.json())
self.assertNotIn("FAIL", result_tuple, "FAIL 存在结果元组中")
try:
# 提取响应
self.action.substitute_data(response.json(), regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
logger.error(f"提取响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}")
except Exception as e:
result = "FAIL"
logger.error(f'异常用例: {sheet}_{iid}_{name}_{desc}\n{e}')
raise e
finally:
response = response.text if response is not None else str(response)
# 响应结果及测试结果回写 excel
excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result,
assert_log=str(result_tuple))
@classmethod
def tearDownClass(cls) -> None:
excel.close_excel()
logger.info(f"所有用例执行完毕")
@staticmethod
def __base_info(item):
"""
获取基础信息
"""
sheet = item.pop("sheet")
item_id = item.pop("Id")
condition = item.pop("Run")
sleep_time = item.pop("Time")
name = item.pop("Name")
desc = item.pop("Description")
headers_crypto = item.pop("Headers Crypto")
request_data_crypto = item.pop("Request Data Crypto")
method = item.pop("Method")
return sheet, item_id, condition, sleep_time, name, desc, headers_crypto, request_data_crypto, method
@staticmethod
def __sql_info(item):
sql = item.pop("SQL")
sql_params_dict = item.pop("Sql Params Dict")
return sql, sql_params_dict
@staticmethod
def __extractor_info(item):
"""
获取提取参数的基本字段信息
Args:
item:
Returns:
"""
regex = item.pop("Regex")
keys = item.pop("Regex Params List")
deps = item.pop("Retrieve Value")
jp_dict = item.pop("Jsonpath")
extract_request_data = item.pop("Extract Request Data")
return regex, keys, deps, jp_dict, extract_request_data
@staticmethod
def __request_info(item):
"""
请求数据
"""
url = item.pop("Url")
query_str = item.pop("Query Str")
request_data = item.pop("Request Data")
headers = item.pop("Headers")
expected = item.pop("Expected")
request_data_type = item.pop("Request Data Type") if item.get("Request Data Type") else 'params'
return url, query_str, request_data, headers, expected, request_data_type
@staticmethod
def script(item):
setup_script = item.pop("Setup Script")
teardown_script = item.pop("Teardown Script")
return setup_script, teardown_script
@staticmethod
def __is_run(condition):
is_run = condition
if not is_run or is_run.upper() != "YES":
return True
@staticmethod
def __pause_execution(sleep_time):
if sleep_time:
try:
time.sleep(sleep_time)
except Exception as e:
logger.error("暂时时间必须是数字")
raise e
def __exc_sql(self, item, method):
sql, sql_params_dict = self.__sql_info(item)
sql = self.action.replace_dependent_parameter(sql)
if sql:
try:
execute_sql_results = mysql.do_mysql(sql)
if execute_sql_results and sql_params_dict:
self.action.extract_request_data(execute_sql_results, jp_dict=sql_params_dict)
if method == "SQL" and mysql:
return None
except Exception as e:
logger.error(f'执行 sql 失败:{sql},异常信息:{e}')
raise e
return sql
if __name__ == '__main__':
unittest.main()
unittest.main()