修复提取请求参数的bug

This commit is contained in:
aaronchenyongzhi 2023-07-14 22:00:45 +08:00
parent 6536cb5f41
commit ae9b459aa3
9 changed files with 214 additions and 216 deletions

View File

@ -5,7 +5,7 @@
<sourceFolder url="file://$MODULE_DIR$" isTestSource="false" /> <sourceFolder url="file://$MODULE_DIR$" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/venv" /> <excludeFolder url="file://$MODULE_DIR$/venv" />
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="jdk" jdkName="Pipenv (apitest)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="PackageRequirementsSettings"> <component name="PackageRequirementsSettings">

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="dataSourceStorageLocal" created-in="PY-231.9161.41"> <component name="dataSourceStorageLocal" created-in="PY-231.9011.38">
<data-source name="@localhost" uuid="49b6f686-3676-4df5-9645-cd7a2fe91d80"> <data-source name="@localhost" uuid="49b6f686-3676-4df5-9645-cd7a2fe91d80">
<database-info product="MySQL" version="8.0.26" jdbc-version="4.2" driver-name="MySQL Connector/J" driver-version="mysql-connector-java-8.0.25 (Revision: 08be9e9b4cba6aa115f9b27b215887af40b159e0)" dbms="MYSQL" exact-version="8.0.26" exact-driver-version="8.0"> <database-info product="MySQL" version="8.0.26" jdbc-version="4.2" driver-name="MySQL Connector/J" driver-version="mysql-connector-java-8.0.25 (Revision: 08be9e9b4cba6aa115f9b27b215887af40b159e0)" dbms="MYSQL" exact-version="8.0.26" exact-driver-version="8.0">
<extra-name-characters>#@</extra-name-characters> <extra-name-characters>#@</extra-name-characters>

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Pipenv (api-test-project)" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Pipenv (apitest)" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser"> <component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" /> <option name="shown" value="true" />
</component> </component>

Binary file not shown.

View File

@ -18,46 +18,46 @@ from common.validation.validator import Validator
@singleton @singleton
class Action(Extractor, LoadScript, Validator): class Action(Extractor, LoadScript, Validator):
def __init__(self, initialize_data=None, bif_functions=None): def __init__(self, initialize_data=None, bif_functions=None):
super().__init__() super().__init__()
self.encrypt = EncryptData() self.encrypt = EncryptData()
self.__variables = {} self.__variables = {}
self.set_environments(initialize_data) self.set_environments(initialize_data)
self.set_bif_fun(bif_functions) self.set_bif_fun(bif_functions)
def execute_dynamic_code(self, item, code): def execute_dynamic_code(self, item, code):
self.set_variables(item) self.set_variables(item)
if code is not None: if code is not None:
try: try:
ast_obj = ast.parse(code, mode='exec') ast_obj = ast.parse(code, mode='exec')
compiled = compile(ast_obj, '<string>', 'exec') compiled = compile(ast_obj, '<string>', 'exec')
exec(compiled, {"pm": self}) exec(compiled, {"pm": self})
except SyntaxError as e: except SyntaxError as e:
error_message = f'Syntax error in dynamic code: {e}' error_message = f'Syntax error in dynamic code: {e}'
self._handle_error(error_message) self._handle_error(error_message)
except Exception as e: except Exception as e:
error_message = f"Error executing dynamic code: {e}" error_message = f"Error executing dynamic code: {e}"
self._handle_error(error_message) self._handle_error(error_message)
finally: finally:
return self.__variables return self.__variables
return item return item
def _handle_error(self, error_message): def _handle_error(self, error_message):
print(f'Error occurred: {error_message}') print(f'Error occurred: {error_message}')
def set_variables(self, item): def set_variables(self, item):
self.__variables = item self.__variables = item
def update_variables(self, key, value): def update_variables(self, key, value):
self.__variables[f"{{{{{key}}}}}"] = value self.__variables[f"{{{{{key}}}}}"] = value
def get_variables(self, key=None): def get_variables(self, key=None):
"""获取依赖表 或 依赖表中key对应的值""" """获取依赖表 或 依赖表中key对应的值"""
return self.__variables if not key else self.__variables.get(key) return self.__variables if not key else self.__variables.get(key)
def analysis_request(self, request_data, headers_crypto, headers, request_crypto, extract_request_data): def analysis_request(self, request_data, headers_crypto, headers, request_crypto, extract_request_data):
# 请求头及body加密或者加签 # 请求头及body加密或者加签
headers, request_data = self.encrypt.encrypts(headers_crypto, headers, request_crypto, request_data) headers, request_data = self.encrypt.encrypts(headers_crypto, headers, request_crypto, request_data)
# 提取请求参数信息 # 提取请求参数信息
self.substitute_data(request_data, jp_dict=extract_request_data) self.substitute_data(request_data, jp_dict=extract_request_data)
return headers, request_data return headers, request_data

View File

@ -124,7 +124,6 @@ class DataExtractor(Environments):
Returns: 字符串或者list Returns: 字符串或者list
""" """
json_path_dict = json_path_dict if isinstance(json_path_dict, dict) else json.loads(json_path_dict) json_path_dict = json_path_dict if isinstance(json_path_dict, dict) else json.loads(json_path_dict)
for key, expression in json_path_dict.items(): for key, expression in json_path_dict.items():
try: try:

View File

@ -46,7 +46,6 @@ class DependentParameter(DataExtractor):
# 函数替换 # 函数替换
key = self.pattern_fun.search(jst).group() key = self.pattern_fun.search(jst).group()
if key in self.get_environments().keys(): if key in self.get_environments().keys():
# 如果参数名称存在于关联参数表中,则调用相应的函数获取返回值,并替换字符串中的参数 # 如果参数名称存在于关联参数表中,则调用相应的函数获取返回值,并替换字符串中的参数
value_ = self.get_environments(key)() value_ = self.get_environments(key)()
jst = jst.replace(key, str(value_)) jst = jst.replace(key, str(value_))

View File

@ -10,7 +10,6 @@ import urllib3
sys.path.append("../") sys.path.append("../")
sys.path.append("./common") sys.path.append("./common")
from common.http_client import logger
from common.validation.load_modules_from_folder import LoadModulesFromFolder from common.validation.load_modules_from_folder import LoadModulesFromFolder
from common.file_handling.file_utils import FileUtils from common.file_handling.file_utils import FileUtils
from common.utils.decorators import request_decorator from common.utils.decorators import request_decorator

View File

