api-automation-test/ApiAutomationTest/app/cores/dispatcher.py

1022 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
from flask import current_app, _request_ctx_stack
from abc import ABC, abstractmethod
from flask import session, request
from typing import Optional, Union
from concurrent.futures import ThreadPoolExecutor
import traceback
import json
from functools import partial
import contextlib
from app.template_global import sort_by_order_in_module, sort_by_order_in_project, sort_by_order_in_logic_controller
from app.cores.dictionaries import (ELEMENT_TYPE, STATUS, CASE_TYPE, DISPATCHER_STATUS, DISPATCHER_TYPE,
DISPATCHER_END_TYPE, REPORT_RESULT, TOOL_TYPE, LOGIC_CONTROLLER_TYPE,
DISPATCHER_TRIGGER_TYPE)
from app.cores.logger import DispatcherLogger
from app.models import (Case, Scene, Module, Project, Dispatcher, DispatcherDetail, Report, ReportCaseData,
ReportCaseExpectationData, LogicController, DingTalkRobotSetting, Tool, ReportToolData,
EmailReceiverSetting, ProjectAdvancedConfiguration, SubElementInLogicController)
from app.cores.ws import emit_dispatcher_result, emit_dispatcher_end, emit_dispatcher_start
from app.cores.exceptions import *
from app.cores import dingtalk
from app.cores.email import send_email_report
from app.cores.case.http.http_cookie_pool_manager import HTTPCookiePoolManager
class AbstractDispatcher(ABC):
def __init__(self, element, logger=None, dispatcher=None, dispatcher_type=None):
"""
:param element: 构建元素
:type element: Union[Case, Scene, Module, Project]
:param logger: 调度日志
:type logger: Optional[DispatcherLogger]
:param dispatcher: 调度数据
:type dispatcher: Optional[Dispatcher]
:param dispatcher_type: 标识构建是通过单独案例调试(DISPATCHER_TYPE.DEBUG)还是通过模块/项目构建测试(DISPATCHER_TYPE.BUILD)
:type dispatcher_type: str
"""
# 调度数据对象
self.dispatcher = dispatcher
# 调度类型
self.dispatcher_type = dispatcher_type
# 调度日志
self.dispatcher_logger = logger
# 标识触发调度开始的元素
# 对于参与调度的元素默认将触发标志置为False
self.element_is_dispatcher_trigger = False
# 触发调度元素类型 PROJECT or MODULE
self.trigger_element_type = None
# 当前组件类型
self.element_type = None
# 当前组件调度详细数据对象
self.dispatcher_detail = None
# 调试模式直接返回退出__init__
if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
# 标记当前执行案例所属的project_id执行结束后会删除掉该session
# 用于在app.cores.parser.new_parse_data 中方便获取指定项目的变量池
# 定时任务可能存在并发执行的情况而调度任务是在单独的子线程中执行因此即使多个调度任务并发执行对session进行操作互不影响
# 对于debug模式执行时由于Flask本身对于每次请求都是不同线程且不同session因此也不会相互影响
if isinstance(element, Tool):
scene_controller = SubElementInLogicController.get_scene_controller_by_element(element_id=element.id,
element_type=ELEMENT_TYPE.TOOL)
project_id = scene_controller.scene.module.project.id
session['project_id'] = project_id
elif isinstance(element, Case):
project_id = element.scene.module.project.id
session['project_id'] = project_id
return
# 下面是对参与调度的元素进行处理
stop_on_error = True # 调度执行发生错误时终止执行
if isinstance(element, Project):
self.element_type = ELEMENT_TYPE.PROJECT
project_id = element.id
stop_on_error = element.project_advanced_configuration.stop_on_error
elif isinstance(element, Module):
self.element_type = ELEMENT_TYPE.MODULE
project_id = element.project.id
stop_on_error = element.project.project_advanced_configuration.stop_on_error
elif isinstance(element, Scene):
self.element_type = ELEMENT_TYPE.SCENE
project_id = element.module.project.id
elif isinstance(element, Tool):
self.element_type = ELEMENT_TYPE.TOOL
project_id = None # 根据tool.id较难找到其project_id
elif isinstance(element, Case):
self.element_type = ELEMENT_TYPE.CASE
project_id = element.scene.module.project.id
elif isinstance(element, LogicController):
self.element_type = ELEMENT_TYPE.LOGIC_CONTROLLER
project_id = None # 根据logic_controller.id较难找到其project_id
else:
raise TypeError('调度构建暂未支持该类型组件element_type=%s' % (type(element)))
# 只有调度触发元素会执行下面if语句块内的代码
if (self.dispatcher_logger is None or self.dispatcher is None) and self.element_type in [ELEMENT_TYPE.PROJECT, ELEMENT_TYPE.MODULE]:
# 标记当前执行案例所属的project_id调度结束后会删除掉该session
# 用于在app.cores.parser.new_parse_data 中方便获取指定项目的变量池
# 定时任务可能存在并发执行的情况而调度任务是在单独的子线程中执行因此即使多个调度任务并发执行对session进行操作互不影响
# 对于debug模式执行时由于Flask本身对于每次请求都是不同线程且不同session因此也不会相互影响
session['project_id'] = project_id
self.element_is_dispatcher_trigger = True
self.trigger_element_type = self.element_type
# 如果当前element是触发者通过项目/模块开始构建测试),则创建一条调度数据, 并实例化调度日志dispatcher_logger
self.dispatcher = Dispatcher.add(element_type=self.trigger_element_type, element_id=element.id)
self.dispatcher.update_status(status=DISPATCHER_STATUS.RUNNING)
self.dispatcher.end_type = DISPATCHER_END_TYPE.SUCCESS # 标识调度终止类型
self.dispatcher.stop_on_error = stop_on_error # 调度执行发生错误时终止执行
# 实例化调度日志dispatcher_logger
self.dispatcher_logger = DispatcherLogger(use_memory_string_handler=True,
use_dispatcher_log_db_handler=True,
dispatcher_id=self.dispatcher.id)
# 调度报告
report = Report.add(name=element.name, result=REPORT_RESULT.RUNNING, dispatcher_id=self.dispatcher.id)
# 发送调度执行开始信息
emit_dispatcher_start(id=self.dispatcher.id, type=self.trigger_element_type, report_id=report.id)
# 调度预处理
self._dispatcher_set_up()
# 创建一条调度子数据
if self.element_type in [ELEMENT_TYPE.PROJECT, ELEMENT_TYPE.MODULE, ELEMENT_TYPE.SCENE]:
# 元素类型为Project、Module、Scene时会在构造函数中直接创建调度子数据。而元素类型Case/Tool/LogicController是在Scene.execute()方法中创建
self.dispatcher_detail = DispatcherDetail.add(element_type=self.element_type, element_id=element.id,
element_name=element.name, dispatcher=self.dispatcher)
@abstractmethod
def set_up(self):
"""执行前"""
pass
@abstractmethod
def execute(self):
"""执行"""
pass
@abstractmethod
def tear_down(self):
"""执行后"""
pass
def clean(self):
# 调试任务最后步骤
if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
# 清理日志
self.dispatcher_logger.close()
# 清除当前请求session中project_id
session.pop('project_id')
# 调度任务最后步骤
elif self.dispatcher_type == DISPATCHER_TYPE.BUILD and self.element_is_dispatcher_trigger:
# 更新结束日期
self.dispatcher.update_end_time()
# 报告分析
report_data = self._analyse_report_data(report=self.dispatcher.report)
# 发送钉钉通知
self._ding_talk_send_message(report_data=report_data)
# 发送邮件通知
self._email_send_summary_report(report_data=report_data)
# 清理日志
self.dispatcher_logger.close()
# 发送调度执行结束信息
emit_dispatcher_end(id=self.dispatcher.id,
type=self.trigger_element_type,
end_type=self.dispatcher.end_type)
# 清除当前请求session中project_id
session.pop('project_id')
# 置状态为已完成
self.dispatcher.update_status(status=DISPATCHER_STATUS.FINISHED)
def run(self):
try:
self.set_up()
self.execute()
self.tear_down()
self.check_dispatcher_status_and_stop()
finally:
self.clean()
def exception_handler(self, e):
"""
异常处理
:param e: 异常对象
:type e: Exception
"""
if isinstance(e, ManualStopException):
if self.dispatcher_type != DISPATCHER_TYPE.DEBUG:
self.dispatcher.end_type = DISPATCHER_END_TYPE.ABORT # 标识调度终止类型
self.dispatcher.update_status(status=DISPATCHER_STATUS.STOPPED)
raise e
else:
self.dispatcher_logger.logger.error('执行异常: %s' % e)
self.dispatcher_logger.logger.error(traceback.format_exc())
if self.dispatcher_type != DISPATCHER_TYPE.DEBUG and self.dispatcher.stop_on_error:
self.dispatcher.update_status(status=DISPATCHER_STATUS.STOPPED)
self.dispatcher.end_type = DISPATCHER_END_TYPE.ERROR # 标识调度终止类型
raise e
def check_dispatcher_status_and_stop(self):
"""检查调度状态,如果是正在停止状态则终止执行"""
if self.dispatcher.status == DISPATCHER_STATUS.STOPPING:
msg = '检测到调度id:%s的状态为%s, 将终止运行' % (self.dispatcher.id, self.dispatcher.status)
self.dispatcher_logger.logger.warning(msg)
raise ManualStopException(msg)
def _dispatcher_set_up(self):
"""调度预处理,执行测试前执行"""
self.dispatcher_logger.logger.info('[执行预处理][开始]')
# 清理http请求的cookie数据
self._clear_http_cookie()
self.dispatcher_logger.logger.info('[执行预处理][结束]')
def _analyse_report_data(self, report):
"""
分析报告数据
:param report: 本次构建所报告
:type report: Report
:return: 案例执行结果
:rtype: dict
"""
self.dispatcher_logger.logger.info('[报告分析][开始]')
report_case_data = report.report_case_data
case_count = 0
success_count = 0
failure_count = 0
error_count = 0
skip_count = 0
report_result = ''
start_time = 0
end_time = 0
elapsed_time = 0
dispatcher_id = 0
report_id = 0
report_name = ''
trigger_element_type = ''
dispatcher_end_type = ''
dispatcher_trigger_type = ''
module_results = []
try:
dispatcher_id = self.dispatcher.id
report_id = self.dispatcher.report.id
report_name = self.dispatcher.report.name + ' ' + str(self.dispatcher.start_time)
trigger_element_type = self.trigger_element_type
dispatcher_end_type = self.dispatcher.end_type
dispatcher_trigger_type = session.get('dispatcher_trigger_type', '')
# 按模块汇总结果
for data in report_case_data:
for module_result in module_results:
if module_result['module_id'] == data.module_id:
module_result['case_count'] += 1
module_result['success_count'] += 1 if data.result == REPORT_RESULT.SUCCESS else 0
module_result['failure_count'] += 1 if data.result == REPORT_RESULT.FAILURE else 0
module_result['error_count'] += 1 if data.result == REPORT_RESULT.ERROR else 0
module_result['skip_count'] += 1 if data.result == REPORT_RESULT.SKIP else 0
break
else: # 如果module_results中没有找到则插入
module_result = {
'module_id': data.module_id,
'module_name': data.module_name,
'case_count': 1,
'success_count': 1 if data.result == REPORT_RESULT.SUCCESS else 0,
'failure_count': 1 if data.result == REPORT_RESULT.FAILURE else 0,
'error_count': 1 if data.result == REPORT_RESULT.ERROR else 0,
'skip_count': 1 if data.result == REPORT_RESULT.SKIP else 0,
}
module_results.append(module_result)
# 分析本次构建所有案例执行结果
for data in report_case_data:
case_count += 1
if data.result == REPORT_RESULT.SUCCESS:
success_count += 1
elif data.result == REPORT_RESULT.FAILURE:
failure_count += 1
elif data.result == REPORT_RESULT.ERROR:
error_count += 1
elif data.result == REPORT_RESULT.SKIP:
skip_count += 1
if self.dispatcher.end_type == DISPATCHER_END_TYPE.ERROR:
report.update_result(REPORT_RESULT.ERROR)
report_result = REPORT_RESULT.ERROR
elif self.dispatcher.end_type == DISPATCHER_END_TYPE.ABORT:
report.update_result(REPORT_RESULT.ABORT)
report_result = REPORT_RESULT.ABORT
elif success_count + skip_count == case_count:
report.update_result(REPORT_RESULT.SUCCESS)
report_result = REPORT_RESULT.SUCCESS
else:
report.update_result(REPORT_RESULT.FAILURE)
report_result = REPORT_RESULT.FAILURE
self.dispatcher_logger.logger.info('[报告分析]本次共执行了%s个案例' % case_count)
start_time = report.dispatcher.start_time
end_time = report.dispatcher.end_time
elapsed_time = end_time - start_time
self.dispatcher_logger.logger.info('[报告分析]开始时间: %s, 结束时间: %s, 耗时: %s' % (start_time, end_time, elapsed_time))
self.dispatcher_logger.logger.info(
'[报告分析]成功:%s个, 失败:%s个, 错误%s个, 跳过%s' % (success_count, failure_count, error_count, skip_count))
self.dispatcher_logger.logger.info('[报告分析][结束]')
except Exception as e:
report_result = REPORT_RESULT.ERROR
self.dispatcher_logger.logger.error('[报告分析][异常] %s' % traceback.format_exc())
return {
'case_count': case_count, # 总案例个数
'success_count': success_count, # 成功个数
'failure_count': failure_count, # 失败个数
'error_count': error_count, # 错误个数
'skip_count': skip_count, # 跳过个数
'report_result': report_result, # 结果
'start_time': start_time, # 调度开始时间
'end_time': end_time, # 调度结束时间
'elapsed_time': elapsed_time, # 耗时
'dispatcher_id': dispatcher_id, # 调度id
'report_id': report_id, # 报告id
'report_name': report_name, # 报告名称
'trigger_element_type': trigger_element_type, # 调度元素类型
'dispatcher_end_type': dispatcher_end_type, # 调度结束类型
'dispatcher_trigger_type': dispatcher_trigger_type, # 调度触发类型 手动/定时任务
'module_results': module_results, # 统计各模块案例执行情况
}
def _ding_talk_send_message(self, report_data):
"""
调度结束时发送钉钉通知
:param report_data: 案例执行结果数据
:type report_data: dict
"""
self.dispatcher_logger.logger.info('[钉钉通知][开始]')
project_id = session.get('project_id')
if project_id is None:
self.dispatcher_logger.logger.warning('[钉钉通知]当前调度获取项目编号project_id为None, 无法获取钉钉配置信息')
else:
text = """# 构建结果
调度编号: {dispatcher_id}
组件类型: {element_type}
执行结果: {report_result}
成功: {success_count} 失败: {failure_count} 错误: {error_count} 跳过: {skip_count}
开始时间: {start_time}
结束时间: {end_time}
耗时: {elapsed_time}
"""
data = {
"msgtype": "text",
"text": {
"title": "测试调度通知",
"content": text.format(
dispatcher_id=str(self.dispatcher.id),
element_type='模块' if self.dispatcher.element_type == ELEMENT_TYPE.MODULE else '项目',
report_result=str(report_data.get('report_result', '')),
success_count=str(report_data.get('success_count', '')),
failure_count=str(report_data.get('failure_count', '')),
error_count=str(report_data.get('error_count', '')),
skip_count=str(report_data.get('skip_count', '')),
start_time=str(report_data.get('start_time', '')),
end_time=str(report_data.get('end_time', '')),
elapsed_time=str(report_data.get('elapsed_time', '')),
)
},
}
# 检查项目钉钉通知开关
project_advanced_configuration = ProjectAdvancedConfiguration.query.filter_by(project_id=project_id).first()
if project_advanced_configuration is None:
self.dispatcher_logger.logger.warning('[钉钉通知]当前项目project_id=%s, 未获取到钉钉通知开关数据' % project_id)
else:
if project_advanced_configuration.ding_talk_notify:
ding_talk_robot_settings = DingTalkRobotSetting.get_project_ding_talk_robot_setting(project_id=project_id)
if not ding_talk_robot_settings:
self.dispatcher_logger.logger.warning('钉钉通知]当前项目project_id=%s, 未找到相关钉钉配置信息' % project_id)
else:
for ding_talk_robot_setting in ding_talk_robot_settings:
if ding_talk_robot_setting.enable:
self.dispatcher_logger.logger.info('[钉钉通知][发送消息]')
try:
response_content = dingtalk.send_message(data=data,
access_token=ding_talk_robot_setting.access_token,
secret=ding_talk_robot_setting.secret,
at_all=ding_talk_robot_setting.at_all,
at_mobiles=ding_talk_robot_setting.at_mobiles)
self.dispatcher_logger.logger.info('[钉钉通知][收到应答]: ' + str(response_content))
except Exception as e:
self.dispatcher_logger.logger.error('[钉钉通知][发送错误]: 错误信息:\n' + traceback.format_exc())
else:
self.dispatcher_logger.logger.info('[钉钉通知]钉钉通知开关关闭,不进行钉钉通知')
self.dispatcher_logger.logger.info('[钉钉通知][结束]')
def _email_send_summary_report(self, report_data):
"""
调度结束时发送邮件通知
:param report_data: 案例执行结果数据
:type report_data: dict
"""
self.dispatcher_logger.logger.info('[邮件通知][开始]')
project_id = session.get('project_id')
if project_id is None:
self.dispatcher_logger.logger.warning('[邮件通知]当前调度获取项目编号project_id为None, 无法获取邮件收件人配置信息')
else:
# 检查项目邮件通知开关
project_advanced_configuration = ProjectAdvancedConfiguration.query.filter_by(project_id=project_id).first()
if project_advanced_configuration is None:
self.dispatcher_logger.logger.warning('[邮件通知]当前项目project_id=%s, 未获取到邮件通知开关数据' % project_id)
else:
if project_advanced_configuration.email_notify:
email_receiver_settings = EmailReceiverSetting.get_project_email_receiver_setting(project_id=project_id)
addresses = [row.address for row in email_receiver_settings if row.enable]
self.dispatcher_logger.logger.info('[邮件通知]收件人列表: ' + str(addresses))
try:
send_email_report(addresses=addresses, report_data=report_data)
except Exception as e:
self.dispatcher_logger.logger.error('[邮件通知][发送错误]错误信息:\n' + traceback.format_exc())
else:
self.dispatcher_logger.logger.info('[邮件通知]邮件通知开关关闭,不进行邮件通知')
self.dispatcher_logger.logger.info('[邮件通知][结束]')
def _clear_http_cookie(self):
"""清理http请求的cookie数据"""
project_id = session.get('project_id')
if project_id:
# 检查项目clear_http_cookie数据开关
project_advanced_configuration = ProjectAdvancedConfiguration.query.filter_by(project_id=project_id).first()
if project_advanced_configuration:
if project_advanced_configuration.clear_http_cookie:
try:
HTTPCookiePoolManager.clear_cookie_pool(type=DISPATCHER_TYPE.BUILD)
except Exception as e:
self.dispatcher_logger.logger.warning(f'清理HTTPCookie数据失败错误信息:\n{traceback.format_exc()}')
else:
self.dispatcher_logger.logger.info('清理HTTPCookie数据成功')
else:
self.dispatcher_logger.logger.warning(f'判断是否清理HTTPCookie数据失败原因:未获取到项目id为{project_id}的配置数据')
else:
self.dispatcher_logger.logger.warning('判断是否清理HTTPCookie数据失败原因:当前session中为获取到project_id')
class AbstractCaseDispatcher(AbstractDispatcher, ABC):
def __init__(self, case, dispatcher_type=DISPATCHER_TYPE.DEBUG, logger=None, dispatcher=None):
"""
:param dispatcher_type: 标识构建是通过单独案例调试(DISPATCHER_TYPE.DEBUG)还是通过模块/项目构建测试(DISPATCHER_TYPE.BUILD)
:type dispatcher_type: str
"""
super().__init__(element=case, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)
# 日志
if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
# 对于single_case触发的案例则单独生成构建日志
self.dispatcher_logger = DispatcherLogger(use_memory_string_handler=True, use_queue_handler=False)
self.dispatcher_logger.clear_string_buffer()
# 案例组件
self.case = case
self.dispatcher_logger.logger.info('[执行案例开始] ==> [案例名称:%s][案例类型:%s]' % (self.case.name, self.case.case_type))
@abstractmethod
def _load_data(self):
"""加载并解析本次案例组件执行需要用到的数据"""
pass
@abstractmethod
def execute(self):
"""执行"""
self._load_data()
def clean(self):
self.dispatcher_logger.logger.info('[案例执行结束] ==> [案例名称:%s][案例类型:%s]' % (self.case.name, self.case.case_type))
super().clean()
def run(self):
try:
self.dispatcher_logger.logger.info('[测试案例]执行阶段[set_up]')
self.set_up()
self.dispatcher_logger.logger.info('[测试案例]执行阶段[execute]')
self.execute()
self.dispatcher_logger.logger.info('[测试案例]执行阶段[tear_down]')
self.tear_down()
if self.dispatcher_type == DISPATCHER_TYPE.BUILD:
self.check_dispatcher_status_and_stop()
except Exception as e:
self.exception_handler(e)
finally:
self.dispatcher_logger.logger.info('[测试案例]执行阶段[clean]')
self.clean()
class AbstractToolDispatcher(AbstractDispatcher, ABC):
def __init__(self, tool, dispatcher_type=DISPATCHER_TYPE.DEBUG, logger=None, dispatcher=None):
"""
:param dispatcher_type: 标识构建是通过单独案例调试(DISPATCHER_TYPE.DEBUG)还是通过模块/项目构建测试(DISPATCHER_TYPE.BUILD)
:type dispatcher_type: str
"""
super().__init__(element=tool, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)
# 工具组件
self.tool = tool
# 日志
if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
# 对于single_case触发的案例则单独生成构建日志
self.dispatcher_logger = DispatcherLogger(use_memory_string_handler=True, use_queue_handler=False)
self.dispatcher_logger.clear_string_buffer()
def run(self):
try:
self.set_up()
self.execute()
self.tear_down()
if self.dispatcher_type == DISPATCHER_TYPE.BUILD:
self.check_dispatcher_status_and_stop()
except Exception as e:
self.exception_handler(e)
finally:
self.clean()
class AbstractLogicControllerDispatcher(AbstractDispatcher, ABC):
def __init__(self, logic_controller, recursive_func, dispatcher_type=DISPATCHER_TYPE.BUILD, logger=None, dispatcher=None):
"""
:param dispatcher_type: 标识构建是通过单独案例调试(DISPATCHER_TYPE.DEBUG)还是通过模块/项目构建测试(DISPATCHER_TYPE.BUILD)
:type dispatcher_type: str
"""
super().__init__(element=logic_controller, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)
# 逻辑控制器递归执行函数
self.recursive_func = recursive_func
# 逻辑控制器组件
self.logic_controller = logic_controller
def run(self):
try:
self.set_up()
self.execute()
self.tear_down()
if self.dispatcher_type == DISPATCHER_TYPE.BUILD:
self.check_dispatcher_status_and_stop()
except Exception as e:
self.exception_handler(e)
finally:
self.clean()
class AbstractSceneDispatcher(AbstractDispatcher, ABC):
def run(self):
try:
self.dispatcher_logger.logger.info('[测试场景]执行阶段[set_up]')
self.set_up()
self.dispatcher_logger.logger.info('[测试场景]执行阶段[execute]')
self.execute()
self.dispatcher_logger.logger.info('[测试场景]执行阶段[tear_down]')
self.tear_down()
self.check_dispatcher_status_and_stop()
except Exception as e:
self.exception_handler(e)
finally:
self.dispatcher_logger.logger.info('[测试场景]执行阶段[clean]')
self.clean()
class AbstractModuleDispatcher(AbstractDispatcher, ABC):
def run(self):
try:
self.dispatcher_logger.logger.info('[测试模块]执行阶段[set_up]')
self.set_up()
self.dispatcher_logger.logger.info('[测试模块]执行阶段[execute]')
self.execute()
self.dispatcher_logger.logger.info('[测试模块]执行阶段[tear_down]')
self.tear_down()
self.check_dispatcher_status_and_stop()
except Exception as e:
self.exception_handler(e)
finally:
self.dispatcher_logger.logger.info('[测试模块]执行阶段[clean]')
self.clean()
class AbstractProjectDispatcher(AbstractDispatcher, ABC):
def run(self):
try:
self.dispatcher_logger.logger.info('[测试项目]执行阶段[set_up]')
self.set_up()
self.dispatcher_logger.logger.info('[测试项目]执行阶段[execute]')
self.execute()
self.dispatcher_logger.logger.info('[测试项目]执行阶段[tear_down]')
self.tear_down()
self.check_dispatcher_status_and_stop()
except Exception as e:
self.exception_handler(e)
finally:
self.dispatcher_logger.logger.info('[测试项目]执行阶段[clean]')
self.clean()
class ProjectDispatcher(AbstractProjectDispatcher):
def __init__(self, project_id):
project = Project.query.filter_by(id=project_id).first()
super().__init__(element=project, logger=None, dispatcher=None, dispatcher_type=DISPATCHER_TYPE.BUILD)
# 项目组件
self.project = project
self.dispatcher_logger.logger.info(('+'*20 + ' [项目测试开始] ==> [项目名称:%s] ' + '+'*20) % self.project.name)
def set_up(self):
super().set_up()
def execute(self):
# 获取到当前项目下所有模块数据)
modules = self.project.modules
for module in sort_by_order_in_project(modules):
if module.status in [STATUS.NORMAL]:
ModuleDispatcher(module_id=module.id, logger=self.dispatcher_logger, dispatcher=self.dispatcher,
dispatcher_type=self.dispatcher_type).run()
def tear_down(self):
super().tear_down()
def clean(self):
self.dispatcher_logger.logger.info(('+'*20 + ' [项目测试结束] ==> [项目名称:%s] ' + '+'*20) % self.project.name)
super().clean()
class ModuleDispatcher(AbstractModuleDispatcher):
def __init__(self, module_id, logger=None, dispatcher=None, dispatcher_type=None):
module = Module.query.filter_by(id=module_id).first()
super().__init__(element=module, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)
# 模块组件
self.module = module
self.dispatcher_logger.logger.info(('='*15 + ' [模块测试开始] ==> [模块名称:%s] ' + '='*15) % self.module.name)
def set_up(self):
super().set_up()
def execute(self):
# 获取到当前模块下所有场景数据)
scenes = self.module.scenes
for scene in sort_by_order_in_module(scenes):
if scene.status in [STATUS.NORMAL]:
SceneDispatcher(scene=scene, logger=self.dispatcher_logger, dispatcher=self.dispatcher,
dispatcher_type=self.dispatcher_type).run()
def tear_down(self):
super().tear_down()
def clean(self):
self.dispatcher_logger.logger.info(('='*15 + ' [模块测试结束] ==> [模块名称:%s] ' + '='*15) % self.module.name)
super().clean()
class SceneDispatcher(AbstractSceneDispatcher):
def __init__(self, scene, logger=None, dispatcher=None, dispatcher_type=None):
super().__init__(element=scene, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)
# 场景组件
self.scene = scene
# 标识组件执行结果
self.element_execute_result = ''
self.dispatcher_logger.logger.info(('#'*5 + ' [场景测试开始] ==> [场景名称:%s] ' + '#'*5) % self.scene.name)
@contextlib.contextmanager
def _element_execute_context(self):
"""
提供组件执行上下文处理器
当组件执行出现异常时统一处理
"""
try:
yield
except Exception as e:
self.dispatcher_logger.logger.error(traceback.format_exc())
# 如果是手动中止则直接抛出异常
if isinstance(e, ManualStopException):
self.element_execute_result = REPORT_RESULT.ABORT
raise e
else:
self.element_execute_result = REPORT_RESULT.ERROR
# 如果配置了执行错误时不终止,则不向上抛出异常
if self.dispatcher.stop_on_error:
raise e
else:
pass
else:
pass
finally:
pass
def set_up(self):
super().set_up()
def execute(self):
# 场景控制器直接执行该控制器下所有组件
self._recursive_execute_logic_controller(
logic_controller_id=self.scene.scene_controller.logic_controller.id,
parent_dispatcher_detail_id=self.dispatcher_detail.id
)
def tear_down(self):
super().tear_down()
def clean(self):
self.dispatcher_logger.logger.info(('#' * 5 + ' [场景测试结束] ==> [场景名称:%s] ' + '#' * 5) % self.scene.name)
super().clean()
def _recursive_execute_logic_controller(self, logic_controller_id, parent_dispatcher_detail_id):
"""
存在逻辑控制器嵌套,使用递归方式执行
:param logic_controller_id: 逻辑控制器id
:param parent_dispatcher_detail_id: 当前调度所属父调度id是为了写入DispatcherDetail.parent_dispatcher_detail_id表中作为层级关系
"""
for sub_element_in_logic_controller in sort_by_order_in_logic_controller(logic_controller_id):
# 执行案例组件
if sub_element_in_logic_controller.element_type == ELEMENT_TYPE.CASE:
case = Case.query.filter_by(id=sub_element_in_logic_controller.element_id).first()
if case is None:
raise DispatcherException('未在表 %s 中查到案例, Case.id=%s' %
('Case', sub_element_in_logic_controller.element_id))
self._execute_case(case=case, parent_dispatcher_detail_id=parent_dispatcher_detail_id)
# 执行工具组件
elif sub_element_in_logic_controller.element_type == ELEMENT_TYPE.TOOL:
tool = Tool.query.filter_by(id=sub_element_in_logic_controller.element_id).first()
if tool is None:
raise DispatcherException('未在表 %s 中查到逻辑控制器, Tool.id=%s' %
('Tool', sub_element_in_logic_controller.element_id))
self._execute_tool(tool=tool, parent_dispatcher_detail_id=parent_dispatcher_detail_id)
# 执行逻辑控制器组件(递归执行)
elif sub_element_in_logic_controller.element_type == ELEMENT_TYPE.LOGIC_CONTROLLER:
logic_controller = LogicController.query.filter_by(id=sub_element_in_logic_controller.element_id).first()
if logic_controller is None:
raise DispatcherException('未在表 %s 中查到逻辑控制器, LogicController.id=%s' %
('LogicController', sub_element_in_logic_controller.element_id))
self._exec_logic_controller(logic_controller=logic_controller,
parent_dispatcher_detail_id=parent_dispatcher_detail_id)
def _execute_case(self, case, parent_dispatcher_detail_id):
"""
执行一个案例组件
:param case: 案例
:type case: Case
:param parent_dispatcher_detail_id: 当前案例调度所属的父调度id
:type parent_dispatcher_detail_id: int
"""
if case.status in [STATUS.NORMAL] and case.case_type in [CASE_TYPE.HTTP, CASE_TYPE.SSH, CASE_TYPE.SQL,
CASE_TYPE.DEBUG]:
# ReportCaseData表数据字段
report_id = self.dispatcher.report.id
case_type = case.case_type
case_id = case.id
module_id = case.scene.module.id
module_name = case.scene.module.name
expectation_logic = case.specific_case.expectation_logic
postprocessor_script = case.specific_case.postprocessor_script
preprocessor_script = case.specific_case.preprocessor_script
request_header = ''
request_body = ''
response_header = ''
response_body = ''
postprocessor_result = False
postprocessor_failure_message = ''
self.element_execute_result = ''
expectations = []
elapsed_time = 0
# 案例调度对象
case_dispatcher = None
try:
with self._element_execute_context():
if case.case_type == CASE_TYPE.HTTP:
from app.cores.case.http.dispatcher import HTTPCaseDispatcher
case_dispatcher = HTTPCaseDispatcher(
case=case,
dispatcher_type=self.dispatcher_type,
logger=self.dispatcher_logger,
dispatcher=self.dispatcher
)
elif case.case_type == CASE_TYPE.SSH:
from app.cores.case.ssh.dispatcher import SSHCaseDispatcher
case_dispatcher = SSHCaseDispatcher(
case=case,
dispatcher_type=self.dispatcher_type,
logger=self.dispatcher_logger,
dispatcher=self.dispatcher
)
elif case.case_type == CASE_TYPE.SQL:
from app.cores.case.sql.dispatcher import SQLCaseDispatcher
case_dispatcher = SQLCaseDispatcher(
case=case,
dispatcher_type=self.dispatcher_type,
logger=self.dispatcher_logger,
dispatcher=self.dispatcher
)
elif case.case_type == CASE_TYPE.DEBUG:
from app.cores.case.debug.dispatcher import DebugCaseDispatcher
case_dispatcher = DebugCaseDispatcher(
case=case,
dispatcher_type=self.dispatcher_type,
logger=self.dispatcher_logger,
dispatcher=self.dispatcher
)
result = case_dispatcher.run()
# 获取报告需要的数据
request_ = result.get('request_')
request_header = json.dumps(request_.request_headers, ensure_ascii=False)
request_body = json.dumps(request_.request_body, ensure_ascii=False)
response_header = json.dumps(request_.response_headers, ensure_ascii=False)
response_body = json.dumps(request_.response_body, ensure_ascii=False, default=str) if isinstance(
request_.response_body, dict) else request_.response_body
postprocessor_result = not bool(result.get('postprocessor_failure'))
postprocessor_failure_message = result.get('postprocessor_failure_message')
self.element_execute_result = result.get('result', REPORT_RESULT.FAILURE)
expectations = result.get('expectations', [])
elapsed_time = result.get('elapsed_time', 0)
finally: # 即使try块中出现异常也要执行finally块中代码
emit_dispatcher_result( # 通知客户端
id=case.id,
type=ELEMENT_TYPE.CASE,
result=self.element_execute_result,
)
# 为案例增加调度子数据
dispatcher_detail_id = DispatcherDetail.add(element_type=ELEMENT_TYPE.CASE, element_id=case.id,
element_name=case.name, dispatcher=self.dispatcher,
parent_dispatcher_detail_id=parent_dispatcher_detail_id).id
# 案例增加报告数据
report_case_data = ReportCaseData.add(
report_id=report_id,
dispatcher_detail_id=dispatcher_detail_id,
case_type=case_type,
case_id=case_id,
module_id=module_id,
module_name=module_name,
request_header=request_header,
request_body=request_body,
response_header=response_header,
response_body=response_body,
preprocessor_script=preprocessor_script,
postprocessor_script=postprocessor_script,
postprocessor_result=postprocessor_result,
postprocessor_failure_message=postprocessor_failure_message,
log=case_dispatcher.dispatcher_logger.get_string_buffer() if case_dispatcher is not None else '',
expectation_logic=expectation_logic,
result=self.element_execute_result,
elapsed_time=elapsed_time,
)
if expectations:
for expectation in expectations:
ReportCaseExpectationData.add(
report_case_data_id=report_case_data.id,
test_field=expectation.test_field,
value=expectation.value,
matching_rule=expectation.matching_rule,
negater=expectation.negater,
result=expectation.last_result,
failure_msg=expectation.last_failure_msg,
)
def _execute_tool(self, tool, parent_dispatcher_detail_id):
"""
执行工具
:param tool: 工具
:type tool: Tool
:param parent_dispatcher_detail_id: 当前工具所属的父调度id
:type parent_dispatcher_detail_id: int
"""
if tool.status in [STATUS.NORMAL] and tool.tool_type in [TOOL_TYPE.TIMER,
TOOL_TYPE.SCRIPT,
TOOL_TYPE.VARIABLE_DEFINITION,
TOOL_TYPE.HTTP_HEADER_MANAGER,
TOOL_TYPE.HTTP_COOKIE_MANAGER]:
# ReportToolData表数据字段
report_id = self.dispatcher.report.id
try:
with self._element_execute_context():
if tool.tool_type == TOOL_TYPE.TIMER:
from app.cores.tool.dispatcher import TimerToolDispatcher
TimerToolDispatcher(tool=tool, dispatcher_type=DISPATCHER_TYPE.BUILD,
logger=self.dispatcher_logger, dispatcher=self.dispatcher).run()
elif tool.tool_type == TOOL_TYPE.SCRIPT:
from app.cores.tool.dispatcher import ScriptToolDispatcher
ScriptToolDispatcher(tool=tool, dispatcher_type=DISPATCHER_TYPE.BUILD,
logger=self.dispatcher_logger, dispatcher=self.dispatcher).run()
elif tool.tool_type == TOOL_TYPE.VARIABLE_DEFINITION:
from app.cores.tool.dispatcher import VariableDefinitionToolDispatcher
VariableDefinitionToolDispatcher(tool=tool, dispatcher_type=DISPATCHER_TYPE.BUILD,
logger=self.dispatcher_logger, dispatcher=self.dispatcher).run()
elif tool.tool_type == TOOL_TYPE.HTTP_HEADER_MANAGER:
from app.cores.tool.dispatcher import HTTPHeaderManagerToolDispatcher
HTTPHeaderManagerToolDispatcher(tool=tool, dispatcher_type=DISPATCHER_TYPE.BUILD,
logger=self.dispatcher_logger,
dispatcher=self.dispatcher).run()
elif tool.tool_type == TOOL_TYPE.HTTP_COOKIE_MANAGER:
from app.cores.tool.dispatcher import HTTPCookieManagerToolDispatcher
HTTPCookieManagerToolDispatcher(tool=tool, dispatcher_type=DISPATCHER_TYPE.BUILD,
logger=self.dispatcher_logger,
dispatcher=self.dispatcher).run()
finally:
# 工具增加调度子数据
dispatcher_detail_id = DispatcherDetail.add(element_type=ELEMENT_TYPE.TOOL, element_id=tool.id,
element_name=tool.name, dispatcher=self.dispatcher,
parent_dispatcher_detail_id=parent_dispatcher_detail_id).id
# 工具增加报告数据
ReportToolData.add(tool=tool, report_id=report_id, dispatcher_detail_id=dispatcher_detail_id)
def _exec_logic_controller(self, logic_controller, parent_dispatcher_detail_id):
"""
执行逻辑控制器
:param logic_controller: 逻辑控制器
:type logic_controller: LogicController
:param parent_dispatcher_detail_id: 当前逻辑控制器所属的父调度id
:type parent_dispatcher_detail_id: int
"""
if logic_controller.status == STATUS.NORMAL:
# 给逻辑控制器加上调度详细数据,供报告中展示该逻辑控制器节点
logic_controller_dispatcher_detail_id = DispatcherDetail.add(
element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
element_id=logic_controller.id,
element_name=logic_controller.name,
dispatcher=self.dispatcher,
parent_dispatcher_detail_id=parent_dispatcher_detail_id).id
# 递归执行函数
recursive_func = partial(self._recursive_execute_logic_controller,
parent_dispatcher_detail_id=logic_controller_dispatcher_detail_id)
with self._element_execute_context():
if logic_controller.logic_controller_type == LOGIC_CONTROLLER_TYPE.IF_CONTROLLER:
from app.cores.logic_controller.dispatcher import IfControllerDispatcher
IfControllerDispatcher(logic_controller=logic_controller, recursive_func=recursive_func,
dispatcher_type=DISPATCHER_TYPE.BUILD, logger=self.dispatcher_logger,
dispatcher=self.dispatcher).run()
elif logic_controller.logic_controller_type == LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER:
from app.cores.logic_controller.dispatcher import WhileControllerDispatcher
WhileControllerDispatcher(logic_controller=logic_controller, recursive_func=recursive_func,
dispatcher_type=DISPATCHER_TYPE.BUILD, logger=self.dispatcher_logger,
dispatcher=self.dispatcher).run()
elif logic_controller.logic_controller_type == LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER:
from app.cores.logic_controller.dispatcher import LoopControllerDispatcher
LoopControllerDispatcher(logic_controller=logic_controller, recursive_func=recursive_func,
dispatcher_type=DISPATCHER_TYPE.BUILD, logger=self.dispatcher_logger,
dispatcher=self.dispatcher).run()
elif logic_controller.logic_controller_type == LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER:
from app.cores.logic_controller.dispatcher import SimpleControllerDispatcher
SimpleControllerDispatcher(logic_controller=logic_controller, recursive_func=recursive_func,
dispatcher_type=DISPATCHER_TYPE.BUILD, logger=self.dispatcher_logger,
dispatcher=self.dispatcher).run()
def async_module_run(module_id):
"""
异步执行模块测试
"""
def run(app, request, session):
try:
with app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
# 将主线程请求上下文栈中的request和session放入子线程的请求上下文栈顶
_request_ctx_stack.top.request = request
session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_HAND # 调度触发类型
_request_ctx_stack.top.session = session
ModuleDispatcher(module_id=module_id, dispatcher_type=DISPATCHER_TYPE.BUILD).run()
except Exception as e:
app.logger.error(traceback.format_exc())
executor = ThreadPoolExecutor(1)
executor.submit(run, current_app._get_current_object(), request._get_current_object(), session._get_current_object())
# d = DispatcherThread()
# from threading import Thread
# t = Thread(target=d.run, args=(module_id,))
# t.start()
def apscheduler_async_module_run(module_id, app):
"""定时任务触发"""
try:
with app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_SCHEDULE # 调度触发类型
ModuleDispatcher(module_id=module_id, dispatcher_type=DISPATCHER_TYPE.BUILD).run()
except Exception as e:
app.logger.error(traceback.format_exc())
# class DispatcherThread:
# def __init__(self):
# self.app = current_app._get_current_object()
# self.request = request._get_current_object()
# self.session = session._get_current_object()
#
# def run(self, module_id):
# try:
# with self.app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
# # 将主线程请求上下文栈中的request和session放入子线程的请求上下文栈顶
# _request_ctx_stack.top.request = self.request
# _request_ctx_stack.top.session = self.session
# ModuleDispatcher(module_id=module_id).run()
# except Exception as e:
# print(traceback.format_exc())
#
def async_project_run(project_id):
"""
异步执行项目测试
"""
def run(app, request, session):
try:
with app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
# 将主线程请求上下文栈中的request和session放入子线程的请求上下文栈顶
_request_ctx_stack.top.request = request
session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_HAND # 调度触发类型
_request_ctx_stack.top.session = session
ProjectDispatcher(project_id=project_id).run()
except Exception as e:
app.logger.error(traceback.format_exc())
executor = ThreadPoolExecutor(1)
executor.submit(run, current_app._get_current_object(), request._get_current_object(), session._get_current_object())
def apscheduler_async_project_run(project_id, app):
"""定时任务触发"""
try:
with app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_SCHEDULE # 调度触发类型
ProjectDispatcher(project_id=project_id).run()
except Exception as e:
app.logger.error(traceback.format_exc())