import sys import unittest import json import pycurl import os import re import time import io from io import BytesIO # import getopt # import ciunittest import argparse import hashlib from configparser import ConfigParser import random import dns.exception import dns.resolver import sys import logging import copy from prettytable import PrettyTable,NONE,HEADER import yaml class ConfigLoader: DEFAULT_CONFIGS = { 'test_firewallBypass_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallDenyDrop_dns': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'test_firewallDenyRedirectA_dns': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'test_firewallDenyRedirectAAAA_dns': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'test_firewallDenyRedirectARangeTTL_dns': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'test_firewallDenyRedirectAAAARangeTTL_dns': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'test_dnsRequest_allow_rdtype_a': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_dnsRequest_allow_rdtype_aaaa': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_dnsRequest_allow_rdtype_cname': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslCerterrExpired': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslCerterrSelfsigned': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslCerterrUntrustedroot': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyRedirect_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyBlock_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyReplace_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyHijack_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyInsert_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyRedirect_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyBlock_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyReplace_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyHijack_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyInsert_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize1k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize4k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize16k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize64k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize256k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize1M': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize4M': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize16M': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'test_firewallIntercept_sslDownloadSize64M': { 'conn_timeout': 12,'max_recv_speed_large': 6553600 }, 'test_firewallAllow_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallDenyDrop_http': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'test_firewallDenyReset_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallDenyBlock_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallAllow_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallDenyDrop_ssl': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'test_firewallDenyReset_ssl': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallDenyResetFilterHost_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_firewallDenyResetFilterURL_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyDenyFilterHost_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_proxyDenyFilterURL_http': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_shaping_ratelimit_0bps_http': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'test_shaping_ratelimit_0bps_https': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'test_shaping_ratelimit_1000gbps_http': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'test_shaping_ratelimit_1000gbps_https': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 } } def __init__(self, config_path: str): self.__config_path = config_path self.__configs = copy.deepcopy(self.DEFAULT_CONFIGS) self.__load_configs() def __load_configs(self): config_parser = ConfigParser() config_parser.read(self.__config_path, encoding='UTF-8') for section in config_parser.sections(): if section in self.__configs: self.__configs[section].update(config_parser.items(section)) else: self.__configs[section] = dict(config_parser.items(section)) def get_value_by_section_and_key(self, section, key): if section in self.__configs and key in self.__configs[section]: value = self.__configs[section][key] if isinstance(value, str) and value.isdigit(): return int(value) else: return value else: raise KeyError("%s not in configs or %s.%s not in configs." % (section, section, key)) class ServiceFunctionConfigLoader: TABLE_NAME = "service_function_maps" SF_NAME_KEY = "service_function_name" DEVICE_ID_KEY = "device_id" def __init__(self, config_path, name_pattern): self._config_path = config_path self._name_pattern = name_pattern self._configs = None self._load_configs() self._pairs = self._read_name_id_pairs_by_name_pattern(self._name_pattern) def _load_configs(self): try: with open(self._config_path, 'r') as f: load_configs = yaml.safe_load(f) if load_configs is not None: self._configs = load_configs else: print(f"Error: No config loaded from path: {self._config_path}") except FileNotFoundError: print(f"Error: File not found: {self._config_path}") except yaml.YAMLError as e: print(f"Error parsing YAML file: {e}") def _read_name_id_pairs_by_name_pattern(self, name_pattern): if (self._configs is None) or (self.TABLE_NAME not in self._configs): return [] return [ {"name": item[self.SF_NAME_KEY], "id": int(item[self.DEVICE_ID_KEY])} for item in self._configs[self.TABLE_NAME] if re.match(name_pattern, item[self.SF_NAME_KEY]) ] @property def name_id_pairs(self): return self._pairs class CommandParser: COUNT_MIN = 1 COUNT_MAX = 65535 INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$' #INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' def __init__(self): self.__start_up() def __start_up(self): parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose") parser.add_argument('-i','--interval', type = int, default = 30, help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.') parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].') parser.add_argument('--periodical', action='store_true', default = False, help='Enable TSG diagnose periodical, exit when recv a signal.') parser.add_argument('--service_function_names_regexp', type = str, default = '.+', help = "Regexp of the service function names to run. Example: .+") parser.add_argument('--service_function_config_path', type = str, default = '/opt/dign_client/share/service_function_maps.yaml', help = "Specifies the service function config file, default /opt/dign_client/share/service_function_maps.yaml") parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf', help='Specifies the config file, default /opt/dign_client/etc/client.conf') parser.add_argument('--case_names_regexp', type = str, default = '.+', help='Regexp of the case names to run. Example: .+') parser.add_argument('--enable_delay', action='store_true', default = False, help='Enable a random delay between 1 and 30 seconds before running a case.') parser.add_argument('--delay_left_edge', type = int, default = 1, help='The seconds of the delay left edge. Default: 1.') parser.add_argument('--delay_right_edge', type = int, default = 30, help='The seconds of the delay right edge. Default: 30.') # parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running') args = parser.parse_args() if not self.__is_legal_args(args): parser.print_help() sys.exit(1) self.__interval_s = args.interval self.__count = args.count self.__loop = args.periodical self.__config_path = args.config_path self.__case_names_regexp = args.case_names_regexp self.__service_function_names_regexp = args.service_function_names_regexp self.__service_function_config_path = args.service_function_config_path self._enable_delay = args.enable_delay self._delay_left_edge = args.delay_left_edge self._delay_right_edge = args.delay_right_edge def __is_legal_args(self, args) -> bool: if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX: print("Error: the number of TSG diagnoses must in [1,65535]") return False if args.delay_left_edge > args.delay_right_edge: print("Error: The delay in seconds on the left edge is greater than on the right edge.") return True @property def interval_s(self): return self.__interval_s @property def count(self): return self.__count @property def loop(self): return self.__loop @property def config_path(self): return self.__config_path @property def case_names_regexp(self): return r"{}".format(self.__case_names_regexp) @property def service_function_names_regexp(self): return r"{}".format(self.__service_function_names_regexp) @property def service_function_config_path(self): return self.__service_function_config_path @property def delay_seconds(self): if self._enable_delay: return random.randint(self._delay_left_edge, self._delay_right_edge) else: return None class ServerAddressBuilder: IPv4_4TH_OCTET_LEFT_EDGE = 101 DOMAIN_TO_PORT_LIST = [ ('sha384.badssl.selftest.gdnt-cloud.website', 443), ('sha256.badssl.selftest.gdnt-cloud.website', 443), ('expired.badssl.selftest.gdnt-cloud.website', 443), ('self-signed.badssl.selftest.gdnt-cloud.website', 443), ('untrusted-root.badssl.selftest.gdnt-cloud.website', 443), ('web-replay.badssl.selftest.gdnt-cloud.website', 80), ('web-replay.badssl.selftest.gdnt-cloud.website', 443), ('testing-download.badssl.selftest.gdnt-cloud.website', 443), ('http.badssl.selftest.gdnt-cloud.website', 80), ('http-credit-card.badssl.selftest.gdnt-cloud.website', 80), ('http-dynamic-login.badssl.selftest.gdnt-cloud.website', 80), ('http-login.badssl.selftest.gdnt-cloud.website', 80), ('sha512.badssl.selftest.gdnt-cloud.website', 443), ('rsa2048.badssl.selftest.gdnt-cloud.website', 443), ('rsa4096.badssl.selftest.gdnt-cloud.website', 443), ('testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website', 80), ('testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website', 80), ('testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website', 80), ('testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website', 80), ('testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website', 80), ('testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website', 443), ('testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website', 80), ('testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website', 443) ] def __init__(self, service_function_index: int): self._service_function_index = service_function_index self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index @property def resolves(self) -> list: return [f"{domain}:{port}:192.0.2.{self._ipv4_4th_octet}" for domain, port in self.DOMAIN_TO_PORT_LIST] @property def nameservers(self) -> list: return ['192.0.2.%d' % self._ipv4_4th_octet] class URLTransferBuilder: def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int): self._url = url self._request_resolve = request_resolve self._conn_timeout = conn_timeout self._max_recv_speed_large = max_recv_speed_large self._conn = None self._response_code = None self._response_buffer = BytesIO() self._error_info = None self._size_download = None def _setup_connection(self): self._response_buffer = BytesIO() self._conn = pycurl.Curl() self._conn.setopt(self._conn.WRITEFUNCTION, self._response_buffer.write) self._conn.setopt(self._conn.RESOLVE, self._request_resolve) self._conn.setopt(self._conn.URL, self._url) self._conn.setopt(self._conn.TIMEOUT, self._conn_timeout) self._conn.setopt(self._conn.MAX_RECV_SPEED_LARGE, self._max_recv_speed_large) def _perform_connection(self): self._conn.perform() self._response_code = self._conn.getinfo(self._conn.RESPONSE_CODE) self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD) def _close_connection(self): self._conn.close() def connect(self): try: self._setup_connection() self._perform_connection() except pycurl.error as error_info: self._error_info = error_info finally: self._close_connection() @property def response_code(self): return self._response_code @property def response_body(self): return self._response_buffer.getvalue() @property def error_info(self): return self._error_info @property def size_download(self): return self._size_download class HttpURLTransferBuilder(URLTransferBuilder): def _perform_connection(self): super()._perform_connection() class HttpsURLTransferBuilder(URLTransferBuilder): def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int): super().__init__(url, request_resolve, conn_timeout, max_recv_speed_large) self._certs_info = None def _setup_connection(self): super()._setup_connection() self._conn.setopt(self._conn.OPT_CERTINFO, 1) self._conn.setopt(self._conn.SSL_VERIFYPEER, False) def _perform_connection(self): super()._perform_connection() self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO) @property def cert_issuer(self) -> str: return self._get_cert_issuer(self._certs_info) def _get_cert_issuer(self, certs) -> str: if certs is None: return "" for cert_info in certs[0]: if cert_info[0] == "Issuer": issuer = cert_info[1] return issuer class DNSQueryBuilder: def __init__(self, domain: str, namesevers: list, conn_timeout: int): self._domain = domain self._nameservers = namesevers self._conn_timeout = conn_timeout self._dns_resolver = None self._dns_answer = None self._error_info = None def _setup(self): self._dns_resolver = dns.resolver.Resolver() self._dns_resolver.nameservers = self._nameservers self._dns_resolver.search = [] self._dns_resolver.use_search_by_default = False self._dns_resolver.timeout = float(self._conn_timeout) self._dns_resolver.lifetime = float(self._conn_timeout) def _query(self, record_type): try: self._setup() self._dns_answer = self._dns_resolver.query(self._domain, record_type) except Exception as error_info: self._error_info = error_info @property def error_info(self): return self._error_info @property def rrset_ttl(self): if self._dns_answer is not None: return self._dns_answer.rrset.ttl return None @property def response_answer(self): if self._dns_answer is not None: return self._dns_answer.response.answer return None class DNSQueryTypeABuilder(DNSQueryBuilder): def query(self): self._query("A") class DNSQueryTypeAAAABuilder(DNSQueryBuilder): def query(self): self._query("AAAA") class URLTransferResponseAnalyzer: def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern): if present_cert_issuer is None: return False, f"Error: Failed to verify cert issuer. Present cert issuer is None." if re.search(desired_pattern, present_cert_issuer, 0): return True, None else: return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}." def is_response_code_equal(self, present_code, desired_code): if present_code == desired_code: return True, None else: return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}." def is_response_body_matched(self, present_body, desired_pattern): present_body_utf8 = present_body.decode('utf-8') if re.search(desired_pattern, present_body_utf8, 0): return True, None else: return False, f"Error: The response body fail to match the desired content." def is_response_body_not_matched(self, present_body, desired_pattern): present_body_utf8 = present_body.decode('utf-8') if not re.search(desired_pattern, present_body_utf8, 0): return True, None else: return False, f"Error: The response body matched the desired content: {desired_pattern}." def is_response_body_md5_equal(self, present_body, disired_md5): present_body_md5_value = hashlib.md5(present_body).hexdigest() if re.search(disired_md5, present_body_md5_value, 0): return True, None else: return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}." def is_download_size_equal(self, present_size, disired_size): if present_size == disired_size: return True, None else: return False, f"Error: The response body download size fail to match. present_size: {present_size}." def is_pycurl_error_code_equal(self, present_error_info, disired_error_code): if present_error_info is None: return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect." if present_error_info.args[0] == disired_error_code: return True, None else: return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}." def is_pycurl_error_none(self, present_error_info): if present_error_info is None: return True, None else: return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}." class DNSResponseAnalyzer: def is_error_info_none(self, present_error_info): if present_error_info is None: return True, None return False, f"Error: The error info: {present_error_info}." def is_error_type_equal(self, present_error_info, desired_type): if present_error_info is None: return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect." if type(present_error_info) == desired_type: return True, None else: return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}." def is_ttl_equal(self, present_ttl, desired_ttl): if present_ttl == desired_ttl: return True, None return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl) def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge): if present_ttl >= desired_left_edge and present_ttl <= desired_right_edge: return True, None return False, f"Error: ttl(%d) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge) def is_address_equal(self, response_answer, desired_address, desired_address_type): if desired_address_type == "ipv4": rdtype = 1 if desired_address_type == "ipv6": rdtype = 28 for i in response_answer: for j in i.items: if j.rdtype == rdtype: # 1: ipv4, 28: ipv6 if j.address == desired_address: return True, None else: return False, f"Error: Present address error." else: return False, f"Error: Response rdtype error." class ProxyCasesRunner: def __init__(self) -> None: self._analyzer = URLTransferResponseAnalyzer() def action_redirect_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] desired_cert_issuer_pattern = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b' is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_redirect_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_block_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B') if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B') if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_replace_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E') if not is_body_matched[0]: return False, is_body_matched[1] is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared') if not is_body_matched[0]: return False, is_body_not_matched[1] return True, None def action_replace_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E') if not is_body_matched[0]: return False, is_body_matched[1] is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared') if not is_body_matched[0]: return False, is_body_not_matched[1] return True, None def action_hijack_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d") if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_hijack_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d") if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_insert_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert') if not is_json_key_matched[0]: return False, is_json_key_matched[1] is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330') if not is_json_val_matched[0]: return False, is_json_val_matched[1] return True, None def action_insert_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert') if not is_json_key_matched[0]: return False, is_json_key_matched[1] is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330') if not is_json_val_matched[0]: return False, is_json_key_matched[1] return True, None def action_deny_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host') if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_deny_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url') if not is_body_matched[0]: return False, is_body_matched[1] return True, None class ShapingCaseRunner: def __init__(self) -> None: self._analyzer = URLTransferResponseAnalyzer() self._dns_analyzer = DNSResponseAnalyzer() def rate_limit_0bps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def rate_limit_0bps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def rate_limit_1000gbps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def rate_limit_1000gbps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not is_cert_matched[0]: return False, is_cert_matched[1] return True, None class FirewallCasesRunner: def __init__(self) -> None: self._analyzer = URLTransferResponseAnalyzer() self._dns_analyzer = DNSResponseAnalyzer() def action_bypass_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not is_cert_matched[0]: return False, is_cert_matched[1] return True, None def action_intercept_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] return True, None def action_intercept_protocol_https_cert_error(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b') if not is_cert_matched[0]: return False, is_cert_matched[1] return True, None def action_allow_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_deny_subaction_drop_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_reset_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 56) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_reset_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large): return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large) def action_deny_subaction_reset_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large): return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large) def action_deny_subaction_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 403) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r"dign-testing-deny-block") if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_allow_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_deny_subaction_drop_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_reset_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 35) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_intercept_protocol_https_download_size_1k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024) def action_intercept_protocol_https_download_size_4k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 4) def action_intercept_protocol_https_download_size_16k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 16) def action_intercept_protocol_https_download_size_64k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 64) def action_intercept_protocol_https_download_size_256k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 256) def action_intercept_protocol_https_download_size_1M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024) def action_intercept_protocol_https_download_size_4M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 4) def action_intercept_protocol_https_download_size_16M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 16) def action_intercept_protocol_https_download_size_64M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 64) def _action_intercept_protocol_ssl_by_download_size(self, url, resolves, conn_timeout, max_recv_speed_large, download_size): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_download_size_equal = self._analyzer.is_download_size_equal(conn.size_download, download_size) if not is_download_size_equal[0]: return False, is_download_size_equal[1] return True, None def action_deny_subaction_drop_protocol_dns(self, domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() is_error_type_equal = self._dns_analyzer.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_a(self, domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") if not is_address_equal[0]: return False, is_address_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_aaaa(self, domain, nameservers, conn_timeout): request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") if not is_address_equal[0]: return False, is_address_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(self, domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") if not is_address_equal[0]: return False, is_address_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(self, domain, nameservers, conn_timeout): request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") if not is_address_equal[0]: return False, is_address_equal[1] return True, None class ResultExportBuilder: def __init__(self): self._exporter = None self._create_exporter() def _create_exporter(self): self._exporter = PrettyTable() self._exporter.vrules = NONE self._exporter.field_names = ["Case name", "Status", "Description"] self._exporter.align["Case name"] = "l" self._exporter.align["Status"] = "l" self._exporter.align["Description"] = "l" def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True): status, description = self._convert_case_result(case_result) if enable_ouptut_by_case: self._output_by_case_name(case_name, status) if not case_result[0]: self._add_exporter_row(case_name, status, description) def _convert_case_result(self, case_result): status = "ok" if not case_result[0]: status = "FAIL" description = "-" if case_result[1] is not None: description = case_result[1] return status, description def _output_by_case_name(self, case_name, case_status): print(f"{case_name} ... {case_status}") def _add_exporter_row(self, case_name: str, status: str, description: str): self._exporter.add_row([case_name, status, description]) def export(self): if len(self._exporter._rows) > 0: print(self._exporter) class DiagnoseCasesRunner: def __init__(self, cmd_parser: CommandParser): self._loop = cmd_parser.loop self._count = cmd_parser.count self._interval_s = cmd_parser.interval_s self._case_names_regexp = cmd_parser.case_names_regexp self._config_path = cmd_parser.config_path self._service_function_names_regexp = cmd_parser.service_function_names_regexp self._service_function_config_path = cmd_parser.service_function_config_path self._random_delay_seconds = cmd_parser.delay_seconds self._config_loader = ConfigLoader(self._config_path) self._proxy_case = ProxyCasesRunner() self._firewall_case = FirewallCasesRunner() self._shaping_case = ShapingCaseRunner() self._cases_info = [ { "name": "test_firewallBypass_ssl", "protocol_type": "https", "test_function": self._firewall_case.action_bypass_protocol_https, "request_content": "https://sha384.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallIntercept_ssl", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https, "request_content": "https://sha256.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallIntercept_sslCerterrExpired", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_cert_error, "request_content": "https://expired.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallIntercept_sslCerterrSelfsigned", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_cert_error, "request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallIntercept_sslCerterrUntrustedroot", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_cert_error, "request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website" }, { "name": "test_proxyRedirect_ssl", "protocol_type": "https", "test_function": self._proxy_case.action_redirect_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js" }, { "name": "test_proxyBlock_ssl", "protocol_type": "https", "test_function": self._proxy_case.action_block_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js" }, { "name": "test_proxyReplace_ssl", "protocol_type": "https", "test_function": self._proxy_case.action_replace_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js" }, { "name": "test_proxyHijack_ssl", "protocol_type": "https", "test_function": self._proxy_case.action_hijack_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js" }, { "name": "test_proxyInsert_ssl", "protocol_type": "https", "test_function": self._proxy_case.action_insert_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html" }, { "name": "test_proxyRedirect_http", "protocol_type": "http", "test_function": self._proxy_case.action_redirect_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js" }, { "name": "test_proxyBlock_http", "protocol_type": "http", "test_function": self._proxy_case.action_block_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js" }, { "name": "test_proxyReplace_http", "protocol_type": "http", "test_function": self._proxy_case.action_replace_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js" }, { "name": "test_proxyHijack_http", "protocol_type": "http", "test_function": self._proxy_case.action_hijack_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js" }, { "name": "test_proxyInsert_http", "protocol_type": "http", "test_function": self._proxy_case.action_insert_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html" }, { "name": "test_firewallAllow_http", "protocol_type": "http", "test_function": self._firewall_case.action_allow_protocol_http, "request_content": "http://http.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyDrop_http", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_drop_protocol_http, "request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyReset_http", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http, "request_content": "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyBlock_http", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_block_protocol_http, "request_content": "http://http-login.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallAllow_ssl", "protocol_type": "https", "test_function": self._firewall_case.action_allow_protocol_https, "request_content": "https://sha512.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyDrop_ssl", "protocol_type": "https", "test_function": self._firewall_case.action_deny_subaction_drop_protocol_https, "request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyReset_ssl", "protocol_type": "https", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_https, "request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyDrop_dns", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_drop_protocol_dns, "request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyRedirectA_dns", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a, "request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyRedirectAAAA_dns", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa, "request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyRedirectARangeTTL_dns", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl, "request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyRedirectAAAARangeTTL_dns", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl, "request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website" }, { "name": "test_firewallIntercept_sslDownloadSize1k", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_1k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k" }, { "name": "test_firewallIntercept_sslDownloadSize4k", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_4k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k" }, { "name": "test_firewallIntercept_sslDownloadSize16k", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_16k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k" }, { "name": "test_firewallIntercept_sslDownloadSize64k", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_64k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k" }, { "name": "test_firewallIntercept_sslDownloadSize256k", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_256k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k" }, { "name": "test_firewallIntercept_sslDownloadSize1M", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_1M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M" }, { "name": "test_firewallIntercept_sslDownloadSize4M", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_4M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M" }, { "name": "test_firewallIntercept_sslDownloadSize16M", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_16M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "test_firewallIntercept_sslDownloadSize64M", "protocol_type": "https", "test_function": self._firewall_case.action_intercept_protocol_https_download_size_64M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M" }, { "name": "test_firewallDenyResetFilterHost_http", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_host, "request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website" }, { "name": "test_firewallDenyResetFilterURL_http", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_url, "request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website" }, { "name": "test_proxyDenyFilterHost_http", "protocol_type": "http", "test_function": self._proxy_case.action_deny_protocol_http_filter_host, "request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website" }, { "name": "test_proxyDenyFilterURL_http", "protocol_type": "http", "test_function": self._proxy_case.action_deny_protocol_http_filter_url, "request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website" }, { "name": "test_shaping_ratelimit_0bps_http", "protocol_type": "http", "test_function": self._shaping_case.rate_limit_0bps_protocol_http, "request_content": "http://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "test_shaping_ratelimit_0bps_https", "protocol_type": "https", "test_function": self._shaping_case.rate_limit_0bps_protocol_https, "request_content": "https://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "test_shaping_ratelimit_1000gbps_http", "protocol_type": "http", "test_function": self._shaping_case.rate_limit_1000gbps_protocol_http, "request_content": "http://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "test_shaping_ratelimit_1000gbps_https", "protocol_type": "https", "test_function": self._shaping_case.rate_limit_1000gbps_protocol_https, "request_content": "https://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M" } ] def run_cases(self): if self._random_delay_seconds is not None: time.sleep(self._random_delay_seconds) run_count = 1 while True: print("\nRUN %d" % run_count) self.__run_cases_by_service_function_ids() if (self._loop is not True) and self._count >= run_count: break time.sleep(self._interval_s) run_count = run_count + 1 def __run_cases_by_service_function_ids(self): sf_loader = ServiceFunctionConfigLoader(self._service_function_config_path, self._service_function_names_regexp) for sf_pair in sf_loader.name_id_pairs: start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(format(("Service function name: " + str(sf_pair["name"]) + ",Test start time: " + start_timestamp),'#^100s')) self._run_cases_in_one_service_function_id(sf_pair["id"]) end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(format(("Service function name: " + str(sf_pair["name"]) + ",Test end time: " + end_timestamp),'=^100s')) def _run_cases_in_one_service_function_id(self, service_function_id): resolve_Builder = ServerAddressBuilder(service_function_id) exporter = ResultExportBuilder() for case_info in self._cases_info: if re.fullmatch(self._case_names_regexp, case_info["name"]): self._run_one_case(case_info, resolve_Builder, exporter) exporter.export() def _run_one_case(self, case_info, resolve_Builder, exporter): conn_timeout = self._get_conn_timeout_from_configs(case_info["name"]) max_recv_speed_large = self._get_max_recv_speed_large_from_configs(case_info["name"]) test_func = case_info.get("test_function") if test_func: if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https": ret = test_func(case_info["request_content"], resolve_Builder.resolves, conn_timeout, max_recv_speed_large) if case_info["protocol_type"] == "dns": ret = test_func(case_info["request_content"], resolve_Builder.nameservers, conn_timeout) exporter.append_case_result(case_info["name"], ret) def _get_conn_timeout_from_configs(self, section) -> int: return self._config_loader.get_value_by_section_and_key(section, "conn_timeout") def _get_max_recv_speed_large_from_configs(self, section) -> int: return self._config_loader.get_value_by_section_and_key(section, "max_recv_speed_large") if __name__ == '__main__': cmd_parser = CommandParser() dign = DiagnoseCasesRunner(cmd_parser) dign.run_cases()