1018 lines
52 KiB
Python
1018 lines
52 KiB
Python
# coding=utf-8
|
||
|
||
from flask import current_app, _request_ctx_stack
|
||
from abc import ABC, abstractmethod
|
||
from flask import session, request
|
||
from typing import Optional, Union
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import traceback
|
||
import json
|
||
from functools import partial
|
||
import contextlib
|
||
|
||
from app.template_global import sort_by_order_in_module, sort_by_order_in_project, sort_by_order_in_logic_controller
|
||
from app.extensions import http_cookie_manager
|
||
from app.cores.dictionaries import (ELEMENT_TYPE, STATUS, CASE_TYPE, DISPATCHER_STATUS, DISPATCHER_TYPE,
|
||
DISPATCHER_END_TYPE, REPORT_RESULT, TOOL_TYPE, LOGIC_CONTROLLER_TYPE,
|
||
DISPATCHER_TRIGGER_TYPE)
|
||
from app.cores.logger import DispatcherLogger
|
||
from app.models import (Case, Scene, Module, Project, Dispatcher, DispatcherDetail, Report, ReportCaseData,
|
||
ReportCaseExpectationData, LogicController, DingTalkRobotSetting, Tool, ReportToolData,
|
||
EmailReceiverSetting, ProjectAdvancedConfiguration, SubElementInLogicController)
|
||
from app.cores.ws import emit_dispatcher_result, emit_dispatcher_end, emit_dispatcher_start
|
||
from app.cores.exceptions import *
|
||
from app.cores import dingtalk
|
||
from app.cores.email import send_email_report
|
||
|
||
|
||
class AbstractDispatcher(ABC):
    """Common life cycle shared by every dispatcher (project/module/scene/case/tool/...).

    A dispatcher runs ``set_up`` -> ``execute`` -> ``tear_down`` and finally
    ``clean``.  The element that starts a build (a project or a module created
    without an existing logger/dispatcher) is the *trigger*: it creates the
    ``Dispatcher`` record, the logger and the report, and it is the one that
    finalises them in ``clean``.
    """

    def __init__(self, element, logger=None, dispatcher=None, dispatcher_type=None):
        """
        :param element: element to build
        :type element: Union[Case, Scene, Module, Project]
        :param logger: dispatch logger
        :type logger: Optional[DispatcherLogger]
        :param dispatcher: dispatch record
        :type dispatcher: Optional[Dispatcher]
        :param dispatcher_type: DISPATCHER_TYPE.DEBUG when a single element is debugged,
            DISPATCHER_TYPE.BUILD when a module/project build is running
        :type dispatcher_type: str
        :raises TypeError: in build mode, when ``element`` is not a supported component type
        """
        # Dispatch record
        self.dispatcher = dispatcher

        # Dispatch type
        self.dispatcher_type = dispatcher_type

        # Dispatch logger
        self.dispatcher_logger = logger

        # Marks the element that triggered the dispatch; participating
        # elements default to False.
        self.element_is_dispatcher_trigger = False

        # Type of the triggering element: PROJECT or MODULE
        self.trigger_element_type = None

        # Type of the current component
        self.element_type = None

        # Dispatch detail record of the current component
        self.dispatcher_detail = None

        # Debug mode: only remember the project id, then leave __init__.
        if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
            # session['project_id'] marks the project the executing element
            # belongs to (removed again in clean()).  Per the original author
            # it lets app.cores.parser.new_parse_data find the project's
            # variable pool, and concurrent runs do not interfere because
            # each one uses its own thread/session.
            if isinstance(element, Tool):
                scene_controller = SubElementInLogicController.get_scene_controller_by_element(
                    element_id=element.id,
                    element_type=ELEMENT_TYPE.TOOL)
                project_id = scene_controller.scene.module.project.id
                session['project_id'] = project_id
            elif isinstance(element, Case):
                project_id = element.scene.module.project.id
                session['project_id'] = project_id
            return

        # From here on: elements that take part in a build.
        if isinstance(element, Project):
            self.element_type = ELEMENT_TYPE.PROJECT
            project_id = element.id
        elif isinstance(element, Module):
            self.element_type = ELEMENT_TYPE.MODULE
            project_id = element.project.id
        elif isinstance(element, Scene):
            self.element_type = ELEMENT_TYPE.SCENE
            project_id = element.module.project.id
        elif isinstance(element, Tool):
            self.element_type = ELEMENT_TYPE.TOOL
            project_id = None  # hard to derive the project id from tool.id
        elif isinstance(element, Case):
            self.element_type = ELEMENT_TYPE.CASE
            project_id = element.scene.module.project.id
        elif isinstance(element, LogicController):
            self.element_type = ELEMENT_TYPE.LOGIC_CONTROLLER
            project_id = None  # hard to derive the project id from logic_controller.id
        else:
            raise TypeError('调度构建暂未支持该类型组件,element_type=%s' % (type(element)))

        # Only the triggering element enters this branch.
        if (self.dispatcher_logger is None or self.dispatcher is None) \
                and self.element_type in [ELEMENT_TYPE.PROJECT, ELEMENT_TYPE.MODULE]:
            # Same purpose as in debug mode; removed when the dispatch ends.
            session['project_id'] = project_id
            self.element_is_dispatcher_trigger = True
            self.trigger_element_type = self.element_type
            # The trigger creates the dispatch record ...
            self.dispatcher = Dispatcher.add(element_type=self.trigger_element_type, element_id=element.id)
            self.dispatcher.update_status(status=DISPATCHER_STATUS.RUNNING)
            self.dispatcher.end_type = DISPATCHER_END_TYPE.SUCCESS  # how the dispatch ended
            # ... the dispatch logger ...
            self.dispatcher_logger = DispatcherLogger(use_memory_string_handler=True,
                                                      use_dispatcher_log_db_handler=True,
                                                      dispatcher_id=self.dispatcher.id)
            # ... and the report.
            report = Report.add(name=element.name, result=REPORT_RESULT.RUNNING, dispatcher_id=self.dispatcher.id)
            # Announce that the dispatch has started.
            emit_dispatcher_start(id=self.dispatcher.id, type=self.trigger_element_type, report_id=report.id)
            # Dispatch pre-processing.
            self._dispatcher_set_up()

        # Project/Module/Scene create their detail record here; Case/Tool/
        # LogicController get theirs while the scene executes them.
        if self.element_type in [ELEMENT_TYPE.PROJECT, ELEMENT_TYPE.MODULE, ELEMENT_TYPE.SCENE]:
            self.dispatcher_detail = DispatcherDetail.add(element_type=self.element_type, element_id=element.id,
                                                          element_name=element.name, dispatcher=self.dispatcher)

    @abstractmethod
    def set_up(self):
        """Run before ``execute``."""
        pass

    @abstractmethod
    def execute(self):
        """Execute the element."""
        pass

    @abstractmethod
    def tear_down(self):
        """Run after ``execute``."""
        pass

    def clean(self):
        """Final step of ``run``: release the logger/session and, for the trigger, finish the dispatch."""
        if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
            # Last step of a debug run.
            self.dispatcher_logger.close()
            # The key is absent when the debugged element was neither a Tool
            # nor a Case, so pop with a default instead of raising KeyError.
            session.pop('project_id', None)
        elif self.dispatcher_type == DISPATCHER_TYPE.BUILD and self.element_is_dispatcher_trigger:
            # Last step of a build; only the triggering element finalises it.
            try:
                # Record the end time.
                self.dispatcher.update_end_time()
                # Analyse the report.
                report_data = self._analyse_report_data(report=self.dispatcher.report)
                # DingTalk notification.
                self._ding_talk_send_message(report_data=report_data)
                # E-mail notification.
                self._email_send_summary_report(report_data=report_data)
            finally:
                # A failure above must not leave the dispatch RUNNING, the
                # logger open or project_id in the session.
                self.dispatcher_logger.close()
                # Announce that the dispatch has ended.
                emit_dispatcher_end(id=self.dispatcher.id,
                                    type=self.trigger_element_type,
                                    end_type=self.dispatcher.end_type)
                session.pop('project_id', None)
                # Mark the dispatch as finished.
                self.dispatcher.update_status(status=DISPATCHER_STATUS.FINISHED)

    def run(self):
        """Run the full life cycle; ``clean`` is always executed."""
        try:
            self.set_up()
            self.execute()
            self.tear_down()
            self.check_dispatcher_status_and_stop()
        finally:
            self.clean()

    def exception_handler(self, e):
        """
        Record the failure on the dispatch record (build mode only) and re-raise.

        :param e: the exception being handled
        :type e: Exception
        :raises Exception: always re-raises ``e``
        """
        is_build = self.dispatcher_type != DISPATCHER_TYPE.DEBUG
        if isinstance(e, ManualStopException):
            if is_build:
                self.dispatcher.end_type = DISPATCHER_END_TYPE.ABORT  # how the dispatch ended
                self.dispatcher.update_status(status=DISPATCHER_STATUS.STOPPED)
            raise e

        self.dispatcher_logger.logger.error('执行异常: %s' % e)
        self.dispatcher_logger.logger.error(traceback.format_exc())
        if is_build:
            # Same order as the manual-stop branch: end type first, then status.
            self.dispatcher.end_type = DISPATCHER_END_TYPE.ERROR  # how the dispatch ended
            self.dispatcher.update_status(status=DISPATCHER_STATUS.STOPPED)
        raise e

    def check_dispatcher_status_and_stop(self):
        """Abort the run when the dispatch record is in STOPPING state.

        :raises ManualStopException: when the dispatch status is STOPPING
        """
        if self.dispatcher is None:
            # Debug runs have no dispatch record, so there is nothing to check.
            return
        if self.dispatcher.status == DISPATCHER_STATUS.STOPPING:
            msg = '检测到调度id:%s的状态为%s, 将终止运行' % (self.dispatcher.id, self.dispatcher.status)
            self.dispatcher_logger.logger.warning(msg)
            raise ManualStopException(msg)

    def _dispatcher_set_up(self):
        """Dispatch pre-processing, executed before the tests run."""
        self.dispatcher_logger.logger.info('[执行预处理][开始]')
        # Clear the cookie data of HTTP requests.
        self._clear_http_cookie()
        self.dispatcher_logger.logger.info('[执行预处理][结束]')

    def _analyse_report_data(self, report):
        """
        Summarise the case results of this build and store the overall result on the report.

        :param report: report of this build
        :type report: Report
        :return: counters, timing and per-module statistics of the build
        :rtype: dict
        """
        log = self.dispatcher_logger.logger
        log.info('[报告分析][开始]')
        report_case_data = report.report_case_data
        case_count = 0
        success_count = 0
        failure_count = 0
        error_count = 0
        skip_count = 0
        report_result = ''
        start_time = 0
        end_time = 0
        elapsed_time = 0
        dispatcher_id = 0
        report_id = 0
        report_name = ''
        trigger_element_type = ''
        dispatcher_end_type = ''
        dispatcher_trigger_type = ''
        module_results = []
        # module_id -> entry of module_results, so each row is a dict lookup
        # instead of a scan over all modules seen so far.
        module_index = {}
        try:
            dispatcher_id = self.dispatcher.id
            report_id = self.dispatcher.report.id
            report_name = self.dispatcher.report.name + ' ' + str(self.dispatcher.start_time)
            trigger_element_type = self.trigger_element_type
            dispatcher_end_type = self.dispatcher.end_type
            dispatcher_trigger_type = session.get('dispatcher_trigger_type', '')

            # One pass: totals and per-module statistics.
            for data in report_case_data:
                is_success = data.result == REPORT_RESULT.SUCCESS
                is_failure = data.result == REPORT_RESULT.FAILURE
                is_error = data.result == REPORT_RESULT.ERROR
                is_skip = data.result == REPORT_RESULT.SKIP

                module_result = module_index.get(data.module_id)
                if module_result is None:
                    # First case of this module: create its entry.
                    module_result = {
                        'module_id': data.module_id,
                        'module_name': data.module_name,
                        'case_count': 0,
                        'success_count': 0,
                        'failure_count': 0,
                        'error_count': 0,
                        'skip_count': 0,
                    }
                    module_index[data.module_id] = module_result
                    module_results.append(module_result)
                module_result['case_count'] += 1
                module_result['success_count'] += 1 if is_success else 0
                module_result['failure_count'] += 1 if is_failure else 0
                module_result['error_count'] += 1 if is_error else 0
                module_result['skip_count'] += 1 if is_skip else 0

                case_count += 1
                if is_success:
                    success_count += 1
                elif is_failure:
                    failure_count += 1
                elif is_error:
                    error_count += 1
                elif is_skip:
                    skip_count += 1

            # The way the dispatch ended takes precedence over the counters.
            if self.dispatcher.end_type == DISPATCHER_END_TYPE.ERROR:
                final_result = REPORT_RESULT.ERROR
            elif self.dispatcher.end_type == DISPATCHER_END_TYPE.ABORT:
                final_result = REPORT_RESULT.ABORT
            elif success_count + skip_count == case_count:
                final_result = REPORT_RESULT.SUCCESS
            else:
                final_result = REPORT_RESULT.FAILURE
            report.update_result(final_result)
            report_result = final_result

            log.info('[报告分析]本次共执行了%s个案例' % case_count)
            start_time = report.dispatcher.start_time
            end_time = report.dispatcher.end_time
            elapsed_time = end_time - start_time
            log.info('[报告分析]开始时间: %s, 结束时间: %s, 耗时: %s' % (start_time, end_time, elapsed_time))
            log.info(
                '[报告分析]成功:%s个, 失败:%s个, 错误%s个, 跳过%s个' % (success_count, failure_count, error_count, skip_count))
            log.info('[报告分析][结束]')
        except Exception:
            # Best effort: report whatever was collected, flagged as ERROR.
            report_result = REPORT_RESULT.ERROR
            log.error('[报告分析][异常] %s' % traceback.format_exc())
        return {
            'case_count': case_count,  # total number of cases
            'success_count': success_count,  # successes
            'failure_count': failure_count,  # failures
            'error_count': error_count,  # errors
            'skip_count': skip_count,  # skipped
            'report_result': report_result,  # overall result
            'start_time': start_time,  # dispatch start time
            'end_time': end_time,  # dispatch end time
            'elapsed_time': elapsed_time,  # elapsed time
            'dispatcher_id': dispatcher_id,  # dispatch id
            'report_id': report_id,  # report id
            'report_name': report_name,  # report name
            'trigger_element_type': trigger_element_type,  # type of the triggering element
            'dispatcher_end_type': dispatcher_end_type,  # how the dispatch ended
            'dispatcher_trigger_type': dispatcher_trigger_type,  # manual / scheduled
            'module_results': module_results,  # per-module case statistics
        }

    def _ding_talk_send_message(self, report_data):
        """
        Send the DingTalk notification when the dispatch ends.

        :param report_data: case execution result data
        :type report_data: dict
        """
        log = self.dispatcher_logger.logger
        log.info('[钉钉通知][开始]')
        project_id = session.get('project_id')
        if project_id is None:
            log.warning('[钉钉通知]当前调度获取项目编号project_id为None, 无法获取钉钉配置信息')
        else:
            # NOTE(review): the scraped source lost the leading whitespace of
            # the original triple-quoted template; lines are rebuilt unindented.
            text = (
                "# 构建结果\n"
                "调度编号: {dispatcher_id}\n"
                "组件类型: {element_type}\n"
                "执行结果: {report_result}\n"
                "成功: {success_count} 失败: {failure_count} 错误: {error_count} 跳过: {skip_count}\n"
                "开始时间: {start_time}\n"
                "结束时间: {end_time}\n"
                "耗时: {elapsed_time}\n"
            )
            data = {
                "msgtype": "text",
                "text": {
                    "title": "测试调度通知",
                    "content": text.format(
                        dispatcher_id=str(self.dispatcher.id),
                        element_type='模块' if self.dispatcher.element_type == ELEMENT_TYPE.MODULE else '项目',
                        report_result=str(report_data.get('report_result', '')),
                        success_count=str(report_data.get('success_count', '')),
                        failure_count=str(report_data.get('failure_count', '')),
                        error_count=str(report_data.get('error_count', '')),
                        skip_count=str(report_data.get('skip_count', '')),
                        start_time=str(report_data.get('start_time', '')),
                        end_time=str(report_data.get('end_time', '')),
                        elapsed_time=str(report_data.get('elapsed_time', '')),
                    )
                },
            }
            # Check the project's DingTalk notification switch.
            project_advanced_configuration = ProjectAdvancedConfiguration.query.filter_by(
                project_id=project_id).first()
            if project_advanced_configuration is None:
                log.warning('[钉钉通知]当前项目project_id=%s, 未获取到钉钉通知开关数据' % project_id)
            elif project_advanced_configuration.ding_talk_notify:
                ding_talk_robot_settings = DingTalkRobotSetting.get_project_ding_talk_robot_setting(
                    project_id=project_id)
                if not ding_talk_robot_settings:
                    log.warning('[钉钉通知]当前项目project_id=%s, 未找到相关钉钉配置信息' % project_id)
                else:
                    for ding_talk_robot_setting in ding_talk_robot_settings:
                        if ding_talk_robot_setting.enable:
                            log.info('[钉钉通知][发送消息]')
                            try:
                                response_content = dingtalk.send_message(
                                    data=data,
                                    access_token=ding_talk_robot_setting.access_token,
                                    secret=ding_talk_robot_setting.secret,
                                    at_all=ding_talk_robot_setting.at_all,
                                    at_mobiles=ding_talk_robot_setting.at_mobiles)
                                log.info('[钉钉通知][收到应答]: ' + str(response_content))
                            except Exception:
                                # One failing robot must not stop the others.
                                log.error('[钉钉通知][发送错误]: 错误信息:\n' + traceback.format_exc())
            else:
                log.info('[钉钉通知]钉钉通知开关关闭,不进行钉钉通知')
        log.info('[钉钉通知][结束]')

    def _email_send_summary_report(self, report_data):
        """
        Send the e-mail notification when the dispatch ends.

        :param report_data: case execution result data
        :type report_data: dict
        """
        log = self.dispatcher_logger.logger
        log.info('[邮件通知][开始]')
        project_id = session.get('project_id')
        if project_id is None:
            log.warning('[邮件通知]当前调度获取项目编号project_id为None, 无法获取邮件收件人配置信息')
        else:
            # Check the project's e-mail notification switch.
            project_advanced_configuration = ProjectAdvancedConfiguration.query.filter_by(
                project_id=project_id).first()
            if project_advanced_configuration is None:
                log.warning('[邮件通知]当前项目project_id=%s, 未获取到邮件通知开关数据' % project_id)
            elif project_advanced_configuration.email_notify:
                email_receiver_settings = EmailReceiverSetting.get_project_email_receiver_setting(
                    project_id=project_id)
                addresses = [row.address for row in email_receiver_settings if row.enable]
                log.info('[邮件通知]收件人列表: ' + str(addresses))
                try:
                    send_email_report(addresses=addresses, report_data=report_data)
                except Exception:
                    log.error('[邮件通知][发送错误]错误信息:\n' + traceback.format_exc())
            else:
                log.info('[邮件通知]邮件通知开关关闭,不进行邮件通知')
        log.info('[邮件通知][结束]')

    def _clear_http_cookie(self):
        """Clear the HTTP cookie pool when the project configuration asks for it."""
        log = self.dispatcher_logger.logger
        project_id = session.get('project_id')
        if project_id:
            # Check the project's clear_http_cookie switch.
            project_advanced_configuration = ProjectAdvancedConfiguration.query.filter_by(
                project_id=project_id).first()
            if project_advanced_configuration:
                if project_advanced_configuration.clear_http_cookie:
                    try:
                        http_cookie_manager.clear_cookie_pool(type=DISPATCHER_TYPE.BUILD)
                    except Exception:
                        log.warning(f'清理HTTPCookie数据失败,错误信息:\n{traceback.format_exc()}')
                    else:
                        log.info('清理HTTPCookie数据成功')
            else:
                log.warning(f'判断是否清理HTTPCookie数据失败,原因:未获取到项目id为{project_id}的配置数据')
        else:
            log.warning('判断是否清理HTTPCookie数据失败,原因:当前session中未获取到project_id')
