api-automation-test/ApiAutomationTest/app/models.py

3814 lines
163 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
from flask import current_app
from flask_login import UserMixin, current_user
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
from time import time
from sqlalchemy import text
from typing import Iterable, List, Dict, Any, Union, Optional
from app.extensions import db
from app.cores.dictionaries import STATUS, EXPECTATION_LOGIC, CASE_TYPE, ELEMENT_TYPE, LOGIC_CONTROLLER_TYPE, DB_TYPE, \
DISPATCHER_STATUS, TOOL_TYPE
from app.cores.parser import new_parse_data as _p
class User(UserMixin, db.Model):
    """
    Application user account.

    Fields
        status: account status
    """
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), nullable=False, default='', server_default='')
    email = db.Column(db.String(128), nullable=False, default='', server_default='')
    password_hash = db.Column(db.String(128), nullable=False, default='', server_default='')
    last_seen = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    # relationship
    email_setting = db.relationship('EmailSetting', back_populates='user')

    __table_args__ = (
        db.UniqueConstraint('username', 'email', name='uix_user_username_email'),
    )

    def __repr__(self):
        return '<User id:{}, username:{}>'.format(self.id, self.username)

    def set_password(self, password):
        """Hash *password* and store the hash on this instance (no commit)."""
        self.password_hash = generate_password_hash(password=password)

    def check_password(self, passowrd):
        """Return True when the given plain-text password matches the stored hash."""
        # The parameter name is misspelled but kept: callers may pass it by keyword.
        return check_password_hash(pwhash=self.password_hash, password=passowrd)

    @classmethod
    def add(cls, username, email, password):
        """Create a user with status NORMAL, hash the password and commit."""
        user = cls(username=username, email=email, status=STATUS.NORMAL)
        user.set_password(password=password)
        db.session.add(user)
        db.session.commit()

    @classmethod
    def update_password(cls, username: str, email: str, password: str):
        """
        Replace the password of the user identified by username + email and commit.

        :raises AttributeError: when no user matches (the lookup yields None)
        """
        user = cls.query.filter_by(username=username, email=email).first()
        user.set_password(password=password)
        db.session.commit()

    @classmethod
    def get_email_token(cls, email: str, expires_in=600) -> str:
        """
        Build a signed HS256 JWT that carries *email*.

        :param email: value stored in the token's ``email`` claim
        :param expires_in: lifetime in seconds, added to the current time for ``exp``
        :return: the encoded token as text
        """
        token = jwt.encode(
            {'email': email, 'exp': time() + expires_in},
            key=current_app.config['SECRET_KEY'],
            algorithm='HS256'
        )
        # PyJWT 1.x returns bytes whereas 2.x returns str; normalise to str for both.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    @classmethod
    def get_email_from_token(cls, token: str) -> Optional[str]:
        """
        Decode a token produced by ``get_email_token``.

        :return: the ``email`` claim, or None when the token cannot be decoded
                 or does not contain that claim
        """
        try:
            return jwt.decode(
                token,
                key=current_app.config['SECRET_KEY'],
                algorithms=['HS256']
            )['email']
        except Exception:
            # Best effort: any decoding problem means "no e-mail". Unlike a bare
            # ``except:``, this lets SystemExit / KeyboardInterrupt propagate.
            return None

    @classmethod
    def check_existence_buy_username_email(cls, username, email):
        """Return True when a user with this username AND e-mail exists."""
        # first() is enough to decide existence; no need to load every matching row.
        return cls.query.filter_by(username=username, email=email).first() is not None
class EmailSetting(db.Model):
    """
    Per-user e-mail notification settings.

    Fields
        whether_send_email: whether to send e-mail
        whether_gen_report: whether to generate a report
        email_title: e-mail subject
        email_text: e-mail body
        receiver_address: recipient addresses, comma separated when there are several
    """
    id = db.Column(db.Integer, primary_key=True)
    whether_send_email = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    whether_gen_report = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    email_title = db.Column(db.String(128), nullable=False, default='', server_default='')
    email_text = db.Column(db.Text, nullable=False, default='', server_default='')
    receiver_address = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship('User', back_populates='email_setting')

    def __repr__(self):
        # The model has no smtp_login_name column, so the former repr raised
        # AttributeError; identify the row by its owner instead.
        return '<EmailSetting id:{}, user_id:{}>'.format(self.id, self.user_id)

    @classmethod
    def get_current_email_setting(cls):
        """Return the email setting row that belongs to ``current_user`` (or None)."""
        return cls.query.filter_by(user=current_user).first()

    @classmethod
    def add(cls, **kwargs):
        """
        Create and commit an email setting for ``current_user``.

        Recognised keyword arguments: whether_send_email, whether_gen_report,
        email_title, email_text, receiver_address. Missing ones fall back to
        False / empty string.
        """
        email_setting = cls(
            whether_send_email=kwargs.get('whether_send_email', False),
            whether_gen_report=kwargs.get('whether_gen_report', False),
            # Each text field reads its own key (both previously read 'whether_gen_report').
            email_title=kwargs.get('email_title', ''),
            email_text=kwargs.get('email_text', ''),
            receiver_address=kwargs.get('receiver_address', ''),
            user_id=current_user.id,
        )
        db.session.add(email_setting)
        db.session.commit()

    def update(self, **kwargs):
        """Overwrite the fields supplied in *kwargs*, keep the others, then commit."""
        self.whether_send_email = kwargs.get('whether_send_email', self.whether_send_email)
        self.whether_gen_report = kwargs.get('whether_gen_report', self.whether_gen_report)
        self.email_title = kwargs.get('email_title', self.email_title)
        self.email_text = kwargs.get('email_text', self.email_text)
        self.receiver_address = kwargs.get('receiver_address', self.receiver_address)
        db.session.commit()
class Project(db.Model):
    """
    Test project table.

    Fields
        name: project name
        description: project description
        create_time: when the project was created
        last_updated_time: when the project was last modified
        status: project status
    """
    # fields that are converted to JSON
    render_field_list = ['id', 'name', 'description', 'create_time', 'last_updated_time']
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    create_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    last_updated_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)
    # relationship
    # 'delete-orphan' is intentionally not part of the modules cascade: with it,
    # a module row would be deleted as soon as it is detached from the project.
    modules = db.relationship('Module', back_populates='project', cascade='all')
    project_advanced_configuration = db.relationship('ProjectAdvancedConfiguration', back_populates='project',
                                                     cascade='all, delete-orphan', uselist=False)

    @classmethod
    def add(cls, name, description):
        """Create a project with a default advanced configuration and commit it."""
        # Timestamps are stored with second precision.
        timestamp = datetime.now().replace(microsecond=0)
        new_project = cls(name=name, description=description, create_time=timestamp,
                          last_updated_time=timestamp)
        new_project.project_advanced_configuration = ProjectAdvancedConfiguration(stop_on_error=True)
        db.session.add(new_project)
        db.session.commit()

    @classmethod
    def delete(cls, id):
        """Soft delete: the row is kept and only its status becomes DELETED."""
        target = cls.query.filter_by(id=int(id)).first()
        if target is None:
            return
        target.status = STATUS.DELETED
        db.session.commit()

    @classmethod
    def update(cls, id, name, description):
        """Rename / re-describe a project and refresh its last_updated_time."""
        target = cls.query.filter_by(id=int(id)).first()
        if target is None:
            return
        target.name = name
        target.description = description
        target.last_updated_time = datetime.now().replace(microsecond=0)
        db.session.commit()

    def add_module(self, name, description):
        """Append a new module, created by the current user, at the end of this project."""
        timestamp = datetime.now().replace(microsecond=0)
        new_module = Module(
            name=name,
            description=description,
            creator=current_user.username,
            create_time=timestamp,
            last_updater=current_user.username,
            last_updated_time=timestamp,
            order_in_project=Module._get_max_order_in_project(project_id=self.id) + 1,
        )
        self.modules.append(new_module)
        db.session.commit()

    def delete_module(self, id):
        """Soft delete the module with the given id: mark it DELETED and reset its order to -1."""
        for candidate in self.modules:
            if candidate.id != int(id):
                continue
            candidate.status = STATUS.DELETED
            candidate.order_in_project = -1
            db.session.commit()
class ProjectAdvancedConfiguration(db.Model):
    """
    Advanced settings of a project.

    Fields
        stop_on_error: abort the execution when an error occurs
        ding_talk_notify: whether to send a DingTalk notification after execution
        email_notify: whether to send an e-mail after execution
        email_notify_when_default: send the e-mail under the default condition (requires email_notify=True)
        email_notify_when_failure_or_error: send the e-mail on failure or error (requires email_notify=True)
        clear_http_cookie: clear HTTP cookie data before a build
    """
    id = db.Column(db.Integer, primary_key=True)
    stop_on_error = db.Column(db.Boolean, nullable=False, default=True, server_default='1')
    ding_talk_notify = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    email_notify = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    email_notify_when_default = db.Column(db.Boolean, nullable=False, default=True, server_default='1')
    email_notify_when_failure_or_error = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    clear_http_cookie = db.Column(db.Boolean, nullable=False, default=True, server_default='1')
    # relationship
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
    project = db.relationship('Project', back_populates='project_advanced_configuration')

    def update(self, stop_on_error, ding_talk_notify, email_notify, email_notify_when_default,
               email_notify_when_failure_or_error, clear_http_cookie):
        """Overwrite every setting with the supplied value and commit."""
        replacements = {
            'stop_on_error': stop_on_error,
            'ding_talk_notify': ding_talk_notify,
            'email_notify': email_notify,
            'email_notify_when_default': email_notify_when_default,
            'email_notify_when_failure_or_error': email_notify_when_failure_or_error,
            'clear_http_cookie': clear_http_cookie,
        }
        for column_name, new_value in replacements.items():
            setattr(self, column_name, new_value)
        db.session.commit()
class Module(db.Model):
    """
    Test module table.

    Fields
        name: module name
        description: module description
        creator: user who created the module
        create_time: when the module was created
        last_updater: user who last modified the module
        last_updated_time: when the module was last modified
        status: module status
        order_in_project: position of the module inside its project (-1 when unset/deleted)
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    creator = db.Column(db.String(64), nullable=False, default='', server_default='')
    create_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    last_updater = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_updated_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)
    order_in_project = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))
    # relationship
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
    project = db.relationship('Project', back_populates='modules')
    scenes = db.relationship('Scene', back_populates='module', cascade='all')

    @classmethod
    def update(cls, id, name, description):
        """Rename / re-describe a module and record the current user as last updater."""
        module = cls.query.filter_by(id=int(id)).first()
        if module is None:
            return
        module.name = name
        module.description = description
        module.last_updater = current_user.username
        # Timestamps are stored with second precision.
        module.last_updated_time = datetime.now().replace(microsecond=0)
        db.session.commit()

    @classmethod
    def add_new_empty_scene(cls, module_id):
        """
        Append an empty scene to a module whose status is NORMAL.

        :param module_id: id of the owning module
        :return: the new Scene, or None when no NORMAL module has that id
        """
        module = cls.query.filter_by(id=module_id, status=STATUS.NORMAL).first()
        if not module:
            return None
        max_order_in_module = Scene._get_max_order_in_module(module_id=module_id)
        scene = Scene(name='新增场景', description='', order_in_module=max_order_in_module + 1)
        module.scenes.append(scene)
        db.session.add(scene)
        db.session.commit()
        # scene.id is only available after the commit above.
        SceneController.add(scene_id=scene.id)
        return scene

    @classmethod
    def reorder_scene(cls, module_id, reorder_scene_id):
        """
        Reset and re-order the scenes of a module.

        The module's scene collection is rebuilt from *reorder_scene_id*; scenes
        already marked DELETED are re-attached afterwards so they stay linked.

        :param module_id: module id
        :type module_id: int
        :param reorder_scene_id: scene ids in their new order
        :type reorder_scene_id: Iterable
        """
        module = cls.query.filter_by(id=module_id).first()
        if not module:
            return
        # Capture the deleted scenes before the collection is emptied.
        deleted_scenes = [scene for scene in module.scenes if scene.status == STATUS.DELETED]
        module.scenes = []
        for index, scene_id in enumerate(reorder_scene_id):
            scene = Scene.query.filter_by(id=scene_id).first()
            if scene:
                scene.order_in_module = index
                module.scenes.append(scene)
        module.scenes.extend(deleted_scenes)
        db.session.commit()

    @classmethod
    def update_order_in_project(cls, modules):
        """
        Store the list position of each entry as its module's order_in_project.

        :param modules: ordered mappings, each exposing the module id under 'id'
        """
        for index, module_data in enumerate(modules):
            module = cls.query.filter_by(id=module_data.get('id')).first()
            if module:
                module.order_in_project = index
        db.session.commit()

    @classmethod
    def _get_max_order_in_project(cls, project_id):
        """
        Return the largest order_in_project among the modules of *project_id*.

        :return: the maximum, or -1 when the project has no module
        """
        # An ORM aggregate replaces the raw-string query()/from_statement() pair,
        # which SQLAlchemy 1.4+ rejects, and avoids dialect-specific quoting.
        max_order_in_project = db.session.query(db.func.max(cls.order_in_project)).filter(
            cls.project_id == project_id).scalar()
        if max_order_in_project is None:
            max_order_in_project = -1
        return max_order_in_project
class Scene(db.Model):
    """
    Test scene: an ordered member of a module that owns cases and one scene controller.
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)
    order_in_module = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))
    # relationship
    module_id = db.Column(db.Integer, db.ForeignKey('module.id'))
    module = db.relationship('Module', back_populates='scenes')
    cases = db.relationship('Case', back_populates='scene', cascade='all')
    scene_controller = db.relationship('SceneController', back_populates='scene', uselist=False, cascade='all, delete-orphan')

    @classmethod
    def add(cls, name, description, status, order_in_module, module_id):
        """
        Create a scene together with its scene controller.

        :param name: scene name
        :param description: scene description
        :param status: scene status
        :param order_in_module: position of the scene inside the module
        :param module_id: id of the owning module
        :return: the new scene
        :rtype: Scene
        """
        scene = cls(
            name=name,
            description=description,
            status=status,
            order_in_module=order_in_module,
            module_id=module_id,
        )
        db.session.add(scene)
        db.session.commit()
        # scene.id is only available after the commit above.
        # TODO: merge with Module.add_new_empty_scene
        SceneController.add(scene_id=scene.id)
        return scene

    @classmethod
    def delete(cls, id):
        """Soft delete: mark the scene DELETED and reset its order to -1."""
        scene = cls.query.filter_by(id=int(id)).first()
        if scene:
            scene.status = STATUS.DELETED
            scene.order_in_module = -1
            db.session.commit()

    @classmethod
    def update(cls, id, name, description):
        """Change name and description of a scene."""
        scene = cls.query.filter_by(id=int(id)).first()
        if scene:
            scene.name = name
            scene.description = description
            db.session.commit()

    @classmethod
    def add_new_empty_case(cls, scene_id, case_type):
        """
        Add a new case to a scene and register it in the scene's logic controller.

        :param scene_id: id of the scene the new case belongs to
        :param case_type: type of the new case: HTTP/SQL/SSH/DEBUG or others
        :return: the new case, or None when the scene is missing or DELETED
        """
        # Any scene that is not DELETED is accepted, not only NORMAL ones.
        scene = cls.query.filter_by(id=int(scene_id)).filter(cls.status != STATUS.DELETED).first()
        if not scene:
            return None
        specific_case = Case.new_case(case_type=case_type)
        case = specific_case.case
        scene.cases.append(case)
        db.session.commit()
        # Insert the matching sub-element row into SubElementInLogicController.
        logic_controller = scene.scene_controller.logic_controller
        SubElementInLogicController.add(element_id=case.id, element_type=ELEMENT_TYPE.CASE,
                                        logic_controller_id=logic_controller.id)
        return case

    @classmethod
    def _get_max_order_in_module(cls, module_id):
        """
        Return the largest order_in_module among the scenes of *module_id*.

        :return: the maximum, or -1 when the module has no scene
        """
        # An ORM aggregate replaces the raw-string query()/from_statement() pair,
        # which SQLAlchemy 1.4+ rejects, and avoids dialect-specific quoting.
        max_order_in_module = db.session.query(db.func.max(cls.order_in_module)).filter(
            cls.module_id == module_id).scalar()
        if max_order_in_module is None:
            max_order_in_module = -1
        return max_order_in_module

    @classmethod
    def copy_from_id(cls, id):
        """
        Duplicate a scene, including the content of its logic controller.

        :return: the copy (appended at the end of the same module), or None when
                 the source scene does not exist
        """
        scene = cls.query.filter_by(id=int(id)).first()
        if not scene:
            return None
        max_order_in_module = cls._get_max_order_in_module(module_id=scene.module_id)
        copy_scene = cls.add(
            name='Copy' + scene.name,
            description=scene.description,
            status=scene.status,
            order_in_module=max_order_in_module + 1,
            module_id=scene.module_id,
        )
        # Copy the logic controller content into the new scene's controller.
        LogicController.copy(original_logic_controller=scene.scene_controller.logic_controller,
                             parent_logic_controller=copy_scene.scene_controller.logic_controller,
                             scene_id=copy_scene.id)
        return copy_scene

    @classmethod
    def forbidden(cls, id):
        """Set the scene status to FORBIDDEN."""
        scene = cls.query.filter_by(id=int(id)).first()
        if scene:
            scene.status = STATUS.FORBIDDEN
            db.session.commit()

    @classmethod
    def enable(cls, id):
        """Set the scene status back to NORMAL."""
        scene = cls.query.filter_by(id=int(id)).first()
        if scene:
            scene.status = STATUS.NORMAL
            db.session.commit()
class LogicController(db.Model):
    """
    Generic logic controller row shared by all concrete controller kinds.

    Fields
        name: logic controller name
        description: logic controller comment
        logic_controller_type: controller type, see dictionaries.LOGIC_CONTROLLER_TYPE
        status: status, see dictionaries.STATUS
    Relationships
        sub_elements: all sub-elements of this controller
            LogicController:SubElementInLogicController is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    logic_controller_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    # relationship
    sub_elements = db.relationship('SubElementInLogicController', back_populates='logic_controller', cascade='all, delete-orphan')

    @property
    def specific_controller(self):
        """
        Return the concrete controller row (SceneController, IfController,
        WhileController, LoopController or SimpleController) bound to this logic
        controller, or None when the type is unknown or no such row exists.
        """
        # Built at call time because some controller classes are defined later in the module.
        controller_models = {
            LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER: SceneController,
            LOGIC_CONTROLLER_TYPE.IF_CONTROLLER: IfController,
            LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER: WhileController,
            LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER: LoopController,
            LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER: SimpleController,
        }
        model = controller_models.get(self.logic_controller_type)
        if model is None:
            return None
        return model.query.filter_by(logic_controller_id=self.id).first()

    @classmethod
    def add(cls, name, description, logic_controller_type, status=None):
        """
        Insert a logic controller row.

        :param status: defaults to STATUS.NORMAL when None
        :return: the committed LogicController
        """
        if status is None:
            status = STATUS.NORMAL
        logic_controller = cls(name=name, description=description, logic_controller_type=logic_controller_type,
                               status=status)
        db.session.add(logic_controller)
        db.session.commit()
        return logic_controller

    @classmethod
    def delete(cls, logic_controller_id):
        """Soft delete a logic controller; unknown ids are ignored."""
        logic_controller = cls.query.filter_by(id=logic_controller_id).first()
        if logic_controller is None:
            return
        logic_controller.status = STATUS.DELETED
        # Its position inside the parent controller is reset to -1.
        SubElementInLogicController.update_order(element_id=logic_controller.id,
                                                 element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
                                                 order_in_logic_controller=-1)
        db.session.commit()

    @classmethod
    def forbidden(cls, logic_controller_id):
        """Set the controller status to FORBIDDEN; unknown ids are ignored."""
        logic_controller = cls.query.filter_by(id=logic_controller_id).first()
        if logic_controller is None:
            return
        logic_controller.status = STATUS.FORBIDDEN
        db.session.commit()

    @classmethod
    def enable(cls, logic_controller_id):
        """Set the controller status to NORMAL; unknown ids are ignored."""
        logic_controller = cls.query.filter_by(id=logic_controller_id).first()
        if logic_controller is None:
            return
        logic_controller.status = STATUS.NORMAL
        db.session.commit()

    @classmethod
    def _clone_controller(cls, source, parent_logic_controller_id):
        """
        Create an empty controller of the same kind as *source* under the given parent.

        :return: the LogicController row of the new controller
        :raises ValueError: when the controller type cannot be copied
        """
        controller_type = source.logic_controller_type
        if controller_type == LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER:
            return SimpleController.add(
                description=source.description,
                parent_logic_controller_id=parent_logic_controller_id,
                status=source.status).logic_controller
        # These controller kinds carry an expression that must be copied as well.
        expression_models = {
            LOGIC_CONTROLLER_TYPE.IF_CONTROLLER: IfController,
            LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER: LoopController,
            LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER: WhileController,
        }
        model = expression_models.get(controller_type)
        if model is None:
            raise ValueError(
                '不支持此类逻辑控制器类型, logic_controller_type=%s' % controller_type)
        return model.add(
            description=source.description,
            expression=source.specific_controller.expression,
            parent_logic_controller_id=parent_logic_controller_id,
            status=source.status).logic_controller

    @classmethod
    def copy(cls, original_logic_controller, parent_logic_controller, scene_id):
        """
        Copy the elements of one logic controller into another.

        Walks the sub-elements of *original_logic_controller* recursively and
        copies each one whose status is NORMAL or FORBIDDEN into
        *parent_logic_controller*.

        :param original_logic_controller: controller to copy from
        :param parent_logic_controller: controller that receives the copies
        :param scene_id: id of the new scene that copied cases belong to
        :raises ValueError: when a nested controller has an unsupported type
        """
        # Imported here to avoid a circular import: app.template_global imports this module.
        from app.template_global import sort_by_order_in_logic_controller
        copyable_statuses = [STATUS.NORMAL, STATUS.FORBIDDEN]
        for sub_element in sort_by_order_in_logic_controller(logic_controller_id=original_logic_controller.id):
            if sub_element.element_type == ELEMENT_TYPE.CASE:
                original_case = Case.query.filter_by(id=sub_element.element_id).first()
                if original_case is not None and original_case.status in copyable_statuses:
                    Case.copy_from_id(case_id=original_case.id, scene_id=scene_id,
                                      logic_controller_id=parent_logic_controller.id)
            elif sub_element.element_type == ELEMENT_TYPE.TOOL:
                original_tool = Tool.query.filter_by(id=sub_element.element_id).first()
                if original_tool is not None and original_tool.status in copyable_statuses:
                    Tool.copy_from_id(tool_id=original_tool.id, logic_controller_id=parent_logic_controller.id)
            elif sub_element.element_type == ELEMENT_TYPE.LOGIC_CONTROLLER:
                # A separate name is used so the method parameter is not rebound mid-loop.
                child_controller = cls.query.filter_by(id=sub_element.element_id).first()
                if child_controller is None or child_controller.status not in copyable_statuses:
                    continue
                new_logic_controller = cls._clone_controller(child_controller, parent_logic_controller.id)
                cls.copy(original_logic_controller=child_controller,
                         parent_logic_controller=new_logic_controller, scene_id=scene_id)

    @classmethod
    def get_project_id_from_current_logic_controller(cls, logic_controller):
        """
        Return the id of the project that the given logic controller belongs to.

        :return: the project id, or None when no scene controller is found at
                 the top of the controller chain
        """
        if logic_controller.logic_controller_type == LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER:
            return logic_controller.specific_controller.scene.module.project_id
        # Follow the SubElementInLogicController links upwards until a controller
        # is reached that is not itself a sub-element: the top-level controller.
        top_logic_controller_id = logic_controller.id
        while True:
            parent_link = SubElementInLogicController.query.filter_by(
                element_id=top_logic_controller_id,
                element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            ).first()
            if not parent_link:
                break
            top_logic_controller_id = parent_link.logic_controller_id
        scene_controller = SceneController.query.filter_by(logic_controller_id=top_logic_controller_id).first()
        if scene_controller:
            return scene_controller.scene.module.project_id
        return None
class SubElementInLogicController(db.Model):
    """
    Records which elements (sub-controllers, cases, ...) a logic controller contains.

    Fields
        logic_controller_id: id of the owning logic controller
        element_id: id of the sub-element (sub-controller, case, ...)
        element_type: type of the sub-element
        order_in_logic_controller: position of the sub-element inside the controller
    Relationships
        logic_controller: the controller that owns the sub-element
            LogicController:SubElementInLogicController is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)
    element_id = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    order_in_logic_controller = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))
    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController', back_populates='sub_elements')

    __table_args__ = (
        db.UniqueConstraint('element_id', 'element_type', name='uix_element_id_type'),
    )

    @classmethod
    def get_max_order_in_logic_controller(cls, logic_controller_id):
        """
        Return the largest element order inside a logic controller.

        :param logic_controller_id: logic controller id
        :return: the largest order, or -1 when the controller has no element
        """
        # An ORM aggregate replaces the raw-string query()/from_statement() pair,
        # which SQLAlchemy 1.4+ rejects, and avoids dialect-specific quoting.
        max_order_in_logic_controller = db.session.query(db.func.max(cls.order_in_logic_controller)).filter(
            cls.logic_controller_id == logic_controller_id).scalar()
        if max_order_in_logic_controller is None:
            max_order_in_logic_controller = -1
        return max_order_in_logic_controller

    @classmethod
    def add(cls, element_id, element_type, logic_controller_id):
        """Append an element at the end of a logic controller and return the new row."""
        max_order_in_logic_controller = cls.get_max_order_in_logic_controller(logic_controller_id=logic_controller_id)
        sub_element = cls(element_id=element_id, element_type=element_type,
                          logic_controller_id=logic_controller_id,
                          order_in_logic_controller=max_order_in_logic_controller + 1)
        db.session.add(sub_element)
        db.session.commit()
        return sub_element

    @classmethod
    def update_order(cls, element_id, element_type, order_in_logic_controller, logic_controller_id=None):
        """
        Update the position (and optionally the parent) of an element.

        Nothing happens when no row matches element_id + element_type.

        :param element_id: element id
        :param element_type: element type
        :param order_in_logic_controller: new position inside the parent controller
        :param logic_controller_id: new parent controller id; left unchanged when None
        """
        sub_element = cls.query.filter_by(element_id=element_id, element_type=element_type).first()
        if sub_element is None:
            return
        sub_element.order_in_logic_controller = order_in_logic_controller
        if logic_controller_id is not None:
            sub_element.logic_controller_id = logic_controller_id
        db.session.commit()

    @classmethod
    def update_elements_order(cls, elements_order_in_logic_controller):
        """
        Apply a new ordering to several elements.

        :param elements_order_in_logic_controller: one mapping per element with the keys
            element_id, element_type, order_in_logic_controller and logic_controller_id
        :type elements_order_in_logic_controller: List[Dict[str, Any]]
        """
        for element_order in elements_order_in_logic_controller:
            cls.update_order(element_id=element_order.get('element_id'),
                             element_type=element_order.get('element_type'),
                             order_in_logic_controller=element_order.get('order_in_logic_controller'),
                             logic_controller_id=element_order.get('logic_controller_id'))

    @classmethod
    def get_scene_controller_by_element(cls, element_id, element_type):
        """
        Find the scene controller that (directly or indirectly) contains an element.

        :param element_id: element id
        :param element_type: element type
        :return: the scene controller, or None when the parent chain is broken
        :rtype: SceneController
        """
        current_id, current_type = element_id, element_type
        while True:
            link = cls.query.filter_by(element_id=current_id, element_type=current_type).first()
            if link is None:
                # The element is not registered under any controller.
                return None
            parent = LogicController.query.filter_by(id=link.logic_controller_id).first()
            if parent is None:
                return None
            if parent.logic_controller_type == LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER:
                return parent.specific_controller
            # Not at the top yet: continue from the parent controller.
            current_id, current_type = parent.id, ELEMENT_TYPE.LOGIC_CONTROLLER

    def get_element_logic_controller_if_element_type_is_logic_controller(self):
        """
        Return the LogicController this row points at when the element is itself a
        logic controller, otherwise None.

        Note that the ``logic_controller`` relationship is something else: it is
        the parent controller that owns this element.
        """
        if self.element_type == ELEMENT_TYPE.LOGIC_CONTROLLER:
            return LogicController.query.filter_by(id=self.element_id).first()
        return None
class SceneController(db.Model):
    """
    Scene logic controller, used only as the first-level controller inside a scene.

    Relationships
        scene: the matching scene; Scene:SceneController is 1:1
        logic_controller: the matching logic controller; LogicController:SceneController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    # relationship
    scene_id = db.Column(db.Integer, db.ForeignKey('scene.id'))
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    scene = db.relationship('Scene', back_populates='scene_controller')
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, scene_id):
        """Create the scene controller of a scene; meant to run only when a scene is created."""
        backing_controller = LogicController.add(
            name=LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER,
            description='',
            logic_controller_type=LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER,
        )
        new_scene_controller = cls(scene_id=scene_id, logic_controller_id=backing_controller.id)
        db.session.add(new_scene_controller)
        db.session.commit()
        return new_scene_controller
class SimpleController(db.Model):
    """
    Simple controller.

    Relationships
        logic_controller: the matching logic controller
            SimpleController:LogicController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, parent_logic_controller_id, description='', status=None):
        """
        Create a simple controller and register it under its parent controller.

        :param parent_logic_controller_id: id of the parent logic controller
        :param description: comment (passed when copying an element)
        :param status: logic controller status (passed when copying an element)
        :return: the new SimpleController
        """
        backing_controller = LogicController.add(
            name='简单控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER,
            status=status,
        )
        new_controller = cls(logic_controller_id=backing_controller.id)
        db.session.add(new_controller)
        db.session.commit()
        # Bind the new logic controller to its parent controller.
        SubElementInLogicController.add(
            element_id=backing_controller.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return new_controller

    @classmethod
    def update(cls, logic_controller_id, name, description):
        """Change name and description of a simple controller."""
        found = LogicController.query.filter_by(id=logic_controller_id).first()
        if not found:
            return
        simple_controller = found.specific_controller
        if simple_controller is None:
            return
        simple_controller.logic_controller.name = name
        simple_controller.logic_controller.description = description
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a copy of this controller under another parent.

        :param parent_logic_controller_id: id of the parent controller of the copy
        :return: the copy
        :rtype: SimpleController
        """
        source = self.logic_controller
        return SimpleController.add(
            parent_logic_controller_id=parent_logic_controller_id,
            description=source.description,
            status=source.status,
        )
class IfController(db.Model):
    """
    IF condition controller.

    Fields
        expression: the expression
    Relationships
        logic_controller: the matching logic controller
            IfController:LogicController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    expression = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, expression, parent_logic_controller_id, description='', status=None):
        """
        Create an IF controller and register it under its parent controller.

        :param expression: the expression
        :param parent_logic_controller_id: id of the parent logic controller
        :param description: comment (passed when copying an element)
        :param status: logic controller status (passed when copying an element)
        :return: the new IfController
        """
        backing_controller = LogicController.add(
            name='IF控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.IF_CONTROLLER,
            status=status,
        )
        new_controller = cls(logic_controller_id=backing_controller.id, expression=expression)
        db.session.add(new_controller)
        db.session.commit()
        # Bind the new logic controller to its parent controller.
        SubElementInLogicController.add(
            element_id=backing_controller.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return new_controller

    @classmethod
    def update(cls, logic_controller_id, name, description, expression):
        """Change name, description and expression of an IF controller."""
        found = LogicController.query.filter_by(id=logic_controller_id).first()
        if not found:
            return
        if_controller = found.specific_controller
        if if_controller is None:
            return
        if_controller.logic_controller.name = name
        if_controller.logic_controller.description = description
        if_controller.expression = expression
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a copy of this controller under another parent.

        :param parent_logic_controller_id: id of the parent controller of the copy
        :return: the copy
        :rtype: IfController
        """
        copy_arguments = dict(
            expression=self.expression,
            parent_logic_controller_id=parent_logic_controller_id,
            description=self.logic_controller.description,
            status=self.logic_controller.status,
        )
        return IfController.add(**copy_arguments)
class WhileController(db.Model):
    """
    WHILE controller.

    Fields
        expression: the loop condition expression
    Relationships
        logic_controller: the generic logic controller row this WHILE controller
            is attached to (WhileController:LogicController is 1:1)
    """
    id = db.Column(db.Integer, primary_key=True)
    expression = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, expression, parent_logic_controller_id, description='', status=None):
        """
        Create a WHILE controller and register it under a parent logic controller.

        :param expression: condition expression
        :param parent_logic_controller_id: id of the logic controller that will contain the new WHILE controller
        :param description: comment (supplied when a component is copied)
        :param status: logic controller status (supplied when a component is copied)
        :return: the newly created WhileController
        """
        owner = LogicController.add(
            name='While控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER,
            status=status,
        )
        # NOTE(review): relies on LogicController.add returning a row whose id is already assigned.
        created = cls(logic_controller_id=owner.id, expression=expression)
        db.session.add(created)
        db.session.commit()
        # Register the new logic controller as a sub element of its parent.
        SubElementInLogicController.add(
            element_id=owner.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return created

    @classmethod
    def update(cls, logic_controller_id, name, description, expression):
        """
        Update the name, comment and expression of the controller behind a logic controller id.

        Does nothing when the logic controller, or its specific controller, cannot be found.
        """
        owner = LogicController.query.filter_by(id=logic_controller_id).first()
        if not owner:
            return
        specific = owner.specific_controller
        if specific is None:
            return
        linked = specific.logic_controller
        linked.name = name
        linked.description = description
        specific.expression = expression
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a copy of this controller under another parent logic controller.

        :param parent_logic_controller_id: id of the parent logic controller for the copy
        :return: the new controller
        :rtype: WhileController
        """
        source = self.logic_controller
        return WhileController.add(
            expression=self.expression,
            parent_logic_controller_id=parent_logic_controller_id,
            description=source.description,
            status=source.status,
        )
class LoopController(db.Model):
    """
    LOOP controller.

    Fields
        expression: the loop expression
    Relationships
        logic_controller: the generic logic controller row this LOOP controller
            is attached to (LoopController:LogicController is 1:1)
    """
    id = db.Column(db.Integer, primary_key=True)
    expression = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, expression, parent_logic_controller_id, description='', status=None):
        """
        Create a LOOP controller and register it under a parent logic controller.

        :param expression: loop expression
        :param parent_logic_controller_id: id of the logic controller that will contain the new LOOP controller
        :param description: comment (supplied when a component is copied)
        :param status: logic controller status (supplied when a component is copied)
        :return: the newly created LoopController
        """
        owner = LogicController.add(
            name='LOOP控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER,
            status=status,
        )
        # NOTE(review): relies on LogicController.add returning a row whose id is already assigned.
        created = cls(logic_controller_id=owner.id, expression=expression)
        db.session.add(created)
        db.session.commit()
        # Register the new logic controller as a sub element of its parent.
        SubElementInLogicController.add(
            element_id=owner.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return created

    @classmethod
    def update(cls, logic_controller_id, name, description, expression):
        """
        Update the name, comment and expression of the controller behind a logic controller id.

        Does nothing when the logic controller, or its specific controller, cannot be found.
        """
        owner = LogicController.query.filter_by(id=logic_controller_id).first()
        if not owner:
            return
        specific = owner.specific_controller
        if specific is None:
            return
        linked = specific.logic_controller
        linked.name = name
        linked.description = description
        specific.expression = expression
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a copy of this controller under another parent logic controller.

        :param parent_logic_controller_id: id of the parent logic controller for the copy
        :return: the new controller
        :rtype: LoopController
        """
        source = self.logic_controller
        return LoopController.add(
            expression=self.expression,
            parent_logic_controller_id=parent_logic_controller_id,
            description=source.description,
            status=source.status,
        )
class Tool(db.Model):
    """
    Generic tool component; the type-specific data lives in a per-type model.

    Fields
        name: tool name
        description: tool comment
        status: tool status (a STATUS value: NORMAL / FORBIDDEN / DELETED are used here)
        tool_type: tool kind, one of TOOL_TYPE
    """
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.String(512), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    tool_type = db.Column(db.String(64), nullable=False, default='', server_default='')

    @property
    def specific_tool(self):
        """Return the type-specific row linked to this tool, or None when no model matches tool_type."""
        models_by_type = (
            (TOOL_TYPE.TIMER, TimerTool),
            (TOOL_TYPE.SCRIPT, ScriptTool),
            (TOOL_TYPE.VARIABLE_DEFINITION, VariableDefinitionTool),
            (TOOL_TYPE.HTTP_HEADER_MANAGER, HTTPHeaderManagerTool),
            (TOOL_TYPE.HTTP_COOKIE_MANAGER, HTTPCookieManagerTool),
        )
        for known_type, model in models_by_type:
            if self.tool_type == known_type:
                return model.query.filter_by(tool_id=self.id).first()
        return None

    @classmethod
    def exist_and_status_not_delete(cls, tool_id) -> bool:
        """Tell whether a tool with this id exists with status NORMAL or FORBIDDEN (i.e. not deleted)."""
        visible_statuses = [STATUS.NORMAL, STATUS.FORBIDDEN]
        match = cls.query.filter(cls.id == int(tool_id),
                                 cls.status.in_(visible_statuses)).first()
        return match is not None

    @classmethod
    def add_new_empty_tool(cls, parent_logic_controller_id, tool_type):
        """
        Create a default tool of the given type under a logic controller.

        :param parent_logic_controller_id: id of the logic controller that will contain the new tool
        :param tool_type: kind of tool to create
        :return: the new Tool row
        """
        created = Tool.new_tool(tool_type=tool_type).tool
        # Insert the sub element row that places the tool inside its parent controller.
        SubElementInLogicController.add(
            element_id=created.id,
            element_type=ELEMENT_TYPE.TOOL,
            logic_controller_id=parent_logic_controller_id,
        )
        return created

    @classmethod
    def new_tool(cls, tool_type):
        """
        Create and commit a tool of the given type populated with default values.

        :param tool_type: kind of tool to create
        :return: the type-specific tool object (its ``tool`` attribute is the generic Tool row)
        :rtype: Union[TimerTool, ScriptTool, VariableDefinitionTool, HTTPHeaderManagerTool, HTTPCookieManagerTool]
        :raises ValueError: when tool_type is not one of the supported kinds
        """
        tool = cls(
            name='新的' + tool_type + '工具',
            description='',
            tool_type=tool_type,
            status=STATUS.NORMAL,
        )
        # Each supported type is paired with a callable producing its default specific row.
        factories = (
            (TOOL_TYPE.TIMER, lambda: TimerTool(delay='5000')),
            (TOOL_TYPE.SCRIPT, lambda: ScriptTool(script='# 请编写测试脚本')),
            (TOOL_TYPE.VARIABLE_DEFINITION, VariableDefinitionTool.new_tool),
            (TOOL_TYPE.HTTP_HEADER_MANAGER, HTTPHeaderManagerTool.new_tool),
            (TOOL_TYPE.HTTP_COOKIE_MANAGER, HTTPCookieManagerTool.new_tool),
        )
        for known_type, build in factories:
            if tool_type == known_type:
                specific = build()
                break
        else:
            raise ValueError('不支持当前tool_type类型, tool_type=%s' % tool_type)
        specific.tool = tool
        db.session.add(specific)
        db.session.commit()
        return specific

    @classmethod
    def delete(cls, tool_id):
        """Soft-delete a tool: mark it DELETED instead of removing the row."""
        target = cls.query.filter_by(id=tool_id).first()
        if target is None:
            return
        target.status = STATUS.DELETED
        # The tool's order inside its logic controller is set to -1.
        SubElementInLogicController.update_order(
            element_id=target.id,
            element_type=ELEMENT_TYPE.TOOL,
            order_in_logic_controller=-1,
        )
        db.session.commit()

    @classmethod
    def copy_from_id(cls, tool_id, logic_controller_id=None):
        """
        Copy the tool with the given id into a logic controller.

        :param tool_id: id of the tool to copy
        :type tool_id: int
        :param logic_controller_id: id of the parent logic controller for the copy
        :type logic_controller_id: int
        :return: the new Tool row, or None when tool_id does not exist
        :rtype: Tool
        """
        source = cls.query.filter_by(id=tool_id).first()
        if not source:
            return None
        duplicate = source.specific_tool.copy_from_self()
        # Insert the sub element row that places the copy inside its parent controller.
        SubElementInLogicController.add(
            element_id=duplicate.tool.id,
            element_type=ELEMENT_TYPE.TOOL,
            logic_controller_id=logic_controller_id,
        )
        return duplicate.tool

    @classmethod
    def forbidden(cls, tool_id):
        """Disable the tool (status becomes FORBIDDEN)."""
        target = cls.query.filter_by(id=tool_id).first()
        if not target:
            return
        target.status = STATUS.FORBIDDEN
        db.session.commit()

    @classmethod
    def enable(cls, tool_id):
        """Enable the tool (status becomes NORMAL)."""
        target = cls.query.filter_by(id=tool_id).first()
        if not target:
            return
        target.status = STATUS.NORMAL
        db.session.commit()

    @classmethod
    def get_project_id_from_current_tool(cls, tool):
        """Return the id of the project the tool belongs to, or None when its parent cannot be resolved."""
        # Locate the logic controller that contains the tool.
        placement = SubElementInLogicController.query.filter_by(
            element_id=tool.id,
            element_type=ELEMENT_TYPE.TOOL,
        ).first()
        if not placement:
            return None
        parent = LogicController.query.filter_by(id=placement.logic_controller_id).first()
        if not parent:
            return None
        return LogicController.get_project_id_from_current_logic_controller(logic_controller=parent)
class TimerTool(db.Model):
    """
    Timer tool data.

    Fields
        delay: pause duration (ms), stored as a string
    Relationships
        tool: the generic Tool row of this timer (TimerTool:Tool is 1:1)
    """
    id = db.Column(db.Integer, primary_key=True)
    delay = db.Column(db.String(64), nullable=False, default='', server_default='')
    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')

    @property
    def delay_(self):
        """Return ``delay`` after passing it through the parser helper ``_p``."""
        return _p(self.delay)

    @classmethod
    def update(cls, tool_id, name, description, delay):
        """
        Update a timer tool; does nothing when tool_id does not exist.

        :param tool_id: tool id
        :param name: tool name
        :param description: tool comment
        :param delay: pause duration
        """
        owner = Tool.query.filter_by(id=tool_id).first()
        if not owner:
            return
        specific = owner.specific_tool
        owner.name = name
        owner.description = description
        specific.delay = delay
        db.session.commit()

    @classmethod
    def add(cls, name, description, delay, status=STATUS.NORMAL):
        """
        Create and commit a timer tool together with its generic Tool row.

        :param name: tool name
        :param description: tool comment
        :param delay: pause duration
        :param status: tool status
        :return: the new TimerTool
        """
        owner = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.TIMER,
        )
        created = cls(delay=delay, tool=owner)
        db.session.add_all([owner, created])
        db.session.commit()
        return created

    def copy_from_self(self):
        """
        Create a copy of this tool; the copy's name is prefixed with 'Copy'.

        :return: the new timer tool
        :rtype: TimerTool
        """
        original = self.tool
        return self.add(
            # Tool
            name='Copy' + original.name,
            description=original.description,
            status=original.status,
            # TimerTool
            delay=self.delay,
        )
class ScriptTool(db.Model):
    """
    Script tool data.

    Fields
        script: script text
        log: log text
    Relationships
        tool: the generic Tool row of this script (ScriptTool:Tool is 1:1)
    """
    id = db.Column(db.Integer, primary_key=True)
    script = db.Column(db.Text, nullable=False, default='', server_default='')
    log = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')

    @property
    def script_(self):
        """Return ``script`` after passing it through the parser helper ``_p``."""
        return _p(self.script)

    @classmethod
    def update(cls, tool_id, name, description, script):
        """
        Update a script tool; does nothing when tool_id does not exist.

        :param tool_id: tool id
        :param name: tool name
        :param description: tool comment
        :param script: script text
        """
        owner = Tool.query.filter_by(id=tool_id).first()
        if not owner:
            return
        specific = owner.specific_tool
        owner.name = name
        owner.description = description
        specific.script = script
        db.session.commit()

    @classmethod
    def add(cls, name, description, script, status=STATUS.NORMAL):
        """
        Create and commit a script tool together with its generic Tool row.

        :param name: tool name
        :param description: tool comment
        :param script: script text
        :param status: tool status
        :return: the new ScriptTool
        """
        owner = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.SCRIPT,
        )
        created = cls(script=script, tool=owner)
        db.session.add_all([owner, created])
        db.session.commit()
        return created

    def copy_from_self(self):
        """
        Create a copy of this tool; the copy's name is prefixed with 'Copy'.

        :return: the new script tool
        :rtype: ScriptTool
        """
        original = self.tool
        return self.add(
            # Tool
            name='Copy' + original.name,
            description=original.description,
            status=original.status,
            # ScriptTool
            script=self.script,
        )
class VariableDefinitionTool(db.Model):
    """
    Variable definition tool data.

    Relationships
        tool: the generic Tool row of this tool (VariableDefinitionTool:Tool is 1:1)
        variable_definition_list: all name/value rows held by this tool
            (VariableDefinitionTool:VariableDefinitionList is 1:N)
    """
    id = db.Column(db.Integer, primary_key=True)
    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')
    variable_definition_list = db.relationship('VariableDefinitionList', back_populates='variable_definition_tool',
                                               cascade='all, delete-orphan')

    @staticmethod
    def _rows_from(variables):
        """Build one VariableDefinitionList row per mapping; missing keys default to ''."""
        return [
            VariableDefinitionList(name=var.get('name', ''),
                                   value=var.get('value', ''),
                                   description=var.get('description', ''))
            for var in variables
        ]

    @property
    def variables(self):
        """Return the rows as a list of dicts with keys name / value / description."""
        collected = []
        for row in self.variable_definition_list:
            collected.append({
                'name': row.name,
                'value': row.value,
                'description': row.description,
            })
        return collected

    @classmethod
    def new_tool(cls):
        """Create and commit a tool holding one placeholder variable; no Tool row is linked here."""
        created = cls(variable_definition_list=[
            VariableDefinitionList(name='var_name', value='var_value', description=''),
        ])
        db.session.add(created)
        db.session.commit()
        return created

    @classmethod
    def update(cls, name, description, tool_id, variables):
        """
        Update a tool and replace all of its variable rows; does nothing when tool_id does not exist.

        :param tool_id: tool id
        :type tool_id: int
        :param name: tool name
        :type name: str
        :param description: tool comment
        :type description: str
        :param variables: list of variable mappings
        :type variables: List[Mapping[str, str]]
        """
        owner = Tool.query.filter_by(id=tool_id).first()
        if not owner:
            return
        owner.name = name
        owner.description = description
        specific = owner.specific_tool
        # The previous rows are dropped from the collection and replaced wholesale.
        specific.variable_definition_list = cls._rows_from(variables)
        db.session.commit()

    @classmethod
    def add(cls, name, description, variables, status=STATUS.NORMAL):
        """
        Create and commit a variable definition tool together with its generic Tool row.

        :param name: tool name
        :type name: str
        :param description: tool comment
        :type description: str
        :param variables: list of variable mappings
        :type variables: List[Mapping[str, str]]
        :param status: tool status
        :type status: str
        :return: the new VariableDefinitionTool
        """
        owner = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.VARIABLE_DEFINITION,
        )
        created = cls(variable_definition_list=cls._rows_from(variables), tool=owner)
        db.session.add_all([owner, created])
        db.session.commit()
        return created

    def copy_from_self(self):
        """
        Create a copy of this tool, including its variables; the copy's name is prefixed with 'Copy'.

        :return: the new variable definition tool
        :rtype: VariableDefinitionTool
        """
        original = self.tool
        return self.add(
            # Tool
            name='Copy' + original.name,
            description=original.description,
            status=original.status,
            # VariableDefinitionTool
            variables=self.variables,
        )
class VariableDefinitionList(db.Model):
    """
    One variable row of a variable definition tool.

    Fields
        name: variable name
        value: variable value
        description: comment
    Relationships
        variable_definition_tool: the tool that owns this row
            (VariableDefinitionList:VariableDefinitionTool is N:1)
    """
    # Fields that are converted to JSON
    render_field_list = ['name', 'value', 'description']
    # fields
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    variable_definition_tool_id = db.Column(db.Integer, db.ForeignKey('variable_definition_tool.id'))
    variable_definition_tool = db.relationship('VariableDefinitionTool', back_populates='variable_definition_list')

    @property
    def name_(self):
        """Return ``name`` after passing it through the parser helper ``_p``."""
        return _p(self.name)

    @property
    def value_(self):
        """Return ``value`` after passing it through the parser helper ``_p``."""
        return _p(self.value)
class HTTPHeaderManagerTool(db.Model):
    """
    HTTP header manager tool data.

    Relationships
        tool: the generic Tool row of this tool (HTTPHeaderManagerTool:Tool is 1:1)
        http_header_manager_list: all header rows held by this tool
            (HTTPHeaderManagerTool:HTTPHeaderManagerList is 1:N)
    """
    id = db.Column(db.Integer, primary_key=True)
    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')
    http_header_manager_list = db.relationship('HTTPHeaderManagerList',
                                               back_populates='http_header_manager_tool',
                                               cascade='all, delete-orphan')

    @staticmethod
    def _rows_from(variables):
        """Build one HTTPHeaderManagerList row per mapping; missing keys default to ''."""
        return [
            HTTPHeaderManagerList(name=var.get('name', ''),
                                  value=var.get('value', ''),
                                  description=var.get('description', ''))
            for var in variables
        ]

    @property
    def variables(self):
        """Return the header rows as a list of dicts with keys name / value / description."""
        collected = []
        for row in self.http_header_manager_list:
            collected.append({
                'name': row.name,
                'value': row.value,
                'description': row.description,
            })
        return collected

    @classmethod
    def new_tool(cls):
        """Create and commit a tool holding one placeholder header; no Tool row is linked here."""
        created = cls(http_header_manager_list=[
            HTTPHeaderManagerList(name='header_name', value='header_value', description=''),
        ])
        db.session.add(created)
        db.session.commit()
        return created

    @classmethod
    def update(cls, name, description, tool_id, variables):
        """
        Update a tool and replace all of its header rows; does nothing when tool_id does not exist.

        :param tool_id: tool id
        :type tool_id: int
        :param name: tool name
        :type name: str
        :param description: tool comment
        :type description: str
        :param variables: list of header mappings
        :type variables: List[Mapping[str, str]]
        """
        owner = Tool.query.filter_by(id=tool_id).first()
        if not owner:
            return
        owner.name = name
        owner.description = description
        specific = owner.specific_tool
        # The previous rows are dropped from the collection and replaced wholesale.
        specific.http_header_manager_list = cls._rows_from(variables)
        db.session.commit()

    @classmethod
    def add(cls, name, description, variables, status=STATUS.NORMAL):
        """
        Create and commit an HTTP header manager tool together with its generic Tool row.

        :param name: tool name
        :type name: str
        :param description: tool comment
        :type description: str
        :param variables: list of header mappings
        :type variables: List[Mapping[str, str]]
        :param status: tool status
        :type status: str
        :return: the new HTTPHeaderManagerTool
        """
        owner = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.HTTP_HEADER_MANAGER,
        )
        created = cls(http_header_manager_list=cls._rows_from(variables), tool=owner)
        db.session.add_all([owner, created])
        db.session.commit()
        return created

    def copy_from_self(self):
        """
        Create a copy of this tool, including its headers; the copy's name is prefixed with 'Copy'.

        :return: the new HTTP header manager tool
        :rtype: HTTPHeaderManagerTool
        """
        original = self.tool
        return self.add(
            # Tool
            name='Copy' + original.name,
            description=original.description,
            status=original.status,
            # HTTPHeaderManagerTool
            variables=self.variables,
        )
class HTTPHeaderManagerList(db.Model):
    """
    One header row of an HTTP header manager tool.

    Fields
        name: header name
        value: header value
        description: comment
    Relationships
        http_header_manager_tool: the tool that owns this row
            (HTTPHeaderManagerList:HTTPHeaderManagerTool is N:1)
    """
    # Fields that are converted to JSON
    render_field_list = ['name', 'value', 'description']
    # fields
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    http_header_manager_tool_id = db.Column(db.Integer, db.ForeignKey('http_header_manager_tool.id'))
    http_header_manager_tool = db.relationship('HTTPHeaderManagerTool',
                                               back_populates='http_header_manager_list')

    @property
    def name_(self):
        """Return ``name`` after passing it through the parser helper ``_p``."""
        return _p(self.name)

    @property
    def value_(self):
        """Return ``value`` after passing it through the parser helper ``_p``."""
        return _p(self.value)
class HTTPCookieManagerTool(db.Model):
    """
    HTTP cookie manager tool data.

    Relationships
        tool: the generic Tool row of this tool (HTTPCookieManagerTool:Tool is 1:1)
        http_cookie_manager_list: all cookie rows held by this tool
            (HTTPCookieManagerTool:HTTPCookieManagerList is 1:N)
    """
    id = db.Column(db.Integer, primary_key=True)
    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')
    http_cookie_manager_list = db.relationship('HTTPCookieManagerList',
                                               back_populates='http_cookie_manager_tool',
                                               cascade='all, delete-orphan')

    @staticmethod
    def _rows_from(attributes):
        """Build one HTTPCookieManagerList row per mapping; missing text keys default to '', secure to False."""
        return [
            HTTPCookieManagerList(name=attr.get('name', ''),
                                  value=attr.get('value', ''),
                                  domain=attr.get('domain', ''),
                                  path=attr.get('path', ''),
                                  secure=attr.get('secure', False))
            for attr in attributes
        ]

    @property
    def attributes(self):
        """Return the cookie rows as a list of dicts with keys name / value / domain / path / secure."""
        collected = []
        for row in self.http_cookie_manager_list:
            collected.append({
                'name': row.name,
                'value': row.value,
                'domain': row.domain,
                'path': row.path,
                'secure': row.secure,
            })
        return collected

    @classmethod
    def new_tool(cls):
        """Create and commit a tool holding one placeholder cookie; no Tool row is linked here."""
        placeholder = HTTPCookieManagerList(name='cookie_name',
                                            value='cookie_value',
                                            domain='localhost',
                                            path='/',
                                            secure=False)
        created = cls(http_cookie_manager_list=[placeholder])
        db.session.add(created)
        db.session.commit()
        return created

    @classmethod
    def update(cls, name, description, tool_id, attributes):
        """
        Update a tool and replace all of its cookie rows; does nothing when tool_id does not exist.

        :param tool_id: tool id
        :type tool_id: int
        :param name: tool name
        :type name: str
        :param description: tool comment
        :type description: str
        :param attributes: list of cookie mappings
        :type attributes: List[Mapping[str, str]]
        """
        owner = Tool.query.filter_by(id=tool_id).first()
        if not owner:
            return
        owner.name = name
        owner.description = description
        specific = owner.specific_tool
        # The previous rows are dropped from the collection and replaced wholesale.
        specific.http_cookie_manager_list = cls._rows_from(attributes)
        db.session.commit()

    @classmethod
    def add(cls, name, description, attributes, status=STATUS.NORMAL):
        """
        Create and commit an HTTP cookie manager tool together with its generic Tool row.

        :param name: tool name
        :type name: str
        :param description: tool comment
        :type description: str
        :param attributes: list of cookie mappings
        :type attributes: List[Mapping[str, str]]
        :param status: tool status
        :type status: str
        :return: the new HTTPCookieManagerTool
        """
        owner = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.HTTP_COOKIE_MANAGER,
        )
        created = cls(http_cookie_manager_list=cls._rows_from(attributes), tool=owner)
        db.session.add_all([owner, created])
        db.session.commit()
        return created

    def copy_from_self(self):
        """
        Create a copy of this tool, including its cookies; the copy's name is prefixed with 'Copy'.

        :return: the new HTTP cookie manager tool
        :rtype: HTTPCookieManagerTool
        """
        original = self.tool
        return self.add(
            # Tool
            name='Copy' + original.name,
            description=original.description,
            status=original.status,
            # HTTPCookieManagerTool
            attributes=self.attributes,
        )
class HTTPCookieManagerList(db.Model):
    """
    One cookie row of an HTTP cookie manager tool.

    Fields
        name: cookie name
        value: cookie value
        domain: cookie domain
        path: cookie path
        secure: secure flag
    Relationships
        http_cookie_manager_tool: the tool that owns this row
            (HTTPCookieManagerList:HTTPCookieManagerTool is N:1)
    """
    # Fields that are converted to JSON
    render_field_list = ['name', 'value', 'domain', 'path', 'secure']
    # fields
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    domain = db.Column(db.String(512), nullable=False, default='', server_default='')
    path = db.Column(db.String(512), nullable=False, default='', server_default='')
    secure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    # relationship
    http_cookie_manager_tool_id = db.Column(db.Integer, db.ForeignKey('http_cookie_manager_tool.id'))
    http_cookie_manager_tool = db.relationship('HTTPCookieManagerTool',
                                               back_populates='http_cookie_manager_list')

    @property
    def name_(self):
        """Return ``name`` after passing it through the parser helper ``_p``."""
        return _p(self.name)

    @property
    def value_(self):
        """Return ``value`` after passing it through the parser helper ``_p``."""
        return _p(self.value)

    @property
    def domain_(self):
        """Return ``domain`` after passing it through the parser helper ``_p``."""
        return _p(self.domain)

    @property
    def path_(self):
        """Return ``path`` after passing it through the parser helper ``_p``."""
        return _p(self.path)
class Case(db.Model):
    """
    Generic test case component; the type-specific data lives in a per-type model.

    Fields
        name: case name
        description: case comment
        status: case status (a STATUS value: NORMAL / FORBIDDEN / DELETED are used here)
        case_type: case kind, one of CASE_TYPE
    Relationships
        scene: the scene that owns the case (Scene:Case is 1:N)
        *specific_case: type-specific data (Case:OtherTypeCase is 1:1), where
            OtherTypeCase is HTTPCase, SSHCase, SQLCase or DebugCase
    """
    id = db.Column(db.Integer, primary_key=True)  # TODO: start auto-increment at 10000
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.String(512), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    case_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    # relationship
    scene_id = db.Column(db.Integer, db.ForeignKey('scene.id'))
    scene = db.relationship('Scene', back_populates='cases')

    @property
    def specific_case(self):
        """Return the type-specific row linked to this case, or None when no model matches case_type."""
        if self.case_type == CASE_TYPE.HTTP:
            return HTTPCase.query.filter_by(case_id=self.id).first()
        if self.case_type == CASE_TYPE.SSH:
            return SSHCase.query.filter_by(case_id=self.id).first()
        if self.case_type == CASE_TYPE.SQL:
            return SQLCase.query.filter_by(case_id=self.id).first()
        if self.case_type == CASE_TYPE.DEBUG:
            return DebugCase.query.filter_by(case_id=self.id).first()
        return None

    @property
    def parent_logic_controller(self):
        """Return the logic controller that contains this case, or None when it is not placed in one."""
        placement = SubElementInLogicController.query.filter_by(
            element_type=ELEMENT_TYPE.CASE,
            element_id=self.id).first()
        if placement is not None:
            return placement.logic_controller
        return None

    @property
    def has_effective_preprocessor_script(self):
        """Tell whether the case's pre-processor script contains anything besides whitespace."""
        script = self.specific_case.preprocessor_script
        return script.strip() != ''

    @property
    def has_effective_postprocessor_script(self):
        """Tell whether the case's post-processor script contains anything besides whitespace."""
        script = self.specific_case.postprocessor_script
        return script.strip() != ''

    @classmethod
    def new_case(cls, case_type):
        """
        Create and commit a case of the given type populated with default values.

        :param case_type: kind of case to create
        :return: the type-specific case object (its ``case`` attribute is the generic Case row)
        :rtype: Union[HTTPCase, SSHCase, SQLCase, DebugCase]
        :raises ValueError: when case_type is not one of the supported kinds
        """
        # Build the row through ``cls`` (not the implicit ``__class__`` cell) so the
        # classmethod honours its receiver, matching Tool.new_tool.
        case = cls(
            name='新的' + case_type + '测试案例',
            description='',
            case_type=case_type,
            status=STATUS.NORMAL,
        )
        if case_type == CASE_TYPE.HTTP:
            specific_case = HTTPCase(protocol='HTTP', method='GET')
        elif case_type == CASE_TYPE.SSH:
            specific_case = SSHCase(host_name='127.0.0.1', port='22', connection_timeout='5000', method='SSH')
        elif case_type == CASE_TYPE.SQL:
            specific_case = SQLCase(host='localhost', port='3306', connect_timeout='10', db_type=DB_TYPE.MYSQL,
                                    charset='utf8')
        elif case_type == CASE_TYPE.DEBUG:
            specific_case = DebugCase(project_variable=True)
        else:
            raise ValueError('不支持当前case_type类型, case_type=%s' % case_type)
        specific_case.case = case
        db.session.add(specific_case)
        db.session.commit()
        return specific_case

    @classmethod
    def update(cls, id, name, description):
        """
        Update the name and comment of a case.

        :param id: case id
        :param name: case name
        :param description: case comment
        :return: the updated Case, or None when the id does not exist
        """
        case = cls.query.filter_by(id=int(id)).first()
        if case:
            case.name = name
            case.description = description
            db.session.commit()
            return case
        return None

    @classmethod
    def delete(cls, id):
        """Soft-delete a case: mark it DELETED instead of removing the row."""
        case = cls.query.filter_by(id=int(id)).first()
        if case is not None:
            case.status = STATUS.DELETED
            # The case's order inside its logic controller is set to -1.
            SubElementInLogicController.update_order(element_id=case.id, element_type=ELEMENT_TYPE.CASE,
                                                     order_in_logic_controller=-1)
            db.session.commit()

    @classmethod
    def exist_and_status_not_delete(cls, id) -> bool:
        """Tell whether a case with this id exists with status NORMAL or FORBIDDEN (i.e. not deleted)."""
        case = cls.query.filter(cls.id == int(id),
                                cls.status.in_([STATUS.NORMAL, STATUS.FORBIDDEN])).first()
        return case is not None

    @classmethod
    def copy_from_id(cls, case_id, scene_id, logic_controller_id=None):
        """
        Copy the case with the given id into a scene / logic controller.

        :param case_id: id of the case to copy
        :type case_id: int
        :param scene_id: scene id for the copy; forwarded to the specific case's ``copy_from_self``
        :type scene_id: int
        :param logic_controller_id: id of the parent logic controller for the copy
        :type logic_controller_id: int
        :return: the new Case row, or None when case_id does not exist
        :rtype: Case
        """
        case = cls.query.filter_by(id=int(case_id)).first()
        if case:
            copy_specific_case = case.specific_case.copy_from_self(scene_id=scene_id)
            # Insert the sub element row that places the copy inside its parent controller.
            SubElementInLogicController.add(element_id=copy_specific_case.case.id, element_type=ELEMENT_TYPE.CASE,
                                            logic_controller_id=logic_controller_id)
            return copy_specific_case.case
        return None

    @classmethod
    def forbidden(cls, id):
        """Disable the case (status becomes FORBIDDEN)."""
        case = cls.query.filter_by(id=int(id)).first()
        if case:
            case.status = STATUS.FORBIDDEN
            db.session.commit()

    @classmethod
    def enable(cls, id):
        """Enable the case (status becomes NORMAL)."""
        case = cls.query.filter_by(id=int(id)).first()
        if case:
            case.status = STATUS.NORMAL
            db.session.commit()
class HTTPCase(db.Model):
"""
字段说明
protocol: 协议
domain: 服务器名称或IP
port: 端口
method: 方法
path: url路径
encoding: 编码
expectation_logic: 期望结果逻辑处理(与 或)
message_body: 消息体数据
content_type: HTTP请求体数据类型
preprocessor_script: 预处理脚本
postprocessor_script: 后处理脚本
postprocessor_failure: 后处理断言失败
postprocessor_failure_message: 后处理断言失败信息
关系说明
case: 案例组件 Case:HTTPCase is 1:1
parameters: 案例参数 HTTPCase:HTTPCaseParameter is 1:N
headers: 请求头参数 HTTPCase:HTTPCaseHeader is 1:N
expectations: 案例期望 HTTPCase:HTTPCaseExpectation is 1:N
file_upload: 文件上传参数 HTTPCase:HTTPCaseFileUpload is 1:N
"""
id = db.Column(db.Integer, primary_key=True)
protocol = db.Column(db.String(128), nullable=False, default='', server_default='')
domain = db.Column(db.String(512), nullable=False, default='', server_default='')
port = db.Column(db.String(128), nullable=False, default='', server_default='')
method = db.Column(db.String(128), nullable=False, default='', server_default='')
path = db.Column(db.String(512), nullable=False, default='', server_default='')
encoding = db.Column(db.String(128), nullable=False, default='', server_default='')
expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND, server_default=EXPECTATION_LOGIC.AND)
message_body = db.Column(db.Text, nullable=False, default='', server_default='')
content_type = db.Column(db.String(128), nullable=False, default='', server_default='')
preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')
# relationship
case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
case = db.relationship('Case')
parameters = db.relationship('HTTPCaseParameter', back_populates='http_case', cascade='all, delete-orphan') # 解除与case的关系后删除
headers = db.relationship('HTTPCaseHeader', back_populates='http_case', cascade='all, delete-orphan')
expectations = db.relationship('HTTPCaseExpectation', back_populates='http_case', cascade='all, delete-orphan')
file_upload = db.relationship('HTTPCaseFileUpload', back_populates='http_case', cascade='all, delete-orphan')
# 解析处理
@property
def domain_(self):
    """Return ``domain`` after passing it through the parser helper ``_p``."""
    return _p(self.domain)
