1、新增命令行模式 2、新增tsg diagnose输出json格式

This commit is contained in:
fumingwei
2020-08-27 16:25:27 +08:00
parent 7871a22439
commit 0a648267e2

View File

@@ -6,6 +6,11 @@ import os
import re
import time
from io import BytesIO
import getopt
import ciunittest
import xmlrunner
import argparse
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
@@ -67,6 +72,7 @@ class SSLCheckRequestBuild:
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.TIMEOUT, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def ssl_bypass(self):
@@ -113,6 +119,7 @@ class SslInterceptRequestBuild:
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.TIMEOUT, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def ssl_intercept_certerrExpired(self):
@@ -250,6 +257,7 @@ class SslHttpRequestBuild:
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve)
self.conn.setopt(self.conn.TIMEOUT, 1)
def http_redirect(self):
self.conn.setopt(self.conn.URL, URLRedirect)
@@ -362,7 +370,8 @@ class SSLFileDownloadBuild:
errdict = {}
conn.setopt(conn.WRITEFUNCTION, BytesIO().write)
conn.setopt(conn.SSL_VERIFYPEER, False)
conn.setopt(conn.OPT_CERTINFO, 1)
conn.setopt(conn.OPT_CERTINFO, 1)
conn.setopt(conn.TIMEOUT, 1)
conn.setopt(conn.URL,url)
conn.perform()
certs = conn.getinfo(conn.INFO_CERTINFO)
@@ -483,38 +492,119 @@ class SslUnitTest(unittest.TestCase):
requestHandler.downfile_run()
if __name__ == '__main__':
suite = unittest.TestSuite()
suite._cleanup = False
suite.addTest(SslUnitTest('test_securityPolicy_bypass'))
suite.addTest(SslUnitTest('test_securityPolicy_intercept'))
suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired'))
suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host'))
suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed'))
suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root'))
suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked'))
suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test'))
suite.addTest(SslUnitTest('test_proxyPolicy_redirect'))
suite.addTest(SslUnitTest('test_proxyPolicy_block'))
suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject'))
while True:
try:
logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime())
lognewestpath = "/root/result_self_test/unittest/unittest_result.log.newest"
with open(lognewestpath,"w+") as f:
class TsgDiagnoseRun:
def __init__(self):
self.interval = 1
self.format = "txt"
self.write = None
self.loop = False
self.count = 1
def _get_suite_option(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
parser.add_argument('-i','--interval', type = int, default = 1,help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.')
parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: jsontxt, the default is txt.')
parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file. Specifies the output file name.')
parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
args = parser.parse_args()
self.interval = args.interval
self.format = args.format
self.write = args.write
self.loop = args.loop
self.count = args.count
if self.count == 0:
print("Error: bad number of tsg diagnose and will exit")
parser.print_help()
sys.exit(1)
if self.format not in ('json', 'txt'):
print("Error: bad output format of tsg diagnose and will exit")
parser.print_help()
sys.exit(1)
def _init_suite(self):
if self.format == 'txt':
self.suite = unittest.TestSuite()
self.suite._cleanup = False
self.suite.addTest(SslUnitTest('test_securityPolicy_bypass'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_redirect'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_block'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
self.suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject'))
if self.format == 'json':
self.suite = None
def _write_suite_result(self):
resultDict = '/root/result_self_test/unittest/'
resultNewestPath = resultDict + self.write
resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
if self.format == 'txt':
with open(resultNewestPath,"w+") as f:
runner = unittest.TextTestRunner(stream=f,verbosity=2)
f.write("\n" + "#"*50 + "Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "#"*50 + "\n")
runner.run(suite)
f.write("\n" + "="*50 + "Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "="*50 + "\n")
runner.run(self.suite)
f.close()
with open(logpath,"a+") as f:
fn = open(lognewestpath,'r')
f.write(fn.read())
fn.close()
f.close()
time.sleep(1)
except:
print("Exception:an exception occurred during the execution of the program:unittest",file=sys.stderr)
if self.format == 'json':
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
with open(resultNewestPath,"w+") as f:
f.write(result_json)
f.close()
with open(resultPath,"w+") as f:
fn = open(resultNewestPath,'r')
f.write(fn.read())
fn.close()
f.close()
def _stdout_suite_result(self):
print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s'))
if self.format == 'txt':
runner = unittest.TextTestRunner(verbosity=2)
runner.run(self.suite)
if self.format == 'json':
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
print(result_json)
print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s'))
def _output_suite_result(self):
if self.write:
self._write_suite_result()
else:
self._stdout_suite_result()
def execute_suite_tsg_diagnose(self):
self._get_suite_option()
self._init_suite()
try:
counter = 0
print("Tsg diagnose run sum: %d" % self.count)
while True:
print("\nRUN %d" %(counter + 1))
self._output_suite_result()
counter = counter + 1
if not self.loop:
if counter >= self.count:
break
time.sleep(self.interval)
except Exception as ex:
print("Process get an exception, will exit, Exception info:", ex)
sys.exit(1)
if __name__ == '__main__':
tsg_diagnose_run = TsgDiagnoseRun()
tsg_diagnose_run.execute_suite_tsg_diagnose()