|
||
|
||
|
||
class AbstractCaseDispatcher(AbstractDispatcher, ABC):
    """Base class of the case dispatchers; adds per-case logging around the life cycle."""

    def __init__(self, case, dispatcher_type=DISPATCHER_TYPE.DEBUG, logger=None, dispatcher=None):
        """
        :param case: case component to execute
        :param dispatcher_type: DISPATCHER_TYPE.DEBUG when a single case is debugged,
            DISPATCHER_TYPE.BUILD when a module/project build is running
        :type dispatcher_type: str
        :param logger: dispatch logger shared by the build (ignored in debug mode)
        :param dispatcher: dispatch record of the build
        """
        super().__init__(element=case, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)

        # Logger: a case debugged on its own gets a private in-memory logger.
        if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
            self.dispatcher_logger = DispatcherLogger(use_memory_string_handler=True, use_queue_handler=False)
            self.dispatcher_logger.clear_string_buffer()

        # Case component
        self.case = case

        self.dispatcher_logger.logger.info(
            '[执行案例开始] ==> [案例名称:%s][案例类型:%s]' % (self.case.name, self.case.case_type))

    @abstractmethod
    def _load_data(self):
        """Load and parse the data this case execution needs."""
        pass

    @abstractmethod
    def execute(self):
        """Execute the case; this base implementation only loads the data."""
        self._load_data()

    def clean(self):
        """Log the end of the case, then run the common clean-up."""
        self.dispatcher_logger.logger.info(
            '[案例执行结束] ==> [案例名称:%s][案例类型:%s]' % (self.case.name, self.case.case_type))
        super().clean()

    def run(self):
        """Run the life cycle with phase logging; failures go through ``exception_handler``."""
        try:
            self.dispatcher_logger.logger.info('[测试案例]执行阶段[set_up]')
            self.set_up()
            self.dispatcher_logger.logger.info('[测试案例]执行阶段[execute]')
            self.execute()
            self.dispatcher_logger.logger.info('[测试案例]执行阶段[tear_down]')
            self.tear_down()
            # Only a build has a dispatch record whose status can be checked.
            if self.dispatcher_type == DISPATCHER_TYPE.BUILD:
                self.check_dispatcher_status_and_stop()
        except Exception as e:
            self.exception_handler(e)
        finally:
            self.dispatcher_logger.logger.info('[测试案例]执行阶段[clean]')
            self.clean()
|
||
|
||
|
||
class AbstractToolDispatcher(AbstractDispatcher, ABC):
    """Base class of the tool dispatchers."""

    def __init__(self, tool, dispatcher_type=DISPATCHER_TYPE.DEBUG, logger=None, dispatcher=None):
        """
        :param tool: tool component to execute
        :param dispatcher_type: DISPATCHER_TYPE.DEBUG when a single tool is debugged,
            DISPATCHER_TYPE.BUILD when a module/project build is running
        :type dispatcher_type: str
        :param logger: dispatch logger shared by the build (ignored in debug mode)
        :param dispatcher: dispatch record of the build
        """
        super().__init__(element=tool, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)

        # Tool component
        self.tool = tool

        # Logger: a tool debugged on its own gets a private in-memory logger.
        if self.dispatcher_type == DISPATCHER_TYPE.DEBUG:
            self.dispatcher_logger = DispatcherLogger(use_memory_string_handler=True, use_queue_handler=False)
            self.dispatcher_logger.clear_string_buffer()

    def run(self):
        """Run the life cycle; failures go through ``exception_handler``."""
        try:
            self.set_up()
            self.execute()
            self.tear_down()
            # Only a build has a dispatch record whose status can be checked.
            if self.dispatcher_type == DISPATCHER_TYPE.BUILD:
                self.check_dispatcher_status_and_stop()
        except Exception as e:
            self.exception_handler(e)
        finally:
            self.clean()
|
||
|
||
|
||
class AbstractLogicControllerDispatcher(AbstractDispatcher, ABC):
    """Base class of the logic-controller dispatchers."""

    def __init__(self, logic_controller, recursive_func, dispatcher_type=DISPATCHER_TYPE.BUILD, logger=None,
                 dispatcher=None):
        """
        :param logic_controller: logic-controller component to execute
        :param recursive_func: function stored for executing the controller's content recursively
        :param dispatcher_type: DISPATCHER_TYPE.DEBUG for single-element debugging,
            DISPATCHER_TYPE.BUILD when a module/project build is running
        :type dispatcher_type: str
        :param logger: dispatch logger shared by the build
        :param dispatcher: dispatch record of the build
        """
        super().__init__(element=logic_controller, logger=logger, dispatcher=dispatcher,
                         dispatcher_type=dispatcher_type)

        # Recursive execution function of the logic controller
        self.recursive_func = recursive_func

        # Logic-controller component
        self.logic_controller = logic_controller

    def run(self):
        """Run the life cycle; failures go through ``exception_handler``."""
        try:
            self.set_up()
            self.execute()
            self.tear_down()
            # Only a build has a dispatch record whose status can be checked.
            if self.dispatcher_type == DISPATCHER_TYPE.BUILD:
                self.check_dispatcher_status_and_stop()
        except Exception as e:
            self.exception_handler(e)
        finally:
            self.clean()