@property
def port_(self):
    """Return ``port`` after passing it through the parser helper ``_p``."""
    return _p(self.port)
@property
def path_(self):
    """Return ``path`` after passing it through the parser helper ``_p``."""
    return _p(self.path)
@property
def encoding_(self):
    """Return ``encoding`` after passing it through the parser helper ``_p``."""
    return _p(self.encoding)
@property
def message_body_(self):
    """Return ``message_body`` after passing it through the parser helper ``_p``."""
    return _p(self.message_body)
@property
def preprocessor_script_(self):
    """Return ``preprocessor_script`` after passing it through the parser helper ``_p``."""
    return _p(self.preprocessor_script)
@property
def postprocessor_script_(self):
    """Return ``postprocessor_script`` after passing it through the parser helper ``_p``."""
    return _p(self.postprocessor_script)
@classmethod
def add(cls, name, description, scene_id, protocol, domain, port, method, path, encoding, expectation_logic,
        message_body, content_type, preprocessor_script, postprocessor_script, status=STATUS.NORMAL,
        parameters=None, headers=None, file_upload=None, expectations=None):
    """
    Create and commit an HTTP case together with its generic Case row.

    :param name: case name
    :param description: case comment
    :param status: case status
    :param scene_id: id of the scene that owns the case
    :param protocol: protocol
    :param domain: server name or IP
    :param port: port
    :param method: HTTP method
    :param path: url path
    :param encoding: encoding
    :param expectation_logic: how expectations are combined
    :param message_body: request body data
    :param content_type: request body content type
    :param preprocessor_script: pre-processor script
    :param postprocessor_script: post-processor script
    :param parameters: parameter data
    :param headers: request header data
    :param file_upload: file upload parameter data
    :param expectations: expectation data
    :return: the new HTTPCase
    """
    owner = Case(
        name=name,
        description=description,
        status=status,
        scene_id=scene_id,
        case_type=CASE_TYPE.HTTP,
    )
    created = cls(
        protocol=protocol,
        domain=domain,
        port=port,
        method=method,
        path=path,
        encoding=encoding,
        expectation_logic=expectation_logic,
        message_body=message_body,
        content_type=content_type,
        preprocessor_script=preprocessor_script,
        postprocessor_script=postprocessor_script,
    )
    created.case = owner
    # Child collections are filled by the instance's _update_* helpers.
    created._update_parameters(parameters)
    created._update_headers(headers)
    created._update_expectations(expectations)
    created._update_file_upload(file_upload)
    db.session.add_all([owner, created])
    db.session.commit()
    return created
