Commit 11bc1f58 by zhangchunyao

code update 201907251552

parent 4f7530bb
......@@ -23,7 +23,7 @@ class Run(unittest.TestCase):
@classmethod
def setUp(self):
pass
def runner(self,line,http):
def runner(self,line,http,num):
if len(line[0])>0 or len(line[1])>0:
return
if line[3] == 'post':
......@@ -34,16 +34,31 @@ class Run(unittest.TestCase):
return result
if line[3] == 'assertquals':
result = http.assertquals(line[4], line[5])
return result
if result[1] == 'fail':
writer.write(num, 7, result[1])
try:
self.assertEqual(result[0][0], result[0][1])
except AssertionError as e:
print("预期与实际不符:{}".format(e))
raise
return result[1]
if line[3] == 'assertquals_int':
result = http.assertquals_int(line[4], line[5])
if result[1] == 'fail':
writer.write(num, 7, result[1])
try:
self.assertEqual(result[0][0], result[0][1])
except AssertionError as e:
print("预期与实际不符:{}".format(e))
raise
return result[1]
if line[3] == 'savejson':
result = http.savejson(line[4], line[5])
return result
if line[3] == 'clear_values':
http.clear_values()
return
if line[3] == 'assertquals_int':
result = http.assertquals_int(line[4], line[5])
return result
if line[3] == 'seturl':
result = http.seturl(line[4])
return result
......@@ -56,11 +71,12 @@ class Run(unittest.TestCase):
if line[3] =='get':
result = http.get(line[4], line[5], line[6])
return result
@ddt.data(*data)
def test_run(self,data):
result = self.runner(reader.readline_new(data[-1]), http)
result = self.runner(reader.readline_new(data[-1]), http, data[-1])
if result != None:
print(result)
#print(result)
results = json.dumps(result)
writer.write(data[-1], 7, results)
writer.save_close()
......
......@@ -20,13 +20,22 @@ class Run(unittest.TestCase):
pass
@ddt.data(*data)
def test_run(self,data):
result = inter.dosheet(br, bw, data)
result = inter.dosheet(br, bw, data[-1])
#print(result[0])
if result[1]:
if isinstance(result[1][0], int):
self.assertEqual(result[1][0], int(result[1][1]))
try:
self.assertEqual(result[1][0], int(result[1][1]), msg='\n检验的字段名称:{}\n请求的接口api:{}'.format(result[1][-2], result[1][-1]))
except AssertionError as e:
print("实际与预期不符:{}\n接口返回结果:\n{}".format(e, result[0]))
raise
else:
self.assertEqual(result[1][0], result[1][1])
print(result)
try:
self.assertEqual(result[1][0], result[1][1], msg='\n检验的字段名称:{}\n请求的接口api:{}'.format(result[1][-2], result[1][-1]))
except AssertionError as e:
print("实际与预期不符:{}\n接口返回结果:\n{}".format(e, result[0]))
raise
def tearDown(self):
inter.xlsx_save(bw, APIPATH)
......
......@@ -82,11 +82,16 @@ class Reader:
r.append(values)
return r
def sheet_cont(self,):
cont = self.sheet.nrows
r = []
for i in range(1,cont):
r.append(i)
for i in list(range(self.rows-1)):
# 从第二行取对应values值
value = self.sheet.row_values(i+1)
values = value[0:2]
values.append(i)
# self.key_num.append(i+2)
r.append(values)
return r
def readline_new(self,num):
......@@ -202,6 +207,11 @@ if __name__ == '__main__':
reader.set_sheet(sheet)
for i in range(reader.rows):
print(reader.readline())
line = reader.readline()
if line[10] == 'fail':
api.append(line[4])
print(api)
writer=Writer()
writer.copy_open('/Users/niweizhong/Desktop/case1.xls','/Users/niweizhong/Desktop/case2.xls')
sheetname=writer.get_sheets()
......
......@@ -22,10 +22,17 @@ class Http():
self.value = []
self.header = {}
def addheader(self,heaad):
heaads = json.loads(heaad)
for key in heaads.keys():
self.header[key] = heaads[key]
# def addheader(self,heaad):
# heaads = json.loads(heaad)
# for key in heaads.keys():
# self.header[key] = heaads[key]
# return self.header
def addheader(self,header):
"""添加头部信息"""
headers = json.loads(header)
# 循环取headers中value,存放至self.header中
for key in headers.keys():
self.header[key] = headers[key]
return self.header
def addcookies(self):
......@@ -58,17 +65,32 @@ class Http():
self.url = url
return self.url
# def post(self, api, default, variable=''):
# param = self.addparam(default, variable)
# if 'application/jso' in self.header:
# param1 = json.dumps(param)
# self.result = self.session.post(self.url + api,headers=self.header, data=param1, cookies=self.cookies)
# self.header['Content-Type'] = 'application/x-www-form-urlencoded'
# else:
# self.result = self.session.post(self.url + api, data=param, cookies=self.cookies)
# #self.cookies = self.result.cookies
# self.jsons = json.loads(self.result.text)
# #self.value = []
# return self.jsons
def post(self, api, default, variable=''):
"""post请求"""
# 调用self.addparam方法,生成入参参数
param = self.addparam(default, variable)
if 'application/jso' in self.header:
param1 = json.dumps(param)
self.result = self.session.post(self.url + api,headers=self.header, data=param1, cookies=self.cookies)
self.header['Content-Type'] = 'application/x-www-form-urlencoded'
if self.header != {}:
# if 'json' in self.header:
# param1 = json.dumps(param)
self.result = self.session.post(self.url + api, headers=self.header, data=param, cookies=self.cookies)
self.header = {}
else:
# 进行post请求,获取返回信息
self.result = self.session.post(self.url + api, data=param, cookies=self.cookies)
#self.cookies = self.result.cookies
self.jsons = json.loads(self.result.text)
#self.value = []
return self.jsons
def get(self,api,default, variable=''):
......@@ -78,32 +100,47 @@ class Http():
#self.cookies = self.result.cookies
self.jsons = json.loads(self.result.text)
return self.jsons
def timing(self,n):
time.sleep(n)
def savejson(self,key,value):
results = self.jsons[value]
self.params[key] = results
return self.params
def savecookies(self):
self.cookiess = self.cookies
return self.cookiess
def assertquals(self,key,values):
"""断言str类型"""
dic = self.jsons
value = jsonpath.jsonpath(dic, key)
if value[0] == values:
print('pass')
value.append(values)
if value[0] == value[1]:
return value, 'pass'
else:
print('fail')
return value, 'fail'
# if value[0] == values:
# print('pass')
# else:
# print('fail')
def assertquals_int(self,key,values):
"""断言int类型"""
dic = self.jsons
value = jsonpath.jsonpath(dic, key)
print(values)
if value[0] == int(values):
return 'pass'
value.append(int(values))
if value[0] == value[1]:
return value, 'pass'
else:
return 'fail'
return value, 'fail'
# print(values)
# if value[0] == int(values):
# return 'pass'
# else:
# return 'fail'
def __getparms(self,str):
for key in self.params.keys():
......@@ -173,8 +210,11 @@ class Http():
# 清除self.value内容,用于初始化
# def clear_values(self):
# self.value = []
# return self.value
def clear_values(self):
self.value = []
self.value.clear()
return self.value
# 数据库取值
......@@ -279,36 +319,27 @@ class InterFace():
object = slef.xlsx_getRow(brd_sheet, i)
result = slef.xlsx_request(object)
arr = []
if result != 0:
code = jsonpath.jsonpath(result, object["code"])
arr.append(code[0])
arr.append(object["expected_code"])
if result == 0:
print("此行是接口说明,跳过不执行")
elif type(object["expected_code"]) == type(1.0):
if result.get(object["code"]) == object["expected_code"]:
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
if str(result.get(object["code"])) == object["expected_code"]:
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
code = jsonpath.jsonpath(result, object["code"])
if isinstance(code[0], int):
if code[0] == int(object["expected_code"]):
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
slef.writeResultDate(bwt_sheet, i, 11, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
slef.writeResultDate(bwt_sheet, i, 11, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
if code[0] == object["expected_code"]:
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
slef.writeResultDate(bwt_sheet, i, 11, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
slef.writeResultDate(bwt_sheet, i, 11, result)
arr.append(code[0])
arr.append(object["expected_code"])
arr.append(object["code"])
arr.append(object["path"])
return result, arr
# coding: UTF-8
'''
pip install DingtalkChatbot
'''
# 2019/7/22 created by dadu
from dingtalkchatbot.chatbot import DingtalkChatbot
import time
from common.Excel import Reader
import sys
qaTeamWebhook = 'https://oapi.dingtalk.com/robot/send?access_token=2656a52b44f1e2d9ebbf5e57560b5a7c2619dc07cec35c62d19b8b252e24a064'
qaTeamDing = DingtalkChatbot(qaTeamWebhook)
runTime = time.strftime('%Y-%m-%d %H:%M:%S')
def resultTextFalse(reporturl):
#path = 'D:/workSpace/Python_workspace/apitestunit.xls'
path = '/opt/apitest/apiAutoByJenkins/repot/apitest.xls'
readExp = Reader()
arrExpSingle = readExp.api_result(path)
expMsg = '\n'.join(arrExpSingle)
qaTeamDing.send_text(msg = "接口自动化执行异常结果\n执行时间:"+ runTime+ "\n执行异常接口列表:\n"+expMsg+"\n详细结果地址:"+reporturl)
#path = 'D:/workSpace/Python_workspace/apitestunit.xls'
path = '/opt/apitest/apiAutoByJenkins/repot/apitest.xls'
readExp = Reader()
arrExpSingle = readExp.api_result(path)
expMsg = '\n'.join(arrExpSingle)
qaTeamDing.send_text(msg = "接口自动化执行异常结果\n执行时间:"+ runTime+ "\n执行异常接口列表:\n"+expMsg+"\n详细结果地址:"+reporturl)
def resultTextPass(reporturl):
qaTeamDing.send_text(msg = "接口自动化执行无误\n执行时间:"+ runTime+ "\n详细结果地址:"+reporturl)
qaTeamDing.send_text(msg = "接口自动化执行无误\n执行时间:"+ runTime+ "\n详细结果地址:"+reporturl)
def msgSend(passrate,reporturl):
if float(passrate)==100.00:
resultTextPass(reporturl)
else :
resultTextFalse(reporturl)
\ No newline at end of file
if float(passrate)==100.00:
resultTextPass(reporturl)
else :
resultTextFalse(reporturl)
\ No newline at end of file
......@@ -3,8 +3,9 @@ import HTMLTestReportCN
import time
from common.Mail import sendResultMail
from HTMLTestReportCN import TestResult
from common.resultToDing import msgSend
#from common.resultToDing import msgSend
from common.resultToDingForQateam import msgSend
import sys
#case_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt/case'
#report_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt/repot'
......@@ -23,8 +24,12 @@ with open(reportFileName, 'wb') as f:
str=runner.getPassRate()
passRateValue = str.split('%',1)[0]
#msgSend(passRateValue,reportUrl,str)
#msgSend(passRateValue,reportUrl,sys.argv[1])
msgSend(passRateValue,reportUrl)
smtpSer ="smtp.exmail.qq.com"
# 测试服务器群发测试组成员
sendUser ="apitest@demogic.com"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment