init code

This commit is contained in:
tsbc 2018-03-29 19:19:44 +08:00
parent 77dd1ff93d
commit baa93e94bb
17 changed files with 2806 additions and 2806 deletions

File diff suppressed because it is too large Load Diff

View File

@ -10,235 +10,235 @@ from Base import *
################################################################################
# 关键字函数必须用Aciton.add_action(keyword)装饰起进行装饰
# 函数的形参必须为4个:action_object(Aciton对象), step_desc(步骤描述信息可用于记录log),
# value(需要输入的值,多个输入值用逗号隔开), loc(元素的定位信息)
# value(需要输入的值,多个输入值用逗号隔开), loc(元素的定位信息)
# 函数正常退出时不能有返回值使用默认的返回值None出错时返回字符串记录出错信息
################################################################################
##############################################################################################
# #
# 自定义关键字 START #
# #
# #
# 自定义关键字 START #
# #
##############################################################################################
@Action.add_action('keau1000_login')
def action_login(action_object, step_desc, value, loc):
"""
构造登录使用的公用方法
"""
print value
username, password = value.split(',')
action_object.find_element(('id', 'login_txtUserName')).clear()
action_object.find_element(('id', 'login_txtUserName')).send_keys(username)
action_object.find_element(('id', 'login_txtUserPassword')).clear()
action_object.find_element(('id', 'login_txtUserPassword')).send_keys(password)
# action_object.find_element(('id', 'login_text_verifycode')).clear()
# action_object.find_element(('id', 'login_text_verifycode')).send_keys(VerifyNo)
action_object.find_element(('id', 'login_btnLogin')).click()
time.sleep(0.5)
try:
action_object.find_element(('css', 'button.btn-guidestyle.btn-g-over')).click()
except:
pass
"""
构造登录使用的公用方法
"""
print value
username, password = value.split(',')
action_object.find_element(('id', 'login_txtUserName')).clear()
action_object.find_element(('id', 'login_txtUserName')).send_keys(username)
action_object.find_element(('id', 'login_txtUserPassword')).clear()
action_object.find_element(('id', 'login_txtUserPassword')).send_keys(password)
# action_object.find_element(('id', 'login_text_verifycode')).clear()
# action_object.find_element(('id', 'login_text_verifycode')).send_keys(VerifyNo)
action_object.find_element(('id', 'login_btnLogin')).click()
time.sleep(0.5)
try:
action_object.find_element(('css', 'button.btn-guidestyle.btn-g-over')).click()
except:
pass
@Action.add_action('keau1000_reset')
def action_(action_object, step_desc, value, loc):
"""
reset platform
"""
# print loc, value
# username, password = value.split(',')
url = action_object.driver.current_url
hostname = re.search('\d+\.\d+\.\d+\.\d+', url).group(0)
action_object.find_element(('css', 'ul.nav-list li:nth-child(8) span')).click()
time.sleep(0.5)
action_object.find_element(('css', 'ul.nav-list li:nth-child(8) div li:nth-child(2)')).click()
time.sleep(0.5)
action_object.find_element(('id', 'system_system_btnResetDevice')).click()
time.sleep(30)
for i in xrange(180):
print i + 1,
action_object.driver.get("https://" + hostname)
if action_object.isElementExsit(('id', 'login_txtUserName')):
break
time.sleep(1)
action_object.find_element(('id', 'login_txtUserName')).clear()
action_object.find_element(('id', 'login_txtUserName')).send_keys('admin')
action_object.find_element(('id', 'login_txtUserPassword')).clear()
action_object.find_element(('id', 'login_txtUserPassword')).send_keys('admin@123')
# action_object.find_element(('id', 'login_text_verifycode')).clear()
# action_object.find_element(('id', 'login_text_verifycode')).send_keys(VerifyNo)
action_object.find_element(('id', 'login_btnLogin')).click()
action_object.find_element(('css', 'div.row-setting label[for="device_centralize_rabMode1"]')).click()
action_object.find_element(('id', 'device_centralize_btnMode')).click()
time.sleep(30)
for i in xrange(180):
print i + 1,
action_object.driver.get("https://" + hostname)
if action_object.isElementExsit(('id', 'login_txtUserName')):
break
time.sleep(1)
action_object.find_element(('id', 'login_txtUserName')).clear()
action_object.find_element(('id', 'login_txtUserName')).send_keys('admin')
action_object.find_element(('id', 'login_txtUserPassword')).clear()
action_object.find_element(('id', 'login_txtUserPassword')).send_keys('admin@123')
# action_object.find_element(('id', 'login_text_verifycode')).clear()
# action_object.find_element(('id', 'login_text_verifycode')).send_keys(VerifyNo)
action_object.find_element(('id', 'login_btnLogin')).click()
time.sleep(1)
# action_object.find_element(('css', 'button.btn-guidestyle.btn-g-over')).click()
if action_object.isElementExsit(('id', 'spanSystemTime')):
print "reset succesful."
else:
return "reset failed!"
"""
reset platform
"""
# print loc, value
# username, password = value.split(',')
url = action_object.driver.current_url
hostname = re.search('\d+\.\d+\.\d+\.\d+', url).group(0)
action_object.find_element(('css', 'ul.nav-list li:nth-child(8) span')).click()
time.sleep(0.5)
action_object.find_element(('css', 'ul.nav-list li:nth-child(8) div li:nth-child(2)')).click()
time.sleep(0.5)
action_object.find_element(('id', 'system_system_btnResetDevice')).click()
time.sleep(30)
for i in xrange(180):
print i + 1,
action_object.driver.get("https://" + hostname)
if action_object.isElementExsit(('id', 'login_txtUserName')):
break
time.sleep(1)
action_object.find_element(('id', 'login_txtUserName')).clear()
action_object.find_element(('id', 'login_txtUserName')).send_keys('admin')
action_object.find_element(('id', 'login_txtUserPassword')).clear()
action_object.find_element(('id', 'login_txtUserPassword')).send_keys('admin@123')
# action_object.find_element(('id', 'login_text_verifycode')).clear()
# action_object.find_element(('id', 'login_text_verifycode')).send_keys(VerifyNo)
action_object.find_element(('id', 'login_btnLogin')).click()
action_object.find_element(('css', 'div.row-setting label[for="device_centralize_rabMode1"]')).click()
action_object.find_element(('id', 'device_centralize_btnMode')).click()
time.sleep(30)
for i in xrange(180):
print i + 1,
action_object.driver.get("https://" + hostname)
if action_object.isElementExsit(('id', 'login_txtUserName')):
break
time.sleep(1)
action_object.find_element(('id', 'login_txtUserName')).clear()
action_object.find_element(('id', 'login_txtUserName')).send_keys('admin')
action_object.find_element(('id', 'login_txtUserPassword')).clear()
action_object.find_element(('id', 'login_txtUserPassword')).send_keys('admin@123')
# action_object.find_element(('id', 'login_text_verifycode')).clear()
# action_object.find_element(('id', 'login_text_verifycode')).send_keys(VerifyNo)
action_object.find_element(('id', 'login_btnLogin')).click()
time.sleep(1)
# action_object.find_element(('css', 'button.btn-guidestyle.btn-g-over')).click()
if action_object.isElementExsit(('id', 'spanSystemTime')):
print "reset succesful."
else:
return "reset failed!"
@Action.add_action('btn_on_off')
def action_btn_on_off(action_object, step_desc, value, loc):
"""
执行javaScript
:param action_object:
:param step_desc:
:param value: jquery loc , true | flase
:param loc: javascript
:return:
"""
print value
loc, btn_value = value.split(',')
js = 'return jQuery("' + loc + '").is(":checked")'
try:
rejs = action_object.driver.execute_script(js)
print rejs
if str(rejs).lower() == btn_value.lower():
print u"开关状态验证通过."
else:
return u"开关状态验证失败."
except Exception, e:
return str(e)
"""
执行javaScript
:param action_object:
:param step_desc:
:param value: jquery loc , true | flase
:param loc: javascript
:return:
"""
print value
loc, btn_value = value.split(',')
js = 'return jQuery("' + loc + '").is(":checked")'
try:
rejs = action_object.driver.execute_script(js)
print rejs
if str(rejs).lower() == btn_value.lower():
print u"开关状态验证通过."
else:
return u"开关状态验证失败."
except Exception, e:
return str(e)
def execute_in_start(hostname,port,username,password,start_pw,cmds):
from time import sleep
comands = ['start', start_pw]
comands.append(isinstance(cmds, (list, tuple)) and ';'.join(cmds) or cmds)
comands = ['%s\n' % comand for comand in comands]
from time import sleep
comands = ['start', start_pw]
comands.append(isinstance(cmds, (list, tuple)) and ';'.join(cmds) or cmds)
comands = ['%s\n' % comand for comand in comands]
try:
myssh = paramiko.SSHClient()
myssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
myssh.load_system_host_keys()
myssh.connect(hostname, int(port), username, password, timeout=300)
except paramiko.SSHException, err:
print u'无法连接到 "%s": %s' % (hostname, err)
return False
try:
myssh = paramiko.SSHClient()
myssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
myssh.load_system_host_keys()
myssh.connect(hostname, int(port), username, password, timeout=300)
except paramiko.SSHException, err:
print u'无法连接到 "%s": %s' % (hostname, err)
return False
chan = myssh.invoke_shell()
for comand in comands:
chan.send(comand)
sleep(1)
out = chan.recv(65535)
while not out.endswith('# ') and not out.endswith(': ') and not out.endswith('$ '):
sleep(0.5)
out += chan.recv(65535)
chan = myssh.invoke_shell()
for comand in comands:
chan.send(comand)
sleep(1)
out = chan.recv(65535)
while not out.endswith('# ') and not out.endswith(': ') and not out.endswith('$ '):
sleep(0.5)
out += chan.recv(65535)
myssh.close()
return '\n'.join(out.split(os.linesep)[1:-1])
myssh.close()
return '\n'.join(out.split(os.linesep)[1:-1])
@Action.add_action('verify_diskrate')
def action_verify_diskrate(action_object, step_desc, value, loc):
u"""获取设备磁盘信息、计算设备磁盘利用率、获取页面磁盘利用率、比较前后是否一致
:param value:[hostname,port,username,password,start_pw,id,attr]
"""
print value
hostname,port,username,password,start_pw,id,attr=value.split(',')
print u'开始计算系统的磁盘占用率'
disk_used = 0
disk_avail = 0
df_info = execute_in_start(hostname,port,username,password,start_pw,'df -m')
u"""获取设备磁盘信息、计算设备磁盘利用率、获取页面磁盘利用率、比较前后是否一致
:param value:[hostname,port,username,password,start_pw,id,attr]
"""
print value
hostname,port,username,password,start_pw,id,attr=value.split(',')
print u'开始计算系统的磁盘占用率'
disk_used = 0
disk_avail = 0
df_info = execute_in_start(hostname,port,username,password,start_pw,'df -m')
# 获取所有分区的使用情况和大小
disk_info_list = re.findall(re.compile(r"/dev/.+\s+(\d+)\s+(\d+)\s+(\d+)\s+"), df_info)
for disk_info in disk_info_list:
disk_used += int(disk_info[1])
disk_avail += int(disk_info[2])
# 获取所有分区的使用情况和大小
disk_info_list = re.findall(re.compile(r"/dev/.+\s+(\d+)\s+(\d+)\s+(\d+)\s+"), df_info)
for disk_info in disk_info_list:
disk_used += int(disk_info[1])
disk_avail += int(disk_info[2])
# 计算出硬盘使用率
diskrate_sys = (disk_used * 100) // disk_avail + 1
print u'系统硬盘占用率为%d' % diskrate_ys
# 计算出硬盘使用率
diskrate_sys = (disk_used * 100) // disk_avail + 1
print u'系统硬盘占用率为%d' % diskrate_ys
js = "return $('#%s').attr('%s')" % (id, attr)
try:
value = action_object.driver.execute_script(js)
print u'%s元素的属性值是:%s' % (attr, value)
if (diskrate_sys - 5 >= int(value)) or (diskrate_sys + 5 <= int(value)):
return False
except Exception, err:
return str(err)
js = "return $('#%s').attr('%s')" % (id, attr)
try:
value = action_object.driver.execute_script(js)
print u'%s元素的属性值是:%s' % (attr, value)
if (diskrate_sys - 5 >= int(value)) or (diskrate_sys + 5 <= int(value)):
return False
except Exception, err:
return str(err)
return None
return None
@Action.add_action('verify_diskrate_increment')
def action_verify_diskrate_increment(action_object, step_desc, value, loc):
u"""获取页面磁盘利用率、向磁盘写入文件、再次获取页面磁盘利用率、检查增量是否准确、获取设备磁盘信息、计算设备磁盘利用率、比较前后是否一致
:param value:[hostname,port,username,password,start_pw,id,attr,cmds,increment]
"""
from time import sleep
print value
hostname,port,username,password,start_pw,id,attr,cmds,increment=value.split(',')
u"""获取页面磁盘利用率、向磁盘写入文件、再次获取页面磁盘利用率、检查增量是否准确、获取设备磁盘信息、计算设备磁盘利用率、比较前后是否一致
:param value:[hostname,port,username,password,start_pw,id,attr,cmds,increment]
"""
from time import sleep
print value
hostname,port,username,password,start_pw,id,attr,cmds,increment=value.split(',')
# 页面磁盘利用率
js = "return $('#%s').attr('%s')" % (id, attr)
try:
page_rate_before = int(action_object.driver.execute_script(js))
print u'页面磁盘利用率: %d' % page_rate_before
except Exception, err:
return str(err)
# 页面磁盘利用率
js = "return $('#%s').attr('%s')" % (id, attr)
try:
page_rate_before = int(action_object.driver.execute_script(js))
print u'页面磁盘利用率: %d' % page_rate_before
except Exception, err:
return str(err)
try:
# 向磁盘写入文件
print u'执行 dd 命令向磁盘写入文件'
execute_in_start(hostname,port,username,password,start_pw,cmds)
try:
# 向磁盘写入文件
print u'执行 dd 命令向磁盘写入文件'
execute_in_start(hostname,port,username,password,start_pw,cmds)
# 写入文件后页面磁盘利用率
action_object.driver.refresh()
sleep(2)
js = "return $('#%s').attr('%s')" % (id, attr)
try:
page_rate_after = int(action_object.driver.execute_script(js))
print u'写入文件后页面磁盘利用率: %d' % page_rate_after
except Exception, err:
return str(err)
# 写入文件后页面磁盘利用率
action_object.driver.refresh()
sleep(2)
js = "return $('#%s').attr('%s')" % (id, attr)
try:
page_rate_after = int(action_object.driver.execute_script(js))
print u'写入文件后页面磁盘利用率: %d' % page_rate_after
except Exception, err:
return str(err)
# 检查增量是否准确
if page_rate_before + int(increment) != page_rate_after:
print u'向磁盘写入文件后磁盘利用率的增量不准确'
return False
# 检查增量是否准确
if page_rate_before + int(increment) != page_rate_after:
print u'向磁盘写入文件后磁盘利用率的增量不准确'
return False
# 计算系统的磁盘占用率
disk_used = 0
disk_avail = 0
df_info = execute_in_start(hostname,port,username,password,start_pw,'df -m')
disk_info_list = re.findall(re.compile(r"/dev/.+\s+(\d+)\s+(\d+)\s+(\d+)\s+"), df_info)
for disk_info in disk_info_list:
disk_used += int(disk_info[1])
disk_avail += int(disk_info[2])
# 计算系统的磁盘占用率
disk_used = 0
disk_avail = 0
df_info = execute_in_start(hostname,port,username,password,start_pw,'df -m')
disk_info_list = re.findall(re.compile(r"/dev/.+\s+(\d+)\s+(\d+)\s+(\d+)\s+"), df_info)
for disk_info in disk_info_list:
disk_used += int(disk_info[1])
disk_avail += int(disk_info[2])
diskrate_sys = (disk_used * 100) // disk_avail + 1
print u'系统硬盘占用率: %d' % diskrate_sys
diskrate_sys = (disk_used * 100) // disk_avail + 1
print u'系统硬盘占用率: %d' % diskrate_sys
# 比较前后是否一致
if (diskrate_sys - 5 >= page_rate_after) or (diskrate_sys + 5 <= page_rate_after):
print u'磁盘利用率前后不一致'
return False
# 比较前后是否一致
if (diskrate_sys - 5 >= page_rate_after) or (diskrate_sys + 5 <= page_rate_after):
print u'磁盘利用率前后不一致'
return False
finally:
# 删除文件
print u'删除写入的磁盘文件'
execute_in_start(hostname,port,username,password,start_pw,'rm %s' % cmds.split('of=')[1])
return None
finally:
# 删除文件
print u'删除写入的磁盘文件'
execute_in_start(hostname,port,username,password,start_pw,'rm %s' % cmds.split('of=')[1])
# #
# 自定义关键字 END #
return None
# #
# 自定义关键字 END #
##############################################################################################

