This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/images_build/client/dign_client/bin/client.py

1086 lines
59 KiB
Python

import sys
import unittest
import json
import pycurl
import os
import re
import time
import io
from io import BytesIO
import getopt
import ciunittest
import argparse
from telegraf.client import TelegrafClient
import hashlib
from configparser import ConfigParser
import random
import dns.exception
import dns.resolver
import sys
# Default settings for every check, keyed by the unittest method name.
# 'conn_timeout' is handed to pycurl's TIMEOUT option and
# 'max_recv_speed_large' to MAX_RECV_SPEED_LARGE by the request builders
# below.  'enabled' is not read in this part of the file -- presumably it
# switches a check on/off elsewhere; verify against the runner.
suite_test_config_dict = {
    suite_name: {
        'enabled': 1,
        'conn_timeout': timeout_seconds,
        'max_recv_speed_large': 6553600,
    }
    for suite_name, timeout_seconds in (
        ('test_securityPolicy_bypass', 1),
        ('test_dnsRequest_deny_drop', 1),
        ('test_dnsRequest_deny_redirect_a', 1),
        ('test_dnsRequest_deny_redirect_aaaa', 1),
        ('test_dnsRequest_deny_redirect_a_range_ttl', 1),
        ('test_dnsRequest_deny_redirect_aaaa_range_ttl', 1),
        ('test_dnsRequest_allow_rdtype_a', 1),
        ('test_dnsRequest_allow_rdtype_aaaa', 1),
        ('test_dnsRequest_allow_rdtype_cname', 1),
        ('test_securityPolicy_intercept', 1),
        ('test_securityPolicy_intercept_certerrExpired', 1),
        ('test_securityPolicy_intercept_certerrSelf_signed', 1),
        ('test_securityPolicy_intercept_certerrUntrusted_root', 1),
        ('test_proxyPolicy_ssl_redirect', 1),
        ('test_proxyPolicy_ssl_block', 1),
        ('test_proxyPolicy_ssl_replace', 1),
        ('test_proxyPolicy_ssl_hijack', 1),
        ('test_proxyPolicy_ssl_insert', 1),
        ('test_proxyPolicy_http_redirect', 1),
        ('test_proxyPolicy_http_block', 1),
        ('test_proxyPolicy_http_replace', 1),
        ('test_proxyPolicy_http_hijack', 1),
        ('test_proxyPolicy_http_insert', 1),
        ('test_https_con_traffic_1k', 1),
        ('test_https_con_traffic_4k', 1),
        ('test_https_con_traffic_16k', 1),
        ('test_https_con_traffic_64k', 1),
        ('test_https_con_traffic_256k', 1),
        ('test_https_con_traffic_1M', 1),
        ('test_https_con_traffic_4M', 1),
        ('test_https_con_traffic_16M', 1),
        ('test_https_con_traffic_64M', 1),
        ('test_http_firewall_allow', 1),
        # The two "deny drop" checks get a longer timeout than the rest.
        ('test_http_firewall_deny_drop', 4),
        ('test_http_firewall_deny_rst', 1),
        ('test_http_firewall_deny_block', 1),
        ('test_ssl_firewall_allow', 1),
        ('test_ssl_firewall_deny_drop', 4),
        ('test_ssl_firewall_deny_rst', 1),
    )
}
# Non-suite sections are appended last so the key order matches the literal
# this table replaces.
suite_test_config_dict['start_time_random_delay_range'] = {
    'enabled': 1,
    'left_edge': 1,
    'right_edge': 30,
}
# Connection parameters and the single tag used when building TelegrafClient.
suite_test_config_dict['telegraf'] = {
    'host': '192.51.100.1',
    'port': 8100,
    'tags_key': 'app_name',
    'tags_value': 'tsg-diagnose',
}
# Verdict messages.  The request builders below report a passed check by
# raising Exception with one of these strings as its message.
# NOTE(review): the "_re" suffix suggests the test cases match them as
# regular expressions (as the DNS tests do with assertRaisesRegex) -- confirm.
# The spelling quirks ("exprired", "taffic", "aciton") are part of the runtime
# text; anything that parses these messages depends on them as written.

# Security policy: bypass / intercept and certificate-error handling.
ssl_bypass_info_re = "Ssl connection bypass success"
ssl_intercept_info_re = "Ssl connection intercept success"
https_exprired_info_re = "Ssl exprired cert check success"
https_self_signed_info_re = "Ssl self signed cert check success"
https_untrusted_root_info_re = "Ssl untrusted_root cert check success"
# Proxy policy over TLS.
ssl_redirect_info_re = "Ssl connection redirect success"
ssl_replace_info_re = "Ssl connection replace success"
ssl_insert_info_re = "Ssl connection insert success"
ssl_hijack_info_re = "Ssl connection hijack success"
ssl_block_info_re = "Ssl connection block success"
# Proxy policy over plain HTTP.
http_redirect_info_re = "http connection redirect success"
http_replace_info_re = "http connection replace success"
http_insert_info_re = "http connection insert success"
http_hijack_info_re = "http connection hijack success"
http_block_info_re = "http connection block success"
# HTTPS download checks, one message per file size.
https_conn_taffic_1k_re = 'https download file 1k success'
https_conn_taffic_4k_re = 'https download file 4k success'
https_conn_taffic_16k_re = 'https download file 16k success'
https_conn_taffic_64k_re = 'https download file 64k success'
https_conn_taffic_256k_re = 'https download file 256k success'
https_conn_taffic_1M_re = 'https download file 1M success'
https_conn_taffic_4M_re = 'https download file 4M success'
https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = 'https download file 64M success'
# Firewall actions over HTTP.
http_firewall_allow_re = "http firewall action allow success"
http_firewall_deny_drop_re = "http firewall aciton deny subaction drop success"
http_firewall_deny_rst_re = "http firewall action deny subaction rst success"
http_firewall_deny_block_re = "http firewall aciton deny subaction block success"
# Firewall actions over TLS.
ssl_firewall_allow_re = "ssl firewall action allow success"
ssl_firewall_deny_drop_re = "ssl firewall action deny subaction drop success"
ssl_firewall_deny_rst_re = "ssl firewall action deny subaction rst success"
# Target URLs requested by the checks below.

# Security policy: bypass / intercept and certificate-error hosts.
URLBypass = 'https://sha384.badssl.self-test.geedge.net'
URLIntercept = 'https://sha256.badssl.self-test.geedge.net'
URLSslExpired = 'https://expired.badssl.self-test.geedge.net'
URLSslSelfsigned = 'https://self-signed.badssl.self-test.geedge.net'
URLSslSuntrustedroot = 'https://untrusted-root.badssl.self-test.geedge.net'
# Proxy policy targets over TLS.
URLSslRedirect = 'https://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLSslReplace = 'https://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLSslInsert = 'https://cn.bing.com/?FORM=BEHPTB'
URLSslHijack = 'https://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLSslBlock = 'https://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
# Proxy policy targets over plain HTTP (same paths as the TLS ones).
URLHttpRedirect = 'http://cn.bing.com/rs/2V/pE/cj,nj/b1392357/d94c45f4.js'
URLHttpReplace = 'http://cn.bing.com/rs/5j/1pF/cj,nj/2213d9b6/b50738ca.js'
URLHttpInsert = 'http://cn.bing.com/?FORM=BEHPTB'
URLHttpHijack = 'http://cn.bing.com/rs/31/2n/cj,nj/4c7364c5/40e1b425.js'
URLHttpBlock = 'http://cn.bing.com/rs/31/22/cj,nj/3f1e2270/f8c6dd44.js'
# HTTPS download targets, one per file size.
URLConTraffic_1k = "https://downloadfile.self-test.geedge.net/1k"
URLConTraffic_4k = "https://downloadfile.self-test.geedge.net/4k"
URLConTraffic_16k = "https://downloadfile.self-test.geedge.net/16k"
URLConTraffic_64k = "https://downloadfile.self-test.geedge.net/64k"
URLConTraffic_256k = "https://downloadfile.self-test.geedge.net/256k"
URLConTraffic_1M = "https://downloadfile.self-test.geedge.net/1M"
URLConTraffic_4M = "https://downloadfile.self-test.geedge.net/4M"
URLConTraffic_16M = "https://downloadfile.self-test.geedge.net/16M"
URLConTraffic_64M = "https://downloadfile.self-test.geedge.net/64M"
# Firewall action targets over HTTP.
URLHttpFirewallAllow = "http://http.badssl.self-test.geedge.net"
URLHttpFirewallDenyDrop = "http://http-credit-card.badssl.self-test.geedge.net"
URLHttpFirewallDenyRst = "http://http-dynamic-login.badssl.self-test.geedge.net"
URLHttpFirewallDenyBlock = "http://http-login.badssl.self-test.geedge.net"
# Firewall action targets over TLS.
URLSslFirewallAllow = "https://sha512.badssl.self-test.geedge.net"
URLSslFirewallDenyDrop = "https://rsa2048.badssl.self-test.geedge.net"
URLSslFirewallDenyRst = "https://rsa4096.badssl.self-test.geedge.net"
# TTL values the DNS checks compare against the answer's rrset TTL.
# Expected TTL of an answer that was allowed through.
DNS_SERVER_ALLOW_TTL = 60
# Expected TTL of a redirected answer with a fixed TTL.
DNS_SERVER_REDIRECT_TTL = 333
# Inclusive bounds accepted by the "range ttl" redirect checks.
DNS_SERVER_REDIRECT_RANGE_LOW = 400
DNS_SERVER_REDIRECT_RANGE_HIGH = 500
# Assigned as-is to the resolver's ``nameservers`` list by every DNS check.
DNS_SERVER_IP = ["192.0.2.135"]
# Verdict messages raised by DNSCheckRequestBuild on success and matched by
# the DNS test cases with assertRaisesRegex.  The misspellings ("sucess",
# "reidrect") are part of the runtime text.
DnsRequestFirewallDenyDrop = "Dns request timeout is deny drop sucess"
DnsARequestFireWallDenyRedirect = "Dns rdtype A request is deny reidrect sucess"
DnsAAAARequestFireWallDenyRedirect = "Dns rdtype AAAA request is deny redirect sucess"
DnsARequestFireWallDenyRedirectRangTTL = "Dns rdtype A request is deny reidrect and range ttl sucess"
DnsAAAARequestFireWallDenyRedirectRangTTL = "Dns rdtype AAAA request is deny redirect and range ttl sucess"
DnsARequestFirewallAllow = "Dns rdtype A request data is sucess"
DnsAAAARequestFirewallAllow = "Dns rdtype AAAA request data is sucess"
DnsCNAMERequestFirewallAllow = "Dns rdtype CNAME request data is sucess"
class DNSCheckRequestBuild:
    """DNS firewall self-checks issued against the servers in DNS_SERVER_IP.

    Each public method performs one lookup and reports its verdict by raising
    ``Exception``: the message is one of the module-level ``Dns...`` success
    strings when the expected behaviour was observed, and starts with
    ``"Error:"`` otherwise.
    """

    # DNS record type codes compared against ``rdata.rdtype``.
    _RDTYPE_A = 1
    _RDTYPE_CNAME = 5
    _RDTYPE_AAAA = 28

    @staticmethod
    def _new_resolver():
        """Return a resolver bound to DNS_SERVER_IP with 3 s timeout and lifetime."""
        dns_resolver = dns.resolver.Resolver()
        dns_resolver.nameservers = DNS_SERVER_IP
        dns_resolver.timeout = float(3)
        dns_resolver.lifetime = float(3)
        return dns_resolver

    def _query(self, qname, rdtype_text, failure_fmt):
        """Resolve *qname* and return the answer.

        Raises:
            Exception: ``failure_fmt % error`` when the lookup raises any
                ``dns.exception.DNSException``.
        """
        dns_resolver = self._new_resolver()
        try:
            return dns_resolver.query(qname, rdtype_text)
        except dns.exception.DNSException as errorinfo:
            raise Exception(failure_fmt % errorinfo)

    def _judge_first_record(self, dns_answer, rdtype, expected, success_msg,
                            value_error_msg, rdtype_error_msg):
        """Raise the verdict for the first record in the answer section.

        Only the first record is inspected because every branch raises.  If
        the answer section contains no record at all, nothing is raised and
        the method returns ``None``.
        """
        for rrset in dns_answer.response.answer:
            for rdata in rrset.items:
                if rdata.rdtype != rdtype:
                    raise Exception(rdtype_error_msg)
                # CNAME records are compared by their text form; address
                # records by their ``address`` attribute.
                if rdtype == self._RDTYPE_CNAME:
                    value = str(rdata)
                else:
                    value = rdata.address
                if value == expected:
                    raise Exception(success_msg)
                raise Exception(value_error_msg)

    def dns_action_deny_subaction_drop(self):
        """Check that the A query for www.3test-ipv4.com times out (dropped)."""
        dns_resolver = self._new_resolver()
        try:
            dns_resolver.query("www.3test-ipv4.com", 'A')
        except dns.exception.Timeout:
            # Catching the class (rather than comparing type() for equality)
            # also accepts Timeout subclasses such as the LifetimeTimeout
            # raised by newer dnspython releases.
            raise Exception(DnsRequestFirewallDenyDrop)
        except dns.exception.DNSException as errorinfo:
            raise Exception("Error: The dns_action_deny_subaction_drop check failure, code: %s" % errorinfo)
        # An answer came back, so the request was not dropped.
        raise Exception("Error: The dns_action_deny_subaction_drop test deny drop failure" )

    def dns_action_deny_subaction_redirect_a(self):
        """Check the A query is redirected to 99.99.99.99 with the fixed redirect TTL."""
        dns_answer = self._query(
            "www.2test-ipv4.com", 'A',
            "Error: The dns_action_deny_subaction_redirect_a check failure, code: %s")
        if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL:
            raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL))
        self._judge_first_record(
            dns_answer, self._RDTYPE_A, "99.99.99.99",
            DnsARequestFireWallDenyRedirect,
            "Error: The dns request rdtype A drop redirect check failure: respond value error",
            "Error: The dns request rdtype A drop redirect check failure: respond rdtype error")

    def dns_action_deny_subaction_redirect_aaaa(self):
        """Check the AAAA query is redirected to 99:99::99:99 with the fixed redirect TTL."""
        dns_answer = self._query(
            "www.2test-ipv6.com", 'AAAA',
            "Error: The dns_action_deny_subaction_redirect_aaaa check failure, code: %s")
        if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL:
            raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL))
        self._judge_first_record(
            dns_answer, self._RDTYPE_AAAA, "99:99::99:99",
            DnsAAAARequestFireWallDenyRedirect,
            "Error: The dns request rdtype AAAA drop redirect check failure: respond value error",
            "Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error")

    def dns_action_deny_subaction_redirect_a_rang_ttl(self):
        """Check the A query is redirected to 99.99.99.99 with a TTL inside the allowed range."""
        dns_answer = self._query(
            "www.4test-ipv4.com", 'A',
            "Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure, code: %s")
        if not DNS_SERVER_REDIRECT_RANGE_LOW <= dns_answer.rrset.ttl <= DNS_SERVER_REDIRECT_RANGE_HIGH:
            raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl,DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH))
        self._judge_first_record(
            dns_answer, self._RDTYPE_A, "99.99.99.99",
            DnsARequestFireWallDenyRedirectRangTTL,
            "Error: The dns request rdtype A drop redirect range ttl check failure: respond value error",
            "Error: The dns request rdtype A drop redirect range ttl check failure: respond rdtype error")

    def dns_action_deny_subaction_redirect_aaaa_rang_ttl(self):
        """Check the AAAA query is redirected to 99:99::99:99 with a TTL inside the allowed range."""
        dns_answer = self._query(
            "www.4test-ipv6.com", 'AAAA',
            "Error: The dns_action_deny_subaction_redirect_aaaa range ttl check failure, code: %s")
        if not DNS_SERVER_REDIRECT_RANGE_LOW <= dns_answer.rrset.ttl <= DNS_SERVER_REDIRECT_RANGE_HIGH:
            raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH))
        self._judge_first_record(
            dns_answer, self._RDTYPE_AAAA, "99:99::99:99",
            DnsAAAARequestFireWallDenyRedirectRangTTL,
            "Error: The dns request rdtype AAAA drop redirect check failure: respond value error",
            "Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error")

    def dns_action_allow_rdtype_a(self):
        """Check the A query is answered with 10.1.2.3 and the allow TTL."""
        dns_answer = self._query(
            "www.1test-ipv4.com", 'A',
            "Error: The dns request rdtype A allow check failure, code: %s")
        if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
            raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
        self._judge_first_record(
            dns_answer, self._RDTYPE_A, "10.1.2.3",
            DnsARequestFirewallAllow,
            "Error: The dns request rdtype A allow check failure: respond value error",
            "Error: The dns request rdtype A allow check failure: respond rdtype error")

    def dns_action_allow_rdtype_aaaa(self):
        """Check the AAAA query is answered with 11aa:11:22::33 and the allow TTL."""
        dns_answer = self._query(
            "www.1test-ipv6.com", 'AAAA',
            "Error: The dns request rdtype AAAA allow check failure, code: %s")
        if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
            # The message names the AAAA check (it used to say "rdtype A").
            raise Exception("Error: The dns request rdtype AAAA allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
        self._judge_first_record(
            dns_answer, self._RDTYPE_AAAA, "11aa:11:22::33",
            DnsAAAARequestFirewallAllow,
            "Error: The dns request rdtype AAAA allow check failure: respond value error",
            "Error: The dns request rdtype AAAA allow check failure: response rdtype error")

    def dns_action_allow_rdtype_cname(self):
        """Check the CNAME query is answered with www.1testanswer-cname.com. and the allow TTL."""
        dns_answer = self._query(
            "www.1test-cname.com", 'CNAME',
            "Error: The dns request rdtype CNAME allow check failure, code: %s")
        if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
            # The message names the CNAME check (it used to say "rdtype A").
            raise Exception("Error: The dns request rdtype CNAME allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
        self._judge_first_record(
            dns_answer, self._RDTYPE_CNAME, "www.1testanswer-cname.com.",
            DnsCNAMERequestFirewallAllow,
            "Error: The dns request rdtype CNAME allow check failure: respond value error",
            "Error: The dns request rdtype CNAME allow check failure: respond rdtype error")
class SSLCheckRequestBuild:
    """TLS bypass / intercept checks based on the issuer of the served certificate.

    Each check method reports its verdict by raising ``Exception`` (success
    string or an ``"Error:..."`` message).  The curl handle is closed by the
    first check that runs, so an instance serves exactly one check.
    """

    def __init__(self):
        self.conn = pycurl.Curl()
        # The body is irrelevant for these checks; send it to a scratch buffer.
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)

    def _set_conn_opt(self,test_suite_name, url):
        """Apply the URL plus the suite's speed limit and timeout to the handle."""
        suite_config = suite_test_config_dict[test_suite_name]
        self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_config['max_recv_speed_large']))
        self.conn.setopt(self.conn.URL,url)
        self.conn.setopt(self.conn.TIMEOUT, int(suite_config['conn_timeout']))

    def _fetch_certs(self):
        """Run the transfer and return the certificate info; always close the handle."""
        try:
            self.conn.perform()
            return self.conn.getinfo(self.conn.INFO_CERTINFO)
        finally:
            # Close even when perform() raises so the handle is not leaked.
            self.conn.close()

    def _get_conn_issuer(self,certs):
        """Return the ``(name, value)`` issuer entry of the first certificate.

        Raises:
            Exception: when *certs* is empty or its first certificate has no
                issuer entry.
        """
        issuer = ()
        first_cert = certs[0] if certs else ()
        for cert_info in first_cert:
            # Case-insensitive, matching the other request builders in this file.
            if cert_info[0].lower() == "issuer":
                issuer = cert_info
                break
        if len(issuer) <= 0:
            raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
        return issuer

    def ssl_bypass(self,test_suite_name):
        """Check that URLBypass is served with the original BadSSL certificate."""
        self._set_conn_opt(test_suite_name,URLBypass)
        issuer = self._get_conn_issuer(self._fetch_certs())
        if re.search(r'\bCN[\s]*=[\s]*BadSSL\b', issuer[1]):
            raise Exception(ssl_bypass_info_re)
        if re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b', issuer[1]):
            raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1])
        raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1])

    def ssl_intercept(self,test_suite_name):
        """Check that URLIntercept is served with a trusted Tango certificate."""
        self._set_conn_opt(test_suite_name, URLIntercept)
        issuer = self._get_conn_issuer(self._fetch_certs())
        if not re.search(r'\bCN[\s]*=[\s]*Tango\b', issuer[1]):
            raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
        # Same whitespace tolerance as the Tango match above, so an UNTRUST
        # issuer printed as "CN=Tango..." is not reported as a success.
        if re.search(r'\bCN[\s]*=[\s]*Tango[\s\S]*UNTRUST\b', issuer[1]):
            raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1])
        raise Exception(ssl_intercept_info_re)
