pytest_api/tools/feishu_control.py

517 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from json import JSONDecodeError
import requests
import json
import logging
import time
import urllib3
import datetime
import os
import yaml.scanner
import argparse
urllib3.disable_warnings()
try:
JSONDecodeError = json.decoder.JSONDecodeError
except AttributeError:
JSONDecodeError = ValueError
class ConfigHandler:
# 项目路径
root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(root_path, 'config', 'conf.yaml')
report_path = os.path.join(root_path, 'report')
class AllureFileClean:
"""allure 报告数据清洗,提取业务需要得数据"""
def __init__(self):
pass
@classmethod
def _get_al_files(cls) -> list:
""" 获取所有 test-case 中的 json 文件 """
filename = []
# 获取所有文件下的子文件名称
for root, dirs, files in os.walk(ConfigHandler.report_path + '/html/data/test-cases'):
for filePath in files:
path = os.path.join(root, filePath)
filename.append(path)
return filename
def get_test_cases(self):
""" 获取所有 allure 报告中执行用例的情况"""
# 将所有数据都收集到files中
files = []
for i in self._get_al_files():
with open(i, 'r', encoding='utf-8') as fp:
date = json.load(fp)
files.append(date)
return files
def get_failed_case(self):
""" 获取到所有失败的用例标题和用例代码路径"""
error_cases = []
for i in self.get_test_cases():
if i['status'] == 'failed' or i['status'] == 'broken':
error_cases.append((i['name'], i['fullName']))
return error_cases
def get_failed_cases_detail(self):
""" 返回所有失败的测试用例相关内容 """
date = self.get_failed_case()
# 判断有失败用例,则返回内容
if len(date) >= 1:
values = "失败用例:\n"
values += " **********************************\n"
for i in date:
values += " " + i[0] + ":" + i[1] + "\n"
return values
else:
# 如果没有失败用例则返回False
return ""
@classmethod
def get_case_count(cls):
""" 统计用例数量 """
file_name = ConfigHandler.report_path + '/html/history/history-trend.json'
with open(file_name, 'r', encoding='utf-8') as fp:
date = json.load(fp)[0]['data']
return date
class CaseCount:
def __init__(self):
self.AllureData = AllureFileClean()
def pass_count(self):
"""用例成功数"""
return self.AllureData.get_case_count()['passed']
def failed_count(self):
"""用例失败数"""
return self.AllureData.get_case_count()['failed']
def broken_count(self):
"""用例异常数"""
return self.AllureData.get_case_count()['broken']
def skipped_count(self):
"""用例跳过数"""
return self.AllureData.get_case_count()['skipped']
def total_count(self):
"""用例总数"""
return self.AllureData.get_case_count()['total']
def pass_rate(self):
"""用例成功率"""
# 四舍五入保留2位小数
try:
pass_rate = round((self.pass_count() + self.skipped_count()) / self.total_count() * 100, 2)
return pass_rate
except ZeroDivisionError:
return 0.00
class GetYamlData:
def __init__(self, file_dir):
self.fileDir = file_dir
def get_yaml_data(self) -> dict:
"""
获取 yaml 中的数据
:param: fileDir:
:return:
"""
# 判断文件是否存在
if os.path.exists(self.fileDir):
data = open(self.fileDir, 'r', encoding='utf-8')
try:
res = yaml.load(data, Loader=yaml.FullLoader)
return res
except UnicodeDecodeError:
raise ValueError(f"yaml文件编码错误文件路径:{self.fileDir}")
else:
raise FileNotFoundError("文件路径不存在")
def write_yaml_data(self, key: str, value) -> int:
"""
更改 yaml 文件中的值
:param key: 字典的key
:param value: 写入的值
:return:
"""
with open(self.fileDir, 'r', encoding='utf-8') as f:
# 创建了一个空列表,里面没有元素
lines = []
for line in f.readlines():
if line != '\n':
lines.append(line)
f.close()
with open(self.fileDir, 'w', encoding='utf-8') as f:
flag = 0
for line in lines:
left_str = line.split(":")[0]
if key == left_str and '#' not in line:
newline = "{0}: {1}".format(left_str, value)
line = newline
f.write('%s\n' % line)
flag = 1
else:
f.write('%s' % line)
f.close()
return flag
class Config:
''''
测试环境 : CI_ENVIRONMENT_SLUG
飞书通知: webhook
GL_JOB_ID :GL_JOB_ID
test_user : GITLAB_USER_NAME
'''
def __getattr__(self, attr):
return os.environ[attr]
def is_not_null_and_blank_str(content):
"""
非空字符串
:param content: 字符串
:return: 非空 - True空 - False
"""
if content and content.strip():
return True
else:
return False
class FeiShuTalkChatBot(object):
"""飞书机器人通知"""
def __init__(self):
self.job_id = str(Config().__getattr__("GL_JOB_ID"))
self.timeStamp = str(round(time.time() * 1000))
self.devConfig = ConfigHandler()
# 从yaml文件中获取钉钉配置信息
self.name = str(GetYamlData(ConfigHandler.config_path).get_yaml_data()['ProjectName'])
self.test_name = Config().__getattr__("GITLAB_USER_NAME")
try:
self.host = Config().__getattr__("DAV_ENVIRONMENT_SLUG")
except:
self.host = "默认分支环境" + Config().__getattr__("CI_ENVIRONMENT_SLUG")
self.tester = GetYamlData(ConfigHandler.config_path).get_yaml_data()
self.allure_data = CaseCount()
self.PASS = self.allure_data.pass_count()
self.FAILED = self.allure_data.failed_count()
self.BROKEN = self.allure_data.broken_count()
self.SKIP = self.allure_data.skipped_count()
self.TOTAL = self.allure_data.total_count()
self.RATE = self.allure_data.pass_rate()
self.ALL_CASE = self.PASS + self.FAILED + self.BROKEN
self.Except_case = self.BROKEN+self.FAILED
self.headers = {'Content-Type': 'application/json; charset=utf-8'}
self.devConfig = ConfigHandler()
# self.getFeiShuTalk = GetYamlData(self.devConfig.config_path).get_yaml_data()['FeiShuTalk']
def getwebhook(self):
try:
testing_evn = Config().__getattr__("DAV_ENVIRONMENT_SLUG")
if "-" in testing_evn:
list_evn = testing_evn.split("-")
cur_evn = list_evn[0]
if cur_evn == 'prod':
webhook = Config().__getattr__("webhook")
else:
webhook = Config().__getattr__("testwebhook")
else:
if testing_evn == 'prod':
webhook = Config().__getattr__("webhook")
else:
webhook = Config().__getattr__("testwebhook")
except:
cur_evn = Config().__getattr__("CI_ENVIRONMENT_SLUG")
if cur_evn == 'prod':
webhook = Config().__getattr__("webhook")
else:
webhook = Config().__getattr__("testwebhook")
return webhook
def send_text(self, msg: str):
"""
消息类型为text类型
:param msg: 消息内容
:return: 返回消息发送结果
"""
data = {"msg_type": "text", "at": {}}
if is_not_null_and_blank_str(msg): # 传入msg非空
data["content"] = {"text": msg}
else:
logging.error("text类型消息内容不能为空")
raise ValueError("text类型消息内容不能为空")
logging.debug('text类型%s' % data)
return self.post()
def error_feishu(self, error_message):
"""
发送消息内容UTF-8编码
:return: 返回消息发送结果
"""
rich_text = {
"email": "fanlv@bytedance.com",
"msg_type": "post",
"content": {
"post": {
"zh_cn": {
"title": "【接口自动化执行异常通知】",
"content": [
[{
"tag": "text",
"text": "接口自动化执行异常,错误如下请关注 : "
},
{
"tag": "text",
"text": "{0}".format(error_message)
}], # 成功率
]
}
}
}
}
try:
post_data = json.dumps(rich_text)
response = requests.post(self.getwebhook(), headers=self.headers, data=post_data, verify=False)
except requests.exceptions.HTTPError as exc:
logging.error("消息发送失败, HTTP error: %d, reason: %s" % (exc.response.status_code, exc.response.reason))
raise
except requests.exceptions.ConnectionError:
logging.error("消息发送失败HTTP connection error!")
raise
except requests.exceptions.Timeout:
logging.error("消息发送失败Timeout error!")
raise
except requests.exceptions.RequestException:
logging.error("消息发送失败, Request Exception!")
raise
else:
try:
result = response.json()
except JSONDecodeError:
logging.error("服务器响应异常,状态码:%s,响应内容:%s" % (response.status_code, response.text))
return {'errcode': 500, 'errmsg': '服务器响应异常'}
else:
logging.debug('发送结果:%s' % result)
# 消息发送失败提醒errcode 不为 0表示消息发送异常默认不提醒开发者可以根据返回的消息发送结果自行判断和处理
if result.get('StatusCode') != 0:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
error_data = {
"msgtype": "text",
"text": {
"content": "[注意-自动通知]飞书机器人消息发送失败,时间:%s,原因:%s,请及时跟进,谢谢!" % (
time_now, result['errmsg'] if result.get('errmsg', False) else '未知异常')
},
"at": {
"isAtAll": False
}
}
logging.error("消息发送失败,自动通知:%s" % error_data)
requests.post(self.getwebhook(), headers=self.headers, data=json.dumps(error_data))
return result
# 判断 如果错误与异常相加大于1 @all,如果没有@寂寞
def exce_case(self):
if self.Except_case >= 1:
return 'all'
else:
# userId c1c916dg
return '郭林莉'
def get_commit(self):
com = ''
try:
if Config().__getattr__("DAV_COMMIT_ID"):
com = f'(全量回归{Config().__getattr__("DAV_COMMIT_ID")})'
except KeyError:
com = '(巡检)'
return com
def post(self):
"""
发送消息内容UTF-8编码
:return: 返回消息发送结果
"""
rich_text = {
"email": "fanlv@bytedance.com",
"msg_type": "post",
"content": {
"post": {
"zh_cn": {
"title": self.name+ self.get_commit(),
"content": [
[
{
"tag": "a",
"text": "测试报告",
"href": "https://davinci-rnd.pages.davincimotor.com/-/testing/davinci_dm_api/-/jobs/{0}/artifacts/report/pytest_html/result.html".format(self.job_id)
},
{
"tag": "at",
"user_id": self.exce_case()
# "text":"陈锐男"
}
],
[
{
"tag": "text",
"text": "测试 人员 : "
},
{
"tag": "text",
"text": "{testname}".format(testname=self.test_name)
}
],
[
{
"tag": "text",
"text": "运行 环境 : "
},
{
"tag": "text",
"text": "{host}".format(host=str(self.host))
}
],
[{
"tag": "text",
"text": "成 功 率 : "
},
{
"tag": "text",
"text": "{rate}".format(rate=self.RATE) + " %"
}], # 成功率
[{
"tag": "text",
"text": "总用例条数 : "
},
{
"tag": "text",
"text": "{failed}".format(failed=self.ALL_CASE)
}],
[{
"tag": "text",
"text": "成功用例数 : "
},
{
"tag": "text",
"text": "{total}".format(total=self.PASS)
}], # 成功用例数
[{
"tag": "text",
"text": "失败用例数 : "
},
{
"tag": "text",
"text": "{failed}".format(failed=self.FAILED)
}], # 失败用例数
[{
"tag": "text",
"text": "跳过用例数 : "
},
{
"tag": "text",
"text": "{skip}".format(skip=self.SKIP)
}],
[{
"tag": "text",
"text": "异常用例数 : "
},
{
"tag": "text",
"text": "{failed}".format(failed=self.BROKEN)
}], # 损坏用例数
[
{
"tag": "text",
"text": "时 间 : "
},
{
"tag": "text",
"text": "{test}".format(test=(datetime.datetime.now() + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S'))
}
],
# [
# {
# "tag": "img",
# "image_key": "d640eeea-4d2f-4cb3-88d8-c964fab53987",
# "width": 300,
# "height": 300
# }
# ]
]
}
}
}
}
try:
post_data = json.dumps(rich_text)
response = requests.post(self.getwebhook(), headers=self.headers, data=post_data, verify=False)
except requests.exceptions.HTTPError as exc:
logging.error("消息发送失败, HTTP error: %d, reason: %s" % (exc.response.status_code, exc.response.reason))
raise
except requests.exceptions.ConnectionError:
logging.error("消息发送失败HTTP connection error!")
raise
except requests.exceptions.Timeout:
logging.error("消息发送失败Timeout error!")
raise
except requests.exceptions.RequestException:
logging.error("消息发送失败, Request Exception!")
raise
else:
try:
result = response.json()
except JSONDecodeError:
logging.error("服务器响应异常,状态码:%s,响应内容:%s" % (response.status_code, response.text))
return {'errcode': 500, 'errmsg': '服务器响应异常'}
else:
logging.debug('发送结果:%s' % result)
# 消息发送失败提醒errcode 不为 0表示消息发送异常默认不提醒开发者可以根据返回的消息发送结果自行判断和处理
if result.get('StatusCode') != 0:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
error_data = {
"msgtype": "text",
"text": {
"content": "[注意-自动通知]飞书机器人消息发送失败,时间:%s,原因:%s,请及时跟进,谢谢!" % (
time_now, result['errmsg'] if result.get('errmsg', False) else '未知异常')
},
"at": {
"isAtAll": False
}
}
logging.error("消息发送失败,自动通知:%s" % error_data)
requests.post(self.getwebhook(), headers=self.headers, data=json.dumps(error_data))
return result
if __name__ == '__main__':
send = FeiShuTalkChatBot()
send.post()