This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
dongxiaoyan-tsg-autotest/04-CustomLibrary/Custometest/ReportSchema.py
2021-04-09 14:31:00 +08:00

522 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import random
import json
import time
import ipaddress
#生成随机ipv4或ipv6
MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
def random_ipv4():
return ipaddress.IPv4Address._string_from_ip_int(
random.randint(0, MAX_IPV4))
def random_ipv6():
return ipaddress.IPv6Address._string_from_ip_int(
random.randint(0, MAX_IPV6))
#随机生成邮箱地址
def RandomEmail( emailType=None, rang=None):
__emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"]
# 如果没有指定邮箱类型,默认在 __emailtype中随机一个
if emailType == None:
__randomEmail = random.choice(__emailtype)
else:
__randomEmail = emailType
# 如果没有指定邮箱长度默认在4-10之间随机
if rang == None:
__rang = random.randint(4, 10)
else:
__rang = int(rang)
__Number = "0123456789qbcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPWRSTUVWXYZ"
__randomNumber = "".join(random.choice(__Number) for i in range(__rang))
_email = __randomNumber + __randomEmail
return _email
#获取Schema
def schema(schemauerl,token,logtype):
url ="http://192.168.44.72:8080/v1/log/schema?logType="+logtype
headers = {"Content-Type":"application/x-www-form-urlencoded","Authorization":token}
response = requests.get(url=url,headers=headers)
return response.json()
#获取json串中groupColumnList的值
def groupby(schemajson):
#生成随机数
# securitynb=random.randint(1,33)
# proxynb=random.randint(1,23)
# sessionnb=random.randint(1,24)
# Radiusnb=random.randint(1,4)
#取出group值得列表
dimensions=schemajson["data"]["doc"]["schema_query"]["dimensions"]
dimensions.append("common_recv_time");
# print(dimensions)
randomstr_1= random.sample(dimensions, 4)
#定义grp为返回值group的列表
grp=[]
for i in randomstr_1:
a={"name":i}
grp.append(a)
re=[grp,randomstr_1]
return re
#获取json串中queryColumnList的值
def DataBindings(schemajson,randomstr_1):
#生成queryColumnList列表
metrics=schemajson["data"]["doc"]["schema_query"]["metrics"]
metrics.append("common_log_id")
#在列表里随机去除元素
randomstr_2= random.sample(metrics, 6)
#在聚合列表中去掉groupby中的重复的元素
randomstr_3=array_diff(randomstr_2,randomstr_1)
#将groupby中元素添加到串中
qul=[]
for i in randomstr_1:
a={"name":i}
qul.append(a)
fields = schemajson["data"]["fields"]
list_1=["sum","min","max","avg","count"]
list_2=["count","count_distinct"]
for i in randomstr_3:
for j in fields:
if i == j["name"] :
jtype=j["type"]
if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double":
radomlist=random.sample(list_1,1)
randomstr_4={"name":i,"expression":radomlist[0]}
qul.append(randomstr_4)
elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp":
randomlist=random.sample(list_2,1)
randomstr_4={"name":i,"expression":randomlist[0]}
qul.append(randomstr_4)
return qul
# #去除a列表中存在的b的元素
def array_diff(a, b):
#定义空列表
c=[]
#range(len(a))取的为列表a的索引根据a的
for i in range(len(a)):
#取出索引对应的值
t=a[i]
#判断值是否存在在序列b中
if t not in b:
#如果序列不在b中则写入序列c
c.append(t)
#返回序列cc就是列表a去除列表b之后的元素
return c
def filterCondition(schemajson):
number = random.randint(0,100000)
randomstr= random.choice('abcdefghijklmnopqrstuvwxyz')
schemafilters=schemajson["data"]["doc"]["schema_query"]["filters"]
list1= random.sample(schemafilters,3)
print("条件列表",list1)
print("number",number)
print("randomstr",randomstr)
print("str(number)",str(number))
#获取不同属性支持的部不同操作
fields = schemajson["data"]["fields"]
operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"]
andConditions=[]
for i in list1:
#遍历fields列表
for k in fields:
#当filters列表值等于fields的name时
if i == k["name"]:
name = k["name"]
doc = k["doc"]
#获取无任何特殊说明列:
if doc == None:
type1 = k["type"]
if type1 == "int" or type1 == "long":
orConditions_list=[]
Operator=["=","!=",">","<",">=","<="]
randomOperator= random.sample(Operator, 1)
value=[str(number)]
Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
elif type1 == "string":
orConditions_list=[]
Operator=["=","!=","Like","Not Like","notEmpty"]
randomOperator_1= random.sample(Operator, 1)
randomOperator=randomOperator_1[0]
value=[]
if randomOperator == "=" or randomOperator == "!=":
value.append(str(number))
elif randomOperator == "Like" or randomOperator == "Not Like":
value.append(randomstr)
elif randomOperator=="notEmpty":
value=[]
Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
else:
if k["doc"]["constraints"]== None:
type1 = k["type"]
if type1 == "int" or type1 == "long":
orConditions_list=[]
Operator=["=","!=",">","<",">=","<="]
randomOperator= random.sample(Operator, 1)
value=[str(number)]
Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
elif type1 == "string":
orConditions_list=[]
Operator=["=","!=","Like","Not Like","notEmpty"]
randomOperator_1= random.sample(Operator, 1)
randomOperator=randomOperator_1[0]
value=[]
if randomOperator == "=" or randomOperator == "!=":
value.append(str(number))
elif randomOperator == "Like" or randomOperator == "Not Like":
value.append(randomstr)
elif randomOperator=="notEmpty":
value=[]
Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
else:
if k["doc"]["constraints"]["operator_functions"]==None:
conrandomstraints=k["doc"]["constraints"]
type1 = k["type"]
if type1 == "int" or type1 == "long":
Operator=["=","!=",">","<",">=","<="]
randomOperator_1= random.sample(Operator, 1)
randomOperator=randomOperator_1[0]
if conrandomstraints["type"] == "timestamp":
#获取当前时间戳
t = int(time.time())
orConditions_list=[]
value=[str(t)]
Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
elif type1 == "string":
Operator=["=","!=","Like","Not Like","notEmpty"]
randomOperator_1= random.sample(Operator, 1)
randomOperator=randomOperator_1[0]
if conrandomstraints["type"] == "ip":
orConditions_list=[]
#获取ip
ip =random_ipv4()
value=[]
if randomOperator == "=" or randomOperator == "!=":
value.append(ip)
elif randomOperator == "Like" or randomOperator == "Not Like":
value.append(ip)
elif randomOperator=="notEmpty":
value=[]
Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
elif conrandomstraints["type"] == "email":
orConditions_list=[]
Operator=["=","!=","Like","Not Like","notEmpty"]
randomOperator_1= random.sample(Operator, 1)
randomOperator=randomOperator_1[0]
#获取ip
emil =RandomEmail()
value=[]
if randomOperator == "=" or randomOperator == "!=":
value.append(emil)
elif randomOperator == "Like" or randomOperator == "Not Like":
value.append(emil)
elif randomOperator=="notEmpty":
value=[]
Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
else:
type1 = k["type"]
orConditions_list=[]
operator1 = k["doc"]["constraints"]["operator_functions"]
operator2 = operator1.split(",")
operator3=random.sample(operator2,1)
operatordata=k["doc"]["data"]
code=[]
for i in operatordata:
code_1=i["code"]
code.append(code_1)
code2=random.sample(code, 1)
Field={"name":name,"expression":operator3[0],"value":code2,"type":type1}
orConditions_list.append(Field)
orConditions={"orConditions":orConditions_list}
andConditions.append(orConditions)
filterCondition={"andConditions":andConditions}
print(filterCondition)
return filterCondition
#获取having条件的串
def havingjson(schemajson):
number = random.randint(0,100000)
schemametrics=schemajson["data"]["doc"]["schema_query"]["metrics"]
aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"]
schemametrics.append("common_log_id")
metricslist= random.sample(schemametrics,3)
fields = schemajson["data"]["fields"]
operator=["=","!=",">","<",">=","<="]
andConditions_list=[]
#遍历的到的having条件列表
for i in metricslist:
for j in fields:
if i == j["name"]:
name = j["name"]
type1=j["type"]
for v in aggregation:
if type1 == v["type"]:
orConditions_list=[]
functionsstr=v["functions"]
functionslist = functionsstr.split(",")
functions_1=random.sample(functionslist, 1)
if functions_1=="COUNT_DISTINCT" and type1 != "string":
functions_1=random.sample(functionslist, 1)
operator_1=random.sample(operator, 1)
havingdict={"name":name,"function":str.lower(functions_1[0]),"expression":operator_1[0],"value":str(number)}
orConditions_list.append(havingdict)
orConditions={"orConditions":orConditions_list}
andConditions_list.append(orConditions)
havingCondition={"andConditions":andConditions_list}
print(havingCondition)
return havingCondition
#拼接字符串
def datasetjson(schemauerl,token,testname,logtype):
schema_new=schema(schemauerl,token,logtype)
group_re=groupby(schema_new)
groupColumnList=group_re[0]
group_randomstr=group_re[1]
queryColumnList=DataBindings(schema_new,group_randomstr)
filterCondition_1=filterCondition(schema_new)
havingjson_1=havingjson(schema_new)
datasetdict = {
"list": {
"name":testname,
"logType": "security_event_log",
"groupColumnList":groupColumnList,
"queryColumnList":queryColumnList,
"filterCondition":filterCondition_1,
"havingCondition":havingjson_1
}
}
print(datasetdict)
print(json.dumps(datasetdict))
return json.dumps(datasetdict)
#拼接char的json串
def charjson(schemaurl,token,queryColumnList,groupColumnList,datasetid,testname,logtype):
print("queryColumnList",queryColumnList)
schema_new=schema(schemaurl,token,logtype)
fields = schema_new["data"]["fields"]
# 获取条件的label
namelist=[]
for i in queryColumnList:
for j in fields:
if i["name"] == j["name"]:
j_label=j["label"]
namelist.append(j_label)
print("namelist",namelist)
#获取聚合条件的label
groupColumnlaberList=[]
for i in groupColumnList:
for j in fields:
if i["name"] == j["name"]:
j_label=j["label"]
groupColumnlaberList.append(j_label)
print("groupColumnlaberList",groupColumnlaberList)
#图表类型列表
chartType_1=["line","pie","bar","area","table"]
chartType_2=["pie","bar","table"]
chartType=[]
# #随机选择图表类型
s=1
for i in namelist:
if i == "Receive Time" or i == "Start Time" or i == "End Time":
s+=1
if s != 1:
chartType=random.sample(chartType_1, 1)
else:
chartType=random.sample(chartType_2, 1)
chardict={}
print("chartType",chartType)
if chartType[0] == "line" or chartType[0] == "area":
dataBinding=[]
#将时间条件赋值给dataBinding
for j in namelist:
if j == "Receive Time" or j == "Start Time" or j == "End Time":
dataBinding.append(j)
timelin={
"dataBinding": dataBinding[0],
"format": "Time"
}
print("timelin",timelin)
namelist.remove(dataBinding[0]) #从统计查询数据列对象内去掉时间条件
groupColumnlaberList.remove(dataBinding[0]) #从聚合条件内去掉时间的条件
for i in groupColumnlaberList: #从统计查询条件内去掉聚合条件内的值
namelist.remove(i)
print("namelistrome",namelist)
linlist=[]
for i in namelist:
lindict={
"dataBinding": i,
"type": "Line Up",
"format": "Default",
}
linlist.append(lindict)
listdict={
"name": testname,
"datasetId": datasetid,
"datasetName": "",
"chartType": chartType[0],
"dataTop": 0,
"orderBy": "",
"orderDesc": 0,
"drilldownTop": 0,
"timeline": timelin,
"line": linlist
}
chardict={"list": listdict}
elif chartType[0] == "pie" or chartType[0] == "bar":
xAxisdataBinding=random.sample(groupColumnlaberList, 1)
xAxisdict={
"dataBinding": xAxisdataBinding[0],
"dataTop": 5,
"dataType": ""
}
for i in groupColumnlaberList:
namelist.remove(i)
yAxisBinding=random.sample(namelist, 1)
yAxisdict={
"dataBinding": yAxisBinding[0],
"format": "Default",
}
yAxislist=[yAxisdict]
listdict={
"name": testname,
"datasetId": datasetid,
"datasetName": "",
"chartType": chartType[0],
"dataTop": 0,
"orderBy": "",
"orderDesc": "",
"xAxis": xAxisdict,
"yAxis": yAxislist
}
chardict={"list": listdict}
elif chartType[0] == "table":
columnslist=[]
for i in namelist:
dataBindings={
"dataType": "",
"dataBinding": i,
"format": "Default",
}
dataBindingslist=[]
dataBindingslist.append(dataBindings)
columnsdict={
"title": i,
"width": 0,
"dataBindings": dataBindingslist
}
columnslist.append(columnsdict)
listdict={
"name": testname,
"datasetId": datasetid,
"datasetName": "",
"chartType": "table",
"dataTop": 5,
"orderBy": "",
"orderDesc": "",
"drilldownTop": 5,
"tableType": "Regular",
"columns": columnslist
}
chardict={"list": listdict}
print(json.dumps(chardict))
return json.dumps(chardict)
def Reportsjson(chartId,testname):
charlist=[]
chardict={
"chartId": chartId,
"timeGranulartiy": 1,
"timeUnit": "",
# "disabled": true
}
charlist.append(chardict)
reportJobList=[]
reportJobdct_1={
"rangeType": "last",
"rangeInterval": 1,
"rangeUnit": "week",
"jobName": testname,
"scheduleId": "",
"chartList": charlist,
"isNotice": 0,
"noticeMethod": "",
"startTime": "",
"endTime": "",
"filterCondition": None,
"isDisplayTrafficTrend": 1
}
reportJobdct_2={"reportJobList": reportJobdct_1}
print(json.dumps(reportJobdct_2))
return json.dumps(reportJobdct_2)
def ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,datasetgeturl,chargeturl,testname,logtype):
headers = {"Content-Type": "application/json","Authorization": token}
#dataset生成json串并发送请求
_datasetjson=datasetjson(schemaurl, token,testname,logtype)
response1 = requests.post(url=dataseturl, data=_datasetjson, headers=headers)
print("返回数据1",response1)
code = response1.json()["code"]
print("datasetcode:",code)
assert code == 200
#获取dataset的id
datasetget=requests.get(url=datasetgeturl,headers=headers)
dasetget=datasetget.json()
datesetid=dasetget["data"]["list"][0]["id"]
_datasetjson=json.loads(_datasetjson)
queryColumnList=_datasetjson["list"]["queryColumnList"]
groupColumnList=_datasetjson["list"]["groupColumnList"]
#生成charlibrariesjson串
charlibrariesjson=charjson(schemaurl, token,queryColumnList,groupColumnList,datesetid,testname,logtype)
response2 = requests.post(url=charurl, data=charlibrariesjson, headers=headers)
code = response2.json()["code"]
assert code == 200
#获取char libraries的id
charget=requests.get(url=chargeturl,headers=headers)
charget=charget.json()
charid=charget["data"]["list"][0]["id"]
#report生成json串并发送请求
reportjson=Reportsjson(charid,testname)
response3 = requests.post(url=repporturl, data=reportjson, headers=headers)
code = response3.json()["code"]
assert code == 200