# Imports used by the TSG diagnose script (standard library, pycurl,
# ciunittest and the Telegraf client).
import sys
import unittest
import json
import pycurl
import os
import re
import time
from io import BytesIO
import getopt
import ciunittest
import argparse
from telegraf.client import TelegrafClient

# Target URLs for the SSL security-policy checks (bypass / intercept /
# certificate-error cases).
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
URLSexpired = 'https://expired.badssl.self-test.geedge.net'
URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'

# Target URLs for the HTTP proxy-policy checks (redirect / replace / insert /
# hijack / block).
URLRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'

# Download URLs for the connection-traffic checks, one per payload size.
URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k"
URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k"
URLConTraffic_16k = "https://downloadfile.self-test.geedge.net/16k"
URLConTraffic_64k = "https://downloadfile.self-test.geedge.net/64k"
URLConTraffic_256k = "https://downloadfile.self-test.geedge.net/256k"
URLConTraffic_1M = "https://downloadfile.self-test.geedge.net/1M"
URLConTraffic_4M = "https://downloadfile.self-test.geedge.net/4M"
URLConTraffic_16M = "https://downloadfile.self-test.geedge.net/16M"
URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M"

# Success messages. The request builders raise an Exception carrying one of
# these texts on success, and the unit tests match it with assertRaisesRegex.
ssl_bypass_info_re = "Ssl connection bypass success"
ssl_intercept_info_re = "Ssl connection intercept success"
https_exprired_info_re = "https exprired ok"
https_self_signed_info_re = "https self signed ok"
https_untrusted_root_info_re = "https untrusted_root ok"
http_redirect_info_re = "http connection redirect success"
http_replace_info_re = "http connection replace success"
http_insert_info_re = "http connection insert success"
http_hijack_info_re = "http connection hijack success"
http_block_info_re = "http connection block success"
# Success messages for the connection-traffic (file download) checks.
https_conn_taffic_1k_re = 'https download file 1k success'
https_conn_taffic_4k_re = 'https download file 4k success'
https_conn_taffic_16k_re = 'https download file 16k success'
https_conn_taffic_64k_re = 'https download file 64k success'
https_conn_taffic_256k_re = 'https download file 256k success'
https_conn_taffic_1M_re = 'https download file 1M success'
https_conn_taffic_4M_re = 'https download file 4M success'
https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = 'https download file 64M success'

# host:port:address entries handed to curl's RESOLVE option for the HTTP
# proxy-policy checks.
wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131']

# Issuer patterns shared by the certificate checks below.
_TANGO_ISSUER_RE = r'\bCN[\s]*=[\s]*Tango\b'
_TANGO_UNTRUST_ISSUER_RE = r'\bCN = Tango[\s\S]*UNTRUST\b'


def _find_issuer(certs):
    """Return the ``(name, value)`` issuer field of the first certificate.

    ``certs`` is the value obtained from ``getinfo(INFO_CERTINFO)``; only its
    first entry is inspected. The field name is compared case-insensitively
    so that "Issuer" and "issuer" are both accepted.

    Raises:
        Exception: if ``certs`` is empty or the first certificate carries no
            issuer field.
    """
    first_cert = certs[0] if certs else ()
    for field in first_cert:
        if field[0].lower() == "issuer":
            return field
    raise Exception("Error: Get certificate info error, certificate's length is %s" % 0)


def _require_untrusted_tango_issuer(issuer_text):
    """Raise unless ``issuer_text`` names a "Tango ... UNTRUST" issuer.

    Returns ``None`` when the issuer matches both Tango patterns.

    Raises:
        Exception: if the issuer is not a Tango CN, or is a Tango CN without
            the UNTRUST marker.
    """
    if not re.search(_TANGO_ISSUER_RE, issuer_text, 0):
        raise Exception("Error: Got other error certificate information, cert info: %s" % issuer_text)
    if not re.search(_TANGO_UNTRUST_ISSUER_RE, issuer_text, 0):
        raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer_text)


class SSLCheckRequestBuild:
    """Checks whether an SSL connection was bypassed or intercepted.

    Each public method performs one request and ALWAYS raises ``Exception``:
    the message is the matching ``*_info_re`` success text when the check
    passes and an "Error: ..." text otherwise. The curl handle is closed by
    the request, so an instance serves a single check.
    """

    def __init__(self):
        self.conn = pycurl.Curl()
        # The response body is irrelevant here; it goes to a throwaway buffer.
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)

    def _fetch_issuer_text(self, url, conTimeout):
        """Request ``url`` and return the issuer string of the first certificate."""
        self.conn.setopt(self.conn.URL, url)
        self.conn.setopt(self.conn.TIMEOUT, conTimeout)
        try:
            self.conn.perform()
            certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
        finally:
            # Release the handle even when perform() fails (e.g. on timeout).
            self.conn.close()
        return _find_issuer(certs)[1]

    def ssl_bypass(self, conTimeout):
        """Check that ``URLBypass`` is served with its own BadSSL certificate.

        Args:
            conTimeout: curl TIMEOUT value for the request.

        Raises:
            Exception: always; ``ssl_bypass_info_re`` on success.
        """
        issuer_text = self._fetch_issuer_text(URLBypass, conTimeout)
        if re.search(r'\bCN[\s]*=[\s]*BadSSL\b', issuer_text, 0):
            raise Exception(ssl_bypass_info_re)
        if re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b', issuer_text, 0):
            raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer_text)
        raise Exception("Error:Got other error certificate information, cert info: %s" % issuer_text)

    def ssl_intercept(self, conTimeout):
        """Check that ``URLIntercept`` is re-signed by a Tango issuer without UNTRUST.

        Args:
            conTimeout: curl TIMEOUT value for the request.

        Raises:
            Exception: always; ``ssl_intercept_info_re`` on success.
        """
        issuer_text = self._fetch_issuer_text(URLIntercept, conTimeout)
        if not re.search(_TANGO_ISSUER_RE, issuer_text, 0):
            raise Exception("Error: Got other error certificate information, cert info: %s" % issuer_text)
        if re.search(_TANGO_UNTRUST_ISSUER_RE, issuer_text, 0):
            raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer_text)
        raise Exception(ssl_intercept_info_re)


class SslInterceptRequestBuild:
    """Checks interception of sites whose own certificate is faulty.

    For these sites the expected issuer is a "Tango ... UNTRUST" one. Every
    public method ALWAYS raises ``Exception`` (success text or "Error: ...").
    """

    def __init__(self):
        self.conn = pycurl.Curl()
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)

    def _conn_to_perform(self, pxy_info_re):
        """Run the prepared request and raise ``pxy_info_re`` if the issuer is Tango UNTRUST.

        Raises:
            Exception: always; ``pxy_info_re`` on success, an error text otherwise.
        """
        try:
            self.conn.perform()
            certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
        finally:
            # Release the handle even when perform() fails (e.g. on timeout).
            self.conn.close()
        _require_untrusted_tango_issuer(_find_issuer(certs)[1])
        raise Exception(pxy_info_re)

    def ssl_intercept_certerrExpired(self, conTimeout):
        """Check interception of the expired-certificate site."""
        self.conn.setopt(self.conn.TIMEOUT, conTimeout)
        self.conn.setopt(self.conn.URL, URLSexpired)
        self._conn_to_perform(https_exprired_info_re)

    def ssl_intercept_certerrSelf_signed(self, conTimeout):
        """Check interception of the self-signed-certificate site."""
        self.conn.setopt(self.conn.URL, URLSselfsigned)
        self.conn.setopt(self.conn.TIMEOUT, conTimeout)
        self._conn_to_perform(https_self_signed_info_re)

    def ssl_intercept_certerrUntrusted_root(self, conTimeout):
        """Check interception of the untrusted-root-certificate site."""
        self.conn.setopt(self.conn.URL, URLSuntrustedroot)
        self.conn.setopt(self.conn.TIMEOUT, conTimeout)
        self._conn_to_perform(https_untrusted_root_info_re)


