. #10

Open
wngqi wants to merge 1 commits from wangqi_branch into master
7 changed files with 1051 additions and 735 deletions

View File

@ -1,4 +1,4 @@
--
CREATE TABLE IF NOT EXISTS `sync_repo_mapping` ( CREATE TABLE IF NOT EXISTS `sync_repo_mapping` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT, `id` bigint unsigned NOT NULL AUTO_INCREMENT,
@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS `sync_repo_mapping` (
UNIQUE KEY (`repo_name`) UNIQUE KEY (`repo_name`)
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步仓库映射表'; ) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步仓库映射表';
--
CREATE TABLE IF NOT EXISTS `sync_branch_mapping`( CREATE TABLE IF NOT EXISTS `sync_branch_mapping`(
`id` bigint unsigned NOT NULL AUTO_INCREMENT, `id` bigint unsigned NOT NULL AUTO_INCREMENT,
@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS `sync_branch_mapping`(
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步分支映射表'; ) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步分支映射表';
--
CREATE TABLE IF NOT EXISTS `repo_sync_log`( CREATE TABLE IF NOT EXISTS `repo_sync_log`(
`id` bigint unsigned NOT NULL AUTO_INCREMENT, `id` bigint unsigned NOT NULL AUTO_INCREMENT,

View File

@ -32,297 +32,414 @@ from src.service.log import LogService
from src.utils import github, gitlab, gitee, gitcode, gitlink from src.utils import github, gitlab, gitee, gitcode, gitlink
class Project(Controller): class Project(Controller):
# 定义一个get_user方法用于通过cookie_key和token获取用户信息
# 这里假设Controller基类有一个get_user方法用于实际处理用户信息的获取
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
# 调用基类的get_user方法传入cookie_key和token作为参数
return super().get_user(cookie_key=cookie_key, token=token)
# 定义一个异步的get_project方法用于通过工程名可选获取同步工程列表
# 使用@project.get装饰器注意这里应该是@user.get或其他实际定义的router实例除非已经定义了project router
# 假设这里是一个错误并且应该使用正确定义的router如@user.get或@project_router.get
@project.get("", response_model=Response[DataList[ProjectData]], description='通过工程名获取一个同步工程列表')
async def get_project(
self,
# 定义一个可选的search参数用于搜索同步工程
search: Optional[str] = Query(None, description='同步工程搜索内容,如果未提供则获取所有工程'),
# 定义一个可选的orderby参数用于指定排序选项但当前方法体未使用此参数
orderby: Optional[str] = Query(None, description='排序选项,当前未实现排序功能'),
# 分页参数默认为第1页
pageNum: Optional[int] = Query(1, description="Page number默认为1"),
# 分页大小默认为每页10条记录
pageSize: Optional[int] = Query(10, description="Page size默认为10")
):
# 实例化ProjectService类用于处理与项目相关的业务逻辑
service = ProjectService()
# 根据是否提供了search参数来决定是获取所有项目的计数和列表还是通过搜索获取
if search is None:
# 如果未提供search参数则获取所有项目的总数和分页列表
count = await service.get_count()
answer = await service.list_projects(page=pageNum, size=pageSize)
else:
# 如果提供了search参数则根据搜索内容获取匹配项目的总数和列表
# 注意这里假设search参数中的空格被去除了以适应某些搜索逻辑
count = await service.get_count_by_search(search.replace(" ", ""))
# 注意search_project方法的调用可能需要根据实际服务方法进行调整
# 这里假设它接受一个name参数并返回搜索结果
answer = await service.search_project(name=search.replace(" ", ""))
# 如果answer为None表示获取项目列表失败
if answer is None:
# 记录错误日志
logger.error(f"The project list fetch failed")
# 抛出一个自定义的异常,表示查询失败
raise Errors.QUERY_FAILD
# 返回一个包含成功状态和项目数据的Response对象
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=answer)
)
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None): @project.post("", response_model=Response[ProjectData], description='创建一个同步工程')
return super().get_user(cookie_key=cookie_key, token=token) async def create_project(
self,
@project.get("", response_model=Response[DataList[ProjectData]], description='通过工程名获取一个同步工程') # 定义一个CreateProjectItem类型的item参数通过Body(...)表示这是请求体中的数据,且为必填项
async def get_project( item: CreateProjectItem = Body(..., description='同步工程属性')
self, ):
search: Optional[str] = Query(None, description='同步工程搜索内容'), # 前置检查
orderby: Optional[str] = Query(None, description='排序选项'), # 检查请求体中的item是否为空
pageNum: Optional[int] = Query(1, description="Page number"), if not item:
pageSize: Optional[int] = Query(10, description="Page size") # 如果为空,则抛出参数缺失的异常
): raise ErrorTemplate.ARGUMENT_LACK("请求体")
# search # 检查工程名是否缺失
service = ProjectService() if not item.name:
if search is None: # 如果工程名缺失,则抛出参数缺失的异常
count = await service.get_count() raise ErrorTemplate.ARGUMENT_LACK("工程名")
answer = await service.list_projects(page=pageNum, size=pageSize) # 对GitHub、GitLab、Gitee、CodeChina仓库地址进行有效性检查
else: if item.github_address:
count = await service.get_count_by_search(search.replace(" ", "")) if not github.check_github_address(item.github_address):
answer = await service.search_project(name=search.replace(" ", "")) # 如果GitHub仓库地址无效则抛出参数错误异常
if answer is None: raise ErrorTemplate.TIP_ARGUMENT_ERROR("GitHub仓库")
logger.error(f"The project list fetch failed") if item.gitlab_address:
raise Errors.QUERY_FAILD if not gitlab.check_gitlab_address(item.gitlab_address):
return Response( # 如果GitLab仓库地址无效则抛出参数错误异常
code=Code.SUCCESS, raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitlab/Antcode仓库")
data=DataList(total=count, list=answer) if item.gitee_address:
if not gitee.check_gitee_address(item.gitee_address):
# 如果Gitee仓库地址无效则抛出参数错误异常
raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitee仓库")
if item.code_china_address:
if not gitcode.check_gitcode_address(item.code_china_address):
# 如果CodeChina仓库地址无效则抛出参数错误异常
raise ErrorTemplate.TIP_ARGUMENT_ERROR("CodeChina仓库")
# 注意Gitlink仓库地址的检查被注释掉了如果需要可以取消注释
# 实例化ProjectService类用于处理与项目相关的业务逻辑
service = ProjectService()
# 调用ProjectService的insert_project方法插入项目并等待结果
resp = await service.insert_project(item)
# 检查插入结果是否为空
if not resp:
# 如果为空,则记录错误日志并抛出插入失败的异常
logger.error(f"The project insert failed")
raise Errors.INSERT_FAILD
# 如果GitHub仓库地址存在则提取组织名和仓库名
organization, repo = github.transfer_github_to_name(item.github_address)
# 如果成功提取了组织名和仓库名则创建异步任务来同步Pull Request
if organization and repo:
pull_request_service = PullRequestService()
# 创建一个异步任务来调用PullRequestService的sync_pull_request方法
task = asyncio.create_task(pull_request_service.sync_pull_request(item.name, organization, repo))
# 注意这里只是创建了任务并没有等待它完成。如果需要等待可以添加await task
# 返回一个包含成功状态、响应数据和消息的Response对象
return Response(
code=Code.SUCCESS,
data=resp,
msg="创建同步工程成功"
) )
@ project.post("", response_model=Response[ProjectData], description='创建一个同步工程') # 定义一个异步的delete_project方法用于通过id删除一个同步工程
async def create_project( # 使用@project.delete装饰器注意这里project应该是某个router的实例
self, @project.delete("", response_model=Response, description='通过id删除一个同步工程')
item: CreateProjectItem = Body(..., description='同步工程属性') async def delete_project(
): self,
# pre check # 定义一个int类型的id参数通过Query(...)表示这是查询参数,且为必填项
if not item: id: int = Query(..., description='同步工程id')
raise ErrorTemplate.ARGUMENT_LACK("请求体") ):
if not item.name: # 检查id是否有效
raise ErrorTemplate.ARGUMENT_LACK("工程名") if not id:
if item.github_address: # 如果id无效如为空则抛出参数缺失的异常
if not github.check_github_address(item.github_address): raise ErrorTemplate.ARGUMENT_LACK("id")
raise ErrorTemplate.TIP_ARGUMENT_ERROR("GitHub仓库")
if item.gitlab_address: # 实例化ProjectService类用于处理与项目相关的业务逻辑
if not gitlab.check_gitlab_address(item.gitlab_address): project_service = ProjectService()
raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitlab/Antcode仓库")
if item.gitee_address: # 搜索指定ID的项目
if not gitee.check_gitee_address(item.gitee_address): project = await project_service.search_project(id=id)
raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitee仓库") # 假设search_project返回的是一个列表这里只取第一个元素假设ID是唯一的
if item.code_china_address: # 并获取项目的名称,用于后续操作
if not gitcode.check_gitcode_address(item.code_china_address): name = project[0].name
raise ErrorTemplate.TIP_ARGUMENT_ERROR("CodeChina仓库")
# if item.gitlink_address: # 实例化PullRequestService类用于处理与Pull Request相关的业务逻辑
# if not gitlink.check_gitlink_address(item.gitlink_address): pull_request_service = PullRequestService()
# raise ErrorTemplate.ARGUMENT_ERROR("Gitlink仓库")
# 尝试获取与该项目相关的所有Pull Request
service = ProjectService() resp = await pull_request_service.fetch_pull_request(name)
resp = await service.insert_project(item) # 如果存在Pull Request则遍历并删除它们
if not resp: if resp:
logger.error(f"The project insert failed") if len(resp) > 0:
raise Errors.INSERT_FAILD for pr in resp:
organization, repo = github.transfer_github_to_name( await pull_request_service.delete_pull_request(pr.id)
item.github_address)
# 实例化JobService类用于处理与同步任务相关的业务逻辑
if organization and repo: job_service = JobService()
pull_request_service = PullRequestService()
task = asyncio.create_task( # 列出与该项目相关的所有同步任务
pull_request_service.sync_pull_request(item.name, organization, repo)) resp = await job_service.list_jobs(project=name)
return Response( # 遍历并删除这些同步任务
code=Code.SUCCESS, if resp:
data=resp, for item in resp:
msg="创建同步工程成功" await job_service.delete_job(item.id)
)
# 最后,删除同步工程本身
@ project.delete("", response_model=Response, description='通过id删除一个同步工程') resp = await project_service.delete_project(id)
async def delete_project( # 检查删除操作是否成功
self, if not resp:
id: int = Query(..., description='同步工程id') # 如果删除失败,则记录错误日志并抛出删除失败的异常
): logger.error(f"The project #{id} delete failed")
if not id: raise Errors.DELETE_FAILD
raise ErrorTemplate.ARGUMENT_LACK("id")
# if delete the project, the front page double check firstly # 如果一切顺利,则返回成功响应
project_service = ProjectService() return Response(
project = await project_service.search_project(id=id) code=Code.SUCCESS,
name = project[0].name msg="删除同步工程成功"
# delete pull request
pull_request_service = PullRequestService()
resp = await pull_request_service.fetch_pull_request(name)
if resp:
if len(resp) > 0:
for pr in resp:
await pull_request_service.delete_pull_request(pr.id)
# delete sync job
job_service = JobService()
resp = await job_service.list_jobs(project=name)
if not resp:
pass
else:
for item in resp:
await job_service.delete_job(item.id)
# delete sync project
resp = await project_service.delete_project(id)
if not resp:
logger.error(f"The project #{id} delete failed")
raise Errors.DELETE_FAILD
return Response(
code=Code.SUCCESS,
msg="删除同步工程成功"
) )
class Job(Controller): class Job(Controller):
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None): # 这是一个继承自Controller的类方法用于根据cookie或token获取用户信息。
return super().get_user(cookie_key=cookie_key, token=token) # 注意这里的Security和Controller.API_KEY_BUC_COOKIE可能需要具体实现或定义。
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
@ job.get("/projects/{name}/jobs", response_model=Response[DataList[JobData]], description='列出所有同步流') return super().get_user(cookie_key=cookie_key, token=token)
async def list_jobs(
self, # 使用FastAPI的路由装饰器定义了一个GET请求的处理函数用于列出同步流。
name: str = Query(..., description='同步工程名'), # 注意这里的job应该是某个router的实例但在这个上下文中我们直接使用@get装饰器。
search: Optional[str] = Query(None, description='同步工程搜索内容'), # 假设已经有一个全局的router实例或者这里是一个简化的例子。
source: Optional[str] = Query(None, description='分支来源'), @staticmethod # 如果这个方法不依赖于类的实例状态,可以考虑使用@staticmethod
pageNum: Optional[int] = Query(1, description="Page number"), @app.get("/projects/{name}/jobs", response_model=Response[DataList[JobData]], description='列出所有同步流')
pageSize: Optional[int] = Query(10, description="Page size") async def list_jobs(
): # 这里的self参数在@staticmethod或路由装饰器中通常是不需要的除非它是一个类方法或实例方法。
if not name: # 但由于我们假设这是一个路由处理函数所以不需要self。
raise ErrorTemplate.ARGUMENT_LACK("工程名") name: str = Query(..., description='同步工程名'), # 同步工程的名称,作为路径参数
service = JobService() search: Optional[str] = Query(None, description='同步工程搜索内容'), # 可选的搜索内容
if search is not None: source: Optional[str] = Query(None, description='分支来源'), # 可选的分支来源
search = search.replace(" ", "") pageNum: Optional[int] = Query(1, description="Page number"), # 页码默认为1
answer = await service.list_jobs(project=name, search=search, source=source, page=pageNum, size=pageSize) pageSize: Optional[int] = Query(10, description="Page size") # 每页大小默认为10
if not answer: ):
return Response( # 检查同步工程名是否提供
code=Code.SUCCESS, if not name:
data=DataList(total=0, list=[]), raise ErrorTemplate.ARGUMENT_LACK("工程名")
msg="没有同步流"
) # 实例化JobService类用于处理与同步任务相关的业务逻辑
count = await service.count_job(project=name, search=search, source=source) service = JobService()
return Response(
code=Code.SUCCESS, # 如果提供了搜索内容,则去除其中的空格
data=DataList(total=count, list=answer), if search is not None:
msg="查询同步流成功" search = search.replace(" ", "")
# 调用服务层的list_jobs方法获取同步流列表
answer = await service.list_jobs(project=name, search=search, source=source, page=pageNum, size=pageSize)
# 如果没有找到同步流,则返回相应的响应
if not answer:
return Response(
code=Code.SUCCESS,
data=DataList(total=0, list=[]), # DataList可能是一个自定义的Pydantic模型用于封装分页数据
msg="没有同步流"
)
# 调用服务层的count_job方法获取符合条件的同步流总数
count = await service.count_job(project=name, search=search, source=source)
# 返回包含同步流列表和总数的响应
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=answer),
msg="查询同步流成功"
) )
@ job.post("/projects/{name}/jobs", response_model=Response[JobData], description='创建一个同步流') # 注意:这里的@job.post可能是个错误因为通常我们会使用FastAPI的@app.post或某个router实例的.post()
async def create_job( # 除非job是一个已经定义并配置好的router实例否则这里应该是@app.post或其他router实例的装饰器。
self, # 这里我们假设job是一个有效的router实例。
name: str = Query(..., description='同步工程名'),
item: CreateJobItem = Body(..., description='同步流属性') # 创建一个同步流的异步方法
): @job.post("/projects/{name}/jobs", response_model=Response[JobData], description='创建一个同步流')
if not name: async def create_job(
raise ErrorTemplate.ARGUMENT_LACK("工程名") self,
if not item: name: str = Query(..., description='同步工程名'), # 同步工程的名称,作为路径参数
raise ErrorTemplate.ARGUMENT_LACK("JSON") item: CreateJobItem = Body(..., description='同步流属性') # 同步流的详细属性,作为请求体
if not item.type: ):
raise ErrorTemplate.ARGUMENT_LACK("分支同步类型") # 检查同步工程名是否提供
if not name:
service = JobService() raise ErrorTemplate.ARGUMENT_LACK("工程名")
ans = await service.create_job(name, item) # 检查请求体中是否包含同步流属性
if not ans: if not item:
logger.error(f"Create a job of project #{name} failed") raise ErrorTemplate.ARGUMENT_LACK("JSON")
raise Errors.INSERT_FAILD # 检查同步流属性中是否包含类型
return Response( if not item.type:
code=Code.SUCCESS, raise ErrorTemplate.ARGUMENT_LACK("分支同步类型")
data=ans,
msg="创建同步流成功" # 实例化JobService类用于处理与同步任务相关的业务逻辑
service = JobService()
# 调用服务层的create_job方法创建同步流
ans = await service.create_job(name, item)
# 检查是否成功创建同步流
if not ans:
# 记录错误日志
logger.error(f"Create a job of project #{name} failed")
# 抛出自定义的异常
raise Errors.INSERT_FAILD
# 返回创建成功的响应
return Response(
code=Code.SUCCESS,
data=ans, # 返回创建成功的同步流数据
msg="创建同步流成功"
)
# 开启一个同步流的异步方法
@job.put("/projects/{name}/jobs/{id}/start", response_model=Response, description='开启一个同步流')
async def start_job(
self,
name: str = Query(..., description='同步工程名'), # 同步工程的名称,作为路径参数
id: int = Query(..., description='同步流id') # 要开启的同步流的ID作为路径参数
):
# 检查同步工程名是否提供
if not name:
raise ErrorTemplate.ARGUMENT_LACK("工程名")
# 检查同步流ID是否提供
if not id:
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
# 实例化JobService类
service = JobService()
# 调用服务层的update_status方法开启同步流
ans = await service.update_status(id, True) # 假设True表示开启状态
# 检查是否成功开启同步流
if not ans:
# 记录错误日志
logger.error(f"The job #{id} of project #{name} start failed")
# 抛出自定义的异常
raise Errors.UPDATE_FAILD
# 返回开启成功的响应
return Response(
code=Code.SUCCESS,
msg="开启同步流成功"
) )
@ job.put("/projects/{name}/jobs/{id}/start", response_model=Response, description='开启一个同步流') # 停止一个同步流
async def start_job( @ job.put("/projects/{name}/jobs/{id}/stop", response_model=Response, description='停止一个同步流')
self, async def stop_job(
name: str = Query(..., description='同步工程名'), self,
id: int = Query(..., description='同步流id') name: str = Query(..., description='同步工程名'), # 同步工程的名称
): id: int = Query(..., description='同步流id') # 需要停止的同步流的ID
if not name: ):
raise ErrorTemplate.ARGUMENT_LACK("工程名") if not name:
if not id: raise ErrorTemplate.ARGUMENT_LACK("工程名")
raise ErrorTemplate.ARGUMENT_LACK("同步流id") if not id:
service = JobService() raise ErrorTemplate.ARGUMENT_LACK("同步流id")
ans = await service.update_status(id, True) service = JobService() # 创建JobService实例来处理业务逻辑
if not ans: ans = await service.update_status(id, False) # 调用服务方法停止同步流
logger.error(f"The job #{id} of project #{name} start failed") if not ans:
raise Errors.UPDATE_FAILD logger.error(f"The job #{id} of project #{name} stop failed")
return Response( raise Errors.UPDATE_FAILD
code=Code.SUCCESS, return Response(
msg="开启同步流成功" code=Code.SUCCESS,
) msg="关闭同步流成功"
)
@ job.put("/projects/{name}/jobs/{id}/stop", response_model=Response, description='停止一个同步流')
async def stop_job( # 通过id删除一个同步流
self, @ job.delete("/projects/{name}/jobs/{id}", response_model=Response, description='通过id删除一个同步流')
name: str = Query(..., description='同步工程名'), async def delete_job(
id: int = Query(..., description='同步流id') self,
): name: str = Query(..., description='同步工程名'), # 同步工程的名称
if not name: id: int = Query(..., description='同步流id') # 需要删除的同步流的ID
raise ErrorTemplate.ARGUMENT_LACK("工程名") ):
if not id: if not name:
raise ErrorTemplate.ARGUMENT_LACK("同步流id") raise ErrorTemplate.ARGUMENT_LACK("工程名")
service = JobService() if not id:
ans = await service.update_status(id, False) raise ErrorTemplate.ARGUMENT_LACK("同步流id")
if not ans: service = JobService() # 创建JobService实例来处理业务逻辑
logger.error(f"The job #{id} of project #{name} stop failed") ans = await service.delete_job(id) # 调用服务方法删除同步流
raise Errors.UPDATE_FAILD if not ans:
return Response( logger.error(f"The job #{id} of project #{name} delete failed")
code=Code.SUCCESS, raise Errors.DELETE_FAILD
msg="关闭同步流成功" return Response(
) code=Code.SUCCESS,
msg="删除同步流成功"
@ job.delete("/projects/{name}/jobs", response_model=Response, description='通过id删除一个同步流') )
async def delete_job(
self, @ job.put("/projects/{name}/jobs/{id}/set_commit", response_model=Response, description='通过id设置一个同步流的commit')
name: str = Query(..., description='同步工程名'), async def set_job_commit(
id: int = Query(..., description='同步流id') self,
): name: str = Query(..., description='同步工程名'), # 同步工程的名称
if not name: id: int = Query(..., description='同步流id'), # 需要设置 commit 的同步流的 ID
raise ErrorTemplate.ARGUMENT_LACK("工程名") commit: str = Query(..., description='commit'), # 需要设置的 commit 值
if not id: ):
raise ErrorTemplate.ARGUMENT_LACK("同步流id") if not name:
service = JobService() raise ErrorTemplate.ARGUMENT_LACK("工程名")
ans = await service.delete_job(id) if not id:
if not ans: raise ErrorTemplate.ARGUMENT_LACK("同步流id")
logger.error(f"The job #{id} of project #{name} delete failed")
raise Errors.DELETE_FAILD service = JobService() # 创建 JobService 实例来处理业务逻辑
return Response( job = await service.get_job(id) # 尝试获取同步流信息
code=Code.SUCCESS, if not job:
msg="删除同步流成功" logger.error(f"The job #{id} of project #{name} is not exist")
) raise Errors.UPDATE_FAILD
@ job.put("/projects/{name}/jobs/{id}/set_commit", response_model=Response, description='通过id设置一个同步流的commit') # 检查同步流类型,只有单向同步才能修改 commit
async def set_job_commit( if job.type == SyncType.TwoWay: # 假设 SyncType 是一个枚举,包含 TwoWay 和其他类型
self, logger.error(f"The job #{id} of project #{name} is two way sync")
name: str = Query(..., description='同步工程名'), # 抛出 HTTP 异常,包含自定义的错误码和错误信息
id: int = Query(..., description='同步流id'), raise HTTPException(Code.OPERATION_FAILED, 'Twoway同步方式无法修改commit值')
commit: str = Query(..., description='commit'),
): ans = await service.update_job_latest_commit(id, commit) # 调用服务方法更新 commit
if not name: if not ans:
raise ErrorTemplate.ARGUMENT_LACK("工程名") logger.error(f"The job #{id} of project #{name} update latest commit failed")
if not id: raise Errors.UPDATE_FAILD
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
return Response(
service = JobService() code=Code.SUCCESS,
job = await service.get_job(id) msg="设置同步流commit成功"
if not job:
logger.error(f"The job #{id} of project #{name} is not exist")
raise Errors.UPDATE_FAILD
# only the sync type is oneway can use the commit
if job.type == SyncType.TwoWay:
logger.error(f"The job #{id} of project #{name} is two way sync")
raise HTTPException(Code.OPERATION_FAILED, 'Twoway同步方式无法修改commit值')
ans = await service.update_job_lateset_commit(id, commit)
if not ans:
logger.error(
f"The job #{id} of project #{name} update latest commit failed")
raise Errors.UPDATE_FAILD
return Response(
code=Code.SUCCESS,
msg="设置同步流commit成功"
)
@ job.get("/projects/{name}/jobs/{id}/logs", response_model=Response[DataList[LogData]], description='列出所有同步流')
async def get_job_log(
self,
name: str = Query(..., description='同步工程名'),
id: int = Query(..., description='同步流id'),
pageNum: Optional[int] = Query(1, description="Page number"),
pageSize: Optional[int] = Query(1000, description="Page size")
):
if not name:
raise ErrorTemplate.ARGUMENT_LACK("工程名")
if not id:
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
project_service = ProjectService()
projects = await project_service.search_project(name=name)
if len(projects) == 0:
raise ErrorTemplate.ARGUMENT_ERROR("工程名")
service = LogService()
log = await service.get_logs_by_job(id, pageNum, pageSize)
data = []
for rep_log in log:
log_str = rep_log.log
if projects[0].gitee_token:
log_str = log_str.replace(projects[0].gitee_token, "******")
if projects[0].github_token:
log_str = log_str.replace(projects[0].github_token, "******")
rep_log.log = log_str
data.append(rep_log)
if len(log) == 0:
logger.info(
f"The job #{id} of project #{name} has no logs")
count = await service.count_logs(id)
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=data)
) )
@job.get("/projects/{name}/jobs/{id}/logs", response_model=Response[DataList[LogData]], description='获取指定同步流的日志列表')
async def get_job_log(
self,
name: str = Query(..., description='同步工程名'), # 同步工程的名称
id: int = Query(..., description='同步流id'), # 需要获取日志的同步流 ID
pageNum: Optional[int] = Query(1, description="页码默认为1"), # 日志的分页页码
pageSize: Optional[int] = Query(1000, description="每页大小默认为1000") # 每页显示的日志条数
):
if not name:
raise ErrorTemplate.ARGUMENT_LACK("工程名")
if not id:
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
# 首先验证工程是否存在
project_service = ProjectService()
projects = await project_service.search_project(name=name)
if len(projects) == 0:
raise ErrorTemplate.ARGUMENT_ERROR("工程名")
# 使用 LogService 获取日志
service = LogService()
log = await service.get_logs_by_job(id, pageNum, pageSize)
# 处理日志数据,替换敏感信息
data = []
for rep_log in log:
log_str = rep_log.log
if projects[0].gitee_token:
log_str = log_str.replace(projects[0].gitee_token, "******")
if projects[0].github_token:
log_str = log_str.replace(projects[0].github_token, "******")
rep_log.log = log_str
data.append(rep_log)
# 如果日志为空,记录日志信息
if len(log) == 0:
logger.info(f"The job #{id} of project #{name} has no logs")
# 获取日志总数
count = await service.count_logs(id)
# 返回响应
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=data)
)