View File

@ -36,16 +36,16 @@ Exp:
```python
@Action.add_action('InputText')
def action_InputText(action_object, step_desc, value, loc):
"""
文本框输入内容
:param action_object:
:param step_desc:
:param value: text
:param loc:
:return:
"""
print loc, value
action_object.send_keys(loc, value)
"""
文本框输入内容
:param action_object:
:param step_desc:
:param value: text
:param loc:
:return:
"""
print loc, value
action_object.send_keys(loc, value)
```
**AddCase.py 同步用例到TestRail到脚本**

View File

@ -87,7 +87,7 @@ def generateDpiLog(simulatorIP, testServerIP, dpisn, type):
if '__main__' == __name__:
#sendFlowData('127.0.0.1:4000', '192.168.1.135', 'all')
for i in xrange(1):
#sendEventAndIncident('192.168.116.3:4000', '172.18.51.111', 'ZB0202C400000092', 'incident')
sendTrafoFlowData('192.168.110.77:4000', '192.168.110.77', 'dnp3', timeoutSeconds=None)
# sendFlowData('192.168.116.3:4000', '192.168.110.114', 'modbus')
for i in xrange(1):
#sendEventAndIncident('192.168.116.3:4000', '172.18.51.111', 'ZB0202C400000092', 'incident')
sendTrafoFlowData('192.168.110.77:4000', '192.168.110.77', 'dnp3', timeoutSeconds=None)
# sendFlowData('192.168.116.3:4000', '192.168.110.114', 'modbus')

View File

@ -21,426 +21,426 @@ from Base import Action
class TestSuite(unittest.TestCase):
def tearDown(self):
'''
每个测试用例执行后的收尾函数
:return:
'''
Controller.case_id_list.append(self.case_id)
Controller.flag_list.append([self.testrail_case_id, self.result_flag])
def tearDown(self):
'''
每个测试用例执行后的收尾函数
:return:
'''
Controller.case_id_list.append(self.case_id)
Controller.flag_list.append([self.testrail_case_id, self.result_flag])
def action_test(self, step_list):
'''
生成详细的测试用例
:param step_list:
:return: None
'''
self.case_id, self.testrail_case_id, case_desc = step_list[0][0:3]
result = None
print "[%s]" % case_desc
text = u'[%d:' % self.case_id + case_desc + u']'
try:
for idx, step_info in enumerate(step_list):
stepid = idx
print u'step%d' % (stepid + 1),
if stepid >= 0:
textstr = text + u'step%d:' % (stepid + 1) + step_info[3:][1]
try:
Controller.action.show_note(textstr)
except:
pass
result = Controller.action_test(*step_info[3:])
if result is not None:
result = u'step%d: %s' % (idx + 1, result)
break
except Exception, e:
self.result_flag = 2
text_result = u'【case_%d ERROR】 step%d: %s' % (self.case_id, idx + 1, e)
print text_result
Controller.action.save_runing_log(text_result)
if Controller.video is not None:
subprocess.Popen("pkill -2 recordmydesktop", shell=True)
else:
Controller.action.saveScreenshot(str(self.case_id))
raise Exception(u'ERROR')
else:
if result is None:
self.result_flag = 1
text_result = u'【case_%d PASS】' % self.case_id
print text_result
Controller.action.save_runing_log(text_result)
if Controller.video is not None:
subprocess.Popen("pkill -2 recordmydesktop", shell=True)
else:
self.result_flag = 5
text_result = u'【case_%d FAIL】 %s' % (self.case_id, result)
print text_result
Controller.action.save_runing_log(text_result)
if Controller.video is not None:
subprocess.Popen("pkill -2 recordmydesktop", shell=True)
else:
Controller.action.saveScreenshot(str(self.case_id))
self.assertTrue(False, msg=u'FAIL')
def action_test(self, step_list):
'''
生成详细的测试用例
:param step_list:
:return: None
'''
self.case_id, self.testrail_case_id, case_desc = step_list[0][0:3]
result = None
print "[%s]" % case_desc
text = u'[%d:' % self.case_id + case_desc + u']'
try:
for idx, step_info in enumerate(step_list):
stepid = idx
print u'step%d' % (stepid + 1),
if stepid >= 0:
textstr = text + u'step%d:' % (stepid + 1) + step_info[3:][1]
try:
Controller.action.show_note(textstr)
except:
pass
result = Controller.action_test(*step_info[3:])
if result is not None:
result = u'step%d: %s' % (idx + 1, result)
break
except Exception, e:
self.result_flag = 2
text_result = u'【case_%d ERROR】 step%d: %s' % (self.case_id, idx + 1, e)
print text_result
Controller.action.save_runing_log(text_result)
if Controller.video is not None:
subprocess.Popen("pkill -2 recordmydesktop", shell=True)
else:
Controller.action.saveScreenshot(str(self.case_id))
raise Exception(u'ERROR')
else:
if result is None:
self.result_flag = 1
text_result = u'【case_%d PASS】' % self.case_id
print text_result
Controller.action.save_runing_log(text_result)
if Controller.video is not None:
subprocess.Popen("pkill -2 recordmydesktop", shell=True)
else:
self.result_flag = 5
text_result = u'【case_%d FAIL】 %s' % (self.case_id, result)
print text_result
Controller.action.save_runing_log(text_result)
if Controller.video is not None:
subprocess.Popen("pkill -2 recordmydesktop", shell=True)
else:
Controller.action.saveScreenshot(str(self.case_id))
self.assertTrue(False, msg=u'FAIL')
@staticmethod
def generateTest(step_list):
def func(self):
if Controller.video is not None:
time.sleep(5)
image = Controller.action.saveVideoName(step_list[0][0])
video = subprocess.Popen("recordmydesktop --no-sound -o %s > video.log" % image + ".ogv", shell=True)
self.action_test(step_list)
subprocess.Popen("kill -2 %s" % str(video.pid + 1), shell=True)
else:
self.action_test(step_list)
@staticmethod
def generateTest(step_list):
def func(self):
if Controller.video is not None:
time.sleep(5)
image = Controller.action.saveVideoName(step_list[0][0])
video = subprocess.Popen("recordmydesktop --no-sound -o %s > video.log" % image + ".ogv", shell=True)
self.action_test(step_list)
subprocess.Popen("kill -2 %s" % str(video.pid + 1), shell=True)
else:
self.action_test(step_list)
return func
return func
class Controller(object):
action = None
client = None
user = ''
password = ''
run_id = None
result_list = []
case_id_list = []
flag_list = []
expand_paras_dict = None
conn = None
cur = None
args = None
start = None
# [all, pass, fail, error, result_path]
tag_list = [0, 0, 0, 0, '']
task_type = None
task_name = None
video = None
action = None
client = None
user = ''
password = ''
run_id = None
result_list = []
case_id_list = []
flag_list = []
expand_paras_dict = None
conn = None
cur = None
args = None
start = None
# [all, pass, fail, error, result_path]
tag_list = [0, 0, 0, 0, '']
task_type = None
task_name = None
video = None
@classmethod
def update_task_history(cls):
if Controller.args.user_id is not None:
user_id = Controller.args.user_id
Controller.my_execute(ur'''SELECT id FROM autoplat_user WHERE username = '%s' ''' % Controller.args.user_id)
task_info = Controller.cur.fetchall()
if task_info:
user_id = task_info[0][0]
exectime = datetime.datetime.now() - Controller.start
sql_str = ur"""INSERT INTO autoplat_taskhistory (tasktype, taskname, case_tag_all, case_tag_pass,
case_tag_fail, case_tag_error, starttime, exectime, taskid_id, userid_id, reporturl, build_name, build_number)
VALUES (%s, '%s', %s, %s, %s, %s, '%s', '%s', %s, %s, '%s', '', '')""" % \
(Controller.task_type, Controller.task_name, Controller.tag_list[0], Controller.tag_list[1],
Controller.tag_list[2], Controller.tag_list[3], Controller.start,exectime,
Controller.args.task_id, user_id, Controller.tag_list[4])
Controller.my_execute(sql_str)
Controller.conn.commit()
# 通过钉钉接口,使用钉钉机器人把测试结果发送对应群中
if Controller.args.popding is not None:
url = "https://oapi.dingtalk.com/robot/send?access_token=8d36288c964c024ca2e5e53a45faddde0adcb2c6c638e9a59a16096f5d866715"
#url = "https://oapi.dingtalk.com/robot/send?access_token=e0c23e2f9242783f0ad34ccf21197a41f663c9b63ddc87fc386722654914c7ee"
count_num, pass_num, fail_num, error_num = Controller.tag_list[0], Controller.tag_list[1], Controller.tag_list[
2], Controller.tag_list[3]
reporturl = Controller.tag_list[4]
data_markdown = json.dumps({
"msgtype": "markdown",
"markdown": {
"title": "AutoMagic TestReprot",
"text": "![screenshot](http://192.168.110.65/static/images/automagic.png) \n" +
">[AutoMagic TestReprot](http://jenkinsm.acorn-net.com/TestResults/CornerStone/" + reporturl + ") \n\n " +
">总共 " + str(count_num) + " CasePass " + str(pass_num) + "Fail " + str(fail_num) + "Error " + str(error_num) + "\n"+
"运行时间: "+str(exectime)[:-7]
}
})
popautomagic.sendAutoMagic(url, data_markdown)
@classmethod
def update_task_history(cls):
if Controller.args.user_id is not None:
user_id = Controller.args.user_id
Controller.my_execute(ur'''SELECT id FROM autoplat_user WHERE username = '%s' ''' % Controller.args.user_id)
task_info = Controller.cur.fetchall()
if task_info:
user_id = task_info[0][0]
exectime = datetime.datetime.now() - Controller.start
sql_str = ur"""INSERT INTO autoplat_taskhistory (tasktype, taskname, case_tag_all, case_tag_pass,
case_tag_fail, case_tag_error, starttime, exectime, taskid_id, userid_id, reporturl, build_name, build_number)
VALUES (%s, '%s', %s, %s, %s, %s, '%s', '%s', %s, %s, '%s', '', '')""" % \
(Controller.task_type, Controller.task_name, Controller.tag_list[0], Controller.tag_list[1],
Controller.tag_list[2], Controller.tag_list[3], Controller.start,exectime,
Controller.args.task_id, user_id, Controller.tag_list[4])
Controller.my_execute(sql_str)
Controller.conn.commit()
# 通过钉钉接口,使用钉钉机器人把测试结果发送对应群中
if Controller.args.popding is not None:
url = "https://oapi.dingtalk.com/robot/send?access_token=8d36288c964c024ca2e5e53a45faddde0adcb2c6c638e9a59a16096f5d866715"
#url = "https://oapi.dingtalk.com/robot/send?access_token=e0c23e2f9242783f0ad34ccf21197a41f663c9b63ddc87fc386722654914c7ee"
count_num, pass_num, fail_num, error_num = Controller.tag_list[0], Controller.tag_list[1], Controller.tag_list[
2], Controller.tag_list[3]
reporturl = Controller.tag_list[4]
data_markdown = json.dumps({
"msgtype": "markdown",
"markdown": {
"title": "AutoMagic TestReprot",
"text": "![screenshot](http://192.168.110.65/static/images/automagic.png) \n" +
">[AutoMagic TestReprot](http://jenkinsm.acorn-net.com/TestResults/CornerStone/" + reporturl + ") \n\n " +
">总共 " + str(count_num) + " CasePass " + str(pass_num) + "Fail " + str(fail_num) + "Error " + str(error_num) + "\n"+
"运行时间: "+str(exectime)[:-7]
}
})
popautomagic.sendAutoMagic(url, data_markdown)
@classmethod
def get_client(cls):
'''
获取testrail接口对象
:return: 返回testrail接口对象
'''
if cls.client is None:
cls.client = testrail.APIClient('http://172.17.3.70/testrail/')
cls.client.user = cls.user
cls.client.password = cls.password
@classmethod
def get_client(cls):
'''
获取testrail接口对象
:return: 返回testrail接口对象
'''
if cls.client is None:
cls.client = testrail.APIClient('http://172.17.3.70/testrail/')
cls.client.user = cls.user
cls.client.password = cls.password
return cls.client
return cls.client
@classmethod
def init(cls):
cls.args = get_args()
cls.action = Action()
cls.set_conn()
cls.start = datetime.datetime.now()
browser = Controller.args.browser
Controller.video = Controller.args.video
if browser is not None:
Controller.action_test('openBrowser', '', browser, None, None)
else:
Controller.action_test('openBrowser', '', 'chrome', None, None)
@classmethod
def init(cls):
cls.args = get_args()
cls.action = Action()
cls.set_conn()
cls.start = datetime.datetime.now()
browser = Controller.args.browser
Controller.video = Controller.args.video
if browser is not None:
Controller.action_test('openBrowser', '', browser, None, None)
else:
Controller.action_test('openBrowser', '', 'chrome', None, None)
@classmethod
def set_conn(cls):
cls.conn = MySQLdb.connect(host='172.21.129.32', port=3306, user='automagic', passwd='admin@123', db='autoplat',
charset='utf8')
cls.cur = cls.conn.cursor()
@classmethod
def set_conn(cls):
cls.conn = MySQLdb.connect(host='172.21.129.32', port=3306, user='automagic', passwd='admin@123', db='autoplat',
charset='utf8')
cls.cur = cls.conn.cursor()
@classmethod
def my_execute(cls, sql):
try:
cls.conn.ping()
except:
cls.set_conn()
@classmethod
def my_execute(cls, sql):
try:
cls.conn.ping()
except:
cls.set_conn()
cls.cur.execute(sql)
cls.cur.execute(sql)
@classmethod
def action_test(cls, *args):
action_keyword = cls.action.keyword2action.get(args[0], None)
if action_keyword is not None:
# action_object, step_desc, value, loc
if cls.expand_paras_dict is None:
# 以非任务方式执行cass
return action_keyword(cls.action, args[1], args[2], (args[3], args[4]))
else:
# 以任务方式执行cass需要进行扩展参数的替换
return action_keyword(cls.action, args[1], cls.expand_paras(args[2]), (args[3], args[4]))
@classmethod
def action_test(cls, *args):
action_keyword = cls.action.keyword2action.get(args[0], None)
if action_keyword is not None:
# action_object, step_desc, value, loc
if cls.expand_paras_dict is None:
# 以非任务方式执行cass
return action_keyword(cls.action, args[1], args[2], (args[3], args[4]))
else:
# 以任务方式执行cass需要进行扩展参数的替换
return action_keyword(cls.action, args[1], cls.expand_paras(args[2]), (args[3], args[4]))
else:
return u"关键字处理函数未定义!"
else:
return u"关键字处理函数未定义!"
@classmethod
def set_expand_paras_dict(cls, task_id):
Controller.my_execute(u'''SELECT codename, codevalue FROM autoplat_codelist WHERE taskid_id = %s''' % task_id)
expand_paras = Controller.cur.fetchall()
cls.expand_paras_dict = {}
for code_name, code_value in expand_paras:
cls.expand_paras_dict[u'{%s}' % code_name] = code_value
@classmethod
def set_expand_paras_dict(cls, task_id):
Controller.my_execute(u'''SELECT codename, codevalue FROM autoplat_codelist WHERE taskid_id = %s''' % task_id)
expand_paras = Controller.cur.fetchall()
cls.expand_paras_dict = {}
for code_name, code_value in expand_paras:
cls.expand_paras_dict[u'{%s}' % code_name] = code_value
@classmethod
def expand_paras(cls, input_text):
for expand_para in cls.expand_paras_dict:
input_text = input_text.replace(expand_para, cls.expand_paras_dict[expand_para])
return input_text
@classmethod
def expand_paras(cls, input_text):
for expand_para in cls.expand_paras_dict:
input_text = input_text.replace(expand_para, cls.expand_paras_dict[expand_para])
return input_text
@classmethod
def update_result(cls):
'''
将测试用例结果更新到数据库
:return: None
'''
case_id_str = ur'(%s)' % ur','.join(unicode(case_id) for case_id in Controller.case_id_list)
when_list = []
for idx in xrange(len(Controller.result_list)):
when_list.append(ur'''WHEN %s THEN "%s"''' % (Controller.case_id_list[idx],
Controller.result_list[idx].replace('"', "'") + (
u'\n运行时间:%s' % time.ctime(time.time()))))
sql_str = ur'''UPDATE autoplat_case SET debuginfo = CASE id %s END WHERE id IN %s''' % (
u' '.join(when_list), case_id_str)
Controller.my_execute(sql_str)
Controller.conn.commit()
@classmethod
def update_result(cls):
'''
将测试用例结果更新到数据库
:return: None
'''
case_id_str = ur'(%s)' % ur','.join(unicode(case_id) for case_id in Controller.case_id_list)
when_list = []
for idx in xrange(len(Controller.result_list)):
when_list.append(ur'''WHEN %s THEN "%s"''' % (Controller.case_id_list[idx],
Controller.result_list[idx].replace('"', "'") + (
u'\n运行时间:%s' % time.ctime(time.time()))))
sql_str = ur'''UPDATE autoplat_case SET debuginfo = CASE id %s END WHERE id IN %s''' % (
u' '.join(when_list), case_id_str)
Controller.my_execute(sql_str)
Controller.conn.commit()
@classmethod
def update_testrail(cls):
print u"testrail update start..."
try:
client = Controller.get_client()
for flag_info, result in zip(cls.flag_list, cls.result_list):
if cls.run_id and cls.user and cls.password and flag_info[0]:
para_str = u'add_result_for_case/%s/%s' % (cls.run_id, flag_info[0])
try:
client.send_post(para_str, {'status_id': flag_info[1], 'comment': result})
except Exception, E:
print E
except Exception, e:
print e
print u"testrail update end..."
@classmethod
def update_testrail(cls):
print u"testrail update start..."
try:
client = Controller.get_client()
for flag_info, result in zip(cls.flag_list, cls.result_list):
if cls.run_id and cls.user and cls.password and flag_info[0]:
para_str = u'add_result_for_case/%s/%s' % (cls.run_id, flag_info[0])
try:
client.send_post(para_str, {'status_id': flag_info[1], 'comment': result})
except Exception, E:
print E
except Exception, e:
print e
print u"testrail update end..."
def get_args():
'''
解析命令行参数
:return: 命令行参数命名空间
'''
parser = argparse.ArgumentParser()
parser.add_argument('-r', action='store', dest='run_id', type=str, help='Run ID')
parser.add_argument('-p', action='store', dest='project_id', type=str, help='Project ID')
parser.add_argument('-c', action='store', dest='case_id', type=str, help='Case ID')
parser.add_argument('-u', action='store', dest='user_id', type=str, help='User ID')
parser.add_argument('-t', action='store', dest='task_id', type=str, help='Task ID')
parser.add_argument('-b', action='store', dest='browser', type=str, help='Browser')
parser.add_argument('-v', action='store', dest='video', type=str, help='Video True or False')
parser.add_argument('-d', action='store', dest='popding', type=str, help='ding ding poping')
rst = parser.parse_args()
return rst
'''
解析命令行参数
:return: 命令行参数命名空间
'''
parser = argparse.ArgumentParser()
parser.add_argument('-r', action='store', dest='run_id', type=str, help='Run ID')
parser.add_argument('-p', action='store', dest='project_id', type=str, help='Project ID')
parser.add_argument('-c', action='store', dest='case_id', type=str, help='Case ID')
parser.add_argument('-u', action='store', dest='user_id', type=str, help='User ID')
parser.add_argument('-t', action='store', dest='task_id', type=str, help='Task ID')
parser.add_argument('-b', action='store', dest='browser', type=str, help='Browser')
parser.add_argument('-v', action='store', dest='video', type=str, help='Video True or False')
parser.add_argument('-d', action='store', dest='popding', type=str, help='ding ding poping')
rst = parser.parse_args()
return rst
def gen_test_cass(suite):
'''
生成测试用例
:param suite: 测试用例集对象
:return:
'''
'''
生成测试用例
:param suite: 测试用例集对象
:return:
'''
project_id = Controller.args.project_id
task_id = Controller.args.task_id
project_id = Controller.args.project_id
task_id = Controller.args.task_id
pid_info = u''
cid_info = u''
pid_info = u''
cid_info = u''
if project_id is not None:
pid_info = u'AND tb4.projectid_id = %s' % project_id
case_id = None
elif task_id is not None:
# 查询、初始化任务相关的扩展参数
Controller.set_expand_paras_dict(task_id)
if project_id is not None:
pid_info = u'AND tb4.projectid_id = %s' % project_id
case_id = None
elif task_id is not None:
# 查询、初始化任务相关的扩展参数
Controller.set_expand_paras_dict(task_id)
Controller.run_id = Controller.args.run_id
user_id = Controller.args.user_id
if user_id is not None:
Controller.my_execute(
u"""SELECT testrailuser, testrailpass FROM autoplat_user WHERE username = '%s'""" % user_id)
user_info = Controller.cur.fetchall()
if user_info:
Controller.user, Controller.password = user_info[0]
Controller.run_id = Controller.args.run_id
user_id = Controller.args.user_id
if user_id is not None:
Controller.my_execute(
u"""SELECT testrailuser, testrailpass FROM autoplat_user WHERE username = '%s'""" % user_id)
user_info = Controller.cur.fetchall()
if user_info:
Controller.user, Controller.password = user_info[0]
Controller.my_execute(u'''SELECT caselist FROM autoplat_task WHERE id = %s''' % task_id)
caseid = Controller.cur.fetchall()
if caseid:
case_id = caseid[0][0]
else:
case_id = '0'
else:
case_id = Controller.args.case_id
Controller.my_execute(u'''SELECT caselist FROM autoplat_task WHERE id = %s''' % task_id)
caseid = Controller.cur.fetchall()
if caseid:
case_id = caseid[0][0]
else:
case_id = '0'
else:
case_id = Controller.args.case_id
if case_id is not None:
casedict = eval(case_id)
if type(casedict) is int:
case_id = case_id
else:
case_id = ''
caselist = {}
for i in casedict:
caselist[int(i)] = casedict[i]
casedict = sorted(caselist.iteritems())
for i in casedict:
case_id = case_id + ',' + i[1]
case_id = case_id[1:]
cid_info = u'AND tb4.id IN (%s)' % case_id
if case_id is not None:
casedict = eval(case_id)
if type(casedict) is int:
case_id = case_id
else:
case_id = ''
caselist = {}
for i in casedict:
caselist[int(i)] = casedict[i]
casedict = sorted(caselist.iteritems())
for i in casedict:
case_id = case_id + ',' + i[1]
case_id = case_id[1:]
cid_info = u'AND tb4.id IN (%s)' % case_id
Controller.my_execute(u'''SELECT tb4.id, tb4.testrailcaseid, tb4.casedesc, tb6.keyword, tb5.descr, tb5.inputtext, tb7.locmode, tb7.location
FROM autoplat_product AS tb1
RIGHT JOIN autoplat_project tb2 ON tb2.productid_id = tb1.id
RIGHT JOIN autoplat_module tb3 ON tb3.projectid_id = tb2.id
RIGHT JOIN autoplat_case AS tb4 ON tb4.moduleid_id = tb3.id
RIGHT JOIN autoplat_step AS tb5 ON tb5.caseid_id = tb4.id
LEFT JOIN autoplat_keyword AS tb6 ON tb6.id = tb5.keywordid_id
LEFT JOIN autoplat_element AS tb7 ON tb7.id = tb5.elementid_id
WHERE tb1.isenabled = 1 AND tb2.isenabled = 1 AND tb3.isenabled AND tb4.isenabled = 1
%s %s
ORDER BY tb3.sortby DESC , tb4.id ASC, tb5.id ASC''' % (pid_info, cid_info))
case_list = Controller.cur.fetchall()
# 输出Case步骤
# for x in case_list:
# x = str(x).replace('u\'','\'')
# print x.decode("unicode-escape")
case_flag = None
step_flag = 0
test_attr = [int(idx.strip()) for idx in case_id.split(',')] if case_id not in ['', None, 'None'] else []
Controller.my_execute(u'''SELECT tb4.id, tb4.testrailcaseid, tb4.casedesc, tb6.keyword, tb5.descr, tb5.inputtext, tb7.locmode, tb7.location
FROM autoplat_product AS tb1
RIGHT JOIN autoplat_project tb2 ON tb2.productid_id = tb1.id
RIGHT JOIN autoplat_module tb3 ON tb3.projectid_id = tb2.id
RIGHT JOIN autoplat_case AS tb4 ON tb4.moduleid_id = tb3.id
RIGHT JOIN autoplat_step AS tb5 ON tb5.caseid_id = tb4.id
LEFT JOIN autoplat_keyword AS tb6 ON tb6.id = tb5.keywordid_id
LEFT JOIN autoplat_element AS tb7 ON tb7.id = tb5.elementid_id
WHERE tb1.isenabled = 1 AND tb2.isenabled = 1 AND tb3.isenabled AND tb4.isenabled = 1
%s %s
ORDER BY tb3.sortby DESC , tb4.id ASC, tb5.id ASC''' % (pid_info, cid_info))
case_list = Controller.cur.fetchall()
# 输出Case步骤
# for x in case_list:
# x = str(x).replace('u\'','\'')
# print x.decode("unicode-escape")
case_flag = None
step_flag = 0
test_attr = [int(idx.strip()) for idx in case_id.split(',')] if case_id not in ['', None, 'None'] else []
# 解析所有case信息并生成所有的测试用例函数
for idx, case in enumerate(case_list):
if case_flag != case[0]:
if step_flag != idx:
setattr(TestSuite, 'test_case%d' % case_flag, TestSuite.generateTest(case_list[step_flag:idx]))
if case_flag in test_attr:
test_attr[test_attr.index(case_flag)] = 'test_case%d' % case_flag
else:
test_attr.append('test_case%d' % case_flag)
step_flag = idx
case_flag = case[0]
if case_flag:
setattr(TestSuite, 'test_case%d' % case[0], TestSuite.generateTest(case_list[step_flag:]))
if case[0] in test_attr:
test_attr[test_attr.index(case[0])] = 'test_case%d' % case[0]
else:
test_attr.append('test_case%d' % case[0])
# 解析所有case信息并生成所有的测试用例函数
for idx, case in enumerate(case_list):
if case_flag != case[0]:
if step_flag != idx:
setattr(TestSuite, 'test_case%d' % case_flag, TestSuite.generateTest(case_list[step_flag:idx]))
if case_flag in test_attr:
test_attr[test_attr.index(case_flag)] = 'test_case%d' % case_flag
else:
test_attr.append('test_case%d' % case_flag)
step_flag = idx
case_flag = case[0]
if case_flag:
setattr(TestSuite, 'test_case%d' % case[0], TestSuite.generateTest(case_list[step_flag:]))
if case[0] in test_attr:
test_attr[test_attr.index(case[0])] = 'test_case%d' % case[0]
else:
test_attr.append('test_case%d' % case[0])
for test_fun in test_attr:
if type(test_fun) != int:
suite.addTest(TestSuite(test_fun))
for test_fun in test_attr:
if type(test_fun) != int:
suite.addTest(TestSuite(test_fun))
def run_suite():
suite = unittest.TestSuite()
gen_test_cass(suite)
suite = unittest.TestSuite()
gen_test_cass(suite)
# 获取系统当前时间
now = time.strftime('%Y-%m-%d-%H_%M_%S', time.localtime(time.time()))
day = time.strftime('%Y-%m-%d', time.localtime(time.time()))
# 获取系统当前时间
now = time.strftime('%Y-%m-%d-%H_%M_%S', time.localtime(time.time()))
day = time.strftime('%Y-%m-%d', time.localtime(time.time()))
result = os.path.join(os.path.split(os.path.realpath(__file__))[0], 'result', day)
result = os.path.join(os.path.split(os.path.realpath(__file__))[0], 'result', day)
# 定义个报告存放路径,支持相对路径
if not os.path.exists(result):
os.mkdir(result)
# 定义个报告存放路径,支持相对路径
if not os.path.exists(result):
os.mkdir(result)
filename = os.path.join(result, "%s_result.html" % now)
filename = os.path.join(result, "%s_result.html" % now)
Controller.tag_list[4] = os.path.join(day, "%s_result.html" % now)
Controller.tag_list[4] = os.path.join(day, "%s_result.html" % now)
fp = file(filename, 'wb')
fp = file(filename, 'wb')
report_title = u'自动化测试报告'
if Controller.args.task_id is not None:
Controller.my_execute(
u'''SELECT taskname, tasktype FROM autoplat_task WHERE id = %s''' % Controller.args.task_id)
task_info = Controller.cur.fetchall()
if task_info:
report_title = u'%s报告' % task_info[0][0]
Controller.task_name = task_info[0][0]
Controller.task_type = task_info[0][1]
report_title = u'自动化测试报告'
if Controller.args.task_id is not None:
Controller.my_execute(
u'''SELECT taskname, tasktype FROM autoplat_task WHERE id = %s''' % Controller.args.task_id)
task_info = Controller.cur.fetchall()
if task_info:
report_title = u'%s报告' % task_info[0][0]
Controller.task_name = task_info[0][0]
Controller.task_type = task_info[0][1]
# 定义测试报告
runner = HTMLTestRunner.HTMLTestRunner(stream=fp, verbosity=1, title=report_title, description=u'用例执行情况:',
result_list=Controller.result_list, tag_list=Controller.tag_list)
# 定义测试报告
runner = HTMLTestRunner.HTMLTestRunner(stream=fp, verbosity=1, title=report_title, description=u'用例执行情况:',
result_list=Controller.result_list, tag_list=Controller.tag_list)
# 运行测试用例
runner.run(suite)
# 运行测试用例
runner.run(suite)
# 关闭报告文件
fp.close()
# 关闭报告文件
fp.close()
if "__main__" == __name__:
# 初始化控制类对象:实例化浏览器实例
Controller.init()
try:
if Controller.args.task_id is not None:
Controller.my_execute(ur'''UPDATE autoplat_task SET status = 1 WHERE id = %s''' % Controller.args.task_id)
Controller.conn.commit()
# 初始化控制类对象:实例化浏览器实例
Controller.init()
try:
if Controller.args.task_id is not None:
Controller.my_execute(ur'''UPDATE autoplat_task SET status = 1 WHERE id = %s''' % Controller.args.task_id)
Controller.conn.commit()
run_suite()
run_suite()
Controller.update_result()
Controller.update_testrail()
except KeyboardInterrupt:
run_suite()
finally:
# 释放控制类对象信息:关闭浏览器实例
if Controller.args.task_id is not None:
Controller.my_execute(ur'''UPDATE autoplat_task SET status = 2 WHERE id = %s''' % Controller.args.task_id)
Controller.conn.commit()
Controller.update_result()
Controller.update_testrail()
except KeyboardInterrupt:
run_suite()
finally:
# 释放控制类对象信息:关闭浏览器实例
if Controller.args.task_id is not None:
Controller.my_execute(ur'''UPDATE autoplat_task SET status = 2 WHERE id = %s''' % Controller.args.task_id)
Controller.conn.commit()
Controller.conn.close()
Controller.conn.close()
Controller.conn = None
Controller.cur = None
Controller.action = None
Controller.client = None
Controller.run_id = None
del Controller.result_list[:]
del Controller.case_id_list[:]
del Controller.flag_list[:]
if Controller.args.task_id is not None:
Controller.update_task_history()
Controller.conn = None
Controller.cur = None
Controller.action = None
Controller.client = None
Controller.run_id = None
del Controller.result_list[:]
del Controller.case_id_list[:]
del Controller.flag_list[:]
if Controller.args.task_id is not None:
Controller.update_task_history()

