import sys import unittest import json import pycurl import os import re import time import io from io import BytesIO # import getopt # import ciunittest import argparse import hashlib from configparser import ConfigParser import random import dns.exception import dns.resolver import sys import logging import copy from prettytable import PrettyTable,NONE,HEADER import yaml from urllib.parse import urlparse from scapy.all import * from scapy.layers.inet import IP, TCP from scapy.sendrecv import AsyncSniffer from urllib.parse import urlparse, parse_qs class ConfigLoader: DEFAULT_CONFIGS = { 'Firewall_DenyDrop_DNS': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'Firewall_DenyRedirect_A_DNS': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'Firewall_DenyRedirect_AAAA_DNS': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'Firewall_DenyRedirect_ARangeTTL_DNS': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'Firewall_DenyRedirect_AAAARangeTTL_DNS': { 'conn_timeout': 3,'max_recv_speed_large': 6553600 }, 'test_dnsRequest_allow_rdtype_a': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_dnsRequest_allow_rdtype_aaaa': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'test_dnsRequest_allow_rdtype_cname': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_CertExpired': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_CertSelfSigned': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_CertUntrustedRoot': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Redirect_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Deny_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Replace_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Hijack_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Insert_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Redirect_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Deny_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Replace_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Hijack_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Insert_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_1k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_4k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_16k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_64k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_256k': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_1M': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_4M': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_16M': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'Proxy_Intercept_HTTPS_Response_64M': { 'conn_timeout': 12,'max_recv_speed_large': 6553600 }, 'Firewall_Allow_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Firewall_DenyDrop_HTTP': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'Firewall_DenyReset_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Firewall_DenyBlock_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Firewall_Allow_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Firewall_DenyDrop_HTTPS': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'Firewall_DenyReset_HTTPS': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Firewall_DenyReset_FilterHost_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Firewall_DenyReset_FilterURL_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Deny_FilterHost_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Proxy_Manipulation_Deny_FilterURL_HTTP': { 'conn_timeout': 1,'max_recv_speed_large': 6553600 }, 'Shaping_RateLimit0bps_HTTP': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'Shaping_RateLimit0bps_HTTPS': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'Shaping_RateLimit1000gbps_HTTP': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 }, 'Shaping_RateLimit1000gbps_HTTPS': { 'conn_timeout': 4,'max_recv_speed_large': 6553600 } } def __init__(self, config_path: str): self.__config_path = config_path self.__configs = copy.deepcopy(self.DEFAULT_CONFIGS) self.__load_configs() def __load_configs(self): config_parser = ConfigParser() config_parser.read(self.__config_path, encoding='UTF-8') for section in config_parser.sections(): if section in self.__configs: self.__configs[section].update(config_parser.items(section)) else: self.__configs[section] = dict(config_parser.items(section)) def get_value_by_section_and_key(self, section, key): if section in self.__configs and key in self.__configs[section]: value = self.__configs[section][key] if isinstance(value, str) and value.isdigit(): return int(value) else: return value else: raise KeyError("%s not in configs or %s.%s not in configs." % (section, section, key)) class ServiceFunctionConfigLoader: TABLE_NAME = "service_function_maps" SF_NAME_KEY = "service_function_name" DEVICE_ID_KEY = "device_id" def __init__(self, config_path, name_pattern): self._config_path = config_path self._name_pattern = name_pattern self._configs = None self._load_configs() self._pairs = self._read_name_id_pairs_by_name_pattern(self._name_pattern) def _load_configs(self): try: with open(self._config_path, 'r') as f: load_configs = yaml.safe_load(f) if load_configs is not None: self._configs = load_configs else: print(f"Error: No config loaded from path: {self._config_path}") except FileNotFoundError: print(f"Error: File not found: {self._config_path}") except yaml.YAMLError as e: print(f"Error parsing YAML file: {e}") def _read_name_id_pairs_by_name_pattern(self, name_pattern): if (self._configs is None) or (self.TABLE_NAME not in self._configs): return [] return [ {"name": item[self.SF_NAME_KEY], "id": int(item[self.DEVICE_ID_KEY])} for item in self._configs[self.TABLE_NAME] if re.match(name_pattern, item[self.SF_NAME_KEY]) ] @property def name_id_pairs(self): return self._pairs class CommandParser: COUNT_MIN = 1 COUNT_MAX = 65535 INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$' #INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' def __init__(self): self.__start_up() def __start_up(self): parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose") parser.add_argument('-i','--interval', type = int, default = 30, help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.') parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].') parser.add_argument('--periodical', action='store_true', default = False, help='Enable TSG diagnose periodical, exit when recv a signal.') parser.add_argument('--service_function_names_regexp', type = str, default = '.+', help = "Regexp of the service function names to run. Example: .+") parser.add_argument('--service_function_config_path', type = str, default = '/opt/dign_client/share/service_function_maps.yaml', help = "Specifies the service function config file, default /opt/dign_client/share/service_function_maps.yaml") parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf', help='Specifies the config file, default /opt/dign_client/etc/client.conf') parser.add_argument('--case_names_regexp', type = str, default = '.+', help='Regexp of the case names to run. Example: .+') parser.add_argument('--enable_delay', action='store_true', default = False, help='Enable a random delay between 1 and 30 seconds before running a case.') parser.add_argument('--delay_left_edge', type = int, default = 1, help='The seconds of the delay left edge. Default: 1.') parser.add_argument('--delay_right_edge', type = int, default = 30, help='The seconds of the delay right edge. Default: 30.') # parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running') args = parser.parse_args() if not self.__is_legal_args(args): parser.print_help() sys.exit(1) self.__interval_s = args.interval self.__count = args.count self.__loop = args.periodical self.__config_path = args.config_path self.__case_names_regexp = args.case_names_regexp self.__service_function_names_regexp = args.service_function_names_regexp self.__service_function_config_path = args.service_function_config_path self._enable_delay = args.enable_delay self._delay_left_edge = args.delay_left_edge self._delay_right_edge = args.delay_right_edge def __is_legal_args(self, args) -> bool: if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX: print("Error: the number of TSG diagnoses must in [1,65535]") return False if args.delay_left_edge > args.delay_right_edge: print("Error: The delay in seconds on the left edge is greater than on the right edge.") return True @property def interval_s(self): return self.__interval_s @property def count(self): return self.__count @property def loop(self): return self.__loop @property def config_path(self): return self.__config_path @property def case_names_regexp(self): return r"{}".format(self.__case_names_regexp) @property def service_function_names_regexp(self): return r"{}".format(self.__service_function_names_regexp) @property def service_function_config_path(self): return self.__service_function_config_path @property def delay_seconds(self): if self._enable_delay: return random.randint(self._delay_left_edge, self._delay_right_edge) else: return None class ServerAddressBuilder: IPv4_4TH_OCTET_LEFT_EDGE = 101 IPv4_1_TO_3TH_OCTET = "192.0.2" def __init__(self, service_function_index: int): self._service_function_index = service_function_index self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index def read_resolves_by_url(self, url): parsed_url = urlparse(url) port = parsed_url.port if not port: if parsed_url.scheme == 'https': port = 443 elif parsed_url.scheme == 'http': port = 80 return [f"{parsed_url.hostname}:{port}:{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}"] # @property # def resolves(self) -> list: # return [f"{domain}:{port}:192.0.2.{self._ipv4_4th_octet}" for domain, port in self.DOMAIN_TO_PORT_LIST] @property def nameservers(self) -> list: return [f'{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}'] class TcpPacketsCapture: IFACE = "net1" DSCP_KEY = ["DSCP"] def __init__(self, server_ip, server_port): self._server_ip = server_ip self._server_port = server_port self._quadruple_to_dscp_table = {} self._build_filter() # def _read_server_ip_and_port(self): def _build_filter(self): self._filter = f"tcp and src host {self._server_ip} and src port {self._server_port}" def start(self): self._sniff_thread = AsyncSniffer(iface=self.IFACE, prn=self._packet_callback, filter=self._filter) self._sniff_thread.start() def _packet_callback(self, packet): if IP in packet and TCP in packet: src_ip = packet[IP].src if src_ip == self._server_ip: dscp_value = self._read_dscp_value_from_packet(packet) quadruple = self._build_quadruple(packet, True) self._update_quadruple_to_dscp_table(quadruple, dscp_value) def _read_dscp_value_from_packet(self, packet): ip_header = packet[IP] return (ip_header.tos & 0xfc) >> 2 def _build_quadruple(self, packet, is_s2c): src_ip = packet[IP].src dst_ip = packet[IP].dst src_port = packet[TCP].sport dst_port = packet[TCP].dport if is_s2c: return f"{dst_ip}:{dst_port},{src_ip}:{src_port}" else: return f"{src_ip}:{src_port},{dst_ip}:{dst_port}" def _update_quadruple_to_dscp_table(self, quadruple, dscp_value): self._quadruple_to_dscp_table[quadruple] = dscp_value def stop(self): self._sniff_thread.stop() self._sniff_thread.join() def read_dscp_value_by_quadruple(self, quadruple): if quadruple in self._quadruple_to_dscp_table: return self._quadruple_to_dscp_table[quadruple] else: return None class TcpPacketsCaptureAnalyzer: def is_dscp_equal(self, present_dscp, desired_dscp): if present_dscp is None: return False, f"Error: Not read DSCP value." if present_dscp == desired_dscp: return True, None else: return False, f"Error: Failed to verify DSCP value. Present DSCP: {present_dscp}, desired DSCP: {desired_dscp}." class URLTransferBuilder: def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int): self._url = url self._request_resolve = request_resolve self._conn_timeout = conn_timeout self._max_recv_speed_large = max_recv_speed_large self._conn = None self._response_code = None self._response_buffer = BytesIO() self._error_info = None self._size_download = None self._local_ip = None self._local_port = None self._remote_ip = None self._remote_port = None def _setup_connection(self): self._response_buffer = BytesIO() self._conn = pycurl.Curl() self._conn.setopt(self._conn.WRITEFUNCTION, self._response_buffer.write) self._conn.setopt(self._conn.RESOLVE, self._request_resolve) self._conn.setopt(self._conn.URL, self._url) self._conn.setopt(self._conn.TIMEOUT, self._conn_timeout) self._conn.setopt(self._conn.MAX_RECV_SPEED_LARGE, self._max_recv_speed_large) def _perform_connection(self): self._conn.perform() self._response_code = self._conn.getinfo(self._conn.RESPONSE_CODE) self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD) self._local_ip = self._conn.getinfo(pycurl.LOCAL_IP) self._local_port = self._conn.getinfo(pycurl.LOCAL_PORT) self._remote_ip = self._conn.getinfo(pycurl.PRIMARY_IP) self._remote_port = self._conn.getinfo(pycurl.PRIMARY_PORT) def _close_connection(self): self._conn.close() def connect(self): try: self._setup_connection() self._perform_connection() except pycurl.error as error_info: self._error_info = error_info finally: self._close_connection() @property def response_code(self): return self._response_code @property def response_body(self): return self._response_buffer.getvalue() @property def error_info(self): return self._error_info @property def size_download(self): return self._size_download @property def quadruple(self): return f"{self._local_ip}:{self._local_port},{self._remote_ip}:{self._remote_port}" class HttpURLTransferBuilder(URLTransferBuilder): def _perform_connection(self): super()._perform_connection() class HttpsURLTransferBuilder(URLTransferBuilder): def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int): super().__init__(url, request_resolve, conn_timeout, max_recv_speed_large) self._certs_info = None def _setup_connection(self): super()._setup_connection() self._conn.setopt(self._conn.OPT_CERTINFO, 1) self._conn.setopt(self._conn.SSL_VERIFYPEER, False) def _perform_connection(self): super()._perform_connection() self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO) @property def cert_issuer(self) -> str: return self._get_cert_issuer(self._certs_info) def _get_cert_issuer(self, certs) -> str: if certs is None: return "" for cert_info in certs[0]: if cert_info[0] == "Issuer": issuer = cert_info[1] return issuer class DNSQueryBuilder: def __init__(self, domain: str, namesevers: list, conn_timeout: int): self._domain = domain self._nameservers = namesevers self._conn_timeout = conn_timeout self._dns_resolver = None self._dns_answer = None self._error_info = None def _setup(self): self._dns_resolver = dns.resolver.Resolver() self._dns_resolver.nameservers = self._nameservers self._dns_resolver.search = [] self._dns_resolver.use_search_by_default = False self._dns_resolver.timeout = float(self._conn_timeout) self._dns_resolver.lifetime = float(self._conn_timeout) def _query(self, record_type): try: self._setup() self._dns_answer = self._dns_resolver.query(self._domain, record_type) except Exception as error_info: self._error_info = error_info @property def error_info(self): return self._error_info @property def rrset_ttl(self): if self._dns_answer is not None: return self._dns_answer.rrset.ttl return None @property def response_answer(self): if self._dns_answer is not None: return self._dns_answer.response.answer return None class DNSQueryTypeABuilder(DNSQueryBuilder): def query(self): self._query("A") class DNSQueryTypeAAAABuilder(DNSQueryBuilder): def query(self): self._query("AAAA") class URLTransferResponseAnalyzer: def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern): if present_cert_issuer is None: return False, f"Error: Failed to verify cert issuer. Present cert issuer is None." if re.search(desired_pattern, present_cert_issuer, 0): return True, None else: return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}." def is_response_code_equal(self, present_code, desired_code): if present_code == desired_code: return True, None else: return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}." def is_response_body_matched(self, present_body, desired_pattern): present_body_utf8 = present_body.decode('utf-8') if re.search(desired_pattern, present_body_utf8, 0): return True, None else: return False, f"Error: The response body fail to match the desired content." def is_response_body_not_matched(self, present_body, desired_pattern): present_body_utf8 = present_body.decode('utf-8') if not re.search(desired_pattern, present_body_utf8, 0): return True, None else: return False, f"Error: The response body matched the desired content: {desired_pattern}." def is_response_body_md5_equal(self, present_body, disired_md5): present_body_md5_value = hashlib.md5(present_body).hexdigest() if re.search(disired_md5, present_body_md5_value, 0): return True, None else: return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}." def is_download_size_equal(self, present_size, disired_size): if present_size == disired_size: return True, None else: return False, f"Error: The response body download size fail to match. present_size: {present_size}." def is_pycurl_error_code_equal(self, present_error_info, disired_error_code): if present_error_info is None: return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect." if present_error_info.args[0] == disired_error_code: return True, None else: return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}." def is_pycurl_error_none(self, present_error_info): if present_error_info is None: return True, None else: return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}." class DNSResponseAnalyzer: def is_error_info_none(self, present_error_info): if present_error_info is None: return True, None return False, f"Error: The error info: {present_error_info}." def is_error_type_equal(self, present_error_info, desired_type): if present_error_info is None: return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect." if type(present_error_info) == desired_type: return True, None else: return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}." def is_ttl_equal(self, present_ttl, desired_ttl): if present_ttl == desired_ttl: return True, None return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl) def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge): if present_ttl >= desired_left_edge and present_ttl <= desired_right_edge: return True, None return False, f"Error: ttl(%d) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge) def is_address_equal(self, response_answer, desired_address, desired_address_type): if desired_address_type == "ipv4": rdtype = 1 if desired_address_type == "ipv6": rdtype = 28 for i in response_answer: for j in i.items: if j.rdtype == rdtype: # 1: ipv4, 28: ipv6 if j.address == desired_address: return True, None else: return False, f"Error: Present address error." else: return False, f"Error: Response rdtype error." class ProxyCasesRunner: def __init__(self) -> None: self._analyzer = URLTransferResponseAnalyzer() def action_intercept_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] return True, None def action_intercept_protocol_https_cert_error(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b') if not is_cert_matched[0]: return False, is_cert_matched[1] return True, None def action_intercept_protocol_https_download_size_1k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024) def action_intercept_protocol_https_download_size_4k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 4) def action_intercept_protocol_https_download_size_16k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 16) def action_intercept_protocol_https_download_size_64k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 64) def action_intercept_protocol_https_download_size_256k(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 256) def action_intercept_protocol_https_download_size_1M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024) def action_intercept_protocol_https_download_size_4M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 4) def action_intercept_protocol_https_download_size_16M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 16) def action_intercept_protocol_https_download_size_64M(self, url, resolves, conn_timeout, max_recv_speed_large): return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 64) def _action_intercept_protocol_ssl_by_download_size(self, url, resolves, conn_timeout, max_recv_speed_large, download_size): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_download_size_equal = self._analyzer.is_download_size_equal(conn.size_download, download_size) if not is_download_size_equal[0]: return False, is_download_size_equal[1] return True, None def action_redirect_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] desired_cert_issuer_pattern = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b' is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_redirect_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_block_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B') if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B') if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_replace_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E') if not is_body_matched[0]: return False, is_body_matched[1] is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared') if not is_body_matched[0]: return False, is_body_not_matched[1] return True, None def action_replace_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E') if not is_body_matched[0]: return False, is_body_matched[1] is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared') if not is_body_matched[0]: return False, is_body_not_matched[1] return True, None def action_hijack_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d") if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_hijack_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d") if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_insert_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert') if not is_json_key_matched[0]: return False, is_json_key_matched[1] is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330') if not is_json_val_matched[0]: return False, is_json_val_matched[1] return True, None def action_insert_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert') if not is_json_key_matched[0]: return False, is_json_key_matched[1] is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330') if not is_json_val_matched[0]: return False, is_json_key_matched[1] return True, None def action_deny_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host') if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_deny_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url') if not is_body_matched[0]: return False, is_body_matched[1] return True, None class ShapingCaseRunner: def __init__(self) -> None: self._analyzer = URLTransferResponseAnalyzer() self._dns_analyzer = DNSResponseAnalyzer() self._capture_analyzer = TcpPacketsCaptureAnalyzer() def rate_limit_0bps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def rate_limit_0bps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def rate_limit_1000gbps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): server_ip, server_port = self._read_server_ip_and_port_from_resolve(resolves) capture = TcpPacketsCapture(server_ip, server_port) capture.start() conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() capture.stop() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] present_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple) is_dscp_equal = self._capture_analyzer.is_dscp_equal(present_dscp, 8) if not is_dscp_equal[0]: return False, is_dscp_equal[1] return True, None def rate_limit_1000gbps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): server_ip, server_port = self._read_server_ip_and_port_from_resolve(resolves) capture = TcpPacketsCapture(server_ip, server_port) capture.start() conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() capture.stop() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not is_cert_matched[0]: return False, is_cert_matched[1] present_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple) is_dscp_equal = self._capture_analyzer.is_dscp_equal(present_dscp, 8) if not is_dscp_equal[0]: return False, is_dscp_equal[1] return True, None def _read_server_ip_and_port_from_resolve(self, resolves): resolve = resolves[0] resolve_split = resolve.split(":") return resolve_split[2], resolve_split[1] class FirewallCasesRunner: def __init__(self) -> None: self._analyzer = URLTransferResponseAnalyzer() self._dns_analyzer = DNSResponseAnalyzer() def action_bypass_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not is_cert_matched[0]: return False, is_cert_matched[1] return True, None def action_allow_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_deny_subaction_drop_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_reset_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 56) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_reset_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large): return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large) def action_deny_subaction_reset_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large): return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large) def action_deny_subaction_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 403) if not is_code_equal[0]: return False, is_code_equal[1] is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r"dign-testing-deny-block") if not is_body_matched[0]: return False, is_body_matched[1] return True, None def action_allow_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) if not is_error_none[0]: return False, is_error_none[1] is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not is_cert_matched[0]: return False, is_cert_matched[1] is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) if not is_code_equal[0]: return False, is_code_equal[1] return True, None def action_deny_subaction_drop_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_reset_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) conn.connect() is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 35) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_drop_protocol_dns(self, domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() is_error_type_equal = self._dns_analyzer.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout) if not is_error_type_equal[0]: return False, is_error_type_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_a(self, domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") if not is_address_equal[0]: return False, is_address_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_aaaa(self, domain, nameservers, conn_timeout): request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") if not is_address_equal[0]: return False, is_address_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(self, domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") if not is_address_equal[0]: return False, is_address_equal[1] return True, None def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(self, domain, nameservers, conn_timeout): request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) request.query() is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) if not is_error_info_none[0]: return False, is_error_info_none[1] is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500) if not is_ttl_equal[0]: return False, is_ttl_equal[1] is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") if not is_address_equal[0]: return False, is_address_equal[1] return True, None class ResultExportBuilder: def __init__(self): self._exporter = None self._create_exporter() def _create_exporter(self): self._exporter = PrettyTable() self._exporter.vrules = NONE self._exporter.field_names = ["Case name", "Status", "Description"] self._exporter.align["Case name"] = "l" self._exporter.align["Status"] = "l" self._exporter.align["Description"] = "l" def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True): status, description = self._convert_case_result(case_result) if enable_ouptut_by_case: self._output_by_case_name(case_name, status) if not case_result[0]: self._add_exporter_row(case_name, status, description) def _convert_case_result(self, case_result): status = "ok" if not case_result[0]: status = "FAIL" description = "-" if case_result[1] is not None: description = case_result[1] return status, description def _output_by_case_name(self, case_name, case_status): print(f"{case_name} ... {case_status}") def _add_exporter_row(self, case_name: str, status: str, description: str): self._exporter.add_row([case_name, status, description]) def export(self): if len(self._exporter._rows) > 0: print(self._exporter) class DiagnoseCasesRunner: def __init__(self, cmd_parser: CommandParser): self._loop = cmd_parser.loop self._count = cmd_parser.count self._interval_s = cmd_parser.interval_s self._case_names_regexp = cmd_parser.case_names_regexp self._config_path = cmd_parser.config_path self._service_function_names_regexp = cmd_parser.service_function_names_regexp self._service_function_config_path = cmd_parser.service_function_config_path self._random_delay_seconds = cmd_parser.delay_seconds self._config_loader = ConfigLoader(self._config_path) self._proxy_case = ProxyCasesRunner() self._firewall_case = FirewallCasesRunner() self._shaping_case = ShapingCaseRunner() self._cases_info = [ { "name": "Firewall_Allow_HTTP", "protocol_type": "http", "test_function": self._firewall_case.action_allow_protocol_http, "request_content": "http://http.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_Allow_HTTPS", "protocol_type": "https", "test_function": self._firewall_case.action_allow_protocol_https, "request_content": "https://sha512.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyReset_HTTP", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http, "request_content": "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyReset_HTTPS", "protocol_type": "https", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_https, "request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyReset_FilterHost_HTTP", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_host, "request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyReset_FilterURL_HTTP", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_url, "request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyBlock_HTTP", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_block_protocol_http, "request_content": "http://http-login.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyDrop_HTTP", "protocol_type": "http", "test_function": self._firewall_case.action_deny_subaction_drop_protocol_http, "request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyDrop_HTTPS", "protocol_type": "https", "test_function": self._firewall_case.action_deny_subaction_drop_protocol_https, "request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyDrop_DNS", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_drop_protocol_dns, "request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyRedirect_A_DNS", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a, "request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyRedirect_AAAA_DNS", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa, "request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyRedirect_ARangeTTL_DNS", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl, "request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website" }, { "name": "Firewall_DenyRedirect_AAAARangeTTL_DNS", "protocol_type": "dns", "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl, "request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website" }, { "name": "Proxy_Intercept_HTTPS", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https, "request_content": "https://sha256.badssl.selftest.gdnt-cloud.website" }, { "name": "Proxy_Intercept_HTTPS_CertExpired", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_cert_error, "request_content": "https://expired.badssl.selftest.gdnt-cloud.website" }, { "name": "Proxy_Intercept_HTTPS_CertSelfSigned", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_cert_error, "request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website" }, { "name": "Proxy_Intercept_HTTPS_CertUntrustedRoot", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_cert_error, "request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website" }, { "name": "Proxy_Intercept_HTTPS_Response_1k", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_1k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k" }, { "name": "Proxy_Intercept_HTTPS_Response_4k", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_4k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k" }, { "name": "Proxy_Intercept_HTTPS_Response_16k", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_16k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k" }, { "name": "Proxy_Intercept_HTTPS_Response_64k", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_64k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k" }, { "name": "Proxy_Intercept_HTTPS_Response_256k", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_256k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k" }, { "name": "Proxy_Intercept_HTTPS_Response_1M", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_1M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M" }, { "name": "Proxy_Intercept_HTTPS_Response_4M", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_4M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M" }, { "name": "Proxy_Intercept_HTTPS_Response_16M", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_16M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "Proxy_Intercept_HTTPS_Response_64M", "protocol_type": "https", "test_function": self._proxy_case.action_intercept_protocol_https_download_size_64M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M" }, { "name": "Proxy_Manipulation_Redirect_HTTP", "protocol_type": "http", "test_function": self._proxy_case.action_redirect_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js" }, { "name": "Proxy_Manipulation_Redirect_HTTPS", "protocol_type": "https", "test_function": self._proxy_case.action_redirect_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js" }, { "name": "Proxy_Manipulation_Replace_HTTP", "protocol_type": "http", "test_function": self._proxy_case.action_replace_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js" }, { "name": "Proxy_Manipulation_Replace_HTTPS", "protocol_type": "https", "test_function": self._proxy_case.action_replace_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js" }, { "name": "Proxy_Manipulation_Deny_HTTP", "protocol_type": "http", "test_function": self._proxy_case.action_block_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js" }, { "name": "Proxy_Manipulation_Deny_HTTPS", "protocol_type": "https", "test_function": self._proxy_case.action_block_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js" }, { "name": "Proxy_Manipulation_Deny_FilterHost_HTTP", "protocol_type": "http", "test_function": self._proxy_case.action_deny_protocol_http_filter_host, "request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website" }, { "name": "Proxy_Manipulation_Deny_FilterURL_HTTP", "protocol_type": "http", "test_function": self._proxy_case.action_deny_protocol_http_filter_url, "request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website" }, { "name": "Proxy_Manipulation_Hijack_HTTP", "protocol_type": "http", "test_function": self._proxy_case.action_hijack_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js" }, { "name": "Proxy_Manipulation_Hijack_HTTPS", "protocol_type": "https", "test_function": self._proxy_case.action_hijack_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js" }, { "name": "Proxy_Manipulation_Insert_HTTP", "protocol_type": "http", "test_function": self._proxy_case.action_insert_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html" }, { "name": "Proxy_Manipulation_Insert_HTTPS", "protocol_type": "https", "test_function": self._proxy_case.action_insert_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html" }, { "name": "Shaping_RateLimit0bps_HTTP", "protocol_type": "http", "test_function": self._shaping_case.rate_limit_0bps_protocol_http, "request_content": "http://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "Shaping_RateLimit0bps_HTTPS", "protocol_type": "https", "test_function": self._shaping_case.rate_limit_0bps_protocol_https, "request_content": "https://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "Shaping_RateLimit1000gbps_HTTP", "protocol_type": "http", "test_function": self._shaping_case.rate_limit_1000gbps_protocol_http, "request_content": "http://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M" }, { "name": "Shaping_RateLimit1000gbps_HTTPS", "protocol_type": "https", "test_function": self._shaping_case.rate_limit_1000gbps_protocol_https, "request_content": "https://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M" } ] def run_cases(self): if self._random_delay_seconds is not None: time.sleep(self._random_delay_seconds) run_count = 1 while True: print("\nRUN %d" % run_count) self.__run_cases_by_service_function_ids() if (self._loop is not True) and self._count >= run_count: break time.sleep(self._interval_s) run_count = run_count + 1 def __run_cases_by_service_function_ids(self): sf_loader = ServiceFunctionConfigLoader(self._service_function_config_path, self._service_function_names_regexp) for sf_pair in sf_loader.name_id_pairs: start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(format(("Service function name: " + str(sf_pair["name"]) + ",Test start time: " + start_timestamp),'#^100s')) self._run_cases_in_one_service_function_id(sf_pair["id"]) end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(format(("Service function name: " + str(sf_pair["name"]) + ",Test end time: " + end_timestamp),'=^100s')) def _run_cases_in_one_service_function_id(self, service_function_id): resolve_builder = ServerAddressBuilder(service_function_id) exporter = ResultExportBuilder() for case_info in self._cases_info: if re.fullmatch(self._case_names_regexp, case_info["name"]): self._run_one_case(case_info, resolve_builder, exporter) exporter.export() def _run_one_case(self, case_info, resolve_builder, exporter): conn_timeout = self._get_conn_timeout_from_configs(case_info["name"]) max_recv_speed_large = self._get_max_recv_speed_large_from_configs(case_info["name"]) test_func = case_info.get("test_function") if test_func: if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https": resolve = resolve_builder.read_resolves_by_url(case_info["request_content"]) ret = test_func(case_info["request_content"], resolve, conn_timeout, max_recv_speed_large) if case_info["protocol_type"] == "dns": ret = test_func(case_info["request_content"], resolve_builder.nameservers, conn_timeout) exporter.append_case_result(case_info["name"], ret) def _get_conn_timeout_from_configs(self, section) -> int: return self._config_loader.get_value_by_section_and_key(section, "conn_timeout") def _get_max_recv_speed_large_from_configs(self, section) -> int: return self._config_loader.get_value_by_section_and_key(section, "max_recv_speed_large") if __name__ == '__main__': cmd_parser = CommandParser() dign = DiagnoseCasesRunner(cmd_parser) dign.run_cases()