import sys
import unittest
import json
import pycurl
import os
import re
import time
from io import BytesIO

# HTTPS endpoints requested by the SSL bypass / intercept checks.
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'

# HTTPS endpoints requested by the certificate-error intercept checks.
URLSexpired = 'https://expired.badssl.self-test.geedge.net'
URLSwronghost = 'https://wrong.host.badssl.self-test.geedge.net'
URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
URLSrevoked = 'https://revoked.badssl.self-test.geedge.net'
URLSpinningtest = 'https://pinning-test.badssl.self-test.geedge.net'

# URLs requested by the HTTP redirect / replace / insert / hijack / block checks.
URLRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'

# Download URL for each file-size label, in ascending size order.
URLdictConTrafficInject = {
    size_label: "https://downloadfile.self-test.geedge.net/" + size_label
    for size_label in (
        "0k", "1k", "2k", "4k", "8k", "16k", "32k", "64k", "128k",
        "256k", "512k", "1M", "2M", "4M", "8M", "16M", "32M", "64M",
    )
}
ssl_bypass_info_re = "ssl bypass ok" ssl_intercept_info_re = "ssl intercept ok" https_exprired_info_re = "https exprired ok" https_wrong_host_info_re = "https wrong host ok" https_self_signed_info_re = "https self signed ok" https_untrusted_root_info_re = "https untrusted_root ok" https_revoked_info_re = "https revoked ok" https_pinning_test_info_re = "https pinning-test ok" http_redirect_info_re = "http connection redirect success" http_replace_info_re = "http connection replace success" http_insert_info_re = "http connection insert success" http_hijack_info_re = "http connection hijack success" http_block_info_re = "http connection block success" https_download_file_info_re = "http download file success" wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131'] class SSLCheckRequestBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def ssl_bypass(self): self.conn.setopt(self.conn.URL,URLBypass) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0] == "Issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: get Certificate info error") if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): raise Exception(ssl_bypass_info_re) elif re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception("Error:Ssl link is intercepted") else: raise Exception("Error:Got other error certificate information") def ssl_intercept(self): self.conn.setopt(self.conn.URL,URLIntercept) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0] == "Issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: get Certificate info error") if re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0): raise 
Exception(ssl_intercept_info_re) elif re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): raise Exception("Error:Ssl link is intercepted") else: raise Exception("Error:Got other error certificate information") class SslInterceptRequestBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def ssl_intercept_certerrExpired(self): self.conn.setopt(self.conn.URL, URLSexpired) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: get Certificate info error") if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception(https_exprired_info_re) else: raise Exception("Fail:ssl intercept cert is trust") else: raise Exception("Error:Got other error certificate information") def ssl_intercept_certerrWrong_host(self): self.conn.setopt(self.conn.URL,URLSwronghost) self.conn.setopt(self.conn.SSL_VERIFYHOST, False) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: get Certificate info error") if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception(https_wrong_host_info_re ) else: raise Exception("Fail:ssl intercept cert is trust") else: raise Exception("Error:Got other error certificate information") def ssl_intercept_certerrSelf_signed(self): self.conn.setopt(self.conn.URL,URLSselfsigned) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0].lower() 
== "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: get Certificate info error") if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception(https_self_signed_info_re) else: raise Exception("Fail:ssl intercept cert is trust") else: raise Exception("Error:Got other error certificate information") def ssl_intercept_certerrUntrusted_root(self): self.conn.setopt(self.conn.URL,URLSuntrustedroot) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: get Certificate info error") if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception(https_untrusted_root_info_re) else: raise Exception("Fail:ssl intercept cert is trust") else: raise Exception("Error:Got other error certificate information") def ssl_intercept_certerrRevoked(self): self.conn.setopt(self.conn.URL,URLSrevoked) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: get Certificate info error") if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception(https_revoked_info_re) else: raise Exception("Fail:ssl intercept cert is trust") else: raise Exception("Error:Got other error certificate information") def ssl_intercept_certerrPinning_test(self): self.conn.setopt(self.conn.URL,URLSpinningtest) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: 
get Certificate info error") if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception(https_pinning_test_info_re) else: raise Exception("Fail:ssl intercept cert is trust") else: raise Exception("Error:Got other error certificate information") class SslHttpRequestBuild: def __init__(self): self.bodyBuf = BytesIO() self.conn = pycurl.Curl() self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.ENCODING, "gzip,deflate") self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve) def http_redirect(self): self.conn.setopt(self.conn.URL, URLRedirect) self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.perform() rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.close() if rescode == 301 or rescode == 302: raise Exception(http_redirect_info_re) else: raise Exception("Error:Http connection redirect fail") def http_replace(self): self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) self.conn.setopt(self.conn.URL, URLReplace) resCode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.perform() body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() if not re.search(r'EnglishSearchShared', body, 0) and \ re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0): raise Exception(http_replace_info_re) else: raise Exception("Error:Http connection replace fail") def http_insert(self): self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) self.conn.setopt(self.conn.URL, URLInsert) resCode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.perform() body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() if re.search(r'httpSelfcheckInsert', body, 0) and \ re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0): raise Exception(http_insert_info_re) else: raise Exception("Error:Http connection insert fail") def http_block(self): self.conn.setopt(self.conn.URL, URLBlock) 
self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write) self.conn.perform() rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451): raise Exception(http_block_info_re) else: raise Exception("Error:http connection block fail") def http_hijack(self): self.conn.setopt(self.conn.URL, URLHijack) self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write) self.conn.perform() self.conn.close() if os.path.exists("/root/http_hijack.out"): os.remove("/root/http_hijack.out") cmdtodo = 'curl %s -k -s --resolve cn.bing.com:443:192.0.2.131 -o /root/http_hijack.out' % URLHijack optdl = os.popen(cmdtodo) if len(optdl.read()): optdl.close() raise Exception("Error:http_hijack download file fail") optdl.close() if not os.path.exists("/root/http_hijack.out"): raise Exception("Error:http_hijack download file fail") optmd5 = os.popen("md5sum /root/http_hijack.out") if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", optmd5.read(), 0): optmd5.close() raise Exception(http_hijack_info_re) else: optmd5.close() raise Exception("Error:http connection hijack fail") class SSLFileDownloadBuild: def __init__(self): self.sizeList = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"] self.resultList = [] self.isException = False def build_conninfo_json(self,conn): dictconninfo = {} dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE) dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME) dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME) dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME) dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME) dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME) dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD) dictconninfo["size_download"] = 
conn.getinfo(pycurl.SIZE_DOWNLOAD) dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE) dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE) dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD) dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD) dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME) return dictconninfo def get_value_from_succ_conn(self,urlkey,url,conn): dictinfo = {} dictinfo["downloadsize"] = urlkey dictinfo["url"] = url dictinfo["time"] = time.asctime( time.localtime(time.time())) dictinfo["result"] = self.build_conninfo_json(conn) self.resultList.append(dictinfo) def conn_filedownload(self,urlkey,url): issuer = () conn = pycurl.Curl() errdict = {} conn.setopt(conn.WRITEFUNCTION, BytesIO().write) conn.setopt(conn.SSL_VERIFYPEER, False) conn.setopt(conn.OPT_CERTINFO, 1) conn.setopt(conn.URL,url) conn.perform() certs = conn.getinfo(conn.INFO_CERTINFO) for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: errdict["status"] = "error" errdict["errinfo"] = "Get certificate info error" errdict["url"] = url errdict["time"] = time.asctime( time.localtime(time.time())) self.resultList.append(errdict) self.isException = True elif not re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0): errdict["status"] = "error" errdict["errinfo"] = "Intercept fail: no Tango cert" errdict["url"] = url errdict["time"] = time.asctime( time.localtime(time.time())) self.resultList.append(errdict) self.isException = True else: self.get_value_from_succ_conn(urlkey,url,conn) conn.close() def write_log(self): logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." 
+ time.strftime("%Y-%m-%d",time.localtime()) logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest" with open(logNewestPath,"w+") as f: f.write(json.dumps(self.resultList, sort_keys=True, indent=4, separators=(',', ': '))) f.close() with open(logpath,"a+") as f: f.write(json.dumps(self.resultList)) f.write("\n") f.close() def downfile_run(self): for sizefield in self.sizeList: self.conn_filedownload(sizefield,URLdictConTrafficInject[sizefield]) self.write_log() if self.isException == True: raise Exception("Error:http_hijack download file fail") else: raise Exception(https_download_file_info_re) class SslUnitTest(unittest.TestCase): def test_securityPolicy_bypass(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_bypass_info_re): sslHandler.ssl_bypass() def test_securityPolicy_intercept(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_intercept_info_re): sslHandler.ssl_intercept() def test_securityPolicy_intercept_certerrExpired(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_exprired_info_re): requestHandler.ssl_intercept_certerrExpired() def test_securityPolicy_intercept_certerrWrong_host(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_wrong_host_info_re): requestHandler.ssl_intercept_certerrWrong_host() def test_securityPolicy_intercept_certerrSelf_signed(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_self_signed_info_re): requestHandler.ssl_intercept_certerrSelf_signed() def test_securityPolicy_intercept_certerrUntrusted_root(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_untrusted_root_info_re): requestHandler.ssl_intercept_certerrUntrusted_root() def test_securityPolicy_intercept_certerrRevoked(self): requestHandler = SslInterceptRequestBuild() with 
self.assertRaisesRegex(Exception, https_revoked_info_re): requestHandler.ssl_intercept_certerrRevoked() def test_securityPolicy_intercept_certerrPinning_test(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_pinning_test_info_re): requestHandler.ssl_intercept_certerrPinning_test() def test_proxyPolicy_redirect(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_redirect_info_re): httpHandler.http_redirect() def test_proxyPolicy_block(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_block_info_re): httpHandler.http_block() def test_proxyPolicy_replace(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_replace_info_re): httpHandler.http_replace() def test_proxyPolicy_hijack(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_hijack_info_re): httpHandler.http_hijack() def test_proxyPolicy_insert(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_insert_info_re): httpHandler.http_insert() def test_securityPolicy_con_traffic_inject(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_download_file_info_re): requestHandler.downfile_run() if __name__ == '__main__': suite = unittest.TestSuite() suite._cleanup = False suite.addTest(SslUnitTest('test_securityPolicy_bypass')) suite.addTest(SslUnitTest('test_securityPolicy_intercept')) suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired')) suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host')) suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed')) suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root')) suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked')) suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test')) 
suite.addTest(SslUnitTest('test_proxyPolicy_redirect')) suite.addTest(SslUnitTest('test_proxyPolicy_block')) suite.addTest(SslUnitTest('test_proxyPolicy_replace')) suite.addTest(SslUnitTest('test_proxyPolicy_hijack')) suite.addTest(SslUnitTest('test_proxyPolicy_insert')) suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject')) while True: try: logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime()) lognewestpath = "/root/result_self_test/unittest/unittest_result.log.newest" with open(lognewestpath,"w+") as f: runner = unittest.TextTestRunner(stream=f,verbosity=2) f.write("\n" + "#"*50 + "Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "#"*50 + "\n") runner.run(suite) f.write("\n" + "="*50 + "Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "="*50 + "\n") f.close() with open(logpath,"a+") as f: fn = open(lognewestpath,'r') f.write(fn.read()) fn.close() f.close() time.sleep(1) except: print("Exception:an exception occurred during the execution of the program:unittest",file=sys.stderr)