import requests import random import json import time import ipaddress #生成随机ipv4或ipv6 MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1 MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1 def random_ipv4(): # return ipaddress.IPv4Address._randomstring_from_ip_int( # random.randint(0, MAX_IPV4) # ) re="192.168.50.11" return re def random_ipv6(): return ipaddress.IPv6Address._randomstring_from_ip_int( random.randint(0, MAX_IPV6) ) #随机生成邮箱地址 def RandomEmail( emailType=None, rang=None): __emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"] # 如果没有指定邮箱类型,默认在 __emailtype中随机一个 if emailType == None: __randomEmail = random.choice(__emailtype) else: __randomEmail = emailType # 如果没有指定邮箱长度,默认在4-10之间随机 if rang == None: __rang = random.randint(4, 10) else: __rang = int(rang) __Number = "0123456789qbcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPWRSTUVWXYZ" __randomNumber = "".join(random.choice(__Number) for i in range(__rang)) _email = __randomNumber + __randomEmail return _email #获取Schema def schema(schemauerl,token): url ="http://192.168.44.72:8080/v1/log/schema?logType=security_event_log" headers = {"Content-Type":"application/x-www-form-urlencoded","Authorization":token} response = requests.get(url=url,headers=headers) return response.json() #获取json串中groupColumnList的值 def groupby(schemajson): #生成随机数 # securitynb=random.randint(1,33) # proxynb=random.randint(1,23) # sessionnb=random.randint(1,24) # Radiusnb=random.randint(1,4) #取出group值得列表 dimensions=schemajson["data"]["doc"]["schema_query"]["dimensions"] dimensions.append("common_recv_time"); # print(dimensions) randomstr_1= random.sample(dimensions, 4) #定义grp为返回值group的列表 grp=[] for i in randomstr_1: a={"name":i} grp.append(a) re=[grp,randomstr_1] return re #获取json串中queryColumnList的值 def DataBindings(schemajson,randomstr_1): #生成queryColumnList列表 metrics=schemajson["data"]["doc"]["schema_query"]["metrics"] metrics.append("common_log_id") #在列表里随机去除元素 randomstr_2= random.sample(metrics, 6) #在聚合列表中去掉groupby中的重复的元素 randomstr_3=array_diff(randomstr_2,randomstr_1) #将groupby中元素添加到串中 qul=[] for i in randomstr_1: a={"name":i} qul.append(a) fields = schemajson["data"]["fields"] list_1=["sum","min","max","avg","count"] list_2=["count","count_distinct"] for i in randomstr_3: for j in fields: if i == j["name"] : jtype=j["type"] if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double": radomlist=random.sample(list_1,1) randomstr_4={"name":i,"expression":radomlist[0]} qul.append(randomstr_4) elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp": randomlist=random.sample(list_2,1) randomstr_4={"name":i,"expression":randomlist[0]} qul.append(randomstr_4) return qul # #去除a列表中存在的b的元素 def array_diff(a, b): #定义空列表 c=[] #range(len(a))取的为列表a的索引,根据a的 for i in range(len(a)): #取出索引对应的值 t=a[i] #判断值是否存在在序列b中 if t not in b: #如果序列不在b中,则写入序列c c.append(t) #返回序列c,c就是列表a去除列表b之后的元素 return c def filterCondition(schemajson): number = random.randint(0,100000) randomstr= random.choice('abcdefghijklmnopqrstuvwxyz') schemafilters=schemajson["data"]["doc"]["schema_query"]["filters"] list1= random.sample(schemafilters,3) print("条件列表",list1) print("number",number) print("randomstr",randomstr) print("str(number)",str(number)) #获取不同属性支持的部不同操作 fields = schemajson["data"]["fields"] operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"] andConditions=[] for i in list1: #遍历fields列表 for k in fields: #当filters列表值等于fields的name时 if i == k["name"]: name = k["name"] doc = k["doc"] #获取无任何特殊说明列: if doc == None: type1 = k["type"] if type1 == "int" or type1 == "long": orConditions_list=[] Operator=["=","!=",">","<",">=","<="] randomOperator= random.sample(Operator, 1) value=[str(number)] Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) elif type1 == "string": orConditions_list=[] Operator=["=","!=","Like","Not Like","notEmpty"] randomOperator_1= random.sample(Operator, 1) randomOperator=randomOperator_1[0] value=[] if randomOperator == "=" or randomOperator == "!=": value.append(str(number)) elif randomOperator == "Like" or randomOperator == "Not Like": value.append(randomstr) elif randomOperator=="notEmpty": value=[] Field={"name":name,"expression":randomOperator,"value":value,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) else: if k["doc"]["constraints"]== None: type1 = k["type"] if type1 == "int" or type1 == "long": orConditions_list=[] Operator=["=","!=",">","<",">=","<="] randomOperator= random.sample(Operator, 1) value=[str(number)] Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) elif type1 == "string": orConditions_list=[] Operator=["=","!=","Like","Not Like","notEmpty"] randomOperator_1= random.sample(Operator, 1) randomOperator=randomOperator_1[0] value=[] if randomOperator == "=" or randomOperator == "!=": value.append(str(number)) elif randomOperator == "Like" or randomOperator == "Not Like": value.append(randomstr) elif randomOperator=="notEmpty": value=[] Field={"name":name,"expression":randomOperator,"value":value,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) else: if k["doc"]["constraints"]["operator_functions"]==None: conrandomstraints=k["doc"]["constraints"] type1 = k["type"] if type1 == "int" or type1 == "long": Operator=["=","!=",">","<",">=","<="] randomOperator_1= random.sample(Operator, 1) randomOperator=randomOperator_1[0] if conrandomstraints["type"] == "timestamp": #获取当前时间戳 t = int(time.time()) orConditions_list=[] value=[str(t)] Field={"name":name,"expression":randomOperator,"value":value,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) elif type1 == "string": Operator=["=","!=","Like","Not Like","notEmpty"] randomOperator_1= random.sample(Operator, 1) randomOperator=randomOperator_1[0] if conrandomstraints["type"] == "ip": orConditions_list=[] #获取ip ip =random_ipv4() value=[] if randomOperator == "=" or randomOperator == "!=": value.append(ip) elif randomOperator == "Like" or randomOperator == "Not Like": value.append(ip) elif randomOperator=="notEmpty": value=[] Field={"name":name,"expression":randomOperator,"value":value,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) elif conrandomstraints["type"] == "email": orConditions_list=[] Operator=["=","!=","Like","Not Like","notEmpty"] randomOperator_1= random.sample(Operator, 1) randomOperator=randomOperator_1[0] #获取ip emil =RandomEmail() value=[] if randomOperator == "=" or randomOperator == "!=": value.append(emil) elif randomOperator == "Like" or randomOperator == "Not Like": value.append(emil) elif randomOperator=="notEmpty": value=[] Field={"name":name,"expression":randomOperator,"value":value,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) else: type1 = k["type"] orConditions_list=[] operator1 = k["doc"]["constraints"]["operator_functions"] operator2 = operator1.split(",") operator3=random.sample(operator2,1) operatordata=k["doc"]["data"] code=[] for i in operatordata: code_1=i["code"] code.append(code_1) code2=random.sample(code, 1) Field={"name":name,"expression":operator3[0],"value":code2,"type":type1} orConditions_list.append(Field) orConditions={"orConditions":orConditions_list} andConditions.append(orConditions) filterCondition={"andConditions":andConditions} print(filterCondition) return filterCondition #获取having条件的串 def Havinginfo(schemajson): number = random.randint(0,100000) schemametrics=schemajson["data"]["doc"]["schema_query"]["metrics"] aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"] schemametrics.append("common_log_id") metricslist= random.sample(schemametrics,3) fields = schemajson["data"]["fields"] operator=["=","!=",">","<",">=","<="] andConditions_list=[] #遍历的到的having条件列表 for i in metricslist: for j in fields: if i == j["name"]: name = j["name"] type1=j["type"] for v in aggregation: if type1 == v["type"]: orConditions_list=[] functionsstr=v["functions"] functionslist = functionsstr.split(",") functions_1=random.sample(functionslist, 1) if functions_1=="COUNT_DISTINCT" and type1 != "string": functions_1=random.sample(functionslist, 1) operator_1=random.sample(operator, 1) havingdict={"name":name,"function":str.lower(functions_1[0]),"expression":operator_1[0],"value":str(number)} orConditions_list.append(havingdict) orConditions={"orConditions":orConditions_list} andConditions_list.append(orConditions) havingCondition={"andConditions":andConditions_list} print(havingCondition) return havingCondition #拼接字符串 def datasetjson(schemauerl,token,testname): #获取Schema # aaa=["common_sled_ip","common_device_id","common_sessions","common_c2s_pkt_num"] schema_new=schema(schemauerl,token) group_re=groupby(schema_new) groupColumnList=group_re[0] group_randomstr=group_re[1] queryColumnList=DataBindings(schema_new,group_randomstr) filterCondition_1=filterCondition(schema_new) Havinginfo_1=Havinginfo(schema_new) datasetdict = { "list": { "name":testname, "logType": "security_event_log", "groupColumnList":groupColumnList, "queryColumnList":queryColumnList, "filterCondition":filterCondition_1, "havingCondition":Havinginfo_1 # "isInitialize":0 } } print(datasetdict) print(json.dumps(datasetdict)) return json.dumps(datasetdict) #拼接char得串 def charjson(schemaurl,token,datasetjson): schema_new=schema(schemauerl,token) queryColumnList=datasetjson["list"]["queryColumnList"] fields = schema_new["data"]["fields"] namelist=[] #获取条件得label for i in queryColumnList: for j in fields: if i == j["name"]: namelist.append(j[label]) #图表类型列表 chartType_1=["line","pie","bar","area","table"] chartType_2=["pie","bar","table"] chartType=[] #随机选择图表类型 for i in namelist: if i == "Receive Time" or i == "Start Time" or i == "End Time": chartType=random.sample(chartType_1, 1) else: chartType=random.sample(chartType_2, 1) if chartType[0] == "line": dataBinding=[] #将时间条件赋值给dataBinding for j in namelist: if j == "Receive Time" or j == "Start Time" or j == "End Time": dataBinding.append(j) elif chartType[0] == "pie": print(111) elif chartType[0] == "bar": print(111) elif chartType[0] == "area": print(111) elif chartType[0] == "table": print(111) def ReportInterfaceTest(schemaurl,token,dataseturl,datasetgeturl,testname): headers = {"Content-Type": "application/json","Authorization": token} reportjsom=datasetjson(schemaurl, token,testname) response1 = requests.post(url=dataseturl, data=reportjsom, headers=headers) print("返回数据",response1) code = response1.json()["code"] print("code:",code) assert code == 200 #获取id datasetget=requests.get(url=datasetgeturl,headers=headers) print(datasetget.json()) dasetget=datasetget.json() id=dasetget["data"]["list"][0]["id"] return id