|
||
|
||
|
||
class AbstractSceneDispatcher(AbstractDispatcher, ABC):
    """Base class of the scene dispatcher; logs every life-cycle phase."""

    def run(self):
        """Run the life cycle with phase logging; failures go through ``exception_handler``."""
        log = self.dispatcher_logger.logger
        try:
            log.info('[测试场景]执行阶段[set_up]')
            self.set_up()
            log.info('[测试场景]执行阶段[execute]')
            self.execute()
            log.info('[测试场景]执行阶段[tear_down]')
            self.tear_down()
            self.check_dispatcher_status_and_stop()
        except Exception as e:
            self.exception_handler(e)
        finally:
            # Looked up again on purpose: the phase methods may have replaced the logger.
            self.dispatcher_logger.logger.info('[测试场景]执行阶段[clean]')
            self.clean()
|
||
|
||
|
||
class AbstractModuleDispatcher(AbstractDispatcher, ABC):
    """Base class of the module dispatcher; logs every life-cycle phase."""

    def run(self):
        """Run the life cycle with phase logging; failures go through ``exception_handler``."""
        log = self.dispatcher_logger.logger
        try:
            log.info('[测试模块]执行阶段[set_up]')
            self.set_up()
            log.info('[测试模块]执行阶段[execute]')
            self.execute()
            log.info('[测试模块]执行阶段[tear_down]')
            self.tear_down()
            self.check_dispatcher_status_and_stop()
        except Exception as e:
            self.exception_handler(e)
        finally:
            # Looked up again on purpose: the phase methods may have replaced the logger.
            self.dispatcher_logger.logger.info('[测试模块]执行阶段[clean]')
            self.clean()
|
||
|
||
|
||
class AbstractProjectDispatcher(AbstractDispatcher, ABC):
    """Base class of the project dispatcher; logs every life-cycle phase."""

    def run(self):
        """Run the life cycle with phase logging; failures go through ``exception_handler``."""
        log = self.dispatcher_logger.logger
        try:
            log.info('[测试项目]执行阶段[set_up]')
            self.set_up()
            log.info('[测试项目]执行阶段[execute]')
            self.execute()
            log.info('[测试项目]执行阶段[tear_down]')
            self.tear_down()
            self.check_dispatcher_status_and_stop()
        except Exception as e:
            self.exception_handler(e)
        finally:
            # Looked up again on purpose: the phase methods may have replaced the logger.
            self.dispatcher_logger.logger.info('[测试项目]执行阶段[clean]')
            self.clean()
|
||
|
||
|
||
class ProjectDispatcher(AbstractProjectDispatcher):
    """Runs every module of a project; always the trigger of its own build."""

    def __init__(self, project_id):
        """
        :param project_id: id of the project to build
        """
        # NOTE(review): when no project has this id, ``project`` is None and
        # the base constructor raises TypeError for the unsupported type.
        project = Project.query.filter_by(id=project_id).first()
        super().__init__(element=project, logger=None, dispatcher=None, dispatcher_type=DISPATCHER_TYPE.BUILD)

        # Project component
        self.project = project

        self.dispatcher_logger.logger.info(
            ('+' * 20 + ' [项目测试开始] ==> [项目名称:%s] ' + '+' * 20) % self.project.name)

    def set_up(self):
        super().set_up()

    def execute(self):
        """Run the project's modules in project order, skipping those not in NORMAL status."""
        modules = self.project.modules
        for module in sort_by_order_in_project(modules):
            if module.status in [STATUS.NORMAL]:
                ModuleDispatcher(module_id=module.id, logger=self.dispatcher_logger, dispatcher=self.dispatcher,
                                 dispatcher_type=self.dispatcher_type).run()

    def tear_down(self):
        super().tear_down()

    def clean(self):
        """Log the end of the project run, then run the common clean-up."""
        self.dispatcher_logger.logger.info(
            ('+' * 20 + ' [项目测试结束] ==> [项目名称:%s] ' + '+' * 20) % self.project.name)
        super().clean()
|
||
|
||
|
||
class ModuleDispatcher(AbstractModuleDispatcher):
    """Runs every scene of a module, either as the build trigger or as part of a project build."""

    def __init__(self, module_id, logger=None, dispatcher=None, dispatcher_type=None):
        """
        :param module_id: id of the module to build
        :param logger: dispatch logger of the enclosing build, None when the module is the trigger
        :param dispatcher: dispatch record of the enclosing build, None when the module is the trigger
        :param dispatcher_type: DISPATCHER_TYPE value of the build
        """
        # NOTE(review): when no module has this id, ``module`` is None and
        # the base constructor raises TypeError for the unsupported type.
        module = Module.query.filter_by(id=module_id).first()
        super().__init__(element=module, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)

        # Module component
        self.module = module

        self.dispatcher_logger.logger.info(
            ('=' * 15 + ' [模块测试开始] ==> [模块名称:%s] ' + '=' * 15) % self.module.name)

    def set_up(self):
        super().set_up()

    def execute(self):
        """Run the module's scenes in module order, skipping those not in NORMAL status."""
        scenes = self.module.scenes
        for scene in sort_by_order_in_module(scenes):
            if scene.status in [STATUS.NORMAL]:
                SceneDispatcher(scene=scene, logger=self.dispatcher_logger, dispatcher=self.dispatcher,
                                dispatcher_type=self.dispatcher_type).run()

    def tear_down(self):
        super().tear_down()

    def clean(self):
        """Log the end of the module run, then run the common clean-up."""
        self.dispatcher_logger.logger.info(
            ('=' * 15 + ' [模块测试结束] ==> [模块名称:%s] ' + '=' * 15) % self.module.name)
        super().clean()
|
||
|
||
|
||
class SceneDispatcher(AbstractSceneDispatcher):
|
||
def __init__(self, scene, logger=None, dispatcher=None, dispatcher_type=None):
    """
    :param scene: scene component to execute
    :param logger: dispatch logger of the enclosing build
    :param dispatcher: dispatch record of the enclosing build
    :param dispatcher_type: DISPATCHER_TYPE value of the build
    """
    super().__init__(element=scene, logger=logger, dispatcher=dispatcher, dispatcher_type=dispatcher_type)

    # Scene component
    self.scene = scene

    # Whether execution stops when an element raises.
    configuration = self.scene.module.project.project_advanced_configuration
    # NOTE(review): a project without advanced configuration used to crash
    # here with AttributeError; falling back to fail-fast (True) is an
    # assumption - confirm the intended default.
    self.stop_on_error = configuration.stop_on_error if configuration is not None else True

    # Execution result of the element currently being run
    self.element_execute_result = ''

    self.dispatcher_logger.logger.info(('#' * 5 + ' [场景测试开始] ==> [场景名称:%s] ' + '#' * 5) % self.scene.name)
