This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
dongxiaoyan-tsg-autotest/04-CustomLibrary/Custometest/cmd_cer.py
2020-05-26 14:41:02 +08:00

221 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import subprocess
from time import sleep
import platform
class Order:
def CMD(self,data):
result = os.popen(data)
# res = result.read().encoding('GBK')
res = result.read()
result.close()
# res = res.decode("unicode-escape")
return res
def Linux(self):
pass
# 根据证书颁发者名字判断证书是否替换
def Cert_Verification(self,data):
c = []
print(1)
#with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo:
with open(r'certificate.yaml', 'r') as foo:
print(2)
for line in foo.readlines():
if data in line:
print(line)
c.append('证书已替换')
else:
pass
if '证书已替换' in c:
# print('证书已替换')
foo.close()
return '证书已替换'
else:
# print('证书未替换')
foo.close()
return '证书未替换'
def Content_Type(self,data):
d = []
with open('certificate.yaml', 'r') as foo:
for line in foo.readlines():
if data in line:
# print(line)
d.append('Content_Type已替换')
else:
pass
if 'Content_Type已替换' in d:
# print('证书已替换')
foo.close()
return 'Content_Type已替换'
else:
# print('证书未替换')
foo.close()
return 'Content_Type未替换'
# curl路由内容设置
def curl_name(self,data):
#curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
return curl_name
# 控制器
def manu(self,url,Certificate):
# print(data['url'])
n = 0
while n != len(url):
b = self.curl_name(url[n])
d = self.CMD(b)
# print(d)
sleep(1)
if Certificate != "":
c =self.Cert_Verification(Certificate)
# f = self.Content_Type(data["Content_Type"])
sleep(1)
assert_cer = url[n]+c
# assert_Content_Type = data['Content_Type']+f
n+=1
return d,assert_cer
def FTP(self, ftp_type):
windows_path = os.getcwd()
linux_path = os.getcwd().replace('\\', '/')
# 判断FTP执行类型下载/登录)
if ftp_type == "下载":
# 调用cmd执行FTP下载文件
data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" -o zmmtext123.txt'
d = self.CMD(data)
sleep(5)
fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814
if fsize == 435814:
return "ftp_success"
else:
return "ftp_fail"
elif ftp_type == "登录":
data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" | iconv -f utf-8 -t gbk'
d = self.CMD(data)
# print(d)
if "Graphical (Xorg) program starter for ADRIANE" in d:
return "ftp_success"
else:
return "ftp_fail"
# FTP 下载
def FTP_down(self, ftp_url,file_size,file_name):
windows_path = os.getcwd()
linux_path = os.getcwd().replace('\\', '/')
# 判断FTP执行类型下载/登录)
# 调用cmd执行FTP下载文件
data = 'curl -m 20 '+ftp_url+ '-o '+ file_name + " ' "
print(data)
d = self.CMD(data)
sleep(5)
fsize = os.path.getsize(linux_path + "/"+file_name) # 435814
print(fsize)
if fsize == file_size:
return "ftp_success"
else:
return "ftp_fail"
# FTP 登录
def FTP_login(self, ftp_url,file_content):
SYS = self.Operating_System()
if SYS == "Windows":
data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk'
d = self.CMD(data)
else:
data = 'curl -m 10 '+ftp_url
d = self.CMD(data)
if file_content in d:
return "ftp_success"
else:
return "ftp_fail"
# 判断当前操作系统
def Operating_System(self):
os_name = platform.system()
return os_name
#需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容
# 全局变量 null将java中的空值null,装换位python中空值""
global null
null = ''
# 对需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容header 启用自定义json
def Jsoneditmanu(self, jsons, datas=None,header=None):
#判断是否启用自定义json
if header != None:
header = eval(header)
# 返回 header
return header
# 判断是否需要更改json内容
elif datas != None:
# datas = eval(datas)
jsons = eval(jsons)
# 循环遍历替换json内容
for k, v in datas.items():
Order.UpdateAllvalues(self,jsons, k, v)
return jsons
else:
# 返回初始json
return jsons
# 循环嵌套替换
def UpdateAllvalues(self,mydict, key, value):
if isinstance(mydict, dict): # 使用isinstance检测数据类型如果是字典
if key in mydict.keys(): # 替换字典第一层中所有key与传参一致的key
mydict[key] = value
for k in mydict.keys(): # 遍历字典的所有子层级将子层级赋值为变量chdict分别替换子层级第一层中所有key对应的value最后在把替换后的子层级赋值给当前处理的key
chdict = mydict[k]
Order.UpdateAllvalues(self,chdict, key, value)
mydict[k] = chdict
elif isinstance(mydict, list): # 如是list
for element in mydict: # 遍历list元素以下重复上面的操作
if isinstance(element, dict):
if key in element.keys():
element[key] = value
for k in element.keys():
chdict = element[k]
Order.UpdateAllvalues(self,chdict, key, value)
element[k] = chdict
if __name__ == '__main__':
datas = {"url":['https://www.baidu.com'],
"Certificate":"Tango Secure Gateway CA",
# "Content_Type":"text/html",
'log':'Security Event Logs',
"sni":['baidu'],
"intercept_code":"200",
"log_code":"200",
"certifucate":"1",
"log_content":"true"
}
# data= {"url":['https://www.baidu.com'],
# "Certificate":"Tango Secure Gateway CA"
# }
# url = ['https://www.baidu.com']
# url = ['https://www.baidu.com']
# url = ['https://www.baidu.com']
# # Certificate1 = "Tango Secure Gateway CA"
# Certificate = "baidu"
# a='Tango Secure Gateway CA'
# s = Order()
# b = s.manu(url,Certificate)
# print(b[1])
# FTP下载 传入ftp的路径和文件大小
ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" '
ftp_size = 435814
ftp_issue = s.FTP_down(ftp_url,ftp_size)
# FTP登录 传入ftp的路径和文件内容
ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:chrome@example.com" '
file_content = "Graphical (Xorg) program starter for ADRIANE"
ftp_issue = s.FTP_login(ftp_url,file_content)
# for i in b:
# print(i)
# dd = s.CMD('curl -I https://www.baidu.com')
# print(dd)
# if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd:
# print("ok")
# a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk'