diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index fc74d7b..43244fd 100644
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -75,10 +75,12 @@ services:
 arp -i eth0 -s 192.0.2.130 02:42:c0:a8:fd:82
 arp -i eth0 -s 192.0.2.131 02:42:C0:A8:FD:83
 mkdir -p /root/result_tsg_diagnose/unittest
- mkdir -p /root/result_tsg_diagnose/con_traffic_inject
+ mkdir -p /root/result_tsg_diagnose/conn_traffic_status
 cp -rf /root/cafile_dict/certs/sets/current/gen/crt/ca-root.crt /usr/local/share/ca-certificates
 update-ca-certificates
 cat /root/unittest/badssl.test.hosts >> /etc/hosts
+ echo '0 2 * * * /usr/local/bin/python /root/unittest/clear_file_timeout.py' > /etc/crontabs/root
+ echo '0 2 * * * /usr/local/bin/python /root/unittest/clear_file_timeout.py -d /root/result_tsg_diagnose/conn_traffic_status' > /etc/crontabs/root
 crond
 python /root/unittest/tsg_diagnose.py -l
diff --git a/unittest_python/Dockerfile b/unittest_python/Dockerfile
index dc590de..2e4bb45 100644
--- a/unittest_python/Dockerfile
+++ b/unittest_python/Dockerfile
@@ -10,7 +10,7 @@ RUN sed -i s@/dl-cdn.alpinelinux.org/@/mirrors.ustc.edu.cn/@g /etc/apk/repositor
 && pip3 install pycurl \
 && pip3 install httpstat \
 && pip3 install CIUnitTest \
- && echo '0 2 * * * /usr/local/bin/python /root/unittest/clear_file_timeout.py' > /etc/crontabs/root
+ && pip3 install pytelegraf
 WORKDIR /root/unittest
diff --git a/unittest_python/unittest/tsg_diagnose.py b/unittest_python/unittest/tsg_diagnose.py
index 9c24821..99ff595 100644
--- a/unittest_python/unittest/tsg_diagnose.py
+++ b/unittest_python/unittest/tsg_diagnose.py
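Review bot: The second added `echo ... > /etc/crontabs/root` truncates the file the first `echo` has just written, so only the `-d /root/result_tsg_diagnose/conn_traffic_status` job ends up installed and the default cleanup job is lost. Change the second redirect to append, `echo '0 2 * * * ... -d /root/result_tsg_diagnose/conn_traffic_status' >> /etc/crontabs/root`, and keep `>` on the first line so a container restart does not accumulate duplicate entries. The command block these lines belong to starts and ends outside the hunk.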
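Review bot: `pip3 install pytelegraf` is added without a version, like the other `pip3 install` lines in this RUN step, so every image build takes whatever release the package index serves that day. Pin it, e.g. `pip3 install pytelegraf==<PINNED_VERSION>`, or move the packages into a requirements file with hashes and install it with `pip3 install --require-hashes -r requirements.txt`. The RUN instruction begins outside the hunk.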
@@ -9,17 +9,15 @@ from io import BytesIO
 import getopt
 import ciunittest
 import argparse
+from telegraf.client import TelegrafClient
 URLBypass = 'https://sha384.badssl.self-test.geedge.net'
 URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
 URLSexpired = 'https://expired.badssl.self-test.geedge.net'
-URLSwronghost = 'https://wrong.host.badssl.self-test.geedge.net'
 URLSselfsigned = 'https://self-signed.badssl.self-test.geedge.net'
 URLSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
-URLSrevoked = 'https://revoked.badssl.self-test.geedge.net'
-URLSpinningtest = 'https://pinning-test.badssl.self-test.geedge.net'
 URLRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
 URLReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
@@ -27,42 +25,41 @@ URLInsert = 'https://cn.bing.com/?FORM=BEHPTB' URLHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js' URLBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js' -URLdictConTrafficInject = { - "0k":"https://downloadfile.self-test.geedge.net/0k", \ - "1k":"https://downloadfile.self-test.geedge.net/1k", \ - "2k":"https://downloadfile.self-test.geedge.net/2k", \ - "4k":"https://downloadfile.self-test.geedge.net/4k", \ - "8k":"https://downloadfile.self-test.geedge.net/8k", \ - "16k":"https://downloadfile.self-test.geedge.net/16k", \ - "32k":"https://downloadfile.self-test.geedge.net/32k", \ - "64k":"https://downloadfile.self-test.geedge.net/64k", \ - "128k":"https://downloadfile.self-test.geedge.net/128k", \ - "256k":"https://downloadfile.self-test.geedge.net/256k", \ - "512k":"https://downloadfile.self-test.geedge.net/512k", \ - "1M":"https://downloadfile.self-test.geedge.net/1M", \ - "2M":"https://downloadfile.self-test.geedge.net/2M", \ - "4M":"https://downloadfile.self-test.geedge.net/4M", \ - "8M":"https://downloadfile.self-test.geedge.net/8M", \ - "16M":"https://downloadfile.self-test.geedge.net/16M", \ - "32M":"https://downloadfile.self-test.geedge.net/32M", \ - "64M":"https://downloadfile.self-test.geedge.net/64M"} +URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k" +URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k" +URLConTraffic_16k = "https://downloadfile.self-test.geedge.net/16k" +URLConTraffic_64k = "https://downloadfile.self-test.geedge.net/64k" +URLConTraffic_256k = "https://downloadfile.self-test.geedge.net/256k" +URLConTraffic_1M = "https://downloadfile.self-test.geedge.net/1M" +URLConTraffic_4M = "https://downloadfile.self-test.geedge.net/4M" +URLConTraffic_16M = "https://downloadfile.self-test.geedge.net/16M" +URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M" + + ssl_bypass_info_re = "Ssl connection bypass success" ssl_intercept_info_re = "Ssl connection 
intercept success" https_exprired_info_re = "https exprired ok" -https_wrong_host_info_re = "https wrong host ok" https_self_signed_info_re = "https self signed ok" https_untrusted_root_info_re = "https untrusted_root ok" -https_revoked_info_re = "https revoked ok" -https_pinning_test_info_re = "https pinning-test ok" http_redirect_info_re = "http connection redirect success" http_replace_info_re = "http connection replace success" http_insert_info_re = "http connection insert success" http_hijack_info_re = "http connection hijack success" http_block_info_re = "http connection block success" -https_download_file_info_re = "http download file success" + +https_conn_taffic_1k_re = 'https download file 1k success' +https_conn_taffic_4k_re = 'https download file 4k success' +https_conn_taffic_16k_re = 'https download file 16k success' +https_conn_taffic_64k_re = 'https download file 64k success' +https_conn_taffic_256k_re = 'https download file 256k success' +https_conn_taffic_1M_re = 'https download file 1M success' +https_conn_taffic_4M_re = 'https download file 4M success' +https_conn_taffic_16M_re = 'https download file 16M success' +https_conn_taffic_64M_re = 'https download file 64M success' + wpr_dns_resolve = ['cn.bing.com:443:192.0.2.131'] @@ -71,11 +68,11 @@ class SSLCheckRequestBuild: self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.conn.setopt(self.conn.TIMEOUT, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - def ssl_bypass(self): + def ssl_bypass(self,conTimeout): self.conn.setopt(self.conn.URL,URLBypass) + self.conn.setopt(self.conn.TIMEOUT,conTimeout) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() 
@@ -88,13 +85,14 @@ class SSLCheckRequestBuild: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): raise Exception(ssl_bypass_info_re) - elif re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception("Error:Ssl connection is intercepted, not bypass") + elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0): + raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1]) else: - raise Exception("Error:Got other error certificate information, ssl connection's packages may loss") + raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1]) - def ssl_intercept(self): + def ssl_intercept(self,conTimeout): self.conn.setopt(self.conn.URL,URLIntercept) + self.conn.setopt(self.conn.TIMEOUT,conTimeout) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() @@ -105,12 +103,13 @@ class SSLCheckRequestBuild: break if len(issuer) <= 0: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0): - raise Exception(ssl_intercept_info_re) - elif re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): - raise Exception("Error: Ssl connection is bypass, not intercept") + if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): + if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): + raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1]) + else: + raise Exception(ssl_intercept_info_re) else: - raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") + raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) class SslInterceptRequestBuild: 
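Review bot: In `ssl_intercept` the outer test accepts any spacing around the equals sign (`\bCN[\s]*=[\s]*Tango\b`) but the inner test for the untrusted CA requires exactly `CN = Tango` (`\bCN = Tango[\s\S]*UNTRUST\b`). If the issuer string is ever rendered as `CN=Tango ... UNTRUST`, the outer branch matches, the inner one does not, and an untrusted intercept certificate is reported as `ssl_intercept_info_re`, i.e. success. Use the same tolerant prefix in both, `re.search(r'\bCN[\s]*=[\s]*Tango[\s\S]*UNTRUST\b', issuer[1])`. The same pair of patterns is added in `_conn_to_perform`, and `conn_traffic` uses the strict form alone, so they need the same change. The hunk also drops the old BadSSL branch, so a bypassed connection is now reported only as "other error certificate information".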
@@ -118,135 +117,43 @@ class SslInterceptRequestBuild: self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.conn.setopt(self.conn.TIMEOUT, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - def ssl_intercept_certerrExpired(self): + def _conn_to_perform(self, pxy_info_re): + self.conn.perform() + certs = self.conn.getinfo(self.conn.INFO_CERTINFO) + self.conn.close() + issuer = () + for cert_info in certs[0]: + if cert_info[0].lower() == "issuer": + issuer = cert_info + break + if len(issuer) <= 0: + raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) + if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): + if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): + raise Exception( pxy_info_re) + else: + raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1]) + else: + raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) + + def ssl_intercept_certerrExpired(self,conTimeout): + self.conn.setopt(self.conn.TIMEOUT,conTimeout) self.conn.setopt(self.conn.URL, URLSexpired) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception(https_exprired_info_re) - else: - raise Exception("Error: get error certificate, Possible tsg certificate verification error") - else: - raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") + self._conn_to_perform(https_exprired_info_re) - - def ssl_intercept_certerrWrong_host(self): - 
self.conn.setopt(self.conn.URL,URLSwronghost) - self.conn.setopt(self.conn.SSL_VERIFYHOST, False) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception(https_wrong_host_info_re) - else: - raise Exception("Error: get error certificate, Possible tsg certificate verification error") - else: - raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") - - def ssl_intercept_certerrSelf_signed(self): + def ssl_intercept_certerrSelf_signed(self,conTimeout): self.conn.setopt(self.conn.URL,URLSselfsigned) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception(https_self_signed_info_re) - else: - raise Exception("Error: get error certificate, Possible tsg certificate verification error") - else: - raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") + self.conn.setopt(self.conn.TIMEOUT,conTimeout) + self._conn_to_perform(https_self_signed_info_re) - def ssl_intercept_certerrUntrusted_root(self): + def ssl_intercept_certerrUntrusted_root(self,conTimeout): self.conn.setopt(self.conn.URL,URLSuntrustedroot) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - 
self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception(https_untrusted_root_info_re) - else: - raise Exception("Error: get error certificate, Possible tsg certificate verification error") - else: - raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") + self.conn.setopt(self.conn.TIMEOUT,conTimeout) + self._conn_to_perform(https_untrusted_root_info_re) - def ssl_intercept_certerrRevoked(self): - - self.conn.setopt(self.conn.URL,URLSrevoked) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception(https_revoked_info_re) - else: - raise Exception("Error: get error certificate, Possible tsg certificate verification error") - else: - raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") - - - def ssl_intercept_certerrPinning_test(self): - - self.conn.setopt(self.conn.URL,URLSpinningtest) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if 
re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception(https_pinning_test_info_re) - else: - raise Exception("Error: get error certificate, Possible tsg certificate verification error") - else: - raise Exception("Error: Got other error certificate information, ssl connection's packages may loss") - class SslHttpRequestBuild: def __init__(self): @@ -256,10 +163,10 @@ class SslHttpRequestBuild: self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.ENCODING, "gzip,deflate") self.conn.setopt(self.conn.RESOLVE,wpr_dns_resolve) - self.conn.setopt(self.conn.TIMEOUT, 1) - def http_redirect(self): + def http_redirect(self,conTimeout): self.conn.setopt(self.conn.URL, URLRedirect) + self.conn.setopt(self.conn.TIMEOUT, conTimeout) self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.perform() rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) @@ -269,10 +176,10 @@ class SslHttpRequestBuild: else: raise Exception("Error:Http connection redirect fail") - def http_replace(self): + def http_replace(self,conTimeout): self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) self.conn.setopt(self.conn.URL, URLReplace) - resCode = self.conn.getinfo(self.conn.RESPONSE_CODE) + self.conn.setopt(self.conn.TIMEOUT,conTimeout) self.conn.perform() body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() @@ -282,10 +189,10 @@ class SslHttpRequestBuild: else: raise Exception("Error:Http connection replace fail") - def http_insert(self): + def http_insert(self,conTimeout): self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) self.conn.setopt(self.conn.URL, URLInsert) - resCode = self.conn.getinfo(self.conn.RESPONSE_CODE) + self.conn.setopt(self.conn.TIMEOUT,conTimeout) self.conn.perform() body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() 
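Review bot: `_conn_to_perform` indexes `certs[0]` without checking that `INFO_CERTINFO` returned a chain, so an empty result (hedged: this depends on the TLS backend) surfaces as an IndexError rather than the "Get certificate info error" message raised two lines later. A guard before the loop, `if not certs: raise Exception("Error: Get certificate info error, no certificate chain returned")`, keeps the diagnosis readable. The helper reports success by raising `pxy_info_re`, which deserves a one-line docstring such as `"""Perform the request; raise pxy_info_re if the issuer is the Tango UNTRUST CA, otherwise raise an "Error: ..." message."""`. The wrong-host, revoked and pinning-test cases are removed here with no replacement; if that is deliberate, a comment should say why those certificate errors are no longer checked.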
@@ -295,8 +202,9 @@ class SslHttpRequestBuild: else: raise Exception("Error:Http connection insert fail") - def http_block(self): + def http_block(self,conTimeout): self.conn.setopt(self.conn.URL, URLBlock) + self.conn.setopt(self.conn.TIMEOUT,conTimeout) self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write) self.conn.perform() rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) @@ -307,8 +215,8 @@ class SslHttpRequestBuild: else: raise Exception("Error:http connection block fail") - def http_hijack(self): - + def http_hijack(self,conTimeout): + self.conn.setopt(self.conn.TIMEOUT,conTimeout) self.conn.setopt(self.conn.URL, URLHijack) self.conn.setopt(self.conn.WRITEFUNCTION, self.bodyBuf.write) self.conn.perform() @@ -333,12 +241,14 @@ class SslHttpRequestBuild: class SSLFileDownloadBuild: - def __init__(self): - self.sizeList = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"] - self.resultList = [] - self.isException = False + def __init__(self): + self.conn = pycurl.Curl() + self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) + self.conn.setopt(self.conn.SSL_VERIFYPEER, False) + self.conn.setopt(self.conn.OPT_CERTINFO, 1) + self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'}) - def build_conninfo_json(self,conn): + def _get_conninfo(self,conn): dictconninfo = {} dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE) dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME) 
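Review bot: The Telegraf endpoint `host='192.51.100.1', port=8100` is hard-coded in `SSLFileDownloadBuild.__init__`, and the same literal is repeated in `TsgDiagnoseRun.__init__` later in this diff. The compose setup uses 192.0.2.x addresses, while 192.51.100.1 lies outside the reserved documentation ranges (it may have been meant as 198.51.100.1). Unless the test network really assigns that address, the metrics, which pytelegraf's `TelegrafClient` sends as plain UDP datagrams, would head for a routable host. Define the endpoint once at module level, e.g. `TELEGRAF_HOST = '<TELEGRAF_HOST>'` and `TELEGRAF_PORT = 8100`, preferably overridable from the environment, and build both clients from it. Each test case constructs its own `SSLFileDownloadBuild`, so a client is also created per test; one shared module-level client would be enough.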
@@ -348,74 +258,58 @@ class SSLFileDownloadBuild: dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME) dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD) dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD) - dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE) - dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE) dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD) dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD) dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME) return dictconninfo - def get_value_from_succ_conn(self,urlkey,url,conn): - dictinfo = {} - dictinfo["downloadsize"] = urlkey - dictinfo["url"] = url - dictinfo["time"] = time.asctime( time.localtime(time.time())) - dictinfo["result"] = self.build_conninfo_json(conn) - self.resultList.append(dictinfo) + def _write_in_nezha(self, sizeStr, connInfoDict): + nzdict = {} + nzname = 'conn_taffic_status_size_' + sizeStr + dictKeyTime = "conn_traffic_" + sizeStr + "_size_total_time" + dcitKeyStatus = "conn_traffic_" + sizeStr + "_size_status" + nzdict[dictKeyTime] = connInfoDict['total_time'] + nzdict[dcitKeyStatus] = connInfoDict['status'] + self.client.metric(nzname, nzdict) - def conn_filedownload(self,urlkey,url): - issuer = () - conn = pycurl.Curl() - errdict = {} - conn.setopt(conn.WRITEFUNCTION, BytesIO().write) - conn.setopt(conn.SSL_VERIFYPEER, False) - conn.setopt(conn.OPT_CERTINFO, 1) - conn.setopt(conn.TIMEOUT, 1) - conn.setopt(conn.URL,url) - conn.perform() - certs = conn.getinfo(conn.INFO_CERTINFO) - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - errdict["status"] = "error" - errdict["errinfo"] = "Get certificate info error" - errdict["url"] = url - errdict["time"] = time.asctime( time.localtime(time.time())) - self.resultList.append(errdict) - self.isException = True - elif not re.search(r'CN = 
Tango[\s\S]*UNTRUST',issuer[1],0): - errdict["status"] = "error" - errdict["errinfo"] = "Intercept fail: no Tango cert" - errdict["url"] = url - errdict["time"] = time.asctime( time.localtime(time.time())) - self.resultList.append(errdict) - self.isException = True - else: - self.get_value_from_succ_conn(urlkey,url,conn) - conn.close() - - def write_log(self): - logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime()) - logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest" + def _write_in_logfile(self, sizeStr, connInfoDict): + logNewestPath = "/root/result_tsg_diagnose/conn_traffic_status/conn_traffic_status_" + sizeStr + logPath = logNewestPath + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime()) + connInfoStr = json.dumps(connInfoDict) with open(logNewestPath,"w+") as f: - f.write(json.dumps(self.resultList, sort_keys=True, indent=4, separators=(',', ': '))) - f.close() - with open(logpath,"a+") as f: - f.write(json.dumps(self.resultList)) - f.write("\n") - f.close() + f.write(connInfoStr) + f.close() - def downfile_run(self): - for sizefield in self.sizeList: - self.conn_filedownload(sizefield,URLdictConTrafficInject[sizefield]) - self.write_log() - if self.isException == True: - raise Exception("Error:http_hijack download file fail") - else: - raise Exception(https_download_file_info_re) + with open(logPath,"w+") as f: + fn = open(logNewestPath,'r') + f.write(fn.read()) + fn.close() + f.close() + def conn_traffic(self,URL,conn_taffic_re, sizeStr, size,conTimeout): + self.conn.setopt(self.conn.TIMEOUT,conTimeout) + self.conn.setopt(self.conn.URL,URL) + self.conn.perform() + certs = self.conn.getinfo(self.conn.INFO_CERTINFO) + conninfo = self._get_conninfo(self.conn) + self.conn.close() + issuer = () + for cert_info in certs[0]: + if cert_info[0] == "Issuer": + issuer = cert_info + break + if len(issuer) <= 0: + raise Exception("Error: Get 
certificate info error, certificate's length is %s" % len(issuer)) + if not re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): + raise Exception("Error: Intercept fail: no Tango cert") + + if int(conninfo["size_download"]) == size: + self._write_in_nezha(sizeStr,conninfo) + self._write_in_logfile(sizeStr,conninfo) + raise Exception(conn_taffic_re) + else: + raise Exception("Error: connection tarffic size error and is no equal", sizeStr) + class SslUnitTest(unittest.TestCase): 
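Review bot: `conn_traffic` looks for the issuer with the exact comparison `cert_info[0] == "Issuer"`, whereas every other lookup in this file uses `cert_info[0].lower() == "issuer"`; use the case-insensitive form here too, so a differently cased field name does not end in the "Get certificate info error" branch. In `_write_in_logfile`, `logPath = logNewestPath + time.strftime(...)` glues the timestamp straight onto the size suffix; `logNewestPath + "." + time.strftime(...)` would match the naming used for the unittest results. The dated copy can be written from `connInfoStr` directly instead of reopening the newest file, and the `f.close()` calls inside the `with` blocks are redundant. Telegraf and the log file are written only when the size check passes, so a failed or non-intercepted download leaves the last good value in place; consider recording the failure case as well.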
@@ -423,73 +317,97 @@ class SslUnitTest(unittest.TestCase): def test_securityPolicy_bypass(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_bypass_info_re): - sslHandler.ssl_bypass() + sslHandler.ssl_bypass(1) def test_securityPolicy_intercept(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_intercept_info_re): - sslHandler.ssl_intercept() + sslHandler.ssl_intercept(1) def test_securityPolicy_intercept_certerrExpired(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_exprired_info_re): - requestHandler.ssl_intercept_certerrExpired() - - def test_securityPolicy_intercept_certerrWrong_host(self): - requestHandler = SslInterceptRequestBuild() - with self.assertRaisesRegex(Exception, https_wrong_host_info_re): - requestHandler.ssl_intercept_certerrWrong_host() + requestHandler.ssl_intercept_certerrExpired(1) def test_securityPolicy_intercept_certerrSelf_signed(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_self_signed_info_re): - requestHandler.ssl_intercept_certerrSelf_signed() + requestHandler.ssl_intercept_certerrSelf_signed(1) def test_securityPolicy_intercept_certerrUntrusted_root(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_untrusted_root_info_re): - requestHandler.ssl_intercept_certerrUntrusted_root() - - def test_securityPolicy_intercept_certerrRevoked(self): - requestHandler = SslInterceptRequestBuild() - with self.assertRaisesRegex(Exception, https_revoked_info_re): - requestHandler.ssl_intercept_certerrRevoked() - - def test_securityPolicy_intercept_certerrPinning_test(self): - requestHandler = SslInterceptRequestBuild() - with self.assertRaisesRegex(Exception, https_pinning_test_info_re): - requestHandler.ssl_intercept_certerrPinning_test() + requestHandler.ssl_intercept_certerrUntrusted_root(1) def test_proxyPolicy_redirect(self): 
httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_redirect_info_re): - httpHandler.http_redirect() + httpHandler.http_redirect(2) def test_proxyPolicy_block(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_block_info_re): - httpHandler.http_block() + httpHandler.http_block(2) def test_proxyPolicy_replace(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_replace_info_re): - httpHandler.http_replace() + httpHandler.http_replace(2) def test_proxyPolicy_hijack(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_hijack_info_re): - httpHandler.http_hijack() + httpHandler.http_hijack(2) def test_proxyPolicy_insert(self): httpHandler = SslHttpRequestBuild() with self.assertRaisesRegex(Exception, http_insert_info_re): - httpHandler.http_insert() + httpHandler.http_insert(2) - def test_securityPolicy_con_traffic_inject(self): + def test_https_con_traffic_1k(self): requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_download_file_info_re): - requestHandler.downfile_run() - + with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re): + requestHandler.conn_traffic( URLConTraffic_1k,https_conn_taffic_1k_re, '1k', 1024,1) + + def test_https_con_traffic_4k(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re): + requestHandler.conn_traffic( URLConTraffic_4k,https_conn_taffic_4k_re, '4k', 4*1024,1) + + def test_https_con_traffic_16k(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re): + requestHandler.conn_traffic( URLConTraffic_16k,https_conn_taffic_16k_re, '16k', 16*1024,1) + + def test_https_con_traffic_64k(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re): + requestHandler.conn_traffic( 
URLConTraffic_64k,https_conn_taffic_64k_re, '64k', 64*1024,1) + + def test_https_con_traffic_256k(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re): + requestHandler.conn_traffic( URLConTraffic_256k,https_conn_taffic_256k_re, '256k', 256*1024,2) + + def test_https_con_traffic_1M(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re): + requestHandler.conn_traffic( URLConTraffic_1M,https_conn_taffic_1M_re, '1M', 1024 * 1024,2) + + def test_https_con_traffic_4M(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re): + requestHandler.conn_traffic( URLConTraffic_4M,https_conn_taffic_4M_re, '4M', 4*1024*1024,2) + + def test_https_con_traffic_16M(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re): + requestHandler.conn_traffic( URLConTraffic_16M,https_conn_taffic_16M_re, '16M',16*1024*1024,4) + + def test_https_con_traffic_64M(self): + requestHandler = SSLFileDownloadBuild() + with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re): + requestHandler.conn_traffic( URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024,4) class TsgDiagnoseRun: def __init__(self): 
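Review bot: The last argument of each `conn_traffic(...)` call (1, 2 or 4 seconds) is the curl timeout and therefore also an implicit throughput requirement: `test_https_con_traffic_64M` can only pass if 64 MiB arrives within 4 s. Nothing in the hunk says so, and a timeout raised by `perform()` will simply show up as a failed case. Add a comment above the group, e.g. `# conTimeout is also the pass threshold for download time`, and consider holding the URL, size and timeout of the nine cases in one table so the thresholds can be tuned in one place.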
@@ -498,13 +416,14 @@ class TsgDiagnoseRun: self.write = None self.loop = False self.count = 1 + self.client = TelegrafClient(host='192.51.100.1', port=8100,tags={'app_name':'tsg-diagnose'}) def _get_suite_option(self): parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help") parser.add_argument('-i','--interval', type = int, default = 1,help='Wait interval seconds between each tsg disagnose. The default is to wait for one second between each tsg diagnose.') parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535') parser.add_argument('-f','--format', type = str, default = 'txt',help='Specifies the result output format of the tsg diagnose. There two formats: json,txt, the default is txt.') - parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file. Specifies the output file name.') + parser.add_argument('-w','--write', type = str, default = None,help='Write out result into file or NEZHA. Specifies the output file name or NEZHA.') parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal') args = parser.parse_args() self.interval = args.interval 
@@ -529,24 +448,28 @@ class TsgDiagnoseRun: self.suite.addTest(SslUnitTest('test_securityPolicy_bypass')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrExpired')) - self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrWrong_host')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrSelf_signed')) self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrUntrusted_root')) - self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrRevoked')) - self.suite.addTest(SslUnitTest('test_securityPolicy_intercept_certerrPinning_test')) self.suite.addTest(SslUnitTest('test_proxyPolicy_redirect')) self.suite.addTest(SslUnitTest('test_proxyPolicy_block')) self.suite.addTest(SslUnitTest('test_proxyPolicy_replace')) self.suite.addTest(SslUnitTest('test_proxyPolicy_hijack')) self.suite.addTest(SslUnitTest('test_proxyPolicy_insert')) - self.suite.addTest(SslUnitTest('test_securityPolicy_con_traffic_inject')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_1k')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_4k')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_16k')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_64k')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_256k')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_1M')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_4M')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_16M')) + self.suite.addTest(SslUnitTest('test_https_con_traffic_64M')) if self.format == 'json': self.suite = None - - def _write_suite_result(self): - resultDict = '/root/result_self_test/unittest/' + def _write_suite_result_into_file(self): + resultDict = '/root/result_tsg_diagnose/unittest/' resultNewestPath = resultDict + self.write resultPath = resultDict + self.write + "." 
+ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime()) if self.format == 'txt': @@ -568,6 +491,30 @@ class TsgDiagnoseRun: fn.close() f.close() + def _write_suite_result_into_NEZHA(self): + nzdict = {} + self.suite = unittest.TestLoader().loadTestsFromTestCase(SslUnitTest) + result_json = ciunittest.JsonTestRunner().run(self.suite, formatted=False) + result_dict = json.loads(result_json) + reuslt_list = result_dict['results'] + succsum = 0 + failsum = 0 + for reuslt in reuslt_list: + succkey = reuslt['name'].split()[0] + '_succ' + nzdict[succkey] = 0 + if reuslt['type'] == 'success': + nzdict[succkey] = 1 + succsum = succsum + 1 + if reuslt['type'] == 'failure': + failsum = failsum + 1 + + nzdict['succsum'] = succsum + self.client.metric('tsg_diagnose_result', nzdict) + result_dict['succsum'] = succsum + result_dict['failsum'] = succsum + result_stdout = json.dumps(result_dict) + print(result_stdout) + def _stdout_suite_result(self): print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s')) if self.format == 'txt': @@ -580,8 +527,10 @@ class TsgDiagnoseRun: print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s')) def _output_suite_result(self): - if self.write: - self._write_suite_result() + if self.write and self.write != 'NEZHA': + self._write_suite_result_into_file() + elif self.write == 'NEZHA': + self._write_suite_result_into_NEZHA() else: self._stdout_suite_result()
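Review bot: `result_dict['failsum'] = succsum` in `_write_suite_result_into_NEZHA` stores the success count under the failure key, so the printed JSON always reports as many failures as successes; it should be `result_dict['failsum'] = failsum`. `failsum` is also never put into `nzdict`, so the metric sent to Telegraf carries no failure count; add `nzdict['failsum'] = failsum`. A result whose type is neither 'success' nor 'failure' is counted in neither total, which would hide errored cases if the runner reports them separately. The method also replaces `self.suite` with `loadTestsFromTestCase(SslUnitTest)`, discarding the suite assembled with `addTest` earlier in the class.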
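Review bot: `_output_suite_result` uses the literal `'NEZHA'` as a sentinel inside the `-w` file-name option and spells it out in two conditions. Testing the sentinel first reads more directly, `if self.write == 'NEZHA': ... elif self.write: ... else: ...`, with the string held in one module-level constant. In the file branch, `self.write` is appended unchanged to `/root/result_tsg_diagnose/unittest/` (visible in the preceding hunk), so a value containing `../` writes outside that directory. Reducing it to a base name before joining, e.g. `os.path.basename(self.write)` if `os` is imported in this file, would confine the output.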