class SslInterceptRequestBuild:
    """Checks how interception handles upstream certificate errors.

    Each check requests a host whose real certificate is defective and
    reports its verdict by raising ``Exception`` (success string or an
    ``"Error:..."`` message).  The curl handle is closed by the first check
    that runs, so an instance serves exactly one check.
    """

    def __init__(self):
        self.conn = pycurl.Curl()
        # The body is irrelevant for these checks; send it to a scratch buffer.
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)

    def _set_conn_opt(self,test_suite_name,url):
        """Apply the URL plus the suite's speed limit and timeout to the handle."""
        suite_config = suite_test_config_dict[test_suite_name]
        self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_config['max_recv_speed_large']))
        self.conn.setopt(self.conn.URL, url)
        self.conn.setopt(self.conn.TIMEOUT, int(suite_config['conn_timeout']))

    def _conn_to_perform(self, test_suite_name, sec_info_re):
        """Run the transfer and raise the verdict derived from the issuer.

        Args:
            test_suite_name: accepted for symmetry with the callers; not used.
            sec_info_re: success message to raise when the check passes.

        Raises:
            Exception: always, once the transfer has completed -- either
                *sec_info_re* or an ``"Error:..."`` message.
        """
        try:
            self.conn.perform()
            certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
        finally:
            # Close even when perform() raises so the handle is not leaked.
            self.conn.close()
        issuer = ()
        first_cert = certs[0] if certs else ()
        for cert_info in first_cert:
            if cert_info[0].lower() == "issuer":
                issuer = cert_info
                break
        if len(issuer) <= 0:
            raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
        if re.search(r'\bCN[\s]*=[\s]*Tango\b', issuer[1]):
            # Same whitespace tolerance as the Tango match above, so an
            # UNTRUST issuer printed as "CN=Tango..." is still recognised.
            if re.search(r'\bCN[\s]*=[\s]*Tango[\s\S]*UNTRUST\b', issuer[1]):
                raise Exception(sec_info_re)
            raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
        if re.search(r'\bCN[\s]*=[\s]*BadSSL\b', issuer[1]) or \
                re.search(r'\bCN[\s\S]*=[\s\S]*badssl[\s\S]*', issuer[1]):
            raise Exception("Error: Ssl connection intercept failed, cert info: %s" % issuer[1])
        # Any issuer that is neither Tango nor BadSSL is treated as a pass.
        raise Exception(sec_info_re)

    def ssl_intercept_certerrExpired(self,test_suite_name):
        """Run the check against the host serving an expired certificate."""
        self._set_conn_opt(test_suite_name,URLSslExpired)
        self._conn_to_perform(test_suite_name,https_exprired_info_re)

    def ssl_intercept_certerrSelf_signed(self,test_suite_name):
        """Run the check against the host serving a self-signed certificate."""
        self._set_conn_opt(test_suite_name,URLSslSelfsigned)
        self._conn_to_perform(test_suite_name,https_self_signed_info_re)

    def ssl_intercept_certerrUntrusted_root(self,test_suite_name,):
        """Run the check against the host whose chain ends in an untrusted root."""
        self._set_conn_opt(test_suite_name,URLSslSuntrustedroot)
        self._conn_to_perform(test_suite_name,https_untrusted_root_info_re)
