3814 lines
163 KiB
Python
3814 lines
163 KiB
Python
# coding=utf-8
|
||
|
||
from flask import current_app
|
||
from flask_login import UserMixin, current_user
|
||
from datetime import datetime
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
import jwt
|
||
from time import time
|
||
from sqlalchemy import text
|
||
from typing import Iterable, List, Dict, Any, Union, Optional
|
||
|
||
from app.extensions import db
|
||
from app.cores.dictionaries import STATUS, EXPECTATION_LOGIC, CASE_TYPE, ELEMENT_TYPE, LOGIC_CONTROLLER_TYPE, DB_TYPE, \
|
||
DISPATCHER_STATUS, TOOL_TYPE
|
||
from app.cores.parser import new_parse_data as _p
|
||
|
||
|
||
class User(UserMixin, db.Model):
    """
    Application account used for login and e-mail based password recovery.

    Fields
        status: account status
    """
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), nullable=False, default='', server_default='')
    email = db.Column(db.String(128), nullable=False, default='', server_default='')
    password_hash = db.Column(db.String(128), nullable=False, default='', server_default='')
    last_seen = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    # relationship
    email_setting = db.relationship('EmailSetting', back_populates='user')

    __table_args__ = (
        db.UniqueConstraint('username', 'email', name='uix_user_username_email'),
    )

    def __repr__(self):
        return '<User id:{}, username:{}>'.format(self.id, self.username)

    def set_password(self, password):
        """Store the hash of ``password``; the plain text is never persisted."""
        self.password_hash = generate_password_hash(password=password)

    def check_password(self, passowrd):
        """
        Return True when the given plain-text password matches the stored hash.

        :param passowrd: plain-text password (the misspelled name is kept so
            that callers passing it by keyword keep working)
        """
        return check_password_hash(pwhash=self.password_hash, password=passowrd)

    @classmethod
    def add(cls, username, email, password):
        """Create a user with STATUS.NORMAL and commit it."""
        user = cls(username=username, email=email, status=STATUS.NORMAL)
        user.set_password(password=password)
        db.session.add(user)
        db.session.commit()

    @classmethod
    def update_password(cls, username: str, email: str, password: str):
        """
        Replace the password of the user identified by username and email.

        Does nothing when no such user exists (previously this raised
        AttributeError on ``None``).
        """
        user = cls.query.filter_by(username=username, email=email).first()
        if user is None:
            return
        user.set_password(password=password)
        db.session.commit()

    @classmethod
    def get_email_token(cls, email: str, expires_in=600) -> str:
        """
        Build a signed HS256 JWT carrying ``email``.

        :param email: e-mail address stored in the token payload
        :param expires_in: validity of the token in seconds
        :return: the encoded token as text
        """
        token = jwt.encode(
            {'email': email, 'exp': time() + expires_in},
            key=current_app.config['SECRET_KEY'],
            algorithm='HS256'
        )
        # PyJWT < 2.0 returns bytes, PyJWT >= 2.0 already returns str;
        # calling .decode() unconditionally fails on the newer versions.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    @classmethod
    def get_email_from_token(cls, token: str) -> Optional[str]:
        """
        Extract the e-mail address from a token created by get_email_token.

        :return: the e-mail address, or None when the token cannot be decoded
            or verified, or carries no 'email' entry
        """
        try:
            payload = jwt.decode(
                token,
                key=current_app.config['SECRET_KEY'],
                algorithms=['HS256']
            )
            return payload['email']
        except Exception:
            # Best effort by design: any decoding problem means "no e-mail".
            # ``Exception`` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            return None

    @classmethod
    def check_existence_buy_username_email(cls, username, email):
        """Return True when a user with this username and email exists."""
        return cls.query.filter_by(username=username, email=email).first() is not None
|
||
|
||
|
||
class EmailSetting(db.Model):
    """
    Per-user e-mail notification settings.

    Fields
        whether_send_email: whether to send an e-mail
        whether_gen_report: whether to generate a report
        email_title: e-mail subject
        email_text: e-mail body
        receiver_address: recipient addresses, several recipients are separated by commas
    """
    id = db.Column(db.Integer, primary_key=True)
    whether_send_email = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    whether_gen_report = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    email_title = db.Column(db.String(128), nullable=False, default='', server_default='')
    email_text = db.Column(db.Text, nullable=False, default='', server_default='')
    receiver_address = db.Column(db.Text, nullable=False, default='', server_default='')
    # relationship
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship('User', back_populates='email_setting')

    def __repr__(self):
        # The previous implementation read ``self.smtp_login_name``, an
        # attribute this model does not define, so repr() raised AttributeError.
        return '<EmailSetting id:{}, user_id:{}>'.format(self.id, self.user_id)

    @classmethod
    def get_current_email_setting(cls):
        """Return the first email setting row whose user is current_user, or None."""
        return cls.query.filter_by(user=current_user).first()

    @classmethod
    def add(cls, **kwargs):
        """
        Create and commit an email setting for current_user.

        Recognised keyword arguments: whether_send_email, whether_gen_report,
        email_title, email_text, receiver_address. Missing ones fall back to
        False / empty string.
        """
        email_setting = cls(
            whether_send_email=kwargs.get('whether_send_email', False),
            whether_gen_report=kwargs.get('whether_gen_report', False),
            # Fixed: both values used to be read from 'whether_gen_report'.
            email_title=kwargs.get('email_title', ''),
            email_text=kwargs.get('email_text', ''),
            receiver_address=kwargs.get('receiver_address', ''),
            user_id=current_user.id,
        )
        db.session.add(email_setting)
        db.session.commit()

    def update(self, **kwargs):
        """Overwrite the fields passed as keyword arguments and commit; others keep their value."""
        self.whether_send_email = kwargs.get('whether_send_email', self.whether_send_email)
        self.whether_gen_report = kwargs.get('whether_gen_report', self.whether_gen_report)
        self.email_title = kwargs.get('email_title', self.email_title)
        self.email_text = kwargs.get('email_text', self.email_text)
        self.receiver_address = kwargs.get('receiver_address', self.receiver_address)
        db.session.commit()
|
||
|
||
|
||
class Project(db.Model):
    """
    Test project table.

    Fields
        name: project name
        description: project description
        create_time: time the project was created
        last_updated_time: time the project was last modified
        status: project status
    """
    # fields that are converted to JSON
    render_field_list = ['id', 'name', 'description', 'create_time', 'last_updated_time']
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    create_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    last_updated_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)

    # relationship
    # 'delete-orphan' is deliberately not part of the cascade: detaching a
    # module from its project must not delete the module row.
    modules = db.relationship('Module', back_populates='project', cascade='all')
    project_advanced_configuration = db.relationship('ProjectAdvancedConfiguration', back_populates='project',
                                                     cascade='all, delete-orphan', uselist=False)

    @classmethod
    def add(cls, name, description):
        """Create a project together with its advanced configuration and commit it."""
        # Timestamps are stored with second precision.
        current_time = datetime.now().replace(microsecond=0)
        project = cls(name=name, description=description, create_time=current_time, last_updated_time=current_time)
        project.project_advanced_configuration = ProjectAdvancedConfiguration(stop_on_error=True)
        db.session.add(project)
        db.session.commit()

    @classmethod
    def delete(cls, id):
        """Soft delete: the row stays in the database, only its status becomes STATUS.DELETED."""
        project = cls.query.filter_by(id=int(id)).first()
        if project is not None:
            project.status = STATUS.DELETED
            db.session.commit()

    @classmethod
    def update(cls, id, name, description):
        """Change name and description of the project and refresh last_updated_time."""
        project = cls.query.filter_by(id=int(id)).first()
        if project is not None:
            project.name = name
            project.description = description
            project.last_updated_time = datetime.now().replace(microsecond=0)
            db.session.commit()

    def add_module(self, name, description):
        """Append a new module, created by current_user, after the last module of this project."""
        current_time = datetime.now().replace(microsecond=0)
        module = Module(name=name, description=description, creator=current_user.username, create_time=current_time,
                        last_updater=current_user.username, last_updated_time=current_time,
                        order_in_project=Module._get_max_order_in_project(project_id=self.id) + 1)
        self.modules.append(module)
        db.session.commit()

    def delete_module(self, id):
        """Soft delete the module with the given id if it belongs to this project."""
        module_id = int(id)
        for module in self.modules:
            if module.id == module_id:
                module.status = STATUS.DELETED
                # deleted modules no longer take part in the ordering
                module.order_in_project = -1
                db.session.commit()
|
||
|
||
|
||
class ProjectAdvancedConfiguration(db.Model):
    """
    Advanced settings of a project.

    Fields
        stop_on_error: abort the execution when an error occurs
        ding_talk_notify: whether to send a DingTalk notification after the execution
        email_notify: whether to send an e-mail after the execution
        email_notify_when_default: send the e-mail under the default condition (requires email_notify=True)
        email_notify_when_failure_or_error: send the e-mail on failure or error (requires email_notify=True)
        clear_http_cookie: clear the HTTP cookie data before a build
    """
    id = db.Column(db.Integer, primary_key=True)
    stop_on_error = db.Column(db.Boolean, nullable=False, default=True, server_default='1')
    ding_talk_notify = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    email_notify = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    email_notify_when_default = db.Column(db.Boolean, nullable=False, default=True, server_default='1')
    email_notify_when_failure_or_error = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    clear_http_cookie = db.Column(db.Boolean, nullable=False, default=True, server_default='1')

    # relationship
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
    project = db.relationship('Project', back_populates='project_advanced_configuration')

    def update(self, stop_on_error, ding_talk_notify, email_notify, email_notify_when_default,
               email_notify_when_failure_or_error, clear_http_cookie):
        """Overwrite every setting with the given value and commit."""
        new_values = {
            'stop_on_error': stop_on_error,
            'ding_talk_notify': ding_talk_notify,
            'email_notify': email_notify,
            'email_notify_when_default': email_notify_when_default,
            'email_notify_when_failure_or_error': email_notify_when_failure_or_error,
            'clear_http_cookie': clear_http_cookie,
        }
        for field_name, value in new_values.items():
            setattr(self, field_name, value)
        db.session.commit()
|
||
|
||
|
||
class Module(db.Model):
    """
    Test module table.

    Fields
        name: module name
        description: module description
        creator: user who created the module
        create_time: time the module was created
        last_updater: user who last modified the module
        last_updated_time: time the module was last modified
        status: module status
        order_in_project: position of the module inside its project (-1 when unordered/deleted)
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    creator = db.Column(db.String(64), nullable=False, default='', server_default='')
    create_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    last_updater = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_updated_time = db.Column(db.DateTime, nullable=False, default=datetime.now, server_default=text('CURRENT_TIMESTAMP'))
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)
    order_in_project = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))

    # relationship
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'))
    project = db.relationship('Project', back_populates='modules')
    scenes = db.relationship('Scene', back_populates='module', cascade='all')

    @classmethod
    def update(cls, id, name, description):
        """Change name and description; record current_user and the current time as last update."""
        module = cls.query.filter_by(id=int(id)).first()
        if module is not None:
            module.name = name
            module.description = description
            module.last_updater = current_user.username
            # timestamps are stored with second precision
            module.last_updated_time = datetime.now().replace(microsecond=0)
            db.session.commit()

    @classmethod
    def add_new_empty_scene(cls, module_id):
        """
        Append a new, empty scene to a module in STATUS.NORMAL.

        :param module_id: id of the module
        :return: the new scene, or None when no normal module has this id
        """
        module = cls.query.filter_by(id=module_id, status=STATUS.NORMAL).first()
        if module:
            max_order_in_module = Scene._get_max_order_in_module(module_id=module_id)
            scene = Scene(name='新增场景', description='', order_in_module=max_order_in_module + 1)
            module.scenes.append(scene)
            db.session.add(scene)
            db.session.commit()
            # scene.id is only available after the commit
            SceneController.add(scene_id=scene.id)
            return scene

    @classmethod
    def reorder_scene(cls, module_id, reorder_scene_id):
        """
        Reset and reorder the scenes of a module.

        The module's scene list is rebuilt from ``reorder_scene_id`` (each
        scene gets its list index as order_in_module), followed by the scenes
        that were already marked as deleted. Scenes of the module that are
        neither deleted nor listed are no longer part of ``module.scenes``
        afterwards.

        :param module_id: module id
        :type module_id: int
        :param reorder_scene_id: scene ids in their new order
        :type reorder_scene_id: Iterable
        """
        module = cls.query.filter_by(id=module_id).first()
        if module:
            # must be collected before the relationship is emptied
            deleted_scenes = [scene for scene in module.scenes if scene.status == STATUS.DELETED]
            module.scenes = []
            for index, scene_id in enumerate(reorder_scene_id):
                scene = Scene.query.filter_by(id=scene_id).first()
                if scene:
                    scene.order_in_module = index
                    module.scenes.append(scene)
            module.scenes.extend(deleted_scenes)
            db.session.commit()

    @classmethod
    def update_order_in_project(cls, modules):
        """
        Store the list position of every module as its order_in_project.

        :param modules: mappings with an 'id' entry, in the desired order
        """
        for index, module_ in enumerate(modules):
            module_id = module_.get('id')
            module = cls.query.filter_by(id=module_id).first()
            if module:
                module.order_in_project = index
                db.session.commit()

    @classmethod
    def _get_max_order_in_project(cls, project_id):
        """
        Return the largest order_in_project among the modules of a project.

        :return: the maximum, or -1 when the project has no modules
        """
        # ORM aggregate instead of session.query('<raw string>'), which
        # SQLAlchemy 1.4+ rejects.
        max_order_in_project = db.session.query(db.func.max(cls.order_in_project)).filter(
            cls.project_id == project_id).scalar()
        if max_order_in_project is None:
            max_order_in_project = -1
        return max_order_in_project
|
||
|
||
|
||
class Scene(db.Model):
    """
    Test scene: belongs to a module, owns cases and exactly one scene controller.

    Fields
        name: scene name
        description: scene description
        status: scene status
        order_in_module: position of the scene inside its module (-1 when unordered/deleted)
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)
    order_in_module = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))

    # relationship
    module_id = db.Column(db.Integer, db.ForeignKey('module.id'))
    module = db.relationship('Module', back_populates='scenes')
    cases = db.relationship('Case', back_populates='scene', cascade='all')
    scene_controller = db.relationship('SceneController', back_populates='scene', uselist=False, cascade='all, delete-orphan')

    @classmethod
    def add(cls, name, description, status, order_in_module, module_id):
        """
        Create a scene together with its scene controller.

        :param name: scene name
        :param description: scene description
        :param status: scene status
        :param order_in_module: position of the scene inside the module
        :param module_id: id of the owning module
        :return: the new scene
        :rtype: Scene
        """
        scene = cls(
            name=name,
            description=description,
            status=status,
            order_in_module=order_in_module,
            module_id=module_id,
        )
        db.session.add(scene)
        db.session.commit()
        # scene.id is only available after the commit
        # TODO: merge with Module.add_new_empty_scene?
        SceneController.add(scene_id=scene.id)
        return scene

    @classmethod
    def delete(cls, id):
        """Soft delete: mark the scene as STATUS.DELETED and remove it from the ordering."""
        scene = Scene.query.filter_by(id=int(id)).first()
        if scene:
            scene.status = STATUS.DELETED
            scene.order_in_module = -1
            db.session.commit()

    @classmethod
    def update(cls, id, name, description):
        """Change name and description of the scene."""
        scene = Scene.query.filter_by(id=int(id)).first()
        if scene:
            scene.name = name
            scene.description = description
            db.session.commit()

    @classmethod
    def add_new_empty_case(cls, scene_id, case_type):
        """
        Add a new case to a scene.

        :param scene_id: id of the scene the new case belongs to
        :param case_type: type of the new case, HTTP/SQL/SSH/DEBUG or others
        :return: the new case, or None when the scene does not exist or is deleted
        """
        # Only deleted scenes are excluded; a scene in any other status may
        # still receive new cases.
        scene = cls.query.filter_by(id=int(scene_id)).filter(cls.status != STATUS.DELETED).first()
        if scene:
            specific_case = Case.new_case(case_type=case_type)
            case = specific_case.case
            scene.cases.append(case)
            db.session.commit()
            # register the case as a sub element of the scene's logic controller
            logic_controller = scene.scene_controller.logic_controller
            SubElementInLogicController.add(element_id=case.id, element_type=ELEMENT_TYPE.CASE,
                                            logic_controller_id=logic_controller.id)
            return case

    @classmethod
    def _get_max_order_in_module(cls, module_id):
        """
        Return the largest order_in_module among the scenes of a module.

        :return: the maximum, or -1 when the module has no scenes
        """
        # ORM aggregate instead of session.query('<raw string>'), which
        # SQLAlchemy 1.4+ rejects.
        max_order_in_module = db.session.query(db.func.max(cls.order_in_module)).filter(
            cls.module_id == module_id).scalar()
        if max_order_in_module is None:
            max_order_in_module = -1
        return max_order_in_module

    @classmethod
    def copy_from_id(cls, id):
        """
        Copy a scene, including the content of its logic controller, to the end of the same module.

        :return: the copied scene, or None when the source scene does not exist
        """
        scene = cls.query.filter_by(id=int(id)).first()
        if scene:
            max_order_in_module = cls._get_max_order_in_module(module_id=scene.module_id)
            copy_scene = cls.add(
                name='Copy' + scene.name,
                description=scene.description,
                status=scene.status,
                order_in_module=max_order_in_module + 1,
                module_id=scene.module_id,
            )
            # copy the content of the logic controller
            LogicController.copy(original_logic_controller=scene.scene_controller.logic_controller,
                                 parent_logic_controller=copy_scene.scene_controller.logic_controller,
                                 scene_id=copy_scene.id)
            return copy_scene

    @classmethod
    def forbidden(cls, id):
        """Set the scene status to STATUS.FORBIDDEN."""
        scene = cls.query.filter_by(id=int(id)).first()
        if scene:
            scene.status = STATUS.FORBIDDEN
            db.session.commit()

    @classmethod
    def enable(cls, id):
        """Set the scene status back to STATUS.NORMAL."""
        scene = cls.query.filter_by(id=int(id)).first()
        if scene:
            scene.status = STATUS.NORMAL
            db.session.commit()
