初始化

This commit is contained in:
jing song 2021-09-10 17:38:16 +08:00
commit 2357d4d2c7
29 changed files with 1491 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
venv
.idea
__pycache__/
logs/
report/
.pytest_cache/
config/config.ini
test_suite/datas

63
README.md Normal file
View File

@ -0,0 +1,63 @@
# **接口自动化测试框架**
## 目录结构
|-- 接口自动化测试框架 # 主目录
├─common
├─config
├─scripts
├─testsuite
├─log
├─report
├─pytest.ini
├─requirements.txt
├─README.md
└─setupMain.py
###yaml param格式
login:
- info: "用户名登录-成功"
headers: {
"Content-Type": "application/json"
}
data: {
"username": "finsiot",
"password": "finsiot"
}
assert:
jsonpath:
- {
"path": "$.msg",
"value": "Success.",
"asserttype": "=="
}
- {
"path": "$.code",
"value": 0,
"asserttype": "=="
}
- {
"path": "$.data.id",
"value": 196,
"asserttype": "=="
}
sqlassert:
- {
"datas": [
{
"path": "$.data.id",
"name": "id"
},
{
"path": "$.data.username",
"name": "username"
},
],
"sql": "select * from saas.user where username = '****'",
}
time: 2
###yaml url格式
login:
name: '登录'
address: '/v1/apps/login/'
method: 'post

1
common/__init__.py Normal file
View File

@ -0,0 +1 @@
# coding:utf-8

226
common/basePage.py Normal file
View File

@ -0,0 +1,226 @@
# coding:utf-8
import json
import logging
import os
import random
import allure
import requests
import simplejson
from requests_toolbelt import MultipartEncoder
from scripts.log import Log
from scripts.randomData import replace_random
from config.confManage import host_manage
Log()
class apiSend(object):
def __init__(self):
self.host = host_manage(hos="${host}$")
self.http_type = host_manage(hos="${http_type}$")
@staticmethod
def iniDatas(data):
if isinstance(data, dict):
data = json.dumps(data)
if data is None:
return data
dataran = replace_random(data)
logging.info("请求参数: %s" % str(dataran))
return dataran
def post(self, address, header, request_parameter_type="json", timeout=8, data=None, files=None):
"""
post请求
:param address: 请求地址
:param header: 请求头
:param request_parameter_type: 请求参数格式form_data,raw
:param timeout: 超时时间
:param data: 请求参数
:param files: 文件路径
:return:
"""
url = str(self.http_type) + "://" + self.host + address
logging.info("请求地址:%s" % "" + str(address))
logging.info("请求头: %s" % str(header))
if 'form-data' in request_parameter_type:
with allure.step("POST上传文件"):
allure.attach(name="请求地址", body=url)
allure.attach(name="请求头", body=str(header))
allure.attach(name="请求参数", body=str(data))
if files is not None:
for i in files:
value = files[i]
if isinstance(value, int):
files[i] = str(value)
pass
elif '/' in value:
file_parm = i
files[file_parm] = (os.path.basename(value), open(value, 'rb'), 'application/octet-stream')
else:
pass
multipart = MultipartEncoder(
fields=files,
boundary='-----------------------------' + str(random.randint(int(1e28), int(1e29 - 1)))
)
header['Content-Type'] = multipart.content_type
response = requests.post(url=url, data=data, headers=header, timeout=timeout)
else:
# print(type(data))
multipart = MultipartEncoder(data)
response = requests.post(url=url, data=multipart, headers={'Content-Type': multipart.content_type},
timeout=timeout)
else:
data_random = self.iniDatas(data)
with allure.step("POST请求接口"):
allure.attach(name="请求地址", body=url)
allure.attach(name="请求头", body=str(header))
allure.attach(name="请求参数", body=str(data_random))
response = requests.post(url=url, data=data_random, headers=header, timeout=timeout)
try:
if response.status_code != 200:
logging.info("请求接口结果: %s" % str(response.text))
return response.text
else:
logging.info("请求接口结果: %s" % str(response.json()))
return response.json(), response.elapsed.total_seconds()
except json.decoder.JSONDecodeError:
return ''
except simplejson.errors.JSONDecodeError:
return ''
except Exception as e:
logging.exception('ERROR')
logging.error(e)
raise
def get(self, address, header, data, timeout=8):
"""
get请求
:param address:
:param header: 请求头
:param data: 请求参数
:param timeout: 超时时间
:return:
"""
data_random = self.iniDatas(data)
url = str(self.http_type) + "://" + host_manage(hos='${host}$') + address
logging.info("请求地址:%s" % "" + str(address))
logging.info("请求头: %s" % str(header))
logging.info("请求参数: %s" % str(data_random))
with allure.step("GET请求接口"):
allure.attach(name="请求地址", body=url)
allure.attach(name="请求头", body=str(header))
allure.attach(name="请求参数", body=str(data_random))
response = requests.get(url=url, params=data_random, headers=header, timeout=timeout)
if response.status_code == 301:
response = requests.get(url=response.headers["location"])
try:
logging.info("请求接口结果: %s" % str(response.json()))
return response.json(), response.elapsed.total_seconds()
# except json.decoder.JSONDecodeError:
# return ''
# except simplejson.errors.JSONDecodeError:
# return ''
except Exception as e:
logging.exception('ERROR')
logging.error(e)
raise
def put(self, address, header, request_parameter_type="json", timeout=8, data=None, files=None):
"""
put请求
:param address:
:param header: 请求头
:param request_parameter_type: 请求参数格式form_data,raw
:param timeout: 超时时间
:param data: 请求参数
:param files: 文件路径
:return:
"""
url = str(self.http_type) + "://" + host_manage(hos='${host}$') + address
logging.info("请求地址:%s" % "" + str(address))
logging.info("请求头: %s" % str(header))
with allure.step("PUT请求接口"):
allure.attach(name="请求地址", body=url)
allure.attach(name="请求头", body=str(header))
allure.attach(name="请求参数", body=str(data))
if request_parameter_type == 'raw':
data = self.iniDatas(data)
else:
data = data
response = requests.put(url=url, data=data, headers=header, timeout=timeout, files=files)
try:
logging.info("请求接口结果: %s" % str(response.json()))
return response.json(), response.elapsed.total_seconds()
except json.decoder.JSONDecodeError:
return ''
except simplejson.errors.JSONDecodeError:
return ''
except Exception as e:
logging.exception('ERROR')
logging.error(e)
raise
def delete(self, address, header, data, timeout=8):
"""
get请求
:param address:
:param header: 请求头
:param data: 请求参数
:param timeout: 超时时间
:return:
"""
data_random = self.iniDatas(data)
url = str(self.http_type) + "://" + host_manage(hos='${host}$') + address
logging.info("请求地址:%s" % "" + str(address))
logging.info("请求头: %s" % str(header))
with allure.step("DELETE请求接口"):
allure.attach(name="请求地址", body=url)
allure.attach(name="请求头", body=str(header))
allure.attach(name="请求参数", body=str(data_random))
response = requests.delete(url=url, params=data_random, headers=header, timeout=timeout)
try:
logging.info("请求接口结果: %s" % str(response.json()))
return response.json(), response.elapsed.total_seconds()
except json.decoder.JSONDecodeError:
return ''
except simplejson.errors.JSONDecodeError:
return ''
except Exception as e:
logging.exception('ERROR')
logging.error(e)
raise
def __call__(self, address, method, headers, data, **kwargs):
try:
if method == "post" or method == 'POST':
return self.post(address=address, data=data, header=headers, **kwargs)
elif method == "get" or method == 'GET':
return self.get(address=address, data=data, header=headers, **kwargs)
elif method == "delete" or method == 'DELETE':
return self.delete(address=address, data=data, header=headers ** kwargs)
elif method == "put" or method == 'PUT':
return self.put(address=address, data=data, header=headers, **kwargs)
else:
raise TypeError(f"请求异常,检查yml文件method")
except Exception:
raise TypeError(f"请求异常,检查yml文件method")
apisend = apiSend()
if __name__ == '__main__':
apisend.get()
apisend()

