diff --git a/unittest_python/unittest/unittest_self.py b/unittest_python/unittest/unittest_self.py index a1f5c18..3e2d9e7 100644 --- a/unittest_python/unittest/unittest_self.py +++ b/unittest_python/unittest/unittest_self.py @@ -6,6 +6,11 @@ import os import re import time from io import BytesIO +import getopt +import ciunittest +import xmlrunner +import argparse + URLBypass = 'https://sha384.badssl.self-test.geedge.net' @@ -67,6 +72,7 @@ class SSLCheckRequestBuild: self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) + self.conn.setopt(self.conn.TIMEOUT, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def ssl_bypass(self): @@ -113,6 +119,7 @@ class SslInterceptRequestBuild: self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) + self.conn.setopt(self.conn.TIMEOUT, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def ssl_intercept_certerrExpired(self): @@ -250,6 +257,7 @@ class SslHttpRequestBuild: self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.ENCODING, "gzip,deflate") self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve) + self.conn.setopt(self.conn.TIMEOUT, 1) def http_redirect(self): self.conn.setopt(self.conn.URL, URLRedirect) @@ -362,7 +370,8 @@ class SSLFileDownloadBuild: errdict = {} conn.setopt(conn.WRITEFUNCTION, BytesIO().write) conn.setopt(conn.SSL_VERIFYPEER, False) - conn.setopt(conn.OPT_CERTINFO, 1) + conn.setopt(conn.OPT_CERTINFO, 1) + conn.setopt(conn.TIMEOUT, 1) conn.setopt(conn.URL,url) conn.perform() certs = conn.getinfo(conn.INFO_CERTINFO) @@ -483,38 +492,119 @@ class SslUnitTest(unittest.TestCase): requestHandler.downfile_run() -if __name__ == '__main__': - suite = unittest.TestSuite() - suite._cleanup = False - suite.addTest(SslUnitTest('test_securityPolicy_bypass')) - suite.addTest(SslUnitTest('test_securityPolicy_intercept')) - suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired')) - suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host')) - suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed')) - suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root')) - suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked')) - suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test')) - suite.addTest(SslUnitTest('test_proxyPolicy_redirect')) - suite.addTest(SslUnitTest('test_proxyPolicy_block')) - suite.addTest(SslUnitTest('test_proxyPolicy_replace')) - suite.addTest(SslUnitTest('test_proxyPolicy_hijack')) - suite.addTest(SslUnitTest('test_proxyPolicy_insert')) - suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject')) - while True: - try: - logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime()) - lognewestpath = "/root/result_self_test/unittest/unittest_result.log.newest" - with open(lognewestpath,"w+") as f: +class TsgDiagnoseRun: + def __init__(self): + self.interval = 1 + self.format = "txt" + self.write = None + self.loop = False + self.count = 1 + + def _get_suite_option(self): + parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help") + parser.add_argument('-i','--interval', type = int, default = 1,help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.') + parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535') + parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: json,txt, the default is txt.') + parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file. Specifies the output file name.') + parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal') + args = parser.parse_args() + self.interval = args.interval + self.format = args.format + self.write = args.write + self.loop = args.loop + self.count = args.count + if self.count == 0: + print("Error: bad number of tsg diagnose and will exit") + parser.print_help() + sys.exit(1) + + if self.format not in ('json', 'txt'): + print("Error: bad output format of tsg diagnose and will exit") + parser.print_help() + sys.exit(1) + + def _init_suite(self): + if self.format == 'txt': + self.suite = unittest.TestSuite() + self.suite._cleanup = False + self.suite.addTest(SslUnitTest('test_securityPolicy_bypass')) + self.suite.addTest(SslUnitTest('test_securityPolicy_intercept')) + self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired')) + self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host')) + self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed')) + self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root')) + self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked')) + self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test')) + self.suite.addTest(SslUnitTest('test_proxyPolicy_redirect')) + self.suite.addTest(SslUnitTest('test_proxyPolicy_block')) + self.suite.addTest(SslUnitTest('test_proxyPolicy_replace')) + self.suite.addTest(SslUnitTest('test_proxyPolicy_hijack')) + self.suite.addTest(SslUnitTest('test_proxyPolicy_insert')) + self.suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject')) + + if self.format == 'json': + self.suite = None + + + def _write_suite_result(self): + resultDict = '/root/result_self_test/unittest/' + resultNewestPath = resultDict + self.write + resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime()) + if self.format == 'txt': + with open(resultNewestPath,"w+") as f: runner = unittest.TextTestRunner(stream=f,verbosity=2) - f.write("\n" + "#"*50 + "Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "#"*50 + "\n") - runner.run(suite) - f.write("\n" + "="*50 + "Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "="*50 + "\n") + runner.run(self.suite) f.close() - with open(logpath,"a+") as f: - fn = open(lognewestpath,'r') - f.write(fn.read()) - fn.close() - f.close() - time.sleep(1) - except: - print("Exception:an exception occurred during the execution of the program:unittest",file=sys.stderr) + + if self.format == 'json': + self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest) + result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True) + with open(resultNewestPath,"w+") as f: + f.write(result_json) + f.close() + + with open(resultPath,"w+") as f: + fn = open(resultNewestPath,'r') + f.write(fn.read()) + fn.close() + f.close() + + def _stdout_suite_result(self): + print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s')) + if self.format == 'txt': + runner = unittest.TextTestRunner(verbosity=2) + runner.run(self.suite) + if self.format == 'json': + self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest) + result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True) + print(result_json) + print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s')) + + def _output_suite_result(self): + if self.write: + self._write_suite_result() + else: + self._stdout_suite_result() + + def execute_suite_tsg_diagnose(self): + self._get_suite_option() + self._init_suite() + try: + counter = 0 + print("Tsg diagnose run sum: %d" % self.count) + while True: + print("\nRUN %d" %(counter + 1)) + self._output_suite_result() + counter = counter + 1 + if not self.loop: + if counter >= self.count: + break + time.sleep(self.interval) + except Exception as ex: + print("Process get an exception, will exit, Exception info:", ex) + sys.exit(1) + + +if __name__ == '__main__': + tsg_diagnose_run = TsgDiagnoseRun() + tsg_diagnose_run.execute_suite_tsg_diagnose() \ No newline at end of file