291 lines
11 KiB
Python
291 lines
11 KiB
Python
import os
|
||
import subprocess
|
||
from time import sleep
|
||
import platform
|
||
|
||
|
||
|
||
class Order:
    """Shell-driven curl checks (HTTP / FTP) plus helpers for JSON-like payloads.

    Network checks build a ``curl`` command line and run it through
    :meth:`CMD`. The HTTP checks additionally read back ``certificate.yaml``,
    the file named in curl's ``--trace`` option (a relative path, so it lives
    in the current working directory).
    """

    # Trace file written by the command from curl_name() and scanned by
    # Cert_Verification() / Content_Type().
    _TRACE_FILE = 'certificate.yaml'

    def CMD(self, data):
        """Run ``data`` through the shell and return what it wrote to stdout.

        :param data: complete command line, handed to ``os.popen`` unchanged.
        :return: the command's standard output as text.
        """
        # NOTE(review): the string reaches the shell verbatim, so only pass
        # trusted input.
        with os.popen(data) as pipe:
            return pipe.read()

    def Linux(self):
        """Placeholder; intentionally does nothing."""
        pass

    def _trace_contains(self, needle):
        """Return True when any line of the trace file contains ``needle``."""
        with open(self._TRACE_FILE, 'r') as trace:
            return any(needle in line for line in trace)

    # Decide from the certificate issuer name whether the certificate was replaced.
    def Cert_Verification(self, data):
        """Report whether ``data`` occurs anywhere in the curl trace file.

        :param data: text to look for (the callers pass an issuer name).
        :return: ``'证书已替换'`` when found, otherwise ``'证书未替换'``.
        :raises OSError: if the trace file cannot be opened.
        """
        return '证书已替换' if self._trace_contains(data) else '证书未替换'

    def Content_Type(self, data):
        """Report whether ``data`` occurs anywhere in the curl trace file.

        :param data: text to look for.
        :return: ``'Content_Type已替换'`` when found, otherwise
            ``'Content_Type未替换'``.
        :raises OSError: if the trace file cannot be opened.
        """
        if self._trace_contains(data):
            return 'Content_Type已替换'
        return 'Content_Type未替换'

    # Build the curl command for one route.
    def curl_name(self, data):
        """Return the curl command line that fetches ``data`` and writes the trace file.

        :param data: URL (or URL plus extra curl options) appended verbatim.
        """
        curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml ' + data + '| iconv -f utf-8 -t gbk'
        return curl_name

    # Controller
    def manu(self, url, Certificate):
        """Fetch every entry of ``url`` with curl and check the certificate.

        :param url: sequence of URLs; each is fetched in order.
        :param Certificate: text expected in the trace file; ``""`` skips the
            certificate check.
        :return: ``(output, verdict)`` for the last URL processed, where
            ``output`` is curl's stdout and ``verdict`` is that URL
            concatenated with the :meth:`Cert_Verification` result. Both are
            ``''`` when nothing was fetched / checked (previously this case
            raised ``UnboundLocalError``).
        """
        # NOTE(review): the source's indentation was lost; returning after
        # the loop (rather than inside it) is the assumed original layout.
        d = ''
        assert_cer = ''
        for target in url:
            d = self.CMD(self.curl_name(target))
            sleep(1)
            if Certificate != "":
                verdict = self.Cert_Verification(Certificate)
                sleep(1)
                assert_cer = target + verdict
        return d, assert_cer

    def _download_verdict(self, file_name, expected_size):
        """Compare the size of ``file_name`` in the working directory with ``expected_size``.

        Returns ``(verdict, size)``; ``size`` is ``None`` when the file does
        not exist or cannot be inspected, which counts as a failure.
        """
        try:
            fsize = os.path.getsize(os.path.join(os.getcwd(), file_name))
        except OSError:
            return "ftp_fail", None
        return ("ftp_success" if fsize == expected_size else "ftp_fail"), fsize

    def FTP(self, ftp_type):
        """Run the built-in FTP check against the hard-coded test server.

        :param ftp_type: ``"下载"`` downloads the reference file and checks its
            size; ``"登录"`` fetches it to stdout and checks its content.
        :return: ``"ftp_success"`` or ``"ftp_fail"``; ``None`` for any other
            ``ftp_type``.
        """
        if ftp_type == "下载":
            # Download through the shell, then compare against the known size.
            data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" -o zmmtext123.txt'
            self.CMD(data)
            sleep(5)
            verdict, _ = self._download_verdict("zmmtext123.txt", 435814)
            return verdict
        if ftp_type == "登录":
            data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" | iconv -f utf-8 -t gbk'
            d = self.CMD(data)
            if "Graphical (Xorg) program starter for ADRIANE" in d:
                return "ftp_success"
            return "ftp_fail"
        return None

    # FTP download
    def FTP_down(self, ftp_url, file_size, file_name):
        """Download ``ftp_url`` to ``file_name`` with curl and verify the size.

        :param ftp_url: URL plus any extra curl options (e.g. ``-u"user:pw"``);
            inserted into the command line verbatim.
        :param file_size: expected size in bytes.
        :param file_name: output file, relative to the working directory.
        :return: ``"ftp_success"`` when the file exists with exactly
            ``file_size`` bytes, otherwise ``"ftp_fail"``.
        """
        # The original appended a stray "'" and had no space before -o, which
        # produced an unterminated quote / a glued option on the command line.
        data = 'curl -m 20 ' + ftp_url + ' -o ' + file_name
        print(data)
        self.CMD(data)
        sleep(5)
        verdict, fsize = self._download_verdict(file_name, file_size)
        if fsize is not None:
            print(fsize)
        return verdict

    # FTP login
    def FTP_login(self, ftp_url, file_content):
        """Fetch ``ftp_url`` to stdout and check that it contains ``file_content``.

        On Windows the output is piped through ``iconv`` (utf-8 to gbk).

        :return: ``"ftp_success"`` or ``"ftp_fail"``.
        """
        data = 'curl -m 10 ' + ftp_url
        if self.Operating_System() == "Windows":
            data += ' | iconv -f utf-8 -t gbk'
        d = self.CMD(data)
        return "ftp_success" if file_content in d else "ftp_fail"

    # Detect the current operating system.
    def Operating_System(self):
        """Return ``platform.system()`` (e.g. ``"Windows"`` or ``"Linux"``)."""
        return platform.system()

    # Module-level name ``null``: lets eval() read JSON/Java-style ``null`` in
    # payload text as the empty string.
    global null
    null = ''

    # Replace values in a payload. ``jsons`` is the default payload, ``datas``
    # the replacements, ``header`` a custom payload that overrides both.
    def Jsoneditmanu(self, jsons, datas=None, header=None):
        """Return the payload to send, optionally with values replaced.

        :param jsons: default payload, as Python-literal text or as a dict/list.
        :param datas: mapping ``key -> new value``; every occurrence of each
            key, at any depth, is replaced.
        :param header: custom payload (text or object); when given it is
            returned instead and ``jsons`` / ``datas`` are ignored.
        :return: ``header`` (parsed) if given; the edited payload if ``datas``
            is given; otherwise ``jsons`` exactly as passed in.
        """
        # SECURITY: eval() executes arbitrary code. It is kept because the
        # payload text relies on the module-level ``null`` name; never feed it
        # untrusted input.
        if header is not None:
            return eval(header) if isinstance(header, str) else header
        if datas is None:
            return jsons
        if isinstance(jsons, str):
            payload = eval(jsons)
        else:
            # Already an object: work on a copy so the caller's default
            # payload can be reused.
            import copy
            payload = copy.deepcopy(jsons)
        for k, v in datas.items():
            self.UpdateAllvalues(payload, k, v)
        return payload

    # Recursive nested replacement.
    def UpdateAllvalues(self, mydict, key, value):
        """Set ``key`` to ``value`` in every dict reachable from ``mydict``.

        Walks dicts and lists to any depth and mutates them in place; other
        types are ignored. The freshly stored ``value`` itself is not
        descended into, so it is never modified.
        """
        if isinstance(mydict, dict):
            if key in mydict:
                mydict[key] = value
            for k, child in mydict.items():
                if k == key:
                    # Just replaced: recursing here would edit (or make
                    # self-referential) the caller's replacement value.
                    continue
                self.UpdateAllvalues(child, key, value)
        elif isinstance(mydict, list):
            for element in mydict:
                self.UpdateAllvalues(element, key, value)

    # Recursively extract the values of matching keys from a JSON structure.
    # NOTE(review): class-scoped import kept from the original; nothing in
    # this class uses it.
    import json

    def _collect_id_values(self, node, collected):
        """Append to ``collected`` the value of every key ending in ``"Id"`` under ``node``."""
        if isinstance(node, dict):
            for k, v in node.items():
                if isinstance(k, str) and k.endswith("Id"):
                    collected.append(v)
                self._collect_id_values(v, collected)
        elif isinstance(node, list):
            for item in node:
                self._collect_id_values(item, collected)

    def get_dict_allkeys(self, dict_a):
        """Collect the values stored under keys that end with ``"Id"``.

        Traverses nested dicts and lists in order.

        :param dict_a: parsed JSON structure (dict or list); anything else
            yields an empty list.
        :return: a new list of the matching values, in traversal order.
        """
        collected = []
        self._collect_id_values(dict_a, collected)
        return collected

    # Check whether the expected values are present.
    def VerifyProxy(self, data, lists):
        """Check that every id in ``lists`` appears among the ``*Id`` values of ``data``.

        :param data: parsed JSON structure to search.
        :param lists: comma-separated ids, compared as strings.
        :return: ``"true"`` when all ids are present, otherwise ``"flase"``.
        """
        found = {str(v) for v in self.get_dict_allkeys(data)}
        wanted = set(lists.split(","))
        # ``>=`` rather than the original strict ``>``: an exact match must
        # also pass. The misspelled "flase" is kept because callers may
        # compare against it.
        return "true" if found >= wanted else "flase"
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke test. Earlier ad-hoc checks, kept here as a usage reference:
    #   s = Order()
    #   s.manu(['https://www.baidu.com'], "Tango Secure Gateway CA")[1]
    #   s.FTP_down(ftp_url, ftp_size, file_name)   # ftp_url may carry curl options such as -u"user:pw"
    #   s.FTP_login(ftp_url, "Graphical (Xorg) program starter for ADRIANE")
    #   s.CMD('curl -I https://www.baidu.com')

    # Hand-written payload to exercise the replacement helpers.
    jsons = {"opAction":"add","policyList":{"policyId":"","policyName":"2324242423","policyType":"tsg_security","action":"intercept","userTags":"","doBlacklist":0,"doLog":1,"policyDesc":"","effectiveRange":{"tag_sets":[[]]},"userRegion":{"protocol":"SSL","keyring":1,"decryption":1,"decrypt_mirror":{"enable":0,"mirror_profile":null}},"referenceObject":[{"objectId":28054,"protocolFields":["TSG_SECURITY_SOURCE_ADDR"]}],"isValid":0,"scheduleId":[],"appObjectIdArray":[3]}}
    datas = {"protocol":"edit","opAction":"edit","policyId":123,'protocolFields':1}

    print("替换前:\n %s" % jsons)

    a = Order()
    # Jsoneditmanu eval()s its first argument, so hand it the textual form of
    # the dict (passing the dict itself raised TypeError).
    b = a.Jsoneditmanu(str(jsons), datas)
    print("替换后:\n %s" % b)

    data = {"aid":[{'bid':2},{'cid':3}]}
    print(type(data))
    # get_dict_allkeys is a method of Order (the bare-name call raised
    # NameError). Reset the module-level key_list it may append to.
    key_list = []
    get_keys = a.get_dict_allkeys(data)
    print(get_keys)
|