import sys
import unittest
import json
import pycurl
import os
import re
import time
import io
from io import BytesIO
# import getopt
# import ciunittest
import argparse
import hashlib
from configparser import ConfigParser
import random
import dns.exception
import dns.resolver
import logging
import copy
from typing import Optional
from prettytable import PrettyTable,VRuleStyle
import yaml
from scapy.all import *
from scapy.layers.inet import IP, TCP
from scapy.sendrecv import AsyncSniffer
from urllib.parse import urlparse, parse_qs


class ConfigLoader:
    """Read an INI-style configuration file and look values up by section/key."""

    def __init__(self, config_path: str):
        """Remember *config_path* and load it immediately when the file exists."""
        self._config_path = config_path
        self._configs = {}
        if os.path.exists(self._config_path):
            self._load_configs()

    def _load_configs(self):
        """Parse the config file into ``self._configs`` and return the section names.

        The parsed values used to be discarded, which made every look-up
        return ``None``; they are now stored as ``{section: {key: value}}``.
        """
        config_parser = ConfigParser()
        config_parser.read(self._config_path, encoding='UTF-8')
        sections = config_parser.sections()
        # raw=True: keep values verbatim so a literal '%' cannot trigger an
        # interpolation error while loading.
        self._configs = {
            section: dict(config_parser.items(section, raw=True))
            for section in sections
        }
        return sections

    def get_value_by_section_and_key(self, section, key):
        """Return the value stored under *section*/*key*, or ``None`` if absent.

        Values made only of digits are converted to ``int``; everything else
        is returned unchanged.
        """
        options = self._configs.get(section)
        if options is None:
            return None
        value = options.get(key)
        if value is None and isinstance(key, str):
            # ConfigParser lower-cases option names by default, so retry with
            # the normalised spelling before giving up.
            value = options.get(key.lower())
        if isinstance(value, str) and value.isdigit():
            return int(value)
        return value


class ServiceFunctionConfigLoader:
    """Load the service-function map (YAML) and select entries by name pattern."""

    TABLE_NAME = "service_function_maps"
    SF_NAME_KEY = "service_function_name"
    DEVICE_ID_KEY = "device_id"

    def __init__(self, config_path, name_pattern):
        """Load *config_path* and pre-compute the pairs matching *name_pattern*."""
        self._config_path = config_path
        self._name_pattern = name_pattern
        self._configs = None
        self._load_configs()
        self._pairs = self._read_name_id_pairs_by_name_pattern(self._name_pattern)

    def _load_configs(self):
        """Parse the YAML file; on any failure print an error and leave ``_configs`` as None."""
        try:
            with open(self._config_path, 'r') as f:
                load_configs = yaml.safe_load(f)
            if load_configs is not None:
                self._configs = load_configs
            else:
                print(f"Error: No config loaded from path: {self._config_path}")
        except FileNotFoundError:
            print(f"Error: File not found: {self._config_path}")
        except yaml.YAMLError as e:
            print(f"Error parsing YAML file: {e}")

    def _read_name_id_pairs_by_name_pattern(self, name_pattern):
        """Return ``[{"name": ..., "id": int}]`` for entries whose name fully matches *name_pattern*."""
        if (self._configs is None) or (self.TABLE_NAME not in self._configs):
            return []
        return [
            {"name": item[self.SF_NAME_KEY], "id": int(item[self.DEVICE_ID_KEY])}
            for item in self._configs[self.TABLE_NAME]
            if re.fullmatch(name_pattern, item[self.SF_NAME_KEY])
        ]

    @property
    def name_id_pairs(self):
        """Name/id pairs selected at construction time."""
        return self._pairs


class CommandParser:
    """Parse and validate the command-line options of the diagnose tool."""

    COUNT_MIN = 1
    COUNT_MAX = 65535
    INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$'

    def __init__(self):
        """Parse ``sys.argv`` right away; exits with status 1 on illegal arguments."""
        self.__start_up()

    def __start_up(self):
        """Declare the options, parse them and store the results on the instance."""
        parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose")
        parser.add_argument('-i','--interval', type = int, default = 30, help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.')
        parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].')
        parser.add_argument('--periodical', action='store_true', default = False, help='Enable TSG diagnose periodical, exit when recv a signal.')
        parser.add_argument('--service_function_names_regexp', type = str, default = '.+', help = "Regexp of the service function names to run. Example: .+")
        parser.add_argument('--service_function_config_path', type = str, default = '/opt/dign_client/share/service_function_maps.yaml', help = "Specifies the service function config file, default /opt/dign_client/share/service_function_maps.yaml")
        parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf', help='Specifies the config file, default /opt/dign_client/etc/client.conf')
        parser.add_argument('--case_names_regexp', type = str, default = '.+', help='Regexp of the case names to run. Example: .+')
        parser.add_argument('--enable_delay', action='store_true', default = False, help='Enable a random delay between 1 and 30 seconds before running a case.')
        parser.add_argument('--delay_left_edge', type = int, default = 1, help='The seconds of the delay left edge. Default: 1.')
        parser.add_argument('--delay_right_edge', type = int, default = 30, help='The seconds of the delay right edge. Default: 30.')
        args = parser.parse_args()

        if not self.__is_legal_args(args):
            parser.print_help()
            sys.exit(1)

        self.__interval_s = args.interval
        self.__count = args.count
        self.__loop = args.periodical
        self.__config_path = args.config_path
        self.__case_names_regexp = args.case_names_regexp
        self.__service_function_names_regexp = args.service_function_names_regexp
        self.__service_function_config_path = args.service_function_config_path
        self._enable_delay = args.enable_delay
        self._delay_left_edge = args.delay_left_edge
        self._delay_right_edge = args.delay_right_edge

    def __is_legal_args(self, args) -> bool:
        """Return True when *args* are usable; print the reason and return False otherwise."""
        if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX:
            print("Error: the number of TSG diagnoses must in [1,65535]")
            return False
        if args.delay_left_edge > args.delay_right_edge:
            print("Error: The delay in seconds on the left edge is greater than on the right edge.")
            # Previously fell through and reported the arguments as legal,
            # which later made random.randint() raise in delay_seconds.
            return False
        return True

    @property
    def interval_s(self):
        """Seconds between consecutive diagnose runs."""
        return self.__interval_s

    @property
    def count(self):
        """Number of diagnose runs requested."""
        return self.__count

    @property
    def loop(self):
        """True when ``--periodical`` was given."""
        return self.__loop

    @property
    def config_path(self):
        """Path of the client configuration file."""
        return self.__config_path

    @property
    def case_names_regexp(self):
        """Regular expression selecting the case names to run."""
        return r"{}".format(self.__case_names_regexp)

    @property
    def service_function_names_regexp(self):
        """Regular expression selecting the service function names to run."""
        return r"{}".format(self.__service_function_names_regexp)

    @property
    def service_function_config_path(self):
        """Path of the service-function map file."""
        return self.__service_function_config_path

    @property
    def delay_seconds(self):
        """A fresh random delay within the configured edges, or None when delays are disabled."""
        if not self._enable_delay:
            return None
        return random.randint(self._delay_left_edge, self._delay_right_edge)


class ServerAddressBuilder:
    """Derive the test server address of a service function from its index."""

    IPv4_4TH_OCTET_LEFT_EDGE = 101
    IPv4_1_TO_3TH_OCTET = "192.0.2"

    def __init__(self, service_function_index: int):
        """Compute the last IPv4 octet as ``101 + service_function_index``."""
        self._service_function_index = service_function_index
        self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index

    def read_resolves_by_url(self, url):
        """Return a one-element ``["host:port:ip"]`` list for *url*.

        When the URL carries no explicit port, 443 is used for ``https`` and
        80 for ``http``; for any other scheme the port stays ``None``.
        """
        parsed_url = urlparse(url)
        port = parsed_url.port
        if not port:
            if parsed_url.scheme == 'https':
                port = 443
            elif parsed_url.scheme == 'http':
                port = 80
        return [f"{parsed_url.hostname}:{port}:{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}"]

    @property
    def nameservers(self) -> list:
        """One-element list holding the server address, for use as DNS nameserver."""
        return [f'{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}']


class TcpPacketsCapture:
    """Sniff TCP packets sent by one server endpoint and record their DSCP value per connection."""

    DSCP_KEY = ["DSCP"]

    def __init__(self, server_ip, server_port):
        """Prepare the capture filter for packets coming from *server_ip*:*server_port*."""
        self._server_ip = server_ip
        self._server_port = server_port
        self._quadruple_to_dscp_table = {}
        # Initialised here so that stop() is safe even if start() never ran.
        self._sniff_thread = None
        self._build_filter()
        self._iface_names = get_if_list()

    def _build_filter(self):
        """Build the capture filter matching TCP traffic whose source is the server."""
        self._filter = f"tcp and src host {self._server_ip} and src port {self._server_port}"

    def start(self):
        """Start sniffing in the background on every interface."""
        time.sleep(0.5)
        self._sniff_thread = AsyncSniffer(iface=self._iface_names, prn=self._packet_callback, filter=self._filter)
        self._sniff_thread.start()

    def _packet_callback(self, packet):
        """Record the DSCP value of a server-to-client TCP/IP packet."""
        if IP in packet and TCP in packet:
            src_ip = packet[IP].src
            if src_ip == self._server_ip:
                dscp_value = self._read_dscp_value_from_packet(packet)
                quadruple = self._build_quadruple(packet, True)
                self._update_quadruple_to_dscp_table(quadruple, dscp_value)

    def _read_dscp_value_from_packet(self, packet):
        """Return the upper six bits of the IP TOS byte."""
        ip_header = packet[IP]
        return (ip_header.tos & 0xfc) >> 2

    def _build_quadruple(self, packet, is_s2c):
        """Return ``"ip:port,ip:port"`` for *packet*.

        For server-to-client packets (*is_s2c* true) the destination endpoint
        is written first, otherwise the source endpoint is.
        """
        src_ip = packet[IP].src
        dst_ip = packet[IP].dst
        src_port = packet[TCP].sport
        dst_port = packet[TCP].dport
        if is_s2c:
            return f"{dst_ip}:{dst_port},{src_ip}:{src_port}"
        return f"{src_ip}:{src_port},{dst_ip}:{dst_port}"

    def _update_quadruple_to_dscp_table(self, quadruple, dscp_value):
        """Store (overwriting) the latest DSCP value seen for *quadruple*."""
        self._quadruple_to_dscp_table[quadruple] = dscp_value

    def stop(self):
        """Stop the background sniffer if it is running."""
        time.sleep(0.5)
        if self._sniff_thread is not None and self._sniff_thread.running:
            self._sniff_thread.stop()
            self._sniff_thread.join()

    def read_dscp_value_by_quadruple(self, quadruple):
        """Return the DSCP value recorded for *quadruple*, or None if none was captured."""
        return self._quadruple_to_dscp_table.get(quadruple)

    @staticmethod
    def read_server_ip_and_port_from_resolve(resolves):
        """Split the first ``"host:port:ip"`` entry of *resolves* into ``(ip, port)`` strings."""
        resolve = resolves[0]
        resolve_split = resolve.split(":")
        return resolve_split[2], resolve_split[1]


class TcpPacketsCaptureAssertion:
    """Checks on captured packet data; each returns ``(ok, error_message_or_None)``."""

    @staticmethod
    def is_dscp_equal(actual_dscp, expected_dscp):
        """Succeed when a DSCP value was captured and equals *expected_dscp*."""
        if actual_dscp is None:
            return False, "Error: Not read DSCP value."
        if actual_dscp == expected_dscp:
            return True, None
        return False, f"Error: Failed to verify DSCP value. Actual DSCP: {actual_dscp}, expected DSCP: {expected_dscp}."


class URLTransferBuilder:
    """Perform one pycurl transfer and keep its result for later inspection."""

    def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed, tcp_mss=None):
        """Store the transfer parameters; nothing is sent until :meth:`connect`.

        *request_resolve* is passed to ``pycurl.RESOLVE``; *max_recv_speed* and
        *tcp_mss* are only applied when they are not None.
        """
        self._url = url
        self._request_resolve = request_resolve
        self._conn_timeout = conn_timeout
        self._max_recv_speed = max_recv_speed
        self._tcp_mss = tcp_mss
        self._conn = None
        self._response_code = None
        self._response_buffer = BytesIO()
        self._error_info = None
        self._size_download = None
        self._local_ip = None
        self._local_port = None
        self._remote_ip = None
        self._remote_port = None
        self._total_time_s = None
        self._speed_download = None

    def opensocket_callback(self, purpose, address):
        """Create the transfer socket with the configured TCP maximum segment size."""
        # Imported locally so the callback does not depend on scapy's star
        # import happening to re-export the ``socket`` module.
        import socket
        new_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        new_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_MAXSEG, self._tcp_mss)
        return new_socket

    def _setup_connection(self):
        """Create the curl handle and apply all transfer options."""
        self._response_buffer = BytesIO()
        self._conn = pycurl.Curl()
        self._conn.setopt(pycurl.WRITEFUNCTION, self._response_buffer.write)
        self._conn.setopt(pycurl.RESOLVE, self._request_resolve)
        self._conn.setopt(pycurl.URL, self._url)
        self._conn.setopt(pycurl.TIMEOUT, self._conn_timeout)
        if self._max_recv_speed is not None:
            self._conn.setopt(pycurl.MAX_RECV_SPEED_LARGE, self._max_recv_speed)
        if self._tcp_mss is not None:
            self._conn.setopt(pycurl.OPENSOCKETFUNCTION, self.opensocket_callback)

    def _perform_connection(self):
        """Run the transfer and copy its statistics from the curl handle."""
        self._conn.perform()
        self._response_code = self._conn.getinfo(pycurl.RESPONSE_CODE)
        self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD)
        self._local_ip = self._conn.getinfo(pycurl.LOCAL_IP)
        self._local_port = self._conn.getinfo(pycurl.LOCAL_PORT)
        self._remote_ip = self._conn.getinfo(pycurl.PRIMARY_IP)
        self._remote_port = self._conn.getinfo(pycurl.PRIMARY_PORT)
        self._total_time_s = self._conn.getinfo(pycurl.TOTAL_TIME)
        self._speed_download = self._conn.getinfo(pycurl.SPEED_DOWNLOAD)

    def _close_connection(self):
        """Close the curl handle, if one was created."""
        # The handle is still None when creating it failed in _setup_connection.
        if self._conn is not None:
            self._conn.close()

    def connect(self):
        """Run the transfer; a ``pycurl.error`` is stored in :attr:`error_info`, not raised."""
        try:
            self._setup_connection()
            self._perform_connection()
        except pycurl.error as error_info:
            self._error_info = error_info
        finally:
            self._close_connection()

    @property
    def response_code(self):
        """HTTP response code, or None when the transfer did not complete."""
        return self._response_code

    @property
    def response_body(self):
        """Bytes received so far."""
        return self._response_buffer.getvalue()

    @property
    def error_info(self):
        """The ``pycurl.error`` raised by the transfer, or None."""
        return self._error_info

    @property
    def size_download(self):
        """Downloaded size reported by curl, or None when the transfer did not complete."""
        return self._size_download

    @property
    def speed_download(self):
        """Download speed reported by curl, or None when the transfer did not complete."""
        return self._speed_download

    @property
    def quadruple(self):
        """``"local_ip:local_port,remote_ip:remote_port"`` of the transfer."""
        return f"{self._local_ip}:{self._local_port},{self._remote_ip}:{self._remote_port}"

    @property
    def total_time_s(self):
        """Total transfer time in seconds, or None when the transfer did not complete."""
        # The former ``self._close_connection is None`` guard compared a bound
        # method with None and could never be true, so it has been removed.
        return self._total_time_s


class HttpURLTransferBuilder(URLTransferBuilder):
    """Plain HTTP transfer; behaves exactly like :class:`URLTransferBuilder`."""


class HttpsURLTransferBuilder(URLTransferBuilder):
    """HTTPS transfer that skips peer verification and records certificate info."""

    def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed: int, tcp_mss=None):
        """Same parameters as :class:`URLTransferBuilder`."""
        super().__init__(url, request_resolve, conn_timeout, max_recv_speed, tcp_mss)
        self._certs_info = None

    def _setup_connection(self):
        """Additionally request certificate info and disable peer verification."""
        super()._setup_connection()
        self._conn.setopt(self._conn.OPT_CERTINFO, 1)
        self._conn.setopt(self._conn.SSL_VERIFYPEER, False)

    def _perform_connection(self):
        """Additionally fetch the certificate chain information."""
        super()._perform_connection()
        self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO)

    @property
    def cert_issuer(self) -> Optional[str]:
        """Issuer of the first certificate; see :meth:`_get_cert_issuer`."""
        return self._get_cert_issuer(self._certs_info)

    def _get_cert_issuer(self, certs) -> Optional[str]:
        """Return the ``Issuer`` field of the first certificate in *certs*.

        Returns ``""`` when no certificate information is available (None or an
        empty list, which used to raise IndexError) and None when the first
        certificate carries no ``Issuer`` field.
        """
        if not certs:
            return ""
        for cert_info in certs[0]:
            if cert_info[0] == "Issuer":
                return cert_info[1]
        return None


class DNSQueryBuilder:
    """Run one DNS query against fixed nameservers and keep the answer or the error."""

    def __init__(self, domain: str, namesevers: list, conn_timeout: int):
        """Store the query parameters; nothing is sent until a query method runs."""
        self._domain = domain
        self._nameservers = namesevers
        self._conn_timeout = conn_timeout
        self._dns_resolver = None
        self._dns_answer = None
        self._error_info = None

    def _setup(self):
        """Create a resolver restricted to the configured nameservers and timeout."""
        self._dns_resolver = dns.resolver.Resolver()
        self._dns_resolver.nameservers = self._nameservers
        self._dns_resolver.search = []
        self._dns_resolver.use_search_by_default = False
        self._dns_resolver.timeout = float(self._conn_timeout)
        self._dns_resolver.lifetime = float(self._conn_timeout)

    def _query(self, record_type):
        """Resolve the domain for *record_type*; any exception is stored in :attr:`error_info`."""
        try:
            self._setup()
            self._dns_answer = self._dns_resolver.resolve(self._domain, record_type)
        except Exception as error_info:
            # Deliberately broad: callers assert on the stored exception type.
            self._error_info = error_info

    @property
    def error_info(self):
        """The exception raised by the query, or None."""
        return self._error_info

    @property
    def rrset_ttl(self):
        """TTL of the answer rrset, or None when there is no answer."""
        if self._dns_answer is not None:
            return self._dns_answer.rrset.ttl
        return None

    @property
    def response_answer(self):
        """Answer section of the response, or None when there is no answer."""
        if self._dns_answer is not None:
            return self._dns_answer.response.answer
        return None

    @property
    def rdtype_address_pairs(self):
        """List of ``{"rdtype": ..., "address": ...}`` for every address record in the answer."""
        pairs = []
        if self._dns_answer is None:
            return pairs
        for answer in self._dns_answer.response.answer:
            for item in answer.items:
                # Records without an address (e.g. a CNAME in the chain) used
                # to raise AttributeError here; they are skipped instead.
                address = getattr(item, "address", None)
                if address is None:
                    continue
                pairs.append({"rdtype": item.rdtype, "address": address})
        return pairs


class DNSQueryTypeABuilder(DNSQueryBuilder):
    """DNS query for ``A`` records."""

    def query(self):
        """Run the ``A`` query."""
        self._query("A")


class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
    """DNS query for ``AAAA`` records."""

    def query(self):
        """Run the ``AAAA`` query."""
        self._query("AAAA")


class URLTransferResponseAssertion:
    """Checks on a finished URL transfer; each returns ``(ok, error_message_or_None)``."""

    @staticmethod
    def is_cert_issuer_matched(cert_issuer, regex):
        """Succeed when *regex* is found in *cert_issuer*."""
        if cert_issuer is None:
            return False, "Error: Failed to verify cert issuer. Actual cert issuer is None."
        if re.search(regex, cert_issuer, 0):
            return True, None
        return False, f"Error: Failed to verify cert issuer. Actual cert issuer: {cert_issuer}."

    @staticmethod
    def is_response_code_equal(actual_code, expected_code):
        """Succeed when the response code equals *expected_code*."""
        if actual_code == expected_code:
            return True, None
        return False, f"Error: Failed to verfiy response code. Actual code: {actual_code}, expected code: {expected_code}."

    @staticmethod
    def is_response_body_matched(response_body, regex):
        """Succeed when *regex* is found in the UTF-8 decoded body."""
        # errors='replace': an undecodable body is a failed match, not a crash.
        response_body_utf8 = response_body.decode('utf-8', errors='replace')
        if re.search(regex, response_body_utf8, 0):
            return True, None
        return False, f"Error: The response body fail to match regex: {regex}."

    @staticmethod
    def is_response_body_not_matched(response_body, regex):
        """Succeed when *regex* is NOT found in the UTF-8 decoded body."""
        response_body_utf8 = response_body.decode('utf-8', errors='replace')
        if not re.search(regex, response_body_utf8, 0):
            return True, None
        return False, f"Error: The response body matched the regex: {regex}."

    @staticmethod
    def is_response_body_md5_equal(response_body, expected_md5):
        """Succeed when the MD5 hex digest of the body equals *expected_md5*."""
        response_body_md5_value = hashlib.md5(response_body).hexdigest()
        if expected_md5 == response_body_md5_value:
            return True, None
        return False, f"Error: The response body md5 fail to match. Actual md5 value: {response_body_md5_value}."

    @staticmethod
    def is_download_size_equal(actual_size, expected_size):
        """Succeed when the downloaded size equals *expected_size*."""
        if actual_size == expected_size:
            return True, None
        return False, f"Error: The response body download size fail to match. Actual size: {actual_size}."

    @staticmethod
    def is_pycurl_error_code_equal(error_info, expected_error_code):
        """Succeed when a pycurl error occurred and its code equals *expected_error_code*."""
        if error_info is None:
            return False, "Error: The error info is None. Maybe the relevant actions didn't take effect."
        if error_info.args[0] == expected_error_code:
            return True, None
        return False, f"Error: The erro code not equal to desired. Actual error info: {error_info}."

    @staticmethod
    def is_pycurl_error_none(error_info):
        """Succeed when the transfer raised no pycurl error."""
        if error_info is None:
            return True, None
        return False, f"Error: The pycurl error is not None. Actual error info: {error_info}."

    @staticmethod
    def is_connect_time_less_than(actual_time_s, expected_time_s):
        """Succeed when a transfer time is known and strictly below *expected_time_s*."""
        if actual_time_s is None:
            # Comparing None with a number would raise TypeError.
            return False, "Error: The actual time is None. Maybe the transfer did not complete."
        if actual_time_s < expected_time_s:
            return True, None
        # The message used to claim the opposite of the failure condition.
        return False, f"Error: The actual time is not less than expected. Actual time:{actual_time_s}s, Expected time: {expected_time_s}s."


class DNSResponseAssertion:
    """Checks on a finished DNS query; each returns ``(ok, error_message_or_None)``."""

    @staticmethod
    def is_error_info_none(error_info):
        """Succeed when the query raised nothing."""
        if error_info is None:
            return True, None
        return False, f"Error: The error info: {error_info}."

    @staticmethod
    def is_error_type_equal(error_info, expected_type):
        """Succeed when the query raised an exception whose exact type is *expected_type*."""
        if error_info is None:
            return False, "Error: The error info is None. Maybe the relevant actions didn't take effect."
        # Exact type comparison on purpose: subclasses do not count as a match.
        if type(error_info) is expected_type:
            return True, None
        return False, f"Error: error type not equal to {expected_type}, error info: {error_info}."

    @staticmethod
    def is_ttl_equal(actual_ttl, expected_ttl):
        """Succeed when the TTL equals *expected_ttl*."""
        if actual_ttl == expected_ttl:
            return True, None
        return False, "Error: Actual ttl(%s) not equal to %d." % (actual_ttl, expected_ttl)

    @staticmethod
    def is_ttl_in_range(actual_ttl, expected_left_edge, expected_right_edge):
        """Succeed when a TTL is known and lies within the inclusive range."""
        # A missing TTL (no answer) is a failure, not a TypeError.
        if actual_ttl is not None and expected_left_edge <= actual_ttl <= expected_right_edge:
            return True, None
        # %s for the actual value so that None can be reported as well.
        return False, "Error: ttl(%s) not in [%d-%d]." % (actual_ttl, expected_left_edge, expected_right_edge)

    @staticmethod
    def is_rdtype_address_pair_included(rdtype_address_pairs, expected_rdtype, expected_address):
        """Succeed when a pair with the expected rdtype and address is present."""
        # rdtype 1: ipv4, 28: ipv6.
        for pair in rdtype_address_pairs:
            if pair["address"] == expected_address and pair["rdtype"] == expected_rdtype:
                return True, None
        return False, f"Expected rdtype address pair[{expected_rdtype}, {expected_address}] not in {rdtype_address_pairs}."
# Certificate issuer patterns shared by the case runners below.
_TSG_CA_ISSUER_REGEX = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b'
_TSG_UNTRUSTED_CA_ISSUER_REGEX = r'\bCN[\s]*=[\s]*TSG CA Untrusted\b'
_BADSSL_ISSUER_REGEX = r'\bCN[\s]*=[\s]*BadSSL\b'


def _connect_and_verify(conn, *checks):
    """Run the transfer on *conn*, then evaluate *checks* in order.

    Each check is an ``(assertion, attribute_name, expected)`` triple. The
    attribute is read from *conn* only when the check is reached, so a later
    check never touches the transfer result after an earlier one has failed.
    Returns ``(True, None)`` when everything passes, otherwise ``(False,
    message)`` of the first failing check. A pycurl error always fails first.
    """
    conn.connect()
    status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
    if not status:
        return False, info
    for assertion, attribute_name, expected in checks:
        status, info = assertion(getattr(conn, attribute_name), expected)
        if not status:
            return False, info
    return True, None


def _issuer_matches(regex):
    """Check: the certificate issuer matches *regex*."""
    return URLTransferResponseAssertion.is_cert_issuer_matched, "cert_issuer", regex


def _code_is(expected_code):
    """Check: the response code equals *expected_code*."""
    return URLTransferResponseAssertion.is_response_code_equal, "response_code", expected_code


def _body_matches(regex):
    """Check: the response body matches *regex*."""
    return URLTransferResponseAssertion.is_response_body_matched, "response_body", regex


def _body_lacks(regex):
    """Check: the response body does not match *regex*."""
    return URLTransferResponseAssertion.is_response_body_not_matched, "response_body", regex


def _body_md5_is(expected_md5):
    """Check: the MD5 of the response body equals *expected_md5*."""
    return URLTransferResponseAssertion.is_response_body_md5_equal, "response_body", expected_md5


def _size_is(expected_size):
    """Check: the downloaded size equals *expected_size* bytes."""
    return URLTransferResponseAssertion.is_download_size_equal, "size_download", expected_size


def _total_time_below(seconds):
    """Check: the total transfer time is below *seconds*."""
    return URLTransferResponseAssertion.is_connect_time_less_than, "total_time_s", seconds


def _https_case(url, resolves, conn_timeout, max_recv_speed, *checks, tcp_mss=None):
    """Run an HTTPS transfer and verify *checks*; see :func:`_connect_and_verify`."""
    conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed, tcp_mss)
    return _connect_and_verify(conn, *checks)


def _http_case(url, resolves, conn_timeout, max_recv_speed, *checks):
    """Run an HTTP transfer and verify *checks*; see :func:`_connect_and_verify`."""
    conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
    return _connect_and_verify(conn, *checks)


def _https_download_case(url, resolves, conn_timeout, max_recv_speed, expected_size):
    """HTTPS case: intercepted certificate and an exact download size in bytes."""
    return _https_case(url, resolves, conn_timeout, max_recv_speed,
                       _issuer_matches(_TSG_CA_ISSUER_REGEX), _size_is(expected_size))


class ProxyCasesRunner:
    """Proxy test cases.

    Every case performs one transfer and returns ``(True, None)`` on success
    or ``(False, error_message)`` for the first failed check.
    """

    @staticmethod
    def action_intercept_protocol_https(url, resolves, conn_timeout, max_recv_speed):
        """HTTPS is intercepted: the certificate is issued by the gateway CA."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX))

    @staticmethod
    def action_intercept_protocol_https_chello_fragment_mss_150(url, resolves, conn_timeout, max_recv_speed):
        """Interception still works with the TCP MSS forced to 150."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX), tcp_mss=150)

    @staticmethod
    def action_intercept_protocol_https_chello_fragment_mss_200(url, resolves, conn_timeout, max_recv_speed):
        """Interception still works with the TCP MSS forced to 200."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX), tcp_mss=200)

    @staticmethod
    def action_intercept_protocol_https_chello_fragment_mss_300(url, resolves, conn_timeout, max_recv_speed):
        """Interception still works with the TCP MSS forced to 300."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX), tcp_mss=300)

    @staticmethod
    def action_intercept_protocol_https_cert_error(url, resolves, conn_timeout, max_recv_speed):
        """The certificate is issued by the untrusted gateway CA."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_UNTRUSTED_CA_ISSUER_REGEX))

    @staticmethod
    def action_intercept_protocol_https_download_size_1k(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 1 KiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024)

    @staticmethod
    def action_intercept_protocol_https_download_size_4k(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 4 KiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 4)

    @staticmethod
    def action_intercept_protocol_https_download_size_16k(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 16 KiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 16)

    @staticmethod
    def action_intercept_protocol_https_download_size_64k(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 64 KiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 64)

    @staticmethod
    def action_intercept_protocol_https_download_size_256k(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 256 KiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 256)

    @staticmethod
    def action_intercept_protocol_https_download_size_1M(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 1 MiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 1024)

    @staticmethod
    def action_intercept_protocol_https_download_size_4M(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 4 MiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 1024 * 4)

    @staticmethod
    def action_intercept_protocol_https_download_size_16M(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 16 MiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 1024 * 16)

    @staticmethod
    def action_intercept_protocol_https_download_size_64M(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted download of exactly 64 MiB."""
        return _https_download_case(url, resolves, conn_timeout, max_recv_speed, 1024 * 1024 * 64)

    @staticmethod
    def action_redirect_protocol_https(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted HTTPS request is answered with a 302."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX), _code_is(302))

    @staticmethod
    def action_redirect_protocol_http(url, resolves, conn_timeout, max_recv_speed):
        """HTTP request is answered with a 302."""
        return _http_case(url, resolves, conn_timeout, max_recv_speed, _code_is(302))

    @staticmethod
    def action_block_protocol_https(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted HTTPS request is answered with a 404 carrying the block marker."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX), _code_is(404),
                           _body_matches(r'E33F01E50AFE043191931DD40190B09B'))

    @staticmethod
    def action_block_protocol_http(url, resolves, conn_timeout, max_recv_speed):
        """Request is answered with a 404 carrying the block marker."""
        # NOTE(review): uses the HTTPS builder although the case is named
        # "http"; kept as found because the URL scheme is not visible here --
        # confirm whether HttpURLTransferBuilder was intended.
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _code_is(404),
                           _body_matches(r'E33F01E50AFE043191931DD40190B09B'))

    @staticmethod
    def action_replace_protocol_https(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted HTTPS body contains the replacement and not the original text."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX),
                           _body_matches(r'03C174CD9D809789CCEC18D6F585DF3E'),
                           _body_lacks(r'EnglishSearchShared'))

    @staticmethod
    def action_replace_protocol_http(url, resolves, conn_timeout, max_recv_speed):
        """HTTP body contains the replacement and not the original text."""
        return _http_case(url, resolves, conn_timeout, max_recv_speed,
                          _body_matches(r'03C174CD9D809789CCEC18D6F585DF3E'),
                          _body_lacks(r'EnglishSearchShared'))

    @staticmethod
    def action_hijack_protocol_https(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted HTTPS body has the expected MD5."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX),
                           _body_md5_is("4bf06db1a228c5c8d978ebf9e1169d0d"))

    @staticmethod
    def action_hijack_protocol_http(url, resolves, conn_timeout, max_recv_speed):
        """HTTP body has the expected MD5."""
        return _http_case(url, resolves, conn_timeout, max_recv_speed,
                          _body_md5_is("4bf06db1a228c5c8d978ebf9e1169d0d"))

    @staticmethod
    def action_insert_protocol_https(url, resolves, conn_timeout, max_recv_speed):
        """Intercepted HTTPS body contains both inserted markers."""
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _issuer_matches(_TSG_CA_ISSUER_REGEX),
                           _body_matches(r'httpSelfcheckInsert'),
                           _body_matches(r'5BE3754D1EA8D51E8D993060FA225330'))

    @staticmethod
    def action_insert_protocol_http(url, resolves, conn_timeout, max_recv_speed):
        """HTTP body contains both inserted markers."""
        return _http_case(url, resolves, conn_timeout, max_recv_speed,
                          _body_matches(r'httpSelfcheckInsert'),
                          _body_matches(r'5BE3754D1EA8D51E8D993060FA225330'))

    @staticmethod
    def action_deny_protocol_http_filter_host(url, resolves, conn_timeout, max_recv_speed):
        """HTTP request is answered with a 404 carrying the filter-host marker."""
        return _http_case(url, resolves, conn_timeout, max_recv_speed,
                          _code_is(404),
                          _body_matches(r'testing-proxy-filter-host'))

    @staticmethod
    def action_deny_protocol_http_filter_url(url, resolves, conn_timeout, max_recv_speed):
        """Request is answered with a 404 carrying the filter-url marker."""
        # NOTE(review): uses the HTTPS builder although the case is named
        # "http"; kept as found -- confirm whether HttpURLTransferBuilder was
        # intended.
        return _https_case(url, resolves, conn_timeout, max_recv_speed,
                           _code_is(404),
                           _body_matches(r'testing-proxy-filter-url'))


class ShapingCaseRunner:
    """Traffic-shaping test cases; each returns ``(ok, message_or_None)``."""

    @staticmethod
    def no_rate_limit_protocol_http(url, resolves, conn_timeout, max_recv_speed):
        """HTTP transfer answers 200 and finishes in under 5 seconds.

        On success the message reports the measured total time.
        """
        conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
        status, info = _connect_and_verify(conn, _code_is(200), _total_time_below(5))
        if not status:
            return False, info
        return True, f"The connect total time in seconds is: {conn.total_time_s}."

    @staticmethod
    def no_rate_limit_protocol_https(url, resolves, conn_timeout, max_recv_speed):
        """HTTPS transfer answers 200 with the BadSSL certificate in under 5 seconds.

        On success the message reports the measured total time.
        """
        conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
        status, info = _connect_and_verify(conn, _code_is(200),
                                           _issuer_matches(_BADSSL_ISSUER_REGEX),
                                           _total_time_below(5))
        if not status:
            return False, info
        return True, f"The connect total time in seconds is: {conn.total_time_s}."
@staticmethod def rate_limit_0bps_protocol_http(url, resolves, conn_timeout, max_recv_speed): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28) if not status: return False, info return True, None @staticmethod def rate_limit_0bps_protocol_https(url, resolves, conn_timeout, max_recv_speed): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28) if not status: return False, info return True, None @staticmethod def rate_limit_1000gbps_protocol_http(url, resolves, conn_timeout, max_recv_speed): server_ip, server_port = TcpPacketsCapture.read_server_ip_and_port_from_resolve(resolves) capture = TcpPacketsCapture(server_ip, server_port) capture.start() conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() capture.stop() status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info) if not status: return False, info status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200) if not status: return False, info actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple) status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8) if not status: return False, info return True, None @staticmethod def rate_limit_1000gbps_protocol_https(url, resolves, conn_timeout, max_recv_speed): server_ip, server_port = TcpPacketsCapture.read_server_ip_and_port_from_resolve(resolves) capture = TcpPacketsCapture(server_ip, server_port) capture.start() conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() capture.stop() status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info) if not status: return False, info status, info = 
URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200) if not status: return False, info status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not status: return False, info actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple) status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8) if not status: return False, info return True, None class FirewallCasesRunner: @staticmethod def action_bypass_protocol_https(url, resolves, conn_timeout, max_recv_speed): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info) if not status: return False, info status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not status: return False, info return True, None @staticmethod def action_allow_protocol_http(url, resolves, conn_timeout, max_recv_speed): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info) if not status: return False, info status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200) if not status: return False, info return True, None @staticmethod def action_deny_subaction_drop_protocol_http(url, resolves, conn_timeout, max_recv_speed): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28) if not status: return False, info return True, None @staticmethod def action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = 
URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56) if not status: return False, info return True, None @staticmethod def action_deny_subaction_reset_protocol_http_filter_host(url, resolves, conn_timeout, max_recv_speed): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56) if not status: return False, info return True, None @staticmethod def action_deny_subaction_reset_protocol_http_filter_url(url, resolves, conn_timeout, max_recv_speed): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56) if not status: return False, info return True, None @staticmethod def action_deny_subaction_block_protocol_http(url, resolves, conn_timeout, max_recv_speed): conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info) if not status: return False, info status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 403) if not status: return False, info status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r"dign-testing-deny-block") if not status: return False, info return True, None @staticmethod def action_allow_protocol_https(url, resolves, conn_timeout, max_recv_speed): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info) if not status: return False, info status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') if not status: return False, info status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200) if not status: return 
False, info return True, None @staticmethod def action_deny_subaction_drop_protocol_https(url, resolves, conn_timeout, max_recv_speed): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28) if not status: return False, info return True, None @staticmethod def action_deny_subaction_reset_protocol_https(url, resolves, conn_timeout, max_recv_speed): conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed) conn.connect() status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 35) if not status: return False, info return True, None @staticmethod def action_deny_subaction_drop_protocol_dns(domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() status, info = DNSResponseAssertion.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout) if not status: return False, info return True, None @staticmethod def action_deny_subaction_redirect_protocol_dns_type_a(domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() status, info = DNSResponseAssertion.is_error_info_none(request.error_info) if not status: return False, info status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333) if not status: return False, info status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101") if not status: return False, info return True, None @staticmethod def action_deny_subaction_redirect_protocol_dns_type_aaaa(domain, nameservers, conn_timeout): request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) request.query() status, info = DNSResponseAssertion.is_error_info_none(request.error_info) if not status: return False, info status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333) if 
not status: return False, info status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001") if not status: return False, info return True, None @staticmethod def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(domain, nameservers, conn_timeout): request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) request.query() status, info = DNSResponseAssertion.is_error_info_none(request.error_info) if not status: return False, info status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500) if not status: return False, info status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101") if not status: return False, info return True, None @staticmethod def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(domain, nameservers, conn_timeout): request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) request.query() status, info = DNSResponseAssertion.is_error_info_none(request.error_info) if not status: return False, info status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500) if not status: return False, info status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001") if not status: return False, info return True, None class ResultExportBuilder: COLUMN_0 = "Test cases" COLUMN_1 = "Result" COLUMN_2 = "Description" RESULT_OK = "ok" RESULT_FAIL = "FAIL" DEFAULT_INFO = "-" def __init__(self): self._exporter = None self._create_exporter() def _create_exporter(self): self._exporter = PrettyTable() self._exporter.vrules = VRuleStyle.NONE self._exporter.field_names = [self.COLUMN_0, self.COLUMN_1, self.COLUMN_2] self._exporter.align[self.COLUMN_0] = "l" self._exporter.align[self.COLUMN_1] = "l" self._exporter.align[self.COLUMN_2] = "l" def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True): 
status, description = self._convert_case_result(case_result) if enable_ouptut_by_case: self._output_by_case_name(case_name, status) #if not case_result[0]: self._add_exporter_row(case_name, status, description) def _convert_case_result(self, case_result): status = self.RESULT_OK if not case_result[0]: status = self.RESULT_FAIL description = self.DEFAULT_INFO if case_result[1] is not None: description = case_result[1] return status, description def _output_by_case_name(self, case_name, case_status): print(f"{case_name} ... {case_status}") def _add_exporter_row(self, column0: str, column1: str, column2: str): self._exporter.add_row([column0, column1, column2]) def export(self): if len(self._exporter._rows) > 0: print("\nSummary:") print(self._exporter) class DiagnoseCasesRunner: def __init__(self, cmd_parser: CommandParser): self._loop = cmd_parser.loop self._count = cmd_parser.count self._interval_s = cmd_parser.interval_s self._case_names_regexp = cmd_parser.case_names_regexp self._config_path = cmd_parser.config_path self._service_function_names_regexp = cmd_parser.service_function_names_regexp self._service_function_config_path = cmd_parser.service_function_config_path self._random_delay_seconds = cmd_parser.delay_seconds self._config_loader = ConfigLoader(self._config_path) self._cases_info = [ { "name": "Firewall_Allow_HTTP", "protocol_type": "http", "test_function": FirewallCasesRunner.action_allow_protocol_http, "request_content": "http://http.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Firewall_Allow_HTTPS", "protocol_type": "https", "test_function": FirewallCasesRunner.action_allow_protocol_https, "request_content": "https://sha512.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyReset_HTTP", "protocol_type": "http", "test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http, "request_content": 
"http://http-dynamic-login.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyReset_HTTPS", "protocol_type": "https", "test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_https, "request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyReset_FilterHost_HTTP", "protocol_type": "http", "test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http_filter_host, "request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyReset_FilterURL_HTTP", "protocol_type": "http", "test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http_filter_url, "request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyBlock_HTTP", "protocol_type": "http", "test_function": FirewallCasesRunner.action_deny_subaction_block_protocol_http, "request_content": "http://http-login.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyDrop_HTTP", "protocol_type": "http", "test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_http, "request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website", "conn_timeout": 4, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyDrop_HTTPS", "protocol_type": "https", "test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_https, "request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website", "conn_timeout": 4, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyDrop_DNS", "protocol_type": "dns", "test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_dns, "request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website", 
'conn_timeout': 3, 'max_recv_speed': 6553600 }, { "name": "Firewall_DenyRedirect_A_DNS", "protocol_type": "dns", "test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_a, "request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website", "conn_timeout": 3, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyRedirect_AAAA_DNS", "protocol_type": "dns", "test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_aaaa, "request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website", "conn_timeout": 3, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyRedirect_ARangeTTL_DNS", "protocol_type": "dns", "test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl, "request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website", "conn_timeout": 3, "max_recv_speed": 6553600 }, { "name": "Firewall_DenyRedirect_AAAARangeTTL_DNS", "protocol_type": "dns", "test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl, "request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website", "conn_timeout": 3, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https, "request_content": "https://sha256.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_ChelloFragment_MSS_150", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_chello_fragment_mss_150, "request_content": "https://sha256.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_ChelloFragment_MSS_200", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_chello_fragment_mss_200, "request_content": 
"https://sha256.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_ChelloFragment_MSS_300", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_chello_fragment_mss_300, "request_content": "https://sha256.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_CertExpired", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error, "request_content": "https://expired.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_CertSelfSigned", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error, "request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_CertUntrustedRoot", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error, "request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_1k", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_1k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_4k", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_4k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_16k", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_16k, 
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_64k", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_64k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_256k", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_256k, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_1M", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_1M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_4M", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_4M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_16M", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_16M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M", "conn_timeout": 4, "max_recv_speed": 6553600 }, { "name": "Proxy_Intercept_HTTPS_Response_64M", "protocol_type": "https", "test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_64M, "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M", "conn_timeout": 12, "max_recv_speed": 6553600 }, { "name": 
"Proxy_Manipulation_Redirect_HTTP", "protocol_type": "http", "test_function": ProxyCasesRunner.action_redirect_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Redirect_HTTPS", "protocol_type": "https", "test_function": ProxyCasesRunner.action_redirect_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Replace_HTTP", "protocol_type": "http", "test_function": ProxyCasesRunner.action_replace_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Replace_HTTPS", "protocol_type": "https", "test_function": ProxyCasesRunner.action_replace_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Deny_HTTP", "protocol_type": "http", "test_function": ProxyCasesRunner.action_block_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Deny_HTTPS", "protocol_type": "https", "test_function": ProxyCasesRunner.action_block_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Deny_FilterHost_HTTP", "protocol_type": "http", "test_function": ProxyCasesRunner.action_deny_protocol_http_filter_host, "request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": 
"Proxy_Manipulation_Deny_FilterURL_HTTP", "protocol_type": "http", "test_function": ProxyCasesRunner.action_deny_protocol_http_filter_url, "request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Hijack_HTTP", "protocol_type": "http", "test_function": ProxyCasesRunner.action_hijack_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Hijack_HTTPS", "protocol_type": "https", "test_function": ProxyCasesRunner.action_hijack_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Insert_HTTP", "protocol_type": "http", "test_function": ProxyCasesRunner.action_insert_protocol_http, "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Proxy_Manipulation_Insert_HTTPS", "protocol_type": "https", "test_function": ProxyCasesRunner.action_insert_protocol_https, "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html", "conn_timeout": 1, "max_recv_speed": 6553600 }, { "name": "Shaping_NoRateLimit_HTTP", "protocol_type": "http", "test_function": ShapingCaseRunner.no_rate_limit_protocol_http, "request_content": "http://testing-shaping-no-rate-limit.badssl.selftest.gdnt-cloud.website/resources/64M", "conn_timeout": 12, "max_recv_speed": None }, { "name": "Shaping_NoRateLimit_HTTPS", "protocol_type": "https", "test_function": ShapingCaseRunner.no_rate_limit_protocol_https, "request_content": "https://testing-shaping-no-rate-limit.badssl.selftest.gdnt-cloud.website/resources/64M", "conn_timeout": 12, "max_recv_speed": None }, { "name": 
"Shaping_RateLimit0bps_HTTP", "protocol_type": "http", "test_function": ShapingCaseRunner.rate_limit_0bps_protocol_http, "request_content": "http://testing-shaping-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/64M", "conn_timeout": 12, "max_recv_speed": 6553600 }, { "name": "Shaping_RateLimit0bps_HTTPS", "protocol_type": "https", "test_function": ShapingCaseRunner.rate_limit_0bps_protocol_https, "request_content": "https://testing-shaping-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/64M", "conn_timeout": 12, "max_recv_speed": 6553600 }, { "name": "Shaping_RateLimit1000gbps_HTTP", "protocol_type": "http", "test_function": ShapingCaseRunner.rate_limit_1000gbps_protocol_http, "request_content": "http://testing-shaping-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/64M", "conn_timeout": 12, "max_recv_speed": 6553600 }, { "name": "Shaping_RateLimit1000gbps_HTTPS", "protocol_type": "https", "test_function": ShapingCaseRunner.rate_limit_1000gbps_protocol_https, "request_content": "https://testing-shaping-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/64M", "conn_timeout": 12, "max_recv_speed": 6553600 } ] def run_cases(self): if self._random_delay_seconds is not None: time.sleep(self._random_delay_seconds) run_count = 1 while True: print("\nRUN %d" % run_count) self.__run_cases_by_service_function_ids() if (self._loop is not True) and self._count >= run_count: break time.sleep(self._interval_s) run_count = run_count + 1 def __run_cases_by_service_function_ids(self): sf_loader = ServiceFunctionConfigLoader(self._service_function_config_path, self._service_function_names_regexp) for sf_pair in sf_loader.name_id_pairs: start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(format(("Service function name: " + str(sf_pair["name"]) + ",Test start time: " + start_timestamp),'#^100s')) self._run_cases_in_one_service_function_id(sf_pair["id"]) end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", 
time.localtime()) print(format(("Service function name: " + str(sf_pair["name"]) + ",Test end time: " + end_timestamp),'=^100s')) def _run_cases_in_one_service_function_id(self, service_function_id): resolve_builder = ServerAddressBuilder(service_function_id) exporter = ResultExportBuilder() for case_info in self._cases_info: if re.fullmatch(self._case_names_regexp, case_info["name"]): self._run_one_case(case_info, resolve_builder, exporter) exporter.export() def _run_one_case(self, case_info, resolve_builder, exporter): conn_timeout = self._get_conn_timeout_from_configs(case_info) max_recv_speed = self._get_max_recv_speed_from_configs(case_info) test_func = case_info.get("test_function") if test_func: if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https": resolve = resolve_builder.read_resolves_by_url(case_info["request_content"]) ret = test_func(case_info["request_content"], resolve, conn_timeout, max_recv_speed) if case_info["protocol_type"] == "dns": ret = test_func(case_info["request_content"], resolve_builder.nameservers, conn_timeout) exporter.append_case_result(case_info["name"], ret) def _get_conn_timeout_from_configs(self, case_info): conn_timeout = self._config_loader.get_value_by_section_and_key(case_info["name"], "conn_timeout") if conn_timeout is not None: return conn_timeout else: return case_info["conn_timeout"] def _get_max_recv_speed_from_configs(self, case_info): max_recv_speed = self._config_loader.get_value_by_section_and_key(case_info["name"], "max_recv_speed") if max_recv_speed is not None: return max_recv_speed else: return case_info["max_recv_speed"] if __name__ == '__main__': cmd_parser = CommandParser() dign = DiagnoseCasesRunner(cmd_parser) dign.run_cases()