View File

@ -13,167 +13,167 @@ sys.setdefaultencoding('utf-8')
class Demo(unittest.TestCase):
#脚本初始化
def setUp(self):
option = webdriver.ChromeOptions()
option.add_argument('test-type')
self.driver = webdriver.Chrome(chrome_options=option)
self.driver.implicitly_wait(30)
self.base_url = "https://192.168.110.114/login"
self.username = 'root'
self.password = 'root12345'
self.verifycode = '@c0rnC0d$'
#脚本初始化
def setUp(self):
option = webdriver.ChromeOptions()
option.add_argument('test-type')
self.driver = webdriver.Chrome(chrome_options=option)
self.driver.implicitly_wait(30)
self.base_url = "https://192.168.110.114/login"
self.username = 'root'
self.password = 'root12345'
self.verifycode = '@c0rnC0d$'
#测试用例
def test_mwupgrade(self):
"""
mwupgrade script
"""
driver = self.driver
print ur"========【MW在线升级程序】============="
#测试用例
def test_mwupgrade(self):
"""
mwupgrade script
"""
driver = self.driver
print ur"========【MW在线升级程序】============="
print ur"获取升级文件"
mwupgrade = self.get_bin()
if mwupgrade is not None:
driver.get(self.base_url + "/")
driver.maximize_window()
print ur"进行登录"
driver.find_element_by_id("login_text_username").clear()
driver.find_element_by_id("login_text_username").send_keys(self.username)
driver.find_element_by_id("login_text_password").clear()
driver.find_element_by_id("login_text_password").send_keys(self.password)
driver.find_element_by_id("login_text_verifycode").clear()
driver.find_element_by_id("login_text_verifycode").send_keys(self.verifycode)
driver.find_element_by_id("login_button_loginButton").click()
time.sleep(2)
#点击系统重置
driver.find_element_by_css_selector("#setting-systemconsole_li_systemReset a").click()
time.sleep(1)
#点击恢复出厂
driver.find_element_by_css_selector("#setting-systemconsole_button_resetModal").click()
time.sleep(2)
#点击确定
driver.find_element_by_css_selector(".modal-content button:nth-child(2)").click()
#等待160秒
print "重置中请稍等..."
for j in xrange(15):
for i in xrange(20):
time.sleep(1)
print '.',
print "."
#点击开始升级
print ur"重置完成,重新登录进行升级"
driver.get(self.base_url + "/")
driver.find_element_by_id("login_text_username").clear()
driver.find_element_by_id("login_text_username").send_keys(self.username)
driver.find_element_by_id("login_text_password").clear()
driver.find_element_by_id("login_text_password").send_keys(self.password)
driver.find_element_by_id("login_text_verifycode").clear()
driver.find_element_by_id("login_text_verifycode").send_keys(self.verifycode)
driver.find_element_by_id("login_button_loginButton").click()
time.sleep(2)
#点击系统设置
driver.find_element_by_css_selector("#header_li_setting a").click()
print "点击系统升级"
#点击系统升级
time.sleep(1)
driver.find_element_by_css_selector("#setting-systemconsole_li_systemUpgrade a").click()
time.sleep(1)
#上传升级文件
print ur"上传升级文件"
driver.find_element_by_css_selector(ur"#setting-systemconsole_li_systemUpgrade_browse>input").send_keys(mwupgrade)
#等待180秒
for j in xrange(8):
for i in xrange(20):
time.sleep(1)
print '.',
print "."
#点击开始升级
print u"点击开始升级"
driver.find_element_by_css_selector("#setting-systemconsole_li_systemUpgrade_start").click()
time.sleep(3)
#点击确认升级
print u"点击确认升级"
driver.find_element_by_xpath("//button[@ng-click='done()']").click()
#等待160秒
print ur"开始升级,并重启服务"
for j in xrange(15):
for i in xrange(20):
time.sleep(1)
print '.',
print "."
url = driver.current_url
hostname = re.search('\d+\.\d+\.\d+\.\d+', url).group(0)
port = 22
username = 'acorn'
password = 'Ag0@dbegiNNingmakesag0@dending.'
print ur"获取升级文件"
mwupgrade = self.get_bin()
if mwupgrade is not None:
driver.get(self.base_url + "/")
driver.maximize_window()
print ur"进行登录"
driver.find_element_by_id("login_text_username").clear()
driver.find_element_by_id("login_text_username").send_keys(self.username)
driver.find_element_by_id("login_text_password").clear()
driver.find_element_by_id("login_text_password").send_keys(self.password)
driver.find_element_by_id("login_text_verifycode").clear()
driver.find_element_by_id("login_text_verifycode").send_keys(self.verifycode)
driver.find_element_by_id("login_button_loginButton").click()
try:
paramiko.util.log_to_file("paramiko.log")
s = paramiko.SSHClient()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
s.connect(hostname=hostname, port=port, username=username, password=password)
time.sleep(2)
#点击系统重置
driver.find_element_by_css_selector("#setting-systemconsole_li_systemReset a").click()
time.sleep(1)
#点击恢复出厂
driver.find_element_by_css_selector("#setting-systemconsole_button_resetModal").click()
time.sleep(2)
#点击确定
driver.find_element_by_css_selector(".modal-content button:nth-child(2)").click()
#等待160秒
print "重置中请稍等..."
for j in xrange(15):
for i in xrange(20):
time.sleep(1)
print '.',
print "."
#点击开始升级
tt = s.invoke_shell()
tt.send('sudo reboot')
tt.send('\n')
tt.send(password)
tt.send('\n')
while not tt.recv_ready():
print "working..."
time.sleep(10)
print tt.recv(1024)
s.close()
except:
print u"MW 连接失败"
print ur"重新登录,验证版本"
driver.get(self.base_url + "/")
time.sleep(3)
driver.find_element_by_id("login_text_username").clear()
driver.find_element_by_id("login_text_username").send_keys(self.username)
driver.find_element_by_id("login_text_password").clear()
driver.find_element_by_id("login_text_password").send_keys(self.password)
driver.find_element_by_id("login_text_verifycode").clear()
driver.find_element_by_id("login_text_verifycode").send_keys(self.verifycode)
driver.find_element_by_id("login_button_loginButton").click()
#等待1秒
time.sleep(1)
print ur"检查系统当前验证版本"
#点击系统设置
driver.find_element_by_css_selector("#header_li_setting a").click()
#点击系统升级
driver.find_element_by_css_selector("#setting-systemconsole_li_systemUpgrade a").click()
time.sleep(1)
print ur"重置完成,重新登录进行升级"
driver.get(self.base_url + "/")
driver.find_element_by_id("login_text_username").clear()
driver.find_element_by_id("login_text_username").send_keys(self.username)
driver.find_element_by_id("login_text_password").clear()
driver.find_element_by_id("login_text_password").send_keys(self.password)
driver.find_element_by_id("login_text_verifycode").clear()
driver.find_element_by_id("login_text_verifycode").send_keys(self.verifycode)
driver.find_element_by_id("login_button_loginButton").click()
time.sleep(2)
#点击系统设置
driver.find_element_by_css_selector("#header_li_setting a").click()
print "点击系统升级"
#点击系统升级
time.sleep(1)
driver.find_element_by_css_selector("#setting-systemconsole_li_systemUpgrade a").click()
time.sleep(1)
#上传升级文件
print ur"上传升级文件"
driver.find_element_by_css_selector(ur"#setting-systemconsole_li_systemUpgrade_browse>input").send_keys(mwupgrade)
#等待180秒
for j in xrange(8):
for i in xrange(20):
time.sleep(1)
print '.',
print "."
#点击开始升级
print u"点击开始升级"
driver.find_element_by_css_selector("#setting-systemconsole_li_systemUpgrade_start").click()
time.sleep(3)
#点击确认升级
print u"点击确认升级"
driver.find_element_by_xpath("//button[@ng-click='done()']").click()
#等待160秒
print ur"开始升级,并重启服务"
for j in xrange(15):
for i in xrange(20):
time.sleep(1)
print '.',
print "."
url = driver.current_url
hostname = re.search('\d+\.\d+\.\d+\.\d+', url).group(0)
port = 22
username = 'acorn'
password = 'Ag0@dbegiNNingmakesag0@dending.'
#获取当前版本号
mc_version_text = driver.find_element_by_xpath(u"//div[contains(text(),'当前系统版本号')]").text
#进行版本校验
print mc_version_text
print ur"升级包:"+mwupgrade
mwupgradetext = mc_version_text[mc_version_text.find('MC'):mc_version_text.find('-C0')]
if mwupgradetext in mwupgrade:
print ur"版本一致升级成功! 进行版本测试。"
self.driver.quit()
os.system("python TestSuite.py -t 1 -u tsbc -r 1433")
else:
print ur"版本不一致升级失败!"
self.driver.quit()
else:
print ur"升级文件不存在,升级失败"
self.driver.quit()
try:
paramiko.util.log_to_file("paramiko.log")
s = paramiko.SSHClient()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
s.connect(hostname=hostname, port=port, username=username, password=password)
#脚本退出
def tearDown(self):
pass
tt = s.invoke_shell()
tt.send('sudo reboot')
tt.send('\n')
tt.send(password)
tt.send('\n')
while not tt.recv_ready():
print "working..."
time.sleep(10)
print tt.recv(1024)
s.close()
except:
print u"MW 连接失败"
print ur"重新登录,验证版本"
driver.get(self.base_url + "/")
time.sleep(3)
driver.find_element_by_id("login_text_username").clear()
driver.find_element_by_id("login_text_username").send_keys(self.username)
driver.find_element_by_id("login_text_password").clear()
driver.find_element_by_id("login_text_password").send_keys(self.password)
driver.find_element_by_id("login_text_verifycode").clear()
driver.find_element_by_id("login_text_verifycode").send_keys(self.verifycode)
driver.find_element_by_id("login_button_loginButton").click()
#等待1秒
time.sleep(1)
print ur"检查系统当前验证版本"
#点击系统设置
driver.find_element_by_css_selector("#header_li_setting a").click()
#点击系统升级
driver.find_element_by_css_selector("#setting-systemconsole_li_systemUpgrade a").click()
time.sleep(1)
def get_bin(self):
current_path = os.path.split(os.path.realpath(__file__))[0]
for item in os.listdir(current_path):
if os.path.splitext(item)[1].upper() == '.BIN':
return os.path.realpath(item)
return None
#获取当前版本号
mc_version_text = driver.find_element_by_xpath(u"//div[contains(text(),'当前系统版本号')]").text
#进行版本校验
print mc_version_text
print ur"升级包:"+mwupgrade
mwupgradetext = mc_version_text[mc_version_text.find('MC'):mc_version_text.find('-C0')]
if mwupgradetext in mwupgrade:
print ur"版本一致升级成功! 进行版本测试。"
self.driver.quit()
os.system("python TestSuite.py -t 1 -u tsbc -r 1433")
else:
print ur"版本不一致升级失败!"
self.driver.quit()
else:
print ur"升级文件不存在,升级失败"
self.driver.quit()
#脚本退出
def tearDown(self):
pass
def get_bin(self):
current_path = os.path.split(os.path.realpath(__file__))[0]
for item in os.listdir(current_path):
if os.path.splitext(item)[1].upper() == '.BIN':
return os.path.realpath(item)
return None
if __name__ == "__main__":
unittest.main()
unittest.main()

