This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
dongxiaoyan-tsg-autotest/04-CustomLibrary/Custometest/Schema.py

204 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#/user/bin/python
#-*-coding:utf-8-*-
import requests
import random
import json
import allure
number = random.randint(0,100000)
str = random.choice('abcdefghijklmnopqrstuvwxyz')
list = []
#请求schema接口得到返回数据用于其他接口
def schema(schemauerl,token):
url = schemauerl #"http://192.168.44.72:8080/v1/log/schema?logType=security_event_log"
headers = {"Content-Type":"application/x-www-form-urlencoded","Authorization":token}
response = requests.get(url=url,headers=headers)
return response.json()
#根据schema接口返回数据得出所有属性所支持的比较类型的列表
def Filter1(schemauerl,token):
json_str = schema(schemauerl,token)
print(type(json_str))
fields = json_str["data"]["fields"]
operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"]
for i in fields:
name = i["name"]
doc = i["doc"]
if doc == None:
type1 = i["type"]
for j in operator:
if type1 == j["type"]:
if type1 == "int" or type1 == "long":
value1 = number
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v=="in" or v == "not in":
str1 = name + " "+v + " "+ "(" +f"{value1}" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + f"{value1}"
list.append(str1)
elif type1 == "string":
value1 = str
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "not empty" or v == "empty":
str1 = v + "(" + " '" + name + " '" + ")"
list.append(str1)
elif v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + " '"+ value1 + " '"
list.append(str1)
else:
if i["doc"]["constraints"]== None:
type1 = i["type"]
for j in operator:
if type1==j["type"]:
if type1=="int" or type1=="long":
value1 = number
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + f"{value1}"
list.append(str1)
elif type1=="string":
value1 = str
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "not empty" or v == "empty":
str1 = v + "(" + " '" + name + " '" + ")"
list.append(str1)
elif v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + " '" + value1 + " '"
list.append(str1)
else:
if i["doc"]["constraints"]["operator_functions"]==None:
type1 = i["type"]
for j in operator:
if type1 == j["type"]:
if type1 == "int" or type1 == "long":
value1 = number
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "in" or v == "not in" :
str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + f"{value1}"
list.append(str1)
elif type1 == "string":
value1 = str
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "not empty" or v == "empty":
str1 = v + "(" + " '" + name + " '" + ")"
list.append(str1)
elif v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + " '" + value1 + " '"
list.append(str1)
else:
type1 = i["type"]
operator1 = i["doc"]["constraints"]["operator_functions"]
operator2 = operator1.split(",")
data = i["doc"]["data"]
for d in data:
code = d["code"]
if type1 == "int" or type1 == "long":
for o in operator2:
str1 = name + " "+ o + " "+ code
list.append(str1)
else:
for o in operator2:
str1 = name + " "+ o + " "+ " '" + code + " '"
list.append(str1)
print(list)
return list
#根据Filter1方法中的的数据写入log请求接口中来验证log请求接口
def logapiverify(logurl,schemauerl,token,starttime,endtime,logtype):
filter2 = Filter1(schemauerl,token)
a = schema(schemauerl,token)
fields = a["data"]["fields"]
print(fields)
str2 = ""
for i in filter2:
str2 = str2 + i + " " + "and" + " "
url = logurl #"http://192.168.44.72:8080/v1/log/list"
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"start_common_recv_time":starttime,
"end_common_recv_time":endtime,
"logType":logtype,
"fields":fields,
"filter":i
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data),headers=headers)
code = response1.json()["code"]
assert code == 200
print(response1.json()["code"])
return response1.json()
print(str2)
str3 = str2[0:-4]
print(str3)
url = logurl # "http://192.168.44.72:8080/v1/log/list"
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"logType": logtype,
"fields": fields,
"filter": str3
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
code = response1.json()["code"]
print(response1.json())
assert code == 200
print(response1.json()["code"])
#精确filter请求日志接口
def loglistverify(logurl,schemauerl,token,starttime,endtime,logtype,filtervalue):
a = schema(schemauerl,token)
fields = a["data"]["fields"]
print(fields)
url = logurl # "http://192.168.44.72:8080/v1/log/list"
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"logType": logtype,
"fields": fields,
"filter": filtervalue
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
code = response1.json()["code"]
print(response1.json())
assert code == 200
print(response1.json()["code"])
return response1.json()
#if __name__ == '__main__':
# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log")