This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/unittest_python/unittest/tsg_diagnose.py

702 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import unittest
import json
import pycurl
import os
import re
import time
from io import BytesIO
import getopt
import ciunittest
import argparse
from telegraf.client import TelegrafClient
import hashlib
from configparser import ConfigParser
import random
suite_test_config_dict = {'test_securityPolicy_bypass': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_securityPolicy_intercept': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_securityPolicy_intercept_certerrExpired': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_securityPolicy_intercept_certerrSelf_signed': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_securityPolicy_intercept_certerrUntrusted_root': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_proxyPolicy_ssl_redirect': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_ssl_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_proxyPolicy_ssl_replace': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_ssl_hijack': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_proxyPolicy_ssl_insert': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_redirect': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_proxyPolicy_http_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_replace': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_proxyPolicy_http_hijack': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyPolicy_http_insert': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_https_con_traffic_1k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_4k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_https_con_traffic_16k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_64k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_https_con_traffic_256k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_1M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_https_con_traffic_4M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_https_con_traffic_16M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},'test_https_con_traffic_64M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'start_time_random_delay_range': {'enabled':1,'left_edge':1,'right_edge':30},'telegraf': {'host':'192.51.100.1','port':8100,'tags_key':'app_name','tags_value' :'tsg-diagnose'}}
ssl_bypass_info_re = "Ssl connection bypass success"
ssl_intercept_info_re = "Ssl connection intercept success"
https_exprired_info_re = "Ssl exprired cert check success"
https_self_signed_info_re = "Ssl self signed cert check success"
https_untrusted_root_info_re = "Ssl untrusted_root cert check success"
ssl_redirect_info_re = "Ssl connection redirect success"
ssl_replace_info_re = "Ssl connection replace success"
ssl_insert_info_re = "Ssl connection insert success"
ssl_hijack_info_re = "Ssl connection hijack success"
ssl_block_info_re = "Ssl connection block success"
http_redirect_info_re = "http connection redirect success"
http_replace_info_re = "http connection replace success"
http_insert_info_re = "http connection insert success"
http_hijack_info_re = "http connection hijack success"
http_block_info_re = "http connection block success"
https_conn_taffic_1k_re = 'https download file 1k success'
https_conn_taffic_4k_re = 'https download file 4k success'
https_conn_taffic_16k_re = 'https download file 16k success'
https_conn_taffic_64k_re = 'https download file 64k success'
https_conn_taffic_256k_re = 'https download file 256k success'
https_conn_taffic_1M_re = 'https download file 1M success'
https_conn_taffic_4M_re = 'https download file 4M success'
https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = 'https download file 64M success'
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
URLSslExpired = 'https://expired.badssl.self-test.geedge.net'
URLSslSelfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSslSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
URLSslRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLSslReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLSslInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLSslHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLSslBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLHttpRedirect = 'http://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLHttpReplace = 'http://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLHttpInsert = 'http://cn.bing.com/?FORM=BEHPTB'
URLHttpHijack = 'http://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLHttpBlock = 'http://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k"
URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k"
URLConTraffic_16k = "https://downloadfile.self-test.geedge.net/16k"
URLConTraffic_64k = "https://downloadfile.self-test.geedge.net/64k"
URLConTraffic_256k = "https://downloadfile.self-test.geedge.net/256k"
URLConTraffic_1M = "https://downloadfile.self-test.geedge.net/1M"
URLConTraffic_4M = "https://downloadfile.self-test.geedge.net/4M"
URLConTraffic_16M = "https://downloadfile.self-test.geedge.net/16M"
URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M"
class SSLCheckRequestBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name, url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _get_conn_issuer(self,certs):
issuer = ()
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
return issuer
def ssl_bypass(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLBypass)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = self._get_conn_issuer(certs)
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0):
raise Exception(ssl_bypass_info_re)
elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0):
raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1])
else:
raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1])
def ssl_intercept(self,test_suite_name):
self._set_conn_opt(test_suite_name, URLIntercept)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = self._get_conn_issuer(certs)
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1])
else:
raise Exception(ssl_intercept_info_re)
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
class SslInterceptRequestBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name,url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL, url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _conn_to_perform(self, test_suite_name, sec_info_re):
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(sec_info_re)
else:
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
def ssl_intercept_certerrExpired(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslExpired)
self._conn_to_perform(test_suite_name,https_exprired_info_re)
def ssl_intercept_certerrSelf_signed(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslSelfsigned)
self._conn_to_perform(test_suite_name,https_self_signed_info_re)
def ssl_intercept_certerrUntrusted_root(self,test_suite_name,):
self._set_conn_opt(test_suite_name,URLSslSuntrustedroot)
self._conn_to_perform(test_suite_name,https_untrusted_root_info_re)
class ProxyRequestBuild:
def __init__(self):
self.bodyBuf = BytesIO()
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
def _cert_verify(self, certs, isSsl):
if isSsl == True:
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
return
else:
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
def _set_conn_opt(self,test_suite_name, url,isSsl):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
if isSsl == True:
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def proxy_redirect(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
self._cert_verify(certs, isSsl)
if rescode == 301 or rescode == 302:
if isSsl == True:
raise Exception(ssl_redirect_info_re)
else:
raise Exception(http_redirect_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode)
def proxy_replace(self,test_suite_name, proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name, proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close()
self._cert_verify(certs, isSsl)
if not re.search(r'EnglishSearchShared', body, 0) and \
re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0):
if isSsl == True:
raise Exception(ssl_replace_info_re)
else:
raise Exception(http_replace_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection replace fail")
else:
raise Exception("Error:Http connection replace fail")
def proxy_insert(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
body = self.bodyBuf.getvalue().decode('utf-8')
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
self._cert_verify(certs, isSsl)
if re.search(r'httpSelfcheckInsert', body, 0) and \
re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0):
if isSsl == True:
raise Exception(ssl_insert_info_re)
else:
raise Exception(http_insert_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection insert fail")
else:
raise Exception("Error:Http connection insert fail")
def proxy_block(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close()
self._cert_verify(certs, isSsl)
if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451):
if isSsl == True:
raise Exception(ssl_block_info_re)
else:
raise Exception(http_block_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)
def proxy_hijack(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
self._cert_verify(certs, isSsl)
hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue())
if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0):
if isSsl == True:
raise Exception(ssl_hijack_info_re)
else:
raise Exception(http_hijack_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection hijack fail")
else:
raise Exception("Error:Http connection hijack fail")
class SSLFileDownloadBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
#self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
self.client = TelegrafClient(host=str(suite_test_config_dict['telegraf']['host']), port=int(suite_test_config_dict['telegraf']['port']),tags={str(suite_test_config_dict['telegraf']['tags_key']):str(suite_test_config_dict['telegraf']['tags_value'])})
def _get_conninfo(self,conn):
dictconninfo = {}
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
return dictconninfo
def _set_conn_opt(self,test_suite_name,url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _write_in_nezha(self, sizeStr, connInfoDict):
nzdict = {}
nzname = 'conn_taffic_status_size_' + sizeStr
dictKeyTime = "conn_traffic_" + sizeStr + "_size_total_time"
dcitKeyStatus = "conn_traffic_" + sizeStr + "_size_status"
nzdict[dictKeyTime] = connInfoDict['total_time']
nzdict[dcitKeyStatus] = connInfoDict['status']
self.client.metric(nzname, nzdict)
def _write_in_logfile(self, sizeStr, connInfoDict):
logNewestPath = "/root/result_tsg_diagnose/conn_traffic_status/conn_traffic_status_" + sizeStr
logPath = logNewestPath + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
connInfoStr = json.dumps(connInfoDict)
with open(logNewestPath,"w+") as f:
f.write(connInfoStr)
f.close()
with open(logPath,"w+") as f:
fn = open(logNewestPath,'r')
f.write(fn.read())
fn.close()
f.close()
def conn_traffic(self,test_suite_name,url,conn_taffic_re,sizeStr, size):
self._set_conn_opt(test_suite_name, url)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
conninfo = self._get_conninfo(self.conn)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if not re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1])
if int(conninfo["size_download"]) == size:
self._write_in_nezha(sizeStr,conninfo)
self._write_in_logfile(sizeStr,conninfo)
raise Exception(conn_taffic_re)
else:
raise Exception("Error: connection tarffic size error and is no equal", sizeStr)
class SslUnitTest(unittest.TestCase):
def test_securityPolicy_bypass(self):
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
sslHandler.ssl_bypass('test_securityPolicy_bypass')
def test_securityPolicy_intercept(self):
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
sslHandler.ssl_intercept('test_securityPolicy_intercept')
def test_securityPolicy_intercept_certerrExpired(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_exprired_info_re):
requestHandler.ssl_intercept_certerrExpired('test_securityPolicy_intercept_certerrExpired')
def test_securityPolicy_intercept_certerrSelf_signed(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_self_signed_info_re):
requestHandler.ssl_intercept_certerrSelf_signed('test_securityPolicy_intercept_certerrSelf_signed')
def test_securityPolicy_intercept_certerrUntrusted_root(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
requestHandler.ssl_intercept_certerrUntrusted_root('test_securityPolicy_intercept_certerrUntrusted_root')
def test_proxyPolicy_ssl_redirect(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyPolicy_ssl_redirect',URLSslRedirect,True)
def test_proxyPolicy_ssl_block(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_block_info_re):
proxyHandler.proxy_block('test_proxyPolicy_ssl_block', URLSslBlock,True)
def test_proxyPolicy_ssl_replace(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_replace_info_re):
proxyHandler.proxy_replace('test_proxyPolicy_ssl_replace',URLSslReplace, True)
def test_proxyPolicy_ssl_hijack(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyPolicy_ssl_hijack', URLSslHijack,True)
def test_proxyPolicy_ssl_insert(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_insert_info_re):
proxyHandler.proxy_insert('test_proxyPolicy_ssl_insert',URLSslInsert,True)
def test_proxyPolicy_http_redirect(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyPolicy_http_redirect',URLHttpRedirect, False)
def test_proxyPolicy_http_block(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_block_info_re):
proxyHandler.proxy_block('test_proxyPolicy_http_block', URLHttpBlock,False)
def test_proxyPolicy_http_replace(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_replace_info_re):
proxyHandler.proxy_replace('test_proxyPolicy_http_replace',URLHttpReplace, False)
def test_proxyPolicy_http_hijack(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyPolicy_http_hijack', URLHttpHijack,False)
def test_proxyPolicy_http_insert(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_insert_info_re):
proxyHandler.proxy_insert('test_proxyPolicy_http_insert',URLHttpInsert,False)
def test_https_con_traffic_1k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_1k', URLConTraffic_1k, https_conn_taffic_1k_re,'1k', 1024)
def test_https_con_traffic_4k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_4k',URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4*1024)
def test_https_con_traffic_16k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_16k', URLConTraffic_16k, https_conn_taffic_16k_re,'16k', 16*1024)
def test_https_con_traffic_64k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_64k',URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64*1024)
def test_https_con_traffic_256k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_256k', URLConTraffic_256k,https_conn_taffic_256k_re,'256k', 256*1024)
def test_https_con_traffic_1M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_1M', URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024)
def test_https_con_traffic_4M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_4M', URLConTraffic_4M, https_conn_taffic_4M_re,'4M', 4*1024*1024)
def test_https_con_traffic_16M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_16M', URLConTraffic_16M,https_conn_taffic_16M_re,'16M',16*1024*1024)
def test_https_con_traffic_64M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_64M',URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024)
class TsgDiagnoseRun:
def __init__(self):
self.interval = 1
self.format = "txt"
self.write = None
self.loop = False
self.count = 1
self.config = None
self.client = None
self.config_dict = {}
def _get_suite_option(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.')
parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: jsontxt, the default is txt.')
parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file or NEZHA. Specifies the output file name or NEZHA.')
parser.add_argument('-p','--configpath', type = str, default = '/root/unittest/etc/tsg-diagnose.config',help='Specifies the config file, default /root/unittest/etc/tsg-diagnose.config')
parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
args = parser.parse_args()
self.interval = args.interval
self.format = args.format
self.write = args.write
self.loop = args.loop
self.count = args.count
self.config = args.configpath
if self.count == 0:
print("Error: bad number of tsg diagnose and will exit")
parser.print_help()
sys.exit(1)
if self.format not in ('json', 'txt'):
print("Error: bad output format of tsg diagnose and will exit")
parser.print_help()
sys.exit(1)
def _set_telegraf(self):
# self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
self.client = TelegrafClient(host=str(self.config_dict['telegraf']['host']), port=int(self.config_dict['telegraf']['port']),tags={str(self.config_dict['telegraf']['tags_key']):str(self.config_dict['telegraf']['tags_value'])})
def _get_suite_config(self):
global suite_test_config_dict
config = ConfigParser()
config.read(self.config, encoding='UTF-8')
for section in config.sections():
if section in suite_test_config_dict:
suite_test_config_dict[section].update(config.items(section))
else:
suite_test_config_dict[section] = dict(config.items(section))
self.config_dict = suite_test_config_dict
def _add_suite(self,test_suite_name):
if int(self.config_dict[test_suite_name]['enabled']) == 1:
self.suite.addTest(SslUnitTest(test_suite_name))
def _init_suite(self):
self.suite = unittest.TestSuite()
self.suite._cleanup = False
self._add_suite('test_securityPolicy_bypass')
self._add_suite('test_securityPolicy_intercept')
self._add_suite('test_securityPolicy_intercept_certerrExpired')
self._add_suite('test_securityPolicy_intercept_certerrSelf_signed')
self._add_suite('test_securityPolicy_intercept_certerrUntrusted_root')
self._add_suite('test_proxyPolicy_ssl_redirect')
self._add_suite('test_proxyPolicy_ssl_block')
self._add_suite('test_proxyPolicy_ssl_replace')
self._add_suite('test_proxyPolicy_ssl_hijack')
self._add_suite('test_proxyPolicy_ssl_insert')
self._add_suite('test_proxyPolicy_http_redirect')
self._add_suite('test_proxyPolicy_http_block')
self._add_suite('test_proxyPolicy_http_replace')
self._add_suite('test_proxyPolicy_http_hijack')
self._add_suite('test_proxyPolicy_http_insert')
self._add_suite('test_https_con_traffic_1k')
self._add_suite('test_https_con_traffic_4k')
self._add_suite('test_https_con_traffic_16k')
self._add_suite('test_https_con_traffic_64k')
self._add_suite('test_https_con_traffic_256k')
self._add_suite('test_https_con_traffic_1M')
self._add_suite('test_https_con_traffic_4M')
self._add_suite('test_https_con_traffic_16M')
self._add_suite('test_https_con_traffic_64M')
def _write_suite_result_into_file(self):
resultDict = '/root/result_tsg_diagnose/unittest/'
resultNewestPath = resultDict + self.write
resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
if self.format == 'txt':
with open(resultNewestPath,"w+") as f:
runner = unittest.TextTestRunner(stream=f,verbosity=2)
runner.run(self.suite)
f.close()
if self.format == 'json':
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
with open(resultNewestPath,"w+") as f:
f.write(result_json)
f.close()
with open(resultPath,"w+") as f:
fn = open(resultNewestPath,'r')
f.write(fn.read())
fn.close()
f.close()
def _write_suite_result_into_NEZHA(self):
nzdict = {}
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=False)
result_dict = json.loads(result_json)
reuslt_list = result_dict['results']
succsum = 0
failsum = 0
for reuslt in reuslt_list:
succkey = reuslt['name'].split()[0] + '_succ'
nzdict[succkey] = 0
if reuslt['type'] == 'success':
nzdict[succkey] = 1
succsum = succsum + 1
if reuslt['type'] == 'failure':
failsum = failsum + 1
nzdict['succsum'] = succsum
self.client.metric('tsg_diagnose_result', nzdict)
result_dict['succsum'] = succsum
result_dict['failsum'] = succsum
result_stdout = json.dumps(result_dict)
print(result_stdout)
def _stdout_suite_result(self):
print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s'))
if self.format == 'txt':
runner = unittest.TextTestRunner(verbosity=2)
runner.run(self.suite)
if self.format == 'json':
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=True)
print(result_json)
print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s'))
def _output_suite_result(self):
if self.write and self.write != 'NEZHA':
self._write_suite_result_into_file()
elif self.write == 'NEZHA':
self._write_suite_result_into_NEZHA()
else:
self._stdout_suite_result()
def execute_suite_tsg_diagnose(self):
self._get_suite_option()
self._get_suite_config()
self._set_telegraf()
self._init_suite()
try:
if int(self.config_dict['start_time_random_delay_range']['enabled']) == 1:
time.sleep(random.randint(int(self.config_dict['start_time_random_delay_range']['left_edge']),int(self.config_dict['start_time_random_delay_range']['right_edge'])))
counter = 0
print("Tsg diagnose run sum: %d" % self.count)
while True:
print("\nRUN %d" %(counter + 1))
self._output_suite_result()
counter = counter + 1
if not self.loop:
if counter >= self.count:
break
time.sleep(self.interval)
except Exception as ex:
print("Process get an exception, will exit, Exception info:", ex)
sys.exit(1)
if __name__ == '__main__':
tsg_diagnose_run = TsgDiagnoseRun()
tsg_diagnose_run.execute_suite_tsg_diagnose()