@classmethod
def update(cls, id, name, description, protocol, domain, port, method, path, encoding, expectation_logic,
           message_body, content_type, preprocessor_script, postprocessor_script, params=None, headers=None,
           file_upload=None, expectations=None):
    """
    Update an HTTP case; does nothing when no case has the given id.

    :param id: case id
    :param name: case name
    :param description: case comment
    :param protocol: protocol
    :param domain: server name or IP
    :param port: port
    :param method: HTTP method
    :param path: url path
    :param encoding: encoding
    :param expectation_logic: how expectations are combined
    :param message_body: request body data
    :param content_type: request body content type
    :param preprocessor_script: pre-processor script
    :param postprocessor_script: post-processor script
    :param params: parameter data
    :param headers: request header data
    :param file_upload: file upload parameter data
    :param expectations: expectation data
    """
    case = Case.query.filter_by(id=int(id)).first()
    if not case:
        # Unknown id: return before touching ``case.specific_case``, which would
        # otherwise raise AttributeError on None.
        return
    http_case = case.specific_case
    case.name = name
    case.description = description
    if http_case:
        http_case.protocol = protocol
        http_case.domain = domain
        http_case.port = port
        http_case.method = method
        http_case.path = path
        http_case.encoding = encoding
        http_case.expectation_logic = expectation_logic
        http_case.message_body = message_body
        http_case.content_type = content_type
        http_case.preprocessor_script = preprocessor_script
        http_case.postprocessor_script = postprocessor_script
        http_case._update_parameters(params)
        http_case._update_headers(headers)
        http_case._update_file_upload(file_upload)
        http_case._update_expectations(expectations)
    db.session.commit()
def copy_from_self(self, scene_id):
    """
    Duplicate this HTTP case through ``add``.

    :param scene_id: id of the scene that will own the copy; when None the
        copy stays in the scene of the current case
    :return: the new HTTP case, as returned by ``add``
    """
    # None means "keep the copy in the current scene".
    target_scene_id = self.case.scene_id if scene_id is None else scene_id
    clone_fields = dict(
        # Case
        name='Copy' + self.case.name,
        description=self.case.description,
        status=self.case.status,
        scene_id=target_scene_id,
        # HTTPCase
        protocol=self.protocol,
        domain=self.domain,
        port=self.port,
        method=self.method,
        path=self.path,
        encoding=self.encoding,
        expectation_logic=self.expectation_logic,
        message_body=self.message_body,
        content_type=self.content_type,
        preprocessor_script=self.preprocessor_script,
        postprocessor_script=self.postprocessor_script,
        parameters=self.parameters,
        headers=self.headers,
        expectations=self.expectations,
        file_upload=self.file_upload,
    )
    return self.add(**clone_fields)
def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
    """Store the post-processing script result and its error message, then commit."""
    self.postprocessor_failure, self.postprocessor_failure_message = (
        postprocessor_failure,
        postprocessor_failure_message,
    )
    db.session.commit()
def _update_parameters(self, parameters):
    """
    Replace ``self.parameters`` with fresh ``HTTPCaseParameter`` rows.

    :param parameters: None (just clear the collection), or an iterable whose
        items either offer ``get`` (dict-like) or expose the fields as attributes
    """
    self.parameters = []
    if parameters is None:
        return
    field_names = ('name', 'value', 'url_encode', 'content_type', 'include_equals')
    for source in parameters:
        if hasattr(source, 'get'):
            values = {field: source.get(field) for field in field_names}
        else:
            # attribute-style items: this is the branch copy_from_self goes through
            values = {field: getattr(source, field) for field in field_names}
        self.parameters.append(HTTPCaseParameter(**values))
def _update_headers(self, headers):
    """
    Replace ``self.headers`` with fresh ``HTTPCaseHeader`` rows.

    :param headers: None (just clear the collection), or an iterable whose
        items either offer ``get`` (dict-like) or expose ``name`` / ``value`` attributes
    """
    self.headers = []
    if headers is None:
        return
    for source in headers:
        if hasattr(source, 'get'):
            header_name, header_value = source.get('name'), source.get('value')
        else:
            # attribute-style items: this is the branch copy_from_self goes through
            header_name, header_value = source.name, source.value
        self.headers.append(HTTPCaseHeader(name=header_name, value=header_value))
def _update_expectations(self, expectations):
    """
    Replace ``self.expectations`` with fresh ``HTTPCaseExpectation`` rows.

    :param expectations: None (just clear the collection), or an iterable whose
        items either offer ``get`` (dict-like) or expose the fields as attributes
    """
    self.expectations = []
    if expectations is None:
        return
    field_names = ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater')
    for source in expectations:
        if hasattr(source, 'get'):
            values = {field: source.get(field) for field in field_names}
        else:
            # attribute-style items: this is the branch copy_from_self goes through
            values = {field: getattr(source, field) for field in field_names}
        self.expectations.append(HTTPCaseExpectation(**values))
def _update_file_upload(self, file_upload):
    """
    Replace ``self.file_upload`` with fresh ``HTTPCaseFileUpload`` rows.

    :param file_upload: None (just clear the collection), or an iterable whose
        items either offer ``get`` (dict-like) or expose ``name`` / ``value`` attributes
    """
    self.file_upload = []
    if file_upload is None:
        return
    for source in file_upload:
        if hasattr(source, 'get'):
            upload_name, upload_value = source.get('name'), source.get('value')
        else:
            # attribute-style items: this is the branch copy_from_self goes through
            upload_name, upload_value = source.name, source.value
        self.file_upload.append(HTTPCaseFileUpload(name=upload_name, value=upload_value))
class HTTPCaseParameter(db.Model):
    """
    One request parameter of an HTTP case.

    Fields
        name: parameter name
        value: parameter value
        url_encode: encode?
        content_type: content type
        include_equals: include equals?
    Relationships
        http_case: case the parameter belongs to; HTTPCase:HTTPCaseParameter is 1:N
    """
    # Fields that need to be converted to JSON
    render_field_list = ['name', 'value', 'url_encode', 'content_type', 'include_equals']
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    url_encode = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    content_type = db.Column(db.String(128), nullable=False, default='', server_default='')
    include_equals = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='parameters')

    # parsed accessors
    @property
    def name_(self):
        """Return ``name`` after running it through the ``_p`` parser."""
        parsed_name = _p(self.name)
        return parsed_name

    @property
    def value_(self):
        """Return ``value`` after running it through the ``_p`` parser."""
        parsed_value = _p(self.value)
        return parsed_value
class HTTPCaseHeader(db.Model):
    """
    One request header of an HTTP case.

    Fields
        name: header name
        value: header value
    Relationships
        http_case: HTTP case the header belongs to; HTTPCase:HTTPCaseHeader is 1:N
    """
    # Fields that need to be converted to JSON
    render_field_list = ['name', 'value']
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')
    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='headers')

    # parsed accessors
    @property
    def name_(self):
        """Return ``name`` after running it through the ``_p`` parser."""
        parsed_name = _p(self.name)
        return parsed_name

    @property
    def value_(self):
        """Return ``value`` after running it through the ``_p`` parser."""
        parsed_value = _p(self.value)
        return parsed_value
class HTTPCaseExpectation(db.Model):
    """
    One expectation (assertion) attached to an HTTP case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: negate the assertion result
    Relationships
        http_case: case the expectation belongs to; HTTPCase:HTTPCaseExpectation is 1:N
    """
    # Fields that need to be converted to JSON
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']
    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败', server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='expectations')
    # Messages longer than this many characters are cut by update_assert_result
    last_failure_msg_max_length = 1000

    # parsed accessors
    @property
    def value_(self):
        """Return ``value`` after running it through the ``_p`` parser."""
        parsed_value = _p(self.value)
        return parsed_value

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the assertion result and commit.

        :param last_result: result of the assertion
        :param last_failure_msg: failure message; a non-str value is stored as '',
            and an over-long string is cut and given a truncation suffix
        """
        limit = __class__.last_failure_msg_max_length
        self.last_result = last_result
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_failure_msg = message
        db.session.commit()
class HTTPCaseFileUpload(db.Model):
    """
    One file-upload parameter of an HTTP case.

    Fields
        name: parameter name
        value: parameter value
    Relationships
        http_case: case the file-upload parameter belongs to; HTTPCase:HTTPCaseFileUpload is 1:N
    """
    # Fields that need to be converted to JSON
    render_field_list = ['name', 'value']
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='file_upload')

    # parsed accessors
    @property
    def name_(self):
        """Return ``name`` after running it through the ``_p`` parser."""
        parsed_name = _p(self.name)
        return parsed_name

    @property
    def value_(self):
        """Return ``value`` after running it through the ``_p`` parser."""
        parsed_value = _p(self.value)
        return parsed_value
class SSHCase(db.Model):
    """
    SSH-specific part of a test case.

    Fields
        host_name: host name
        port: port number
        connection_timeout: timeout
        user_name: user name
        password: password
        command: command text
        expectation_logic: how expectation results are combined (AND / OR)
        method: method (the original note says it defaults to 'SSH'; the column default itself is '')
        preprocessor_script: pre-processing script
        postprocessor_script: post-processing script
        postprocessor_failure: post-processing assertion failed
        postprocessor_failure_message: post-processing assertion failure message
    Relationships
        case: generic case component; Case:SSHCase is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    host_name = db.Column(db.String(128), nullable=False, default='', server_default='')
    port = db.Column(db.String(128), nullable=False, default='', server_default='')
    connection_timeout = db.Column(db.String(64), nullable=False, default='', server_default='')
    user_name = db.Column(db.String(128), nullable=False, default='', server_default='')
    password = db.Column(db.String(128), nullable=False, default='', server_default='')
    command = db.Column(db.TEXT, nullable=False, default='', server_default='')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND, server_default=EXPECTATION_LOGIC.AND)
    method = db.Column(db.String(128), nullable=False, default='', server_default='')
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
    case = db.relationship('Case')
    expectations = db.relationship('SSHCaseExpectation', back_populates='ssh_case', cascade='all, delete-orphan')

    @property
    def host_name_(self):
        """Return ``host_name`` after running it through the ``_p`` parser."""
        return _p(self.host_name)

    @property
    def port_(self):
        """Return ``port`` after running it through the ``_p`` parser."""
        return _p(self.port)

    @property
    def connection_timeout_(self):
        """Return ``connection_timeout`` after running it through the ``_p`` parser."""
        return _p(self.connection_timeout)

    @property
    def user_name_(self):
        """Return ``user_name`` after running it through the ``_p`` parser."""
        return _p(self.user_name)

    @property
    def password_(self):
        """Return ``password`` after running it through the ``_p`` parser."""
        return _p(self.password)

    @property
    def command_(self):
        """Return ``command`` after running it through the ``_p`` parser."""
        return _p(self.command)

    @property
    def preprocessor_script_(self):
        """Return ``preprocessor_script`` after running it through the ``_p`` parser."""
        return _p(self.preprocessor_script)

    @property
    def postprocessor_script_(self):
        """Return ``postprocessor_script`` after running it through the ``_p`` parser."""
        return _p(self.postprocessor_script)

    @classmethod
    def add(cls, name, description, scene_id, host_name, port, method, connection_timeout, user_name, password, command,
            expectation_logic, preprocessor_script, postprocessor_script, status=STATUS.NORMAL, expectations=None):
        """
        Create a new SSH case and commit it.

        A generic ``Case`` row typed ``CASE_TYPE.SSH`` is created together with
        the SSH-specific row, and the two are linked through ``case``.

        :param name: case name
        :param description: case description
        :param status: case status (deleted / normal)
        :param scene_id: id of the scene the case belongs to
        :param host_name: host name
        :param port: port
        :param method: method
        :param connection_timeout: timeout
        :param user_name: user name
        :param password: password
        :param command: command text
        :param expectation_logic: how expectation results are combined
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param expectations: expectation data
        :return: the newly created SSH case instance
        """
        case = Case(
            name=name,
            description=description,
            status=status,
            scene_id=scene_id,
            case_type=CASE_TYPE.SSH,
        )
        ssh_case = cls(
            host_name=host_name,
            port=port,
            connection_timeout=connection_timeout,
            method=method,
            user_name=user_name,
            password=password,
            command=command,
            expectation_logic=expectation_logic,
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
        )
        ssh_case.case = case
        ssh_case._update_expectations(expectations)
        db.session.add(case)
        db.session.add(ssh_case)
        db.session.commit()
        return ssh_case

    @classmethod
    def update(cls, id, name, description, host_name, port, connection_timeout, user_name, password, command,
               expectation_logic, preprocessor_script, postprocessor_script, expectations=None):
        """
        Update an existing SSH case and commit.

        If no ``Case`` row matches ``id``, nothing is modified; the commit is
        still issued.

        :param id: case id
        :param name: case name
        :param description: case description
        :param host_name: host name
        :param port: port
        :param connection_timeout: timeout
        :param user_name: user name
        :param password: password
        :param expectation_logic: how expectation results are combined
        :param command: command text
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param expectations: expectation data
        """
        case = Case.query.filter_by(id=int(id)).first()
        # ``first()`` returns None when no row matches, so ``specific_case`` may
        # only be read once the case is known to exist.
        ssh_case = None
        if case:
            ssh_case = case.specific_case
            case.name = name
            case.description = description
        if ssh_case:
            ssh_case.host_name = host_name
            ssh_case.port = port
            ssh_case.connection_timeout = connection_timeout
            ssh_case.user_name = user_name
            ssh_case.password = password
            ssh_case.command = command
            ssh_case.expectation_logic = expectation_logic
            ssh_case.preprocessor_script = preprocessor_script
            ssh_case.postprocessor_script = postprocessor_script
            ssh_case._update_expectations(expectations)
        db.session.commit()

    def copy_from_self(self, scene_id):
        """
        Duplicate this SSH case through ``add``.

        :param scene_id: id of the scene that will own the copy; when None the
            copy stays in the scene of the current case
        :return: the new SSH case, as returned by ``add``
        """
        copy_case = self.add(
            # Case
            name='Copy' + self.case.name,
            description=self.case.description,
            status=self.case.status,
            # None means "keep the copy in the current scene"
            scene_id=self.case.scene_id if scene_id is None else scene_id,
            # SSHCase
            host_name=self.host_name,
            port=self.port,
            method=self.method,
            connection_timeout=self.connection_timeout,
            user_name=self.user_name,
            password=self.password,
            command=self.command,
            expectation_logic=self.expectation_logic,
            preprocessor_script=self.preprocessor_script,
            postprocessor_script=self.postprocessor_script,
            expectations=self.expectations,
        )
        return copy_case

    def _update_expectations(self, expectations):
        """
        Replace ``self.expectations`` with fresh ``SSHCaseExpectation`` rows.

        :param expectations: None (just clear the collection), or an iterable whose
            items either offer ``get`` (dict-like) or expose the fields as attributes
        """
        self.expectations = []
        if expectations is None:
            return
        field_names = ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater')
        for source in expectations:
            if hasattr(source, 'get'):
                values = {field: source.get(field) for field in field_names}
            else:
                # attribute-style items: this is the branch copy_from_self goes through
                values = {field: getattr(source, field) for field in field_names}
            self.expectations.append(SSHCaseExpectation(**values))

    def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
        """Store the post-processing script result and its error message, then commit."""
        self.postprocessor_failure = postprocessor_failure
        self.postprocessor_failure_message = postprocessor_failure_message
        db.session.commit()
class SSHCaseExpectation(db.Model):
    """
    One expectation (assertion) attached to an SSH case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: negate the assertion result
    Relationships
        ssh_case: case the expectation belongs to; SSHCase:SSHCaseExpectation is 1:N
    """
    # Fields that need to be converted to JSON
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']
    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败', server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    # relationship
    ssh_case_id = db.Column(db.Integer, db.ForeignKey('ssh_case.id'))
    ssh_case = db.relationship('SSHCase', back_populates='expectations')
    # Messages longer than this many characters are cut by update_assert_result
    last_failure_msg_max_length = 1000

    # parsed accessors
    @property
    def value_(self):
        """Return ``value`` after running it through the ``_p`` parser."""
        parsed_value = _p(self.value)
        return parsed_value

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the assertion result and commit.

        :param last_result: result of the assertion
        :param last_failure_msg: failure message; a non-str value is stored as '',
            and an over-long string is cut and given a truncation suffix
        """
        limit = __class__.last_failure_msg_max_length
        self.last_result = last_result
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_failure_msg = message
        db.session.commit()
class SQLCase(db.Model):
    """
    SQL-specific part of a test case.

    Fields
        host: host address
        port: port number
        connect_timeout: timeout, in seconds
        user: user name
        password: password
        sql: SQL script
        expectation_logic: how expectation results are combined (AND / OR)
        db_type: database type (DB_TYPE)
        charset: character set
        preprocessor_script: pre-processing script
        postprocessor_script: post-processing script
        postprocessor_failure: post-processing assertion failed
        postprocessor_failure_message: post-processing assertion failure message
    Relationships
        case: generic case component; Case:SQLCase is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    host = db.Column(db.String(128), nullable=False, default='', server_default='')
    port = db.Column(db.String(128), nullable=False, default='', server_default='')
    connect_timeout = db.Column(db.String(64), nullable=False, default='', server_default='')
    user = db.Column(db.String(128), nullable=False, default='', server_default='')
    password = db.Column(db.String(128), nullable=False, default='', server_default='')
    sql = db.Column(db.TEXT, nullable=False, default='', server_default='')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND, server_default=EXPECTATION_LOGIC.AND)
    db_type = db.Column(db.String(128), nullable=False, default='', server_default='')
    charset = db.Column(db.String(64), nullable=False, default='', server_default='')
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
    case = db.relationship('Case')
    expectations = db.relationship('SQLCaseExpectation', back_populates='sql_case', cascade='all, delete-orphan')

    @property
    def host_(self):
        """Return ``host`` after running it through the ``_p`` parser."""
        return _p(self.host)

    @property
    def port_(self):
        """Return ``port`` after running it through the ``_p`` parser."""
        return _p(self.port)

    @property
    def connect_timeout_(self):
        """Return ``connect_timeout`` after running it through the ``_p`` parser."""
        return _p(self.connect_timeout)

    @property
    def user_(self):
        """Return ``user`` after running it through the ``_p`` parser."""
        return _p(self.user)

    @property
    def password_(self):
        """Return ``password`` after running it through the ``_p`` parser."""
        return _p(self.password)

    @property
    def sql_(self):
        """Return ``sql`` after running it through the ``_p`` parser."""
        return _p(self.sql)

    @property
    def charset_(self):
        """Return ``charset`` after running it through the ``_p`` parser."""
        return _p(self.charset)

    @property
    def preprocessor_script_(self):
        """Return ``preprocessor_script`` after running it through the ``_p`` parser."""
        return _p(self.preprocessor_script)

    @property
    def postprocessor_script_(self):
        """Return ``postprocessor_script`` after running it through the ``_p`` parser."""
        return _p(self.postprocessor_script)

    @classmethod
    def add(cls, name, description, scene_id, host, port, db_type, connect_timeout, user, password, sql, charset,
            expectation_logic, preprocessor_script, postprocessor_script, status=STATUS.NORMAL, expectations=None):
        """
        Create a new SQL case and commit it.

        A generic ``Case`` row typed ``CASE_TYPE.SQL`` is created together with
        the SQL-specific row, and the two are linked through ``case``.

        :param name: case name
        :param description: case description
        :param status: case status (deleted / normal)
        :param scene_id: id of the scene the case belongs to
        :param host: host address
        :param port: port
        :param db_type: database type
        :param connect_timeout: timeout, in seconds
        :param user: user name
        :param password: password
        :param sql: SQL script
        :param charset: character set
        :param expectation_logic: how expectation results are combined
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param expectations: expectation data
        :return: the newly created SQL case instance
        """
        case = Case(
            name=name,
            description=description,
            status=status,
            scene_id=scene_id,
            case_type=CASE_TYPE.SQL,
        )
        sql_case = cls(
            host=host,
            port=port,
            connect_timeout=connect_timeout,
            db_type=db_type,
            user=user,
            password=password,
            sql=sql,
            charset=charset,
            expectation_logic=expectation_logic,
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
        )
        sql_case.case = case
        sql_case._update_expectations(expectations)
        db.session.add(case)
        db.session.add(sql_case)
        db.session.commit()
        return sql_case

    @classmethod
    def update(cls, id, name, description, host, port, connect_timeout, user, password, db_type, sql, charset,
               expectation_logic, preprocessor_script, postprocessor_script, expectations=None):
        """
        Update an existing SQL case and commit.

        If no ``Case`` row matches ``id``, nothing is modified; the commit is
        still issued.

        :param id: case id
        :param name: case name
        :param description: case description
        :param host: host address
        :param port: port
        :param connect_timeout: timeout, in seconds
        :param user: user name
        :param password: password
        :param expectation_logic: how expectation results are combined
        :param db_type: database type
        :param sql: SQL script
        :param charset: character set
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param expectations: expectation data
        """
        case = Case.query.filter_by(id=int(id)).first()
        # ``first()`` returns None when no row matches, so ``specific_case`` may
        # only be read once the case is known to exist.
        sql_case = None
        if case:
            sql_case = case.specific_case
            case.name = name
            case.description = description
        if sql_case:
            sql_case.host = host
            sql_case.port = port
            sql_case.connect_timeout = connect_timeout
            sql_case.user = user
            sql_case.password = password
            sql_case.db_type = db_type
            sql_case.sql = sql
            sql_case.charset = charset
            sql_case.expectation_logic = expectation_logic
            sql_case.preprocessor_script = preprocessor_script
            sql_case.postprocessor_script = postprocessor_script
            sql_case._update_expectations(expectations)
        db.session.commit()

    def copy_from_self(self, scene_id):
        """
        Duplicate this SQL case through ``add``.

        :param scene_id: id of the scene that will own the copy; when None the
            copy stays in the scene of the current case
        :return: the new SQL case, as returned by ``add``
        """
        copy_case = self.add(
            # Case
            name='Copy' + self.case.name,
            description=self.case.description,
            status=self.case.status,
            # None means "keep the copy in the current scene"
            scene_id=self.case.scene_id if scene_id is None else scene_id,
            # SQLCase
            host=self.host,
            port=self.port,
            db_type=self.db_type,
            connect_timeout=self.connect_timeout,
            user=self.user,
            password=self.password,
            sql=self.sql,
            charset=self.charset,
            expectation_logic=self.expectation_logic,
            preprocessor_script=self.preprocessor_script,
            postprocessor_script=self.postprocessor_script,
            expectations=self.expectations,
        )
        return copy_case

    def _update_expectations(self, expectations):
        """
        Replace ``self.expectations`` with fresh ``SQLCaseExpectation`` rows.

        :param expectations: None (just clear the collection), or an iterable whose
            items either offer ``get`` (dict-like) or expose the fields as attributes
        """
        self.expectations = []
        if expectations is None:
            return
        field_names = ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater')
        for source in expectations:
            if hasattr(source, 'get'):
                values = {field: source.get(field) for field in field_names}
            else:
                # attribute-style items: this is the branch copy_from_self goes through
                values = {field: getattr(source, field) for field in field_names}
            self.expectations.append(SQLCaseExpectation(**values))

    def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
        """Store the post-processing script result and its error message, then commit."""
        self.postprocessor_failure = postprocessor_failure
        self.postprocessor_failure_message = postprocessor_failure_message
        db.session.commit()
class SQLCaseExpectation(db.Model):
    """
    One expectation (assertion) attached to an SQL case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: negate the assertion result
    Relationships
        sql_case: case the expectation belongs to; SQLCase:SQLCaseExpectation is 1:N
    """
    # Fields that need to be converted to JSON
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']
    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败', server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    # relationship
    sql_case_id = db.Column(db.Integer, db.ForeignKey('sql_case.id'))
    sql_case = db.relationship('SQLCase', back_populates='expectations')
    # Messages longer than this many characters are cut by update_assert_result
    last_failure_msg_max_length = 1000

    # parsed accessors
    @property
    def value_(self):
        """Return ``value`` after running it through the ``_p`` parser."""
        parsed_value = _p(self.value)
        return parsed_value

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the assertion result and commit.

        :param last_result: result of the assertion
        :param last_failure_msg: failure message; a non-str value is stored as '',
            and an over-long string is cut and given a truncation suffix
        """
        limit = __class__.last_failure_msg_max_length
        self.last_result = last_result
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_failure_msg = message
        db.session.commit()
class DebugCase(db.Model):
    """
    Debug-specific part of a test case.

    Fields
        project_variable: whether project variables are shown
        expectation_logic: how expectation results are combined (AND / OR)
        preprocessor_script: pre-processing script
        postprocessor_script: post-processing script
        postprocessor_failure: post-processing assertion failed
        postprocessor_failure_message: post-processing assertion failure message
    Relationships
        case: generic case component; Case:DebugCase is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    # NOTE(review): the Python-side default (False) and the server default ('1')
    # disagree - confirm which one is intended.
    project_variable = db.Column(db.Boolean, nullable=False, default=False, server_default='1')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND, server_default=EXPECTATION_LOGIC.AND)
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
    case = db.relationship('Case')
    expectations = db.relationship('DebugCaseExpectation', back_populates='debug_case', cascade='all, delete-orphan')

    @property
    def preprocessor_script_(self):
        """Return ``preprocessor_script`` after running it through the ``_p`` parser."""
        return _p(self.preprocessor_script)

    @property
    def postprocessor_script_(self):
        """Return ``postprocessor_script`` after running it through the ``_p`` parser."""
        return _p(self.postprocessor_script)

    @classmethod
    def add(cls, name, description, scene_id, project_variable, expectation_logic, preprocessor_script,
            postprocessor_script, status=STATUS.NORMAL, expectations=None):
        """
        Create a new debug case and commit it.

        A generic ``Case`` row typed ``CASE_TYPE.DEBUG`` is created together with
        the debug-specific row, and the two are linked through ``case``.

        :param name: case name
        :param description: case description
        :param status: case status (deleted / normal)
        :param scene_id: id of the scene the case belongs to
        :param project_variable: whether variables are shown
        :param expectation_logic: how expectation results are combined
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param expectations: expectation data
        :return: the newly created debug case instance
        """
        case = Case(
            name=name,
            description=description,
            status=status,
            scene_id=scene_id,
            case_type=CASE_TYPE.DEBUG,
        )
        debug_case = cls(
            project_variable=project_variable,
            expectation_logic=expectation_logic,
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
        )
        debug_case.case = case
        debug_case._update_expectations(expectations)
        db.session.add(case)
        db.session.add(debug_case)
        db.session.commit()
        return debug_case

    @classmethod
    def update(cls, id, name, description, project_variable, expectation_logic, preprocessor_script,
               postprocessor_script, expectations=None):
        """
        Update an existing debug case and commit.

        If no ``Case`` row matches ``id``, nothing is modified; the commit is
        still issued.

        :param id: case id
        :param name: case name
        :param description: case description
        :param project_variable: whether project variables are shown
        :param expectation_logic: how expectation results are combined
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param expectations: expectation data
        """
        case = Case.query.filter_by(id=int(id)).first()
        # ``first()`` returns None when no row matches, so ``specific_case`` may
        # only be read once the case is known to exist.
        debug_case = None
        if case:
            debug_case = case.specific_case
            case.name = name
            case.description = description
        if debug_case:
            debug_case.project_variable = project_variable
            debug_case.expectation_logic = expectation_logic
            debug_case.preprocessor_script = preprocessor_script
            debug_case.postprocessor_script = postprocessor_script
            debug_case._update_expectations(expectations)
        db.session.commit()

    def copy_from_self(self, scene_id):
        """
        Duplicate this debug case through ``add``.

        :param scene_id: id of the scene that will own the copy; when None the
            copy stays in the scene of the current case
        :return: the new debug case, as returned by ``add``
        """
        copy_case = self.add(
            # Case
            name='Copy' + self.case.name,
            description=self.case.description,
            status=self.case.status,
            # None means "keep the copy in the current scene"
            scene_id=self.case.scene_id if scene_id is None else scene_id,
            # DebugCase
            project_variable=self.project_variable,
            expectation_logic=self.expectation_logic,
            preprocessor_script=self.preprocessor_script,
            postprocessor_script=self.postprocessor_script,
            expectations=self.expectations,
        )
        return copy_case

    def _update_expectations(self, expectations):
        """
        Replace ``self.expectations`` with fresh ``DebugCaseExpectation`` rows.

        :param expectations: None (just clear the collection), or an iterable whose
            items either offer ``get`` (dict-like) or expose the fields as attributes
        """
        self.expectations = []
        if expectations is None:
            return
        field_names = ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater')
        for source in expectations:
            if hasattr(source, 'get'):
                values = {field: source.get(field) for field in field_names}
            else:
                # attribute-style items: this is the branch copy_from_self goes through
                values = {field: getattr(source, field) for field in field_names}
            self.expectations.append(DebugCaseExpectation(**values))

    def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
        """Store the post-processing script result and its error message, then commit."""
        self.postprocessor_failure = postprocessor_failure
        self.postprocessor_failure_message = postprocessor_failure_message
        db.session.commit()