|
||
|
||
|
||
class LogicController(db.Model):
    """
    Fields
        name: logic controller name
        description: logic controller comment
        logic_controller_type: logic controller type, see dictionaries.LOGIC_CONTROLLER_TYPE
        status: status, see dictionaries.STATUS
    Relationships
        sub_elements: all sub elements of this logic controller,
            LogicController:SubElementInLogicController is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    description = db.Column(db.String(256), nullable=False, default='', server_default='')
    logic_controller_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    sub_elements = db.relationship('SubElementInLogicController', back_populates='logic_controller', cascade='all, delete-orphan')

    @property
    def specific_controller(self):
        """
        Return the concrete controller row (SceneController, IfController,
        WhileController, LoopController or SimpleController) bound to this
        logic controller, or None when the type is unknown or no row exists.
        """
        # Built at call time because most of these models are defined further
        # down in the module.
        controller_models = {
            LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER: SceneController,
            LOGIC_CONTROLLER_TYPE.IF_CONTROLLER: IfController,
            LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER: WhileController,
            LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER: LoopController,
            LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER: SimpleController,
        }
        controller_model = controller_models.get(self.logic_controller_type)
        if controller_model is None:
            return None
        return controller_model.query.filter_by(logic_controller_id=self.id).first()

    @classmethod
    def add(cls, name, description, logic_controller_type, status=None):
        """
        Create and commit a logic controller.

        :param status: status of the new controller, STATUS.NORMAL when omitted
        :return: the new logic controller
        """
        if status is None:
            status = STATUS.NORMAL
        logic_controller = cls(name=name, description=description, logic_controller_type=logic_controller_type,
                               status=status)
        db.session.add(logic_controller)
        db.session.commit()
        return logic_controller

    @classmethod
    def delete(cls, logic_controller_id):
        """Soft delete the logic controller; does nothing when the id is unknown."""
        logic_controller = cls.query.filter_by(id=logic_controller_id).first()
        if logic_controller is None:
            return
        logic_controller.status = STATUS.DELETED
        # the controller no longer takes part in the ordering of its parent
        SubElementInLogicController.update_order(element_id=logic_controller.id,
                                                 element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
                                                 order_in_logic_controller=-1)
        db.session.commit()

    @classmethod
    def forbidden(cls, logic_controller_id):
        """Set the status to STATUS.FORBIDDEN; does nothing when the id is unknown."""
        logic_controller = cls.query.filter_by(id=logic_controller_id).first()
        if logic_controller is None:
            return
        logic_controller.status = STATUS.FORBIDDEN
        db.session.commit()

    @classmethod
    def enable(cls, logic_controller_id):
        """Set the status to STATUS.NORMAL; does nothing when the id is unknown."""
        logic_controller = cls.query.filter_by(id=logic_controller_id).first()
        if logic_controller is None:
            return
        logic_controller.status = STATUS.NORMAL
        db.session.commit()

    @classmethod
    def copy(cls, original_logic_controller, parent_logic_controller, scene_id):
        """
        Copy the content of one logic controller into another one.

        Walks the sub elements of ``original_logic_controller`` in order and
        copies each case, tool and nested logic controller (recursively) into
        ``parent_logic_controller``. Only elements whose status is
        STATUS.NORMAL or STATUS.FORBIDDEN are copied.

        :param original_logic_controller: logic controller to copy from
        :param parent_logic_controller: logic controller that receives the copies
        :param scene_id: id of the new scene the copied cases belong to
        :raises ValueError: when a nested logic controller has an unsupported type
        """
        # imported here to avoid a circular import with app.template_global
        from app.template_global import sort_by_order_in_logic_controller

        copyable_status = [STATUS.NORMAL, STATUS.FORBIDDEN]
        # controller types whose add() takes an ``expression`` argument
        expression_controllers = {
            LOGIC_CONTROLLER_TYPE.IF_CONTROLLER: IfController,
            LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER: LoopController,
            LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER: WhileController,
        }
        for original_sub_element in sort_by_order_in_logic_controller(logic_controller_id=original_logic_controller.id):
            if original_sub_element.element_type == ELEMENT_TYPE.CASE:
                original_case = Case.query.filter_by(id=original_sub_element.element_id).first()
                if original_case is not None and original_case.status in copyable_status:
                    Case.copy_from_id(case_id=original_case.id, scene_id=scene_id,
                                      logic_controller_id=parent_logic_controller.id)
            elif original_sub_element.element_type == ELEMENT_TYPE.TOOL:
                original_tool = Tool.query.filter_by(id=original_sub_element.element_id).first()
                if original_tool is not None and original_tool.status in copyable_status:
                    Tool.copy_from_id(tool_id=original_tool.id, logic_controller_id=parent_logic_controller.id)
            elif original_sub_element.element_type == ELEMENT_TYPE.LOGIC_CONTROLLER:
                # A separate name is used on purpose: rebinding the
                # ``original_logic_controller`` parameter inside the loop made
                # the code hard to follow.
                original_sub_controller = LogicController.query.filter_by(id=original_sub_element.element_id).first()
                if original_sub_controller is None or original_sub_controller.status not in copyable_status:
                    continue
                controller_type = original_sub_controller.logic_controller_type
                if controller_type in expression_controllers:
                    new_logic_controller = expression_controllers[controller_type].add(
                        description=original_sub_controller.description,
                        expression=original_sub_controller.specific_controller.expression,
                        parent_logic_controller_id=parent_logic_controller.id,
                        status=original_sub_controller.status).logic_controller
                elif controller_type == LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER:
                    new_logic_controller = SimpleController.add(
                        description=original_sub_controller.description,
                        parent_logic_controller_id=parent_logic_controller.id,
                        status=original_sub_controller.status).logic_controller
                else:
                    raise ValueError(
                        '不支持此类逻辑控制器类型, logic_controller_type=%s' % controller_type)
                # recurse only after the new controller exists
                cls.copy(original_logic_controller=original_sub_controller,
                         parent_logic_controller=new_logic_controller, scene_id=scene_id)

    @classmethod
    def get_project_id_from_current_logic_controller(cls, logic_controller):
        """
        Return the id of the project the given logic controller belongs to.

        For a nested controller the SubElementInLogicController links are
        followed upwards until a controller without a parent is reached; that
        controller is expected to be bound to a SceneController.

        :return: the project id, or None when no scene controller is found
            for the top-most controller
        """
        if logic_controller.logic_controller_type == LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER:
            return logic_controller.specific_controller.scene.module.project_id

        # climb to the top-most logic controller
        top_logic_controller_id = logic_controller.id
        while True:
            parent_link = SubElementInLogicController.query.filter_by(
                element_id=top_logic_controller_id,
                element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            ).first()
            if not parent_link:
                break
            top_logic_controller_id = parent_link.logic_controller_id

        scene_controller = SceneController.query.filter_by(logic_controller_id=top_logic_controller_id).first()
        if scene_controller:
            return scene_controller.scene.module.project_id
        return None
|
||
|
||
|
||
class SubElementInLogicController(db.Model):
    """
    Records which elements (sub controllers, cases, ...) a logic controller contains.

    Fields
        logic_controller_id: id of the logic controller
        element_id: id of the sub element (sub controller, case, ...)
        element_type: type of the sub element
        order_in_logic_controller: position of the sub element inside the controller
    Relationships
        logic_controller: logic controller the sub element belongs to,
            LogicController:SubElementInLogicController is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)
    element_id = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    order_in_logic_controller = db.Column(db.Integer, nullable=False, default=-1, server_default=text('-1'))

    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController', back_populates='sub_elements')

    __table_args__ = (
        db.UniqueConstraint('element_id', 'element_type', name='uix_element_id_type'),
    )

    @classmethod
    def get_max_order_in_logic_controller(cls, logic_controller_id):
        """
        Return the largest order among the sub elements of a logic controller.

        :param logic_controller_id: id of the logic controller
        :return: the largest order, or -1 when the controller has no sub elements
        """
        # ORM aggregate instead of session.query('<raw string>'), which
        # SQLAlchemy 1.4+ rejects.
        max_order_in_logic_controller = db.session.query(db.func.max(cls.order_in_logic_controller)).filter(
            cls.logic_controller_id == logic_controller_id).scalar()
        if max_order_in_logic_controller is None:
            # the controller has no sub elements yet
            max_order_in_logic_controller = -1
        return max_order_in_logic_controller

    @classmethod
    def add(cls, element_id, element_type, logic_controller_id):
        """
        Register an element as the last sub element of a logic controller.

        :return: the new SubElementInLogicController row
        """
        max_order_in_logic_controller = cls.get_max_order_in_logic_controller(logic_controller_id=logic_controller_id)
        sub_element = cls(element_id=element_id, element_type=element_type,
                          logic_controller_id=logic_controller_id,
                          order_in_logic_controller=max_order_in_logic_controller + 1)
        db.session.add(sub_element)
        db.session.commit()
        return sub_element

    @classmethod
    def update_order(cls, element_id, element_type, order_in_logic_controller, logic_controller_id=None):
        """
        Update position and, optionally, parent of a sub element.

        :param element_id: element id
        :param element_type: element type
        :param order_in_logic_controller: new position inside the parent logic controller
        :param logic_controller_id: id of the new parent logic controller; the
            parent is left unchanged when None
        """
        sub_element = cls.query.filter_by(element_id=element_id, element_type=element_type).first()
        if sub_element is not None:
            sub_element.order_in_logic_controller = order_in_logic_controller
            if logic_controller_id is not None:
                sub_element.logic_controller_id = logic_controller_id
            db.session.commit()

    @classmethod
    def update_elements_order(cls, elements_order_in_logic_controller):
        """
        Apply a new ordering to several sub elements.

        :param elements_order_in_logic_controller: new ordering; every entry
            provides 'element_id', 'element_type', 'order_in_logic_controller'
            and 'logic_controller_id'
        :type elements_order_in_logic_controller: List[Dict[str, Any]]
        """
        for element_order in elements_order_in_logic_controller:
            cls.update_order(element_id=element_order.get('element_id'),
                             element_type=element_order.get('element_type'),
                             order_in_logic_controller=element_order.get('order_in_logic_controller'),
                             logic_controller_id=element_order.get('logic_controller_id'))

    @classmethod
    def get_scene_controller_by_element(cls, element_id, element_type):
        """
        Return the scene controller that (directly or indirectly) contains an element.

        :param element_id: element id
        :param element_type: element type
        :return: the scene controller, or None when the chain of parents is
            broken (unknown element or missing logic controller)
        :rtype: SceneController
        """
        current_id, current_type = element_id, element_type
        while True:
            parent_link = SubElementInLogicController.query.filter_by(
                element_id=current_id, element_type=current_type).first()
            if parent_link is None:
                return None
            parent_controller = LogicController.query.filter_by(id=parent_link.logic_controller_id).first()
            if parent_controller is None:
                return None
            if parent_controller.logic_controller_type == LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER:
                return parent_controller.specific_controller
            # continue one level up, now looking for the parent of the controller
            current_id, current_type = parent_controller.id, ELEMENT_TYPE.LOGIC_CONTROLLER

    def get_element_logic_controller_if_element_type_is_logic_controller(self):
        """
        Return the logic controller this row points to when the element itself is a logic controller.

        Note that the ``logic_controller`` relationship is something else: it
        is the controller the element belongs to (its parent).

        :return: the LogicController, or None when the element is not a logic controller
        """
        if self.element_type == ELEMENT_TYPE.LOGIC_CONTROLLER:
            return LogicController.query.filter_by(id=self.element_id).first()
        return None
|
||
|
||
|
||
class SceneController(db.Model):
    """
    Scene logic controller, only used as the first level of logic control inside a scene.

    Relationships
        scene: the scene it belongs to, Scene:SceneController is 1:1
        logic_controller: the bound logic controller, LogicController:SceneController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    scene_id = db.Column(db.Integer, db.ForeignKey('scene.id'))
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    scene = db.relationship('Scene', back_populates='scene_controller')
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, scene_id):
        """
        Create the scene controller of a scene together with its logic controller.

        :param scene_id: id of the scene the controller belongs to
        :return: the new scene controller
        """
        controller_type = LOGIC_CONTROLLER_TYPE.SCENE_CONTROLLER
        # the logic controller is named after its type
        root_logic_controller = LogicController.add(
            name=controller_type,
            description='',
            logic_controller_type=controller_type,
        )
        new_scene_controller = cls(
            scene_id=scene_id,
            logic_controller_id=root_logic_controller.id,
        )
        db.session.add(new_scene_controller)
        db.session.commit()
        return new_scene_controller
|
||
|
||
|
||
class SimpleController(db.Model):
    """
    Simple controller.

    Relationships
        logic_controller: the matching logic controller;
            SimpleController:LogicController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, parent_logic_controller_id, description='', status=None):
        """
        Create a simple controller and attach it to a parent logic controller.

        Newly added logic controllers normally live inside the scene controller.

        :param parent_logic_controller_id: id of the parent logic controller
        :param description: notes (passed in when copying a component)
        :param status: logic controller status (passed in when copying a component)
        :return: the committed SimpleController
        :rtype: SimpleController
        """
        base_controller = LogicController.add(
            name='简单控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.SIMPLE_CONTROLLER,
            status=status,
        )
        new_controller = cls(logic_controller_id=base_controller.id)
        db.session.add(new_controller)
        db.session.commit()
        # Register the new logic controller as a sub-element of its parent.
        SubElementInLogicController.add(
            element_id=base_controller.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return new_controller

    @classmethod
    def update(cls, logic_controller_id, name, description):
        """
        Update the name and description of a simple controller.

        Does nothing when the logic controller, or its specific controller,
        cannot be found.

        :param logic_controller_id: id of the LogicController row to update
        :param name: new name
        :param description: new notes
        """
        found = LogicController.query.filter_by(id=logic_controller_id).first()
        if not found:
            return
        specific = found.specific_controller
        if specific is None:
            return
        base_controller = specific.logic_controller
        base_controller.name = name
        base_controller.description = description
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a new simple controller based on this one.

        Only description and status are carried over; the name is the fixed
        one passed by ``add``.

        :param parent_logic_controller_id: id of the parent logic controller of the copy
        :return: the new controller
        :rtype: SimpleController
        """
        source = self.logic_controller
        return SimpleController.add(
            parent_logic_controller_id=parent_logic_controller_id,
            description=source.description,
            status=source.status,
        )
|
||
|
||
|
||
class IfController(db.Model):
    """
    IF condition controller.

    Fields
        expression: the expression
    Relationships
        logic_controller: the matching logic controller;
            IfController:LogicController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    expression = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, expression, parent_logic_controller_id, description='', status=None):
        """
        Create an IF controller and attach it to a parent logic controller.

        Newly added logic controllers normally live inside the scene controller.

        :param expression: the expression
        :param parent_logic_controller_id: id of the parent logic controller
        :param description: notes (passed in when copying a component)
        :param status: logic controller status (passed in when copying a component)
        :return: the committed IfController
        :rtype: IfController
        """
        base_controller = LogicController.add(
            name='IF控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.IF_CONTROLLER,
            status=status,
        )
        new_controller = cls(logic_controller_id=base_controller.id, expression=expression)
        db.session.add(new_controller)
        db.session.commit()
        # Register the new logic controller as a sub-element of its parent.
        SubElementInLogicController.add(
            element_id=base_controller.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return new_controller

    @classmethod
    def update(cls, logic_controller_id, name, description, expression):
        """
        Update the name, description and expression of an IF controller.

        Does nothing when the logic controller, or its specific controller,
        cannot be found.

        :param logic_controller_id: id of the LogicController row to update
        :param name: new name
        :param description: new notes
        :param expression: new expression
        """
        found = LogicController.query.filter_by(id=logic_controller_id).first()
        if not found:
            return
        specific = found.specific_controller
        if specific is None:
            return
        base_controller = specific.logic_controller
        base_controller.name = name
        base_controller.description = description
        specific.expression = expression
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a new IF controller based on this one.

        Expression, description and status are carried over; the name is the
        fixed one passed by ``add``.

        :param parent_logic_controller_id: id of the parent logic controller of the copy
        :return: the new controller
        :rtype: IfController
        """
        source = self.logic_controller
        return IfController.add(
            expression=self.expression,
            parent_logic_controller_id=parent_logic_controller_id,
            description=source.description,
            status=source.status,
        )
|
||
|
||
|
||
class WhileController(db.Model):
    """
    WHILE controller.

    Fields
        expression: the expression
    Relationships
        logic_controller: the matching logic controller;
            WhileController:LogicController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    expression = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, expression, parent_logic_controller_id, description='', status=None):
        """
        Create a WHILE controller and attach it to a parent logic controller.

        Newly added logic controllers normally live inside the scene controller.

        :param expression: the expression
        :param parent_logic_controller_id: id of the parent logic controller
        :param description: notes (passed in when copying a component)
        :param status: logic controller status (passed in when copying a component)
        :return: the committed WhileController
        :rtype: WhileController
        """
        base_controller = LogicController.add(
            name='While控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.WHILE_CONTROLLER,
            status=status,
        )
        new_controller = cls(logic_controller_id=base_controller.id, expression=expression)
        db.session.add(new_controller)
        db.session.commit()
        # Register the new logic controller as a sub-element of its parent.
        SubElementInLogicController.add(
            element_id=base_controller.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return new_controller

    @classmethod
    def update(cls, logic_controller_id, name, description, expression):
        """
        Update the name, description and expression of a WHILE controller.

        Does nothing when the logic controller, or its specific controller,
        cannot be found.

        :param logic_controller_id: id of the LogicController row to update
        :param name: new name
        :param description: new notes
        :param expression: new expression
        """
        found = LogicController.query.filter_by(id=logic_controller_id).first()
        if not found:
            return
        specific = found.specific_controller
        if specific is None:
            return
        base_controller = specific.logic_controller
        base_controller.name = name
        base_controller.description = description
        specific.expression = expression
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a new WHILE controller based on this one.

        Expression, description and status are carried over; the name is the
        fixed one passed by ``add``.

        :param parent_logic_controller_id: id of the parent logic controller of the copy
        :return: the new controller
        :rtype: WhileController
        """
        source = self.logic_controller
        return WhileController.add(
            expression=self.expression,
            parent_logic_controller_id=parent_logic_controller_id,
            description=source.description,
            status=source.status,
        )
|
||
|
||
|
||
class LoopController(db.Model):
    """
    LOOP controller.

    Fields
        expression: the expression
    Relationships
        logic_controller: the matching logic controller;
            LoopController:LogicController is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    expression = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    logic_controller_id = db.Column(db.Integer, db.ForeignKey('logic_controller.id'))
    logic_controller = db.relationship('LogicController')

    @classmethod
    def add(cls, expression, parent_logic_controller_id, description='', status=None):
        """
        Create a LOOP controller and attach it to a parent logic controller.

        Newly added logic controllers normally live inside the scene controller.

        :param expression: the expression
        :param parent_logic_controller_id: id of the parent logic controller
        :param description: notes (passed in when copying a component)
        :param status: logic controller status (passed in when copying a component)
        :return: the committed LoopController
        :rtype: LoopController
        """
        base_controller = LogicController.add(
            name='LOOP控制器',
            description=description,
            logic_controller_type=LOGIC_CONTROLLER_TYPE.LOOP_CONTROLLER,
            status=status,
        )
        new_controller = cls(logic_controller_id=base_controller.id, expression=expression)
        db.session.add(new_controller)
        db.session.commit()
        # Register the new logic controller as a sub-element of its parent.
        SubElementInLogicController.add(
            element_id=base_controller.id,
            element_type=ELEMENT_TYPE.LOGIC_CONTROLLER,
            logic_controller_id=parent_logic_controller_id,
        )
        return new_controller

    @classmethod
    def update(cls, logic_controller_id, name, description, expression):
        """
        Update the name, description and expression of a LOOP controller.

        Does nothing when the logic controller, or its specific controller,
        cannot be found.

        :param logic_controller_id: id of the LogicController row to update
        :param name: new name
        :param description: new notes
        :param expression: new expression
        """
        found = LogicController.query.filter_by(id=logic_controller_id).first()
        if not found:
            return
        specific = found.specific_controller
        if specific is None:
            return
        base_controller = specific.logic_controller
        base_controller.name = name
        base_controller.description = description
        specific.expression = expression
        db.session.commit()

    def copy_from_self(self, parent_logic_controller_id):
        """
        Create a new LOOP controller based on this one.

        Expression, description and status are carried over; the name is the
        fixed one passed by ``add``.

        :param parent_logic_controller_id: id of the parent logic controller of the copy
        :return: the new controller
        :rtype: LoopController
        """
        source = self.logic_controller
        return LoopController.add(
            expression=self.expression,
            parent_logic_controller_id=parent_logic_controller_id,
            description=source.description,
            status=source.status,
        )
|
||
|
||
|
||
class Tool(db.Model):
    """
    Generic tool component; type-specific data lives in a companion model
    reachable through ``specific_tool``.

    Fields
        name: tool name
        description: tool notes
        status: a STATUS value (this class sets NORMAL, FORBIDDEN and DELETED)
        tool_type: kind of tool, a TOOL_TYPE value
    """
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.String(512), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    tool_type = db.Column(db.String(64), nullable=False, default='', server_default='')

    @property
    def specific_tool(self):
        """
        Look up the type-specific row that belongs to this tool.

        :return: the TimerTool / ScriptTool / VariableDefinitionTool /
            HTTPHeaderManagerTool / HTTPCookieManagerTool whose ``tool_id`` is this
            tool's id; None when ``tool_type`` matches no known TOOL_TYPE value or
            no such row exists
        """
        # Compared with == in this order; the first matching type wins.
        model_by_type = (
            (TOOL_TYPE.TIMER, TimerTool),
            (TOOL_TYPE.SCRIPT, ScriptTool),
            (TOOL_TYPE.VARIABLE_DEFINITION, VariableDefinitionTool),
            (TOOL_TYPE.HTTP_HEADER_MANAGER, HTTPHeaderManagerTool),
            (TOOL_TYPE.HTTP_COOKIE_MANAGER, HTTPCookieManagerTool),
        )
        for known_type, model in model_by_type:
            if self.tool_type == known_type:
                return model.query.filter_by(tool_id=self.id).first()
        return None

    @classmethod
    def exist_and_status_not_delete(cls, tool_id) -> bool:
        """
        Tell whether a tool exists and has not been deleted.

        :param tool_id: tool id; converted with ``int()`` before querying
        :return: True when a tool with that id has status NORMAL or FORBIDDEN
        """
        live_statuses = [STATUS.NORMAL, STATUS.FORBIDDEN]
        match = cls.query.filter(cls.id == int(tool_id), cls.status.in_(live_statuses)).first()
        return match is not None

    @classmethod
    def add_new_empty_tool(cls, parent_logic_controller_id, tool_type):
        """
        Create a new default tool and attach it to a logic controller.

        :param parent_logic_controller_id: id of the logic controller the new tool belongs to
        :param tool_type: type of the new component
        :return: the new tool
        :rtype: Tool
        """
        created = Tool.new_tool(tool_type=tool_type).tool
        # Insert the sub-element row that links the tool to its parent controller.
        SubElementInLogicController.add(
            element_id=created.id,
            element_type=ELEMENT_TYPE.TOOL,
            logic_controller_id=parent_logic_controller_id,
        )
        return created

    @classmethod
    def new_tool(cls, tool_type):
        """
        Create and commit a tool of the given type with default content.

        :param tool_type: tool component type
        :return: the type-specific tool object, linked to the new Tool via ``.tool``
        :rtype: Union[TimerTool, ScriptTool, VariableDefinitionTool, HTTPHeaderManagerTool,
            HTTPCookieManagerTool]
        :raises ValueError: when ``tool_type`` is not one of the supported TOOL_TYPE values
        """
        tool = cls(
            name='新的' + tool_type + '工具',
            description='',
            tool_type=tool_type,
            status=STATUS.NORMAL,
        )
        if tool_type == TOOL_TYPE.TIMER:
            specific_tool = TimerTool(delay='5000')
        elif tool_type == TOOL_TYPE.SCRIPT:
            specific_tool = ScriptTool(script='# 请编写测试脚本')
        elif tool_type == TOOL_TYPE.VARIABLE_DEFINITION:
            specific_tool = VariableDefinitionTool.new_tool()
        elif tool_type == TOOL_TYPE.HTTP_HEADER_MANAGER:
            specific_tool = HTTPHeaderManagerTool.new_tool()
        elif tool_type == TOOL_TYPE.HTTP_COOKIE_MANAGER:
            specific_tool = HTTPCookieManagerTool.new_tool()
        else:
            raise ValueError('不支持当前tool_type类型, tool_type=%s' % tool_type)
        # Every supported type is linked to the generic tool the same way.
        specific_tool.tool = tool
        db.session.add(specific_tool)
        db.session.commit()
        return specific_tool

    @classmethod
    def delete(cls, tool_id):
        """
        Soft-delete a tool: the row is kept and its status becomes STATUS.DELETED.

        Does nothing when no tool has the given id.

        :param tool_id: id of the tool to delete
        """
        target = cls.query.filter_by(id=tool_id).first()
        if target is None:
            return
        target.status = STATUS.DELETED
        # Set the tool's order inside its logic controller to -1.
        SubElementInLogicController.update_order(
            element_id=target.id,
            element_type=ELEMENT_TYPE.TOOL,
            order_in_logic_controller=-1,
        )
        db.session.commit()

    @classmethod
    def copy_from_id(cls, tool_id, logic_controller_id=None):
        """
        Copy the tool with the given id into a new tool.

        :param tool_id: id of the tool to copy
        :type tool_id: int
        :param logic_controller_id: id of the parent logic controller of the new tool
        :type logic_controller_id: int
        :return: the new tool, or None when no tool has the given id
        :rtype: Tool
        """
        original = cls.query.filter_by(id=tool_id).first()
        if not original:
            return None
        duplicate = original.specific_tool.copy_from_self()
        # Insert the sub-element row that links the copy to its parent controller.
        SubElementInLogicController.add(
            element_id=duplicate.tool.id,
            element_type=ELEMENT_TYPE.TOOL,
            logic_controller_id=logic_controller_id,
        )
        return duplicate.tool

    @classmethod
    def forbidden(cls, tool_id):
        """
        Disable a tool by setting its status to STATUS.FORBIDDEN.

        Does nothing when no tool has the given id.

        :param tool_id: id of the tool to disable
        """
        target = cls.query.filter_by(id=tool_id).first()
        if not target:
            return
        target.status = STATUS.FORBIDDEN
        db.session.commit()

    @classmethod
    def enable(cls, tool_id):
        """
        Enable a tool by setting its status to STATUS.NORMAL.

        Does nothing when no tool has the given id.

        :param tool_id: id of the tool to enable
        """
        target = cls.query.filter_by(id=tool_id).first()
        if not target:
            return
        target.status = STATUS.NORMAL
        db.session.commit()

    @classmethod
    def get_project_id_from_current_tool(cls, tool):
        """
        Find the id of the project a tool belongs to.

        :param tool: the tool to look up
        :return: whatever ``LogicController.get_project_id_from_current_logic_controller``
            returns for the tool's parent logic controller; None when the tool has no
            sub-element row or the parent logic controller cannot be found
        """
        # Locate the logic controller that contains the tool.
        link = SubElementInLogicController.query.filter_by(
            element_id=tool.id,
            element_type=ELEMENT_TYPE.TOOL,
        ).first()
        if not link:
            return None
        parent = LogicController.query.filter_by(id=link.logic_controller_id).first()
        if not parent:
            return None
        return LogicController.get_project_id_from_current_logic_controller(logic_controller=parent)
|
||
|
||
|
||
class TimerTool(db.Model):
    """
    Timer tool data.

    Fields
        delay: pause time (ms)
    Relationships
        tool: the Tool this timer belongs to; TimerTool:Tool is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    delay = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')

    @property
    def delay_(self):
        """Return ``delay`` after passing it through the parser ``_p``."""
        raw_delay = self.delay
        return _p(raw_delay)

    @classmethod
    def update(cls, tool_id, name, description, delay):
        """
        Update a timer tool and its generic Tool row.

        Does nothing when no tool has the given id.

        :param tool_id: tool id
        :param name: new name
        :param description: new notes
        :param delay: new pause time
        """
        tool = Tool.query.filter_by(id=tool_id).first()
        if not tool:
            return
        # Fetch the type-specific row before modifying the generic one.
        timer = tool.specific_tool
        tool.name = name
        tool.description = description
        timer.delay = delay
        db.session.commit()

    @classmethod
    def add(cls, name, description, delay, status=STATUS.NORMAL):
        """
        Create and commit a timer tool together with its generic Tool row.

        :param name: tool name
        :param description: tool notes
        :param delay: pause time
        :param status: tool status (a STATUS value)
        :return: the committed timer tool
        :rtype: TimerTool
        """
        generic_tool = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.TIMER,
        )
        timer = cls(delay=delay)
        timer.tool = generic_tool
        db.session.add(generic_tool)
        db.session.add(timer)
        db.session.commit()
        return timer

    def copy_from_self(self):
        """
        Create a new timer tool based on this one.

        The copy's name is this tool's name prefixed with 'Copy'.

        :return: the new timer tool (its generic Tool is available as ``.tool``)
        :rtype: TimerTool
        """
        source = self.tool
        return self.add(
            # Tool
            name='Copy' + source.name,
            description=source.description,
            status=source.status,
            # TimerTool
            delay=self.delay,
        )
|
||
|
||
|
||
class ScriptTool(db.Model):
    """
    Script tool data.

    Fields
        script: the script
        log: the log
    Relationships
        tool: the Tool this script belongs to; ScriptTool:Tool is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    script = db.Column(db.Text, nullable=False, default='', server_default='')
    log = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')

    @property
    def script_(self):
        """Return ``script`` after passing it through the parser ``_p``."""
        raw_script = self.script
        return _p(raw_script)

    @classmethod
    def update(cls, tool_id, name, description, script):
        """
        Update a script tool and its generic Tool row.

        Does nothing when no tool has the given id.

        :param tool_id: tool id
        :param name: new name
        :param description: new notes
        :param script: new script
        """
        tool = Tool.query.filter_by(id=tool_id).first()
        if not tool:
            return
        # Fetch the type-specific row before modifying the generic one.
        scripted = tool.specific_tool
        tool.name = name
        tool.description = description
        scripted.script = script
        db.session.commit()

    @classmethod
    def add(cls, name, description, script, status=STATUS.NORMAL):
        """
        Create and commit a script tool together with its generic Tool row.

        :param name: tool name
        :param description: tool notes
        :param script: the script
        :param status: tool status (a STATUS value)
        :return: the committed script tool
        :rtype: ScriptTool
        """
        generic_tool = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.SCRIPT,
        )
        scripted = cls(script=script)
        scripted.tool = generic_tool
        db.session.add(generic_tool)
        db.session.add(scripted)
        db.session.commit()
        return scripted

    def copy_from_self(self):
        """
        Create a new script tool based on this one.

        The copy's name is this tool's name prefixed with 'Copy'.

        :return: the new script tool (its generic Tool is available as ``.tool``)
        :rtype: ScriptTool
        """
        source = self.tool
        return self.add(
            # Tool
            name='Copy' + source.name,
            description=source.description,
            status=source.status,
            # ScriptTool
            script=self.script,
        )
|
||
|
||
|
||
class VariableDefinitionTool(db.Model):
    """
    Variable-definition tool data.

    Relationships
        tool: the Tool this object belongs to; VariableDefinitionTool:Tool is 1:1
        variable_definition_list: all name/value rows held by the tool;
            VariableDefinitionTool:VariableDefinitionList is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')
    variable_definition_list = db.relationship('VariableDefinitionList', back_populates='variable_definition_tool',
                                               cascade='all, delete-orphan')

    @property
    def variables(self):
        """
        Return the variable rows as plain dicts.

        :return: one dict per row with the keys 'name', 'value' and 'description'
        """
        rows = []
        for entry in self.variable_definition_list:
            rows.append({
                'name': entry.name,
                'value': entry.value,
                'description': entry.description,
            })
        return rows

    @classmethod
    def new_tool(cls):
        """
        Create and commit a tool object holding one placeholder variable.

        The returned object is not yet linked to a Tool.

        :rtype: VariableDefinitionTool
        """
        fresh = cls()
        fresh.variable_definition_list = [
            VariableDefinitionList(name='var_name', value='var_value', description=''),
        ]
        db.session.add(fresh)
        db.session.commit()
        return fresh

    @classmethod
    def update(cls, name, description, tool_id, variables):
        """
        Update a variable-definition tool, replacing all of its variables.

        Does nothing when no tool has the given id.

        :param tool_id: tool id
        :type tool_id: int
        :param name: new name
        :type name: str
        :param description: new notes
        :type description: str
        :param variables: list of variable mappings; missing 'name', 'value' or
            'description' keys default to ''
        :type variables: List[Mapping[str, str]]
        """
        tool = Tool.query.filter_by(id=tool_id).first()
        if not tool:
            return
        tool.name = name
        tool.description = description
        holder = tool.specific_tool
        # The previous rows are replaced wholesale by the new ones.
        holder.variable_definition_list = [
            VariableDefinitionList(name=var.get('name', ''),
                                   value=var.get('value', ''),
                                   description=var.get('description', ''))
            for var in variables
        ]
        db.session.commit()

    @classmethod
    def add(cls, name, description, variables, status=STATUS.NORMAL):
        """
        Create and commit a variable-definition tool with its generic Tool row.

        :param name: tool name
        :type name: str
        :param description: tool notes
        :type description: str
        :param variables: list of variable mappings; missing 'name', 'value' or
            'description' keys default to ''
        :type variables: List[Mapping[str, str]]
        :param status: tool status (a STATUS value)
        :type status: str
        :return: the committed tool object
        :rtype: VariableDefinitionTool
        """
        generic_tool = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.VARIABLE_DEFINITION,
        )
        holder = cls()
        holder.variable_definition_list = [
            VariableDefinitionList(name=var.get('name', ''),
                                   value=var.get('value', ''),
                                   description=var.get('description', ''))
            for var in variables
        ]
        holder.tool = generic_tool
        db.session.add(generic_tool)
        db.session.add(holder)
        db.session.commit()
        return holder

    def copy_from_self(self):
        """
        Create a new variable-definition tool based on this one.

        The copy's name is this tool's name prefixed with 'Copy'; the variables
        are copied through the ``variables`` property.

        :return: the new tool object (its generic Tool is available as ``.tool``)
        :rtype: VariableDefinitionTool
        """
        source = self.tool
        return self.add(
            # Tool
            name='Copy' + source.name,
            description=source.description,
            status=source.status,
            # VariableDefinitionTool
            variables=self.variables,
        )
|
||
|
||
|
||
class VariableDefinitionList(db.Model):
    """
    One variable row of a variable-definition tool.

    Fields
        name: variable name
        value: variable value
        description: notes
    Relationships
        variable_definition_tool: the tool object this row belongs to;
            VariableDefinitionList:VariableDefinitionTool is N:1
    """
    # fields that are converted to JSON
    render_field_list = ['name', 'value', 'description']
    # fields
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    variable_definition_tool_id = db.Column(db.Integer, db.ForeignKey('variable_definition_tool.id'))
    variable_definition_tool = db.relationship('VariableDefinitionTool', back_populates='variable_definition_list')

    @property
    def name_(self):
        """Return ``name`` after passing it through the parser ``_p``."""
        raw_name = self.name
        return _p(raw_name)

    @property
    def value_(self):
        """Return ``value`` after passing it through the parser ``_p``."""
        raw_value = self.value
        return _p(raw_value)
|
||
|
||
|
||
class HTTPHeaderManagerTool(db.Model):
    """
    HTTP header manager tool data.

    Relationships
        tool: the Tool this object belongs to; HTTPHeaderManagerTool:Tool is 1:1
        http_header_manager_list: all header name/value rows held by the tool;
            HTTPHeaderManagerTool:HTTPHeaderManagerList is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')
    http_header_manager_list = db.relationship('HTTPHeaderManagerList',
                                               back_populates='http_header_manager_tool',
                                               cascade='all, delete-orphan')

    @property
    def variables(self):
        """
        Return the header rows as plain dicts.

        :return: one dict per row with the keys 'name', 'value' and 'description'
        """
        rows = []
        for entry in self.http_header_manager_list:
            rows.append({
                'name': entry.name,
                'value': entry.value,
                'description': entry.description,
            })
        return rows

    @classmethod
    def new_tool(cls):
        """
        Create and commit a tool object holding one placeholder header.

        The returned object is not yet linked to a Tool.

        :rtype: HTTPHeaderManagerTool
        """
        fresh = cls()
        fresh.http_header_manager_list = [
            HTTPHeaderManagerList(name='header_name', value='header_value', description=''),
        ]
        db.session.add(fresh)
        db.session.commit()
        return fresh

    @classmethod
    def update(cls, name, description, tool_id, variables):
        """
        Update an HTTP header manager tool, replacing all of its headers.

        Does nothing when no tool has the given id.

        :param tool_id: tool id
        :type tool_id: int
        :param name: new name
        :type name: str
        :param description: new notes
        :type description: str
        :param variables: list of header mappings; missing 'name', 'value' or
            'description' keys default to ''
        :type variables: List[Mapping[str, str]]
        """
        tool = Tool.query.filter_by(id=tool_id).first()
        if not tool:
            return
        tool.name = name
        tool.description = description
        holder = tool.specific_tool
        # The previous rows are replaced wholesale by the new ones.
        holder.http_header_manager_list = [
            HTTPHeaderManagerList(name=var.get('name', ''),
                                  value=var.get('value', ''),
                                  description=var.get('description', ''))
            for var in variables
        ]
        db.session.commit()

    @classmethod
    def add(cls, name, description, variables, status=STATUS.NORMAL):
        """
        Create and commit an HTTP header manager tool with its generic Tool row.

        :param name: tool name
        :type name: str
        :param description: tool notes
        :type description: str
        :param variables: list of header mappings; missing 'name', 'value' or
            'description' keys default to ''
        :type variables: List[Mapping[str, str]]
        :param status: tool status (a STATUS value)
        :type status: str
        :return: the committed tool object
        :rtype: HTTPHeaderManagerTool
        """
        generic_tool = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.HTTP_HEADER_MANAGER,
        )
        holder = cls()
        holder.http_header_manager_list = [
            HTTPHeaderManagerList(name=var.get('name', ''),
                                  value=var.get('value', ''),
                                  description=var.get('description', ''))
            for var in variables
        ]
        holder.tool = generic_tool
        db.session.add(generic_tool)
        db.session.add(holder)
        db.session.commit()
        return holder

    def copy_from_self(self):
        """
        Create a new HTTP header manager tool based on this one.

        The copy's name is this tool's name prefixed with 'Copy'; the headers
        are copied through the ``variables`` property.

        :return: the new tool object (its generic Tool is available as ``.tool``)
        :rtype: HTTPHeaderManagerTool
        """
        source = self.tool
        return self.add(
            # Tool
            name='Copy' + source.name,
            description=source.description,
            status=source.status,
            # HTTPHeaderManagerTool
            variables=self.variables,
        )
|
||
|
||
|
||
class HTTPHeaderManagerList(db.Model):
    """
    One header row of an HTTP header manager tool.

    Fields
        name: header name
        value: header value
        description: notes
    Relationships
        http_header_manager_tool: the tool object this row belongs to;
            HTTPHeaderManagerList:HTTPHeaderManagerTool is N:1
    """
    # fields that are converted to JSON
    render_field_list = ['name', 'value', 'description']
    # fields
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    http_header_manager_tool_id = db.Column(db.Integer, db.ForeignKey('http_header_manager_tool.id'))
    http_header_manager_tool = db.relationship('HTTPHeaderManagerTool',
                                               back_populates='http_header_manager_list')

    @property
    def name_(self):
        """Return ``name`` after passing it through the parser ``_p``."""
        raw_name = self.name
        return _p(raw_name)

    @property
    def value_(self):
        """Return ``value`` after passing it through the parser ``_p``."""
        raw_value = self.value
        return _p(raw_value)
|
||
|
||
|
||
class HTTPCookieManagerTool(db.Model):
    """
    HTTP cookie manager tool data.

    Relationships
        tool: the Tool this object belongs to; HTTPCookieManagerTool:Tool is 1:1
        http_cookie_manager_list: all cookie rows held by the tool;
            HTTPCookieManagerTool:HTTPCookieManagerList is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    tool_id = db.Column(db.Integer, db.ForeignKey('tool.id'))
    tool = db.relationship('Tool')
    http_cookie_manager_list = db.relationship('HTTPCookieManagerList',
                                               back_populates='http_cookie_manager_tool',
                                               cascade='all, delete-orphan')

    @property
    def attributes(self):
        """
        Return the cookie rows as plain dicts.

        :return: one dict per row with the keys 'name', 'value', 'domain', 'path'
            and 'secure'
        """
        rows = []
        for entry in self.http_cookie_manager_list:
            rows.append({
                'name': entry.name,
                'value': entry.value,
                'domain': entry.domain,
                'path': entry.path,
                'secure': entry.secure,
            })
        return rows

    @classmethod
    def new_tool(cls):
        """
        Create and commit a tool object holding one placeholder cookie.

        The returned object is not yet linked to a Tool.

        :rtype: HTTPCookieManagerTool
        """
        fresh = cls()
        fresh.http_cookie_manager_list = [
            HTTPCookieManagerList(name='cookie_name',
                                  value='cookie_value',
                                  domain='localhost',
                                  path='/',
                                  secure=False),
        ]
        db.session.add(fresh)
        db.session.commit()
        return fresh

    @classmethod
    def update(cls, name, description, tool_id, attributes):
        """
        Update an HTTP cookie manager tool, replacing all of its cookies.

        Does nothing when no tool has the given id.

        :param tool_id: tool id
        :type tool_id: int
        :param name: new name
        :type name: str
        :param description: new notes
        :type description: str
        :param attributes: list of cookie mappings; missing 'name', 'value', 'domain'
            or 'path' keys default to '' and a missing 'secure' key defaults to False
        :type attributes: List[Mapping[str, Any]]
        """
        tool = Tool.query.filter_by(id=tool_id).first()
        if not tool:
            return
        tool.name = name
        tool.description = description
        holder = tool.specific_tool
        # The previous rows are replaced wholesale by the new ones.
        holder.http_cookie_manager_list = [
            HTTPCookieManagerList(name=attr.get('name', ''),
                                  value=attr.get('value', ''),
                                  domain=attr.get('domain', ''),
                                  path=attr.get('path', ''),
                                  secure=attr.get('secure', False))
            for attr in attributes
        ]
        db.session.commit()

    @classmethod
    def add(cls, name, description, attributes, status=STATUS.NORMAL):
        """
        Create and commit an HTTP cookie manager tool with its generic Tool row.

        :param name: tool name
        :type name: str
        :param description: tool notes
        :type description: str
        :param attributes: list of cookie mappings; missing 'name', 'value', 'domain'
            or 'path' keys default to '' and a missing 'secure' key defaults to False
        :type attributes: List[Mapping[str, Any]]
        :param status: tool status (a STATUS value)
        :type status: str
        :return: the committed tool object
        :rtype: HTTPCookieManagerTool
        """
        generic_tool = Tool(
            name=name,
            description=description,
            status=status,
            tool_type=TOOL_TYPE.HTTP_COOKIE_MANAGER,
        )
        holder = cls()
        holder.http_cookie_manager_list = [
            HTTPCookieManagerList(name=attr.get('name', ''),
                                  value=attr.get('value', ''),
                                  domain=attr.get('domain', ''),
                                  path=attr.get('path', ''),
                                  secure=attr.get('secure', False))
            for attr in attributes
        ]
        holder.tool = generic_tool
        db.session.add(generic_tool)
        db.session.add(holder)
        db.session.commit()
        return holder

    def copy_from_self(self):
        """
        Create a new HTTP cookie manager tool based on this one.

        The copy's name is this tool's name prefixed with 'Copy'; the cookies
        are copied through the ``attributes`` property.

        :return: the new tool object (its generic Tool is available as ``.tool``)
        :rtype: HTTPCookieManagerTool
        """
        source = self.tool
        return self.add(
            # Tool
            name='Copy' + source.name,
            description=source.description,
            status=source.status,
            # HTTPCookieManagerTool
            attributes=self.attributes,
        )
|
||
|
||
|
||
class HTTPCookieManagerList(db.Model):
    """
    One cookie row of an HTTP cookie manager tool.

    Fields
        name: cookie name
        value: cookie value
        domain: domain
        path: path
        secure: secure flag
    Relationships
        http_cookie_manager_tool: the tool object this row belongs to;
            HTTPCookieManagerList:HTTPCookieManagerTool is N:1
    """
    # fields that are converted to JSON
    render_field_list = ['name', 'value', 'domain', 'path', 'secure']
    # fields
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    domain = db.Column(db.String(512), nullable=False, default='', server_default='')
    path = db.Column(db.String(512), nullable=False, default='', server_default='')
    secure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    # relationship
    http_cookie_manager_tool_id = db.Column(db.Integer, db.ForeignKey('http_cookie_manager_tool.id'))
    http_cookie_manager_tool = db.relationship('HTTPCookieManagerTool',
                                               back_populates='http_cookie_manager_list')

    @property
    def name_(self):
        """Return ``name`` after passing it through the parser ``_p``."""
        raw_name = self.name
        return _p(raw_name)

    @property
    def value_(self):
        """Return ``value`` after passing it through the parser ``_p``."""
        raw_value = self.value
        return _p(raw_value)

    @property
    def domain_(self):
        """Return ``domain`` after passing it through the parser ``_p``."""
        raw_domain = self.domain
        return _p(raw_domain)

    @property
    def path_(self):
        """Return ``path`` after passing it through the parser ``_p``."""
        raw_path = self.path
        return _p(raw_path)
|
||
|
||
|
||
class Case(db.Model):
    """
    Generic test case row; the type-specific data lives in a companion table.

    Fields
        name: case name
        description: case description
        status: case status (deleted / normal)
        case_type: case type
    Relationships
        scene: scene the case belongs to, Scene:Case is 1:N
        *specific_case: data specific to the case type, Case:OtherTypeCase is 1:1
            (OtherTypeCase can be HTTPCase, SQLCase, ...)
    """
    id = db.Column(db.Integer, primary_key=True)  # TODO: start auto-increment at 10000?
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    description = db.Column(db.String(512), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    case_type = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    scene_id = db.Column(db.Integer, db.ForeignKey('scene.id'))
    scene = db.relationship('Scene', back_populates='cases')

    @property
    def specific_case(self):
        """
        Look up the type-specific row that points back at this case.

        :return: the matching HTTPCase/SSHCase/SQLCase/DebugCase row, or None when
            case_type is not one of those types or no such row exists
        """
        models_by_type = (
            (CASE_TYPE.HTTP, HTTPCase),
            (CASE_TYPE.SSH, SSHCase),
            (CASE_TYPE.SQL, SQLCase),
            (CASE_TYPE.DEBUG, DebugCase),
        )
        for case_type, model in models_by_type:
            if self.case_type == case_type:
                return model.query.filter_by(case_id=self.id).first()
        return None

    @property
    def parent_logic_controller(self):
        """Return the parent logic controller of this case, or None when no link row exists."""
        link = SubElementInLogicController.query.filter_by(
            element_type=ELEMENT_TYPE.CASE,
            element_id=self.id,
        ).first()
        return link.logic_controller if link is not None else None

    def _has_effective_script(self, attribute):
        """
        Tell whether the type-specific row holds a non-blank script in ``attribute``.

        A missing type-specific row, a missing attribute or an empty value all
        count as "no script" instead of raising AttributeError.
        """
        script = getattr(self.specific_case, attribute, None)
        return bool(script) and script.strip() != ''

    @property
    def has_effective_preprocessor_script(self):
        """Whether this case has a non-blank preprocessor script."""
        return self._has_effective_script('preprocessor_script')

    @property
    def has_effective_postprocessor_script(self):
        """Whether this case has a non-blank postprocessor script."""
        return self._has_effective_script('postprocessor_script')

    @classmethod
    def new_case(cls, case_type):
        """
        Create and commit a new case of the given type with default values.

        :param case_type: case component type
        :return: the type-specific case object (its ``case`` attribute is the new Case)
        :rtype: Union[HTTPCase, SSHCase, SQLCase, DebugCase]
        :raises ValueError: when case_type is not a supported type
        """
        case = cls(
            name='新的' + case_type + '测试案例',
            description='',
            case_type=case_type,
            status=STATUS.NORMAL,
        )
        if case_type == CASE_TYPE.HTTP:
            specific_case = HTTPCase(protocol='HTTP', method='GET')
        elif case_type == CASE_TYPE.SSH:
            specific_case = SSHCase(host_name='127.0.0.1', port='22', connection_timeout='5000', method='SSH')
        elif case_type == CASE_TYPE.SQL:
            specific_case = SQLCase(host='localhost', port='3306', connect_timeout='10',
                                    db_type=DB_TYPE.MYSQL, charset='utf8')
        elif case_type == CASE_TYPE.DEBUG:
            specific_case = DebugCase(project_variable=True)
        else:
            raise ValueError('不支持当前case_type类型, case_type=%s' % case_type)
        specific_case.case = case
        db.session.add(specific_case)
        db.session.commit()
        return specific_case

    @classmethod
    def update(cls, id, name, description):
        """
        Update the name and description of one case.

        :param id: case id
        :param name: case name
        :param description: case description
        :return: the updated case, or None when the id does not exist
        """
        case = cls.query.filter_by(id=int(id)).first()
        if case is not None:
            case.name = name
            case.description = description
        db.session.commit()
        return case

    @classmethod
    def delete(cls, id):
        """Soft-delete a case: the row is kept and its status is set to STATUS.DELETED."""
        case = cls.query.filter_by(id=int(id)).first()
        if case is not None:
            case.status = STATUS.DELETED
            # the case's order inside its logic controller becomes -1
            SubElementInLogicController.update_order(element_id=case.id, element_type=ELEMENT_TYPE.CASE,
                                                     order_in_logic_controller=-1)
        db.session.commit()

    @classmethod
    def exist_and_status_not_delete(cls, id) -> bool:
        """Tell whether a case with this id exists and is not deleted (status NORMAL or FORBIDDEN)."""
        case = cls.query.filter(
            cls.id == int(id),
            cls.status.in_([STATUS.NORMAL, STATUS.FORBIDDEN]),
        ).first()
        return case is not None

    @classmethod
    def copy_from_id(cls, case_id, scene_id, logic_controller_id=None):
        """
        Copy the case with the given id into a new case.

        :param case_id: id of the case to copy
        :type case_id: int
        :param scene_id: scene id of the new case; None keeps the copied case's scene
        :type scene_id: int
        :param logic_controller_id: id of the parent logic controller of the new case
        :type logic_controller_id: int
        :return: the new case, or None when the source case or its type-specific row is missing
        :rtype: Case
        """
        case = cls.query.filter_by(id=int(case_id)).first()
        if case is None:
            return None
        specific_case = case.specific_case
        if specific_case is None:
            # nothing to copy from: treat like a missing case instead of crashing
            return None
        copy_specific_case = specific_case.copy_from_self(scene_id=scene_id)
        # register the new case as a sub element in SubElementInLogicController
        SubElementInLogicController.add(element_id=copy_specific_case.case.id, element_type=ELEMENT_TYPE.CASE,
                                        logic_controller_id=logic_controller_id)
        return copy_specific_case.case

    @classmethod
    def _set_status(cls, id, status):
        """Set the status of the case with this id (if it exists) and commit."""
        case = cls.query.filter_by(id=int(id)).first()
        if case is not None:
            case.status = status
        db.session.commit()

    @classmethod
    def forbidden(cls, id):
        """Disable the test component."""
        cls._set_status(id, STATUS.FORBIDDEN)

    @classmethod
    def enable(cls, id):
        """Enable the test component."""
        cls._set_status(id, STATUS.NORMAL)
|
||
|
||
|
||
class HTTPCase(db.Model):
    """
    HTTP-specific data of a test case.

    Fields
        protocol: protocol
        domain: server name or IP
        port: port
        method: method
        path: URL path
        encoding: encoding
        expectation_logic: how expectation results are combined (and / or)
        message_body: message body data
        content_type: HTTP request body data type
        preprocessor_script: preprocessor script
        postprocessor_script: postprocessor script
        postprocessor_failure: postprocessor assertion failed
        postprocessor_failure_message: postprocessor assertion failure message
    Relationships
        case: case component, Case:HTTPCase is 1:1
        parameters: case parameters, HTTPCase:HTTPCaseParameter is 1:N
        headers: request headers, HTTPCase:HTTPCaseHeader is 1:N
        expectations: case expectations, HTTPCase:HTTPCaseExpectation is 1:N
        file_upload: file upload parameters, HTTPCase:HTTPCaseFileUpload is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)
    protocol = db.Column(db.String(128), nullable=False, default='', server_default='')
    domain = db.Column(db.String(512), nullable=False, default='', server_default='')
    port = db.Column(db.String(128), nullable=False, default='', server_default='')
    method = db.Column(db.String(128), nullable=False, default='', server_default='')
    path = db.Column(db.String(512), nullable=False, default='', server_default='')
    encoding = db.Column(db.String(128), nullable=False, default='', server_default='')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND,
                                  server_default=EXPECTATION_LOGIC.AND)
    message_body = db.Column(db.Text, nullable=False, default='', server_default='')
    content_type = db.Column(db.String(128), nullable=False, default='', server_default='')
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
    case = db.relationship('Case')
    # child rows are deleted once they are detached from this case (delete-orphan)
    parameters = db.relationship('HTTPCaseParameter', back_populates='http_case', cascade='all, delete-orphan')
    headers = db.relationship('HTTPCaseHeader', back_populates='http_case', cascade='all, delete-orphan')
    expectations = db.relationship('HTTPCaseExpectation', back_populates='http_case', cascade='all, delete-orphan')
    file_upload = db.relationship('HTTPCaseFileUpload', back_populates='http_case', cascade='all, delete-orphan')

    # parsed values: each property returns the raw column passed through the parser helper _p
    @property
    def domain_(self):
        return _p(self.domain)

    @property
    def port_(self):
        return _p(self.port)

    @property
    def path_(self):
        return _p(self.path)

    @property
    def encoding_(self):
        return _p(self.encoding)

    @property
    def message_body_(self):
        return _p(self.message_body)

    @property
    def preprocessor_script_(self):
        return _p(self.preprocessor_script)

    @property
    def postprocessor_script_(self):
        return _p(self.postprocessor_script)

    @classmethod
    def add(cls, name, description, scene_id, protocol, domain, port, method, path, encoding, expectation_logic,
            message_body, content_type, preprocessor_script, postprocessor_script, status=STATUS.NORMAL,
            parameters=None, headers=None, file_upload=None, expectations=None):
        """
        Create and commit a new HTTP case together with its Case row.

        :param name: case name
        :param description: case description
        :param status: case status (deleted / normal)
        :param scene_id: id of the scene the case belongs to
        :param protocol: protocol
        :param domain: server name or IP
        :param port: port
        :param method: method
        :param path: URL path
        :param encoding: encoding
        :param expectation_logic: how expectation results are combined
        :param message_body: message body data
        :param content_type: request body data type
        :param preprocessor_script: preprocessor script
        :param postprocessor_script: postprocessor script
        :param parameters: parameter data (dict-like items or HTTPCaseParameter rows)
        :param headers: request header data
        :param file_upload: file upload parameter data
        :param expectations: expectation data
        :return: the new HTTPCase
        """
        case = Case(
            name=name,
            description=description,
            status=status,
            scene_id=scene_id,
            case_type=CASE_TYPE.HTTP,
        )
        http_case = cls(
            protocol=protocol,
            domain=domain,
            port=port,
            method=method,
            path=path,
            encoding=encoding,
            expectation_logic=expectation_logic,
            message_body=message_body,
            content_type=content_type,
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
        )
        http_case.case = case
        http_case._update_parameters(parameters)
        http_case._update_headers(headers)
        http_case._update_expectations(expectations)
        http_case._update_file_upload(file_upload)
        db.session.add(case)
        db.session.add(http_case)
        db.session.commit()
        return http_case

    @classmethod
    def update(cls, id, name, description, protocol, domain, port, method, path, encoding, expectation_logic,
               message_body, content_type, preprocessor_script, postprocessor_script, params=None, headers=None,
               file_upload=None, expectations=None):
        """
        Update one HTTP case; an unknown id changes nothing.

        :param id: case id
        :param name: case name
        :param description: case description
        :param protocol: protocol
        :param domain: server name or IP
        :param port: port
        :param method: method
        :param path: URL path
        :param encoding: encoding
        :param expectation_logic: how expectation results are combined
        :param message_body: message body data
        :param content_type: request body data type
        :param preprocessor_script: preprocessor script
        :param postprocessor_script: postprocessor script
        :param params: parameter data
        :param headers: request header data
        :param file_upload: file upload parameters
        :param expectations: expectation data
        """
        case = Case.query.filter_by(id=int(id)).first()
        # only dereference the case after the None check; the original read
        # case.specific_case first and crashed on an unknown id
        http_case = case.specific_case if case is not None else None
        if case is not None:
            case.name = name
            case.description = description
        if http_case is not None:
            http_case.protocol = protocol
            http_case.domain = domain
            http_case.port = port
            http_case.method = method
            http_case.path = path
            http_case.encoding = encoding
            http_case.expectation_logic = expectation_logic
            http_case.message_body = message_body
            http_case.content_type = content_type
            http_case.preprocessor_script = preprocessor_script
            http_case.postprocessor_script = postprocessor_script
            http_case._update_parameters(params)
            http_case._update_headers(headers)
            http_case._update_file_upload(file_upload)
            http_case._update_expectations(expectations)
        db.session.commit()

    def copy_from_self(self, scene_id):
        """
        Create a copy of this case (named 'Copy' + original name).

        :param scene_id: scene id of the new case; None keeps the new case in the current scene
        :return: the new HTTP case (its ``case`` attribute is the new Case row)
        :rtype: HTTPCase
        """
        copy_case = self.add(
            # Case
            name='Copy' + self.case.name,
            description=self.case.description,
            status=self.case.status,
            # fall back to the source case's scene when no scene_id is given
            scene_id=self.case.scene_id if scene_id is None else scene_id,
            # HTTPCase
            protocol=self.protocol,
            domain=self.domain,
            port=self.port,
            method=self.method,
            path=self.path,
            encoding=self.encoding,
            expectation_logic=self.expectation_logic,
            message_body=self.message_body,
            content_type=self.content_type,
            preprocessor_script=self.preprocessor_script,
            postprocessor_script=self.postprocessor_script,
            parameters=self.parameters,
            headers=self.headers,
            expectations=self.expectations,
            file_upload=self.file_upload,
        )
        return copy_case

    def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
        """Store the postprocessor script result and its error message, then commit."""
        self.postprocessor_failure = postprocessor_failure
        self.postprocessor_failure_message = postprocessor_failure_message
        db.session.commit()

    @staticmethod
    def _build_children(model, fields, items):
        """
        Build one new ``model`` row per item, copying ``fields`` from each item.

        Items exposing ``get`` (dict-like request data) are read by key; anything
        else (existing rows, as passed by copy_from_self) is read by attribute.
        ``items`` of None yields an empty list.
        """
        if items is None:
            return []
        rows = []
        for item in items:
            if hasattr(item, 'get'):
                values = {field: item.get(field) for field in fields}
            else:
                values = {field: getattr(item, field) for field in fields}
            rows.append(model(**values))
        return rows

    def _update_parameters(self, parameters):
        """Replace all parameters of this case with fresh rows built from ``parameters``."""
        self.parameters = self._build_children(
            HTTPCaseParameter,
            ('name', 'value', 'url_encode', 'content_type', 'include_equals'),
            parameters,
        )

    def _update_headers(self, headers):
        """Replace all headers of this case with fresh rows built from ``headers``."""
        self.headers = self._build_children(HTTPCaseHeader, ('name', 'value'), headers)

    def _update_expectations(self, expectations):
        """Replace all expectations of this case with fresh rows built from ``expectations``."""
        self.expectations = self._build_children(
            HTTPCaseExpectation,
            ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater'),
            expectations,
        )

    def _update_file_upload(self, file_upload):
        """Replace all file upload parameters of this case with fresh rows built from ``file_upload``."""
        self.file_upload = self._build_children(HTTPCaseFileUpload, ('name', 'value'), file_upload)
|
||
|
||
|
||
class HTTPCaseParameter(db.Model):
    """
    One request parameter of an HTTP case.

    Fields
        name: parameter name
        value: parameter value
        url_encode: URL-encode? (boolean flag)
        content_type: content type
        include_equals: include equals? (boolean flag)
    Relationships
        http_case: case the parameter belongs to, HTTPCase:HTTPCaseParameter is 1:N
    """
    # fields that need to be converted to JSON data format
    render_field_list = ['name', 'value', 'url_encode', 'content_type', 'include_equals']

    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    url_encode = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    content_type = db.Column(db.String(128), nullable=False, default='', server_default='')
    include_equals = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='parameters')

    # parsed values
    @property
    def name_(self):
        """Return ``name`` passed through the parser helper ``_p``."""
        raw_name = self.name
        return _p(raw_name)

    @property
    def value_(self):
        """Return ``value`` passed through the parser helper ``_p``."""
        raw_value = self.value
        return _p(raw_value)
|
||
|
||
|
||
class HTTPCaseHeader(db.Model):
    """
    One request header of an HTTP case.

    Fields
        name: header name
        value: header value
    Relationships
        http_case: HTTP case the header belongs to, HTTPCase:HTTPCaseHeader is 1:N
    """
    # fields that need to be converted to JSON data format
    render_field_list = ['name', 'value']

    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(512), nullable=False, default='', server_default='')
    value = db.Column(db.String(512), nullable=False, default='', server_default='')

    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='headers')

    # parsed values
    @property
    def name_(self):
        """Return ``name`` passed through the parser helper ``_p``."""
        raw_name = self.name
        return _p(raw_name)

    @property
    def value_(self):
        """Return ``value`` passed through the parser helper ``_p``."""
        raw_value = self.value
        return _p(raw_value)
|
||
|
||
|
||
class HTTPCaseExpectation(db.Model):
    """
    One expectation (assertion) of an HTTP case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: negate the assertion result
    Relationships
        http_case: case the expectation belongs to, HTTPCase:HTTPCaseExpectation is 1:N
    """
    # fields that need to be converted to JSON data format
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']

    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败',
                                 server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='expectations')

    # longest failure message kept before truncation (in characters)
    last_failure_msg_max_length = 1000

    # parsed values
    @property
    def value_(self):
        """Return ``value`` passed through the parser helper ``_p``."""
        raw_value = self.value
        return _p(raw_value)

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the latest assertion result and commit.

        A string message longer than last_failure_msg_max_length is cut to that
        length and gets a truncation suffix; a non-string message is stored as ''.
        """
        limit = __class__.last_failure_msg_max_length
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_result = last_result
        self.last_failure_msg = message
        db.session.commit()
|
||
|
||
|
||
class HTTPCaseFileUpload(db.Model):
    """
    One file upload parameter of an HTTP case.

    Fields
        name: parameter name
        value: parameter value
    Relationships
        http_case: case the file upload parameter belongs to, HTTPCase:HTTPCaseFileUpload is 1:N
    """
    # fields that need to be converted to JSON data format
    render_field_list = ['name', 'value']

    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    http_case_id = db.Column(db.Integer, db.ForeignKey('http_case.id'))
    http_case = db.relationship('HTTPCase', back_populates='file_upload')

    # parsed values
    @property
    def name_(self):
        """Return ``name`` passed through the parser helper ``_p``."""
        raw_name = self.name
        return _p(raw_name)

    @property
    def value_(self):
        """Return ``value`` passed through the parser helper ``_p``."""
        raw_value = self.value
        return _p(raw_value)
|
||
|
||
|
||
class SSHCase(db.Model):
    """
    SSH-specific data of a test case.

    Fields
        host_name: host name
        port: port number
        connection_timeout: timeout
        user_name: user name
        password: password
        command: command content
        expectation_logic: how expectation results are combined (and / or)
        method: method, 'SSH' by default
        preprocessor_script: preprocessor script
        postprocessor_script: postprocessor script
        postprocessor_failure: postprocessor assertion failed
        postprocessor_failure_message: postprocessor assertion failure message
    Relationships
        case: case component, Case:SSHCase is 1:1
    """
    id = db.Column(db.Integer, primary_key=True)
    host_name = db.Column(db.String(128), nullable=False, default='', server_default='')
    port = db.Column(db.String(128), nullable=False, default='', server_default='')
    connection_timeout = db.Column(db.String(64), nullable=False, default='', server_default='')
    user_name = db.Column(db.String(128), nullable=False, default='', server_default='')
    password = db.Column(db.String(128), nullable=False, default='', server_default='')
    command = db.Column(db.TEXT, nullable=False, default='', server_default='')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND,
                                  server_default=EXPECTATION_LOGIC.AND)
    method = db.Column(db.String(128), nullable=False, default='', server_default='')
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
    case = db.relationship('Case')
    expectations = db.relationship('SSHCaseExpectation', back_populates='ssh_case', cascade='all, delete-orphan')

    # parsed values: each property returns the raw column passed through the parser helper _p
    @property
    def host_name_(self):
        return _p(self.host_name)

    @property
    def port_(self):
        return _p(self.port)

    @property
    def connection_timeout_(self):
        return _p(self.connection_timeout)

    @property
    def user_name_(self):
        return _p(self.user_name)

    @property
    def password_(self):
        return _p(self.password)

    @property
    def command_(self):
        return _p(self.command)

    @property
    def preprocessor_script_(self):
        return _p(self.preprocessor_script)

    @property
    def postprocessor_script_(self):
        return _p(self.postprocessor_script)

    @classmethod
    def add(cls, name, description, scene_id, host_name, port, method, connection_timeout, user_name, password,
            command, expectation_logic, preprocessor_script, postprocessor_script, status=STATUS.NORMAL,
            expectations=None):
        """
        Create and commit a new SSH case together with its Case row.

        :param name: case name
        :param description: case description
        :param status: case status (deleted / normal)
        :param scene_id: id of the scene the case belongs to
        :param host_name: host name
        :param port: port
        :param method: method
        :param connection_timeout: timeout
        :param user_name: user name
        :param password: password
        :param command: command content
        :param expectation_logic: how expectation results are combined
        :param preprocessor_script: preprocessor script
        :param postprocessor_script: postprocessor script
        :param expectations: expectation data (dict-like items or SSHCaseExpectation rows)
        :return: the new SSHCase
        """
        case = Case(
            name=name,
            description=description,
            status=status,
            scene_id=scene_id,
            case_type=CASE_TYPE.SSH,
        )
        ssh_case = cls(
            host_name=host_name,
            port=port,
            connection_timeout=connection_timeout,
            method=method,
            user_name=user_name,
            password=password,
            command=command,
            expectation_logic=expectation_logic,
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
        )
        ssh_case.case = case
        ssh_case._update_expectations(expectations)
        db.session.add(case)
        db.session.add(ssh_case)
        db.session.commit()
        return ssh_case

    @classmethod
    def update(cls, id, name, description, host_name, port, connection_timeout, user_name, password, command,
               expectation_logic, preprocessor_script, postprocessor_script, expectations=None):
        """
        Update one SSH case; an unknown id changes nothing.

        :param id: case id
        :param name: case name
        :param description: case description
        :param host_name: host name
        :param port: port
        :param connection_timeout: timeout
        :param user_name: user name
        :param password: password
        :param expectation_logic: how expectation results are combined
        :param command: command content
        :param preprocessor_script: preprocessor script
        :param postprocessor_script: postprocessor script
        :param expectations: expectation data
        """
        case = Case.query.filter_by(id=int(id)).first()
        # only dereference the case after the None check; the original read
        # case.specific_case first and crashed on an unknown id
        ssh_case = case.specific_case if case is not None else None
        if case is not None:
            case.name = name
            case.description = description
        if ssh_case is not None:
            ssh_case.host_name = host_name
            ssh_case.port = port
            ssh_case.connection_timeout = connection_timeout
            ssh_case.user_name = user_name
            ssh_case.password = password
            ssh_case.command = command
            ssh_case.expectation_logic = expectation_logic
            ssh_case.preprocessor_script = preprocessor_script
            ssh_case.postprocessor_script = postprocessor_script
            ssh_case._update_expectations(expectations)
        db.session.commit()

    def copy_from_self(self, scene_id):
        """
        Create a copy of this case (named 'Copy' + original name).

        :param scene_id: scene id of the new case; None keeps the new case in the current scene
        :return: the new SSH case (its ``case`` attribute is the new Case row)
        :rtype: SSHCase
        """
        copy_case = self.add(
            # Case
            name='Copy' + self.case.name,
            description=self.case.description,
            status=self.case.status,
            # fall back to the source case's scene when no scene_id is given
            scene_id=self.case.scene_id if scene_id is None else scene_id,
            # SSHCase
            host_name=self.host_name,
            port=self.port,
            method=self.method,
            connection_timeout=self.connection_timeout,
            user_name=self.user_name,
            password=self.password,
            command=self.command,
            expectation_logic=self.expectation_logic,
            preprocessor_script=self.preprocessor_script,
            postprocessor_script=self.postprocessor_script,
            expectations=self.expectations,
        )
        return copy_case

    def _update_expectations(self, expectations):
        """
        Replace all expectations of this case with fresh rows built from ``expectations``.

        Items exposing ``get`` (dict-like request data) are read by key; anything
        else (existing rows, as passed by copy_from_self) is read by attribute.
        None clears the expectations.
        """
        fields = ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater')
        rows = []
        if expectations is not None:
            for item in expectations:
                if hasattr(item, 'get'):
                    values = {field: item.get(field) for field in fields}
                else:
                    values = {field: getattr(item, field) for field in fields}
                rows.append(SSHCaseExpectation(**values))
        self.expectations = rows

    def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
        """Store the postprocessor script result and its error message, then commit."""
        self.postprocessor_failure = postprocessor_failure
        self.postprocessor_failure_message = postprocessor_failure_message
        db.session.commit()
|
||
|
||
|
||
class SSHCaseExpectation(db.Model):
    """
    One expectation (assertion) of an SSH case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: negate the assertion result
    Relationships
        ssh_case: case the expectation belongs to, SSHCase:SSHCaseExpectation is 1:N
    """
    # fields that need to be converted to JSON data format
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']

    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败',
                                 server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    # relationship
    ssh_case_id = db.Column(db.Integer, db.ForeignKey('ssh_case.id'))
    ssh_case = db.relationship('SSHCase', back_populates='expectations')

    # longest failure message kept before truncation (in characters)
    last_failure_msg_max_length = 1000

    # parsed values
    @property
    def value_(self):
        """Return ``value`` passed through the parser helper ``_p``."""
        raw_value = self.value
        return _p(raw_value)

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the latest assertion result and commit.

        A string message longer than last_failure_msg_max_length is cut to that
        length and gets a truncation suffix; a non-string message is stored as ''.
        """
        limit = __class__.last_failure_msg_max_length
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_result = last_result
        self.last_failure_msg = message
        db.session.commit()
|
||
|
||
|
||
class SQLCase(db.Model):
|
||
"""
|
||
字段说明
|
||
host: 主机地址
|
||
port: 端口号
|
||
connect_timeout: 超时时间 单位s
|
||
user: 用户名
|
||
password: 密码
|
||
sql: sql脚本
|
||
expectation_logic: 期望结果逻辑处理(与 或)
|
||
db_type: 数据库类型 DB_TYPE
|
||
charset: 字符集
|
||
preprocessor_script: 预处理脚本
|
||
postprocessor_script: 后处理脚本
|
||
postprocessor_failure: 后处理断言失败
|
||
postprocessor_failure_message: 后处理断言失败信息
|
||
关系说明
|
||
case: 案例组件 Case:SQLCase is 1:1
|
||
"""
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
host = db.Column(db.String(128), nullable=False, default='', server_default='')
|
||
port = db.Column(db.String(128), nullable=False, default='', server_default='')
|
||
connect_timeout = db.Column(db.String(64), nullable=False, default='', server_default='')
|
||
user = db.Column(db.String(128), nullable=False, default='', server_default='')
|
||
password = db.Column(db.String(128), nullable=False, default='', server_default='')
|
||
sql = db.Column(db.TEXT, nullable=False, default='', server_default='')
|
||
expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND, server_default=EXPECTATION_LOGIC.AND)
|
||
db_type = db.Column(db.String(128), nullable=False, default='', server_default='')
|
||
charset = db.Column(db.String(64), nullable=False, default='', server_default='')
|
||
preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
|
||
postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
|
||
postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
|
||
postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')
|
||
|
||
# relationship
|
||
case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
|
||
case = db.relationship('Case')
|
||
expectations = db.relationship('SQLCaseExpectation', back_populates='sql_case', cascade='all, delete-orphan')
|
||
|
||
@property
|
||
def host_(self):
|
||
return _p(self.host)
|
||
|
||
@property
|
||
def port_(self):
|
||
return _p(self.port)
|
||
|
||
@property
|
||
def connect_timeout_(self):
|
||
return _p(self.connect_timeout)
|
||
|
||
@property
|
||
def user_(self):
|
||
return _p(self.user)
|
||
|
||
@property
|
||
def password_(self):
|
||
return _p(self.password)
|
||
|
||
@property
|
||
def sql_(self):
|
||
return _p(self.sql)
|
||
|
||
@property
|
||
def charset_(self):
|
||
return _p(self.charset)
|
||
|
||
@property
|
||
def preprocessor_script_(self):
|
||
return _p(self.preprocessor_script)
|
||
|
||
@property
|
||
def postprocessor_script_(self):
|
||
return _p(self.postprocessor_script)
|
||
|
||
@classmethod
|
||
def add(cls, name, description, scene_id, host, port, db_type, connect_timeout, user, password, sql, charset,
|
||
expectation_logic, preprocessor_script, postprocessor_script, status=STATUS.NORMAL, expectations=None):
|
||
"""
|
||
新增一条案例
|
||
:param name: 案例名
|
||
:param description: 案例注释
|
||
:param status: 案例状态(删除 正常)
|
||
:param scene_id: 所属的场景编号scene_id
|
||
:param host: 主机地址
|
||
:param port: 端口
|
||
:param db_type: 数据库类型
|
||
:param connect_timeout: 超时时间 单位s
|
||
:param user: 用户名
|
||
:param password: 密码
|
||
:param sql: sql脚本
|
||
:param charset: 字符集
|
||
:param expectation_logic: 期望结果逻辑处理
|
||
:param preprocessor_script: 预处理脚本
|
||
:param postprocessor_script: 后处理脚本
|
||
:param expectations: 期望数据
|
||
"""
|
||
case = Case(
|
||
name=name,
|
||
description=description,
|
||
status=status,
|
||
scene_id=scene_id,
|
||
case_type=CASE_TYPE.SQL,
|
||
)
|
||
sql_case = cls(
|
||
host=host,
|
||
port=port,
|
||
connect_timeout=connect_timeout,
|
||
db_type=db_type,
|
||
user=user,
|
||
password=password,
|
||
sql=sql,
|
||
charset=charset,
|
||
expectation_logic=expectation_logic,
|
||
preprocessor_script=preprocessor_script,
|
||
postprocessor_script=postprocessor_script,
|
||
)
|
||
sql_case.case = case
|
||
sql_case._update_expectations(expectations)
|
||
db.session.add(case)
|
||
db.session.add(sql_case)
|
||
db.session.commit()
|
||
return sql_case
|
||
|
||
@classmethod
def update(cls, id, name, description, host, port, connect_timeout, user, password, db_type, sql, charset,
           expectation_logic, preprocessor_script, postprocessor_script, expectations=None):
    """
    Update an existing SQL case and its generic ``Case`` row, then commit.

    If no ``Case`` row matches ``id`` nothing is modified (the session is
    still committed).

    :param id: case id (converted with ``int``)
    :param name: case name
    :param description: case description
    :param host: database host address
    :param port: database port
    :param connect_timeout: connection timeout, in seconds
    :param user: database user name
    :param password: database password
    :param db_type: database type
    :param sql: SQL script
    :param charset: character set
    :param expectation_logic: how the expectation results are combined
    :param preprocessor_script: pre-processing script
    :param postprocessor_script: post-processing script
    :param expectations: expectation data; the stored expectations are replaced by it
    """
    case = Case.query.filter_by(id=int(id)).first()
    # Only touch ``specific_case`` when the Case row exists; dereferencing a
    # missing row would raise AttributeError before the guards below run.
    sql_case = case.specific_case if case else None
    if case:
        case.name = name
        case.description = description
    if sql_case:
        sql_case.host = host
        sql_case.port = port
        sql_case.connect_timeout = connect_timeout
        sql_case.user = user
        sql_case.password = password
        sql_case.db_type = db_type
        sql_case.sql = sql
        sql_case.charset = charset
        sql_case.expectation_logic = expectation_logic
        sql_case.preprocessor_script = preprocessor_script
        sql_case.postprocessor_script = postprocessor_script
        sql_case._update_expectations(expectations)
    db.session.commit()
|
||
|
||
def copy_from_self(self, scene_id):
    """
    Create a copy of this case via ``add`` and return the copy.

    :param scene_id: id of the scene that will own the copy; when None the
        copy stays in the scene of the current case
    :return: the new case instance returned by ``add``
    """
    source = self.case
    target_scene_id = scene_id
    if target_scene_id is None:
        target_scene_id = source.scene_id
    return self.add(
        # attributes stored on the generic Case row
        name='Copy' + source.name,
        description=source.description,
        status=source.status,
        scene_id=target_scene_id,
        # attributes stored on the SQL case row
        host=self.host,
        port=self.port,
        db_type=self.db_type,
        connect_timeout=self.connect_timeout,
        user=self.user,
        password=self.password,
        sql=self.sql,
        charset=self.charset,
        expectation_logic=self.expectation_logic,
        preprocessor_script=self.preprocessor_script,
        postprocessor_script=self.postprocessor_script,
        expectations=self.expectations,
    )
|
||
|
||
def _update_expectations(self, expectations):
    """
    Replace this case's expectations with fresh rows built from ``expectations``.

    Each item is either a mapping (anything exposing ``get``) or an object
    with the expectation fields as attributes (the path used when a case is
    copied). ``None`` simply clears the current expectations.
    """
    field_names = ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater')
    self.expectations = []
    if expectations is None:
        return
    for item in expectations:
        if hasattr(item, 'get'):
            values = {field: item.get(field) for field in field_names}
        else:
            values = {field: getattr(item, field) for field in field_names}
        self.expectations.append(SQLCaseExpectation(**values))
|
||
|
||
def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
    """Store the post-processing script outcome and its failure message, then commit."""
    self.postprocessor_failure_message = postprocessor_failure_message
    self.postprocessor_failure = postprocessor_failure
    db.session.commit()
|
||
|
||
|
||
class SQLCaseExpectation(db.Model):
    """
    One expectation (assertion) attached to a SQL case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: invert the assertion result
    Relationships
        sql_case: owning case, SQLCase:SQLCaseExpectation is 1:N
    """
    # fields that are rendered to JSON
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']

    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败',
                                 server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    # relationship
    sql_case_id = db.Column(db.Integer, db.ForeignKey('sql_case.id'))
    sql_case = db.relationship('SQLCase', back_populates='expectations')

    # longest failure message stored verbatim; longer ones are truncated
    last_failure_msg_max_length = 1000

    # parsed accessors
    @property
    def value_(self):
        """Return ``value`` after passing it through the ``_p`` parser."""
        raw_value = self.value
        return _p(raw_value)

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the latest assertion result and message, then commit.

        A non-string message is stored as an empty string; a message longer
        than ``last_failure_msg_max_length`` is cut and a truncation marker
        is appended.
        """
        limit = __class__.last_failure_msg_max_length
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_result = last_result
        self.last_failure_msg = message
        db.session.commit()
|
||
|
||
|
||
class DebugCase(db.Model):
    """
    Debug case: the debug-specific part of a generic ``Case``.

    Fields
        project_variable: whether project variables are shown
        expectation_logic: how expectation results are combined (and / or)
        preprocessor_script: pre-processing script
        postprocessor_script: post-processing script
        postprocessor_failure: post-processing assertion failed
        postprocessor_failure_message: post-processing failure message
    Relationships
        case: generic case row, Case:DebugCase is 1:1
        expectations: DebugCase:DebugCaseExpectation is 1:N
    """
    id = db.Column(db.Integer, primary_key=True)
    # NOTE(review): Python-side default is False while server_default is '1' - confirm which is intended.
    project_variable = db.Column(db.Boolean, nullable=False, default=False, server_default='1')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND,
                                  server_default=EXPECTATION_LOGIC.AND)
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_failure = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    case_id = db.Column(db.Integer, db.ForeignKey('case.id'))
    case = db.relationship('Case')
    expectations = db.relationship('DebugCaseExpectation', back_populates='debug_case',
                                   cascade='all, delete-orphan')

    @property
    def preprocessor_script_(self):
        """Return ``preprocessor_script`` after passing it through the ``_p`` parser."""
        return _p(self.preprocessor_script)

    @property
    def postprocessor_script_(self):
        """Return ``postprocessor_script`` after passing it through the ``_p`` parser."""
        return _p(self.postprocessor_script)

    @classmethod
    def add(cls, name, description, scene_id, project_variable, expectation_logic, preprocessor_script,
            postprocessor_script, status=STATUS.NORMAL, expectations=None):
        """
        Create a new debug case together with its generic ``Case`` row and commit both.

        :param name: case name
        :param description: case description
        :param scene_id: id of the scene the case belongs to
        :param project_variable: whether project variables are shown
        :param expectation_logic: how the expectation results are combined
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param status: case status (defaults to ``STATUS.NORMAL``)
        :param expectations: expectation data (mappings or expectation objects), may be None
        :return: the newly created debug case instance
        """
        case = Case(
            name=name,
            description=description,
            status=status,
            scene_id=scene_id,
            case_type=CASE_TYPE.DEBUG,
        )
        debug_case = cls(
            project_variable=project_variable,
            expectation_logic=expectation_logic,
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
        )
        debug_case.case = case
        debug_case._update_expectations(expectations)
        db.session.add(case)
        db.session.add(debug_case)
        db.session.commit()
        return debug_case

    @classmethod
    def update(cls, id, name, description, project_variable, expectation_logic, preprocessor_script,
               postprocessor_script, expectations=None):
        """
        Update an existing debug case and its generic ``Case`` row, then commit.

        If no ``Case`` row matches ``id`` nothing is modified (the session is
        still committed).

        :param id: case id (converted with ``int``)
        :param name: case name
        :param description: case description
        :param project_variable: whether project variables are shown
        :param expectation_logic: how the expectation results are combined
        :param preprocessor_script: pre-processing script
        :param postprocessor_script: post-processing script
        :param expectations: expectation data; the stored expectations are replaced by it
        """
        case = Case.query.filter_by(id=int(id)).first()
        # Only touch ``specific_case`` when the Case row exists; dereferencing a
        # missing row would raise AttributeError before the guards below run.
        debug_case = case.specific_case if case else None
        if case:
            case.name = name
            case.description = description
        if debug_case:
            debug_case.project_variable = project_variable
            debug_case.expectation_logic = expectation_logic
            debug_case.preprocessor_script = preprocessor_script
            debug_case.postprocessor_script = postprocessor_script
            debug_case._update_expectations(expectations)
        db.session.commit()

    def copy_from_self(self, scene_id):
        """
        Create a copy of this case via ``add`` and return the copy.

        :param scene_id: id of the scene that will own the copy; when None the
            copy stays in the scene of the current case
        :return: the new case instance returned by ``add``
        """
        copy_case = self.add(
            # Case
            name='Copy' + self.case.name,
            description=self.case.description,
            status=self.case.status,
            scene_id=self.case.scene_id if scene_id is None else scene_id,
            # DebugCase
            project_variable=self.project_variable,
            expectation_logic=self.expectation_logic,
            preprocessor_script=self.preprocessor_script,
            postprocessor_script=self.postprocessor_script,
            expectations=self.expectations,
        )
        return copy_case

    def _update_expectations(self, expectations):
        """
        Replace this case's expectations with fresh rows built from ``expectations``.

        Each item is either a mapping (anything exposing ``get``) or an object
        with the expectation fields as attributes (the path used by
        ``copy_from_self``). ``None`` simply clears the current expectations.
        """
        field_names = ('test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater')
        self.expectations = []
        if expectations is None:
            return
        for item in expectations:
            if hasattr(item, 'get'):
                values = {field: item.get(field) for field in field_names}
            else:  # copy_from_self passes expectation objects
                values = {field: getattr(item, field) for field in field_names}
            self.expectations.append(DebugCaseExpectation(**values))

    def update_postprocessor_failure(self, postprocessor_failure, postprocessor_failure_message):
        """Store the post-processing script outcome and its failure message, then commit."""
        self.postprocessor_failure = postprocessor_failure
        self.postprocessor_failure_message = postprocessor_failure_message
        db.session.commit()
|
||
|
||
|
||
class DebugCaseExpectation(db.Model):
    """
    One expectation (assertion) attached to a debug case.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        last_result: result of the last assertion
        last_failure_msg: message of the last assertion
        negater: invert the assertion result
    Relationships
        debug_case: owning case, DebugCase:DebugCaseExpectation is 1:N
    """
    # fields that are rendered to JSON
    render_field_list = ['test_field', 'value', 'matching_rule', 'last_result', 'last_failure_msg', 'negater']

    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    last_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    last_failure_msg = db.Column(db.Text, nullable=False, default='初次执行前默认失败',
                                 server_default='初次执行前默认失败')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    # relationship
    debug_case_id = db.Column(db.Integer, db.ForeignKey('debug_case.id'))
    debug_case = db.relationship('DebugCase', back_populates='expectations')

    # longest failure message stored verbatim; longer ones are truncated
    last_failure_msg_max_length = 1000

    # parsed accessors
    @property
    def value_(self):
        """Return ``value`` after passing it through the ``_p`` parser."""
        raw_value = self.value
        return _p(raw_value)

    def update_assert_result(self, last_result, last_failure_msg):
        """
        Store the latest assertion result and message, then commit.

        A non-string message is stored as an empty string; a message longer
        than ``last_failure_msg_max_length`` is cut and a truncation marker
        is appended.
        """
        limit = __class__.last_failure_msg_max_length
        if not isinstance(last_failure_msg, str):
            message = ''
        elif len(last_failure_msg) > limit:
            message = last_failure_msg[:limit] + '... (内容过长已截断)'
        else:
            message = last_failure_msg
        self.last_result = last_result
        self.last_failure_msg = message
        db.session.commit()
|
||
|
||
|
||
class Dispatcher(db.Model):
    """
    Records one test run (dispatch) of a project or module.

    Fields
        element_type: element type (project or module)
        element_id: element id
        status: status
        progress: progress
        start_time: start time
        end_time: end time
        log: dispatch / build log
        user_id: user who started the dispatch
    Relationships
        details: every element executed in the dispatch, Dispatcher:DispatcherDetail is 1:N
        report: report of the dispatch, Dispatcher:Report is 1:1
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    status = db.Column(db.String(64), nullable=False, default='', server_default='')
    progress = db.Column(db.Float(), nullable=False, default=0.0, server_default=text('0.0'))  # TODO control precision?
    start_time = db.Column(db.DateTime, nullable=False, default=datetime.now,
                           server_default=text('CURRENT_TIMESTAMP'))
    end_time = db.Column(db.DateTime, nullable=False, default=datetime.now,
                         server_default=text('CURRENT_TIMESTAMP'))
    log = db.Column(db.Text, nullable=False, default='', server_default='')
    user_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))

    # relationship
    details = db.relationship('DispatcherDetail', back_populates='dispatcher', cascade='all, delete-orphan')
    report = db.relationship('Report', uselist=False, back_populates='dispatcher', cascade='all')

    # How the dispatch ended (DISPATCHER_END_TYPE). Not a table column for now:
    # kept as a plain attribute so it can be flagged during a dispatch.
    end_type = ''
    # Stop the dispatch when an error occurs; mirrors
    # ProjectAdvancedConfiguration.stop_on_error.
    stop_on_error = True

    @classmethod
    def add(cls, element_type, element_id):
        """Create and commit a dispatcher row for the given element; return it."""
        # Scheduled builds are registered in app.before_first_request, i.e.
        # before the first request after server start. If no user is logged in
        # at that point the request context has no usable current_user.id,
        # hence the check below.
        values = {'element_type': element_type, 'element_id': element_id}
        if current_user.is_authenticated:
            values['user_id'] = current_user.id
        created = cls(**values)
        db.session.add(created)
        db.session.commit()
        return created

    def update_status(self, status):
        """Set ``status`` and commit; STOPPED additionally refreshes ``end_time``."""
        # Only STOPPED updates the end time here; for FINISHED the end time
        # has to be updated explicitly by the caller.
        if status == DISPATCHER_STATUS.STOPPED:
            self.update_end_time()
        self.status = status
        db.session.commit()

    def update_end_time(self):
        """Set ``end_time`` to the current local time and commit."""
        finished_at = datetime.now()
        self.end_time = finished_at
        db.session.commit()
|
||
|
||
|
||
class DispatcherDetail(db.Model):
    """
    Records the execution of one element inside a dispatch.

    Fields
        element_type: element type (project, module, scene, case, ...)
        element_id: element id
        element_name: element name
        parent_dispatcher_detail_id: parent dispatch node of this node
        parent_element_id: id of the parent element
        parent_element_type: type of the parent element
        start_time: start time
    Relationships
        dispatcher: owning dispatch, Dispatcher:DispatcherDetail is 1:N
    The report data of a node is reached through ``specific_element_report_data``.
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    element_name = db.Column(db.String(512), nullable=False, default='', server_default='')  # case.name is 512 long
    parent_dispatcher_detail_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    parent_element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    parent_element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    start_time = db.Column(db.DateTime, nullable=False, default=datetime.now,
                           server_default=text('CURRENT_TIMESTAMP'))

    # relationship
    dispatcher_id = db.Column(db.Integer, db.ForeignKey('dispatcher.id'))
    dispatcher = db.relationship('Dispatcher', back_populates='details')

    @property
    def specific_element_report_data(self):
        """
        Return the report row recorded for this node.

        CASE nodes look up ``ReportCaseData``, TOOL nodes ``ReportToolData``;
        any other element type yields None.
        """
        if self.element_type == ELEMENT_TYPE.CASE:
            return ReportCaseData.query.filter_by(dispatcher_detail_id=self.id).first()
        elif self.element_type == ELEMENT_TYPE.TOOL:
            return ReportToolData.query.filter_by(dispatcher_detail_id=self.id).first()

    @classmethod
    def add(cls, element_type, element_id, element_name, dispatcher, parent_dispatcher_detail_id=None):
        """
        Add one dispatch detail row and commit it.

        :param element_type: element type (project, module, scene, case, logic controller, tool)
        :param element_id: element id
        :param element_name: element name
        :param dispatcher: dispatcher object the row belongs to
        :param parent_dispatcher_detail_id: parent dispatch node; supplied by the
            caller for case / logic controller / tool elements, looked up here
            for module / scene elements
        :return: the created ``DispatcherDetail``
        """
        parent_element_id = None
        parent_element_type = None
        if element_id == dispatcher.element_id and element_type == dispatcher.element_type:
            # The element is the one that triggered the dispatch: it has no
            # parent, so the parent fields get their "none" markers.
            dispatcher_detail = cls(element_type=element_type, element_id=element_id, element_name=element_name,
                                    dispatcher_id=dispatcher.id, parent_element_id=-1, parent_element_type='',
                                    parent_dispatcher_detail_id=-1)
        else:
            if element_type == ELEMENT_TYPE.MODULE:
                parent_element_type = ELEMENT_TYPE.PROJECT
                parent_element_id = Module.query.filter_by(id=int(element_id)).first().project_id
                parent_dispatcher_detail_id = DispatcherDetail.query.filter_by(
                    dispatcher_id=dispatcher.id,
                    element_id=parent_element_id,
                    element_type=parent_element_type,
                ).first().id
            elif element_type == ELEMENT_TYPE.SCENE:
                # One query serves both the parent module id and the scene's
                # logic controller.
                scene = Scene.query.filter_by(id=int(element_id)).first()
                parent_element_type = ELEMENT_TYPE.MODULE
                parent_element_id = scene.module_id
                # A scene is stored as its scene_controller's logic controller:
                # the caller passes SCENE, the table records LOGIC_CONTROLLER.
                element_type = ELEMENT_TYPE.LOGIC_CONTROLLER
                element_id = scene.scene_controller.logic_controller.id
                parent_dispatcher_detail_id = DispatcherDetail.query.filter_by(
                    dispatcher_id=dispatcher.id,
                    element_id=parent_element_id,
                    element_type=parent_element_type,
                ).first().id
            elif element_type in (ELEMENT_TYPE.CASE, ELEMENT_TYPE.LOGIC_CONTROLLER, ELEMENT_TYPE.TOOL):
                # These elements live inside a logic controller; the parent is
                # the controller that lists them as a sub element.
                parent_element_type = ELEMENT_TYPE.LOGIC_CONTROLLER
                parent_element_id = SubElementInLogicController.query.filter_by(
                    element_type=element_type,
                    element_id=element_id
                ).first().logic_controller_id
            # For CASE / LOGIC_CONTROLLER / TOOL, parent_dispatcher_detail_id is
            # the value passed in by the caller.
            dispatcher_detail = cls(element_type=element_type, element_id=element_id, element_name=element_name,
                                    dispatcher_id=dispatcher.id,
                                    parent_dispatcher_detail_id=parent_dispatcher_detail_id,
                                    parent_element_type=parent_element_type, parent_element_id=parent_element_id)
        db.session.add(dispatcher_detail)
        db.session.commit()
        return dispatcher_detail
|
||
|
||
|
||
class Report(db.Model):
    """
    Report of a dispatch.

    Fields
        name: report name
        result: report result (REPORT_RESULT)
        status: report status (defaults to ``STATUS.NORMAL``)
    Relationships
        dispatcher: dispatch the report belongs to (declared with
            ``uselist=False`` on the Dispatcher side)
        report_case_data: per-case report rows, Report:ReportCaseData is 1:N
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), nullable=False, default='', server_default='')
    result = db.Column(db.String(64), nullable=False, default='', server_default='')
    status = db.Column(db.String(64), nullable=False, default=STATUS.NORMAL, server_default=STATUS.NORMAL)

    # relationship
    dispatcher_id = db.Column(db.Integer, db.ForeignKey('dispatcher.id'))
    dispatcher = db.relationship('Dispatcher', back_populates='report')
    report_case_data = db.relationship('ReportCaseData', back_populates='report', cascade='all, delete-orphan')

    @classmethod
    def add(cls, dispatcher_id, name, result=False):
        """Create and commit a report for the given dispatcher; return it."""
        created = cls(name=name, result=result, dispatcher_id=dispatcher_id)
        db.session.add(created)
        db.session.commit()
        return created

    @classmethod
    def update(cls, id, **kwargs):
        """
        Update the status of the report with the given id and commit.

        The status becomes ``kwargs['status']`` when a truthy value is given,
        otherwise ``STATUS.DELETED``. Nothing happens when the report does not
        exist.
        """
        # NOTE(review): defaulting to DELETED when no status is passed looks
        # deliberate (update doubles as delete) - confirm against callers.
        target = cls.query.filter_by(id=int(id)).first()
        if target is None:
            return
        target.status = STATUS.DELETED
        requested = kwargs.get('status')
        if requested:
            target.status = requested
        db.session.commit()

    def update_result(self, result):
        """Store the report result and commit."""
        self.result = result
        db.session.commit()
|
||
|
||
|
||
class ReportCaseData(db.Model):
    """
    Execution data of one case inside a report.

    Fields
        case_type: case type
        case_id: case id
        module_id: id of the module the case belongs to
        module_name: name of the module the case belongs to
        request_header: request header
        request_body: request body
        response_header: response header
        response_body: response body
        preprocessor_script: pre-processing script
        postprocessor_script: post-processing script
        postprocessor_result: post-processing script result
        postprocessor_failure_message: post-processing script error message
        log: execution log
        expectation_logic: how expectation results are combined
        elapsed_time: elapsed time
        result: execution result (REPORT_RESULT)
    Relationships
        report: owning report, Report:ReportCaseData is 1:N
        report_case_expectation_data: expectation results of this execution,
            ReportCaseData:ReportCaseExpectationData is 1:N
        dispatcher_detail: dispatch node this data was recorded for
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    case_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    case_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    module_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    module_name = db.Column(db.String(64), nullable=False, default='', server_default='')
    request_header = db.Column(db.Text, nullable=False, default='', server_default='')
    request_body = db.Column(db.Text, nullable=False, default='', server_default='')
    response_header = db.Column(db.Text, nullable=False, default='', server_default='')
    response_body = db.Column(db.Text, nullable=False, default='', server_default='')
    preprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_script = db.Column(db.TEXT, nullable=False, default='', server_default='')
    postprocessor_result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    postprocessor_failure_message = db.Column(db.Text, nullable=False, default='', server_default='')
    log = db.Column(db.Text, nullable=False, default='', server_default='')
    expectation_logic = db.Column(db.String(64), nullable=False, default=EXPECTATION_LOGIC.AND,
                                  server_default=EXPECTATION_LOGIC.AND)
    elapsed_time = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    result = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    report_id = db.Column(db.Integer, db.ForeignKey('report.id'))
    dispatcher_detail_id = db.Column(db.Integer, db.ForeignKey('dispatcher_detail.id'))
    report = db.relationship('Report', back_populates='report_case_data')
    report_case_expectation_data = db.relationship('ReportCaseExpectationData',
                                                   back_populates='report_case_data',
                                                   cascade='all, delete-orphan')
    dispatcher_detail = db.relationship('DispatcherDetail')

    @classmethod
    def add(cls, report_id, dispatcher_detail_id, case_type, case_id, module_id, module_name, request_header,
            request_body, response_header, response_body, elapsed_time, preprocessor_script, postprocessor_script,
            postprocessor_result, postprocessor_failure_message, log, expectation_logic, result):
        """Create and commit one case execution record; every argument maps to the column of the same name."""
        created = cls(
            # ownership
            report_id=report_id,
            dispatcher_detail_id=dispatcher_detail_id,
            # what was executed
            case_type=case_type,
            case_id=case_id,
            module_id=module_id,
            module_name=module_name,
            # request / response
            request_header=request_header,
            request_body=request_body,
            response_header=response_header,
            response_body=response_body,
            # scripts and outcome
            preprocessor_script=preprocessor_script,
            postprocessor_script=postprocessor_script,
            postprocessor_result=postprocessor_result,
            postprocessor_failure_message=postprocessor_failure_message,
            log=log,
            expectation_logic=expectation_logic,
            elapsed_time=elapsed_time,
            result=result,
        )
        db.session.add(created)
        db.session.commit()
        return created
|
||
|
||
|
||
class ReportCaseExpectationData(db.Model):
    """
    Expectation result of a case as recorded in a report.

    Fields
        test_field: field under test
        value: expected value
        matching_rule: matching mode
        negater: invert the assertion result
        result: assertion result
        failure_msg: assertion message
    Relationships
        report_case_data: execution data this expectation belongs to,
            ReportCaseData:ReportCaseExpectationData is 1:N
    """
    # fields that are rendered to JSON
    render_field_list = ['test_field', 'value', 'matching_rule', 'negater', 'result', 'failure_msg']

    # field
    id = db.Column(db.Integer, primary_key=True)
    test_field = db.Column(db.String(64), nullable=False, default='', server_default='')
    value = db.Column(db.Text, nullable=False, default='', server_default='')
    matching_rule = db.Column(db.String(64), nullable=False, default='', server_default='')
    negater = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    result = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    failure_msg = db.Column(db.Text, nullable=False, default='', server_default='')

    # relationship
    report_case_data_id = db.Column(db.Integer, db.ForeignKey('report_case_data.id'))
    report_case_data = db.relationship('ReportCaseData', back_populates='report_case_expectation_data')

    @classmethod
    def add(cls, report_case_data_id, test_field, value, matching_rule, negater, result, failure_msg):
        """Create and commit one expectation result row; return it."""
        created = cls(report_case_data_id=report_case_data_id, test_field=test_field, value=value,
                      matching_rule=matching_rule, negater=negater, result=result, failure_msg=failure_msg)
        db.session.add(created)
        db.session.commit()
        return created
|
||
|
||
|
||
class ReportToolData(db.Model):
    """
    Execution data of one tool inside a report.

    Fields
        tool_type: tool type
        tool_id: tool id
    Relationships
        report: owning report, Report:ReportToolData is 1:N
        dispatcher_detail: dispatch node this data was recorded for
        report_*_tool_data: type-specific snapshot of the tool (one-to-one,
            ``uselist=False``)
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    tool_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    tool_type = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    report_id = db.Column(db.Integer, db.ForeignKey('report.id'))
    dispatcher_detail_id = db.Column(db.Integer, db.ForeignKey('dispatcher_detail.id'))
    report = db.relationship('Report')
    dispatcher_detail = db.relationship('DispatcherDetail')
    report_script_tool_data = db.relationship(
        'ReportScriptToolData', back_populates='report_tool_data', uselist=False, cascade='all, delete-orphan')
    report_timer_tool_data = db.relationship(
        'ReportTimerToolData', back_populates='report_tool_data', uselist=False, cascade='all, delete-orphan')
    report_variable_definition_tool_data = db.relationship(
        'ReportVariableDefinitionToolData', back_populates='report_tool_data', uselist=False,
        cascade='all, delete-orphan')
    report_http_header_manager_tool_data = db.relationship(
        'ReportHTTPHeaderManagerToolData', back_populates='report_tool_data', uselist=False,
        cascade='all, delete-orphan')
    report_http_cookie_manager_tool_data = db.relationship(
        'ReportHTTPCookieManagerToolData', back_populates='report_tool_data', uselist=False,
        cascade='all, delete-orphan')

    @classmethod
    def add(cls, tool, report_id, dispatcher_detail_id):
        """
        Record the execution of ``tool`` and snapshot its type-specific data.

        The generic row is committed first, then the matching type-specific
        ``Report*ToolData.add`` is called with the new row's id. A tool type
        without a branch below gets the generic row only.

        :return: the created ``ReportToolData``
        """
        record = cls(tool_id=tool.id, tool_type=tool.tool_type, report_id=report_id,
                     dispatcher_detail_id=dispatcher_detail_id)
        db.session.add(record)
        db.session.commit()

        kind = tool.tool_type
        if kind == TOOL_TYPE.TIMER:
            ReportTimerToolData.add(delay=tool.specific_tool.delay, report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.SCRIPT:
            ReportScriptToolData.add(script=tool.specific_tool.script, report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.VARIABLE_DEFINITION:
            ReportVariableDefinitionToolData.add(variables=tool.specific_tool.variables,
                                                 report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.HTTP_HEADER_MANAGER:
            ReportHTTPHeaderManagerToolData.add(variables=tool.specific_tool.variables,
                                                report_tool_data_id=record.id)
        elif kind == TOOL_TYPE.HTTP_COOKIE_MANAGER:
            ReportHTTPCookieManagerToolData.add(attributes=tool.specific_tool.attributes,
                                                report_tool_data_id=record.id)
        return record
|
||
|
||
|
||
class ReportTimerToolData(db.Model):
    """Report snapshot of a timer tool: the ``delay`` it was configured with."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    delay = db.Column(db.String(64), nullable=False, default='', server_default='')

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_timer_tool_data')

    @classmethod
    def add(cls, delay, report_tool_data_id):
        """Create and commit a timer snapshot row linked to ``report_tool_data_id``."""
        snapshot = cls(delay=delay, report_tool_data_id=report_tool_data_id)
        db.session.add(snapshot)
        db.session.commit()
|
||
|
||
|
||
class ReportScriptToolData(db.Model):
    """Report snapshot of a script tool: the ``script`` text it contained."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    script = db.Column(db.TEXT, nullable=False, default='', server_default='')

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_script_tool_data')

    @classmethod
    def add(cls, script, report_tool_data_id):
        """Create and commit a script snapshot row linked to ``report_tool_data_id``."""
        snapshot = cls(script=script, report_tool_data_id=report_tool_data_id)
        db.session.add(snapshot)
        db.session.commit()
|
||
|
||
|
||
class ReportVariableDefinitionToolData(db.Model):
    """Report snapshot of a variable-definition tool; the variables live in the list rows."""
    # field
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_variable_definition_tool_data')
    report_variable_definition_tool_data_list = db.relationship(
        'ReportVariableDefinitionToolListData',
        back_populates='report_variable_definition_tool_data',
        cascade='all, delete-orphan')

    @property
    def variables(self):
        """Return the stored variables as a list of name / value / description dicts."""
        collected = []
        for row in self.report_variable_definition_tool_data_list:
            collected.append({
                'name': row.name,
                'value': row.value,
                'description': row.description,
            })
        return collected

    @classmethod
    def add(cls, variables, report_tool_data_id):
        """
        Create the snapshot row plus one list row per variable, then commit.

        :param variables: iterable of mappings with optional ``name``,
            ``value`` and ``description`` keys (missing keys become '')
        :param report_tool_data_id: id of the owning ``ReportToolData`` row
        """
        parent = ReportVariableDefinitionToolData(report_tool_data_id=report_tool_data_id)
        db.session.add(parent)
        # Commit first: parent.id is None until the row has been written.
        db.session.commit()
        for variable in variables:
            child = ReportVariableDefinitionToolListData(
                name=variable.get('name', ''),
                value=variable.get('value', ''),
                description=variable.get('description', ''),
                report_variable_definition_tool_data_id=parent.id,
            )
            db.session.add(child)
        db.session.commit()
|
||
|
||
|
||
class ReportVariableDefinitionToolListData(db.Model):
    """One name/value/description entry owned by a
    ``ReportVariableDefinitionToolData`` row."""

    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(
        db.String(512),
        nullable=False,
        default='',
        server_default='',
    )
    value = db.Column(
        db.String(512),
        nullable=False,
        default='',
        server_default='',
    )
    description = db.Column(
        db.Text,
        nullable=False,
        default='',
        server_default='',
    )

    # relationship
    report_variable_definition_tool_data_id = db.Column(
        db.Integer,
        db.ForeignKey('report_variable_definition_tool_data.id'),
    )
    report_variable_definition_tool_data = db.relationship(
        'ReportVariableDefinitionToolData',
        back_populates='report_variable_definition_tool_data_list',
    )
|
||
|
||
|
||
class ReportHTTPHeaderManagerToolData(db.Model):
    """Parent row grouping the header entries recorded for one ``report_tool_data``.

    The individual entries are stored as
    ``ReportHTTPHeaderManagerToolListData`` rows reachable through
    ``report_http_header_manager_tool_list_data``; deleting the parent
    cascades to them (``all, delete-orphan``).
    """

    # field
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_http_header_manager_tool_data')
    report_http_header_manager_tool_list_data = db.relationship('ReportHTTPHeaderManagerToolListData',
                                                                back_populates='report_http_header_manager_tool_data',
                                                                cascade='all, delete-orphan')

    @property
    def variables(self):
        """Return the child rows as a list of name/value/description dicts."""
        return [{
            'name': row.name,
            'value': row.value,
            'description': row.description,
        } for row in self.report_http_header_manager_tool_list_data]

    @classmethod
    def add(cls, variables, report_tool_data_id):
        """Create the parent row plus one child row per entry.

        :param variables: iterable of mappings; ``name``, ``value`` and
            ``description`` are read with ``.get`` and default to ``''``.
            ``None`` is treated as an empty iterable.
        :param report_tool_data_id: id of the owning ``report_tool_data`` row.

        Parent and children are written in a single commit, so a failure
        while inserting a child cannot leave a parent row without children.
        """
        parent = cls(report_tool_data_id=report_tool_data_id)
        # Attaching children through the relationship lets SQLAlchemy fill in
        # the foreign key at flush time; no intermediate commit is needed just
        # to learn the parent's id.
        for variable in variables or ():
            parent.report_http_header_manager_tool_list_data.append(
                ReportHTTPHeaderManagerToolListData(
                    name=variable.get('name', ''),
                    value=variable.get('value', ''),
                    description=variable.get('description', ''),
                )
            )
        db.session.add(parent)
        db.session.commit()
|
||
|
||
|
||
class ReportHTTPHeaderManagerToolListData(db.Model):
    """One name/value/description entry owned by a
    ``ReportHTTPHeaderManagerToolData`` row."""

    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(
        db.String(512),
        nullable=False,
        default='',
        server_default='',
    )
    value = db.Column(
        db.String(512),
        nullable=False,
        default='',
        server_default='',
    )
    description = db.Column(
        db.Text,
        nullable=False,
        default='',
        server_default='',
    )

    # relationship
    report_http_header_manager_tool_data_id = db.Column(
        db.Integer,
        db.ForeignKey('report_http_header_manager_tool_data.id'),
    )
    report_http_header_manager_tool_data = db.relationship(
        'ReportHTTPHeaderManagerToolData',
        back_populates='report_http_header_manager_tool_list_data',
    )
|
||
|
||
|
||
class ReportHTTPCookieManagerToolData(db.Model):
    """Parent row grouping the cookie entries recorded for one ``report_tool_data``.

    The individual cookies are stored as
    ``ReportHTTPCookieManagerToolListData`` rows reachable through
    ``report_http_cookie_manager_tool_list_data``; deleting the parent
    cascades to them (``all, delete-orphan``).
    """

    # field
    id = db.Column(db.Integer, primary_key=True)

    # relationship
    report_tool_data_id = db.Column(db.Integer, db.ForeignKey('report_tool_data.id'))
    report_tool_data = db.relationship('ReportToolData', back_populates='report_http_cookie_manager_tool_data')
    report_http_cookie_manager_tool_list_data = db.relationship('ReportHTTPCookieManagerToolListData',
                                                                back_populates='report_http_cookie_manager_tool_data',
                                                                cascade='all, delete-orphan')

    @property
    def attributes(self):
        """Return the child rows as a list of name/value/domain/path/secure dicts."""
        return [{
            'name': row.name,
            'value': row.value,
            'domain': row.domain,
            'path': row.path,
            'secure': row.secure,
        } for row in self.report_http_cookie_manager_tool_list_data]

    @classmethod
    def add(cls, attributes, report_tool_data_id):
        """Create the parent row plus one child row per cookie.

        :param attributes: iterable of mappings; ``name``, ``value``,
            ``domain`` and ``path`` default to ``''`` and ``secure`` to
            ``False`` when absent. ``None`` is treated as an empty iterable.
        :param report_tool_data_id: id of the owning ``report_tool_data`` row.

        Parent and children are written in a single commit, so a failure
        while inserting a child cannot leave a parent row without children.
        """
        parent = cls(report_tool_data_id=report_tool_data_id)
        # Attaching children through the relationship lets SQLAlchemy fill in
        # the foreign key at flush time; no intermediate commit is needed just
        # to learn the parent's id.
        for attr in attributes or ():
            parent.report_http_cookie_manager_tool_list_data.append(
                ReportHTTPCookieManagerToolListData(
                    name=attr.get('name', ''),
                    value=attr.get('value', ''),
                    domain=attr.get('domain', ''),
                    path=attr.get('path', ''),
                    secure=attr.get('secure', False),
                )
            )
        db.session.add(parent)
        db.session.commit()
|
||
|
||
|
||
class ReportHTTPCookieManagerToolListData(db.Model):
    """One cookie entry (name, value, domain, path, secure flag) owned by a
    ``ReportHTTPCookieManagerToolData`` row."""

    # field
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(
        db.String(512),
        nullable=False,
        default='',
        server_default='',
    )
    value = db.Column(
        db.Text,
        nullable=False,
        default='',
        server_default='',
    )
    domain = db.Column(
        db.String(512),
        nullable=False,
        default='',
        server_default='',
    )
    path = db.Column(
        db.String(512),
        nullable=False,
        default='',
        server_default='',
    )
    secure = db.Column(
        db.Boolean,
        nullable=False,
        default=False,
        server_default='0',
    )

    # relationship
    report_http_cookie_manager_tool_data_id = db.Column(
        db.Integer,
        db.ForeignKey('report_http_cookie_manager_tool_data.id'),
    )
    report_http_cookie_manager_tool_data = db.relationship(
        'ReportHTTPCookieManagerToolData',
        back_populates='report_http_cookie_manager_tool_list_data',
    )
|
||
|
||
|
||
class Scheduler(db.Model):
    """
    Scheduled (cron) task attached to an element.

    Fields
        element_type    type of the element the schedule belongs to
        element_id      id of that element
        cron            cron expression
        enable          whether the scheduled task is enabled
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    element_type = db.Column(db.String(64), nullable=False, default='', server_default='')
    element_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    cron = db.Column(db.Text, nullable=False, default='', server_default='')
    enable = db.Column(db.Boolean, nullable=False, default=False, server_default='0')

    @classmethod
    def get_scheduler(cls, element_type, element_id):
        """Return the first row matching the element, or ``None`` if there is none."""
        return cls.query.filter_by(element_id=element_id, element_type=element_type).first()

    @classmethod
    def update(cls, element_type, element_id, enable, cron):
        """Set ``cron`` and ``enable`` on the element's schedule and commit.

        If the element has no schedule row yet, one is created with the given
        values instead of failing with ``AttributeError`` on ``None``.
        """
        scheduler = cls.get_scheduler(element_type=element_type, element_id=element_id)
        if scheduler is None:
            cls.add(element_type=element_type, element_id=element_id, enable=enable, cron=cron)
            return
        scheduler.cron = cron
        scheduler.enable = enable
        db.session.commit()

    @classmethod
    def add(cls, element_type, element_id, enable, cron):
        """Insert a new schedule row for the element and commit."""
        db.session.add(cls(element_type=element_type, element_id=element_id, enable=enable, cron=cron))
        db.session.commit()
|
||
|
||
|
||
class DingTalkRobotSetting(db.Model):
    """
    DingTalk robot configuration.

    Fields
        enable          whether this DingTalk robot is enabled
        access_token    token, generated automatically when the robot is created
        secret          robot secret (security settings - signing), see
                        https://ding-doc.dingtalk.com/document#/org-dev-guide/qf2nxq#topic-1914047
        at_all          whether to @ all members when notifying
        at_mobiles      @ one or more members when notifying (mobile numbers,
                        comma-separated when more than one)
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    enable = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    access_token = db.Column(db.String(512), nullable=False, default='', server_default='')
    secret = db.Column(db.String(512), nullable=False, default='', server_default='')
    at_all = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    at_mobiles = db.Column(db.Text, nullable=False, default='', server_default='')

    @classmethod
    def get_project_ding_talk_robot_setting(cls, project_id):
        """Return the DingTalk robot settings bound to a project.

        The result is a list (a project may have several robots) with one
        entry per association row whose setting still exists, in the order
        the association rows were returned. It is empty when nothing is bound.
        """
        links = DingTalkRobotSettingAssociationProject.query.filter_by(project_id=project_id).all()
        setting_ids = [link.setting_id for link in links]
        if not setting_ids:
            return []
        # Fetch every referenced setting with one IN query instead of issuing
        # a separate SELECT per association row.
        settings_by_id = {
            setting.id: setting
            for setting in cls.query.filter(cls.id.in_(setting_ids)).all()
        }
        return [settings_by_id[sid] for sid in setting_ids if sid in settings_by_id]

    @classmethod
    def add(cls, enable, access_token, secret, at_all, at_mobiles, projects=None):
        """Create a robot setting, bind it to ``projects`` and return it.

        :param projects: iterable of project ids; ``None`` (the default)
            means the setting is not bound to any project.
        """
        ding_talk_robot_setting = cls(enable=enable,
                                      access_token=access_token,
                                      secret=secret,
                                      at_all=at_all,
                                      at_mobiles=at_mobiles)
        db.session.add(ding_talk_robot_setting)
        db.session.commit()
        # ``projects`` defaults to None, which the association helper cannot
        # iterate; normalise it here.
        DingTalkRobotSettingAssociationProject.add(setting_id=ding_talk_robot_setting.id,
                                                   projects=projects or [])
        return ding_talk_robot_setting

    @classmethod
    def modify(cls, id, enable, access_token, secret, at_all, at_mobiles, projects=None):
        """Update an existing setting and replace its project bindings.

        Does nothing when no setting with ``id`` exists, so no association
        rows are written for a setting that is not there.

        :param projects: iterable of project ids that should be bound after
            the call; ``None`` clears all bindings.
        """
        ding_talk_robot_setting = cls.query.filter_by(id=id).first()
        if ding_talk_robot_setting is None:
            return
        ding_talk_robot_setting.enable = enable
        ding_talk_robot_setting.access_token = access_token
        ding_talk_robot_setting.secret = secret
        ding_talk_robot_setting.at_all = at_all
        ding_talk_robot_setting.at_mobiles = at_mobiles
        db.session.commit()
        # Bindings are replaced wholesale: drop the old rows, then add the new.
        DingTalkRobotSettingAssociationProject.delete(setting_id=id)
        DingTalkRobotSettingAssociationProject.add(setting_id=id, projects=projects or [])

    @classmethod
    def delete(cls, id):
        """Delete the setting with ``id`` and its project bindings, if it exists."""
        ding_talk_robot_setting = cls.query.filter_by(id=id).first()
        if ding_talk_robot_setting is None:
            return
        # Read the key before the instance is deleted rather than afterwards.
        setting_id = ding_talk_robot_setting.id
        db.session.delete(ding_talk_robot_setting)
        db.session.commit()
        DingTalkRobotSettingAssociationProject.delete(setting_id=setting_id)
|
||
|
||
|
||
class DingTalkRobotSettingAssociationProject(db.Model):
    """Association table between DingTalk robot settings and projects."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    setting_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    project_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))

    @classmethod
    def add(cls, setting_id, projects):
        """Bind ``setting_id`` to every project id in ``projects`` and commit.

        :param projects: iterable of project ids. ``None`` is accepted and
            treated as empty, because ``DingTalkRobotSetting.add`` and
            ``DingTalkRobotSetting.modify`` default ``projects`` to ``None``.
        """
        for project_id in projects or ():
            db.session.add(cls(setting_id=setting_id, project_id=project_id))
        db.session.commit()

    @classmethod
    def delete(cls, setting_id):
        """Delete every association row of ``setting_id`` and commit."""
        rows = cls.query.filter_by(setting_id=setting_id).all()
        for row in rows:
            db.session.delete(row)
        db.session.commit()
|
||
|
||
|
||
class EmailReceiverSetting(db.Model):
    """
    Email recipient configuration.

    Fields
        enable      whether this recipient address is enabled
        name        recipient name
        address     email address
    """
    # field
    id = db.Column(db.Integer, primary_key=True)
    enable = db.Column(db.Boolean, nullable=False, default=False, server_default='0')
    name = db.Column(db.String(128), nullable=False, default='', server_default='')
    address = db.Column(db.String(512), nullable=False, default='', server_default='')

    @classmethod
    def get_project_email_receiver_setting(cls, project_id):
        """Return all recipients that should be notified for a project.

        The result is a list with one entry per association row whose
        setting still exists, in the order the association rows were
        returned. It is empty when nothing is bound.
        """
        links = EmailReceiverSettingAssociationProject.query.filter_by(project_id=project_id).all()
        setting_ids = [link.setting_id for link in links]
        if not setting_ids:
            return []
        # Fetch every referenced setting with one IN query instead of issuing
        # a separate SELECT per association row.
        settings_by_id = {
            setting.id: setting
            for setting in cls.query.filter(cls.id.in_(setting_ids)).all()
        }
        return [settings_by_id[sid] for sid in setting_ids if sid in settings_by_id]

    @classmethod
    def add(cls, enable, name, address, projects=None):
        """Create a recipient setting, bind it to ``projects`` and return it.

        :param projects: iterable of project ids; ``None`` (the default)
            means the setting is not bound to any project.
        """
        email_receiver_setting = cls(enable=enable,
                                     name=name,
                                     address=address)
        db.session.add(email_receiver_setting)
        db.session.commit()
        # ``projects`` defaults to None, which the association helper cannot
        # iterate; normalise it here.
        EmailReceiverSettingAssociationProject.add(setting_id=email_receiver_setting.id,
                                                   projects=projects or [])
        return email_receiver_setting

    @classmethod
    def modify(cls, id, enable, name, address, projects=None):
        """Update an existing setting and replace its project bindings.

        Does nothing when no setting with ``id`` exists, so no association
        rows are written for a setting that is not there.

        :param projects: iterable of project ids that should be bound after
            the call; ``None`` clears all bindings.
        """
        email_receiver_setting = cls.query.filter_by(id=id).first()
        if email_receiver_setting is None:
            return
        email_receiver_setting.enable = enable
        email_receiver_setting.name = name
        email_receiver_setting.address = address
        db.session.commit()
        # Bindings are replaced wholesale: drop the old rows, then add the new.
        EmailReceiverSettingAssociationProject.delete(setting_id=id)
        EmailReceiverSettingAssociationProject.add(setting_id=id, projects=projects or [])

    @classmethod
    def delete(cls, id):
        """Delete the setting with ``id`` and its project bindings, if it exists."""
        email_receiver_setting = cls.query.filter_by(id=id).first()
        if email_receiver_setting is None:
            return
        # Read the key before the instance is deleted rather than afterwards.
        setting_id = email_receiver_setting.id
        db.session.delete(email_receiver_setting)
        db.session.commit()
        EmailReceiverSettingAssociationProject.delete(setting_id=setting_id)
|
||
|
||
|
||
class EmailReceiverSettingAssociationProject(db.Model):
    """Association table between email recipient settings and projects."""
    # field
    id = db.Column(db.Integer, primary_key=True)
    setting_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))
    project_id = db.Column(db.Integer, nullable=False, default=0, server_default=text('0'))

    @classmethod
    def add(cls, setting_id, projects):
        """Bind ``setting_id`` to every project id in ``projects`` and commit.

        :param projects: iterable of project ids. ``None`` is accepted and
            treated as empty, because ``EmailReceiverSetting.add`` and
            ``EmailReceiverSetting.modify`` default ``projects`` to ``None``.
        """
        for project_id in projects or ():
            db.session.add(cls(setting_id=setting_id, project_id=project_id))
        db.session.commit()

    @classmethod
    def delete(cls, setting_id):
        """Delete every association row of ``setting_id`` and commit."""
        rows = cls.query.filter_by(setting_id=setting_id).all()
        for row in rows:
            db.session.delete(row)
        db.session.commit()
|