class ProxyRequestBuild:
    """Proxy policy checks (redirect, replace, insert, block, hijack).

    Every ``proxy_*`` method fetches *proxy_url*, verifies the Tango issuer
    when *isSsl* is truthy, and reports its verdict by raising ``Exception``
    (success string or an ``"Error:..."`` message).  The curl handle is closed
    by the first check that runs, so an instance serves exactly one check.
    """

    def __init__(self):
        self.bodyBuf = BytesIO()
        self.conn = pycurl.Curl()
        self.conn.setopt(self.conn.ENCODING, "gzip,deflate")

    def _cert_verify(self, certs, isSsl):
        """Require a trusted Tango issuer on TLS connections.

        Returns ``None`` without checking anything when *isSsl* is falsy.

        Raises:
            Exception: when no issuer entry is available, the issuer is not
                a Tango certificate, or it is marked UNTRUST.
        """
        if not isSsl:
            return
        issuer = ()
        first_cert = certs[0] if certs else ()
        for cert_info in first_cert:
            if cert_info[0].lower() == "issuer":
                issuer = cert_info
                break
        if len(issuer) <= 0:
            raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
        if not re.search(r'\bCN[\s]*=[\s]*Tango\b', issuer[1]):
            raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
        # Same whitespace tolerance as the Tango match above, so an UNTRUST
        # issuer printed as "CN=Tango..." is not accepted as trusted.
        if re.search(r'\bCN[\s]*=[\s]*Tango[\s\S]*UNTRUST\b', issuer[1]):
            raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])

    def _set_conn_opt(self,test_suite_name, url,isSsl):
        """Apply URL, suite limits and the body sink; enable cert capture for TLS."""
        suite_config = suite_test_config_dict[test_suite_name]
        self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_config['max_recv_speed_large']))
        self.conn.setopt(self.conn.URL,url)
        self.conn.setopt(self.conn.TIMEOUT, int(suite_config['conn_timeout']))
        self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
        if isSsl:
            self.conn.setopt(self.conn.OPT_CERTINFO, 1)
            self.conn.setopt(self.conn.SSL_VERIFYPEER, False)

    def _fetch(self, test_suite_name, proxy_url, isSsl):
        """Perform the request and return ``(response_code, body_bytes)``.

        The handle is closed even when the transfer fails, and the issuer is
        verified (for TLS) before anything is returned.
        """
        certs = None
        try:
            self._set_conn_opt(test_suite_name, proxy_url, isSsl)
            self.conn.perform()
            if isSsl:
                certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
            rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
        finally:
            self.conn.close()
        self._cert_verify(certs, isSsl)
        return rescode, self.bodyBuf.getvalue()

    @staticmethod
    def _report(passed, isSsl, ssl_ok, http_ok, ssl_fail, http_fail):
        """Raise the TLS or HTTP flavour of the success / failure message."""
        if passed:
            raise Exception(ssl_ok if isSsl else http_ok)
        raise Exception(ssl_fail if isSsl else http_fail)

    def proxy_redirect(self,test_suite_name,proxy_url,isSsl):
        """Pass when the response status is 301 or 302."""
        rescode, _ = self._fetch(test_suite_name, proxy_url, isSsl)
        self._report(
            rescode == 301 or rescode == 302, isSsl,
            ssl_redirect_info_re, http_redirect_info_re,
            "Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode,
            "Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode)

    def proxy_replace(self,test_suite_name, proxy_url,isSsl):
        """Pass when the original marker is gone and the replacement marker is present."""
        _, raw_body = self._fetch(test_suite_name, proxy_url, isSsl)
        # Only ASCII markers are searched, so undecodable bytes are replaced
        # instead of aborting the check with a UnicodeDecodeError.
        body = raw_body.decode('utf-8', errors='replace')
        replaced = ('EnglishSearchShared' not in body
                    and '03C174CD9D809789CCEC18D6F585DF3E' in body)
        self._report(
            replaced, isSsl,
            ssl_replace_info_re, http_replace_info_re,
            "Error:Ssl connection replace fail",
            "Error:Http connection replace fail")

    def proxy_insert(self,test_suite_name,proxy_url,isSsl):
        """Pass when both inserted markers are present in the body."""
        _, raw_body = self._fetch(test_suite_name, proxy_url, isSsl)
        body = raw_body.decode('utf-8', errors='replace')
        inserted = ('httpSelfcheckInsert' in body
                    and '5BE3754D1EA8D51E8D993060FA225330' in body)
        self._report(
            inserted, isSsl,
            ssl_insert_info_re, http_insert_info_re,
            "Error:Ssl connection insert fail",
            "Error:Http connection insert fail")

    def proxy_block(self,test_suite_name,proxy_url,isSsl):
        """Pass when the block marker is present and the status is 404 or 451."""
        rescode, raw_body = self._fetch(test_suite_name, proxy_url, isSsl)
        body = raw_body.decode('utf-8', errors='replace')
        blocked = ('E33F01E50AFE043191931DD40190B09B' in body
                   and (rescode == 404 or rescode == 451))
        self._report(
            blocked, isSsl,
            ssl_block_info_re, http_block_info_re,
            "Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode,
            "Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)

    def proxy_hijack(self,test_suite_name,proxy_url,isSsl):
        """Pass when the MD5 of the downloaded body equals the expected digest."""
        _, raw_body = self._fetch(test_suite_name, proxy_url, isSsl)
        hijack_file_md5 = hashlib.md5(raw_body).hexdigest()
        self._report(
            hijack_file_md5 == "4bf06db1a228c5c8d978ebf9e1169d0d", isSsl,
            ssl_hijack_info_re, http_hijack_info_re,
            "Error:Ssl connection hijack fail",
            "Error:Http connection hijack fail")
class SSLFileDownloadBuild:
    """HTTPS download check that also records transfer statistics.

    ``conn_traffic`` downloads a file through the intercepting gateway,
    reports the statistics to Telegraf and to log files, and raises
    ``Exception`` with the verdict.  The curl handle is closed by the first
    download, so an instance serves exactly one check.
    """

    def __init__(self):
        self.conn = pycurl.Curl()
        # The payload itself is not kept; only its size is checked later.
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        telegraf_config = suite_test_config_dict['telegraf']
        self.client = TelegrafClient(
            host=str(telegraf_config['host']),
            port=int(telegraf_config['port']),
            tags={str(telegraf_config['tags_key']): str(telegraf_config['tags_value'])})

    def _get_conninfo(self,conn):
        """Collect status, timing, size and speed figures from *conn* into a dict."""
        return {
            "status": conn.getinfo(pycurl.HTTP_CODE),
            "dns_time": conn.getinfo(pycurl.NAMELOOKUP_TIME),
            "conn_time": conn.getinfo(pycurl.CONNECT_TIME),
            "app_time": conn.getinfo(pycurl.APPCONNECT_TIME),
            "start_transfer_time": conn.getinfo(pycurl.STARTTRANSFER_TIME),
            "total_time": conn.getinfo(pycurl.TOTAL_TIME),
            "size_upload": conn.getinfo(pycurl.SIZE_UPLOAD),
            "size_download": conn.getinfo(pycurl.SIZE_DOWNLOAD),
            "speed_upload": conn.getinfo(pycurl.SPEED_UPLOAD),
            "speed_download": conn.getinfo(pycurl.SPEED_DOWNLOAD),
            "time_pretransfer": conn.getinfo(pycurl.PRETRANSFER_TIME),
        }

    def _set_conn_opt(self,test_suite_name,url):
        """Apply the URL plus the suite's speed limit and timeout to the handle."""
        suite_config = suite_test_config_dict[test_suite_name]
        self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_config['max_recv_speed_large']))
        self.conn.setopt(self.conn.URL,url)
        self.conn.setopt(self.conn.TIMEOUT, int(suite_config['conn_timeout']))

    def _write_in_nezha(self, sizeStr, connInfoDict):
        """Send total time and HTTP status for this file size as one Telegraf metric."""
        nzname = 'conn_taffic_status_size_' + sizeStr
        nzdict = {
            "conn_traffic_" + sizeStr + "_size_total_time": connInfoDict['total_time'],
            "conn_traffic_" + sizeStr + "_size_status": connInfoDict['status'],
        }
        self.client.metric(nzname, nzdict)

    def _write_in_logfile(self, sizeStr, connInfoDict):
        """Write the statistics as JSON to the "newest" file and to a timestamped copy."""
        logNewestPath = "/opt/dign_client/log/conn_traffic_status/conn_traffic_status_" + sizeStr
        logPath = logNewestPath + time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        connInfoStr = json.dumps(connInfoDict)
        # Both files get the same text; writing it directly avoids re-reading
        # the first file through a handle that lived outside a ``with``.
        for path in (logNewestPath, logPath):
            with open(path, "w") as f:
                f.write(connInfoStr)

    def conn_traffic(self,test_suite_name,url,conn_taffic_re,sizeStr, size):
        """Download *url* and verify interception and the downloaded size.

        Args:
            test_suite_name: key into suite_test_config_dict for limits.
            url: file to download.
            conn_taffic_re: success message raised when the check passes.
            sizeStr: size label used in metric and log file names.
            size: expected number of downloaded bytes.

        Raises:
            Exception: always, once the transfer has completed -- either
                *conn_taffic_re* or an ``"Error:..."`` message.
        """
        self._set_conn_opt(test_suite_name, url)
        try:
            self.conn.perform()
            certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
            conninfo = self._get_conninfo(self.conn)
        finally:
            # Close even when perform() raises so the handle is not leaked.
            self.conn.close()
        issuer = ()
        first_cert = certs[0] if certs else ()
        for cert_info in first_cert:
            # Case-insensitive, matching the other request builders in this file.
            if cert_info[0].lower() == "issuer":
                issuer = cert_info
                break
        if len(issuer) <= 0:
            raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
        if not re.search(r'\bCN[\s]*=[\s]*Tango\b', issuer[1]):
            raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1])
        # Same whitespace tolerance as the Tango match above.
        if re.search(r'\bCN[\s]*=[\s]*Tango[\s\S]*UNTRUST\b', issuer[1]):
            raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])
        if int(conninfo["size_download"]) != size:
            # One formatted string, so the message no longer renders as a tuple.
            raise Exception("Error: connection tarffic size error and is no equal %s" % sizeStr)
        self._write_in_nezha(sizeStr,conninfo)
        self._write_in_logfile(sizeStr,conninfo)
        raise Exception(conn_taffic_re)