class SslHttpRequestBuild:
    """Checks the HTTP proxy actions (redirect, replace, insert, block, hijack).

    Every ``http_*`` method performs one request, verifies the certificate
    issuer and then the action-specific response, and ALWAYS raises
    ``Exception`` (success text or "Error: ..."). The handle is closed by the
    request, so an instance serves a single check.
    """

    def __init__(self):
        self.bodyBuf = BytesIO()
        self.conn = pycurl.Curl()
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
        self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
        self.conn.setopt(self.conn.RESOLVE, wpr_dns_resolve)

    def _cert_verify(self, pxy_action_info_re, certs):
        """Verify that the first certificate has a "Tango ... UNTRUST" issuer.

        Returns ``None`` when the issuer is the expected one so that the
        caller can go on to check the HTTP response. Previously this method
        raised on every path (and referenced an undefined name on the
        success path), which made the callers' response checks unreachable.

        Args:
            pxy_action_info_re: success text of the calling check; kept for
                interface compatibility, not used here.
            certs: value obtained from ``getinfo(INFO_CERTINFO)``.

        Raises:
            Exception: if no issuer is found or the issuer is not the
                expected one.
        """
        _require_untrusted_tango_issuer(_find_issuer(certs)[1])

    def _perform(self, url, conTimeout):
        """Request ``url`` and return ``(certs, response_code)``; always closes the handle."""
        self.conn.setopt(self.conn.URL, url)
        self.conn.setopt(self.conn.TIMEOUT, conTimeout)
        try:
            self.conn.perform()
            certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
            rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
        finally:
            self.conn.close()
        return certs, rescode

    def _body_text(self):
        """Return the buffered response body as text.

        Undecodable bytes are replaced instead of raising, because the
        callers only search the text for ASCII markers.
        """
        return self.bodyBuf.getvalue().decode('utf-8', 'replace')

    def http_redirect(self, conTimeout):
        """Check that ``URLRedirect`` answers with status 301 or 302.

        Raises:
            Exception: always; ``http_redirect_info_re`` on success.
        """
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        certs, rescode = self._perform(URLRedirect, conTimeout)
        self._cert_verify(http_redirect_info_re, certs)
        if rescode in (301, 302):
            raise Exception(http_redirect_info_re)
        raise Exception("Error:Http connection redirect fail")

    def http_replace(self, conTimeout):
        """Check that the ``URLReplace`` body had its marker text replaced.

        Raises:
            Exception: always; ``http_replace_info_re`` on success.
        """
        self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
        certs, _ = self._perform(URLReplace, conTimeout)
        body = self._body_text()
        self._cert_verify(http_replace_info_re, certs)
        original_gone = not re.search(r'EnglishSearchShared', body, 0)
        replacement_present = re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0)
        if original_gone and replacement_present:
            raise Exception(http_replace_info_re)
        raise Exception("Error:Http connection replace fail")

    def http_insert(self, conTimeout):
        """Check that the ``URLInsert`` body contains both inserted markers.

        Raises:
            Exception: always; ``http_insert_info_re`` on success.
        """
        self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
        certs, _ = self._perform(URLInsert, conTimeout)
        body = self._body_text()
        self._cert_verify(http_insert_info_re, certs)
        if re.search(r'httpSelfcheckInsert', body, 0) and \
                re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0):
            raise Exception(http_insert_info_re)
        raise Exception("Error:Http connection insert fail")

    def http_block(self, conTimeout):
        """Check that ``URLBlock`` answers 404 or 451 with the block-page marker.

        Raises:
            Exception: always; ``http_block_info_re`` on success.
        """
        self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
        certs, rescode = self._perform(URLBlock, conTimeout)
        body = self._body_text()
        self._cert_verify(http_block_info_re, certs)
        if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and rescode in (404, 451):
            raise Exception(http_block_info_re)
        raise Exception("Error:http connection block fail")

    def http_hijack(self, conTimeout):
        """Check that downloading ``URLHijack`` yields the expected file.

        After the certificate check the URL is downloaded again with the
        ``curl`` command-line tool into ``/root/http_hijack.out`` and the
        file's MD5 digest is compared with the expected value.

        Raises:
            Exception: always; ``http_hijack_info_re`` on success.
        """
        # Local imports: only this method needs these modules.
        import hashlib
        import subprocess

        self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
        certs, _ = self._perform(URLHijack, conTimeout)
        self._cert_verify(http_hijack_info_re, certs)

        out_path = "/root/http_hijack.out"
        if os.path.exists(out_path):
            os.remove(out_path)

        # Argument list instead of a shell string: no shell parsing of the URL.
        cmd = ["curl", URLHijack, "-k", "-s"]
        for entry in wpr_dns_resolve:
            cmd += ["--resolve", entry]
        cmd += ["-o", out_path]
        try:
            proc = subprocess.run(cmd, stdout=subprocess.PIPE)
        except OSError:
            raise Exception("Error:http_hijack download file fail")
        # With -s and -o curl prints nothing on stdout; any output, a
        # non-zero exit status or a missing file means the download failed.
        if proc.returncode != 0 or proc.stdout or not os.path.exists(out_path):
            raise Exception("Error:http_hijack download file fail")

        digest = hashlib.md5()
        with open(out_path, "rb") as downloaded:
            for chunk in iter(lambda: downloaded.read(65536), b""):
                digest.update(chunk)
        if digest.hexdigest() == "4bf06db1a228c5c8d978ebf9e1169d0d":
            raise Exception(http_hijack_info_re)
        raise Exception("Error:http connection hijack fail")


class SSLFileDownloadBuild:
    """Downloads a file of known size and reports the transfer statistics."""

    def __init__(self):
        self.conn = pycurl.Curl()
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        self.client = TelegrafClient(host='192.51.100.1', port=8100, tags={'app_name': 'tsg-diagnose'})

    def _get_conninfo(self, conn):
        """Collect status, timing, size and speed figures from ``conn`` into a dict."""
        return {
            "status": conn.getinfo(pycurl.HTTP_CODE),
            "dns_time": conn.getinfo(pycurl.NAMELOOKUP_TIME),
            "conn_time": conn.getinfo(pycurl.CONNECT_TIME),
            "app_time": conn.getinfo(pycurl.APPCONNECT_TIME),
            "start_transfer_time": conn.getinfo(pycurl.STARTTRANSFER_TIME),
            "total_time": conn.getinfo(pycurl.TOTAL_TIME),
            "size_upload": conn.getinfo(pycurl.SIZE_UPLOAD),
            "size_download": conn.getinfo(pycurl.SIZE_DOWNLOAD),
            "speed_upload": conn.getinfo(pycurl.SPEED_UPLOAD),
            "speed_download": conn.getinfo(pycurl.SPEED_DOWNLOAD),
            "time_pretransfer": conn.getinfo(pycurl.PRETRANSFER_TIME),
        }

    def _write_in_nezha(self, sizeStr, connInfoDict):
        """Send total time and status of the ``sizeStr`` download through the Telegraf client."""
        nzname = 'conn_taffic_status_size_' + sizeStr
        nzdict = {
            "conn_traffic_" + sizeStr + "_size_total_time": connInfoDict['total_time'],
            "conn_traffic_" + sizeStr + "_size_status": connInfoDict['status'],
        }
        self.client.metric(nzname, nzdict)

    def _write_in_logfile(self, sizeStr, connInfoDict):
        """Write ``connInfoDict`` as JSON to the "newest" log file and to a timestamped copy."""
        logNewestPath = "/root/result_tsg_diagnose/conn_traffic_status/conn_traffic_status_" + sizeStr
        logPath = logNewestPath + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        connInfoStr = json.dumps(connInfoDict)
        # Both files receive the same text; no need to read the first one back.
        for path in (logNewestPath, logPath):
            with open(path, "w") as f:
                f.write(connInfoStr)

    def conn_traffic(self, URL, conn_taffic_re, sizeStr, size, conTimeout):
        """Download ``URL`` and check issuer and downloaded size.

        Args:
            URL: download URL.
            conn_taffic_re: success text raised when the check passes.
            sizeStr: size label used in metric names and log file names.
            size: expected number of downloaded bytes.
            conTimeout: curl TIMEOUT value for the request.

        Raises:
            Exception: always; ``conn_taffic_re`` on success, an error text
                otherwise.
        """
        self.conn.setopt(self.conn.TIMEOUT, conTimeout)
        self.conn.setopt(self.conn.URL, URL)
        try:
            self.conn.perform()
            certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
            conninfo = self._get_conninfo(self.conn)
        finally:
            # Release the handle even when perform() fails (e.g. on timeout).
            self.conn.close()

        issuer = _find_issuer(certs)
        if not re.search(_TANGO_UNTRUST_ISSUER_RE, issuer[1], 0):
            raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1])
        if int(conninfo["size_download"]) != size:
            raise Exception("Error: connection tarffic size error and is no equal", sizeStr)
        self._write_in_nezha(sizeStr, conninfo)
        self._write_in_logfile(sizeStr, conninfo)
        raise Exception(conn_taffic_re)