class DebugCaseExpectation(db.Model):
    """
    One expectation (assertion) attached to a debug case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: negate the assertion result
    Relationships
        debug_case: case the expectation belongs to; DebugCase:DebugCaseExpectation is 1:N
    """
    # Fields that need to be converted to JSON
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']
    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败', server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    # relationship
    debug_case_id = db.Column(db.Integer, db.ForeignKey('debug_case.id'))
    debug_case = db.relationship('DebugCase', back_populates='expectations')
    # Messages longer than this many characters are cut by update_assert_result
    last_failure_msg_max_length = 1000

    # parsed accessors
    @property
    def value_(self):
        """Return ``value`` after running it through the ``_p`` parser."""
        parsed_value = _p(self.value)
        return parsed_value

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the assertion result and commit.

        :param last_result: result of the assertion
        :param last_failure_msg: failure message; a non-str value is stored as '',
            and an over-long string is cut and given a truncation suffix
        """
        limit = __class__.last_failure_msg_max_length
        self.last_result = last_result
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_failure_msg = message
        db.session.commit()
class Dispatcher(db.Model):
    """
    Records information about a test run of a project or module.

    Fields
        element_type: element type (project or module)
        element_id: element id
        status: status
        progress: progress
        start_time: start time
        end_time: end time
        log: dispatch / build log
        user_id: user who started the dispatch
    Relationships
        DispatcherDetail: every element executed in the dispatch; Dispatcher:DispatcherDetail is 1:N
        Report: report for the dispatch result; Dispatcher:Report is 1:1
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    progress = db.Column(db.Float(), nullable=False, default=0.0, server_default=text('0.0'))  # TODO control precision?
    start_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    end_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    log = db.Column(db.Text, nullable=False, default='', server_default='')
    user_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    # relationship
    details = db.relationship('DispatcherDetail', back_populates='dispatcher', cascade='all, delete-orphan')
    report = db.relationship('Report', uselist=False, back_populates='dispatcher', cascade='all')
    # Dispatch end type (DISPATCHER_END_TYPE). Not needed as a table column for now;
    # kept as a plain attribute so it can be used as a marker while dispatching.
    end_type = ''
    # Stop execution when an error occurs during dispatch; kept consistent with
    # the project setting ProjectAdvancedConfiguration.stop_on_error.
    stop_on_error = True

    @classmethod
    def add(cls, element_type, element_id):
        """
        Create a dispatcher record for the given element and commit it.

        :param element_type: element type
        :param element_id: element id
        :return: the newly created dispatcher
        """
        # Scheduled builds are registered before the first request after the
        # server starts (app.before_first_request). If the user is not logged in
        # at that point, the request context cannot supply the correct user id
        # (current_user.id), hence the check below.
        init_kwargs = {'element_type': element_type, 'element_id': element_id}
        if current_user.is_authenticated:
            init_kwargs['user_id'] = current_user.id
        dispatcher = cls(**init_kwargs)
        db.session.add(dispatcher)
        db.session.commit()
        return dispatcher

    def update_status(self, status):
        """Set ``status`` and commit; a STOPPED status also refreshes ``end_time`` first."""
        # Only STOPPED updates the end time here; for FINISHED it has to be
        # updated manually.
        stopping = status in (DISPATCHER_STATUS.STOPPED,)
        if stopping:
            self.update_end_time()
        self.status = status
        db.session.commit()

    def update_end_time(self):
        """Set ``end_time`` to the current local time and commit."""
        finished_at = datetime.now()
        self.end_time = finished_at
        db.session.commit()
class DispatcherDetail(db.Model):
    """
    Records the execution of each element inside a dispatch.

    Fields
        element_type: element type (project, module, scene, case)
        element_id: element id
        element_name: element name
        parent_dispatcher_detail_id: parent dispatch node this node belongs to
        parent_element_id: id of the parent element
        parent_element_type: type of the parent element
        start_time: start time
    Relationships
        dispatcher: owning dispatch  Dispatcher:DispatcherDetail is 1:N
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    element_name = db.Column(db.String(512), nullable=False, default='', server_default='')  # case.name is 512 long
    parent_dispatcher_detail_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    parent_element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    parent_element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    start_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))

    # relationship
    dispatcher_id = db.Column(db.Integer, db.ForeignKey('dispatcher.id'))
    dispatcher = db.relationship('Dispatcher', back_populates='details')

    @property
    def specific_element_report_data(self):
        """
        Return the report row recorded for this node.

        :return: first ReportCaseData (CASE nodes) or ReportToolData (TOOL nodes) whose
            ``dispatcher_detail_id`` is this node's id; ``None`` for any other element type
        """
        if self.element_type == ELEMENT_TYPE.CASE:
            report_model = ReportCaseData
        elif self.element_type == ELEMENT_TYPE.TOOL:
            report_model = ReportToolData
        else:
            return None
        return report_model.query.filter_by(dispatcher_detail_id=self.id).first()

    @classmethod
    def add(cls, element_type, element_id, element_name, dispatcher, parent_dispatcher_detail_id=None):
        """
        Add one dispatch detail row.

        :param element_type: element type (project, module, scene, case, logic controller)
        :param element_id: element id
        :param element_name: element name
        :param dispatcher: dispatch object
        :param parent_dispatcher_detail_id: parent dispatch node of this node; supplied by the
            caller for cases and logic controllers, looked up here for modules and scenes
        :return: the committed DispatcherDetail
        """
        is_trigger_element = element_id == dispatcher.element_id and element_type == dispatcher.element_type
        if is_trigger_element:
            # This element is the one that triggered the dispatch: parent fields get placeholder values.
            node = cls(element_type=element_type, element_id=element_id, element_name=element_name,
                       dispatcher_id=dispatcher.id, parent_element_id=-1, parent_element_type='',
                       parent_dispatcher_detail_id=-1)
        else:
            parent_element_type = None
            parent_element_id = None
            lookup_parent_node = False  # modules and scenes resolve their parent node from the table

            if element_type == ELEMENT_TYPE.MODULE:
                parent_element_type = ELEMENT_TYPE.PROJECT
                parent_element_id = Module.query.filter_by(id=int(element_id)).first().project_id
                lookup_parent_node = True
            elif element_type == ELEMENT_TYPE.SCENE:
                parent_element_type = ELEMENT_TYPE.MODULE
                parent_element_id = Scene.query.filter_by(id=int(element_id)).first().module_id
                # The caller passes SCENE, but the row is stored as the logic controller that
                # backs the scene's scene_controller.
                element_type = ELEMENT_TYPE.LOGIC_CONTROLLER
                scene = Scene.query.filter_by(id=element_id).first()
                element_id = scene.scene_controller.logic_controller.id
                lookup_parent_node = True
            elif element_type in (ELEMENT_TYPE.CASE, ELEMENT_TYPE.LOGIC_CONTROLLER, ELEMENT_TYPE.TOOL):
                # Cases, nested logic controllers and tools all hang below a logic controller.
                parent_element_type = ELEMENT_TYPE.LOGIC_CONTROLLER
                parent_element_id = SubElementInLogicController.query.filter_by(
                    element_type=element_type,
                    element_id=element_id
                ).first().logic_controller_id

            if lookup_parent_node:
                parent_dispatcher_detail_id = DispatcherDetail.query.filter_by(
                    dispatcher_id=dispatcher.id,
                    element_id=parent_element_id,
                    element_type=parent_element_type,
                ).first().id

            node = cls(element_type=element_type, element_id=element_id, element_name=element_name,
                       dispatcher_id=dispatcher.id, parent_dispatcher_detail_id=parent_dispatcher_detail_id,
                       parent_element_type=parent_element_type, parent_element_id=parent_element_id)

        db.session.add(node)
        db.session.commit()
        return node
class Report(db.Model):
    """
    Dispatch report.

    Fields
        name: report name
        result: report result (REPORT_RESULT)
        status: report status
    Relationships
        dispatcher: dispatch this report belongs to
        report_case_data: detailed case data in the report  Report:ReportCaseData is 1:N
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    result = db.Column(db.String(64), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)

    # relationship
    dispatcher_id = db.Column(db.Integer, db.ForeignKey('dispatcher.id'))
    dispatcher = db.relationship('Dispatcher', back_populates='report')
    report_case_data = db.relationship('ReportCaseData', back_populates='report', cascade='all, delete-orphan')

    @classmethod
    def add(cls, dispatcher_id, name, result=False):
        """
        Create and commit a report for a dispatch.

        :param dispatcher_id: id of the owning dispatch
        :param name: report name
        :param result: report result
            NOTE(review): defaults to ``False`` although the column is ``String(64)`` - confirm intent
        :return: the committed Report
        """
        new_report = cls(dispatcher_id=dispatcher_id, name=name, result=result)
        db.session.add(new_report)
        db.session.commit()
        return new_report

    @classmethod
    def update(cls, id, **kwargs):
        """
        Change the status of a report; does nothing when no report has this id.

        :param id: report id (converted with ``int``)
        :param kwargs: optional ``status``; when missing or falsy the report is set to STATUS.DELETED
        """
        target = cls.query.filter_by(id=int(id)).first()
        if target is None:
            return
        target.status = kwargs.get('status') or STATUS.DELETED
        db.session.commit()

    def update_result(self, result):
        """Store ``result`` on this report and commit."""
        self.result = result
        db.session.commit()