class HttpFirewallActionBuild:
    """Firewall action checks over plain HTTP (allow, drop, rst, block).

    Each check method reports its verdict by raising ``Exception`` (success
    string or an ``"Error:..."`` message).  The curl handle is closed by the
    first check that runs, so an instance serves exactly one check.
    """

    # libcurl error codes carried in ``pycurl.error.args[0]``.
    _CURLE_OPERATION_TIMEDOUT = 28
    _CURLE_RECV_ERROR = 56

    def __init__(self):
        self.conn = pycurl.Curl()
        # The body is irrelevant for these checks; send it to a scratch buffer.
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)

    def _set_conn_opt(self,test_suite_name, url):
        """Apply the URL plus the suite's speed limit and timeout to the handle."""
        suite_config = suite_test_config_dict[test_suite_name]
        self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_config['max_recv_speed_large']))
        self.conn.setopt(self.conn.URL,url)
        self.conn.setopt(self.conn.TIMEOUT, int(suite_config['conn_timeout']))

    def _response_code(self):
        """Run the transfer and return the HTTP status; always close the handle."""
        try:
            self.conn.perform()
            return self.conn.getinfo(self.conn.RESPONSE_CODE)
        finally:
            self.conn.close()

    def _expect_transfer_error(self, expected_code, success_msg, failure_fmt):
        """Run the transfer and require it to fail with curl error *expected_code*.

        Raises:
            Exception: *success_msg* when the expected curl error occurred;
                otherwise ``failure_fmt`` filled with the curl error, or with
                a note that the transfer completed without one.
        """
        try:
            self.conn.perform()
        except pycurl.error as errorinfo:
            if errorinfo.args[0] == expected_code:
                raise Exception(success_msg)
            raise Exception(failure_fmt % errorinfo)
        finally:
            # The handle was previously left open whenever perform() raised.
            self.conn.close()
        # A completed transfer means the deny action did not take effect;
        # report that explicitly instead of returning without a verdict.
        raise Exception(failure_fmt % "the transfer completed without a curl error")

    def action_allow(self,test_suite_name):
        """Pass when URLHttpFirewallAllow answers with status 200."""
        self._set_conn_opt(test_suite_name,URLHttpFirewallAllow)
        rescode = self._response_code()
        if rescode == 200:
            raise Exception(http_firewall_allow_re)
        raise Exception("Error: The stream may be redirected, http code %s" % rescode)

    def action_deny_subaction_drop(self,test_suite_name):
        """Pass when the request to URLHttpFirewallDenyDrop times out."""
        self._set_conn_opt(test_suite_name,URLHttpFirewallDenyDrop)
        self._expect_transfer_error(
            self._CURLE_OPERATION_TIMEDOUT,
            http_firewall_deny_drop_re,
            "Error: The stream may be not dropped %s")

    def action_deny_subaction_rst(self,test_suite_name):
        """Pass when the request to URLHttpFirewallDenyRst fails with a receive error."""
        self._set_conn_opt(test_suite_name,URLHttpFirewallDenyRst)
        self._expect_transfer_error(
            self._CURLE_RECV_ERROR,
            http_firewall_deny_rst_re,
            "Error: The stream may be not rst %s")

    def action_deny_subaction_block(self,test_suite_name):
        """Pass when URLHttpFirewallDenyBlock answers with status 403."""
        self._set_conn_opt(test_suite_name,URLHttpFirewallDenyBlock)
        rescode = self._response_code()
        if rescode == 403:
            raise Exception(http_firewall_deny_block_re)
        raise Exception("Error: The stream may be not block, http code %s " % rescode)
