diff --git a/04-CustomLibrary/Custometest/ReportSchema.py b/04-CustomLibrary/Custometest/ReportSchema.py new file mode 100644 index 0000000..fa13e96 --- /dev/null +++ b/04-CustomLibrary/Custometest/ReportSchema.py @@ -0,0 +1,402 @@ +import requests +import random +import json +import time +import ipaddress +#生成随机ipv4或ipv6 +MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1 +MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1 +def random_ipv4(): +# return ipaddress.IPv4Address._randomstring_from_ip_int( +# random.randint(0, MAX_IPV4) +# ) + re="192.168.50.11" + return re + +def random_ipv6(): + return ipaddress.IPv6Address._randomstring_from_ip_int( + random.randint(0, MAX_IPV6) + ) + +#随机生成邮箱地址 +def RandomEmail( emailType=None, rang=None): + __emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"] + # 如果没有指定邮箱类型,默认在 __emailtype中随机一个 + if emailType == None: + __randomEmail = random.choice(__emailtype) + else: + __randomEmail = emailType + # 如果没有指定邮箱长度,默认在4-10之间随机 + if rang == None: + __rang = random.randint(4, 10) + else: + __rang = int(rang) + __Number = "0123456789qbcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPWRSTUVWXYZ" + __randomNumber = "".join(random.choice(__Number) for i in range(__rang)) + _email = __randomNumber + __randomEmail + return _email + +#获取Schema +def schema(schemauerl,token): + url ="http://192.168.44.72:8080/v1/log/schema?logType=security_event_log" + headers = {"Content-Type":"application/x-www-form-urlencoded","Authorization":token} + response = requests.get(url=url,headers=headers) + return response.json() + +#获取json串中groupColumnList的值 +def groupby(schemajson): + #生成随机数 +# securitynb=random.randint(1,33) +# proxynb=random.randint(1,23) +# sessionnb=random.randint(1,24) +# Radiusnb=random.randint(1,4) + #取出group值得列表 + dimensions=schemajson["data"]["doc"]["schema_query"]["dimensions"] + dimensions.append("common_recv_time"); +# print(dimensions) + randomstr_1= random.sample(dimensions, 4) + #定义grp为返回值group的列表 + grp=[] + for i in randomstr_1: + a={"name":i} + grp.append(a) + + re=[grp,randomstr_1] + return re + +#获取json串中queryColumnList的值 +def DataBindings(schemajson,randomstr_1): + #生成queryColumnList列表 + metrics=schemajson["data"]["doc"]["schema_query"]["metrics"] + metrics.append("common_log_id") + #在列表里随机去除元素 + randomstr_2= random.sample(metrics, 6) + #在聚合列表中去掉groupby中的重复的元素 + randomstr_3=array_diff(randomstr_2,randomstr_1) + #将groupby中元素添加到串中 + qul=[] + for i in randomstr_1: + a={"name":i} + qul.append(a) + + fields = schemajson["data"]["fields"] + list_1=["sum","min","max","avg","count"] + list_2=["count","count_distinct"] + for i in randomstr_3: + for j in fields: + if i == j["name"] : + jtype=j["type"] + if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double": + radomlist=random.sample(list_1,1) + randomstr_4={"name":i,"expression":radomlist[0]} + qul.append(randomstr_4) + elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp": + randomlist=random.sample(list_2,1) + randomstr_4={"name":i,"expression":randomlist[0]} + qul.append(randomstr_4) + + return qul + +# #去除a列表中存在的b的元素 +def array_diff(a, b): + #定义空列表 + c=[] + #range(len(a))取的为列表a的索引,根据a的 + for i in range(len(a)): + #取出索引对应的值 + t=a[i] + #判断值是否存在在序列b中 + if t not in b: + #如果序列不在b中,则写入序列c + c.append(t) + #返回序列c,c就是列表a去除列表b之后的元素 + return c + +def filterCondition(schemajson): + number = random.randint(0,100000) + randomstr= random.choice('abcdefghijklmnopqrstuvwxyz') + schemafilters=schemajson["data"]["doc"]["schema_query"]["filters"] + list1= random.sample(schemafilters,3) + print("条件列表",list1) + print("number",number) + print("randomstr",randomstr) + print("str(number)",str(number)) + #获取不同属性支持的部不同操作 + fields = schemajson["data"]["fields"] + operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"] + andConditions=[] + for i in list1: + #遍历fields列表 + for k in fields: + #当filters列表值等于fields的name时 + if i == k["name"]: + name = k["name"] + doc = k["doc"] + #获取无任何特殊说明列: + if doc == None: + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list=[] + Operator=["=","!=",">","<",">=","<="] + randomOperator= random.sample(Operator, 1) + value=[str(number)] + Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list=[] + Operator=["=","!=","Like","Not Like","notEmpty"] + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + value=[] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator=="notEmpty": + value=[] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + + else: + if k["doc"]["constraints"]== None: + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list=[] + Operator=["=","!=",">","<",">=","<="] + randomOperator= random.sample(Operator, 1) + value=[str(number)] + Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list=[] + Operator=["=","!=","Like","Not Like","notEmpty"] + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + value=[] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator=="notEmpty": + value=[] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + + else: + if k["doc"]["constraints"]["operator_functions"]==None: + conrandomstraints=k["doc"]["constraints"] + type1 = k["type"] + if type1 == "int" or type1 == "long": + Operator=["=","!=",">","<",">=","<="] + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + if conrandomstraints["type"] == "timestamp": + #获取当前时间戳 + t = int(time.time()) + orConditions_list=[] + value=[str(t)] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + + elif type1 == "string": + Operator=["=","!=","Like","Not Like","notEmpty"] + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + if conrandomstraints["type"] == "ip": + orConditions_list=[] + #获取ip + ip =random_ipv4() + value=[] + if randomOperator == "=" or randomOperator == "!=": + value.append(ip) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(ip) + elif randomOperator=="notEmpty": + value=[] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + elif conrandomstraints["type"] == "email": + orConditions_list=[] + Operator=["=","!=","Like","Not Like","notEmpty"] + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + #获取ip + emil =RandomEmail() + value=[] + if randomOperator == "=" or randomOperator == "!=": + value.append(emil) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(emil) + elif randomOperator=="notEmpty": + value=[] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + else: + type1 = k["type"] + orConditions_list=[] + operator1 = k["doc"]["constraints"]["operator_functions"] + operator2 = operator1.split(",") + operator3=random.sample(operator2,1) + operatordata=k["doc"]["data"] + code=[] + for i in operatordata: + code_1=i["code"] + code.append(code_1) + code2=random.sample(code, 1) + Field={"name":name,"expression":operator3[0],"value":code2,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + filterCondition={"andConditions":andConditions} + print(filterCondition) + return filterCondition + +#获取having条件的串 +def Havinginfo(schemajson): + number = random.randint(0,100000) + schemametrics=schemajson["data"]["doc"]["schema_query"]["metrics"] + aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"] + schemametrics.append("common_log_id") + metricslist= random.sample(schemametrics,3) + fields = schemajson["data"]["fields"] + operator=["=","!=",">","<",">=","<="] + andConditions_list=[] + #遍历的到的having条件列表 + for i in metricslist: + for j in fields: + if i == j["name"]: + name = j["name"] + type1=j["type"] + for v in aggregation: + if type1 == v["type"]: + orConditions_list=[] + functionsstr=v["functions"] + functionslist = functionsstr.split(",") + functions_1=random.sample(functionslist, 1) + if functions_1=="COUNT_DISTINCT" and type1 != "string": + functions_1=random.sample(functionslist, 1) + operator_1=random.sample(operator, 1) + havingdict={"name":name,"function":str.lower(functions_1[0]),"expression":operator_1[0],"value":str(number)} + orConditions_list.append(havingdict) + orConditions={"orConditions":orConditions_list} + andConditions_list.append(orConditions) + + + havingCondition={"andConditions":andConditions_list} + print(havingCondition) + return havingCondition + +#拼接字符串 +def datasetjson(schemauerl,token,testname): + #获取Schema +# aaa=["common_sled_ip","common_device_id","common_sessions","common_c2s_pkt_num"] + schema_new=schema(schemauerl,token) + group_re=groupby(schema_new) + groupColumnList=group_re[0] + group_randomstr=group_re[1] + queryColumnList=DataBindings(schema_new,group_randomstr) + filterCondition_1=filterCondition(schema_new) + Havinginfo_1=Havinginfo(schema_new) + datasetdict = { + "list": { + "name":testname, + "logType": "security_event_log", + "groupColumnList":groupColumnList, + "queryColumnList":queryColumnList, + "filterCondition":filterCondition_1, + "havingCondition":Havinginfo_1 +# "isInitialize":0 + } + } + print(datasetdict) + print(json.dumps(datasetdict)) + return json.dumps(datasetdict) + +#拼接char得串 +def charjson(schemaurl,token,datasetjson): + schema_new=schema(schemauerl,token) + queryColumnList=datasetjson["list"]["queryColumnList"] + fields = schema_new["data"]["fields"] + namelist=[] + #获取条件得label + for i in queryColumnList: + for j in fields: + if i == j["name"]: + namelist.append(j[label]) + #图表类型列表 + chartType_1=["line","pie","bar","area","table"] + chartType_2=["pie","bar","table"] + chartType=[] + #随机选择图表类型 + for i in namelist: + if i == "Receive Time" or i == "Start Time" or i == "End Time": + chartType=random.sample(chartType_1, 1) + else: + chartType=random.sample(chartType_2, 1) + + if chartType[0] == "line": + dataBinding=[] + #将时间条件赋值给dataBinding + for j in namelist: + if j == "Receive Time" or j == "Start Time" or j == "End Time": + dataBinding.append(j) + + + + + + elif chartType[0] == "pie": + print(111) + elif chartType[0] == "bar": + print(111) + + elif chartType[0] == "area": + print(111) + + elif chartType[0] == "table": + print(111) + + + + + +def ReportInterfaceTest(schemaurl,token,dataseturl,datasetgeturl,testname): + headers = {"Content-Type": "application/json","Authorization": token} + reportjsom=datasetjson(schemaurl, token,testname) + response1 = requests.post(url=dataseturl, data=reportjsom, headers=headers) + print("返回数据",response1) + code = response1.json()["code"] + print("code:",code) + assert code == 200 + #获取id + datasetget=requests.get(url=datasetgeturl,headers=headers) + print(datasetget.json()) + dasetget=datasetget.json() + id=dasetget["data"]["list"][0]["id"] + return id + + + + + + + + + + + +