class ReportCaseData(db.Model):
    """
    Execution data of a case recorded in a report.

    Fields
        case_type: case type
        case_id: case id
        module_id: id of the module the case belongs to
        module_name: name of the module the case belongs to
        request_header: request headers
        request_body: request body
        response_header: response headers
        response_body: response body
        preprocessor_script: pre-processor script
        postprocessor_script: post-processor script
        postprocessor_result: result of the post-processor script
        postprocessor_failure_message: error message of the post-processor script
        log: execution log
        expectation_logic: logic used to combine expectation results
        elapsed_time: elapsed time
        result: execution result (REPORT_RESULT)
    Relationships
        report: report this data belongs to  Report:ReportCaseData is 1:N
        report_case_expectation_data: expectation data of this row
            ReportCaseData:ReportCaseExpectationData is 1:N
        dispatcher_detail: dispatch detail row this data was recorded for
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    case_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    case_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    module_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    module_name = db.Column(db.String(64), nullable=False, default='', server_default='')
    request_header = db.Column(db.Text, nullable=False, default='', server_default='')
    request_body = db.Column(db.Text, nullable=False, default='', server_default='')
    response_header = db.Column(db.Text, nullable=False, default='', server_default='')
    response_body = db.Column(db.Text, nullable=False, default='', server_default='')
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')
    log = db.Column(db.Text, nullable=False, default='', server_default='')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND,
                                  server_default=EXPECTATION_LOGIC.AND)
    elapsed_time = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    result = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    report_id = db.Column(db.Integer, db.ForeignKey('report.id'))
    dispatcher_detail_id = db.Column(db.Integer, db.ForeignKey('dispatcher_detail.id'))
    report = db.relationship('Report', back_populates='report_case_data')
    report_case_expectation_data = db.relationship('ReportCaseExpectationData', back_populates='report_case_data',
                                                   cascade='all, delete-orphan')
    dispatcher_detail = db.relationship('DispatcherDetail')

    @classmethod
    def add(cls, report_id, dispatcher_detail_id, case_type, case_id, module_id, module_name, request_header,
            request_body, response_header, response_body, elapsed_time, preprocessor_script, postprocessor_script,
            postprocessor_result, postprocessor_failure_message, log, expectation_logic, result):
        """
        Create and commit one case execution record; every argument is stored in the column of the same name.

        :return: the committed ReportCaseData
        """
        column_values = dict(
            # ownership
            report_id=report_id,
            dispatcher_detail_id=dispatcher_detail_id,
            # case identity
            case_type=case_type,
            case_id=case_id,
            module_id=module_id,
            module_name=module_name,
            # request / response
            request_header=request_header,
            request_body=request_body,
            response_header=response_header,
            response_body=response_body,
            # scripts
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
            postprocessor_result=postprocessor_result,
            postprocessor_failure_message=postprocessor_failure_message,
            # outcome
            log=log,
            expectation_logic=expectation_logic,
            result=result,
            elapsed_time=elapsed_time,
        )
        record = cls(**column_values)
        db.session.add(record)
        db.session.commit()
        return record
class ReportCaseExpectationData(db.Model):
    """
    Expectation (assertion) data of a case recorded in a report.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching rule
        negater: negate the assertion result
        result: assertion result
        failure_msg: assertion message
    Relationships
        report_case_data: execution data this expectation belongs to
            ReportCaseData:ReportCaseExpectationData is 1:N
    """
    # fields that need to be converted to JSON format
    render_field_list = ['test_field', 'value', 'matching_rule', 'negater', 'result', 'failure_msg']

    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    failure_msg = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    report_case_data_id = db.Column(db.Integer, db.ForeignKey('report_case_data.id'))
    report_case_data = db.relationship('ReportCaseData', back_populates='report_case_expectation_data')

    @classmethod
    def add(cls, report_case_data_id, test_field, value, matching_rule, negater, result, failure_msg):
        """
        Create and commit one expectation record; every argument is stored in the column of the same name.

        :return: the committed ReportCaseExpectationData
        """
        column_values = dict(
            report_case_data_id=report_case_data_id,
            test_field=test_field,
            value=value,
            matching_rule=matching_rule,
            negater=negater,
            result=result,
            failure_msg=failure_msg,
        )
        record = cls(**column_values)
        db.session.add(record)
        db.session.commit()
        return record
class ReportToolData(db.Model):
    """
    Execution data of a tool recorded in a report.

    Fields
        tool_type: tool type
        tool_id: tool id
    Relationships
        report: report this data belongs to  Report:ReportToolData is 1:N
        dispatcher_detail: dispatch detail row this data was recorded for
        report_*_tool_data: type-specific data, at most one row each (``uselist=False``)
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    tool_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    tool_type = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    report_id = db.Column(db.Integer, db.ForeignKey('report.id'))
    dispatcher_detail_id = db.Column(db.Integer, db.ForeignKey('dispatcher_detail.id'))
    report = db.relationship('Report')
    dispatcher_detail = db.relationship('DispatcherDetail')
    report_script_tool_data = db.relationship(
        'ReportScriptToolData',
        back_populates='report_tool_data',
        uselist=False,
        cascade='all, delete-orphan',
    )
    report_timer_tool_data = db.relationship(
        'ReportTimerToolData',
        back_populates='report_tool_data',
        uselist=False,
        cascade='all, delete-orphan',
    )
    report_variable_definition_tool_data = db.relationship(
        'ReportVariableDefinitionToolData',
        back_populates='report_tool_data',
        uselist=False,
        cascade='all, delete-orphan',
    )
    report_http_header_manager_tool_data = db.relationship(
        'ReportHTTPHeaderManagerToolData',
        back_populates='report_tool_data',
        uselist=False,
        cascade='all, delete-orphan',
    )
    report_http_cookie_manager_tool_data = db.relationship(
        'ReportHTTPCookieManagerToolData',
        back_populates='report_tool_data',
        uselist=False,
        cascade='all, delete-orphan',
    )

    @classmethod
    def add(cls, tool, report_id, dispatcher_detail_id):
        """
        Record a tool execution together with its type-specific data.

        The generic row is committed first; then, depending on ``tool.tool_type``, the matching
        ``Report*ToolData.add`` copies the relevant values from ``tool.specific_tool``. A tool
        type that matches none of the handled types only gets the generic row.

        :param tool: tool object providing ``id``, ``tool_type`` and ``specific_tool``
        :param report_id: id of the owning report
        :param dispatcher_detail_id: id of the dispatch detail row
        :return: the committed ReportToolData
        """
        record = cls(tool_id=tool.id, tool_type=tool.tool_type, report_id=report_id,
                     dispatcher_detail_id=dispatcher_detail_id)
        db.session.add(record)
        db.session.commit()

        kind = tool.tool_type
        if kind == TOOL_TYPE.TIMER:
            ReportTimerToolData.add(delay=tool.specific_tool.delay, report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.SCRIPT:
            ReportScriptToolData.add(script=tool.specific_tool.script, report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.VARIABLE_DEFINITION:
            ReportVariableDefinitionToolData.add(variables=tool.specific_tool.variables,
                                                 report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.HTTP_HEADER_MANAGER:
            ReportHTTPHeaderManagerToolData.add(variables=tool.specific_tool.variables,
                                                report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.HTTP_COOKIE_MANAGER:
            ReportHTTPCookieManagerToolData.add(attributes=tool.specific_tool.attributes,
                                                report_tool_data_id=record.id)
        return record
class ReportTimerToolData(db.Model):
    """Timer-tool data recorded in a report: the ``delay`` value of the tool."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    delay = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_timer_tool_data')

    @classmethod
    def add(cls, delay, report_tool_data_id):
        """
        Create and commit a timer record. Returns nothing.

        :param delay: delay value to store
        :param report_tool_data_id: id of the owning ReportToolData row
        """
        timer_record = cls(delay=delay, report_tool_data_id=report_tool_data_id)
        db.session.add(timer_record)
        db.session.commit()
class ReportScriptToolData(db.Model):
    """Script-tool data recorded in a report: the ``script`` text of the tool."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    script = db.Column(db.TEXT, nullable=False, default='', server_default='')

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_script_tool_data')

    @classmethod
    def add(cls, script, report_tool_data_id):
        """
        Create and commit a script record. Returns nothing.

        :param script: script text to store
        :param report_tool_data_id: id of the owning ReportToolData row
        """
        script_record = cls(script=script, report_tool_data_id=report_tool_data_id)
        db.session.add(script_record)
        db.session.commit()
class ReportVariableDefinitionToolData(db.Model):
    """Variable-definition tool data recorded in a report; the variables live in the list rows."""
    # field
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_variable_definition_tool_data')
    report_variable_definition_tool_data_list = db.relationship(
        'ReportVariableDefinitionToolListData',
        back_populates='report_variable_definition_tool_data',
        cascade='all, delete-orphan',
    )

    @property
    def variables(self):
        """Return the recorded variables as a list of ``name`` / ``value`` / ``description`` dicts."""
        collected = []
        for entry in self.report_variable_definition_tool_data_list:
            collected.append({
                'name': entry.name,
                'value': entry.value,
                'description': entry.description,
            })
        return collected

    @classmethod
    def add(cls, variables, report_tool_data_id):
        """
        Record the variables of a variable-definition tool. Returns nothing.

        :param variables: iterable of mappings read with ``.get`` for ``name``, ``value`` and
            ``description`` (each falls back to ``''``)
        :param report_tool_data_id: id of the owning ReportToolData row
        """
        header = ReportVariableDefinitionToolData(report_tool_data_id=report_tool_data_id)
        db.session.add(header)
        db.session.commit()  # the header row only has an id after the commit; before that it is None
        db.session.add_all([
            ReportVariableDefinitionToolListData(
                name=item.get('name', ''),
                value=item.get('value', ''),
                description=item.get('description', ''),
                report_variable_definition_tool_data_id=header.id,
            )
            for item in variables
        ])
        db.session.commit()
class ReportVariableDefinitionToolListData(db.Model):
    """One variable (name, value, description) of a recorded variable-definition tool."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    report_variable_definition_tool_data_id = db.Column(
        db.Integer,
        db.ForeignKey('report_variable_definition_tool_data.id'),
    )
    report_variable_definition_tool_data = db.relationship(
        'ReportVariableDefinitionToolData',
        back_populates='report_variable_definition_tool_data_list',
    )
class ReportHTTPHeaderManagerToolData(db.Model):
    """HTTP-header-manager tool data recorded in a report; the headers live in the list rows."""
    # field
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_http_header_manager_tool_data')
    report_http_header_manager_tool_list_data = db.relationship(
        'ReportHTTPHeaderManagerToolListData',
        back_populates='report_http_header_manager_tool_data',
        cascade='all, delete-orphan',
    )

    @property
    def variables(self):
        """Return the recorded entries as a list of ``name`` / ``value`` / ``description`` dicts."""
        collected = []
        for entry in self.report_http_header_manager_tool_list_data:
            collected.append({
                'name': entry.name,
                'value': entry.value,
                'description': entry.description,
            })
        return collected

    @classmethod
    def add(cls, variables, report_tool_data_id):
        """
        Record the entries of an HTTP header manager tool. Returns nothing.

        :param variables: iterable of mappings read with ``.get`` for ``name``, ``value`` and
            ``description`` (each falls back to ``''``)
        :param report_tool_data_id: id of the owning ReportToolData row
        """
        header = ReportHTTPHeaderManagerToolData(report_tool_data_id=report_tool_data_id)
        db.session.add(header)
        db.session.commit()  # the header row only has an id after the commit; before that it is None
        db.session.add_all([
            ReportHTTPHeaderManagerToolListData(
                name=item.get('name', ''),
                value=item.get('value', ''),
                description=item.get('description', ''),
                report_http_header_manager_tool_data_id=header.id,
            )
            for item in variables
        ])
        db.session.commit()
class ReportHTTPHeaderManagerToolListData(db.Model):
    """One entry (name, value, description) of a recorded HTTP header manager tool."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    report_http_header_manager_tool_data_id = db.Column(
        db.Integer,
        db.ForeignKey('report_http_header_manager_tool_data.id'),
    )
    report_http_header_manager_tool_data = db.relationship(
        'ReportHTTPHeaderManagerToolData',
        back_populates='report_http_header_manager_tool_list_data',
    )
class ReportHTTPCookieManagerToolData(db.Model):
    """HTTP-cookie-manager tool data recorded in a report; the cookies live in the list rows."""
    # field
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_http_cookie_manager_tool_data')
    report_http_cookie_manager_tool_list_data = db.relationship(
        'ReportHTTPCookieManagerToolListData',
        back_populates='report_http_cookie_manager_tool_data',
        cascade='all, delete-orphan',
    )

    @property
    def attributes(self):
        """Return the recorded cookies as a list of ``name`` / ``value`` / ``domain`` / ``path`` / ``secure`` dicts."""
        collected = []
        for entry in self.report_http_cookie_manager_tool_list_data:
            collected.append({
                'name': entry.name,
                'value': entry.value,
                'domain': entry.domain,
                'path': entry.path,
                'secure': entry.secure,
            })
        return collected

    @classmethod
    def add(cls, attributes, report_tool_data_id):
        """
        Record the cookies of an HTTP cookie manager tool. Returns nothing.

        :param attributes: iterable of mappings read with ``.get`` for ``name``, ``value``,
            ``domain``, ``path`` (each falls back to ``''``) and ``secure`` (falls back to ``False``)
        :param report_tool_data_id: id of the owning ReportToolData row
        """
        header = ReportHTTPCookieManagerToolData(report_tool_data_id=report_tool_data_id)
        db.session.add(header)
        db.session.commit()  # the header row only has an id after the commit; before that it is None
        db.session.add_all([
            ReportHTTPCookieManagerToolListData(
                name=item.get('name', ''),
                value=item.get('value', ''),
                domain=item.get('domain', ''),
                path=item.get('path', ''),
                secure=item.get('secure', False),
                report_http_cookie_manager_tool_data_id=header.id,
            )
            for item in attributes
        ])
        db.session.commit()
class ReportHTTPCookieManagerToolListData(db.Model):
    """One cookie (name, value, domain, path, secure) of a recorded HTTP cookie manager tool."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    domain = db.Column(db.String(512), nullable=False, default='', server_default='')
    path = db.Column(db.String(512), nullable=False, default='', server_default='')
    secure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    # relationship
    report_http_cookie_manager_tool_data_id = db.Column(
        db.Integer,
        db.ForeignKey('report_http_cookie_manager_tool_data.id'),
    )
    report_http_cookie_manager_tool_data = db.relationship(
        'ReportHTTPCookieManagerToolData',
        back_populates='report_http_cookie_manager_tool_list_data',
    )
class Scheduler(db.Model):
    """
    Scheduled dispatch task.

    Fields
        element_type: element type
        element_id: element id
        cron: cron expression
        enable: whether the scheduled task is enabled
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    cron = db.Column(db.Text, nullable=False, default='', server_default='')
    enable = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    @classmethod
    def get_scheduler(cls, element_type, element_id):
        """
        Look up the scheduler row of an element.

        :return: the first matching Scheduler, or ``None`` when there is none
        """
        matching = cls.query.filter_by(element_id=element_id, element_type=element_type)
        return matching.first()

    @classmethod
    def update(cls, element_type, element_id, enable, cron):
        """
        Overwrite ``cron`` and ``enable`` of an element's scheduler row and commit.

        No ``None`` check is made on the looked-up row, so a missing row raises ``AttributeError``.
        """
        job = cls.get_scheduler(element_type=element_type, element_id=element_id)
        job.cron, job.enable = cron, enable
        db.session.commit()

    @classmethod
    def add(cls, element_type, element_id, enable, cron):
        """Create and commit a scheduler row for an element. Returns nothing."""
        job = cls(element_type=element_type, element_id=element_id, enable=enable, cron=cron)
        db.session.add(job)
        db.session.commit()
class DingTalkRobotSetting(db.Model):
    """
    DingTalk robot configuration.

    Fields
        enable: whether this DingTalk robot is enabled
        access_token: token, generated automatically when the DingTalk robot is created
        secret: robot secret (security settings - signing), reference:
            https://ding-doc.dingtalk.com/document#/org-dev-guide/qf2nxq#topic-1914047
        at_all: whether to @ all members when notifying
        at_mobiles: @ one or more members when notifying (mobile numbers, comma separated)
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    enable = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    access_token = db.Column(db.String(512), nullable=False, default='', server_default='')
    secret = db.Column(db.String(512), nullable=False, default='', server_default='')
    at_all = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    at_mobiles = db.Column(db.Text, nullable=False, default='', server_default='')

    @classmethod
    def get_project_ding_talk_robot_setting(cls, project_id):
        """
        Return the DingTalk robot settings bound to a project.

        :param project_id: project id
        :return: list of settings (possibly several, possibly empty); association rows whose
            setting no longer exists are skipped
        """
        ding_talk_robot_settings = []
        rows = DingTalkRobotSettingAssociationProject.query.filter_by(project_id=project_id).all()
        for row in rows:
            ding_talk_robot_setting = cls.query.filter_by(id=row.setting_id).first()
            if ding_talk_robot_setting:
                ding_talk_robot_settings.append(ding_talk_robot_setting)
        return ding_talk_robot_settings

    @classmethod
    def add(cls, enable, access_token, secret, at_all, at_mobiles, projects=None):
        """
        Create a DingTalk robot setting and bind it to projects.

        :param projects: iterable of project ids to bind; ``None`` (the default) binds none
        :return: the committed DingTalkRobotSetting
        """
        ding_talk_robot_setting = cls(enable=enable,
                                      access_token=access_token,
                                      secret=secret,
                                      at_all=at_all,
                                      at_mobiles=at_mobiles)
        db.session.add(ding_talk_robot_setting)
        db.session.commit()
        # The association helper iterates ``projects``; the default None must not reach it,
        # otherwise the setting is committed and the call then fails with TypeError.
        DingTalkRobotSettingAssociationProject.add(setting_id=ding_talk_robot_setting.id,
                                                   projects=projects or [])
        return ding_talk_robot_setting

    @classmethod
    def modify(cls, id, enable, access_token, secret, at_all, at_mobiles, projects=None):
        """
        Overwrite a setting and replace its project bindings; does nothing when the id is unknown.

        :param id: id of the setting to modify
        :param projects: iterable of project ids that replaces the current bindings;
            ``None`` (the default) leaves the setting bound to no project
        """
        ding_talk_robot_setting = cls.query.filter_by(id=id).first()
        if not ding_talk_robot_setting:
            return
        ding_talk_robot_setting.enable = enable
        ding_talk_robot_setting.access_token = access_token
        ding_talk_robot_setting.secret = secret
        ding_talk_robot_setting.at_all = at_all
        ding_talk_robot_setting.at_mobiles = at_mobiles
        db.session.commit()
        # Bindings are replaced wholesale: drop the old rows, then insert the new ones.
        DingTalkRobotSettingAssociationProject.delete(setting_id=id)
        DingTalkRobotSettingAssociationProject.add(setting_id=id, projects=projects or [])

    @classmethod
    def delete(cls, id):
        """
        Delete a setting together with its project bindings; does nothing when the id is unknown.

        :param id: id of the setting to delete
        """
        ding_talk_robot_setting = cls.query.filter_by(id=id).first()
        if not ding_talk_robot_setting:
            return
        # Read the primary key before the row is deleted and committed, so the cleanup below
        # does not depend on attribute access on a deleted instance.
        setting_id = ding_talk_robot_setting.id
        db.session.delete(ding_talk_robot_setting)
        db.session.commit()
        DingTalkRobotSettingAssociationProject.delete(setting_id=setting_id)
class DingTalkRobotSettingAssociationProject(db.Model):
    """Association table between DingTalk robot settings and projects."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    setting_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    project_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))

    @classmethod
    def add(cls, setting_id, projects):
        """
        Bind a setting to each of the given projects and commit.

        :param setting_id: id of the DingTalk robot setting
        :param projects: iterable of project ids; ``None`` is treated as "no projects"
            (callers in this file pass ``None`` by default)
        """
        db.session.add_all([
            cls(setting_id=setting_id, project_id=project_id)
            for project_id in (projects or ())
        ])
        db.session.commit()

    @classmethod
    def delete(cls, setting_id):
        """
        Remove every project binding of a setting and commit.

        :param setting_id: id of the DingTalk robot setting
        """
        for row in cls.query.filter_by(setting_id=setting_id).all():
            db.session.delete(row)
        db.session.commit()
class EmailReceiverSetting(db.Model):
    """
    Email receiver configuration.

    Fields
        enable: whether this receiver address is enabled
        name: receiver name
        address: email address
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    enable = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    name = db.Column(db.String(128), nullable=False, default='', server_default='')
    address = db.Column(db.String(512), nullable=False, default='', server_default='')

    @classmethod
    def get_project_email_receiver_setting(cls, project_id):
        """
        Return all receivers that should be notified for a project.

        :param project_id: project id
        :return: list of settings (possibly empty); association rows whose setting no longer
            exists are skipped
        """
        email_receiver_settings = []
        rows = EmailReceiverSettingAssociationProject.query.filter_by(project_id=project_id).all()
        for row in rows:
            email_receiver_setting = cls.query.filter_by(id=row.setting_id).first()
            if email_receiver_setting:
                email_receiver_settings.append(email_receiver_setting)
        return email_receiver_settings

    @classmethod
    def add(cls, enable, name, address, projects=None):
        """
        Create an email receiver setting and bind it to projects.

        :param projects: iterable of project ids to bind; ``None`` (the default) binds none
        :return: the committed EmailReceiverSetting
        """
        email_receiver_setting = cls(enable=enable,
                                     name=name,
                                     address=address)
        db.session.add(email_receiver_setting)
        db.session.commit()
        # The association helper iterates ``projects``; the default None must not reach it,
        # otherwise the setting is committed and the call then fails with TypeError.
        EmailReceiverSettingAssociationProject.add(setting_id=email_receiver_setting.id,
                                                   projects=projects or [])
        return email_receiver_setting

    @classmethod
    def modify(cls, id, enable, name, address, projects=None):
        """
        Overwrite a setting and replace its project bindings; does nothing when the id is unknown.

        :param id: id of the setting to modify
        :param projects: iterable of project ids that replaces the current bindings;
            ``None`` (the default) leaves the setting bound to no project
        """
        email_receiver_setting = cls.query.filter_by(id=id).first()
        if not email_receiver_setting:
            return
        email_receiver_setting.enable = enable
        email_receiver_setting.name = name
        email_receiver_setting.address = address
        db.session.commit()
        # Bindings are replaced wholesale: drop the old rows, then insert the new ones.
        EmailReceiverSettingAssociationProject.delete(setting_id=id)
        EmailReceiverSettingAssociationProject.add(setting_id=id, projects=projects or [])

    @classmethod
    def delete(cls, id):
        """
        Delete a setting together with its project bindings; does nothing when the id is unknown.

        :param id: id of the setting to delete
        """
        email_receiver_setting = cls.query.filter_by(id=id).first()
        if not email_receiver_setting:
            return
        # Read the primary key before the row is deleted and committed, so the cleanup below
        # does not depend on attribute access on a deleted instance.
        setting_id = email_receiver_setting.id
        db.session.delete(email_receiver_setting)
        db.session.commit()
        EmailReceiverSettingAssociationProject.delete(setting_id=setting_id)
class EmailReceiverSettingAssociationProject(db.Model):
    """Association table between email receiver settings and projects."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    setting_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    project_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))

    @classmethod
    def add(cls, setting_id, projects):
        """
        Bind a setting to each of the given projects and commit.

        :param setting_id: id of the email receiver setting
        :param projects: iterable of project ids; ``None`` is treated as "no projects"
            (callers in this file pass ``None`` by default)
        """
        db.session.add_all([
            cls(setting_id=setting_id, project_id=project_id)
            for project_id in (projects or ())
        ])
        db.session.commit()

    @classmethod
    def delete(cls, setting_id):
        """
        Remove every project binding of a setting and commit.

        :param setting_id: id of the email receiver setting
        """
        for row in cls.query.filter_by(setting_id=setting_id).all():
            db.session.delete(row)
        db.session.commit()