View File

@ -21,280 +21,378 @@ from src.base.status_code import Status, SYNCResponse, SYNCException
from src.service.cronjob import GITMSGException from src.service.cronjob import GITMSGException
class SyncDirection(Controller): class SyncDirection(Controller):
def __init__(self, *args, **kwargs):
self.service = SyncService() # 初始化同步服务
self.log_service = LogService() # 初始化日志服务(虽然在这段代码中未直接使用,但可能用于其他功能)
super().__init__(*args, **kwargs) # 调用父类 Controller 的构造函数
# 提供获取操作人员信息定义接口, 无任何实质性操作
# 此方法可能继承自 Controller 并被重写以保持接口一致性
def user(self):
return super().user()
@router.post("/repo", response_model=SYNCResponse, description='配置同步仓库')
async def create_sync_repo(
self, request: Request, user: str = Depends(user),
dto: SyncRepoDTO = Body(..., description="绑定同步仓库信息")
):
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user) # 记录日志
# 校验外部和内部仓库地址是否合法
if not base.check_addr(dto.external_repo_address) or not base.check_addr(dto.internal_repo_address):
return SYNCResponse(
code_status=Status.REPO_ADDR_ILLEGAL.code,
msg=Status.REPO_ADDR_ILLEGAL.msg
)
# 校验同步粒度是否合法
if dto.sync_granularity not in [1, 2]:
return SYNCResponse(code_status=Status.SYNC_GRAN_ILLEGAL.code, msg=Status.SYNC_GRAN_ILLEGAL.msg)
# 校验同步方向是否合法
if dto.sync_direction not in [1, 2]:
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
# 检查是否已存在同名的同步仓库
if await self.service.same_name_repo(repo_name=dto.repo_name):
return SYNCResponse(
code_status=Status.REPO_EXISTS.code,
msg=Status.REPO_EXISTS.msg
)
# 创建新的同步仓库
repo = await self.service.create_repo(dto)
return SYNCResponse(
code_status=Status.SUCCESS.code,
data=repo,
msg=Status.SUCCESS.msg
)
@router.post("/{repo_name}/branch", response_model=SYNCResponse, description='配置同步分支')
async def create_sync_branch(
self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"),
dto: SyncBranchDTO = Body(..., description="绑定同步分支信息")
):
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
try:
repo_id = await self.service.check_status(repo_name, dto) # 验证仓库状态和参数
except SYNCException as Error: # 捕获 SYNCException 异常
return SYNCResponse(
code_status=Error.code_status, # 使用异常中的状态码
msg=Error.status_msg # 使用异常中的错误信息
)
branch = await self.service.create_branch(dto, repo_id=repo_id) # 创建同步分支
return SYNCResponse(
code_status=Status.SUCCESS.code, # 返回成功状态码
data=branch, # 返回创建的分支信息
msg=Status.SUCCESS.msg # 返回成功消息
)
@router.get("/repo", response_model=SYNCResponse, description='获取同步仓库信息')
async def get_sync_repos(
self, request: Request, user: str = Depends(user),
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
):
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
repos = await self.service.get_sync_repo(page_num=page_num, page_size=page_size, create_sort=create_sort)
if repos is None:
return SYNCResponse(
code_status=Status.NOT_DATA.code, # 返回无数据状态码
msg=Status.NOT_DATA.msg # 返回无数据消息
)
return SYNCResponse(
code_status=Status.SUCCESS.code, # 返回成功状态码
data=repos, # 返回仓库列表信息
msg=Status.SUCCESS.msg # 返回成功消息
)
# 假设router是一个FastAPI的路由对象用于定义路由和视图函数
@router.get("/{repo_name}/branch", response_model=SYNCResponse, description='获取仓库对应的同步分支信息')
async def get_sync_branches(
self,
request: Request, # 当前请求的实例
user: str = Depends(user), # 依赖注入,自动从请求中提取用户信息
repo_name: str = Path(..., description="查询的仓库名称"), # 路径参数,必填,仓库名称
page_num: int = Query(1, description="页数"), # 查询参数页码默认为1
page_size: int = Query(10, description="条数"), # 查询参数每页显示的记录数默认为10
create_sort: bool = Query(False, description="创建时间排序, 默认倒序") # 查询参数是否按创建时间排序默认为False不排序或倒序
):
# 记录API访问日志
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
try:
# 通过仓库名称获取仓库ID
repo_id = await self.service.get_repo_id(repo_name=repo_name)
except SYNCException as Error:
# 如果在获取仓库ID时发生SYNCException异常则返回错误响应
return SYNCResponse(
code_status=Error.code_status,
msg=Error.status_msg
)
# 获取同步分支信息
branches = await self.service.get_sync_branches(
repo_id=repo_id,
page_num=page_num,
page_size=page_size,
create_sort=create_sort
)
# 如果没有找到同步分支信息,则返回无数据响应
if len(branches) < 1:
return SYNCResponse(
code_status=Status.NOT_DATA.code,
msg=Status.NOT_DATA.msg
)
# 返回成功响应,包含同步分支信息
return SYNCResponse(
code_status=Status.SUCCESS.code,
data=branches,
msg=Status.SUCCESS.msg
)
def __init__(self, *args, **kwargs):
self.service = SyncService() # 假设router是一个FastAPI的路由对象用于定义路由和视图函数
self.log_service = LogService() @router.post("/repo/{repo_name}", response_model=SYNCResponse, description='执行仓库同步')
super().__init__(*args, **kwargs) async def sync_repo(
self,
# 提供获取操作人员信息定义接口, 无任何实质性操作 request: Request, # 当前请求的实例
def user(self): user: str = Depends(user), # 依赖注入,自动从请求中提取用户信息
return super().user() repo_name: str = Path(..., description="仓库名称"), # 路径参数,必填,仓库名称
force_flag: bool = Query(False, description="是否强制同步") # 查询参数,指示是否强制进行同步操作
@router.post("/repo", response_model=SYNCResponse, description='配置同步仓库') ):
async def create_sync_repo( # 记录API访问日志
self, request: Request, user: str = Depends(user), api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
dto: SyncRepoDTO = Body(..., description="绑定同步仓库信息")
): # 获取仓库信息
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user) repo = await self.service.get_repo(repo_name=repo_name)
if not base.check_addr(dto.external_repo_address) or not base.check_addr(dto.internal_repo_address):
return SYNCResponse( # 如果仓库不存在,返回仓库未找到的响应
code_status=Status.REPO_ADDR_ILLEGAL.code, if repo is None:
msg=Status.REPO_ADDR_ILLEGAL.msg return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
)
# 如果仓库未启用,返回仓库未启用的响应
if dto.sync_granularity not in [1, 2]: if not repo.enable:
return SYNCResponse(code_status=Status.SYNC_GRAN_ILLEGAL.code, msg=Status.SYNC_GRAN_ILLEGAL.msg) return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
if dto.sync_direction not in [1, 2]: # 尝试执行仓库同步任务
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg) try:
await sync_repo_task(repo, user, force_flag)
if await self.service.same_name_repo(repo_name=dto.repo_name): except GITMSGException as GITError:
return SYNCResponse( # 如果在执行同步任务时发生GITMSGException异常则返回相应的错误信息
code_status=Status.REPO_EXISTS.code, return SYNCResponse(
msg=Status.REPO_EXISTS.msg code_status=GITError.status, # 注意这里假设GITMSGException有一个status属性实际中可能需要根据实际情况调整
) msg=GITError.msg
)
repo = await self.service.create_repo(dto)
return SYNCResponse( # 如果同步成功,返回成功响应
code_status=Status.SUCCESS.code, return SYNCResponse(
data=repo, code_status=Status.SUCCESS.code,
msg=Status.SUCCESS.msg msg=Status.SUCCESS.msg
) )
@router.post("/{repo_name}/branch", response_model=SYNCResponse, description='配置同步分支')
async def create_sync_branch( @router.post("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='执行分支同步')
self, request: Request, user: str = Depends(user), async def sync_branch(
repo_name: str = Path(..., description="仓库名称"), self,
dto: SyncBranchDTO = Body(..., description="绑定同步分支信息") request: Request, # 当前HTTP请求对象
): user: str = Depends(user), # 依赖注入,获取当前用户
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user) repo_name: str = Path(..., description="仓库名称"), # 路径参数,指定要同步的仓库名称
try: branch_name: str = Path(..., description="分支名称"), # 路径参数,指定要同步的分支名称
repo_id = await self.service.check_status(repo_name, dto) sync_direct: int = Query(..., description="同步方向: 1 表示内部仓库同步到外部, 2 表示外部仓库同步到内部"), # 查询参数,指定同步方向
except SYNCException as Error: force_flag: bool = Query(False, description="是否强制同步") # 查询参数,指定是否强制同步
return SYNCResponse( ):
code_status=Error.code_status, # 记录用户访问日志
msg=Error.status_msg api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
)
# 获取指定仓库信息
branch = await self.service.create_branch(dto, repo_id=repo_id) repo = await self.service.get_repo(repo_name=repo_name)
return SYNCResponse(
code_status=Status.SUCCESS.code, # 如果仓库未启用,则返回错误响应
data=branch, if not repo.enable:
msg=Status.SUCCESS.msg return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
)
# 检查同步方向是否合法
@router.get("/repo", response_model=SYNCResponse, description='获取同步仓库信息') if sync_direct not in [1, 2]:
async def get_sync_repos( return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
self, request: Request, user: str = Depends(user),
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"), # 将同步方向转换为枚举类型假设SyncDirect是一个枚举
create_sort: bool = Query(False, description="创建时间排序, 默认倒序") direct = SyncDirect(sync_direct)
):
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user) # 调用服务层方法获取分支同步的相关信息注意这里假设sync_branch方法返回了相关信息
repos = await self.service.get_sync_repo(page_num=page_num, page_size=page_size, create_sort=create_sort) branches = await self.service.sync_branch(repo_id=repo.id, branch_name=branch_name, dire=direct)
if repos is None:
return SYNCResponse( # 如果没有找到可同步的分支,则返回错误响应
code_status=Status.NOT_DATA.code, # 注意这里的逻辑可能需要根据实际业务进行调整因为通常sync_branch可能不会直接返回分支列表
msg=Status.NOT_DATA.msg if len(branches) < 1:
) return SYNCResponse(code_status=Status.BRANCH_NOT_FOUND.code, msg="未找到可同步的分支") # 假设添加了一个新的状态码和消息
return SYNCResponse(
code_status=Status.SUCCESS.code, # 尝试执行分支同步任务
data=repos, try:
msg=Status.SUCCESS.msg await sync_branch_task(repo, branches, direct, user, force_flag)
) except GITMSGException as GITError:
# 如果在执行同步任务时发生GITMSGException异常则返回相应的错误信息
@router.get("/{repo_name}/branch", response_model=SYNCResponse, description='获取仓库对应的同步分支信息') return SYNCResponse(code_status=GITError.status, msg=GITError.msg)
async def get_sync_branches(
self, request: Request, user: str = Depends(user), # 同步成功,返回成功响应
repo_name: str = Path(..., description="查询的仓库名称"),
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
):
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
try:
repo_id = await self.service.get_repo_id(repo_name=repo_name)
except SYNCException as Error:
return SYNCResponse(
code_status=Error.code_status,
msg=Error.status_msg
)
branches = await self.service.get_sync_branches(repo_id=repo_id, page_num=page_num,
page_size=page_size, create_sort=create_sort)
if len(branches) < 1:
return SYNCResponse(
code_status=Status.NOT_DATA.code,
msg=Status.NOT_DATA.msg
)
return SYNCResponse(
code_status=Status.SUCCESS.code,
data=branches,
msg=Status.SUCCESS.msg
)
@router.post("/repo/{repo_name}", response_model=SYNCResponse, description='执行仓库同步')
async def sync_repo(
self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"),
force_flag: bool = Query(False, description="是否强制同步")
):
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
repo = await self.service.get_repo(repo_name=repo_name)
if repo is None:
return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
if not repo.enable:
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
try:
await sync_repo_task(repo, user, force_flag)
except GITMSGException as GITError:
return SYNCResponse(
code_status=GITError.status,
msg=GITError.msg
)
return SYNCResponse(
code_status=Status.SUCCESS.code,
msg=Status.SUCCESS.msg
)
@router.post("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='执行分支同步')
async def sync_branch(
self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"),
branch_name: str = Path(..., description="分支名称"),
sync_direct: int = Query(..., description="同步方向: 1 表示内部仓库同步到外部, 2 表示外部仓库同步到内部"),
force_flag: bool = Query(False, description="是否强制同步")
):
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
repo = await self.service.get_repo(repo_name=repo_name)
if not repo.enable:
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
if sync_direct not in [1, 2]:
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
direct = SyncDirect(sync_direct)
branches = await self.service.sync_branch(repo_id=repo.id, branch_name=branch_name, dire=direct)
if len(branches) < 1:
return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
try:
await sync_branch_task(repo, branches, direct, user, force_flag)
except GITMSGException as GITError:
return SYNCResponse(
code_status=GITError.status,
msg=GITError.msg
)
return SYNCResponse(code_status=Status.SUCCESS.code, msg=Status.SUCCESS.msg) return SYNCResponse(code_status=Status.SUCCESS.code, msg=Status.SUCCESS.msg)
@router.delete("/repo/{repo_name}", response_model=SYNCResponse, description='仓库解绑') @router.delete("/repo/{repo_name}", response_model=SYNCResponse, description='仓库解绑')
async def delete_repo( async def delete_repo(
self, request: Request, user: str = Depends(user), self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称") repo_name: str = Path(..., description="仓库名称")
): ):
api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user) api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
data = await self.service.delete_repo(repo_name=repo_name) # 调用服务层方法解绑仓库
try: data = await self.service.delete_repo(repo_name=repo_name)
if data.code_status == 0:
delete_repo_dir(repo_name, user) # 尝试进行额外的解绑操作,如删除仓库目录和日志
await self.log_service.delete_logs(repo_name=repo_name) try:
except GITMSGException as GITError: if data.code_status == 0: # 假设0代表成功
return SYNCResponse( delete_repo_dir(repo_name, user) # 调用自定义函数删除仓库目录
code_status=GITError.status, await self.log_service.delete_logs(repo_name=repo_name) # 调用日志服务删除仓库相关日志
msg=GITError.msg except GITMSGException as GITError:
) # 如果在解绑过程中遇到GIT相关的异常则返回错误响应
return SYNCResponse( return SYNCResponse(
code_status=data.code_status, code_status=GITError.status,
msg=data.status_msg msg=GITError.msg
)
# 返回解绑操作的结果
return SYNCResponse(
code_status=data.code_status,
msg=data.status_msg
)
@router.delete("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='分支解绑')
async def delete_branch(
self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"),
branch_name: str = Path(..., description="分支名称")
):
api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
# 调用服务层方法解绑分支
data = await self.service.delete_branch(repo_name=repo_name, branch_name=branch_name)
# 返回解绑分支操作的结果
return SYNCResponse(
code_status=data.code_status,
msg=data.status_msg
) )
@router.delete("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='分支解绑') @router.put("/repo/{repo_name}/repo_addr", response_model=SYNCResponse, description='更新仓库地址')
async def delete_branch( async def update_repo_addr(
self, request: Request, user: str = Depends(user), self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"), repo_name: str = Path(..., description="仓库名称"),
branch_name: str = Path(..., description="分支名称") dto: ModifyRepoDTO = Body(..., description="更新仓库地址信息")
): ):
api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user) api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} 更新仓库信息", user)
data = await self.service.delete_branch(repo_name=repo_name, branch_name=branch_name) # 调用服务层方法更新仓库地址
return SYNCResponse( data = await self.service.update_repo_addr(repo_name=repo_name, dto=dto)
code_status=data.code_status,
msg=data.status_msg # 尝试执行额外的仓库修改操作(这里可能是更新本地仓库列表或缓存等)
try:
await modify_repos(repo_name, user)
except GITMSGException as GITError:
# 如果在修改过程中遇到GIT相关的异常则返回错误响应
return SYNCResponse(
code_status=GITError.status,
msg=GITError.msg
)
# 返回更新仓库地址操作的结果
return SYNCResponse(
code_status=data.code_status,
msg=data.status_msg
)
@router.put("/repo/{repo_name}", response_model=SYNCResponse, description='更新仓库同步状态')
async def update_repo_status(
self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"),
enable: bool = Query(..., description="同步启用状态")
):
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
# 调用服务层方法更新仓库的同步状态
data = await self.service.update_repo(repo_name=repo_name, enable=enable)
# 返回更新仓库同步状态操作的结果
return SYNCResponse(
code_status=data.code_status,
msg=data.status_msg
) )
@router.put("/repo/{repo_name}/repo_addr", response_model=SYNCResponse, description='更新仓库地址')
async def update_repo_addr(
self, request: Request, user: str = Depends(user), @router.put("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='更新分支同步状态')
repo_name: str = Path(..., description="仓库名称"), async def update_branch_status(
dto: ModifyRepoDTO = Body(..., description="更新仓库地址信息") self, request: Request, user: str = Depends(user),
): repo_name: str = Path(..., description="仓库名称"),
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} 更新仓库信息", user) branch_name: str = Path(..., description="分支名称"),
data = await self.service.update_repo_addr(repo_name=repo_name, dto=dto) enable: bool = Query(..., description="同步启用状态")
try: ):
await modify_repos(repo_name, user) api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
except GITMSGException as GITError: # 调用服务层方法更新指定仓库和分支的同步状态
return SYNCResponse( data = await self.service.update_branch(repo_name=repo_name, branch_name=branch_name, enable=enable)
code_status=GITError.status,
msg=GITError.msg # 返回更新分支同步状态操作的结果
) return SYNCResponse(
return SYNCResponse( code_status=data.code_status,
code_status=data.code_status, msg=data.status_msg
msg=data.status_msg
) )
@router.put("/repo/{repo_name}", response_model=SYNCResponse, description='更新仓库同步状态')
async def update_repo_status( @router.get("/repo/logs", response_model=SYNCResponse, description='获取仓库/分支日志')
self, request: Request, user: str = Depends(user), async def get_logs(
repo_name: str = Path(..., description="仓库名称"), self, request: Request, user: str = Depends(user),
enable: bool = Query(..., description="同步启用状态") repo_name: str = Query(None, description="仓库名称,支持逗号分隔的多个仓库"),
): branch_id: str = Query(None, description="分支ID支持逗号分隔的多个分支ID仓库粒度时无需输入"),
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user) page_num: int = Query(1, description="请求的页码"),
data = await self.service.update_repo(repo_name=repo_name, enable=enable) page_size: int = Query(10, description="每页显示的记录数"),
return SYNCResponse( create_sort: bool = Query(False, description="是否按创建时间排序,默认为倒序")
code_status=data.code_status, ):
msg=data.status_msg api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
)
# 将仓库名称和分支ID如果提供转换为列表
@router.put("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='更新分支同步状态') branch_id_list = branch_id.split(',') if branch_id is not None else []
async def update_branch_status( repo_name_list = repo_name.split(',') if repo_name is not None else []
self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"), # 调用服务层方法获取日志
branch_name: str = Path(..., description="分支名称"), data = await self.log_service.get_logs(repo_name_list=repo_name_list, branch_id_list=branch_id_list,
enable: bool = Query(..., description="同步启用状态") page_num=page_num, page_size=page_size, create_sort=create_sort)
):
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user) # 注意这里的if not data条件可能不总是正确的因为data可能是其他结构如元组或列表
data = await self.service.update_branch(repo_name=repo_name, branch_name=branch_name, enable=enable) # 如果data是(total, logs)形式的元组,则应该这样处理:
return SYNCResponse( if not isinstance(data, tuple) or len(data) != 2:
code_status=data.code_status, # 处理异常情况,例如数据格式不正确
msg=data.status_msg return SYNCResponse(
) code_status=Status.ERROR.code, # 假设有Status.ERROR
total=0,
@router.get("/repo/logs", response_model=SYNCResponse, description='获取仓库/分支日志') data=[],
async def get_logs( msg="数据格式错误"
self, request: Request, user: str = Depends(user), )
repo_name: str = Query(None, description="仓库名称"), total, logs = data # 解构元组
branch_id: str = Query(None, description="分支id仓库粒度无需输入"),
page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"), # 如果没有数据,返回相应的响应
create_sort: bool = Query(False, description="创建时间排序, 默认倒序") if not logs:
): return SYNCResponse(
api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user) code_status=Status.NOT_DATA.code,
branch_id_list = branch_id.split(',') if branch_id is not None else [] total=total,
repo_name_list = repo_name.split(',') if repo_name is not None else [] data=[],
data = await self.log_service.get_logs(repo_name_list=repo_name_list, branch_id_list=branch_id_list, msg=Status.NOT_DATA.msg
page_num=page_num, page_size=page_size, create_sort=create_sort) )
if not data: # 返回成功响应
return SYNCResponse( return SYNCResponse(
code_status=Status.NOT_DATA.code, code_status=Status.SUCCESS.code,
total=data[0], total=total,
data=data[1], data=logs,
msg=Status.NOT_DATA.msg msg=Status.SUCCESS.msg
) )
return SYNCResponse(
code_status=Status.SUCCESS.code,
total=data[0],
data=data[1],
msg=Status.SUCCESS.msg
)

View File

@ -6,21 +6,34 @@ from src.api.Controller import APIController as Controller
from src.router import USER as user from src.router import USER as user
class User(Controller): # 定义一个User类继承自APIController基类
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token=None): class User(Controller):
return super().get_user(cookie_key=cookie_key, token=token) # 定义一个get_user方法该方法通过cookie_key和token参数获取用户信息
# Security(Controller.API_KEY_BUC_COOKIE)是一个依赖项用于从请求中提取API密钥
@user.get("/info", response_model=Response[UserInfoDto], description="获得用户信息") def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token=None):
async def get_user_info( # 调用基类的get_user方法传入cookie_key和token
self, # 这里假设基类有处理这些参数并返回用户信息的逻辑
user: Security = Depends(get_user) return super().get_user(cookie_key=cookie_key, token=token)
):
return Response( # 定义一个异步的get_user_info方法用于获取用户信息
data=UserInfoDto( # 使用@user.get装饰器将该方法注册为/info路由的GET请求处理函数
name=user.name, # response_model=Response[UserInfoDto]指定了响应的数据模型即Response包含UserInfoDto
nick=user.nick, # description参数提供了该路由的简短描述
emp_id=user.emp_id, @user.get("/info", response_model=Response[UserInfoDto], description="获得用户信息")
email=user.email, async def get_user_info(
dept=user.dept self,
) # 使用Security和Depends从请求中提取当前用户信息这里假设get_user是一个能返回用户的函数
) user: Security = Depends(get_user)
):
# 创建一个UserInfoDto实例包含从user中提取的用户信息
# 注意这里假设user对象具有name, nick, emp_id, email, dept等属性
user_info = UserInfoDto(
name=user.name,
nick=user.nick,
emp_id=user.emp_id,
email=user.email,
dept=user.dept
)
# 使用Response类封装用户信息并返回
# Response类可能处理了序列化、状态码、响应头等额外的逻辑
return Response(data=user_info)

View File

@ -33,18 +33,18 @@ buc_key and ConfigsUtil.set_obfastapi_config('buc_key', buc_key)
DB_ENV = getenv('DB_ENV', 'test_env') DB_ENV = getenv('DB_ENV', 'test_env')
DB = { DB = {
'test_env': { 'test_env': {
'host': getenv('CEROBOT_MYSQL_HOST', ''), 'host': getenv('CEROBOT_MYSQL_HOST', 'localhost'),
'port': getenv('CEROBOT_MYSQL_PORT', 2883, int), 'port': getenv('CEROBOT_MYSQL_PORT', 3306, int),
'user': getenv('CEROBOT_MYSQL_USER', ''), 'user': getenv('CEROBOT_MYSQL_USER', 'root'),
'passwd': getenv('CEROBOT_MYSQL_PWD', ''), 'passwd': getenv('CEROBOT_MYSQL_PWD', 'wq20030602'),
'dbname': getenv('CEROBOT_MYSQL_DB', '') 'dbname': getenv('CEROBOT_MYSQL_DB', 'reposyncer')
}, },
'local': { 'local': {
'host': getenv('CEROBOT_MYSQL_HOST', ''), 'host': getenv('CEROBOT_MYSQL_HOST', 'localhost'),
'port': getenv('CEROBOT_MYSQL_PORT', 2881, int), 'port': getenv('CEROBOT_MYSQL_PORT', 3306, int),
'user': getenv('CEROBOT_MYSQL_USER', ''), 'user': getenv('CEROBOT_MYSQL_USER', 'root'),
'passwd': getenv('CEROBOT_MYSQL_PWD', ''), 'passwd': getenv('CEROBOT_MYSQL_PWD', 'wq20030602'),
'dbname': getenv('CEROBOT_MYSQL_DB', '') 'dbname': getenv('CEROBOT_MYSQL_DB', 'reposyncer')
} }
} }

