1. Upload Report API-only negative test methods 2. Upload Report negative test cases
871  04-CustomLibrary/Custometest/ReportSchema_Negtive.py  Normal file
@@ -0,0 +1,871 @@
import requests
import random
import json
import time
import ipaddress


# Negative-case helpers for the Report API-only tests: they only verify the
# interface behaviour and do not validate the accuracy of the aggregated data.


# Generate a random IPv4 or IPv6 address (uses private ipaddress helpers)
MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES  # 2 ** 32 - 1
MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES  # 2 ** 128 - 1


def random_ipv4():
    return ipaddress.IPv4Address._string_from_ip_int(
        random.randint(0, MAX_IPV4))


def random_ipv6():
    return ipaddress.IPv6Address._string_from_ip_int(
        random.randint(0, MAX_IPV6))


# Generate a random email address
def RandomEmail(emailType=None, rang=None):
    __emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"]
    # if no domain is given, pick one at random from __emailtype
    if emailType is None:
        __randomEmail = random.choice(__emailtype)
    else:
        __randomEmail = emailType
    # if no length is given, pick a random length between 4 and 10
    if rang is None:
        __rang = random.randint(4, 10)
    else:
        __rang = int(rang)
    __Number = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    __randomNumber = "".join(random.choice(__Number) for i in range(__rang))
    _email = __randomNumber + __randomEmail
    return _email


# Fetch the schema for the given log type
def schema(schemauerl, token, logtype):
    # the caller already builds the full schema URL (host, port and logType)
    url = schemauerl
    headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token}
    response = requests.get(url=url, headers=headers)
    return response.json()


# Build the groupColumnList part of the request JSON
def groupby(schemajson, logtype, testpoint):
    dimensions = schemajson["data"]["doc"]["schema_query"]["dimensions"]
    dimensions.append("common_recv_time")
    if logtype == "security_event_log" or logtype == "connection_record_log" or logtype == "voip_record_log":
        dimensions.remove("common_start_time")
        dimensions.remove("common_end_time")
    randomstr_1 = []
    if testpoint == "GroupBy":
        # deliberately invalid dimension name for the negative case
        randomstr_1.append("GroupBy_Negtive")
    else:
        randomstr_1 = random.sample(dimensions, 4)

    # grp is the list returned as groupColumnList
    grp = []
    for i in randomstr_1:
        a = {"name": i}
        grp.append(a)

    re = [grp, randomstr_1]
    print("groupby", re)
    return re
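# Illustrative example of the return value (the dimension names below are
# placeholders, not real schema fields):
#   groupby(...) -> [[{"name": "dim_a"}, {"name": "dim_b"}], ["dim_a", "dim_b"]]
# The first element feeds groupColumnList; the second is reused by DataBindings
# so that group-by columns are not aggregated twice.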


# Build the queryColumnList part of the request JSON
def DataBindings(schemajson, randomstr_1, testpoint, field):
    # build the queryColumnList list
    print("field", field)
    metrics = schemajson["data"]["doc"]["schema_query"]["metrics"]
    metrics.append("common_log_id")
    # pick the metrics to aggregate
    randomstr_2 = []
    if testpoint == "DataBindings_Field" or testpoint == "DataBindings_Aggregate":
        randomstr_2.append(field)
        randomstr_3 = randomstr_2
    else:
        randomstr_2 = random.sample(metrics, 6)
        # drop metrics that already appear in the group-by list
        randomstr_3 = array_diff(randomstr_2, randomstr_1)
    # add the group-by columns to the query column list
    qul = []
    for i in randomstr_1:
        a = {"name": i}
        qul.append(a)

    fields = schemajson["data"]["fields"]
    if testpoint == "DataBindings_Aggregate":
        # deliberately invalid aggregate functions for the negative case
        list_1 = ["countdistinct"]
        list_2 = ["summ"]
    else:
        list_1 = ["sum", "min", "max", "avg", "count"]
        list_2 = ["count", "count_distinct"]

    if testpoint == "DataBindings_Field":
        Aggregate = "sum"
        randomstr_4 = {"name": randomstr_2[0], "expression": Aggregate}
        qul.append(randomstr_4)

    elif testpoint == "DataBindings_Aggregate":
        for i in randomstr_3:
            for j in fields:
                if i == j["name"]:
                    jtype = j["type"]
                    label = i
                    sun = 1
                    if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double":
                        for Aggregate in list_1:
                            randomstr_4 = {"name": i, "expression": Aggregate, "label": label}
                            qul.append(randomstr_4)
                            label = label + str(sun)
                            sun += 1
                    elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string":
                        for Aggregate in list_2:
                            randomstr_4 = {"name": i, "expression": Aggregate, "label": label}
                            qul.append(randomstr_4)
                            label = label + str(sun)
                            sun += 1

    else:
        for i in randomstr_3:
            for j in fields:
                if i == j["name"]:
                    jtype = j["type"]
                    if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double":
                        radomlist = random.sample(list_1, 1)
                        randomstr_4 = {"name": i, "expression": radomlist[0]}
                        qul.append(randomstr_4)
                    elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string":
                        randomlist = random.sample(list_2, 1)
                        randomstr_4 = {"name": i, "expression": randomlist[0]}
                        qul.append(randomstr_4)
    print("DataBindings", qul)
    return qul


# Remove from list a every element that also appears in list b
def array_diff(a, b):
    # c collects the surviving elements
    c = []
    # iterate over the indices of list a
    for i in range(len(a)):
        # take the value at this index
        t = a[i]
        # keep it only if it does not occur in list b
        if t not in b:
            c.append(t)
    # c is list a with the elements of list b removed
    return c
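# Example (illustrative): array_diff(["a", "b", "c"], ["b"]) returns ["a", "c"].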


# Build the filterCondition part of the request JSON
def filterCondition(schemajson, testpoint, field):
    number = random.randint(0, 100000)
    randomstr = random.choice('abcdefghijklmnopqrstuvwxyz')
    schemafilters = schemajson["data"]["doc"]["schema_query"]["filters"]
    list1 = []
    if testpoint == "Filter_Field" or testpoint == "Filter_Operator":
        list1.append(field)
    else:
        list1 = random.sample(schemafilters, 4)
    # different field types support different sets of operators
    fields = schemajson["data"]["fields"]
    operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"]
    andConditions = []
    if testpoint == "Filter_Field":
        orConditions_list = []
        Field = {"name": field, "expression": "!=", "value": [1], "type": "int"}
        orConditions_list.append(Field)
        orConditions = {"orConditions": orConditions_list}
        andConditions.append(orConditions)
    elif testpoint == "Filter_Operator":
        for i in list1:
            # walk the fields list
            for k in fields:
                # when the filter name matches a field name
                if i == k["name"]:
                    name = k["name"]
                    type1 = k["type"]
                    if type1 == "int" or type1 == "long":
                        orConditions_list = []
                        # "==" is not a supported operator, which makes this a negative case
                        Operator = ["=="]
                        randomOperator = random.sample(Operator, 1)
                        value = [str(number)]
                        Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1}
                        orConditions_list.append(Field)
                        orConditions = {"orConditions": orConditions_list}
                        andConditions.append(orConditions)
                    elif type1 == "string":
                        orConditions_list = []
                        Operator = ["=="]
                        randomOperator_1 = random.sample(Operator, 1)
                        randomOperator = randomOperator_1[0]
                        value = []
                        value.append(str(number))
                        Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
                        orConditions_list.append(Field)
                        orConditions = {"orConditions": orConditions_list}
                        andConditions.append(orConditions)
    else:
        for i in list1:
            # walk the fields list
            for k in fields:
                # when the filter name matches a field name
                if i == k["name"]:
                    name = k["name"]
                    doc = k["doc"]
                    # fields with no extra documentation
                    if doc is None:
                        type1 = k["type"]
                        if type1 == "int" or type1 == "long":
                            orConditions_list = []
                            Operator = ["=", "!=", ">", "<", ">=", "<="]
                            if testpoint == "Filter":
                                for op in Operator:
                                    value = [str(number)]
                                    Field = {"name": name, "expression": op, "value": value, "type": type1}
                                    orConditions_list.append(Field)
                            else:
                                randomOperator = random.sample(Operator, 1)
                                value = [str(number)]
                                Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1}
                                orConditions_list.append(Field)
                            orConditions = {"orConditions": orConditions_list}
                            andConditions.append(orConditions)
                        elif type1 == "string":
                            orConditions_list = []
                            Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
                            if testpoint == "Filter":
                                for op in Operator:
                                    value = []
                                    if op == "=" or op == "!=":
                                        value.append(str(number))
                                    elif op == "Like" or op == "Not Like":
                                        value.append(randomstr)
                                    elif op == "notEmpty" or op == "empty":
                                        value = []
                                    Field = {"name": name, "expression": op, "value": value, "type": type1}
                                    orConditions_list.append(Field)
                            else:
                                randomOperator_1 = random.sample(Operator, 1)
                                randomOperator = randomOperator_1[0]
                                value = []
                                if randomOperator == "=" or randomOperator == "!=":
                                    value.append(str(number))
                                elif randomOperator == "Like" or randomOperator == "Not Like":
                                    value.append(randomstr)
                                elif randomOperator == "notEmpty":
                                    value = []
                                Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
                                orConditions_list.append(Field)
                            orConditions = {"orConditions": orConditions_list}
                            andConditions.append(orConditions)

                    else:
                        if k["doc"]["constraints"] is None:
                            type1 = k["type"]
                            if type1 == "int" or type1 == "long":
                                orConditions_list = []
                                Operator = ["=", "!=", ">", "<", ">=", "<="]
                                if testpoint == "Filter":
                                    for op in Operator:
                                        value = [str(number)]
                                        Field = {"name": name, "expression": op, "value": value, "type": type1}
                                        orConditions_list.append(Field)
                                else:
                                    randomOperator = random.sample(Operator, 1)
                                    value = [str(number)]
                                    Field = {"name": name, "expression": randomOperator[0], "value": value,
                                             "type": type1}
                                    orConditions_list.append(Field)
                                orConditions = {"orConditions": orConditions_list}
                                andConditions.append(orConditions)
                            elif type1 == "string":
                                orConditions_list = []
                                Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
                                if testpoint == "Filter":
                                    for op in Operator:
                                        randomOperator = op
                                        value = []
                                        if randomOperator == "=" or randomOperator == "!=":
                                            value.append(str(number))
                                        elif randomOperator == "Like" or randomOperator == "Not Like":
                                            value.append(randomstr)
                                        elif randomOperator == "notEmpty":
                                            value = []
                                        Field = {"name": name, "expression": randomOperator, "value": value,
                                                 "type": type1}
                                        orConditions_list.append(Field)
                                else:
                                    randomOperator_1 = random.sample(Operator, 1)
                                    randomOperator = randomOperator_1[0]
                                    value = []
                                    if randomOperator == "=" or randomOperator == "!=":
                                        value.append(str(number))
                                    elif randomOperator == "Like" or randomOperator == "Not Like":
                                        value.append(randomstr)
                                    elif randomOperator == "notEmpty":
                                        value = []
                                    Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
                                    orConditions_list.append(Field)
                                orConditions = {"orConditions": orConditions_list}
                                andConditions.append(orConditions)

                        else:
                            if k["doc"]["constraints"]["operator_functions"] is None:
                                conrandomstraints = k["doc"]["constraints"]
                                type1 = k["type"]
                                if type1 == "int" or type1 == "long":
                                    orConditions_list = []
                                    Operator = ["=", "!=", ">", "<", ">=", "<="]
                                    if testpoint == "Filter":
                                        for op in Operator:
                                            randomOperator = op
                                            if conrandomstraints["type"] == "timestamp":
                                                # use the current timestamp as the value
                                                t = int(time.time())
                                                value = [str(t)]
                                                Field = {"name": name, "expression": randomOperator, "value": value,
                                                         "type": type1}
                                                orConditions_list.append(Field)
                                    else:
                                        randomOperator_1 = random.sample(Operator, 1)
                                        randomOperator = randomOperator_1[0]
                                        if conrandomstraints["type"] == "timestamp":
                                            # use the current timestamp as the value
                                            t = int(time.time())
                                            value = [str(t)]
                                            Field = {"name": name, "expression": randomOperator, "value": value,
                                                     "type": type1}
                                            orConditions_list.append(Field)
                                    orConditions = {"orConditions": orConditions_list}
                                    andConditions.append(orConditions)
                                elif type1 == "string":
                                    orConditions_list = []
                                    Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
                                    if testpoint == "Filter":
                                        if conrandomstraints["type"] == "ip":
                                            for op in Operator:
                                                # generate a random IP value
                                                ip = random_ipv4()
                                                value = []
                                                if op == "=" or op == "!=":
                                                    value.append(ip)
                                                elif op == "Like" or op == "Not Like":
                                                    value.append(ip)
                                                elif op == "notEmpty":
                                                    value = []
                                                Field = {"name": name, "expression": op, "value": value, "type": type1}
                                                orConditions_list.append(Field)
                                        elif conrandomstraints["type"] == "email":
                                            for op in Operator:
                                                randomOperator = op
                                                Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
                                                randomOperator_1 = random.sample(Operator, 1)
                                                randomOperator = randomOperator_1[0]
                                                # generate a random email value
                                                emil = RandomEmail()
                                                value = []
                                                if randomOperator == "=" or randomOperator == "!=":
                                                    value.append(emil)
                                                elif randomOperator == "Like" or randomOperator == "Not Like":
                                                    value.append(emil)
                                                elif randomOperator == "notEmpty":
                                                    value = []
                                                Field = {"name": name, "expression": randomOperator, "value": value,
                                                         "type": type1}
                                                orConditions_list.append(Field)
                                    else:
                                        randomOperator_1 = random.sample(Operator, 1)
                                        randomOperator = randomOperator_1[0]
                                        if conrandomstraints["type"] == "ip":
                                            # generate a random IP value
                                            ip = random_ipv4()
                                            value = []
                                            if randomOperator == "=" or randomOperator == "!=":
                                                value.append(ip)
                                            elif randomOperator == "Like" or randomOperator == "Not Like":
                                                value.append(ip)
                                            elif randomOperator == "notEmpty":
                                                value = []
                                            Field = {"name": name, "expression": randomOperator, "value": value,
                                                     "type": type1}
                                            orConditions_list.append(Field)
                                            orConditions = {"orConditions": orConditions_list}
                                            andConditions.append(orConditions)
                                        elif conrandomstraints["type"] == "email":
                                            Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
                                            randomOperator_1 = random.sample(Operator, 1)
                                            randomOperator = randomOperator_1[0]
                                            # generate a random email value
                                            emil = RandomEmail()
                                            value = []
                                            if randomOperator == "=" or randomOperator == "!=":
                                                value.append(emil)
                                            elif randomOperator == "Like" or randomOperator == "Not Like":
                                                value.append(emil)
                                            elif randomOperator == "notEmpty":
                                                value = []
                                            Field = {"name": name, "expression": randomOperator, "value": value,
                                                     "type": type1}
                                            orConditions_list.append(Field)
                                            orConditions = {"orConditions": orConditions_list}
                                            andConditions.append(orConditions)
                            else:
                                # fields that declare operator functions with enumerated values
                                type1 = k["type"]
                                orConditions_list = []
                                operator1 = k["doc"]["constraints"]["operator_functions"]
                                operator2 = operator1.split(",")
                                if testpoint == "Filter":
                                    for op in operator2:
                                        operatordata = k["doc"]["data"]
                                        code = []
                                        for i in operatordata:
                                            code_1 = i["code"]
                                            code.append(code_1)
                                        for co in code:
                                            Field = {"name": name, "expression": op, "value": co, "type": type1}
                                            orConditions_list.append(Field)
                                else:
                                    operator3 = random.sample(operator2, 1)
                                    operatordata = k["doc"]["data"]
                                    code = []
                                    for i in operatordata:
                                        code_1 = i["code"]
                                        code.append(code_1)
                                    code2 = random.sample(code, 1)
                                    Field = {"name": name, "expression": operator3[0], "value": code2, "type": type1}
                                    orConditions_list.append(Field)
                                orConditions = {"orConditions": orConditions_list}
                                andConditions.append(orConditions)
    filterCondition = {"andConditions": andConditions}
    print("filterCondition", filterCondition)
    return filterCondition
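# Illustrative shape of the structure returned above (field name and value are
# placeholders):
#   {"andConditions": [{"orConditions": [{"name": "some_field", "expression": "=",
#                                         "value": ["123"], "type": "int"}]}]}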


# Build the havingCondition part of the request JSON
def havingjson(schemajson, testpoint, field):
    number = random.randint(0, 100000)
    schemametrics = schemajson["data"]["doc"]["schema_query"]["metrics"]
    aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"]
    schemametrics.append("common_log_id")
    metricslist = []
    if testpoint == "Having_Field" or testpoint == "Having_Aggregate" or testpoint == "Having_Operator":
        metricslist.append(field)
    else:
        metricslist = random.sample(schemametrics, 4)
    fields = schemajson["data"]["fields"]

    if testpoint == "Having_Aggregate":
        # deliberately invalid aggregate function for the negative case
        Aggregate = ["COUNTT"]
    else:
        Aggregate = ["COUNT", "AVG", "SUM", "MAX", "MIN"]

    if testpoint == "Having_Operator":
        # deliberately invalid operator for the negative case
        operator = ["=="]
    else:
        operator = ["=", "!=", ">", "<", ">=", "<="]

    andConditions_list = []
    # build the having-condition list for the selected fields
    if testpoint == "Having_Field":
        orConditions_list = []
        havingdict = {"name": field, "function": "count", "expression": "=", "value": 11}
        orConditions_list.append(havingdict)
        orConditions = {"orConditions": orConditions_list}
        andConditions_list.append(orConditions)
    elif testpoint == "Having_Aggregate":
        for j in fields:
            if field == j["name"]:
                name = j["name"]
                type1 = j["type"]
                for v in aggregation:
                    if type1 == v["type"]:
                        orConditions_list = []
                        if v["type"] != "string":
                            functionslist = Aggregate
                        else:
                            functionslist = ["COUNTT"]
                        if field == "common_log_id":
                            functionslist = ["COUNTT"]
                        functions_1 = random.sample(functionslist, 1)
                        operator_1 = random.sample(operator, 1)
                        havingdict = {"name": name, "function": str.lower(functions_1[0]),
                                      "expression": operator_1[0], "value": str(number)}
                        orConditions_list.append(havingdict)
                        orConditions = {"orConditions": orConditions_list}
                        andConditions_list.append(orConditions)
    elif testpoint == "Having_Operator":
        for j in fields:
            if field == j["name"]:
                name = j["name"]
                type1 = j["type"]
                for v in aggregation:
                    if type1 == v["type"]:
                        orConditions_list = []
                        if v["type"] != "string":
                            functionslist = Aggregate
                        else:
                            functionsstr = v["functions"]
                            functionslist = functionsstr.split(",")
                        if field == "common_log_id":
                            functionslist = ["COUNT"]
                        functions_1 = random.sample(functionslist, 1)
                        if functions_1 == "COUNT_DISTINCT" and type1 != "string":
                            functions_1 = random.sample(functionslist, 1)
                        operator_1 = random.sample(operator, 1)

                        havingdict = {"name": name, "function": str.lower(functions_1[0]),
                                      "expression": operator_1[0], "value": str(number)}
                        orConditions_list.append(havingdict)
                        orConditions = {"orConditions": orConditions_list}
                        andConditions_list.append(orConditions)

    else:
        for i in metricslist:
            for j in fields:
                if i == j["name"]:
                    name = j["name"]
                    type1 = j["type"]
                    for v in aggregation:
                        if type1 == v["type"]:
                            orConditions_list = []
                            if v["type"] != "string":
                                functionslist = Aggregate
                            else:
                                functionsstr = v["functions"]
                                functionslist = functionsstr.split(",")
                            if field == "common_log_id":
                                functionslist = ["COUNT"]
                            if testpoint == "Having":
                                for functions_1 in functionslist:
                                    for operator_1 in operator:
                                        havingdict = {"name": name, "function": str.lower(functions_1),
                                                      "expression": operator_1, "value": str(number)}
                                        orConditions_list.append(havingdict)
                                orConditions = {"orConditions": orConditions_list}
                                andConditions_list.append(orConditions)
                            else:
                                functions_1 = random.sample(functionslist, 1)
                                if functions_1 == "COUNT_DISTINCT" and type1 != "string":
                                    functions_1 = random.sample(functionslist, 1)
                                operator_1 = random.sample(operator, 1)

                                havingdict = {"name": name, "function": str.lower(functions_1[0]),
                                              "expression": operator_1[0], "value": str(number)}
                                orConditions_list.append(havingdict)
                                orConditions = {"orConditions": orConditions_list}
                                andConditions_list.append(orConditions)
    havingCondition = {"andConditions": andConditions_list}
    print("having", havingCondition)
    return havingCondition


# Assemble the dataset request body and return it as a JSON string
def datasetjson(schemauerl, token, testname, logtype, testpoint, field):
    schema_new = schema(schemauerl, token, logtype)
    group_re = groupby(schema_new, logtype, testpoint)
    groupColumnList = group_re[0]
    group_randomstr = group_re[1]
    queryColumnList = DataBindings(schema_new, group_randomstr, testpoint, field)
    filterCondition_1 = filterCondition(schema_new, testpoint, field)
    havingjson_1 = havingjson(schema_new, testpoint, field)
    if testpoint == "LogType":
        # for the LogType negative case the invalid log type is passed in via field
        logtype = field
    datasetdict = {
        "list": {
            "name": testname,
            "logType": logtype,
            "groupColumnList": groupColumnList,
            "queryColumnList": queryColumnList,
            "filterCondition": filterCondition_1,
            "havingCondition": havingjson_1
        }
    }
    print(datasetdict)
    print("datasetjson", json.dumps(datasetdict))
    return json.dumps(datasetdict)


def ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, datasetgeturl, chargeturl, testname, logtype,
                        testpoint, field=None):
    headers = {"Content-Type": "application/json", "Authorization": token}
    # build the dataset JSON body and send the create request
    _datasetjson = datasetjson(schemaurl, token, testname, logtype, testpoint, field)
    response1 = requests.post(url=dataseturl, data=_datasetjson, headers=headers)
    print("response 1", response1)
    code = response1.json()["code"]
    print("datasetcode:", code)
    # each negative test point must be rejected with its specific error code
    if testpoint == "LogType":
        assert code == 40040002
    elif testpoint == "GroupBy":
        assert code == 40040008
    elif testpoint == "DataBindings_Field":
        assert code == 40040004
    elif testpoint == "DataBindings_Aggregate":
        assert code == 40040006
    elif testpoint == "Filter_Field":
        assert code == 40040007
    elif testpoint == "Filter_Operator":
        assert code == 40040010
    elif testpoint == "Having_Field":
        assert code == 40040074
    elif testpoint == "Having_Aggregate":
        assert code == 40040072
    elif testpoint == "Having_Operator":
        assert code == 40040073


# Run every negative test point against every configured log type
def ReportPositiveTest_Negtive(host, port, token, dataseturl, charurl, repporturl, logtypelist):
    testpoint = ["LogType", "GroupBy", "DataBindings_Field", "DataBindings_Aggregate", "Filter_Field",
                 "Filter_Operator", "Having_Field", "Having_Aggregate", "Having_Operator"]
    for logtype in logtypelist:
        schemaurl = "http://" + host + ":" + port + "/v1/log/schema?logType=" + logtype
        schema_new = schema(schemaurl, token, logtype)
        metrics = schema_new["data"]["doc"]["schema_query"]["metrics"]
        schemafilters = schema_new["data"]["doc"]["schema_query"]["filters"]
        metrics.append("common_log_id")
        for j in testpoint:
            print(j)
            if j == "LogType":
                testname = "Report" + logtype + j
                dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                filter = "Negtive_log"
                ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                    testname, logtype, j, filter)

            if j == "GroupBy":
                testname = "Report" + logtype + j
                dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                filter = "GroupByNegtive"
                ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                    testname, logtype, j, filter)

            if j == "DataBindings_Field":
                testname = "Report" + logtype + j
                dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                filter = "DataBindingsFieldNegtive"
                ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                    testname, logtype, j, filter)

            if j == "DataBindings_Aggregate":
                for filter in metrics:
                    testname = "Report" + logtype + j + filter
                    dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                    char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                    ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                        testname, logtype, j, filter)

            if j == "Filter_Field":
                testname = "Report" + logtype + j
                dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                filter = "FilterFieldNegtive"
                ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                    testname, logtype, j, filter)

            if j == "Filter_Operator":
                for filter in schemafilters:
                    testname = "Report" + logtype + j + filter
                    dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                    char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                    ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                        testname, logtype, j, filter)

            if j == "Having_Field":
                testname = "Report" + logtype + j
                dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                filter = "HavingFieldNegtive"
                ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                    testname, logtype, j, filter)

            if j == "Having_Aggregate":
                for filter in metrics:
                    testname = "Report" + logtype + j + filter
                    dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                    char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                    ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                        testname, logtype, j, filter)

            if j == "Having_Operator":
                for filter in metrics:
                    testname = "Report" + logtype + j + filter
                    dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
                    char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
                    ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
                                        testname, logtype, j, filter)
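

# --- Usage sketch (illustrative only) ---
# A minimal, assumed invocation of the negative suite. The host, port, token and
# the three service endpoints below are placeholders, not values defined in this
# repository; substitute the real report-service URLs before running.
if __name__ == "__main__":
    host = "192.168.44.72"                                               # assumed report service host
    port = "8080"                                                        # assumed report service port
    token = "<auth token>"                                               # assumed Authorization header value
    dataseturl = "http://" + host + ":" + port + "/v1/report/dataset"    # assumed dataset endpoint
    charurl = "http://" + host + ":" + port + "/v1/report/chart"         # assumed chart endpoint
    repporturl = "http://" + host + ":" + port + "/v1/report/report"     # assumed report endpoint
    logtypelist = ["security_event_log"]                                 # one log type known to the schema service
    ReportPositiveTest_Negtive(host, port, token, dataseturl, charurl, repporturl, logtypelist)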