This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
dongxiaoyan-tsg-autotest/04-CustomLibrary/Custometest/LogSchema.py

513 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# /user/bin/python
# -*-coding:utf-8-*-
import requests
import random
import json
import LogResponseVAL
import time, datetime
# import allure
# 请求schema接口得到返回数据用于其他接口
def schema(schemauerl, token):
url = schemauerl
headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token}
response = requests.get(url=url, headers=headers)
return response.json()
# 根据schema接口返回数据得出所有属性所支持的比较类型的列表
# 1、根据[doc][allow_query]值为true列支持搜索
# 2、如有[doc][constraints][operator_functions]值,操作优先;
# 3、如有[doc][data]值则对应属性取值为data所列code值
# 4、int和long的范围不一致
# 5、string要包含特殊字符
# 6、给查询条件赋值要给出边界和正常值
# 7、IPV4、V6和URL要给出专门的方法生成
import ipaddress
# 生成随机ipv4或ipv6
MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
def random_ipv4():
return ipaddress.IPv4Address._string_from_ip_int(
random.randint(0, MAX_IPV4)
)
def random_ipv6():
return ipaddress.IPv6Address._string_from_ip_int(
random.randint(0, MAX_IPV6)
)
from random import Random
# 生成 12 位随机 URL 地址
def randrom_url():
str = ''
str1 = ''
chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
chars1 = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()'
length = len(chars)
length1 = len(chars1)
random = Random()
for x in range(random.randint(8, 16)):
str += chars[random.randint(0, length - 1)]
for pp in range(random.randint(8, 16)):
str1 += chars1[random.randint(0, length1 - 1)]
url = str[0:-5] + "." + str[0:-6] + "." + str[0:-7] + "/" + str1
print(url)
return url
def Filter1(schemauerl, token):
list = []
json_str = schema(schemauerl, token)
print("schemauerl",json_str)
print(type(json_str))
# 获取日志属性定义
fields = json_str["data"]["fields"]
print("1111111111",fields)
# 获取不同属性支持的部不同操作
operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"]
for i in fields:
number = random.randint(0, 2147483647)
maxnumber = 2147483647
minnumber = -2147483648
str = random.choice('abcdefghijklmnopqrstuvwxyz!@#%^&*')
name = i["name"]
doc = i["doc"]
# 获取无任何特殊说明列:
if doc == None:
type1 = i["type"]
for j in operator:
if type1 == j["type"]:
if type1 == "int" or type1 == "long":
value1 = number
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + f"{value1}"
list.append(str1)
elif type1 == "string":
value1 = str
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "notEmpty" or v == "empty":
str1 = v + "(" + " '" + name + " '" + ")"
list.append(str1)
elif v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + " '" + value1 + " '"
list.append(str1)
else:
if i["doc"]["constraints"] == None:
type1 = i["type"]
for j in operator:
if type1 == j["type"]:
if type1 == "int" or type1 == "long":
value1 = number
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + f"{value1}"
list.append(str1)
elif type1 == "string":
value1 = str
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "notEmpty" or v == "empty":
str1 = v + "(" + " '" + name + " '" + ")"
list.append(str1)
elif v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + " '" + value1 + " '"
list.append(str1)
else:
if i["doc"]["constraints"]["operator_functions"] == None:
type1 = i["type"]
for j in operator:
if type1 == j["type"]:
if type1 == "int" or type1 == "long":
value1 = number
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + f"{value1}"
list.append(str1)
elif type1 == "string":
value1 = str
functions = j["functions"]
functions1 = functions.split(",")
for v in functions1:
if v == "notEmpty" or v == "empty":
str1 = v + "(" + " '" + name + " '" + ")"
list.append(str1)
elif v == "in" or v == "not in":
str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
list.append(str1)
else:
str1 = name + " " + v + " " + " '" + value1 + " '"
list.append(str1)
else:
type1 = i["type"]
operator1 = i["doc"]["constraints"]["operator_functions"]
operator2 = operator1.split(",")
data = i["doc"]["data"]
for d in data:
code = d["code"]
if type1 == "int" or type1 == "long":
for o in operator2:
str1 = name + " " + o + " " + code
list.append(str1)
else:
for o in operator2:
str1 = name + " " + o + " " + " '" + code + " '"
list.append(str1)
print("22222222222",list)
return list
# 根据Filter1方法中的的数据写入log请求接口中来验证log请求接口
def logapiverify(schemauerl,logurl, token, starttime, endtime,logtype):
filter2 = Filter1(schemauerl, token)
a = schema(schemauerl, token)
fields = a["data"]["fields"]
print("333333333333",filter2)
for i in filter2:
print("条件:", i)
url = logurl # "http://192.168.44.72:8080/v1/log/list"
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"logType": logtype,
"fields": fields,
"filter": i
}
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
code = response1.json()["code"]
assert code == 200
print(response1.json()["code"])
return response1.json()
# print("111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
# print(str2)
# str3 = str2[0:-4]
# print(str3)
# url = logurl # "http://192.168.44.72:8080/v1/log/list"
# headers = {"Content-Type": "application/json",
# "Authorization": token}
# data = {
# "start_common_recv_time": starttime,
# "end_common_recv_time": endtime,
# "logType": logtype,
# "fields": fields,
# "filter": str3
# }
# print(data)
# print(json.dumps(data))
# response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
# code = response1.json()["code"]
# print(response1.json())
# assert code == 200
# print(response1.json()["code"])
# 精确filter请求日志接口
def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
a = schema(schemauerl, token)
fields = a["data"]["fields"]
url = logurl # "http://192.168.44.72:8080/v1/log/list"
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"logType": logtype,
"fields": fields,
"filter": filtervalue
}
# print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
code = response1.json()["code"]
assert code == 200
print(response1.json()["code"])
return response1.json()
#目的性验证,循坏返回列表中所有字段进行查询
def loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, datajson):
nullkey = []
data = datajson
keylist = LogResponseVAL.getKeys(data)
a = schema(schemaurl, token)
fields = a["data"]["fields"]
for i in keylist:
conditions = data[i]
for field in fields:
name = field["name"]
if field["doc"] == None or field["doc"]["visibility"] == None:
if i == name:
if conditions != None and conditions != "":
if field["type"] == "string":
if conditions[0] == "'" and conditions[-1] == "'":
filtervalue = i + " = " + conditions
VasserValue=i + " = " + conditions[1:-1]
else:
filtervalue = i + " = " + "'" + conditions + "'"
VasserValue= i + " = " + conditions
else:
if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time":
timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
timeStamp = str(int(time.mktime(timeArray)))
filtervalue = i + " = " + timeStamp
VasserValue = filtervalue
else:
filtervalue = i + " = " + str(conditions)
VasserValue = filtervalue
print("filtervalue",filtervalue)
#根据提取条件进行查询日志列表
responsebody = loglistverify(logurl, schemaurl, token, starttime, endtime, logtype,
filtervalue)
filterlist=[VasserValue]
print(VasserValue)
LogResponseVAL.FieldValidation(responsebody,filterlist)
else:
nullkey.append(i) #所有为None或者“”的字段
return nullkey
# 多条循环 变量设置为公共参数 若循环内一个字段没有值 进行下次循坏
def logAllFieldsListInterface(logurl, schemaurl, token, starttime, endtime, logtype, datajson,lognumber,logcycles):
datalist = datajson["data"]["list"]
keylist=[]
number=0
print(lognumber)
print(type(lognumber))
print(logcycles)
print(type(logcycles))
for i in range(0, len(datalist), int(lognumber)):# 循环取出count个列表元素
number+=1
nullkeylist=[]
ret=datalist[i:i + int(lognumber)]
for data in ret:
nullkey=loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, data)
nullkeylist.append(nullkey)
print(nullkeylist)
for j in nullkeylist:
#对返回的为空的key进行取交集
if len(keylist) == 0:
keylist=j
else:
#取两个列表的交集
keylist=list(set(keylist).intersection(set(j)))
if len(keylist) == 0 or number >= int(logcycles):
break
print("最终数据中没有值的字段为:",keylist)
# 事件日志和通联日志时间分布查询 日志检索条件校验filter内容验证
def distributed_query(logurl, token):
url = logurl # url示例http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M
headers = {"Content-Type": "application/json", "Authorization": token}
response = requests.get(url=url, headers=headers)
code = response.json()["code"]
print(response.json())
assert code == 200
print(response.json()["code"])
return response.json()
#日志检索条件校验 纯接口
def LogRetrieve(schemaurl,host,port,token,logType,datajson):
number = random.randint(0, 2147483647)
str1 = random.choice('abcdefghijklmnopqrstuvwxyz')
data=datajson["data"]["list"][0]
keylist = LogResponseVAL.getKeys(data)
a = schema(schemaurl, token)
fields=a["data"]["fields"]
for i in keylist:
conditions = data[i]
for field in fields:
name = field["name"]
if i == name:
if field["type"] == "string":
filter = "logType=" + logType + "&" + "filter=" + i + "=" + "'" + str1 + "'"
else:
if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time":
timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
timeStamp = str(int(time.mktime(timeArray)))
filter = "logType=" + logType + "&" + "filter=" + i + "=" + timeStamp
else:
filter = "logType=" + logType + "&" + "filter=" + i + "=" + str(number)
Logurl = "http://" + host + ":" + port + "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + filter
print(Logurl)
responsebody = distributed_query(Logurl, token)
# 日志检索条件校验 复杂sql
def LogRetrieveSql(schemaurl,host,port,token,logType,datajson):
data = datajson["data"]["list"][0]
keylist = LogResponseVAL.getKeys(data)
sqllist=random.sample(keylist, 4)
number = 45585
str1 = random.choice('abcdefghijklmnopqrstuvwxyz')
print(sqllist)
a = schema(schemaurl, token)
filterlist=[]
fields=a["data"]["fields"]
for i in sqllist:
conditions = data[i]
for field in fields:
name = field["name"]
if i == name:
if field["type"] == "string":
if conditions == "" or conditions == None:
conditions=str1
filter = i + "=" + "'" + conditions + "'"
else:
if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time":
timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
timeStamp = str(int(time.mktime(timeArray)))
filter =i + "=" + timeStamp
else:
if conditions == "" or conditions == None:
conditions = number
filter = i + "=" + str(conditions)
print(filter)
filterlist.append(filter)
sqlfilter = "(("+filterlist[0]+" OR "+filterlist[1]+") AND "+filterlist[2]+") OR "+filterlist[3]
_filter = "logType=" + logType + "&" + "filter=" + sqlfilter
Logurl = "http://" + host + ":" + port + "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + _filter
print(Logurl)
responsebody = distributed_query(Logurl, token)
print(sqlfilter)
# 原始日志检索时间分布计算
def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue):
url = logurl # "http://192.168.44.72:8080/v1/log/timedistribution"
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"startTime": starttime,
"endTime": endtime,
"logType": logtype,
"granularity": granularity,
"filter": filtervalue
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
code = response1.json()["code"]
print(response1.json())
print(response1.json()["code"])
assert code == 200
return response1.json()
# 日志总数查询
def countlog_query(logurl, token, starttime, endtime, logtype):
url = logurl
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"pageSize": 20,
"logType": logtype,
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"filter": ""
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
code = response1.json()["code"]
print(response1.json())
print(response1.json()["code"])
assert code == 200
return response1.json()
# 日志导出接口
def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
a = schema(schemauerl, token)
fields = a["data"]["fields"]
print(fields)
url = logurl
headers = {"Content-Type": "application/json",
"Authorization": token}
data = {
"start_common_recv_time": starttime,
"end_common_recv_time": endtime,
"logType": logtype,
"fields": fields,
"filter": filtervalue
}
print(data)
print(json.dumps(data))
response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
a=type(response1)
if a != "class 'requests.models.Response'":
assert 1 == 1
else:
assert 1 == 2
#判断日志内详情字段
def LogFieldValidation(schemauerl,token,datajson):
Schemajson = schema(schemauerl, token)
fields=Schemajson["data"]["fields"]
keylist= LogResponseVAL.getKeys(datajson["data"]["list"][0])
schema_typedict=Schemajson["data"]["doc"]["schema_type"]
schema_typelistkey=schema_typedict.keys()
for schema_typekey in schema_typelistkey: #取出schema_type内的每一个key
for i in schema_typedict[schema_typekey]["columns"]:
for filter in fields:
if filter["name"] == i:
if filter["doc"] == None:
if i not in keylist:
print("该字段未存在日志详情内",i)
assert 1==2
else:
print("该字段通过在日志详情内",i)
else:
if filter["doc"]["visibility"] != "disabled":
if i not in keylist:
print("该字段未存在日志详情内",i)
assert 1==2
else:
print("该字段通过在日志详情内",i)
# if __name__ == '__main__':
# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log")