1上传新的接口测试方法 2.上传接口测试用例

This commit is contained in:
byb11
2021-04-28 18:13:54 +08:00
parent ff24ca2b53
commit 253793312b
4 changed files with 208 additions and 22 deletions

View File

@@ -22,23 +22,24 @@ def FieldValidation(responsedict, targetlist):
responsekeys = getKeys(response)
# 判断目的条件的Key在数据中是否存在
if target[0] in responsekeys:
#targetkey 判断的字段
targetkey = target[0]
# 判断条件
conditions = target[1]
# 返回数据中对应key的Value列表
responsevaluelist = getjsonvalue(response,target[0])
for responsevalue in responsevaluelist:
#判断value值是否为列表转化为字符串
if isinstance(responsevalue, list):
responsevalue=str(responsevalue)
if len(target) == 3:
targetvalue = target[2]
p = conditional(conditions, responsevalue, targetkey, sum, targetvalue)
strlist.append(p)
elif len(target) == 2:
p = conditional(conditions, responsevalue, targetkey, sum)
strlist.append(p)
if len(target) != 1:
#targetkey 判断的字段
targetkey = target[0]
# 判断条件
conditions = target[1]
# 返回数据中对应key的Value列表
responsevaluelist = getjsonvalue(response,target[0])
for responsevalue in responsevaluelist:
#判断value值是否为列表转化为字符串
if isinstance(responsevalue, list):
responsevalue=str(responsevalue)
if len(target) == 3:
targetvalue = target[2]
p = conditional(conditions, responsevalue, targetkey, sum, targetvalue)
strlist.append(p)
elif len(target) == 2:
p = conditional(conditions, responsevalue, targetkey, sum)
strlist.append(p)
else:
str2 = "返回数据第" + str(sum) + "组数据中不存在该字段:" + target[0]
print(str2)

View File

@@ -270,7 +270,7 @@ def distributed_query(logurl, token):
url = logurl # url示例http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M
headers = {"Content-Type": "application/json", "Authorization": token}
response = requests.get(url=url, headers=headers)
code = response1.json()["code"]
code = response.json()["code"]
print(response.json())
assert code == 200
print(response.json()["code"])
@@ -297,5 +297,54 @@ def timedistribution(logurl, token, starttime, endtime, logtype, granularity, fi
print(response1.json()["code"])
assert code == 200
return response1.json()
# 日志总数查询
def countlog_query(logurl, token, starttime, endtime, logtype):
url = logurl
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"pageSize": 20,
"logType": logtype,
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"filter": ""
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
code = response1.json()["code"]
print(response1.json())
print(response1.json()["code"])
assert code == 200
return response1.json()
# 日志导出接口
def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
a = schema(schemauerl, token)
fields = a["data"]["fields"]
print(fields)
url = logurl
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"logType": logtype,
"fields": fields,
"filter": filtervalue
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
a=type(response1)
if a != "class 'requests.models.Response'":
assert 1 == 1
else:
assert 1 == 2
# if __name__ == '__main__':
# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log")