token鉴权&仓库信息修改

This commit is contained in:
淮新 2024-04-24 10:11:38 +08:00
parent f8e6b97382
commit 22efdd9521
7 changed files with 131 additions and 47 deletions

6
sql/update_20240423.sql Normal file
View File

@ -0,0 +1,6 @@
ALTER TABLE `sync_repo_mapping`
ADD COLUMN `inter_token` VARCHAR(100) COMMENT '内部仓库token',
ADD COLUMN `exter_token` VARCHAR(100) COMMENT '外部仓库token';
ALTER TABLE `sync_branch_mapping`
MODIFY COLUMN `sync_direction` enum('to_outer', 'to_inter') COMMENT '首次同步方向';

View File

@ -14,7 +14,7 @@ from src.utils.sync_log import sync_log, LogType, api_log
from src.api.Controller import APIController as Controller
from src.router import SYNC_CONFIG as router
from src.do.sync_config import SyncDirect
from src.dto.sync_config import SyncRepoDTO, SyncBranchDTO, LogDTO
from src.dto.sync_config import SyncRepoDTO, SyncBranchDTO, LogDTO, ModifyRepoDTO
from src.service.sync_config import SyncService, LogService
from src.service.cronjob import sync_repo_task, sync_branch_task
from src.base.status_code import Status, SYNCResponse, SYNCException
@ -213,6 +213,19 @@ class SyncDirection(Controller):
msg=data.status_msg
)
@router.put("/repo/{repo_name}/repo_addr", response_model=SYNCResponse, description='更新仓库地址')
async def update_repo_addr(
self, request: Request, user: str = Depends(user),
repo_name: str = Path(..., description="仓库名称"),
dto: ModifyRepoDTO = Body(..., description="更新仓库地址信息")
):
api_log(LogType.INFO, f"用户 {user} 使用 PUT 方法访问接口 {request.url.path} ", user)
data = await self.service.update_repo_addr(repo_name=repo_name, dto=dto)
return SYNCResponse(
code_status=data.code_status,
msg=data.status_msg
)
@router.put("/repo/{repo_name}", response_model=SYNCResponse, description='更新仓库同步状态')
async def update_repo_status(
self, request: Request, user: str = Depends(user),

View File

@ -28,7 +28,9 @@ class SyncRepoMapping(DataObject):
repo_name = Column(String(128), unique=True, nullable=False, comment="仓库名称")
enable = Column(Boolean, default=True, comment="是否启用同步")
internal_repo_address = Column(String, nullable=False, comment="内部仓库地址")
inter_token = Column(String, nullable=True, comment="内部仓库token")
external_repo_address = Column(String, nullable=False, comment="外部仓库地址")
exter_token = Column(String, nullable=True, comment="外部仓库token")
sync_granularity = Column(Enum(SyncType), comment="同步类型")
sync_direction = Column(Enum(SyncDirect), comment="首次同步方向")
created_at = Column(TIMESTAMP, server_default=text('CURRENT_TIMESTAMP'), comment="创建时间")

View File

@ -6,11 +6,20 @@ class SyncRepoDTO(BaseModel):
repo_name: str = Field(..., description="仓库名称")
enable: bool = Field(..., description="同步状态")
internal_repo_address: str = Field(..., description="内部仓库地址")
inter_token: str = Field(None, description="内部仓库token")
external_repo_address: str = Field(..., description="外部仓库地址")
exter_token: str = Field(None, description="外部仓库token")
sync_granularity: int = Field(..., description="1 为仓库粒度的同步, 2 为分支粒度的同步")
sync_direction: int = Field(..., description="1 表示内部仓库同步到外部, 2 表示外部仓库同步到内部")
class ModifyRepoDTO(BaseModel):
internal_repo_address: str = Field(None, description="内部仓库地址")
inter_token: str = Field(None, description="内部仓库token")
external_repo_address: str = Field(None, description="外部仓库地址")
exter_token: str = Field(None, description="外部仓库token")
class SyncBranchDTO(BaseModel):
enable: bool = Field(..., description="是否启用分支同步")
internal_branch_name: str = Field(..., description="内部仓库分支名")

View File

@ -25,9 +25,33 @@ def get_git_error(stderr: str):
return GITMSGException(Status.UNKNOWN_ERROR)
def get_repo_address_with_token(address: str, token: str) -> str:
if token is None or token == "":
return address
try:
if not address.startswith('https') and not address.startswith('http'):
raise Exception('address is error')
if address.startswith('https'):
owner_name = address[8:].split("/")[1]
return address[:8] + owner_name + ":" + token + '@' + address[8:]
elif address.startswith('http'):
owner_name = address[7:].split("/")[1]
return address[:7] + owner_name + ":" + token + '@' + address[7:]
except Exception as e:
print(e)
def shell(cmd, dire: str, log_name: str, user: str):
log = f'Execute cmd: ' + cmd
sync_log(LogType.INFO, log, log_name, user)
if 'git clone' in log:
sync_log(LogType.INFO, 'Execute cmd: git clone', log_name, user)
elif 'git remote add' in log:
sync_log(LogType.INFO, 'Execute cmd: git remote add', log_name, user)
elif 'git ls-remote' in log:
sync_log(LogType.INFO, '获取仓库分支信息', log_name, user)
else:
sync_log(LogType.INFO, log, log_name, user)
output = subprocess.run(shlex.split(cmd), cwd=dire, capture_output=True, text=True)
if output.returncode != 0:
git_error = get_git_error(output.stderr)
@ -42,17 +66,18 @@ def init_repos(repo, log_name: str, user: str):
repo_dir = os.path.join(SYNC_DIR, repo.repo_name)
if not os.path.exists(repo_dir):
sync_log(LogType.INFO, "初始化仓库 *********", log_name, user)
repo_name = repo.repo_name
inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
if repo.sync_direction == SyncDirect.to_outer:
# 克隆内部仓库到同步目录下
shell(f'git clone -b master {repo.internal_repo_address} {repo_dir}', SYNC_DIR, log_name, user)
shell(f'git clone -b master {inter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
else:
# 克隆外部仓库到同步目录下
shell(f'git clone -b master {repo.external_repo_address} {repo_dir}', SYNC_DIR, log_name, user)
shell(f'git clone -b master {exter_repo_addr} {repo_dir}', SYNC_DIR, log_name, user)
# 添加internal远程仓库并强制使用
shell(f'git remote add -f internal {repo.internal_repo_address}', repo_dir, log_name, user)
shell(f'git remote add -f internal {inter_repo_addr}', repo_dir, log_name, user)
# 添加external远程仓库并强制使用
shell(f'git remote add -f external {repo.external_repo_address}', repo_dir, log_name, user)
shell(f'git remote add -f external {exter_repo_addr}', repo_dir, log_name, user)
def inter_to_outer(repo, branch, log_name: str, user: str):
@ -97,32 +122,35 @@ async def sync_repo_task(repo, user):
log_name = f'sync_{repo.repo_name}.log'
init_repos(repo, log_name, user)
sync_log(LogType.INFO, f'************ 执行{repo.repo_name}仓库同步 ************', log_name, user)
if repo.sync_direction == SyncDirect.to_outer:
stm = shell(f"git ls-remote --heads {repo.internal_repo_address}", SYNC_DIR, log_name, user)
branch_list_output = stm.stdout.split('\n')
for branch in branch_list_output:
if branch:
branch_name = branch.split('/')[-1].strip()
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
sync_log(LogType.INFO, f'Execute inter to outer {branch_name} branch Sync', log_name, user)
inter_to_outer(repo, branch, log_name, user)
else:
stm = shell(f"git ls-remote --heads {repo.external_repo_address}", SYNC_DIR, log_name, user)
branch_list_output = stm.stdout.split('\n')
for branch in branch_list_output:
if branch:
branch_name = branch.split('/')[-1].strip()
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
sync_log(LogType.INFO, f'Execute outer to inter {branch_name} branch Sync', log_name, user)
outer_to_inter(repo, branch, log_name, user)
try:
if repo.sync_direction == SyncDirect.to_outer:
inter_repo_addr = get_repo_address_with_token(repo.internal_repo_address, repo.inter_token)
stm = shell(f"git ls-remote --heads {inter_repo_addr}", SYNC_DIR, log_name, user)
branch_list_output = stm.stdout.split('\n')
for branch in branch_list_output:
if branch:
branch_name = branch.split('/')[-1].strip()
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
sync_log(LogType.INFO, f'Execute inter to outer {branch_name} branch Sync', log_name, user)
inter_to_outer(repo, branch, log_name, user)
else:
exter_repo_addr = get_repo_address_with_token(repo.external_repo_address, repo.exter_token)
stm = shell(f"git ls-remote --heads {exter_repo_addr}", SYNC_DIR, log_name, user)
branch_list_output = stm.stdout.split('\n')
for branch in branch_list_output:
if branch:
branch_name = branch.split('/')[-1].strip()
branch = SyncBranchDTO(enable=1, internal_branch_name=branch_name, external_branch_name=branch_name)
sync_log(LogType.INFO, f'Execute outer to inter {branch_name} branch Sync', log_name, user)
outer_to_inter(repo, branch, log_name, user)
sync_log(LogType.INFO, f'************ {repo.repo_name}仓库同步完成 ************', log_name, user)
finally:
if config.DELETE_SYNC_DIR:
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
if config.DELETE_SYNC_DIR:
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
sync_log(LogType.INFO, f'************ {repo.repo_name}仓库同步完成 ************', log_name, user)
await log_service.insert_repo_log(repo_name=repo.repo_name, direct=repo.sync_direction)
os.remove(os.path.join(log_path, log_name))
await log_service.insert_repo_log(repo_name=repo.repo_name, direct=repo.sync_direction)
os.remove(os.path.join(log_path, log_name))
async def sync_branch_task(repo, branches, direct, user):
@ -131,18 +159,20 @@ async def sync_branch_task(repo, branches, direct, user):
log_name = f'sync_{repo.repo_name}_{branch.id}.log'
init_repos(repo, log_name, user)
sync_log(LogType.INFO, f'************ 执行分支同步 ************', log_name, user)
if direct == SyncDirect.to_inter:
sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
commit_id = outer_to_inter(repo, branch, log_name, user)
else:
sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
commit_id = inter_to_outer(repo, branch, log_name, user)
commit_id = ''
try:
if direct == SyncDirect.to_inter:
sync_log(LogType.INFO, f'Execute outer to inter {branch.external_branch_name} branch Sync', log_name, user)
commit_id = outer_to_inter(repo, branch, log_name, user)
else:
sync_log(LogType.INFO, f'Execute inter to outer {branch.internal_branch_name} branch Sync', log_name, user)
commit_id = inter_to_outer(repo, branch, log_name, user)
sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
finally:
if config.DELETE_SYNC_DIR:
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
if config.DELETE_SYNC_DIR:
os.path.exists(SYNC_DIR) and os.removedirs(SYNC_DIR)
sync_log(LogType.INFO, f'删除同步工作目录: {SYNC_DIR}', log_name, user)
sync_log(LogType.INFO, f'************ 分支同步完成 ************', log_name, user)
await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
os.remove(os.path.join(log_path, log_name))
await log_service.insert_branch_log(repo.repo_name, direct, branch.id, commit_id)
os.remove(os.path.join(log_path, log_name))

View File

@ -1,8 +1,9 @@
import re
from typing import List, Union, Optional, Dict
from .service import Service
from src.utils import base
from src.dao.sync_config import SyncBranchDAO, SyncRepoDAO, LogDAO
from src.dto.sync_config import SyncBranchDTO, SyncRepoDTO, RepoDTO, AllRepoDTO, GetBranchDTO, LogDTO, BranchDTO
from src.dto.sync_config import SyncBranchDTO, SyncRepoDTO, RepoDTO, AllRepoDTO, GetBranchDTO, LogDTO, BranchDTO, ModifyRepoDTO
from src.do.sync_config import SyncDirect, SyncType
from src.base.status_code import Status, SYNCException
from src.utils.sync_log import log_path
@ -105,6 +106,26 @@ class SyncService(Service):
return SYNCException(Status.SUCCESS)
async def update_repo_addr(self, repo_name: str, dto: ModifyRepoDTO) -> SYNCException:
repo = await self.sync_repo_dao.get(repo_name=repo_name)
if repo is None:
return SYNCException(Status.REPO_NOTFOUND)
update_fields = {}
if dto.internal_repo_address is not None:
if not base.check_addr(dto.internal_repo_address):
return SYNCException(Status.REPO_ADDR_ILLEGAL)
update_fields['internal_repo_address'] = dto.internal_repo_address
if dto.external_repo_address is not None:
if not base.check_addr(dto.external_repo_address):
return SYNCException(Status.REPO_ADDR_ILLEGAL)
update_fields['external_repo_address'] = dto.external_repo_address
if dto.inter_token is not None:
update_fields['inter_token'] = dto.inter_token
if dto.exter_token is not None:
update_fields['exter_token'] = dto.exter_token
await self.sync_repo_dao.update(repo, **update_fields)
return SYNCException(Status.SUCCESS)
async def update_repo(self, repo_name: str, enable: bool) -> SYNCException:
repo = await self.sync_repo_dao.get(repo_name=repo_name)
if repo is None:

View File

@ -1,12 +1,15 @@
import re
GIT_HTTP_PATTERN = r'https://.*.com/(.*)/(.*).git'
GIT_HTTPS_PATTERN = r'https://.*.com/(.*)/(.*).git'
GIT_HTTP_PATTERN = r'http://.*.com/(.*)/(.*).git'
GIT_SSH_PATTERN = r'git@.*.com:(.*)/(.*).git'
def check_addr(repo_address: str) -> bool:
try:
if repo_address.startswith('https'):
pattern = GIT_HTTPS_PATTERN
elif repo_address.startswith('http'):
pattern = GIT_HTTP_PATTERN
else:
pattern = GIT_SSH_PATTERN