This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
dongxiaoyan-tsg-autotest/04-CustomLibrary/Custometest/LogSchema.py
2021-05-08 09:41:33 +08:00

419 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# -*-coding:utf-8-*-
import requests
import random
import json
import LogResponseVAL
import time, datetime
# import allure
# Module-level result list: Filter1 stores its generated filter strings here
# and returns this same object.
# NOTE(review): the name shadows the builtin `list` throughout this module.
list = []
# Call the schema endpoint; the returned data is reused by the other helpers.
def schema(schemauerl, token):
    """Send a GET request to the schema URL and return the decoded JSON body.

    :param schemauerl: full schema URL, e.g.
        "http://192.168.44.72:8080/v1/log/schema?logType=security_event_log"
    :param token: value sent in the ``Authorization`` header
    :return: the result of ``response.json()`` for the GET response
    """
    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": token,
    }
    reply = requests.get(url=schemauerl, headers=request_headers)
    return reply.json()
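# From the schema response, build the list of comparison filters each field supports:
# 1. A column is searchable when its [doc][allow_query] value is true.
# 2. If [doc][constraints][operator_functions] has a value, those operators take priority.
# 3. If [doc][data] has a value, the field takes the `code` values listed in data.
# 4. int and long have different value ranges.
# 5. string values should include special characters.
# 6. Query conditions should be given boundary values as well as normal values.
# 7. IPv4, IPv6 and URL values need dedicated generator functions.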
import ipaddress
# Random IPv4 / IPv6 address generation.
# The limits are spelled out numerically and the text form comes from the
# public ipaddress constructors, so nothing here depends on private
# attributes of the ipaddress classes.
MAX_IPV4 = 2 ** 32 - 1  # largest IPv4 address as an integer
MAX_IPV6 = 2 ** 128 - 1  # largest IPv6 address as an integer


def random_ipv4():
    """Return a random IPv4 address as a dotted-quad string."""
    return str(ipaddress.IPv4Address(random.randint(0, MAX_IPV4)))


def random_ipv6():
    """Return a random IPv6 address as a string in ``str(IPv6Address)`` form."""
    return str(ipaddress.IPv6Address(random.randint(0, MAX_IPV6)))
from random import Random
# Generate a random URL-like string of the form "label.label.label/path".
def randrom_url():
    """Build, print and return a random URL-like string.

    A base string of 8 to 16 lowercase letters/digits is drawn first. The
    three dot-separated labels are that base with its last 5, 6 and 7
    characters removed. The path part is 8 to 16 characters drawn from
    letters, digits and ``!#$%^&*()``.

    :return: the generated string (it is also printed)
    """
    label_chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
    path_chars = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()'
    # A dedicated generator instance; named so it does not hide the
    # module-level `random` import.
    rng = Random()
    base = ''.join(rng.choice(label_chars) for _ in range(rng.randint(8, 16)))
    path = ''.join(rng.choice(path_chars) for _ in range(rng.randint(8, 16)))
    url = base[0:-5] + "." + base[0:-6] + "." + base[0:-7] + "/" + path
    print(url)
    return url
def _numeric_conditions(name, functions, value):
    """Return one condition per comma-separated operator for an int/long field."""
    conditions = []
    for op in functions.split(","):
        if op in ("in", "not in"):
            conditions.append(f"{name} {op} ({value})")
        else:
            conditions.append(f"{name} {op} {value}")
    return conditions


def _string_conditions(name, functions, value):
    """Return one condition per comma-separated operator for a string field.

    The exact spacing inside the quotes is kept as the module has always
    emitted it.
    """
    conditions = []
    for op in functions.split(","):
        if op in ("notEmpty", "empty"):
            conditions.append(f"{op}( '{name} ')")
        elif op in ("in", "not in"):
            conditions.append(f"{name} {op} ( '{value} ')")
        else:
            conditions.append(f"{name} {op}  '{value} '")
    return conditions


def _typed_conditions(name, field_type, functions, number, text):
    """Dispatch to the numeric or string builder; other types yield nothing."""
    if field_type in ("int", "long"):
        return _numeric_conditions(name, functions, number)
    if field_type == "string":
        return _string_conditions(name, functions, text)
    return []


def _default_conditions(name, field_type, operator, number, text):
    """Build conditions from the schema-wide operator table for a field's type."""
    if field_type not in ("int", "long", "string"):
        return []
    conditions = []
    for entry in operator:
        if entry["type"] == field_type:
            conditions.extend(
                _typed_conditions(name, field_type, entry["functions"], number, text)
            )
    return conditions


def _enumerated_conditions(name, field_type, operator_functions, data):
    """Build conditions pairing each listed ``code`` with each constrained operator."""
    operators = operator_functions.split(",")
    numeric = field_type in ("int", "long")
    conditions = []
    for item in data:
        code = item["code"]
        for op in operators:
            if numeric:
                # f-string formatting also accepts codes that are not strings.
                conditions.append(f"{name} {op} {code}")
            else:
                conditions.append(f"{name} {op}  '{code} '")
    return conditions


def Filter1(schemauerl, token):
    """Generate filter expressions for every field of a log schema.

    The schema is fetched with ``schema(schemauerl, token)``. For each entry
    of ``data.fields``:

    * when the field has no ``doc``, no ``doc.constraints`` or no
      ``doc.constraints.operator_functions``, the operators come from
      ``data.doc.schema_query.references.operator`` for the field's type and
      the value is a random int (int/long) or a random character (string);
    * otherwise the field's own ``operator_functions`` are combined with every
      ``code`` listed in ``doc.data``. If ``doc.data`` is None, the same
      operators are used with a random value instead of failing.

    The module-level ``list`` is emptied first, so one call returns only the
    filters of the requested schema rather than those of every earlier call.

    :param schemauerl: schema URL passed to ``schema``
    :param token: authorization token passed to ``schema``
    :return: the module-level ``list`` holding the generated filter strings
    """
    json_str = schema(schemauerl, token)
    print(type(json_str))
    # Field definitions of the log type.
    fields = json_str["data"]["fields"]
    # Operators supported per field type.
    operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"]
    # `list` here is this module's shared result list, not the builtin type.
    list.clear()
    for field in fields:
        number = random.randint(0, 2147483647)
        text = random.choice('abcdefghijklmnopqrstuvwxyz!@#%^&*')
        name = field["name"]
        field_type = field["type"]
        doc = field["doc"]
        constraints = None if doc is None else doc["constraints"]
        operator_functions = (
            None if constraints is None else constraints["operator_functions"]
        )
        if operator_functions is None:
            # No field-specific operators: fall back to the per-type table.
            list.extend(
                _default_conditions(name, field_type, operator, number, text)
            )
            continue
        data = doc["data"]
        if data is None:
            # Constrained operators but no enumerated codes.
            list.extend(
                _typed_conditions(name, field_type, operator_functions, number, text)
            )
        else:
            list.extend(
                _enumerated_conditions(name, field_type, operator_functions, data)
            )
    print(list)
    return list
# Send each filter produced by Filter1 to the log request endpoint to verify it.
def logapiverify(logurl, token, starttime, endtime, host, port, logtypelist):
    """POST every generated filter to the log endpoint and require code 200.

    For each log type the schema URL is built from ``host`` and ``port``,
    ``Filter1`` produces the filter strings, and each one is posted on its own
    to ``logurl`` together with the schema's field list.

    :param logurl: log list endpoint, e.g. "http://192.168.44.72:8080/v1/log/list"
    :param token: value sent in the ``Authorization`` header
    :param starttime: value for ``start_common_recv_time``
    :param endtime: value for ``end_common_recv_time``
    :param host: host used to build the schema URL
    :param port: port used to build the schema URL (str or int)
    :param logtypelist: iterable of log type names
    :raises AssertionError: when a response's ``code`` is not 200
    """
    headers = {"Content-Type": "application/json",
               "Authorization": token}
    for logtype in logtypelist:
        # f-string formatting accepts a numeric port as well as a string one.
        schemauerl = f"http://{host}:{port}/v1/log/schema?logType={logtype}"
        filter2 = Filter1(schemauerl, token)
        fields = schema(schemauerl, token)["data"]["fields"]
        print(fields)
        for condition in filter2:
            print("条件:", condition)
            data = {
                "start_common_recv_time": starttime,
                "end_common_recv_time": endtime,
                "logType": logtype,
                "fields": fields,
                "filter": condition
            }
            print(data)
            payload = json.dumps(data)
            print(payload)
            response1 = requests.post(url=logurl, data=payload, headers=headers)
            code = response1.json()["code"]
            print("code", code)
            assert code == 200, (
                f"log request for {logtype!r} with filter {condition!r} "
                f"returned code {code!r}"
            )
            print(code)
    # NOTE: a second request that joined all conditions with "and" into one
    # filter used to be sketched here as commented-out code; it was never
    # executed and has been removed together with its unused accumulator.
# Request the log endpoint with one explicit filter.
def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
    """POST a log query with the given filter and return the JSON reply.

    The field list is taken from ``schema(schemauerl, token)``.

    :param logurl: log list endpoint, e.g. "http://192.168.44.72:8080/v1/log/list"
    :param schemauerl: schema URL used to obtain the ``fields`` list
    :param token: value sent in the ``Authorization`` header
    :param starttime: value for ``start_common_recv_time``
    :param endtime: value for ``end_common_recv_time``
    :param logtype: value for ``logType``
    :param filtervalue: value for ``filter``
    :return: decoded JSON body of the response
    :raises AssertionError: when the body's ``code`` is not 200
    """
    schema_body = schema(schemauerl, token)
    fields = schema_body["data"]["fields"]
    print(fields)
    request_headers = {"Content-Type": "application/json", "Authorization": token}
    data = {
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "logType": logtype,
        "fields": fields,
        "filter": filtervalue,
    }
    print(data)
    body_text = json.dumps(data)
    print(body_text)
    response1 = requests.post(url=logurl, data=body_text, headers=request_headers)
    result = response1.json()
    code = result["code"]
    print(result)
    assert code == 200
    print(code)
    return result
# Time-distribution query for event logs and session logs; also used to
# validate the filter content of log search conditions.
def distributed_query(logurl, token):
    """GET ``logurl`` and return the JSON reply after checking its code.

    Example URL:
    http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M

    :param logurl: complete request URL including the query string
    :param token: value sent in the ``Authorization`` header
    :return: decoded JSON body of the response
    :raises AssertionError: when the body's ``code`` is not 200
    """
    request_headers = {"Content-Type": "application/json", "Authorization": token}
    response = requests.get(url=logurl, headers=request_headers)
    result = response.json()
    code = result["code"]
    print(result)
    assert code == 200
    print(code)
    return result
# Validate log search conditions (pure API check).
def LogRetrieve(schemaurl,host,port,token,logType,datajson):
    """Validate a ``field=value`` filter for every non-null field of one log row.

    The first row of ``datajson["data"]["list"]`` supplies the values. For each
    key returned by ``LogResponseVAL.getKeys`` whose value is not None and
    whose name appears in the schema's field list, a filter is built and sent
    to the filter-validation endpoint through ``distributed_query``:

    * string fields are wrapped in single quotes;
    * the four ``common_*_time`` fields are parsed as "%Y-%m-%d %H:%M:%S" and
      sent as an integer timestamp (``time.mktime``, i.e. local time);
    * every other value is sent as its ``str()`` form, so numeric values no
      longer fail on string concatenation.

    The query string is URL-encoded so that characters such as ``&``, ``#`` or
    ``+`` inside a value cannot break the request.
    NOTE(review): a value containing a single quote still produces an
    unbalanced filter expression; the server's escaping rule is not visible
    from this file.

    :param schemaurl: schema URL passed to ``schema``
    :param host: host of the validation endpoint
    :param port: port of the validation endpoint (str or int)
    :param token: authorization token
    :param logType: value for the ``logType`` query parameter
    :param datajson: decoded log-list response providing the sample row
    :return: None
    """
    from urllib.parse import quote, urlencode

    time_fields = (
        "common_recv_time",
        "common_start_time",
        "common_end_time",
        "common_processing_time",
    )
    data = datajson["data"]["list"][0]
    keylist = LogResponseVAL.getKeys(data)
    fields = schema(schemaurl, token)["data"]["fields"]
    endpoint = f"http://{host}:{port}/v1/interface/gateway/sql/galaxy/log/filter/validation?"
    for key in keylist:
        conditions = data[key]
        if conditions is None:
            continue
        for field in fields:
            if field["name"] != key:
                continue
            if field["type"] == "string":
                value = f"'{conditions}'"
            elif key in time_fields:
                timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
                value = str(int(time.mktime(timeArray)))
            else:
                value = str(conditions)
            query = urlencode(
                {"logType": logType, "filter": f"{key}={value}"},
                quote_via=quote,
            )
            Logurl = endpoint + query
            print(Logurl)
            distributed_query(Logurl, token)
# Time-distribution calculation for raw log search.
def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue):
    """POST a time-distribution query and return the JSON reply.

    :param logurl: endpoint, e.g. "http://192.168.44.72:8080/v1/log/timedistribution"
    :param token: value sent in the ``Authorization`` header
    :param starttime: value for ``startTime``
    :param endtime: value for ``endTime``
    :param logtype: value for ``logType``
    :param granularity: value for ``granularity``
    :param filtervalue: value for ``filter``
    :return: decoded JSON body of the response
    :raises AssertionError: when the body's ``code`` is not 200
    """
    request_headers = {"Content-Type": "application/json", "Authorization": token}
    data = {
        "startTime": starttime,
        "endTime": endtime,
        "logType": logtype,
        "granularity": granularity,
        "filter": filtervalue,
    }
    print(data)
    body_text = json.dumps(data)
    print(body_text)
    response1 = requests.post(url=logurl, data=body_text, headers=request_headers)
    result = response1.json()
    code = result["code"]
    print(result)
    print(code)
    assert code == 200
    return result
# Total log count query.
def countlog_query(logurl, token, starttime, endtime, logtype):
    """POST a count query (page size 20, empty filter) and return the JSON reply.

    :param logurl: endpoint URL for the request
    :param token: value sent in the ``Authorization`` header
    :param starttime: value for ``start_common_recv_time``
    :param endtime: value for ``end_common_recv_time``
    :param logtype: value for ``logType``
    :return: decoded JSON body of the response
    :raises AssertionError: when the body's ``code`` is not 200
    """
    request_headers = {"Content-Type": "application/json", "Authorization": token}
    data = {
        "pageSize": 20,
        "logType": logtype,
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "filter": "",
    }
    print(data)
    body_text = json.dumps(data)
    print(body_text)
    response1 = requests.post(url=logurl, data=body_text, headers=request_headers)
    result = response1.json()
    code = result["code"]
    print(result)
    print(code)
    assert code == 200
    return result
# Log export endpoint.
def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
    """POST an export request and check that the HTTP call succeeded.

    The field list is taken from ``schema(schemauerl, token)``.

    The previous check compared ``type(response)`` with a string, which is
    never equal, so the function could not fail. The check is now on the HTTP
    status code.
    NOTE(review): 200 is assumed to be the success status of the export
    endpoint - confirm against the API.

    :param logurl: export endpoint URL
    :param schemauerl: schema URL used to obtain the ``fields`` list
    :param token: value sent in the ``Authorization`` header
    :param starttime: value for ``start_common_recv_time``
    :param endtime: value for ``end_common_recv_time``
    :param logtype: value for ``logType``
    :param filtervalue: value for ``filter``
    :return: None
    :raises AssertionError: when the response status is not 200
    """
    fields = schema(schemauerl, token)["data"]["fields"]
    print(fields)
    headers = {"Content-Type": "application/json",
               "Authorization": token}
    data = {
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "logType": logtype,
        "fields": fields,
        "filter": filtervalue
    }
    print(data)
    payload = json.dumps(data)
    print(payload)
    response1 = requests.post(url=logurl, data=payload, headers=headers)
    assert response1.status_code == 200, (
        f"log export returned HTTP status {response1.status_code}"
    )
# Check which schema fields appear in the log detail.
def LogFieldValidation(schemauerl,token,datajson):
    """Check that every visible schema column is present in a log detail row.

    Each entry of the schema's ``data.doc.schema_type`` lists ``columns``. For
    every column that matches a field name in ``data.fields``, the column must
    be among the keys ``LogResponseVAL.getKeys`` returns for the first row of
    ``datajson["data"]["list"]``. Fields whose ``doc.visibility`` is
    "disabled" are skipped.

    :param schemauerl: schema URL passed to ``schema``
    :param token: authorization token passed to ``schema``
    :param datajson: decoded log-list response providing the detail row
    :return: None
    :raises AssertionError: when a required column is missing from the row
    """
    Schemajson = schema(schemauerl, token)
    fields = Schemajson["data"]["fields"]
    keylist = LogResponseVAL.getKeys(datajson["data"]["list"][0])
    schema_typedict = Schemajson["data"]["doc"]["schema_type"]
    for group in schema_typedict.values():
        for column in group["columns"]:
            for field in fields:
                if field["name"] != column:
                    continue
                doc = field["doc"]
                if doc is not None and doc["visibility"] == "disabled":
                    continue
                if column not in keylist:
                    print("该字段未存在日志详情内", column)
                    # Raised explicitly so the check survives `python -O`
                    # and the failure names the missing field.
                    raise AssertionError(
                        f"field {column!r} is missing from the log detail"
                    )
                print("该字段通过在日志详情内", column)
# if __name__ == '__main__':
# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log")