TSG-101:1、增加测试结果写入nezha 2、删除proxy policy 测试wrong host,pinning test,revoked三项 3、精简下载测试,删除文件下载一半url 4、修改本地下载文件测试日志路径

This commit is contained in:
fumingwei
2020-09-04 17:52:52 +08:00
parent 30f56b252b
commit 9c3d3bbbf7
3 changed files with 222 additions and 271 deletions

View File

@@ -75,10 +75,12 @@ services:
arp -i eth0 -s 192.0.2.130 02:42:c0:a8:fd:82 arp -i eth0 -s 192.0.2.130 02:42:c0:a8:fd:82
arp -i eth0 -s 192.0.2.131 02:42:C0:A8:FD:83 arp -i eth0 -s 192.0.2.131 02:42:C0:A8:FD:83
mkdir -p /root/result_tsg_diagnose/unittest mkdir -p /root/result_tsg_diagnose/unittest
mkdir -p /root/result_tsg_diagnose/con_traffic_inject mkdir -p /root/result_tsg_diagnose/conn_traffic_status
cp -rf /root/cafile_dict/certs/sets/current/gen/crt/ca-root.crt /usr/local/share/ca-certificates cp -rf /root/cafile_dict/certs/sets/current/gen/crt/ca-root.crt /usr/local/share/ca-certificates
update-ca-certificates update-ca-certificates
cat /root/unittest/badssl.test.hosts >> /etc/hosts cat /root/unittest/badssl.test.hosts >> /etc/hosts
echo '0 2 * * * /usr/local/bin/python /root/unittest/clear_file_timeout.py' > /etc/crontabs/root
echo '0 2 * * * /usr/local/bin/python /root/unittest/clear_file_timeout.py -d /root/result_tsg_diagnose/conn_traffic_status' > /etc/crontabs/root
crond crond
python /root/unittest/tsg_diagnose.py -l python /root/unittest/tsg_diagnose.py -l

View File

@@ -10,7 +10,7 @@ RUN sed -i s@/dl-cdn.alpinelinux.org/@/mirrors.ustc.edu.cn/@g /etc/apk/repositor
&& pip3 install pycurl \ && pip3 install pycurl \
&& pip3 install httpstat \ && pip3 install httpstat \
&& pip3 install CIUnitTest \ && pip3 install CIUnitTest \
&& echo '0 2 * * * /usr/local/bin/python /root/unittest/clear_file_timeout.py' > /etc/crontabs/root && pip3 install pytelegraf
WORKDIR /root/unittest WORKDIR /root/unittest

View File

@@ -9,17 +9,15 @@ from io import BytesIO
import getopt import getopt
import ciunittest import ciunittest
import argparse import argparse
from telegraf.client import TelegrafClient
URLBypass = 'https://sha384.badssl.self-test.geedge.net' URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net' URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
URLSexpired = 'https://expired.badssl.self-test.geedge.net' URLSexpired = 'https://expired.badssl.self-test.geedge.net'
URLSwronghost = 'https://wrong.host.badssl.self-test.geedge.net'
URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net' URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net' URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
URLSrevoked = 'https://revoked.badssl.self-test.geedge.net'
URLSpinningtest = 'https://pinning-test.badssl.self-test.geedge.net'
URLRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js' URLRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js' URLReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
@@ -27,42 +25,41 @@ URLInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js' URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js' URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
URLdictConTrafficInject = { URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k"
"0k":"https://downloadfile.self-test.geedge.net/0k", \ URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k"
"1k":"https://downloadfile.self-test.geedge.net/1k", \ URLConTraffic_16k = "https://downloadfile.self-test.geedge.net/16k"
"2k":"https://downloadfile.self-test.geedge.net/2k", \ URLConTraffic_64k = "https://downloadfile.self-test.geedge.net/64k"
"4k":"https://downloadfile.self-test.geedge.net/4k", \ URLConTraffic_256k = "https://downloadfile.self-test.geedge.net/256k"
"8k":"https://downloadfile.self-test.geedge.net/8k", \ URLConTraffic_1M = "https://downloadfile.self-test.geedge.net/1M"
"16k":"https://downloadfile.self-test.geedge.net/16k", \ URLConTraffic_4M = "https://downloadfile.self-test.geedge.net/4M"
"32k":"https://downloadfile.self-test.geedge.net/32k", \ URLConTraffic_16M = "https://downloadfile.self-test.geedge.net/16M"
"64k":"https://downloadfile.self-test.geedge.net/64k", \ URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M"
"128k":"https://downloadfile.self-test.geedge.net/128k", \
"256k":"https://downloadfile.self-test.geedge.net/256k", \
"512k":"https://downloadfile.self-test.geedge.net/512k", \
"1M":"https://downloadfile.self-test.geedge.net/1M", \
"2M":"https://downloadfile.self-test.geedge.net/2M", \
"4M":"https://downloadfile.self-test.geedge.net/4M", \
"8M":"https://downloadfile.self-test.geedge.net/8M", \
"16M":"https://downloadfile.self-test.geedge.net/16M", \
"32M":"https://downloadfile.self-test.geedge.net/32M", \
"64M":"https://downloadfile.self-test.geedge.net/64M"}
ssl_bypass_info_re = "Ssl connection bypass success" ssl_bypass_info_re = "Ssl connection bypass success"
ssl_intercept_info_re = "Ssl connection intercept success" ssl_intercept_info_re = "Ssl connection intercept success"
https_exprired_info_re = "https exprired ok" https_exprired_info_re = "https exprired ok"
https_wrong_host_info_re = "https wrong host ok"
https_self_signed_info_re = "https self signed ok" https_self_signed_info_re = "https self signed ok"
https_untrusted_root_info_re = "https untrusted_root ok" https_untrusted_root_info_re = "https untrusted_root ok"
https_revoked_info_re = "https revoked ok"
https_pinning_test_info_re = "https pinning-test ok"
http_redirect_info_re = "http connection redirect success" http_redirect_info_re = "http connection redirect success"
http_replace_info_re = "http connection replace success" http_replace_info_re = "http connection replace success"
http_insert_info_re = "http connection insert success" http_insert_info_re = "http connection insert success"
http_hijack_info_re = "http connection hijack success" http_hijack_info_re = "http connection hijack success"
http_block_info_re = "http connection block success" http_block_info_re = "http connection block success"
https_download_file_info_re = "http download file success"
https_conn_taffic_1k_re = 'https download file 1k success'
https_conn_taffic_4k_re = 'https download file 4k success'
https_conn_taffic_16k_re = 'https download file 16k success'
https_conn_taffic_64k_re = 'https download file 64k success'
https_conn_taffic_256k_re = 'https download file 256k success'
https_conn_taffic_1M_re = 'https download file 1M success'
https_conn_taffic_4M_re = 'https download file 4M success'
https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = 'https download file 64M success'
wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131'] wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131']
@@ -71,11 +68,11 @@ class SSLCheckRequestBuild:
self.conn = pycurl.Curl() self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.TIMEOUT, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def ssl_bypass(self): def ssl_bypass(self,conTimeout):
self.conn.setopt(self.conn.URL,URLBypass) self.conn.setopt(self.conn.URL,URLBypass)
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.perform() self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO) certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close() self.conn.close()
@@ -88,13 +85,14 @@ class SSLCheckRequestBuild:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0):
raise Exception(ssl_bypass_info_re) raise Exception(ssl_bypass_info_re)
elif re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0):
raise Exception("Error:Ssl connection is intercepted, not bypass") raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1])
else: else:
raise Exception("Error:Got other error certificate information, ssl connection's packages may loss") raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1])
def ssl_intercept(self): def ssl_intercept(self,conTimeout):
self.conn.setopt(self.conn.URL,URLIntercept) self.conn.setopt(self.conn.URL,URLIntercept)
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.perform() self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO) certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close() self.conn.close()
@@ -105,12 +103,13 @@ class SSLCheckRequestBuild:
break break
if len(issuer) <= 0: if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0): if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
raise Exception(ssl_intercept_info_re) if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
elif re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1])
raise Exception("Error: Ssl connection is bypass, not intercept") else:
raise Exception(ssl_intercept_info_re)
else: else:
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
class SslInterceptRequestBuild: class SslInterceptRequestBuild:
@@ -118,135 +117,43 @@ class SslInterceptRequestBuild:
self.conn = pycurl.Curl() self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.TIMEOUT, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def ssl_intercept_certerrExpired(self): def _conn_to_perform(self, pxy_info_re):
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception( pxy_info_re)
else:
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
def ssl_intercept_certerrExpired(self,conTimeout):
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.setopt(self.conn.URL, URLSexpired) self.conn.setopt(self.conn.URL, URLSexpired)
self.conn.perform() self._conn_to_perform(https_exprired_info_re)
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(https_exprired_info_re)
else:
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
else:
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
def ssl_intercept_certerrSelf_signed(self,conTimeout):
def ssl_intercept_certerrWrong_host(self):
self.conn.setopt(self.conn.URL,URLSwronghost)
self.conn.setopt(self.conn.SSL_VERIFYHOST, False)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(https_wrong_host_info_re)
else:
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
else:
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
def ssl_intercept_certerrSelf_signed(self):
self.conn.setopt(self.conn.URL,URLSselfsigned) self.conn.setopt(self.conn.URL,URLSselfsigned)
self.conn.perform() self.conn.setopt(self.conn.TIMEOUT,conTimeout)
certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self._conn_to_perform(https_self_signed_info_re)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(https_self_signed_info_re)
else:
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
else:
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
def ssl_intercept_certerrUntrusted_root(self): def ssl_intercept_certerrUntrusted_root(self,conTimeout):
self.conn.setopt(self.conn.URL,URLSuntrustedroot) self.conn.setopt(self.conn.URL,URLSuntrustedroot)
self.conn.perform() self.conn.setopt(self.conn.TIMEOUT,conTimeout)
certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self._conn_to_perform(https_untrusted_root_info_re)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(https_untrusted_root_info_re)
else:
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
else:
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
def ssl_intercept_certerrRevoked(self):
self.conn.setopt(self.conn.URL,URLSrevoked)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(https_revoked_info_re)
else:
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
else:
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
def ssl_intercept_certerrPinning_test(self):
self.conn.setopt(self.conn.URL,URLSpinningtest)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(https_pinning_test_info_re)
else:
raise Exception("Error: get error certificate, Possible tsg certificate verification error")
else:
raise Exception("Error: Got other error certificate information, ssl connection's packages may loss")
class SslHttpRequestBuild: class SslHttpRequestBuild:
def __init__(self): def __init__(self):
@@ -256,10 +163,10 @@ class SslHttpRequestBuild:
self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.ENCODING, "gzip,deflate") self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve) self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve)
self.conn.setopt(self.conn.TIMEOUT, 1)
def http_redirect(self): def http_redirect(self,conTimeout):
self.conn.setopt(self.conn.URL, URLRedirect) self.conn.setopt(self.conn.URL, URLRedirect)
self.conn.setopt(self.conn.TIMEOUT, conTimeout)
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.perform() self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
@@ -269,10 +176,10 @@ class SslHttpRequestBuild:
else: else:
raise Exception("Error:Http connection redirect fail") raise Exception("Error:Http connection redirect fail")
def http_replace(self): def http_replace(self,conTimeout):
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
self.conn.setopt(self.conn.URL, URLReplace) self.conn.setopt(self.conn.URL, URLReplace)
resCode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.perform() self.conn.perform()
body = self.bodyBuf.getvalue().decode('utf-8') body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close() self.conn.close()
@@ -282,10 +189,10 @@ class SslHttpRequestBuild:
else: else:
raise Exception("Error:Http connection replace fail") raise Exception("Error:Http connection replace fail")
def http_insert(self): def http_insert(self,conTimeout):
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
self.conn.setopt(self.conn.URL, URLInsert) self.conn.setopt(self.conn.URL, URLInsert)
resCode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.perform() self.conn.perform()
body = self.bodyBuf.getvalue().decode('utf-8') body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close() self.conn.close()
@@ -295,8 +202,9 @@ class SslHttpRequestBuild:
else: else:
raise Exception("Error:Http connection insert fail") raise Exception("Error:Http connection insert fail")
def http_block(self): def http_block(self,conTimeout):
self.conn.setopt(self.conn.URL, URLBlock) self.conn.setopt(self.conn.URL, URLBlock)
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write) self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
self.conn.perform() self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
@@ -307,8 +215,8 @@ class SslHttpRequestBuild:
else: else:
raise Exception("Error:http connection block fail") raise Exception("Error:http connection block fail")
def http_hijack(self): def http_hijack(self,conTimeout):
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.setopt(self.conn.URL, URLHijack) self.conn.setopt(self.conn.URL, URLHijack)
self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write) self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write)
self.conn.perform() self.conn.perform()
@@ -333,12 +241,14 @@ class SslHttpRequestBuild:
class SSLFileDownloadBuild: class SSLFileDownloadBuild:
def __init__(self): def __init__(self):
self.sizeList = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"] self.conn = pycurl.Curl()
self.resultList = [] self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.isException = False self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
def build_conninfo_json(self,conn): def _get_conninfo(self,conn):
dictconninfo = {} dictconninfo = {}
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE) dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME) dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
@@ -348,74 +258,58 @@ class SSLFileDownloadBuild:
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME) dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD) dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD) dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE)
dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE)
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD) dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD) dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME) dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
return dictconninfo return dictconninfo
def get_value_from_succ_conn(self,urlkey,url,conn): def _write_in_nezha(self, sizeStr, connInfoDict):
dictinfo = {} nzdict = {}
dictinfo["downloadsize"] = urlkey nzname = 'conn_taffic_status_size_' + sizeStr
dictinfo["url"] = url dictKeyTime = "conn_traffic_" + sizeStr + "_size_total_time"
dictinfo["time"] = time.asctime( time.localtime(time.time())) dcitKeyStatus = "conn_traffic_" + sizeStr + "_size_status"
dictinfo["result"] = self.build_conninfo_json(conn) nzdict[dictKeyTime] = connInfoDict['total_time']
self.resultList.append(dictinfo) nzdict[dcitKeyStatus] = connInfoDict['status']
self.client.metric(nzname, nzdict)
def conn_filedownload(self,urlkey,url): def _write_in_logfile(self, sizeStr, connInfoDict):
issuer = () logNewestPath = "/root/result_tsg_diagnose/conn_traffic_status/conn_traffic_status_" + sizeStr
conn = pycurl.Curl() logPath = logNewestPath + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
errdict = {} connInfoStr = json.dumps(connInfoDict)
conn.setopt(conn.WRITEFUNCTION, BytesIO().write)
conn.setopt(conn.SSL_VERIFYPEER, False)
conn.setopt(conn.OPT_CERTINFO, 1)
conn.setopt(conn.TIMEOUT, 1)
conn.setopt(conn.URL,url)
conn.perform()
certs = conn.getinfo(conn.INFO_CERTINFO)
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
errdict["status"] = "error"
errdict["errinfo"] = "Get certificate info error"
errdict["url"] = url
errdict["time"] = time.asctime( time.localtime(time.time()))
self.resultList.append(errdict)
self.isException = True
elif not re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0):
errdict["status"] = "error"
errdict["errinfo"] = "Intercept fail: no Tango cert"
errdict["url"] = url
errdict["time"] = time.asctime( time.localtime(time.time()))
self.resultList.append(errdict)
self.isException = True
else:
self.get_value_from_succ_conn(urlkey,url,conn)
conn.close()
def write_log(self):
logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime())
logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest"
with open(logNewestPath,"w+") as f: with open(logNewestPath,"w+") as f:
f.write(json.dumps(self.resultList, sort_keys=True, indent=4, separators=(',', ': '))) f.write(connInfoStr)
f.close() f.close()
with open(logpath,"a+") as f:
f.write(json.dumps(self.resultList))
f.write("\n")
f.close()
def downfile_run(self): with open(logPath,"w+") as f:
for sizefield in self.sizeList: fn = open(logNewestPath,'r')
self.conn_filedownload(sizefield,URLdictConTrafficInject[sizefield]) f.write(fn.read())
self.write_log() fn.close()
if self.isException == True: f.close()
raise Exception("Error:http_hijack download file fail")
else:
raise Exception(https_download_file_info_re)
def conn_traffic(self,URL,conn_taffic_re, sizeStr, size,conTimeout):
self.conn.setopt(self.conn.TIMEOUT,conTimeout)
self.conn.setopt(self.conn.URL,URL)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
conninfo = self._get_conninfo(self.conn)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if not re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Intercept fail: no Tango cert")
if int(conninfo["size_download"]) == size:
self._write_in_nezha(sizeStr,conninfo)
self._write_in_logfile(sizeStr,conninfo)
raise Exception(conn_taffic_re)
else:
raise Exception("Error: connection tarffic size error and is no equal", sizeStr)
class SslUnitTest(unittest.TestCase): class SslUnitTest(unittest.TestCase):
@@ -423,73 +317,97 @@ class SslUnitTest(unittest.TestCase):
def test_securityPolicy_bypass(self): def test_securityPolicy_bypass(self):
sslHandler = SSLCheckRequestBuild() sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_bypass_info_re): with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
sslHandler.ssl_bypass() sslHandler.ssl_bypass(1)
def test_securityPolicy_intercept(self): def test_securityPolicy_intercept(self):
sslHandler = SSLCheckRequestBuild() sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_intercept_info_re): with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
sslHandler.ssl_intercept() sslHandler.ssl_intercept(1)
def test_securityPolicy_intercept_certerrExpired(self): def test_securityPolicy_intercept_certerrExpired(self):
requestHandler = SslInterceptRequestBuild() requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_exprired_info_re): with self.assertRaisesRegex(Exception, https_exprired_info_re):
requestHandler.ssl_intercept_certerrExpired() requestHandler.ssl_intercept_certerrExpired(1)
def test_securityPolicy_intercept_certerrWrong_host(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_wrong_host_info_re):
requestHandler.ssl_intercept_certerrWrong_host()
def test_securityPolicy_intercept_certerrSelf_signed(self): def test_securityPolicy_intercept_certerrSelf_signed(self):
requestHandler = SslInterceptRequestBuild() requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_self_signed_info_re): with self.assertRaisesRegex(Exception, https_self_signed_info_re):
requestHandler.ssl_intercept_certerrSelf_signed() requestHandler.ssl_intercept_certerrSelf_signed(1)
def test_securityPolicy_intercept_certerrUntrusted_root(self): def test_securityPolicy_intercept_certerrUntrusted_root(self):
requestHandler = SslInterceptRequestBuild() requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re): with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
requestHandler.ssl_intercept_certerrUntrusted_root() requestHandler.ssl_intercept_certerrUntrusted_root(1)
def test_securityPolicy_intercept_certerrRevoked(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_revoked_info_re):
requestHandler.ssl_intercept_certerrRevoked()
def test_securityPolicy_intercept_certerrPinning_test(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_pinning_test_info_re):
requestHandler.ssl_intercept_certerrPinning_test()
def test_proxyPolicy_redirect(self): def test_proxyPolicy_redirect(self):
httpHandler = SslHttpRequestBuild() httpHandler = SslHttpRequestBuild()
with self.assertRaisesRegex(Exception, http_redirect_info_re): with self.assertRaisesRegex(Exception, http_redirect_info_re):
httpHandler.http_redirect() httpHandler.http_redirect(2)
def test_proxyPolicy_block(self): def test_proxyPolicy_block(self):
httpHandler = SslHttpRequestBuild() httpHandler = SslHttpRequestBuild()
with self.assertRaisesRegex(Exception, http_block_info_re): with self.assertRaisesRegex(Exception, http_block_info_re):
httpHandler.http_block() httpHandler.http_block(2)
def test_proxyPolicy_replace(self): def test_proxyPolicy_replace(self):
httpHandler = SslHttpRequestBuild() httpHandler = SslHttpRequestBuild()
with self.assertRaisesRegex(Exception, http_replace_info_re): with self.assertRaisesRegex(Exception, http_replace_info_re):
httpHandler.http_replace() httpHandler.http_replace(2)
def test_proxyPolicy_hijack(self): def test_proxyPolicy_hijack(self):
httpHandler = SslHttpRequestBuild() httpHandler = SslHttpRequestBuild()
with self.assertRaisesRegex(Exception, http_hijack_info_re): with self.assertRaisesRegex(Exception, http_hijack_info_re):
httpHandler.http_hijack() httpHandler.http_hijack(2)
def test_proxyPolicy_insert(self): def test_proxyPolicy_insert(self):
httpHandler = SslHttpRequestBuild() httpHandler = SslHttpRequestBuild()
with self.assertRaisesRegex(Exception, http_insert_info_re): with self.assertRaisesRegex(Exception, http_insert_info_re):
httpHandler.http_insert() httpHandler.http_insert(2)
def test_securityPolicy_con_traffic_inject(self): def test_https_con_traffic_1k(self):
requestHandler = SSLFileDownloadBuild() requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_download_file_info_re): with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re):
requestHandler.downfile_run() requestHandler.conn_traffic( URLConTraffic_1k,https_conn_taffic_1k_re, '1k', 1024,1)
def test_https_con_traffic_4k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re):
requestHandler.conn_traffic( URLConTraffic_4k,https_conn_taffic_4k_re, '4k', 4*1024,1)
def test_https_con_traffic_16k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re):
requestHandler.conn_traffic( URLConTraffic_16k,https_conn_taffic_16k_re, '16k', 16*1024,1)
def test_https_con_traffic_64k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re):
requestHandler.conn_traffic( URLConTraffic_64k,https_conn_taffic_64k_re, '64k', 64*1024,1)
def test_https_con_traffic_256k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re):
requestHandler.conn_traffic( URLConTraffic_256k,https_conn_taffic_256k_re, '256k', 256*1024,2)
def test_https_con_traffic_1M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re):
requestHandler.conn_traffic( URLConTraffic_1M,https_conn_taffic_1M_re, '1M', 1024 * 1024,2)
def test_https_con_traffic_4M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re):
requestHandler.conn_traffic( URLConTraffic_4M,https_conn_taffic_4M_re, '4M', 4*1024*1024,2)
def test_https_con_traffic_16M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re):
requestHandler.conn_traffic( URLConTraffic_16M,https_conn_taffic_16M_re, '16M',16*1024*1024,4)
def test_https_con_traffic_64M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re):
requestHandler.conn_traffic( URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024,4)
class TsgDiagnoseRun: class TsgDiagnoseRun:
def __init__(self): def __init__(self):
@@ -498,13 +416,14 @@ class TsgDiagnoseRun:
self.write = None self.write = None
self.loop = False self.loop = False
self.count = 1 self.count = 1
self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'})
def _get_suite_option(self): def _get_suite_option(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help") parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
parser.add_argument('-i','--interval', type = int, default = 1,help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.') parser.add_argument('-i','--interval', type = int, default = 1,help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.')
parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535') parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: jsontxt, the default is txt.') parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: jsontxt, the default is txt.')
parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file. Specifies the output file name.') parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file or NEZHA. Specifies the output file name or NEZHA.')
parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal') parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
args = parser.parse_args() args = parser.parse_args()
self.interval = args.interval self.interval = args.interval
@@ -529,24 +448,28 @@ class TsgDiagnoseRun:
self.suite.addTest(SslUnitTest('test_securityPolicy_bypass')) self.suite.addTest(SslUnitTest('test_securityPolicy_bypass'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked'))
self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_redirect')) self.suite.addTest(SslUnitTest('test_proxyPolicy_redirect'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_block')) self.suite.addTest(SslUnitTest('test_proxyPolicy_block'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_replace')) self.suite.addTest(SslUnitTest('test_proxyPolicy_replace'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_hijack')) self.suite.addTest(SslUnitTest('test_proxyPolicy_hijack'))
self.suite.addTest(SslUnitTest('test_proxyPolicy_insert')) self.suite.addTest(SslUnitTest('test_proxyPolicy_insert'))
self.suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject')) self.suite.addTest(SslUnitTest('test_https_con_traffic_1k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_4k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_16k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_64k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_256k'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_1M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_4M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_16M'))
self.suite.addTest(SslUnitTest('test_https_con_traffic_64M'))
if self.format == 'json': if self.format == 'json':
self.suite = None self.suite = None
def _write_suite_result_into_file(self):
def _write_suite_result(self): resultDict = '/root/result_tsg_diagnose/unittest/'
resultDict = '/root/result_self_test/unittest/'
resultNewestPath = resultDict + self.write resultNewestPath = resultDict + self.write
resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime()) resultPath = resultDict + self.write + "." + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
if self.format == 'txt': if self.format == 'txt':
@@ -568,6 +491,30 @@ class TsgDiagnoseRun:
fn.close() fn.close()
f.close() f.close()
def _write_suite_result_into_NEZHA(self):
nzdict = {}
self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest)
result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=False)
result_dict = json.loads(result_json)
reuslt_list = result_dict['results']
succsum = 0
failsum = 0
for reuslt in reuslt_list:
succkey = reuslt['name'].split()[0] + '_succ'
nzdict[succkey] = 0
if reuslt['type'] == 'success':
nzdict[succkey] = 1
succsum = succsum + 1
if reuslt['type'] == 'failure':
failsum = failsum + 1
nzdict['succsum'] = succsum
self.client.metric('tsg_diagnose_result', nzdict)
result_dict['succsum'] = succsum
result_dict['failsum'] = succsum
result_stdout = json.dumps(result_dict)
print(result_stdout)
def _stdout_suite_result(self): def _stdout_suite_result(self):
print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s')) print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s'))
if self.format == 'txt': if self.format == 'txt':
@@ -580,8 +527,10 @@ class TsgDiagnoseRun:
print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s')) print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s'))
def _output_suite_result(self): def _output_suite_result(self):
if self.write: if self.write and self.write != 'NEZHA':
self._write_suite_result() self._write_suite_result_into_file()
elif self.write == 'NEZHA':
self._write_suite_result_into_NEZHA()
else: else:
self._stdout_suite_result() self._stdout_suite_result()