View File

@ -10,61 +10,61 @@ import requests, json
def sendAutoMagic(url, data):
headers = {"Content-Type": "application/json"}
requests.post(url, data=data, headers=headers)
headers = {"Content-Type": "application/json"}
requests.post(url, data=data, headers=headers)
# call_post(url_post, data_text)
if '__main__' == __name__:
#MWQA
url_post = "https://oapi.dingtalk.com/robot/send?access_token=8d36288c964c024ca2e5e53a45faddde0adcb2c6c638e9a59a16096f5d866715"
#Test
#url_post = "https://oapi.dingtalk.com/robot/send?access_token=e0c23e2f9242783f0ad34ccf21197a41f663c9b63ddc87fc386722654914c7ee"
#MWQA
url_post = "https://oapi.dingtalk.com/robot/send?access_token=8d36288c964c024ca2e5e53a45faddde0adcb2c6c638e9a59a16096f5d866715"
#Test
#url_post = "https://oapi.dingtalk.com/robot/send?access_token=e0c23e2f9242783f0ad34ccf21197a41f663c9b63ddc87fc386722654914c7ee"
data_text = json.dumps({
"msgtype": "text",
"text": {
"content": "圈人"
},
"at": {
"atMobiles": [
"18217516787",#wenjuang.wang
"15891390680" #yongbo.he
],
"isAtAll": False
}
})
data_text = json.dumps({
"msgtype": "text",
"text": {
"content": "圈人"
},
"at": {
"atMobiles": [
"18217516787",#wenjuang.wang
"15891390680" #yongbo.he
],
"isAtAll": False
}
})
data_link = json.dumps({"msgtype": "link",
"link": {
"text": "这个即将发布的新版本,创始人陈航(花名“无招”)称它为“红树林”。而在此之前,每当面临重大升级,产品经理们都会取一个应景的代号,这一次,为什么是“红树林”?",
"title": "时代的火车向前开", "picUrl": "",
"messageUrl": "https://mp.weixin.qq.com/s?__biz=MzA4NjMwMTA2Ng==&mid=2650316842&idx=1&sn=60da3ea2b29f1dcc43a7c8e4a7c97a16&scene=2&srcid=09189AnRJEdIiWVaKltFzNTw&from=timeline&isappinstalled=0&key=&ascene=2&uin=&devicetype=android-23&version=26031933&nettype=WIFI"}
})
data_link = json.dumps({"msgtype": "link",
"link": {
"text": "这个即将发布的新版本,创始人陈航(花名“无招”)称它为“红树林”。而在此之前,每当面临重大升级,产品经理们都会取一个应景的代号,这一次,为什么是“红树林”?",
"title": "时代的火车向前开", "picUrl": "",
"messageUrl": "https://mp.weixin.qq.com/s?__biz=MzA4NjMwMTA2Ng==&mid=2650316842&idx=1&sn=60da3ea2b29f1dcc43a7c8e4a7c97a16&scene=2&srcid=09189AnRJEdIiWVaKltFzNTw&from=timeline&isappinstalled=0&key=&ascene=2&uin=&devicetype=android-23&version=26031933&nettype=WIFI"}
})
count_num, pass_num, fail_num, error_num = 631, 328, 101, 202
data_markdown = json.dumps({
"msgtype": "markdown",
"markdown": {
"title": "AutoMagic TestReprot",
"text": "![screenshot](http://192.168.110.65/static/images/automagic.png) \n" +
">[AutoMagic TestReprot](http://jenkinsm.acorn-net.com/TestResults/CornerStone/2017-02-19/2017-02-19-00_55_46_result.html) \n\n " +
">执行CASE" + str(count_num) + "Pass" + str(pass_num) + "Fail" + str(fail_num) + "Error" + str(
error_num) + "\n"
}
})
data_md = json.dumps({
"msgtype": "markdown",
"markdown": {
"title": "Script Update",
"text": "![screenshot](http://192.168.110.65/static/images/automagic.png) \n" +
">[192.168.115.1](\\192.168.115.1) \n\n " +
">sendingdata.py 脚本更新\n" +
">1.添加 -t 参数调整线程数量默认为1\n"
">2.添加 -s 参数调整发包速度(单位是秒)可以是小数\n"
}
})
data_test = '''{"msgtype": "text", "text": {"content": "我就是我,颜色不一样的烟火!!!"}}'''
sendAutoMagic(url_post, data_md)
count_num, pass_num, fail_num, error_num = 631, 328, 101, 202
data_markdown = json.dumps({
"msgtype": "markdown",
"markdown": {
"title": "AutoMagic TestReprot",
"text": "![screenshot](http://192.168.110.65/static/images/automagic.png) \n" +
">[AutoMagic TestReprot](http://jenkinsm.acorn-net.com/TestResults/CornerStone/2017-02-19/2017-02-19-00_55_46_result.html) \n\n " +
">执行CASE" + str(count_num) + "Pass" + str(pass_num) + "Fail" + str(fail_num) + "Error" + str(
error_num) + "\n"
}
})
data_md = json.dumps({
"msgtype": "markdown",
"markdown": {
"title": "Script Update",
"text": "![screenshot](http://192.168.110.65/static/images/automagic.png) \n" +
">[192.168.115.1](\\192.168.115.1) \n\n " +
">sendingdata.py 脚本更新\n" +
">1.添加 -t 参数调整线程数量默认为1\n"
">2.添加 -s 参数调整发包速度(单位是秒)可以是小数\n"
}
})
data_test = '''{"msgtype": "text", "text": {"content": "我就是我,颜色不一样的烟火!!!"}}'''
sendAutoMagic(url_post, data_md)

View File

@ -23,14 +23,14 @@ python sendlog -f [filepath] -c [cycles] -t [sleep time]
-d [Receiver Host] -p [Receiver port]
**Details:**</br>
-C [开启无限循环]</br>
-f [指派日志文件路径]</br>
-c [指定循环次数]</br>
-t [指定日志发送间隔时间 单位:秒]</br>
-d [指定日志接收IP地址]</br>
-p [指定日志接收端口(整数)]</br>
-v [查看工具版本]</br>
-h [查看帮助]</br>
-C [开启无限循环]</br>
-f [指派日志文件路径]</br>
-c [指定循环次数]</br>
-t [指定日志发送间隔时间 单位:秒]</br>
-d [指定日志接收IP地址]</br>
-p [指定日志接收端口(整数)]</br>
-v [查看工具版本]</br>
-h [查看帮助]</br>
#TCP Socket
##通过TCP Socket发送syslog事件|tcpsendingsyslog.py

View File

