mirror of https://gitee.com/a529548204/apitest.git
353 lines
16 KiB
Python
353 lines
16 KiB
Python
# coding:utf-8
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import xmltodict
|
|
|
|
import allure
|
|
import requests
|
|
from requests_toolbelt import MultipartEncoder
|
|
from util.tools.log import Log
|
|
from util.tools.randomData import replace_random
|
|
from config.confManage import host_manage
|
|
from util.tools.caches import Cache, valuehandle
|
|
|
|
Log()
|
|
|
|
|
|
class apiSend(object):
|
|
|
|
def __init__(self):
|
|
# self.http_type = host_manage(hos="${http_type}$")
|
|
self.rel = Cache()
|
|
|
|
@staticmethod
|
|
def iniDatas(data):
|
|
if isinstance(data, dict):
|
|
data = json.dumps(data, ensure_ascii=False)
|
|
if data is None:
|
|
return data
|
|
dataran = replace_random(data)
|
|
return dataran
|
|
|
|
def post(self, http, address, header, caches, timeout=8, data=None, files=None, host="host"):
|
|
"""
|
|
post请求
|
|
:param http:
|
|
:param host:
|
|
:param caches: 关联情况
|
|
- cachefrom: 'body' # response : 从结果中获取 body : 从参数中获取
|
|
path: '$.code' # body如果是 "id=2&path=haha" 会转换成字典 然后根据path使用jsonpath取值
|
|
name: 'code'
|
|
:param address: 请求地址
|
|
:param header: 请求头
|
|
:param timeout: 超时时间
|
|
:param data: 请求参数
|
|
:param files: 文件对象 dict
|
|
{
|
|
'file参数名(files)': (r'文件名称.xlsx', open(r'路径\文件名称.xlsx', 'rb') 或者 直接文件路径)
|
|
}
|
|
file: 'D:\apitest\test_suite\files\test.png'
|
|
files={"files": casedata["data"]["file"]}
|
|
:return:
|
|
"""
|
|
|
|
iniaddress = replace_random(address, param=data["urlparam"])
|
|
url = http + "://" + host_manage(hos='${{{}}}$'.format(host)) + iniaddress
|
|
data_random = self.iniDatas(data["param"])
|
|
logging.info("请求地址:%s" % "" + url)
|
|
logging.info("请求方法:POST")
|
|
logging.info("请求头: %s" % str(header))
|
|
logging.info("请求参数: %s" % str(data_random))
|
|
with allure.step("POST请求"):
|
|
allure.attach(name="请求地址", body=url)
|
|
allure.attach(name="请求头", body=str(header))
|
|
allure.attach(name="请求参数", body=str(data_random))
|
|
if "multipart/form-data" in str(header.values()):
|
|
# 根据请求头contenttype判断是否为上传文件
|
|
if isinstance(files, dict):
|
|
for fileskey in files:
|
|
value = files[fileskey]
|
|
if isinstance(value, int):
|
|
files[fileskey] = str(value)
|
|
pass
|
|
elif "\\" in value:
|
|
file_parm = fileskey
|
|
files[file_parm] = (os.path.basename(value), open(value, 'rb'), 'application/octet-stream')
|
|
else:
|
|
pass
|
|
multipart = MultipartEncoder(
|
|
fields=files,
|
|
boundary='-----------------------------' + str(random.randint(int(1e28), int(1e29 - 1)))
|
|
)
|
|
header['Content-Type'] = multipart.content_type
|
|
response = requests.post(url=url, headers=header, timeout=timeout, data=multipart)
|
|
elif files is None:
|
|
try:
|
|
data_random = json.loads(data_random)
|
|
except json.decoder.JSONDecodeError:
|
|
pass
|
|
if not isinstance(data_random, dict):
|
|
data_random = valuehandle(str(data_random))
|
|
multipart = MultipartEncoder(
|
|
fields=data_random,
|
|
boundary='-----------------------------' + str(random.randint(int(1e28), int(1e29 - 1)))
|
|
)
|
|
header['Content-Type'] = multipart.content_type
|
|
response = requests.post(url=url, data=multipart, headers=header,
|
|
timeout=timeout)
|
|
else:
|
|
raise TypeError("files参数格式错误")
|
|
elif 'application/json' in str(header.values()):
|
|
if data_random:
|
|
data_random = json.loads(data_random)
|
|
response = requests.post(url=url, json=data_random, headers=header, timeout=timeout)
|
|
elif 'application/xml' in str(header.values()) or 'text/xml' in str(header.values()):
|
|
if data_random:
|
|
data_random = json.loads(data_random)
|
|
xmldata = xmltodict.unparse({'xml': data_random})
|
|
# dict转成xml
|
|
response = requests.post(url=url, data=xmldata, headers=header, timeout=timeout)
|
|
else:
|
|
response = requests.post(url=url, data=data_random, headers=header, timeout=timeout)
|
|
try:
|
|
try:
|
|
res = response.json()
|
|
except:
|
|
res = response.text
|
|
logging.info("请求接口结果: %s" % str(res))
|
|
self.rel.locallcache(caches, bodys=data_random, res=res, cookie=response.cookies)
|
|
allure.attach(name="请求结果", body=str(res))
|
|
return res, response.elapsed.total_seconds(), response.status_code
|
|
except Exception as e:
|
|
logging.error(e)
|
|
logging.error(response.text)
|
|
raise
|
|
|
|
def get(self, http, address, caches, header, data, timeout=8, host="host"):
|
|
"""
|
|
get请求
|
|
:param http:
|
|
:param host:
|
|
:param caches: 关联情况
|
|
- cachefrom: 'body' # response : 从结果中获取 body : 从参数中获取
|
|
path: '$.code' # body如果是 "id=2&path=haha" 会转换成字典 然后根据path使用jsonpath取值
|
|
name: 'code'
|
|
:param address:
|
|
:param header: 请求头
|
|
:param data: 请求参数
|
|
:param timeout: 超时时间
|
|
:return:
|
|
"""
|
|
iniaddress = replace_random(address, param=data["urlparam"])
|
|
# if isinstance(data, dict):
|
|
# if "urlparam" in data.keys():
|
|
# address = replace_random(address, param=data["urlparam"])
|
|
data_random = self.iniDatas(data["param"])
|
|
url = http + "://" + host_manage(hos='${{{}}}$'.format(host)) + iniaddress
|
|
logging.info("请求地址:%s" % "" + url)
|
|
logging.info("请求方法:GET")
|
|
logging.info("请求头: %s" % str(header))
|
|
logging.info("请求参数: %s" % str(data_random))
|
|
with allure.step("GET请求接口"):
|
|
allure.attach(name="请求地址", body=url)
|
|
allure.attach(name="请求头", body=str(header))
|
|
allure.attach(name="请求参数", body=str(data_random))
|
|
response = requests.get(url=url, params=data_random, headers=header, timeout=timeout)
|
|
if response.status_code == 301:
|
|
response = requests.get(url=response.headers["location"])
|
|
try:
|
|
try:
|
|
res = response.json()
|
|
except:
|
|
res = response.text
|
|
logging.info("请求接口结果: %s" % str(res))
|
|
self.rel.locallcache(caches, bodys=data_random, res=res, cookie=response.cookies)
|
|
allure.attach(name="请求结果", body=str(res))
|
|
return res, response.elapsed.total_seconds(), response.status_code
|
|
except Exception as e:
|
|
logging.error(e)
|
|
logging.error(response.text)
|
|
raise
|
|
|
|
def put(self, http, address, caches, header, timeout=8, data=None, files=None, host="host"):
|
|
"""
|
|
put请求
|
|
:param http:
|
|
:param host:
|
|
:param address:
|
|
:param caches: 关联情况
|
|
- cachefrom: 'body' # response : 从结果中获取 body : 从参数中获取
|
|
path: '$.code' # body如果是 "id=2&path=haha" 会转换成字典 然后根据path使用jsonpath取值
|
|
name: 'code'
|
|
:param header: 请求头
|
|
|
|
:param timeout: 超时时间
|
|
:param data: 请求参数
|
|
:param files: 文件路径
|
|
:return:
|
|
"""
|
|
iniaddress = replace_random(address, param=data["urlparam"])
|
|
url = http + "://" + host_manage(hos='${{{}}}$'.format(host)) + iniaddress
|
|
logging.info("请求地址:%s" % "" + url)
|
|
logging.info("请求方法:PUT")
|
|
logging.info("请求头: %s" % str(header))
|
|
data_random = self.iniDatas(data["param"])
|
|
logging.info("请求参数: %s" % str(data_random))
|
|
with allure.step("PUT请求接口"):
|
|
allure.attach(name="请求地址", body=url)
|
|
allure.attach(name="请求头", body=str(header))
|
|
allure.attach(name="请求参数", body=str(data_random))
|
|
if 'application/json' in header.values():
|
|
if data_random:
|
|
data_random = json.loads(data_random)
|
|
response = requests.put(url=url, json=data_random, headers=header, timeout=timeout, files=files)
|
|
elif 'application/xml' in str(header.values()) or 'text/xml' in str(header.values()):
|
|
if data_random:
|
|
data_random = json.loads(data_random)
|
|
xmldata = xmltodict.unparse({'xml': data_random})
|
|
# dict转成xml
|
|
response = requests.post(url=url, data=xmldata, headers=header, timeout=timeout)
|
|
else:
|
|
response = requests.put(url=url, data=data_random, headers=header, timeout=timeout, files=files)
|
|
try:
|
|
try:
|
|
res = response.json()
|
|
except:
|
|
res = response.text
|
|
logging.info("请求接口结果: %s" % str(res))
|
|
self.rel.locallcache(caches, bodys=data_random, res=res)
|
|
allure.attach(name="请求结果", body=str(res))
|
|
return res, response.elapsed.total_seconds(), response.status_code
|
|
except Exception as e:
|
|
logging.error(e)
|
|
logging.error(response.text)
|
|
raise
|
|
|
|
def delete(self, http, address, caches, header, data, timeout=8, host="host"):
|
|
"""
|
|
get请求
|
|
:param http:
|
|
:param host:
|
|
:param caches: 关联情况
|
|
- cachefrom: 'body' # response : 从结果中获取 body : 从参数中获取
|
|
path: '$.code' # body如果是 "id=2&path=haha" 会转换成字典 然后根据path使用jsonpath取值
|
|
name: 'code'
|
|
:param address:
|
|
:param header: 请求头
|
|
:param data: 请求参数
|
|
:param timeout: 超时时间
|
|
:return:
|
|
"""
|
|
iniaddress = replace_random(address, param=data["urlparam"])
|
|
data_random = self.iniDatas(data["param"])
|
|
url = http + "://" + host_manage(hos='${{{}}}$'.format(host)) + iniaddress
|
|
logging.info("请求地址:%s" % "" + url)
|
|
logging.info("请求方法:DELETE")
|
|
logging.info("请求头: %s" % str(header))
|
|
logging.info("请求参数: %s" % str(data_random))
|
|
with allure.step("DELETE请求接口"):
|
|
allure.attach(name="请求地址", body=url)
|
|
allure.attach(name="请求头", body=str(header))
|
|
allure.attach(name="请求参数", body=str(data_random))
|
|
response = requests.delete(url=url, params=data_random, headers=header, timeout=timeout)
|
|
|
|
try:
|
|
try:
|
|
res = response.json()
|
|
except:
|
|
res = response.text
|
|
logging.info("请求接口结果: %s" % str(res))
|
|
self.rel.locallcache(caches, bodys=data_random, res=res, cookie=response.cookies)
|
|
allure.attach(name="请求结果", body=str(res))
|
|
return res, response.elapsed.total_seconds(), response.status_code
|
|
except Exception as e:
|
|
logging.error(e)
|
|
logging.error(response.text)
|
|
raise
|
|
|
|
def patch(self, http, address, caches, header, timeout=8, data=None, files=None, host="host"):
|
|
"""
|
|
|
|
:param http:
|
|
:param address:
|
|
:param caches:
|
|
:param header:
|
|
:param timeout:
|
|
:param data:
|
|
:param files:
|
|
:param host:
|
|
:return:
|
|
"""
|
|
iniaddress = replace_random(address, param=data["urlparam"])
|
|
url = http + "://" + host_manage(hos='${{{}}}$'.format(host)) + iniaddress
|
|
logging.info("请求地址:%s" % "" + url)
|
|
logging.info("请求方法:PATHC")
|
|
logging.info("请求头: %s" % str(header))
|
|
data_random = self.iniDatas(data["param"])
|
|
logging.info("请求参数: %s" % str(data_random))
|
|
with allure.step("PATCH请求接口"):
|
|
allure.attach(name="请求地址", body=url)
|
|
allure.attach(name="请求头", body=str(header))
|
|
allure.attach(name="请求参数", body=str(data_random))
|
|
if 'application/json' in header.values():
|
|
if data_random:
|
|
data_random = json.loads(data_random)
|
|
response = requests.patch(url=url, json=data_random, headers=header, timeout=timeout, files=files)
|
|
else:
|
|
response = requests.patch(url=url, data=data_random, headers=header, timeout=timeout, files=files)
|
|
try:
|
|
try:
|
|
res = response.json()
|
|
except:
|
|
res = response.text
|
|
logging.info("请求接口结果: %s" % str(res))
|
|
allure.attach(name="请求结果", body=str(res))
|
|
self.rel.locallcache(caches, bodys=data_random, res=res, cookie=response.cookies)
|
|
return res, response.elapsed.total_seconds(), response.status_code
|
|
except Exception as e:
|
|
logging.error(e)
|
|
logging.error(response.text)
|
|
raise
|
|
|
|
def __call__(self, http, address, method, headers, data, caches, **kwargs):
|
|
try:
|
|
if method == "post" or method == 'POST':
|
|
return self.post(http=http, address=address, data=data, header=headers, caches=caches, **kwargs)
|
|
elif method == "get" or method == 'GET':
|
|
return self.get(http=http, address=address, data=data, header=headers, caches=caches, **kwargs)
|
|
elif method == "delete" or method == 'DELETE':
|
|
return self.delete(http=http, address=address, data=data, header=headers, caches=caches, **kwargs)
|
|
elif method == "put" or method == 'PUT':
|
|
return self.put(http=http, address=address, data=data, header=headers, caches=caches, **kwargs)
|
|
elif method == "patch" or method == 'PATCH':
|
|
return self.patch(http=http, address=address, data=data, header=headers, caches=caches, **kwargs)
|
|
else:
|
|
raise TypeError(f"请求异常,检查yml文件method")
|
|
except Exception:
|
|
raise
|
|
|
|
|
|
apisend = apiSend()
|
|
# if __name__ == '__main__':
|
|
# h = {
|
|
# "Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJsb2dpbl91c2VyX2tleSI6IjE2OTRmYjlmLTUzNjYtNGZjZS1hODg4LTBlY2UxOThmZThhZSJ9.5_mD4abE-5iHsSr6RB9R8qaIRV7zidUFkpytyyd2cjSiQcrdJvAE_6GjU9Q_Xsr0JmTkSCTiefpFySguyk2E8Q",
|
|
# "Content-Type": "multipart/form-data"
|
|
#
|
|
# }
|
|
# d = {
|
|
# "param": "updateSupport=0",
|
|
# "urlparam": None
|
|
# }
|
|
# p = {'address': '/v1/enter/trade/', 'assert': {'jsonpath': None, 'sqlassert': None, 'time': 2, 'code': 201},
|
|
# 'data': {'param': {'name': '行业名称$RandomString($RandomPosInt(2,6)$)$',
|
|
# 'desc': '备注$RandomString($RandomPosInt(2,8)$)$'}, 'urlparam': None},
|
|
# 'headers': {'Content-Type': 'application/json',
|
|
# 'Authorization': 'GREEN eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoyMiwiZXhwIjoxNjUxODMzNjY4LCJ1c2VybmFtZSI6ImRsMDAxIn0.Dbk1ddEXdmW1tRzxZLvFgJwsh0hek6HJjzCabStdnz0'},
|
|
# 'host': 'host_HB', 'info': '新建行业', 'method': 'POST', 'cache': None, 'relevance': None}
|
|
# ress = apisend(address=p['address'], data=p['data'], method=p['method'], headers=p['headers'],
|
|
# caches=p['cache'], host=p['host'], )
|
|
# print(ress)
|
|
# d =pymysql
|