517 lines
19 KiB
Python
517 lines
19 KiB
Python
from json import JSONDecodeError
|
||
import requests
|
||
import json
|
||
import logging
|
||
import time
|
||
import urllib3
|
||
import datetime
|
||
import os
|
||
import yaml.scanner
|
||
import argparse
|
||
|
||
urllib3.disable_warnings()
|
||
|
||
try:
|
||
JSONDecodeError = json.decoder.JSONDecodeError
|
||
except AttributeError:
|
||
JSONDecodeError = ValueError
|
||
|
||
|
||
class ConfigHandler:
|
||
# 项目路径
|
||
root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
config_path = os.path.join(root_path, 'config', 'conf.yaml')
|
||
|
||
report_path = os.path.join(root_path, 'report')
|
||
|
||
|
||
class AllureFileClean:
|
||
"""allure 报告数据清洗,提取业务需要得数据"""
|
||
|
||
def __init__(self):
|
||
pass
|
||
|
||
@classmethod
|
||
def _get_al_files(cls) -> list:
|
||
""" 获取所有 test-case 中的 json 文件 """
|
||
filename = []
|
||
# 获取所有文件下的子文件名称
|
||
for root, dirs, files in os.walk(ConfigHandler.report_path + '/html/data/test-cases'):
|
||
for filePath in files:
|
||
path = os.path.join(root, filePath)
|
||
filename.append(path)
|
||
return filename
|
||
|
||
def get_test_cases(self):
|
||
""" 获取所有 allure 报告中执行用例的情况"""
|
||
# 将所有数据都收集到files中
|
||
files = []
|
||
for i in self._get_al_files():
|
||
with open(i, 'r', encoding='utf-8') as fp:
|
||
date = json.load(fp)
|
||
files.append(date)
|
||
return files
|
||
|
||
def get_failed_case(self):
|
||
""" 获取到所有失败的用例标题和用例代码路径"""
|
||
error_cases = []
|
||
for i in self.get_test_cases():
|
||
if i['status'] == 'failed' or i['status'] == 'broken':
|
||
error_cases.append((i['name'], i['fullName']))
|
||
return error_cases
|
||
|
||
def get_failed_cases_detail(self):
|
||
""" 返回所有失败的测试用例相关内容 """
|
||
date = self.get_failed_case()
|
||
# 判断有失败用例,则返回内容
|
||
if len(date) >= 1:
|
||
values = "失败用例:\n"
|
||
values += " **********************************\n"
|
||
for i in date:
|
||
values += " " + i[0] + ":" + i[1] + "\n"
|
||
return values
|
||
else:
|
||
# 如果没有失败用例,则返回False
|
||
return ""
|
||
|
||
@classmethod
|
||
def get_case_count(cls):
|
||
""" 统计用例数量 """
|
||
file_name = ConfigHandler.report_path + '/html/history/history-trend.json'
|
||
with open(file_name, 'r', encoding='utf-8') as fp:
|
||
date = json.load(fp)[0]['data']
|
||
return date
|
||
|
||
|
||
class CaseCount:
|
||
def __init__(self):
|
||
self.AllureData = AllureFileClean()
|
||
|
||
def pass_count(self):
|
||
"""用例成功数"""
|
||
return self.AllureData.get_case_count()['passed']
|
||
|
||
def failed_count(self):
|
||
"""用例失败数"""
|
||
return self.AllureData.get_case_count()['failed']
|
||
|
||
def broken_count(self):
|
||
"""用例异常数"""
|
||
return self.AllureData.get_case_count()['broken']
|
||
|
||
def skipped_count(self):
|
||
"""用例跳过数"""
|
||
return self.AllureData.get_case_count()['skipped']
|
||
|
||
def total_count(self):
|
||
"""用例总数"""
|
||
return self.AllureData.get_case_count()['total']
|
||
|
||
def pass_rate(self):
|
||
"""用例成功率"""
|
||
# 四舍五入,保留2位小数
|
||
try:
|
||
pass_rate = round((self.pass_count() + self.skipped_count()) / self.total_count() * 100, 2)
|
||
return pass_rate
|
||
except ZeroDivisionError:
|
||
return 0.00
|
||
|
||
|
||
class GetYamlData:
|
||
|
||
def __init__(self, file_dir):
|
||
self.fileDir = file_dir
|
||
|
||
def get_yaml_data(self) -> dict:
|
||
"""
|
||
获取 yaml 中的数据
|
||
:param: fileDir:
|
||
:return:
|
||
"""
|
||
# 判断文件是否存在
|
||
if os.path.exists(self.fileDir):
|
||
data = open(self.fileDir, 'r', encoding='utf-8')
|
||
try:
|
||
res = yaml.load(data, Loader=yaml.FullLoader)
|
||
return res
|
||
except UnicodeDecodeError:
|
||
raise ValueError(f"yaml文件编码错误,文件路径:{self.fileDir}")
|
||
|
||
else:
|
||
raise FileNotFoundError("文件路径不存在")
|
||
|
||
def write_yaml_data(self, key: str, value) -> int:
|
||
"""
|
||
更改 yaml 文件中的值
|
||
:param key: 字典的key
|
||
:param value: 写入的值
|
||
:return:
|
||
"""
|
||
with open(self.fileDir, 'r', encoding='utf-8') as f:
|
||
# 创建了一个空列表,里面没有元素
|
||
lines = []
|
||
for line in f.readlines():
|
||
if line != '\n':
|
||
lines.append(line)
|
||
f.close()
|
||
|
||
with open(self.fileDir, 'w', encoding='utf-8') as f:
|
||
flag = 0
|
||
for line in lines:
|
||
left_str = line.split(":")[0]
|
||
if key == left_str and '#' not in line:
|
||
newline = "{0}: {1}".format(left_str, value)
|
||
line = newline
|
||
f.write('%s\n' % line)
|
||
flag = 1
|
||
else:
|
||
f.write('%s' % line)
|
||
f.close()
|
||
return flag
|
||
|
||
|
||
class Config:
|
||
''''
|
||
测试环境 : CI_ENVIRONMENT_SLUG
|
||
飞书通知: webhook
|
||
GL_JOB_ID :GL_JOB_ID
|
||
test_user : GITLAB_USER_NAME
|
||
'''
|
||
def __getattr__(self, attr):
|
||
return os.environ[attr]
|
||
|
||
|
||
def is_not_null_and_blank_str(content):
|
||
"""
|
||
非空字符串
|
||
:param content: 字符串
|
||
:return: 非空 - True,空 - False
|
||
"""
|
||
if content and content.strip():
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
|
||
class FeiShuTalkChatBot(object):
|
||
"""飞书机器人通知"""
|
||
def __init__(self):
|
||
self.job_id = str(Config().__getattr__("GL_JOB_ID"))
|
||
self.timeStamp = str(round(time.time() * 1000))
|
||
self.devConfig = ConfigHandler()
|
||
# 从yaml文件中获取钉钉配置信息
|
||
self.name = str(GetYamlData(ConfigHandler.config_path).get_yaml_data()['ProjectName'])
|
||
self.test_name = Config().__getattr__("GITLAB_USER_NAME")
|
||
try:
|
||
self.host = Config().__getattr__("DAV_ENVIRONMENT_SLUG")
|
||
except:
|
||
self.host = "默认分支环境" + Config().__getattr__("CI_ENVIRONMENT_SLUG")
|
||
self.tester = GetYamlData(ConfigHandler.config_path).get_yaml_data()
|
||
self.allure_data = CaseCount()
|
||
self.PASS = self.allure_data.pass_count()
|
||
self.FAILED = self.allure_data.failed_count()
|
||
self.BROKEN = self.allure_data.broken_count()
|
||
self.SKIP = self.allure_data.skipped_count()
|
||
self.TOTAL = self.allure_data.total_count()
|
||
self.RATE = self.allure_data.pass_rate()
|
||
self.ALL_CASE = self.PASS + self.FAILED + self.BROKEN
|
||
self.Except_case = self.BROKEN+self.FAILED
|
||
|
||
self.headers = {'Content-Type': 'application/json; charset=utf-8'}
|
||
self.devConfig = ConfigHandler()
|
||
# self.getFeiShuTalk = GetYamlData(self.devConfig.config_path).get_yaml_data()['FeiShuTalk']
|
||
|
||
|
||
def getwebhook(self):
|
||
try:
|
||
testing_evn = Config().__getattr__("DAV_ENVIRONMENT_SLUG")
|
||
if "-" in testing_evn:
|
||
list_evn = testing_evn.split("-")
|
||
cur_evn = list_evn[0]
|
||
if cur_evn == 'prod':
|
||
webhook = Config().__getattr__("webhook")
|
||
else:
|
||
webhook = Config().__getattr__("testwebhook")
|
||
else:
|
||
if testing_evn == 'prod':
|
||
webhook = Config().__getattr__("webhook")
|
||
else:
|
||
webhook = Config().__getattr__("testwebhook")
|
||
except:
|
||
cur_evn = Config().__getattr__("CI_ENVIRONMENT_SLUG")
|
||
if cur_evn == 'prod':
|
||
webhook = Config().__getattr__("webhook")
|
||
else:
|
||
webhook = Config().__getattr__("testwebhook")
|
||
return webhook
|
||
|
||
def send_text(self, msg: str):
|
||
"""
|
||
消息类型为text类型
|
||
:param msg: 消息内容
|
||
:return: 返回消息发送结果
|
||
"""
|
||
data = {"msg_type": "text", "at": {}}
|
||
if is_not_null_and_blank_str(msg): # 传入msg非空
|
||
data["content"] = {"text": msg}
|
||
else:
|
||
logging.error("text类型,消息内容不能为空!")
|
||
raise ValueError("text类型,消息内容不能为空!")
|
||
|
||
logging.debug('text类型:%s' % data)
|
||
return self.post()
|
||
|
||
def error_feishu(self, error_message):
|
||
"""
|
||
发送消息(内容UTF-8编码)
|
||
:return: 返回消息发送结果
|
||
"""
|
||
rich_text = {
|
||
"email": "fanlv@bytedance.com",
|
||
"msg_type": "post",
|
||
"content": {
|
||
"post": {
|
||
"zh_cn": {
|
||
"title": "【接口自动化执行异常通知】",
|
||
"content": [
|
||
|
||
[{
|
||
"tag": "text",
|
||
"text": "接口自动化执行异常,错误如下请关注 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{0}".format(error_message)
|
||
}], # 成功率
|
||
|
||
]
|
||
}
|
||
}
|
||
}
|
||
}
|
||
try:
|
||
post_data = json.dumps(rich_text)
|
||
response = requests.post(self.getwebhook(), headers=self.headers, data=post_data, verify=False)
|
||
except requests.exceptions.HTTPError as exc:
|
||
logging.error("消息发送失败, HTTP error: %d, reason: %s" % (exc.response.status_code, exc.response.reason))
|
||
raise
|
||
except requests.exceptions.ConnectionError:
|
||
logging.error("消息发送失败,HTTP connection error!")
|
||
raise
|
||
except requests.exceptions.Timeout:
|
||
logging.error("消息发送失败,Timeout error!")
|
||
raise
|
||
except requests.exceptions.RequestException:
|
||
logging.error("消息发送失败, Request Exception!")
|
||
raise
|
||
else:
|
||
try:
|
||
result = response.json()
|
||
except JSONDecodeError:
|
||
logging.error("服务器响应异常,状态码:%s,响应内容:%s" % (response.status_code, response.text))
|
||
return {'errcode': 500, 'errmsg': '服务器响应异常'}
|
||
else:
|
||
logging.debug('发送结果:%s' % result)
|
||
# 消息发送失败提醒(errcode 不为 0,表示消息发送异常),默认不提醒,开发者可以根据返回的消息发送结果自行判断和处理
|
||
if result.get('StatusCode') != 0:
|
||
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
|
||
error_data = {
|
||
"msgtype": "text",
|
||
"text": {
|
||
"content": "[注意-自动通知]飞书机器人消息发送失败,时间:%s,原因:%s,请及时跟进,谢谢!" % (
|
||
time_now, result['errmsg'] if result.get('errmsg', False) else '未知异常')
|
||
},
|
||
"at": {
|
||
"isAtAll": False
|
||
}
|
||
}
|
||
logging.error("消息发送失败,自动通知:%s" % error_data)
|
||
requests.post(self.getwebhook(), headers=self.headers, data=json.dumps(error_data))
|
||
return result
|
||
|
||
# 判断 如果错误与异常相加大于1 @all,如果没有@寂寞
|
||
def exce_case(self):
|
||
if self.Except_case >= 1:
|
||
return 'all'
|
||
else:
|
||
# userId c1c916dg
|
||
return '郭林莉'
|
||
|
||
|
||
|
||
|
||
def get_commit(self):
|
||
com = ''
|
||
try:
|
||
if Config().__getattr__("DAV_COMMIT_ID"):
|
||
com = f'(全量回归{Config().__getattr__("DAV_COMMIT_ID")})'
|
||
except KeyError:
|
||
com = '(巡检)'
|
||
return com
|
||
|
||
def post(self):
|
||
"""
|
||
发送消息(内容UTF-8编码)
|
||
:return: 返回消息发送结果
|
||
"""
|
||
rich_text = {
|
||
"email": "fanlv@bytedance.com",
|
||
"msg_type": "post",
|
||
"content": {
|
||
"post": {
|
||
"zh_cn": {
|
||
"title": self.name+ self.get_commit(),
|
||
"content": [
|
||
[
|
||
{
|
||
"tag": "a",
|
||
"text": "测试报告",
|
||
"href": "https://davinci-rnd.pages.davincimotor.com/-/testing/davinci_dm_api/-/jobs/{0}/artifacts/report/pytest_html/result.html".format(self.job_id)
|
||
},
|
||
|
||
{
|
||
"tag": "at",
|
||
"user_id": self.exce_case()
|
||
# "text":"陈锐男"
|
||
}
|
||
],
|
||
[
|
||
{
|
||
"tag": "text",
|
||
"text": "测试 人员 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{testname}".format(testname=self.test_name)
|
||
}
|
||
],
|
||
[
|
||
{
|
||
"tag": "text",
|
||
"text": "运行 环境 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{host}".format(host=str(self.host))
|
||
}
|
||
],
|
||
[{
|
||
"tag": "text",
|
||
"text": "成 功 率 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{rate}".format(rate=self.RATE) + " %"
|
||
}], # 成功率
|
||
[{
|
||
"tag": "text",
|
||
"text": "总用例条数 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{failed}".format(failed=self.ALL_CASE)
|
||
}],
|
||
|
||
[{
|
||
"tag": "text",
|
||
"text": "成功用例数 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{total}".format(total=self.PASS)
|
||
}], # 成功用例数
|
||
|
||
[{
|
||
"tag": "text",
|
||
"text": "失败用例数 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{failed}".format(failed=self.FAILED)
|
||
}], # 失败用例数
|
||
[{
|
||
"tag": "text",
|
||
"text": "跳过用例数 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{skip}".format(skip=self.SKIP)
|
||
}],
|
||
[{
|
||
"tag": "text",
|
||
"text": "异常用例数 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{failed}".format(failed=self.BROKEN)
|
||
}], # 损坏用例数
|
||
[
|
||
{
|
||
"tag": "text",
|
||
"text": "时 间 : "
|
||
},
|
||
{
|
||
"tag": "text",
|
||
"text": "{test}".format(test=(datetime.datetime.now() + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'))
|
||
}
|
||
],
|
||
|
||
# [
|
||
# {
|
||
# "tag": "img",
|
||
# "image_key": "d640eeea-4d2f-4cb3-88d8-c964fab53987",
|
||
# "width": 300,
|
||
# "height": 300
|
||
# }
|
||
# ]
|
||
]
|
||
}
|
||
}
|
||
}
|
||
}
|
||
try:
|
||
post_data = json.dumps(rich_text)
|
||
response = requests.post(self.getwebhook(), headers=self.headers, data=post_data, verify=False)
|
||
except requests.exceptions.HTTPError as exc:
|
||
logging.error("消息发送失败, HTTP error: %d, reason: %s" % (exc.response.status_code, exc.response.reason))
|
||
raise
|
||
except requests.exceptions.ConnectionError:
|
||
logging.error("消息发送失败,HTTP connection error!")
|
||
raise
|
||
except requests.exceptions.Timeout:
|
||
logging.error("消息发送失败,Timeout error!")
|
||
raise
|
||
except requests.exceptions.RequestException:
|
||
logging.error("消息发送失败, Request Exception!")
|
||
raise
|
||
else:
|
||
try:
|
||
result = response.json()
|
||
except JSONDecodeError:
|
||
logging.error("服务器响应异常,状态码:%s,响应内容:%s" % (response.status_code, response.text))
|
||
return {'errcode': 500, 'errmsg': '服务器响应异常'}
|
||
else:
|
||
logging.debug('发送结果:%s' % result)
|
||
# 消息发送失败提醒(errcode 不为 0,表示消息发送异常),默认不提醒,开发者可以根据返回的消息发送结果自行判断和处理
|
||
if result.get('StatusCode') != 0:
|
||
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
|
||
error_data = {
|
||
"msgtype": "text",
|
||
"text": {
|
||
"content": "[注意-自动通知]飞书机器人消息发送失败,时间:%s,原因:%s,请及时跟进,谢谢!" % (
|
||
time_now, result['errmsg'] if result.get('errmsg', False) else '未知异常')
|
||
},
|
||
"at": {
|
||
"isAtAll": False
|
||
}
|
||
}
|
||
logging.error("消息发送失败,自动通知:%s" % error_data)
|
||
requests.post(self.getwebhook(), headers=self.headers, data=json.dumps(error_data))
|
||
return result
|
||
|
||
|
||
if __name__ == '__main__':
|
||
send = FeiShuTalkChatBot()
|
||
send.post()
|