diff --git a/unittest_python/unittest/con_traffic_inject.py b/unittest_python/unittest/con_traffic_inject.py index 59aea93..3626a0e 100644 --- a/unittest_python/unittest/con_traffic_inject.py +++ b/unittest_python/unittest/con_traffic_inject.py @@ -1,3 +1,4 @@ +import sys import json import pycurl import os @@ -64,7 +65,14 @@ class SSLBuild: conn.setopt(conn.SSL_VERIFYPEER, False) conn.setopt(conn.OPT_CERTINFO, 1) conn.setopt(conn.URL,url) - conn.perform() + try: + conn.perform() + except pycurl.error as e: + errdict["status"] = "error" + errdict["errinfo"] = e + errdict["url"] = url + errdict["time"] = time.asctime( time.localtime(time.time())) + return certs = conn.getinfo(conn.INFO_CERTINFO) for cert_info in certs[0]: if cert_info[0].lower() == "issuer": @@ -93,9 +101,18 @@ class SSLBuild: if __name__ == '__main__': while True: - ssl = SSLBuild() - ssl.ssl_intercept() - logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime()) - with open(logpath,"w+") as f: - f.write(json.dumps(ssl.resultlist, sort_keys=True, indent=4, separators=(',', ': '))) - time.sleep(1) + try: + ssl = SSLBuild() + ssl.ssl_intercept() + logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime()) + logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest" + with open(logNewestPath,"w+") as f: + f.write(json.dumps(ssl.resultlist, sort_keys=True, indent=4, separators=(',', ': '))) + f.close() + with open(logpath,"a+") as f: + f.write(json.dumps(ssl.resultlist)) + f.write("\n") + f.close() + time.sleep(1) + except: + print("Exception:an exception occurred during the execution of the program",file=sys.stderr) \ No newline at end of file diff --git a/unittest_python/unittest/unittest_self.py b/unittest_python/unittest/unittest_self.py index 2efddb7..f6e152c 100644 --- a/unittest_python/unittest/unittest_self.py +++ b/unittest_python/unittest/unittest_self.py @@ -1,3 +1,4 @@ +import sys import unittest import json import pycurl @@ -92,7 +93,7 @@ class SslInterceptRequestBuild: self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - def ssl_intercept_expired(self): + def ssl_intercept_certerrExpired(self): self.conn.setopt(self.conn.URL, URLSexpired) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) @@ -113,7 +114,7 @@ class SslInterceptRequestBuild: raise Exception("Error:Got other error certificate information") - def ssl_intercept_wrong_host(self): + def ssl_intercept_certerrWrong_host(self): self.conn.setopt(self.conn.URL,URLSwronghost) self.conn.setopt(self.conn.SSL_VERIFYHOST, False) self.conn.perform() @@ -134,7 +135,7 @@ class SslInterceptRequestBuild: else: raise Exception("Error:Got other error certificate information") - def ssl_intercept_self_signed(self): + def ssl_intercept_certerrSelf_signed(self): self.conn.setopt(self.conn.URL,URLSselfsigned) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) @@ -154,7 +155,7 @@ class SslInterceptRequestBuild: else: raise Exception("Error:Got other error certificate information") - def ssl_intercept_untrusted_root(self): + def ssl_intercept_certerrUntrusted_root(self): self.conn.setopt(self.conn.URL,URLSuntrustedroot) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) @@ -175,7 +176,7 @@ class SslInterceptRequestBuild: raise Exception("Error:Got other error certificate information") - def ssl_intercept_revoked(self): + def ssl_intercept_certerrRevoked(self): self.conn.setopt(self.conn.URL,URLSrevoked) self.conn.perform() @@ -197,7 +198,7 @@ class SslInterceptRequestBuild: raise Exception("Error:Got other error certificate information") - def ssl_intercept_pinning_test(self): + def ssl_intercept_certerrPinning_test(self): self.conn.setopt(self.conn.URL,URLSpinningtest) self.conn.perform() @@ -304,67 +305,67 @@ class SslHttpRequestBuild: class SslUnitTest(unittest.TestCase): - def test_ssl_bypass(self): + def test_securityPolicy_bypass(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_bypass_info_re): sslHandler.ssl_bypass() - def test_ssl_intercept(self): + def test_securityPolicy_intercept(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_intercept_info_re): sslHandler.ssl_intercept() - def test_ssl_intercept_expired(self): + def test_securityPolicy_intercept_certerrExpired(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_exprired_info_re): - requestHandler.ssl_intercept_expired() + requestHandler.ssl_intercept_certerrExpired() - def test_ssl_intercept_wrong_host(self): + def test_securityPolicy_intercept_certerrWrong_host(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_wrong_host_info_re): - requestHandler.ssl_intercept_wrong_host() + requestHandler.ssl_intercept_certerrWrong_host() - def test_ssl_intercept_self_signed(self): + def test_securityPolicy_intercept_certerrSelf_signed(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_self_signed_info_re): - requestHandler.ssl_intercept_self_signed() + requestHandler.ssl_intercept_certerrSelf_signed() - def test_ssl_intercept_untrusted_root(self): + def test_securityPolicy_intercept_certerrUntrusted_root(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_untrusted_root_info_re): - requestHandler.ssl_intercept_untrusted_root() + requestHandler.ssl_intercept_certerrUntrusted_root() - def test_ssl_intercept_revoked(self): + def test_securityPolicy_intercept_certerrRevoked(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_revoked_info_re): - requestHandler.ssl_intercept_revoked() + requestHandler.ssl_intercept_certerrRevoked() - def test_ssl_intercept_pinning_test(self): + def test_securityPolicy_intercept_certerrPinning_test(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_pinning_test_info_re): - requestHandler.ssl_intercept_pinning_test() + requestHandler.ssl_intercept_certerrPinning_test() - def test_http_redirect(self): + def test_proxyPolicy_redirect(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_redirect_info_re): httpHandler.http_redirect() - def test_http_block(self): + def test_proxyPolicy_block(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_block_info_re): httpHandler.http_block() - def test_http_replace(self): + def test_proxyPolicy_replace(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_replace_info_re): httpHandler.http_replace() - def test_http_hijack(self): + def test_proxyPolicy_hijack(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_hijack_info_re): httpHandler.http_hijack() - def test_http_insert(self): + def test_proxyPolicy_insert(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_insert_info_re): httpHandler.http_insert() @@ -372,25 +373,35 @@ class SslUnitTest(unittest.TestCase): if __name__ == '__main__': suite = unittest.TestSuite() suite._cleanup = False - suite.addTest(SslUnitTest('test_ssl_bypass')) - suite.addTest(SslUnitTest('test_ssl_intercept')) - suite.addTest(SslUnitTest('test_ssl_intercept_expired')) - suite.addTest(SslUnitTest('test_ssl_intercept_wrong_host')) - suite.addTest(SslUnitTest('test_ssl_intercept_self_signed')) - suite.addTest(SslUnitTest('test_ssl_intercept_untrusted_root')) - suite.addTest(SslUnitTest('test_ssl_intercept_revoked')) - suite.addTest(SslUnitTest('test_ssl_intercept_pinning_test')) - suite.addTest(SslUnitTest('test_http_redirect')) - suite.addTest(SslUnitTest('test_http_block')) - suite.addTest(SslUnitTest('test_http_replace')) - suite.addTest(SslUnitTest('test_http_hijack')) - suite.addTest(SslUnitTest('test_http_insert')) + suite.addTest(SslUnitTest('test_securityPolicy_bypass')) + suite.addTest(SslUnitTest('test_securityPolicy_intercept')) + suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired')) + suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host')) + suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed')) + suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root')) + suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked')) + suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test')) + suite.addTest(SslUnitTest('test_proxyPolicy_redirect')) + suite.addTest(SslUnitTest('test_proxyPolicy_block')) + suite.addTest(SslUnitTest('test_proxyPolicy_replace')) + suite.addTest(SslUnitTest('test_proxyPolicy_hijack')) + suite.addTest(SslUnitTest('test_proxyPolicy_insert')) while True: - logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime()) - with open(logpath,"w+") as f: - runner = unittest.TextTestRunner(stream=f,verbosity=2) - f.write("Test start time: ") - f.write(time.asctime(time.localtime(time.time()))) - f.write("\n") - runner.run(suite) - time.sleep(1) + try: + logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime()) + lognewestpath = "/root/result_self_test/unittest/unittest_result.log.newest" + with open(lognewestpath,"w+") as f: + runner = unittest.TextTestRunner(stream=f,verbosity=2) + f.write("Test start time: ") + f.write(time.asctime(time.localtime(time.time()))) + f.write("\n") + runner.run(suite) + f.close() + with open(logpath,"a+") as f: + fn = open(lognewestpath,'r') + f.write(fn.read()) + fn.close() + f.close() + time.sleep(1) + except: + print("Exception:an exception occurred during the execution of the program:unittest",file=sys.stderr)