add hashid for repo database
This commit is contained in:
parent
17ae1edace
commit
a77c25d313
|
@ -0,0 +1,343 @@
|
|||
# aim: The ColdStartPerception service for OSCH
|
||||
# author: zhangxunhui
|
||||
# date: 2022-04-23
|
||||
|
||||
import os
|
||||
import queue
|
||||
import sys
|
||||
import threading
|
||||
from typing import List
|
||||
import hashlib
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
from ChangedMethodExtractor import ChangedMethodExtractor
|
||||
from dulwich.objects import Blob, Commit, Tag, Tree
|
||||
from dulwich.repo import Repo
|
||||
from dulwich.walk import WalkEntry
|
||||
from ESUtils import ESUtils
|
||||
from models.RepoInfo import RepoInfo
|
||||
from MySQLUtils import MySQLUtils
|
||||
|
||||
from services.utils import read_config
|
||||
|
||||
|
||||
class HandleRepository(object):
    """Handle a single repository: resolve its identity and record it in ES.

    For ``type == "gitea"`` the repository id is looked up in the gitea
    MySQL database; for ``type == "local"`` the id is a SHA-256 hash of
    the repository path and owner/name are parsed from the origin URL.
    """

    def __init__(self, repoInfo: "RepoInfo", config: dict, es_utils: "ESUtils", type: str):
        """Resolve repo identity (id, owner, name) and load handled commits.

        Raises:
            Exception: when a gitea repository has no id in the MySQL db.
        """
        self.config = config
        self.repoInfo = repoInfo
        self.repo = Repo(self.repoInfo.repo_path)
        self.type = type
        if self.type == "gitea":
            # Gitea stores bare repos as <owner>/<repo>.git on disk.
            self.repoInfo.ownername = self.repo.path.split("/")[-2]
            self.repoInfo.reponame = self.repo.path.split("/")[-1].split(".")[0]
            self.mysql_utils = MySQLUtils(
                host=self.config["mysql"]["host"],
                port=self.config["mysql"]["port"],
                username=self.config["mysql"]["username"],
                password=self.config["mysql"]["password"],
                database=self.config["mysql"]["database"],
                autocommit=False,
                dictcursor=True,
            )
            repo_id = self.mysql_utils.get_repo_id(
                self.repoInfo.ownername, self.repoInfo.reponame
            )
            if repo_id is None:
                # Bug fix: the original referenced self.repository_path,
                # which does not exist on this class and would raise
                # AttributeError instead of the intended error message.
                raise Exception(
                    "HandleRepository Error: cannot find the id of repository: {repository_path}".format(
                        repository_path=self.repoInfo.repo_path
                    )
                )
            else:
                self.repoInfo.repo_id = repo_id["id"]
        elif self.type == "local":
            self.repoInfo.repo_id = self.hash_path_to_id(self.repoInfo.repo_path)
            self.repoInfo.ownername, self.repoInfo.reponame = self.get_local_repoInfo()

        self.es_utils = es_utils
        # Commit shas already indexed for this repo; used to skip re-processing.
        self.handled_commits = self.es_utils.get_handled_commits(
            repo_id=self.repoInfo.repo_id,
            index_name=self.config["elasticsearch"]["index_handled_commits"],
        )

    # local repo id
    def hash_path_to_id(self, path: str) -> str:
        """Return a stable repo id for a local repo: hex SHA-256 of its path."""
        return hashlib.sha256(path.encode()).hexdigest()

    # local repo info
    def get_local_repoInfo(self):
        """Parse ``(owner_name, repo_name)`` from the repo's origin remote URL.

        Returns ``(None, None)`` when git fails, the URL is not a
        ``https://gitlink.org.cn/<owner>/<repo>.git`` URL, or any error occurs.
        """
        try:
            # Ask git for the remote origin URL of this working copy.
            result = subprocess.run(
                ['git', 'remote', 'get-url', 'origin'],
                cwd=self.repoInfo.repo_path,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )

            if result.returncode == 0:
                url = result.stdout.strip()
                # Extract owner and repo name from the gitlink URL.
                match = re.search(r'https://gitlink\.org\.cn/([^/]+)/([^/]+)\.git', url)
                if match:
                    owner_name = match.group(1)
                    repo_name = match.group(2)
                    return owner_name, repo_name
                else:
                    print("URL format not recognized:", url)
                    return None, None
            else:
                print("Error:", result.stderr)
                return None, None
        except Exception as e:
            print("An error occurred:", e)
            return None, None

    def run(self):
        """Get all the commits."""

        print(
            "[Info]: Handling repository {repo_path}".format(
                repo_path=self.repo.path
            )
        )

        # NOTE: per-commit processing (walking the object store, removing
        # commits inherited from a fork's origin via the gitea MySQL db, and
        # invoking HandleCommit for each not-yet-handled commit) was disabled
        # in the original as a large commented-out section; only the
        # repo-level index entry below is written.

        """Finish handling this repo, insert into the handled_repo index in es."""
        es_data = {
            "repo_id": self.repoInfo.repo_id,
            "repo_owner": self.repoInfo.ownername,
            "repo_name": self.repoInfo.reponame,
        }
        self.es_utils.insert_es_item(
            item=es_data,
            index_name=self.config["elasticsearch"]["index_handled_repos"],
        )
|
||||
|
||||
|
||||
class HandleRepoThread(threading.Thread):
    """Worker thread that drains a queue of RepoInfo items and handles each.

    Several of these threads share one queue (see handle_repositories), so
    queue access must be race-free.
    """

    def __init__(
        self,
        name: str,
        q: queue.Queue,
        config: dict,
        type: str,
    ):
        threading.Thread.__init__(self)
        self.name = name
        self.q = q
        self.config = config
        # Each worker gets its own ES client.
        self.es_utils = ESUtils(config=self.config)
        self.type = type

    def run(self):
        print("[Info]: Start thread: " + self.name)
        while True:
            try:
                # Bug fix: the original `while not q.empty(): q.get()` is
                # racy — another worker can take the last item between the
                # empty() check and the blocking get(), hanging this thread
                # forever. get_nowait() + queue.Empty is race-free.
                repoInfo = self.q.get_nowait()
            except queue.Empty:
                break
            try:
                # handle one repository (local or gitea)
                handler = HandleRepository(
                    repoInfo=repoInfo,
                    config=self.config,
                    es_utils=self.es_utils,
                    type=self.type,
                )
                handler.run()
            finally:
                # Keep queue accounting correct even if handling raises.
                self.q.task_done()
        # Typo fix: original printed "Exist thread".
        print("[Info]: Exit thread: " + self.name)
|
||||
|
||||
|
||||
class HandleCommit(object):
    """Extract the changed methods of one commit and index them in ES."""

    def __init__(
        self,
        repo: Repo,
        repoInfo: RepoInfo,
        commit: Commit,
        config: dict,
        es_utils: ESUtils,
    ):
        # Keep references to everything the per-commit pipeline needs.
        self.repo = repo
        self.repoInfo = repoInfo
        self.commit = commit
        self.config = config
        self.es_utils = es_utils

    def run(self):
        sha = self.commit.id.decode()
        print("[Info]: Handling commit {commit_sha}".format(commit_sha=sha))

        """Generate all the changes for this commit."""
        entry = WalkEntry(
            self.repo.get_walker(include=[self.commit.id]), self.commit
        )
        tree_changes = entry.changes()  # get all the TreeChange objects
        # A merge commit yields one change list per parent — flatten them.
        if len(self.commit.parents) > 1:
            flat = []
            for change_list in tree_changes:
                flat.extend(change_list)
            tree_changes = flat

        extractor = ChangedMethodExtractor(
            repo=self.repo,
            repoInfo=self.repoInfo,
            commit=self.commit,
            t_changes=tree_changes,
            config=self.config,
        )
        changed_methods = extractor.parse()

        bulk_payload = self.es_utils.extract_es_infos(
            changed_methods=changed_methods
        )
        self.es_utils.insert_es_bulk(bulk_payload)

        """Finish handling this commit, insert into the handled_commit index in es."""
        self.es_utils.insert_es_item(
            item={"repo_id": self.repoInfo.repo_id, "commit_sha": sha},
            index_name=self.config["elasticsearch"]["index_handled_commits"],
        )
|
||||
|
||||
|
||||
def handle_repositories(repositories_path: str, type: str, config: dict):
    """Handle all the repositories in the directory."""

    es_utils = ESUtils(config=config)
    es_utils.create_handled_repo_index()

    """Handle repositories by multiple threads."""
    work_queue = queue.Queue()

    if type == "local":
        # Flat layout: every subdirectory is one repository.
        for entry in os.scandir(repositories_path):
            if entry.is_dir():
                work_queue.put(RepoInfo(repo_path=entry.path))

    elif type == "gitea":
        # Nested layout: <ownername>/<reponame>.git
        for owner_entry in os.scandir(repositories_path):
            if not owner_entry.is_dir():
                continue
            for repo_entry in os.scandir(owner_entry.path):
                if repo_entry.is_dir():
                    work_queue.put(RepoInfo(repo_path=repo_entry.path))

    else:
        print("[Error]: illegal type")
        sys.exit(1)

    thread_count = config["coldstart_service"]["THREADNUM"]
    workers = [
        HandleRepoThread(
            name="Thread-" + str(idx + 1),
            q=work_queue,
            config=config,
            type=type,
        )
        for idx in range(thread_count)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
|
||||
|
||||
|
||||
def main():
    """Entry point: load config-repos.yml and run cold-start perception.

    Prefers the "local" repositories path; falls back to "gitea" only when
    the local key is absent from the configuration.
    """
    config_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "config-repos.yml"
    )
    config = read_config(config_path)
    if config is None:
        print(
            "[Error]: configuration file {config_path} not found".format(
                config_path=config_path
            )
        )
        sys.exit(1)

    # cold start check local data firstly.
    # Bug fix: the original wrapped the handle_repositories() call itself in
    # the try/except KeyError, so any KeyError raised *inside* processing
    # (e.g. a missing coldstart_service key) was misinterpreted as "local
    # not configured" and silently fell through to the gitea branch. Only
    # the config lookups belong in the try bodies.
    try:
        repositories_path = config["local"]["repositories_path"]
        type = 'local'
    except KeyError:
        try:
            repositories_path = config["gitea"]["repositories_path"]
            type = 'gitea'
        except KeyError:
            # Typo fix: "configration" -> "configuration".
            print("[Error]: local and gitea repositories_path configuration not found")
            sys.exit(1)
    handle_repositories(repositories_path=repositories_path, type=type, config=config)
|
||||
|
||||
|
||||
# Script entry point: run the cold-start perception service once, then
# report completion on stdout.
if __name__ == "__main__":
    main()
    print("Finish ColdStartPerception service")
|
Loading…
Reference in New Issue