From 6f70be05d00a98fb315a7d9e7e544b3ae1615f60 Mon Sep 17 00:00:00 2001 From: fumingwei Date: Tue, 15 Sep 2020 13:55:08 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E4=BF=AE=E6=94=B9=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitlab-ci.yml | 2 +- cmake/Package.cmake | 1 + docker-compose/docker-compose.yml | 1 + unittest_python/unittest/tsg_diagnose.py | 324 ++++++++++++----------- 4 files changed, 168 insertions(+), 160 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 029e317..1a4a80d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -56,7 +56,7 @@ images_build: WPR_IMAGE_TAG: wpr-tsg-diagnose:latest BADSSL_IMAGE_TAG: badssl-tsg-diagnose:latest script: - - mkdir images + - mkdir -p images - docker build -t $UNITTEST_IMAGE_TAG -f ./unittest_python/Dockerfile ./unittest_python/ - docker save $UNITTEST_IMAGE_TAG > images/unittest.tar - docker build -t $WPR_IMAGE_TAG -f ./wpr_golang/Dockerfile ./wpr_golang/ diff --git a/cmake/Package.cmake b/cmake/Package.cmake index a46dcb7..ee7be04 100644 --- a/cmake/Package.cmake +++ b/cmake/Package.cmake @@ -22,6 +22,7 @@ set(CPACK_RPM_PRE_UNINSTALL_SCRIPT_FILE ${CMAKE_SOURCE_DIR}/cmake/PreunInstall.i set(CPACK_RPM_POST_UNINSTALL_SCRIPT_FILE ${CMAKE_SOURCE_DIR}/cmake/PostunInstall.in) install(FILES docker-compose/docker-compose.yml DESTINATION ./compose) +install(FILES unittest_python/unittest/etc/tsg-diagnose.config DESTINATION ./etc) install(FILES docker-compose/tsg-diagnose.service DESTINATION /usr/lib/systemd/system) install(DIRECTORY images/ DESTINATION ./images) diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index 43244fd..6cec922 100644 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -64,6 +64,7 @@ services: volumes: - /opt/tsg/tsg-diagnose/.badssl_cert_dict:/root/cafile_dict - /opt/tsg/tsg-diagnose/result:/root/result_tsg_diagnose + - /opt/tsg/tsg-diagnose/etc:/root/etc_tsg_diagnose - /etc/localtime:/etc/localtime:ro command: - /bin/sh diff --git a/unittest_python/unittest/tsg_diagnose.py b/unittest_python/unittest/tsg_diagnose.py index 294e3d6..012b876 100644 --- a/unittest_python/unittest/tsg_diagnose.py +++ b/unittest_python/unittest/tsg_diagnose.py @@ -11,44 +11,17 @@ import ciunittest import argparse from telegraf.client import TelegrafClient import hashlib -#from configparser import ConfigParser - -URLBypass = 'https://sha384.badssl.self-test.geedge.net' -URLIntercept = 'https://sha256.badssl.self-test.geedge.net' -URLSexpired = 'https://expired.badssl.self-test.geedge.net' -URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net' -URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net' - -URLSslRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js' -URLSslReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js' -URLSslInsert = 'https://cn.bing.com/?FORM=BEHPTB' -URLSslHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js' -URLSslBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js' - -URLHttpRedirect = 'http://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js' -URLHttpReplace = 'http://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js' -URLHttpInsert = 'http://cn.bing.com/?FORM=BEHPTB' -URLHttpHijack = 'http://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js' -URLHttpBlock = 'http://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js' - -URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k" -URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k" -URLConTraffic_16k = "https://downloadfile.self-test.geedge.net/16k" -URLConTraffic_64k = "https://downloadfile.self-test.geedge.net/64k" -URLConTraffic_256k = "https://downloadfile.self-test.geedge.net/256k" -URLConTraffic_1M = "https://downloadfile.self-test.geedge.net/1M" -URLConTraffic_4M = "https://downloadfile.self-test.geedge.net/4M" -URLConTraffic_16M = "https://downloadfile.self-test.geedge.net/16M" -URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M" - - +from configparser import ConfigParser +import random +suite_test_config_dict = {} ssl_bypass_info_re = "Ssl connection bypass success" ssl_intercept_info_re = "Ssl connection intercept success" https_exprired_info_re = "Ssl exprired cert check success" https_self_signed_info_re = "Ssl self signed cert check success" https_untrusted_root_info_re = "Ssl untrusted_root cert check success" + ssl_redirect_info_re = "Ssl connection redirect success" ssl_replace_info_re = "Ssl connection replace success" ssl_insert_info_re = "Ssl connection insert success" @@ -72,7 +45,6 @@ https_conn_taffic_16M_re = 'https download file 16M success' https_conn_taffic_64M_re = 'https download file 64M success' - class SSLCheckRequestBuild: def __init__(self): self.conn = pycurl.Curl() @@ -80,12 +52,12 @@ class SSLCheckRequestBuild: self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - def ssl_bypass(self,conTimeout): - self.conn.setopt(self.conn.URL,URLBypass) - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() + def _set_conn_opt(self,test_suite_name): + self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) + self.conn.setopt(self.conn.URL,str(suite_test_config_dict[test_suite_name]['url'])) + self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) + + def _get_conn_issuer(self,certs): issuer = () for cert_info in certs[0]: if cert_info[0] == "Issuer": @@ -93,6 +65,15 @@ class SSLCheckRequestBuild: break if len(issuer) <= 0: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) + return issuer + + def ssl_bypass(self,test_suite_name): + self._set_conn_opt(test_suite_name) + self.conn.perform() + certs = self.conn.getinfo(self.conn.INFO_CERTINFO) + self.conn.close() + issuer = self._get_conn_issuer(certs) + if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): raise Exception(ssl_bypass_info_re) elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0): @@ -100,19 +81,12 @@ class SSLCheckRequestBuild: else: raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1]) - def ssl_intercept(self,conTimeout): - self.conn.setopt(self.conn.URL,URLIntercept) - self.conn.setopt(self.conn.TIMEOUT,conTimeout) + def ssl_intercept(self,test_suite_name): + self._set_conn_opt(test_suite_name) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) + issuer = self._get_conn_issuer(certs) if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1]) @@ -129,7 +103,12 @@ class SslInterceptRequestBuild: self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - def _conn_to_perform(self, pxy_info_re): + def _set_conn_opt(self,test_suite_name): + self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) + self.conn.setopt(self.conn.URL,str(suite_test_config_dict[test_suite_name]['url'])) + self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) + + def _conn_to_perform(self, test_suite_name, sec_info_re): self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() @@ -142,27 +121,23 @@ class SslInterceptRequestBuild: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception( pxy_info_re) + raise Exception(sec_info_re) else: raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1]) else: raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) - def ssl_intercept_certerrExpired(self,conTimeout): - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self.conn.setopt(self.conn.URL, URLSexpired) - self._conn_to_perform(https_exprired_info_re) + def ssl_intercept_certerrExpired(self,test_suite_name): + self._set_conn_opt(test_suite_name) + self._conn_to_perform(test_suite_name,https_exprired_info_re) - def ssl_intercept_certerrSelf_signed(self,conTimeout): - self.conn.setopt(self.conn.URL,URLSselfsigned) - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self._conn_to_perform(https_self_signed_info_re) - - def ssl_intercept_certerrUntrusted_root(self,conTimeout): - self.conn.setopt(self.conn.URL,URLSuntrustedroot) - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self._conn_to_perform(https_untrusted_root_info_re) + def ssl_intercept_certerrSelf_signed(self,test_suite_name): + self._set_conn_opt(test_suite_name) + self._conn_to_perform(test_suite_name,https_self_signed_info_re) + def ssl_intercept_certerrUntrusted_root(self,test_suite_name,): + self._set_conn_opt(test_suite_name) + self._conn_to_perform(test_suite_name,https_untrusted_root_info_re) class ProxyRequestBuild: @@ -171,7 +146,7 @@ class ProxyRequestBuild: self.conn = pycurl.Curl() self.conn.setopt(self.conn.ENCODING, "gzip,deflate") - def _cert_verify(self, pxy_action_info_re,certs , isSsl): + def _cert_verify(self, certs, isSsl): if isSsl == True: issuer = () for cert_info in certs[0]: @@ -189,109 +164,112 @@ class ProxyRequestBuild: raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) - def _set_conn_opt(self,isSsl): + def _set_conn_opt(self,test_suite_name, isSsl): + self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) + self.conn.setopt(self.conn.URL,str(suite_test_config_dict[test_suite_name]['url'])) + self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) + self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) if isSsl == True: self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - - def proxy_redirect(self,url, proxy_info_re, isSsl, conTimeout): - self.conn.setopt(self.conn.URL, url) - self.conn.setopt(self.conn.TIMEOUT, conTimeout) - self._set_conn_opt(isSsl) + def proxy_redirect(self,test_suite_name,isSsl): certs = None - #self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) + self._set_conn_opt(test_suite_name,isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.close() - self._cert_verify(ssl_redirect_info_re,certs, isSsl) + self._cert_verify(certs, isSsl) if rescode == 301 or rescode == 302: - raise Exception(proxy_info_re) + if isSsl == True: + raise Exception(ssl_redirect_info_re) + else: + raise Exception(http_redirect_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode) else: raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode) - def proxy_replace(self,url, proxy_info_re, isSsl, conTimeout): + def proxy_replace(self,test_suite_name,isSsl): certs = None - self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) - self.conn.setopt(self.conn.URL, url) - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self._set_conn_opt(isSsl) + self._set_conn_opt(test_suite_name, isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) body = self.bodyBuf.getvalue().decode('utf-8') - self._cert_verify(ssl_replace_info_re,certs, isSsl) + self.conn.close() + self._cert_verify(certs, isSsl) if not re.search(r'EnglishSearchShared', body, 0) and \ re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0): - raise Exception(proxy_info_re) + if isSsl == True: + raise Exception(ssl_replace_info_re) + else: + raise Exception(http_replace_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection replace fail") else: raise Exception("Error:Http connection replace fail") - def proxy_insert(self,url, proxy_info_re, isSsl, conTimeout): + def proxy_insert(self,test_suite_name,isSsl): certs = None - self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) - self.conn.setopt(self.conn.URL, url) - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self._set_conn_opt(isSsl) + self._set_conn_opt(test_suite_name,isSsl) self.conn.perform() body = self.bodyBuf.getvalue().decode('utf-8') if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() - self._cert_verify(ssl_insert_info_re,certs, isSsl) + self._cert_verify(certs, isSsl) if re.search(r'httpSelfcheckInsert', body, 0) and \ re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0): - raise Exception(proxy_info_re) + if isSsl == True: + raise Exception(ssl_insert_info_re) + else: + raise Exception(http_insert_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection insert fail") else: raise Exception("Error:Http connection insert fail") - def proxy_block(self,url, proxy_info_re, isSsl, conTimeout): + def proxy_block(self,test_suite_name,isSsl): certs = None - self.conn.setopt(self.conn.URL, url) - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write) - self._set_conn_opt(isSsl) + self._set_conn_opt(test_suite_name,isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() - self._cert_verify(ssl_block_info_re,certs, isSsl) + self._cert_verify(certs, isSsl) if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451): - raise Exception(proxy_info_re) + if isSsl == True: + raise Exception(ssl_block_info_re) + else: + raise Exception(http_block_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode) else: raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode) - def proxy_hijack(self,url, proxy_info_re, isSsl, conTimeout): + def proxy_hijack(self,test_suite_name,isSsl): certs = None - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self.conn.setopt(self.conn.URL, url) - self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, 8 * 1024 * 1024) - self._set_conn_opt(isSsl) + self._set_conn_opt(test_suite_name,isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() - self._cert_verify(ssl_hijack_info_re,certs, isSsl) + self._cert_verify(certs, isSsl) hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue()) if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0): - raise Exception(proxy_info_re) + if isSsl == True: + raise Exception(ssl_hijack_info_re) + else: + raise Exception(http_hijack_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection hijack fail") @@ -299,15 +277,14 @@ class ProxyRequestBuild: raise Exception("Error:Http connection hijack fail") - class SSLFileDownloadBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'}) - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, 8 * 1024 * 1024) + #self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'}) + self.client = TelegrafClient(host=str(suite_test_config_dict['telegraf']['host']), port=int(suite_test_config_dict['telegraf']['port']),tags={str(suite_test_config_dict['telegraf']['tags_key']):str(suite_test_config_dict['telegraf']['tags_value'])}) def _get_conninfo(self,conn): dictconninfo = {} @@ -324,6 +301,11 @@ class SSLFileDownloadBuild: dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME) return dictconninfo + def _set_conn_opt(self,test_suite_name): + self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) + self.conn.setopt(self.conn.URL,str(suite_test_config_dict[test_suite_name]['url'])) + self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) + def _write_in_nezha(self, sizeStr, connInfoDict): nzdict = {} nzname = 'conn_taffic_status_size_' + sizeStr @@ -347,9 +329,8 @@ class SSLFileDownloadBuild: fn.close() f.close() - def conn_traffic(self,URL,conn_taffic_re, sizeStr, size,conTimeout): - self.conn.setopt(self.conn.TIMEOUT,conTimeout) - self.conn.setopt(self.conn.URL,URL) + def conn_traffic(self,test_suite_name, conn_taffic_re, sizeStr, size): + self._set_conn_opt(test_suite_name) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) conninfo = self._get_conninfo(self.conn) @@ -378,125 +359,124 @@ class SslUnitTest(unittest.TestCase): def test_securityPolicy_bypass(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_bypass_info_re): - sslHandler.ssl_bypass(1) + sslHandler.ssl_bypass('test_securityPolicy_bypass') def test_securityPolicy_intercept(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_intercept_info_re): - sslHandler.ssl_intercept(1) + sslHandler.ssl_intercept('test_securityPolicy_intercept') def test_securityPolicy_intercept_certerrExpired(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_exprired_info_re): - requestHandler.ssl_intercept_certerrExpired(1) + requestHandler.ssl_intercept_certerrExpired('test_securityPolicy_intercept_certerrExpired') def test_securityPolicy_intercept_certerrSelf_signed(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_self_signed_info_re): - requestHandler.ssl_intercept_certerrSelf_signed(1) + requestHandler.ssl_intercept_certerrSelf_signed('test_securityPolicy_intercept_certerrSelf_signed') def test_securityPolicy_intercept_certerrUntrusted_root(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_untrusted_root_info_re): - requestHandler.ssl_intercept_certerrUntrusted_root(1) + requestHandler.ssl_intercept_certerrUntrusted_root('test_securityPolicy_intercept_certerrUntrusted_root') def test_proxyPolicy_ssl_redirect(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_redirect_info_re): - proxyHandler.proxy_redirect(URLSslRedirect ,ssl_redirect_info_re, True,2) + proxyHandler.proxy_redirect('test_proxyPolicy_ssl_redirect',True) def test_proxyPolicy_ssl_block(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_block_info_re): - proxyHandler.proxy_block(URLSslBlock,ssl_block_info_re, True, 2) + proxyHandler.proxy_block('test_proxyPolicy_ssl_block', True) def test_proxyPolicy_ssl_replace(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_replace_info_re): - proxyHandler.proxy_replace(URLSslReplace,ssl_replace_info_re, True,2) + proxyHandler.proxy_replace('test_proxyPolicy_ssl_replace', True) def test_proxyPolicy_ssl_hijack(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_hijack_info_re): - proxyHandler.proxy_hijack(URLSslHijack,ssl_hijack_info_re, True,2) + proxyHandler.proxy_hijack('test_proxyPolicy_ssl_hijack', True) def test_proxyPolicy_ssl_insert(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_insert_info_re): - proxyHandler.proxy_insert(URLSslInsert,ssl_insert_info_re,True,2) + proxyHandler.proxy_insert('test_proxyPolicy_ssl_insert',True) def test_proxyPolicy_http_redirect(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_redirect_info_re): - proxyHandler.proxy_redirect(URLHttpRedirect,http_redirect_info_re, False,2) + proxyHandler.proxy_redirect('test_proxyPolicy_http_redirect', False) def test_proxyPolicy_http_block(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_block_info_re): - proxyHandler.proxy_block(URLHttpBlock,http_block_info_re, False,2) + proxyHandler.proxy_block('test_proxyPolicy_http_block', False) def test_proxyPolicy_http_replace(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_replace_info_re): - proxyHandler.proxy_replace(URLHttpReplace,http_replace_info_re, False,2) + proxyHandler.proxy_replace('test_proxyPolicy_http_replace', False) def test_proxyPolicy_http_hijack(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_hijack_info_re): - proxyHandler.proxy_hijack(URLHttpHijack,http_hijack_info_re, False,2) + proxyHandler.proxy_hijack('test_proxyPolicy_http_hijack', False) def test_proxyPolicy_http_insert(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_insert_info_re): - proxyHandler.proxy_insert(URLHttpInsert,http_insert_info_re,False, 2) - + proxyHandler.proxy_insert('test_proxyPolicy_http_insert',False) def test_https_con_traffic_1k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re): - requestHandler.conn_traffic( URLConTraffic_1k,https_conn_taffic_1k_re, '1k', 1024,1) + requestHandler.conn_traffic( 'test_https_con_traffic_1k',https_conn_taffic_1k_re,'1k', 1024) def test_https_con_traffic_4k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re): - requestHandler.conn_traffic( URLConTraffic_4k,https_conn_taffic_4k_re, '4k', 4*1024,1) + requestHandler.conn_traffic( 'test_https_con_traffic_4k',https_conn_taffic_4k_re, '4k', 4*1024) def test_https_con_traffic_16k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re): - requestHandler.conn_traffic( URLConTraffic_16k,https_conn_taffic_16k_re, '16k', 16*1024,1) + requestHandler.conn_traffic( 'test_https_con_traffic_16k', https_conn_taffic_16k_re,'16k', 16*1024) def test_https_con_traffic_64k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re): - requestHandler.conn_traffic( URLConTraffic_64k,https_conn_taffic_64k_re, '64k', 64*1024,1) + requestHandler.conn_traffic( 'test_https_con_traffic_64k',https_conn_taffic_64k_re, '64k', 64*1024) def test_https_con_traffic_256k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re): - requestHandler.conn_traffic( URLConTraffic_256k,https_conn_taffic_256k_re, '256k', 256*1024,2) + requestHandler.conn_traffic( 'test_https_con_traffic_256k', https_conn_taffic_256k_re,'256k', 256*1024) def test_https_con_traffic_1M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re): - requestHandler.conn_traffic( URLConTraffic_1M,https_conn_taffic_1M_re, '1M', 1024 * 1024,2) + requestHandler.conn_traffic( 'test_https_con_traffic_1M',https_conn_taffic_1M_re, '1M', 1024 * 1024) def test_https_con_traffic_4M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re): - requestHandler.conn_traffic( URLConTraffic_4M,https_conn_taffic_4M_re, '4M', 4*1024*1024,2) + requestHandler.conn_traffic( 'test_https_con_traffic_4M', https_conn_taffic_4M_re,'4M', 4*1024*1024) def test_https_con_traffic_16M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re): - requestHandler.conn_traffic( URLConTraffic_16M,https_conn_taffic_16M_re, '16M',16*1024*1024,4) + requestHandler.conn_traffic( 'test_https_con_traffic_16M', https_conn_taffic_16M_re,'16M',16*1024*1024) def test_https_con_traffic_64M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re): - requestHandler.conn_traffic( URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024,4) + requestHandler.conn_traffic( 'test_https_con_traffic_64M',https_conn_taffic_64M_re, '64M', 64*1024*1024) class TsgDiagnoseRun: def __init__(self): @@ -505,14 +485,17 @@ class TsgDiagnoseRun: self.write = None self.loop = False self.count = 1 - self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'}) + self.config = None + self.client = None + self.config_dict = {} def _get_suite_option(self): parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help") - parser.add_argument('-i','--interval', type = int, default = 1,help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.') + parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.') parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535') parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: json,txt, the default is txt.') parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file or NEZHA. Specifies the output file name or NEZHA.') + parser.add_argument('-p','--configpath', type = str, default = '/root/unittest/etc/tsg-diagnose.config',help='Specifies the config file, default /root/unittest/etc/tsg-diagnose.config') parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal') args = parser.parse_args() self.interval = args.interval @@ -520,6 +503,7 @@ class TsgDiagnoseRun: self.write = args.write self.loop = args.loop self.count = args.count + self.config = args.configpath if self.count == 0: print("Error: bad number of tsg diagnose and will exit") parser.print_help() @@ -530,33 +514,51 @@ class TsgDiagnoseRun: parser.print_help() sys.exit(1) + def _set_telegraf(self): + # self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'}) + self.client = TelegrafClient(host=str(self.config_dict['telegraf']['host']), port=int(self.config_dict['telegraf']['port']),tags={str(self.config_dict['telegraf']['tags_key']):str(self.config_dict['telegraf']['tags_value'])}) + + + def _get_suite_config(self): + global suite_test_config_dict + config = ConfigParser() + config.read(self.config, encoding='UTF-8') + for section in config.sections(): + self.config_dict[section] = dict(config.items(section)) + suite_test_config_dict = self.config_dict + + def _add_suite(self,test_suite_name): + if int(self.config_dict[test_suite_name]['enabled']) == 1: + self.suite.addTest(SslUnitTest(test_suite_name)) + def _init_suite(self): self.suite = unittest.TestSuite() self.suite._cleanup = False - self.suite.addTest(SslUnitTest('test_securityPolicy_bypass')) - self.suite.addTest(SslUnitTest('test_securityPolicy_intercept')) - self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired')) - self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed')) - self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_redirect')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_block')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_replace')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_hijack')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_ssl_insert')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_http_redirect')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_http_block')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_http_replace')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_http_hijack')) - self.suite.addTest(SslUnitTest('test_proxyPolicy_http_insert')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_1k')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_4k')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_16k')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_64k')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_256k')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_1M')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_4M')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_16M')) - self.suite.addTest(SslUnitTest('test_https_con_traffic_64M')) + self._add_suite('test_securityPolicy_bypass') + self._add_suite('test_securityPolicy_intercept') + self._add_suite('test_securityPolicy_intercept_certerrExpired') + self._add_suite('test_securityPolicy_intercept_certerrSelf_signed') + self._add_suite('test_securityPolicy_intercept_certerrUntrusted_root') + self._add_suite('test_proxyPolicy_ssl_redirect') + self._add_suite('test_proxyPolicy_ssl_block') + self._add_suite('test_proxyPolicy_ssl_replace') + self._add_suite('test_proxyPolicy_ssl_hijack') + self._add_suite('test_proxyPolicy_ssl_insert') + self._add_suite('test_proxyPolicy_http_redirect') + self._add_suite('test_proxyPolicy_http_block') + self._add_suite('test_proxyPolicy_http_replace') + self._add_suite('test_proxyPolicy_http_hijack') + self._add_suite('test_proxyPolicy_http_insert') + self._add_suite('test_https_con_traffic_1k') + self._add_suite('test_https_con_traffic_4k') + self._add_suite('test_https_con_traffic_16k') + self._add_suite('test_https_con_traffic_64k') + self._add_suite('test_https_con_traffic_256k') + self._add_suite('test_https_con_traffic_1M') + self._add_suite('test_https_con_traffic_4M') + self._add_suite('test_https_con_traffic_16M') + self._add_suite('test_https_con_traffic_64M') + def _write_suite_result_into_file(self): resultDict = '/root/result_tsg_diagnose/unittest/' @@ -626,8 +628,12 @@ class TsgDiagnoseRun: def execute_suite_tsg_diagnose(self): self._get_suite_option() + self._get_suite_config() + self._set_telegraf() self._init_suite() try: + if int(self.config_dict['start_time_random_delay_range']['enabled']) == 1: + time.sleep(random.randint(int(self.config_dict['start_time_random_delay_range']['left_edge']),int(self.config_dict['start_time_random_delay_range']['right_edge']))) counter = 0 print("Tsg diagnose run sum: %d" % self.count) while True: