1、增加 cn.bing.com 到host 文件 2、增加处理http proxy 策略

This commit is contained in:
fumingwei
2020-09-11 18:52:23 +08:00
parent 2642f4ac56
commit db818d7ea4
2 changed files with 189 additions and 121 deletions

View File

@@ -10,20 +10,25 @@ import getopt
import ciunittest
import argparse
from telegraf.client import TelegrafClient
import hashlib
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
URLSexpired = 'https://expired.badssl.self-test.geedge.net'
URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
URLSslRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLSslReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLSslInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLSslHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLSslBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
URLSexpired = 'https://expired.badssl.self-test.geedge.net'
URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
URLRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLHttpRedirect = 'http://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLHttpReplace = 'http://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLHttpInsert = 'http://cn.bing.com/?FORM=BEHPTB'
URLHttpHijack = 'http://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLHttpBlock = 'http://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k"
URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k"
@@ -40,16 +45,21 @@ URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M"
ssl_bypass_info_re = "Ssl connection bypass success"
ssl_intercept_info_re = "Ssl connection intercept success"
https_exprired_info_re = "https exprired ok"
https_self_signed_info_re = "https self signed ok"
https_untrusted_root_info_re = "https untrusted_root ok"
https_exprired_info_re = "Ssl exprired cert check success"
https_self_signed_info_re = "Ssl self signed cert check success"
https_untrusted_root_info_re = "Ssl untrusted_root cert check success"
ssl_redirect_info_re = "Ssl connection redirect success"
ssl_replace_info_re = "Ssl connection replace success"
ssl_insert_info_re = "Ssl connection insert success"
ssl_hijack_info_re = "Ssl connection hijack success"
ssl_block_info_re = "Ssl connection block success"
http_redirect_info_re = "http connection redirect success"
http_replace_info_re = "http connection replace success"
http_insert_info_re = "http connection insert success"
http_hijack_info_re = "http connection hijack success"
http_block_info_re = "http connection block success"
https_conn_taffic_1k_re = 'https download file 1k success'
https_conn_taffic_4k_re = 'https download file 4k success'
https_conn_taffic_16k_re = 'https download file 16k success'
@@ -61,7 +71,6 @@ https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = 'https download file 64M success'
wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131']
class SSLCheckRequestBuild:
def __init__(self):
@@ -155,116 +164,144 @@ class SslInterceptRequestBuild:
class SslHttpRequestBuild:
class ProxyRequestBuild:
def __init__(self):
self.bodyBuf = BytesIO()
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve)
def _cert_verify(self, pxy_action_info_re,certs):
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception( pxy_info_re)
def _cert_verify(self, pxy_action_info_re,certs , isSsl):
if isSsl == True:
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
return
else:
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
else:
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
def http_redirect(self,conTimeout):
self.conn.setopt(self.conn.URL, URLRedirect)
def _set_conn_opt(self,isSsl):
if isSsl == True:
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def proxy_redirect(self,url, proxy_info_re, isSsl, conTimeout):
self.conn.setopt(self.conn.URL, url)
self.conn.setopt(self.conn.TIMEOUT, conTimeout)
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self._set_conn_opt(isSsl)
certs = None
#self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
self._cert_verify(http_redirect_info_re,certs)
self._cert_verify(ssl_redirect_info_re,certs, isSsl)
if rescode == 301 or rescode == 302:
raise Exception(http_redirect_info_re)
raise Exception(proxy_info_re)
else:
raise Exception("Error:Http connection redirect fail")
if isSsl == True:
raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode)
def http_replace(self,conTimeout):
def proxy_replace(self,url, proxy_info_re, isSsl, conTimeout):
certs = None
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
self.conn.setopt(self.conn.URL, URLReplace)
self.conn.setopt(self.conn.URL, url)
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self._set_conn_opt(isSsl)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
body = self.bodyBuf.getvalue().decode('utf-8')
print(url)
print(proxy_info_re)
print(isSsl)
self.conn.close()
self._cert_verify(http_replace_info_re,certs)
print (body)
self._cert_verify(ssl_replace_info_re,certs, isSsl)
if not re.search(r'EnglishSearchShared', body, 0) and \
re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0):
raise Exception(http_replace_info_re)
raise Exception(proxy_info_re)
else:
raise Exception("Error:Http connection replace fail")
if isSsl == True:
raise Exception("Error:Ssl connection replace fail")
else:
raise Exception("Error:Http connection replace fail")
def http_insert(self,conTimeout):
def proxy_insert(self,url, proxy_info_re, isSsl, conTimeout):
certs = None
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
self.conn.setopt(self.conn.URL, URLInsert)
self.conn.setopt(self.conn.URL, url)
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self._set_conn_opt(isSsl)
self.conn.perform()
body = self.bodyBuf.getvalue().decode('utf-8')
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
self._cert_verify(http_insert_info_re,certs)
self._cert_verify(ssl_insert_info_re,certs, isSsl)
if re.search(r'httpSelfcheckInsert', body, 0) and \
re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0):
raise Exception(http_insert_info_re)
raise Exception(proxy_info_re)
else:
raise Exception("Error:Http connection insert fail")
if isSsl == True:
raise Exception("Error:Ssl connection insert fail")
else:
raise Exception("Error:Http connection insert fail")
def http_block(self,conTimeout):
self.conn.setopt(self.conn.URL, URLBlock)
def proxy_block(self,url, proxy_info_re, isSsl, conTimeout):
certs = None
self.conn.setopt(self.conn.URL, url)
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
self._set_conn_opt(isSsl)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close()
self._cert_verify(http_block_info_re,certs)
self._cert_verify(ssl_block_info_re,certs, isSsl)
if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451):
raise Exception(http_block_info_re)
raise Exception(proxy_info_re)
else:
raise Exception("Error:http connection block fail")
if isSsl == True:
raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)
def http_hijack(self,conTimeout):
def proxy_hijack(self,url, proxy_info_re, isSsl, conTimeout):
certs = None
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.setopt(self.conn.URL, URLHijack)
self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
self.conn.setopt(self.conn.URL, url)
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, 8 * 1024 * 1024)
self._set_conn_opt(isSsl)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
self._cert_verify(http_hijack_info_re,certs)
if os.path.exists("/root/http_hijack.out"):
os.remove("/root/http_hijack.out")
cmdtodo = 'curl %s -k -s --resolve cn.bing.com:443:192.0.2.131 -o /root/http_hijack.out' % URLHijack
optdl = os.popen(cmdtodo)
if len(optdl.read()):
optdl.close()
raise Exception("Error:http_hijack download file fail")
optdl.close()
if not os.path.exists("/root/http_hijack.out"):
raise Exception("Error:http_hijack download file fail")
optmd5 = os.popen("md5sum /root/http_hijack.out")
if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", optmd5.read(), 0):
optmd5.close()
raise Exception(http_hijack_info_re)
self._cert_verify(ssl_hijack_info_re,certs, isSsl)
hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue())
if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0):
raise Exception(proxy_info_re)
else:
optmd5.close()
raise Exception("Error:http connection hijack fail")
if isSsl == True:
raise Exception("Error:Ssl connection hijack fail")
else:
raise Exception("Error:Http connection hijack fail")
class SSLFileDownloadBuild:
@@ -274,7 +311,8 @@ class SSLFileDownloadBuild:
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, 8 * 1024 * 1024)
def _get_conninfo(self,conn):
dictconninfo = {}
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
@@ -366,30 +404,58 @@ class SslUnitTest(unittest.TestCase):
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
requestHandler.ssl_intercept_certerrUntrusted_root(1)
def test_proxyPolicy_redirect(self):
httpHandler = SslHttpRequestBuild()
def test_proxyPolicy_ssl_redirect(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_redirect_info_re):
proxyHandler.proxy_redirect(URLSslRedirect ,ssl_redirect_info_re, True,2)
def test_proxyPolicy_ssl_block(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_block_info_re):
proxyHandler.proxy_block(URLSslBlock,ssl_block_info_re, True, 2)
def test_proxyPolicy_ssl_replace(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_replace_info_re):
proxyHandler.proxy_replace(URLSslReplace,ssl_replace_info_re, True,2)
def test_proxyPolicy_ssl_hijack(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_hijack_info_re):
proxyHandler.proxy_hijack(URLSslHijack,ssl_hijack_info_re, True,2)
def test_proxyPolicy_ssl_insert(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_insert_info_re):
proxyHandler.proxy_insert(URLSslInsert,ssl_insert_info_re,True,2)
def test_proxyPolicy_http_redirect(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_redirect_info_re):
httpHandler.http_redirect(2)
proxyHandler.proxy_redirect(URLHttpRedirect,http_redirect_info_re, False,2)
def test_proxyPolicy_block(self):
httpHandler = SslHttpRequestBuild()
def test_proxyPolicy_http_block(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_block_info_re):
httpHandler.http_block(2)
proxyHandler.proxy_block(URLHttpBlock,http_block_info_re, False,2)
def test_proxyPolicy_replace(self):
httpHandler = SslHttpRequestBuild()
def test_proxyPolicy_http_replace(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_replace_info_re):
httpHandler.http_replace(2)
proxyHandler.proxy_replace(URLHttpReplace,http_replace_info_re, False,2)
def test_proxyPolicy_hijack(self):
httpHandler = SslHttpRequestBuild()
def test_proxyPolicy_http_hijack(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_hijack_info_re):
httpHandler.http_hijack(2)
proxyHandler.proxy_hijack(URLHttpHijack,http_hijack_info_re, False,2)
def test_proxyPolicy_insert(self):
httpHandler = SslHttpRequestBuild()
def test_proxyPolicy_http_insert(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_insert_info_re):
httpHandler.http_insert(2)
proxyHandler.proxy_insert(URLHttpInsert,http_insert_info_re,False, 2)
def test_https_con_traffic_1k(self):
requestHandler = SSLFileDownloadBuild()
@@ -469,31 +535,32 @@ class TsgDiagnoseRun:
sys.exit(1)
def _init_suite(self):
if self.format == 'txt':
self.suite = unittest.TestSuite()
self.suite._cleanup = False
self.suite.addTest(SslUnitTest('test_securityPolicy_bypass'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_redirect'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_block'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_1k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_4k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_16k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_64k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_256k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_1M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_4M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_16M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_64M'))
if self.format == 'json':
self.suite = None
self.suite = unittest.TestSuite()
self.suite._cleanup = False
self.suite.addTest(SslUnitTest('test_securityPolicy_bypass'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_redirect'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_block'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_replace'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_hijack'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_insert'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_http_redirect'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_http_block'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_http_replace'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_http_hijack'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_http_insert'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_1k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_4k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_16k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_64k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_256k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_1M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_4M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_16M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_64M'))
def _write_suite_result_into_file(self):
resultDict = '/root/result_tsg_diagnose/unittest/'