做了一些优化
This commit is contained in:
parent
28a6b1f13e
commit
2c860e7f9a
|
@ -1 +1 @@
|
|||
39031
|
||||
39154
|
|
@ -1 +1 @@
|
|||
39031
|
||||
39154
|
|
@ -1 +1 @@
|
|||
39029
|
||||
39152
|
|
@ -1 +1 @@
|
|||
39030
|
||||
39153
|
|
@ -1 +1 @@
|
|||
loginUserName_wanandroid_com=18800000001;token_pass_wanandroid_com=5d9b90bcb70640183e09d1e755ead823;JSESSIONID=04590DBEDD46BCF1DF37EEB15B97E5BE;loginUserName=18800000001;token_pass=5d9b90bcb70640183e09d1e755ead823;
|
||||
loginUserName_wanandroid_com=18800000001;token_pass_wanandroid_com=5d9b90bcb70640183e09d1e755ead823;JSESSIONID=90719A97D7C40EEB32807E62997AFD88;loginUserName=18800000001;token_pass=5d9b90bcb70640183e09d1e755ead823;
|
|
@ -1,10 +1,8 @@
|
|||
ProjectName:
|
||||
- 七月的框架
|
||||
-
|
||||
project_name: xxx项目名称
|
||||
|
||||
Env: 测试环境
|
||||
env: 测试环境
|
||||
# 测试人员名称,作用于自动生成代码的作者,以及发送企业微信、钉钉通知的测试负责人
|
||||
TesterName: 七月
|
||||
tester_name: 七月
|
||||
|
||||
# 域名1
|
||||
host: https://www.wanandroid.com
|
||||
|
@ -16,7 +14,7 @@ app_host:
|
|||
real_time_update_test_cases: False
|
||||
|
||||
# 报告通知类型:0: 不发送通知 1:钉钉 2:企业微信通知 3、邮箱通知 4、飞书通知
|
||||
NotificationType: 1
|
||||
notification_type: 1
|
||||
|
||||
# 收集失败的用例开关,整理成excel报告的形式,自动发送,目前只支持返送企业微信通知
|
||||
excel_report: False
|
||||
|
@ -26,12 +24,12 @@ excel_report: False
|
|||
# 我发现很多拉代码的小伙伴这里配置都没改,所有的通知都发到我这里来了哦~~麻烦看到这里的小伙伴自己改一下相关配置
|
||||
|
||||
# 钉钉相关配置
|
||||
DingTalk:
|
||||
ding_talk:
|
||||
webhook: https://oapi.dingtalk.com/robot/send?access_token=a59902a7e811f93ffe301d8326b07a2acc8aa2a864e7d61ee9fc076481ced2a6
|
||||
secret: SECdea6489dfcc3b9259da943c5ae38d3530696f2fa83ac72a9ee716e9511675b9b
|
||||
|
||||
# 数据库相关配置
|
||||
MySqlDB:
|
||||
mysql_db:
|
||||
# 数据库开关
|
||||
switch: False
|
||||
host:
|
||||
|
@ -43,7 +41,7 @@ MySqlDB:
|
|||
mirror_source: http://mirrors.aliyun.com/pypi/simple/
|
||||
|
||||
# 企业通知的相关配置
|
||||
WeChat:
|
||||
wechat:
|
||||
webhook: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=22748687-fa3b-4e48-a5d7-0502cef422b4
|
||||
|
||||
email:
|
||||
|
@ -55,7 +53,7 @@ email:
|
|||
send_list: 1603453211@qq.com
|
||||
|
||||
# 飞书通知
|
||||
FeiShuTalk:
|
||||
lark:
|
||||
webhook:
|
||||
|
||||
|
||||
|
|
15
run.py
15
run.py
|
@ -6,17 +6,14 @@ import os
|
|||
import traceback
|
||||
import pytest
|
||||
from utils.other_tools.models import NotificationType
|
||||
from common.setting import ConfigHandler
|
||||
from utils.other_tools.get_conf_data import project_name, get_excel_report_switch
|
||||
from utils.other_tools.allure_data.allure_report_data import AllureFileClean
|
||||
from utils.logging_tool.log_control import INFO
|
||||
from utils.other_tools.get_conf_data import get_notification_type
|
||||
from utils.notify.wechat_send import WeChatSend
|
||||
from utils.notify.ding_talk import DingTalkSendMsg
|
||||
from utils.notify.send_mail import SendEmail
|
||||
from utils.notify.lark import FeiShuTalkChatBot
|
||||
from utils.read_files_tools.clean_files import del_file
|
||||
from utils.other_tools.allure_data.error_case_excel import ErrorCaseExcel
|
||||
from utils import config
|
||||
|
||||
|
||||
def run():
|
||||
|
@ -31,11 +28,9 @@ def run():
|
|||
\\__,_| .__/|_/_/ \\_\\__,_|\\__\\___/|_|\\___||___/\\__|
|
||||
|_|
|
||||
开始执行{}项目...
|
||||
""".format(project_name)
|
||||
""".format(config.project_name)
|
||||
)
|
||||
|
||||
# del_file(ConfigHandler.cache_path)
|
||||
|
||||
# 判断现有的测试用例,如果未生成测试代码,则自动生成
|
||||
# TestCaseAutomaticGeneration().get_case_automatic()
|
||||
|
||||
|
@ -62,10 +57,10 @@ def run():
|
|||
NotificationType.FEI_SHU.value: FeiShuTalkChatBot(allure_data).post
|
||||
}
|
||||
|
||||
if get_notification_type() != NotificationType.DEFAULT.value:
|
||||
notification_mapping.get(get_notification_type())()
|
||||
if config.notification_type != NotificationType.DEFAULT.value:
|
||||
notification_mapping.get(config.notification_type)()
|
||||
|
||||
if get_excel_report_switch():
|
||||
if config.excel_report:
|
||||
ErrorCaseExcel().write_case()
|
||||
|
||||
# 程序运行之后,自动启动报告,如果不想启动报告,可注释这段代码
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
|
||||
from utils.other_tools.models import Config
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from common.setting import ConfigHandler
|
||||
|
||||
_data = GetYamlData(ConfigHandler.config_path).get_yaml_data()
|
||||
config = Config(**_data)
|
|
@ -10,12 +10,12 @@ import json
|
|||
from typing import Text, Dict, Any, Union
|
||||
from jsonpath import jsonpath
|
||||
from utils.other_tools.models import AssertMethod
|
||||
from utils.other_tools.get_conf_data import sql_switch
|
||||
from utils.logging_tool.log_control import ERROR, WARNING
|
||||
from utils.read_files_tools.regular_control import cache_regular
|
||||
from utils.other_tools.models import load_module_functions
|
||||
from utils.assertion import assert_type
|
||||
from utils.other_tools.exceptions import JsonpathExtractionFailed, SqlNotFound, AssertTypeError
|
||||
from utils import config
|
||||
|
||||
|
||||
class Assert:
|
||||
|
@ -69,12 +69,12 @@ class Assert:
|
|||
:return:
|
||||
"""
|
||||
# 判断数据库为开关为关闭状态
|
||||
if sql_switch() is False:
|
||||
if config.mysql_db.switch is False:
|
||||
WARNING.logger.warning(
|
||||
"检测到数据库状态为关闭状态,程序已为您跳过此断言,断言值:%s", values
|
||||
)
|
||||
# 数据库开关为开启
|
||||
if sql_switch():
|
||||
if config.mysql_db.switch:
|
||||
# 走正常SQL断言逻辑
|
||||
if sql_data != {'sql': None}:
|
||||
res_sql_data = jsonpath(sql_data, assert_value)
|
||||
|
|
|
@ -11,11 +11,9 @@ import decimal
|
|||
from warnings import filterwarnings
|
||||
import pymysql
|
||||
from typing import List, Union, Text, Dict
|
||||
from common.setting import ConfigHandler
|
||||
from utils import config
|
||||
from utils.logging_tool.log_control import ERROR
|
||||
from utils.read_files_tools.regular_control import sql_regular
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from utils.other_tools.get_conf_data import sql_switch
|
||||
from utils.read_files_tools.regular_control import cache_regular
|
||||
from utils.other_tools.exceptions import DataAcquisitionFailed, ValueTypeError
|
||||
|
||||
|
@ -25,19 +23,17 @@ filterwarnings("ignore", category=pymysql.Warning)
|
|||
|
||||
class MysqlDB:
|
||||
""" mysql 封装 """
|
||||
if sql_switch():
|
||||
if config.mysql_db.switch:
|
||||
|
||||
def __init__(self):
|
||||
self.config = GetYamlData(ConfigHandler.config_path)
|
||||
self.read_mysql_config = self.config.get_yaml_data()['MySqlDB']
|
||||
|
||||
try:
|
||||
# 建立数据库连接
|
||||
self.conn = pymysql.connect(
|
||||
host=self.read_mysql_config['host'],
|
||||
user=self.read_mysql_config['user'],
|
||||
password=self.read_mysql_config['password'],
|
||||
port=self.read_mysql_config['port']
|
||||
host=config.mysql_db.host,
|
||||
user=config.mysql_db.user,
|
||||
password=config.mysql_db.password,
|
||||
port=config.mysql_db.port
|
||||
)
|
||||
|
||||
# 使用 cursor 方法获取操作游标,得到一个可以执行sql语句,并且操作结果为字典返回的游标
|
||||
|
|
|
@ -12,11 +12,9 @@ import time
|
|||
import urllib.parse
|
||||
from typing import Any, Text
|
||||
from dingtalkchatbot.chatbot import DingtalkChatbot, FeedLink
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from utils.other_tools.get_local_ip import get_host_ip
|
||||
from utils.other_tools.allure_data.allure_report_data import AllureFileClean, TestMetrics
|
||||
from utils.other_tools.get_conf_data import project_name, tester_name
|
||||
from common.setting import ConfigHandler
|
||||
from utils import config
|
||||
|
||||
|
||||
class DingTalkSendMsg:
|
||||
|
@ -25,10 +23,8 @@ class DingTalkSendMsg:
|
|||
self.metrics = metrics
|
||||
self.timeStamp = str(round(time.time() * 1000))
|
||||
self.sign = self.get_sign()
|
||||
self.dev_config = ConfigHandler()
|
||||
# 从yaml文件中获取钉钉配置信息
|
||||
self.get_ding_talk = GetYamlData(self.dev_config.config_path).get_yaml_data()['DingTalk']
|
||||
self.webhook = self.get_ding_talk["webhook"] + "×tamp=" + self.timeStamp + "&sign=" + self.sign
|
||||
self.webhook = config.ding_talk.webhook + "×tamp=" + self.timeStamp + "&sign=" + self.sign
|
||||
|
||||
# 获取 webhook地址
|
||||
self.xiao_ding = DingtalkChatbot(self.webhook)
|
||||
|
@ -38,10 +34,9 @@ class DingTalkSendMsg:
|
|||
根据时间戳 + "sign" 生成密钥
|
||||
:return:
|
||||
"""
|
||||
secret = GetYamlData(ConfigHandler().config_path).get_yaml_data()['DingTalk']['secret']
|
||||
string_to_sign = f'{self.timeStamp}\n{secret}'.encode('utf-8')
|
||||
string_to_sign = f'{self.timeStamp}\n{config.ding_talk.secret}'.encode('utf-8')
|
||||
hmac_code = hmac.new(
|
||||
secret.encode('utf-8'),
|
||||
config.ding_talk.secret.encode('utf-8'),
|
||||
string_to_sign,
|
||||
digestmod=hashlib.sha256).digest()
|
||||
|
||||
|
@ -133,10 +128,10 @@ class DingTalkSendMsg:
|
|||
is_at_all = False
|
||||
if self.metrics.failed + self.metrics.broken > 0:
|
||||
is_at_all = True
|
||||
text = f"#### {project_name}自动化通知 " \
|
||||
f"\n\n>Python脚本任务: {project_name}" \
|
||||
text = f"#### {config.project_name}自动化通知 " \
|
||||
f"\n\n>Python脚本任务: {config.project_name}" \
|
||||
f"\n\n>环境: TEST\n\n>" \
|
||||
f"执行人: {tester_name}" \
|
||||
f"执行人: {config.tester_name}" \
|
||||
f"\n\n>执行结果: {self.metrics.pass_rate}% " \
|
||||
f"\n\n>总用例数: {self.metrics.total} " \
|
||||
f"\n\n>成功用例数: {self.metrics.passed}" \
|
||||
|
|
|
@ -7,10 +7,8 @@ import time
|
|||
import datetime
|
||||
import requests
|
||||
import urllib3
|
||||
from common.setting import ConfigHandler
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from utils.other_tools.allure_data.allure_report_data import TestMetrics
|
||||
|
||||
from utils import config
|
||||
|
||||
urllib3.disable_warnings()
|
||||
|
||||
|
@ -33,13 +31,6 @@ class FeiShuTalkChatBot:
|
|||
"""飞书机器人通知"""
|
||||
def __init__(self, metrics: TestMetrics):
|
||||
self.metrics = metrics
|
||||
self.dev_config = ConfigHandler()
|
||||
# 从yaml文件中获取钉钉配置信息
|
||||
self.name = GetYamlData(ConfigHandler.config_path).get_yaml_data()['ProjectName'][0]
|
||||
self.test_name = GetYamlData(ConfigHandler.config_path).get_yaml_data()['TesterName']
|
||||
self.host = GetYamlData(ConfigHandler.config_path).get_yaml_data()['Env']
|
||||
self.tester = GetYamlData(ConfigHandler.config_path).get_yaml_data()
|
||||
self.get_lark_talk = GetYamlData(self.dev_config.config_path).get_yaml_data()['FeiShuTalk']
|
||||
|
||||
def send_text(self, msg: str):
|
||||
"""
|
||||
|
@ -57,11 +48,6 @@ class FeiShuTalkChatBot:
|
|||
logging.debug('text类型:%s', data)
|
||||
return self.post()
|
||||
|
||||
def webhook(self):
|
||||
""" 获取 webhook """
|
||||
webhook = self.get_lark_talk["webhook"]
|
||||
return webhook
|
||||
|
||||
def post(self):
|
||||
"""
|
||||
发送消息(内容UTF-8编码)
|
||||
|
@ -94,7 +80,7 @@ class FeiShuTalkChatBot:
|
|||
},
|
||||
{
|
||||
"tag": "text",
|
||||
"text": f"{self.test_name}"
|
||||
"text": f"{config.tester_name}"
|
||||
}
|
||||
],
|
||||
[
|
||||
|
@ -104,7 +90,7 @@ class FeiShuTalkChatBot:
|
|||
},
|
||||
{
|
||||
"tag": "text",
|
||||
"text": f"{str(self.host)}"
|
||||
"text": f"{config.env}"
|
||||
}
|
||||
],
|
||||
[{
|
||||
|
@ -169,7 +155,7 @@ class FeiShuTalkChatBot:
|
|||
|
||||
post_data = json.dumps(rich_text)
|
||||
response = requests.post(
|
||||
self.webhook(),
|
||||
config.lark.webhook,
|
||||
headers=headers,
|
||||
data=post_data,
|
||||
verify=False
|
||||
|
@ -190,5 +176,5 @@ class FeiShuTalkChatBot:
|
|||
}
|
||||
}
|
||||
logging.error("消息发送失败,自动通知:%s", error_data)
|
||||
requests.post(self.webhook(), headers=headers, data=json.dumps(error_data))
|
||||
requests.post(config.lark.webhook, headers=headers, data=json.dumps(error_data))
|
||||
return result
|
||||
|
|
|
@ -8,22 +8,19 @@
|
|||
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
from common.setting import ConfigHandler
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from utils.other_tools.allure_data.allure_report_data import TestMetrics, AllureFileClean
|
||||
from utils import config
|
||||
|
||||
|
||||
class SendEmail:
|
||||
""" 发送邮箱 """
|
||||
def __init__(self, metrics: TestMetrics):
|
||||
self.metrics = metrics
|
||||
self.get_data = GetYamlData(ConfigHandler.config_path).get_yaml_data()['email']
|
||||
self.send_user = self.get_data['send_user'] # 发件人
|
||||
self.email_host = self.get_data['email_host'] # QQ 邮件 STAMP 服务器地址
|
||||
self.key = self.get_data['stamp_key'] # STAMP 授权码
|
||||
self.name = GetYamlData(ConfigHandler.config_path).get_yaml_data()['ProjectName'][0]
|
||||
self.allure_data = AllureFileClean()
|
||||
self.CaseDetail = self.allure_data.get_failed_cases_detail()
|
||||
|
||||
def send_mail(self, user_list: list, sub, content: str) -> None:
|
||||
@classmethod
|
||||
def send_mail(cls, user_list: list, sub, content: str) -> None:
|
||||
"""
|
||||
|
||||
@param user_list: 发件人邮箱
|
||||
|
@ -31,14 +28,14 @@ class SendEmail:
|
|||
@param content: 发送内容
|
||||
@return:
|
||||
"""
|
||||
user = "余少琪" + "<" + self.send_user + ">"
|
||||
user = "余少琪" + "<" + config.email.send_user + ">"
|
||||
message = MIMEText(content, _subtype='plain', _charset='utf-8')
|
||||
message['Subject'] = sub
|
||||
message['From'] = user
|
||||
message['To'] = ";".join(user_list)
|
||||
server = smtplib.SMTP()
|
||||
server.connect(self.email_host)
|
||||
server.login(self.send_user, self.key)
|
||||
server.connect(config.email.email_host)
|
||||
server.login(config.email.send_user, config.email.stamp_key)
|
||||
server.sendmail(user, user_list, message.as_string())
|
||||
server.close()
|
||||
|
||||
|
@ -48,10 +45,10 @@ class SendEmail:
|
|||
@param error_message: 报错信息
|
||||
@return:
|
||||
"""
|
||||
email = self.get_data['send_list']
|
||||
email = config.email.send_list
|
||||
user_list = email.split(',') # 多个邮箱发送,config文件中直接添加 '806029174@qq.com'
|
||||
|
||||
sub = self.name + "接口自动化执行异常通知"
|
||||
sub = config.project_name + "接口自动化执行异常通知"
|
||||
content = f"自动化测试执行完毕,程序中发现异常,请悉知。报错信息如下:\n{error_message}"
|
||||
self.send_mail(user_list, sub, content)
|
||||
|
||||
|
@ -60,10 +57,10 @@ class SendEmail:
|
|||
发送邮件
|
||||
:return:
|
||||
"""
|
||||
email = self.get_data["send_list"]
|
||||
email = config.email.send_list
|
||||
user_list = email.split(',') # 多个邮箱发送,yaml文件中直接添加 '806029174@qq.com'
|
||||
|
||||
sub = self.name + "接口自动化报告"
|
||||
sub = config.project_name + "接口自动化报告"
|
||||
content = f"""
|
||||
各位同事, 大家好:
|
||||
自动化用例执行完成,执行结果如下:
|
||||
|
@ -74,6 +71,7 @@ class SendEmail:
|
|||
跳过用例个数: {self.metrics.skipped} 个
|
||||
成 功 率: {self.metrics.pass_rate} %
|
||||
|
||||
{self.allure_data.get_failed_cases_detail()}
|
||||
|
||||
**********************************
|
||||
jenkins地址:https://121.xx.xx.47:8989/login
|
||||
|
|
|
@ -7,14 +7,12 @@
|
|||
"""
|
||||
|
||||
import requests
|
||||
from common.setting import ConfigHandler
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from utils.logging_tool.log_control import ERROR
|
||||
from utils.other_tools.allure_data.allure_report_data import TestMetrics, AllureFileClean
|
||||
from utils.times_tool.time_control import now_time
|
||||
from utils.other_tools.get_local_ip import get_host_ip
|
||||
from utils.other_tools.get_conf_data import tester_name
|
||||
from utils.other_tools.exceptions import SendMessageError, ValueTypeError
|
||||
from utils import config
|
||||
|
||||
|
||||
class WeChatSend:
|
||||
|
@ -24,8 +22,6 @@ class WeChatSend:
|
|||
|
||||
def __init__(self, metrics: TestMetrics):
|
||||
self.metrics = metrics
|
||||
self.wechat_data = GetYamlData(ConfigHandler.config_path).get_yaml_data()['WeChat']
|
||||
self.curl = self.wechat_data['webhook']
|
||||
self.headers = {"Content-Type": "application/json"}
|
||||
|
||||
def send_text(self, content, mentioned_mobile_list=None):
|
||||
|
@ -43,7 +39,7 @@ class WeChatSend:
|
|||
if len(mentioned_mobile_list) >= 1:
|
||||
for i in mentioned_mobile_list:
|
||||
if isinstance(i, str):
|
||||
res = requests.post(url=self.curl, json=_data, headers=self.headers)
|
||||
res = requests.post(url=config.wechat.webhook, json=_data, headers=self.headers)
|
||||
if res.json()['errcode'] != 0:
|
||||
ERROR.logger.error(res.json())
|
||||
raise SendMessageError("企业微信「文本类型」消息发送失败")
|
||||
|
@ -60,7 +56,7 @@ class WeChatSend:
|
|||
:return:
|
||||
"""
|
||||
_data = {"msgtype": "markdown", "markdown": {"content": content}}
|
||||
res = requests.post(url=self.curl, json=_data, headers=self.headers)
|
||||
res = requests.post(url=config.wechat.webhook, json=_data, headers=self.headers)
|
||||
if res.json()['errcode'] != 0:
|
||||
ERROR.logger.error(res.json())
|
||||
raise SendMessageError("企业微信「MarkDown类型」消息发送失败")
|
||||
|
@ -69,7 +65,7 @@ class WeChatSend:
|
|||
"""
|
||||
先将文件上传到临时媒体库
|
||||
"""
|
||||
key = self.curl.split("key=")[1]
|
||||
key = config.wechat.webhook.split("key=")[1]
|
||||
url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file"
|
||||
data = {"file": open(file, "rb")}
|
||||
res = requests.post(url, files=data).json()
|
||||
|
@ -82,16 +78,16 @@ class WeChatSend:
|
|||
"""
|
||||
|
||||
_data = {"msgtype": "file", "file": {"media_id": self._upload_file(file)}}
|
||||
res = requests.post(url=self.curl, json=_data, headers=self.headers)
|
||||
res = requests.post(url=config.wechat.webhook, json=_data, headers=self.headers)
|
||||
if res.json()['errcode'] != 0:
|
||||
ERROR.logger.error(res.json())
|
||||
raise SendMessageError("企业微信「file类型」消息发送失败")
|
||||
|
||||
def send_wechat_notification(self):
|
||||
""" 发送企业微信通知 """
|
||||
text = f"""【{0}自动化通知】
|
||||
text = f"""【{config.project_name}自动化通知】
|
||||
>测试环境:<font color=\"info\">TEST</font>
|
||||
>测试负责人:@{tester_name}
|
||||
>测试负责人:@{config.tester_name}
|
||||
>
|
||||
> **执行结果**
|
||||
><font color=\"info\">成 功 率 : {self.metrics.pass_rate}%</font>
|
||||
|
|
|
@ -69,7 +69,6 @@ class AllureFileClean:
|
|||
run_case_data["pass_rate"] = 0.0
|
||||
# 收集用例运行时长
|
||||
run_case_data['time'] = _time if run_case_data['total'] == 0 else round(_time['duration'] / 1000, 2)
|
||||
print(TestMetrics(**run_case_data))
|
||||
return TestMetrics(**run_case_data)
|
||||
except FileNotFoundError as exc:
|
||||
raise FileNotFoundError(
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
#!/usr/bin/python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
# @Time : 2022/5/10 18:55
|
||||
# @Author : 余少琪
|
||||
# @Email : 1603453211@qq.com
|
||||
# @File : get_conf_data
|
||||
# @describe:
|
||||
"""
|
||||
|
||||
from common.setting import ConfigHandler
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
conf = GetYamlData(ConfigHandler.config_path).get_yaml_data()
|
||||
|
||||
|
||||
def sql_switch():
|
||||
"""获取数据库开关"""
|
||||
switch = conf['MySqlDB']["switch"]
|
||||
return switch
|
||||
|
||||
|
||||
def get_notification_type():
|
||||
"""获取报告通知类型,是发送钉钉还是企业邮箱"""
|
||||
date = conf['NotificationType']
|
||||
return date
|
||||
|
||||
|
||||
def get_excel_report_switch():
|
||||
"""获取excel报告开关"""
|
||||
excel_report_switch = conf['excel_report']
|
||||
return excel_report_switch
|
||||
|
||||
|
||||
def get_mirror_url():
|
||||
"""获取镜像源"""
|
||||
mirror_url = conf['mirror_source']
|
||||
return mirror_url
|
||||
|
||||
|
||||
project_name = conf['ProjectName'][0]
|
||||
tester_name = conf['TesterName']
|
|
@ -144,6 +144,46 @@ class ResponseData(BaseModel):
|
|||
body: Union[Dict, None] = None
|
||||
|
||||
|
||||
class DingTalk(BaseModel):
|
||||
webhook: Union[Text, None]
|
||||
secret: Union[Text, None]
|
||||
|
||||
|
||||
class MySqlDB(BaseModel):
|
||||
switch: bool = False
|
||||
host: Union[Text, None] = None
|
||||
user: Union[Text, None] = None
|
||||
password: Union[Text, None] = None
|
||||
port: Union[int, None] = 3306
|
||||
|
||||
|
||||
class Webhook(BaseModel):
|
||||
webhook: Union[Text, None]
|
||||
|
||||
|
||||
class Email(BaseModel):
|
||||
send_user: Union[Text, None]
|
||||
email_host: Union[Text, None]
|
||||
stamp_key: Union[Text, None]
|
||||
# 收件人
|
||||
send_list: Union[Text, None]
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
project_name: Text
|
||||
env: Text
|
||||
tester_name: Text
|
||||
notification_type: int = 0
|
||||
excel_report: bool
|
||||
ding_talk: "DingTalk"
|
||||
mysql_db: "MySqlDB"
|
||||
mirror_source: Text
|
||||
wechat: "Webhook"
|
||||
email: "Email"
|
||||
lark: "Webhook"
|
||||
real_time_update_test_cases: bool = False
|
||||
|
||||
|
||||
@unique
|
||||
class AllureAttachmentType(Enum):
|
||||
"""
|
||||
|
|
|
@ -6,10 +6,10 @@
|
|||
"""
|
||||
|
||||
from typing import Union, Text, Dict
|
||||
from utils.other_tools.get_conf_data import sql_switch
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from utils.other_tools.models import TestCase
|
||||
from utils.other_tools.exceptions import ValueNotFoundError
|
||||
from utils import config
|
||||
|
||||
|
||||
class CaseData:
|
||||
|
@ -331,7 +331,7 @@ class CaseData:
|
|||
try:
|
||||
_sql = case_data['sql']
|
||||
# 判断数据库开关为开启状态,并且sql不为空
|
||||
if sql_switch() and _sql is None:
|
||||
if config.mysql_db.switch and _sql is None:
|
||||
return None
|
||||
return case_data['sql']
|
||||
except KeyError as exc:
|
||||
|
|
|
@ -107,21 +107,21 @@ class Context:
|
|||
return host
|
||||
|
||||
@classmethod
|
||||
def merchant_host(cls) -> str:
|
||||
def app_host(cls) -> str:
|
||||
"""获取app的host"""
|
||||
from utils.read_files_tools.yaml_control import GetYamlData
|
||||
from common.setting import ConfigHandler
|
||||
|
||||
# 从配置文件conf.yaml 文件中获取到域名,然后使用正则替换
|
||||
host = GetYamlData(ConfigHandler.config_path) \
|
||||
.get_yaml_data()['merchant_host']
|
||||
.get_yaml_data()['app_host']
|
||||
return host
|
||||
|
||||
|
||||
def sql_json(js_path, res):
|
||||
""" 提取 sql中的 json 数据 """
|
||||
_json_data = jsonpath(res, js_path)[0]
|
||||
if _json_data is not False:
|
||||
if _json_data is False:
|
||||
raise ValueError(f"sql中的jsonpath获取失败 {res}, {js_path}")
|
||||
return jsonpath(res, js_path)[0]
|
||||
|
||||
|
|
|
@ -9,13 +9,13 @@ from jsonpath import jsonpath
|
|||
from utils.cache_process.cache_control import Cache
|
||||
from utils.requests_tool.request_control import RequestControl
|
||||
from utils.mysql_tool.mysql_control import SetUpMySQL
|
||||
from utils.other_tools.get_conf_data import sql_switch
|
||||
from utils.read_files_tools.regular_control import regular, cache_regular
|
||||
from utils.other_tools.jsonpath_date_replace import jsonpath_replace
|
||||
from utils.logging_tool.log_control import WARNING
|
||||
from utils.other_tools.models import DependentType
|
||||
from utils.other_tools.models import TestCase, DependentCaseData, DependentData
|
||||
from utils.other_tools.exceptions import ValueNotFoundError
|
||||
from utils import config
|
||||
|
||||
|
||||
class DependentCase:
|
||||
|
@ -111,7 +111,7 @@ class DependentCase:
|
|||
"""
|
||||
# 判断依赖数据类型,依赖 sql中的数据
|
||||
if setup_sql is not None:
|
||||
if sql_switch():
|
||||
if config.mysql_db.switch:
|
||||
setup_sql = ast.literal_eval(cache_regular(str(setup_sql)))
|
||||
sql_data = SetUpMySQL().setup_sql_data(sql=setup_sql)
|
||||
dependent_data = dependence_case_data.dependent_data
|
||||
|
|
|
@ -17,13 +17,12 @@ from common.setting import ConfigHandler
|
|||
from utils.other_tools.models import RequestType
|
||||
from utils.logging_tool.log_decorator import log_decorator
|
||||
from utils.mysql_tool.mysql_control import AssertExecution
|
||||
from utils.other_tools.get_conf_data import sql_switch
|
||||
from utils.logging_tool.run_time_decorator import execution_duration
|
||||
from utils.other_tools.allure_data.allure_tools import allure_step, allure_step_no, allure_attach
|
||||
from utils.read_files_tools.regular_control import cache_regular
|
||||
from utils.requests_tool.set_current_request_cache import SetCurrentRequestCache
|
||||
from utils.other_tools.models import TestCase, ResponseData
|
||||
|
||||
from utils import config
|
||||
# from utils.requests_tool.encryption_algorithm_control import encryption
|
||||
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
@ -315,7 +314,7 @@ class RequestControl:
|
|||
def _sql_data_handler(cls, sql_data, res):
|
||||
"""处理 sql 参数 """
|
||||
# 判断数据库开关,开启状态,则返回对应的数据
|
||||
if sql_switch() and sql_data is not None:
|
||||
if config.mysql_db.switch and sql_data is not None:
|
||||
sql_data = AssertExecution().assert_execution(
|
||||
sql=sql_data,
|
||||
resp=res.json()
|
||||
|
@ -341,7 +340,7 @@ class RequestControl:
|
|||
data, yaml_data.requestType
|
||||
),
|
||||
"method": res.request.method,
|
||||
"sql_data": self._sql_data_handler(sql_data=yaml_data.sql, res=res),
|
||||
"sql_data": self._sql_data_handler(sql_data=ast.literal_eval(cache_regular(str(yaml_data.sql))), res=res),
|
||||
"yaml_data": yaml_data,
|
||||
"headers": res.request.headers,
|
||||
"cookie": res.cookies,
|
||||
|
|
|
@ -15,11 +15,11 @@ from utils.requests_tool.request_control import RequestControl
|
|||
from utils.read_files_tools.regular_control import cache_regular, sql_regular, regular
|
||||
from utils.other_tools.jsonpath_date_replace import jsonpath_replace
|
||||
from utils.mysql_tool.mysql_control import MysqlDB
|
||||
from utils.other_tools.get_conf_data import sql_switch
|
||||
from utils.logging_tool.log_control import WARNING
|
||||
from utils.cache_process.cache_control import Cache
|
||||
from utils.other_tools.models import ResponseData, TearDown, SendRequest, ParamPrepare
|
||||
from utils.other_tools.exceptions import JsonpathExtractionFailed, ValueNotFoundError
|
||||
from utils import config
|
||||
|
||||
|
||||
class TearDownHandler:
|
||||
|
@ -304,7 +304,7 @@ class TearDownHandler:
|
|||
_response_data = self._res.response_data
|
||||
if sql_data is not None:
|
||||
for i in sql_data:
|
||||
if sql_switch():
|
||||
if config.mysql_db.switch:
|
||||
_sql_data = sql_regular(value=i, res=json.loads(_response_data))
|
||||
MysqlDB().execute(_sql_data)
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue