migrate_issue/gitlink_api/project_api.py

219 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os.path
from requests_toolbelt import MultipartEncoder
from requests import request
from loguru import logger
class ProjectApi:
    """Thin HTTP client for a project's issue-tracker endpoints.

    Each public method builds a URL under ``host`` for ``project``, sends it
    with ``requests.request`` and returns the decoded JSON body. Apart from
    :meth:`upload_attachment`, any failure (network error, non-JSON body, ...)
    is logged and reported as ``{"error": "<message>"}`` instead of being
    raised, so callers must inspect the returned value.

    Keyword arguments that are not supplied are interpolated into the URL as
    the text ``None`` (e.g. ``limit=None``); callers are expected to pass them.
    """

    # Class-level fallback so instances created without running __init__
    # (e.g. subclasses with their own constructor) still have a timeout.
    timeout = 5

    def __init__(self, host, project=None, cookies=None, timeout=5):
        """Store the connection settings.

        Args:
            host: Base URL without a trailing slash; paths such as
                ``/api/v1/...`` are appended to it verbatim.
            project: Project identifier placed in the URL path
                (the module's own demo passes ``"owner/repo"``).
            cookies: Cookies sent with the requests that need a login.
            timeout: Per-request timeout in seconds (default 5, the value
                that used to be hard-coded in every method).
        """
        self.host = host
        self.project = project
        self.cookies = cookies
        self.timeout = timeout

    def _send(self, method, path, error_label, *, cookies=None, payload=None,
              payload_label=None, response_label=None):
        """Send one request and return its JSON body or an error dict.

        Args:
            method: HTTP verb.
            path: Path (and query string) appended to ``self.host``.
            error_label: Prefix of the error log line.
            cookies: Cookies to send; ``None`` sends none.
            payload: JSON request body; ``None`` sends no body.
            payload_label: If given, the payload is debug-logged with this
                prefix after the request has been sent.
            response_label: If given, the decoded response is debug-logged
                with this prefix.

        Returns:
            The decoded JSON response, or ``{"error": "<message>"}`` when
            anything in the round trip raises.
        """
        try:
            url = self.host + path
            response = request(url=url, method=method, cookies=cookies,
                               json=payload, timeout=self.timeout)
            if payload_label is not None:
                logger.debug(f"{payload_label}{payload}")
            # Decode once and reuse the result for both the log and the return.
            body = response.json()
            if response_label is not None:
                logger.debug(f"{response_label}{body}")
            return body
        except Exception as e:
            # Deliberately broad: these wrappers report failures as data.
            logger.error(f"{error_label}{e}")
            return {"error": f"{e}"}

    def get_issue_list(self, **kwargs):
        """Fetch one page of the project's issue list (sent without cookies).

        Keyword Args:
            limit: Page size.
            page: Page number.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        path = (
            f"/api/v1/{self.project}/issues?participant_category=all&category=all"
            f"&limit={kwargs.get('limit')}&page={kwargs.get('page')}"
        )
        return self._send("GET", path, "获取疑修列表信息:")

    def get_issue_detail(self, **kwargs):
        """Fetch the detail data of a single issue (sent without cookies).

        Keyword Args:
            issue_id: Identifier of the issue, placed in the URL path.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        path = f"/api/v1/{self.project}/issues/{kwargs.get('issue_id')}"
        return self._send("GET", path, "获取易修详情页数据:")

    def get_issue_journals_detail(self, **kwargs):
        """Fetch one page of an issue's comments (sent without cookies).

        Keyword Args:
            issue_id: Identifier of the issue.
            page: Page number.
            limit: Page size.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        path = (
            f"/api/v1/{self.project}/issues/{kwargs.get('issue_id')}/journals"
            f"?category=comment&page={kwargs.get('page')}&limit={kwargs.get('limit')}"
        )
        return self._send("GET", path, "获取易修评论详情:")

    def new_issue(self, **kwargs):
        """Create an issue.

        Keyword Args:
            payload: JSON body of the request. Example shape::

                {
                    "subject": "",
                    "description": "",
                    "branch_name": "",
                    "status_id": "",
                    "priority_id": "",
                    "milestone_id": "",
                    "issue_tag_ids": "",
                    "assigner_ids": "",
                    "attachment_ids": "",
                    "start_date": "",
                    "due_date": "",
                    "receivers_login": []
                }

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        return self._send(
            "POST",
            f"/api/v1/{self.project}/issues",
            "新建疑修报错:",
            cookies=self.cookies,
            payload=kwargs.get('payload'),
            response_label="新建疑修接口的响应数据是:",
        )

    def new_issue_journals(self, **kwargs):
        """Create a comment on an issue.

        Keyword Args:
            issue_id: Identifier of the issue to comment on.
            payload: JSON body of the request.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        return self._send(
            "POST",
            f"/api/v1/{self.project}/issues/{kwargs.get('issue_id')}/journals",
            "新建疑修评论报错:",
            cookies=self.cookies,
            payload=kwargs.get('payload'),
            payload_label="新建疑修评论接口的数据是:",
            response_label="新建疑修评论接口的响应数据是:",
        )

    def new_issue_journals_reply(self, **kwargs):
        """Create a reply to an issue comment (second-level comment).

        Posts to the same endpoint as :meth:`new_issue_journals`; the reply
        relationship is carried by the payload.

        Keyword Args:
            issue_id: Identifier of the issue.
            payload: JSON body, e.g. ``{"parent_id": 271911,
                "reply_id": 272004, "notes": "wwww", "receivers_login": []}``.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        return self._send(
            "POST",
            f"/api/v1/{self.project}/issues/{kwargs.get('issue_id')}/journals",
            "新建疑修评论报错:",
            cookies=self.cookies,
            payload=kwargs.get('payload'),
            response_label="新建疑修评论接口的响应数据是:",
        )

    def new_milestone(self, **kwargs):
        """Create a milestone.

        Keyword Args:
            payload: JSON body, e.g. ``{"name": "1", "description": "1",
                "effective_date": "2023-04-17", "status": "open"}``.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        # NOTE(review): this path has no "/v1" segment, unlike
        # get_milestone_list -- kept as-is; confirm against the server API.
        return self._send(
            "POST",
            f"/api/{self.project}/milestones.json",
            "新建里程碑接口报错:",
            cookies=self.cookies,
            payload=kwargs.get('payload'),
            response_label="新建里程碑接口响应数据是:",
        )

    def new_issue_tags(self, **kwargs):
        """Create an issue tag for the project.

        Keyword Args:
            payload: JSON body, e.g. ``{"color": "#9e804a",
                "description": "2", "name": "2"}``.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        return self._send(
            "POST",
            f"/api/v1/{self.project}/issue_tags",
            "新建项目标记接口报错:",
            cookies=self.cookies,
            payload=kwargs.get('payload'),
            response_label="新建项目标记接口响应数据是:",
        )

    def get_milestone_list(self, **kwargs):
        """Fetch one page of the project's milestones (sent without cookies).

        Keyword Args:
            page: Page number.
            limit: Page size.
            category: Value of the ``category`` query parameter.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        path = (
            f"/api/v1/{self.project}/milestones.json?page={kwargs.get('page')}"
            f"&limit={kwargs.get('limit')}&category={kwargs.get('category')}"
        )
        return self._send("GET", path, "获取里程碑列表接口报错:")

    def get_issue_tags_list(self, **kwargs):
        """Fetch one page of the project's issue tags (sent with cookies).

        Keyword Args:
            page: Page number.
            limit: Page size.

        Returns:
            Decoded JSON response, or ``{"error": ...}`` on failure.
        """
        path = (
            f"/api/v1/{self.project}/issue_tags"
            f"?page={kwargs.get('page')}&limit={kwargs.get('limit')}"
        )
        return self._send("GET", path, "获取项目标记列表接口报错:",
                          cookies=self.cookies)

    def upload_attachment(self, **kwargs):
        """Upload a file as an attachment via ``POST /api/attachments.json``.

        The file is streamed as ``multipart/form-data`` under the form field
        ``file``; the Content-Type header (including the boundary) comes from
        the ``MultipartEncoder``.

        Keyword Args:
            filename: File name reported in the multipart part.
            file: Path of the local file to upload.

        Returns:
            The decoded JSON response when it is an object containing both
            ``"id"`` and ``"filesize"``; otherwise ``None`` (the failure or
            the exception is logged, not raised).
        """
        try:
            url = self.host + "/api/attachments.json"
            # Context manager guarantees the handle is closed even if the
            # request raises; the body is fully sent before request() returns.
            with open(kwargs.get("file"), "rb") as stream:
                data = MultipartEncoder(
                    fields={"file": (kwargs.get('filename'), stream)}
                )
                headers = {
                    # host carries no trailing slash, so the separator is
                    # required to form a valid "new issue" page URL.
                    "Referer": f"{self.host}/{self.project}/issues/new",
                    "Content-type": data.content_type,
                }
                response = request(url=url, method="POST", data=data,
                                   headers=headers, cookies=self.cookies,
                                   timeout=self.timeout)
            json_data = response.json()
            if (isinstance(json_data, dict)
                    and "id" in json_data and "filesize" in json_data):
                logger.info(f"{kwargs.get('file')} || 上传附件成功!{json_data}")
                return json_data
            logger.info(f"上传附件失败!{json_data}")
        except Exception as e:
            logger.error(f"上传附件接口报错:{e}")
        # Failure is signalled by None here (not an error dict), as before.
        return None
if __name__ == '__main__':
    # Manual smoke test: log in, then upload this very file as an attachment
    # and print whatever upload_attachment returns.
    # NOTE(review): host and credentials are hard-coded test values; consider
    # moving them out of source control.
    from gitlink_api.login_api import Login

    base_url = "https://testforgeplus.trustie.net"
    login_result = Login(base_url).login_api(user="floraachy", pwd="12345678")
    client = ProjectApi(base_url, "floraachy/lalalalaa", login_result.cookies)
    this_file = os.path.abspath(__file__)
    print(client.upload_attachment(filename="project_api.py", file=this_file))