diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index 616dc62..2345a93 100644
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -82,7 +82,6 @@ services:
 update-ca-certificates
 cat /root/unittest/badssl.test.hosts >> /etc/hosts
 python /root/unittest/unittest_self.py &
- python /root/unittest/con_traffic_inject.py &
 tail -f /dev/null
diff --git a/unittest_python/unittest/unittest_self.py b/unittest_python/unittest/unittest_self.py
index 2ba3978..11d232d 100644
--- a/unittest_python/unittest/unittest_self.py
+++ b/unittest_python/unittest/unittest_self.py
@@ -23,6 +23,26 @@ URLInsert = 'https://cn.bing.com/?FORM=BEHPTB'
 URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
 URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
+URLdictConTrafficInject = {
+ "0k":"https://downloadfile.self-test.geedge.net/0k", \
+ "1k":"https://downloadfile.self-test.geedge.net/1k", \
+ "2k":"https://downloadfile.self-test.geedge.net/2k", \
+ "4k":"https://downloadfile.self-test.geedge.net/4k", \
+ "8k":"https://downloadfile.self-test.geedge.net/8k", \
+ "16k":"https://downloadfile.self-test.geedge.net/16k", \
+ "32k":"https://downloadfile.self-test.geedge.net/32k", \
+ "64k":"https://downloadfile.self-test.geedge.net/64k", \
+ "128k":"https://downloadfile.self-test.geedge.net/128k", \
+ "256k":"https://downloadfile.self-test.geedge.net/256k", \
+ "512k":"https://downloadfile.self-test.geedge.net/512k", \
+ "1M":"https://downloadfile.self-test.geedge.net/1M", \
+ "2M":"https://downloadfile.self-test.geedge.net/2M", \
+ "4M":"https://downloadfile.self-test.geedge.net/4M", \
+ "8M":"https://downloadfile.self-test.geedge.net/8M", \
+ "16M":"https://downloadfile.self-test.geedge.net/16M", \
+ "32M":"https://downloadfile.self-test.geedge.net/32M", \
+ "64M":"https://downloadfile.self-test.geedge.net/64M"}
+
 ssl_bypass_info_re = "ssl bypass ok"
 ssl_intercept_info_re = "ssl intercept ok"
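Review bot: The size keys in `URLdictConTrafficInject` are written out a second time as `self.sizeList` in `SSLFileDownloadBuild.__init__` (later hunk of this file), so adding or renaming a size in only one place either raises `KeyError` in `downfile_run` or silently drops a download from the run. Deriving one from the other keeps them in step, e.g. `for sizefield, url in URLdictConTrafficInject.items():` in `downfile_run`, with `sizeList` removed. The trailing `\` on each entry is also redundant, since lines inside `{}` continue implicitly.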
@@ -38,6 +58,8 @@ http_insert_info_re = "http connection insert success"
 http_hijack_info_re = "http connection hijack success"
 http_block_info_re = "http connection block success"
+https_download_file_info_re = "http download file success"
+
 wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131']
 class SSLCheckRequestBuild:
@@ -303,6 +325,93 @@ class SslHttpRequestBuild:
 raise Exception("Error:http connection hijack fail")
+class SSLFileDownloadBuild:
+ def __init__(self):
+ self.sizeList = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"]
+ self.resultList = []
+ self.isException = False
+
+ def build_conninfo_json(self,conn):
+ dictconninfo = {}
+ dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
+ dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
+ dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
+ dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
+ dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
+ dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
+ dictconninfo["redirect_count"] = conn.getinfo(pycurl.REDIRECT_COUNT)
+ dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
+ dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
+ dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE)
+ dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE)
+ dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
+ dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
+ dictconninfo["time_connect"] = conn.getinfo(pycurl.CONNECT_TIME)
+ dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
+ return dictconninfo
+
+ def get_value_from_succ_conn(self,urlkey,url,conn):
+ dictinfo = {}
+ dictinfo["downloadsize"] = urlkey
+ dictinfo["url"] = url
+ dictinfo["time"] = time.asctime( time.localtime(time.time()))
+ dictinfo["result"] = self.build_conninfo_json(conn)
+ self.resultList.append(dictinfo)
+
+ def conn_filedownload(self,urlkey,url):
+ issuer = ()
+ conn = pycurl.Curl()
+ errdict = {}
+ conn.setopt(conn.WRITEFUNCTION, BytesIO().write)
+ conn.setopt(conn.SSL_VERIFYPEER, False)
+ conn.setopt(conn.OPT_CERTINFO, 1)
+ conn.setopt(conn.URL,url)
+ conn.perform()
+ certs = conn.getinfo(conn.INFO_CERTINFO)
+ for cert_info in certs[0]:
+ if cert_info[0].lower() == "issuer":
+ issuer = cert_info
+ break
+ if len(issuer) <= 0:
+ errdict["status"] = "error"
+ errdict["errinfo"] = "Get certificate info error"
+ errdict["url"] = url
+ errdict["time"] = time.asctime( time.localtime(time.time()))
+ self.resultList.append(errdict)
+ self.isException = True
+ elif not re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0):
+ errdict["status"] = "error"
+ errdict["errinfo"] = "Intercept fail: no Tango cert"
+ errdict["url"] = url
+ errdict["time"] = time.asctime( time.localtime(time.time()))
+ self.resultList.append(errdict)
+ self.isException = True
+ else:
+ self.get_value_from_succ_conn(urlkey,url,conn)
+ conn.close()
+
+ def write_log(self):
+ logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime())
+ logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest"
+ with open(logNewestPath,"w+") as f:
+ f.write(json.dumps(self.resultList, sort_keys=True, indent=4, separators=(',', ': ')))
+ f.close()
+ with open(logpath,"a+") as f:
+ f.write(json.dumps(self.resultList))
+ f.write("\n")
+ f.close()
+
+ def downfile_run(self):
+ for sizefield in self.sizeList:
+ self.conn_filedownload(sizefield,URLdictConTrafficInject[sizefield])
+ self.write_log()
+ if self.isException == True:
+ raise Exception("Error:http_hijack download file fail")
+ else:
+ raise Exception(https_download_file_info_re)
+
+
+
 class SslUnitTest(unittest.TestCase):
 def test_securityPolicy_bypass(self):
@@ -370,6 +479,12 @@ class SslUnitTest(unittest.TestCase):
 with self.assertRaisesRegex(Exception, http_insert_info_re):
 httpHandler.http_insert()
+ def test_securityPolicy_con_traffic_inject(self):
+ requestHandler = SSLFileDownloadBuild()
+ with self.assertRaisesRegex(Exception,https_download_file_info_re):
+ requestHandler.downfile_run()
+
+
 if __name__ == '__main__':
 suite = unittest.TestSuite()
 suite._cleanup = False
@@ -386,6 +501,7 @@ if __name__ == '__main__':
 suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
 suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
 suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
+ suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject'))
 while True:
 try:
 logpath = "/root/result_self_test/unittest/unittest_result.log." + time.strftime("%Y-%m-%d",time.localtime())