. #10

Open
wngqi wants to merge 1 commit from wangqi_branch into master
7 changed files with 1051 additions and 735 deletions

View File

@ -1,4 +1,4 @@
--
CREATE TABLE IF NOT EXISTS `sync_repo_mapping` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS `sync_repo_mapping` (
UNIQUE KEY (`repo_name`)
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步仓库映射表';
--
CREATE TABLE IF NOT EXISTS `sync_branch_mapping`(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
@ -26,7 +26,7 @@ CREATE TABLE IF NOT EXISTS `sync_branch_mapping`(
PRIMARY KEY (`id`)
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '同步分支映射表';
--
CREATE TABLE IF NOT EXISTS `repo_sync_log`(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,

View File

@ -32,297 +32,414 @@ from src.service.log import LogService
from src.utils import github, gitlab, gitee, gitcode, gitlink
class Project(Controller):
class Project(Controller):
# 定义一个get_user方法用于通过cookie_key和token获取用户信息
# 这里假设Controller基类有一个get_user方法用于实际处理用户信息的获取
# Resolve the calling user from a cookie or token.
# NOTE(review): assumes the Controller base class performs the actual lookup — confirm.
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
    # Forward both credentials unchanged to the base implementation.
    return super().get_user(cookie_key=cookie_key, token=token)
@project.get("", response_model=Response[DataList[ProjectData]], description='通过工程名获取一个同步工程列表')
async def get_project(
    self,
    search: Optional[str] = Query(None, description='同步工程搜索内容,如果未提供则获取所有工程'),
    orderby: Optional[str] = Query(None, description='排序选项,当前未实现排序功能'),
    pageNum: Optional[int] = Query(1, description="Page number默认为1"),
    pageSize: Optional[int] = Query(10, description="Page size默认为10")
):
    """List sync projects, optionally filtered by name.

    Returns a paginated DataList of ProjectData. ``orderby`` is accepted but
    currently unused by the body. Raises Errors.QUERY_FAILD when the lookup
    fails.
    """
    service = ProjectService()
    if search is None:
        # No filter: overall count plus one page of all projects.
        count = await service.get_count()
        answer = await service.list_projects(page=pageNum, size=pageSize)
    else:
        # Spaces are stripped from the search term before matching.
        stripped = search.replace(" ", "")
        count = await service.get_count_by_search(stripped)
        answer = await service.search_project(name=stripped)
    if answer is None:
        # Fix: removed the stray f-prefix — the message has no placeholders.
        logger.error("The project list fetch failed")
        raise Errors.QUERY_FAILD
    return Response(
        code=Code.SUCCESS,
        data=DataList(total=count, list=answer)
    )
# Resolve the calling user from a cookie or token via the base Controller.
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
    return super().get_user(cookie_key=cookie_key, token=token)
@project.get("", response_model=Response[DataList[ProjectData]], description='通过工程名获取一个同步工程')
async def get_project(
self,
search: Optional[str] = Query(None, description='同步工程搜索内容'),
orderby: Optional[str] = Query(None, description='排序选项'),
pageNum: Optional[int] = Query(1, description="Page number"),
pageSize: Optional[int] = Query(10, description="Page size")
):
# search
service = ProjectService()
if search is None:
count = await service.get_count()
answer = await service.list_projects(page=pageNum, size=pageSize)
else:
count = await service.get_count_by_search(search.replace(" ", ""))
answer = await service.search_project(name=search.replace(" ", ""))
if answer is None:
logger.error(f"The project list fetch failed")
raise Errors.QUERY_FAILD
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=answer)
@project.post("", response_model=Response[ProjectData], description='创建一个同步工程')
async def create_project(
    self,
    item: CreateProjectItem = Body(..., description='同步工程属性')
):
    """Create a sync project after validating the repository addresses.

    Raises ARGUMENT_LACK / TIP_ARGUMENT_ERROR on bad input and
    Errors.INSERT_FAILD when persistence fails. When a GitHub address is
    supplied, a background task is started to sync its pull requests.
    """
    # Pre-checks: the request body and the project name are mandatory.
    if not item:
        raise ErrorTemplate.ARGUMENT_LACK("请求体")
    if not item.name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    # Validate each repository address that was actually provided.
    if item.github_address:
        if not github.check_github_address(item.github_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("GitHub仓库")
    if item.gitlab_address:
        if not gitlab.check_gitlab_address(item.gitlab_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitlab/Antcode仓库")
    if item.gitee_address:
        if not gitee.check_gitee_address(item.gitee_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitee仓库")
    if item.code_china_address:
        if not gitcode.check_gitcode_address(item.code_china_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("CodeChina仓库")
    service = ProjectService()
    resp = await service.insert_project(item)
    if not resp:
        logger.error("The project insert failed")
        raise Errors.INSERT_FAILD
    # Fix: only derive org/repo when a GitHub address was actually supplied;
    # previously transfer_github_to_name was invoked even for a None address.
    organization = repo = None
    if item.github_address:
        organization, repo = github.transfer_github_to_name(item.github_address)
    if organization and repo:
        pull_request_service = PullRequestService()
        # Fire-and-forget background sync; intentionally not awaited.
        asyncio.create_task(
            pull_request_service.sync_pull_request(item.name, organization, repo))
    return Response(
        code=Code.SUCCESS,
        data=resp,
        msg="创建同步工程成功"
    )
@ project.post("", response_model=Response[ProjectData], description='创建一个同步工程')
async def create_project(
    self,
    item: CreateProjectItem = Body(..., description='同步工程属性')
):
    """Create a sync project from the posted attributes.

    Validates the body and each supplied repository address, persists the
    project, then — when a GitHub address is present — starts a background
    pull-request sync task.
    """
    # Pre-checks: the request body and the project name are mandatory.
    if not item:
        raise ErrorTemplate.ARGUMENT_LACK("请求体")
    if not item.name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    # Validate each repository address that was actually provided.
    if item.github_address:
        if not github.check_github_address(item.github_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("GitHub仓库")
    if item.gitlab_address:
        if not gitlab.check_gitlab_address(item.gitlab_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitlab/Antcode仓库")
    if item.gitee_address:
        if not gitee.check_gitee_address(item.gitee_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("Gitee仓库")
    if item.code_china_address:
        if not gitcode.check_gitcode_address(item.code_china_address):
            raise ErrorTemplate.TIP_ARGUMENT_ERROR("CodeChina仓库")
    # Gitlink validation is intentionally disabled for now.
    # if item.gitlink_address:
    #     if not gitlink.check_gitlink_address(item.gitlink_address):
    #         raise ErrorTemplate.ARGUMENT_ERROR("Gitlink仓库")
    service = ProjectService()
    resp = await service.insert_project(item)
    if not resp:
        logger.error(f"The project insert failed")
        raise Errors.INSERT_FAILD
    # NOTE(review): invoked even when github_address is None — presumably the
    # helper then yields (None, None); confirm.
    organization, repo = github.transfer_github_to_name(
        item.github_address)
    if organization and repo:
        pull_request_service = PullRequestService()
        # Fire-and-forget: the task is deliberately not awaited here.
        task = asyncio.create_task(
            pull_request_service.sync_pull_request(item.name, organization, repo))
    return Response(
        code=Code.SUCCESS,
        data=resp,
        msg="创建同步工程成功"
    )
@ project.delete("", response_model=Response, description='通过id删除一个同步工程')
async def delete_project(
self,
id: int = Query(..., description='同步工程id')
):
if not id:
raise ErrorTemplate.ARGUMENT_LACK("id")
# if delete the project, the front page double check firstly
project_service = ProjectService()
project = await project_service.search_project(id=id)
name = project[0].name
# delete pull request
pull_request_service = PullRequestService()
resp = await pull_request_service.fetch_pull_request(name)
if resp:
if len(resp) > 0:
for pr in resp:
await pull_request_service.delete_pull_request(pr.id)
# delete sync job
job_service = JobService()
resp = await job_service.list_jobs(project=name)
if not resp:
pass
else:
for item in resp:
await job_service.delete_job(item.id)
# delete sync project
resp = await project_service.delete_project(id)
if not resp:
logger.error(f"The project #{id} delete failed")
raise Errors.DELETE_FAILD
return Response(
code=Code.SUCCESS,
msg="删除同步工程成功"
# Delete a sync project by id, cascading to its pull requests and sync jobs.
@project.delete("", response_model=Response, description='通过id删除一个同步工程')
async def delete_project(
    self,
    id: int = Query(..., description='同步工程id')
):
    """Delete the project and cascade-delete its pull requests and sync jobs.

    Raises ARGUMENT_LACK for a missing id, Errors.QUERY_FAILD when the project
    does not exist, and Errors.DELETE_FAILD when the final delete fails.
    """
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("id")
    project_service = ProjectService()
    project = await project_service.search_project(id=id)
    # Fix: previously project[0] raised IndexError for an unknown id.
    if not project:
        logger.error(f"The project #{id} is not exist")
        raise Errors.QUERY_FAILD
    name = project[0].name
    # Remove all pull requests that belong to this project.
    pull_request_service = PullRequestService()
    prs = await pull_request_service.fetch_pull_request(name)
    if prs:
        for pr in prs:
            await pull_request_service.delete_pull_request(pr.id)
    # Remove all sync jobs of this project.
    job_service = JobService()
    jobs = await job_service.list_jobs(project=name)
    if jobs:
        for item in jobs:
            await job_service.delete_job(item.id)
    # Finally remove the project record itself.
    resp = await project_service.delete_project(id)
    if not resp:
        logger.error(f"The project #{id} delete failed")
        raise Errors.DELETE_FAILD
    return Response(
        code=Code.SUCCESS,
        msg="删除同步工程成功"
    )
class Job(Controller):
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
return super().get_user(cookie_key=cookie_key, token=token)
@ job.get("/projects/{name}/jobs", response_model=Response[DataList[JobData]], description='列出所有同步流')
async def list_jobs(
self,
name: str = Query(..., description='同步工程名'),
search: Optional[str] = Query(None, description='同步工程搜索内容'),
source: Optional[str] = Query(None, description='分支来源'),
pageNum: Optional[int] = Query(1, description="Page number"),
pageSize: Optional[int] = Query(10, description="Page size")
):
if not name:
raise ErrorTemplate.ARGUMENT_LACK("工程名")
service = JobService()
if search is not None:
search = search.replace(" ", "")
answer = await service.list_jobs(project=name, search=search, source=source, page=pageNum, size=pageSize)
if not answer:
return Response(
code=Code.SUCCESS,
data=DataList(total=0, list=[]),
msg="没有同步流"
)
count = await service.count_job(project=name, search=search, source=source)
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=answer),
msg="查询同步流成功"
class Job(Controller):
# Resolve the calling user from a cookie or token via the base Controller.
# NOTE(review): Security(...) looks like a FastAPI request-time dependency — confirm.
def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token: str = None):
    return super().get_user(cookie_key=cookie_key, token=token)
# Consistency fix: sibling handlers register on the `job` router and take
# `self` (see start_job/stop_job below); `app` is not defined in this module
# and @staticmethod does not fit the Controller-method pattern used here.
@job.get("/projects/{name}/jobs", response_model=Response[DataList[JobData]], description='列出所有同步流')
async def list_jobs(
    self,
    name: str = Query(..., description='同步工程名'),
    search: Optional[str] = Query(None, description='同步工程搜索内容'),
    source: Optional[str] = Query(None, description='分支来源'),
    pageNum: Optional[int] = Query(1, description="Page number"),
    pageSize: Optional[int] = Query(10, description="Page size")
):
    """List sync jobs of a project, optionally filtered by search text and source."""
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    service = JobService()
    if search is not None:
        # Strip spaces from the search term before matching.
        search = search.replace(" ", "")
    answer = await service.list_jobs(project=name, search=search, source=source, page=pageNum, size=pageSize)
    if not answer:
        return Response(
            code=Code.SUCCESS,
            data=DataList(total=0, list=[]),
            msg="没有同步流"
        )
    count = await service.count_job(project=name, search=search, source=source)
    return Response(
        code=Code.SUCCESS,
        data=DataList(total=count, list=answer),
        msg="查询同步流成功"
    )
@ job.post("/projects/{name}/jobs", response_model=Response[JobData], description='创建一个同步流')
async def create_job(
self,
name: str = Query(..., description='同步工程名'),
item: CreateJobItem = Body(..., description='同步流属性')
):
if not name:
raise ErrorTemplate.ARGUMENT_LACK("工程名")
if not item:
raise ErrorTemplate.ARGUMENT_LACK("JSON")
if not item.type:
raise ErrorTemplate.ARGUMENT_LACK("分支同步类型")
service = JobService()
ans = await service.create_job(name, item)
if not ans:
logger.error(f"Create a job of project #{name} failed")
raise Errors.INSERT_FAILD
return Response(
code=Code.SUCCESS,
data=ans,
msg="创建同步流成功"
# Create a sync job under a project.
# NOTE(review): assumes `job` is a configured router instance defined
# elsewhere in this module — confirm.
@job.post("/projects/{name}/jobs", response_model=Response[JobData], description='创建一个同步流')
async def create_job(
    self,
    name: str = Query(..., description='同步工程名'),  # project name
    item: CreateJobItem = Body(..., description='同步流属性')  # job attributes (request body)
):
    """Create a sync job for the named project.

    Raises ARGUMENT_LACK when the name, body, or sync type is missing, and
    Errors.INSERT_FAILD when the service fails to persist the job.
    """
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not item:
        raise ErrorTemplate.ARGUMENT_LACK("JSON")
    if not item.type:
        raise ErrorTemplate.ARGUMENT_LACK("分支同步类型")
    service = JobService()
    ans = await service.create_job(name, item)
    if not ans:
        logger.error(f"Create a job of project #{name} failed")
        raise Errors.INSERT_FAILD
    return Response(
        code=Code.SUCCESS,
        data=ans,
        msg="创建同步流成功"
    )
# Enable (start) a sync job.
@job.put("/projects/{name}/jobs/{id}/start", response_model=Response, description='开启一个同步流')
async def start_job(
    self,
    name: str = Query(..., description='同步工程名'),  # project name
    id: int = Query(..., description='同步流id')  # job id to enable
):
    """Mark the job as enabled; raises Errors.UPDATE_FAILD when the update fails."""
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    service = JobService()
    # True flips the job status to enabled.
    ans = await service.update_status(id, True)
    if not ans:
        logger.error(f"The job #{id} of project #{name} start failed")
        raise Errors.UPDATE_FAILD
    return Response(
        code=Code.SUCCESS,
        msg="开启同步流成功"
    )
@ job.put("/projects/{name}/jobs/{id}/start", response_model=Response, description='开启一个同步流')
async def start_job(
    self,
    name: str = Query(..., description='同步工程名'),
    id: int = Query(..., description='同步流id')
):
    """Enable a sync job and report the outcome."""
    # Guard clauses: both identifiers are mandatory.
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    job_service = JobService()
    enabled = await job_service.update_status(id, True)
    if not enabled:
        logger.error(f"The job #{id} of project #{name} start failed")
        raise Errors.UPDATE_FAILD
    return Response(code=Code.SUCCESS, msg="开启同步流成功")
@ job.put("/projects/{name}/jobs/{id}/stop", response_model=Response, description='停止一个同步流')
async def stop_job(
    self,
    name: str = Query(..., description='同步工程名'),
    id: int = Query(..., description='同步流id')
):
    """Disable a sync job and report the outcome."""
    # Guard clauses: both identifiers are mandatory.
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    job_service = JobService()
    stopped = await job_service.update_status(id, False)
    if not stopped:
        logger.error(f"The job #{id} of project #{name} stop failed")
        raise Errors.UPDATE_FAILD
    return Response(code=Code.SUCCESS, msg="关闭同步流成功")
@ job.delete("/projects/{name}/jobs", response_model=Response, description='通过id删除一个同步流')
async def delete_job(
    self,
    name: str = Query(..., description='同步工程名'),
    id: int = Query(..., description='同步流id')
):
    """Remove a sync job and report the outcome."""
    # Guard clauses: both identifiers are mandatory.
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    job_service = JobService()
    removed = await job_service.delete_job(id)
    if not removed:
        logger.error(f"The job #{id} of project #{name} delete failed")
        raise Errors.DELETE_FAILD
    return Response(code=Code.SUCCESS, msg="删除同步流成功")
@ job.put("/projects/{name}/jobs/{id}/set_commit", response_model=Response, description='通过id设置一个同步流的commit')
async def set_job_commit(
    self,
    name: str = Query(..., description='同步工程名'),
    id: int = Query(..., description='同步流id'),
    commit: str = Query(..., description='commit'),
):
    """Set the latest-synced commit of a one-way sync job.

    Two-way jobs are rejected with an HTTP error; a missing job or a failed
    update raises Errors.UPDATE_FAILD.
    """
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    service = JobService()
    job = await service.get_job(id)
    if not job:
        logger.error(f"The job #{id} of project #{name} is not exist")
        raise Errors.UPDATE_FAILD
    # Only one-way jobs may have their commit pointer rewritten.
    if job.type == SyncType.TwoWay:
        logger.error(f"The job #{id} of project #{name} is two way sync")
        raise HTTPException(Code.OPERATION_FAILED, 'Twoway同步方式无法修改commit值')
    # NOTE(review): spelled "lateset" here but "latest" in the other copy of
    # this handler — confirm which method JobService actually defines.
    ans = await service.update_job_lateset_commit(id, commit)
    if not ans:
        logger.error(
            f"The job #{id} of project #{name} update latest commit failed")
        raise Errors.UPDATE_FAILD
    return Response(
        code=Code.SUCCESS,
        msg="设置同步流commit成功"
    )
@ job.get("/projects/{name}/jobs/{id}/logs", response_model=Response[DataList[LogData]], description='列出所有同步流')
async def get_job_log(
self,
name: str = Query(..., description='同步工程名'),
id: int = Query(..., description='同步流id'),
pageNum: Optional[int] = Query(1, description="Page number"),
pageSize: Optional[int] = Query(1000, description="Page size")
):
if not name:
raise ErrorTemplate.ARGUMENT_LACK("工程名")
if not id:
raise ErrorTemplate.ARGUMENT_LACK("同步流id")
project_service = ProjectService()
projects = await project_service.search_project(name=name)
if len(projects) == 0:
raise ErrorTemplate.ARGUMENT_ERROR("工程名")
service = LogService()
log = await service.get_logs_by_job(id, pageNum, pageSize)
data = []
for rep_log in log:
log_str = rep_log.log
if projects[0].gitee_token:
log_str = log_str.replace(projects[0].gitee_token, "******")
if projects[0].github_token:
log_str = log_str.replace(projects[0].github_token, "******")
rep_log.log = log_str
data.append(rep_log)
if len(log) == 0:
logger.info(
f"The job #{id} of project #{name} has no logs")
count = await service.count_logs(id)
return Response(
code=Code.SUCCESS,
data=DataList(total=count, list=data)
# Disable (stop) a sync job.
@ job.put("/projects/{name}/jobs/{id}/stop", response_model=Response, description='停止一个同步流')
async def stop_job(
    self,
    name: str = Query(..., description='同步工程名'),  # project name
    id: int = Query(..., description='同步流id')  # job id to stop
):
    """Mark the job as disabled; raises Errors.UPDATE_FAILD when the update fails."""
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    service = JobService()  # business logic for sync jobs
    ans = await service.update_status(id, False)  # False flips the status to disabled
    if not ans:
        logger.error(f"The job #{id} of project #{name} stop failed")
        raise Errors.UPDATE_FAILD
    return Response(
        code=Code.SUCCESS,
        msg="关闭同步流成功"
    )
# Delete a sync job by id.
# NOTE(review): this copy routes "/projects/{name}/jobs/{id}" while the other
# copy routes "/projects/{name}/jobs" — confirm which registration is intended.
@ job.delete("/projects/{name}/jobs/{id}", response_model=Response, description='通过id删除一个同步流')
async def delete_job(
    self,
    name: str = Query(..., description='同步工程名'),  # project name
    id: int = Query(..., description='同步流id')  # job id to delete
):
    """Delete the job; raises Errors.DELETE_FAILD when the service reports failure."""
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    service = JobService()  # business logic for sync jobs
    ans = await service.delete_job(id)  # perform the delete
    if not ans:
        logger.error(f"The job #{id} of project #{name} delete failed")
        raise Errors.DELETE_FAILD
    return Response(
        code=Code.SUCCESS,
        msg="删除同步流成功"
    )
@ job.put("/projects/{name}/jobs/{id}/set_commit", response_model=Response, description='通过id设置一个同步流的commit')
async def set_job_commit(
    self,
    name: str = Query(..., description='同步工程名'),  # project name
    id: int = Query(..., description='同步流id'),  # job id
    commit: str = Query(..., description='commit'),  # commit hash to record
):
    """Set the latest-synced commit of a one-way job; two-way jobs are rejected."""
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    service = JobService()
    job = await service.get_job(id)
    if not job:
        logger.error(f"The job #{id} of project #{name} is not exist")
        raise Errors.UPDATE_FAILD
    # Only one-way jobs may have their commit pointer rewritten.
    if job.type == SyncType.TwoWay:
        logger.error(f"The job #{id} of project #{name} is two way sync")
        raise HTTPException(Code.OPERATION_FAILED, 'Twoway同步方式无法修改commit值')
    # NOTE(review): spelled "latest" here but "lateset" in the other copy of
    # this handler — confirm which method JobService actually defines.
    ans = await service.update_job_latest_commit(id, commit)
    if not ans:
        logger.error(f"The job #{id} of project #{name} update latest commit failed")
        raise Errors.UPDATE_FAILD
    return Response(
        code=Code.SUCCESS,
        msg="设置同步流commit成功"
    )
@job.get("/projects/{name}/jobs/{id}/logs", response_model=Response[DataList[LogData]], description='获取指定同步流的日志列表')
async def get_job_log(
    self,
    name: str = Query(..., description='同步工程名'),  # project name
    id: int = Query(..., description='同步流id'),  # job whose logs are requested
    pageNum: Optional[int] = Query(1, description="页码默认为1"),  # page number
    pageSize: Optional[int] = Query(1000, description="每页大小默认为1000")  # page size
):
    """Return a page of job logs with the project's repo tokens masked out."""
    if not name:
        raise ErrorTemplate.ARGUMENT_LACK("工程名")
    if not id:
        raise ErrorTemplate.ARGUMENT_LACK("同步流id")
    # Verify the project exists before reading its logs.
    project_service = ProjectService()
    projects = await project_service.search_project(name=name)
    if len(projects) == 0:
        raise ErrorTemplate.ARGUMENT_ERROR("工程名")
    service = LogService()
    log = await service.get_logs_by_job(id, pageNum, pageSize)
    # Mask gitee/github tokens so credentials never leak into API responses.
    data = []
    for rep_log in log:
        log_str = rep_log.log
        if projects[0].gitee_token:
            log_str = log_str.replace(projects[0].gitee_token, "******")
        if projects[0].github_token:
            log_str = log_str.replace(projects[0].github_token, "******")
        rep_log.log = log_str
        data.append(rep_log)
    # Empty pages are only logged, not treated as an error.
    if len(log) == 0:
        logger.info(f"The job #{id} of project #{name} has no logs")
    # Total count is queried separately for pagination metadata.
    count = await service.count_logs(id)
    return Response(
        code=Code.SUCCESS,
        data=DataList(total=count, list=data)
    )

View File

@ -21,280 +21,378 @@ from src.base.status_code import Status, SYNCResponse, SYNCException
from src.service.cronjob import GITMSGException
class SyncDirection(Controller):
class SyncDirection(Controller):
def __init__(self, *args, **kwargs):
    """Wire up the sync and log services, then run base-class initialisation."""
    self.service = SyncService()  # repo/branch sync business logic
    self.log_service = LogService()  # log access (not used directly in this view)
    super().__init__(*args, **kwargs)

# Operator-identity hook with no extra behaviour; defers to the base class.
def user(self):
    return super().user()
@router.post("/repo", response_model=SYNCResponse, description='配置同步仓库')
async def create_sync_repo(
    self, request: Request, user: str = Depends(user),
    dto: SyncRepoDTO = Body(..., description="绑定同步仓库信息")
):
    """Register a new sync repository after validating the posted DTO."""
    # Access log for auditing.
    api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
    # Both the external and internal repository addresses must be well-formed.
    if not base.check_addr(dto.external_repo_address) or not base.check_addr(dto.internal_repo_address):
        return SYNCResponse(
            code_status=Status.REPO_ADDR_ILLEGAL.code,
            msg=Status.REPO_ADDR_ILLEGAL.msg
        )
    # Sync granularity: only the two defined modes are accepted.
    if dto.sync_granularity not in [1, 2]:
        return SYNCResponse(code_status=Status.SYNC_GRAN_ILLEGAL.code, msg=Status.SYNC_GRAN_ILLEGAL.msg)
    # Sync direction: only the two defined directions are accepted.
    if dto.sync_direction not in [1, 2]:
        return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
    # Repository names must be unique.
    if await self.service.same_name_repo(repo_name=dto.repo_name):
        return SYNCResponse(
            code_status=Status.REPO_EXISTS.code,
            msg=Status.REPO_EXISTS.msg
        )
    repo = await self.service.create_repo(dto)
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        data=repo,
        msg=Status.SUCCESS.msg
    )
@router.post("/{repo_name}/branch", response_model=SYNCResponse, description='配置同步分支')
async def create_sync_branch(
    self, request: Request, user: str = Depends(user),
    repo_name: str = Path(..., description="仓库名称"),
    dto: SyncBranchDTO = Body(..., description="绑定同步分支信息")
):
    """Bind a sync branch to the named repo; validation errors map to SYNCResponse."""
    api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
    try:
        # Validates the repo state and DTO; returns the owning repo id.
        repo_id = await self.service.check_status(repo_name, dto)
    except SYNCException as Error:
        # Validation failure: surface the exception's status code/message.
        return SYNCResponse(
            code_status=Error.code_status,
            msg=Error.status_msg
        )
    branch = await self.service.create_branch(dto, repo_id=repo_id)
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        data=branch,
        msg=Status.SUCCESS.msg
    )
@router.get("/repo", response_model=SYNCResponse, description='获取同步仓库信息')
async def get_sync_repos(
    self, request: Request, user: str = Depends(user),
    page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
    create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
):
    """Return a page of configured sync repositories."""
    api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
    repos = await self.service.get_sync_repo(page_num=page_num, page_size=page_size, create_sort=create_sort)
    if repos is None:
        # No repositories stored yet: dedicated "no data" status.
        return SYNCResponse(
            code_status=Status.NOT_DATA.code,
            msg=Status.NOT_DATA.msg
        )
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        data=repos,
        msg=Status.SUCCESS.msg
    )
# Route handler: list the sync branches bound to one repository.
@router.get("/{repo_name}/branch", response_model=SYNCResponse, description='获取仓库对应的同步分支信息')
async def get_sync_branches(
    self,
    request: Request,  # current request (used for the access log)
    user: str = Depends(user),  # operator resolved by dependency injection
    repo_name: str = Path(..., description="查询的仓库名称"),  # repo to query
    page_num: int = Query(1, description="页数"),  # page number, default 1
    page_size: int = Query(10, description="条数"),  # page size, default 10
    create_sort: bool = Query(False, description="创建时间排序, 默认倒序")  # creation-time ordering flag
):
    """Return a page of sync branches for the named repository."""
    api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
    try:
        # Resolve the repository id from its name.
        repo_id = await self.service.get_repo_id(repo_name=repo_name)
    except SYNCException as Error:
        # Unknown repo: surface the exception's status code/message.
        return SYNCResponse(
            code_status=Error.code_status,
            msg=Error.status_msg
        )
    branches = await self.service.get_sync_branches(
        repo_id=repo_id,
        page_num=page_num,
        page_size=page_size,
        create_sort=create_sort
    )
    if len(branches) < 1:
        # No branches bound to this repo yet.
        return SYNCResponse(
            code_status=Status.NOT_DATA.code,
            msg=Status.NOT_DATA.msg
        )
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        data=branches,
        msg=Status.SUCCESS.msg
    )
def __init__(self, *args, **kwargs):
    """Attach the sync and log services, then finish base-class initialisation."""
    self.service = SyncService()
    self.log_service = LogService()
    super().__init__(*args, **kwargs)

# Operator-identity hook with no extra behaviour; defers to the base class.
def user(self):
    return super().user()
@router.post("/repo", response_model=SYNCResponse, description='配置同步仓库')
async def create_sync_repo(
self, request: Request, user: str = Depends(user),
dto: SyncRepoDTO = Body(..., description="绑定同步仓库信息")
):
api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
if not base.check_addr(dto.external_repo_address) or not base.check_addr(dto.internal_repo_address):
return SYNCResponse(
code_status=Status.REPO_ADDR_ILLEGAL.code,
msg=Status.REPO_ADDR_ILLEGAL.msg
)
if dto.sync_granularity not in [1, 2]:
return SYNCResponse(code_status=Status.SYNC_GRAN_ILLEGAL.code, msg=Status.SYNC_GRAN_ILLEGAL.msg)
if dto.sync_direction not in [1, 2]:
return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
if await self.service.same_name_repo(repo_name=dto.repo_name):
return SYNCResponse(
code_status=Status.REPO_EXISTS.code,
msg=Status.REPO_EXISTS.msg
)
repo = await self.service.create_repo(dto)
return SYNCResponse(
code_status=Status.SUCCESS.code,
data=repo,
msg=Status.SUCCESS.msg
# Route handler: run a whole-repository sync.
@router.post("/repo/{repo_name}", response_model=SYNCResponse, description='执行仓库同步')
async def sync_repo(
    self,
    request: Request,  # current request (used for the access log)
    user: str = Depends(user),  # operator resolved by dependency injection
    repo_name: str = Path(..., description="仓库名称"),  # repo to sync
    force_flag: bool = Query(False, description="是否强制同步")  # force the sync
):
    """Trigger a sync of the named repository; errors map to SYNCResponse."""
    api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
    repo = await self.service.get_repo(repo_name=repo_name)
    # Unknown repository name.
    if repo is None:
        return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
    # Repository exists but syncing is disabled.
    if not repo.enable:
        return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
    try:
        await sync_repo_task(repo, user, force_flag)
    except GITMSGException as GITError:
        # NOTE(review): assumes GITMSGException exposes .status/.msg — confirm.
        return SYNCResponse(
            code_status=GITError.status,
            msg=GITError.msg
        )
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        msg=Status.SUCCESS.msg
    )
@router.post("/{repo_name}/branch", response_model=SYNCResponse, description='配置同步分支')
async def create_sync_branch(
    self, request: Request, user: str = Depends(user),
    repo_name: str = Path(..., description="仓库名称"),
    dto: SyncBranchDTO = Body(..., description="绑定同步分支信息")
):
    """Bind a sync branch to an existing repo after validating its state."""
    api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
    try:
        owning_repo_id = await self.service.check_status(repo_name, dto)
    except SYNCException as exc:
        # Validation failed: echo the exception's status payload.
        return SYNCResponse(code_status=exc.code_status, msg=exc.status_msg)
    created = await self.service.create_branch(dto, repo_id=owning_repo_id)
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        data=created,
        msg=Status.SUCCESS.msg
    )
@router.get("/repo", response_model=SYNCResponse, description='获取同步仓库信息')
async def get_sync_repos(
    self, request: Request, user: str = Depends(user),
    page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
    create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
):
    """Return a page of configured sync repositories."""
    api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
    repo_page = await self.service.get_sync_repo(page_num=page_num, page_size=page_size, create_sort=create_sort)
    if repo_page is None:
        # Nothing stored yet: dedicated "no data" status.
        return SYNCResponse(code_status=Status.NOT_DATA.code, msg=Status.NOT_DATA.msg)
    return SYNCResponse(code_status=Status.SUCCESS.code, data=repo_page, msg=Status.SUCCESS.msg)
@router.get("/{repo_name}/branch", response_model=SYNCResponse, description='获取仓库对应的同步分支信息')
async def get_sync_branches(
    self, request: Request, user: str = Depends(user),
    repo_name: str = Path(..., description="查询的仓库名称"),
    page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
    create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
):
    """Return a page of sync branches bound to the named repository."""
    api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
    try:
        bound_repo_id = await self.service.get_repo_id(repo_name=repo_name)
    except SYNCException as exc:
        # Unknown repo: echo the exception's status payload.
        return SYNCResponse(code_status=exc.code_status, msg=exc.status_msg)
    branch_page = await self.service.get_sync_branches(
        repo_id=bound_repo_id, page_num=page_num,
        page_size=page_size, create_sort=create_sort)
    if len(branch_page) < 1:
        return SYNCResponse(code_status=Status.NOT_DATA.code, msg=Status.NOT_DATA.msg)
    return SYNCResponse(code_status=Status.SUCCESS.code, data=branch_page, msg=Status.SUCCESS.msg)
@router.post("/repo/{repo_name}", response_model=SYNCResponse, description='执行仓库同步')
async def sync_repo(
    self, request: Request, user: str = Depends(user),
    repo_name: str = Path(..., description="仓库名称"),
    force_flag: bool = Query(False, description="是否强制同步")
):
    """Run a whole-repository sync for the named repo."""
    api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
    target = await self.service.get_repo(repo_name=repo_name)
    # Guard clauses: unknown repo, then disabled repo.
    if target is None:
        return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
    if not target.enable:
        return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
    try:
        await sync_repo_task(target, user, force_flag)
    except GITMSGException as git_error:
        return SYNCResponse(code_status=git_error.status, msg=git_error.msg)
    return SYNCResponse(code_status=Status.SUCCESS.code, msg=Status.SUCCESS.msg)
@router.post("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='执行分支同步')
async def sync_branch(
    self, request: Request, user: str = Depends(user),
    repo_name: str = Path(..., description="仓库名称"),
    branch_name: str = Path(..., description="分支名称"),
    sync_direct: int = Query(..., description="同步方向: 1 表示内部仓库同步到外部, 2 表示外部仓库同步到内部"),
    force_flag: bool = Query(False, description="是否强制同步")
):
    """Synchronise one branch of a configured repo in the given direction."""
    api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
    repo = await self.service.get_repo(repo_name=repo_name)
    # Fix: guard against an unknown repo before touching repo.enable
    # (sync_repo performs this check; here a None repo raised AttributeError).
    if repo is None:
        return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
    if not repo.enable:
        return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
    if sync_direct not in [1, 2]:
        return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
    direct = SyncDirect(sync_direct)
    branches = await self.service.sync_branch(repo_id=repo.id, branch_name=branch_name, dire=direct)
    if len(branches) < 1:
        return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
    try:
        await sync_branch_task(repo, branches, direct, user, force_flag)
    except GITMSGException as GITError:
        return SYNCResponse(
            code_status=GITError.status,
            msg=GITError.msg
        )
    # Fix: report success explicitly, matching sync_repo; the original fell
    # through and returned None against response_model=SYNCResponse.
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        msg=Status.SUCCESS.msg
    )
@router.post("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='执行分支同步')
async def sync_branch(
        self,
        request: Request,
        user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        branch_name: str = Path(..., description="分支名称"),
        sync_direct: int = Query(..., description="同步方向: 1 表示内部仓库同步到外部, 2 表示外部仓库同步到内部"),
        force_flag: bool = Query(False, description="是否强制同步")
):
    """Trigger a sync of one branch of *repo_name* in the given direction."""
    api_log(LogType.INFO, f"用户 {user} 使用 POST 方法访问接口 {request.url.path} ", user)
    repo = await self.service.get_repo(repo_name=repo_name)
    # BUGFIX: guard against an unknown repo name (get_repo returns None,
    # and repo.enable would raise AttributeError).
    if repo is None:
        return SYNCResponse(code_status=Status.REPO_NOTFOUND.code, msg=Status.REPO_NOTFOUND.msg)
    if not repo.enable:
        return SYNCResponse(code_status=Status.NOT_ENABLE.code, msg=Status.NOT_ENABLE.msg)
    # Only directions 1 (inter->outer) and 2 (outer->inter) are legal.
    if sync_direct not in [1, 2]:
        return SYNCResponse(code_status=Status.SYNC_DIRE_ILLEGAL.code, msg=Status.SYNC_DIRE_ILLEGAL.msg)
    direct = SyncDirect(sync_direct)
    branches = await self.service.sync_branch(repo_id=repo.id, branch_name=branch_name, dire=direct)
    # BUGFIX: the original referenced Status.BRANCH_NOT_FOUND, which its own
    # comment admitted was only "assumed" to exist; use NOT_DATA, which the
    # rest of this controller demonstrably provides.
    if len(branches) < 1:
        return SYNCResponse(code_status=Status.NOT_DATA.code, msg=Status.NOT_DATA.msg)
    try:
        await sync_branch_task(repo, branches, direct, user, force_flag)
    except GITMSGException as GITError:
        return SYNCResponse(code_status=GITError.status, msg=GITError.msg)
    return SYNCResponse(code_status=Status.SUCCESS.code, msg=Status.SUCCESS.msg)
@router.delete("/repo/{repo_name}", response_model=SYNCResponse, description='仓库解绑')
async def delete_repo(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称")
):
    """Unbind a repo; on success also remove its working dir and stored logs."""
    api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
    data = await self.service.delete_repo(repo_name=repo_name)
    try:
        if data.code_status == 0:  # 0 is treated as "unbind succeeded"
            delete_repo_dir(repo_name, user)
            await self.log_service.delete_logs(repo_name=repo_name)
    except GITMSGException as GITError:
        return SYNCResponse(
            code_status=GITError.status,
            msg=GITError.msg
        )
    # BUGFIX: the final return statement was truncated — its closing
    # parenthesis was missing, which is a syntax error.
    return SYNCResponse(
        code_status=data.code_status,
        msg=data.status_msg
    )
@router.delete("/repo/{repo_name}", response_model=SYNCResponse, description='仓库解绑')
async def delete_repo(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称")
):
    api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
    # Unbind the repo via the service layer.
    data = await self.service.delete_repo(repo_name=repo_name)
    # Best-effort cleanup once the unbind itself succeeded.
    try:
        if data.code_status == 0:  # 0 is treated as "unbind succeeded"
            delete_repo_dir(repo_name, user)  # remove the on-disk working copy
            await self.log_service.delete_logs(repo_name=repo_name)  # drop this repo's sync logs
    except GITMSGException as GITError:
        # Git-level failures during cleanup are reported to the caller.
        return SYNCResponse(
            code_status=GITError.status,
            msg=GITError.msg
        )
    # Relay the outcome of the unbind operation itself.
    return SYNCResponse(
        code_status=data.code_status,
        msg=data.status_msg
    )
@router.delete("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='分支解绑')
async def delete_branch(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        branch_name: str = Path(..., description="分支名称")
):
    # Audit the call, delegate the unbind to the service layer, and relay
    # whatever status it reports.
    api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
    outcome = await self.service.delete_branch(repo_name=repo_name, branch_name=branch_name)
    return SYNCResponse(code_status=outcome.code_status, msg=outcome.status_msg)
@router.delete("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='分支解绑')
async def delete_branch(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        branch_name: str = Path(..., description="分支名称")
):
    """Unbind one branch mapping of *repo_name* and relay the service result."""
    api_log(LogType.INFO, f"用户 {user} 使用 DELETE 方法访问接口 {request.url.path} ", user)
    data = await self.service.delete_branch(repo_name=repo_name, branch_name=branch_name)
    # BUGFIX: the return statement was truncated — its closing parenthesis
    # was missing, which is a syntax error.
    return SYNCResponse(
        code_status=data.code_status,
        msg=data.status_msg
    )
@router.put("/repo/{repo_name}/repo_addr", response_model=SYNCResponse, description='更新仓库地址')
async def update_repo_addr(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        dto: ModifyRepoDTO = Body(..., description="更新仓库地址信息")
):
    api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} 更新仓库信息", user)
    # Persist the new repo addresses via the service layer.
    data = await self.service.update_repo_addr(repo_name=repo_name, dto=dto)
    # Apply the change to the local working copy as well — presumably this
    # updates the clone's remote URLs; confirm in modify_repos.
    try:
        await modify_repos(repo_name, user)
    except GITMSGException as GITError:
        # Git-level failures while applying the change are reported directly.
        return SYNCResponse(
            code_status=GITError.status,
            msg=GITError.msg
        )
    # Relay the service-layer result.
    return SYNCResponse(
        code_status=data.code_status,
        msg=data.status_msg
    )
@router.put("/repo/{repo_name}", response_model=SYNCResponse, description='更新仓库同步状态')
async def update_repo_status(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        enable: bool = Query(..., description="同步启用状态")
):
    # Audit the call, toggle the repo's sync-enabled flag through the
    # service layer, and pass its verdict straight back.
    api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
    outcome = await self.service.update_repo(repo_name=repo_name, enable=enable)
    return SYNCResponse(code_status=outcome.code_status, msg=outcome.status_msg)
@router.put("/repo/{repo_name}/repo_addr", response_model=SYNCResponse, description='更新仓库地址')
async def update_repo_addr(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        dto: ModifyRepoDTO = Body(..., description="更新仓库地址信息")
):
    """Update a repo's addresses, then apply the change to the working copy."""
    api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} 更新仓库信息", user)
    data = await self.service.update_repo_addr(repo_name=repo_name, dto=dto)
    try:
        await modify_repos(repo_name, user)
    except GITMSGException as GITError:
        return SYNCResponse(
            code_status=GITError.status,
            msg=GITError.msg
        )
    # BUGFIX: the final return statement was truncated — its closing
    # parenthesis was missing, which is a syntax error.
    return SYNCResponse(
        code_status=data.code_status,
        msg=data.status_msg
    )
@router.put("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='更新分支同步状态')
async def update_branch_status(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        branch_name: str = Path(..., description="分支名称"),
        enable: bool = Query(..., description="同步启用状态")
):
    api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
    # Toggle the sync-enabled flag for this repo/branch via the service layer.
    data = await self.service.update_branch(repo_name=repo_name, branch_name=branch_name, enable=enable)
    # Relay the service-layer result to the caller.
    return SYNCResponse(
        code_status=data.code_status,
        msg=data.status_msg
    )
@router.put("/repo/{repo_name}", response_model=SYNCResponse, description='更新仓库同步状态')
async def update_repo_status(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        enable: bool = Query(..., description="同步启用状态")
):
    # Record the access, flip the repo-level enable flag, relay the result.
    api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
    result = await self.service.update_repo(repo_name=repo_name, enable=enable)
    return SYNCResponse(code_status=result.code_status, msg=result.status_msg)
@router.put("/{repo_name}/branch/{branch_name}", response_model=SYNCResponse, description='更新分支同步状态')
async def update_branch_status(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Path(..., description="仓库名称"),
        branch_name: str = Path(..., description="分支名称"),
        enable: bool = Query(..., description="同步启用状态")
):
    # Record the access, toggle this branch mapping's enable flag via the
    # service layer, and pass its verdict straight back.
    api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
    result = await self.service.update_branch(repo_name=repo_name, branch_name=branch_name, enable=enable)
    return SYNCResponse(code_status=result.code_status, msg=result.status_msg)
@router.get("/repo/logs", response_model=SYNCResponse, description='获取仓库/分支日志')
async def get_logs(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Query(None, description="仓库名称"),
        branch_id: str = Query(None, description="分支id仓库粒度无需输入"),
        page_num: int = Query(1, description="页数"), page_size: int = Query(10, description="条数"),
        create_sort: bool = Query(False, description="创建时间排序, 默认倒序")
):
    """Fetch paginated sync logs filtered by repo names and/or branch ids."""
    api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
    # Both filters accept comma-separated lists; absent means "no filter".
    branch_id_list = branch_id.split(',') if branch_id is not None else []
    repo_name_list = repo_name.split(',') if repo_name is not None else []
    data = await self.log_service.get_logs(repo_name_list=repo_name_list, branch_id_list=branch_id_list,
                                           page_num=page_num, page_size=page_size, create_sort=create_sort)
    # BUGFIX: the original indexed data[0]/data[1] inside the `if not data`
    # branch — exactly the case where data is empty or None, so it raised
    # TypeError/IndexError instead of answering NOT_DATA.
    if not data:
        return SYNCResponse(
            code_status=Status.NOT_DATA.code,
            total=0,
            data=[],
            msg=Status.NOT_DATA.msg
        )
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        total=data[0],
        data=data[1],
        msg=Status.SUCCESS.msg
    )
@router.get("/repo/logs", response_model=SYNCResponse, description='获取仓库/分支日志')
async def get_logs(
        self, request: Request, user: str = Depends(user),
        repo_name: str = Query(None, description="仓库名称,支持逗号分隔的多个仓库"),
        branch_id: str = Query(None, description="分支ID支持逗号分隔的多个分支ID仓库粒度时无需输入"),
        page_num: int = Query(1, description="请求的页码"),
        page_size: int = Query(10, description="每页显示的记录数"),
        create_sort: bool = Query(False, description="是否按创建时间排序,默认为倒序")
):
    """Fetch paginated sync logs; the service returns a (total, logs) pair."""
    api_log(LogType.INFO, f"用户 {user} 使用 GET 方法访问接口 {request.url.path} ", user)
    # Both filters accept comma-separated lists; absent means "no filter".
    branch_id_list = branch_id.split(',') if branch_id is not None else []
    repo_name_list = repo_name.split(',') if repo_name is not None else []
    data = await self.log_service.get_logs(repo_name_list=repo_name_list, branch_id_list=branch_id_list,
                                           page_num=page_num, page_size=page_size, create_sort=create_sort)
    # BUGFIX: the original referenced Status.ERROR, which its own comment
    # admitted was only "assumed" to exist. A missing/malformed result is
    # answered with NOT_DATA (demonstrably available in this controller).
    if not isinstance(data, tuple) or len(data) != 2:
        return SYNCResponse(
            code_status=Status.NOT_DATA.code,
            total=0,
            data=[],
            msg=Status.NOT_DATA.msg
        )
    total, logs = data
    if not logs:
        return SYNCResponse(
            code_status=Status.NOT_DATA.code,
            total=total,
            data=[],
            msg=Status.NOT_DATA.msg
        )
    return SYNCResponse(
        code_status=Status.SUCCESS.code,
        total=total,
        data=logs,
        msg=Status.SUCCESS.msg
    )

View File

@ -6,21 +6,34 @@ from src.api.Controller import APIController as Controller
from src.router import USER as user
class User(Controller):
    # Resolve the current user from the BUC cookie (and optional token) by
    # delegating to the Controller base-class implementation.
    def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token=None):
        return super().get_user(cookie_key=cookie_key, token=token)

    # GET /info — return the authenticated user's profile fields wrapped in
    # the standard Response envelope.
    @user.get("/info", response_model=Response[UserInfoDto], description="获得用户信息")
    async def get_user_info(
            self,
            # NOTE(review): annotating with `Security` looks odd for a user
            # object — get_user presumably returns a user model; confirm.
            user: Security = Depends(get_user)
    ):
        return Response(
            data=UserInfoDto(
                name=user.name,
                nick=user.nick,
                emp_id=user.emp_id,
                email=user.email,
                dept=user.dept
            )
        )
# User controller: endpoints about the currently authenticated user.
class User(Controller):
    # Resolve the current user from the BUC cookie (and optional token),
    # delegating the actual lookup to the Controller base class.
    def get_user(self, cookie_key=Security(Controller.API_KEY_BUC_COOKIE), token=None):
        return super().get_user(cookie_key=cookie_key, token=token)

    # GET /info — return the current user's profile wrapped in the standard
    # Response envelope.
    @user.get("/info", response_model=Response[UserInfoDto], description="获得用户信息")
    async def get_user_info(
            self,
            # NOTE(review): the `Security` annotation looks wrong for a user
            # object — get_user presumably returns a user model; confirm.
            user: Security = Depends(get_user)
    ):
        # Copy only the profile fields this endpoint exposes.
        user_info = UserInfoDto(
            name=user.name,
            nick=user.nick,
            emp_id=user.emp_id,
            email=user.email,
            dept=user.dept
        )
        return Response(data=user_info)

View File

@ -33,18 +33,18 @@ buc_key and ConfigsUtil.set_obfastapi_config('buc_key', buc_key)
DB_ENV = getenv('DB_ENV', 'test_env')

# Per-environment MySQL connection settings, selected by DB_ENV.
# SECURITY FIX: the previous revision committed real credentials
# ('root' / 'wq20030602') as in-source defaults and left duplicate keys from
# a merged diff in each dict literal. Credentials must only come from the
# environment; the defaults below are intentionally non-secret.
DB = {
    'test_env': {
        'host': getenv('CEROBOT_MYSQL_HOST', 'localhost'),
        'port': getenv('CEROBOT_MYSQL_PORT', 3306, int),
        'user': getenv('CEROBOT_MYSQL_USER', ''),
        'passwd': getenv('CEROBOT_MYSQL_PWD', ''),
        'dbname': getenv('CEROBOT_MYSQL_DB', 'reposyncer')
    },
    'local': {
        'host': getenv('CEROBOT_MYSQL_HOST', 'localhost'),
        'port': getenv('CEROBOT_MYSQL_PORT', 3306, int),
        'user': getenv('CEROBOT_MYSQL_USER', ''),
        'passwd': getenv('CEROBOT_MYSQL_PWD', ''),
        'dbname': getenv('CEROBOT_MYSQL_DB', 'reposyncer')
    }
}

View File

@ -12,6 +12,7 @@ from src.do.sync_config import SyncDirect, SyncType
from src.dto.sync_config import SyncBranchDTO
from src.utils.sync_log import sync_log, LogType, log_path, api_log
from src.service.sync_config import LogService
from subprocess import check_output, CalledProcessError, STDOUT
sync_repo_dao = SyncRepoDAO()
sync_branch_dao = SyncBranchDAO()
@ -48,154 +49,237 @@ def delete_repo_dir(repo_name, user: str):
os.path.exists(repo_dir) and shutil.rmtree(repo_dir)
def shell(cmd, dire: str, log_name: str, user: str):
    """Run a git command in *dire*, logging a description of it.

    Raises the git error mapped from stderr when the command exits non-zero;
    otherwise returns the CompletedProcess so callers can read stdout.
    """
    log = f'Execute cmd: ' + cmd
    # clone/remote/ls-remote command lines carry repo URLs (which embed
    # access tokens via get_repo_address_with_token), so log a short,
    # sanitized message for those instead of the full command.
    if 'git clone' in log:
        sync_log(LogType.INFO, 'Execute cmd: git clone', log_name, user)
    elif 'git remote' in log:
        sync_log(LogType.INFO, '添加/更新仓库信息', log_name, user)
    elif 'git ls-remote' in log:
        sync_log(LogType.INFO, '获取仓库分支信息', log_name, user)
    else:
        sync_log(LogType.INFO, log, log_name, user)
    output = subprocess.run(shlex.split(cmd), cwd=dire, capture_output=True, text=True)
    if output.returncode != 0:
        git_error = get_git_error(output.stderr)
        if config.LOG_DETAIL:
            sync_log(LogType.ERROR, output.stderr, log_name, user)
        raise git_error
    # BUGFIX: the original implicitly returned None on success, but callers
    # read `.stdout` off the result (e.g. the ls-remote calls in
    # sync_repo_task) -> AttributeError. Return the CompletedProcess.
    return output
# Run *cmd* in directory *dire*, logging the execution for *user*; returns
# the CompletedProcess on success, raises a mapped git error on failure.
def shell(cmd, dire: str, log_name: str, user: str):
    # Human-readable description of the command being executed.
    log = f'Execute cmd: ' + cmd
    # clone/remote/ls-remote command lines carry full repo URLs (which can
    # embed access tokens), so a short sanitized message is logged instead.
    if 'git clone' in log:
        sync_log(LogType.INFO, 'Execute cmd: git clone', log_name, user)
    elif 'git remote' in log:
        sync_log(LogType.INFO, '添加/更新仓库信息', log_name, user)
    elif 'git ls-remote' in log:
        sync_log(LogType.INFO, '获取仓库分支信息', log_name, user)
    else:
        sync_log(LogType.INFO, log, log_name, user)
    # shlex.split keeps quoted arguments intact; the output is captured as
    # text so callers can parse stdout directly.
    output = subprocess.run(shlex.split(cmd), cwd=dire, capture_output=True, text=True)
    # Non-zero exit: map stderr to a typed git error and raise it.
    if output.returncode != 0:
        git_error = get_git_error(output.stderr)
        # Optionally mirror the raw stderr into the sync log.
        if config.LOG_DETAIL:
            sync_log(LogType.ERROR, output.stderr, log_name, user)
        raise git_error
    # Success: hand back the CompletedProcess so callers can read stdout.
    return output
def init_repos(repo, log_name: str, user: str):
    """Ensure a local working clone of *repo* exists under SYNC_DIR with
    `internal` and `external` remotes configured."""
    # IDIOM FIX: `not exists and makedirs` was a short-circuit trick and is
    # racy between the check and the create; exist_ok is the standard form.
    os.makedirs(SYNC_DIR, exist_ok=True)
    repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
    if not os.path.exists(repo_dir):
        sync_log(LogType.INFO, "初始化仓库 *********", log_name, user)
        # Build clone URLs with the access tokens embedded.
        inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
        exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
        if repo.sync_direction == SyncDirect.to_outer:
            # Seed the working copy from the internal repository.
            shell(f'git clone {inter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
        else:
            # Seed the working copy from the external repository.
            shell(f'git clone {exter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
        # Register both remotes; -f fetches each one immediately.
        shell(f'git remote add -f internal {inter_repo_addr}', repo_dir, log_name, user)
        shell(f'git remote add -f external {exter_repo_addr}', repo_dir, log_name, user)
def init_repos(repo, log_name: str, user: str):
    """Ensure a local clone of *repo* exists under SYNC_DIR with `internal`
    and `external` remotes configured."""
    # Short-circuit trick: create SYNC_DIR only when it does not exist yet.
    not os.path.exists(SYNC_DIR) and os.makedirs(SYNC_DIR)
    repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
    if not os.path.exists(repo_dir):
        sync_log(LogType.INFO, "********* 初始化仓库 *********", log_name, user)
        # Build clone URLs with the access tokens embedded.
        inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
        exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
        # Seed the working copy from whichever side the sync starts from.
        if repo.sync_direction == SyncDirect.to_outer:
            shell(f'git clone {inter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
        else:
            shell(f'git clone {exter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
        # Register both remotes; -f makes git fetch them immediately.
        shell(f'git remote add -f internal {inter_repo_addr}', repo_dir, log_name, user)
        shell(f'git remote add -f external {exter_repo_addr}', repo_dir, log_name, user)
def inter_to_outer(repo, branch, log_name: str, user: str, force_flag):
    """Push branch.internal_branch_name (from the `internal` remote) to
    branch.external_branch_name on the `external` remote.

    Returns the short id of the synced HEAD commit.
    """
    repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
    inter_name = branch.internal_branch_name
    outer_name = branch.external_branch_name
    try:
        # Refresh the internal branch and check it out locally
        # (create-or-reset via `checkout -B`).
        shell(f"git fetch internal {inter_name}", repo_dir, log_name, user)
        shell(f"git checkout -B {inter_name} internal/{inter_name}", repo_dir, log_name, user)
        # Push to the external branch; --force may rewrite remote history.
        if force_flag:
            shell(f"git push --force external {inter_name}:{outer_name}", repo_dir, log_name, user)
        else:
            shell(f"git push external {inter_name}:{outer_name}", repo_dir, log_name, user)
        # Record and return the short commit id of the synced HEAD.
        result = shell(f'git log -1 --format="%H"', repo_dir, log_name, user)
        commit_id = result.stdout[0:7]
        sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
        return commit_id
    except Exception:
        # BUGFIX: the trailing `except Exception as e:` clause had no body
        # (syntax error in this revision); re-raise so callers see failures.
        raise
def inter_to_outer(repo, branch, log_name: str, user: str, force_flag: bool):
    """Sync the internal branch out to the external remote, logging which
    files differ from the remote tip.

    Returns the short id of the synced HEAD commit.
    """
    repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
    inter_name = branch.internal_branch_name
    outer_name = branch.external_branch_name
    try:
        # Refresh the internal branch and check it out locally
        # (create-or-reset via `checkout -B`).
        shell(f"git fetch internal {inter_name}", repo_dir, log_name, user)
        shell(f"git checkout -B {inter_name} internal/{inter_name}", repo_dir, log_name, user)
        # Push to the external branch; --force may rewrite remote history.
        if force_flag:
            shell(f"git push --force external {inter_name}:{outer_name}", repo_dir, log_name, user)
        else:
            shell(f"git push external {inter_name}:{outer_name}", repo_dir, log_name, user)
        # SECURITY FIX: the original used check_output(..., shell=True) with
        # the branch name interpolated into a shell pipeline ("| cut -f1"),
        # which is a command-injection risk. Use the existing shell() helper
        # (no shell interpretation) and parse the tab-separated field here.
        ls_result = shell(f'git ls-remote external {outer_name}', repo_dir, log_name, user)
        outer_commit_id = ls_result.stdout.split('\t')[0].strip() if ls_result.stdout else ''
        if outer_commit_id:
            # Log the files that differ between HEAD and the remote tip.
            diff_result = shell(f'git diff --name-status {outer_commit_id}', repo_dir, log_name, user)
            modified_files = [line.split()[1] for line in diff_result.stdout.strip().split('\n') if line]
            if modified_files:
                sync_log(LogType.INFO, f'Modified files: {", ".join(modified_files)}', log_name, user)
        # Record and return the short commit id of the synced HEAD.
        result = shell(f'git log -1 --format="%H"', repo_dir, log_name, user)
        commit_id = result.stdout[0:7]
        sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
        return commit_id
    except Exception:
        # `except (CalledProcessError, Exception)` was redundant — Exception
        # already covers it. Propagate failures unchanged.
        raise
def outer_to_inter(repo, branch, log_name: str, user: str, force_flag):
    """Push branch.external_branch_name (from the `external` remote) to
    branch.internal_branch_name on the `internal` remote.

    Returns the short id of the synced HEAD commit.
    """
    repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
    inter_name = branch.internal_branch_name
    outer_name = branch.external_branch_name
    try:
        # Refresh the external branch and check it out locally
        # (create-or-reset via `checkout -B`).
        shell(f"git fetch external {outer_name}", repo_dir, log_name, user)
        shell(f"git checkout -B {outer_name} external/{outer_name}", repo_dir, log_name, user)
        # Push to the internal branch; --force may rewrite remote history.
        if force_flag:
            shell(f"git push --force internal {outer_name}:{inter_name}", repo_dir, log_name, user)
        else:
            shell(f"git push internal {outer_name}:{inter_name}", repo_dir, log_name, user)
        # %h yields the abbreviated hash; [0:7] caps the recorded length.
        result = shell(f'git log -1 --format=%h', repo_dir, log_name, user)
        commit_id = result.stdout[0:7]
        sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
        return commit_id
    except Exception:
        # BUGFIX: the trailing `except Exception as e:` clause had no body
        # (syntax error in this revision); re-raise so callers see failures.
        raise
def outer_to_inter(repo, branch, log_name: str, user: str, force_flag):
    """Sync branch.external_branch_name from the `external` remote into
    branch.internal_branch_name on the `internal` remote; returns the short
    commit id of the synced HEAD."""
    repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
    inter_name = branch.internal_branch_name
    outer_name = branch.external_branch_name
    try:
        # Refresh the external branch in the local working copy.
        shell(f"git fetch external {outer_name}", repo_dir, log_name, user)
        # Create-or-reset the local branch to the fetched external tip.
        shell(f"git checkout -B {outer_name} external/{outer_name}", repo_dir, log_name, user)
        # Push to the internal branch; --force may rewrite remote history.
        if force_flag:
            shell(f"git push --force internal {outer_name}:{inter_name}", repo_dir, log_name, user)
        else:
            shell(f"git push internal {outer_name}:{inter_name}", repo_dir, log_name, user)
        # %h already yields the abbreviated hash; [0:7] caps the length.
        result = shell(f'git log -1 --format=%h', repo_dir, log_name, user)
        commit_id = result.stdout[0:7]
        sync_log(LogType.INFO, f'[COMMIT ID: {commit_id}]', log_name, user)
        return commit_id
    except Exception as e:
        # Propagate any failure unchanged to the caller.
        raise
async def sync_repo_task(repo, user, force_flag):
    """Sync a whole repo.

    Branch-granularity repos delegate to sync_branch_task; repo-granularity
    repos mirror every head of the source remote 1:1.
    """
    if repo.sync_granularity == SyncType.one:
        branches = await sync_branch_dao.sync_branch(repo_id=repo.id)
        # BUGFIX: sync_branch_task takes force_flag as its fifth parameter;
        # the original omitted it, raising TypeError on every call here.
        await sync_branch_task(repo, branches, repo.sync_direction, user, force_flag)
    else:
        log_name = f'sync_{repo.repo_name}.log'
        try:
            init_repos(repo, log_name, user)
            sync_log(LogType.INFO, f'************ 执行{repo.repo_name}仓库同步 ************', log_name, user)
            # Pick the source side, list its heads, and sync each one under
            # the same name on the other side.
            if repo.sync_direction == SyncDirect.to_outer:
                repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
            else:
                repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
            stm = shell(f"git ls-remote --heads {repo_addr}", SYNC_DIR, log_name, user)
            for line in stm.stdout.split('\n'):
                if not line:
                    continue
                branch_name = line.split('/')[-1].strip()
                branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
                if repo.sync_direction == SyncDirect.to_outer:
                    sync_log(LogType.INFO, f'Execute inter to outer {branch_name} branch Sync', log_name, user)
                    inter_to_outer(repo, branch, log_name, user, force_flag)
                else:
                    sync_log(LogType.INFO, f'Execute outer to inter {branch_name} branch Sync', log_name, user)
                    outer_to_inter(repo, branch, log_name, user, force_flag)
            sync_log(LogType.INFO, f'************ {repo.repo_name}仓库同步完成 ************', log_name, user)
        finally:
            if config.DELETE_SYNC_DIR:
                # BUGFIX: os.removedirs only removes *empty* directories and
                # raises OSError on a dir holding clones; rmtree is correct
                # (delete_repo_dir already uses it).
                os.path.exists(SYNC_DIR) and shutil.rmtree(SYNC_DIR)
                sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
            # Always record the repo-level sync attempt, then drop the
            # per-run log file.
            await log_service.insert_repo_log(repo_name=repo.repo_name, direct=repo.sync_direction)
            os.remove(os.path.join(log_path, log_name))
async def sync_branch_task(repo, branches, direct, user, force_flag):
    """Sync each branch in *branches* in direction *direct*, recording a
    per-branch log row whether or not the sync succeeded."""
    for branch in branches:
        log_name = f'sync_{repo.repo_name}_{branch.id}.log'
        commit_id = ''
        try:
            # Make sure the working clone and both remotes exist.
            init_repos(repo, log_name, user)
            sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user)
            if direct == SyncDirect.to_inter:
                sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
                commit_id = outer_to_inter(repo, branch, log_name, user, force_flag)
            else:
                sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
                commit_id = inter_to_outer(repo, branch, log_name, user, force_flag)
            sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
        finally:
            if config.DELETE_SYNC_DIR:
                # BUGFIX: os.removedirs cannot delete a non-empty directory
                # (the clone lives inside SYNC_DIR) and would raise OSError
                # from this finally block; use shutil.rmtree like
                # delete_repo_dir does.
                os.path.exists(SYNC_DIR) and shutil.rmtree(SYNC_DIR)
                sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
            # Record the branch sync (commit_id stays '' when it failed),
            # then drop the per-run log file.
            await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
            os.remove(os.path.join(log_path, log_name))
async def sync_branch_task(repo, branches, direct, user, force_flag):
    """Sync every branch in *branches* in direction *direct* and record a
    per-branch log row regardless of the outcome."""
    for branch in branches:
        log_name = f'sync_{repo.repo_name}_{branch.id}.log'
        commit_id = ''
        try:
            # Ensure the working clone and remotes are set up.
            init_repos(repo, log_name, user)
            sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user)
            if direct == SyncDirect.to_inter:
                # External branch -> internal branch.
                sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
                commit_id = outer_to_inter(repo, branch, log_name, user, force_flag)
            else:
                # Internal branch -> external branch.
                sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
                commit_id = inter_to_outer(repo, branch, log_name, user, force_flag)
            sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
        finally:
            if config.DELETE_SYNC_DIR:
                if os.path.exists(SYNC_DIR):
                    # BUGFIX: os.removedirs refuses to delete a non-empty
                    # directory (the clone lives inside) and raises OSError
                    # from this finally block; shutil.rmtree is the correct
                    # call (delete_repo_dir already uses it).
                    shutil.rmtree(SYNC_DIR)
                sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
            # Record the branch sync (commit_id is '' when the sync failed),
            # then remove the per-run log file.
            await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
            os.remove(os.path.join(log_path, log_name))

View File

@ -13,36 +13,40 @@ from src.do.log import LogDO
from src.dto.log import Log as LogDTO
class LogService(Service):
    """Business logic for reading, writing and deleting sync-job logs."""

    def __init__(self) -> None:
        # DAO handling all database access for log rows.
        self._log_dao = LogDAO()

    async def get_logs_by_job(self, id, page: int = 1, size: int = 10) -> Optional[List[LogDTO]]:
        """Return one page of logs for sync job *id* as DTOs."""
        # SECURITY FIX: the condition was built with an f-string
        # (text(f"sync_job_id = {id}")) — SQL injection; bind the value.
        cond = text("sync_job_id = :job_id").bindparams(job_id=id)
        start = (page - 1) * size
        rows = await self._log_dao.fetch(cond=cond, start=start, limit=size)
        return [self._do_to_dto(log) for log in rows]

    async def save_logs(self, id, type, msg) -> Optional[LogDTO]:
        """Persist one log entry for sync job *id*."""
        return await self._log_dao.insert_log(id, type, msg)

    async def delete_logs(self) -> Optional[bool]:
        """Delete logs via the DAO."""
        return await self._log_dao.delete_log()

    async def count_logs(self, id) -> int:
        """Count the logs recorded for sync job *id*."""
        # SECURITY FIX: parameterize instead of interpolating *id* into SQL.
        cond = text("sync_job_id = :job_id").bindparams(job_id=id)
        return await self._log_dao._get_count(cond=cond)

    def _do_to_dto(self, log: LogDO) -> LogDTO:
        # NOTE(review): rows are indexed by the mapped DO class here —
        # presumably SQLAlchemy Row objects from an entity query; confirm.
        return LogDTO(
            **{
                'id': log[LogDO].id,
                'sync_job_id': log[LogDO].sync_job_id,
                'log_type': log[LogDO].log_type,
                'log': log[LogDO].log,
                'create_time': log[LogDO].create_time
            }
        )
class LogService(Service):
    """Handles business logic around sync-job logs."""

    def __init__(self) -> None:
        # DAO object used for all database interaction.
        self._log_dao = LogDAO()

    async def get_logs_by_job(self, id, page: int = 1, size: int = 10) -> Optional[List[LogDTO]]:
        """Fetch one page of logs for sync job *id* and map them to DTOs."""
        # SECURITY FIX: the original interpolated *id* into the SQL text with
        # an f-string (its own comment flagged the injection risk); bind it.
        cond = text("sync_job_id = :job_id").bindparams(job_id=id)
        start = (page - 1) * size  # offset of the first row on this page
        all = await self._log_dao.fetch(cond=cond, start=start, limit=size)
        data = []
        for log in all:
            data.append(self._do_to_dto(log))
        return data

    async def save_logs(self, id, type, msg) -> Optional[LogDTO]:
        """Persist one log entry for sync job *id*."""
        return await self._log_dao.insert_log(id, type, msg)

    async def delete_logs(self) -> Optional[bool]:
        """Delete logs through the DAO."""
        return await self._log_dao.delete_log()

    async def count_logs(self, id) -> int:
        """Count logs recorded for sync job *id*."""
        # SECURITY FIX: parameterized for the same reason as above.
        # NOTE(review): this reaches into the DAO's private _get_count.
        cond = text("sync_job_id = :job_id").bindparams(job_id=id)
        return await self._log_dao._get_count(cond=cond)

    def _do_to_dto(self, log: LogDO) -> LogDTO:
        # Map a LogDO row to the outward-facing LogDTO.
        return LogDTO(
            **{
                'id': log.id,
                'sync_job_id': log.sync_job_id,
                'log_type': log.log_type,
                'log': log.log,
                'create_time': log.create_time
            }
        )