204
common/checkResult.py Normal file
View File

@ -0,0 +1,204 @@
# coding:utf-8
import logging
import time
import allure
import jsonpath
from scripts.dataBase import MYSQL
from scripts.log import Log
from scripts.readYamlFile import ini_yaml
Log()
def assert_json(data, key=None, value=None, asserttype="KEY"):
"""
json返回值断言
:param data: 返回数据
:param key:
:param value:
:param asserttype: "KEY" OR "VALUE" OR "KEY-VALUE"
:return:
"""
if asserttype == "KEY":
try:
assert data.get(key) is not None
with allure.step("判断JSON返回值KEY是否存在"):
allure.attach(name="期望存在KEY", body=str(key))
allure.attach(name='实际data', body=str(data))
logging.info(
"断言通过, 存在key'{}', value为'{}'.".format(key, data.get(key)))
except Exception:
logging.error("断言未通过, 不存在key:'{}'.".format(key))
raise
elif asserttype == "VALUE":
try:
rs_values = list(data.values())
with allure.step("判断JSON返回值VALUE是否存在"):
allure.attach(name="期望VALUE", body=str(value))
allure.attach(name='实际VALUE', body=str(rs_values))
assert value in rs_values
logging.info("断言通过, 存在value为'{}', key为'{}'.".format(value, list(data.keys())[
list(data.values()).index(value)]))
except AssertionError:
logging.error(
"断言未通过, 不存在类型为'{}'的value:''{}''.".format(type(value), value))
raise
elif asserttype == "KEY-VALUE":
try:
with allure.step("判断JSON返回值是否存在KEY-VALUE"):
allure.attach(name="期望KEY-VALUE", body=str({key: value}))
allure.attach(name='实际KEY-VALUE', body=str(data))
if key in data:
assert value == data[key]
else:
logging.error("断言未通过, 不存在key:'{}'.".format(key))
raise AssertionError
logging.info("断言通过, 存在键值对key为'{}',value为'{}'.".format(list(data.keys())[
list(data.values()).index(value)], value))
except AssertionError:
logging.error(
"断言未通过, 不存在key为'{}',类型为'{}'的value:''{}''.".format(key, type(value), value))
raise
else:
logging.error("断言类型错误, 请选择断言类型.")
def assert_text(hope_res, real_res):
"""
文本判断
:param hope_res: 期望结果
:param real_res: 实际结果
:return:
"""
if isinstance(hope_res["jsonpath"], list):
for h_res in hope_res["jsonpath"]:
if jsonpath.jsonpath(real_res, h_res["path"]):
r_res = jsonpath.jsonpath(real_res, h_res["path"])[0]
if h_res["asserttype"] == "==":
try:
with allure.step("json断言判断相等"):
allure.attach(name="期望结果", body=str(h_res))
allure.attach(name='实际实际结果', body=str(r_res))
assert r_res == h_res["value"]
logging.info("json断言通过, 期望结果{0}, 实际结果{1}".format(h_res, r_res))
except AssertionError:
logging.error("json断言未通过, 期望结果{0}, 实际结果{1}".format(h_res, r_res))
raise
elif h_res["asserttype"] == "!=":
try:
with allure.step("json断言判断不等"):
allure.attach(name="json期望结果", body=str(h_res))
allure.attach(name='json实际实际结果', body=str(r_res))
assert r_res != h_res["value"]
logging.info("json断言通过, 期望结果{0}, 实际结果{1}".format(h_res, r_res))
except AssertionError:
logging.error("json断言未通过, 期望结果{0}, 实际结果{1}".format(h_res, r_res))
raise
elif h_res["asserttype"] == "in":
r_res = str(r_res)
try:
with allure.step("json断言判断包含"):
allure.attach(name="期望结果", body=str(h_res))
allure.attach(name='实际实际结果', body=str(r_res))
assert r_res in h_res["value"]
logging.info("json断言通过, 期望结果{0}, 实际结果{1}".format(h_res, real_res))
except AssertionError:
logging.error("json断言未通过, 期望结果{0}, 实际结果{1}".format(h_res, real_res))
raise
else:
raise TypeError("asserttype方法错误")
else:
raise ValueError("获取json值失败,请检查jsonpath")
def assert_time(hope_res, real_res):
hope_time = hope_res["time"]
try:
with allure.step("判断响应时间"):
allure.attach(name="期望响应时间", body=str(hope_time))
allure.attach(name='实际响应时间', body=str(real_res))
assert real_res <= hope_time
logging.info("time断言通过, 期望响应时间{0}, 实际响应时间{1}".format(hope_time, real_res))
except AssertionError:
logging.error("请求响应时间过长, 期望时间{0}, 实际时间{1}".format(hope_time, real_res))
raise
def assert_sql(hope_res, real_res):
if isinstance(hope_res["sqlassert"], list):
db = MYSQL()
for h_res in hope_res["sqlassert"]:
h_sql = h_res["sql"]
for i in h_res["datas"]:
if jsonpath.jsonpath(real_res, i["path"]):
r_res = jsonpath.jsonpath(real_res, i["path"])[0]
t1 = time.time()
datas = db.run_sql(h_sql)
try:
with allure.step("断言判断相等"):
allure.attach(name="期望结果", body=str(datas))
allure.attach(name='实际实际结果', body=str(r_res))
assert r_res == datas[0][i["name"]]
logging.info("sql断言通过, 期望结果{0}, 实际结果{1},sql耗时{2:.5f}".format(datas, r_res,time.time()-t1))
except AssertionError:
logging.error("sql断言未通过, 期望结果{0}, 实际结果{1},sql耗时{2:.5f}".format(datas, r_res,time.time()-t1))
raise
else:
raise ValueError("获取json值失败,请检查jsonpath")
def asserting(hope_res, real_res,re_time=None):
if hope_res["jsonpath"]:
assert_text(hope_res, real_res)
if hope_res["sqlassert"]:
assert_sql(hope_res, real_res)
if hope_res["time"]:
assert_time(hope_res, re_time)
if __name__ == '__main__':
j = {'code': 0, 'msg': 'Success.', 'data': {
'token': 'eyJ1c2VyX2lkIjoxOTYsInVzZXJuYW1lIjoiZmluc2lvdCIsImV4cCI6MTYzMzc2MDg0NCwiZW1haWwiOiIifQ',
'id': 196,'username': '123'}}
# hp = {
# "jsonpath": [
#
# ],
# "sqlassert": [
# {
# "datas": [
# {
# "path": "$.data.id",
# "name": "id"
# },
# {
# "path": "$.data.username",
# "name": "username"
# },
# ],
# "sql": "select * from saas.user where username = '123' ",
# },
# {
# "datas": [
# {
# "path": "$.data.id",
# "name": "id"
# },
# {
# "path": "$.data.username",
# "name": "username"
# },
# ],
# "sql": "select * from saas.user where username = '44324' ",
# }
# ],
# "time": None
# }
hp = ini_yaml("loginData.yml")
print(hp)
# asserting(hp, j,23)

1
config/__init__.py Normal file
View File

@ -0,0 +1 @@
# coding:utf-8

94
config/confManage.py Normal file
View File

@ -0,0 +1,94 @@
# coding:utf-8
import re
from config.confRead import Config
def host_manage(hos):
"""
host关联配置
:param hos:
:return:
"""
try:
relevance_list = re.findall(r'\${(.*?)}\$', hos)
for n in relevance_list:
pattern = re.compile(r'\${' + n + r'}\$')
host_cf = Config()
host_relevance = host_cf.read_host()
hos = re.sub(pattern, host_relevance[n], hos, count=1)
except TypeError:
pass
return hos
def mail_manage(ml):
"""
email关联配置
:param ml:
:return:
"""
try:
relevance_list = re.findall(r"\${(.*?)}\$", ml)
for n in relevance_list:
pattern = re.compile(r'\${' + n + r'}\$')
email_cf = Config()
email_relevance = email_cf.read_email()
ml = re.sub(pattern, email_relevance[n], ml, count=1)
except TypeError:
pass
return ml
def dir_manage(directory):
"""
directory关联配置
:param directory:
:return:
"""
try:
relevance_list = re.findall(r"\${(.*?)}\$", directory)
for n in relevance_list:
pattern = re.compile(r'\${' + n + r'}\$')
dir_cf = Config()
dir_relevance = dir_cf.read_dir()
directory = re.sub(pattern, dir_relevance[n], directory, count=1)
except TypeError:
pass
return directory
def db_manage(db):
try:
relevance_list = re.findall(r"\${(.*?)}\$", db)
for n in relevance_list:
pattern = re.compile(r'\${' + n + r'}\$')
dir_cf = Config()
dir_relevance = dir_cf.read_db()
db = re.sub(pattern, dir_relevance[n], db, count=1)
except TypeError:
pass
return db
def dingding_manage(dingding):
try:
relevance_list = re.findall(r"\${(.*?)}\$", dingding)
for n in relevance_list:
pattern = re.compile(r'\${' + n + r'}\$')
dir_cf = Config()
dir_relevance = dir_cf.read_dingding()
dingding = re.sub(pattern, dir_relevance[n], dingding, count=1)
except TypeError:
pass
return dingding
if __name__ == '__main__':
# print(db_manage("${host}$"))
# print(db_manage("${user}$"))
# print(db_manage("${password}$"))
# print(db_manage("${database}$"))
# print(db_manage("${charset}$"))
# print(int(db_manage("${port}$")))
print(dingding_manage("${webhook}$"))

51
config/confRead.py Normal file
View File

@ -0,0 +1,51 @@
# coding:utf-8
import configparser
import os
class Config(object):
def __init__(self):
self.config = configparser.ConfigParser()
self.conf_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
def read_host(self):
"""
读取配置文件中host相关信息
:return:
"""
self.config.read(self.conf_path, encoding='utf-8')
return self.config['host']
def read_email(self):
"""
读取配置文件中host相关信息
:return:
"""
self.config.read(self.conf_path, encoding='utf-8')
return self.config['email']
def read_dir(self):
"""
读取配置文件中directory相关信息
:return:
"""
self.config.read(self.conf_path, encoding='utf-8')
return self.config['directory']
def read_db(self):
"""
读取配置文件中directory相关信息
:return:
"""
self.config.read(self.conf_path, encoding='utf-8')
return self.config['database']
def read_dingding(self):
self.config.read(self.conf_path, encoding='utf-8')
return self.config['dingding']
if __name__ == '__main__':
c = Config()
print(c.read_db()["host"])

10
conftest.py Normal file
View File

@ -0,0 +1,10 @@
# coding:utf-8
def pytest_collection_modifyitems(items):
"""
测试用例收集完成时将收集到的item的name和nodeid的中文显示在控制台上
:return:
"""
for item in items:
item.name = item.name.encode("utf-8").decode("unicode_escape")
item._nodeid = item.nodeid.encode("utf-8").decode("unicode_escape")

5
pytest.ini Normal file
View File

@ -0,0 +1,5 @@
[pytest]
log_cli = 1
log_cli_level = INFO
log_cli_format = "%(levelname)-8s%(asctime)s %(name)s:%(filename)s:%(lineno)d %(message)s"
log_cli_date_format=%Y-%m-%d %H:%M:%S

27
requirements.txt Normal file
View File

@ -0,0 +1,27 @@
allure-pytest==2.9.43
allure-python-commons==2.9.43
atomicwrites==1.4.0
attrs==21.2.0
certifi==2021.5.30
chardet==4.0.0
colorama==0.4.4
coverage==5.5
DingtalkChatbot==1.5.3
idna==2.10
iniconfig==1.1.1
jsonpath==0.82
packaging==20.9
pluggy==0.13.1
py==1.10.0
PyMySQL==1.0.2
pyparsing==2.4.7
pytest==6.2.4
pytest-ordering==0.6
pytest-rerunfailures==10.0
PyYAML==5.4.1
requests==2.25.1
requests-toolbelt==0.9.1
simplejson==3.17.2
six==1.16.0
toml==0.10.2
urllib3==1.26.5

1
scripts/__init__.py Normal file
View File

@ -0,0 +1 @@
# coding:utf-8

20
scripts/cleanFile.py Normal file
View File

@ -0,0 +1,20 @@
# coding:utf-8
import os
import shutil
def setDir(filepath):
"""
如果文件夹不存在就创建如果文件存在就清空
:param filepath:需要创建的文件夹路径
:return:
"""
shutil.rmtree(filepath)
# def cleanReport()
if __name__ == '__main__':
# setDir(r'D:\api_t\report\xml\temp')
l =os.listdir(r'D:\apitest\report\html')
betime = 60*60*24*7
t = os.path.getmtime(r'D:\apitest\report\html')
print(l,t)

70
scripts/dataBase.py Normal file
View File

@ -0,0 +1,70 @@
# coding:utf-8
import warnings
warnings.simplefilter('ignore', DeprecationWarning)
import pymysql
from scripts.log import Log
import logging
from config.confManage import db_manage
from scripts.randomData import replace_random
Log()
class MYSQL(object):
def __init__(self):
self.host = db_manage("${host}$")
self.user = db_manage("${user}$")
self.password = db_manage("${password}$")
self.database = db_manage("${database}$")
self.charset = db_manage("${charset}$")
self.port = int(db_manage("${port}$"))
try:
logging.debug("正在连接数据库..")
self.conn = pymysql.connect(host=self.host, user=self.user, password=self.password, database=self.database,
port=self.port, charset=self.charset)
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
logging.debug("数据库连接成功..")
except Exception as e:
logging.error("连接数据库失败..{}".format(e))
raise e
def run_sql(self, sql):
"""
执行sql语句
:param sql:
:return:
"""
try:
logging.debug("准备执行SQL语句..")
r_sql = replace_random(sql)
logging.debug("sql语句:{}".format(r_sql))
self.cursor.execute(r_sql)
rs = self.cursor.fetchall()
return rs
except Exception as e:
logging.error("执行SQL失败..{}".format(e))
raise
def commit(self):
self.conn.commit()
if __name__ == "__main__":
DB = MYSQL()
datas = DB.run_sql(
"""
select count(*) count
from (
select device.id, device.qr_code, device.region_id, region.enterprise_id, device.status, device.category_id
from device
inner join region
on device.region_id = region.id
and region.enterprise_id = 88
)
as dr
where status = 3 or status = 1 or status = 2 or status=0 or status IS NULL
"""
)

112
scripts/dingding.py Normal file
View File

