196 lines
4.8 KiB
Python
196 lines
4.8 KiB
Python
# coding: utf-8
|
||
# Collect the issues of a repo; supports updating.
|
||
import MySQLdb
|
||
import json
|
||
import time
|
||
import random
|
||
import urllib2
|
||
|
||
|
||
import Queue
|
||
import threading
|
||
# Unbounded work queue of pull requests: work() fills it and the
# fetchThread() workers drain it.
PRS = Queue.Queue(maxsize=0)
|
||
|
||
|
||
import dbop
|
||
# Initialise dbop's pool once at import time, before any query is issued.
# NOTE(review): 3 is presumably the number of pooled connections - confirm
# against dbop.
dbop.init_pool(3)
|
||
|
||
|
||
import Queue
|
||
# Pool of API tokens shared by every thread.  It is filled once, at import
# time, from the "tokens" list stored in .config.json.
TOKEN_POOL = Queue.Queue(maxsize=0)

with open(".config.json") as fp:
    config = json.load(fp)

# The file handle is no longer needed once the JSON document is parsed.
tokens = config["tokens"]
for token in tokens:
    TOKEN_POOL.put(token)
|
||
|
||
def get_token():
    """Take one token out of TOKEN_POOL without blocking.

    Returns:
        The token, or None when the pool is empty at the moment of the
        call.  Callers hand the token back with push_token().
    """
    # Checking empty() and then calling a blocking get() is racy: another
    # thread can remove the last token between the two calls, and this
    # call would then block instead of returning None.  get_nowait()
    # performs the check and the removal as one step.
    try:
        token = TOKEN_POOL.get_nowait()
    except Queue.Empty:
        return None
    # TODO: check whether this token still has request quota left; if it
    # does not, put it back into the pool and pick another one.
    return token
|
||
|
||
def push_token(token):
    """Hand *token* back to TOKEN_POOL so that later requests can use it."""
    TOKEN_POOL.put(item=token)
|
||
|
||
|
||
|
||
# Base headers for the API requests.  "Authorization" is only a placeholder
# here; a real "token <value>" string is supplied when a request is made.
send_headers = {
    "Content-Type": "application/json",
    "Authorization": None,
}
|
||
def _get_url(url,retry_times=3):
|
||
|
||
while True:
|
||
token = get_token()
|
||
if token is None:
|
||
continue
|
||
send_headers["Authorization"] = "token %s"%token
|
||
push_token(token)
|
||
break
|
||
|
||
print "\t%s: "%threading.current_thread().name, url,token
|
||
|
||
req = urllib2.Request(url,headers = send_headers)
|
||
try:
|
||
error_msg, result = None,None
|
||
result = urllib2.urlopen(req,timeout=20)
|
||
raw_data = result.read().decode('utf-8')
|
||
except urllib2.HTTPError, e:
|
||
error_msg = e.code
|
||
except urllib2.URLError, e:
|
||
error_msg = e.reason
|
||
except Exception,e:
|
||
error_msg = e.message
|
||
|
||
if error_msg != None:
|
||
print error_msg
|
||
if retry_times == 0:
|
||
return None, result
|
||
else:
|
||
time.sleep(3*(4-retry_times))
|
||
return _get_url(url,retry_times-1)
|
||
return raw_data,result
|
||
|
||
def _fetchPR(pr):
|
||
|
||
|
||
ISSUE_URL_TEMPLATE = "https://api.github.com/repos/%s/%s/pulls/%s"
|
||
prj_id, pr_num, un, rn = pr
|
||
result = dbop.select_one("select repo_id from pr_changes where repo_id=%s and pr_num=%s", (prj_id, pr_num))
|
||
if result is not None:
|
||
print "already get", pr
|
||
return
|
||
print "\t\t%s: start to fetch issues for "%threading.current_thread().name, prj_id, pr_num, un, rn
|
||
|
||
while True:
|
||
|
||
url = ISSUE_URL_TEMPLATE%(un,rn,pr_num)
|
||
raw_data,result = _get_url(url)
|
||
if raw_data is None:
|
||
continue
|
||
|
||
raw_data = json.loads(raw_data)
|
||
ch_sql = "insert into pr_changes (repo_id, pr_num, additions, deletions, changed_files)values (%s,%s,%s,%s,%s)"
|
||
dbop.execute(ch_sql, (prj_id, pr_num, raw_data["additions"], raw_data["deletions"], raw_data["changed_files"]))
|
||
break
|
||
|
||
# Two-way id mapping read from ../prj_par.txt.  Each line holds two
# tab-separated integers: gt_gh maps the first column to the second and
# gh_gt maps the second column back to the first.
gh_gt = {}
gt_gh = {}
with open("../prj_par.txt", "r") as fp:
    # Iterate the file lazily instead of loading it whole with readlines().
    for line in fp:
        # A blank line (typically a trailing one at the end of the file)
        # would otherwise abort the import with a ValueError from int().
        if not line.strip():
            continue
        ps = line.split("\t")
        gt_gh[int(ps[0])] = int(ps[1])
        gh_gt[int(ps[1])] = int(ps[0])
|
||
|
||
def get_ci(pr):
|
||
status_url = "https://api.github.com/repos/%s/%s/commits/%s/status"
|
||
repo_id, pr_num, un, rn = pr
|
||
cmts = dbop.select_one("select raw_json from commit_raw_json where repo_id=%s and pr_num=%s",(gt_gh[repo_id], pr_num))
|
||
|
||
if cmts is None:
|
||
return
|
||
cmts = json.loads(cmts[0])
|
||
for cmt in cmts:
|
||
sha = cmt["sha"]
|
||
sha_ss = dbop.select_one("select id from pr_ci where repo_id=%s and cmt_sha=%s",(repo_id,sha))
|
||
tmp_url = status_url%(un,rn,sha)
|
||
print tmp_url
|
||
|
||
if sha_ss is not None and sha_ss[0] is not None:
|
||
print repo_id, pr_num, sha, "already"
|
||
continue
|
||
|
||
while True:
|
||
status_json, result = _get_url(tmp_url)
|
||
if status_json is not None:
|
||
break
|
||
else:
|
||
time.sleep(5)
|
||
|
||
status = json.loads(status_json)
|
||
del status["repository"]
|
||
dbop.execute("insert into pr_ci(repo_id,pr_num,cmt_sha,ci_status) values(%s,%s,%s,%s)",
|
||
(repo_id,pr_num,sha,json.dumps(status)))
|
||
|
||
def get_commit(sha,repo_id):
|
||
cursor.execute("select sha from real_cmts where sha=%s",(sha,))
|
||
result = cursor.fetchone()
|
||
if result is not None:
|
||
print "already sha"
|
||
return
|
||
cursor.execute("select user_name, repo_name from project where id=%s",(repo_id,))
|
||
un, rn = cursor.fetchone()
|
||
while True:
|
||
url = cmt_TEMPLATE%(un,rn,sha)
|
||
print url
|
||
ets, result = _get_url(url)
|
||
if ets is None:
|
||
print "返回none"
|
||
continue
|
||
cursor.execute("insert into real_cmts (sha,info) values(%s,%s)", (sha, ets))
|
||
conn.commit()
|
||
break
|
||
|
||
def fetchThread():
|
||
global PRS
|
||
print "\t%s: starts to work"%( threading.current_thread().name)
|
||
while True:
|
||
try:
|
||
pr = PRS.get(block=False)
|
||
print ("\t%s: fetch"%( threading.current_thread().name)),pr
|
||
except Exception,e:
|
||
print e, ("\t%s: no more PRS"%( threading.current_thread().name))
|
||
break
|
||
get_ci(pr)
|
||
|
||
|
||
def work():
|
||
global PRS
|
||
|
||
repo_names = dbop.select_all("select id,user_name, repo_name from project")
|
||
repo_names = {item[0]:(item[1], item[2]) for item in repo_names}
|
||
|
||
# 取出所有pr
|
||
prs = dbop.select_all("select prj_id, pr_num from `pull-request`")
|
||
for pr in prs:
|
||
prj_id, pr_num = pr
|
||
un, rn = repo_names[prj_id]
|
||
PRS.put((prj_id, pr_num, un, rn))
|
||
print "total prs: ", PRS.qsize()
|
||
# 并行采集
|
||
thread_list = []
|
||
threading_num = 27
|
||
print("threads number:%d"%(threading_num,))
|
||
for i in range(0,threading_num):
|
||
t = threading.Thread(target=fetchThread,name="Thread-%d"%i)
|
||
thread_list.append(t)
|
||
|
||
for thread in thread_list:
|
||
thread.start()
|
||
for thread in thread_list:
|
||
thread.join()
|
||
|
||
print("all threads done work")
|
||
|
||
|
||
# Start the collection only when this file is run as a script.
if __name__ == "__main__":
    work()