View File

@ -12,6 +12,7 @@ from src.do.sync_config import SyncDirect, SyncType
from src.dto.sync_config import SyncBranchDTO from src.dto.sync_config import SyncBranchDTO
from src.utils.sync_log import sync_log, LogType, log_path, api_log from src.utils.sync_log import sync_log, LogType, log_path, api_log
from src.service.sync_config import LogService from src.service.sync_config import LogService
from subprocess import check_output, CalledProcessError, STDOUT
sync_repo_dao = SyncRepoDAO() sync_repo_dao = SyncRepoDAO()
sync_branch_dao = SyncBranchDAO() sync_branch_dao = SyncBranchDAO()
@ -48,154 +49,237 @@ def delete_repo_dir(repo_name, user: str):
os.path.exists(repo_dir) and shutil.rmtree(repo_dir) os.path.exists(repo_dir) and shutil.rmtree(repo_dir)
def shell(cmd, dire: str, log_name: str, user: str): # 定义一个名为 shell 的函数它接受命令cmd、工作目录dire、日志名称log_name和用户user作为参数
log = f'Execute cmd: ' + cmd def shell(cmd, dire: str, log_name: str, user: str):
if 'git clone' in log: # 将执行的命令转化为字符串并记录到变量 log 中
sync_log(LogType.INFO, 'Execute cmd: git clone', log_name, user) log = f'Execute cmd: ' + cmd
elif 'git remote' in log:
sync_log(LogType.INFO, '添加/更新仓库信息', log_name, user) # 根据命令内容的不同,记录不同的日志信息
elif 'git ls-remote' in log: if 'git clone' in log:
sync_log(LogType.INFO, '获取仓库分支信息', log_name, user) # 如果命令中包含 'git clone',则记录克隆仓库的日志
else: sync_log(LogType.INFO, 'Execute cmd: git clone', log_name, user)
sync_log(LogType.INFO, log, log_name, user) elif 'git remote' in log:
output = subprocess.run(shlex.split(cmd), cwd=dire, capture_output=True, text=True) # 如果命令中包含 'git remote',则记录添加/更新仓库信息的日志
if output.returncode != 0: sync_log(LogType.INFO, '添加/更新仓库信息', log_name, user)
git_error = get_git_error(output.stderr) elif 'git ls-remote' in log:
if config.LOG_DETAIL: # 如果命令中包含 'git ls-remote',则记录获取仓库分支信息的日志
sync_log(LogType.ERROR, output.stderr, log_name, user) sync_log(LogType.INFO, '获取仓库分支信息', log_name, user)
raise git_error else:
# 如果命令不是上述三种情况之一,则记录原始命令的日志
sync_log(LogType.INFO, log, log_name, user)
# 使用 subprocess 模块执行命令并将命令分割为列表shlex.split 用于处理包含空格和特殊字符的命令)
# cwd 参数指定工作目录capture_output=True 表示捕获命令的输出text=True 表示输出为文本格式
output = subprocess.run(shlex.split(cmd), cwd=dire, capture_output=True, text=True)
# 检查命令的返回值,如果返回值不为 0则表示命令执行失败
if output.returncode != 0:
# 调用 get_git_error 函数从 stderr 中获取 Git 错误信息
git_error = get_git_error(output.stderr)
# 如果配置中启用了详细日志记录config.LOG_DETAIL则记录命令的错误输出
if config.LOG_DETAIL:
sync_log(LogType.ERROR, output.stderr, log_name, user)
# 抛出 Git 错误,以中断函数并向上层传递错误信息
raise git_error
# 如果命令执行成功,则返回命令的输出
return output return output
def init_repos(repo, log_name: str, user: str): def init_repos(repo, log_name: str, user: str):
not os.path.exists(SYNC_DIR) and os.makedirs(SYNC_DIR) # 检查SYNC_DIR同步目录是否存在如果不存在则创建它
repo_dir = os.path.join(SYNC_DIR, repo.repo_name) # 这里使用了逻辑与的短路求值特性如果os.path.exists(SYNC_DIR)为True则不会执行os.makedirs(SYNC_DIR)
if not os.path.exists(repo_dir): not os.path.exists(SYNC_DIR) and os.makedirs(SYNC_DIR)
sync_log(LogType.INFO, "初始化仓库 *********", log_name, user)
inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token) # 构造仓库目录的路径将repo的repo_name与SYNC_DIR结合
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token) repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
if repo.sync_direction == SyncDirect.to_outer:
# 克隆内部仓库到同步目录下 # 检查仓库目录是否存在
shell(f'git clone {inter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user) if not os.path.exists(repo_dir):
else: # 如果仓库目录不存在,记录一条日志信息,表明正在初始化仓库
# 克隆外部仓库到同步目录下 sync_log(LogType.INFO, "********* 初始化仓库 *********", log_name, user)
shell(f'git clone {exter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
# 添加internal远程仓库并强制使用 # 调用get_repo_address_with_token函数获取带有内部令牌的内部仓库地址
shell(f'git remote add -f internal {inter_repo_addr}', repo_dir, log_name, user) inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
# 添加external远程仓库并强制使用
shell(f'git remote add -f external {exter_repo_addr}', repo_dir, log_name, user) # 调用get_repo_address_with_token函数获取带有外部令牌的外部仓库地址
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
# 根据repo的同步方向来决定克隆哪个仓库
if repo.sync_direction == SyncDirect.to_outer:
# 如果同步方向是to_outer可能是从内部到外部则克隆内部仓库到同步目录下
shell(f'git clone {inter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
else:
# 如果不是to_outer可能是从外部到内部或双向则克隆外部仓库到同步目录下
shell(f'git clone {exter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
# 无论克隆了哪个仓库,都执行以下两步操作来添加远程仓库
# 添加internal远程仓库并使用-f参数强制获取远程仓库的最新数据
# 注意这里的internal仓库可能指的是之前克隆的内部仓库或者是另一个指定的内部仓库
shell(f'git remote add -f internal {inter_repo_addr}', repo_dir, log_name, user)
# 添加external远程仓库并使用-f参数强制获取远程仓库的最新数据
# 注意这里的external仓库可能指的是之前克隆的外部仓库或者是另一个指定的外部仓库
shell(f'git remote add -f external {exter_repo_addr}', repo_dir, log_name, user)
def inter_to_outer(repo, branch, log_name: str, user: str, force_flag): def inter_to_outer(repo, branch, log_name: str, user: str, force_flag: bool):
repo_dir = os.path.join(SYNC_DIR, repo.repo_name) # 拼接内部仓库的目录路径
inter_name = branch.internal_branch_name repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
outer_name = branch.external_branch_name # 获取内部和外部的分支名
try: inter_name = branch.internal_branch_name
# 从internal仓库的指定分支inter_name中获取代码更新远程分支的信息到本地仓库 outer_name = branch.external_branch_name
shell(f"git fetch internal {inter_name}", repo_dir, log_name, user) try:
# 切换到inter_name分支并将internal仓库的分支强制 checkout 到当前分支。 # 从internal仓库的指定分支inter_name中获取代码更新远程分支的信息到本地仓库
shell(f"git checkout -B {inter_name} internal/{inter_name}", repo_dir, log_name, user) # 执行 git fetch 命令从名为 'internal' 的远程仓库获取 inter_name 分支的最新代码
# 将本地仓库的inter_name分支推送到external仓库的outer_name分支上。 shell(f"git fetch internal {inter_name}", repo_dir, log_name, user)
if force_flag: # 切换到inter_name分支并将internal仓库的分支强制 checkout 到当前分支
shell(f"git push --force external {inter_name}:{outer_name}", repo_dir, log_name, user) # 使用 git checkout -B 命令创建如果不存在或重置如果存在inter_name分支并切换到它
else: # 同时将 internal 仓库的 inter_name 分支的内容拉取到本地该分支
shell(f"git push external {inter_name}:{outer_name}", repo_dir, log_name, user) shell(f"git checkout -B {inter_name} internal/{inter_name}", repo_dir, log_name, user)
# commit id # 将本地仓库的inter_name分支推送到external仓库的outer_name分支上
# result = shell(f"git log HEAD~1..HEAD --oneline", repo_dir, log_name, user) # 如果 force_flag 为 True则使用 --force 标志强制推送,可能会覆盖远程分支的提交
# commit_id = result.stdout.split(" ")[0] if force_flag:
result = shell(f'git log -1 --format="%H"', repo_dir, log_name, user) shell(f"git push --force external {inter_name}:{outer_name}", repo_dir, log_name, user)
commit_id = result.stdout[0:7] else:
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user) # 否则,正常推送,不覆盖远程分支的提交
return commit_id shell(f"git push external {inter_name}:{outer_name}", repo_dir, log_name, user)
except Exception as e: # 在推送后获取outer-name分支的最新提交ID假设它已经被更新
outer_commit_id = check_output(f'git ls-remote external {outer_name} | cut -f1', shell=True, cwd=repo_dir, stderr=STDOUT).decode().strip()
# 比较inter_name分支的HEAD和outer-name分支的最新提交
# 注意这里我们假设inter_name分支已经是最新的因为我们刚刚推送了它
diff_output = check_output(f'git diff --name-status {outer_commit_id}', shell=True, cwd=repo_dir, stderr=STDOUT).decode()
# 解析diff输出以获取被修改的文件名
modified_files = [line.split()[1] for line in diff_output.strip().split('\n') if line]
# 记录被修改的文件名到日志中
if modified_files:
sync_log(LogType.INFO, f'Modified files: {", ".join(modified_files)}', log_name, user)
# 获取最近一次提交的 commit id
result = shell(f'git log -1 --format="%H"', repo_dir, log_name, user)
commit_id = result.stdout[0:7]
# 记录 commit id 到日志中
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
# 返回 commit id
return commit_id
except (CalledProcessError, Exception) as e:
# 处理可能出现的异常
raise raise
def outer_to_inter(repo, branch, log_name: str, user: str, force_flag): def outer_to_inter(repo, branch, log_name: str, user: str, force_flag):
repo_dir = os.path.join(SYNC_DIR, repo.repo_name) # 拼接内部仓库的目录路径
inter_name = branch.internal_branch_name repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
outer_name = branch.external_branch_name # 获取内部和外部的分支名
try: inter_name = branch.internal_branch_name
# 从external仓库的指定分支outer_name中获取代码更新远程分支的信息到本地仓库 outer_name = branch.external_branch_name
shell(f"git fetch external {outer_name}", repo_dir, log_name, user) try:
# 切换到本地仓库的outer_name分支并将origin仓库的outer_name分支强制 checkout 到当前分支。 # 从external仓库的指定分支outer_name中获取代码更新远程分支的信息到本地仓库
shell(f"git checkout -B {outer_name} external/{outer_name}", repo_dir, log_name, user) # 执行 git fetch 命令从名为 'external' 的远程仓库获取 outer_name 分支的最新代码
# 将本地仓库的outer_name分支推送到internal仓库的inter_name分支上。 shell(f"git fetch external {outer_name}", repo_dir, log_name, user)
if force_flag: # 切换到本地仓库的outer_name分支如果不存在则创建并将external仓库的outer_name分支内容拉取到本地该分支
shell(f"git push --force internal {outer_name}:{inter_name}", repo_dir, log_name, user) # 使用 git checkout -B 命令创建如果不存在或重置如果存在outer_name分支并切换到它
else: shell(f"git checkout -B {outer_name} external/{outer_name}", repo_dir, log_name, user)
shell(f"git push internal {outer_name}:{inter_name}", repo_dir, log_name, user) # 将本地仓库的outer_name分支推送到internal仓库的inter_name分支上
# commit id # 如果 force_flag 为 True则使用 --force 标志强制推送,可能会覆盖远程分支的提交
# result = shell(f"git log HEAD~1..HEAD --oneline", repo_dir, log_name, user) if force_flag:
# commit_id = result.stdout.split(" ")[0] shell(f"git push --force internal {outer_name}:{inter_name}", repo_dir, log_name, user)
result = shell(f'git log -1 --format=%h', repo_dir, log_name, user) else:
commit_id = result.stdout[0:7] # 否则,正常推送,不覆盖远程分支的提交
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user) shell(f"git push internal {outer_name}:{inter_name}", repo_dir, log_name, user)
return commit_id # 获取最新提交的 commit ID短哈希
except Exception as e: # 注意:这里使用 %h 而不是 %H 来获取短哈希通常是7个字符
result = shell(f'git log -1 --format=%h', repo_dir, log_name, user)
commit_id = result.stdout[0:7]
# 记录 commit ID 到日志中
sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
# 返回 commit ID
return commit_id
except Exception as e:
# 如果在同步过程中发生任何异常,则直接抛出该异常
raise raise
async def sync_repo_task(repo, user, force_flag):
if repo.sync_granularity == SyncType.one:
branches = await sync_branch_dao.sync_branch(repo_id=repo.id)
await sync_branch_task(repo, branches, repo.sync_direction, user)
else:
log_name = f'sync_{repo.repo_name}.log'
try:
init_repos(repo, log_name, user)
sync_log(LogType.INFO, f'************ 执行{repo.repo_name}仓库同步 ************', log_name, user)
if repo.sync_direction == SyncDirect.to_outer:
inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
stm = shell(f"git ls-remote --heads {inter_repo_addr}", SYNC_DIR, log_name, user)
branch_list_output = stm.stdout.split('\n')
for branch in branch_list_output:
if branch:
branch_name = branch.split('/')[-1].strip()
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
sync_log(LogType.INFO, f'Execute inter to outer {branch_name} branch Sync', log_name, user)
inter_to_outer(repo, branch, log_name, user, force_flag)
else:
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
stm = shell(f"git ls-remote --heads {exter_repo_addr}", SYNC_DIR, log_name, user)
branch_list_output = stm.stdout.split('\n')
for branch in branch_list_output:
if branch:
branch_name = branch.split('/')[-1].strip()
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
sync_log(LogType.INFO, f'Execute outer to inter {branch_name} branch Sync', log_name, user)
outer_to_inter(repo, branch, log_name, user, force_flag)
sync_log(LogType.INFO, f'************ {repo.repo_name}仓库同步完成 ************', log_name, user)
finally:
if config.DELETE_SYNC_DIR:
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
await log_service.insert_repo_log(repo_name=repo.repo_name, direct=repo.sync_direction)
os.remove(os.path.join(log_path, log_name))
async def sync_branch_task(repo, branches, direct, user, force_flag): async def sync_branch_task(repo, branches, direct, user, force_flag):
# 遍历每个分支进行同步操作
for branch in branches: for branch in branches:
# 分支日志文件名
log_name = f'sync_{repo.repo_name}_{branch.id}.log' log_name = f'sync_{repo.repo_name}_{branch.id}.log'
commit_id = '' commit_id = ''
try: try:
# 初始化仓库同步环境
init_repos(repo, log_name, user) init_repos(repo, log_name, user)
# 记录分支同步开始信息
sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user) sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user)
# 根据同步方向执行不同的分支同步操作
if direct == SyncDirect.to_inter: if direct == SyncDirect.to_inter:
# 记录外部到内部分支同步信息
sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user) sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
# 执行外部到内部分支同步操作并获取commit_id
commit_id = outer_to_inter(repo, branch, log_name, user, force_flag) commit_id = outer_to_inter(repo, branch, log_name, user, force_flag)
else: else:
# 记录内部到外部分支同步信息
sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user) sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
# 执行内部到外部分支同步操作并获取commit_id
commit_id = inter_to_outer(repo, branch, log_name, user, force_flag) commit_id = inter_to_outer(repo, branch, log_name, user, force_flag)
# 记录分支同步完成信息
sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user) sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
finally: finally:
# 如果配置允许删除同步工作目录,则删除
if config.DELETE_SYNC_DIR: if config.DELETE_SYNC_DIR:
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR) os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user) sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
# 插入分支同步日志
await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id) await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
# 删除同步日志文件
os.remove(os.path.join(log_path, log_name))
async def sync_branch_task(repo, branches, direct, user, force_flag):
for branch in branches:
log_name = f'sync_{repo.repo_name}_{branch.id}.log'
commit_id = ''
try:
# 初始化代码库和日志系统
init_repos(repo, log_name, user)
# 记录日志,表示开始执行分支同步
sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user)
if direct == SyncDirect.to_inter:
# 执行外部分支到内部分支的同步
sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
commit_id = outer_to_inter(repo, branch, log_name, user, force_flag)
else:
# 执行内部分支到外部分支的同步
sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
commit_id = inter_to_outer(repo, branch, log_name, user, force_flag)
# 记录日志,表示分支同步已完成
sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
finally:
if config.DELETE_SYNC_DIR:
# 删除同步工作目录
if os.path.exists(SYNC_DIR): # 修复潜在问题:使用 if 语句检查目录是否存在
os.removedirs(SYNC_DIR)
# 记录日志,表示已删除同步工作目录
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
# 异步插入分支日志到日志服务
await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
# 假设 log_path 是已定义的变量或配置
# 删除之前创建的日志文件
# 修正潜在问题:使用已定义的 log_path
os.remove(os.path.join(log_path, log_name)) os.remove(os.path.join(log_path, log_name))