class SslFirewallActionBuild:
    """Firewall action checks over TLS (allow, drop, rst).

    Each check method reports its verdict by raising ``Exception`` (success
    string or an ``"Error:..."`` message).  The curl handle is closed by the
    first check that runs, so an instance serves exactly one check.
    """

    # libcurl error codes carried in ``pycurl.error.args[0]``.
    _CURLE_OPERATION_TIMEDOUT = 28
    _CURLE_SSL_CONNECT_ERROR = 35

    def __init__(self):
        self.conn = pycurl.Curl()
        # The body is irrelevant for these checks; send it to a scratch buffer.
        self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
        self.conn.setopt(self.conn.OPT_CERTINFO, 1)
        self.conn.setopt(self.conn.SSL_VERIFYPEER, False)

    def _set_conn_opt(self,test_suite_name, url):
        """Apply the URL plus the suite's speed limit and timeout to the handle."""
        suite_config = suite_test_config_dict[test_suite_name]
        self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_config['max_recv_speed_large']))
        self.conn.setopt(self.conn.URL,url)
        self.conn.setopt(self.conn.TIMEOUT, int(suite_config['conn_timeout']))

    def _expect_transfer_error(self, expected_code, success_msg, failure_fmt):
        """Run the transfer and require it to fail with curl error *expected_code*.

        Raises:
            Exception: *success_msg* when the expected curl error occurred;
                otherwise ``failure_fmt`` filled with the curl error, or with
                a note that the transfer completed without one.
        """
        try:
            self.conn.perform()
        except pycurl.error as errorinfo:
            if errorinfo.args[0] == expected_code:
                raise Exception(success_msg)
            raise Exception(failure_fmt % errorinfo)
        finally:
            # The handle was previously left open whenever perform() raised.
            self.conn.close()
        # A completed transfer means the deny action did not take effect;
        # report that explicitly instead of returning without a verdict.
        raise Exception(failure_fmt % "the transfer completed without a curl error")

    def action_allow(self,test_suite_name):
        """Pass when URLSslFirewallAllow answers with status 200."""
        self._set_conn_opt(test_suite_name,URLSslFirewallAllow)
        try:
            self.conn.perform()
            rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
        finally:
            self.conn.close()
        if rescode == 200:
            raise Exception(ssl_firewall_allow_re)
        raise Exception("Error: The stream may be redirected, http code %s" % rescode)

    def action_deny_subaction_drop(self,test_suite_name):
        """Pass when the request to URLSslFirewallDenyDrop times out."""
        self._set_conn_opt(test_suite_name,URLSslFirewallDenyDrop)
        self._expect_transfer_error(
            self._CURLE_OPERATION_TIMEDOUT,
            ssl_firewall_deny_drop_re,
            "Error: The stream may be not dropped %s")

    def action_deny_subaction_rst(self,test_suite_name):
        """Pass when the TLS handshake with URLSslFirewallDenyRst fails."""
        self._set_conn_opt(test_suite_name,URLSslFirewallDenyRst)
        self._expect_transfer_error(
            self._CURLE_SSL_CONNECT_ERROR,
            ssl_firewall_deny_rst_re,
            "Error: The stream may be not rst %s")
