1、TSG-1199 持续注入自检报文,动态监测系统的处理表现 2、修改unittest 持续运行
This commit is contained in:
@@ -61,6 +61,7 @@ services:
|
|||||||
ipv4_address: 172.31.254.4
|
ipv4_address: 172.31.254.4
|
||||||
volumes:
|
volumes:
|
||||||
- /root/.badssl_self_test_cert_dict:/root/cafile_dict
|
- /root/.badssl_self_test_cert_dict:/root/cafile_dict
|
||||||
|
- /root/.result_self_test:/root/result_self_test
|
||||||
- /etc/localtime:/etc/localtime:ro
|
- /etc/localtime:/etc/localtime:ro
|
||||||
command:
|
command:
|
||||||
- /bin/sh
|
- /bin/sh
|
||||||
|
|||||||
@@ -86,3 +86,4 @@
|
|||||||
192.168.253.130 www.badssl.self-test.geedge.net
|
192.168.253.130 www.badssl.self-test.geedge.net
|
||||||
192.168.253.130 xn--n1aae7f7o.badssl.self-test.geedge.net
|
192.168.253.130 xn--n1aae7f7o.badssl.self-test.geedge.net
|
||||||
#### end of badssl.self-test.geedge.net hosts ####
|
#### end of badssl.self-test.geedge.net hosts ####
|
||||||
|
192.168.253.130 https://downloadfile.self-test.geedge.net
|
||||||
|
|||||||
94
unittest_python/unittest/con_traffic_inject.py
Normal file
94
unittest_python/unittest/con_traffic_inject.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
import json
|
||||||
|
import pycurl
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
class SSLBuild:
|
||||||
|
def __init__(self):
|
||||||
|
self.urldict = {"0k":"https://downloadfile.self-test.geedge.net/0k", \
|
||||||
|
"1k":"https://downloadfile.self-test.geedge.net/1k", \
|
||||||
|
"2k":"https://downloadfile.self-test.geedge.net/2k", \
|
||||||
|
"4k":"https://downloadfile.self-test.geedge.net/4k", \
|
||||||
|
"8k":"https://downloadfile.self-test.geedge.net/8k", \
|
||||||
|
"16k":"https://downloadfile.self-test.geedge.net/16k", \
|
||||||
|
"32k":"https://downloadfile.self-test.geedge.net/32k", \
|
||||||
|
"64k":"https://downloadfile.self-test.geedge.net/64k", \
|
||||||
|
"128k":"https://downloadfile.self-test.geedge.net/128k", \
|
||||||
|
"256k":"https://downloadfile.self-test.geedge.net/256k", \
|
||||||
|
"512k":"https://downloadfile.self-test.geedge.net/512k", \
|
||||||
|
"1M":"https://downloadfile.self-test.geedge.net/1M", \
|
||||||
|
"2M":"https://downloadfile.self-test.geedge.net/2M", \
|
||||||
|
"4M":"https://downloadfile.self-test.geedge.net/4M", \
|
||||||
|
"8M":"https://downloadfile.self-test.geedge.net/8M", \
|
||||||
|
"16M":"https://downloadfile.self-test.geedge.net/16M", \
|
||||||
|
"32M":"https://downloadfile.self-test.geedge.net/32M", \
|
||||||
|
"64M":"https://downloadfile.self-test.geedge.net/64M"}
|
||||||
|
self.sizelist = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"]
|
||||||
|
self.resultlist = []
|
||||||
|
|
||||||
|
def build_conninfo_json(self,conn):
|
||||||
|
dictconninfo = {}
|
||||||
|
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
|
||||||
|
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
|
||||||
|
dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
|
||||||
|
dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
|
||||||
|
dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
|
||||||
|
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
|
||||||
|
dictconninfo["redirect_count"] = conn.getinfo(pycurl.REDIRECT_COUNT)
|
||||||
|
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
|
||||||
|
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
|
||||||
|
dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE)
|
||||||
|
dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE)
|
||||||
|
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
|
||||||
|
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
|
||||||
|
dictconninfo["time_connect"] = conn.getinfo(pycurl.CONNECT_TIME)
|
||||||
|
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
|
||||||
|
return dictconninfo
|
||||||
|
|
||||||
|
def intecept_succ_get_value(self,urlkey,url,conn):
|
||||||
|
dictinfo = {}
|
||||||
|
dictinfo["downloadsize"] = urlkey
|
||||||
|
dictinfo["url"] = url
|
||||||
|
dictinfo["time"] = time.asctime( time.localtime(time.time()))
|
||||||
|
dictinfo["result"] = self.build_conninfo_json(conn)
|
||||||
|
self.resultlist.append(dictinfo)
|
||||||
|
|
||||||
|
|
||||||
|
def ssl_conn(self,urlkey,url):
|
||||||
|
issuer = ()
|
||||||
|
conn = pycurl.Curl()
|
||||||
|
errdict = {}
|
||||||
|
conn.setopt(conn.WRITEFUNCTION, BytesIO().write)
|
||||||
|
conn.setopt(conn.SSL_VERIFYPEER, False)
|
||||||
|
conn.setopt(conn.OPT_CERTINFO, 1)
|
||||||
|
conn.setopt(conn.URL,url)
|
||||||
|
conn.perform()
|
||||||
|
certs = conn.getinfo(conn.INFO_CERTINFO)
|
||||||
|
for cert_info in certs[0]:
|
||||||
|
if cert_info[0].lower() == "issuer":
|
||||||
|
issuer = cert_info
|
||||||
|
break
|
||||||
|
if len(issuer) <= 0:
|
||||||
|
errdict["status"] = "error"
|
||||||
|
errdict["errinfo"] = "Get certificate info error"
|
||||||
|
errdict["url"] = url
|
||||||
|
errdict["time"] = time.asctime( time.localtime(time.time()))
|
||||||
|
self.resultlist.append(errdict)
|
||||||
|
else:
|
||||||
|
self.intecept_succ_get_value(urlkey,url,conn)
|
||||||
|
conn.close()
|
||||||
|
def ssl_intercept(self):
|
||||||
|
|
||||||
|
for sizefield in self.sizelist:
|
||||||
|
self.ssl_conn(sizefield,self.urldict[sizefield])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
while True:
|
||||||
|
ssl = SSLBuild()
|
||||||
|
ssl.ssl_intercept()
|
||||||
|
with open("/root/result_self_test/result_con_traffic_inject.txt","w+") as f:
|
||||||
|
f.write(json.dumps(ssl.resultlist, sort_keys=True, indent=4, separators=(',', ': ')))
|
||||||
|
time.sleep(1)
|
||||||
@@ -3,7 +3,9 @@ import json
|
|||||||
import pycurl
|
import pycurl
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import time
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
import xmlrunner
|
||||||
|
|
||||||
|
|
||||||
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
|
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
|
||||||
@@ -370,6 +372,7 @@ class SslUnitTest(unittest.TestCase):
|
|||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
suite = unittest.TestSuite()
|
suite = unittest.TestSuite()
|
||||||
|
suite._cleanup = False
|
||||||
suite.addTest(SslUnitTest('test_ssl_bypass'))
|
suite.addTest(SslUnitTest('test_ssl_bypass'))
|
||||||
suite.addTest(SslUnitTest('test_ssl_intercept'))
|
suite.addTest(SslUnitTest('test_ssl_intercept'))
|
||||||
suite.addTest(SslUnitTest('test_ssl_intercept_expired'))
|
suite.addTest(SslUnitTest('test_ssl_intercept_expired'))
|
||||||
@@ -383,10 +386,10 @@ if __name__ == '__main__':
|
|||||||
suite.addTest(SslUnitTest('test_http_replace'))
|
suite.addTest(SslUnitTest('test_http_replace'))
|
||||||
suite.addTest(SslUnitTest('test_http_hijack'))
|
suite.addTest(SslUnitTest('test_http_hijack'))
|
||||||
suite.addTest(SslUnitTest('test_http_insert'))
|
suite.addTest(SslUnitTest('test_http_insert'))
|
||||||
with open("/root/unittest_result.txt","w+") as f:
|
with open("/root/result_self_test/unittest_result.txt","w+") as f:
|
||||||
runner = unittest.TextTestRunner(stream=f,verbosity=2)
|
runner = unittest.TextTestRunner(stream=f,verbosity=2)
|
||||||
runner.run(suite)
|
while 1:
|
||||||
print('\n'*10)
|
f.write("Test start time: ")
|
||||||
print("-"*100)
|
f.write(time.asctime(time.localtime(time.time())))
|
||||||
os.system("cat /root/unittest_result.txt")
|
f.write("\n")
|
||||||
print("="*100)
|
runner.run(suite)
|
||||||
|
|||||||
Reference in New Issue
Block a user