This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
dongxiaoyan-tsg-autotest/04-CustomLibrary/Custometest/Schema.py
dongxiaoyan ab634e5c63 1、注释无用内容;
2、添加拼接查询条件要求,@流浪远方需根据要求完善代码;
3、增加有目标精确查询返回结果校验
2021-03-29 09:30:27 +08:00

214 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
#-*-coding:utf-8-*-
import requests
import random
import json
#import allure
# Random probe values, drawn once when the module is imported, so every use of
# them within one process sees the same integer and the same letter.
number = random.randint(0,100000)
# NOTE(review): this rebinds the built-in name `str` to a single random
# lowercase letter for the whole module; `str(...)` cannot be called below.
str = random.choice('abcdefghijklmnopqrstuvwxyz')
# NOTE(review): this rebinds the built-in name `list` to a module-level list
# instance; `list(...)` cannot be called below.
list = []
# Request the schema endpoint; the returned data is used by the other helpers.
def schema(schemauerl,token,timeout=60):
    """Fetch the log schema and return the decoded JSON body.

    Args:
        schemauerl: Full URL of the schema endpoint, e.g.
            "http://<host>:8080/v1/log/schema?logType=security_event_log".
        token: Value sent in the ``Authorization`` header.
        timeout: Seconds to wait for the server (passed to ``requests.get``).
            Without a timeout an unresponsive server would block the caller
            indefinitely. Pass ``None`` to wait forever.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": token,
    }
    response = requests.get(url=schemauerl, headers=headers, timeout=timeout)
    return response.json()
# Build the list of comparison conditions supported by every schema attribute.
# Requirements recorded by the original author (not all are implemented yet):
# 1. Only columns whose [doc][allow_query] is true support searching.
# 2. If [doc][constraints][operator_functions] is set, those operators win.
# 3. If [doc][data] is set, the attribute's values are the listed `code`s.
# 4. int and long have different value ranges.
# 5. string values should include special characters.
# 6. Values assigned to conditions should cover boundary and normal cases.
# 7. IPv4, IPv6 and URL values need dedicated generators.
def _default_conditions(name, field_type, operators):
    """Return conditions that compare *name* against the module's probe value.

    ``int``/``long`` fields use the module-level ``number``; ``string`` fields
    use the module-level random letter. Any other type yields no conditions.
    """
    conditions = []
    if field_type == "int" or field_type == "long":
        for op in operators:
            if op == "in" or op == "not in":
                conditions.append(f"{name} {op} ({number})")
            else:
                conditions.append(f"{name} {op} {number}")
    elif field_type == "string":
        # The module rebinds the name `str` to a random lowercase letter.
        text_value = str
        # NOTE(review): the quoted text carries a trailing space inside the
        # quotes ('x ') and a doubled space before it; kept byte-for-byte
        # because the server-side expectations are not visible here.
        for op in operators:
            if op == "not empty" or op == "empty":
                conditions.append(f"{op}( '{name} ')")
            elif op == "in" or op == "not in":
                conditions.append(f"{name} {op} ( '{text_value} ')")
            else:
                conditions.append(f"{name} {op}  '{text_value} '")
    return conditions


def Filter1(schemauerl,token):
    """Generate one filter condition per (field, operator) pair of the schema.

    The schema is fetched with ``schema(schemauerl, token)``. For each entry
    of ``data.fields``:

    * When the field has no ``doc``, no ``doc.constraints`` or no
      ``doc.constraints.operator_functions``, the operators come from the
      entries of ``data.doc.schema_query.references.operator`` whose ``type``
      equals the field type, and the value is the module's random probe value.
    * Otherwise the field's own comma-separated ``operator_functions`` are
      combined with every ``code`` listed in ``doc.data``. If ``doc.data`` is
      absent, the probe value is used with those operators instead of failing.

    Args:
        schemauerl: URL of the schema endpoint.
        token: Value for the ``Authorization`` header.

    Returns:
        A new list of condition strings. A fresh list is built on every call,
        so repeated calls do not accumulate earlier results.
    """
    json_str = schema(schemauerl,token)
    print(type(json_str))
    # Field definitions of the log type.
    fields = json_str["data"]["fields"]
    # Operators supported by each attribute type.
    type_operators = json_str["data"]["doc"]["schema_query"]["references"]["operator"]

    conditions = []
    for field in fields:
        name = field["name"]
        field_type = field["type"]
        doc = field["doc"]
        constraints = doc["constraints"] if doc is not None else None
        field_ops = constraints["operator_functions"] if constraints is not None else None

        if field_ops is None:
            # No field-specific operators: use the per-type operator table.
            probed = field_type == "int" or field_type == "long" or field_type == "string"
            for entry in type_operators:
                if entry["type"] == field_type and probed:
                    conditions.extend(
                        _default_conditions(name, field_type, entry["functions"].split(","))
                    )
            continue

        operators = field_ops.split(",")
        data = doc.get("data")
        if data is None:
            # Constrained operators but no enumerated values: probe with the
            # default value rather than raising on iteration over None.
            conditions.extend(_default_conditions(name, field_type, operators))
            continue

        numeric = field_type == "int" or field_type == "long"
        for item in data:
            code = item["code"]
            for op in operators:
                if numeric:
                    # f-string formatting also accepts non-string codes.
                    conditions.append(f"{name} {op} {code}")
                else:
                    conditions.append(f"{name} {op}  '{code} '")
    print(conditions)
    return conditions
# Feed the conditions produced by Filter1 into the log-list endpoint to verify it.
def logapiverify(logurl,schemauerl,token,starttime,endtime,logtype):
    """Verify the log-list endpoint against every generated filter condition.

    Each condition returned by ``Filter1`` is posted on its own, then all
    conditions joined with ``" and "`` are posted as one filter. Every
    response must carry ``code == 200``.

    Args:
        logurl: URL of the log-list endpoint, e.g. "http://<host>:8080/v1/log/list".
        schemauerl: URL of the schema endpoint.
        token: Value for the ``Authorization`` header.
        starttime: Sent as ``start_common_recv_time``.
        endtime: Sent as ``end_common_recv_time``.
        logtype: Sent as ``logType``.

    Returns:
        The decoded JSON body of the combined-filter request.

    Raises:
        AssertionError: If any response's ``code`` is not 200.
    """
    filter2 = Filter1(schemauerl,token)
    a = schema(schemauerl,token)
    fields = a["data"]["fields"]
    print(fields)
    headers = {"Content-Type": "application/json",
               "Authorization": token}

    def post_filter(filter_text):
        # Post one query and return its decoded body after checking the code.
        data = {
            "start_common_recv_time": starttime,
            "end_common_recv_time": endtime,
            "logType": logtype,
            "fields": fields,
            "filter": filter_text
        }
        print(data)
        payload = json.dumps(data)
        print(payload)
        response1 = requests.post(url=logurl, data=payload, headers=headers)
        body = response1.json()
        code = body["code"]
        print(body)
        assert code == 200, f"log query returned code {code} for filter: {filter_text!r}"
        print(code)
        return body

    # Check every condition individually; do not stop after the first one.
    for condition in filter2:
        post_filter(condition)

    # join() leaves no dangling separator or trailing space on the filter.
    combined = " and ".join(filter2)
    print(combined)
    return post_filter(combined)
# Query the log-list endpoint with one exact, caller-supplied filter.
def loglistverify(logurl,schemauerl,token,starttime,endtime,logtype,filtervalue):
    """Post a single filtered log query and return the decoded response.

    The field list is taken from ``schema(schemauerl, token)`` and sent back
    unchanged in the request body together with the time range, log type and
    ``filtervalue``.

    Returns:
        The response body parsed as JSON.

    Raises:
        AssertionError: If the response's ``code`` is not 200.
    """
    schema_fields = schema(schemauerl,token)["data"]["fields"]
    print(schema_fields)

    request_body = {
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "logType": logtype,
        "fields": schema_fields,
        "filter": filtervalue
    }
    print(request_body)
    serialized = json.dumps(request_body)
    print(serialized)

    reply = requests.post(
        url=logurl,
        data=serialized,
        headers={"Content-Type": "application/json", "Authorization": token},
    )
    result = reply.json()
    status = result["code"]
    print(result)
    assert status == 200
    print(status)
    return result
#if __name__ == '__main__':
# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log")