class SslUnitTest(unittest.TestCase):
def test_dnsRequest_deny_drop(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsRequestFirewallDenyDrop):
dnsHandler.dns_action_deny_subaction_drop()
def test_dnsRequest_deny_redirect_a(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirect):
dnsHandler.dns_action_deny_subaction_redirect_a()
def test_dnsRequest_deny_redirect_aaaa(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirect):
dnsHandler.dns_action_deny_subaction_redirect_aaaa()
def test_dnsRequest_deny_redirect_a_range_ttl(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirectRangTTL):
dnsHandler.dns_action_deny_subaction_redirect_a_rang_ttl()
def test_dnsRequest_deny_redirect_aaaa_range_ttl(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirectRangTTL):
dnsHandler.dns_action_deny_subaction_redirect_aaaa_rang_ttl()
def test_dnsRequest_allow_rdtype_a(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsARequestFirewallAllow):
dnsHandler.dns_action_allow_rdtype_a()
def test_dnsRequest_allow_rdtype_aaaa(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsAAAARequestFirewallAllow):
dnsHandler.dns_action_allow_rdtype_aaaa()
def test_dnsRequest_allow_rdtype_cname(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsCNAMERequestFirewallAllow):
dnsHandler.dns_action_allow_rdtype_cname()
def test_securityPolicy_bypass(self):
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
sslHandler.ssl_bypass('test_securityPolicy_bypass')
def test_securityPolicy_intercept(self):
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
sslHandler.ssl_intercept('test_securityPolicy_intercept')
def test_securityPolicy_intercept_certerrExpired(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_exprired_info_re):
requestHandler.ssl_intercept_certerrExpired('test_securityPolicy_intercept_certerrExpired')
def test_securityPolicy_intercept_certerrSelf_signed(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_self_signed_info_re):
requestHandler.ssl_intercept_certerrSelf_signed('test_securityPolicy_intercept_certerrSelf_signed')
def test_securityPolicy_intercept_certerrUntrusted_root(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
requestHandler.ssl_intercept_certerrUntrusted_root('test_securityPolicy_intercept_certerrUntrusted_root')
def test_proxyPolicy_ssl_redirect(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyPolicy_ssl_redirect',URLSslRedirect,True)
def test_proxyPolicy_ssl_block(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_block_info_re):
proxyHandler.proxy_block('test_proxyPolicy_ssl_block', URLSslBlock,True)
def test_proxyPolicy_ssl_replace(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_replace_info_re):
proxyHandler.proxy_replace('test_proxyPolicy_ssl_replace',URLSslReplace, True)
def test_proxyPolicy_ssl_hijack(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyPolicy_ssl_hijack', URLSslHijack,True)
def test_proxyPolicy_ssl_insert(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_insert_info_re):
proxyHandler.proxy_insert('test_proxyPolicy_ssl_insert',URLSslInsert,True)
def test_proxyPolicy_http_redirect(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyPolicy_http_redirect',URLHttpRedirect, False)
def test_proxyPolicy_http_block(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_block_info_re):
proxyHandler.proxy_block('test_proxyPolicy_http_block', URLHttpBlock,False)
def test_proxyPolicy_http_replace(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_replace_info_re):
proxyHandler.proxy_replace('test_proxyPolicy_http_replace',URLHttpReplace, False)
def test_proxyPolicy_http_hijack(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyPolicy_http_hijack', URLHttpHijack,False)
def test_proxyPolicy_http_insert(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_insert_info_re):
proxyHandler.proxy_insert('test_proxyPolicy_http_insert',URLHttpInsert,False)
def test_https_con_traffic_1k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_1k', URLConTraffic_1k, https_conn_taffic_1k_re,'1k', 1024)
def test_https_con_traffic_4k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_4k',URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4*1024)
def test_https_con_traffic_16k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_16k', URLConTraffic_16k, https_conn_taffic_16k_re,'16k', 16*1024)
def test_https_con_traffic_64k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_64k',URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64*1024)
def test_https_con_traffic_256k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re):
requestHandler.conn_traffic( 'test_https_con_traffic_256k', URLConTraffic_256k,https_conn_taffic_256k_re,'256k', 256*1024)
def test_https_con_traffic_1M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_1M', URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024)
def test_https_con_traffic_4M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_4M', URLConTraffic_4M, https_conn_taffic_4M_re,'4M', 4*1024*1024)
def test_https_con_traffic_16M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_16M', URLConTraffic_16M,https_conn_taffic_16M_re,'16M',16*1024*1024)
def test_https_con_traffic_64M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re):
requestHandler.conn_traffic( 'test_https_con_traffic_64M',URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024)
def test_http_firewall_allow(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_allow_re):
requestHandler.action_allow('test_http_firewall_allow')
def test_http_firewall_deny_drop(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_drop_re):
requestHandler.action_deny_subaction_drop('test_http_firewall_deny_drop')
def test_http_firewall_deny_rst(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_re):
requestHandler.action_deny_subaction_rst('test_http_firewall_deny_rst')
def test_http_firewall_deny_block(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_block_re):
requestHandler.action_deny_subaction_block('test_http_firewall_deny_block')
def test_ssl_firewall_allow(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_allow_re):
requestHandler.action_allow('test_ssl_firewall_allow')
def test_ssl_firewall_deny_drop(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_deny_drop_re):
requestHandler.action_deny_subaction_drop('test_ssl_firewall_deny_drop')
def test_ssl_firewall_deny_rst(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_deny_rst_re):
requestHandler.action_deny_subaction_rst('test_ssl_firewall_deny_rst')
class TsgDiagnoseRun:
    """Command-line driver: parse options, load the config, build the
    ``SslUnitTest`` suite and run it once, a fixed number of times, or in an
    endless loop.
    """

    # Test method names of SslUnitTest, in the order they are added to the suite.
    _SUITE_TEST_NAMES = (
        'test_dnsRequest_deny_drop',
        'test_dnsRequest_deny_redirect_a',
        'test_dnsRequest_deny_redirect_aaaa',
        'test_dnsRequest_deny_redirect_a_range_ttl',
        'test_dnsRequest_deny_redirect_aaaa_range_ttl',
        'test_dnsRequest_allow_rdtype_a',
        'test_dnsRequest_allow_rdtype_aaaa',
        'test_dnsRequest_allow_rdtype_cname',
        'test_securityPolicy_bypass',
        'test_securityPolicy_intercept',
        'test_securityPolicy_intercept_certerrExpired',
        'test_securityPolicy_intercept_certerrSelf_signed',
        'test_securityPolicy_intercept_certerrUntrusted_root',
        'test_proxyPolicy_ssl_redirect',
        'test_proxyPolicy_ssl_block',
        'test_proxyPolicy_ssl_replace',
        'test_proxyPolicy_ssl_hijack',
        'test_proxyPolicy_ssl_insert',
        'test_proxyPolicy_http_redirect',
        'test_proxyPolicy_http_block',
        'test_proxyPolicy_http_replace',
        'test_proxyPolicy_http_hijack',
        'test_proxyPolicy_http_insert',
        'test_https_con_traffic_1k',
        'test_https_con_traffic_4k',
        'test_https_con_traffic_16k',
        'test_https_con_traffic_64k',
        'test_https_con_traffic_256k',
        'test_https_con_traffic_1M',
        'test_https_con_traffic_4M',
        'test_https_con_traffic_16M',
        'test_https_con_traffic_64M',
        'test_http_firewall_allow',
        'test_http_firewall_deny_drop',
        'test_http_firewall_deny_rst',
        'test_http_firewall_deny_block',
        'test_ssl_firewall_allow',
        'test_ssl_firewall_deny_drop',
        'test_ssl_firewall_deny_rst',
    )

    def __init__(self):
        self.interval = 1
        self.loop = False
        self.count = 1
        self.config = None
        self.client = None
        self.config_dict = {}
        self.suite = None

    def _get_suite_option(self):
        """Parse the command line into interval, loop, count and config path.

        Prints an error plus the usage text and exits with status 1 when
        ``--count`` is outside the documented range 1-65535 or when
        ``--interval`` is negative.
        """
        parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
        parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.')
        parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
        parser.add_argument('-p','--configpath', type = str, default = '/opt/dign_client/etc/client.conf',help='Specifies the config file, default /opt/dign_client/etc/client.conf')
        parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
        args = parser.parse_args()
        self.interval = args.interval
        self.loop = args.loop
        self.count = args.count
        self.config = args.configpath
        # Enforce the range promised by the --count help text; previously
        # only 0 was rejected, so negative and oversized values slipped through.
        if not 1 <= self.count <= 65535:
            print("Error: bad number of tsg diagnose and will exit")
            parser.print_help()
            sys.exit(1)
        # time.sleep() rejects negative values, so fail before any run starts.
        if self.interval < 0:
            print("Error: bad interval of tsg diagnose and will exit")
            parser.print_help()
            sys.exit(1)

    def _get_suite_config(self):
        """Overlay the sections of the config file onto ``suite_test_config_dict``.

        Sections already present are updated key by key, unknown sections are
        added.  Values loaded from the file are strings, which is why readers
        of the dictionary convert them with ``int()``.
        """
        config = ConfigParser()
        # ConfigParser.read() silently skips a file it cannot open, in which
        # case only the built-in defaults remain in effect.
        config.read(self.config, encoding='UTF-8')
        for section in config.sections():
            if section in suite_test_config_dict:
                suite_test_config_dict[section].update(config.items(section))
            else:
                suite_test_config_dict[section] = dict(config.items(section))
        self.config_dict = suite_test_config_dict

    def _add_suite(self, test_suite_name):
        """Add the named ``SslUnitTest`` test to the suite if its ``enabled`` setting is 1."""
        if int(self.config_dict[test_suite_name]['enabled']) == 1:
            self.suite.addTest(SslUnitTest(test_suite_name))

    def _init_suite(self):
        """Create the suite and add every enabled test in the fixed order."""
        self.suite = unittest.TestSuite()
        # By default TestSuite discards each test after running it; keep the
        # tests so the same suite can be executed on every loop iteration.
        self.suite._cleanup = False
        for test_suite_name in self._SUITE_TEST_NAMES:
            self._add_suite(test_suite_name)

    def _stdout_suite_result(self):
        """Run the suite with a verbose text runner between start/end banners."""
        print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s'))
        runner = unittest.TextTestRunner(verbosity=2)
        runner.run(self.suite)
        print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s'))

    def _output_suite_result(self):
        """Run the suite once and emit its result."""
        self._stdout_suite_result()

    def execute_suite_tsg_diagnose(self):
        """Entry point: configure, then run the suite ``count`` times or forever.

        Any ``Exception`` raised while running is printed and turns into
        exit status 1.
        """
        self._get_suite_option()
        self._get_suite_config()
        self._init_suite()
        try:
            delay_cfg = self.config_dict['start_time_random_delay_range']
            if int(delay_cfg['enabled']) == 1:
                # Sleep a random number of seconds within the configured
                # range before the first run.
                time.sleep(random.randint(int(delay_cfg['left_edge']), int(delay_cfg['right_edge'])))
            counter = 0
            print("Tsg diagnose run sum: %d" % self.count)
            while True:
                print("\nRUN %d" %(counter + 1))
                self._output_suite_result()
                counter = counter + 1
                # In loop mode the run count is ignored.
                if not self.loop and counter >= self.count:
                    break
                time.sleep(self.interval)
        except Exception as ex:
            print("Process get an exception, will exit, Exception info:", ex)
            sys.exit(1)
if __name__ == '__main__':
    # Script entry point: build the runner and execute the diagnose suite
    # according to the command-line options.
    tsg_diagnose_run = TsgDiagnoseRun()
    tsg_diagnose_run.execute_suite_tsg_diagnose()