@ -0,0 +1,112 @@
# coding:utf-8
# @Time : 2021/9/2 11:46 PM
# @Author : 余少琪
# @FileName: dingding.py
# @email : 1603453211@qq.com
import base64
import hashlib
import hmac
import time
import urllib.parse
from dingtalkchatbot.chatbot import DingtalkChatbot, FeedLink
from config.confManage import dingding_manage
class DingTalkSendMsg(object):
def __init__(self):
self.timestap = str(round(time.time() * 1000))
self.sign = self.get_sign()
self.webhook = dingding_manage("${webhook}$") + "&timestamp=" + self.timestap + "&sign=" + self.sign
self.xiaoding = DingtalkChatbot(self.webhook)
def get_sign(self):
"""
根据时间戳 + sgin 生成密钥
:return:
"""
secret = dingding_manage("${secret}$")
string_to_sign = '{}\n{}'.format(self.timestap, secret).encode('utf-8')
hmac_code = hmac.new(secret.encode('utf-8'), string_to_sign, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def send_text(self, msg, mobiles=None):
"""
发送文本信息
:param msg: 文本内容
:param mobiles: 艾特用户电话
:return:
"""
if not mobiles:
self.xiaoding.send_text(msg=msg, is_at_all=True)
else:
if isinstance(mobiles, list):
self.xiaoding.send_text(msg=msg, at_mobiles=mobiles)
else:
raise TypeError("mobiles类型错误 不是list类型.")
def send_link(self, title, text, message_url, pic_url):
"""
发送link通知
:return:
"""
try:
self.xiaoding.send_link(title=title, text=text, message_url=message_url, pic_url=pic_url)
except Exception:
raise
def send_markdown(self, title, msg, mobiles=None):
"""
:param mobiles:
:param title:
:param msg:
markdown 格式
'#### 自动化测试报告\n'
'> ' + str(msg) + '\n\n'
'> ![图片](https://img1.baidu.com/it/u'
'=424332266,'
'2532715190&fm=26&fmt=auto&gp=0.jpg)\n '
'> ###### ' + get_now_time() +
'[测试报告](https://blog.csdn.net/weixin_43865008/article'
'/details/120079270?spm=1001.2014.3001.5501) \n'
:return:
"""
if not mobiles:
self.xiaoding.send_markdown(title=title, text=msg, is_at_all=True)
if isinstance(mobiles, list):
self.xiaoding.send_markdown(title=title, text=msg, at_mobiles=mobiles)
else:
raise TypeError("mobiles类型错误 不是list类型.")
@staticmethod
def feed_link(title, message_url, pic_url):
return FeedLink(title=title, message_url=message_url, pic_url=pic_url)
def send_feed_link(self, *arg):
try:
self.xiaoding.send_feed_card(list(arg))
except Exception:
raise
if __name__ == '__main__':
# title = '自动化测试', text = '自动化测试报告',
# message_url = 'https://blog.csdn.net/weixin_43865008/article/details/120079270?spm=1001'
# '.2014.3001.5501',
# pic_url = "https://img2.baidu.com/it/u=841211112,1898579033&fm=26&fmt=auto&gp=0.jpg"
d = DingTalkSendMsg()
a = d.feed_link("1", message_url='https://blog.csdn.net/weixin_43865008/article/details/120079270?spm=1001',
pic_url="https://img2.baidu.com/it/u=841211112,1898579033&fm=26&fmt=auto&gp=0.jpg")
b = d.feed_link("2", message_url='https://blog.csdn.net/weixin_43865008/article/details/120079270?spm=1001',
pic_url="https://img2.baidu.com/it/u=841211112,1898579033&fm=26&fmt=auto&gp=0.jpg")
d.send_feed_link(a, b)
d.send_text("3", [13688400244])

41
scripts/log.py Normal file
View File

@ -0,0 +1,41 @@
# coding:utf-8
import logging
import time
from scripts.mkDir import mk_dir
from config.confManage import dir_manage
class Log(object):
def __init__(self):
"""
日志配置
"""
log_path = dir_manage(directory='${pro_dir}$')+dir_manage(directory='${log_dir}$')
now = time.strftime('%Y-%m-%d')
mk_dir(log_path)
logfile = log_path + "{}".format(now) + ".log"
# logfile_err = log_path + "{}-".format(now) + 'error.log'
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)
self.logger.handlers = []
fh = logging.FileHandler(logfile, mode='a+',encoding='utf-8')
fh.setLevel(logging.DEBUG)
# fh_err = logging.FileHandler(logfile_err, mode='a+',encoding='utf-8')
# fh_err.setLevel(logging.ERROR)
formatter = logging.Formatter("%(levelname)-8s%(asctime)s %(name)s:%(filename)s:%(lineno)d %(message)s")
fh.setFormatter(formatter)
# fh_err.setFormatter(formatter)
self.logger.addHandler(fh)
# self.logger.addHandler(fh_err)
if __name__ == '__main__':
Log()
logging.info("111222")
logging.error("111222")
logging.debug("111222")
logging.warning("111222")

19
scripts/mkDir.py Normal file
View File

@ -0,0 +1,19 @@
# coding:utf-8
import os
def mk_dir(path):
# 去除首位空格
path = path.strip()
path = path.rstrip("\\")
path = path.rstrip("/")
# 判断路径是否存在
is_exists = os.path.exists(path)
if not is_exists:
try:
os.makedirs(path)
except Exception as e:
raise ("logs目录创建失败%s" % e)

232
scripts/randomData.py Normal file
View File

