Commit 85f961b1 by zhangchunyao

code commit 2.0 201907311711

parent 6cca67d8
"""
by LiangChao
at 2019/7/31
用来执行单接口用例文件
"""
from common.Http import Http from common.Http import Http
from common.Excel import Reader, Writer from common.Excel import Reader, Writer
import unittest import unittest
import json,ddt import json, ddt, textwrap
http = Http() http = Http()
reader = Reader() reader = Reader()
writer = Writer() writer = Writer()
#reader.open_excle('D:/workSpace/Python_workspace/testcase.xls') # 读取用例文件
reader.open_excle('/opt/apitest/apiAutoByJenkins/repot/testcase.xls') # reader.open_excle('D:/workSpace/Python_workspace/apitest2.0.xls') # for local
reader.open_excle('/opt/apitest/apiAutoByJenkins/repot/apitest.xls') # for server
sheetname = reader.get_sheets() sheetname = reader.get_sheets()
#writer.copy_open('D:/workSpace/Python_workspace/testcase.xls', 'D:/workSpace/Python_workspace/testcase1.xls') # copy用例文件
writer.copy_open('/opt/apitest/apiAutoByJenkins/repot/testcase.xls', '/opt/apitest/apiAutoByJenkins/repot/testcase1.xls') # writer.copy_open('D:/workSpace/Python_workspace/apitest2.0.xls', 'D:/workSpace/Python_workspace/apitest2.0_1.xls') # for local
writer.copy_open('/opt/apitest/apiAutoByJenkins/repot/apitest.xls', '/opt/apitest/apiAutoByJenkins/repot/apitest1.xls') # for server
sheetname1 = writer.get_sheets() sheetname1 = writer.get_sheets()
writer.set_sheet(sheetname1[0]) writer.set_sheet(sheetname1[0])
for sheet in sheetname: for sheet in sheetname:
...@@ -23,62 +29,58 @@ class Run(unittest.TestCase): ...@@ -23,62 +29,58 @@ class Run(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
pass pass
def runner(self,line,http,num): def runner(self,line,http):
d = {
"Post": [line[4], line[5], line[6], line[7], line[8]],
"Addheader": [line[4]],
"savejson": [line[4], line[5]],
"clear_values": [],
"Url": [line[4]],
"Addjson": [line[4], line[5]],
"Database_query": [line[4], line[5], line[6]],
"Get": [line[4], line[5], line[6], line[7], line[8]]
}
if len(line[0])>0 or len(line[1])>0: if len(line[0])>0 or len(line[1])>0:
return return
if line[3] == 'post': if hasattr(http, line[3]):
result = http.post(line[4], line[5], line[6]) func = getattr(http, line[3])
return result for k, v in d.items():
if line[3] == 'addheader': if line[3] in k:
result = http.addheader(line[4]) result = func(*v)
return result return result
if line[3] == 'assertquals':
result = http.assertquals(line[4], line[5])
if result[1] == 'fail':
writer.write(num, 7, result[1])
try:
self.assertEqual(result[0][0], result[0][1])
except AssertionError as e:
print("预期与实际不符:{}".format(e))
raise
return result[1]
if line[3] == 'assertquals_int':
result = http.assertquals_int(line[4], line[5])
if result[1] == 'fail':
writer.write(num, 7, result[1])
try:
self.assertEqual(result[0][0], result[0][1])
except AssertionError as e:
print("预期与实际不符:{}".format(e))
raise
return result[1]
if line[3] == 'savejson':
result = http.savejson(line[4], line[5])
return result
if line[3] == 'clear_values':
http.clear_values()
return
if line[3] == 'seturl':
result = http.seturl(line[4])
return result
if line[3] == 'Addjson':
result = http.Addjson(line[4])
return result
if line[3] == 'Database_query':
result = http.Database_query(line[4],line[5])
return result
if line[3] =='get':
result = http.get(line[4], line[5], line[6])
return result
@ddt.data(*data) @ddt.data(*data)
def test_run(self,data): def test_run(self,data):
result = self.runner(reader.readline_new(data[-1]), http, data[-1]) # 用例执行
excle_value = reader.readline_new(data[-1])
result = self.runner(excle_value, http)
if result != None: if result != None:
#print(result) if isinstance(result, tuple):
results = json.dumps(result) results = json.dumps(result[0])
writer.write(data[-1], 7, results) if result[1][0] == result[1][1]:
writer.write(data[-1], 9, 'pass')
else:
writer.write(data[-1], 9, 'fail')
incol = 10
for var in textwrap.wrap(text=results, width=30000):
writer.write(data[-1], incol, var)
incol = incol +1
try:
self.assertEqual(result[1][0], result[1][1], msg='\n检验的字段名称:{}\n请求的接口api:{}'.format(excle_value[7], excle_value[4]))
except AssertionError as e:
print("实际与预期不符:{}\n接口返回结果:\n{}".format(e, result[0]))
#print("实际与预期不符:{}\n接口返回结果:\n".format(e))
raise
print(result[0])
# 写入Excel文件
else:
print(result)
results = json.dumps(result)
# 写入Excel文件
writer.write(data[-1], 9, 'pass')
writer.write(data[-1], 10, results)
#writer.save_close()
def tearDown(self):
# 保存文件
writer.save_close() writer.save_close()
if __name__ == '__main__': if __name__ == '__main__':
......
"""
by LiangChao
at 2019/7/31
用来执行多接口流程用例文件
"""
from common.Http import Http
from common.Excel import Reader, Writer from common.Excel import Reader, Writer
from common.Http import InterFace
from xlutils.copy import copy
import unittest import unittest
import json import json, ddt, textwrap
import ddt http = Http()
reader = Reader() reader = Reader()
writer = Writer() writer = Writer()
inter = InterFace() # 读取用例文件
APIPATH = '/opt/apitest/apiAutoByJenkins/repot/apitest.xls' # reader.open_excle('D:/workSpace/Python_workspace/testcase2.0.xls') # for local
#APIPATH = 'D:/workSpace/Python_workspace/apitestunit.xls' reader.open_excle('/opt/apitest/apiAutoByJenkins/repot/testcase.xls') # for server
br = reader.open_excle(APIPATH)
sheetname = reader.get_sheets() sheetname = reader.get_sheets()
bw = copy(br) # copy用例文件
data = reader.sheet_cont() # writer.copy_open('D:/workSpace/Python_workspace/testcase2.0.xls', 'D:/workSpace/Python_workspace/testcase2.0_1.xls') # for local
writer.copy_open('/opt/apitest/apiAutoByJenkins/repot/testcase.xls', '/opt/apitest/apiAutoByJenkins/repot/testcase1.xls') # for server
sheetname1 = writer.get_sheets()
writer.set_sheet(sheetname1[0])
for sheet in sheetname:
# 设置当前读取的sheet页面
reader.set_sheet(sheet)
data = reader.dict_data()
#print(data)
@ddt.ddt @ddt.ddt
class Run(unittest.TestCase): class Run(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
pass pass
def runner(self,line,http):
d = {
"Post": [line[4], line[5], line[6], line[7], line[8]],
"Addheader": [line[4]],
"savejson": [line[4], line[5]],
"clear_values": [],
"Url": [line[4]],
"Addjson": [line[4], line[5]],
"Database_query": [line[4], line[5], line[6]],
"Get": [line[4], line[5], line[6], line[7], line[8]]
}
if len(line[0])>0 or len(line[1])>0:
return
if hasattr(http, line[3]):
func = getattr(http, line[3])
for k, v in d.items():
if line[3] in k:
result = func(*v)
return result
@ddt.data(*data) @ddt.data(*data)
def test_run(self,data): def test_run(self,data):
result = inter.dosheet(br, bw, data[-1]) # 用例执行
#print(result[0]) excle_value = reader.readline_new(data[-1])
if result[1]: result = self.runner(excle_value, http)
if isinstance(result[1][0], int): if result != None:
if isinstance(result, tuple):
results = json.dumps(result[0])
if result[1][0] == result[1][1]:
writer.write(data[-1], 9, 'pass')
else:
writer.write(data[-1], 9, 'fail')
incol = 10
for var in textwrap.wrap(text=results, width=30000):
writer.write(data[-1], incol, var)
incol = incol +1
try: try:
self.assertEqual(result[1][0], int(result[1][1]), msg='\n检验的字段名称:{}\n请求的接口api:{}'.format(result[1][-2], result[1][-1])) self.assertEqual(result[1][0], result[1][1], msg='\n检验的字段名称:{}\n请求的接口api:{}'.format(excle_value[7], excle_value[4]))
except AssertionError as e: except AssertionError as e:
print("实际与预期不符:{}\n接口返回结果:\n{}".format(e, result[0])) print("实际与预期不符:{}\n接口返回结果:\n{}".format(e, result[0]))
#print("实际与预期不符:{}\n接口返回结果:\n".format(e))
raise raise
print(result[0])
# 写入Excel文件
else: else:
try: print(result)
self.assertEqual(result[1][0], result[1][1], msg='\n检验的字段名称:{}\n请求的接口api:{}'.format(result[1][-2], result[1][-1])) results = json.dumps(result)
except AssertionError as e: # 写入Excel文件
print("实际与预期不符:{}\n接口返回结果:\n{}".format(e, result[0])) writer.write(data[-1], 9, 'pass')
raise writer.write(data[-1], 10, results)
#writer.save_close()
def tearDown(self): def tearDown(self):
inter.xlsx_save(bw, APIPATH) # 保存文件
writer.save_close()
if __name__ == '__main__': if __name__ == '__main__':
run = Run() run = Run()
......
'''
by LiangChao
at 2019/7/11
用来读/写Excel文件内容
'''
import os,xlrd import os,xlrd
from xlutils.copy import copy from xlutils.copy import copy
class Reader: class Reader:
'''
by LiangChao
at 2019/7/11
用来读取Excel文件内容
'''
def __init__(self): def __init__(self):
#整个excel工作簿缓存 #整个excel工作簿缓存
self.workbook=None self.workbook=None
...@@ -82,7 +83,6 @@ class Reader: ...@@ -82,7 +83,6 @@ class Reader:
r.append(values) r.append(values)
return r return r
def sheet_cont(self,): def sheet_cont(self,):
r = [] r = []
for i in list(range(self.rows-1)): for i in list(range(self.rows-1)):
...@@ -119,10 +119,10 @@ class Reader: ...@@ -119,10 +119,10 @@ class Reader:
self.set_sheet(sheet) self.set_sheet(sheet)
for i in range(self.rows): for i in range(self.rows):
line = self.readline() line = self.readline()
if line[10] == 'fail': if line[9] == 'fail':
api.append(line[4]) result = line[4]+":"+line[2]
api.append(result)
return api return api
class Writer: class Writer:
''' '''
by kongwen 2019/7/8 用来复制写入excel by kongwen 2019/7/8 用来复制写入excel
...@@ -140,7 +140,7 @@ class Writer: ...@@ -140,7 +140,7 @@ class Writer:
self.row=0 self.row=0
#记录写入的列 #记录写入的列
self.clo=0 self.clo=0
#复制并打开excel # 复制并打开excel
def copy_open(self,srcfile,dstfile): def copy_open(self,srcfile,dstfile):
#判断要复制的文件是否存在 #判断要复制的文件是否存在
if not os.path.isfile(srcfile): if not os.path.isfile(srcfile):
...@@ -159,22 +159,22 @@ class Writer: ...@@ -159,22 +159,22 @@ class Writer:
#默认使用第一个sheet #默认使用第一个sheet
#sheet=wb.get_sheet('Sheet1') #sheet=wb.get_sheet('Sheet1')
return return
#获取sheet页面 # 获取sheet页面
def get_sheets(self): def get_sheets(self):
#获取所有sheet的名字,并返回第一个列表 #获取所有sheet的名字,并返回第一个列表
sheets=self.workbook.sheet_names() sheets=self.workbook.sheet_names()
#print(sheets) #print(sheets)
return sheets return sheets
#切换sheet页面 # 切换sheet页面
def set_sheet(self,name): def set_sheet(self,name):
#通过sheet名字,切换sheet页面 #通过sheet名字,切换sheet页面
self.sheet=self.wb.get_sheet(name) self.sheet=self.wb.get_sheet(name)
return return
#写入指定单元格,保留原格式 # 写入指定单元格,保留原格式
def write(self,r,c,value): def write(self,r,c,value):
#获取写入的单元格 # 获取写入的单元格
def _getCell(sheet,r,c): def _getCell(sheet, r, c):
#获取行 # 获取行
row=sheet._Worksheet__rows.get(r) row=sheet._Worksheet__rows.get(r)
if not row: if not row:
return None return None
...@@ -190,32 +190,33 @@ class Writer: ...@@ -190,32 +190,33 @@ class Writer:
ncell=_getCell(self.sheet,r,c) ncell=_getCell(self.sheet,r,c)
if ncell: if ncell:
#设置写入后格式和写入前一样 #设置写入后格式和写入前一样
ncell.xf_idx=cell.xf_idx ncell.xf_idx = cell.xf_idx
return return
#保存 #保存
def save_close(self): def save_close(self):
#保存复制后的文件到硬盘 #保存复制后的文件到硬盘
self.wb.save(self.df) self.wb.save(self.df)
return return
#调试 #调试
if __name__ == '__main__': if __name__ == '__main__':
reader=Reader() reader=Reader()
reader.open_excle('/Users/niweizhong/Desktop/case1.xls') reader.open_excle('D:/dminterfacetest/repot/apitest.xls')
sheetname=reader.get_sheets() sheetname=reader.get_sheets()
api = []
for sheet in sheetname: for sheet in sheetname:
#设置当前读取的sheet页面 #设置当前读取的sheet页面
reader.set_sheet(sheet) reader.set_sheet(sheet)
for i in range(reader.rows): for i in range(reader.rows):
print(reader.readline())
line = reader.readline() line = reader.readline()
if line[10] == 'fail': if line[10] == 'fail':
api.append(line[4]) api.append(line[4])
print(api) print(api)
writer=Writer() # writer=Writer()
writer.copy_open('/Users/niweizhong/Desktop/case1.xls','/Users/niweizhong/Desktop/case2.xls') # writer.copy_open('/Users/niweizhong/Desktop/case1.xls','/Users/niweizhong/Desktop/case2.xls')
sheetname=writer.get_sheets() # sheetname=writer.get_sheets()
writer.set_sheet(sheetname[0]) # writer.set_sheet(sheetname[0])
writer.write(1,1,'William') # writer.write(1,1,'William')
writer.save_close() # writer.save_close()
# coding: UTF-8
'''
pip install DingtalkChatbot
'''
# 2019/7/22 created by dadu
from dingtalkchatbot.chatbot import DingtalkChatbot
import time
from common.Excel import Reader
import sys,os
runTime = time.strftime('%Y-%m-%d %H:%M:%S')
#created by 2019/7/24
def findModuleName(arr,str):
for index in range (len(arr)):
if str == arr[index]:
return 1
return 0
#https://oapi.dingtalk.com/robot/send?access_token=1ffb37a991040d9d58bc3e967fa563e379db22c448fd7440f747cd4c44d4db26
def findGroupDingAddr(str):
enterGroupModule = ["gic-store-service","gic-store-web","gic-store-web"]
memberGroupModule = ["g2Test1","g2Test2"]
marketGroupModule = ["api-open"]
haobanGruupModule = ["g4Test1","g4Test2"]
enterGroupDingAddr = 'https://oapi.dingtalk.com/robot/send?access_token=1ffb37a991040d9d58bc3e967fa563e379db22c448fd7440f747cd4c44d4db26'
memberGroupDingAddr = 'https://oapi.dingtalk.com/robot/send?access_token=2656a52b44f1e2d9ebbf5e57560b5a7c2619dc07cec35c62d19b8b252e24a064'
marketGroupDingAddr = 'https://oapi.dingtalk.com/robot/send?access_token=3e8fac3b110012fa44ae42235323deba5102fc439b62d87b7491c838b1948470'
haobanGruupDingAddr = 'https://oapi.dingtalk.com/robot/send?access_token=2656a52b44f1e2d9ebbf5e57560b5a7c2619dc07cec35c62d19b8b252e24a064'
err = "no group can be match,please check it"
if findModuleName(enterGroupModule,str) == 1:
return enterGroupDingAddr
elif findModuleName(memberGroupModule,str) == 1:
return memberGroupDingAddr
elif findModuleName(marketGroupModule,str) == 1:
return marketGroupDingAddr
elif findModuleName(haobanGruupModule,str) == 1:
return haobanGruupDingAddr
else :
print(err)
return 0
def resultTextFalse(reporturl,str):
# path = 'D:/workSpace/Python_workspace/apitest2.0_1.xls' # for local
path = '/opt/apitest/apiAutoByJenkins/repot/apitest1.xls' # for server
if os.path.exists(path):
readExp = Reader()
arrExpSingle = readExp.api_result(path)
# print(arrExpSingle)
expMsg = '\n'.join(arrExpSingle)
if(findGroupDingAddr(str)!=0):
groupDing = DingtalkChatbot(findGroupDingAddr(str))
groupDing.send_text(msg = "接口自动化执行异常结果\n执行时间:"+ runTime+ "\n执行异常接口列表:\n"+expMsg+"\n详细结果地址:"+reporturl)
else:
sys.exit(0)
else :
print("resultFile is not exist,please check it")
sys.exit(0)
def resultTextPass(reporturl,str):
if(findGroupDingAddr(str)!=0):
groupDing = DingtalkChatbot(findGroupDingAddr(str))
groupDing.send_text(msg = "接口自动化执行无误\n执行时间:"+ runTime+ "\n详细结果地址:"+reporturl)
else :
sys.exit(0)
def msgSend(passrate,reporturl,str):
#print(str)
if float(passrate)==100.00:
resultTextPass(reporturl,str)
else :
resultTextFalse(reporturl,str)
...@@ -3,19 +3,27 @@ ...@@ -3,19 +3,27 @@
from dingtalkchatbot.chatbot import DingtalkChatbot from dingtalkchatbot.chatbot import DingtalkChatbot
import time import time
from common.Excel import Reader from common.Excel import Reader
import sys import sys,os
qaTeamWebhook = 'https://oapi.dingtalk.com/robot/send?access_token=2656a52b44f1e2d9ebbf5e57560b5a7c2619dc07cec35c62d19b8b252e24a064' qaTeamWebhook = 'https://oapi.dingtalk.com/robot/send?access_token=2656a52b44f1e2d9ebbf5e57560b5a7c2619dc07cec35c62d19b8b252e24a064'
qaTeamDing = DingtalkChatbot(qaTeamWebhook) qaTeamDing = DingtalkChatbot(qaTeamWebhook)
runTime = time.strftime('%Y-%m-%d %H:%M:%S') runTime = time.strftime('%Y-%m-%d %H:%M:%S')
def resultTextFalse(reporturl): def resultTextFalse(reporturl):
#path = 'D:/workSpace/Python_workspace/apitestunit.xls' # pathSingle = 'D:/workSpace/Python_workspace/apitest2.0_1.xls' # for local
path = '/opt/apitest/apiAutoByJenkins/repot/apitest.xls' # pathMulti = 'D:/workSpace/Python_workspace/testcase2.0_1.xls' # for local
readExp = Reader() pathSingle = '/opt/apitest/apiAutoByJenkins/repot/apitest1.xls' # for server
arrExpSingle = readExp.api_result(path) pathMulti = '/opt/apitest/apiAutoByJenkins/repot/testcase1.xls' #for server
expMsg = '\n'.join(arrExpSingle) if (os.path.exists(pathSingle) or os.path.exists(pathMulti)):
qaTeamDing.send_text(msg = "接口自动化执行异常结果\n执行时间:"+ runTime+ "\n执行异常接口列表:\n"+expMsg+"\n详细结果地址:"+reporturl) readExp = Reader()
arrExpSingle = readExp.api_result(pathSingle)
arrExpMulti = readExp.api_result(pathMulti)
expSingleMsg = '\n'.join(arrExpSingle)
expMultiMsg = '\n'.join(arrExpMulti)
qaTeamDing.send_text(msg = "接口自动化执行异常结果\n执行时间:"+ runTime+ "\n执行异常接口列表:\n"+expSingleMsg+"\n"+expMultiMsg+"\n详细结果地址:"+reporturl)
else :
print("resultFile is not exist,please check it")
qaTeamDing.send_text(msg = "接口自动化执行异常结果\n执行时间:"+ runTime+ "\n详细结果地址:"+reporturl)
def resultTextPass(reporturl): def resultTextPass(reporturl):
qaTeamDing.send_text(msg = "接口自动化执行无误\n执行时间:"+ runTime+ "\n详细结果地址:"+reporturl) qaTeamDing.send_text(msg = "接口自动化执行无误\n执行时间:"+ runTime+ "\n详细结果地址:"+reporturl)
......
# encoding: utf-8
import unittest import unittest
import HTMLTestReportCN import HTMLTestReportCN
from common.Excel import Reader
import time import time
from common.Mail import sendResultMail from common.Mail import sendResultMail
from HTMLTestReportCN import TestResult from HTMLTestReportCN import TestResult
#from common.resultToDing import msgSend # from common.resultToDing import msgSend
from common.resultToDingForQateam import msgSend from common.resultToDingForQateam import msgSend
import sys import sys
#case_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt/case' '''
#report_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt/repot' by LiangChao
case_dir = '/opt/apitest/apiAutoByJenkins/case' at 2019/7/17
report_dir = '/opt/apitest/apiAutoByJenkins/repot' 用来执行所有test开头的py文件,并生成表报
'''
reader = Reader()
# case_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt_2.0/case' #for local
# report_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt_2.0/repot' #for local
case_dir = '/opt/apitest/apiAutoByJenkins/case' # for server by QA
#case_dir = '/opt/apitest/apiAutoTest/case' # for server by DEV
report_dir = '/opt/apitest/apiAutoByJenkins/repot' # for server
discover = unittest.defaultTestLoader.discover(case_dir,pattern='test*.py') discover = unittest.defaultTestLoader.discover(case_dir,pattern='test*.py')
#discover = unittest.defaultTestLoader.discover(case_dir,pattern='testcase.py') # discover = unittest.defaultTestLoader.discover(case_dir,pattern='testcase.py')
# discover = unittest.defaultTestLoader.discover(case_dir,pattern='testapi.py')
now = time.strftime('%Y%m%d%H%M%S') now = time.strftime('%Y%m%d%H%M%S')
fileName = '/'+now+'test_report.html' fileName = '/'+now+'test_report.html'
reportFileName = report_dir+fileName reportFileName = report_dir+fileName
report_url = "https://four.gicdev.com/apitest" report_url = "https://four.gicdev.com/apitest"
reportUrl = report_url+fileName reportUrl = report_url+fileName
with open(reportFileName, 'wb') as f: with open(reportFileName, 'wb') as f:
runner = HTMLTestReportCN.HTMLTestRunner(stream=f, title='接口自动化测试报告', description='接口测试用例执行情况', tester='jenkins') runner = HTMLTestReportCN.HTMLTestRunner(stream=f, title='接口自动化测试报告', description='接口测试用例执行情况', tester='jenkins')
runner.run(discover) runner.run(discover)
str=runner.getPassRate() # api = reader.api_result('D:/workSpace/Python_workspace/apitest2.0_1.xls')
passRateValue = str.split('%',1)[0] # print(api)
#msgSend(passRateValue,reportUrl,str)
#msgSend(passRateValue,reportUrl,sys.argv[1]) passRate = runner.getPassRate()
passRateValue = passRate.split('%',1)[0]
msgSend(passRateValue,reportUrl)
# msgSend(passRateValue,reportUrl,"gic-store-web")
# str = 'gic-store-web'
# msgSend(passRateValue,reportUrl,str)
# msgSend(passRateValue,reportUrl,sys.argv[1]) # for fangdong
msgSend(passRateValue,reportUrl) # for QaTeam
smtpSer ="smtp.exmail.qq.com" smtpSer ="smtp.exmail.qq.com"
# 测试服务器群发测试组成员 # 测试服务器群发测试组成员
...@@ -41,3 +60,4 @@ sendResultMail(smtpSer,sendUser,sendPwd,recvUser,reportFileName) ...@@ -41,3 +60,4 @@ sendResultMail(smtpSer,sendUser,sendPwd,recvUser,reportFileName)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment