1、删除tsg-diagnose rpm包 的preinstall 2、修改unittest_self.py 为tsg_diagnose.py 3、变更tsg_diagnose 生成目录 4、增加定时删除tsg_diagnose结果的定时任务 5、新增安装CIUnitTest python 包
This commit is contained in:
609
unittest_python/unittest/tsg_diagnose.py
Normal file
609
unittest_python/unittest/tsg_diagnose.py
Normal file
@@ -0,0 +1,609 @@
|
||||
import sys
|
||||
import unittest
|
||||
import json
|
||||
import pycurl
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from io import BytesIO
|
||||
import getopt
|
||||
import ciunittest
|
||||
import argparse
|
||||
|
||||
|
||||
|
||||
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
|
||||
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
|
||||
URLSexpired = 'https://expired.badssl.self-test.geedge.net'
|
||||
URLSwronghost = 'https://wrong.host.badssl.self-test.geedge.net'
|
||||
URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net'
|
||||
URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
|
||||
URLSrevoked = 'https://revoked.badssl.self-test.geedge.net'
|
||||
URLSpinningtest = 'https://pinning-test.badssl.self-test.geedge.net'
|
||||
|
||||
URLRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
|
||||
URLReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
|
||||
URLInsert = 'https://cn.bing.com/?FORM=BEHPTB'
|
||||
URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
|
||||
URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
|
||||
|
||||
URLdictConTrafficInject = {
|
||||
"0k":"https://downloadfile.self-test.geedge.net/0k", \
|
||||
"1k":"https://downloadfile.self-test.geedge.net/1k", \
|
||||
"2k":"https://downloadfile.self-test.geedge.net/2k", \
|
||||
"4k":"https://downloadfile.self-test.geedge.net/4k", \
|
||||
"8k":"https://downloadfile.self-test.geedge.net/8k", \
|
||||
"16k":"https://downloadfile.self-test.geedge.net/16k", \
|
||||
"32k":"https://downloadfile.self-test.geedge.net/32k", \
|
||||
"64k":"https://downloadfile.self-test.geedge.net/64k", \
|
||||
"128k":"https://downloadfile.self-test.geedge.net/128k", \
|
||||
"256k":"https://downloadfile.self-test.geedge.net/256k", \
|
||||
"512k":"https://downloadfile.self-test.geedge.net/512k", \
|
||||
"1M":"https://downloadfile.self-test.geedge.net/1M", \
|
||||
"2M":"https://downloadfile.self-test.geedge.net/2M", \
|
||||
"4M":"https://downloadfile.self-test.geedge.net/4M", \
|
||||
"8M":"https://downloadfile.self-test.geedge.net/8M", \
|
||||
"16M":"https://downloadfile.self-test.geedge.net/16M", \
|
||||
"32M":"https://downloadfile.self-test.geedge.net/32M", \
|
||||
"64M":"https://downloadfile.self-test.geedge.net/64M"}
|
||||
|
||||
|
||||
ssl_bypass_info_re = "Ssl connection bypass success"
|
||||
ssl_intercept_info_re = "Ssl connection intercept success"
|
||||
https_exprired_info_re = "https exprired ok"
|
||||
https_wrong_host_info_re = "https wrong host ok"
|
||||
https_self_signed_info_re = "https self signed ok"
|
||||
https_untrusted_root_info_re = "https untrusted_root ok"
|
||||
https_revoked_info_re = "https revoked ok"
|
||||
https_pinning_test_info_re = "https pinning-test ok"
|
||||
http_redirect_info_re = "http connection redirect success"
|
||||
http_replace_info_re = "http connection replace success"
|
||||
http_insert_info_re = "http connection insert success"
|
||||
http_hijack_info_re = "http connection hijack success"
|
||||
http_block_info_re = "http connection block success"
|
||||
|
||||
https_download_file_info_re = "http download file success"
|
||||
|
||||
wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131']
|
||||
|
||||
class SSLCheckRequestBuild:
|
||||
def __init__(self):
|
||||
self.conn = pycurl.Curl()
|
||||
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
||||
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
||||
self.conn.setopt(self.conn.TIMEOUT, 1)
|
||||
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
||||
|
||||
def ssl_bypass(self):
|
||||
self.conn.setopt(self.conn.URL,URLBypass)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0] == "Issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0):
|
||||
raise Exception(ssl_bypass_info_re)
|
||||
elif re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
||||
raise Exception("Error:Ssl connection is intercepted, not bypass")
|
||||
else:
|
||||
raise Exception("Error:Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
def ssl_intercept(self):
|
||||
self.conn.setopt(self.conn.URL,URLIntercept)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0):
|
||||
raise Exception(ssl_intercept_info_re)
|
||||
elif re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0):
|
||||
raise Exception("Error: Ssl connection is bypass, not intercept")
|
||||
else:
|
||||
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
|
||||
class SslInterceptRequestBuild:
|
||||
def __init__(self):
|
||||
self.conn = pycurl.Curl()
|
||||
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
||||
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
||||
self.conn.setopt(self.conn.TIMEOUT, 1)
|
||||
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
||||
|
||||
def ssl_intercept_certerrExpired(self):
|
||||
self.conn.setopt(self.conn.URL, URLSexpired)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
||||
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
||||
raise Exception(https_exprired_info_re)
|
||||
else:
|
||||
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
|
||||
else:
|
||||
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
|
||||
def ssl_intercept_certerrWrong_host(self):
|
||||
self.conn.setopt(self.conn.URL,URLSwronghost)
|
||||
self.conn.setopt(self.conn.SSL_VERIFYHOST, False)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
||||
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
||||
raise Exception(https_wrong_host_info_re)
|
||||
else:
|
||||
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
|
||||
else:
|
||||
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
def ssl_intercept_certerrSelf_signed(self):
|
||||
self.conn.setopt(self.conn.URL,URLSselfsigned)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
||||
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
||||
raise Exception(https_self_signed_info_re)
|
||||
else:
|
||||
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
|
||||
else:
|
||||
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
def ssl_intercept_certerrUntrusted_root(self):
|
||||
self.conn.setopt(self.conn.URL,URLSuntrustedroot)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
||||
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
||||
raise Exception(https_untrusted_root_info_re)
|
||||
else:
|
||||
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
|
||||
else:
|
||||
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
|
||||
def ssl_intercept_certerrRevoked(self):
|
||||
|
||||
self.conn.setopt(self.conn.URL,URLSrevoked)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
||||
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
||||
raise Exception(https_revoked_info_re)
|
||||
else:
|
||||
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
|
||||
else:
|
||||
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
|
||||
def ssl_intercept_certerrPinning_test(self):
|
||||
|
||||
self.conn.setopt(self.conn.URL,URLSpinningtest)
|
||||
self.conn.perform()
|
||||
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
||||
self.conn.close()
|
||||
issuer = ()
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
||||
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
||||
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
||||
raise Exception(https_pinning_test_info_re)
|
||||
else:
|
||||
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
|
||||
else:
|
||||
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
|
||||
|
||||
|
||||
class SslHttpRequestBuild:
|
||||
def __init__(self):
|
||||
self.bodyBuf = BytesIO()
|
||||
self.conn = pycurl.Curl()
|
||||
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
||||
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
||||
self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
|
||||
self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve)
|
||||
self.conn.setopt(self.conn.TIMEOUT, 1)
|
||||
|
||||
def http_redirect(self):
|
||||
self.conn.setopt(self.conn.URL, URLRedirect)
|
||||
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
||||
self.conn.perform()
|
||||
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
||||
self.conn.close()
|
||||
if rescode == 301 or rescode == 302:
|
||||
raise Exception(http_redirect_info_re)
|
||||
else:
|
||||
raise Exception("Error:Http connection redirect fail")
|
||||
|
||||
def http_replace(self):
|
||||
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
|
||||
self.conn.setopt(self.conn.URL, URLReplace)
|
||||
resCode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
||||
self.conn.perform()
|
||||
body = self.bodyBuf.getvalue().decode('utf-8')
|
||||
self.conn.close()
|
||||
if not re.search(r'EnglishSearchShared', body, 0) and \
|
||||
re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0):
|
||||
raise Exception(http_replace_info_re)
|
||||
else:
|
||||
raise Exception("Error:Http connection replace fail")
|
||||
|
||||
def http_insert(self):
|
||||
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
|
||||
self.conn.setopt(self.conn.URL, URLInsert)
|
||||
resCode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
||||
self.conn.perform()
|
||||
body = self.bodyBuf.getvalue().decode('utf-8')
|
||||
self.conn.close()
|
||||
if re.search(r'httpSelfcheckInsert', body, 0) and \
|
||||
re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0):
|
||||
raise Exception(http_insert_info_re)
|
||||
else:
|
||||
raise Exception("Error:Http connection insert fail")
|
||||
|
||||
def http_block(self):
|
||||
self.conn.setopt(self.conn.URL, URLBlock)
|
||||
self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
|
||||
self.conn.perform()
|
||||
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
||||
body = self.bodyBuf.getvalue().decode('utf-8')
|
||||
self.conn.close()
|
||||
if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451):
|
||||
raise Exception(http_block_info_re)
|
||||
else:
|
||||
raise Exception("Error:http connection block fail")
|
||||
|
||||
def http_hijack(self):
|
||||
|
||||
self.conn.setopt(self.conn.URL, URLHijack)
|
||||
self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
|
||||
self.conn.perform()
|
||||
self.conn.close()
|
||||
if os.path.exists("/root/http_hijack.out"):
|
||||
os.remove("/root/http_hijack.out")
|
||||
cmdtodo = 'curl %s -k -s --resolve cn.bing.com:443:192.0.2.131 -o /root/http_hijack.out' % URLHijack
|
||||
optdl = os.popen(cmdtodo)
|
||||
if len(optdl.read()):
|
||||
optdl.close()
|
||||
raise Exception("Error:http_hijack download file fail")
|
||||
optdl.close()
|
||||
if not os.path.exists("/root/http_hijack.out"):
|
||||
raise Exception("Error:http_hijack download file fail")
|
||||
optmd5 = os.popen("md5sum /root/http_hijack.out")
|
||||
if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", optmd5.read(), 0):
|
||||
optmd5.close()
|
||||
raise Exception(http_hijack_info_re)
|
||||
else:
|
||||
optmd5.close()
|
||||
raise Exception("Error:http connection hijack fail")
|
||||
|
||||
|
||||
class SSLFileDownloadBuild:
|
||||
def __init__(self):
|
||||
self.sizeList = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"]
|
||||
self.resultList = []
|
||||
self.isException = False
|
||||
|
||||
def build_conninfo_json(self,conn):
|
||||
dictconninfo = {}
|
||||
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
|
||||
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
|
||||
dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
|
||||
dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
|
||||
dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
|
||||
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
|
||||
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
|
||||
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
|
||||
dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE)
|
||||
dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE)
|
||||
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
|
||||
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
|
||||
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
|
||||
return dictconninfo
|
||||
|
||||
def get_value_from_succ_conn(self,urlkey,url,conn):
|
||||
dictinfo = {}
|
||||
dictinfo["downloadsize"] = urlkey
|
||||
dictinfo["url"] = url
|
||||
dictinfo["time"] = time.asctime( time.localtime(time.time()))
|
||||
dictinfo["result"] = self.build_conninfo_json(conn)
|
||||
self.resultList.append(dictinfo)
|
||||
|
||||
def conn_filedownload(self,urlkey,url):
|
||||
issuer = ()
|
||||
conn = pycurl.Curl()
|
||||
errdict = {}
|
||||
conn.setopt(conn.WRITEFUNCTION, BytesIO().write)
|
||||
conn.setopt(conn.SSL_VERIFYPEER, False)
|
||||
conn.setopt(conn.OPT_CERTINFO, 1)
|
||||
conn.setopt(conn.TIMEOUT, 1)
|
||||
conn.setopt(conn.URL,url)
|
||||
conn.perform()
|
||||
certs = conn.getinfo(conn.INFO_CERTINFO)
|
||||
for cert_info in certs[0]:
|
||||
if cert_info[0].lower() == "issuer":
|
||||
issuer = cert_info
|
||||
break
|
||||
if len(issuer) <= 0:
|
||||
errdict["status"] = "error"
|
||||
errdict["errinfo"] = "Get certificate info error"
|
||||
errdict["url"] = url
|
||||
errdict["time"] = time.asctime( time.localtime(time.time()))
|
||||
self.resultList.append(errdict)
|
||||
self.isException = True
|
||||
elif not re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0):
|
||||
errdict["status"] = "error"
|
||||
errdict["errinfo"] = "Intercept fail: no Tango cert"
|
||||
errdict["url"] = url
|
||||
errdict["time"] = time.asctime( time.localtime(time.time()))
|
||||
self.resultList.append(errdict)
|
||||
self.isException = True
|
||||
else:
|
||||
self.get_value_from_succ_conn(urlkey,url,conn)
|
||||
conn.close()
|
||||
|
||||
def write_log(self):
|
||||
logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime())
|
||||
logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest"
|
||||
with open(logNewestPath,"w+") as f:
|
||||
f.write(json.dumps(self.resultList, sort_keys=True, indent=4, separators=(',', ': ')))
|
||||
f.close()
|
||||
with open(logpath,"a+") as f:
|
||||
f.write(json.dumps(self.resultList))
|
||||
f.write("\n")
|
||||
f.close()
|
||||
|
||||
def downfile_run(self):
|
||||
for sizefield in self.sizeList:
|
||||
self.conn_filedownload(sizefield,URLdictConTrafficInject[sizefield])
|
||||
self.write_log()
|
||||
if self.isException == True:
|
||||
raise Exception("Error:http_hijack download file fail")
|
||||
else:
|
||||
raise Exception(https_download_file_info_re)
|
||||
|
||||
|
||||
|
||||
class SslUnitTest(unittest.TestCase):
|
||||
|
||||
def test_securityPolicy_bypass(self):
|
||||
sslHandler = SSLCheckRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
|
||||
sslHandler.ssl_bypass()
|
||||
|
||||
def test_securityPolicy_intercept(self):
|
||||
sslHandler = SSLCheckRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
|
||||
sslHandler.ssl_intercept()
|
||||
|
||||
def test_securityPolicy_intercept_certerrExpired(self):
|
||||
requestHandler = SslInterceptRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, https_exprired_info_re):
|
||||
requestHandler.ssl_intercept_certerrExpired()
|
||||
|
||||
def test_securityPolicy_intercept_certerrWrong_host(self):
|
||||
requestHandler = SslInterceptRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, https_wrong_host_info_re):
|
||||
requestHandler.ssl_intercept_certerrWrong_host()
|
||||
|
||||
def test_securityPolicy_intercept_certerrSelf_signed(self):
|
||||
requestHandler = SslInterceptRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, https_self_signed_info_re):
|
||||
requestHandler.ssl_intercept_certerrSelf_signed()
|
||||
|
||||
def test_securityPolicy_intercept_certerrUntrusted_root(self):
|
||||
requestHandler = SslInterceptRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
|
||||
requestHandler.ssl_intercept_certerrUntrusted_root()
|
||||
|
||||
def test_securityPolicy_intercept_certerrRevoked(self):
|
||||
requestHandler = SslInterceptRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, https_revoked_info_re):
|
||||
requestHandler.ssl_intercept_certerrRevoked()
|
||||
|
||||
def test_securityPolicy_intercept_certerrPinning_test(self):
|
||||
requestHandler = SslInterceptRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, https_pinning_test_info_re):
|
||||
requestHandler.ssl_intercept_certerrPinning_test()
|
||||
|
||||
def test_proxyPolicy_redirect(self):
|
||||
httpHandler = SslHttpRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, http_redirect_info_re):
|
||||
httpHandler.http_redirect()
|
||||
|
||||
def test_proxyPolicy_block(self):
|
||||
httpHandler = SslHttpRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, http_block_info_re):
|
||||
httpHandler.http_block()
|
||||
|
||||
def test_proxyPolicy_replace(self):
|
||||
httpHandler = SslHttpRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, http_replace_info_re):
|
||||
httpHandler.http_replace()
|
||||
|
||||
def test_proxyPolicy_hijack(self):
|
||||
httpHandler = SslHttpRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, http_hijack_info_re):
|
||||
httpHandler.http_hijack()
|
||||
|
||||
def test_proxyPolicy_insert(self):
|
||||
httpHandler = SslHttpRequestBuild()
|
||||
with self.assertRaisesRegex(Exception, http_insert_info_re):
|
||||
httpHandler.http_insert()
|
||||
|
||||
def test_securityPolicy_con_traffic_inject(self):
|
||||
requestHandler = SSLFileDownloadBuild()
|
||||
with self.assertRaisesRegex(Exception,https_download_file_info_re):
|
||||
requestHandler.downfile_run()
|
||||
|
||||
|
||||
class TsgDiagnoseRun:
|
||||
def __init__(self):
|
||||
self.interval = 1
|
||||
self.format = "txt"
|
||||
self.write = None
|
||||
self.loop = False
|
||||
self.count = 1
|
||||
|
||||
def _get_suite_option(self):
|
||||
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
|
||||
parser.add_argument('-i','--interval', type = int, default = 1,help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.')
|
||||
parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
|
||||
parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: json,txt, the default is txt.')
|
||||
parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file. Specifies the output file name.')
|
||||
parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
|
||||
args = parser.parse_args()
|
||||
self.interval = args.interval
|
||||
self.format = args.format
|
||||
self.write = args.write
|
||||
self.loop = args.loop
|
||||
self.count = args.count
|
||||
if self.count == 0:
|
||||
print("Error: bad number of tsg diagnose and will exit")
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
if self.format not in ('json', 'txt'):
|
||||
print("Error: bad output format of tsg diagnose and will exit")
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
def _init_suite(self):
|
||||
if self.format == 'txt':
|
||||
self.suite = unittest.TestSuite()
|
||||
self.suite._cleanup = False
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_bypass'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test'))
|
||||
self.suite.addTest(SslUnitTest('test_proxyPolicy_redirect'))
|
||||
self.suite.addTest(SslUnitTest('test_proxyPolicy_block'))
|
||||
self.suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
|
||||
self.suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
|
||||
self.suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
|
||||
self.suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject'))
|
||||
|
||||
if self.format == 'json':
|
||||
self.suite = None
|
||||
|
||||
|
||||
def _write_suite_result(self):
|
||||
resultDict = '/root/result_self_test/unittest/'
|
||||
resultNewestPath = resultDict + self.write
|
||||
resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
|
||||
if self.format == 'txt':
|
||||
with open(resultNewestPath,"w+") as f:
|
||||
runner = unittest.TextTestRunner(stream=f,verbosity=2)
|
||||
runner.run(self.suite)
|
||||
f.close()
|
||||
|
||||
if self.format == 'json':
|
||||
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
|
||||
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
|
||||
with open(resultNewestPath,"w+") as f:
|
||||
f.write(result_json)
|
||||
f.close()
|
||||
|
||||
with open(resultPath,"w+") as f:
|
||||
fn = open(resultNewestPath,'r')
|
||||
f.write(fn.read())
|
||||
fn.close()
|
||||
f.close()
|
||||
|
||||
def _stdout_suite_result(self):
|
||||
print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s'))
|
||||
if self.format == 'txt':
|
||||
runner = unittest.TextTestRunner(verbosity=2)
|
||||
runner.run(self.suite)
|
||||
if self.format == 'json':
|
||||
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
|
||||
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
|
||||
print(result_json)
|
||||
print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s'))
|
||||
|
||||
def _output_suite_result(self):
|
||||
if self.write:
|
||||
self._write_suite_result()
|
||||
else:
|
||||
self._stdout_suite_result()
|
||||
|
||||
def execute_suite_tsg_diagnose(self):
|
||||
self._get_suite_option()
|
||||
self._init_suite()
|
||||
try:
|
||||
counter = 0
|
||||
print("Tsg diagnose run sum: %d" % self.count)
|
||||
while True:
|
||||
print("\nRUN %d" %(counter + 1))
|
||||
self._output_suite_result()
|
||||
counter = counter + 1
|
||||
if not self.loop:
|
||||
if counter >= self.count:
|
||||
break
|
||||
time.sleep(self.interval)
|
||||
except Exception as ex:
|
||||
print("Process get an exception, will exit, Exception info:", ex)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
tsg_diagnose_run = TsgDiagnoseRun()
|
||||
tsg_diagnose_run.execute_suite_tsg_diagnose()
|
||||
Reference in New Issue
Block a user