@ -0,0 +1,232 @@
# coding:utf-8
import datetime
import hashlib
import random
import time
import re
def choice_data(data):
"""
获取随机整型数据
:param data: 数组
:return:
"""
_list = data.split(",")
num = random.choice(_list)
return num
def random_float(data):
"""
获取随机整型数据
:param data: 数组
:return:
"""
try:
start_num, end_num, accuracy = data.split(",")
start_num = int(start_num)
end_num = int(end_num)
accuracy = int(accuracy)
except ValueError:
raise Exception("调用随机整数失败,范围参数或精度有误!\n小数范围精度 %s" % data)
if start_num <= end_num:
num = random.uniform(start_num, end_num)
else:
num = random.uniform(end_num, start_num)
num = round(num, accuracy)
return num
def md5(data):
"""
md5加密
:param data:想要加密的字符
:return:
"""
m1 = hashlib.md5()
m1.update(data.encode("utf-8"))
data = m1.hexdigest()
return data
def random_int(scope):
"""
获取随机整型数据
:param scope: 数据范围
:return:
"""
try:
start_num, end_num = scope.split(",")
start_num = int(start_num)
end_num = int(end_num)
except ValueError:
raise Exception("调用随机整数失败,范围参数有误!\n %s" % str(scope))
if start_num <= end_num:
num = random.randint(start_num, end_num)
else:
num = random.randint(end_num, start_num)
return num
import string
def random_string(num_len):
"""
从a-zA-Z0-9生成制定数量的随机字符
:param num_len: 字符串长度
:return:
"""
try:
num_len = int(num_len)
except ValueError:
raise Exception("从a-zA-Z0-9生成指定数量的随机字符失败长度参数有误 %s" % num_len)
strings = ''.join(random.sample(string.hexdigits, +num_len))
return strings
def get_time(time_type, layout, unit="0,0,0,0,0"):
"""
获取时间
:param time_type: 现在的时间now 其他时间else_time
:param layout: 10timestamp13timestamp, else 时间类型
:param unit: 时间单位[seconds, minutes, hours, days, weeks] 所有参数都是可选的并且默认都是0
:return:
python中时间日期格式化符号
------------------------------------
%y 两位数的年份表示00-99
%Y 四位数的年份表示000-9999
%m 月份01-12
%d 月内中的一天0-31
%H 24小时制小时数0-23
%I 12小时制小时数01-12
%M 分钟数00=59
%S 00-59
%a 本地简化星期名称
%A 本地完整星期名称
%b 本地简化的月份名称
%B 本地完整的月份名称
%c 本地相应的日期表示和时间表示
%j 年内的一天001-366
%p 本地A.M.或P.M.的等价符
%U 一年中的星期数00-53星期天为星期的开始
%w 星期0-6星期天为星期的开始
%W 一年中的星期数00-53星期一为星期的开始
%x 本地相应的日期表示
%X 本地相应的时间表示
%Z 当前时区的名称 # 乱码
%% %号本身
# datetime.timedelta 代表两个时间之间的时间差
# time.strftime(fmt[,tupletime]) 接收以时间元组并返回以可读字符串表示的当地时间格式由fmt决定
# time.strptime(str,fmt='%a %b %d %H:%M:%S %Y') 根据fmt的格式把一个时间字符串解析为时间元组
# time.mktime(tupletime) 接受时间元组并返回时间戳1970纪元后经过的浮点秒数
"""
ti = datetime.datetime.now()
if time_type != "now":
resolution = unit.split(",")
try:
ti = ti + datetime.timedelta(seconds=int(resolution[0]), minutes=int(resolution[1]),
hours=int(resolution[2]), days=int(resolution[3]), weeks=int(resolution[4]))
except ValueError:
raise Exception("获取时间错误,时间单位%s" % unit)
if layout == "10timestamp":
ti = ti.strftime('%Y-%m-%d %H:%M:%S')
ti = int(time.mktime(time.strptime(ti, "%Y-%m-%d %H:%M:%S")))
return ti
elif layout == "13timestamp":
ti = ti.strftime('%Y-%m-%d %H:%M:%S')
ti = int(time.mktime(time.strptime(ti, '%Y-%m-%d %H:%M:%S')))
# round()是四舍五入
ti = int(round(ti * 1000))
return ti
else:
ti = ti.strftime(layout)
return ti
def replace_random(value):
"""
调用定义方法替换字符串
:param value:
:return:
"""
posint_list = re.findall(r"\$RandomPosInt\(([0-9]*,[0-9]*?)\)\$", value)
int_list = re.findall(r'\$RandomInt\((-[0-9]*,[0-9]*?)\)\$', value)
string_list = re.findall(r'\$RandomString\(([0-9]*?)\)\$', value)
float_list = re.findall(r'\$RandomFloat\(([0-9]*,[0-9]*,[0-9]*)\)\$', value)
time_list = re.findall(r"\$GetTime\(time_type=(.*?),layout=(.*?),unit=([0-9],[0-9],[0-9],[0-9],[0-9])\)\$", value)
choice_list = re.findall(r"\$Choice\((.*?)\)\$", value)
if len(int_list):
# 获取整型数据替换
for i in int_list:
pattern = re.compile(r'\$RandomInt\(' + i + r'\)\$')
key = str(random_int(i))
value = re.sub(pattern, key, value, count=1)
value = replace_random(value)
elif len(posint_list):
# 获取整型数据替换
for i in posint_list:
pattern = re.compile(r'\$RandomPosInt\(' + i + r'\)\$')
key = str(random_int(i))
value = re.sub(pattern, key, value, count=1)
value = replace_random(value)
elif len(string_list):
# 获取字符串数据替换
for j in string_list:
pattern = re.compile(r'\$RandomString\(' + j + r'\)\$')
key = str(random_string(j))
value = re.sub(pattern, key, value, count=1)
value = replace_random(value)
elif len(float_list):
# 获取浮点数数据替换
for n in float_list:
pattern = re.compile(r'\$RandomFloat\(' + n + r'\)\$')
key = str(random_float(n))
value = re.sub(pattern, key, value, count=1)
value = replace_random(value)
elif len(time_list):
# 获取时间替换
for m in time_list:
if len(m[0]) and len(m[1]):
pattern = re.compile(r'\$GetTime\(time_type=' + m[0] + ',layout=' + m[1] + ',unit=' + m[2] + r'\)\$')
key = str(get_time(m[0], m[1], m[2]))
value = re.sub(pattern, key, value, count=1)
else:
print("$GetTime$参数错误time_type, layout为必填")
value = replace_random(value)
elif len(choice_list):
# 调用choice方法
for i in choice_list:
pattern = re.compile(r'\$Choice\(' + i + r'\)\$')
key = str(choice_data(i))
value = re.sub(pattern, key, value, count=1)
value = replace_random(value)
else:
pass
return value
if __name__ == '__main__':
from scripts.readYamlFile import ini_yaml
import json
int_num = "$RandomPosInt(1,333)$"
str_num = '$RandomString($RandomPosInt(2,23)$)$'
float_num = '$RandomFloat($RandomPosInt(2,13)$,$RandomPosInt(2,13)$,$RandomPosInt(2,13)$)$'
time_num = '$GetTime(time_type=else,layout=%Y-%m-%d 00:00:00,unit=0,0,0,0,0)$'
choice_num = '$Choice($RandomPosInt(2,13)$,$RandomPosInt(2,13)$)$'
a = json.dumps(ini_yaml("homePageData.yml")["companyAlarm"])
# print(type(ini_yaml("家庭详情.yml")[0]["data"]))
# print(replace_random(int_num))
# print(replace_random(str_num))
# print(replace_random(float_num))
print(replace_random(time_num))
# print(replace_random(choice_num))
print(replace_random(a))

23
scripts/readYamlFile.py Normal file
View File

@ -0,0 +1,23 @@
# coding:utf-8
import yaml
from config.confManage import dir_manage
datapath = dir_manage("${pro_dir}$") + dir_manage("${test_suite}$")+dir_manage("${data_dir}$") + dir_manage("${test_name}$")
def ini_yaml(filename, path=datapath):
with open(path + "/" + filename, 'r', encoding="utf-8") as f:
file_data = f.read()
data = yaml.load(file_data, Loader=yaml.FullLoader)
return data
if __name__ == '__main__':
# get_yaml_data(r"F:\api2.0\config\runConfig.yml")
runConfig_dict = ini_yaml("homePageData.yml")
# case_level = runConfig_dict[0]["address"].format(**{"home_id": "123"})
print(runConfig_dict)
# print(case_level)
# print(type(case_level))

44
scripts/sendEmail.py Normal file
View File

@ -0,0 +1,44 @@
# coding:utf-8
import logging
import smtplib
from email.header import Header
from email.mime.text import MIMEText
from config.confManage import mail_manage
from scripts.log import Log
Log()
def send_email(title, url):
# 第三方 SMTP 服务
mail_host = mail_manage(ml='${mail_host}$') # 设置服务器
mail_user = mail_manage(ml='${mail_user}$') # 用户名
mail_pass = mail_manage(ml='${mail_pass}$') # 口令
sender = mail_manage(ml='${sender}$')
receivers = mail_manage(ml='${receivers}$') # 接收邮件可设置为你的QQ邮箱或者其他邮箱
mail_msg = """
<p>测试报告发送</p>
<p><a href="{}">点我查看测试报告</a></p>
""".format(url)
message = MIMEText(mail_msg, 'html', 'utf-8')
message['From'] = Header("13688400244@sina.cn", )
message['To'] = Header("井松", 'utf-8')
message['Subject'] = Header(title, 'utf-8')
try:
logging.debug("初始化邮件服务..")
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host, 25) # 25 为 SMTP 端口号
smtpObj.login(mail_user, mail_pass)
logging.debug("发送邮件中..")
smtpObj.sendmail(sender, receivers, message.as_string())
logging.debug("邮件发送成功")
except:
raise
if __name__ == '__main__':
send_email("报告", "https://www.runoob.com/")

47
setupMain.py Normal file
View File

@ -0,0 +1,47 @@
# coding:utf-8
import os
import time
import pytest
from config.confManage import dir_manage
from scripts.mkDir import mk_dir
project_path = os.path.split(os.path.realpath(__file__))[0]
if ':' in project_path:
project_path = project_path.replace('\\', '/')
else:
pass
def run():
date = time.strftime('%Y-%m-%d')
localtime = time.strftime('%Y%m%d%H%M%S', time.localtime())
test_case_path = project_path + dir_manage('${test_suite}$') + dir_manage('${case_dir}$') + dir_manage(
'${test_name}$')
# temp地址变量
temp_path = project_path + dir_manage('${report_xml_dir}$') + "temp/" + localtime + '/'
# html地址变量
html_path = project_path + dir_manage('${report_html_dir}$') + date + '/'
# 如果不存在地址路径则创建文件夹
mk_dir(temp_path)
mk_dir(html_path)
# 执行命令行
args = ['-s', '-q', test_case_path, '--alluredir', temp_path]
pytest.main(args)
cmd = 'allure generate %s -o %s -c' % (temp_path, html_path)
os.system(cmd)
# 发送报告
# send_email(localtime + "测试报告", "http://192.168.1.2:9999")
# 钉钉发送
# ding = DingTalkSendMsg()
# ding.send_text("点击链接打开测试报告 http://192.168.1.2:9999",[13688400244])
# 生成html报告
os.system(r'allure generate {0} -o {1} '.format(temp_path, html_path))
# 打开报告服务 并指定端口
os.system(r'allure serve {0} -p 9999'.format(temp_path))
if __name__ == '__main__':
run()

1
test_suite/__init__.py Normal file
View File

@ -0,0 +1 @@
# coding:utf-8

View File

@ -0,0 +1,23 @@
# coding:utf-8
import logging
import allure
import pytest
from common.checkResult import asserting
from scripts.log import Log
from scripts.readYamlFile import ini_yaml
from common.basePage import apisend
from test_suite.page import saasApp_pages
Log()
__all__ = [
'pytest',
'asserting',
'Log',
'ini_yaml',
'logging',
'allure',
'apisend',
'saasApp_pages'
]

View File

@ -0,0 +1,59 @@
# coding:utf-8
from test_suite.page import *
urlData = ini_yaml("urlData.yml")
def login(casedata):
data = urlData["login"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime
def mobileCode(casedata):
data = urlData["mobileCode"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime
def forgetPassword(casedata):
data = urlData["forgetPassword"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime
def todayTask(casedata):
data = urlData["todayTask"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime
def companyName(casedata):
data = urlData["companyName"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime
def companyPower(casedata):
data = urlData["companyPower"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime
def deviceState(casedata):
data = urlData["deviceState"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime
def companyAlarm(casedata):
data = urlData["companyAlarm"]
logging.info("{}".format(casedata["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=casedata["headers"],
data=casedata["data"])
return res, restime

View File

@ -0,0 +1 @@
# coding:utf-8

View File

@ -0,0 +1 @@
# coding:utf-8

View File

@ -0,0 +1,17 @@
# coding:utf-8
from test_suite.page.saasApp_pages import *
paramData = ini_yaml("loginData.yml")["login"][0]
@pytest.fixture(scope="module")
def setup_login():
logging.info("前置请求登录")
data = urlData["login"]
logging.info("{}".format(paramData["info"]))
res, restime = apisend(address=data["address"], method=data["method"], headers=paramData["headers"],
data=paramData["data"])
logging.info("前置请求结束")
return res

View File

@ -0,0 +1,54 @@
# coding:utf-8
from test_suite.page.saasApp_pages import *
paramData = ini_yaml("homePageData.yml")
class Test_homePage(object):
# def setup(self):
# self.re = saasPages()
# ids=[i["info"] for i in paramData["login"]]
@allure.story("Test_todayTasks")
@pytest.mark.parametrize('casedata', paramData["todayTasks"], ids=[i["info"] for i in paramData["todayTasks"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_todayTasks(self, setup_login, casedata):
casedata["headers"]["Authorization"] = "JWT " + setup_login["data"]["token"]
res, restime = todayTask(casedata)
asserting(hope_res=casedata["assert"], real_res=res, re_time=restime)
@allure.story("Test_companyName")
@pytest.mark.parametrize('casedata', paramData["companyName"], ids=[i["info"] for i in paramData["companyName"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_companyName(self,setup_login, casedata):
casedata["headers"]["Authorization"] = "JWT " + setup_login["data"]["token"]
res, restime = companyName(casedata)
asserting(hope_res=casedata["assert"], real_res=res, re_time=restime)
@allure.story("Test_companyPower")
@pytest.mark.parametrize('casedata', paramData["companyPower"], ids=[i["info"] for i in paramData["companyPower"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_companyPower(self,setup_login, casedata):
casedata["headers"]["Authorization"] = "JWT " + setup_login["data"]["token"]
res, restime = companyPower(casedata)
asserting(hope_res=casedata["assert"], real_res=res, re_time=restime)
@allure.story("Test_deviceState")
@pytest.mark.parametrize('casedata', paramData["deviceState"], ids=[i["info"] for i in paramData["deviceState"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_deviceState(self,setup_login, casedata):
casedata["headers"]["Authorization"] = "JWT " + setup_login["data"]["token"]
res, restime = deviceState(casedata)
asserting(hope_res=casedata["assert"], real_res=res, re_time=restime)
@allure.story("Test_companyAlarm")
@pytest.mark.parametrize('casedata', paramData["companyAlarm"], ids=[i["info"] for i in paramData["companyAlarm"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_companyAlarm(self,setup_login, casedata):
casedata["headers"]["Authorization"] = "JWT " + setup_login["data"]["token"]
res, restime = companyAlarm(casedata)
asserting(hope_res=casedata["assert"], real_res=res, re_time=restime)

View File

@ -0,0 +1,36 @@
# coding:utf-8
from test_suite.page.saasApp_pages import *
paramData = ini_yaml("loginData.yml")
class Test_login(object):
# def setup(self):
# self.re = saasPages()
# ids=[i["info"] for i in paramData["login"]]
@allure.story("Test_login")
@pytest.mark.parametrize('casedata', paramData["login"], ids=[i["info"] for i in paramData["login"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_login(self, casedata):
res, restime = login(casedata)
asserting(hope_res=casedata["assert"], real_res=res,re_time=restime)
@allure.story("Test_mobileCode")
@pytest.mark.parametrize('casedata', paramData["mobileCode"], ids=[i["info"] for i in paramData["mobileCode"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_mobileCode(self, casedata):
res, restime = mobileCode(casedata)
asserting(hope_res=casedata["assert"], real_res=res,re_time=restime)
# cache.set("session_id", res["session_id"])
@allure.story("Test_forgetPassword")
@pytest.mark.parametrize('casedata', paramData["forgetPassword"], ids=[i["info"] for i in paramData["forgetPassword"]])
@pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.run(order=1)
def test_forgetPassword(self, casedata):
res, restime = mobileCode(paramData["mobileCode"][1])
casedata['data']['session_key'] = res["data"]['session_id']
res, restime = forgetPassword(casedata)
asserting(hope_res=casedata["assert"], real_res=res,re_time=restime)