This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/unittest_python/unittest/tsg_diagnose.py

890 lines
44 KiB
Python
Raw Normal View History

2020-05-28 19:30:31 +08:00
import sys
2019-12-20 15:38:14 +08:00
import unittest
import json
import pycurl
import os
import re
import time
2019-12-20 15:38:14 +08:00
from io import BytesIO
import getopt
import ciunittest
import argparse
from telegraf.client import TelegrafClient
import hashlib
2020-09-15 13:55:08 +08:00
from configparser import ConfigParser
import random
suite_test_config_dict = {'test_securityPolicy_bypass': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_securityPolicy_intercept': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_securityPolicy_intercept_certerrExpired': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_securityPolicy_intercept_certerrSelf_signed': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_securityPolicy_intercept_certerrUntrusted_root': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_ssl_redirect': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_ssl_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_ssl_replace': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_ssl_hijack': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_ssl_insert': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_redirect': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_replace': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_hijack': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_insert': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_1k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_4k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_16k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_64k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_256k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_1M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_4M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_16M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_64M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_http_firewall_allow': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_http_firewall_deny_drop': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600},
'test_http_firewall_deny_rst': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_http_firewall_deny_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_ssl_firewall_allow': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_ssl_firewall_deny_drop': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600},
'test_ssl_firewall_deny_rst': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'start_time_random_delay_range': {'enabled':1,'left_edge':1,'right_edge':30},
'telegraf': {'host':'192.51.100.1','port':8100,'tags_key':'app_name','tags_value' :'tsg-diagnose'}}
2019-12-20 15:38:14 +08:00
ssl_bypass_info_re = "Ssl connection bypass success"
ssl_intercept_info_re = "Ssl connection intercept success"
https_exprired_info_re = "Ssl exprired cert check success"
https_self_signed_info_re = "Ssl self signed cert check success"
https_untrusted_root_info_re = "Ssl untrusted_root cert check success"
2020-09-15 13:55:08 +08:00
ssl_redirect_info_re = "Ssl connection redirect success"
ssl_replace_info_re = "Ssl connection replace success"
ssl_insert_info_re = "Ssl connection insert success"
ssl_hijack_info_re = "Ssl connection hijack success"
ssl_block_info_re = "Ssl connection block success"
2019-12-20 15:38:14 +08:00
http_redirect_info_re = "http connection redirect success"
http_replace_info_re = "http connection replace success"
http_insert_info_re = "http connection insert success"
http_hijack_info_re = "http connection hijack success"
http_block_info_re = "http connection block success"
https_conn_taffic_1k_re = 'https download file 1k success'
https_conn_taffic_4k_re = 'https download file 4k success'
https_conn_taffic_16k_re = 'https download file 16k success'
https_conn_taffic_64k_re = 'https download file 64k success'
https_conn_taffic_256k_re = 'https download file 256k success'
https_conn_taffic_1M_re = 'https download file 1M success'
https_conn_taffic_4M_re = 'https download file 4M success'
https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = 'https download file 64M success'
http_firewall_allow_re = "http firewall action allow success"
http_firewall_deny_drop_re = "http firewall aciton deny subaction drop success"
http_firewall_deny_rst_re = "http firewall action deny subaction rst success"
http_firewall_deny_block_re = "http firewall aciton deny subaction block success"
ssl_firewall_allow_re = "ssl firewall action allow success"
ssl_firewall_deny_drop_re = "ssl firewall action deny subaction drop success"
ssl_firewall_deny_rst_re = "ssl firewall action deny subaction rst success"
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
URLSslExpired = 'https://expired.badssl.self-test.geedge.net'
URLSslSelfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSslSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
URLSslRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLSslReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLSslInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLSslHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLSslBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLHttpRedirect = 'http://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLHttpReplace = 'http://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLHttpInsert = 'http://cn.bing.com/?FORM=BEHPTB'
URLHttpHijack = 'http://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLHttpBlock = 'http://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k"
URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k"
URLConTraffic_16k = "https://downloadfile.self-test.geedge.net/16k"
URLConTraffic_64k = "https://downloadfile.self-test.geedge.net/64k"
URLConTraffic_256k = "https://downloadfile.self-test.geedge.net/256k"
URLConTraffic_1M = "https://downloadfile.self-test.geedge.net/1M"
URLConTraffic_4M = "https://downloadfile.self-test.geedge.net/4M"
URLConTraffic_16M = "https://downloadfile.self-test.geedge.net/16M"
URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M"
URLHttpFirewallAllow = "http://http.badssl.self-test.geedge.net"
URLHttpFirewallDenyDrop = "http://http-credit-card.badssl.self-test.geedge.net"
URLHttpFirewallDenyRst = "http://http-dynamic-login.badssl.self-test.geedge.net"
URLHttpFirewallDenyBlock = "http://http-login.badssl.self-test.geedge.net"
URLSslFirewallAllow = "https://sha512.badssl.self-test.geedge.net"
URLSslFirewallDenyDrop = "https://rsa2048.badssl.self-test.geedge.net"
URLSslFirewallDenyRst = "https://rsa4096.badssl.self-test.geedge.net"
2019-12-20 15:38:14 +08:00
class SSLCheckRequestBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name, url):
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _get_conn_issuer(self,certs):
2019-12-20 15:38:14 +08:00
issuer = ()
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
2020-09-15 13:55:08 +08:00
return issuer
def ssl_bypass(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLBypass)
2020-09-15 13:55:08 +08:00
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = self._get_conn_issuer(certs)
2019-12-20 15:38:14 +08:00
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0):
raise Exception(ssl_bypass_info_re)
elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0):
raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1])
2019-12-20 15:38:14 +08:00
else:
raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1])
2019-12-20 15:38:14 +08:00
2020-09-15 13:55:08 +08:00
def ssl_intercept(self,test_suite_name):
self._set_conn_opt(test_suite_name, URLIntercept)
2019-12-20 15:38:14 +08:00
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
2020-09-15 13:55:08 +08:00
issuer = self._get_conn_issuer(certs)
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1])
else:
raise Exception(ssl_intercept_info_re)
2019-12-20 15:38:14 +08:00
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
2019-12-20 15:38:14 +08:00
class SslInterceptRequestBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name,url):
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL, url)
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _conn_to_perform(self, test_suite_name, sec_info_re):
2019-12-20 15:38:14 +08:00
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
2019-12-20 15:38:14 +08:00
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
2020-09-15 13:55:08 +08:00
raise Exception(sec_info_re)
2019-12-20 15:38:14 +08:00
else:
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
2019-12-20 15:38:14 +08:00
else:
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0) or \
re.search(r'\bCN[\s\S]*=[\s\S]*badssl[\s\S]*',issuer[1],0):
raise Exception("Error: Ssl connection intercept failed, cert info: %s" % issuer[1])
else:
raise Exception(sec_info_re)
2019-12-20 15:38:14 +08:00
2020-09-15 13:55:08 +08:00
def ssl_intercept_certerrExpired(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslExpired)
2020-09-15 13:55:08 +08:00
self._conn_to_perform(test_suite_name,https_exprired_info_re)
2019-12-20 15:38:14 +08:00
2020-09-15 13:55:08 +08:00
def ssl_intercept_certerrSelf_signed(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslSelfsigned)
2020-09-15 13:55:08 +08:00
self._conn_to_perform(test_suite_name,https_self_signed_info_re)
2019-12-20 15:38:14 +08:00
2020-09-15 13:55:08 +08:00
def ssl_intercept_certerrUntrusted_root(self,test_suite_name,):
self._set_conn_opt(test_suite_name,URLSslSuntrustedroot)
2020-09-15 13:55:08 +08:00
self._conn_to_perform(test_suite_name,https_untrusted_root_info_re)
2019-12-20 15:38:14 +08:00
class ProxyRequestBuild:
2019-12-20 15:38:14 +08:00
def __init__(self):
self.bodyBuf = BytesIO()
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
2020-09-15 13:55:08 +08:00
def _cert_verify(self, certs, isSsl):
if isSsl == True:
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])
else:
return
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
def _set_conn_opt(self,test_suite_name, url,isSsl):
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
if isSsl == True:
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def proxy_redirect(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
2019-12-20 15:38:14 +08:00
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
2019-12-20 15:38:14 +08:00
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
2020-09-15 13:55:08 +08:00
self._cert_verify(certs, isSsl)
2019-12-20 15:38:14 +08:00
if rescode == 301 or rescode == 302:
2020-09-15 13:55:08 +08:00
if isSsl == True:
raise Exception(ssl_redirect_info_re)
else:
raise Exception(http_redirect_info_re)
2019-12-20 15:38:14 +08:00
else:
if isSsl == True:
raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode)
2019-12-20 15:38:14 +08:00
def proxy_replace(self,test_suite_name, proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name, proxy_url,isSsl)
2019-12-20 15:38:14 +08:00
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
2019-12-20 15:38:14 +08:00
body = self.bodyBuf.getvalue().decode('utf-8')
2020-09-15 13:55:08 +08:00
self.conn.close()
self._cert_verify(certs, isSsl)
2019-12-20 15:38:14 +08:00
if not re.search(r'EnglishSearchShared', body, 0) and \
re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0):
2020-09-15 13:55:08 +08:00
if isSsl == True:
raise Exception(ssl_replace_info_re)
else:
raise Exception(http_replace_info_re)
2019-12-20 15:38:14 +08:00
else:
if isSsl == True:
raise Exception("Error:Ssl connection replace fail")
else:
raise Exception("Error:Http connection replace fail")
2019-12-20 15:38:14 +08:00
def proxy_insert(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
2019-12-20 15:38:14 +08:00
self.conn.perform()
body = self.bodyBuf.getvalue().decode('utf-8')
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
2019-12-20 15:38:14 +08:00
self.conn.close()
2020-09-15 13:55:08 +08:00
self._cert_verify(certs, isSsl)
2019-12-20 15:38:14 +08:00
if re.search(r'httpSelfcheckInsert', body, 0) and \
re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0):
2020-09-15 13:55:08 +08:00
if isSsl == True:
raise Exception(ssl_insert_info_re)
else:
raise Exception(http_insert_info_re)
2019-12-20 15:38:14 +08:00
else:
if isSsl == True:
raise Exception("Error:Ssl connection insert fail")
else:
raise Exception("Error:Http connection insert fail")
2019-12-20 15:38:14 +08:00
def proxy_block(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
2019-12-20 15:38:14 +08:00
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
2019-12-20 15:38:14 +08:00
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close()
2020-09-15 13:55:08 +08:00
self._cert_verify(certs, isSsl)
2019-12-20 15:38:14 +08:00
if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451):
2020-09-15 13:55:08 +08:00
if isSsl == True:
raise Exception(ssl_block_info_re)
else:
raise Exception(http_block_info_re)
2019-12-20 15:38:14 +08:00
else:
if isSsl == True:
raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)
2019-12-20 15:38:14 +08:00
def proxy_hijack(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
2019-12-20 15:38:14 +08:00
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
2019-12-20 15:38:14 +08:00
self.conn.close()
2020-09-15 13:55:08 +08:00
self._cert_verify(certs, isSsl)
hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue())
if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0):
2020-09-15 13:55:08 +08:00
if isSsl == True:
raise Exception(ssl_hijack_info_re)
else:
raise Exception(http_hijack_info_re)
2019-12-20 15:38:14 +08:00
else:
if isSsl == True:
raise Exception("Error:Ssl connection hijack fail")
else:
raise Exception("Error:Http connection hijack fail")
2019-12-20 15:38:14 +08:00
class SSLFileDownloadBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
2020-09-15 13:55:08 +08:00
#self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
self.client = TelegrafClient(host=str(suite_test_config_dict['telegraf']['host']), port=int(suite_test_config_dict['telegraf']['port']),tags={str(suite_test_config_dict['telegraf']['tags_key']):str(suite_test_config_dict['telegraf']['tags_value'])})
def _get_conninfo(self,conn):
dictconninfo = {}
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
return dictconninfo
def _set_conn_opt(self,test_suite_name,url):
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
2020-09-15 13:55:08 +08:00
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _write_in_nezha(self, sizeStr, connInfoDict):
nzdict = {}
nzname = 'conn_taffic_status_size_' + sizeStr
dictKeyTime = "conn_traffic_" + sizeStr + "_size_total_time"
dcitKeyStatus = "conn_traffic_" + sizeStr + "_size_status"
nzdict[dictKeyTime] = connInfoDict['total_time']
nzdict[dcitKeyStatus] = connInfoDict['status']
self.client.metric(nzname, nzdict)
def _write_in_logfile(self, sizeStr, connInfoDict):
logNewestPath = "/root/result_tsg_diagnose/conn_traffic_status/conn_traffic_status_" + sizeStr
logPath = logNewestPath + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
connInfoStr = json.dumps(connInfoDict)
with open(logNewestPath,"w+") as f:
f.write(connInfoStr)
f.close()
with open(logPath,"w+") as f:
fn = open(logNewestPath,'r')
f.write(fn.read())
fn.close()
f.close()
def conn_traffic(self,test_suite_name,url,conn_taffic_re,sizeStr, size):
self._set_conn_opt(test_suite_name, url)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
conninfo = self._get_conninfo(self.conn)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])
else:
raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1])
if int(conninfo["size_download"]) == size:
self._write_in_nezha(sizeStr,conninfo)
self._write_in_logfile(sizeStr,conninfo)
raise Exception(conn_taffic_re)
else:
raise Exception("Error: connection tarffic size error and is no equal", sizeStr)
class HttpFirewallActionBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
def _set_conn_opt(self,test_suite_name, url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def action_allow(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLHttpFirewallAllow)
self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
if rescode == 200:
raise Exception(http_firewall_allow_re)
else:
raise Exception("Error: The stream may be redirected, http code %s" % rescode)
def action_deny_subaction_drop(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyDrop)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 28):
raise Exception(http_firewall_deny_drop_re)
else:
raise Exception("Error: The stream may be not dropped %s" % errorinfo)
def action_deny_subaction_rst(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyRst)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 56):
raise Exception(http_firewall_deny_rst_re)
else:
raise Exception("Error: The stream may be not rst %s" % errorinfo)
def action_deny_subaction_block(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyBlock)
self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
if rescode == 403:
raise Exception(http_firewall_deny_block_re)
else:
raise Exception("Error: The stream may be not block, http code %s " % rescode)
class SslFirewallActionBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name, url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def action_allow(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslFirewallAllow)
self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
if rescode == 200:
raise Exception(ssl_firewall_allow_re)
else:
raise Exception("Error: The stream may be redirected, http code %s" % rescode)
def action_deny_subaction_drop(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslFirewallDenyDrop)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 28):
raise Exception(ssl_firewall_deny_drop_re)
else:
raise Exception("Error: The stream may be not dropped %s" % errorinfo)
def action_deny_subaction_rst(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslFirewallDenyRst)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 35):
raise Exception(ssl_firewall_deny_rst_re)
else:
raise Exception("Error: The stream may be not rst %s" % errorinfo)
2019-12-20 15:38:14 +08:00
class SslUnitTest(unittest.TestCase):
2020-05-28 19:30:31 +08:00
def test_securityPolicy_bypass(self):
2019-12-20 15:38:14 +08:00
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
2020-09-15 13:55:08 +08:00
sslHandler.ssl_bypass('test_securityPolicy_bypass')
2019-12-20 15:38:14 +08:00
2020-05-28 19:30:31 +08:00
def test_securityPolicy_intercept(self):
2019-12-20 15:38:14 +08:00
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
2020-09-15 13:55:08 +08:00
sslHandler.ssl_intercept('test_securityPolicy_intercept')
2019-12-20 15:38:14 +08:00
2020-05-28 19:30:31 +08:00
def test_securityPolicy_intercept_certerrExpired(self):
2019-12-20 15:38:14 +08:00
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_exprired_info_re):
2020-09-15 13:55:08 +08:00
requestHandler.ssl_intercept_certerrExpired('test_securityPolicy_intercept_certerrExpired')
2019-12-20 15:38:14 +08:00
2020-05-28 19:30:31 +08:00
def test_securityPolicy_intercept_certerrSelf_signed(self):
2019-12-20 15:38:14 +08:00
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_self_signed_info_re):
2020-09-15 13:55:08 +08:00
requestHandler.ssl_intercept_certerrSelf_signed('test_securityPolicy_intercept_certerrSelf_signed')
2019-12-20 15:38:14 +08:00
2020-05-28 19:30:31 +08:00
def test_securityPolicy_intercept_certerrUntrusted_root(self):
2019-12-20 15:38:14 +08:00
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
2020-09-15 13:55:08 +08:00
requestHandler.ssl_intercept_certerrUntrusted_root('test_securityPolicy_intercept_certerrUntrusted_root')
2019-12-20 15:38:14 +08:00
def test_proxyPolicy_ssl_redirect(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyPolicy_ssl_redirect',URLSslRedirect,True)
def test_proxyPolicy_ssl_block(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_block_info_re):
proxyHandler.proxy_block('test_proxyPolicy_ssl_block', URLSslBlock,True)
def test_proxyPolicy_ssl_replace(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_replace_info_re):
proxyHandler.proxy_replace('test_proxyPolicy_ssl_replace',URLSslReplace, True)
def test_proxyPolicy_ssl_hijack(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyPolicy_ssl_hijack', URLSslHijack,True)
def test_proxyPolicy_ssl_insert(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_insert_info_re):
proxyHandler.proxy_insert('test_proxyPolicy_ssl_insert',URLSslInsert,True)
def test_proxyPolicy_http_redirect(self):
proxyHandler = ProxyRequestBuild()
2019-12-20 15:38:14 +08:00
with self.assertRaisesRegex(Exception, http_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyPolicy_http_redirect',URLHttpRedirect, False)
2019-12-20 15:38:14 +08:00
def test_proxyPolicy_http_block(self):
proxyHandler = ProxyRequestBuild()
2019-12-20 15:38:14 +08:00
with self.assertRaisesRegex(Exception, http_block_info_re):
proxyHandler.proxy_block('test_proxyPolicy_http_block', URLHttpBlock,False)
2019-12-20 15:38:14 +08:00
def test_proxyPolicy_http_replace(self):
proxyHandler = ProxyRequestBuild()
2019-12-20 15:38:14 +08:00
with self.assertRaisesRegex(Exception, http_replace_info_re):
proxyHandler.proxy_replace('test_proxyPolicy_http_replace',URLHttpReplace, False)
2019-12-20 15:38:14 +08:00
def test_proxyPolicy_http_hijack(self):
proxyHandler = ProxyRequestBuild()
2019-12-20 15:38:14 +08:00
with self.assertRaisesRegex(Exception, http_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyPolicy_http_hijack', URLHttpHijack,False)
2019-12-20 15:38:14 +08:00
def test_proxyPolicy_http_insert(self):
proxyHandler = ProxyRequestBuild()
2019-12-20 15:38:14 +08:00
with self.assertRaisesRegex(Exception, http_insert_info_re):
proxyHandler.proxy_insert('test_proxyPolicy_http_insert',URLHttpInsert,False)
2019-12-20 15:38:14 +08:00
def test_https_con_traffic_1k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_1k', URLConTraffic_1k, https_conn_taffic_1k_re,'1k', 1024)
def test_https_con_traffic_4k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_4k',URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4*1024)
def test_https_con_traffic_16k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_16k', URLConTraffic_16k, https_conn_taffic_16k_re,'16k', 16*1024)
def test_https_con_traffic_64k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_64k',URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64*1024)
def test_https_con_traffic_256k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_256k', URLConTraffic_256k,https_conn_taffic_256k_re,'256k', 256*1024)
def test_https_con_traffic_1M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_1M', URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024)
def test_https_con_traffic_4M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_4M', URLConTraffic_4M, https_conn_taffic_4M_re,'4M', 4*1024*1024)
def test_https_con_traffic_16M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_16M', URLConTraffic_16M,https_conn_taffic_16M_re,'16M',16*1024*1024)
def test_https_con_traffic_64M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_64M',URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024)
def test_http_firewall_allow(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_allow_re):
requestHandler.action_allow('test_http_firewall_allow')
def test_http_firewall_deny_drop(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_drop_re):
requestHandler.action_deny_subaction_drop('test_http_firewall_deny_drop')
def test_http_firewall_deny_rst(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_re):
requestHandler.action_deny_subaction_rst('test_http_firewall_deny_rst')
def test_http_firewall_deny_block(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_block_re):
requestHandler.action_deny_subaction_block('test_http_firewall_deny_block')
def test_ssl_firewall_allow(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_allow_re):
requestHandler.action_allow('test_ssl_firewall_allow')
def test_ssl_firewall_deny_drop(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_deny_drop_re):
requestHandler.action_deny_subaction_drop('test_ssl_firewall_deny_drop')
def test_ssl_firewall_deny_rst(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_deny_rst_re):
requestHandler.action_deny_subaction_rst('test_ssl_firewall_deny_rst')
class TsgDiagnoseRun:
def __init__(self):
self.interval = 1
self.format = "txt"
self.write = None
self.loop = False
self.count = 1
2020-09-15 13:55:08 +08:00
self.config = None
self.client = None
self.config_dict = {}
def _get_suite_option(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
2020-09-15 13:55:08 +08:00
parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.')
parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: jsontxt, the default is txt.')
parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file or NEZHA. Specifies the output file name or NEZHA.')
parser.add_argument('-p','--configpath', type = str, default = '/root/etc_tsg_diagnose/tsg-diagnose.config',help='Specifies the config file, default /root/etc_tsg_diagnose/tsg-diagnose.config')
parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
args = parser.parse_args()
self.interval = args.interval
self.format = args.format
self.write = args.write
self.loop = args.loop
self.count = args.count
2020-09-15 13:55:08 +08:00
self.config = args.configpath
if self.count == 0:
print("Error: bad number of tsg diagnose and will exit")
parser.print_help()
sys.exit(1)
if self.format not in ('json', 'txt'):
print("Error: bad output format of tsg diagnose and will exit")
parser.print_help()
sys.exit(1)
2020-09-15 13:55:08 +08:00
def _set_telegraf(self):
# self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
self.client = TelegrafClient(host=str(self.config_dict['telegraf']['host']), port=int(self.config_dict['telegraf']['port']),tags={str(self.config_dict['telegraf']['tags_key']):str(self.config_dict['telegraf']['tags_value'])})
def _get_suite_config(self):
global suite_test_config_dict
config = ConfigParser()
config.read(self.config, encoding='UTF-8')
for section in config.sections():
if section in suite_test_config_dict:
suite_test_config_dict[section].update(config.items(section))
else:
suite_test_config_dict[section] = dict(config.items(section))
self.config_dict = suite_test_config_dict
2020-09-15 13:55:08 +08:00
def _add_suite(self,test_suite_name):
if int(self.config_dict[test_suite_name]['enabled']) == 1:
self.suite.addTest(SslUnitTest(test_suite_name))
def _init_suite(self):
self.suite = unittest.TestSuite()
self.suite._cleanup = False
2020-09-15 13:55:08 +08:00
self._add_suite('test_securityPolicy_bypass')
self._add_suite('test_securityPolicy_intercept')
self._add_suite('test_securityPolicy_intercept_certerrExpired')
self._add_suite('test_securityPolicy_intercept_certerrSelf_signed')
self._add_suite('test_securityPolicy_intercept_certerrUntrusted_root')
self._add_suite('test_proxyPolicy_ssl_redirect')
self._add_suite('test_proxyPolicy_ssl_block')
self._add_suite('test_proxyPolicy_ssl_replace')
self._add_suite('test_proxyPolicy_ssl_hijack')
self._add_suite('test_proxyPolicy_ssl_insert')
self._add_suite('test_proxyPolicy_http_redirect')
self._add_suite('test_proxyPolicy_http_block')
self._add_suite('test_proxyPolicy_http_replace')
self._add_suite('test_proxyPolicy_http_hijack')
self._add_suite('test_proxyPolicy_http_insert')
self._add_suite('test_https_con_traffic_1k')
self._add_suite('test_https_con_traffic_4k')
self._add_suite('test_https_con_traffic_16k')
self._add_suite('test_https_con_traffic_64k')
self._add_suite('test_https_con_traffic_256k')
self._add_suite('test_https_con_traffic_1M')
self._add_suite('test_https_con_traffic_4M')
self._add_suite('test_https_con_traffic_16M')
self._add_suite('test_https_con_traffic_64M')
self._add_suite('test_http_firewall_allow')
self._add_suite('test_http_firewall_deny_drop')
self._add_suite('test_http_firewall_deny_rst')
self._add_suite('test_http_firewall_deny_block')
self._add_suite('test_ssl_firewall_allow')
self._add_suite('test_ssl_firewall_deny_drop')
self._add_suite('test_ssl_firewall_deny_rst')
def _write_suite_result_into_file(self):
resultDict = '/root/result_tsg_diagnose/unittest/'
resultNewestPath = resultDict + self.write
resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
if self.format == 'txt':
with open(resultNewestPath,"w+") as f:
2020-05-28 19:30:31 +08:00
runner = unittest.TextTestRunner(stream=f,verbosity=2)
runner.run(self.suite)
f.close()
if self.format == 'json':
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
with open(resultNewestPath,"w+") as f:
f.write(result_json)
2020-05-28 19:30:31 +08:00
f.close()
with open(resultPath,"w+") as f:
fn = open(resultNewestPath,'r')
f.write(fn.read())
fn.close()
f.close()
def _write_suite_result_into_NEZHA(self):
nzdict = {}
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=False)
result_dict = json.loads(result_json)
reuslt_list = result_dict['results']
succsum = 0
failsum = 0
for reuslt in reuslt_list:
succkey = reuslt['name'].split()[0] + '_succ'
nzdict[succkey] = 0
if reuslt['type'] == 'success':
nzdict[succkey] = 1
succsum = succsum + 1
if reuslt['type'] == 'failure':
failsum = failsum + 1
nzdict['succsum'] = succsum
self.client.metric('tsg_diagnose_result', nzdict)
result_dict['succsum'] = succsum
result_dict['failsum'] = failsum
result_stdout = json.dumps(result_dict)
print(result_stdout)
def _stdout_suite_result(self):
print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s'))
if self.format == 'txt':
runner = unittest.TextTestRunner(verbosity=2)
runner.run(self.suite)
if self.format == 'json':
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
print(result_json)
print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s'))
def _output_suite_result(self):
if self.write and self.write != 'NEZHA':
self._write_suite_result_into_file()
elif self.write == 'NEZHA':
self._write_suite_result_into_NEZHA()
else:
self._stdout_suite_result()
def execute_suite_tsg_diagnose(self):
self._get_suite_option()
2020-09-15 13:55:08 +08:00
self._get_suite_config()
self._set_telegraf()
self._init_suite()
try:
2020-09-15 13:55:08 +08:00
if int(self.config_dict['start_time_random_delay_range']['enabled']) == 1:
time.sleep(random.randint(int(self.config_dict['start_time_random_delay_range']['left_edge']),int(self.config_dict['start_time_random_delay_range']['right_edge'])))
counter = 0
print("Tsg diagnose run sum: %d" % self.count)
while True:
print("\nRUN %d" %(counter + 1))
self._output_suite_result()
counter = counter + 1
if not self.loop:
if counter >= self.count:
break
time.sleep(self.interval)
except Exception as ex:
print("Process get an exception, will exit, Exception info:", ex)
sys.exit(1)
if __name__ == '__main__':
tsg_diagnose_run = TsgDiagnoseRun()
tsg_diagnose_run.execute_suite_tsg_diagnose()