Commit ec46228e by zhangchunyao

code commit

parent 6a164efd
apiAutoByJenkins @ ee10ebab
Subproject commit ee10ebab9cbf5be98302a39730643b69c0421508
from common.Http import Http
from common.Excel import Reader, Writer
import unittest
import json,ddt
http = Http()
reader = Reader()
writer = Writer()
#reader.open_excle('D:/dminterfacetest/repot/testcase.xls')
reader.open_excle('/opt/apitest/apiAutoByJenkins/repot/testcase.xls')
sheetname = reader.get_sheets()
#writer.copy_open('D:/dminterfacetest/repot/testcase.xls', 'D:/dminterfacetest/repot/testcase1.xls')
writer.copy_open('/opt/apitest/apiAutoByJenkins/repot/testcase.xls', 'opt/apitest/apiAutoByJenkins/repot/testcase1.xls')
sheetname1 = writer.get_sheets()
writer.set_sheet(sheetname1[0])
for sheet in sheetname:
# 设置当前读取的sheet页面
reader.set_sheet(sheet)
data = reader.dict_data()
#print(data)
@ddt.ddt
class Run(unittest.TestCase):
@classmethod
def setUp(self):
pass
def runner(self,line,http):
if len(line[0])>0 or len(line[1])>0:
return
if line[3] == 'post':
result = http.post(line[4], line[5], line[6])
return result
if line[3] == 'addheader':
result = http.addheader(line[4])
return result
if line[3] == 'assertquals':
result = http.assertquals(line[4], line[5])
return result
if line[3] == 'savejson':
result = http.savejson(line[4], line[5])
return result
if line[3] == 'clear_values':
http.clear_values()
return
if line[3] == 'assertquals_int':
result = http.assertquals_int(line[4], line[5])
return result
if line[3] == 'seturl':
result = http.seturl(line[4])
return result
if line[3] == 'Addjson':
result = http.Addjson(line[4])
return result
if line[3] == 'Database_query':
result = http.Database_query(line[4],line[5])
return result
if line[3] =='get':
result = http.get(line[4], line[5], line[6])
return result
@ddt.data(*data)
def test_run(self,data):
result = self.runner(reader.readline_new(data[-1]), http)
if result != None:
print(result)
results = json.dumps(result)
writer.write(data[-1], 7, results)
writer.save_close()
if __name__ == '__main__':
run = Run()
run.test_run()
\ No newline at end of file
from common.Excel import Reader, Writer
from common.Http import InterFace
from xlutils.copy import copy
import unittest
import json
import ddt
reader = Reader()
writer = Writer()
inter = InterFace()
APIPATH = '/opt/apitest/apiAutoByJenkins/repot/apitest.xls'
#APIPATH = 'D:/workSpace/Python_workspace/apitestunit.xls'
br = reader.open_excle(APIPATH)
sheetname = reader.get_sheets()
bw = copy(br)
data = reader.sheet_cont()
@ddt.ddt
class Run(unittest.TestCase):
@classmethod
def setUp(self):
pass
@ddt.data(*data)
def test_run(self,data):
result = inter.dosheet(br, bw, data)
print(result)
def tearDown(self):
inter.xlsx_save(bw, APIPATH)
if __name__ == '__main__':
run = Run()
run.test_run()
\ No newline at end of file
import os,xlrd
from xlutils.copy import copy
class Reader:
'''
by LiangChao
at 2019/7/11
用来读取Excel文件内容
'''
def __init__(self):
#整个excel工作簿缓存
self.workbook=None
#当前工作簿sheet
self.sheet=None
#当前sheet的行数
self.rows=0
#当前读取到的行数
self.cols = 0
self.r=0
self.key_num = []
#打开Excel
def open_excle(self,srcfile):
#如果打开的文件不存在,就报错
if not os.path.isfile(srcfile):
print("error:%s not exist!" % (srcfile))
return
#设置读取exel使用utf8编码
xlrd.Book.encoding="utf8"
#读取EXCEL内容到缓存wordbook
self.workbook=xlrd.open_workbook(filename=srcfile)
#选取第一个sheet页面
self.sheet=self.workbook.sheet_by_index(0)
#设置rows为当前sheet行数
self.rows=self.sheet.nrows
self.cols = self.sheet.ncols
#设置默认读取为第一行
self.r=0
return self.workbook
#获取sheet页面
def get_sheets(self):
#通过sheet名字,切换sheet页面
sheets=self.workbook.sheet_names()
return sheets
#切换sheet页面
def set_sheet(self,name):
#通过sheet名字,切换sheet页面
self.sheet=self.workbook.sheet_by_name(name)
self.rows=self.sheet.nrows
self.r=0
return
#逐行读取
def readline(self):
row1=None
#如果当前还没到最后一行,则往下读取一行
if self.r<self.rows:
#读取第r行的内容
row=self.sheet.row_values(self.r)
#设置下一次读取r的下一行
self.r=self.r+1
#辅助遍历行里面的列
i=0
row1=row
#把读取的数据都变成字符串
for strs in row:
row1[i]=str(strs)
i=i+1
return row1
def dict_data(self):
if self.rows <= 1:
print("总行数小于1")
else:
r = []
for i in list(range(self.rows - 1)):
# 从第二行取对应values值
value = self.sheet.row_values(i+1)
values = value[2:4]
if values[0] !='' :
values.append(i+1)
#self.key_num.append(i+2)
r.append(values)
return r
def sheet_cont(self,):
cont = self.sheet.nrows
r = []
for i in range(1,cont):
r.append(i)
return r
def readline_new(self,num):
# num = self.key_num
row1 = None
# 如果当前还没到最后一行,则往下读取一行
if num < self.rows:
# 读取第r行的内容
row = self.sheet.row_values(num)
# 辅助遍历行里面的列
i = 0
row1 = row
# 把读取的数据都变成字符串
for strs in row:
row1[i] = str(strs)
i = i + 1
return row1
class Writer:
'''
by kongwen 2019/7/8 用来复制写入excel
'''
def __init__(self):
#读取需要的复制excel
self.workbook=None
#拷贝的工作簿
self.wb=None
#当前工作的sheet页
self.sheet=None
#记录生成的文件,用来保存
self.df=None
#记录写入的行
self.row=0
#记录写入的列
self.clo=0
#复制并打开excel
def copy_open(self,srcfile,dstfile):
#判断要复制的文件是否存在
if not os.path.isfile(srcfile):
print(srcfile+" not exist")
return
#判断要新建的文件是否存在,存在则提示
if os.path.isfile(dstfile):
print("warning:"+dstfile+"file already exist")
#记录要保存的文件
self.df=dstfile
#读取EXCEL到缓存
#formatting_info带格式的复制
self.workbook=xlrd.open_workbook(filename=srcfile,formatting_info=True)
#拷贝
self.wb=copy(self.workbook)
#默认使用第一个sheet
#sheet=wb.get_sheet('Sheet1')
return
#获取sheet页面
def get_sheets(self):
#获取所有sheet的名字,并返回第一个列表
sheets=self.workbook.sheet_names()
#print(sheets)
return sheets
#切换sheet页面
def set_sheet(self,name):
#通过sheet名字,切换sheet页面
self.sheet=self.wb.get_sheet(name)
return
#写入指定单元格,保留原格式
def write(self,r,c,value):
#获取写入的单元格
def _getCell(sheet,r,c):
#获取行
row=sheet._Worksheet__rows.get(r)
if not row:
return None
#获取单元格
cell=row._Row__cells.get(c)
return cell
#获取要写入的单元格
cell=_getCell(self.sheet,r,c)
#写入值
self.sheet.write(r,c,value)
if cell:
#获取要写入的单元格
ncell=_getCell(self.sheet,r,c)
if ncell:
#设置写入后格式和写入前一样
ncell.xf_idx=cell.xf_idx
return
#保存
def save_close(self):
#保存复制后的文件到硬盘
self.wb.save(self.df)
return
#调试
if __name__ == '__main__':
reader=Reader()
reader.open_excle('/Users/niweizhong/Desktop/case1.xls')
sheetname=reader.get_sheets()
for sheet in sheetname:
#设置当前读取的sheet页面
reader.set_sheet(sheet)
for i in range(reader.rows):
print(reader.readline())
writer=Writer()
writer.copy_open('/Users/niweizhong/Desktop/case1.xls','/Users/niweizhong/Desktop/case2.xls')
sheetname=writer.get_sheets()
writer.set_sheet(sheetname[0])
writer.write(1,1,'William')
writer.save_close()
import requests
import json,time, xlrd, xlwt, textwrap
import jsonpath
import pymysql
from urllib.parse import urlencode
class Http():
'''
by LiangChao
at 2019/7/11
封装关键字
'''
def __init__(self):
self.url =''
self.session = requests.session()
self.result = None
self.jsons = {}
self.params = {}
self.cookies = {
'Hm_lvt_47362f54f594efad07774d01ad0858d5': '1559783995,1560069495,1560079873,1560735612',
'web_login_cookie_id': 'fefd1c81641711e69d0818c58a146fd2',
'Hm_lpvt_47362f54f594efad07774d01ad0858d5': '1560844337'}
self.value = []
self.header = {}
def addheader(self,heaad):
heaads = json.loads(heaad)
for key in heaads.keys():
self.header[key] = heaads[key]
return self.header
def addcookies(self):
self.session.cookies = self.savecookies()
return self.session.cookies
def addparam(self,paramters='',str=''):
if paramters =='':
paramters ='{}'
self.params = json.loads(paramters)
for i in self.params.keys():
if i == 'transId' or i == 'timestamp':
a = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.params[i] = self.params[i] + a
#self.params = paramters
if str == '':
return self.params
else:
key = json.loads(str)
keylist = key['params']
i = 0
value_list = self.value
for k in keylist:
self.params[k] = value_list[i]
i = i+1
#print(self.params)
return self.params
def seturl(self,url):
self.url = url
return self.url
def post(self, api, default, variable=''):
param = self.addparam(default, variable)
if 'application/jso' in self.header:
param1 = json.dumps(param)
self.result = self.session.post(self.url + api,headers=self.header, data=param1, cookies=self.cookies)
self.header['Content-Type'] = 'application/x-www-form-urlencoded'
else:
self.result = self.session.post(self.url + api, data=param, cookies=self.cookies)
#self.cookies = self.result.cookies
self.jsons = json.loads(self.result.text)
#self.value = []
return self.jsons
def get(self,api,default, variable=''):
param = self.addparam(default, variable)
paramts = urlencode(param)
self.result = self.session.get(self.url + api + paramts)
#self.cookies = self.result.cookies
self.jsons = json.loads(self.result.text)
return self.jsons
def timing(self,n):
time.sleep(n)
def savejson(self,key,value):
results = self.jsons[value]
self.params[key] = results
return self.params
def savecookies(self):
self.cookiess = self.cookies
return self.cookiess
def assertquals(self,key,values):
dic = self.jsons
value = jsonpath.jsonpath(dic, key)
if value[0] == values:
print('pass')
else:
print('fail')
def assertquals_int(self,key,values):
dic = self.jsons
value = jsonpath.jsonpath(dic, key)
print(values)
if value[0] == int(values):
return 'pass'
else:
return 'fail'
def __getparms(self,str):
for key in self.params.keys():
str = str.replace('{' + key + '}', self.params[key])
print(str)
return str
def dict_get(self, locators, dic=None, default=None):
'''
:param dic: 输入需要在其中取值的原始字典 <dict>
:param locators: 输入取值定位器, 如:['result', 'msg', '-1', 'status'] <list>
:param default: 进行取值中报错时所返回的默认值 (default: None)
:return: 返回根据参数locators找出的值
'''
if dic == None:
dic = self.jsons
else:
dic = dic
if not type(locators) in [dict, list]:
a = json.loads(locators)
locators = a['getvalue']
if not isinstance(dic, dict) or not isinstance(locators, list):
return default
value = None
for locator in locators:
if not type(value) in [dict, list] and isinstance(locator, str) and not self.can_convert_to_int(locator):
try:
value = dic[locator]
except KeyError:
return default
continue
if isinstance(value, dict):
try:
value = self.dict_get([locator],value)
except KeyError:
return default
continue
if isinstance(value, list) and self.can_convert_to_int(locator):
try:
value = value[int(locator)]
except IndexError:
return default
continue
if not type(value) in [dict, list]:
self.value.append(value)
#print(value)
return value
# jsonpath 取值
def Addjson(self, locators):
dic = self.jsons
value = jsonpath.jsonpath(dic, locators)
#print(value)
self.value.append(value[0])
return value
def can_convert_to_int(self,input):
try:
int(input)
return True
except BaseException:
return False
# 清除self.value内容,用于初始化
def clear_values(self):
self.value = []
return self.value
# 数据库取值
def Database_query(self,dbname,sql):
conn = pymysql.connect(host="56cbb9f62fac6.sh.cdb.myqcloud.com",
user="cdb_outerroot",
password="@09ui%sbc09",
port=6634,
database=dbname,
charset="utf8")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
# 定义要执行的SQL语句
# 执行SQL语句
cursor.execute(sql)
res = cursor.fetchall()
# 关闭光标对象
cursor.close()
# 关闭数据库连接
conn.close()
for key in res[0]:
self.value.append(key)
return self.value
class InterFace():
def __init__(self):
pass
# 打开文件
def xlsx_open(slef,filepath):
book = xlrd.open_workbook(filepath)
return book
def xlsx_getRow(slef,sheet, row):
object = {}
object["method"] = sheet.cell_value(row, 2)
object["host"] = sheet.cell_value(row, 3)
object["path"] = sheet.cell_value(row, 4)
object["url"] = object["host"] + object["path"]
object["params"] = sheet.cell_value(row, 5)
object["headers"] = sheet.cell_value(row, 6)
object["cookies"] = sheet.cell_value(row, 7)
object["code"] = sheet.cell_value(row, 8)
object["expected_code"] = sheet.cell_value(row, 9)
# print(object["cookies"])
return object
def xlsx_request(slef,object):
headers = {"Content-Type":"application/x-www-form-urlencoded"}
cookies = {
'Hm_lvt_47362f54f594efad07774d01ad0858d5': '1559783995,1560069495,1560079873,1560735612',
'web_login_cookie_id': 'fefd1c81641711e69d0818c58a146fd2',
'Hm_lpvt_47362f54f594efad07774d01ad0858d5': '1560844337'
}
if object["method"] == "post":
dic = json.loads(object["params"])
params1 = dic
#params1 = object["params"]
#print(params1)
if object["headers"]!="":
if 'application/jso' in object["headers"]:
params1 = json.dumps(object["params"])
#print(params1)
head=object["headers"]
header=json.loads(head)
response = requests.post(object["url"], params1,headers=header,cookies=cookies)
results = response.json()
else:
response = requests.post(object["url"], params1,cookies=cookies)
results = response.json()
result = results
elif object["method"] == "get":
print(object["params"])
dic = json.loads(object["params"])
params1 = urlencode(dic)
response = requests.get(object["url"], params1)
result = response.json()
else:
return 0
return result
def xlsx_set(slef,sheet, row, col, value, red=False):
style = "font:colour_index red;"
if red == False:
sheet.write(row, col, value)
else:
sheet.write(row, col, value, xlwt.easyxf(style))
def xlsx_save(slef,book, filepath):
book.save(filepath)
# 2019/6/24 update by datu
def writeResultDate(slef,bwt_sheet, inrow, incol, result):
# incol=11
# len(textwrap.wrap(text='abcdefghi', width=1))
for var in textwrap.wrap(text=json.dumps(result, ensure_ascii=False), width=30000):
slef.xlsx_set(bwt_sheet, inrow, incol, var, False)
incol = incol + 1
def dosheet(slef,brd, bwt, i):
brd_sheet = brd.sheets()[0]
bwt_sheet = bwt.get_sheet(0)
object = slef.xlsx_getRow(brd_sheet, i)
result = slef.xlsx_request(object)
if result == 0:
print("此行是接口说明,跳过不执行")
elif type(object["expected_code"]) == type(1.0):
if result.get(object["code"]) == object["expected_code"]:
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
if str(result.get(object["code"])) == object["expected_code"]:
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
return result
#coding=utf-8
import smtplib
from email.mime.text import MIMEText #MIMRText()定义邮件正文
from email.mime.multipart import MIMEMultipart #MIMEMulipart模块构造带附
def sendResultMail(smtpSever,sendUser,sendPwd,recvUser,reportName):
#发送邮件的服务器
#smtpserver = 'smtp.sina.com'
smtpserver =smtpSever
#发送邮件用户和密码
#user ="xxx@sina.com"
#password = "xxx"
user = sendUser
password = sendPwd
#发送者
#sender = "xxx@sina.com"
sender = sendUser
#接收者
#receiver = "1xxx@qq.com"
receiver = recvUser
#邮件主题
subject = "接口自动化执行结果"
#发送附件
sendfile = open(reportName,"rb").read()
att = MIMEText(sendfile,"base64","utf-8")
att["Content-Type"] = "application/octet-stream"
att["Content-Disposition"] = "attachment;filename = 'autoInterfaceResult.html'"
#msghtml = MIMEText('<h1 style="color:red;font-size:100px">各位好,附件为接口自动化测试详细结果,请查阅</h1><img src="cid:small">', 'html', 'utf-8')
msghtml = MIMEText('<h2 style="color:red;font-size:50px">各位好,附件为接口自动化测试详细结果,请查阅</h2>', 'html', 'utf-8')
#msg = MIMEText('接口自动化测试结果', 'plain', 'utf-8')
msgRoot = MIMEMultipart('related')
msgRoot['Subject'] = subject
msgRoot['from'] = "测试服务器"
msgRoot['To'] = receiver
msgRoot.attach(att)
msgRoot.attach(msghtml)
#msgRoot.attach(msg)
smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(user,password)
smtp.sendmail(sender,receiver,msgRoot.as_string())
smtp.quit()
import unittest
import HTMLTestReportCN
import time
from common.Mail import sendResultMail
#case_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt/case'
#report_dir = 'D:/workSpace/Python_workspace/myPython/.idea/interfaceTest_ddt/repot'
case_dir = '/opt/apitest/apiAutoByJenkins/case'
report_dir = '/opt/apitest/apiAutoByJenkins/repot'
discover = unittest.defaultTestLoader.discover(case_dir,pattern='test*.py')
now = time.strftime('%Y-%m-%d %H_%M_%S')
report_name = report_dir+'/'+now+' test_report.html'
with open(report_name, 'wb') as f:
runner = HTMLTestReportCN.HTMLTestRunner(stream=f, title='接口自动化测试报告', description='接口测试用例执行情况', tester='jenkins')
runner.run(discover)
smtpSer ="smtp.exmail.qq.com"
# 测试服务器群发测试组成员
sendUser ="apitest@demogic.com"
sendPwd ="Api@201906"
recvUser ="test@demogic.com"
#recvUser = "zhangchunyao@demogic.com"
sendResultMail(smtpSer,sendUser,sendPwd,recvUser,report_name)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment