Commit 1565e125 by zhangchunyao

Delete Http.py

parent 571bd4ad
import requests
import json,time, xlrd, xlwt, textwrap
import jsonpath
import pymysql
from urllib.parse import urlencode
class Http():
'''
by LiangChao
at 2019/7/11
封装关键字
'''
def __init__(self):
self.url =''
self.session = requests.session()
self.result = None
self.jsons = {}
self.params = {}
self.cookies = {
'Hm_lvt_47362f54f594efad07774d01ad0858d5': '1559783995,1560069495,1560079873,1560735612',
'web_login_cookie_id': 'fefd1c81641711e69d0818c58a146fd2',
'Hm_lpvt_47362f54f594efad07774d01ad0858d5': '1560844337'}
self.value = []
self.header = {}
def addheader(self,heaad):
heaads = json.loads(heaad)
for key in heaads.keys():
self.header[key] = heaads[key]
return self.header
def addcookies(self):
self.session.cookies = self.savecookies()
return self.session.cookies
def addparam(self,paramters='',str=''):
if paramters =='':
paramters ='{}'
self.params = json.loads(paramters)
for i in self.params.keys():
if i == 'transId' or i == 'timestamp':
a = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.params[i] = self.params[i] + a
#self.params = paramters
if str == '':
return self.params
else:
key = json.loads(str)
keylist = key['params']
i = 0
value_list = self.value
for k in keylist:
self.params[k] = value_list[i]
i = i+1
#print(self.params)
return self.params
def seturl(self,url):
self.url = url
return self.url
def post(self, api, default, variable=''):
param = self.addparam(default, variable)
if 'application/jso' in self.header:
param1 = json.dumps(param)
self.result = self.session.post(self.url + api,headers=self.header, data=param1, cookies=self.cookies)
self.header['Content-Type'] = 'application/x-www-form-urlencoded'
else:
self.result = self.session.post(self.url + api, data=param, cookies=self.cookies)
#self.cookies = self.result.cookies
self.jsons = json.loads(self.result.text)
#self.value = []
return self.jsons
def get(self,api,default, variable=''):
param = self.addparam(default, variable)
paramts = urlencode(param)
self.result = self.session.get(self.url + api + paramts)
#self.cookies = self.result.cookies
self.jsons = json.loads(self.result.text)
return self.jsons
def timing(self,n):
time.sleep(n)
def savejson(self,key,value):
results = self.jsons[value]
self.params[key] = results
return self.params
def savecookies(self):
self.cookiess = self.cookies
return self.cookiess
def assertquals(self,key,values):
dic = self.jsons
value = jsonpath.jsonpath(dic, key)
if value[0] == values:
print('pass')
else:
print('fail')
def assertquals_int(self,key,values):
dic = self.jsons
value = jsonpath.jsonpath(dic, key)
print(values)
if value[0] == int(values):
return 'pass'
else:
return 'fail'
def __getparms(self,str):
for key in self.params.keys():
str = str.replace('{' + key + '}', self.params[key])
print(str)
return str
def dict_get(self, locators, dic=None, default=None):
'''
:param dic: 输入需要在其中取值的原始字典 <dict>
:param locators: 输入取值定位器, 如:['result', 'msg', '-1', 'status'] <list>
:param default: 进行取值中报错时所返回的默认值 (default: None)
:return: 返回根据参数locators找出的值
'''
if dic == None:
dic = self.jsons
else:
dic = dic
if not type(locators) in [dict, list]:
a = json.loads(locators)
locators = a['getvalue']
if not isinstance(dic, dict) or not isinstance(locators, list):
return default
value = None
for locator in locators:
if not type(value) in [dict, list] and isinstance(locator, str) and not self.can_convert_to_int(locator):
try:
value = dic[locator]
except KeyError:
return default
continue
if isinstance(value, dict):
try:
value = self.dict_get([locator],value)
except KeyError:
return default
continue
if isinstance(value, list) and self.can_convert_to_int(locator):
try:
value = value[int(locator)]
except IndexError:
return default
continue
if not type(value) in [dict, list]:
self.value.append(value)
#print(value)
return value
# jsonpath 取值
def Addjson(self, locators):
dic = self.jsons
value = jsonpath.jsonpath(dic, locators)
#print(value)
self.value.append(value[0])
return value
def can_convert_to_int(self,input):
try:
int(input)
return True
except BaseException:
return False
# 清除self.value内容,用于初始化
def clear_values(self):
self.value = []
return self.value
# 数据库取值
def Database_query(self,dbname,sql):
conn = pymysql.connect(host="56cbb9f62fac6.sh.cdb.myqcloud.com",
user="cdb_outerroot",
password="@09ui%sbc09",
port=6634,
database=dbname,
charset="utf8")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
# 定义要执行的SQL语句
# 执行SQL语句
cursor.execute(sql)
res = cursor.fetchall()
# 关闭光标对象
cursor.close()
# 关闭数据库连接
conn.close()
for key in res[0]:
self.value.append(key)
return self.value
class InterFace():
def __init__(self):
pass
# 打开文件
def xlsx_open(slef,filepath):
book = xlrd.open_workbook(filepath)
return book
def xlsx_getRow(slef,sheet, row):
object = {}
object["method"] = sheet.cell_value(row, 2)
object["host"] = sheet.cell_value(row, 3)
object["path"] = sheet.cell_value(row, 4)
object["url"] = object["host"] + object["path"]
object["params"] = sheet.cell_value(row, 5)
object["headers"] = sheet.cell_value(row, 6)
object["cookies"] = sheet.cell_value(row, 7)
object["code"] = sheet.cell_value(row, 8)
object["expected_code"] = sheet.cell_value(row, 9)
# print(object["cookies"])
return object
def xlsx_request(slef,object):
headers = {"Content-Type":"application/x-www-form-urlencoded"}
cookies = {
'Hm_lvt_47362f54f594efad07774d01ad0858d5': '1559783995,1560069495,1560079873,1560735612',
'web_login_cookie_id': 'fefd1c81641711e69d0818c58a146fd2',
'Hm_lpvt_47362f54f594efad07774d01ad0858d5': '1560844337'
}
if object["method"] == "post":
dic = json.loads(object["params"])
params1 = dic
#params1 = object["params"]
#print(params1)
if object["headers"]!="":
if 'application/jso' in object["headers"]:
params1 = json.dumps(object["params"])
#print(params1)
head=object["headers"]
header=json.loads(head)
response = requests.post(object["url"], params1,headers=header,cookies=cookies)
results = response.json()
else:
response = requests.post(object["url"], params1,cookies=cookies)
results = response.json()
result = results
elif object["method"] == "get":
print(object["params"])
dic = json.loads(object["params"])
params1 = urlencode(dic)
response = requests.get(object["url"], params1)
result = response.json()
else:
return 0
return result
def xlsx_set(slef,sheet, row, col, value, red=False):
style = "font:colour_index red;"
if red == False:
sheet.write(row, col, value)
else:
sheet.write(row, col, value, xlwt.easyxf(style))
def xlsx_save(slef,book, filepath):
book.save(filepath)
# 2019/6/24 update by datu
def writeResultDate(slef,bwt_sheet, inrow, incol, result):
# incol=11
# len(textwrap.wrap(text='abcdefghi', width=1))
for var in textwrap.wrap(text=json.dumps(result, ensure_ascii=False), width=30000):
slef.xlsx_set(bwt_sheet, inrow, incol, var, False)
incol = incol + 1
def dosheet(slef,brd, bwt, i):
brd_sheet = brd.sheets()[0]
bwt_sheet = bwt.get_sheet(0)
object = slef.xlsx_getRow(brd_sheet, i)
result = slef.xlsx_request(object)
if result == 0:
print("此行是接口说明,跳过不执行")
elif type(object["expected_code"]) == type(1.0):
if result.get(object["code"]) == object["expected_code"]:
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
if str(result.get(object["code"])) == object["expected_code"]:
slef.xlsx_set(bwt_sheet, i, 10, "pass", False)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
else:
slef.xlsx_set(bwt_sheet, i, 10, "fail", True)
# xlsx_set(bwt_sheet, i, 11, json.dumps(result,ensure_ascii=False), False)
inRow = i
inCol = 11
slef.writeResultDate(bwt_sheet, inRow, inCol, result)
return result
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment