This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
dongxiaoyan-tsg-autotest/04-CustomLibrary/Custometest/cmd_cer.py

291 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import subprocess
from time import sleep
import platform
class Order:
    """Command-line and JSON helpers exposed as custom test keywords.

    The network helpers shell out to ``curl`` (and ``iconv``); the JSON
    helpers edit and inspect nested dict/list structures.

    NOTE(review): every shell command here is assembled by plain string
    concatenation and executed through the system shell, so arguments must
    come from trusted test data only.
    """

    # File curl writes its ``--trace`` output to and that the verification
    # helpers scan afterwards (relative to the current working directory).
    TRACE_FILE = 'certificate.yaml'

    def CMD(self, data):
        """Run the shell command *data* and return everything it wrote to stdout."""
        # The object returned by os.popen is a context manager, so the pipe is
        # closed even if reading from it raises.
        with os.popen(data) as pipe:
            return pipe.read()

    def Linux(self):
        """Placeholder; intentionally does nothing."""
        pass

    def _matching_trace_lines(self, needle):
        """Return the lines of the curl trace file that contain *needle*."""
        # errors='replace' keeps a stray undecodable byte in the trace from
        # aborting the whole check.
        with open(self.TRACE_FILE, 'r', errors='replace') as trace:
            return [line for line in trace if needle in line]

    # Decide from the certificate issuer name whether the certificate was replaced
    def Cert_Verification(self, data):
        """Report whether the issuer text *data* occurs in the curl trace file.

        Matching lines are printed. Returns '证书已替换' when at least one line
        contains *data*, otherwise '证书未替换'.
        """
        matches = self._matching_trace_lines(data)
        for line in matches:
            print(line)
        return '证书已替换' if matches else '证书未替换'

    def Content_Type(self, data):
        """Report whether the text *data* occurs in the curl trace file.

        Returns 'Content_Type已替换' when at least one line contains *data*,
        otherwise 'Content_Type未替换'.
        """
        if self._matching_trace_lines(data):
            return 'Content_Type已替换'
        return 'Content_Type未替换'

    # Build the curl command for one URL
    def curl_name(self, data):
        """Return the curl command line that fetches *data* and traces to TRACE_FILE."""
        # NOTE(review): unlike FTP_login, the iconv stage is appended on every
        # platform here - confirm that is intended for non-Windows hosts.
        return ('curl -kv -m 10 -1 --trace ' + self.TRACE_FILE + ' '
                + data + '| iconv -f utf-8 -t gbk')

    # Controller
    def manu(self, url, Certificate):
        """Fetch every entry of *url* with curl and check the certificate issuer.

        :param url: iterable of URL strings
        :param Certificate: issuer text to look for; "" skips the check
        :return: tuple ``(output, assertion)`` for the LAST url processed, where
            ``assertion`` is the url followed by the Cert_Verification result.
            Both are "" when nothing was produced (empty *url*, or the check
            was skipped).
        """
        # Pre-bind both results so an empty url list or a skipped check returns
        # empty strings instead of raising UnboundLocalError.
        d = ''
        assert_cer = ''
        for target in url:
            d = self.CMD(self.curl_name(target))
            sleep(1)
            if Certificate != "":
                verdict = self.Cert_Verification(Certificate)
                sleep(1)
                assert_cer = target + verdict
        return d, assert_cer

    def _downloaded_size(self, command, file_name):
        """Run the download *command* and return the size of *file_name*.

        Returns None when the file does not exist after the command has run.
        """
        target = os.path.join(os.getcwd(), file_name)
        # Remove a copy left over from an earlier run so that a failed download
        # cannot be mistaken for a successful one. Best effort: a missing file
        # is the normal case.
        try:
            os.remove(target)
        except OSError:
            pass
        self.CMD(command)
        sleep(5)
        try:
            return os.path.getsize(target)
        except OSError:
            return None

    def FTP(self, ftp_type):
        """Run the built-in FTP check selected by *ftp_type*.

        :param ftp_type: "下载" (download) or "登录" (login)
        :return: "ftp_success" or "ftp_fail"; None for any other *ftp_type*
        """
        # Select the kind of FTP check (download / login)
        if ftp_type == "下载":
            # Download the file through curl and compare its size
            data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" -o zmmtext123.txt'
            fsize = self._downloaded_size(data, "zmmtext123.txt")
            return "ftp_success" if fsize == 435814 else "ftp_fail"
        if ftp_type == "登录":
            data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" | iconv -f utf-8 -t gbk'
            d = self.CMD(data)
            if "Graphical (Xorg) program starter for ADRIANE" in d:
                return "ftp_success"
            return "ftp_fail"
        return None

    # FTP download
    def FTP_down(self, ftp_url, file_size, file_name):
        """Download *ftp_url* to *file_name* with curl and verify its size.

        :param ftp_url: URL plus any extra curl options (e.g. ``-u"user:pw"``)
        :param file_size: expected size in bytes (int or numeric string)
        :param file_name: output file, relative to the current directory
        :return: "ftp_success" when the downloaded size matches, else "ftp_fail"
        """
        # Exactly one space before -o, and no trailing quote character: the
        # unmatched quote made the command a syntax error under POSIX shells.
        data = 'curl -m 20 ' + ftp_url.strip() + ' -o ' + file_name
        print(data)
        fsize = self._downloaded_size(data, file_name)
        print(fsize)
        try:
            expected = int(file_size)
        except (TypeError, ValueError):
            return "ftp_fail"
        if fsize is not None and fsize == expected:
            return "ftp_success"
        return "ftp_fail"

    # FTP login
    def FTP_login(self, ftp_url, file_content):
        """Fetch *ftp_url* with curl and check that the output contains *file_content*.

        :return: "ftp_success" when *file_content* is found, else "ftp_fail"
        """
        data = 'curl -m 10 ' + ftp_url
        if self.Operating_System() == "Windows":
            # Only on Windows is the output re-encoded from UTF-8 to GBK.
            data += ' | iconv -f utf-8 -t gbk'
        d = self.CMD(data)
        return "ftp_success" if file_content in d else "ftp_fail"

    # Detect the current operating system
    def Operating_System(self):
        """Return ``platform.system()`` (e.g. "Windows", "Linux")."""
        return platform.system()

    # Module-level name ``null``: lets JSON text containing a bare ``null`` be
    # evaluated by Jsoneditmanu, mapping it to the empty string.
    global null
    null = ''

    def _parse_literal(self, source):
        """Turn *source* into a Python object.

        Text is evaluated; anything else (already-parsed data) is deep-copied
        so the caller's object is never modified in place.
        """
        if isinstance(source, (str, bytes)):
            # SECURITY: eval() executes arbitrary expressions. Only feed this
            # trusted test data; it is kept because existing inputs rely on the
            # module-level ``null`` name, which a strict literal parser rejects.
            return eval(source)
        import copy
        return copy.deepcopy(source)

    # Replace values in a JSON document: jsons is the default document, datas
    # maps keys to their new values, header (if given) is a custom document
    # that is used instead.
    def Jsoneditmanu(self, jsons, datas=None, header=None):
        """Return the JSON document to use, optionally with values replaced.

        :param jsons: default document, as text or as an already-parsed object
        :param datas: dict of ``key -> new value``; every occurrence of each key
            at any depth of *jsons* is replaced
        :param header: custom document (text or object); when given it is
            parsed and returned, and *jsons* / *datas* are ignored
        :return: the parsed *header*, or the edited copy of *jsons*, or *jsons*
            unchanged (not parsed) when neither *header* nor *datas* is given
        """
        if header is not None:
            return self._parse_literal(header)
        if datas is not None:
            document = self._parse_literal(jsons)
            for k, v in datas.items():
                self.UpdateAllvalues(document, k, v)
            return document
        return jsons

    # Recursive nested replacement
    def UpdateAllvalues(self, mydict, key, value):
        """Set every occurrence of *key*, at any depth of *mydict*, to *value*.

        *mydict* may be a dict or a list and is modified in place; other types
        are ignored. Returns None.
        """
        if isinstance(mydict, dict):
            if key in mydict:
                mydict[key] = value
            for k, child in mydict.items():
                # Do not descend into the value just inserted: it is the
                # replacement, not part of the document being searched, and
                # descending can recurse without bound when it contains *key*.
                if k != key:
                    self.UpdateAllvalues(child, key, value)
        elif isinstance(mydict, list):
            # Descend into every element, including lists nested in lists.
            for element in mydict:
                self.UpdateAllvalues(element, key, value)

    # Kept as in the original class body; binds the module as ``Order.json``.
    import json

    def _collect_id_values(self, node, found):
        """Append to *found* the value of every key in *node* that ends with "Id"."""
        if isinstance(node, dict):
            for k, v in node.items():
                if isinstance(k, str) and k.endswith("Id"):
                    found.append(v)
                self._collect_id_values(v, found)
        elif isinstance(node, list):
            for element in node:
                self._collect_id_values(element, found)

    # Recursively extract the values of matching keys from a JSON document
    def get_dict_allkeys(self, dict_a):
        """Collect the values of all keys ending in "Id" from a nested structure.

        :param dict_a: dict or list, nested to any depth
        :return: new list of the matching values, in traversal order
        """
        found = []
        self._collect_id_values(dict_a, found)
        return found

    # Check whether the expected values are present
    def VerifyProxy(self, data, lists):
        """Check that every id listed in *lists* occurs among the "Id" values of *data*.

        :param data: nested dict/list to scan (see get_dict_allkeys)
        :param lists: comma-separated expected values, compared as strings
        :return: "true" when all expected values are present, else "flase"
        """
        found = {str(v) for v in self.get_dict_allkeys(data)}
        expected = {item.strip() for item in lists.split(",")}
        print("collected ids: %s, expected ids: %s" % (sorted(found), sorted(expected)))
        # >= rather than >: finding exactly the expected ids is a match too.
        if found >= expected:
            return "true"
        # NOTE(review): "flase" is misspelled but is part of the return
        # contract; callers may compare against it, so it is left unchanged.
        return "flase"
if __name__ == '__main__':
    # Manual smoke test for the JSON helpers of Order. The network keywords
    # (manu, FTP, FTP_down, FTP_login) need curl and a reachable server, so
    # they are not exercised here.

    # Sample document as JSON text; Jsoneditmanu evaluates text input, with
    # the bare ``null`` resolving to the module-level ``null`` name.
    jsons = '{"opAction":"add","policyList":{"policyId":"","policyName":"2324242423","policyType":"tsg_security","action":"intercept","userTags":"","doBlacklist":0,"doLog":1,"policyDesc":"","effectiveRange":{"tag_sets":[[]]},"userRegion":{"protocol":"SSL","keyring":1,"decryption":1,"decrypt_mirror":{"enable":0,"mirror_profile":null}},"referenceObject":[{"objectId":28054,"protocolFields":["TSG_SECURITY_SOURCE_ADDR"]}],"isValid":0,"scheduleId":[],"appObjectIdArray":[3]}}'
    datas = {"protocol":"edit","opAction":"edit","policyId":123,'protocolFields':1}
    print("替换前:\n %s" % jsons)
    a = Order()
    b = a.Jsoneditmanu(jsons, datas)
    print("替换后:\n %s" % b)

    data = {"aid":[{'bid':2},{'cid':3}]}
    print(type(data))
    # Make sure the module-level ``key_list`` accumulator exists before
    # get_dict_allkeys is called directly (VerifyProxy normally creates it).
    key_list = []
    # get_dict_allkeys is a method of Order, so it is called on the instance.
    # None of the keys above end with "Id", so this prints an empty list.
    get_keys = a.get_dict_allkeys(data)
    print(get_keys)