This commit is contained in:
刘德华 2024-07-13 02:47:07 +08:00
parent eb1ce45c4d
commit b85c7ce9f6
2 changed files with 233 additions and 233 deletions

View File

@ -24,7 +24,7 @@ app.include_router(USER)
app.include_router(LOG)
app.include_router(AUTH)
app.include_router(SYNC_CONFIG)
app.include_router(ISSUES)
# app.include_router(ISSUES)
app.mount("/", StaticFiles(directory="web"), name="static")

View File

@ -1,243 +1,243 @@
import time
# import time
from fastapi import (
BackgroundTasks,
Query,
Depends,
Security,
Body,
requests
)
from typing import Optional
import requests
# from fastapi import (
# BackgroundTasks,
# Query,
# Depends,
# Security,
# Body,
# requests
# )
# from typing import Optional
# import requests
from starlette.exceptions import HTTPException
# from starlette.exceptions import HTTPException
from src.utils.logger import logger
from extras.obfastapi.frame import Trace, DataList
from extras.obfastapi.frame import OBResponse as Response
from src.base.code import Code
from src.base.error_code import ErrorTemplate, Errors
from src.router import PULL_REQUEST as pull_request
# from src.router import ISSUES as issues
from src.api.Controller import APIController as Controller
from src.dto.pull_request import PullRequest as PullRequestData
from src.service.pull_request import PullRequestService
from src.service.sync import ProjectService
from src.utils import github
from src.utils import gitee
# # from src.utils.logger import logger
# from extras.obfastapi.frame import Trace, DataList
# from extras.obfastapi.frame import OBResponse as Response
# from src.base.code import Code
# from src.base.error_code import ErrorTemplate, Errors
# # from src.router import PULL_REQUEST as pull_request
# # from src.router import ISSUES as issues
# from src.api.Controller import APIController as Controller
# from src.dto.pull_request import PullRequest as PullRequestData
# from src.service.pull_request import PullRequestService
# from src.service.sync import ProjectService
# from src.utils import github
# from src.utils import gitee
class PullRequest(Controller):
# class PullRequest(Controller):
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
return super().get_user(cookie_key=cookie_key, token=token)
@pull_request.get("/projects/{name}/pullrequests", response_model=Response[DataList[PullRequestData]], description='列出pull request')
async def list_pull_request(
self,
name: str = Query(..., description='工程名字'),
search: Optional[str] = Query(None, description='搜索内容'),
orderby: Optional[str] = Query(None, description='排序选项')
):
# 项目是否存在
await self._check_project(name)
pull_request_service = PullRequestService()
count = await pull_request_service.count_pull_request(name)
answer = await pull_request_service.fetch_pull_request(name) # 获取所有PR信息从哪里
if not answer:
logger.info(f"The project {name} has no pull request")
answer = []
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=answer)
)
@pull_request.get("/projects/{name}/pullrequests/sync", response_model=Response, description='同步pull request')
async def sync_pull_request(
self,
name: str = Query(..., description='工程名字')
):
# 项目是否存在查sync_project表,同时获取数据库中的一条完整记录
resp = await self._check_project(name)
# 解析 resp 获取需要的信息(仓库地址、名字等)
organization, repo = github.transfer_github_to_name(
resp[0].github_address)
# 如果仓库存在,执行同步操作
if organization and repo:
pull_request_service = PullRequestService()
await pull_request_service.sync_pull_request(name, organization, repo)
else:
# 否则抛出异常
logger.error(f"The pull rquest of project {name} sync failed")
raise Errors.QUERY_FAILD
# 返回响应信息
return Response(
code=Code.SUCCESS,
msg="发送同意请求成功"
)
@pull_request.get("/projects/{name}/pullrequests/{id}/approve", response_model=Response, description='同意一个pull request')
async def approve_pull_request(
self,
name: str = Query(..., description='同步工程名称'),
id: int = Query(..., description='pull request id')
):
if not name or not id:
raise ErrorTemplate.ARGUMENT_LACK()
resp = await self._check_project(name)
organization, repo = github.transfer_github_to_name(
resp[0].github_address)
if organization and repo:
pull_request_service = PullRequestService()
resp = await pull_request_service.approve_pull_request(organization, repo, id)
if not resp:
logger.error(
f"The pull rquest #{id} of project {name} approve failed")
raise Errors.QUERY_FAILD
else:
logger.error(
f"Get the project {name} organization and repo failed")
raise Errors.QUERY_FAILD
return Response(
code=Code.SUCCESS,
msg="发送同意请求成功"
)
@pull_request.get("/projects/{name}/pullrequests/{id}/merge", response_model=Response, description='合并一个pull request')
async def merge_pull_request(
self,
name: str = Query(..., description='同步工程名称'),
id: int = Query(..., description='pull request id')
):
if not name or not id:
raise ErrorTemplate.ARGUMENT_LACK()
resp = await self._check_project(name)
organization, repo = github.transfer_github_to_name(
resp[0].github_address)
if organization and repo:
pull_request_service = PullRequestService()
resp = await pull_request_service.merge_pull_request(organization, repo, id)
if not resp:
logger.error(
f"The pull rquest #{id} of project {name} merge failed")
raise Errors.QUERY_FAILD
else:
logger.error(
f"Get the project {name} organization and repo failed")
raise Errors.QUERY_FAILD
return Response(
code=Code.SUCCESS,
msg="发送合并请求成功"
)
@pull_request.get("/projects/{name}/pullrequests/{id}/close", response_model=Response, description='关闭一个pull request')
async def close_pull_request(
self,
name: str = Query(..., description='同步工程名称'),
id: int = Query(..., description='pull request id')
):
if not name or not id:
raise ErrorTemplate.ARGUMENT_LACK()
resp = await self._check_project(name)
organization, repo = github.transfer_github_to_name(
resp[0].github_address)
if organization and repo:
pull_request_service = PullRequestService()
resp = await pull_request_service.close_pull_request(organization, repo, id)
if not resp:
logger.error(
f"The pull rquest #{id} of project {name} close failed")
raise Errors.QUERY_FAILD
else:
logger.error(
f"Get the project {name} organization and repo failed")
raise Errors.QUERY_FAILD
return Response(
code=Code.SUCCESS,
msg="发送关闭请求成功"
)
@pull_request.get("/projects/{name}/pullrequests/{id}/press", response_model=Response, description='催促一个pull request')
async def press_pull_request(
self,
name: str = Query(..., description='同步工程名称'),
id: int = Query(..., description='pull request id')
):
# await self._check_project(name)
# service = PullRequestService()
# resp = await service.press_pull_request()
# if not resp:
# code = Code.INVALID_PARAMS
# msg = "发送催促请求失败"
# else:
# code = Code.SUCCESS
# msg = "发送催促请求成功"
return Response(
code=Code.SUCCESS,
msg="第二期功能,敬请期待"
)
async def _check_project(self, name: str):
project_service = ProjectService()
resp = await project_service.search_project(name=name) # 从sync_project中查询项目是否存在
if len(resp) == 0:
logger.error(
f"The project {name} is not exist")
raise Errors.QUERY_FAILD
return resp
@pull_request.get("/projects/{name}/pullrequests/", description='获取指定仓库的PR')
async def fetch_pull_request(
self,
domain: str = Query(..., description='平台'),
owner: str = Query(..., description='仓库所有者'),
repo: str = Query(..., description='仓库名称'),
token: str = Query(..., description='用户token'),
state: str = Query(None, description='可选PR的状态'),
):
try:
if 'gitee' in domain:
if state:
url = f'https://gitee.com/api/v5/repos/{owner}/{repo}/pulls?state={state}'
else:
url = f'https://gitee.com/api/v5/repos/{owner}/{repo}/pulls'
elif 'gitlink' in domain:
url = f'https://www.gitlink.org.cn/api/v1/{owner}/{repo}/pulls.json'
headers = {
'Authorization': f'token {token}'
}
response = requests.get(url, headers=headers)
response.raise_for_status() # 如果请求返回了不成功的状态码,就会产生一个 HTTPError 异常。
return response.json()
except requests.HTTPError as errh:
raise HTTPException(status_code=400, detail=str(errh))
except requests.RequestException as err:
raise HTTPException(status_code=500, detail=str(err))
@pull_request.get("/projects/{name}/pullrequests/test", description='测试接口')
async def testInterface(
self,
token: str = Query(..., description='用户授权码'),
owner: str = Query(..., description='仓库所有者'),
repo: str = Query(..., description='仓库名称'),
state: str = Query(None, description='可选PR的状态'),
):
return gitee.fetch_gitee_pull_request(token, owner, repo, state)
# class Issues(Controller):
# def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
# return super().get_user(cookie_key=cookie_key, token=token)
# @issues.get("/fetch/issues", description='获取issues列表')
# async def get_issues(
# @pull_request.get("/projects/{name}/pullrequests", response_model=Response[DataList[PullRequestData]], description='列出pull request')
# async def list_pull_request(
# self,
# owner: str = Query(..., description= '仓库所有者'),
# repo: str = Query(..., description='仓库名称')
# name: str = Query(..., description='工程名字'),
# search: Optional[str] = Query(None, description='搜索内容'),
# orderby: Optional[str] = Query(None, description='排序选项')
# ):
# issue_list = gitee.fetch_gitee_issues(owner, repo)
# return issue_list
# # 项目是否存在
# await self._check_project(name)
# pull_request_service = PullRequestService()
# count = await pull_request_service.count_pull_request(name)
# answer = await pull_request_service.fetch_pull_request(name) # 获取所有PR信息从哪里
# if not answer:
# logger.info(f"The project {name} has no pull request")
# answer = []
# return Response(
# code=Code.SUCCESS,
# data=DataList(total=count, list=answer)
# )
# @pull_request.get("/projects/{name}/pullrequests/sync", response_model=Response, description='同步pull request')
# async def sync_pull_request(
# self,
# name: str = Query(..., description='工程名字')
# ):
# # 项目是否存在查sync_project表,同时获取数据库中的一条完整记录
# resp = await self._check_project(name)
# # 解析 resp 获取需要的信息(仓库地址、名字等)
# organization, repo = github.transfer_github_to_name(
# resp[0].github_address)
# # 如果仓库存在,执行同步操作
# if organization and repo:
# pull_request_service = PullRequestService()
# await pull_request_service.sync_pull_request(name, organization, repo)
# else:
# # 否则抛出异常
# logger.error(f"The pull rquest of project {name} sync failed")
# raise Errors.QUERY_FAILD
# # 返回响应信息
# return Response(
# code=Code.SUCCESS,
# msg="发送同意请求成功"
# )
# @pull_request.get("/projects/{name}/pullrequests/{id}/approve", response_model=Response, description='同意一个pull request')
# async def approve_pull_request(
# self,
# name: str = Query(..., description='同步工程名称'),
# id: int = Query(..., description='pull request id')
# ):
# if not name or not id:
# raise ErrorTemplate.ARGUMENT_LACK()
# resp = await self._check_project(name)
# organization, repo = github.transfer_github_to_name(
# resp[0].github_address)
# if organization and repo:
# pull_request_service = PullRequestService()
# resp = await pull_request_service.approve_pull_request(organization, repo, id)
# if not resp:
# logger.error(
# f"The pull rquest #{id} of project {name} approve failed")
# raise Errors.QUERY_FAILD
# else:
# logger.error(
# f"Get the project {name} organization and repo failed")
# raise Errors.QUERY_FAILD
# return Response(
# code=Code.SUCCESS,
# msg="发送同意请求成功"
# )
# @pull_request.get("/projects/{name}/pullrequests/{id}/merge", response_model=Response, description='合并一个pull request')
# async def merge_pull_request(
# self,
# name: str = Query(..., description='同步工程名称'),
# id: int = Query(..., description='pull request id')
# ):
# if not name or not id:
# raise ErrorTemplate.ARGUMENT_LACK()
# resp = await self._check_project(name)
# organization, repo = github.transfer_github_to_name(
# resp[0].github_address)
# if organization and repo:
# pull_request_service = PullRequestService()
# resp = await pull_request_service.merge_pull_request(organization, repo, id)
# if not resp:
# logger.error(
# f"The pull rquest #{id} of project {name} merge failed")
# raise Errors.QUERY_FAILD
# else:
# logger.error(
# f"Get the project {name} organization and repo failed")
# raise Errors.QUERY_FAILD
# return Response(
# code=Code.SUCCESS,
# msg="发送合并请求成功"
# )
# @pull_request.get("/projects/{name}/pullrequests/{id}/close", response_model=Response, description='关闭一个pull request')
# async def close_pull_request(
# self,
# name: str = Query(..., description='同步工程名称'),
# id: int = Query(..., description='pull request id')
# ):
# if not name or not id:
# raise ErrorTemplate.ARGUMENT_LACK()
# resp = await self._check_project(name)
# organization, repo = github.transfer_github_to_name(
# resp[0].github_address)
# if organization and repo:
# pull_request_service = PullRequestService()
# resp = await pull_request_service.close_pull_request(organization, repo, id)
# if not resp:
# logger.error(
# f"The pull rquest #{id} of project {name} close failed")
# raise Errors.QUERY_FAILD
# else:
# logger.error(
# f"Get the project {name} organization and repo failed")
# raise Errors.QUERY_FAILD
# return Response(
# code=Code.SUCCESS,
# msg="发送关闭请求成功"
# )
# @pull_request.get("/projects/{name}/pullrequests/{id}/press", response_model=Response, description='催促一个pull request')
# async def press_pull_request(
# self,
# name: str = Query(..., description='同步工程名称'),
# id: int = Query(..., description='pull request id')
# ):
# # await self._check_project(name)
# # service = PullRequestService()
# # resp = await service.press_pull_request()
# # if not resp:
# # code = Code.INVALID_PARAMS
# # msg = "发送催促请求失败"
# # else:
# # code = Code.SUCCESS
# # msg = "发送催促请求成功"
# return Response(
# code=Code.SUCCESS,
# msg="第二期功能,敬请期待"
# )
# async def _check_project(self, name: str):
# project_service = ProjectService()
# resp = await project_service.search_project(name=name) # 从sync_project中查询项目是否存在
# if len(resp) == 0:
# logger.error(
# f"The project {name} is not exist")
# raise Errors.QUERY_FAILD
# return resp
# @pull_request.get("/projects/{name}/pullrequests/", description='获取指定仓库的PR')
# async def fetch_pull_request(
# self,
# domain: str = Query(..., description='平台'),
# owner: str = Query(..., description='仓库所有者'),
# repo: str = Query(..., description='仓库名称'),
# token: str = Query(..., description='用户token'),
# state: str = Query(None, description='可选PR的状态'),
# ):
# try:
# if 'gitee' in domain:
# if state:
# url = f'https://gitee.com/api/v5/repos/{owner}/{repo}/pulls?state={state}'
# else:
# url = f'https://gitee.com/api/v5/repos/{owner}/{repo}/pulls'
# elif 'gitlink' in domain:
# url = f'https://www.gitlink.org.cn/api/v1/{owner}/{repo}/pulls.json'
# headers = {
# 'Authorization': f'token {token}'
# }
# response = requests.get(url, headers=headers)
# response.raise_for_status() # 如果请求返回了不成功的状态码,就会产生一个 HTTPError 异常。
# return response.json()
# except requests.HTTPError as errh:
# raise HTTPException(status_code=400, detail=str(errh))
# except requests.RequestException as err:
# raise HTTPException(status_code=500, detail=str(err))
# @pull_request.get("/projects/{name}/pullrequests/test", description='测试接口')
# async def testInterface(
# self,
# token: str = Query(..., description='用户授权码'),
# owner: str = Query(..., description='仓库所有者'),
# repo: str = Query(..., description='仓库名称'),
# state: str = Query(None, description='可选PR的状态'),
# ):
# return gitee.fetch_gitee_pull_request(token, owner, repo, state)
# # class Issues(Controller):
# # def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
# # return super().get_user(cookie_key=cookie_key, token=token)
# # @issues.get("/fetch/issues", description='获取issues列表')
# # async def get_issues(
# # self,
# # owner: str = Query(..., description= '仓库所有者'),
# # repo: str = Query(..., description='仓库名称')
# # ):
# # issue_list = gitee.fetch_gitee_issues(owner, repo)
# # return issue_list