duppr_analysis/experiment_code/minor_revision/get_ci.py

196 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
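# Collect GitHub data for repos with incremental updates (rows already stored
# are skipped). The entry point work() crawls CI statuses for every PR's commits.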
import MySQLdb
import json
import time
import random
import urllib2
import Queue
import threading
PRS = Queue.Queue()
import dbop
dbop.init_pool(3)
import Queue
# Shared pool of GitHub API tokens; worker threads borrow one per request
# through get_token() and hand it back with push_token().
TOKEN_POOL = Queue.Queue()
# Seed the pool from the "tokens" list in the local .config.json file.
with open(".config.json") as fp:
    config = json.load(fp)
    tokens = config["tokens"]
for token in tokens:
    TOKEN_POOL.put(token)
def get_token():
    """Take one GitHub API token from TOKEN_POOL without blocking.

    :returns: a token string, or ``None`` if the pool is currently empty.

    Callers are expected to hand the token back with push_token().
    """
    # get_nowait() checks for emptiness and removes the item in one atomic
    # step. An empty()-then-get() pair can block when another thread drains
    # the pool between the two calls.
    try:
        token = TOKEN_POOL.get_nowait()
    except Queue.Empty:
        return None
    # TODO: check whether this token still has rate-limit quota left; if not,
    # put it back into the pool and try another one.
    return token
def push_token(token):
    """Return *token* to the shared TOKEN_POOL so other threads can reuse it."""
    # TOKEN_POOL is unbounded, so a non-blocking put never raises Queue.Full.
    TOKEN_POOL.put_nowait(token)
send_headers = {"Content-Type":"application/json","Authorization":None}
def _get_url(url,retry_times=3):
while True:
token = get_token()
if token is None:
continue
send_headers["Authorization"] = "token %s"%token
push_token(token)
break
print "\t%s: "%threading.current_thread().name, url,token
req = urllib2.Request(url,headers = send_headers)
try:
error_msg, result = None,None
result = urllib2.urlopen(req,timeout=20)
raw_data = result.read().decode('utf-8')
except urllib2.HTTPError, e:
error_msg = e.code
except urllib2.URLError, e:
error_msg = e.reason
except Exception,e:
error_msg = e.message
if error_msg != None:
print error_msg
if retry_times == 0:
return None, result
else:
time.sleep(3*(4-retry_times))
return _get_url(url,retry_times-1)
return raw_data,result
def _fetchPR(pr):
ISSUE_URL_TEMPLATE = "https://api.github.com/repos/%s/%s/pulls/%s"
prj_id, pr_num, un, rn = pr
result = dbop.select_one("select repo_id from pr_changes where repo_id=%s and pr_num=%s", (prj_id, pr_num))
if result is not None:
print "already get", pr
return
print "\t\t%s: start to fetch issues for "%threading.current_thread().name, prj_id, pr_num, un, rn
while True:
url = ISSUE_URL_TEMPLATE%(un,rn,pr_num)
raw_data,result = _get_url(url)
if raw_data is None:
continue
raw_data = json.loads(raw_data)
ch_sql = "insert into pr_changes (repo_id, pr_num, additions, deletions, changed_files)values (%s,%s,%s,%s,%s)"
dbop.execute(ch_sql, (prj_id, pr_num, raw_data["additions"], raw_data["deletions"], raw_data["changed_files"]))
break
# Two-way id mapping read from ../prj_par.txt, one tab-separated pair per line:
# gt_gh maps column 0 -> column 1 and gh_gt maps column 1 -> column 0.
# get_ci() uses gt_gh to translate a PR's project id before querying
# commit_raw_json.
gh_gt = {}
gt_gh = {}
with open("../prj_par.txt", "r") as fp:
    for line in fp:
        if not line.strip():
            # Skip blank lines (e.g. a stray empty line); int("") would raise
            # ValueError and abort the script at import time.
            continue
        ps = line.split("\t")
        gt_gh[int(ps[0])] = int(ps[1])
        gh_gt[int(ps[1])] = int(ps[0])
def get_ci(pr):
status_url = "https://api.github.com/repos/%s/%s/commits/%s/status"
repo_id, pr_num, un, rn = pr
cmts = dbop.select_one("select raw_json from commit_raw_json where repo_id=%s and pr_num=%s",(gt_gh[repo_id], pr_num))
if cmts is None:
return
cmts = json.loads(cmts[0])
for cmt in cmts:
sha = cmt["sha"]
sha_ss = dbop.select_one("select id from pr_ci where repo_id=%s and cmt_sha=%s",(repo_id,sha))
tmp_url = status_url%(un,rn,sha)
print tmp_url
if sha_ss is not None and sha_ss[0] is not None:
print repo_id, pr_num, sha, "already"
continue
while True:
status_json, result = _get_url(tmp_url)
if status_json is not None:
break
else:
time.sleep(5)
status = json.loads(status_json)
del status["repository"]
dbop.execute("insert into pr_ci(repo_id,pr_num,cmt_sha,ci_status) values(%s,%s,%s,%s)",
(repo_id,pr_num,sha,json.dumps(status)))
def get_commit(sha,repo_id):
cursor.execute("select sha from real_cmts where sha=%s",(sha,))
result = cursor.fetchone()
if result is not None:
print "already sha"
return
cursor.execute("select user_name, repo_name from project where id=%s",(repo_id,))
un, rn = cursor.fetchone()
while True:
url = cmt_TEMPLATE%(un,rn,sha)
print url
ets, result = _get_url(url)
if ets is None:
print "返回none"
continue
cursor.execute("insert into real_cmts (sha,info) values(%s,%s)", (sha, ets))
conn.commit()
break
def fetchThread():
global PRS
print "\t%s: starts to work"%( threading.current_thread().name)
while True:
try:
pr = PRS.get(block=False)
print ("\t%s: fetch"%( threading.current_thread().name)),pr
except Exception,e:
print e, ("\t%s: no more PRS"%( threading.current_thread().name))
break
get_ci(pr)
def work():
global PRS
repo_names = dbop.select_all("select id,user_name, repo_name from project")
repo_names = {item[0]:(item[1], item[2]) for item in repo_names}
# 取出所有pr
prs = dbop.select_all("select prj_id, pr_num from `pull-request`")
for pr in prs:
prj_id, pr_num = pr
un, rn = repo_names[prj_id]
PRS.put((prj_id, pr_num, un, rn))
print "total prs: ", PRS.qsize()
# 并行采集
thread_list = []
threading_num = 27
print("threads number:%d"%(threading_num,))
for i in range(0,threading_num):
t = threading.Thread(target=fetchThread,name="Thread-%d"%i)
thread_list.append(t)
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()
print("all threads done work")
if __name__ == "__main__":
    # Run the crawler only when executed as a script, not when imported.
    work()