@@ -11,44 +11,17 @@ import ciunittest
import argparse
from telegraf . client import TelegrafClient
import hashlib
# from configparser import ConfigParser
URLBypass = ' https://sha384.badssl.self-test.geedge.net '
URLIntercept = ' https://sha256.badssl.self-test.geedge.net '
URLSexpired = ' https://expired.badssl.self-test.geedge.net '
URLSselfsigned = ' https://self-signed.badssl.self-test.geedge.net '
URLSuntrustedroot = ' https://untrusted-root.badssl.self-test.geedge.net '
URLSslRedirect = ' https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js '
URLSslReplace = ' https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js '
URLSslInsert = ' https://cn.bing.com/?FORM=BEHPTB '
URLSslHijack = ' https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js '
URLSslBlock = ' https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js '
URLHttpRedirect = ' http://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js '
URLHttpReplace = ' http://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js '
URLHttpInsert = ' http://cn.bing.com/?FORM=BEHPTB '
URLHttpHijack = ' http://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js '
URLHttpBlock = ' http://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js '
URLConTraffic_1k = " https://downloadfile.self-test.geedge.net/1k "
URLConTraffic_4k = " https://downloadfile.self-test.geedge.net/4k "
URLConTraffic_16k = " https://downloadfile.self-test.geedge.net/16k "
URLConTraffic_64k = " https://downloadfile.self-test.geedge.net/64k "
URLConTraffic_256k = " https://downloadfile.self-test.geedge.net/256k "
URLConTraffic_1M = " https://downloadfile.self-test.geedge.net/1M "
URLConTraffic_4M = " https://downloadfile.self-test.geedge.net/4M "
URLConTraffic_16M = " https://downloadfile.self-test.geedge.net/16M "
URLConTraffic_64M = " https://downloadfile.self-test.geedge.net/64M "
from configparser import ConfigParser
import random
suite_test_config_dict = { }
ssl_bypass_info_re = " Ssl connection bypass success "
ssl_intercept_info_re = " Ssl connection intercept success "
https_exprired_info_re = " Ssl exprired cert check success "
https_self_signed_info_re = " Ssl self signed cert check success "
https_untrusted_root_info_re = " Ssl untrusted_root cert check success "
ssl_redirect_info_re = " Ssl connection redirect success "
ssl_replace_info_re = " Ssl connection replace success "
ssl_insert_info_re = " Ssl connection insert success "
@@ -72,7 +45,6 @@ https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = ' https download file 64M success '
class SSLCheckRequestBuild :
def __init__ ( self ) :
self . conn = pycurl . Curl ( )
@@ -80,12 +52,12 @@ class SSLCheckRequestBuild:
self . conn . setopt ( self . conn . OPT_CERTINFO , 1 )
self . conn . setopt ( self . conn . SSL_VERIFYPEER , False )
def ssl_bypass ( self , conTimeout ) :
self . conn . setopt ( self . conn . URL , URLBypass )
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . conn . perform ( )
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
self . conn . close ( )
def _set_conn_opt ( self , test_suite_name ) :
self . conn . setopt ( self . conn . MAX_RECV_SPEED_LARGE , int ( suite_test_config_dict [ test_suite_name ] [ ' max_recv_speed_large ' ] ) )
self . conn . setopt ( self . conn . URL , str ( suite_test_config_dict [ test_suite_name ] [ ' url ' ] ) )
self . conn . setopt ( self . conn . TIMEOUT , int ( suite_test_config_dict [ test_suite_name ] [ ' conn_timeout ' ] ) )
def _get_conn_issuer ( self , certs ) :
issuer = ( )
for cert_info in certs [ 0 ] :
if cert_info [ 0 ] == " Issuer " :
@@ -93,6 +65,15 @@ class SSLCheckRequestBuild:
break
if len ( issuer ) < = 0 :
raise Exception ( " Error: Get certificate info error, certificate ' s length is %s " % len ( issuer ) )
return issuer
def ssl_bypass ( self , test_suite_name ) :
self . _set_conn_opt ( test_suite_name )
self . conn . perform ( )
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
self . conn . close ( )
issuer = self . _get_conn_issuer ( certs )
if re . search ( r ' \ bCN[ \ s]*=[ \ s]*BadSSL \ b ' , issuer [ 1 ] , 0 ) :
raise Exception ( ssl_bypass_info_re )
elif re . search ( r ' \ bCN[ \ s]*=[ \ s]*Tango[ \ s]*Secure[ \ s]*Gateway[ \ s]*CA[ \ s \ S]* \ b ' , issuer [ 1 ] , 0 ) :
@@ -100,19 +81,12 @@ class SSLCheckRequestBuild:
else :
raise Exception ( " Error:Got other error certificate information, cert info: %s " % issuer [ 1 ] )
def ssl_intercept ( self , conTimeout ) :
self . conn . setopt ( self . conn . URL , URLIntercept )
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
def ssl_intercept ( self , test_suite_name ) :
self . _set_conn_opt ( test_suite_name )
self . conn . perform ( )
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
self . conn . close ( )
issuer = ( )
for cert_info in certs [ 0 ] :
if cert_info [ 0 ] . lower ( ) == " issuer " :
issuer = cert_info
break
if len ( issuer ) < = 0 :
raise Exception ( " Error: Get certificate info error, certificate ' s length is %s " % len ( issuer ) )
issuer = self . _get_conn_issuer ( certs )
if re . search ( r ' \ bCN[ \ s]*=[ \ s]*Tango \ b ' , issuer [ 1 ] , 0 ) :
if re . search ( r ' \ bCN = Tango[ \ s \ S]*UNTRUST \ b ' , issuer [ 1 ] , 0 ) :
raise Exception ( " Error: Ssl connection is intercept, cert is untrust, cert info: %s " % issuer [ 1 ] )
@@ -129,7 +103,12 @@ class SslInterceptRequestBuild:
self . conn . setopt ( self . conn . OPT_CERTINFO , 1 )
self . conn . setopt ( self . conn . SSL_VERIFYPEER , False )
def _conn_to_perform ( self , pxy_info_r e ) :
def _set_ conn_opt ( self , test_suite_nam e) :
self . conn . setopt ( self . conn . MAX_RECV_SPEED_LARGE , int ( suite_test_config_dict [ test_suite_name ] [ ' max_recv_speed_large ' ] ) )
self . conn . setopt ( self . conn . URL , str ( suite_test_config_dict [ test_suite_name ] [ ' url ' ] ) )
self . conn . setopt ( self . conn . TIMEOUT , int ( suite_test_config_dict [ test_suite_name ] [ ' conn_timeout ' ] ) )
def _conn_to_perform ( self , test_suite_name , sec_info_re ) :
self . conn . perform ( )
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
self . conn . close ( )
@@ -142,27 +121,23 @@ class SslInterceptRequestBuild:
raise Exception ( " Error: Get certificate info error, certificate ' s length is %s " % len ( issuer ) )
if re . search ( r ' \ bCN[ \ s]*=[ \ s]*Tango \ b ' , issuer [ 1 ] , 0 ) :
if re . search ( r ' \ bCN = Tango[ \ s \ S]*UNTRUST \ b ' , issuer [ 1 ] , 0 ) :
raise Exception ( pxy _info_re )
raise Exception ( sec _info_re)
else :
raise Exception ( " Error: Ssl connection is intercept, cert maybe trust, cert info: %s " % issuer [ 1 ] )
else :
raise Exception ( " Error: Got other error certificate information, cert info: %s " % issuer [ 1 ] )
def ssl_intercept_certerrExpired ( self , conTimeout ) :
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . conn. setopt ( self . conn . URL , URLS expired )
self . _conn_to_perform ( https_exprired_info_re )
def ssl_intercept_certerrExpired ( self , test_suite_name ) :
self . _set_conn_opt ( test_suite_name )
self . _ conn_to_perform ( test_suite_name , https_ expr ired_info_re )
def ssl_intercept_certerrSelf_signed ( self , conTimeout ) :
self . conn . setopt ( self . conn . URL , URLSselfsigned )
self . conn. setopt ( self . conn . TIMEOUT , conTimeout )
self . _conn_to_perform ( https_self_signed_info_re )
def ssl_intercept_certerrUntrusted_root ( self , conTimeout ) :
self . conn . setopt ( self . conn . URL , URLSuntrustedroot )
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . _conn_to_perform ( https_untrusted_root_info_re )
def ssl_intercept_certerrSelf_signed ( self , test_suite_name ) :
self . _set_conn_opt ( test_suite_name )
self . _ conn_to_perform ( test_suite_name , https_self_signed_info_re )
def ssl_intercept_certerrUntrusted_root ( self , test_suite_name , ) :
self . _set_conn_opt ( test_suite_name )
self . _conn_to_perform ( test_suite_name , https_untrusted_root_info_re )
class ProxyRequestBuild :
@@ -171,7 +146,7 @@ class ProxyRequestBuild:
self . conn = pycurl . Curl ( )
self . conn . setopt ( self . conn . ENCODING , " gzip,deflate " )
def _cert_verify ( self , pxy_action_info_re , certs , isSsl ) :
def _cert_verify ( self , certs , isSsl ) :
if isSsl == True :
issuer = ( )
for cert_info in certs [ 0 ] :
@@ -189,109 +164,112 @@ class ProxyRequestBuild:
raise Exception ( " Error: Got other error certificate information, cert info: %s " % issuer [ 1 ] )
def _set_conn_opt ( self , isSsl ) :
def _set_conn_opt ( self , test_suite_name , isSsl) :
self . conn . setopt ( self . conn . MAX_RECV_SPEED_LARGE , int ( suite_test_config_dict [ test_suite_name ] [ ' max_recv_speed_large ' ] ) )
self . conn . setopt ( self . conn . URL , str ( suite_test_config_dict [ test_suite_name ] [ ' url ' ] ) )
self . conn . setopt ( self . conn . TIMEOUT , int ( suite_test_config_dict [ test_suite_name ] [ ' conn_timeout ' ] ) )
self . conn . setopt ( self . conn . WRITEDATA , self . bodyBuf )
if isSsl == True :
self . conn . setopt ( self . conn . OPT_CERTINFO , 1 )
self . conn . setopt ( self . conn . SSL_VERIFYPEER , False )
def proxy_redirect ( self , url , proxy_info_re , isSsl , conTimeout ) :
self . conn . setopt ( self . conn . URL , url )
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . _set_conn_opt ( isSsl )
def proxy_redirect ( self , test_suite_name , isSsl ) :
certs = None
# self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write )
self. _set_conn_opt ( test_suite_name , isSsl )
self . conn . perform ( )
if isSsl == True :
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
rescode = self . conn . getinfo ( self . conn . RESPONSE_CODE )
self . conn . close ( )
self . _cert_verify ( ssl_redirect_info_re , certs, isSsl )
self . _cert_verify ( certs , isSsl )
if rescode == 301 or rescode == 302 :
raise Exception ( proxy_info_r e )
if isSsl == Tru e:
raise Exception ( ssl_redirect_info_re )
else :
raise Exception ( http_redirect_info_re )
else :
if isSsl == True :
raise Exception ( " Error:Ssl connection redirect fail, RESPONSE_CODE = %d " % rescode )
else :
raise Exception ( " Error:Http Connection redirect fail,RESPONSE_CODE = %d " % rescode )
def proxy_replace ( self , url , proxy_info_r e, isSsl , conTimeout ):
def proxy_replace ( self , test_suite_nam e, isSsl ) :
certs = None
self . conn . setopt ( self . conn . WRITEDATA , self . bodyBuf )
self . conn . setopt ( self . conn . URL , url )
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . _set_conn_opt ( isSsl )
self . _set_conn_opt ( test_suite_name , isSsl )
self . conn . perform ( )
if isSsl == True :
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
body = self . bodyBuf . getvalue ( ) . decode ( ' utf-8 ' )
self . _cert_verify ( ssl_replace_info_re , certs , isSsl )
self . conn . close ( )
self . _cert_verify ( certs , isSsl )
if not re . search ( r ' EnglishSearchShared ' , body , 0 ) and \
re . search ( r ' 03C174CD9D809789CCEC18D6F585DF3E ' , body , 0 ) :
raise Exception ( proxy_info_r e)
if isSsl == Tru e:
raise Exception ( ssl_replace_info_re )
else :
raise Exception ( http_replace_info_re )
else :
if isSsl == True :
raise Exception ( " Error:Ssl connection replace fail " )
else :
raise Exception ( " Error:Http connection replace fail " )
def proxy_insert ( self , url , proxy_info_r e, isSsl , conTimeout ):
def proxy_insert ( self , test_suite_nam e, isSsl ) :
certs = None
self . conn . setopt ( self . conn . WRITEDATA , self . bodyBuf )
self . conn . setopt ( self . conn . URL , url )
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . _set_conn_opt ( isSsl )
self . _set_conn_opt ( test_suite_name , isSsl )
self . conn . perform ( )
body = self . bodyBuf . getvalue ( ) . decode ( ' utf-8 ' )
if isSsl == True :
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
self . conn . close ( )
self . _cert_verify ( ssl_insert_info_re , certs, isSsl )
self . _cert_verify ( certs , isSsl )
if re . search ( r ' httpSelfcheckInsert ' , body , 0 ) and \
re . search ( r ' 5BE3754D1EA8D51E8D993060FA225330 ' , body , 0 ) :
raise Exception ( proxy_info_r e)
if isSsl == Tru e:
raise Exception ( ssl_insert_info_re )
else :
raise Exception ( http_insert_info_re )
else :
if isSsl == True :
raise Exception ( " Error:Ssl connection insert fail " )
else :
raise Exception ( " Error:Http connection insert fail " )
def proxy_block ( self , url , proxy_info_r e, isSsl , conTimeout ):
def proxy_block ( self , test_suite_nam e, isSsl ) :
certs = None
self . conn . setopt ( self . conn . URL , ur l)
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . conn . setopt ( self . conn . WRITEFUNCTION , self . bodyBuf . write )
self . _set_conn_opt ( isSsl )
self . _set_conn_opt ( test_suite_name , isSs l)
self . conn . perform ( )
if isSsl == True :
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
rescode = self . conn . getinfo ( self . conn . RESPONSE_CODE )
body = self . bodyBuf . getvalue ( ) . decode ( ' utf-8 ' )
self . conn . close ( )
self . _cert_verify ( ssl_block_info_re , certs, isSsl )
self . _cert_verify ( certs , isSsl )
if re . search ( r ' E33F01E50AFE043191931DD40190B09B ' , body , 0 ) and ( rescode == 404 or rescode == 451 ) :
raise Exception ( proxy_info_r e)
if isSsl == Tru e:
raise Exception ( ssl_block_info_re )
else :
raise Exception ( http_block_info_re )
else :
if isSsl == True :
raise Exception ( " Error:Ssl connection block fail, RESPONSE_CODE = %d " % rescode )
else :
raise Exception ( " Error:Http connection block fail, RESPONSE_CODE = %d " % rescode )
def proxy_hijack ( self , url , proxy_info_r e, isSsl , conTimeout ):
def proxy_hijack ( self , test_suite_nam e, isSsl ) :
certs = None
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . conn . setopt ( self . conn . URL , url )
self . conn . setopt ( self . conn . WRITEDATA , self . bodyBuf )
self . conn . setopt ( self . conn . MAX_RECV_SPEED_LARGE , 8 * 1024 * 1024 )
self . _set_conn_opt ( isSsl )
self . _set_conn_opt ( test_suite_name , isSsl )
self . conn . perform ( )
if isSsl == True :
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
self . conn . close ( )
self . _cert_verify ( ssl_hijack_info_re , certs, isSsl )
self . _cert_verify ( certs , isSsl )
hijack_file_md5 = hashlib . md5 ( self . bodyBuf . getvalue ( ) )
if re . search ( " 4bf06db1a228c5c8d978ebf9e1169d0d " , hijack_file_md5 . hexdigest ( ) , 0 ) :
raise Exception ( proxy_info_r e)
if isSsl == Tru e:
raise Exception ( ssl_hijack_info_re )
else :
raise Exception ( http_hijack_info_re )
else :
if isSsl == True :
raise Exception ( " Error:Ssl connection hijack fail " )
@@ -299,15 +277,14 @@ class ProxyRequestBuild:
raise Exception ( " Error:Http connection hijack fail " )
class SSLFileDownloadBuild :
def __init__ ( self ) :
self . conn = pycurl . Curl ( )
self . conn . setopt ( self . conn . WRITEFUNCTION , BytesIO ( ) . write )
self . conn . setopt ( self . conn . SSL_VERIFYPEER , False )
self . conn . setopt ( self . conn . OPT_CERTINFO , 1 )
self. client = TelegrafClient( host = ' 192.51.100.1 ' , port = 8100 , tags = { ' app_name ' : ' tsg-diagnose' } )
self . conn . setopt ( self . conn . MAX_RECV_SPEED_LARGE , 8 * 1024 * 1024 )
# self. client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':' tsg-diagnose'} )
self . client = TelegrafClient ( host = str ( suite_test_config_dict [ ' telegraf ' ] [ ' host ' ] ) , port = int ( suite_test_config_dict [ ' telegraf ' ] [ ' port ' ] ) , tags = { str ( suite_test_config_dict [ ' telegraf ' ] [ ' tags_key ' ] ) : str ( suite_test_config_dict [ ' telegraf ' ] [ ' tags_value ' ] ) } )
def _get_conninfo ( self , conn ) :
dictconninfo = { }
@@ -324,6 +301,11 @@ class SSLFileDownloadBuild:
dictconninfo [ " time_pretransfer " ] = conn . getinfo ( pycurl . PRETRANSFER_TIME )
return dictconninfo
def _set_conn_opt ( self , test_suite_name ) :
self . conn . setopt ( self . conn . MAX_RECV_SPEED_LARGE , int ( suite_test_config_dict [ test_suite_name ] [ ' max_recv_speed_large ' ] ) )
self . conn . setopt ( self . conn . URL , str ( suite_test_config_dict [ test_suite_name ] [ ' url ' ] ) )
self . conn . setopt ( self . conn . TIMEOUT , int ( suite_test_config_dict [ test_suite_name ] [ ' conn_timeout ' ] ) )
def _write_in_nezha ( self , sizeStr , connInfoDict ) :
nzdict = { }
nzname = ' conn_taffic_status_size_ ' + sizeStr
@@ -347,9 +329,8 @@ class SSLFileDownloadBuild:
fn . close ( )
f . close ( )
def conn_traffic ( self , URL , conn_taffic_re , sizeStr , size , conTimeout ):
self . conn . setopt ( self . conn . TIMEOUT , conTimeout )
self . conn . setopt ( self . conn . URL , URL )
def conn_traffic ( self , test_suite_name , conn_taffic_re , sizeStr , size ) :
self . _set_conn_opt ( test_suite_name )
self . conn . perform ( )
certs = self . conn . getinfo ( self . conn . INFO_CERTINFO )
conninfo = self . _get_conninfo ( self . conn )
@@ -378,125 +359,124 @@ class SslUnitTest(unittest.TestCase):
def test_securityPolicy_bypass ( self ) :
sslHandler = SSLCheckRequestBuild ( )
with self . assertRaisesRegex ( Exception , ssl_bypass_info_re ) :
sslHandler . ssl_bypass ( 1 )
sslHandler . ssl_bypass ( ' test_securityPolicy_bypass ' )
def test_securityPolicy_intercept ( self ) :
sslHandler = SSLCheckRequestBuild ( )
with self . assertRaisesRegex ( Exception , ssl_intercept_info_re ) :
sslHandler . ssl_intercept ( 1 )
sslHandler . ssl_intercept ( ' test_securityPolicy_intercept ' )
def test_securityPolicy_intercept_certerrExpired ( self ) :
requestHandler = SslInterceptRequestBuild ( )
with self . assertRaisesRegex ( Exception , https_exprired_info_re ) :
requestHandler . ssl_intercept_certerrExpired ( 1 )
requestHandler . ssl_intercept_certerrExpired ( ' test_securityPolicy_intercept_certerrExpired ' )
def test_securityPolicy_intercept_certerrSelf_signed ( self ) :
requestHandler = SslInterceptRequestBuild ( )
with self . assertRaisesRegex ( Exception , https_self_signed_info_re ) :
requestHandler . ssl_intercept_certerrSelf_signed ( 1 )
requestHandler . ssl_intercept_certerrSelf_signed ( ' test_securityPolicy_intercept_certerrSelf_signed ' )
def test_securityPolicy_intercept_certerrUntrusted_root ( self ) :
requestHandler = SslInterceptRequestBuild ( )
with self . assertRaisesRegex ( Exception , https_untrusted_root_info_re ) :
requestHandler . ssl_intercept_certerrUntrusted_root ( 1 )
requestHandler . ssl_intercept_certerrUntrusted_root ( ' test_securityPolicy_intercept_certerrUntrusted_root ' )
def test_proxyPolicy_ssl_redirect ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , ssl_redirect_info_re ) :
proxyHandler . proxy_redirect ( URLSslRedirect , ssl_redirect_info_re , True , 2 )
proxyHandler . proxy_redirect ( ' test_proxyPolicy_ ssl_redirect' , True )
def test_proxyPolicy_ssl_block ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , ssl_block_info_re ) :
proxyHandler . proxy_block ( URLSslBlock , ssl_block_info_re , True , 2 )
proxyHandler . proxy_block ( ' test_proxyPolicy_ ssl_block' , True )
def test_proxyPolicy_ssl_replace ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , ssl_replace_info_re ) :
proxyHandler . proxy_replace ( URLSslReplace , ssl_replace_info_re , True , 2 )
proxyHandler . proxy_replace ( ' test_proxyPolicy_ ssl_replace' , True )
def test_proxyPolicy_ssl_hijack ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , ssl_hijack_info_re ) :
proxyHandler . proxy_hijack ( URLSslHijack , ssl_hijack_info_re , True , 2 )
proxyHandler . proxy_hijack ( ' test_proxyPolicy_ ssl_hijack' , True )
def test_proxyPolicy_ssl_insert ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , ssl_insert_info_re ) :
proxyHandler . proxy_insert ( URLSslInsert , ssl_insert_info_re , True , 2 )
proxyHandler . proxy_insert ( ' test_proxyPolicy_ ssl_insert' , True )
def test_proxyPolicy_http_redirect ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , http_redirect_info_re ) :
proxyHandler . proxy_redirect ( URLHttpRedirect , http_redirect_info_re , False , 2 )
proxyHandler . proxy_redirect ( ' test_proxyPolicy_ http_redirect' , False )
def test_proxyPolicy_http_block ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , http_block_info_re ) :
proxyHandler . proxy_block ( URLHttpBlock , http_block_info_re , False , 2 )
proxyHandler . proxy_block ( ' test_proxyPolicy_ http_block' , False )
def test_proxyPolicy_http_replace ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , http_replace_info_re ) :
proxyHandler . proxy_replace ( URLHttpReplace , http_replace_info_re , False , 2 )
proxyHandler . proxy_replace ( ' test_proxyPolicy_ http_replace' , False )
def test_proxyPolicy_http_hijack ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , http_hijack_info_re ) :
proxyHandler . proxy_hijack ( URLHttpHijack , http_hijack_info_re , False , 2 )
proxyHandler . proxy_hijack ( ' test_proxyPolicy_ http_hijack' , False )
def test_proxyPolicy_http_insert ( self ) :
proxyHandler = ProxyRequestBuild ( )
with self . assertRaisesRegex ( Exception , http_insert_info_re ) :
proxyHandler . proxy_insert ( URLHttpInsert , http_insert_info_re , False , 2 )
proxyHandler . proxy_insert ( ' test_proxyPolicy_ http_insert' , False )
def test_https_con_traffic_1k ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_1k_re ) :
requestHandler . conn_traffic ( URLConT raffic_1k, https_conn_taffic_1k_re , ' 1k ' , 1024 , 1 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_1k' , https_conn_taffic_1k_re , ' 1k ' , 1024 )
def test_https_con_traffic_4k ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_4k_re ) :
requestHandler . conn_traffic ( URLConT raffic_4k, https_conn_taffic_4k_re , ' 4k ' , 4 * 1024 , 1 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_4k' , https_conn_taffic_4k_re , ' 4k ' , 4 * 1024 )
def test_https_con_traffic_16k ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_16k_re ) :
requestHandler . conn_traffic ( URLConT raffic_16k, https_conn_taffic_16k_re , ' 16k ' , 16 * 1024 , 1 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_16k' , https_conn_taffic_16k_re , ' 16k ' , 16 * 1024 )
def test_https_con_traffic_64k ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_64k_re ) :
requestHandler . conn_traffic ( URLConT raffic_64k, https_conn_taffic_64k_re , ' 64k ' , 64 * 1024 , 1 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_64k' , https_conn_taffic_64k_re , ' 64k ' , 64 * 1024 )
def test_https_con_traffic_256k ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_256k_re ) :
requestHandler . conn_traffic ( URLConT raffic_256k, https_conn_taffic_256k_re , ' 256k ' , 256 * 1024 , 2 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_256k' , https_conn_taffic_256k_re , ' 256k ' , 256 * 1024 )
def test_https_con_traffic_1M ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_1M_re ) :
requestHandler . conn_traffic ( URLConT raffic_1M, https_conn_taffic_1M_re , ' 1M ' , 1024 * 1024 , 2 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_1M' , https_conn_taffic_1M_re , ' 1M ' , 1024 * 1024 )
def test_https_con_traffic_4M ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_4M_re ) :
requestHandler . conn_traffic ( URLConT raffic_4M, https_conn_taffic_4M_re , ' 4M ' , 4 * 1024 * 1024 , 2 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_4M' , https_conn_taffic_4M_re , ' 4M ' , 4 * 1024 * 1024 )
def test_https_con_traffic_16M ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_16M_re ) :
requestHandler . conn_traffic ( URLConT raffic_16M, https_conn_taffic_16M_re , ' 16M ' , 16 * 1024 * 1024 , 4 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_16M' , https_conn_taffic_16M_re , ' 16M ' , 16 * 1024 * 1024 )
def test_https_con_traffic_64M ( self ) :
requestHandler = SSLFileDownloadBuild ( )
with self . assertRaisesRegex ( Exception , https_conn_taffic_64M_re ) :
requestHandler . conn_traffic ( URLConT raffic_64M, https_conn_taffic_64M_re , ' 64M ' , 64 * 1024 * 1024 , 4 )
requestHandler . conn_traffic ( ' test_https_con_t raffic_64M' , https_conn_taffic_64M_re , ' 64M ' , 64 * 1024 * 1024 )
class TsgDiagnoseRun :
def __init__ ( self ) :
@@ -505,14 +485,17 @@ class TsgDiagnoseRun:
self . write = None
self . loop = False
self . count = 1
self . client = TelegrafClient ( host = ' 192.51.100.1 ' , port = 8100 , tags = { ' app_name ' : ' tsg-diagnose ' } )
self . config = None
self . client = None
self . config_dict = { }
def _get_suite_option ( self ) :
parser = argparse . ArgumentParser ( description = " Tsg Tools - tsg diagnose " , epilog = " Example:help " )
parser . add_argument ( ' -i ' , ' --interval ' , type = int , default = 1 , help = ' Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose. ' )
parser . add_argument ( ' -i ' , ' --interval ' , type = int , default = 30 , help = ' Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose. ' )
parser . add_argument ( ' -c ' , ' --count ' , type = int , default = 1 , help = ' Specifies the count of tsg diagnoses ,range:1-65535 ' )
parser . add_argument ( ' -f ' , ' --format ' , type = str , default = ' txt ' , help = ' Specifies the result output format of the tsg diagnose. There two formats: json, txt, the default is txt. ' )
parser . add_argument ( ' -w ' , ' --write ' , type = str , default = None , help = ' Write out result into file or NEZHA. Specifies the output file name or NEZHA. ' )
parser . add_argument ( ' -p ' , ' --configpath ' , type = str , default = ' /root/unittest/etc/tsg-diagnose.config ' , help = ' Specifies the config file, default /root/unittest/etc/tsg-diagnose.config ' )
parser . add_argument ( ' -l ' , ' --loop ' , action = ' store_true ' , default = False , help = ' Tsg diagnose loop, exit when recv a signal ' )
args = parser . parse_args ( )
self . interval = args . interval
@@ -520,6 +503,7 @@ class TsgDiagnoseRun:
self . write = args . write
self . loop = args . loop
self . count = args . count
self . config = args . configpath
if self . count == 0 :
print ( " Error: bad number of tsg diagnose and will exit " )
parser . print_help ( )
@@ -530,33 +514,51 @@ class TsgDiagnoseRun:
parser . print_help ( )
sys . exit ( 1 )
def _set_telegraf ( self ) :
# self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
self . client = TelegrafClient ( host = str ( self . config_dict [ ' telegraf ' ] [ ' host ' ] ) , port = int ( self . config_dict [ ' telegraf ' ] [ ' port ' ] ) , tags = { str ( self . config_dict [ ' telegraf ' ] [ ' tags_key ' ] ) : str ( self . config_dict [ ' telegraf ' ] [ ' tags_value ' ] ) } )
def _get_suite_config ( self ) :
global suite_test_config_dict
config = ConfigParser ( )
config . read ( self . config , encoding = ' UTF-8 ' )
for section in config . sections ( ) :
self . config_dict [ section ] = dict ( config . items ( section ) )
suite_test_config_dict = self . config_dict
def _add_suite ( self , test_suite_name ) :
if int ( self . config_dict [ test_suite_name ] [ ' enabled ' ] ) == 1 :
self . suite . addTest ( SslUnitTest ( test_suite_name ) )
def _init_suite ( self ) :
self . suite = unittest . TestSuite ( )
self . suite . _cleanup = False
self . suite. addTest ( SslUnitTest ( ' test_securityPolicy_bypass ' ) )
self . suite. addTest ( SslUnitTest ( ' test_securityPolicy_intercept ' ) )
self . suite. addTest ( SslUnitTest ( ' test_securityPolicy_intercept_certerrExpired ' ) )
self . suite. addTest ( SslUnitTest ( ' test_securityPolicy_intercept_certerrSelf_signed ' ) )
self . suite. addTest ( SslUnitTest ( ' test_securityPolicy_intercept_certerrUntrusted_root ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_ssl_redirect ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_ssl_block ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_ssl_replace ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_ssl_hijack ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_ssl_insert ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_http_redirect ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_http_block ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_http_replace ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_http_hijack ' ) )
self . suite. addTest ( SslUnitTest ( ' test_proxyPolicy_http_insert ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_1k ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_4k ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_16k ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_64k ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_256k ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_1M ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_4M ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_16M ' ) )
self . suite. addTest ( SslUnitTest ( ' test_https_con_traffic_64M ' ) )
self . _add_ suite( ' test_securityPolicy_bypass ' )
self . _add_ suite( ' test_securityPolicy_intercept ' )
self . _add_ suite( ' test_securityPolicy_intercept_certerrExpired ' )
self . _add_ suite( ' test_securityPolicy_intercept_certerrSelf_signed ' )
self . _add_ suite( ' test_securityPolicy_intercept_certerrUntrusted_root ' )
self . _add_ suite( ' test_proxyPolicy_ssl_redirect ' )
self . _add_ suite( ' test_proxyPolicy_ssl_block ' )
self . _add_ suite( ' test_proxyPolicy_ssl_replace ' )
self . _add_ suite( ' test_proxyPolicy_ssl_hijack ' )
self . _add_ suite( ' test_proxyPolicy_ssl_insert ' )
self . _add_ suite( ' test_proxyPolicy_http_redirect ' )
self . _add_ suite( ' test_proxyPolicy_http_block ' )
self . _add_ suite( ' test_proxyPolicy_http_replace ' )
self . _add_ suite( ' test_proxyPolicy_http_hijack ' )
self . _add_ suite( ' test_proxyPolicy_http_insert ' )
self . _add_ suite( ' test_https_con_traffic_1k ' )
self . _add_ suite( ' test_https_con_traffic_4k ' )
self . _add_ suite( ' test_https_con_traffic_16k ' )
self . _add_ suite( ' test_https_con_traffic_64k ' )
self . _add_ suite( ' test_https_con_traffic_256k ' )
self . _add_ suite( ' test_https_con_traffic_1M ' )
self . _add_ suite( ' test_https_con_traffic_4M ' )
self . _add_ suite( ' test_https_con_traffic_16M ' )
self . _add_ suite( ' test_https_con_traffic_64M ' )
def _write_suite_result_into_file ( self ) :
resultDict = ' /root/result_tsg_diagnose/unittest/ '
@@ -626,8 +628,12 @@ class TsgDiagnoseRun:
def execute_suite_tsg_diagnose ( self ) :
self . _get_suite_option ( )
self . _get_suite_config ( )
self . _set_telegraf ( )
self . _init_suite ( )
try :
if int ( self . config_dict [ ' start_time_random_delay_range ' ] [ ' enabled ' ] ) == 1 :
time . sleep ( random . randint ( int ( self . config_dict [ ' start_time_random_delay_range ' ] [ ' left_edge ' ] ) , int ( self . config_dict [ ' start_time_random_delay_range ' ] [ ' right_edge ' ] ) ) )
counter = 0
print ( " Tsg diagnose run sum: %d " % self . count )
while True :