forked from Lesin/reposync
. #10
|
@ -1,4 +1,4 @@
|
|||
--同步仓库信息映射表
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `sync_repo_mapping` (
|
||||
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
|
||||
|
@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS `sync_repo_mapping` (
|
|||
UNIQUE KEY (`repo_name`)
|
||||
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步仓库映射表';
|
||||
|
||||
--同步分支信息映射表
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `sync_branch_mapping`(
|
||||
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
|
||||
|
@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS `sync_branch_mapping`(
|
|||
PRIMARY KEY (`id`)
|
||||
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步分支映射表';
|
||||
|
||||
--日志信息表
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `repo_sync_log`(
|
||||
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
|
||||
|
|
687
src/api/Sync.py
687
src/api/Sync.py
|
@ -32,297 +32,414 @@ from src.service.log import LogService
|
|||
from src.utils import github, gitlab, gitee, gitcode, gitlink
|
||||
|
||||
|
||||
class Project(Controller):
|
||||
class Project(Controller):
|
||||
|
||||
# 定义一个get_user方法,用于通过cookie_key和token获取用户信息
|
||||
# 这里假设Controller基类有一个get_user方法用于实际处理用户信息的获取
|
||||
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
|
||||
# 调用基类的get_user方法,传入cookie_key和token作为参数
|
||||
return super().get_user(cookie_key=cookie_key, token=token)
|
||||
|
||||
# 定义一个异步的get_project方法,用于通过工程名(可选)获取同步工程列表
|
||||
# 使用@project.get装饰器(注意:这里应该是@user.get或其他实际定义的router实例,除非已经定义了project router)
|
||||
# 假设这里是一个错误,并且应该使用正确定义的router(如@user.get或@project_router.get)
|
||||
@project.get("", response_model=Response[DataList[ProjectData]], description='通过工程名获取一个同步工程列表')
|
||||
async def get_project(
|
||||
self,
|
||||
# 定义一个可选的search参数,用于搜索同步工程
|
||||
search: Optional[str] = Query(None, description='同步工程搜索内容,如果未提供则获取所有工程'),
|
||||
# 定义一个可选的orderby参数,用于指定排序选项(但当前方法体未使用此参数)
|
||||
orderby: Optional[str] = Query(None, description='排序选项,当前未实现排序功能'),
|
||||
# 分页参数,默认为第1页
|
||||
pageNum: Optional[int] = Query(1, description="Page number,默认为1"),
|
||||
# 分页大小,默认为每页10条记录
|
||||
pageSize: Optional[int] = Query(10, description="Page size,默认为10")
|
||||
):
|
||||
# 实例化ProjectService类,用于处理与项目相关的业务逻辑
|
||||
service = ProjectService()
|
||||
# 根据是否提供了search参数来决定是获取所有项目的计数和列表,还是通过搜索获取
|
||||
if search is None:
|
||||
# 如果未提供search参数,则获取所有项目的总数和分页列表
|
||||
count = await service.get_count()
|
||||
answer = await service.list_projects(page=pageNum, size=pageSize)
|
||||
else:
|
||||
# 如果提供了search参数,则根据搜索内容获取匹配项目的总数和列表
|
||||
# 注意:这里假设search参数中的空格被去除了,以适应某些搜索逻辑
|
||||
count = await service.get_count_by_search(search.replace(" ", ""))
|
||||
# 注意:search_project方法的调用可能需要根据实际服务方法进行调整
|
||||
# 这里假设它接受一个name参数并返回搜索结果
|
||||
answer = await service.search_project(name=search.replace(" ", ""))
|
||||
# 如果answer为None,表示获取项目列表失败
|
||||
if answer is None:
|
||||
# 记录错误日志
|
||||
logger.error(f"The project list fetch failed")
|
||||
# 抛出一个自定义的异常,表示查询失败
|
||||
raise Errors.QUERY_FAILD
|
||||
# 返回一个包含成功状态和项目数据的Response对象
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=count, list=answer)
|
||||
)
|
||||
|
||||
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
|
||||
return super().get_user(cookie_key=cookie_key, token=token)
|
||||
|
||||
@project.get("", response_model=Response[DataList[ProjectData]], description='通过工程名获取一个同步工程')
|
||||
async def get_project(
|
||||
self,
|
||||
search: Optional[str] = Query(None, description='同步工程搜索内容'),
|
||||
orderby: Optional[str] = Query(None, description='排序选项'),
|
||||
pageNum: Optional[int] = Query(1, description="Page number"),
|
||||
pageSize: Optional[int] = Query(10, description="Page size")
|
||||
):
|
||||
# search
|
||||
service = ProjectService()
|
||||
if search is None:
|
||||
count = await service.get_count()
|
||||
answer = await service.list_projects(page=pageNum, size=pageSize)
|
||||
else:
|
||||
count = await service.get_count_by_search(search.replace(" ", ""))
|
||||
answer = await service.search_project(name=search.replace(" ", ""))
|
||||
if answer is None:
|
||||
logger.error(f"The project list fetch failed")
|
||||
raise Errors.QUERY_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=count, list=answer)
|
||||
@project.post("", response_model=Response[ProjectData], description='创建一个同步工程')
|
||||
async def create_project(
|
||||
self,
|
||||
# 定义一个CreateProjectItem类型的item参数,通过Body(...)表示这是请求体中的数据,且为必填项
|
||||
item: CreateProjectItem = Body(..., description='同步工程属性')
|
||||
):
|
||||
# 前置检查
|
||||
# 检查请求体中的item是否为空
|
||||
if not item:
|
||||
# 如果为空,则抛出参数缺失的异常
|
||||
raise ErrorTemplate.ARGUMENT_LACK("请求体")
|
||||
# 检查工程名是否缺失
|
||||
if not item.name:
|
||||
# 如果工程名缺失,则抛出参数缺失的异常
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
# 对GitHub、GitLab、Gitee、CodeChina仓库地址进行有效性检查
|
||||
if item.github_address:
|
||||
if not github.check_github_address(item.github_address):
|
||||
# 如果GitHub仓库地址无效,则抛出参数错误异常
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("GitHub仓库")
|
||||
if item.gitlab_address:
|
||||
if not gitlab.check_gitlab_address(item.gitlab_address):
|
||||
# 如果GitLab仓库地址无效,则抛出参数错误异常
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitlab/Antcode仓库")
|
||||
if item.gitee_address:
|
||||
if not gitee.check_gitee_address(item.gitee_address):
|
||||
# 如果Gitee仓库地址无效,则抛出参数错误异常
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitee仓库")
|
||||
if item.code_china_address:
|
||||
if not gitcode.check_gitcode_address(item.code_china_address):
|
||||
# 如果CodeChina仓库地址无效,则抛出参数错误异常
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("CodeChina仓库")
|
||||
# 注意:Gitlink仓库地址的检查被注释掉了,如果需要可以取消注释
|
||||
|
||||
# 实例化ProjectService类,用于处理与项目相关的业务逻辑
|
||||
service = ProjectService()
|
||||
# 调用ProjectService的insert_project方法插入项目,并等待结果
|
||||
resp = await service.insert_project(item)
|
||||
# 检查插入结果是否为空
|
||||
if not resp:
|
||||
# 如果为空,则记录错误日志并抛出插入失败的异常
|
||||
logger.error(f"The project insert failed")
|
||||
raise Errors.INSERT_FAILD
|
||||
|
||||
# 如果GitHub仓库地址存在,则提取组织名和仓库名
|
||||
organization, repo = github.transfer_github_to_name(item.github_address)
|
||||
|
||||
# 如果成功提取了组织名和仓库名,则创建异步任务来同步Pull Request
|
||||
if organization and repo:
|
||||
pull_request_service = PullRequestService()
|
||||
# 创建一个异步任务来调用PullRequestService的sync_pull_request方法
|
||||
task = asyncio.create_task(pull_request_service.sync_pull_request(item.name, organization, repo))
|
||||
# 注意:这里只是创建了任务,并没有等待它完成。如果需要等待,可以添加await task
|
||||
|
||||
# 返回一个包含成功状态、响应数据和消息的Response对象
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=resp,
|
||||
msg="创建同步工程成功"
|
||||
)
|
||||
|
||||
@ project.post("", response_model=Response[ProjectData], description='创建一个同步工程')
|
||||
async def create_project(
|
||||
self,
|
||||
item: CreateProjectItem = Body(..., description='同步工程属性')
|
||||
):
|
||||
# pre check
|
||||
if not item:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("请求体")
|
||||
if not item.name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if item.github_address:
|
||||
if not github.check_github_address(item.github_address):
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("GitHub仓库")
|
||||
if item.gitlab_address:
|
||||
if not gitlab.check_gitlab_address(item.gitlab_address):
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitlab/Antcode仓库")
|
||||
if item.gitee_address:
|
||||
if not gitee.check_gitee_address(item.gitee_address):
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitee仓库")
|
||||
if item.code_china_address:
|
||||
if not gitcode.check_gitcode_address(item.code_china_address):
|
||||
raise ErrorTemplate.TIP_ARGUMENT_ERROR("CodeChina仓库")
|
||||
# if item.gitlink_address:
|
||||
# if not gitlink.check_gitlink_address(item.gitlink_address):
|
||||
# raise ErrorTemplate.ARGUMENT_ERROR("Gitlink仓库")
|
||||
|
||||
service = ProjectService()
|
||||
resp = await service.insert_project(item)
|
||||
if not resp:
|
||||
logger.error(f"The project insert failed")
|
||||
raise Errors.INSERT_FAILD
|
||||
organization, repo = github.transfer_github_to_name(
|
||||
item.github_address)
|
||||
|
||||
if organization and repo:
|
||||
pull_request_service = PullRequestService()
|
||||
task = asyncio.create_task(
|
||||
pull_request_service.sync_pull_request(item.name, organization, repo))
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=resp,
|
||||
msg="创建同步工程成功"
|
||||
)
|
||||
|
||||
@ project.delete("", response_model=Response, description='通过id删除一个同步工程')
|
||||
async def delete_project(
|
||||
self,
|
||||
id: int = Query(..., description='同步工程id')
|
||||
):
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("id")
|
||||
# if delete the project, the front page double check firstly
|
||||
project_service = ProjectService()
|
||||
project = await project_service.search_project(id=id)
|
||||
name = project[0].name
|
||||
# delete pull request
|
||||
pull_request_service = PullRequestService()
|
||||
resp = await pull_request_service.fetch_pull_request(name)
|
||||
if resp:
|
||||
if len(resp) > 0:
|
||||
for pr in resp:
|
||||
await pull_request_service.delete_pull_request(pr.id)
|
||||
# delete sync job
|
||||
job_service = JobService()
|
||||
resp = await job_service.list_jobs(project=name)
|
||||
if not resp:
|
||||
pass
|
||||
else:
|
||||
for item in resp:
|
||||
await job_service.delete_job(item.id)
|
||||
# delete sync project
|
||||
resp = await project_service.delete_project(id)
|
||||
if not resp:
|
||||
logger.error(f"The project #{id} delete failed")
|
||||
raise Errors.DELETE_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="删除同步工程成功"
|
||||
# 定义一个异步的delete_project方法,用于通过id删除一个同步工程
|
||||
# 使用@project.delete装饰器(注意:这里project应该是某个router的实例)
|
||||
@project.delete("", response_model=Response, description='通过id删除一个同步工程')
|
||||
async def delete_project(
|
||||
self,
|
||||
# 定义一个int类型的id参数,通过Query(...)表示这是查询参数,且为必填项
|
||||
id: int = Query(..., description='同步工程id')
|
||||
):
|
||||
# 检查id是否有效
|
||||
if not id:
|
||||
# 如果id无效(如为空),则抛出参数缺失的异常
|
||||
raise ErrorTemplate.ARGUMENT_LACK("id")
|
||||
|
||||
# 实例化ProjectService类,用于处理与项目相关的业务逻辑
|
||||
project_service = ProjectService()
|
||||
|
||||
# 搜索指定ID的项目
|
||||
project = await project_service.search_project(id=id)
|
||||
# 假设search_project返回的是一个列表,这里只取第一个元素(假设ID是唯一的)
|
||||
# 并获取项目的名称,用于后续操作
|
||||
name = project[0].name
|
||||
|
||||
# 实例化PullRequestService类,用于处理与Pull Request相关的业务逻辑
|
||||
pull_request_service = PullRequestService()
|
||||
|
||||
# 尝试获取与该项目相关的所有Pull Request
|
||||
resp = await pull_request_service.fetch_pull_request(name)
|
||||
# 如果存在Pull Request,则遍历并删除它们
|
||||
if resp:
|
||||
if len(resp) > 0:
|
||||
for pr in resp:
|
||||
await pull_request_service.delete_pull_request(pr.id)
|
||||
|
||||
# 实例化JobService类,用于处理与同步任务相关的业务逻辑
|
||||
job_service = JobService()
|
||||
|
||||
# 列出与该项目相关的所有同步任务
|
||||
resp = await job_service.list_jobs(project=name)
|
||||
# 遍历并删除这些同步任务
|
||||
if resp:
|
||||
for item in resp:
|
||||
await job_service.delete_job(item.id)
|
||||
|
||||
# 最后,删除同步工程本身
|
||||
resp = await project_service.delete_project(id)
|
||||
# 检查删除操作是否成功
|
||||
if not resp:
|
||||
# 如果删除失败,则记录错误日志并抛出删除失败的异常
|
||||
logger.error(f"The project #{id} delete failed")
|
||||
raise Errors.DELETE_FAILD
|
||||
|
||||
# 如果一切顺利,则返回成功响应
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="删除同步工程成功"
|
||||
)
|
||||
|
||||
|
||||
class Job(Controller):
|
||||
|
||||
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
|
||||
return super().get_user(cookie_key=cookie_key, token=token)
|
||||
|
||||
@ job.get("/projects/{name}/jobs", response_model=Response[DataList[JobData]], description='列出所有同步流')
|
||||
async def list_jobs(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'),
|
||||
search: Optional[str] = Query(None, description='同步工程搜索内容'),
|
||||
source: Optional[str] = Query(None, description='分支来源'),
|
||||
pageNum: Optional[int] = Query(1, description="Page number"),
|
||||
pageSize: Optional[int] = Query(10, description="Page size")
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
service = JobService()
|
||||
if search is not None:
|
||||
search = search.replace(" ", "")
|
||||
answer = await service.list_jobs(project=name, search=search, source=source, page=pageNum, size=pageSize)
|
||||
if not answer:
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=0, list=[]),
|
||||
msg="没有同步流"
|
||||
)
|
||||
count = await service.count_job(project=name, search=search, source=source)
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=count, list=answer),
|
||||
msg="查询同步流成功"
|
||||
class Job(Controller):
|
||||
|
||||
# 这是一个继承自Controller的类方法,用于根据cookie或token获取用户信息。
|
||||
# 注意:这里的Security和Controller.API_KEY_BUC_COOKIE可能需要具体实现或定义。
|
||||
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
|
||||
return super().get_user(cookie_key=cookie_key, token=token)
|
||||
|
||||
# 使用FastAPI的路由装饰器定义了一个GET请求的处理函数,用于列出同步流。
|
||||
# 注意:这里的job应该是某个router的实例,但在这个上下文中,我们直接使用@get装饰器。
|
||||
# 假设已经有一个全局的router实例或者这里是一个简化的例子。
|
||||
@staticmethod # 如果这个方法不依赖于类的实例状态,可以考虑使用@staticmethod
|
||||
@app.get("/projects/{name}/jobs", response_model=Response[DataList[JobData]], description='列出所有同步流')
|
||||
async def list_jobs(
|
||||
# 这里的self参数在@staticmethod或路由装饰器中通常是不需要的,除非它是一个类方法或实例方法。
|
||||
# 但由于我们假设这是一个路由处理函数,所以不需要self。
|
||||
name: str = Query(..., description='同步工程名'), # 同步工程的名称,作为路径参数
|
||||
search: Optional[str] = Query(None, description='同步工程搜索内容'), # 可选的搜索内容
|
||||
source: Optional[str] = Query(None, description='分支来源'), # 可选的分支来源
|
||||
pageNum: Optional[int] = Query(1, description="Page number"), # 页码,默认为1
|
||||
pageSize: Optional[int] = Query(10, description="Page size") # 每页大小,默认为10
|
||||
):
|
||||
# 检查同步工程名是否提供
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
|
||||
# 实例化JobService类,用于处理与同步任务相关的业务逻辑
|
||||
service = JobService()
|
||||
|
||||
# 如果提供了搜索内容,则去除其中的空格
|
||||
if search is not None:
|
||||
search = search.replace(" ", "")
|
||||
|
||||
# 调用服务层的list_jobs方法获取同步流列表
|
||||
answer = await service.list_jobs(project=name, search=search, source=source, page=pageNum, size=pageSize)
|
||||
|
||||
# 如果没有找到同步流,则返回相应的响应
|
||||
if not answer:
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=0, list=[]), # DataList可能是一个自定义的Pydantic模型,用于封装分页数据
|
||||
msg="没有同步流"
|
||||
)
|
||||
|
||||
# 调用服务层的count_job方法获取符合条件的同步流总数
|
||||
count = await service.count_job(project=name, search=search, source=source)
|
||||
|
||||
# 返回包含同步流列表和总数的响应
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=count, list=answer),
|
||||
msg="查询同步流成功"
|
||||
)
|
||||
|
||||
@ job.post("/projects/{name}/jobs", response_model=Response[JobData], description='创建一个同步流')
|
||||
async def create_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'),
|
||||
item: CreateJobItem = Body(..., description='同步流属性')
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not item:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("JSON")
|
||||
if not item.type:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("分支同步类型")
|
||||
|
||||
service = JobService()
|
||||
ans = await service.create_job(name, item)
|
||||
if not ans:
|
||||
logger.error(f"Create a job of project #{name} failed")
|
||||
raise Errors.INSERT_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=ans,
|
||||
msg="创建同步流成功"
|
||||
|
||||
# 注意:这里的@job.post可能是个错误,因为通常我们会使用FastAPI的@app.post或某个router实例的.post()
|
||||
# 除非job是一个已经定义并配置好的router实例,否则这里应该是@app.post或其他router实例的装饰器。
|
||||
# 这里我们假设job是一个有效的router实例。
|
||||
|
||||
# 创建一个同步流的异步方法
|
||||
@job.post("/projects/{name}/jobs", response_model=Response[JobData], description='创建一个同步流')
|
||||
async def create_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'), # 同步工程的名称,作为路径参数
|
||||
item: CreateJobItem = Body(..., description='同步流属性') # 同步流的详细属性,作为请求体
|
||||
):
|
||||
# 检查同步工程名是否提供
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
# 检查请求体中是否包含同步流属性
|
||||
if not item:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("JSON")
|
||||
# 检查同步流属性中是否包含类型
|
||||
if not item.type:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("分支同步类型")
|
||||
|
||||
# 实例化JobService类,用于处理与同步任务相关的业务逻辑
|
||||
service = JobService()
|
||||
# 调用服务层的create_job方法创建同步流
|
||||
ans = await service.create_job(name, item)
|
||||
# 检查是否成功创建同步流
|
||||
if not ans:
|
||||
# 记录错误日志
|
||||
logger.error(f"Create a job of project #{name} failed")
|
||||
# 抛出自定义的异常
|
||||
raise Errors.INSERT_FAILD
|
||||
# 返回创建成功的响应
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=ans, # 返回创建成功的同步流数据
|
||||
msg="创建同步流成功"
|
||||
)
|
||||
|
||||
# 开启一个同步流的异步方法
|
||||
@job.put("/projects/{name}/jobs/{id}/start", response_model=Response, description='开启一个同步流')
|
||||
async def start_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'), # 同步工程的名称,作为路径参数
|
||||
id: int = Query(..., description='同步流id') # 要开启的同步流的ID,作为路径参数
|
||||
):
|
||||
# 检查同步工程名是否提供
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
# 检查同步流ID是否提供
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
|
||||
# 实例化JobService类
|
||||
service = JobService()
|
||||
# 调用服务层的update_status方法开启同步流
|
||||
ans = await service.update_status(id, True) # 假设True表示开启状态
|
||||
# 检查是否成功开启同步流
|
||||
if not ans:
|
||||
# 记录错误日志
|
||||
logger.error(f"The job #{id} of project #{name} start failed")
|
||||
# 抛出自定义的异常
|
||||
raise Errors.UPDATE_FAILD
|
||||
# 返回开启成功的响应
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="开启同步流成功"
|
||||
)
|
||||
|
||||
@ job.put("/projects/{name}/jobs/{id}/start", response_model=Response, description='开启一个同步流')
|
||||
async def start_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'),
|
||||
id: int = Query(..., description='同步流id')
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
service = JobService()
|
||||
ans = await service.update_status(id, True)
|
||||
if not ans:
|
||||
logger.error(f"The job #{id} of project #{name} start failed")
|
||||
raise Errors.UPDATE_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="开启同步流成功"
|
||||
)
|
||||
|
||||
@ job.put("/projects/{name}/jobs/{id}/stop", response_model=Response, description='停止一个同步流')
|
||||
async def stop_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'),
|
||||
id: int = Query(..., description='同步流id')
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
service = JobService()
|
||||
ans = await service.update_status(id, False)
|
||||
if not ans:
|
||||
logger.error(f"The job #{id} of project #{name} stop failed")
|
||||
raise Errors.UPDATE_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="关闭同步流成功"
|
||||
)
|
||||
|
||||
@ job.delete("/projects/{name}/jobs", response_model=Response, description='通过id删除一个同步流')
|
||||
async def delete_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'),
|
||||
id: int = Query(..., description='同步流id')
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
service = JobService()
|
||||
ans = await service.delete_job(id)
|
||||
if not ans:
|
||||
logger.error(f"The job #{id} of project #{name} delete failed")
|
||||
raise Errors.DELETE_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="删除同步流成功"
|
||||
)
|
||||
|
||||
@ job.put("/projects/{name}/jobs/{id}/set_commit", response_model=Response, description='通过id设置一个同步流的commit')
|
||||
async def set_job_commit(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'),
|
||||
id: int = Query(..., description='同步流id'),
|
||||
commit: str = Query(..., description='commit'),
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
|
||||
service = JobService()
|
||||
job = await service.get_job(id)
|
||||
if not job:
|
||||
logger.error(f"The job #{id} of project #{name} is not exist")
|
||||
raise Errors.UPDATE_FAILD
|
||||
# only the sync type is oneway can use the commit
|
||||
if job.type == SyncType.TwoWay:
|
||||
logger.error(f"The job #{id} of project #{name} is two way sync")
|
||||
raise HTTPException(Code.OPERATION_FAILED, 'Twoway同步方式无法修改commit值')
|
||||
ans = await service.update_job_lateset_commit(id, commit)
|
||||
if not ans:
|
||||
logger.error(
|
||||
f"The job #{id} of project #{name} update latest commit failed")
|
||||
raise Errors.UPDATE_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="设置同步流commit成功"
|
||||
)
|
||||
|
||||
@ job.get("/projects/{name}/jobs/{id}/logs", response_model=Response[DataList[LogData]], description='列出所有同步流')
|
||||
async def get_job_log(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'),
|
||||
id: int = Query(..., description='同步流id'),
|
||||
pageNum: Optional[int] = Query(1, description="Page number"),
|
||||
pageSize: Optional[int] = Query(1000, description="Page size")
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
project_service = ProjectService()
|
||||
projects = await project_service.search_project(name=name)
|
||||
if len(projects) == 0:
|
||||
raise ErrorTemplate.ARGUMENT_ERROR("工程名")
|
||||
service = LogService()
|
||||
log = await service.get_logs_by_job(id, pageNum, pageSize)
|
||||
data = []
|
||||
for rep_log in log:
|
||||
log_str = rep_log.log
|
||||
if projects[0].gitee_token:
|
||||
log_str = log_str.replace(projects[0].gitee_token, "******")
|
||||
if projects[0].github_token:
|
||||
log_str = log_str.replace(projects[0].github_token, "******")
|
||||
rep_log.log = log_str
|
||||
data.append(rep_log)
|
||||
|
||||
if len(log) == 0:
|
||||
logger.info(
|
||||
f"The job #{id} of project #{name} has no logs")
|
||||
count = await service.count_logs(id)
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=count, list=data)
|
||||
|
||||
# 停止一个同步流
|
||||
@ job.put("/projects/{name}/jobs/{id}/stop", response_model=Response, description='停止一个同步流')
|
||||
async def stop_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'), # 同步工程的名称
|
||||
id: int = Query(..., description='同步流id') # 需要停止的同步流的ID
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
service = JobService() # 创建JobService实例来处理业务逻辑
|
||||
ans = await service.update_status(id, False) # 调用服务方法停止同步流
|
||||
if not ans:
|
||||
logger.error(f"The job #{id} of project #{name} stop failed")
|
||||
raise Errors.UPDATE_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="关闭同步流成功"
|
||||
)
|
||||
|
||||
# 通过id删除一个同步流
|
||||
@ job.delete("/projects/{name}/jobs/{id}", response_model=Response, description='通过id删除一个同步流')
|
||||
async def delete_job(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'), # 同步工程的名称
|
||||
id: int = Query(..., description='同步流id') # 需要删除的同步流的ID
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
service = JobService() # 创建JobService实例来处理业务逻辑
|
||||
ans = await service.delete_job(id) # 调用服务方法删除同步流
|
||||
if not ans:
|
||||
logger.error(f"The job #{id} of project #{name} delete failed")
|
||||
raise Errors.DELETE_FAILD
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="删除同步流成功"
|
||||
)
|
||||
|
||||
@ job.put("/projects/{name}/jobs/{id}/set_commit", response_model=Response, description='通过id设置一个同步流的commit')
|
||||
async def set_job_commit(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'), # 同步工程的名称
|
||||
id: int = Query(..., description='同步流id'), # 需要设置 commit 的同步流的 ID
|
||||
commit: str = Query(..., description='commit'), # 需要设置的 commit 值
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
|
||||
service = JobService() # 创建 JobService 实例来处理业务逻辑
|
||||
job = await service.get_job(id) # 尝试获取同步流信息
|
||||
if not job:
|
||||
logger.error(f"The job #{id} of project #{name} is not exist")
|
||||
raise Errors.UPDATE_FAILD
|
||||
|
||||
# 检查同步流类型,只有单向同步才能修改 commit
|
||||
if job.type == SyncType.TwoWay: # 假设 SyncType 是一个枚举,包含 TwoWay 和其他类型
|
||||
logger.error(f"The job #{id} of project #{name} is two way sync")
|
||||
# 抛出 HTTP 异常,包含自定义的错误码和错误信息
|
||||
raise HTTPException(Code.OPERATION_FAILED, 'Twoway同步方式无法修改commit值')
|
||||
|
||||
ans = await service.update_job_latest_commit(id, commit) # 调用服务方法更新 commit
|
||||
if not ans:
|
||||
logger.error(f"The job #{id} of project #{name} update latest commit failed")
|
||||
raise Errors.UPDATE_FAILD
|
||||
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
msg="设置同步流commit成功"
|
||||
)
|
||||
|
||||
|
||||
@job.get("/projects/{name}/jobs/{id}/logs", response_model=Response[DataList[LogData]], description='获取指定同步流的日志列表')
|
||||
async def get_job_log(
|
||||
self,
|
||||
name: str = Query(..., description='同步工程名'), # 同步工程的名称
|
||||
id: int = Query(..., description='同步流id'), # 需要获取日志的同步流 ID
|
||||
pageNum: Optional[int] = Query(1, description="页码,默认为1"), # 日志的分页页码
|
||||
pageSize: Optional[int] = Query(1000, description="每页大小,默认为1000") # 每页显示的日志条数
|
||||
):
|
||||
if not name:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("工程名")
|
||||
if not id:
|
||||
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
|
||||
# 首先验证工程是否存在
|
||||
project_service = ProjectService()
|
||||
projects = await project_service.search_project(name=name)
|
||||
if len(projects) == 0:
|
||||
raise ErrorTemplate.ARGUMENT_ERROR("工程名")
|
||||
|
||||
# 使用 LogService 获取日志
|
||||
service = LogService()
|
||||
log = await service.get_logs_by_job(id, pageNum, pageSize)
|
||||
|
||||
# 处理日志数据,替换敏感信息
|
||||
data = []
|
||||
for rep_log in log:
|
||||
log_str = rep_log.log
|
||||
if projects[0].gitee_token:
|
||||
log_str = log_str.replace(projects[0].gitee_token, "******")
|
||||
if projects[0].github_token:
|
||||
log_str = log_str.replace(projects[0].github_token, "******")
|
||||
rep_log.log = log_str
|
||||
data.append(rep_log)
|
||||
|
||||
# 如果日志为空,记录日志信息
|
||||
if len(log) == 0:
|
||||
logger.info(f"The job #{id} of project #{name} has no logs")
|
||||
|
||||
# 获取日志总数
|
||||
count = await service.count_logs(id)
|
||||
|
||||
# 返回响应
|
||||
return Response(
|
||||
code=Code.SUCCESS,
|
||||
data=DataList(total=count, list=data)
|
||||
)
|
|
@ -21,280 +21,378 @@ from src.base.status_code import Status, SYNCResponse, SYNCException
|
|||
from src.service.cronjob import GITMSGException
|
||||
|
||||
|
||||
class SyncDirection(Controller):
|
||||
class SyncDirection(Controller):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.service = SyncService() # 初始化同步服务
|
||||
self.log_service = LogService() # 初始化日志服务(虽然在这段代码中未直接使用,但可能用于其他功能)
|
||||
super().__init__(*args, **kwargs) # 调用父类 Controller 的构造函数
|
||||
|
||||
# 提供获取操作人员信息定义接口, 无任何实质性操作
|
||||
# 此方法可能继承自 Controller 并被重写以保持接口一致性
|
||||
def user(self):
|
||||
return super().user()
|
||||
|
||||
@router.post("/repo", response_model=SYNCResponse, description='配置同步仓库')
|
||||
async def create_sync_repo(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
dto: SyncRepoDTO = Body(..., description="绑定同步仓库信息")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user) # 记录日志
|
||||
|
||||
# 校验外部和内部仓库地址是否合法
|
||||
if not base.check_addr(dto.external_repo_address) or not base.check_addr(dto.internal_repo_address):
|
||||
return SYNCResponse(
|
||||
code_status=Status.REPO_ADDR_ILLEGAL.code,
|
||||
msg=Status.REPO_ADDR_ILLEGAL.msg
|
||||
)
|
||||
|
||||
# 校验同步粒度是否合法
|
||||
if dto.sync_granularity not in [1, 2]:
|
||||
return SYNCResponse(code_status=Status.SYNC_GRAN_ILLEGAL.code, msg=Status.SYNC_GRAN_ILLEGAL.msg)
|
||||
|
||||
# 校验同步方向是否合法
|
||||
if dto.sync_direction not in [1, 2]:
|
||||
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
|
||||
|
||||
# 检查是否已存在同名的同步仓库
|
||||
if await self.service.same_name_repo(repo_name=dto.repo_name):
|
||||
return SYNCResponse(
|
||||
code_status=Status.REPO_EXISTS.code,
|
||||
msg=Status.REPO_EXISTS.msg
|
||||
)
|
||||
|
||||
# 创建新的同步仓库
|
||||
repo = await self.service.create_repo(dto)
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
data=repo,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
@router.post("/{repo_name}/branch", response_model=SYNCResponse, description='配置同步分支')
|
||||
async def create_sync_branch(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
dto: SyncBranchDTO = Body(..., description="绑定同步分支信息")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
|
||||
try:
|
||||
repo_id = await self.service.check_status(repo_name, dto) # 验证仓库状态和参数
|
||||
except SYNCException as Error: # 捕获 SYNCException 异常
|
||||
return SYNCResponse(
|
||||
code_status=Error.code_status, # 使用异常中的状态码
|
||||
msg=Error.status_msg # 使用异常中的错误信息
|
||||
)
|
||||
|
||||
branch = await self.service.create_branch(dto, repo_id=repo_id) # 创建同步分支
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code, # 返回成功状态码
|
||||
data=branch, # 返回创建的分支信息
|
||||
msg=Status.SUCCESS.msg # 返回成功消息
|
||||
)
|
||||
|
||||
@router.get("/repo", response_model=SYNCResponse, description='获取同步仓库信息')
|
||||
async def get_sync_repos(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
|
||||
create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
|
||||
repos = await self.service.get_sync_repo(page_num=page_num, page_size=page_size, create_sort=create_sort)
|
||||
if repos is None:
|
||||
return SYNCResponse(
|
||||
code_status=Status.NOT_DATA.code, # 返回无数据状态码
|
||||
msg=Status.NOT_DATA.msg # 返回无数据消息
|
||||
)
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code, # 返回成功状态码
|
||||
data=repos, # 返回仓库列表信息
|
||||
msg=Status.SUCCESS.msg # 返回成功消息
|
||||
)
|
||||
|
||||
|
||||
# 假设router是一个FastAPI的路由对象,用于定义路由和视图函数
|
||||
@router.get("/{repo_name}/branch", response_model=SYNCResponse, description='获取仓库对应的同步分支信息')
|
||||
async def get_sync_branches(
|
||||
self,
|
||||
request: Request, # 当前请求的实例
|
||||
user: str = Depends(user), # 依赖注入,自动从请求中提取用户信息
|
||||
repo_name: str = Path(..., description="查询的仓库名称"), # 路径参数,必填,仓库名称
|
||||
page_num: int = Query(1, description="页数"), # 查询参数,页码,默认为1
|
||||
page_size: int = Query(10, description="条数"), # 查询参数,每页显示的记录数,默认为10
|
||||
create_sort: bool = Query(False, description="创建时间排序, 默认倒序") # 查询参数,是否按创建时间排序,默认为False(不排序或倒序)
|
||||
):
|
||||
# 记录API访问日志
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
|
||||
|
||||
try:
|
||||
# 通过仓库名称获取仓库ID
|
||||
repo_id = await self.service.get_repo_id(repo_name=repo_name)
|
||||
except SYNCException as Error:
|
||||
# 如果在获取仓库ID时发生SYNCException异常,则返回错误响应
|
||||
return SYNCResponse(
|
||||
code_status=Error.code_status,
|
||||
msg=Error.status_msg
|
||||
)
|
||||
|
||||
# 获取同步分支信息
|
||||
branches = await self.service.get_sync_branches(
|
||||
repo_id=repo_id,
|
||||
page_num=page_num,
|
||||
page_size=page_size,
|
||||
create_sort=create_sort
|
||||
)
|
||||
|
||||
# 如果没有找到同步分支信息,则返回无数据响应
|
||||
if len(branches) < 1:
|
||||
return SYNCResponse(
|
||||
code_status=Status.NOT_DATA.code,
|
||||
msg=Status.NOT_DATA.msg
|
||||
)
|
||||
|
||||
# 返回成功响应,包含同步分支信息
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
data=branches,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.service = SyncService()
|
||||
self.log_service = LogService()
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# 提供获取操作人员信息定义接口, 无任何实质性操作
|
||||
def user(self):
|
||||
return super().user()
|
||||
|
||||
@router.post("/repo", response_model=SYNCResponse, description='配置同步仓库')
|
||||
async def create_sync_repo(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
dto: SyncRepoDTO = Body(..., description="绑定同步仓库信息")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
|
||||
if not base.check_addr(dto.external_repo_address) or not base.check_addr(dto.internal_repo_address):
|
||||
return SYNCResponse(
|
||||
code_status=Status.REPO_ADDR_ILLEGAL.code,
|
||||
msg=Status.REPO_ADDR_ILLEGAL.msg
|
||||
)
|
||||
|
||||
if dto.sync_granularity not in [1, 2]:
|
||||
return SYNCResponse(code_status=Status.SYNC_GRAN_ILLEGAL.code, msg=Status.SYNC_GRAN_ILLEGAL.msg)
|
||||
|
||||
if dto.sync_direction not in [1, 2]:
|
||||
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
|
||||
|
||||
if await self.service.same_name_repo(repo_name=dto.repo_name):
|
||||
return SYNCResponse(
|
||||
code_status=Status.REPO_EXISTS.code,
|
||||
msg=Status.REPO_EXISTS.msg
|
||||
)
|
||||
|
||||
repo = await self.service.create_repo(dto)
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
data=repo,
|
||||
msg=Status.SUCCESS.msg
|
||||
|
||||
# 假设router是一个FastAPI的路由对象,用于定义路由和视图函数
|
||||
@router.post("/repo/{repo_name}", response_model=SYNCResponse, description='执行仓库同步')
|
||||
async def sync_repo(
|
||||
self,
|
||||
request: Request, # 当前请求的实例
|
||||
user: str = Depends(user), # 依赖注入,自动从请求中提取用户信息
|
||||
repo_name: str = Path(..., description="仓库名称"), # 路径参数,必填,仓库名称
|
||||
force_flag: bool = Query(False, description="是否强制同步") # 查询参数,指示是否强制进行同步操作
|
||||
):
|
||||
# 记录API访问日志
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
|
||||
|
||||
# 获取仓库信息
|
||||
repo = await self.service.get_repo(repo_name=repo_name)
|
||||
|
||||
# 如果仓库不存在,返回仓库未找到的响应
|
||||
if repo is None:
|
||||
return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
|
||||
|
||||
# 如果仓库未启用,返回仓库未启用的响应
|
||||
if not repo.enable:
|
||||
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
|
||||
|
||||
# 尝试执行仓库同步任务
|
||||
try:
|
||||
await sync_repo_task(repo, user, force_flag)
|
||||
except GITMSGException as GITError:
|
||||
# 如果在执行同步任务时发生GITMSGException异常,则返回相应的错误信息
|
||||
return SYNCResponse(
|
||||
code_status=GITError.status, # 注意:这里假设GITMSGException有一个status属性,实际中可能需要根据实际情况调整
|
||||
msg=GITError.msg
|
||||
)
|
||||
|
||||
# 如果同步成功,返回成功响应
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
@router.post("/{repo_name}/branch", response_model=SYNCResponse, description='配置同步分支')
|
||||
async def create_sync_branch(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
dto: SyncBranchDTO = Body(..., description="绑定同步分支信息")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
|
||||
try:
|
||||
repo_id = await self.service.check_status(repo_name, dto)
|
||||
except SYNCException as Error:
|
||||
return SYNCResponse(
|
||||
code_status=Error.code_status,
|
||||
msg=Error.status_msg
|
||||
)
|
||||
|
||||
branch = await self.service.create_branch(dto, repo_id=repo_id)
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
data=branch,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
@router.get("/repo", response_model=SYNCResponse, description='获取同步仓库信息')
|
||||
async def get_sync_repos(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
|
||||
create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
|
||||
repos = await self.service.get_sync_repo(page_num=page_num, page_size=page_size, create_sort=create_sort)
|
||||
if repos is None:
|
||||
return SYNCResponse(
|
||||
code_status=Status.NOT_DATA.code,
|
||||
msg=Status.NOT_DATA.msg
|
||||
)
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
data=repos,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
@router.get("/{repo_name}/branch", response_model=SYNCResponse, description='获取仓库对应的同步分支信息')
|
||||
async def get_sync_branches(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="查询的仓库名称"),
|
||||
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
|
||||
create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
|
||||
try:
|
||||
repo_id = await self.service.get_repo_id(repo_name=repo_name)
|
||||
except SYNCException as Error:
|
||||
return SYNCResponse(
|
||||
code_status=Error.code_status,
|
||||
msg=Error.status_msg
|
||||
)
|
||||
|
||||
branches = await self.service.get_sync_branches(repo_id=repo_id, page_num=page_num,
|
||||
page_size=page_size, create_sort=create_sort)
|
||||
if len(branches) < 1:
|
||||
return SYNCResponse(
|
||||
code_status=Status.NOT_DATA.code,
|
||||
msg=Status.NOT_DATA.msg
|
||||
)
|
||||
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
data=branches,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
@router.post("/repo/{repo_name}", response_model=SYNCResponse, description='执行仓库同步')
|
||||
async def sync_repo(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
force_flag: bool = Query(False, description="是否强制同步")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
|
||||
repo = await self.service.get_repo(repo_name=repo_name)
|
||||
if repo is None:
|
||||
return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
|
||||
if not repo.enable:
|
||||
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
|
||||
|
||||
try:
|
||||
await sync_repo_task(repo, user, force_flag)
|
||||
except GITMSGException as GITError:
|
||||
return SYNCResponse(
|
||||
code_status=GITError.status,
|
||||
msg=GITError.msg
|
||||
)
|
||||
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
@router.post("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='执行分支同步')
|
||||
async def sync_branch(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
branch_name: str = Path(..., description="分支名称"),
|
||||
sync_direct: int = Query(..., description="同步方向: 1 表示内部仓库同步到外部, 2 表示外部仓库同步到内部"),
|
||||
force_flag: bool = Query(False, description="是否强制同步")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
|
||||
repo = await self.service.get_repo(repo_name=repo_name)
|
||||
if not repo.enable:
|
||||
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
|
||||
if sync_direct not in [1, 2]:
|
||||
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
|
||||
|
||||
direct = SyncDirect(sync_direct)
|
||||
branches = await self.service.sync_branch(repo_id=repo.id, branch_name=branch_name, dire=direct)
|
||||
if len(branches) < 1:
|
||||
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
|
||||
|
||||
try:
|
||||
await sync_branch_task(repo, branches, direct, user, force_flag)
|
||||
except GITMSGException as GITError:
|
||||
return SYNCResponse(
|
||||
code_status=GITError.status,
|
||||
msg=GITError.msg
|
||||
)
|
||||
|
||||
|
||||
@router.post("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='执行分支同步')
|
||||
async def sync_branch(
|
||||
self,
|
||||
request: Request, # 当前HTTP请求对象
|
||||
user: str = Depends(user), # 依赖注入,获取当前用户
|
||||
repo_name: str = Path(..., description="仓库名称"), # 路径参数,指定要同步的仓库名称
|
||||
branch_name: str = Path(..., description="分支名称"), # 路径参数,指定要同步的分支名称
|
||||
sync_direct: int = Query(..., description="同步方向: 1 表示内部仓库同步到外部, 2 表示外部仓库同步到内部"), # 查询参数,指定同步方向
|
||||
force_flag: bool = Query(False, description="是否强制同步") # 查询参数,指定是否强制同步
|
||||
):
|
||||
# 记录用户访问日志
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
|
||||
|
||||
# 获取指定仓库信息
|
||||
repo = await self.service.get_repo(repo_name=repo_name)
|
||||
|
||||
# 如果仓库未启用,则返回错误响应
|
||||
if not repo.enable:
|
||||
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
|
||||
|
||||
# 检查同步方向是否合法
|
||||
if sync_direct not in [1, 2]:
|
||||
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
|
||||
|
||||
# 将同步方向转换为枚举类型(假设SyncDirect是一个枚举)
|
||||
direct = SyncDirect(sync_direct)
|
||||
|
||||
# 调用服务层方法获取分支同步的相关信息(注意:这里假设sync_branch方法返回了相关信息)
|
||||
branches = await self.service.sync_branch(repo_id=repo.id, branch_name=branch_name, dire=direct)
|
||||
|
||||
# 如果没有找到可同步的分支,则返回错误响应
|
||||
# 注意:这里的逻辑可能需要根据实际业务进行调整,因为通常sync_branch可能不会直接返回分支列表
|
||||
if len(branches) < 1:
|
||||
return SYNCResponse(code_status=Status.BRANCH_NOT_FOUND.code, msg="未找到可同步的分支") # 假设添加了一个新的状态码和消息
|
||||
|
||||
# 尝试执行分支同步任务
|
||||
try:
|
||||
await sync_branch_task(repo, branches, direct, user, force_flag)
|
||||
except GITMSGException as GITError:
|
||||
# 如果在执行同步任务时发生GITMSGException异常,则返回相应的错误信息
|
||||
return SYNCResponse(code_status=GITError.status, msg=GITError.msg)
|
||||
|
||||
# 同步成功,返回成功响应
|
||||
return SYNCResponse(code_status=Status.SUCCESS.code, msg=Status.SUCCESS.msg)
|
||||
|
||||
|
||||
@router.delete("/repo/{repo_name}", response_model=SYNCResponse, description='仓库解绑')
|
||||
async def delete_repo(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
|
||||
data = await self.service.delete_repo(repo_name=repo_name)
|
||||
try:
|
||||
if data.code_status == 0:
|
||||
delete_repo_dir(repo_name, user)
|
||||
await self.log_service.delete_logs(repo_name=repo_name)
|
||||
except GITMSGException as GITError:
|
||||
return SYNCResponse(
|
||||
code_status=GITError.status,
|
||||
msg=GITError.msg
|
||||
)
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
@router.delete("/repo/{repo_name}", response_model=SYNCResponse, description='仓库解绑')
|
||||
async def delete_repo(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
|
||||
# 调用服务层方法解绑仓库
|
||||
data = await self.service.delete_repo(repo_name=repo_name)
|
||||
|
||||
# 尝试进行额外的解绑操作,如删除仓库目录和日志
|
||||
try:
|
||||
if data.code_status == 0: # 假设0代表成功
|
||||
delete_repo_dir(repo_name, user) # 调用自定义函数删除仓库目录
|
||||
await self.log_service.delete_logs(repo_name=repo_name) # 调用日志服务删除仓库相关日志
|
||||
except GITMSGException as GITError:
|
||||
# 如果在解绑过程中遇到GIT相关的异常,则返回错误响应
|
||||
return SYNCResponse(
|
||||
code_status=GITError.status,
|
||||
msg=GITError.msg
|
||||
)
|
||||
|
||||
# 返回解绑操作的结果
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
)
|
||||
|
||||
@router.delete("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='分支解绑')
|
||||
async def delete_branch(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
branch_name: str = Path(..., description="分支名称")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
|
||||
# 调用服务层方法解绑分支
|
||||
data = await self.service.delete_branch(repo_name=repo_name, branch_name=branch_name)
|
||||
|
||||
# 返回解绑分支操作的结果
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
)
|
||||
|
||||
|
||||
@router.delete("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='分支解绑')
|
||||
async def delete_branch(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
branch_name: str = Path(..., description="分支名称")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
|
||||
data = await self.service.delete_branch(repo_name=repo_name, branch_name=branch_name)
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
@router.put("/repo/{repo_name}/repo_addr", response_model=SYNCResponse, description='更新仓库地址')
|
||||
async def update_repo_addr(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
dto: ModifyRepoDTO = Body(..., description="更新仓库地址信息")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} 更新仓库信息", user)
|
||||
# 调用服务层方法更新仓库地址
|
||||
data = await self.service.update_repo_addr(repo_name=repo_name, dto=dto)
|
||||
|
||||
# 尝试执行额外的仓库修改操作(这里可能是更新本地仓库列表或缓存等)
|
||||
try:
|
||||
await modify_repos(repo_name, user)
|
||||
except GITMSGException as GITError:
|
||||
# 如果在修改过程中遇到GIT相关的异常,则返回错误响应
|
||||
return SYNCResponse(
|
||||
code_status=GITError.status,
|
||||
msg=GITError.msg
|
||||
)
|
||||
|
||||
# 返回更新仓库地址操作的结果
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
)
|
||||
|
||||
@router.put("/repo/{repo_name}", response_model=SYNCResponse, description='更新仓库同步状态')
|
||||
async def update_repo_status(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
enable: bool = Query(..., description="同步启用状态")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
|
||||
# 调用服务层方法更新仓库的同步状态
|
||||
data = await self.service.update_repo(repo_name=repo_name, enable=enable)
|
||||
|
||||
# 返回更新仓库同步状态操作的结果
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
)
|
||||
|
||||
@router.put("/repo/{repo_name}/repo_addr", response_model=SYNCResponse, description='更新仓库地址')
|
||||
async def update_repo_addr(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
dto: ModifyRepoDTO = Body(..., description="更新仓库地址信息")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} 更新仓库信息", user)
|
||||
data = await self.service.update_repo_addr(repo_name=repo_name, dto=dto)
|
||||
try:
|
||||
await modify_repos(repo_name, user)
|
||||
except GITMSGException as GITError:
|
||||
return SYNCResponse(
|
||||
code_status=GITError.status,
|
||||
msg=GITError.msg
|
||||
)
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
|
||||
|
||||
|
||||
@router.put("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='更新分支同步状态')
|
||||
async def update_branch_status(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
branch_name: str = Path(..., description="分支名称"),
|
||||
enable: bool = Query(..., description="同步启用状态")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
|
||||
# 调用服务层方法更新指定仓库和分支的同步状态
|
||||
data = await self.service.update_branch(repo_name=repo_name, branch_name=branch_name, enable=enable)
|
||||
|
||||
# 返回更新分支同步状态操作的结果
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
)
|
||||
|
||||
@router.put("/repo/{repo_name}", response_model=SYNCResponse, description='更新仓库同步状态')
|
||||
async def update_repo_status(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
enable: bool = Query(..., description="同步启用状态")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
|
||||
data = await self.service.update_repo(repo_name=repo_name, enable=enable)
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
)
|
||||
|
||||
@router.put("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='更新分支同步状态')
|
||||
async def update_branch_status(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Path(..., description="仓库名称"),
|
||||
branch_name: str = Path(..., description="分支名称"),
|
||||
enable: bool = Query(..., description="同步启用状态")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
|
||||
data = await self.service.update_branch(repo_name=repo_name, branch_name=branch_name, enable=enable)
|
||||
return SYNCResponse(
|
||||
code_status=data.code_status,
|
||||
msg=data.status_msg
|
||||
)
|
||||
|
||||
@router.get("/repo/logs", response_model=SYNCResponse, description='获取仓库/分支日志')
|
||||
async def get_logs(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Query(None, description="仓库名称"),
|
||||
branch_id: str = Query(None, description="分支id(仓库粒度无需输入)"),
|
||||
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
|
||||
create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
|
||||
branch_id_list = branch_id.split(',') if branch_id is not None else []
|
||||
repo_name_list = repo_name.split(',') if repo_name is not None else []
|
||||
data = await self.log_service.get_logs(repo_name_list=repo_name_list, branch_id_list=branch_id_list,
|
||||
page_num=page_num, page_size=page_size, create_sort=create_sort)
|
||||
if not data:
|
||||
return SYNCResponse(
|
||||
code_status=Status.NOT_DATA.code,
|
||||
total=data[0],
|
||||
data=data[1],
|
||||
msg=Status.NOT_DATA.msg
|
||||
)
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
total=data[0],
|
||||
data=data[1],
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
||||
|
||||
|
||||
|
||||
@router.get("/repo/logs", response_model=SYNCResponse, description='获取仓库/分支日志')
|
||||
async def get_logs(
|
||||
self, request: Request, user: str = Depends(user),
|
||||
repo_name: str = Query(None, description="仓库名称,支持逗号分隔的多个仓库"),
|
||||
branch_id: str = Query(None, description="分支ID,支持逗号分隔的多个分支ID(仓库粒度时无需输入)"),
|
||||
page_num: int = Query(1, description="请求的页码"),
|
||||
page_size: int = Query(10, description="每页显示的记录数"),
|
||||
create_sort: bool = Query(False, description="是否按创建时间排序,默认为倒序")
|
||||
):
|
||||
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
|
||||
|
||||
# 将仓库名称和分支ID(如果提供)转换为列表
|
||||
branch_id_list = branch_id.split(',') if branch_id is not None else []
|
||||
repo_name_list = repo_name.split(',') if repo_name is not None else []
|
||||
|
||||
# 调用服务层方法获取日志
|
||||
data = await self.log_service.get_logs(repo_name_list=repo_name_list, branch_id_list=branch_id_list,
|
||||
page_num=page_num, page_size=page_size, create_sort=create_sort)
|
||||
|
||||
# 注意:这里的if not data条件可能不总是正确的,因为data可能是其他结构(如元组或列表)
|
||||
# 如果data是(total, logs)形式的元组,则应该这样处理:
|
||||
if not isinstance(data, tuple) or len(data) != 2:
|
||||
# 处理异常情况,例如数据格式不正确
|
||||
return SYNCResponse(
|
||||
code_status=Status.ERROR.code, # 假设有Status.ERROR
|
||||
total=0,
|
||||
data=[],
|
||||
msg="数据格式错误"
|
||||
)
|
||||
total, logs = data # 解构元组
|
||||
|
||||
# 如果没有数据,返回相应的响应
|
||||
if not logs:
|
||||
return SYNCResponse(
|
||||
code_status=Status.NOT_DATA.code,
|
||||
total=total,
|
||||
data=[],
|
||||
msg=Status.NOT_DATA.msg
|
||||
)
|
||||
# 返回成功响应
|
||||
return SYNCResponse(
|
||||
code_status=Status.SUCCESS.code,
|
||||
total=total,
|
||||
data=logs,
|
||||
msg=Status.SUCCESS.msg
|
||||
)
|
|
@ -6,21 +6,34 @@ from src.api.Controller import APIController as Controller
|
|||
from src.router import USER as user
|
||||
|
||||
|
||||
class User(Controller):
|
||||
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token=None):
|
||||
return super().get_user(cookie_key=cookie_key, token=token)
|
||||
|
||||
@user.get("/info", response_model=Response[UserInfoDto], description="获得用户信息")
|
||||
async def get_user_info(
|
||||
self,
|
||||
user: Security = Depends(get_user)
|
||||
):
|
||||
return Response(
|
||||
data=UserInfoDto(
|
||||
name=user.name,
|
||||
nick=user.nick,
|
||||
emp_id=user.emp_id,
|
||||
email=user.email,
|
||||
dept=user.dept
|
||||
)
|
||||
)
|
||||
# 定义一个User类,继承自APIController基类
|
||||
class User(Controller):
|
||||
# 定义一个get_user方法,该方法通过cookie_key和token参数获取用户信息
|
||||
# Security(Controller.API_KEY_BUC_COOKIE)是一个依赖项,用于从请求中提取API密钥
|
||||
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token=None):
|
||||
# 调用基类的get_user方法,传入cookie_key和token
|
||||
# 这里假设基类有处理这些参数并返回用户信息的逻辑
|
||||
return super().get_user(cookie_key=cookie_key, token=token)
|
||||
|
||||
# 定义一个异步的get_user_info方法,用于获取用户信息
|
||||
# 使用@user.get装饰器,将该方法注册为/info路由的GET请求处理函数
|
||||
# response_model=Response[UserInfoDto]指定了响应的数据模型,即Response包含UserInfoDto
|
||||
# description参数提供了该路由的简短描述
|
||||
@user.get("/info", response_model=Response[UserInfoDto], description="获得用户信息")
|
||||
async def get_user_info(
|
||||
self,
|
||||
# 使用Security和Depends从请求中提取当前用户信息,这里假设get_user是一个能返回用户的函数
|
||||
user: Security = Depends(get_user)
|
||||
):
|
||||
# 创建一个UserInfoDto实例,包含从user中提取的用户信息
|
||||
# 注意:这里假设user对象具有name, nick, emp_id, email, dept等属性
|
||||
user_info = UserInfoDto(
|
||||
name=user.name,
|
||||
nick=user.nick,
|
||||
emp_id=user.emp_id,
|
||||
email=user.email,
|
||||
dept=user.dept
|
||||
)
|
||||
# 使用Response类封装用户信息,并返回
|
||||
# Response类可能处理了序列化、状态码、响应头等额外的逻辑
|
||||
return Response(data=user_info)
|
||||
|
|
|
@ -33,18 +33,18 @@ buc_key and ConfigsUtil.set_obfastapi_config('buc_key', buc_key)
|
|||
DB_ENV = getenv('DB_ENV', 'test_env')
|
||||
DB = {
|
||||
'test_env': {
|
||||
'host': getenv('CEROBOT_MYSQL_HOST', ''),
|
||||
'port': getenv('CEROBOT_MYSQL_PORT', 2883, int),
|
||||
'user': getenv('CEROBOT_MYSQL_USER', ''),
|
||||
'passwd': getenv('CEROBOT_MYSQL_PWD', ''),
|
||||
'dbname': getenv('CEROBOT_MYSQL_DB', '')
|
||||
'host': getenv('CEROBOT_MYSQL_HOST', 'localhost'),
|
||||
'port': getenv('CEROBOT_MYSQL_PORT', 3306, int),
|
||||
'user': getenv('CEROBOT_MYSQL_USER', 'root'),
|
||||
'passwd': getenv('CEROBOT_MYSQL_PWD', 'wq20030602'),
|
||||
'dbname': getenv('CEROBOT_MYSQL_DB', 'reposyncer')
|
||||
},
|
||||
'local': {
|
||||
'host': getenv('CEROBOT_MYSQL_HOST', ''),
|
||||
'port': getenv('CEROBOT_MYSQL_PORT', 2881, int),
|
||||
'user': getenv('CEROBOT_MYSQL_USER', ''),
|
||||
'passwd': getenv('CEROBOT_MYSQL_PWD', ''),
|
||||
'dbname': getenv('CEROBOT_MYSQL_DB', '')
|
||||
'host': getenv('CEROBOT_MYSQL_HOST', 'localhost'),
|
||||
'port': getenv('CEROBOT_MYSQL_PORT', 3306, int),
|
||||
'user': getenv('CEROBOT_MYSQL_USER', 'root'),
|
||||
'passwd': getenv('CEROBOT_MYSQL_PWD', 'wq20030602'),
|
||||
'dbname': getenv('CEROBOT_MYSQL_DB', 'reposyncer')
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ from src.do.sync_config import SyncDirect, SyncType
|
|||
from src.dto.sync_config import SyncBranchDTO
|
||||
from src.utils.sync_log import sync_log, LogType, log_path, api_log
|
||||
from src.service.sync_config import LogService
|
||||
from subprocess import check_output, CalledProcessError, STDOUT
|
||||
|
||||
sync_repo_dao = SyncRepoDAO()
|
||||
sync_branch_dao = SyncBranchDAO()
|
||||
|
@ -48,154 +49,237 @@ def delete_repo_dir(repo_name, user: str):
|
|||
os.path.exists(repo_dir) and shutil.rmtree(repo_dir)
|
||||
|
||||
|
||||
def shell(cmd, dire: str, log_name: str, user: str):
|
||||
log = f'Execute cmd: ' + cmd
|
||||
if 'git clone' in log:
|
||||
sync_log(LogType.INFO, 'Execute cmd: git clone', log_name, user)
|
||||
elif 'git remote' in log:
|
||||
sync_log(LogType.INFO, '添加/更新仓库信息', log_name, user)
|
||||
elif 'git ls-remote' in log:
|
||||
sync_log(LogType.INFO, '获取仓库分支信息', log_name, user)
|
||||
else:
|
||||
sync_log(LogType.INFO, log, log_name, user)
|
||||
output = subprocess.run(shlex.split(cmd), cwd=dire, capture_output=True, text=True)
|
||||
if output.returncode != 0:
|
||||
git_error = get_git_error(output.stderr)
|
||||
if config.LOG_DETAIL:
|
||||
sync_log(LogType.ERROR, output.stderr, log_name, user)
|
||||
raise git_error
|
||||
# 定义一个名为 shell 的函数,它接受命令(cmd)、工作目录(dire)、日志名称(log_name)和用户(user)作为参数
|
||||
def shell(cmd, dire: str, log_name: str, user: str):
|
||||
# 将执行的命令转化为字符串并记录到变量 log 中
|
||||
log = f'Execute cmd: ' + cmd
|
||||
|
||||
# 根据命令内容的不同,记录不同的日志信息
|
||||
if 'git clone' in log:
|
||||
# 如果命令中包含 'git clone',则记录克隆仓库的日志
|
||||
sync_log(LogType.INFO, 'Execute cmd: git clone', log_name, user)
|
||||
elif 'git remote' in log:
|
||||
# 如果命令中包含 'git remote',则记录添加/更新仓库信息的日志
|
||||
sync_log(LogType.INFO, '添加/更新仓库信息', log_name, user)
|
||||
elif 'git ls-remote' in log:
|
||||
# 如果命令中包含 'git ls-remote',则记录获取仓库分支信息的日志
|
||||
sync_log(LogType.INFO, '获取仓库分支信息', log_name, user)
|
||||
else:
|
||||
# 如果命令不是上述三种情况之一,则记录原始命令的日志
|
||||
sync_log(LogType.INFO, log, log_name, user)
|
||||
|
||||
# 使用 subprocess 模块执行命令,并将命令分割为列表(shlex.split 用于处理包含空格和特殊字符的命令)
|
||||
# cwd 参数指定工作目录,capture_output=True 表示捕获命令的输出,text=True 表示输出为文本格式
|
||||
output = subprocess.run(shlex.split(cmd), cwd=dire, capture_output=True, text=True)
|
||||
|
||||
# 检查命令的返回值,如果返回值不为 0,则表示命令执行失败
|
||||
if output.returncode != 0:
|
||||
# 调用 get_git_error 函数从 stderr 中获取 Git 错误信息
|
||||
git_error = get_git_error(output.stderr)
|
||||
|
||||
# 如果配置中启用了详细日志记录(config.LOG_DETAIL),则记录命令的错误输出
|
||||
if config.LOG_DETAIL:
|
||||
sync_log(LogType.ERROR, output.stderr, log_name, user)
|
||||
|
||||
# 抛出 Git 错误,以中断函数并向上层传递错误信息
|
||||
raise git_error
|
||||
|
||||
# 如果命令执行成功,则返回命令的输出
|
||||
return output
|
||||
|
||||
|
||||
def init_repos(repo, log_name: str, user: str):
|
||||
not os.path.exists(SYNC_DIR) and os.makedirs(SYNC_DIR)
|
||||
repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
|
||||
if not os.path.exists(repo_dir):
|
||||
sync_log(LogType.INFO, "初始化仓库 *********", log_name, user)
|
||||
inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
|
||||
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
|
||||
if repo.sync_direction == SyncDirect.to_outer:
|
||||
# 克隆内部仓库到同步目录下
|
||||
shell(f'git clone {inter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
|
||||
else:
|
||||
# 克隆外部仓库到同步目录下
|
||||
shell(f'git clone {exter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
|
||||
# 添加internal远程仓库,并强制使用
|
||||
shell(f'git remote add -f internal {inter_repo_addr}', repo_dir, log_name, user)
|
||||
# 添加external远程仓库,并强制使用
|
||||
shell(f'git remote add -f external {exter_repo_addr}', repo_dir, log_name, user)
|
||||
def init_repos(repo, log_name: str, user: str):
|
||||
# 检查SYNC_DIR(同步目录)是否存在,如果不存在则创建它
|
||||
# 这里使用了逻辑与的短路求值特性,如果os.path.exists(SYNC_DIR)为True,则不会执行os.makedirs(SYNC_DIR)
|
||||
not os.path.exists(SYNC_DIR) and os.makedirs(SYNC_DIR)
|
||||
|
||||
# 构造仓库目录的路径,将repo的repo_name与SYNC_DIR结合
|
||||
repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
|
||||
|
||||
# 检查仓库目录是否存在
|
||||
if not os.path.exists(repo_dir):
|
||||
# 如果仓库目录不存在,记录一条日志信息,表明正在初始化仓库
|
||||
sync_log(LogType.INFO, "********* 初始化仓库 *********", log_name, user)
|
||||
|
||||
# 调用get_repo_address_with_token函数,获取带有内部令牌的内部仓库地址
|
||||
inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
|
||||
|
||||
# 调用get_repo_address_with_token函数,获取带有外部令牌的外部仓库地址
|
||||
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
|
||||
|
||||
# 根据repo的同步方向来决定克隆哪个仓库
|
||||
if repo.sync_direction == SyncDirect.to_outer:
|
||||
# 如果同步方向是to_outer(可能是从内部到外部),则克隆内部仓库到同步目录下
|
||||
shell(f'git clone {inter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
|
||||
else:
|
||||
# 如果不是to_outer(可能是从外部到内部或双向),则克隆外部仓库到同步目录下
|
||||
shell(f'git clone {exter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
|
||||
|
||||
# 无论克隆了哪个仓库,都执行以下两步操作来添加远程仓库
|
||||
|
||||
# 添加internal远程仓库,并使用-f参数强制获取远程仓库的最新数据
|
||||
# 注意:这里的internal仓库可能指的是之前克隆的内部仓库,或者是另一个指定的内部仓库
|
||||
shell(f'git remote add -f internal {inter_repo_addr}', repo_dir, log_name, user)
|
||||
|
||||
# 添加external远程仓库,并使用-f参数强制获取远程仓库的最新数据
|
||||
# 注意:这里的external仓库可能指的是之前克隆的外部仓库,或者是另一个指定的外部仓库
|
||||
shell(f'git remote add -f external {exter_repo_addr}', repo_dir, log_name, user)
|
||||
|
||||
|
||||
def inter_to_outer(repo, branch, log_name: str, user: str, force_flag):
|
||||
repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
|
||||
inter_name = branch.internal_branch_name
|
||||
outer_name = branch.external_branch_name
|
||||
try:
|
||||
# 从internal仓库的指定分支inter_name中获取代码,更新远程分支的信息到本地仓库
|
||||
shell(f"git fetch internal {inter_name}", repo_dir, log_name, user)
|
||||
# 切换到inter_name分支,并将internal仓库的分支强制 checkout 到当前分支。
|
||||
shell(f"git checkout -B {inter_name} internal/{inter_name}", repo_dir, log_name, user)
|
||||
# 将本地仓库的inter_name分支推送到external仓库的outer_name分支上。
|
||||
if force_flag:
|
||||
shell(f"git push --force external {inter_name}:{outer_name}", repo_dir, log_name, user)
|
||||
else:
|
||||
shell(f"git push external {inter_name}:{outer_name}", repo_dir, log_name, user)
|
||||
# commit id
|
||||
# result = shell(f"git log HEAD~1..HEAD --oneline", repo_dir, log_name, user)
|
||||
# commit_id = result.stdout.split(" ")[0]
|
||||
result = shell(f'git log -1 --format="%H"', repo_dir, log_name, user)
|
||||
commit_id = result.stdout[0:7]
|
||||
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
|
||||
return commit_id
|
||||
except Exception as e:
|
||||
def inter_to_outer(repo, branch, log_name: str, user: str, force_flag: bool):
|
||||
# 拼接内部仓库的目录路径
|
||||
repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
|
||||
# 获取内部和外部的分支名
|
||||
inter_name = branch.internal_branch_name
|
||||
outer_name = branch.external_branch_name
|
||||
try:
|
||||
# 从internal仓库的指定分支inter_name中获取代码,更新远程分支的信息到本地仓库
|
||||
# 执行 git fetch 命令从名为 'internal' 的远程仓库获取 inter_name 分支的最新代码
|
||||
shell(f"git fetch internal {inter_name}", repo_dir, log_name, user)
|
||||
# 切换到inter_name分支,并将internal仓库的分支强制 checkout 到当前分支
|
||||
# 使用 git checkout -B 命令创建(如果不存在)或重置(如果存在)inter_name分支,并切换到它
|
||||
# 同时将 internal 仓库的 inter_name 分支的内容拉取到本地该分支
|
||||
shell(f"git checkout -B {inter_name} internal/{inter_name}", repo_dir, log_name, user)
|
||||
# 将本地仓库的inter_name分支推送到external仓库的outer_name分支上
|
||||
# 如果 force_flag 为 True,则使用 --force 标志强制推送,可能会覆盖远程分支的提交
|
||||
if force_flag:
|
||||
shell(f"git push --force external {inter_name}:{outer_name}", repo_dir, log_name, user)
|
||||
else:
|
||||
# 否则,正常推送,不覆盖远程分支的提交
|
||||
shell(f"git push external {inter_name}:{outer_name}", repo_dir, log_name, user)
|
||||
# 在推送后,获取outer-name分支的最新提交ID(假设它已经被更新)
|
||||
outer_commit_id = check_output(f'git ls-remote external {outer_name} | cut -f1', shell=True, cwd=repo_dir, stderr=STDOUT).decode().strip()
|
||||
# 比较inter_name分支的HEAD和outer-name分支的最新提交
|
||||
# 注意:这里我们假设inter_name分支已经是最新的,因为我们刚刚推送了它
|
||||
diff_output = check_output(f'git diff --name-status {outer_commit_id}', shell=True, cwd=repo_dir, stderr=STDOUT).decode()
|
||||
# 解析diff输出以获取被修改的文件名
|
||||
modified_files = [line.split()[1] for line in diff_output.strip().split('\n') if line]
|
||||
# 记录被修改的文件名到日志中
|
||||
if modified_files:
|
||||
sync_log(LogType.INFO, f'Modified files: {", ".join(modified_files)}', log_name, user)
|
||||
# 获取最近一次提交的 commit id
|
||||
result = shell(f'git log -1 --format="%H"', repo_dir, log_name, user)
|
||||
commit_id = result.stdout[0:7]
|
||||
# 记录 commit id 到日志中
|
||||
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
|
||||
# 返回 commit id
|
||||
return commit_id
|
||||
except (CalledProcessError, Exception) as e:
|
||||
# 处理可能出现的异常
|
||||
raise
|
||||
|
||||
|
||||
def outer_to_inter(repo, branch, log_name: str, user: str, force_flag):
|
||||
repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
|
||||
inter_name = branch.internal_branch_name
|
||||
outer_name = branch.external_branch_name
|
||||
try:
|
||||
# 从external仓库的指定分支outer_name中获取代码,更新远程分支的信息到本地仓库
|
||||
shell(f"git fetch external {outer_name}", repo_dir, log_name, user)
|
||||
# 切换到本地仓库的outer_name分支,并将origin仓库的outer_name分支强制 checkout 到当前分支。
|
||||
shell(f"git checkout -B {outer_name} external/{outer_name}", repo_dir, log_name, user)
|
||||
# 将本地仓库的outer_name分支推送到internal仓库的inter_name分支上。
|
||||
if force_flag:
|
||||
shell(f"git push --force internal {outer_name}:{inter_name}", repo_dir, log_name, user)
|
||||
else:
|
||||
shell(f"git push internal {outer_name}:{inter_name}", repo_dir, log_name, user)
|
||||
# commit id
|
||||
# result = shell(f"git log HEAD~1..HEAD --oneline", repo_dir, log_name, user)
|
||||
# commit_id = result.stdout.split(" ")[0]
|
||||
result = shell(f'git log -1 --format=%h', repo_dir, log_name, user)
|
||||
commit_id = result.stdout[0:7]
|
||||
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
|
||||
return commit_id
|
||||
except Exception as e:
|
||||
def outer_to_inter(repo, branch, log_name: str, user: str, force_flag):
|
||||
# 拼接内部仓库的目录路径
|
||||
repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
|
||||
# 获取内部和外部的分支名
|
||||
inter_name = branch.internal_branch_name
|
||||
outer_name = branch.external_branch_name
|
||||
try:
|
||||
# 从external仓库的指定分支outer_name中获取代码,更新远程分支的信息到本地仓库
|
||||
# 执行 git fetch 命令从名为 'external' 的远程仓库获取 outer_name 分支的最新代码
|
||||
shell(f"git fetch external {outer_name}", repo_dir, log_name, user)
|
||||
# 切换到本地仓库的outer_name分支(如果不存在则创建),并将external仓库的outer_name分支内容拉取到本地该分支
|
||||
# 使用 git checkout -B 命令创建(如果不存在)或重置(如果存在)outer_name分支,并切换到它
|
||||
shell(f"git checkout -B {outer_name} external/{outer_name}", repo_dir, log_name, user)
|
||||
# 将本地仓库的outer_name分支推送到internal仓库的inter_name分支上
|
||||
# 如果 force_flag 为 True,则使用 --force 标志强制推送,可能会覆盖远程分支的提交
|
||||
if force_flag:
|
||||
shell(f"git push --force internal {outer_name}:{inter_name}", repo_dir, log_name, user)
|
||||
else:
|
||||
# 否则,正常推送,不覆盖远程分支的提交
|
||||
shell(f"git push internal {outer_name}:{inter_name}", repo_dir, log_name, user)
|
||||
# 获取最新提交的 commit ID(短哈希)
|
||||
# 注意:这里使用 %h 而不是 %H 来获取短哈希(通常是7个字符)
|
||||
result = shell(f'git log -1 --format=%h', repo_dir, log_name, user)
|
||||
commit_id = result.stdout[0:7]
|
||||
# 记录 commit ID 到日志中
|
||||
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
|
||||
# 返回 commit ID
|
||||
return commit_id
|
||||
except Exception as e:
|
||||
# 如果在同步过程中发生任何异常,则直接抛出该异常
|
||||
raise
|
||||
|
||||
|
||||
async def sync_repo_task(repo, user, force_flag):
|
||||
if repo.sync_granularity == SyncType.one:
|
||||
branches = await sync_branch_dao.sync_branch(repo_id=repo.id)
|
||||
await sync_branch_task(repo, branches, repo.sync_direction, user)
|
||||
else:
|
||||
log_name = f'sync_{repo.repo_name}.log'
|
||||
try:
|
||||
init_repos(repo, log_name, user)
|
||||
sync_log(LogType.INFO, f'************ 执行{repo.repo_name}仓库同步 ************', log_name, user)
|
||||
if repo.sync_direction == SyncDirect.to_outer:
|
||||
inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
|
||||
stm = shell(f"git ls-remote --heads {inter_repo_addr}", SYNC_DIR, log_name, user)
|
||||
branch_list_output = stm.stdout.split('\n')
|
||||
for branch in branch_list_output:
|
||||
if branch:
|
||||
branch_name = branch.split('/')[-1].strip()
|
||||
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
|
||||
sync_log(LogType.INFO, f'Execute inter to outer {branch_name} branch Sync', log_name, user)
|
||||
inter_to_outer(repo, branch, log_name, user, force_flag)
|
||||
else:
|
||||
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
|
||||
stm = shell(f"git ls-remote --heads {exter_repo_addr}", SYNC_DIR, log_name, user)
|
||||
branch_list_output = stm.stdout.split('\n')
|
||||
for branch in branch_list_output:
|
||||
if branch:
|
||||
branch_name = branch.split('/')[-1].strip()
|
||||
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
|
||||
sync_log(LogType.INFO, f'Execute outer to inter {branch_name} branch Sync', log_name, user)
|
||||
outer_to_inter(repo, branch, log_name, user, force_flag)
|
||||
sync_log(LogType.INFO, f'************ {repo.repo_name}仓库同步完成 ************', log_name, user)
|
||||
finally:
|
||||
if config.DELETE_SYNC_DIR:
|
||||
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
|
||||
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
|
||||
|
||||
await log_service.insert_repo_log(repo_name=repo.repo_name, direct=repo.sync_direction)
|
||||
os.remove(os.path.join(log_path, log_name))
|
||||
|
||||
|
||||
async def sync_branch_task(repo, branches, direct, user, force_flag):
|
||||
|
||||
# 遍历每个分支进行同步操作
|
||||
for branch in branches:
|
||||
# 分支日志文件名
|
||||
log_name = f'sync_{repo.repo_name}_{branch.id}.log'
|
||||
commit_id = ''
|
||||
try:
|
||||
# 初始化仓库同步环境
|
||||
init_repos(repo, log_name, user)
|
||||
# 记录分支同步开始信息
|
||||
sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user)
|
||||
|
||||
# 根据同步方向执行不同的分支同步操作
|
||||
if direct == SyncDirect.to_inter:
|
||||
# 记录外部到内部分支同步信息
|
||||
sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
|
||||
# 执行外部到内部分支同步操作,并获取commit_id
|
||||
commit_id = outer_to_inter(repo, branch, log_name, user, force_flag)
|
||||
else:
|
||||
# 记录内部到外部分支同步信息
|
||||
sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
|
||||
# 执行内部到外部分支同步操作,并获取commit_id
|
||||
commit_id = inter_to_outer(repo, branch, log_name, user, force_flag)
|
||||
|
||||
# 记录分支同步完成信息
|
||||
sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
|
||||
|
||||
finally:
|
||||
# 如果配置允许删除同步工作目录,则删除
|
||||
if config.DELETE_SYNC_DIR:
|
||||
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
|
||||
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
|
||||
|
||||
|
||||
# 插入分支同步日志
|
||||
await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
|
||||
# 删除同步日志文件
|
||||
os.remove(os.path.join(log_path, log_name))
|
||||
|
||||
|
||||
|
||||
async def sync_branch_task(repo, branches, direct, user, force_flag):
|
||||
for branch in branches:
|
||||
log_name = f'sync_{repo.repo_name}_{branch.id}.log'
|
||||
commit_id = ''
|
||||
|
||||
try:
|
||||
# 初始化代码库和日志系统
|
||||
init_repos(repo, log_name, user)
|
||||
|
||||
# 记录日志,表示开始执行分支同步
|
||||
sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user)
|
||||
|
||||
if direct == SyncDirect.to_inter:
|
||||
# 执行外部分支到内部分支的同步
|
||||
sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
|
||||
commit_id = outer_to_inter(repo, branch, log_name, user, force_flag)
|
||||
else:
|
||||
# 执行内部分支到外部分支的同步
|
||||
sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
|
||||
commit_id = inter_to_outer(repo, branch, log_name, user, force_flag)
|
||||
|
||||
# 记录日志,表示分支同步已完成
|
||||
sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
|
||||
|
||||
finally:
|
||||
if config.DELETE_SYNC_DIR:
|
||||
# 删除同步工作目录
|
||||
if os.path.exists(SYNC_DIR): # 修复潜在问题:使用 if 语句检查目录是否存在
|
||||
os.removedirs(SYNC_DIR)
|
||||
# 记录日志,表示已删除同步工作目录
|
||||
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
|
||||
|
||||
# 异步插入分支日志到日志服务
|
||||
await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
|
||||
|
||||
# 假设 log_path 是已定义的变量或配置
|
||||
# 删除之前创建的日志文件
|
||||
# 修正潜在问题:使用已定义的 log_path
|
||||
os.remove(os.path.join(log_path, log_name))
|
||||
|
||||
|
||||
|
|
|
@ -13,36 +13,40 @@ from src.do.log import LogDO
|
|||
from src.dto.log import Log as LogDTO
|
||||
|
||||
|
||||
class LogService(Service):
|
||||
def __init__(self) -> None:
|
||||
self._log_dao = LogDAO()
|
||||
|
||||
async def get_logs_by_job(self, id, page: int = 1, size: int = 10) -> Optional[List[LogDTO]]:
|
||||
cond = text(f"sync_job_id = {id}")
|
||||
start = (page - 1) * size
|
||||
all = await self._log_dao.fetch(cond=cond, start=start, limit=size)
|
||||
data = []
|
||||
for log in all:
|
||||
data.append(self._do_to_dto(log))
|
||||
return data
|
||||
|
||||
async def save_logs(self, id, type, msg) -> Optional[LogDTO]:
|
||||
return await self._log_dao.insert_log(id, type, msg)
|
||||
|
||||
async def delete_logs(self) -> Optional[bool]:
|
||||
return await self._log_dao.delete_log()
|
||||
|
||||
async def count_logs(self, id) -> int:
|
||||
cond = text(f"sync_job_id = {id}")
|
||||
return await self._log_dao._get_count(cond=cond)
|
||||
|
||||
def _do_to_dto(self, log: LogDO) -> LogDTO:
|
||||
return LogDTO(
|
||||
**{
|
||||
'id': log[LogDO].id,
|
||||
'sync_job_id': log[LogDO].sync_job_id,
|
||||
'log_type': log[LogDO].log_type,
|
||||
'log': log[LogDO].log,
|
||||
'create_time': log[LogDO].create_time
|
||||
}
|
||||
)
|
||||
class LogService(Service):
|
||||
#LogService类,用于处理与日志相关的业务逻辑。
|
||||
def __init__(self) -> None:
|
||||
#初始化方法,创建一个LogDAO对象用于数据访问。
|
||||
self._log_dao = LogDAO() # 创建一个LogDAO对象,用于与数据库交互
|
||||
async def get_logs_by_job(self, id, page: int = 1, size: int = 10) -> Optional[List[LogDTO]]:
|
||||
#根据作业ID获取日志列表。
|
||||
#Optional[List[LogDTO]]: 日志DTO列表,如果没有找到则为None
|
||||
cond = text(f"sync_job_id = {id}") # 构建SQL条件,这里可能存在SQL注入的风险
|
||||
start = (page - 1) * size # 计算起始索引
|
||||
all = await self._log_dao.fetch(cond=cond, start=start, limit=size) # 从数据库中获取日志列表
|
||||
data = [] # 初始化一个空列表用于存储DTO对象
|
||||
for log in all:
|
||||
data.append(self._do_to_dto(log)) # 将每个日志DO转换为DTO并添加到列表中
|
||||
return data # 返回DTO列表
|
||||
|
||||
async def save_logs(self, id, type, msg) -> Optional[LogDTO]:
|
||||
#保存日志。
|
||||
return await self._log_dao.insert_log(id, type, msg) # 调用DAO的insert_log方法保存日志
|
||||
|
||||
async def delete_logs(self) -> Optional[bool]:
|
||||
return await self._log_dao.delete_log() # 调用DAO的delete_log方法删除日志
|
||||
|
||||
async def count_logs(self, id) -> int:
|
||||
cond = text(f"sync_job_id = {id}") # 构建SQL条件,这里可能存在SQL注入的风险
|
||||
return await self._log_dao._get_count(cond=cond) # 调用DAO的_get_count方法获取日志数量(注意:通常不建议直接访问类的私有方法)
|
||||
|
||||
def _do_to_dto(self, log: LogDO) -> LogDTO:
|
||||
return LogDTO(
|
||||
**{
|
||||
'id': log.id, # 注意:这里应该是直接使用log.id而不是log[LogDO].id
|
||||
'sync_job_id': log.sync_job_id,
|
||||
'log_type': log.log_type,
|
||||
'log': log.log,
|
||||
'create_time': log.create_time
|
||||
}
|
||||
) # 创建一个新的LogDTO对象并返回
|
||||
|
|
Loading…
Reference in New Issue