修改Report纯接口关键字,新增纯接口用例
This commit is contained in:
@@ -3,6 +3,7 @@ import random
|
||||
import json
|
||||
import time
|
||||
import ipaddress
|
||||
from builtins import list
|
||||
#生成随机ipv4或ipv6
|
||||
MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
|
||||
MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
|
||||
@@ -39,7 +40,7 @@ def schema(schemauerl,token,logtype):
|
||||
return response.json()
|
||||
|
||||
#获取json串中groupColumnList的值
|
||||
def groupby(schemajson):
|
||||
def groupby(schemajson,logtype,testpoint):
|
||||
#生成随机数
|
||||
# securitynb=random.randint(1,33)
|
||||
# proxynb=random.randint(1,23)
|
||||
@@ -49,23 +50,35 @@ def groupby(schemajson):
|
||||
dimensions=schemajson["data"]["doc"]["schema_query"]["dimensions"]
|
||||
dimensions.append("common_recv_time");
|
||||
# print(dimensions)
|
||||
randomstr_1= random.sample(dimensions, 4)
|
||||
randomstr_1=[]
|
||||
if testpoint == "GroupBy":
|
||||
randomstr_1=dimensions
|
||||
if logtype == "security_event_log" or logtype == "connection_record_log" or logtype == "voip_record_log" :
|
||||
randomstr_1.remove("common_start_time")
|
||||
randomstr_1.remove("common_end_time")
|
||||
else:
|
||||
randomstr_1= random.sample(dimensions, 4)
|
||||
#定义grp为返回值group的列表
|
||||
grp=[]
|
||||
for i in randomstr_1:
|
||||
a={"name":i}
|
||||
grp.append(a)
|
||||
|
||||
re=[grp,randomstr_1]
|
||||
re=[grp,randomstr_1]
|
||||
print("groupby",re)
|
||||
return re
|
||||
|
||||
#获取json串中queryColumnList的值
|
||||
def DataBindings(schemajson,randomstr_1):
|
||||
def DataBindings(schemajson,randomstr_1,testpoint):
|
||||
#生成queryColumnList列表
|
||||
metrics=schemajson["data"]["doc"]["schema_query"]["metrics"]
|
||||
metrics.append("common_log_id")
|
||||
#在列表里随机去除元素
|
||||
randomstr_2= random.sample(metrics, 6)
|
||||
randomstr_2=[]
|
||||
if testpoint == "DataBindings" :
|
||||
randomstr_2=metrics
|
||||
else :
|
||||
randomstr_2= random.sample(metrics, 6)
|
||||
#在聚合列表中去掉groupby中的重复的元素
|
||||
randomstr_3=array_diff(randomstr_2,randomstr_1)
|
||||
#将groupby中元素添加到串中
|
||||
@@ -85,7 +98,7 @@ def DataBindings(schemajson,randomstr_1):
|
||||
radomlist=random.sample(list_1,1)
|
||||
randomstr_4={"name":i,"expression":radomlist[0]}
|
||||
qul.append(randomstr_4)
|
||||
elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp":
|
||||
elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string":
|
||||
randomlist=random.sample(list_2,1)
|
||||
randomstr_4={"name":i,"expression":randomlist[0]}
|
||||
qul.append(randomstr_4)
|
||||
@@ -107,11 +120,15 @@ def array_diff(a, b):
|
||||
#返回序列c,c就是列表a去除列表b之后的元素
|
||||
return c
|
||||
|
||||
def filterCondition(schemajson):
|
||||
def filterCondition(schemajson,testpoint):
|
||||
number = random.randint(0,100000)
|
||||
randomstr= random.choice('abcdefghijklmnopqrstuvwxyz')
|
||||
schemafilters=schemajson["data"]["doc"]["schema_query"]["filters"]
|
||||
list1= random.sample(schemafilters,3)
|
||||
list1=[]
|
||||
if testpoint== "Filter":
|
||||
list1=schemafilters
|
||||
else:
|
||||
list1= random.sample(schemafilters,3)
|
||||
print("条件列表",list1)
|
||||
print("number",number)
|
||||
print("randomstr",randomstr)
|
||||
@@ -261,12 +278,16 @@ def filterCondition(schemajson):
|
||||
return filterCondition
|
||||
|
||||
#获取having条件的串
|
||||
def havingjson(schemajson):
|
||||
def havingjson(schemajson,testpoint):
|
||||
number = random.randint(0,100000)
|
||||
schemametrics=schemajson["data"]["doc"]["schema_query"]["metrics"]
|
||||
aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"]
|
||||
schemametrics.append("common_log_id")
|
||||
metricslist= random.sample(schemametrics,3)
|
||||
metricslist=[]
|
||||
if testpoint == "Having":
|
||||
metricslist=schemametrics
|
||||
else:
|
||||
metricslist= random.sample(schemametrics,3)
|
||||
fields = schemajson["data"]["fields"]
|
||||
operator=["=","!=",">","<",">=","<="]
|
||||
andConditions_list=[]
|
||||
@@ -294,18 +315,18 @@ def havingjson(schemajson):
|
||||
return havingCondition
|
||||
|
||||
#拼接字符串
|
||||
def datasetjson(schemauerl,token,testname,logtype):
|
||||
def datasetjson(schemauerl,token,testname,logtype,testpoint):
|
||||
schema_new=schema(schemauerl,token,logtype)
|
||||
group_re=groupby(schema_new)
|
||||
group_re=groupby(schema_new,logtype,testpoint)
|
||||
groupColumnList=group_re[0]
|
||||
group_randomstr=group_re[1]
|
||||
queryColumnList=DataBindings(schema_new,group_randomstr)
|
||||
filterCondition_1=filterCondition(schema_new)
|
||||
havingjson_1=havingjson(schema_new)
|
||||
queryColumnList=DataBindings(schema_new,group_randomstr,testpoint)
|
||||
filterCondition_1=filterCondition(schema_new,testpoint)
|
||||
havingjson_1=havingjson(schema_new,testpoint)
|
||||
datasetdict = {
|
||||
"list": {
|
||||
"name":testname,
|
||||
"logType": "security_event_log",
|
||||
"logType": logtype,
|
||||
"groupColumnList":groupColumnList,
|
||||
"queryColumnList":queryColumnList,
|
||||
"filterCondition":filterCondition_1,
|
||||
@@ -477,10 +498,10 @@ def Reportsjson(chartId,testname):
|
||||
print(json.dumps(reportJobdct_2))
|
||||
return json.dumps(reportJobdct_2)
|
||||
|
||||
def ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,datasetgeturl,chargeturl,testname,logtype):
|
||||
def ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,datasetgeturl,chargeturl,testname,logtype,testpoint):
|
||||
headers = {"Content-Type": "application/json","Authorization": token}
|
||||
#dataset生成json串并发送请求
|
||||
_datasetjson=datasetjson(schemaurl, token,testname,logtype)
|
||||
_datasetjson=datasetjson(schemaurl,token,testname,logtype,testpoint)
|
||||
response1 = requests.post(url=dataseturl, data=_datasetjson, headers=headers)
|
||||
print("返回数据1",response1)
|
||||
code = response1.json()["code"]
|
||||
|
||||
Reference in New Issue
Block a user