#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Helpers for verifying the log query API against its published schema.

The schema endpoint describes every log field and the comparison operators
each field type supports.  ``Filter1`` turns that description into a list of
filter expressions, and ``logapiverify`` / ``loglistverify`` post those
expressions to the log list endpoint and assert that it answers with code 200.

Design rules noted by the original author for building filters:

1. Only columns whose ``[doc][allow_query]`` is true support searching.
2. If ``[doc][constraints][operator_functions]`` has a value, those operators
   take priority.
3. If ``[doc][data]`` has a value, the field takes the ``code`` values listed
   in ``data``.
4. The ranges of ``int`` and ``long`` differ.
5. Strings should include special characters.
6. Query values should cover boundary values as well as normal values.
7. IP (v4, v6) and URL values need dedicated generators.

TODO: rules 1, 4 and 6 are not implemented below -- ``allow_query`` is never
read, ``int`` and ``long`` share the 32-bit range, and a single random value is
used per field.  The IP/URL generators exist but ``Filter1`` does not use them.
"""
import ipaddress
import json
import random
from random import Random

import requests
# import allure

# Legacy module-level holder of the most recent ``Filter1`` result.  The name
# shadows the ``list`` builtin inside this module; it is kept only so existing
# readers of this attribute keep working.  Do not call ``list(...)`` here.
list = []  # noqa: A001

# Seconds to wait for the server; without a timeout requests can block forever.
REQUEST_TIMEOUT = 60

# Largest integer values of an IPv4 / IPv6 address.
MAX_IPV4 = 2 ** 32 - 1
MAX_IPV6 = 2 ** 128 - 1

_NUMERIC_TYPES = ("int", "long")
_INT32_MIN = -2147483648
_INT32_MAX = 2147483647
_STRING_SAMPLE_CHARS = 'abcdefghijklmnopqrstuvwxyz!@#$%^&*'


def schema(schemauerl, token):
    """Request the schema endpoint and return its decoded JSON body.

    Args:
        schemauerl: Full schema URL, e.g.
            ``http://host:8080/v1/log/schema?logType=security_event_log``.
        token: Value sent in the ``Authorization`` header.

    Returns:
        The parsed JSON response.
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": token,
    }
    response = requests.get(
        url=schemauerl, headers=headers, timeout=REQUEST_TIMEOUT
    )
    return response.json()


def random_ipv4():
    """Return a random IPv4 address in dotted-quad notation."""
    return str(ipaddress.IPv4Address(random.randint(0, MAX_IPV4)))


def random_ipv6():
    """Return a random IPv6 address in its compressed textual form."""
    return str(ipaddress.IPv6Address(random.randint(0, MAX_IPV6)))


def randrom_url():
    """Generate, print and return a random URL-like string.

    The host part is three dot-separated prefixes of one random alphanumeric
    string (8-16 characters); the path is a second random string (8-16
    characters) that may contain special characters.
    """
    rng = Random()
    host_chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
    path_chars = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()'
    stem = "".join(rng.choice(host_chars) for _ in range(rng.randint(8, 16)))
    path = "".join(rng.choice(path_chars) for _ in range(rng.randint(8, 16)))
    # With at least 8 characters even the shortest prefix keeps one character.
    url = stem[0:-5] + "." + stem[0:-6] + "." + stem[0:-7] + "/" + path
    print(url)
    return url


# Correctly spelled alias; the original name is kept for existing callers.
random_url = randrom_url


def _split_functions(functions):
    """Split a comma-separated operator list, dropping padding and blanks."""
    return [part.strip() for part in (functions or "").split(",") if part.strip()]


def _default_conditions(name, type1, operator, number, text):
    """Build conditions for a field from the schema-wide operators of its type.

    ``number`` is the sample value for int/long fields and ``text`` the sample
    value for string fields.  Other field types produce no conditions.
    """
    conditions = []
    if type1 not in _NUMERIC_TYPES and type1 != "string":
        return conditions
    for entry in operator:
        if entry["type"] != type1:
            continue
        for func in _split_functions(entry["functions"]):
            if type1 in _NUMERIC_TYPES:
                if func in ("in", "not in"):
                    conditions.append(f"{name} {func} ({number})")
                else:
                    conditions.append(f"{name} {func} {number}")
            # NOTE(review): the quoting below puts a space inside the closing
            # quote (e.g. " 'x '").  Reproduced exactly from the original;
            # confirm against the query grammar before changing it.
            elif func in ("not empty", "empty"):
                conditions.append(f"{func}( '{name} ')")
            elif func in ("in", "not in"):
                conditions.append(f"{name} {func} ( '{text} ')")
            else:
                conditions.append(f"{name} {func}  '{text} '")
    return conditions


def _code_conditions(name, type1, operator_functions, data):
    """Build conditions from a field's own operators and enumerated codes."""
    conditions = []
    funcs = _split_functions(operator_functions)
    for item in data or []:
        code = item["code"]
        for func in funcs:
            # f-strings accept numeric as well as string codes.
            if type1 in _NUMERIC_TYPES:
                conditions.append(f"{name} {func} {code}")
            else:
                conditions.append(f"{name} {func}  '{code} '")
    return conditions


def _build_filters(schema_json):
    """Derive the filter expressions for every field of a schema response."""
    # Field definitions of the log type.
    fields = schema_json["data"]["fields"]
    # Operators supported by each field type.
    operator = schema_json["data"]["doc"]["schema_query"]["references"]["operator"]
    conditions = []
    for field in fields:
        name = field["name"]
        type1 = field["type"]
        doc = field.get("doc") or {}
        constraints = doc.get("constraints") or {}
        operator_functions = constraints.get("operator_functions")
        if not operator_functions:
            # No field-specific operators: use the defaults for the type with
            # a random sample value.
            number = random.randint(_INT32_MIN, _INT32_MAX)
            text = random.choice(_STRING_SAMPLE_CHARS)
            conditions.extend(
                _default_conditions(name, type1, operator, number, text)
            )
        else:
            conditions.extend(
                _code_conditions(name, type1, operator_functions, doc.get("data"))
            )
    return conditions


def Filter1(schemauerl, token, schema_json=None):
    """Return the filter expressions supported by the schema's fields.

    Args:
        schemauerl: Schema endpoint URL.
        token: Value sent in the ``Authorization`` header.
        schema_json: Optional already-fetched schema response; when given, no
            request is made.

    Returns:
        A new list of filter expression strings.  Each call starts from an
        empty list, so repeated calls no longer accumulate duplicates.
    """
    if schema_json is None:
        schema_json = schema(schemauerl, token)
    print(type(schema_json))
    conditions = _build_filters(schema_json)
    # Keep the legacy module-level holder in sync with the latest result.
    list[:] = conditions
    print(conditions)
    return conditions


def _post_log_query(logurl, token, starttime, endtime, logtype, fields, filtervalue):
    """Post one log query and assert that the response code is 200.

    Returns:
        The parsed JSON response.

    Raises:
        AssertionError: If the response ``code`` is not 200.
    """
    headers = {"Content-Type": "application/json", "Authorization": token}
    data = {
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "logType": logtype,
        "fields": fields,
        "filter": filtervalue,
    }
    print(data)
    payload = json.dumps(data)
    print(payload)
    response = requests.post(
        url=logurl, data=payload, headers=headers, timeout=REQUEST_TIMEOUT
    )
    body = response.json()
    print(body)
    code = body["code"]
    assert code == 200, (
        f"log query with filter {filtervalue!r} returned code {code}"
    )
    print(code)
    return body


def logapiverify(logurl, schemauerl, token, starttime, endtime, logtype):
    """Verify the log endpoint with every filter derived from the schema.

    Each generated filter is posted on its own, then all of them are posted
    joined with ``and``.  Every response must carry code 200.

    Returns:
        The parsed JSON response of the combined-filter request.

    Raises:
        AssertionError: If any request answers with a code other than 200.
    """
    schema_json = schema(schemauerl, token)
    fields = schema_json["data"]["fields"]
    print(fields)
    filters = Filter1(schemauerl, token, schema_json=schema_json)
    for single in filters:
        _post_log_query(logurl, token, starttime, endtime, logtype, fields, single)
    combined = " and ".join(filters)
    print(combined)
    return _post_log_query(
        logurl, token, starttime, endtime, logtype, fields, combined
    )


def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
    """Post one exact filter to the log endpoint and assert code 200.

    Returns:
        The parsed JSON response.

    Raises:
        AssertionError: If the response ``code`` is not 200.
    """
    fields = schema(schemauerl, token)["data"]["fields"]
    print(fields)
    return _post_log_query(
        logurl, token, starttime, endtime, logtype, fields, filtervalue
    )


# Example usage:
# if __name__ == '__main__':
#     logapiverify(
#         "http://192.168.32.59:8080/v1/log/list",
#         "http://192.168.32.59:8080/v1/log/schema?logType=security_event_log",
#         "<token>",
#         '2021-03-20 16:36:41',
#         '2021-03-21 17:36:41',
#         "security_event_log",
#     )