import sys import unittest import json import pycurl import os import re import time import io from io import BytesIO import getopt import ciunittest import argparse import hashlib from configparser import ConfigParser import random import dns.exception import dns.resolver import sys import logging suite_test_config_dict = {'test_securityPolicy_bypass': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_deny_drop': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_deny_redirect_a': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_deny_redirect_aaaa': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_deny_redirect_a_range_ttl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_deny_redirect_aaaa_range_ttl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_allow_rdtype_a': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_allow_rdtype_aaaa': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_dnsRequest_allow_rdtype_cname': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_securityPolicy_intercept': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_securityPolicy_intercept_certerrExpired': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_securityPolicy_intercept_certerrSelf_signed': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_securityPolicy_intercept_certerrUntrusted_root': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_ssl_redirect': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_ssl_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_ssl_replace': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_ssl_hijack': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_ssl_insert': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_http_redirect': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_http_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_http_replace': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_http_hijack': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_proxyPolicy_http_insert': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_1k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_4k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_16k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_64k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_256k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_1M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_4M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_16M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_https_con_traffic_64M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_http_firewall_allow': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_http_firewall_deny_drop': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600}, 'test_http_firewall_deny_rst': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_http_firewall_deny_block': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_ssl_firewall_allow': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'test_ssl_firewall_deny_drop': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600}, 'test_ssl_firewall_deny_rst': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, 'start_time_random_delay_range': {'enabled':1,'left_edge':1,'right_edge':30}} ssl_bypass_info_re = "Ssl connection bypass success" ssl_intercept_info_re = "Ssl connection intercept success" https_exprired_info_re = "Ssl exprired cert check success" https_self_signed_info_re = "Ssl self signed cert check success" https_untrusted_root_info_re = "Ssl untrusted_root cert check success" ssl_redirect_info_re = "Ssl connection redirect success" ssl_replace_info_re = "Ssl connection replace success" ssl_insert_info_re = "Ssl connection insert success" ssl_hijack_info_re = "Ssl connection hijack success" ssl_block_info_re = "Ssl connection block success" http_redirect_info_re = "http connection redirect success" http_replace_info_re = "http connection replace success" http_insert_info_re = "http connection insert success" http_hijack_info_re = "http connection hijack success" http_block_info_re = "http connection block success" https_conn_taffic_1k_re = 'https download file 1k success' https_conn_taffic_4k_re = 'https download file 4k success' https_conn_taffic_16k_re = 'https download file 16k success' https_conn_taffic_64k_re = 'https download file 64k success' https_conn_taffic_256k_re = 'https download file 256k success' https_conn_taffic_1M_re = 'https download file 1M success' https_conn_taffic_4M_re = 'https download file 4M success' https_conn_taffic_16M_re = 'https download file 16M success' https_conn_taffic_64M_re = 'https download file 64M success' http_firewall_allow_re = "http firewall action allow success" http_firewall_deny_drop_re = "http firewall aciton deny subaction drop success" http_firewall_deny_rst_re = "http firewall action deny subaction rst success" http_firewall_deny_block_re = "http firewall aciton deny subaction block success" ssl_firewall_allow_re = "ssl firewall action allow success" ssl_firewall_deny_drop_re = "ssl firewall action deny subaction drop success" ssl_firewall_deny_rst_re = "ssl firewall action deny subaction rst success" URLBypass = 'https://sha384.badssl.selftest.gdnt-cloud.website' URLIntercept = 'https://sha256.badssl.selftest.gdnt-cloud.website' URLSslExpired = 'https://expired.badssl.selftest.gdnt-cloud.website' URLSslSelfsigned = 'https://self-signed.badssl.selftest.gdnt-cloud.website' URLSslSuntrustedroot = 'https://untrusted-root.badssl.selftest.gdnt-cloud.website' URLSslRedirect = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js' URLSslReplace = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js' URLSslInsert = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html' URLSslHijack = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js' URLSslBlock = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js' URLHttpRedirect = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js' URLHttpReplace = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js' URLHttpInsert = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html' URLHttpHijack = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js' URLHttpBlock = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js' URLConTraffic_1k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k" URLConTraffic_4k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k" URLConTraffic_16k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k" URLConTraffic_64k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k" URLConTraffic_256k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k" URLConTraffic_1M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M" URLConTraffic_4M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M" URLConTraffic_16M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M" URLConTraffic_64M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M" URLHttpFirewallAllow = "http://http.badssl.selftest.gdnt-cloud.website" URLHttpFirewallDenyDrop = "http://http-credit-card.badssl.selftest.gdnt-cloud.website" URLHttpFirewallDenyRst = "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website" URLHttpFirewallDenyBlock = "http://http-login.badssl.selftest.gdnt-cloud.website" URLSslFirewallAllow = "https://sha512.badssl.selftest.gdnt-cloud.website" URLSslFirewallDenyDrop = "https://rsa2048.badssl.selftest.gdnt-cloud.website" URLSslFirewallDenyRst = "https://rsa4096.badssl.selftest.gdnt-cloud.website" HOST_DNS_ALLOW_A = "dnstest.allow-a-ipv4.selftest.gdnt-cloud.website" HOST_DNS_DENY_REDIRECT_A = "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website" HOST_DNS_DENY_DORY = "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website" HOST_DNS_DENY_REDIRECT_A_RTTL = "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website" HOST_DNS_ALLOW_AAAA = "dnstest.allow-4a-ipv6.selftest.gdnt-cloud.website" HOST_DNS_DENY_REDIRECT_AAAA = "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website" HOST_DNS_DENY_REDIRECT_AAAA_RTTL = "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website" HOST_DNS_CNAME_QUERY = "dnstest.test-cname.selftest.gdnt-cloud.website" HOST_DNS_CNAME_ANSWER = "dnstest.testanswer-cname.selftest.gdnt-cloud.website" DNS_REDIRECT_IPV4_ADDR = "33.252.0.101" DNS_REDIRECT_IPV6_ADDR = "2001:db8::1001" DNS_ALLOW_A_ADDR = "233.252.0.1" DNS_ALLOW_AAAA_ADDR = "2001:db8::1" DNS_SERVER_ALLOW_TTL = 60 DNS_SERVER_REDIRECT_TTL = 333 DNS_SERVER_REDIRECT_RANGE_LOW = 400 DNS_SERVER_REDIRECT_RANGE_HIGH = 500 DNS_SERVER_IP = ["192.0.2.135"] DnsRequestFirewallDenyDrop = "Dns request timeout is deny drop sucess" DnsARequestFireWallDenyRedirect = "Dns rdtype A request is deny reidrect sucess" DnsAAAARequestFireWallDenyRedirect = "Dns rdtype AAAA request is deny redirect sucess" DnsARequestFireWallDenyRedirectRangTTL = "Dns rdtype A request is deny reidrect and range ttl sucess" DnsAAAARequestFireWallDenyRedirectRangTTL = "Dns rdtype AAAA request is deny redirect and range ttl sucess" DnsARequestFirewallAllow = "Dns rdtype A request data is sucess" DnsAAAARequestFirewallAllow = "Dns rdtype AAAA request data is sucess" DnsCNAMERequestFirewallAllow = "Dns rdtype CNAME request data is sucess" class _WritelnDecorator(object): """Used to decorate file-like objects with a handy 'writeln' method""" def __init__(self,stream): self.stream = stream def __getattr__(self, attr): if attr in ('stream', '__getstate__'): raise AttributeError(attr) return getattr(self.stream,attr) def writeln(self, arg=None): if arg: self.write(arg) self.write('\n') # text-mode streams translate to \r\n if needed class DignTextTestResult(unittest.result.TestResult): """A test result class that can print formatted text results to a stream. Used by TextTestRunner. """ separator1 = '=' * 70 separator2 = '-' * 70 def __init__(self, stream, descriptions, verbosity): super(DignTextTestResult, self).__init__(stream, descriptions, verbosity) self.stream = stream self.dignStream = _WritelnDecorator(sys.stderr) self.showAll = verbosity > 1 self.dots = verbosity == 1 self.descriptions = descriptions def getDescription(self, test): doc_first_line = test.shortDescription() if self.descriptions and doc_first_line: return '\n'.join((str(test), doc_first_line)) else: return str(test) def startTest(self, test): super(DignTextTestResult, self).startTest(test) if self.showAll: self.stream.write(self.getDescription(test)) self.dignStream.write(self.getDescription(test)) self.stream.write(" ... ") self.dignStream.write(" ... ") self.stream.flush() self.dignStream.flush() def addSuccess(self, test): super(DignTextTestResult, self).addSuccess(test) if self.showAll: self.stream.writeln("ok") self.dignStream.writeln("ok") elif self.dots: self.stream.write('.') self.dignStream.write('.') self.stream.flush() self.dignStream.flush() def addError(self, test, err): super(DignTextTestResult, self).addError(test, err) if self.showAll: self.stream.writeln("ERROR") self.dignStream.writeln("ERROR") elif self.dots: self.stream.write('E') self.dignStream.write('E') self.stream.flush() self.dignStream.flush() def addFailure(self, test, err): super(DignTextTestResult, self).addFailure(test, err) if self.showAll: self.stream.writeln("FAIL") self.dignStream.writeln("FAIL") elif self.dots: self.stream.write('F') self.dignStream.write('F') self.stream.flush() self.dignStream.flush() def addSkip(self, test, reason): super(DignTextTestResult, self).addSkip(test, reason) if self.showAll: self.stream.writeln("skipped {0!r}".format(reason)) self.dignStream.writeln("skipped {0!r}".format(reason)) elif self.dots: self.stream.write("s") self.dignStream.write("s") self.stream.flush() self.dignStream.flush() def addExpectedFailure(self, test, err): super(DignTextTestResult, self).addExpectedFailure(test, err) if self.showAll: self.stream.writeln("expected failure") self.dignStream.writeln("expected failure") elif self.dots: self.stream.write("x") self.dignStream.write("x") self.stream.flush() self.dignStream.flush() def addUnexpectedSuccess(self, test): super(DignTextTestResult, self).addUnexpectedSuccess(test) if self.showAll: self.stream.writeln("unexpected success") self.dignStream.writeln("unexpected success") elif self.dots: self.stream.write("u") self.dignStream.write("u") self.stream.flush() self.dignStream.flush() def printErrors(self): if self.dots or self.showAll: self.stream.writeln() self.dignStream.writeln() self.printErrorList('ERROR', self.errors) self.printErrorList('FAIL', self.failures) def printErrorList(self, flavour, errors): for test, err in errors: self.stream.writeln(self.separator1) self.dignStream.writeln(self.separator1) self.stream.writeln("%s: %s" % (flavour,self.getDescription(test))) self.dignStream.writeln("%s: %s" % (flavour,self.getDescription(test))) self.stream.writeln(self.separator2) self.dignStream.writeln(self.separator2) self.stream.writeln("%s" % err) self.dignStream.writeln("%s" % err) def get_logger(name,logPath,enableConsole=True): logger = logging.getLogger(name) fileHandler = logging.FileHandler(logPath) fmt = '%(asctime)s, %(levelname)s, %(message)s' formatter = logging.Formatter(fmt=fmt, datefmt='%a %b %d %H:%M:%S %Y') fileHandler.setFormatter(formatter) fileHandler.setLevel(logging.DEBUG) logger.addHandler(fileHandler) if enableConsole == True: consoleHandler = logging.StreamHandler() consoleHandler.setFormatter(formatter) consoleHandler.setLevel(logging.DEBUG) logger.addHandler(consoleHandler) logger.setLevel(logging.DEBUG) return logger class DNSCheckRequestBuild: def dns_action_deny_subaction_drop(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_DENY_DORY, 'A') except dns.exception.DNSException as errorinfo: if type(errorinfo) == dns.exception.Timeout: raise Exception(DnsRequestFirewallDenyDrop) else: raise Exception("Error: The dns_action_deny_subaction_drop check failure, code: %s" % errorinfo) else: raise Exception("Error: The dns_action_deny_subaction_drop test deny drop failure" ) def dns_action_deny_subaction_redirect_a(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_A, 'A') except dns.exception.DNSException as errorinfo: raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure, code: %s" % errorinfo) else: # drop-redirect and respond rdtype A ipv4 if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL: raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL)) return for i in dns_answer.response.answer: for j in i.items: if j.rdtype == 1: #'rdtype is A: ipv4' if j.address == DNS_REDIRECT_IPV4_ADDR: raise Exception(DnsARequestFireWallDenyRedirect) else: raise Exception("Error: The dns request rdtype A drop redirect check failure: respond value error") else: raise Exception("Error: The dns request rdtype A drop redirect check failure: respond rdtype error") def dns_action_deny_subaction_redirect_aaaa(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA, 'AAAA') except dns.exception.DNSException as errorinfo: raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure, code: %s" % errorinfo) else: # drop-redirect and respond rdtype A ipv6 if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL: raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL)) return for i in dns_answer.response.answer: for j in i.items: if j.rdtype == 28: #'rdtype is A: ipv6' if j.address == DNS_REDIRECT_IPV6_ADDR: raise Exception(DnsAAAARequestFireWallDenyRedirect) else: raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error") else: raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error") def dns_action_deny_subaction_redirect_a_rang_ttl(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_A_RTTL, 'A') except dns.exception.DNSException as errorinfo: raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure, code: %s" % errorinfo) else: # drop-redirect and respond rdtype A ipv4 if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH: raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl,DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH)) return for i in dns_answer.response.answer: for j in i.items: if j.rdtype == 1: #'rdtype is A: ipv4' if j.address == DNS_REDIRECT_IPV4_ADDR: raise Exception(DnsARequestFireWallDenyRedirectRangTTL) else: raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond value error") else: raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond rdtype error") def dns_action_deny_subaction_redirect_aaaa_rang_ttl(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA_RTTL, 'AAAA') except dns.exception.DNSException as errorinfo: raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa range ttl check failure, code: %s" % errorinfo) else: # drop-redirect and respond rdtype A ipv6 if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH: raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH)) return for i in dns_answer.response.answer: for j in i.items: if j.rdtype == 28: #'rdtype is A: ipv6' if j.address == DNS_REDIRECT_IPV6_ADDR: raise Exception(DnsAAAARequestFireWallDenyRedirectRangTTL) else: raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error") else: raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error") def dns_action_allow_rdtype_a(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_ALLOW_A, 'A') except dns.exception.DNSException as errorinfo: raise Exception("Error: The dns request rdtype A allow check failure, code: %s" % errorinfo) else: if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL: raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL)) return for i in dns_answer.response.answer: for j in i.items: if j.rdtype == 1: #'rdtype is A: ipv4' if j.address == DNS_ALLOW_A_ADDR: raise Exception(DnsARequestFirewallAllow) else: raise Exception("Error: The dns request rdtype A allow check failure: respond value error") else: raise Exception("Error: The dns request rdtype A allow check failure: respond rdtype error") def dns_action_allow_rdtype_aaaa(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_ALLOW_AAAA, 'AAAA') except dns.exception.DNSException as errorinfo: raise Exception("Error: The dns request rdtype AAAA allow check failure, code: %s" % errorinfo) else: if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL: raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL)) return for i in dns_answer.response.answer: for j in i.items: if j.rdtype == 28: #'rdtype is AAAA: ipv6' if j.address == DNS_ALLOW_AAAA_ADDR: raise Exception(DnsAAAARequestFirewallAllow) else: raise Exception("Error: The dns request rdtype AAAA allow check failure: respond value error") else: raise Exception("Error: The dns request rdtype AAAA allow check failure: response rdtype error") def dns_action_allow_rdtype_cname(self): dns_resolver=dns.resolver.Resolver() dns_resolver.nameservers = DNS_SERVER_IP dns_resolver.timeout = float(3) dns_resolver.lifetime = float(3) try: dns_answer = dns_resolver.query(HOST_DNS_CNAME_QUERY, 'CNAME') except dns.exception.DNSException as errorinfo: raise Exception("Error: The dns request rdtype CNAME allow check failure, code: %s" % errorinfo) else: if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL: raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL)) return for i in dns_answer.response.answer: for j in i.items: if j.rdtype == 5: #'CNAME: tag(www.xxx.com)' m=str(j) if m == (HOST_DNS_CNAME_ANSWER + '.'): raise Exception(DnsCNAMERequestFirewallAllow) else: raise Exception("Error: The dns request rdtype CNAME allow check failure: respond value error") else: raise Exception("Error: The dns request rdtype CNAME allow check failure: respond rdtype error") class SSLCheckRequestBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def _set_conn_opt(self,test_suite_name, url): self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) self.conn.setopt(self.conn.URL,url) self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) def _get_conn_issuer(self,certs): issuer = () for cert_info in certs[0]: if cert_info[0] == "Issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) return issuer def ssl_bypass(self,test_suite_name): self._set_conn_opt(test_suite_name,URLBypass) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = self._get_conn_issuer(certs) if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): raise Exception(ssl_bypass_info_re) elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0): raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1]) else: raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1]) def ssl_intercept(self,test_suite_name): self._set_conn_opt(test_suite_name, URLIntercept) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = self._get_conn_issuer(certs) if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1]) else: raise Exception(ssl_intercept_info_re) else: raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) class SslInterceptRequestBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def _set_conn_opt(self,test_suite_name,url): self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) self.conn.setopt(self.conn.URL, url) self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) def _conn_to_perform(self, test_suite_name, sec_info_re): self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception(sec_info_re) else: raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1]) else: if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0) or \ re.search(r'\bCN[\s\S]*=[\s\S]*badssl[\s\S]*',issuer[1],0): raise Exception("Error: Ssl connection intercept failed, cert info: %s" % issuer[1]) else: raise Exception(sec_info_re) def ssl_intercept_certerrExpired(self,test_suite_name): self._set_conn_opt(test_suite_name,URLSslExpired) self._conn_to_perform(test_suite_name,https_exprired_info_re) def ssl_intercept_certerrSelf_signed(self,test_suite_name): self._set_conn_opt(test_suite_name,URLSslSelfsigned) self._conn_to_perform(test_suite_name,https_self_signed_info_re) def ssl_intercept_certerrUntrusted_root(self,test_suite_name,): self._set_conn_opt(test_suite_name,URLSslSuntrustedroot) self._conn_to_perform(test_suite_name,https_untrusted_root_info_re) class ProxyRequestBuild: def __init__(self): self.bodyBuf = BytesIO() self.conn = pycurl.Curl() self.conn.setopt(self.conn.ENCODING, "gzip,deflate") def _cert_verify(self, certs, isSsl): if isSsl == True: issuer = () for cert_info in certs[0]: if cert_info[0].lower() == "issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1]) else: return else: raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) def _set_conn_opt(self,test_suite_name, url,isSsl): self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) self.conn.setopt(self.conn.URL,url) self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) if isSsl == True: self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def proxy_redirect(self,test_suite_name,proxy_url,isSsl): certs = None self._set_conn_opt(test_suite_name,proxy_url,isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.close() self._cert_verify(certs, isSsl) if rescode == 301 or rescode == 302: if isSsl == True: raise Exception(ssl_redirect_info_re) else: raise Exception(http_redirect_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode) else: raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode) def proxy_replace(self,test_suite_name, proxy_url,isSsl): certs = None self._set_conn_opt(test_suite_name, proxy_url,isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() self._cert_verify(certs, isSsl) if not re.search(r'EnglishSearchShared', body, 0) and \ re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0): if isSsl == True: raise Exception(ssl_replace_info_re) else: raise Exception(http_replace_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection replace fail") else: raise Exception("Error:Http connection replace fail") def proxy_insert(self,test_suite_name,proxy_url,isSsl): certs = None self._set_conn_opt(test_suite_name,proxy_url,isSsl) self.conn.perform() body = self.bodyBuf.getvalue().decode('utf-8') if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() self._cert_verify(certs, isSsl) if re.search(r'httpSelfcheckInsert', body, 0) and \ re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0): if isSsl == True: raise Exception(ssl_insert_info_re) else: raise Exception(http_insert_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection insert fail") else: raise Exception("Error:Http connection insert fail") def proxy_block(self,test_suite_name,proxy_url,isSsl): certs = None self._set_conn_opt(test_suite_name,proxy_url,isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) body = self.bodyBuf.getvalue().decode('utf-8') self.conn.close() self._cert_verify(certs, isSsl) if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451): if isSsl == True: raise Exception(ssl_block_info_re) else: raise Exception(http_block_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode) else: raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode) def proxy_hijack(self,test_suite_name,proxy_url,isSsl): certs = None self._set_conn_opt(test_suite_name,proxy_url,isSsl) self.conn.perform() if isSsl == True: certs = self.conn.getinfo(self.conn.INFO_CERTINFO) self.conn.close() self._cert_verify(certs, isSsl) hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue()) if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0): if isSsl == True: raise Exception(ssl_hijack_info_re) else: raise Exception(http_hijack_info_re) else: if isSsl == True: raise Exception("Error:Ssl connection hijack fail") else: raise Exception("Error:Http connection hijack fail") class SSLFileDownloadBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) self.conn.setopt(self.conn.OPT_CERTINFO, 1) def _get_conninfo(self,conn): dictconninfo = {} dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE) dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME) dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME) dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME) dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME) dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME) dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD) dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD) dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD) dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD) dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME) return dictconninfo def _set_conn_opt(self,test_suite_name,url): self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) self.conn.setopt(self.conn.URL,url) self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) def _write_in_logfile(self, sizeStr, connInfoDict): connInfoLogPath = "/opt/dign_client/log/download_conn_info.log" + '.' + time.strftime("%Y-%m-%d", time.localtime()) #connInfoLogger = get_logger("download",connInfoLogPath,enableConsole=False) connInfoStr = json.dumps(connInfoDict) #connInfoLogger.debug("Fize Size:" + sizeStr + ";connection info:" + connInfoStr) with open(connInfoLogPath,"a+") as f: f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Fize Size:" + sizeStr + ";connection info:" + connInfoStr + '\n') f.close() def conn_traffic(self,test_suite_name,url,conn_taffic_re,sizeStr, size): self._set_conn_opt(test_suite_name, url) self.conn.perform() certs = self.conn.getinfo(self.conn.INFO_CERTINFO) conninfo = self._get_conninfo(self.conn) self.conn.close() issuer = () for cert_info in certs[0]: if cert_info[0] == "Issuer": issuer = cert_info break if len(issuer) <= 0: raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1]) else: raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1]) if int(conninfo["size_download"]) == size: self._write_in_logfile(sizeStr,conninfo) raise Exception(conn_taffic_re) else: raise Exception("Error: connection tarffic size error and is no equal", sizeStr) class HttpFirewallActionBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) def _set_conn_opt(self,test_suite_name, url): self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) self.conn.setopt(self.conn.URL,url) self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) def action_allow(self,test_suite_name): self._set_conn_opt(test_suite_name,URLHttpFirewallAllow) self.conn.perform() rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.close() if rescode == 200: raise Exception(http_firewall_allow_re) else: raise Exception("Error: The stream may be redirected, http code %s" % rescode) def action_deny_subaction_drop(self,test_suite_name): self._set_conn_opt(test_suite_name,URLHttpFirewallDenyDrop) try: self.conn.perform() self.conn.close() except pycurl.error as errorinfo: errcode = errorinfo.args[0] if(errcode == 28): raise Exception(http_firewall_deny_drop_re) else: raise Exception("Error: The stream may be not dropped %s" % errorinfo) def action_deny_subaction_rst(self,test_suite_name): self._set_conn_opt(test_suite_name,URLHttpFirewallDenyRst) try: self.conn.perform() self.conn.close() except pycurl.error as errorinfo: errcode = errorinfo.args[0] if(errcode == 56): raise Exception(http_firewall_deny_rst_re) else: raise Exception("Error: The stream may be not rst %s" % errorinfo) def action_deny_subaction_block(self,test_suite_name): bodyBuf = BytesIO() self._set_conn_opt(test_suite_name,URLHttpFirewallDenyBlock) self.conn.setopt(self.conn.WRITEDATA, bodyBuf) self.conn.perform() rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.close() if rescode == 403: if re.search(r'dign-testing-deny-block', bodyBuf.getvalue().decode('utf-8'), 0): raise Exception(http_firewall_deny_block_re) else: raise Exception("Error: response content is not required, required \'dign-testing-deny-block\'") else: raise Exception("Error: The stream may be not block, http code %s " % rescode) class SslFirewallActionBuild: def __init__(self): self.conn = pycurl.Curl() self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) self.conn.setopt(self.conn.OPT_CERTINFO, 1) self.conn.setopt(self.conn.SSL_VERIFYPEER, False) def _set_conn_opt(self,test_suite_name, url): self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) self.conn.setopt(self.conn.URL,url) self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) def action_allow(self,test_suite_name): self._set_conn_opt(test_suite_name,URLSslFirewallAllow) self.conn.perform() rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) self.conn.close() if rescode == 200: raise Exception(ssl_firewall_allow_re) else: raise Exception("Error: The stream may be redirected, http code %s" % rescode) def action_deny_subaction_drop(self,test_suite_name): self._set_conn_opt(test_suite_name,URLSslFirewallDenyDrop) try: self.conn.perform() self.conn.close() except pycurl.error as errorinfo: errcode = errorinfo.args[0] if(errcode == 28): raise Exception(ssl_firewall_deny_drop_re) else: raise Exception("Error: The stream may be not dropped %s" % errorinfo) def action_deny_subaction_rst(self,test_suite_name): self._set_conn_opt(test_suite_name,URLSslFirewallDenyRst) try: self.conn.perform() self.conn.close() except pycurl.error as errorinfo: errcode = errorinfo.args[0] if(errcode == 35): raise Exception(ssl_firewall_deny_rst_re) else: raise Exception("Error: The stream may be not rst %s" % errorinfo) class SslUnitTest(unittest.TestCase): def test_dnsRequest_deny_drop(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsRequestFirewallDenyDrop): dnsHandler.dns_action_deny_subaction_drop() def test_dnsRequest_deny_redirect_a(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirect): dnsHandler.dns_action_deny_subaction_redirect_a() def test_dnsRequest_deny_redirect_aaaa(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirect): dnsHandler.dns_action_deny_subaction_redirect_aaaa() def test_dnsRequest_deny_redirect_a_range_ttl(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirectRangTTL): dnsHandler.dns_action_deny_subaction_redirect_a_rang_ttl() def test_dnsRequest_deny_redirect_aaaa_range_ttl(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirectRangTTL): dnsHandler.dns_action_deny_subaction_redirect_aaaa_rang_ttl() def test_dnsRequest_allow_rdtype_a(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsARequestFirewallAllow): dnsHandler.dns_action_allow_rdtype_a() def test_dnsRequest_allow_rdtype_aaaa(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsAAAARequestFirewallAllow): dnsHandler.dns_action_allow_rdtype_aaaa() def test_dnsRequest_allow_rdtype_cname(self): dnsHandler = DNSCheckRequestBuild() with self.assertRaisesRegex(Exception, DnsCNAMERequestFirewallAllow): dnsHandler.dns_action_allow_rdtype_cname() def test_securityPolicy_bypass(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_bypass_info_re): sslHandler.ssl_bypass('test_securityPolicy_bypass') def test_securityPolicy_intercept(self): sslHandler = SSLCheckRequestBuild() with self.assertRaisesRegex(Exception, ssl_intercept_info_re): sslHandler.ssl_intercept('test_securityPolicy_intercept') def test_securityPolicy_intercept_certerrExpired(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_exprired_info_re): requestHandler.ssl_intercept_certerrExpired('test_securityPolicy_intercept_certerrExpired') def test_securityPolicy_intercept_certerrSelf_signed(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_self_signed_info_re): requestHandler.ssl_intercept_certerrSelf_signed('test_securityPolicy_intercept_certerrSelf_signed') def test_securityPolicy_intercept_certerrUntrusted_root(self): requestHandler = SslInterceptRequestBuild() with self.assertRaisesRegex(Exception, https_untrusted_root_info_re): requestHandler.ssl_intercept_certerrUntrusted_root('test_securityPolicy_intercept_certerrUntrusted_root') def test_proxyPolicy_ssl_redirect(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_redirect_info_re): proxyHandler.proxy_redirect('test_proxyPolicy_ssl_redirect',URLSslRedirect,True) def test_proxyPolicy_ssl_block(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_block_info_re): proxyHandler.proxy_block('test_proxyPolicy_ssl_block', URLSslBlock,True) def test_proxyPolicy_ssl_replace(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_replace_info_re): proxyHandler.proxy_replace('test_proxyPolicy_ssl_replace',URLSslReplace, True) def test_proxyPolicy_ssl_hijack(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_hijack_info_re): proxyHandler.proxy_hijack('test_proxyPolicy_ssl_hijack', URLSslHijack,True) def test_proxyPolicy_ssl_insert(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, ssl_insert_info_re): proxyHandler.proxy_insert('test_proxyPolicy_ssl_insert',URLSslInsert,True) def test_proxyPolicy_http_redirect(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_redirect_info_re): proxyHandler.proxy_redirect('test_proxyPolicy_http_redirect',URLHttpRedirect, False) def test_proxyPolicy_http_block(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_block_info_re): proxyHandler.proxy_block('test_proxyPolicy_http_block', URLHttpBlock,False) def test_proxyPolicy_http_replace(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_replace_info_re): proxyHandler.proxy_replace('test_proxyPolicy_http_replace',URLHttpReplace, False) def test_proxyPolicy_http_hijack(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_hijack_info_re): proxyHandler.proxy_hijack('test_proxyPolicy_http_hijack', URLHttpHijack,False) def test_proxyPolicy_http_insert(self): proxyHandler = ProxyRequestBuild() with self.assertRaisesRegex(Exception, http_insert_info_re): proxyHandler.proxy_insert('test_proxyPolicy_http_insert',URLHttpInsert,False) def test_https_con_traffic_1k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re): requestHandler.conn_traffic( 'test_https_con_traffic_1k', URLConTraffic_1k, https_conn_taffic_1k_re,'1k', 1024) def test_https_con_traffic_4k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re): requestHandler.conn_traffic( 'test_https_con_traffic_4k',URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4*1024) def test_https_con_traffic_16k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re): requestHandler.conn_traffic( 'test_https_con_traffic_16k', URLConTraffic_16k, https_conn_taffic_16k_re,'16k', 16*1024) def test_https_con_traffic_64k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re): requestHandler.conn_traffic( 'test_https_con_traffic_64k',URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64*1024) def test_https_con_traffic_256k(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re): requestHandler.conn_traffic( 'test_https_con_traffic_256k', URLConTraffic_256k,https_conn_taffic_256k_re,'256k', 256*1024) def test_https_con_traffic_1M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re): requestHandler.conn_traffic( 'test_https_con_traffic_1M', URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024) def test_https_con_traffic_4M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re): requestHandler.conn_traffic( 'test_https_con_traffic_4M', URLConTraffic_4M, https_conn_taffic_4M_re,'4M', 4*1024*1024) def test_https_con_traffic_16M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re): requestHandler.conn_traffic( 'test_https_con_traffic_16M', URLConTraffic_16M,https_conn_taffic_16M_re,'16M',16*1024*1024) def test_https_con_traffic_64M(self): requestHandler = SSLFileDownloadBuild() with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re): requestHandler.conn_traffic( 'test_https_con_traffic_64M',URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024) def test_http_firewall_allow(self): requestHandler = HttpFirewallActionBuild() with self.assertRaisesRegex(Exception, http_firewall_allow_re): requestHandler.action_allow('test_http_firewall_allow') def test_http_firewall_deny_drop(self): requestHandler = HttpFirewallActionBuild() with self.assertRaisesRegex(Exception, http_firewall_deny_drop_re): requestHandler.action_deny_subaction_drop('test_http_firewall_deny_drop') def test_http_firewall_deny_rst(self): requestHandler = HttpFirewallActionBuild() with self.assertRaisesRegex(Exception, http_firewall_deny_rst_re): requestHandler.action_deny_subaction_rst('test_http_firewall_deny_rst') def test_http_firewall_deny_block(self): requestHandler = HttpFirewallActionBuild() with self.assertRaisesRegex(Exception, http_firewall_deny_block_re): requestHandler.action_deny_subaction_block('test_http_firewall_deny_block') def test_ssl_firewall_allow(self): requestHandler = SslFirewallActionBuild() with self.assertRaisesRegex(Exception, ssl_firewall_allow_re): requestHandler.action_allow('test_ssl_firewall_allow') def test_ssl_firewall_deny_drop(self): requestHandler = SslFirewallActionBuild() with self.assertRaisesRegex(Exception, ssl_firewall_deny_drop_re): requestHandler.action_deny_subaction_drop('test_ssl_firewall_deny_drop') def test_ssl_firewall_deny_rst(self): requestHandler = SslFirewallActionBuild() with self.assertRaisesRegex(Exception, ssl_firewall_deny_rst_re): requestHandler.action_deny_subaction_rst('test_ssl_firewall_deny_rst') class TsgDiagnose: def __init__(self): self.interval = 1 self.loop = False self.count = 1 self.config = None self.client = None self.config_dict = {} self.dign_duration = 0 def _get_dign_option(self): parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help") parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.') parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535') parser.add_argument('-p','--configpath', type = str, default = '/opt/dign_client/etc/client.conf',help='Specifies the config file, default /opt/dign_client/etc/client.conf') parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal') args = parser.parse_args() self.interval = args.interval self.loop = args.loop self.count = args.count self.config = args.configpath if self.count == 0: print("Error: bad number of tsg diagnose and will exit") parser.print_help() sys.exit(1) def _get_dign_config(self): global suite_test_config_dict config = ConfigParser() config.read(self.config, encoding='UTF-8') for section in config.sections(): if section in suite_test_config_dict: suite_test_config_dict[section].update(config.items(section)) else: suite_test_config_dict[section] = dict(config.items(section)) self.config_dict = suite_test_config_dict for config_value in self.config_dict.values(): if 'conn_timeout' not in config_value: continue if int(config_value['enabled']) != 1: continue self.dign_duration += int(config_value['conn_timeout']) def _add_suite(self,test_suite_name): if int(self.config_dict[test_suite_name]['enabled']) == 1: self.suite.addTest(SslUnitTest(test_suite_name)) def _add_dign_case(self): self.suite = unittest.TestSuite() self.suite._cleanup = False self._add_suite('test_securityPolicy_bypass') self._add_suite('test_securityPolicy_intercept') self._add_suite('test_securityPolicy_intercept_certerrExpired') self._add_suite('test_securityPolicy_intercept_certerrSelf_signed') self._add_suite('test_securityPolicy_intercept_certerrUntrusted_root') self._add_suite('test_proxyPolicy_ssl_redirect') self._add_suite('test_proxyPolicy_ssl_block') self._add_suite('test_proxyPolicy_ssl_replace') self._add_suite('test_proxyPolicy_ssl_hijack') self._add_suite('test_proxyPolicy_ssl_insert') self._add_suite('test_proxyPolicy_http_redirect') self._add_suite('test_proxyPolicy_http_block') self._add_suite('test_proxyPolicy_http_replace') self._add_suite('test_proxyPolicy_http_hijack') self._add_suite('test_proxyPolicy_http_insert') self._add_suite('test_https_con_traffic_1k') self._add_suite('test_https_con_traffic_4k') self._add_suite('test_https_con_traffic_16k') self._add_suite('test_https_con_traffic_64k') self._add_suite('test_https_con_traffic_256k') self._add_suite('test_https_con_traffic_1M') self._add_suite('test_https_con_traffic_4M') self._add_suite('test_https_con_traffic_16M') self._add_suite('test_https_con_traffic_64M') self._add_suite('test_http_firewall_allow') self._add_suite('test_http_firewall_deny_drop') self._add_suite('test_http_firewall_deny_rst') self._add_suite('test_http_firewall_deny_block') self._add_suite('test_ssl_firewall_allow') self._add_suite('test_ssl_firewall_deny_drop') self._add_suite('test_ssl_firewall_deny_rst') self._add_suite('test_dnsRequest_deny_drop') self._add_suite('test_dnsRequest_deny_redirect_a') self._add_suite('test_dnsRequest_deny_redirect_aaaa') self._add_suite('test_dnsRequest_deny_redirect_a_range_ttl') self._add_suite('test_dnsRequest_deny_redirect_aaaa_range_ttl') def _dign_running(self): print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^120s')) runningLogPath = "/opt/dign_client/log/tsg-diagnose.log" + '.' + time.strftime("%Y-%m-%d", time.localtime()) #runningLogger = get_logger("running",runningLogPath, False) #runningLogger.debug("Diagnose Start,the It will take up to %d seconds" %(self.dign_duration)) with open(runningLogPath,"a+") as f: f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose Start,the It will take up to " + str(self.dign_duration) + " senconds") f.write('\n') f.close() result_stream = io.StringIO() runner = unittest.TextTestRunner(stream=result_stream,verbosity=2,resultclass=DignTextTestResult) runner.run(self.suite) with open(runningLogPath,"a+") as f: f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose end, Testing results:" + "\n" + result_stream.getvalue()) f.close() #runningLogger.debug("Diagnose end, Testing results:" + "\n" + result_stream.getvalue()) print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^120s')) def dign_exec(self): self._get_dign_option() self._get_dign_config() self._add_dign_case() dign_counter = 0 try: if int(self.config_dict['start_time_random_delay_range']['enabled']) == 1: time.sleep(random.randint( \ int(self.config_dict['start_time_random_delay_range']['left_edge']),\ int(self.config_dict['start_time_random_delay_range']['right_edge']))) while True: print("\nRUN %d" %(dign_counter + 1)) self._dign_running() dign_counter = dign_counter + 1 if not self.loop: if dign_counter >= self.count: break time.sleep(self.interval) except Exception as ex: print("Process get an exception, will exit, Exception info:", ex) sys.exit(1) if __name__ == '__main__': tsg_diagnose_running = TsgDiagnose() tsg_diagnose_running.dign_exec()