forked from Lesin/reposync
587 lines
28 KiB
Python
587 lines
28 KiB
Python
import asyncio
|
||
import os
|
||
import json
|
||
from typing import Optional
|
||
import requests
|
||
import re
|
||
from pathlib import Path
|
||
|
||
|
||
from src.service.pull_request import PullRequestService
|
||
from src.service.sync import JobService, ProjectService
|
||
from src.service.log import LogService
|
||
from src.dto.sync import Color, SyncType
|
||
from src.dto.sync import Job as JobDTO
|
||
from src.dto.pull_request import PullRequest as PullRequestDTO
|
||
from src.dto.sync import Project as ProjectDTO
|
||
from src.utils import cmd, author, gitlab, github
|
||
from src.utils.logger import logger
|
||
from src.base import config
|
||
from src.utils.logger import Log
|
||
from src.base.code import LogType
|
||
from src.common.repo import Repo, RepoType
|
||
|
||
|
||
async def apply_diff(project, job, pull: PullRequestDTO, dir):
|
||
# 从GitHub地址中提取组织和仓库名称
|
||
organization, repo = github.transfer_github_to_name(project.github_address)
|
||
|
||
# 根据仓库类型构建基础URL
|
||
baseUrl = ""
|
||
if pull.type == RepoType.Github:
|
||
baseUrl = f"{config.GITHUB_ENV['github_api_diff_address']}/{organization}/{repo}/pull/"
|
||
elif pull.type == RepoType.Gitee:
|
||
baseUrl = f"{config.GITEE_ENV['gitee_api_diff_address']}/{organization}/{repo}/pull/"
|
||
elif pull.type == RepoType.Gitcode:
|
||
pass
|
||
|
||
# 构建diff文件的URL和临时文件路径
|
||
diffUrl = baseUrl + str(pull.id) + ".diff"
|
||
tmpfile = dir + "/" + str(pull.id) + "_diff"
|
||
|
||
# 根据仓库类型构建下载diff文件的命令
|
||
download_diff_cmd = ""
|
||
if pull.type == RepoType.Github:
|
||
download_diff_cmd = f"curl -X GET {diffUrl} -H 'Accept: application/vnd.github.v3.diff'"
|
||
elif pull.type == RepoType.Gitee:
|
||
download_diff_cmd = f"curl -X GET {diffUrl}"
|
||
elif pull.type == RepoType.Gitcode:
|
||
pass
|
||
|
||
# 下载diff文件并写入临时文件中
|
||
with open(tmpfile, "w") as diff_file:
|
||
diff, err = await cmd.shell(download_diff_cmd, dir, job)
|
||
diff_file.write(diff)
|
||
|
||
# 检查diff是否可以无错误地应用
|
||
out, err = await cmd.shell('git apply --check ' + tmpfile, dir, job)
|
||
if out != "":
|
||
raise ValueError(f"git apply --check failed")
|
||
|
||
# 应用diff并处理冲突
|
||
out, err = await cmd.shell('git apply ' + tmpfile, dir, job)
|
||
if err.startswith("error"):
|
||
await Log(LogType.ERROR, "The git apply operation has some conflict", job.id)
|
||
await cmd.shell('rm -rf ' + dir, '.', job)
|
||
return
|
||
|
||
# 清理临时文件并将更改添加到git仓库中
|
||
await cmd.shell(f"rm -rf {tmpfile}", dir, job)
|
||
await cmd.shell('git add .', dir, job)
|
||
|
||
|
||
#用于同步代码从一个存储库到另一个存储库,处理不同类型的拉取请求(例如GitHub和GitLab),并在处理过程中记录相关日志。
|
||
async def sync_common(project, job, pull: PullRequestDTO):
|
||
try:
|
||
# 记录项目信息日志
|
||
await Log(LogType.INFO, f"The project base repo is {project.base}", job.id)
|
||
|
||
# 记录同步代码日志
|
||
await Log(LogType.INFO, f"Sync the job code from other repo to base {project.base} repo", job.id)
|
||
# 创建临时目录
|
||
dir = f"/tmp/{job.project}_job_inter_{job.id}_pull_{pull.id}"
|
||
|
||
# 记录拉取请求目录日志
|
||
await Log(LogType.INFO, f"The pull request dir is {dir}", job.id)
|
||
# 执行创建目录命令
|
||
await cmd.shell('mkdir ' + dir, '.', job)
|
||
|
||
# 克隆Git仓库到临时目录
|
||
await cmd.shell(
|
||
f"git clone -b {job.gitlab_branch} {project.gitlab_address}", dir, job)
|
||
repo_dir = dir + "/" + project.name
|
||
|
||
# GitHub 拉取请求处理
|
||
if project.base != RepoType.Github and project.github_address is not None:
|
||
# 获取Git仓库状态
|
||
await cmd.shell('git status', repo_dir, job)
|
||
new_branch = 'github_pr' + str(pull.id)
|
||
await Log(LogType.INFO, f"The new branch is {new_branch}", job.id)
|
||
# 创建并切换到新分支
|
||
await cmd.shell('git checkout -b ' + new_branch, repo_dir, job)
|
||
|
||
# 应用差异
|
||
apply_diff(project, job, pull, repo_dir)
|
||
commit_msg = "Github pull request #" + str(pull.id)
|
||
# 提交变更
|
||
await cmd.shell(f"git commit -m \"{commit_msg}\"", repo_dir, job)
|
||
# 推送新分支到远程仓库
|
||
await cmd.shell(f"git push -uv origin {new_branch}", repo_dir, job)
|
||
|
||
# 获取GitLab仓库类型
|
||
inter_type = gitlab.get_inter_repo_type(project.gitlab_address)
|
||
if inter_type is None:
|
||
# 记录错误日志
|
||
await Log(LogType.ERROR,
|
||
f"The {project.gitlab_address} is not belong to gitlab or antcode", job.id)
|
||
else:
|
||
# 发送合并请求
|
||
if inter_type == 'gitlab':
|
||
# 阿里巴巴 GitLab 仓库(Code Aone)
|
||
await Log(LogType.INFO,
|
||
f"Merge the pull request to internal Gitlab {project.name}", job.id)
|
||
# 获取GitLab仓库ID
|
||
repo_id = gitlab.get_repo_id_from_url(
|
||
project.gitlab_address)
|
||
if repo_id is None:
|
||
# 记录错误日志
|
||
await Log(LogType.ERROR,
|
||
f"We can not get the repo id {repo_id}", job.id)
|
||
await Log(LogType.INFO,
|
||
f"The project's gitlab repo id is {repo_id}", job.id)
|
||
await Log(LogType.INFO,
|
||
f"send the merge request about the pull request #{pull.id}", job.id)
|
||
# 发送合并请求到内部GitLab
|
||
merge_to_code_aone_inter(
|
||
repo_id, pull.id, new_branch, job.gitlab_branch)
|
||
else:
|
||
# 支付宝 Antcode 仓库
|
||
await Log(LogType.INFO,
|
||
f"Merge the pull request to internal Antcode {project.name}", job.id)
|
||
# 获取组织和项目名称
|
||
organization, name = gitlab.get_organization_and_name_from_url(
|
||
project.gitlab_address)
|
||
|
||
# 发送合并请求到内部Antcode
|
||
merge_to_antcode_inter(
|
||
job.id, pull.id, organization, name, new_branch, job.gitlab_branch)
|
||
|
||
# 更新拉取请求内联状态
|
||
service = PullRequestService()
|
||
await service.update_inline_status(pull, True)
|
||
await service.update_latest_commit(pull)
|
||
|
||
# Gitee 拉取请求处理
|
||
|
||
# TODO: Gitcode 拉取请求处理
|
||
|
||
except:
|
||
# 记录错误日志
|
||
msg = f"The pull request #{pull.id} sync to the internal failed"
|
||
await Log(LogType.ERROR, msg, job.id)
|
||
finally:
|
||
# 删除临时目录
|
||
await cmd.shell('rm -rf ' + dir, '.', job)
|
||
|
||
|
||
#将拉取请求合并到GitLab指定的分支
|
||
def merge_to_code_aone_inter(repo_id: int, pull_id: int, source_branch: str, target_branch: str):
|
||
url = f"" # 合并请求的URL地址
|
||
param = {
|
||
'private_token': config.ACCOUNT['gitlab_token'], # GitLab访问令牌
|
||
}
|
||
data = {
|
||
"description": "合并拉取请求 #" + str(pull_id), # 合并请求的描述
|
||
"source_branch": source_branch, # 源分支
|
||
"target_branch": target_branch, # 目标分支
|
||
"title": "合并拉取请求 #" + str(pull_id) # 合并请求的标题
|
||
}
|
||
response = requests.post(url=url, params=param,
|
||
data=json.dumps(data)).json() # 发送POST请求,合并拉取请求
|
||
return response
|
||
|
||
#将拉取请求合并到Antcode指定的项目
|
||
async def merge_to_antcode_inter(job_id: int, pull_id: int, organization, name, source_branch, target_branch: str):
|
||
await Log(LogType.INFO,
|
||
"发送有关拉取请求 #" + str(pull_id) + " 的合并请求", job_id)
|
||
headers = {
|
||
"PRIVATE-TOKEN": config.ACCOUNT['antcode_token'] # Antcode访问令牌
|
||
}
|
||
mainSiteUrl = ""
|
||
organization = "oceanbase-docs"
|
||
name = "oceanbase-doc"
|
||
path = f"{organization}/{name}"
|
||
req_str = f"{mainSiteUrl}/api/v3/projects/find_with_namespace?path={path}"
|
||
response = requests.get(url=req_str, headers=headers).json()
|
||
projectId = response['id']
|
||
await Log(LogType.INFO, "Antcode项目ID为" + str(projectId), job_id)
|
||
# 合并请求
|
||
merge_req_str = f"{mainSiteUrl}/api/v3/projects/{projectId}/pull_requests"
|
||
param = {
|
||
'source_branch': source_branch, # 源分支
|
||
'target_branch': target_branch, # 目标分支
|
||
'squash_merge': True
|
||
}
|
||
response = requests.post(
|
||
url=merge_req_str, param=param, headers=headers).json()
|
||
return
|
||
|
||
#将代码同步到OceanBase,并创建任务进行处理。
|
||
async def sync_oceanbase(project, job, pull):
|
||
title = f"Github合并请求 #{pull.id} {pull.title}" # 合并请求的标题
|
||
await Log(LogType.INFO, "从Github同步代码到Inter的OceanBase", job.id)
|
||
# 创建任务命令
|
||
create_task_cmd = f"/usr/local/obdev/libexec/ob-task create --subject=\"{title}\" -T bug --branch={job.gitlab_branch} --description=\"{title}\""
|
||
out, err = await cmd.shell(create_task_cmd, '.', job, env=dict(os.environ, AONE_ISSUE_NICK='官明'))
|
||
issue_num = str.splitlines(out)[1].replace("[issue-id]", "").strip()
|
||
await Log(LogType.INFO,
|
||
f"关于OceanBase拉取请求 {pull.id} 的问题号为 {issue_num}", job.id)
|
||
task_id = issue_num.replace("T", "")
|
||
if task_id != "":
|
||
await Log(LogType.INFO,
|
||
f"通过ob flow成功创建任务号 {task_id}", job.id)
|
||
await cmd.shell(
|
||
f"/usr/local/obdev/libexec/ob-flow start T{task_id} {job.gitlab_branch}", '.', job, env=dict(
|
||
os.environ, OB_FLOW_PROJECT='oceanbase'))
|
||
task_addr = f"/data/1/wangzelin.wzl/task-{task_id}"
|
||
apply_diff(project, job, pull, task_addr)
|
||
await cmd.shell(f"git commit -m \"{title}\"", task_addr, job)
|
||
|
||
await cmd.shell("/usr/local/obdev/libexec/ob-flow checkin", task_addr, job)
|
||
service = PullRequestService()
|
||
await service.update_inline_status(pull, True)
|
||
await service.update_latest_commit(pull)
|
||
|
||
|
||
async def sync_pull_request(project, job):
|
||
# 从项目的GitHub地址中提取组织名和仓库名
|
||
organization, repo = github.transfer_github_to_name(project.github_address)
|
||
if organization and repo:
|
||
pull_request_service = PullRequestService()
|
||
# 同步指定项目的拉取请求
|
||
await pull_request_service.sync_pull_request(project.name, organization, repo)
|
||
|
||
pull_request_service = PullRequestService()
|
||
# 获取数据库中与当前任务相关的拉取请求列表
|
||
pull_request_list = await pull_request_service.fetch_pull_request(project=job.project)
|
||
|
||
if pull_request_list and len(pull_request_list) > 0:
|
||
await Log(LogType.INFO,
|
||
f"There are {len(pull_request_list)} pull requests in the database", job.id)
|
||
|
||
for pull in pull_request_list:
|
||
if pull.target_branch == job.github_branch:
|
||
await Log(LogType.INFO,
|
||
f"Judge the pull request #{pull.id} of project {project.name} if need to merge", job.id)
|
||
# 判断该拉取请求是否需要合并
|
||
need_merge = await pull_request_service.judge_pull_request_need_merge(project.name, organization, repo, pull.id)
|
||
if need_merge:
|
||
await Log(LogType.INFO,
|
||
f"The pull request #{pull.id} of project {project.name} need merge", job.id)
|
||
if job.project == "oceanbase":
|
||
# 如果是OceanBase项目,检查拉取请求是否在配置的ID列表中
|
||
if pull.id in config.OCEANBASE:
|
||
await sync_oceanbase(project, job, pull)
|
||
else:
|
||
await sync_common(project, job, pull)
|
||
else:
|
||
await Log(LogType.INFO,
|
||
f"The pull request #{pull.id} of project {project.name} does not need merge", job.id)
|
||
return
|
||
|
||
|
||
async def sync_inter_code(project, job):
|
||
# 判断同步类型
|
||
if job.type == SyncType.OneWay:
|
||
await sync_oneway_inter_code(project, job)
|
||
elif job.type == SyncType.TwoWay:
|
||
await sync_twoway_inter_code(project, job)
|
||
else:
|
||
await Log(LogType.ERROR,
|
||
"The job {job.github_branch}'s type of project {project.name} is wrong", job.id)
|
||
return
|
||
|
||
async def sync_oneway_inter_code(project: ProjectDTO, job: JobDTO):
|
||
service = JobService()
|
||
|
||
# 记录同步任务开始信息到日志
|
||
await Log(LogType.INFO, "Sync the job code to outer", job.id)
|
||
|
||
# 定义临时工作目录
|
||
dir = f"/data/1/tmp/{job.project}_job_outer_{job.id}"
|
||
await Log(LogType.INFO, f"The sync work dir is {dir}", job.id)
|
||
|
||
try:
|
||
# 在临时目录中创建工作目录
|
||
await cmd.shell('mkdir ' + dir, '.', job)
|
||
|
||
# 克隆指定分支的代码库到临时工作目录
|
||
await cmd.shell(
|
||
f"git clone -b {job.gitlab_branch} {project.gitlab_address} --depth=100", dir, job)
|
||
repo_dir = dir + "/" + project.name
|
||
|
||
# 显示当前代码库状态
|
||
await cmd.shell('git status', repo_dir, job)
|
||
|
||
# 添加GitHub远程仓库,并获取最新更新
|
||
await cmd.shell(
|
||
f"git remote add github {project.github_address}", repo_dir, job)
|
||
await cmd.shell('git fetch github', repo_dir, job)
|
||
|
||
# 切换到目标GitHub分支并同步更新
|
||
await cmd.shell(
|
||
f"git checkout -b out_branch github/{job.github_branch}", repo_dir, job)
|
||
await cmd.shell('git checkout ' + job.gitlab_branch, repo_dir, job)
|
||
|
||
# 如果有Gitee地址,添加远程仓库并获取最新更新
|
||
if project.gitee_address:
|
||
await cmd.shell(
|
||
f"git remote add gitee {project.gitee_address}", repo_dir, job)
|
||
await cmd.shell('git fetch gitee', repo_dir, job)
|
||
|
||
# 如果有Code China地址,添加远程仓库并获取最新更新
|
||
if project.code_china_address:
|
||
await cmd.shell(
|
||
f"git remote add csdn {project.code_china_address}", repo_dir, job)
|
||
result, err = await cmd.shell('git status', repo_dir, job)
|
||
|
||
# 获取最新提交信息
|
||
latestCommit = await service.get_job_lateset_commit(job.id)
|
||
await Log(LogType.INFO, 'The lastest commit is ' + latestCommit, job.id)
|
||
|
||
if latestCommit == 'no_commit':
|
||
# 如果没有最新提交,则获取最新合并提交
|
||
result, err = await cmd.shell(
|
||
f"git log HEAD^1..HEAD --oneline --merges", repo_dir, job)
|
||
commit = result.split(" ")[0]
|
||
await Log(LogType.INFO, f"patch the commit {commit}", job.id)
|
||
await patch_every_commit(repo_dir, project, job, commit)
|
||
return
|
||
else:
|
||
# 否则获取从最新提交到HEAD的所有合并提交
|
||
result, err = await cmd.shell(
|
||
"git log "+latestCommit + "..HEAD --oneline --merges", repo_dir, job)
|
||
|
||
if result == "":
|
||
await Log(LogType.INFO,
|
||
f"The commit {latestCommit} is the newest commit on the remote branch", job.id)
|
||
else:
|
||
# 反向遍历所有提交,并逐一进行代码补丁操作
|
||
commit_info_list = str.splitlines(result)
|
||
commit_info_list.reverse()
|
||
for commit_info in commit_info_list:
|
||
commit = commit_info.split(" ")[0]
|
||
await Log(LogType.INFO, "patch the commit " + commit, job.id)
|
||
await patch_every_commit(repo_dir, project, job, commit)
|
||
except:
|
||
# 捕获异常情况,并记录错误信息到日志
|
||
msg = f"Sync the code from inter to outer of project {project.name} branch {job.github_branch} failed"
|
||
await Log(LogType.ERROR, msg, job.id)
|
||
finally:
|
||
# 最终清理临时工作目录
|
||
await cmd.shell(f"rm -rf {dir}", '.', job)
|
||
await Log(LogType.INFO, f"remove the temper repo folder {dir}", job.id)
|
||
return
|
||
|
||
#该异步函数用于同步项目代码库的内容到多个远程仓库,包括GitHub、Gitee和Code China,并更新最新的提交哈希值。
|
||
async def sync_twoway_inter_code(project, job):
|
||
service = JobService()
|
||
await Log(LogType.INFO, "Sync the job document to outer", job.id) # 记录同步任务开始信息到日志
|
||
dir = "/tmp/" + job.project+"_job_outer_"+str(job.id) # 定义临时工作目录
|
||
await Log(LogType.INFO, f"The sync work dir is {dir}", job.id) # 记录临时工作目录路径到日志
|
||
|
||
try:
|
||
await cmd.shell('mkdir ' + dir, '.', job) # 创建临时工作目录
|
||
await cmd.shell(
|
||
f"git clone -b {job.gitlab_branch} {project.gitlab_address}", dir, job) # 克隆指定分支的代码库到临时工作目录
|
||
repo_dir = dir + "/" + project.name # 设置代码库目录路径
|
||
|
||
await cmd.shell('git status', repo_dir, job) # 显示当前代码库状态
|
||
await cmd.shell('git remote add github ' +
|
||
project.github_address, repo_dir, job) # 添加GitHub远程仓库地址
|
||
await cmd.shell('git fetch github', repo_dir, job) # 获取GitHub远程仓库的更新
|
||
await cmd.shell('git pull -r github ' + job.github_branch, repo_dir, job) # 从GitHub远程仓库拉取并重新基于合并
|
||
await cmd.shell(
|
||
f"git push origin {job.gitlab_branch} -f", repo_dir, job) # 强制推送更新到GitLab远程仓库
|
||
await cmd.shell(
|
||
f"git push github {job.github_branch} -f", repo_dir, job) # 强制推送更新到GitHub远程仓库
|
||
|
||
if project.gitee_address:
|
||
await cmd.shell('git remote add gitee ' +
|
||
project.gitee_address, repo_dir, job) # 如果有Gitee地址,添加Gitee远程仓库地址
|
||
await cmd.shell('git fetch gitee', repo_dir, job) # 获取Gitee远程仓库的更新
|
||
await cmd.shell(
|
||
f"git push gitee {job.gitlab_branch}:{job.gitee_branch}", repo_dir, job) # 推送更新到Gitee远程仓库
|
||
|
||
if project.code_china_address:
|
||
await cmd.shell('git remote add csdn ' +
|
||
project.code_china_address, repo_dir, job) # 如果有Code China地址,添加Code China远程仓库地址
|
||
result, err = await cmd.shell('git status', repo_dir, job) # 显示当前代码库状态
|
||
await cmd.shell(
|
||
f"git push csdn {job.gitlab_branch}:{job.code_china_branch}", repo_dir, job) # 推送更新到Code China远程仓库
|
||
|
||
# 更新最新提交的哈希值
|
||
result, err = await cmd.shell(
|
||
"git log HEAD~1..HEAD --oneline", repo_dir, job) # 获取最新提交的信息
|
||
commit = result.split(" ")[0] # 提取最新提交的哈希值
|
||
await service.update_job_lateset_commit(job.id, commit) # 更新任务中的最新提交哈希值
|
||
except:
|
||
msg = f"Sync the document from inter to outer of project {project.name} branch {job.github_branch} failed"
|
||
await Log(LogType.Error, msg, job.id) # 捕获异常情况,并记录错误信息到日志
|
||
finally:
|
||
await cmd.shell(f"rm -rf {dir}", '.', job) # 最终清理临时工作目录
|
||
await Log(LogType.INFO, f"remove the temper repo folder {dir}", job.id) # 记录删除临时工作目录的日志
|
||
return
|
||
|
||
#该函数用于从给定的作者内容字符串中提取作者的邮箱地址,返回邮箱地址的用户名部分。
|
||
def get_author_from_oceanbase(author_content: str) -> Optional[str]:
|
||
partten = r'Author : (.*) \((.*)\)' # 定义正则表达式模式,用于匹配作者信息
|
||
matchObj = re.match(partten, author_content, re.M | re.I) # 使用正则表达式匹配作者内容
|
||
if matchObj:
|
||
author = matchObj.group(2) # 提取匹配到的第二部分,即邮箱地址
|
||
return author.split('#')[0] # 返回邮箱地址的用户名部分
|
||
return None # 如果没有匹配到,返回None
|
||
|
||
#函数负责根据给定的commit,执行一系列git操作,包括切换分支、重置提交、获取提交信息、处理补丁文件等。最终目的是将代码从内部合并到外部,并处理可能的冲突。
|
||
async def patch_every_commit(dir, project, job, commit):
|
||
service = JobService() # 创建一个JobService实例
|
||
|
||
try:
|
||
await cmd.shell('git status', dir, job) # 在指定目录执行'git status'命令,检查当前git状态
|
||
await cmd.shell('git checkout ' + job.gitlab_branch, dir, job) # 切换到job对应的gitlab分支
|
||
await cmd.shell('git pull -r origin ' + job.gitlab_branch, dir, job) # 从远程仓库拉取最新代码并且rebase到当前分支上
|
||
await cmd.shell('git reset --hard ' + commit, dir, job) # 重置当前分支到指定commit
|
||
|
||
# 获取此次提交的日志信息
|
||
output, err = await cmd.shell("git log -1", dir, job)
|
||
|
||
email, err = await cmd.shell("git log --format='%ae' -1", dir, job) # 获取此次提交的作者email
|
||
if email is None:
|
||
raise ValueError("The commit has no email") # 如果没有email,抛出异常
|
||
await Log(LogType.INFO, f"The commit {commit} email is {email}", job.id) # 记录提交的email信息
|
||
|
||
if project.name == "oceanbase":
|
||
author_string = str.splitlines(output)[8].strip() # 获取第9行作者字符串
|
||
await Log(LogType.INFO,
|
||
f"The author string is {author_string}", job.id) # 记录作者字符串信息
|
||
domain = get_author_from_oceanbase(author_string) # 从oceanbase项目获取作者域
|
||
else:
|
||
domain = author.get_author_domain(email) # 通过email获取作者域
|
||
if domain is None:
|
||
raise ValueError("The commit author has no ali domain") # 如果没有作者域,抛出异常
|
||
await Log(LogType.INFO, f"The commit author ali domain is {domain}", job.id) # 记录作者域信息
|
||
|
||
content = str.splitlines(output)[5].strip() # 获取第6行的提交内容
|
||
await Log(LogType.INFO, f"content is {content}", job.id) # 记录提交内容信息
|
||
if content is None or content == "":
|
||
raise ValueError("The commit has no commit content") # 如果没有提交内容,抛出异常
|
||
await Log(LogType.INFO, f"The commit {commit} content is {content}", job.id) # 记录提交内容信息
|
||
# TODO 如果发现提交来自github,合并pull request
|
||
if content.startswith("Github Merge"):
|
||
pr_id = int(content.split()[3].replace('#', '')) # 提取PR id
|
||
pr_service = PullRequestService() # 创建PullRequestService实例
|
||
organization, repo = github.transfer_github_to_name(
|
||
project.github_address) # 获取组织和仓库名称
|
||
ans = await pr_service.merge_pull_request_code(organization, repo, pr_id) # 合并PR
|
||
if ans is None:
|
||
return
|
||
|
||
# 如果仓库中包含.ce文件,则在合并前执行相关操作
|
||
ce_file = Path(dir + '/.ce')
|
||
if ce_file.is_file():
|
||
await cmd.shell('bash .ce', dir, job) # 执行.ce文件中的脚本
|
||
else:
|
||
await Log(LogType.INFO,
|
||
f"There is no .ce file in the project {project.name}", job.id) # 记录没有.ce文件的日志信息
|
||
|
||
# TODO 检查git diff apply --check
|
||
diff, err = await cmd.shell("git diff out_branch", dir, job) # 获取与目标分支的差异
|
||
if diff == "":
|
||
# 如果没有差异,保存提交并返回
|
||
await cmd.shell('git reset --hard', dir, job) # 重置当前分支到最新提交
|
||
await service.update_job_lateset_commit(job.id, commit) # 更新job的最新提交信息
|
||
return
|
||
|
||
patch_file = '/tmp/' + job.github_branch + '_patch' # 创建补丁文件路径
|
||
await cmd.shell('rm -rf ' + patch_file, dir, job) # 删除可能存在的旧补丁文件
|
||
|
||
with open(patch_file, "w") as diff_file: # 打开补丁文件用于写入
|
||
diff, err = await cmd.shell("git diff out_branch", dir, job) # 再次获取差异
|
||
diff_file.write(diff) # 写入补丁文件
|
||
|
||
await cmd.shell('git reset --hard', dir, job) # 重置当前分支到最新提交
|
||
await cmd.shell('git checkout out_branch', dir, job) # 切换到目标分支
|
||
|
||
# git apply --check first
|
||
# out, err = await cmd.shell('git apply --check ' + patch_file, dir, job)
|
||
if err != "":
|
||
raise ValueError(
|
||
f"The commit {commit} has conflict to the branch {job.github_branch}") # 如果有冲突,抛出异常
|
||
|
||
await cmd.shell('git apply ' + patch_file, dir, job) # 应用补丁文件
|
||
await cmd.shell('git add .', dir, job) # 添加所有更改到暂存区
|
||
await cmd.shell(f"git commit -m \"{content}\"", dir, job) # 提交更改
|
||
|
||
# TODO:change commit author
|
||
out = await author.get_github_author_and_email(domain)
|
||
if out['author'] is None or out['email'] is None:
|
||
await Log(LogType.ERROR, f"The commit has no correct author or email", job.id)
|
||
raise ValueError("That is not a positive author or email")
|
||
await Log(LogType.INFO,
|
||
f"Get the commit author {out['author']} and email {out['email']}", job.id)
|
||
|
||
author_info = f"{out['author']} <{out['email']}>"
|
||
await cmd.shell(
|
||
f"git commit --amend --no-edit --author=\"{author_info}\"", dir, job)
|
||
|
||
await cmd.shell(f"git pull -r github {job.github_branch}", dir, job)
|
||
await cmd.shell(f"git push -u github out_branch:{job.github_branch}", dir, job)
|
||
|
||
if job.gitee_branch is not None:
|
||
await cmd.shell(f"git pull -r gitee {job.gitee_branch}", dir, job)
|
||
await cmd.shell(f"git push -u gitee out_branch:{job.gitee_branch}", dir, job)
|
||
|
||
if job.code_china_branch is not None:
|
||
await cmd.shell(f"git pull -r csdn {job.code_china_branch}", dir, job)
|
||
await cmd.shell(f"git push -u csdn out_branch:{job.code_china_branch}", dir, job)
|
||
|
||
await cmd.shell(f"git checkout {job.gitlab_branch}", dir, job)
|
||
|
||
# save the latest commit
|
||
ans = await service.update_job_lateset_commit(job.id, commit)
|
||
if ans:
|
||
await Log(LogType.INFO,
|
||
f"Update the latest commit {commit} successfully", job.id)
|
||
except:
|
||
msg = f"Sync the commit {commit} of project {project.name} failed"
|
||
await Log(LogType.ERROR, msg, job.id)
|
||
return
|
||
|
||
|
||
async def sync_job(job: JobDTO):
|
||
project_service = ProjectService()
|
||
project = await project_service.search_project(name=job.project)
|
||
|
||
if len(project) == 0:
|
||
await Log(LogType.INFO, "There are no projects in the database", job.id)
|
||
return
|
||
|
||
# 1. sync the outer pull request into inter
|
||
if job.type == SyncType.OneWay:
|
||
await sync_pull_request(project[0], job)
|
||
# 2. sync the inter code into outer
|
||
await sync_inter_code(project[0], job)
|
||
|
||
|
||
async def sync():
|
||
logger.info("Start syncing ****************************")
|
||
log_service = LogService()
|
||
await log_service.delete_logs()
|
||
# fetch the sync job list
|
||
service = JobService()
|
||
jobs = await service.list_jobs()
|
||
if jobs is None:
|
||
logger.info(f"There are no sync jobs in the database")
|
||
return
|
||
logger.info(f"There are {len(jobs)} sync jobs in the database")
|
||
|
||
tasks = []
|
||
for job in jobs:
|
||
# if the job status is green, it means we can sync the job
|
||
if job.status == Color.green:
|
||
await Log(LogType.INFO,
|
||
f"The github branch {job.github_branch} from {job.project} is now syncing", job.id)
|
||
task = asyncio.create_task(sync_job(job))
|
||
tasks.append(task)
|
||
else:
|
||
await Log(LogType.INFO,
|
||
f"The github branch {job.github_branch} from {job.project} does not need to sync", job.id)
|
||
for task in tasks:
|
||
await task
|
||
logger.info("End syncing ****************************")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
loop = asyncio.get_event_loop()
|
||
loop.run_until_complete(sync())
|