|
||
|
||
@contextlib.contextmanager
|
||
def _element_execute_context(self):
|
||
"""
|
||
提供组件执行上下文处理器
|
||
当组件执行出现异常时统一处理
|
||
"""
|
||
try:
|
||
yield
|
||
except Exception as e:
|
||
self.dispatcher_logger.logger.error(traceback.format_exc())
|
||
# 如果是手动中止则直接抛出异常
|
||
if isinstance(e, ManualStopException):
|
||
self.element_execute_result = REPORT_RESULT.ABORT
|
||
raise e
|
||
else:
|
||
self.element_execute_result = REPORT_RESULT.ERROR
|
||
# 如果配置了执行错误时不终止,则不向上抛出异常
|
||
if self.stop_on_error:
|
||
raise e
|
||
else:
|
||
# 在dispatcher.run()中发生异常会将其置为ABORT,因此这里需要重置为SUCCESS
|
||
# 表示当组件发生错误但继续执行时调度结束类型不更新(初始就是SUCCESS)
|
||
self.dispatcher.end_type = DISPATCHER_END_TYPE.SUCCESS
|
||
# 和上面同理,当继续执行时,重置调度状态为RUNNING(初始就是RUNNING)
|
||
self.dispatcher.update_status(status=DISPATCHER_STATUS.RUNNING)
|
||
else:
|
||
pass
|
||
finally:
|
||
pass
|
||
|
||
def set_up(self):
    """Run before ``execute``; delegates to the parent class."""
    super().set_up()
|
||
|
||
def execute(self):
    """Execute every element under the scene's own controller."""
    # Detail records created below hang under this scene's detail record.
    self._recursive_execute_logic_controller(
        logic_controller_id=self.scene.scene_controller.logic_controller.id,
        parent_dispatcher_detail_id=self.dispatcher_detail.id
    )
|
||
|
||
def tear_down(self):
    """Run after ``execute``; delegates to the parent class."""
    super().tear_down()
|
||
|
||
def clean(self):
    """Log the end of the scene run, then run the common clean-up."""
    self.dispatcher_logger.logger.info(('#' * 5 + ' [场景测试结束] ==> [场景名称:%s] ' + '#' * 5) % self.scene.name)
    super().clean()
|
||
|
||
def _recursive_execute_logic_controller(self, logic_controller_id, parent_dispatcher_detail_id):
    """
    Execute the sub-elements of a logic controller in order.

    Logic controllers can be nested, hence the recursive execution.

    :param logic_controller_id: id of the logic controller
    :param parent_dispatcher_detail_id: id of the parent dispatch detail, passed on to the
        executed elements so they can record their place in the hierarchy
    :raises DispatcherException: when a referenced case, tool or logic controller does not exist
    """
    for sub_element in sort_by_order_in_logic_controller(logic_controller_id):
        # Case component
        if sub_element.element_type == ELEMENT_TYPE.CASE:
            case = Case.query.filter_by(id=sub_element.element_id).first()
            if case is None:
                raise DispatcherException('未在表 %s 中查到案例, Case.id=%s' %
                                          ('Case', sub_element.element_id))
            self._execute_case(case=case, parent_dispatcher_detail_id=parent_dispatcher_detail_id)
        # Tool component
        elif sub_element.element_type == ELEMENT_TYPE.TOOL:
            tool = Tool.query.filter_by(id=sub_element.element_id).first()
            if tool is None:
                # The original message said "logic controller" here (copy-paste slip).
                raise DispatcherException('未在表 %s 中查到工具, Tool.id=%s' %
                                          ('Tool', sub_element.element_id))
            self._execute_tool(tool=tool, parent_dispatcher_detail_id=parent_dispatcher_detail_id)
        # Logic-controller component (executed recursively)
        elif sub_element.element_type == ELEMENT_TYPE.LOGIC_CONTROLLER:
            logic_controller = LogicController.query.filter_by(id=sub_element.element_id).first()
            if logic_controller is None:
                raise DispatcherException('未在表 %s 中查到逻辑控制器, LogicController.id=%s' %
                                          ('LogicController', sub_element.element_id))
            self._exec_logic_controller(logic_controller=logic_controller,
                                        parent_dispatcher_detail_id=parent_dispatcher_detail_id)
|
||
|
||
def _execute_case(self, case, parent_dispatcher_detail_id):
    """
    Run one case component and persist its outcome.

    The case is skipped entirely unless its status is ``STATUS.NORMAL`` and its type is
    HTTP, SSH, SQL or DEBUG. Otherwise the matching case dispatcher is run inside
    ``self._element_execute_context()``. Afterwards -- even if the run raised -- the
    client is notified and a ``DispatcherDetail`` row, a ``ReportCaseData`` row and one
    ``ReportCaseExpectationData`` row per expectation are added.

    :param case: the case to execute
    :type case: Case
    :param parent_dispatcher_detail_id: id of the parent dispatcher detail this case run belongs to
    :type parent_dispatcher_detail_id: int
    """
    if case.status not in (STATUS.NORMAL,):
        return
    if case.case_type not in (CASE_TYPE.HTTP, CASE_TYPE.SSH, CASE_TYPE.SQL, CASE_TYPE.DEBUG):
        return

    # Values destined for the ReportCaseData row.
    report_id = self.dispatcher.report.id
    case_type = case.case_type
    case_id = case.id
    owning_module = case.scene.module
    module_id = owning_module.id
    module_name = owning_module.name
    specific_case = case.specific_case
    expectation_logic = specific_case.expectation_logic
    postprocessor_script = specific_case.postprocessor_script
    preprocessor_script = specific_case.preprocessor_script

    # Defaults that are reported when the run fails before producing a result.
    request_header = request_body = response_header = response_body = ''
    postprocessor_result = False
    postprocessor_failure_message = ''
    self.element_execute_result = ''
    expectations = []
    elapsed_time = 0

    # The case dispatcher object; stays None if construction never happens.
    case_dispatcher = None
    try:
        with self._element_execute_context():
            # Dispatcher classes are imported lazily, only for the branch that is taken.
            if case_type == CASE_TYPE.HTTP:
                from app.cores.case.http.dispatcher import HTTPCaseDispatcher as dispatcher_cls
            elif case_type == CASE_TYPE.SSH:
                from app.cores.case.ssh.dispatcher import SSHCaseDispatcher as dispatcher_cls
            elif case_type == CASE_TYPE.SQL:
                from app.cores.case.sql.dispatcher import SQLCaseDispatcher as dispatcher_cls
            elif case_type == CASE_TYPE.DEBUG:
                from app.cores.case.debug.dispatcher import DebugCaseDispatcher as dispatcher_cls

            case_dispatcher = dispatcher_cls(
                case=case,
                dispatcher_type=self.dispatcher_type,
                logger=self.dispatcher_logger,
                dispatcher=self.dispatcher,
            )
            outcome = case_dispatcher.run()

            # Collect the data the report needs.
            request_ = outcome.get('request_')
            request_header = json.dumps(request_.request_headers, ensure_ascii=False)
            request_body = json.dumps(request_.request_body, ensure_ascii=False)
            response_header = json.dumps(request_.response_headers, ensure_ascii=False)
            if isinstance(request_.response_body, dict):
                # Only dict bodies are serialised; anything else is stored as-is.
                response_body = json.dumps(request_.response_body, ensure_ascii=False, default=str)
            else:
                response_body = request_.response_body
            postprocessor_result = not outcome.get('postprocessor_failure')
            postprocessor_failure_message = outcome.get('postprocessor_failure_message')
            self.element_execute_result = outcome.get('result', REPORT_RESULT.FAILURE)
            expectations = outcome.get('expectations', [])
            elapsed_time = outcome.get('elapsed_time', 0)
    finally:
        # Runs even when the try block raised, so a failed case is still reported.
        # Notify the client.
        emit_dispatcher_result(
            id=case.id,
            type=ELEMENT_TYPE.CASE,
            result=self.element_execute_result,
        )

        # Add the dispatcher detail (child record) for this case.
        case_detail = DispatcherDetail.add(
            element_type=ELEMENT_TYPE.CASE,
            element_id=case.id,
            element_name=case.name,
            dispatcher=self.dispatcher,
            parent_dispatcher_detail_id=parent_dispatcher_detail_id,
        )
        dispatcher_detail_id = case_detail.id

        if case_dispatcher is None:
            case_log = ''
        else:
            case_log = case_dispatcher.dispatcher_logger.get_string_buffer()

        # Add the report data for this case.
        report_case_data = ReportCaseData.add(
            report_id=report_id,
            dispatcher_detail_id=dispatcher_detail_id,
            case_type=case_type,
            case_id=case_id,
            module_id=module_id,
            module_name=module_name,
            request_header=request_header,
            request_body=request_body,
            response_header=response_header,
            response_body=response_body,
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
            postprocessor_result=postprocessor_result,
            postprocessor_failure_message=postprocessor_failure_message,
            log=case_log,
            expectation_logic=expectation_logic,
            result=self.element_execute_result,
            elapsed_time=elapsed_time,
        )

        # One expectation row per evaluated expectation (none when the list is empty/falsy).
        for expectation in expectations or ():
            ReportCaseExpectationData.add(
                report_case_data_id=report_case_data.id,
                test_field=expectation.test_field,
                value=expectation.value,
                matching_rule=expectation.matching_rule,
                negater=expectation.negater,
                result=expectation.last_result,
                failure_msg=expectation.last_failure_msg,
            )
|
||
|
||
def _execute_tool(self, tool, parent_dispatcher_detail_id):
    """
    Run one tool and record it in the dispatcher details and the report.

    The tool is skipped unless its status is ``STATUS.NORMAL`` and its type is TIMER,
    SCRIPT or VARIABLE_DEFINITION. The matching tool dispatcher is run inside
    ``self._element_execute_context()``; the ``DispatcherDetail`` and ``ReportToolData``
    rows are added afterwards even if the run raised.

    :param tool: the tool to execute
    :type tool: Tool
    :param parent_dispatcher_detail_id: id of the parent dispatcher detail this tool run belongs to
    :type parent_dispatcher_detail_id: int
    """
    if tool.status not in (STATUS.NORMAL,):
        return
    if tool.tool_type not in (TOOL_TYPE.TIMER, TOOL_TYPE.SCRIPT, TOOL_TYPE.VARIABLE_DEFINITION):
        return

    # Value destined for the ReportToolData row.
    report_id = self.dispatcher.report.id
    try:
        with self._element_execute_context():
            tool_type = tool.tool_type
            tool_dispatcher_cls = None
            # Dispatcher classes are imported lazily, only for the branch that is taken.
            if tool_type == TOOL_TYPE.TIMER:
                from app.cores.tool.dispatcher import TimerToolDispatcher as tool_dispatcher_cls
            elif tool_type == TOOL_TYPE.SCRIPT:
                from app.cores.tool.dispatcher import ScriptToolDispatcher as tool_dispatcher_cls
            elif tool_type == TOOL_TYPE.VARIABLE_DEFINITION:
                from app.cores.tool.dispatcher import VariableDefinitionToolDispatcher as tool_dispatcher_cls

            if tool_dispatcher_cls is not None:
                # NOTE(review): the dispatcher type is fixed to BUILD here instead of
                # self.dispatcher_type -- confirm this is intentional.
                tool_dispatcher_cls(
                    tool=tool,
                    dispatcher_type=DISPATCHER_TYPE.BUILD,
                    logger=self.dispatcher_logger,
                    dispatcher=self.dispatcher,
                ).run()
    finally:
        # Add the dispatcher detail (child record) for this tool.
        tool_detail = DispatcherDetail.add(
            element_type=ELEMENT_TYPE.TOOL,
            element_id=tool.id,
            element_name=tool.name,
            dispatcher=self.dispatcher,
            parent_dispatcher_detail_id=parent_dispatcher_detail_id,
        )
        dispatcher_detail_id = tool_detail.id
        # Add the report data for this tool.
        ReportToolData.add(tool=tool, report_id=report_id, dispatcher_detail_id=dispatcher_detail_id)
|
||
|
||
def _exec_logic_controller(self, logic_controller, parent_dispatcher_detail_id):
    """
    Run one logic controller (IF / WHILE / LOOP / SIMPLE).

    Nothing happens unless the controller status is ``STATUS.NORMAL``. A
    ``DispatcherDetail`` row is added for the controller first, and its id is bound as
    the parent id of ``self._recursive_execute_logic_controller``, which is handed to
    the controller dispatcher as ``recursive_func``. A controller type outside the four
    handled ones runs no dispatcher.

    :param logic_controller: the logic controller to execute
    :type logic_controller: LogicController
    :param parent_dispatcher_detail_id: id of the parent dispatcher detail this controller belongs to
    :type parent_dispatcher_detail_id: int
    """
    if logic_controller.status != STATUS.NORMAL:
        return

    # Give the controller its own dispatcher detail so the report can show it as a node.
    controller_detail = DispatcherDetail.add(
        element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
        element_id=logic_controller.id,
        element_name=logic_controller.name,
        dispatcher=self.dispatcher,
        parent_dispatcher_detail_id=parent_dispatcher_detail_id,
    )
    logic_controller_dispatcher_detail_id = controller_detail.id

    # Recursive execution callback, parented to the controller's own detail row.
    recursive_func = partial(
        self._recursive_execute_logic_controller,
        parent_dispatcher_detail_id=logic_controller_dispatcher_detail_id,
    )

    with self._element_execute_context():
        controller_type = logic_controller.logic_controller_type
        controller_cls = None
        # Dispatcher classes are imported lazily, only for the branch that is taken.
        if controller_type == LOGIC_CONTROLLER_TYPE.IF_CONTROLLER:
            from app.cores.logic_controller.dispatcher import IfControllerDispatcher as controller_cls
        elif controller_type == LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER:
            from app.cores.logic_controller.dispatcher import WhileControllerDispatcher as controller_cls
        elif controller_type == LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER:
            from app.cores.logic_controller.dispatcher import LoopControllerDispatcher as controller_cls
        elif controller_type == LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER:
            from app.cores.logic_controller.dispatcher import SimpleControllerDispatcher as controller_cls

        if controller_cls is not None:
            controller_cls(
                logic_controller=logic_controller,
                recursive_func=recursive_func,
                dispatcher_type=DISPATCHER_TYPE.BUILD,
                logger=self.dispatcher_logger,
                dispatcher=self.dispatcher,
            ).run()
|
||
|
||
|
||
def async_module_run(module_id):
    """
    Run a module build asynchronously on a background thread.

    The current Flask app, request and session objects are captured in the calling
    thread and handed to the worker, which pushes a fresh test request context,
    installs the captured request/session on it, marks the session as triggered
    by hand and runs ``ModuleDispatcher``. Any ``Exception`` raised in the worker
    is written to the application logger instead of propagating.

    :param module_id: id of the module to build (passed to ``ModuleDispatcher``)
    """
    def run(app, request, session):
        try:
            # Create a request context in the worker thread; an application context is
            # created as well when none is on the stack.
            with app.test_request_context():
                # Put the caller thread's request and session on top of the worker's
                # request context stack.
                _request_ctx_stack.top.request = request
                session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_HAND  # trigger type
                _request_ctx_stack.top.session = session
                ModuleDispatcher(module_id=module_id, dispatcher_type=DISPATCHER_TYPE.BUILD).run()
        except Exception:
            app.logger.error(traceback.format_exc())

    executor = ThreadPoolExecutor(1)
    try:
        executor.submit(
            run,
            current_app._get_current_object(),
            request._get_current_object(),
            session._get_current_object(),
        )
    finally:
        # Do not wait for the build; the already-submitted task still runs, and the
        # worker thread is released once it finishes instead of lingering until the
        # executor is garbage-collected.
        executor.shutdown(wait=False)
|
||
# d = DispatcherThread()
|
||
# from threading import Thread
|
||
# t = Thread(target=d.run, args=(module_id,))
|
||
# t.start()
|
||
|
||
|
||
def apscheduler_async_module_run(module_id, app, request, session):
|
||
"""定时任务触发"""
|
||
try:
|
||
with app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
|
||
# 将主线程请求上下文栈中的request和session放入子线程的请求上下文栈顶
|
||
_request_ctx_stack.top.request = request
|
||
session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_SCHEDULE # 调度触发类型
|
||
_request_ctx_stack.top.session = session
|
||
ModuleDispatcher(module_id=module_id, dispatcher_type=DISPATCHER_TYPE.BUILD).run()
|
||
except Exception as e:
|
||
app.logger.error(traceback.format_exc())
|
||
|
||
|
||
# class DispatcherThread:
|
||
# def __init__(self):
|
||
# self.app = current_app._get_current_object()
|
||
# self.request = request._get_current_object()
|
||
# self.session = session._get_current_object()
|
||
#
|
||
# def run(self, module_id):
|
||
# try:
|
||
# with self.app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
|
||
# # 将主线程请求上下文栈中的request和session放入子线程的请求上下文栈顶
|
||
# _request_ctx_stack.top.request = self.request
|
||
# _request_ctx_stack.top.session = self.session
|
||
# ModuleDispatcher(module_id=module_id).run()
|
||
# except Exception as e:
|
||
# print(traceback.format_exc())
|
||
#
|
||
|
||
def async_project_run(project_id):
    """
    Run a project build asynchronously on a background thread.

    The current Flask app, request and session objects are captured in the calling
    thread and handed to the worker, which pushes a fresh test request context,
    installs the captured request/session on it, marks the session as triggered
    by hand and runs ``ProjectDispatcher``. Any ``Exception`` raised in the worker
    is written to the application logger instead of propagating.

    :param project_id: id of the project to build (passed to ``ProjectDispatcher``)
    """
    def run(app, request, session):
        try:
            # Create a request context in the worker thread; an application context is
            # created as well when none is on the stack.
            with app.test_request_context():
                # Put the caller thread's request and session on top of the worker's
                # request context stack.
                _request_ctx_stack.top.request = request
                session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_HAND  # trigger type
                _request_ctx_stack.top.session = session
                ProjectDispatcher(project_id=project_id).run()
        except Exception:
            app.logger.error(traceback.format_exc())

    executor = ThreadPoolExecutor(1)
    try:
        executor.submit(
            run,
            current_app._get_current_object(),
            request._get_current_object(),
            session._get_current_object(),
        )
    finally:
        # Do not wait for the build; the already-submitted task still runs, and the
        # worker thread is released once it finishes instead of lingering until the
        # executor is garbage-collected.
        executor.shutdown(wait=False)
|
||
|
||
|
||
def apscheduler_async_project_run(project_id, app, request, session):
|
||
"""定时任务触发"""
|
||
try:
|
||
with app.test_request_context(): # 在线程中创建请求上下文,当栈中没有应用上下文时同时也会创建应用上下文
|
||
# 将主线程请求上下文栈中的request和session放入子线程的请求上下文栈顶
|
||
_request_ctx_stack.top.request = request
|
||
session['dispatcher_trigger_type'] = DISPATCHER_TRIGGER_TYPE.BY_SCHEDULE # 调度触发类型
|
||
_request_ctx_stack.top.session = session
|
||
ProjectDispatcher(project_id=project_id).run()
|
||
except Exception as e:
|
||
app.logger.error(traceback.format_exc())
|