@ -25,173 +25,174 @@ host = init_data.get('host', "") + init_data.get("path", "")
@ddt @ddt
class TestProjectApi(unittest.TestCase): class TestProjectApi(unittest.TestCase):
maxDiff = None maxDiff = None
action = Action(initialize_data, bif_functions) action = Action(initialize_data, bif_functions)
@classmethod @classmethod
def setUpClass(cls) -> None: def setUpClass(cls) -> None:
pass pass
def setUp(self) -> None: def setUp(self) -> None:
self.action.set_bif_fun(dynamic_scaling_methods) self.action.set_bif_fun(dynamic_scaling_methods)
@data(*test_case) @data(*test_case)
def test_api(self, item): def test_api(self, item):
sheet, iid, condition, st, name, desc, h_crypto, r_crypto, method = self.__base_info(item) sheet, iid, condition, st, name, desc, h_crypto, r_crypto, method = self.__base_info(item)
regex, keys, deps, jp_dict, extract_request_data = self.__extractor_info(item) regex, keys, deps, jp_dict, extract_request_data = self.__extractor_info(item)
setup_script, teardown_script = self.script(item) setup_script, teardown_script = self.script(item)
if self.__is_run(condition): if self.__is_run(condition):
return return
self.__pause_execution(st) self.__pause_execution(st)
# 首执行 sql # 首执行 sql
self.__exc_sql(item, method) self.__exc_sql(item, method)
# 执行动态代码 # 执行动态代码
item = self.action.execute_dynamic_code(item, setup_script) item = self.action.execute_dynamic_code(item, setup_script)
# prepost_script = f"prepost_script_{sheet}_{iid}.py" # prepost_script = f"prepost_script_{sheet}_{iid}.py"
# item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item) # item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
# 修正参数 # 修正参数
item = self.action.replace_dependent_parameter(item) item = self.action.replace_dependent_parameter(item)
url, query_str, request_data, headers, expected, request_data_type = self.__request_info(item) url, query_str, request_data, headers, expected, request_data_type = self.__request_info(item)
# 分析请求参数信息 # 分析请求参数信息
headers, request_data = self.action.analysis_request(request_data, h_crypto, headers, r_crypto,extract_request_data) headers, request_data = self.action.analysis_request(request_data, h_crypto, headers, r_crypto,
result_tuple = None extract_request_data)
result = "PASS" result_tuple = None
response = None result = "PASS"
response = None
try:
# 执行请求操作 try:
kwargs = {request_data_type: request_data, 'headers': headers, "params": query_str} # 执行请求操作
response = self.action.http_client(host, url, method, **kwargs) kwargs = {request_data_type: request_data, 'headers': headers, "params": query_str}
response = self.action.http_client(host, url, method, **kwargs)
# 执行后置代码片段
self.action.execute_dynamic_code(response, teardown_script) # 执行后置代码片段
self.action.execute_dynamic_code(response, teardown_script)
# 执行断言 返回结果元组
result_tuple = self.action.run_validate(expected, response.json()) # 执行断言 返回结果元组
self.assertNotIn("FAIL", result_tuple, "FAIL 存在结果元组中") result_tuple = self.action.run_validate(expected, response.json())
try: self.assertNotIn("FAIL", result_tuple, "FAIL 存在结果元组中")
# 提取响应 try:
self.action.substitute_data(response.json(), regex=regex, keys=keys, deps=deps, jp_dict=jp_dict) # 提取响应
self.action.substitute_data(response.json(), regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
logger.error(f"提取响应失败:{sheet}_{iid}_{name}_{desc}" except Exception as err:
f"\nregex={regex};" logger.error(f"提取响应失败:{sheet}_{iid}_{name}_{desc}"
f" \nkeys={keys};" f"\nregex={regex};"
f"\ndeps={deps};" f" \nkeys={keys};"
f"\njp_dict={jp_dict}" f"\ndeps={deps};"
f"\n{err}") f"\njp_dict={jp_dict}"
except Exception as e: f"\n{err}")
result = "FAIL" except Exception as e:
logger.error(f'异常用例: {sheet}_{iid}_{name}_{desc}\n{e}') result = "FAIL"
raise e logger.error(f'异常用例: {sheet}_{iid}_{name}_{desc}\n{e}')
finally: raise e
response = response.text if response is not None else str(response) finally:
# 响应结果及测试结果回写 excel response = response.text if response is not None else str(response)
excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result, # 响应结果及测试结果回写 excel
assert_log=str(result_tuple)) excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result,
assert_log=str(result_tuple))
@classmethod
def tearDownClass(cls) -> None: @classmethod
excel.close_excel() def tearDownClass(cls) -> None:
logger.info(f"所有用例执行完毕") excel.close_excel()
logger.info(f"所有用例执行完毕")
@staticmethod
def __base_info(item): @staticmethod
""" def __base_info(item):
获取基础信息 """
""" 获取基础信息
sheet = item.pop("sheet") """
item_id = item.pop("Id") sheet = item.pop("sheet")
condition = item.pop("Run") item_id = item.pop("Id")
sleep_time = item.pop("Time") condition = item.pop("Run")
name = item.pop("Name") sleep_time = item.pop("Time")
desc = item.pop("Description") name = item.pop("Name")
headers_crypto = item.pop("Headers Crypto") desc = item.pop("Description")
request_data_crypto = item.pop("Request Data Crypto") headers_crypto = item.pop("Headers Crypto")
method = item.pop("Method") request_data_crypto = item.pop("Request Data Crypto")
return sheet, item_id, condition, sleep_time, name, desc, headers_crypto, request_data_crypto, method method = item.pop("Method")
return sheet, item_id, condition, sleep_time, name, desc, headers_crypto, request_data_crypto, method
@staticmethod
def __sql_info(item): @staticmethod
sql = item.pop("SQL") def __sql_info(item):
sql_params_dict = item.pop("Sql Params Dict") sql = item.pop("SQL")
return sql, sql_params_dict sql_params_dict = item.pop("Sql Params Dict")
return sql, sql_params_dict
@staticmethod
def __extractor_info(item): @staticmethod
""" def __extractor_info(item):
获取提取参数的基本字段信息 """
Args: 获取提取参数的基本字段信息
item: Args:
item:
Returns:
Returns:
"""
regex = item.pop("Regex") """
keys = item.pop("Regex Params List") regex = item.pop("Regex")
deps = item.pop("Retrieve Value") keys = item.pop("Regex Params List")
jp_dict = item.pop("Jsonpath") deps = item.pop("Retrieve Value")
extract_request_data = item.pop("Extract Request Data") jp_dict = item.pop("Jsonpath")
return regex, keys, deps, jp_dict, extract_request_data extract_request_data = item.pop("Extract Request Data")
return regex, keys, deps, jp_dict, extract_request_data
@staticmethod
def __request_info(item): @staticmethod
""" def __request_info(item):
请求数据 """
""" 请求数据
url = item.pop("Url") """
query_str = item.pop("Query Str") url = item.pop("Url")
request_data = item.pop("Request Data") query_str = item.pop("Query Str")
headers = item.pop("Headers") request_data = item.pop("Request Data")
expected = item.pop("Expected") headers = item.pop("Headers")
request_data_type = item.pop("Request Data Type") if item.get("Request Data Type") else 'params' expected = item.pop("Expected")
request_data_type = item.pop("Request Data Type") if item.get("Request Data Type") else 'params'
return url, query_str, request_data, headers, expected, request_data_type
return url, query_str, request_data, headers, expected, request_data_type
@staticmethod
def script(item): @staticmethod
setup_script = item.pop("Setup Script") def script(item):
teardown_script = item.pop("Teardown Script") setup_script = item.pop("Setup Script")
return setup_script, teardown_script teardown_script = item.pop("Teardown Script")
return setup_script, teardown_script
@staticmethod
def __is_run(condition): @staticmethod
is_run = condition def __is_run(condition):
if not is_run or is_run.upper() != "YES": is_run = condition
return True if not is_run or is_run.upper() != "YES":
return True
@staticmethod
def __pause_execution(sleep_time): @staticmethod
if sleep_time: def __pause_execution(sleep_time):
try: if sleep_time:
time.sleep(sleep_time) try:
except Exception as e: time.sleep(sleep_time)
logger.error("暂时时间必须是数字") except Exception as e:
raise e logger.error("暂时时间必须是数字")
raise e
def __exc_sql(self, item, method):
sql, sql_params_dict = self.__sql_info(item) def __exc_sql(self, item, method):
sql = self.action.replace_dependent_parameter(sql) sql, sql_params_dict = self.__sql_info(item)
if sql: sql = self.action.replace_dependent_parameter(sql)
try: if sql:
execute_sql_results = mysql.do_mysql(sql) try:
if execute_sql_results and sql_params_dict: execute_sql_results = mysql.do_mysql(sql)
self.action.extract_request_data(execute_sql_results, jp_dict=sql_params_dict) if execute_sql_results and sql_params_dict:
if method == "SQL" and mysql: self.action.extract_request_data(execute_sql_results, jp_dict=sql_params_dict)
return None if method == "SQL" and mysql:
except Exception as e: return None
logger.error(f'执行 sql 失败:{sql},异常信息:{e}') except Exception as e:
raise e logger.error(f'执行 sql 失败:{sql},异常信息:{e}')
return sql raise e
return sql
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()