class SslUnitTest(unittest.TestCase):
    """unittest wrappers around the request builders.

    A case passes when the builder raises an exception whose text matches the
    corresponding success message.
    """

    # These cases talk to live hosts and are driven by TsgDiagnoseRun; keep
    # pytest/nose auto-discovery from picking them up. unittest loaders
    # ignore this attribute.
    __test__ = False

    def _expect_traffic(self, url, info_re, sizeStr, size, conTimeout):
        """Run one download check and expect its success message."""
        requestHandler = SSLFileDownloadBuild()
        with self.assertRaisesRegex(Exception, info_re):
            requestHandler.conn_traffic(url, info_re, sizeStr, size, conTimeout)

    def test_securityPolicy_bypass(self):
        sslHandler = SSLCheckRequestBuild()
        with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
            sslHandler.ssl_bypass(1)

    def test_securityPolicy_intercept(self):
        sslHandler = SSLCheckRequestBuild()
        with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
            sslHandler.ssl_intercept(1)

    def test_securityPolicy_intercept_certerrExpired(self):
        requestHandler = SslInterceptRequestBuild()
        with self.assertRaisesRegex(Exception, https_exprired_info_re):
            requestHandler.ssl_intercept_certerrExpired(1)

    def test_securityPolicy_intercept_certerrSelf_signed(self):
        requestHandler = SslInterceptRequestBuild()
        with self.assertRaisesRegex(Exception, https_self_signed_info_re):
            requestHandler.ssl_intercept_certerrSelf_signed(1)

    def test_securityPolicy_intercept_certerrUntrusted_root(self):
        requestHandler = SslInterceptRequestBuild()
        with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
            requestHandler.ssl_intercept_certerrUntrusted_root(1)

    def test_proxyPolicy_redirect(self):
        httpHandler = SslHttpRequestBuild()
        with self.assertRaisesRegex(Exception, http_redirect_info_re):
            httpHandler.http_redirect(2)

    def test_proxyPolicy_block(self):
        httpHandler = SslHttpRequestBuild()
        with self.assertRaisesRegex(Exception, http_block_info_re):
            httpHandler.http_block(2)

    def test_proxyPolicy_replace(self):
        httpHandler = SslHttpRequestBuild()
        with self.assertRaisesRegex(Exception, http_replace_info_re):
            httpHandler.http_replace(2)

    def test_proxyPolicy_hijack(self):
        httpHandler = SslHttpRequestBuild()
        with self.assertRaisesRegex(Exception, http_hijack_info_re):
            httpHandler.http_hijack(2)

    def test_proxyPolicy_insert(self):
        httpHandler = SslHttpRequestBuild()
        with self.assertRaisesRegex(Exception, http_insert_info_re):
            httpHandler.http_insert(2)

    def test_https_con_traffic_1k(self):
        self._expect_traffic(URLConTraffic_1k, https_conn_taffic_1k_re, '1k', 1024, 1)

    def test_https_con_traffic_4k(self):
        self._expect_traffic(URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4 * 1024, 1)

    def test_https_con_traffic_16k(self):
        self._expect_traffic(URLConTraffic_16k, https_conn_taffic_16k_re, '16k', 16 * 1024, 1)

    def test_https_con_traffic_64k(self):
        self._expect_traffic(URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64 * 1024, 1)

    def test_https_con_traffic_256k(self):
        self._expect_traffic(URLConTraffic_256k, https_conn_taffic_256k_re, '256k', 256 * 1024, 2)

    def test_https_con_traffic_1M(self):
        self._expect_traffic(URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024, 2)

    def test_https_con_traffic_4M(self):
        self._expect_traffic(URLConTraffic_4M, https_conn_taffic_4M_re, '4M', 4 * 1024 * 1024, 2)

    def test_https_con_traffic_16M(self):
        self._expect_traffic(URLConTraffic_16M, https_conn_taffic_16M_re, '16M', 16 * 1024 * 1024, 4)

    def test_https_con_traffic_64M(self):
        self._expect_traffic(URLConTraffic_64M, https_conn_taffic_64M_re, '64M', 64 * 1024 * 1024, 4)


