148 lines
3.2 KiB
Python
148 lines
3.2 KiB
Python
# coding: utf-8
|
||
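# Re-fetch (update) the raw GitHub pull-request JSON for PRs whose recorded additions, deletions and changed_files are all 0.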
|
||
import MySQLdb
|
||
import json
|
||
import time
|
||
import random
|
||
import urllib2
|
||
|
||
|
||
import Queue
|
||
import threading
|
||
# Work queue of (repo_id, pr_num, user_name, repo_name) tuples; filled by
# work() and drained concurrently by fetchThread() workers.
PRS = Queue.Queue()

import dbop

# NOTE(review): presumably a pool of 3 DB connections shared by all worker
# threads started in work(); confirm dbop blocks (rather than fails) when
# every connection is busy.
dbop.init_pool(3)

import Queue

# Pool of GitHub API tokens; _get_url() takes one per request and puts it
# straight back, so requests rotate through the configured tokens.
TOKEN_POOL = Queue.Queue()

with open(".config.json") as fp:
    config = json.load(fp)

tokens = config["tokens"]
if not tokens:
    # _get_url() loops until get_token() returns a token, so an empty pool
    # would hang every worker forever; fail fast instead.
    raise SystemExit("no tokens configured in .config.json")

for token in tokens:
    TOKEN_POOL.put(token)
|
||
|
||
def get_token():
    """Take the next API token from TOKEN_POOL without blocking.

    Returns:
        The token, or None if the pool is currently empty.
    """
    # Checking empty() and then calling get() is racy with many worker
    # threads: another thread can take the last token in between, leaving
    # this call blocked inside get(). get_nowait() checks and removes
    # atomically.
    try:
        token = TOKEN_POOL.get_nowait()
    except Queue.Empty:
        return None
    # TODO: check whether this token still has API quota left; if it is
    # exhausted, put it back into the pool and try another one.
    return token
|
||
|
||
def push_token(token):
    """Hand *token* back to TOKEN_POOL so it can be reused by any thread."""
    TOKEN_POOL.put(token, block=True)
|
||
|
||
|
||
|
||
send_headers = {"Content-Type":"application/json","Authorization":None}
|
||
def _get_url(url,retry_times=3):
|
||
|
||
while True:
|
||
token = get_token()
|
||
if token is None:
|
||
continue
|
||
send_headers["Authorization"] = "token %s"%token
|
||
push_token(token)
|
||
break
|
||
|
||
print "\t%s: "%threading.current_thread().name, url,token
|
||
|
||
req = urllib2.Request(url,headers = send_headers)
|
||
try:
|
||
error_msg, result = None,None
|
||
result = urllib2.urlopen(req,timeout=20)
|
||
raw_data = result.read().decode('utf-8')
|
||
except urllib2.HTTPError, e:
|
||
error_msg = e.code
|
||
except urllib2.URLError, e:
|
||
error_msg = e.reason
|
||
except Exception,e:
|
||
error_msg = e.message
|
||
|
||
if error_msg != None:
|
||
print error_msg
|
||
if retry_times == 0:
|
||
return None, result
|
||
else:
|
||
time.sleep(3*(4-retry_times))
|
||
return _get_url(url,retry_times-1)
|
||
return raw_data,result
|
||
|
||
|
||
|
||
# Two-way integer id mapping read from ../prj_par.txt (tab-separated, first
# two columns are integers): gt_gh maps column 0 -> column 1, gh_gt maps
# column 1 -> column 0.
gh_gt = {}
gt_gh = {}
with open("../prj_par.txt", "r") as fp:
    for line in fp:
        # Skip blank lines (e.g. an extra trailing newline), which would
        # otherwise crash on int("").
        if not line.strip():
            continue
        ps = line.split("\t")
        gt_gh[int(ps[0])] = int(ps[1])
        gh_gt[int(ps[1])] = int(ps[0])
|
||
|
||
def get_pr_json(pr):
|
||
status_url = "https://api.github.com/repos/%s/%s/pulls/%s"
|
||
repo_id, pr_num, un, rn = pr
|
||
|
||
# while True:
|
||
# status_json, result = _get_url(status_url%(un,rn, pr_num))
|
||
# if status_json is not None:
|
||
# break
|
||
# else:
|
||
# time.sleep(5)
|
||
|
||
|
||
|
||
status_json, result = _get_url(status_url%(un,rn, pr_num))
|
||
if status_json is None:
|
||
print "Error pr get none",repo_id, pr_num, un, rn
|
||
return
|
||
|
||
dbop.execute("update pr_raw_json set raw_json=%s where repo_id=%s and pr_num=%s",
|
||
(status_json, gt_gh[repo_id], pr_num))
|
||
|
||
|
||
def fetchThread():
|
||
global PRS
|
||
print "\t%s: starts to work"%( threading.current_thread().name)
|
||
while True:
|
||
try:
|
||
pr = PRS.get(block=False)
|
||
print ("\t%s: fetch"%( threading.current_thread().name)),pr
|
||
except Exception,e:
|
||
print e, ("\t%s: no more PRS"%( threading.current_thread().name))
|
||
break
|
||
get_pr_json(pr)
|
||
|
||
|
||
def work():
|
||
global PRS
|
||
|
||
repo_names = dbop.select_all("select id,user_name, repo_name from project")
|
||
repo_names = {item[0]:(item[1], item[2]) for item in repo_names}
|
||
|
||
# 取出所有chagne为0的pr
|
||
prs = dbop.select_all("select repo_id, pr_num from pr_changes where additions=0 and deletions=0 and changed_files=0")
|
||
for pr in prs:
|
||
prj_id, pr_num = pr
|
||
un, rn = repo_names[prj_id]
|
||
PRS.put((prj_id, pr_num, un, rn))
|
||
print "total prs: ", PRS.qsize()
|
||
# 并行采集
|
||
thread_list = []
|
||
threading_num = 20
|
||
print("threads number:%d"%(threading_num,))
|
||
for i in range(0,threading_num):
|
||
t = threading.Thread(target=fetchThread,name="Thread-%d"%i)
|
||
thread_list.append(t)
|
||
|
||
for thread in thread_list:
|
||
thread.start()
|
||
for thread in thread_list:
|
||
thread.join()
|
||
|
||
print("all threads done work")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
work() |