@ -25,13 +25,13 @@ label.grid(row=12, column=0, columnspan=12, sticky=W)
radio_var = IntVar()
radio_var.set(1)
def sel_radio():
"""Radiobutton 实现协议选择"""
v = radio_var.get()
label.config(text='')
if v == 1:
return v
else:
label.config(text = "SNMP协议暂时不可用")
"""Radiobutton 实现协议选择"""
v = radio_var.get()
label.config(text='')
if v == 1:
return v
else:
label.config(text = "SNMP协议暂时不可用")
Label(root, text='协议:').grid(row=0, sticky=E)
Radiobutton1 = Radiobutton(root, text='Syslog', variable=radio_var, value=1, command=sel_radio)
Radiobutton1.grid(row=0, column=1, sticky=W)
@ -42,24 +42,24 @@ Radiobutton2.grid(row=0, column=2, sticky=W)
#使用sticky参数 默认的空间会在网格中居中显示。你可以使用sticky选项去指定对齐方式可以选择的值有N/S/E/W分别代表上/下/左/右。
value = True
def sendlog():
"""按钮发送功能"""
global value
if sel_radio() == 1:
while True:
if value:
time.sleep(1)
syslogc.test()
else:
print 'stop'
return 0
elif sel_radio() == 2:
print "暂不可用!"
else:
print "GG!"
"""按钮发送功能"""
global value
if sel_radio() == 1:
while True:
if value:
time.sleep(1)
syslogc.test()
else:
print 'stop'
return 0
elif sel_radio() == 2:
print "暂不可用!"
else:
print "GG!"
def stopsend():
global value
value = False
global value
value = False
Sendbtn = Button(root, text='发送', command= sendlog)
Sendbtn.grid(row=0, column=6, rowspan=1, sticky=W+E+S+N, padx=5, pady=5)
Sendbtn = Button(root, text='停止', command= stopsend)
@ -69,8 +69,8 @@ Label(root, text='__'*56, foreground='gray').grid(row=1, column=0, columnspan=12
Label(root, text='消息设置:').grid(row=2, column=0, columnspan=1, sticky=E)
def sel_filetype(event):
"""下拉框选择日志读取格式"""
print filetypeCombo.get()
"""下拉框选择日志读取格式"""
print filetypeCombo.get()
filetype_index = 0
filetypeCombo = Combobox(root, state='readonly')
filetypeCombo['values'] = ['从文件选择', '从文本框选择']
@ -80,8 +80,8 @@ filetypeCombo.bind('<<ComboboxSelected>>', sel_filetype)
def sel_logtype(event):
"""下拉框选择日志类型样本"""
print logtypeCombo.get()
"""下拉框选择日志类型样本"""
print logtypeCombo.get()
Label(root, text='选择日志样本:').grid(row=2, column=4, sticky=E)
logtype_index = 0
@ -99,11 +99,11 @@ e.set('请选择日志文件')
filepath.grid(row=4, column=0, columnspan=8, sticky=W)
def checkfile():
"""选择syslog样例文件"""
filename = tkFileDialog.askopenfilename()
e.set(filename)
print filename
return filename
"""选择syslog样例文件"""
filename = tkFileDialog.askopenfilename()
e.set(filename)
print filename
return filename
checkbtn = Button(root, text='. . .', command=checkfile)
checkbtn.grid(row=4, column=8)
@ -117,7 +117,7 @@ Label(root, text='目的端设置:').grid(row=7, column=0, sticky=W)
# """选中listbox中一行"""
# def print_item(event):
# print lb.get(lb.curselection())
# print lb.get(lb.curselection())
list = Treeview(height="4", columns=("ID","IP","PORT"), selectmode="extended")
list.heading('#1', text='ID', anchor='center')
list.heading('#2', text='IP', anchor='center')
@ -129,24 +129,24 @@ list.column('#0', stretch=NO, minwidth=0, width=0) #width 0 to not display it
list.grid(row=8, column=0, rowspan=3, columnspan=7, sticky='w')
def add_btn():
addtl = Toplevel()
addtl.title('添加接收地址')
addtl.geometry('400x100')
addtl.resizable(width=False, height=False)
firstline = Label(addtl, text=' ')
firstline.grid(row=0)
iplabel = Label(addtl, text='IP地址')
iplabel.grid(row=1, column=0, sticky=W)
ipvalue = Entry(addtl)
ipvalue.grid(row=1, column=1)
protlabel = Label(addtl, text='端口:')
protlabel.grid(row=1, column=2, sticky=W)
protvalue = Entry(addtl)
protvalue.grid(row=1, column=3)
ok = Button(addtl, text='确定')
ok.grid(row=2, column=0, columnspan=2, sticky=E)
cancel = Button(addtl, text='取消', command=addtl.destroy)
cancel.grid(row=2, column=3, columnspan=4, sticky=W)
addtl = Toplevel()
addtl.title('添加接收地址')
addtl.geometry('400x100')
addtl.resizable(width=False, height=False)
firstline = Label(addtl, text=' ')
firstline.grid(row=0)
iplabel = Label(addtl, text='IP地址')
iplabel.grid(row=1, column=0, sticky=W)
ipvalue = Entry(addtl)
ipvalue.grid(row=1, column=1)
protlabel = Label(addtl, text='端口:')
protlabel.grid(row=1, column=2, sticky=W)
protvalue = Entry(addtl)
protvalue.grid(row=1, column=3)
ok = Button(addtl, text='确定')
ok.grid(row=2, column=0, columnspan=2, sticky=E)
cancel = Button(addtl, text='取消', command=addtl.destroy)
cancel.grid(row=2, column=3, columnspan=4, sticky=W)
addbtn = Button(root, text='添加', command=add_btn)
addbtn.grid(row=8, column=8, rowspan=1, sticky=W+E+S+N, padx=5, pady=5)
@ -158,7 +158,7 @@ addbtn.grid(row=10, column=8, rowspan=1, sticky=W+E+S+N, padx=5, pady=5)
# lb.bind('<buttonrelease-1>', print_item)
# list_item = [1,2,3,4,5,6,7,8,9,0]x
# for item in list_item:
# lb.insert(end, item)
# lb.insert(end, item)
# scrl = scrollbar(root)
# scrl.pack(side=right, fill=y)
# lb.configure(yscrollcommand = scrl.set)

View File

@ -16,74 +16,74 @@ data = '''<188> 2017-02-27 17:29:30 SECURITY VUL host tsbc type 常规 mintype
# pkt = IP(src=srcip, dst=dstip)/UDP(sport=12345,dport=514)/data
# send(pkt,count=80000)
class Controller(object):
count = 1
@classmethod
def init(cls):
cls.args = get_args()
cls.thread_stop = False
count = 1
@classmethod
def init(cls):
cls.args = get_args()
cls.thread_stop = False
def get_args():
'''
解析命令行参数
:return: 命令行参数命名空间
'''
parser = argparse.ArgumentParser()
parser.add_argument('-sip', action='store', dest='srcip', type=str, help='src ip addr')
parser.add_argument('-dip', action='store', dest='dstip', type=str, help='dst ip addr')
parser.add_argument('-p', action='store', dest='protocol', type=str, help='protocol udp or tcp')
parser.add_argument('-dport', action='store', dest='dport', type=str, help='send port')
parser.add_argument('-c', action='store', dest='count', type=str, help='packets count')
parser.add_argument('-f', action='store', dest='file', type=str, help='packets file')
parser.add_argument('-t', action='store', dest='thread', type=str, help='thread number')
parser.add_argument('-s', action='store', dest='speed', type=str, help='send speed (s)')
rst = parser.parse_args()
return rst
'''
解析命令行参数
:return: 命令行参数命名空间
'''
parser = argparse.ArgumentParser()
parser.add_argument('-sip', action='store', dest='srcip', type=str, help='src ip addr')
parser.add_argument('-dip', action='store', dest='dstip', type=str, help='dst ip addr')
parser.add_argument('-p', action='store', dest='protocol', type=str, help='protocol udp or tcp')
parser.add_argument('-dport', action='store', dest='dport', type=str, help='send port')
parser.add_argument('-c', action='store', dest='count', type=str, help='packets count')
parser.add_argument('-f', action='store', dest='file', type=str, help='packets file')
parser.add_argument('-t', action='store', dest='thread', type=str, help='thread number')
parser.add_argument('-s', action='store', dest='speed', type=str, help='send speed (s)')
rst = parser.parse_args()
return rst
def run(srcip, dstip,data,protocol,dport,count,speed):
if protocol.upper() == 'UDP':
pkt = IP(src=srcip, dst=dstip)/UDP(sport=12345,dport=dport)/data
send(pkt,inter=speed, count=count)
if protocol.upper() == 'TCP':
pkt = IP(src=srcip, dst=dstip)/TCP(sport=12345, dport=dport) / data
send(pkt, inter=speed, count=count)
if protocol.upper() == 'UDP':
pkt = IP(src=srcip, dst=dstip)/UDP(sport=12345,dport=dport)/data
send(pkt,inter=speed, count=count)
if protocol.upper() == 'TCP':
pkt = IP(src=srcip, dst=dstip)/TCP(sport=12345, dport=dport) / data
send(pkt, inter=speed, count=count)
def sendpkgdata():
srcip = Controller.args.srcip
dstip = Controller.args.dstip
data = '''<188> 2017-02-27 17:29:30 SECURITY VUL host tsbc type 常规 mintype 轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击"'''
if Controller.args.protocol is None:
protocol = 'udp'
else:
protocol = Controller.args.protocol
if Controller.args.dport is None:
dport = 514
else:
dport = Controller.args.dport
if Controller.args.count is None:
count = 1
else:
count = int(Controller.args.count)
if Controller.args.speed is None:
speed = 0
else:
speed = float(Controller.args.speed)
run(srcip, dstip, data, protocol, dport, count, speed)
srcip = Controller.args.srcip
dstip = Controller.args.dstip
data = '''<188> 2017-02-27 17:29:30 SECURITY VUL host tsbc type 常规 mintype 轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击"'''
if Controller.args.protocol is None:
protocol = 'udp'
else:
protocol = Controller.args.protocol
if Controller.args.dport is None:
dport = 514
else:
dport = Controller.args.dport
if Controller.args.count is None:
count = 1
else:
count = int(Controller.args.count)
if Controller.args.speed is None:
speed = 0
else:
speed = float(Controller.args.speed)
run(srcip, dstip, data, protocol, dport, count, speed)
if "__main__" == __name__:
# 初始化控制类对象:实例化浏览器实例
Controller.init()
if Controller.args.thread is None:
thread = 1
else:
thread = int(Controller.args.thread)
tlst = []
# 初始化控制类对象:实例化浏览器实例
Controller.init()
if Controller.args.thread is None:
thread = 1
else:
thread = int(Controller.args.thread)
tlst = []
for i in xrange(thread):
th = threading.Thread(sendpkgdata())
tlst.append(th)
for i in xrange(thread):
th = threading.Thread(sendpkgdata())
tlst.append(th)
for i in tlst:
i.start()
for i in tlst:
i.start()

View File

@ -19,92 +19,92 @@ reload(sys)
sys.setdefaultencoding('utf-8')
class send:
def __init__(self, message, host, port):
self.message = message
self.host = host
self.port = port
self.FACILITY = {
'kern': 0, 'user': 1, 'mail': 2, 'daemon': 3,
'auth': 4, 'syslog': 5, 'lpr': 6, 'news': 7,
'uucp': 8, 'cron': 9, 'authpriv': 10, 'ftp': 11,
'local0': 16, 'local1': 17, 'local2': 18, 'local3': 19,
'local4': 20, 'local5': 21, 'local6': 22, 'local7': 23,
}
def __init__(self, message, host, port):
self.message = message
self.host = host
self.port = port
self.FACILITY = {
'kern': 0, 'user': 1, 'mail': 2, 'daemon': 3,
'auth': 4, 'syslog': 5, 'lpr': 6, 'news': 7,
'uucp': 8, 'cron': 9, 'authpriv': 10, 'ftp': 11,
'local0': 16, 'local1': 17, 'local2': 18, 'local3': 19,
'local4': 20, 'local5': 21, 'local6': 22, 'local7': 23,
}
self.LEVEL = {
'emerg': 0, 'alert':1, 'crit': 2, 'err': 3,
'warning': 4, 'notice': 5, 'info': 6, 'debug': 7
}
weights = [0.03, 0.04, 0.05, 0.07, 0.1, 0.2, 0.5, 0.01] #随机产生日志级别的概率
self.LEVEL = {
'emerg': 0, 'alert':1, 'crit': 2, 'err': 3,
'warning': 4, 'notice': 5, 'info': 6, 'debug': 7
}
weights = [0.03, 0.04, 0.05, 0.07, 0.1, 0.2, 0.5, 0.01] #随机产生日志级别的概率
if 'success' in self.message:
self.levl = self.LEVEL['info']
elif 'failed' in self.message:
self.levl = self.LEVEL['warning']
elif 'permit' in self.message:
self.levl = self.LEVEL['info']
elif 'deny' in self.message:
self.levl = self.LEVEL['warning']
else:
self.levl = weighted_choice.weighted_choice_sub(weights)
# self.levl = random.choice(self.LEVEL.keys())
def run(self):
if 'success' in self.message:
self.levl = self.LEVEL['info']
elif 'failed' in self.message:
self.levl = self.LEVEL['warning']
elif 'permit' in self.message:
self.levl = self.LEVEL['info']
elif 'deny' in self.message:
self.levl = self.LEVEL['warning']
else:
self.levl = weighted_choice.weighted_choice_sub(weights)
# self.levl = random.choice(self.LEVEL.keys())
def run(self):
"""
Send syslog UDP packet to given host and port.
"""
BUFSIZE = 4096
ADDR = (self.host,self.port)
# sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(ADDR)
except Exception,E:
print "Connect Error",E
return False
"""
Send syslog UDP packet to given host and port.
"""
BUFSIZE = 4096
ADDR = (self.host,self.port)
# sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(ADDR)
except Exception,E:
print "Connect Error",E
return False
data = '<%d>%s' % (self.levl + self.FACILITY['local7']*8, self.message)
print data
# sock.sendto(data, (self.host, self.port))
while True:
try:
sock.send(data)
data = sock.recv(BUFSIZE)
if not data:
break
print "Server:",data
except Exception,e:
print "Error",e
sock.close()
data = '<%d>%s' % (self.levl + self.FACILITY['local7']*8, self.message)
print data
# sock.sendto(data, (self.host, self.port))
while True:
try:
sock.send(data)
data = sock.recv(BUFSIZE)
if not data:
break
print "Server:",data
except Exception,e:
print "Error",e
sock.close()
if __name__ == '__main__':
# 从配置文件读取数据
cf = ConfigParser.ConfigParser()
cf.read('send.config')
host = cf.get('receiver', 'host').split(',') # 获取接收log服务主机
port = int(cf.get('receiver', 'port'))
sip = randomip.get_random_ip([cf.get('mesg', 'sip')]) # 随机生成源IP
dip = randomip.get_random_ip([cf.get('mesg', 'dip')]) # 随机生成目的IP
sportstr = tuple(cf.get('mesg', 'sport').split(',')) # 随机生成源端口
sport = str(random.randint(int(sportstr[0]), int(sportstr[1])))
dportstr = cf.get('mesg', 'dport').split(',') # 随机生成目的端口
dport = random.choice(dportstr)
# 从配置文件读取数据
cf = ConfigParser.ConfigParser()
cf.read('send.config')
host = cf.get('receiver', 'host').split(',') # 获取接收log服务主机
port = int(cf.get('receiver', 'port'))
sip = randomip.get_random_ip([cf.get('mesg', 'sip')]) # 随机生成源IP
dip = randomip.get_random_ip([cf.get('mesg', 'dip')]) # 随机生成目的IP
sportstr = tuple(cf.get('mesg', 'sport').split(',')) # 随机生成源端口
sport = str(random.randint(int(sportstr[0]), int(sportstr[1])))
dportstr = cf.get('mesg', 'dport').split(',') # 随机生成目的端口
dport = random.choice(dportstr)
# 获取系统当前时间
now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
nowstr = time.strftime('%b %d %H:%M:%S', time.localtime())
# 获取系统当前时间
now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
nowstr = time.strftime('%b %d %H:%M:%S', time.localtime())
'''H3C交换机'''
h3clog1 = nowstr + ' 2000 H3C %%10IFNET/3/LINK_UPDOWN(l): GigabitEthernet1/0/7 link status is UP.'
h3clog2 = nowstr + ' 2000 H3C %%10SHELL/4/LOGOUT(t): Trap 1.3.6.1.4.1.25506.2.2.1.1.3.0.2<hh3cLogOut>: logout from VTY'
threadh3c1 = send(h3clog1, host[0], port)
threadh3c2 = send(h3clog2, host[0], port)
'''H3C交换机'''
h3clog1 = nowstr + ' 2000 H3C %%10IFNET/3/LINK_UPDOWN(l): GigabitEthernet1/0/7 link status is UP.'
h3clog2 = nowstr + ' 2000 H3C %%10SHELL/4/LOGOUT(t): Trap 1.3.6.1.4.1.25506.2.2.1.1.3.0.2<hh3cLogOut>: logout from VTY'
threadh3c1 = send(h3clog1, host[0], port)
threadh3c2 = send(h3clog2, host[0], port)
'''Cisco路由器'''
# 管理员操作
csicolog1 = '29: *' + nowstr + '.' + str(
random.randint(001, 999)) + ': %SYS-5-CONFIG_I: Configured from console by admin on vty0 (10.0.1.49)'
print csicolog1
threadcisco1 = send(csicolog1, host[0], port)
'''Cisco路由器'''
# 管理员操作
csicolog1 = '29: *' + nowstr + '.' + str(
random.randint(001, 999)) + ': %SYS-5-CONFIG_I: Configured from console by admin on vty0 (10.0.1.49)'
print csicolog1
threadcisco1 = send(csicolog1, host[0], port)
threadcisco1.run()
threadcisco1.run()

View File

@ -14,69 +14,69 @@ import threading
import argparse
class Controller(object):
count = 1
@classmethod
def init(cls):
cls.args = get_args()
cls.thread_stop = False
count = 1
@classmethod
def init(cls):
cls.args = get_args()
cls.thread_stop = False
def get_args():
'''
解析命令行参数
:return: 命令行参数命名空间
'''
parser = argparse.ArgumentParser()
parser.add_argument('-host', action='store', dest='host', type=str, help='receive host ip')
parser.add_argument('-port', action='store', dest='dport', type=str, help='send port')
parser.add_argument('-c', action='store', dest='count', type=str, help='count number')
# parser.add_argument('-f', action='store', dest='file', type=str, help='packets file')
# parser.add_argument('-t', action='store', dest='thread', type=str, help='thread number')
rst = parser.parse_args()
return rst
'''
解析命令行参数
:return: 命令行参数命名空间
'''
parser = argparse.ArgumentParser()
parser.add_argument('-host', action='store', dest='host', type=str, help='receive host ip')
parser.add_argument('-port', action='store', dest='dport', type=str, help='send port')
parser.add_argument('-c', action='store', dest='count', type=str, help='count number')
# parser.add_argument('-f', action='store', dest='file', type=str, help='packets file')
# parser.add_argument('-t', action='store', dest='thread', type=str, help='thread number')
rst = parser.parse_args()
return rst
def run(host,port,data,count):
address = (host, port)
for i in xrange(count):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(address)
s.send(data)
# source = s.recv(1024)
# print 'the data received is', source
s.close()
address = (host, port)
for i in xrange(count):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(address)
s.send(data)
# source = s.recv(1024)
# print 'the data received is', source
s.close()
def senddata():
data = '''<188> 2017-02-27 17:29:30 %SYS-5-CONFIG_I: Configured from console by admin on vty0 (10.0.1.49)'''
if Controller.args.host is None:
print "Receiver IP(host) is Null!"
return False
else:
host = Controller.args.host
if Controller.args.dport is None:
port = 514
else:
port = int(Controller.args.dport)
if Controller.args.count is None:
count = 1
else:
count = int(Controller.args.count)
run(host, port, data, count)
data = '''<188> 2017-02-27 17:29:30 %SYS-5-CONFIG_I: Configured from console by admin on vty0 (10.0.1.49)'''
if Controller.args.host is None:
print "Receiver IP(host) is Null!"
return False
else:
host = Controller.args.host
if Controller.args.dport is None:
port = 514
else:
port = int(Controller.args.dport)
if Controller.args.count is None:
count = 1
else:
count = int(Controller.args.count)
run(host, port, data, count)
if "__main__" == __name__:
# 初始化控制类对象:实例化浏览器实例
Controller.init()
senddata()
# if Controller.args.thread is None:
# thread = 1
# else:
# thread = int(Controller.args.thread)
# tlst = []
#
# for i in xrange(thread):
# th = threading.Thread(senddata())
# tlst.append(th)
#
# for i in tlst:
# i.start()
# 初始化控制类对象:实例化浏览器实例
Controller.init()
senddata()
# if Controller.args.thread is None:
# thread = 1
# else:
# thread = int(Controller.args.thread)
# tlst = []
#
# for i in xrange(thread):
# th = threading.Thread(senddata())
# tlst.append(th)
#
# for i in tlst:
# i.start()

View File

@ -12,11 +12,11 @@ import socket,time
address = ('192.168.110.158', 1470)
data = '''<188> 2017-02-27 17:29:30 SECURITY VUL host tsbc type desc "jiangpeng.chen@acorn-net.com" '''
for i in xrange(100):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect_ex(address)
print s.getsockname()
print s.getpeername()
s.sendall(data+"["+str(1)+"]")
# print s.recv(1024)
time.sleep(1)
s.close()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect_ex(address)
print s.getsockname()
print s.getpeername()
s.sendall(data+"["+str(1)+"]")
# print s.recv(1024)
time.sleep(1)
s.close()

View File

@ -17,293 +17,293 @@ reload(sys)
sys.setdefaultencoding('utf-8')
class sending(threading.Thread):
"""send log"""
def __init__(self):
threading.Thread.__init__(self)
self.thread_stop = False
try:
self.opts, self.args = getopt.getopt(sys.argv[1:], "hvCf:c:t:d:p:")
except:
print u"命令语法错误,可使用 -h 查看帮助。"
sys.exit()
cf = ConfigParser.ConfigParser()
cf.read('send.config')
self.host = cf.get('receiver', 'host').split(',')
self.port = int(cf.get('receiver', 'port'))
self.sip = randomip.get_random_ip([cf.get('mesg', 'sip')])#随机生成源IP
self.dip = randomip.get_random_ip([cf.get('mesg', 'dip')])#随机生成目的IP
self.sportstr = tuple(cf.get('mesg', 'sport').split(',')) #随机生成源端口
self.sport = str(random.randint(int(self.sportstr[0]), int(self.sportstr[1])))
self.dportstr = cf.get('mesg', 'dport').split(',')#随机生成目的端口
self.dport = random.choice(self.dportstr)
"""send log"""
def __init__(self):
threading.Thread.__init__(self)
self.thread_stop = False
try:
self.opts, self.args = getopt.getopt(sys.argv[1:], "hvCf:c:t:d:p:")
except:
print u"命令语法错误,可使用 -h 查看帮助。"
sys.exit()
cf = ConfigParser.ConfigParser()
cf.read('send.config')
self.host = cf.get('receiver', 'host').split(',')
self.port = int(cf.get('receiver', 'port'))
self.sip = randomip.get_random_ip([cf.get('mesg', 'sip')])#随机生成源IP
self.dip = randomip.get_random_ip([cf.get('mesg', 'dip')])#随机生成目的IP
self.sportstr = tuple(cf.get('mesg', 'sport').split(',')) #随机生成源端口
self.sport = str(random.randint(int(self.sportstr[0]), int(self.sportstr[1])))
self.dportstr = cf.get('mesg', 'dport').split(',')#随机生成目的端口
self.dport = random.choice(self.dportstr)
#获取系统当前时间
now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
nowstr = time.strftime('%b %d %H:%M:%S', time.localtime())
#获取系统当前时间
now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
nowstr = time.strftime('%b %d %H:%M:%S', time.localtime())
#syslog日志类型
self.FACILITY = {
'kern': 0, 'user': 1, 'mail': 2, 'daemon': 3,
'auth': 4, 'syslog': 5, 'lpr': 6, 'news': 7,
'uucp': 8, 'cron': 9, 'authpriv': 10, 'ftp': 11,
'local0': 16, 'local1': 17, 'local2': 18, 'local3': 19,
'local4': 20, 'local5': 21, 'local6': 22, 'local7': 23,
}
#syslog日志类型
self.FACILITY = {
'kern': 0, 'user': 1, 'mail': 2, 'daemon': 3,
'auth': 4, 'syslog': 5, 'lpr': 6, 'news': 7,
'uucp': 8, 'cron': 9, 'authpriv': 10, 'ftp': 11,
'local0': 16, 'local1': 17, 'local2': 18, 'local3': 19,
'local4': 20, 'local5': 21, 'local6': 22, 'local7': 23,
}
#syslog日志级别
self.LEVEL = {
'emerg': 0, 'alert':1, 'crit': 2, 'err': 3,
'warning': 4, 'notice': 5, 'info': 6, 'debug': 7
}
weights = [0.03, 0.04, 0.05, 0.07, 0.1, 0.2, 0.5, 0.01] #随机产生日志级别的概率
self.message = []
#syslog日志级别
self.LEVEL = {
'emerg': 0, 'alert':1, 'crit': 2, 'err': 3,
'warning': 4, 'notice': 5, 'info': 6, 'debug': 7
}
weights = [0.03, 0.04, 0.05, 0.07, 0.1, 0.2, 0.5, 0.01] #随机产生日志级别的概率
self.message = []
'''漏扫'''
#操作日志
nvaslog1 = ' '+now + u' ADMIN CONFIG user admin ip 10.0.1.48 module 系统-云中心配置 cmd "云中心配置。" result '+random.choice(['success', 'failed'])
self.message.append(nvaslog1)
#管理员登录
nvaslog2 = ' '+now + u' ADMIN LOGIN user tsbc ip 10.0.1.104 action login result '+random.choice(['success', 'failed'])
self.message.append(nvaslog2)
#漏洞发现
nvaslog3 = ' '+now + u' SECURITY VUL host tsbc type 常规 mintype 轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击"'
self.message.append(nvaslog3)
'''漏扫'''
#操作日志
nvaslog1 = ' '+now + u' ADMIN CONFIG user admin ip 10.0.1.48 module 系统-云中心配置 cmd "云中心配置。" result '+random.choice(['success', 'failed'])
self.message.append(nvaslog1)
#管理员登录
nvaslog2 = ' '+now + u' ADMIN LOGIN user tsbc ip 10.0.1.104 action login result '+random.choice(['success', 'failed'])
self.message.append(nvaslog2)
#漏洞发现
nvaslog3 = ' '+now + u' SECURITY VUL host tsbc type 常规 mintype 轻蠕虫 id CN-CVE name 洪泛攻击 desc "容易受到ARP攻击"'
self.message.append(nvaslog3)
'''信息审计'''
#内容审计
bca1 = ' '+now + u' AUDIT CONTENT user admin usergroup 管理员组 access permit prot tcp sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + u' dport 80 type HTTP url http://www.163.com filename "找工作.txt" keyword 找工作 cmd "找工作" subject 邮件主题 sender lietou@163.com receiver lietou@163.com'
self.message.append(bca1)
bca2 = ' '+now + u' AUDIT CONTENT user admin usergroup 管理员组 access deny prot tcp sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + u' dport 80 type HTTP url http://mail.163.com filename "找工作.txt" keyword 找工作 cmd "找工作" subject 邮件主题 sender lietou@163.com receiver lietou@163.com'
self.message.append(bca2)
#应用行为审计
bca3 = ' '+now + ' AUDIT APP user 10.0.1.48 usergroup NULL access permit prot "UDP" sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + u' type "IM通信" behav "飞秋" desc "内置特征"'
self.message.append(bca3)
'''信息审计'''
#内容审计
bca1 = ' '+now + u' AUDIT CONTENT user admin usergroup 管理员组 access permit prot tcp sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + u' dport 80 type HTTP url http://www.163.com filename "找工作.txt" keyword 找工作 cmd "找工作" subject 邮件主题 sender lietou@163.com receiver lietou@163.com'
self.message.append(bca1)
bca2 = ' '+now + u' AUDIT CONTENT user admin usergroup 管理员组 access deny prot tcp sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + u' dport 80 type HTTP url http://mail.163.com filename "找工作.txt" keyword 找工作 cmd "找工作" subject 邮件主题 sender lietou@163.com receiver lietou@163.com'
self.message.append(bca2)
#应用行为审计
bca3 = ' '+now + ' AUDIT APP user 10.0.1.48 usergroup NULL access permit prot "UDP" sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + u' type "IM通信" behav "飞秋" desc "内置特征"'
self.message.append(bca3)
'''上网行为'''
#IP访问事件 --事件入库了但是界面没有对应的类别查询不到
nca1 = ' '+now + ' SECURITY POLICY access permit prot http smac 00:22:46:0D:91:7C dmac 00:22:46:0D:91:7C sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + ' times 1'
self.message.append(nca1)
#应用行为审计
nca2 = ' '+now + u' AUDIT APP user tsbc usergroup 管理员组 access deny prot 代理上网 sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + u' type GAME behav 访问网络游戏 desc "访问英雄联盟游戏"'
self.message.append(nca2)
#用户认证事件
nca3 = ' '+now + u' USER AUTH name tsbc group 管理员组 ip ' + self.sip + ' type local result '+random.choice(['success', 'failed'])
self.message.append(nca3)
'''上网行为'''
#IP访问事件 --事件入库了但是界面没有对应的类别查询不到
nca1 = ' '+now + ' SECURITY POLICY access permit prot http smac 00:22:46:0D:91:7C dmac 00:22:46:0D:91:7C sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + ' times 1'
self.message.append(nca1)
#应用行为审计
nca2 = ' '+now + u' AUDIT APP user tsbc usergroup 管理员组 access deny prot 代理上网 sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + u' type GAME behav 访问网络游戏 desc "访问英雄联盟游戏"'
self.message.append(nca2)
#用户认证事件
nca3 = ' '+now + u' USER AUTH name tsbc group 管理员组 ip ' + self.sip + ' type local result '+random.choice(['success', 'failed'])
self.message.append(nca3)
'''IDS/IPS'''
#设备流量日志
idslog1 = ' '+now + ' SYSTEM TRAFFIC sendbps 10211 recvbps 10211 sendpps 19621 recvpps 18954'
self.message.append(idslog1)
'''IDS/IPS'''
#设备流量日志
idslog1 = ' '+now + ' SYSTEM TRAFFIC sendbps 10211 recvbps 10211 sendpps 19621 recvpps 18954'
self.message.append(idslog1)
'''Jump防火墙'''
#安全监测/访问控制
fwlog1 = ' '+now + ' SECURITY POLICY access permit prot TCP smac 00:22:46:1d:eb:b5 dmac 00:22:46:1f:aa:47 sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + ' times 1'
self.message.append(fwlog1)
#攻击检测
fwlog2 = ' '+now + ' SECURITY INSTRUCTION type ddos id '+str(random.randint(1000, 9999))+ u' name "尝试攻击" smac 00:22:46:0D:91:7d dmac 00:22:46:0D:91:7C prot ICMP sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + ' times 1'
self.message.append(fwlog2)
#病毒日志
fwlog3 = ' '+now + u' SECURITY VIRUS type 木马病毒 name "灰鸽子" smac 00:22:46:0A:B1:BC dmac 00:22:46:0D:91:7C prot tcp sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + u' filename "新建文本文档.exe"'
self.message.append(fwlog3)
#隧道日志
fwlog4 = ' '+now + ' IPSEC TUNNEL id 1 localip ' + self.sip + ' remote ' + self.dip + ' desc "Error!!!"'
self.message.append(fwlog4)
'''Jump防火墙'''
#安全监测/访问控制
fwlog1 = ' '+now + ' SECURITY POLICY access permit prot TCP smac 00:22:46:1d:eb:b5 dmac 00:22:46:1f:aa:47 sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + ' times 1'
self.message.append(fwlog1)
#攻击检测
fwlog2 = ' '+now + ' SECURITY INSTRUCTION type ddos id '+str(random.randint(1000, 9999))+ u' name "尝试攻击" smac 00:22:46:0D:91:7d dmac 00:22:46:0D:91:7C prot ICMP sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + ' times 1'
self.message.append(fwlog2)
#病毒日志
fwlog3 = ' '+now + u' SECURITY VIRUS type 木马病毒 name "灰鸽子" smac 00:22:46:0A:B1:BC dmac 00:22:46:0D:91:7C prot tcp sip ' + self.sip + ' sport ' + self.sport + ' dip ' + self.dip + ' dport ' + self.dport + u' filename "新建文本文档.exe"'
self.message.append(fwlog3)
#隧道日志
fwlog4 = ' '+now + ' IPSEC TUNNEL id 1 localip ' + self.sip + ' remote ' + self.dip + ' desc "Error!!!"'
self.message.append(fwlog4)
'''主机审计'''
#移动介质审计
hostlog1 = ' '+now + u' AUDIT MEDIUM operator admin action 接入 medtype U 盘 medname 小明的U盘 access "D:\Menu" times 50 result '+random.choice(['permit', 'deny'])
self.message.append(hostlog1)
'''主机审计'''
#移动介质审计
hostlog1 = ' '+now + u' AUDIT MEDIUM operator admin action 接入 medtype U 盘 medname 小明的U盘 access "D:\Menu" times 50 result '+random.choice(['permit', 'deny'])
self.message.append(hostlog1)
'''用户/系统'''
#网口状态变化
systemlog1 = ' '+now + ' SYSTEM INTERFACE eth '+str(random.randint(0, 10))+' type '+random.choice(['phy', 'link'])+' state '+random.choice(['up', 'down'])
self.message.append(systemlog1)
#用户登录
systemlog2 = ' '+now + u' USER LOGIN user tsbc group 运维管理员组 ip '+ self.dip +' action '+random.choice(['login', 'logout'])+' result '+random.choice(['success', 'failed'])
self.message.append(systemlog2)
#用户认证
systemlog3 = ' '+now + u' USER AUTH name tsbc group 运维管理员组 ip '+ self.dip +' type '+random.choice(['local', 'radius', 'tacacs', 'ldap', 'msad', 'pop3', 'cert', 'other'])+' result '+random.choice(['success', 'failed'])
self.message.append(systemlog3)
#用户访问资源
systemlog4 = ' '+now + u' USER ACCESS name tsbc group 安全管理员组 restype web resname "信息审计系统" sip '+ self.sip +' dip '+ self.dip +' prot tcp sport '+ self.sport +' dport '+ self.dport +' result '+random.choice(['permit', 'deny'])
self.message.append(systemlog4)
#性能日志
systemlog5 = ' '+now + ' SYSTEM PERFORMANCE cpu '+str(random.randint(80, 90))+' memory '+str(random.randint(75, 95))+' connections '+str(random.randint(80, 300))
self.message.append(systemlog5)
#系统关键进程
systemlog6 = ' '+now + ' SYSTEM PROCESS explorer.exe exit'
self.message.append(systemlog6)
'''用户/系统'''
#网口状态变化
systemlog1 = ' '+now + ' SYSTEM INTERFACE eth '+str(random.randint(0, 10))+' type '+random.choice(['phy', 'link'])+' state '+random.choice(['up', 'down'])
self.message.append(systemlog1)
#用户登录
systemlog2 = ' '+now + u' USER LOGIN user tsbc group 运维管理员组 ip '+ self.dip +' action '+random.choice(['login', 'logout'])+' result '+random.choice(['success', 'failed'])
self.message.append(systemlog2)
#用户认证
systemlog3 = ' '+now + u' USER AUTH name tsbc group 运维管理员组 ip '+ self.dip +' type '+random.choice(['local', 'radius', 'tacacs', 'ldap', 'msad', 'pop3', 'cert', 'other'])+' result '+random.choice(['success', 'failed'])
self.message.append(systemlog3)
#用户访问资源
systemlog4 = ' '+now + u' USER ACCESS name tsbc group 安全管理员组 restype web resname "信息审计系统" sip '+ self.sip +' dip '+ self.dip +' prot tcp sport '+ self.sport +' dport '+ self.dport +' result '+random.choice(['permit', 'deny'])
self.message.append(systemlog4)
#性能日志
systemlog5 = ' '+now + ' SYSTEM PERFORMANCE cpu '+str(random.randint(80, 90))+' memory '+str(random.randint(75, 95))+' connections '+str(random.randint(80, 300))
self.message.append(systemlog5)
#系统关键进程
systemlog6 = ' '+now + ' SYSTEM PROCESS explorer.exe exit'
self.message.append(systemlog6)
'''H3C交换机'''
h3clog1 = nowstr + ' 2000 H3C %%10IFNET/3/LINK_UPDOWN(l): GigabitEthernet1/0/7 link status is UP.'
self.message.append(h3clog1)
h3clog2 = nowstr + ' 2000 H3C %%10SHELL/4/LOGOUT(t): Trap 1.3.6.1.4.1.25506.2.2.1.1.3.0.2<hh3cLogOut>: logout from VTY'
self.message.append(h3clog2)
'''H3C交换机'''
h3clog1 = nowstr + ' 2000 H3C %%10IFNET/3/LINK_UPDOWN(l): GigabitEthernet1/0/7 link status is UP.'
self.message.append(h3clog1)
h3clog2 = nowstr + ' 2000 H3C %%10SHELL/4/LOGOUT(t): Trap 1.3.6.1.4.1.25506.2.2.1.1.3.0.2<hh3cLogOut>: logout from VTY'
self.message.append(h3clog2)
'''Cisco路由器'''
#管理员操作
csicolog1 = '29: *'+nowstr+'.'+ str(random.randint(001, 999)) +': %SYS-5-CONFIG_I: Configured from console by admin on vty0 (10.0.1.49)'
self.message.append(csicolog1)
'''Cisco路由器'''
#管理员操作
csicolog1 = '29: *'+nowstr+'.'+ str(random.randint(001, 999)) +': %SYS-5-CONFIG_I: Configured from console by admin on vty0 (10.0.1.49)'
self.message.append(csicolog1)
if self.message is not os.path:
if 'success' in self.message:
self.levl = self.LEVEL['info']
elif 'failed' in self.message:
self.levl = self.LEVEL['warning']
elif 'permit' in self.message:
self.levl = self.LEVEL['info']
elif 'deny' in self.message:
self.levl = self.LEVEL['warning']
else:
self.levl = weighted_choice.weighted_choice_sub(weights)
if self.message is not os.path:
if 'success' in self.message:
self.levl = self.LEVEL['info']
elif 'failed' in self.message:
self.levl = self.LEVEL['warning']
elif 'permit' in self.message:
self.levl = self.LEVEL['info']
elif 'deny' in self.message:
self.levl = self.LEVEL['warning']
else:
self.levl = weighted_choice.weighted_choice_sub(weights)
self.input_file = ''
self.count = 1
#self.host = self.host
self.port = 514
self.n = .0
self.x = False
self.fx = True
for op, value in self.opts:
if op == '-f':
self.input_file = value
self.fx = False
if op == '-v':
print "SendingTools Version 1.0"
sys.exit()
elif op == "-c":
try:
self.count = int(value)
except:
print u'-c 参数值错误, 可使用 -h 查看帮助。'
sys.exit()
elif op == "-C":
self.x = True
elif op == "-d":
self.host = []
self.host.append(value)
elif op == "-p":
try:
self.port = int(value)
except:
print u'-p 参数值错误, 可使用 -h 查看帮助。'
sys.exit()
elif op == "-t":
try:
self.n = float(value)
except:
print u'-t 参数值错误, 可使用 -h 查看帮助。'
sys.exit()
elif op == "-h" or op =="-help":
self.usage()
sys.exit()
def usage(self):
"""帮助文档"""
print u""" This is a Sending log Tools:
self.input_file = ''
self.count = 1
#self.host = self.host
self.port = 514
self.n = .0
self.x = False
self.fx = True
for op, value in self.opts:
if op == '-f':
self.input_file = value
self.fx = False
if op == '-v':
print "SendingTools Version 1.0"
sys.exit()
elif op == "-c":
try:
self.count = int(value)
except:
print u'-c 参数值错误, 可使用 -h 查看帮助。'
sys.exit()
elif op == "-C":
self.x = True
elif op == "-d":
self.host = []
self.host.append(value)
elif op == "-p":
try:
self.port = int(value)
except:
print u'-p 参数值错误, 可使用 -h 查看帮助。'
sys.exit()
elif op == "-t":
try:
self.n = float(value)
except:
print u'-t 参数值错误, 可使用 -h 查看帮助。'
sys.exit()
elif op == "-h" or op =="-help":
self.usage()
sys.exit()
def usage(self):
"""帮助文档"""
print u""" This is a Sending log Tools:
-------------------------------------------------------------------------------------------
Usge:
python sendlog -f [filepath] -c [cycles] -t [sleep time]
-d [Receiver Host] -p [Receiver port]
Details:
-C Cycle send log [开启无限循环]
-f Specify the path to log file [指派日志文件路径]
-c Specify number of cycles [指定循环次数]
-t Specify log sending time interval [指定日志发送间隔时间 单位:]
-d Specify Receive log IP Addr [指定日志接收IP地址]
-p Specify Receive log Dst Prot [指定日志接收端口整数]
-v Show version [查看工具版本]
-h Show help info [查看帮助]"""
Usge:
python sendlog -f [filepath] -c [cycles] -t [sleep time]
-d [Receiver Host] -p [Receiver port]
Details:
-C Cycle send log [开启无限循环]
-f Specify the path to log file [指派日志文件路径]
-c Specify number of cycles [指定循环次数]
-t Specify log sending time interval [指定日志发送间隔时间 单位:]
-d Specify Receive log IP Addr [指定日志接收IP地址]
-p Specify Receive log Dst Prot [指定日志接收端口整数]
-v Show version [查看工具版本]
-h Show help info [查看帮助]"""
def run(self):
"""
Send syslog UDP packet to given host and port.
"""
def run(self):
"""
Send syslog UDP packet to given host and port.
"""
if self.fx:#fx为True 发送脚本样例
if self.x: #传入 -C 参数进行无限循环
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
time.sleep(float(self.n))
for h in self.host:
for i in xrange(0, self.count):
for msg in self.message:
data = '<%d>%s' % (self.levl + self.FACILITY['local7']*8, msg)
time.sleep(self.n)
try:
sock.sendto(data, (h, self.port))
print data
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
sock.close()
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for h in self.host:
#print h
for i in xrange(0, self.count):
for msg in self.message:
data = '<%d>%s' % (self.levl + self.FACILITY['local7']*8, msg)
time.sleep(self.n)
#print self.n
try:
sock.sendto(data, (h, self.port))
print data
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
sock.close()
else:#-f 读取日志文件
if self.x:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
for i in xrange(0, self.count):
try:
f = open(self.input_file, 'r')
lines = f.readlines()
except IOError:
print u'读取的日志文件不存在,请更正后再试。'
sys.exit()
for line in lines:
time.sleep(self.n)
for h in self.host:
try:
sock.sendto(line, (h, self.port))
print line
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
f.close()
sock.close()
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for j in xrange(0, self.count):
try:
f = open(self.input_file, 'r')
lines = f.readlines()
except IOError:
print u'-f 文件路径不存在,可使用 -h 查看帮助。'
sys.exit()
for line in lines:
time.sleep(self.n)
for h in self.host:
try:
sock.sendto(line, (h, self.port))
print line
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
f.close()
sock.close()
def stop(self):
self.thread_stop = True
if self.fx:#fx为True 发送脚本样例
if self.x: #传入 -C 参数进行无限循环
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
time.sleep(float(self.n))
for h in self.host:
for i in xrange(0, self.count):
for msg in self.message:
data = '<%d>%s' % (self.levl + self.FACILITY['local7']*8, msg)
time.sleep(self.n)
try:
sock.sendto(data, (h, self.port))
print data
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
sock.close()
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for h in self.host:
#print h
for i in xrange(0, self.count):
for msg in self.message:
data = '<%d>%s' % (self.levl + self.FACILITY['local7']*8, msg)
time.sleep(self.n)
#print self.n
try:
sock.sendto(data, (h, self.port))
print data
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
sock.close()
else:#-f 读取日志文件
if self.x:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
for i in xrange(0, self.count):
try:
f = open(self.input_file, 'r')
lines = f.readlines()
except IOError:
print u'读取的日志文件不存在,请更正后再试。'
sys.exit()
for line in lines:
time.sleep(self.n)
for h in self.host:
try:
sock.sendto(line, (h, self.port))
print line
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
f.close()
sock.close()
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for j in xrange(0, self.count):
try:
f = open(self.input_file, 'r')
lines = f.readlines()
except IOError:
print u'-f 文件路径不存在,可使用 -h 查看帮助。'
sys.exit()
for line in lines:
time.sleep(self.n)
for h in self.host:
try:
sock.sendto(line, (h, self.port))
print line
except:
print u"参数取值出现错误,使用 -h 查看帮助。"
sys.exit()
f.close()
sock.close()
def stop(self):
self.thread_stop = True
def sendfile():
thread1 = sending()
thread1.start()
thread1.stop()
thread1 = sending()
thread1.start()
thread1.stop()
if __name__ == '__main__':
for i in xrange(1):
# print i
sendfile()
for i in xrange(1):
# print i
sendfile()

View File

@ -11,12 +11,12 @@ import random
from collections import Counter
def weighted_choice_sub(weights):
rnd = random.random() * sum(weights)
for i, w in enumerate(weights):
rnd -= w
if rnd < 0:
# print w
return i
rnd = random.random() * sum(weights)
for i, w in enumerate(weights):
rnd -= w
if rnd < 0:
# print w
return i
weights = [0.03, 0.04, 0.05, 0.07, 0.1, 0.2, 0.5, 0.01]
@ -25,10 +25,10 @@ weights = [0.03, 0.04, 0.05, 0.07, 0.1, 0.2, 0.5, 0.01]
# count =Counter()
#
# for x in xrange(10000):
# index = weighted_choice_sub(weights)
# count[index] += 1
# index = weighted_choice_sub(weights)
# count[index] += 1
#
# count_sum = sum(count.values())
#
# for key, value in count.iteritems():
# print float(value)/count_sum
# print float(value)/count_sum

View File

@ -13,72 +13,72 @@
import urllib2, json, base64
class APIClient:
def __init__(self, base_url):
self.user = ''
self.password = ''
if not base_url.endswith('/'):
base_url += '/'
self.__url = base_url + 'index.php?/api/v2/'
def __init__(self, base_url):
self.user = ''
self.password = ''
if not base_url.endswith('/'):
base_url += '/'
self.__url = base_url + 'index.php?/api/v2/'
#
# Send Get
#
# Issues a GET request (read) against the API and returns the result
# (as Python dict).
#
# Arguments:0
#
# uri The API method to call including parameters
# (e.g. get_case/1)
#
def send_get(self, uri):
return self.__send_request('GET', uri, None)
#
# Send Get
#
# Issues a GET request (read) against the API and returns the result
# (as Python dict).
#
# Arguments:0
#
# uri The API method to call including parameters
# (e.g. get_case/1)
#
def send_get(self, uri):
return self.__send_request('GET', uri, None)
#
# Send POST
#
# Issues a POST request (write) against the API and returns the result
# (as Python dict).
#
# Arguments:
#
# uri The API method to call including parameters
# (e.g. add_case/1)
# data The data to submit as part of the request (as
# Python dict, strings must be UTF-8 encoded)
#
def send_post(self, uri, data):
return self.__send_request('POST', uri, data)
#
# Send POST
#
# Issues a POST request (write) against the API and returns the result
# (as Python dict).
#
# Arguments:
#
# uri The API method to call including parameters
# (e.g. add_case/1)
# data The data to submit as part of the request (as
# Python dict, strings must be UTF-8 encoded)
#
def send_post(self, uri, data):
return self.__send_request('POST', uri, data)
def __send_request(self, method, uri, data):
url = self.__url + uri
request = urllib2.Request(url)
if (method == 'POST'):
request.add_data(json.dumps(data))
auth = base64.b64encode('%s:%s' % (self.user, self.password))
request.add_header('Authorization', 'Basic %s' % auth)
request.add_header('Content-Type', 'application/json')
def __send_request(self, method, uri, data):
url = self.__url + uri
request = urllib2.Request(url)
if (method == 'POST'):
request.add_data(json.dumps(data))
auth = base64.b64encode('%s:%s' % (self.user, self.password))
request.add_header('Authorization', 'Basic %s' % auth)
request.add_header('Content-Type', 'application/json')
e = None
try:
response = urllib2.urlopen(request).read()
except urllib2.HTTPError as e:
response = e.read()
e = None
try:
response = urllib2.urlopen(request).read()
except urllib2.HTTPError as e:
response = e.read()
if response:
result = json.loads(response)
else:
result = {}
if response:
result = json.loads(response)
else:
result = {}
if e != None:
if result and 'error' in result:
error = '"' + result['error'] + '"'
else:
error = 'No additional error message received'
raise APIError('TestRail API returned HTTP %s (%s)' %
(e.code, error))
if e != None:
if result and 'error' in result:
error = '"' + result['error'] + '"'
else:
error = 'No additional error message received'
raise APIError('TestRail API returned HTTP %s (%s)' %
(e.code, error))
return result
return result
class APIError(Exception):
pass
pass

View File

@ -21,8 +21,8 @@ status_id:
"""
result = client.send_post(
'add_result_for_case/1047/1036925',
{'status_id': 1, 'comment': 'AutoMagic Flag.' }
'add_result_for_case/1047/1036925',
{'status_id': 1, 'comment': 'AutoMagic Flag.' }
)
# pprint(result)
@ -46,13 +46,13 @@ section = client.send_get('get_section/45752')
send_post('add_case/[sectionid]',casedata)
"""
casedata = {
"title":"AutoMagic 测试使用admin用户登录。",
"template_id":"1",
"type_id":"1",
"priority_id":"4",
"custom_caseversion_id":"8",
"custom_automation_status":"2",
"custom_steps": "Step1 跳转到登录页面http://192.168.110.111\n Step2 输入登录用户名 admin \n Step3 输入登录密码 admin@123\n Step4 点击登录按钮"
"title":"AutoMagic 测试使用admin用户登录。",
"template_id":"1",
"type_id":"1",
"priority_id":"4",
"custom_caseversion_id":"8",
"custom_automation_status":"2",
"custom_steps": "Step1 跳转到登录页面http://192.168.110.111\n Step2 输入登录用户名 admin \n Step3 输入登录密码 admin@123\n Step4 点击登录按钮"
}
add_case = client.send_post('add_case/45752',casedata)
# pprint(add_case)