TSG-101:将con_traffic_inject.py 合并到unitest_self.py中去
This commit is contained in:
@@ -82,7 +82,6 @@ services:
|
|||||||
update-ca-certificates
|
update-ca-certificates
|
||||||
cat /root/unittest/badssl.test.hosts >> /etc/hosts
|
cat /root/unittest/badssl.test.hosts >> /etc/hosts
|
||||||
python /root/unittest/unittest_self.py &
|
python /root/unittest/unittest_self.py &
|
||||||
python /root/unittest/con_traffic_inject.py &
|
|
||||||
tail -f /dev/null
|
tail -f /dev/null
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,26 @@ URLInsert = 'https://cn.bing.com/?FORM=BEHPTB'
|
|||||||
URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
|
URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
|
||||||
URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
|
URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
|
||||||
|
|
||||||
|
URLdictConTrafficInject = {
|
||||||
|
"0k":"https://downloadfile.self-test.geedge.net/0k", \
|
||||||
|
"1k":"https://downloadfile.self-test.geedge.net/1k", \
|
||||||
|
"2k":"https://downloadfile.self-test.geedge.net/2k", \
|
||||||
|
"4k":"https://downloadfile.self-test.geedge.net/4k", \
|
||||||
|
"8k":"https://downloadfile.self-test.geedge.net/8k", \
|
||||||
|
"16k":"https://downloadfile.self-test.geedge.net/16k", \
|
||||||
|
"32k":"https://downloadfile.self-test.geedge.net/32k", \
|
||||||
|
"64k":"https://downloadfile.self-test.geedge.net/64k", \
|
||||||
|
"128k":"https://downloadfile.self-test.geedge.net/128k", \
|
||||||
|
"256k":"https://downloadfile.self-test.geedge.net/256k", \
|
||||||
|
"512k":"https://downloadfile.self-test.geedge.net/512k", \
|
||||||
|
"1M":"https://downloadfile.self-test.geedge.net/1M", \
|
||||||
|
"2M":"https://downloadfile.self-test.geedge.net/2M", \
|
||||||
|
"4M":"https://downloadfile.self-test.geedge.net/4M", \
|
||||||
|
"8M":"https://downloadfile.self-test.geedge.net/8M", \
|
||||||
|
"16M":"https://downloadfile.self-test.geedge.net/16M", \
|
||||||
|
"32M":"https://downloadfile.self-test.geedge.net/32M", \
|
||||||
|
"64M":"https://downloadfile.self-test.geedge.net/64M"}
|
||||||
|
|
||||||
|
|
||||||
ssl_bypass_info_re = "ssl bypass ok"
|
ssl_bypass_info_re = "ssl bypass ok"
|
||||||
ssl_intercept_info_re = "ssl intercept ok"
|
ssl_intercept_info_re = "ssl intercept ok"
|
||||||
@@ -38,6 +58,8 @@ http_insert_info_re = "http connection insert success"
|
|||||||
http_hijack_info_re = "http connection hijack success"
|
http_hijack_info_re = "http connection hijack success"
|
||||||
http_block_info_re = "http connection block success"
|
http_block_info_re = "http connection block success"
|
||||||
|
|
||||||
|
https_download_file_info_re = "http download file success"
|
||||||
|
|
||||||
wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131']
|
wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131']
|
||||||
|
|
||||||
class SSLCheckRequestBuild:
|
class SSLCheckRequestBuild:
|
||||||
@@ -303,6 +325,93 @@ class SslHttpRequestBuild:
|
|||||||
raise Exception("Error:http connection hijack fail")
|
raise Exception("Error:http connection hijack fail")
|
||||||
|
|
||||||
|
|
||||||
|
class SSLFileDownloadBuild:
|
||||||
|
def __init__(self):
|
||||||
|
self.sizeList = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"]
|
||||||
|
self.resultList = []
|
||||||
|
self.isException = False
|
||||||
|
|
||||||
|
def build_conninfo_json(self,conn):
|
||||||
|
dictconninfo = {}
|
||||||
|
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
|
||||||
|
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
|
||||||
|
dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
|
||||||
|
dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
|
||||||
|
dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
|
||||||
|
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
|
||||||
|
dictconninfo["redirect_count"] = conn.getinfo(pycurl.REDIRECT_COUNT)
|
||||||
|
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
|
||||||
|
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
|
||||||
|
dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE)
|
||||||
|
dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE)
|
||||||
|
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
|
||||||
|
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
|
||||||
|
dictconninfo["time_connect"] = conn.getinfo(pycurl.CONNECT_TIME)
|
||||||
|
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
|
||||||
|
return dictconninfo
|
||||||
|
|
||||||
|
def get_value_from_succ_conn(self,urlkey,url,conn):
|
||||||
|
dictinfo = {}
|
||||||
|
dictinfo["downloadsize"] = urlkey
|
||||||
|
dictinfo["url"] = url
|
||||||
|
dictinfo["time"] = time.asctime( time.localtime(time.time()))
|
||||||
|
dictinfo["result"] = self.build_conninfo_json(conn)
|
||||||
|
self.resultList.append(dictinfo)
|
||||||
|
|
||||||
|
def conn_filedownload(self,urlkey,url):
|
||||||
|
issuer = ()
|
||||||
|
conn = pycurl.Curl()
|
||||||
|
errdict = {}
|
||||||
|
conn.setopt(conn.WRITEFUNCTION, BytesIO().write)
|
||||||
|
conn.setopt(conn.SSL_VERIFYPEER, False)
|
||||||
|
conn.setopt(conn.OPT_CERTINFO, 1)
|
||||||
|
conn.setopt(conn.URL,url)
|
||||||
|
conn.perform()
|
||||||
|
certs = conn.getinfo(conn.INFO_CERTINFO)
|
||||||
|
for cert_info in certs[0]:
|
||||||
|
if cert_info[0].lower() == "issuer":
|
||||||
|
issuer = cert_info
|
||||||
|
break
|
||||||
|
if len(issuer) <= 0:
|
||||||
|
errdict["status"] = "error"
|
||||||
|
errdict["errinfo"] = "Get certificate info error"
|
||||||
|
errdict["url"] = url
|
||||||
|
errdict["time"] = time.asctime( time.localtime(time.time()))
|
||||||
|
self.resultList.append(errdict)
|
||||||
|
self.isException = True
|
||||||
|
elif not re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0):
|
||||||
|
errdict["status"] = "error"
|
||||||
|
errdict["errinfo"] = "Intercept fail: no Tango cert"
|
||||||
|
errdict["url"] = url
|
||||||
|
errdict["time"] = time.asctime( time.localtime(time.time()))
|
||||||
|
self.resultList.append(errdict)
|
||||||
|
self.isException = True
|
||||||
|
else:
|
||||||
|
self.get_value_from_succ_conn(urlkey,url,conn)
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def write_log(self):
|
||||||
|
logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime())
|
||||||
|
logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest"
|
||||||
|
with open(logNewestPath,"w+") as f:
|
||||||
|
f.write(json.dumps(self.resultList, sort_keys=True, indent=4, separators=(',', ': ')))
|
||||||
|
f.close()
|
||||||
|
with open(logpath,"a+") as f:
|
||||||
|
f.write(json.dumps(self.resultList))
|
||||||
|
f.write("\n")
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
def downfile_run(self):
|
||||||
|
for sizefield in self.sizeList:
|
||||||
|
self.conn_filedownload(sizefield,URLdictConTrafficInject[sizefield])
|
||||||
|
self.write_log()
|
||||||
|
if self.isException == True:
|
||||||
|
raise Exception("Error:http_hijack download file fail")
|
||||||
|
else:
|
||||||
|
raise Exception(https_download_file_info_re)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class SslUnitTest(unittest.TestCase):
|
class SslUnitTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_securityPolicy_bypass(self):
|
def test_securityPolicy_bypass(self):
|
||||||
@@ -370,6 +479,12 @@ class SslUnitTest(unittest.TestCase):
|
|||||||
with self.assertRaisesRegex(Exception, http_insert_info_re):
|
with self.assertRaisesRegex(Exception, http_insert_info_re):
|
||||||
httpHandler.http_insert()
|
httpHandler.http_insert()
|
||||||
|
|
||||||
|
def test_securityPolicy_con_traffic_inject(self):
|
||||||
|
requestHandler = SSLFileDownloadBuild()
|
||||||
|
with self.assertRaisesRegex(Exception,https_download_file_info_re):
|
||||||
|
requestHandler.downfile_run()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
suite = unittest.TestSuite()
|
suite = unittest.TestSuite()
|
||||||
suite._cleanup = False
|
suite._cleanup = False
|
||||||
@@ -386,6 +501,7 @@ if __name__ == '__main__':
|
|||||||
suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
|
suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
|
||||||
suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
|
suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
|
||||||
suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
|
suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
|
||||||
|
suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject'))
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime())
|
logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime())
|
||||||
|
|||||||
Reference in New Issue
Block a user