class TsgDiagnoseRun:
    """Command-line driver: parses options, runs the checks and outputs the results."""

    # Order in which the checks are added to the text-format suite.
    _TXT_SUITE_TESTS = (
        'test_securityPolicy_bypass',
        'test_securityPolicy_intercept',
        'test_securityPolicy_intercept_certerrExpired',
        'test_securityPolicy_intercept_certerrSelf_signed',
        'test_securityPolicy_intercept_certerrUntrusted_root',
        'test_proxyPolicy_redirect',
        'test_proxyPolicy_block',
        'test_proxyPolicy_replace',
        'test_proxyPolicy_hijack',
        'test_proxyPolicy_insert',
        'test_https_con_traffic_1k',
        'test_https_con_traffic_4k',
        'test_https_con_traffic_16k',
        'test_https_con_traffic_64k',
        'test_https_con_traffic_256k',
        'test_https_con_traffic_1M',
        'test_https_con_traffic_4M',
        'test_https_con_traffic_16M',
        'test_https_con_traffic_64M',
    )

    def __init__(self):
        self.interval = 1
        self.format = "txt"
        self.write = None
        self.loop = False
        self.count = 1
        self.client = TelegrafClient(host='192.51.100.1', port=8100, tags={'app_name': 'tsg-diagnose'})

    def _get_suite_option(self):
        """Parse the command line into the instance attributes.

        Prints an error plus the help text and exits with status 1 when the
        count is not positive, the interval is negative or the format is
        unknown.
        """
        parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog="Example:help")
        parser.add_argument('-i', '--interval', type=int, default=1, help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.')
        parser.add_argument('-c', '--count', type=int, default=1, help='Specifies the count of tsg diagnoses ,range:1-65535')
        parser.add_argument('-f', '--format', type=str, default='txt', help='Specifies the result output format of the tsg diagnose. There two formats: json,txt, the default is txt.')
        parser.add_argument('-w', '--write', type=str, default=None, help='Write out result into file or NEZHA. Specifies the output file name or NEZHA.')
        parser.add_argument('-l', '--loop', action='store_true', default=False, help='Tsg diagnose loop, exit when recv a signal')
        args = parser.parse_args()

        self.interval = args.interval
        self.format = args.format
        self.write = args.write
        self.loop = args.loop
        self.count = args.count

        problem = None
        if self.count < 1:
            # Zero and negative counts are equally meaningless.
            problem = "Error: bad number of tsg diagnose and will exit"
        elif self.interval < 0:
            # time.sleep() rejects negative values; fail before running anything.
            problem = "Error: bad interval of tsg diagnose and will exit"
        elif self.format not in ('json', 'txt'):
            problem = "Error: bad output format of tsg diagnose and will exit"
        if problem is not None:
            print(problem)
            parser.print_help()
            sys.exit(1)

    def _init_suite(self):
        """Build the fixed-order suite for text output; JSON output loads its suite per run."""
        if self.format == 'txt':
            self.suite = unittest.TestSuite()
            # Keep the test objects after a run so the suite can be run again.
            self.suite._cleanup = False
            for test_name in self._TXT_SUITE_TESTS:
                self.suite.addTest(SslUnitTest(test_name))
        if self.format == 'json':
            self.suite = None

    def _write_suite_result_into_file(self):
        """Run the checks and write the result to the "newest" file and a timestamped copy."""
        resultDict = '/root/result_tsg_diagnose/unittest/'
        resultNewestPath = resultDict + self.write
        resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        if self.format == 'txt':
            with open(resultNewestPath, "w+") as f:
                unittest.TextTestRunner(stream=f, verbosity=2).run(self.suite)
        if self.format == 'json':
            self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
            result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
            with open(resultNewestPath, "w+") as f:
                f.write(result_json)
        with open(resultNewestPath, 'r') as newest, open(resultPath, "w+") as stamped:
            stamped.write(newest.read())

    @staticmethod
    def _summarize_results(results):
        """Return ``(metrics, succsum, failsum)`` for a list of result dicts.

        Each result needs a ``'name'`` and a ``'type'`` key. ``metrics`` maps
        ``<first word of name>_succ`` to 1 or 0 and ``'succsum'`` to the
        number of successes. Only results of type ``'failure'`` count
        towards ``failsum``.
        """
        metrics = {}
        succsum = 0
        failsum = 0
        for result in results:
            succkey = result['name'].split()[0] + '_succ'
            if result['type'] == 'success':
                metrics[succkey] = 1
                succsum += 1
            else:
                metrics[succkey] = 0
                if result['type'] == 'failure':
                    failsum += 1
        metrics['succsum'] = succsum
        return metrics, succsum, failsum

    def _write_suite_result_into_NEZHA(self):
        """Run the checks, send per-check results through Telegraf and print the JSON summary."""
        self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
        result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=False)
        result_dict = json.loads(result_json)
        nzdict, succsum, failsum = self._summarize_results(result_dict['results'])
        self.client.metric('tsg_diagnose_result', nzdict)
        result_dict['succsum'] = succsum
        # Was assigned succsum, which reported the success count as failures.
        result_dict['failsum'] = failsum
        print(json.dumps(result_dict))

    def _stdout_suite_result(self):
        """Run the checks and print the result between start/end time banners."""
        print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())), '#^120s'))
        if self.format == 'txt':
            unittest.TextTestRunner(verbosity=2).run(self.suite)
        if self.format == 'json':
            self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
            result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
            print(result_json)
        print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())), '=^120s'))

    def _output_suite_result(self):
        """Dispatch one run to the file, NEZHA or stdout output."""
        if self.write == 'NEZHA':
            self._write_suite_result_into_NEZHA()
        elif self.write:
            self._write_suite_result_into_file()
        else:
            self._stdout_suite_result()

    def execute_suite_tsg_diagnose(self):
        """Entry point: parse options, then run the checks ``count`` times or in an endless loop.

        Any ``Exception`` raised while running is printed and ends the
        process with status 1.
        """
        self._get_suite_option()
        self._init_suite()
        try:
            counter = 0
            print("Tsg diagnose run sum: %d" % self.count)
            while True:
                print("\nRUN %d" % (counter + 1))
                self._output_suite_result()
                counter = counter + 1
                if not self.loop and counter >= self.count:
                    break
                time.sleep(self.interval)
        except Exception as ex:
            print("Process get an exception, will exit, Exception info:", ex)
            sys.exit(1)


if __name__ == '__main__':
    tsg_diagnose_run = TsgDiagnoseRun()
    tsg_diagnose_run.execute_suite_tsg_diagnose()