View File

@ -13,36 +13,40 @@ from src.do.log import LogDO
from src.dto.log import Log as LogDTO from src.dto.log import Log as LogDTO
class LogService(Service): class LogService(Service):
def __init__(self) -> None: #LogService类用于处理与日志相关的业务逻辑。
self._log_dao = LogDAO() def __init__(self) -> None:
#初始化方法创建一个LogDAO对象用于数据访问。
async def get_logs_by_job(self, id, page: int = 1, size: int = 10) -> Optional[List[LogDTO]]: self._log_dao = LogDAO() # 创建一个LogDAO对象用于与数据库交互
cond = text(f"sync_job_id = {id}") async def get_logs_by_job(self, id, page: int = 1, size: int = 10) -> Optional[List[LogDTO]]:
start = (page - 1) * size #根据作业ID获取日志列表。
all = await self._log_dao.fetch(cond=cond, start=start, limit=size) #Optional[List[LogDTO]]: 日志DTO列表如果没有找到则为None
data = [] cond = text(f"sync_job_id = {id}") # 构建SQL条件这里可能存在SQL注入的风险
for log in all: start = (page - 1) * size # 计算起始索引
data.append(self._do_to_dto(log)) all = await self._log_dao.fetch(cond=cond, start=start, limit=size) # 从数据库中获取日志列表
return data data = [] # 初始化一个空列表用于存储DTO对象
for log in all:
async def save_logs(self, id, type, msg) -> Optional[LogDTO]: data.append(self._do_to_dto(log)) # 将每个日志DO转换为DTO并添加到列表中
return await self._log_dao.insert_log(id, type, msg) return data # 返回DTO列表
async def delete_logs(self) -> Optional[bool]: async def save_logs(self, id, type, msg) -> Optional[LogDTO]:
return await self._log_dao.delete_log() #保存日志。
return await self._log_dao.insert_log(id, type, msg) # 调用DAO的insert_log方法保存日志
async def count_logs(self, id) -> int:
cond = text(f"sync_job_id = {id}") async def delete_logs(self) -> Optional[bool]:
return await self._log_dao._get_count(cond=cond) return await self._log_dao.delete_log() # 调用DAO的delete_log方法删除日志
def _do_to_dto(self, log: LogDO) -> LogDTO: async def count_logs(self, id) -> int:
return LogDTO( cond = text(f"sync_job_id = {id}") # 构建SQL条件这里可能存在SQL注入的风险
**{ return await self._log_dao._get_count(cond=cond) # 调用DAO的_get_count方法获取日志数量注意通常不建议直接访问类的私有方法
'id': log[LogDO].id,
'sync_job_id': log[LogDO].sync_job_id, def _do_to_dto(self, log: LogDO) -> LogDTO:
'log_type': log[LogDO].log_type, return LogDTO(
'log': log[LogDO].log, **{
'create_time': log[LogDO].create_time 'id': log.id, # 注意这里应该是直接使用log.id而不是log[LogDO].id
} 'sync_job_id': log.sync_job_id,
) 'log_type': log.log_type,
'log': log.log,
'create_time': log.create_time
}
) # 创建一个新的LogDTO对象并返回