This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/images_build/client/dign_client/bin/client.py
2024-05-27 14:50:58 +08:00

1542 lines
69 KiB
Python

import sys
import unittest
import json
import pycurl
import os
import re
import time
import io
from io import BytesIO
# import getopt
# import ciunittest
import argparse
import hashlib
from configparser import ConfigParser
import random
import dns.exception
import dns.resolver
import sys
import logging
import copy
from prettytable import PrettyTable,NONE,HEADER
import yaml
from urllib.parse import urlparse
from scapy.all import *
from scapy.layers.inet import IP, TCP
from scapy.sendrecv import AsyncSniffer
from urllib.parse import urlparse, parse_qs
class ConfigLoader:
    """Per-case settings: built-in defaults overlaid with values from an INI file.

    Every section name is a diagnose case name; each section carries at
    least ``conn_timeout`` and ``max_recv_speed_large``.
    """

    # Built-in defaults, keyed by case name. Every case shares the same
    # ``max_recv_speed_large``; only ``conn_timeout`` differs between cases.
    DEFAULT_CONFIGS = {
        case_name: {'conn_timeout': timeout, 'max_recv_speed_large': 6553600}
        for case_name, timeout in (
            ('test_firewallBypass_ssl', 1),
            ('test_firewallDenyDrop_dns', 3),
            ('test_firewallDenyRedirectA_dns', 3),
            ('test_firewallDenyRedirectAAAA_dns', 3),
            ('test_firewallDenyRedirectARangeTTL_dns', 3),
            ('test_firewallDenyRedirectAAAARangeTTL_dns', 3),
            ('test_dnsRequest_allow_rdtype_a', 1),
            ('test_dnsRequest_allow_rdtype_aaaa', 1),
            ('test_dnsRequest_allow_rdtype_cname', 1),
            ('test_firewallIntercept_ssl', 1),
            ('test_firewallIntercept_sslCerterrExpired', 1),
            ('test_firewallIntercept_sslCerterrSelfsigned', 1),
            ('test_firewallIntercept_sslCerterrUntrustedroot', 1),
            ('test_proxyRedirect_ssl', 1),
            ('test_proxyBlock_ssl', 1),
            ('test_proxyReplace_ssl', 1),
            ('test_proxyHijack_ssl', 1),
            ('test_proxyInsert_ssl', 1),
            ('test_proxyRedirect_http', 1),
            ('test_proxyBlock_http', 1),
            ('test_proxyReplace_http', 1),
            ('test_proxyHijack_http', 1),
            ('test_proxyInsert_http', 1),
            ('test_firewallIntercept_sslDownloadSize1k', 1),
            ('test_firewallIntercept_sslDownloadSize4k', 1),
            ('test_firewallIntercept_sslDownloadSize16k', 1),
            ('test_firewallIntercept_sslDownloadSize64k', 1),
            ('test_firewallIntercept_sslDownloadSize256k', 1),
            ('test_firewallIntercept_sslDownloadSize1M', 1),
            ('test_firewallIntercept_sslDownloadSize4M', 1),
            ('test_firewallIntercept_sslDownloadSize16M', 4),
            ('test_firewallIntercept_sslDownloadSize64M', 12),
            ('test_firewallAllow_http', 1),
            ('test_firewallDenyDrop_http', 4),
            ('test_firewallDenyReset_http', 1),
            ('test_firewallDenyBlock_http', 1),
            ('test_firewallAllow_ssl', 1),
            ('test_firewallDenyDrop_ssl', 4),
            ('test_firewallDenyReset_ssl', 1),
            ('test_firewallDenyResetFilterHost_http', 1),
            ('test_firewallDenyResetFilterURL_http', 1),
            ('test_proxyDenyFilterHost_http', 1),
            ('test_proxyDenyFilterURL_http', 1),
            ('test_shaping_ratelimit_0bps_http', 4),
            ('test_shaping_ratelimit_0bps_https', 4),
            ('test_shaping_ratelimit_1000gbps_http', 4),
            ('test_shaping_ratelimit_1000gbps_https', 4),
        )
    }

    def __init__(self, config_path: str):
        """Copy the defaults, then overlay whatever ``config_path`` provides."""
        self.__config_path = config_path
        # Deep copy so per-instance overrides never leak into DEFAULT_CONFIGS.
        self.__configs = copy.deepcopy(self.DEFAULT_CONFIGS)
        self.__load_configs()

    def __load_configs(self):
        """Merge every section of the INI file into the in-memory table.

        Sections that already exist are updated key by key; unknown
        sections are added. Values read from the file are strings.
        """
        ini = ConfigParser()
        ini.read(self.__config_path, encoding='UTF-8')
        for section_name in ini.sections():
            target = self.__configs.setdefault(section_name, {})
            target.update(ini.items(section_name))

    def get_value_by_section_and_key(self, section, key):
        """Return the value stored under ``section``/``key``.

        String values made up only of digits are returned as ``int``;
        anything else is returned unchanged.

        Raises:
            KeyError: the section or the key inside it is unknown.
        """
        options = self.__configs.get(section)
        if options is None or key not in options:
            raise KeyError("%s not in configs or %s.%s not in configs." % (section, section, key))
        raw = options[key]
        if isinstance(raw, str) and raw.isdigit():
            return int(raw)
        return raw
class ServiceFunctionConfigLoader:
    """Read the service-function map from a YAML file and keep matching entries.

    The YAML document is expected to hold a ``service_function_maps`` list
    whose items carry ``service_function_name`` and ``device_id``.
    """

    TABLE_NAME = "service_function_maps"
    SF_NAME_KEY = "service_function_name"
    DEVICE_ID_KEY = "device_id"

    def __init__(self, config_path, name_pattern):
        """Load ``config_path`` and pre-compute the name/id pairs matching ``name_pattern``."""
        self._config_path = config_path
        self._name_pattern = name_pattern
        self._configs = None
        self._load_configs()
        self._pairs = self._read_name_id_pairs_by_name_pattern(self._name_pattern)

    def _load_configs(self):
        """Parse the YAML file; on any failure print an error and keep ``_configs`` as None."""
        try:
            with open(self._config_path, 'r') as stream:
                document = yaml.safe_load(stream)
        except FileNotFoundError:
            print(f"Error: File not found: {self._config_path}")
        except yaml.YAMLError as err:
            print(f"Error parsing YAML file: {err}")
        else:
            if document is None:
                # safe_load yields None for an empty document.
                print(f"Error: No config loaded from path: {self._config_path}")
            else:
                self._configs = document

    def _read_name_id_pairs_by_name_pattern(self, name_pattern):
        """Return ``[{"name": ..., "id": int}, ...]`` for entries whose name matches.

        Matching uses ``re.match``, i.e. the pattern is anchored at the start
        of the name. Returns an empty list when nothing was loaded or the
        table is missing.
        """
        if self._configs is None:
            return []
        if self.TABLE_NAME not in self._configs:
            return []
        pairs = []
        for entry in self._configs[self.TABLE_NAME]:
            sf_name = entry[self.SF_NAME_KEY]
            if re.match(name_pattern, sf_name):
                pairs.append({"name": sf_name, "id": int(entry[self.DEVICE_ID_KEY])})
        return pairs

    @property
    def name_id_pairs(self):
        """Name/id pairs selected at construction time."""
        return self._pairs
class CommandParser:
    """Parse and validate the command-line options of the TSG diagnose client.

    Parsing happens in the constructor; on invalid arguments the help text
    is printed and the process exits with status 1.
    """

    COUNT_MIN = 1
    COUNT_MAX = 65535
    INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$'
    #INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'

    def __init__(self):
        self.__start_up()

    def __start_up(self):
        """Build the argument parser, parse ``sys.argv`` and store the results."""
        parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose")
        parser.add_argument('-i','--interval', type = int, default = 30,
                            help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.')
        parser.add_argument('-c','--count', type = int, default = 1,
                            help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].')
        parser.add_argument('--periodical', action='store_true', default = False,
                            help='Enable TSG diagnose periodical, exit when recv a signal.')
        parser.add_argument('--service_function_names_regexp', type = str, default = '.+',
                            help = "Regexp of the service function names to run. Example: .+")
        parser.add_argument('--service_function_config_path', type = str, default = '/opt/dign_client/share/service_function_maps.yaml',
                            help = "Specifies the service function config file, default /opt/dign_client/share/service_function_maps.yaml")
        parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf',
                            help='Specifies the config file, default /opt/dign_client/etc/client.conf')
        parser.add_argument('--case_names_regexp', type = str, default = '.+',
                            help='Regexp of the case names to run. Example: .+')
        parser.add_argument('--enable_delay', action='store_true', default = False,
                            help='Enable a random delay between 1 and 30 seconds before running a case.')
        parser.add_argument('--delay_left_edge', type = int, default = 1,
                            help='The seconds of the delay left edge. Default: 1.')
        parser.add_argument('--delay_right_edge', type = int, default = 30,
                            help='The seconds of the delay right edge. Default: 30.')
        # parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running')
        args = parser.parse_args()
        if not self.__is_legal_args(args):
            parser.print_help()
            sys.exit(1)
        self.__interval_s = args.interval
        self.__count = args.count
        self.__loop = args.periodical
        self.__config_path = args.config_path
        self.__case_names_regexp = args.case_names_regexp
        self.__service_function_names_regexp = args.service_function_names_regexp
        self.__service_function_config_path = args.service_function_config_path
        self._enable_delay = args.enable_delay
        self._delay_left_edge = args.delay_left_edge
        self._delay_right_edge = args.delay_right_edge

    def __is_legal_args(self, args) -> bool:
        """Return True when ``args`` is usable; print the reason and return False otherwise."""
        if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX:
            print("Error: the number of TSG diagnoses must in [1,65535]")
            return False
        if args.delay_left_edge > args.delay_right_edge:
            print("Error: The delay in seconds on the left edge is greater than on the right edge.")
            # An inverted range must be rejected here: random.randint(left, right)
            # in ``delay_seconds`` raises ValueError when left > right.
            return False
        return True

    @property
    def interval_s(self):
        """Seconds between consecutive diagnose runs."""
        return self.__interval_s

    @property
    def count(self):
        """Number of diagnose runs requested."""
        return self.__count

    @property
    def loop(self):
        """True when ``--periodical`` was given."""
        return self.__loop

    @property
    def config_path(self):
        """Path of the client config file."""
        return self.__config_path

    @property
    def case_names_regexp(self):
        """Regular expression selecting the case names to run."""
        return r"{}".format(self.__case_names_regexp)

    @property
    def service_function_names_regexp(self):
        """Regular expression selecting the service function names to run."""
        return r"{}".format(self.__service_function_names_regexp)

    @property
    def service_function_config_path(self):
        """Path of the service function map file."""
        return self.__service_function_config_path

    @property
    def delay_seconds(self):
        """A fresh random delay within the configured edges, or None when delay is disabled."""
        if self._enable_delay:
            return random.randint(self._delay_left_edge, self._delay_right_edge)
        return None
class ServerAddressBuilder:
    """Derive the test-server address for one service function index.

    The server address is ``192.0.2.<101 + index>``.
    """

    IPv4_4TH_OCTET_LEFT_EDGE = 101
    IPv4_1_TO_3TH_OCTET = "192.0.2"

    def __init__(self, service_function_index: int):
        self._service_function_index = service_function_index
        self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index

    def _server_ip(self):
        """Dotted-quad address of the server for this service function."""
        return f"{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}"

    def read_resolves_by_url(self, url):
        """Return a one-element ``host:port:address`` list for ``url``.

        When the URL carries no port, 443 is used for ``https`` and 80 for
        ``http``; for any other scheme the port is left as parsed.
        """
        pieces = urlparse(url)
        port = pieces.port
        if not port:
            port = {'https': 443, 'http': 80}.get(pieces.scheme, port)
        return [f"{pieces.hostname}:{port}:{self._server_ip()}"]

    @property
    def nameservers(self) -> list:
        """One-element list holding the server address, for use as a DNS nameserver."""
        return [self._server_ip()]
class TcpPacketsCapture:
    """Sniff TCP packets sent by one server and remember the DSCP seen per connection.

    Connections are keyed by a "quadruple" string of the form
    ``client_ip:client_port,server_ip:server_port``.
    """

    IFACE = "net1"
    DSCP_KEY = ["DSCP"]

    def __init__(self, server_ip, server_port):
        self._server_ip = server_ip
        self._server_port = server_port
        self._quadruple_to_dscp_table = {}
        self._build_filter()

    def _build_filter(self):
        """Compose the capture filter selecting TCP traffic coming from the server."""
        self._filter = "tcp and src host {} and src port {}".format(self._server_ip, self._server_port)

    def start(self):
        """Start sniffing in the background on ``IFACE``."""
        sniffer = AsyncSniffer(iface=self.IFACE, prn=self._packet_callback, filter=self._filter)
        self._sniff_thread = sniffer
        sniffer.start()

    def _packet_callback(self, packet):
        """Record the DSCP of a server-to-client TCP/IP packet; ignore anything else."""
        if IP not in packet or TCP not in packet:
            return
        if packet[IP].src != self._server_ip:
            return
        dscp = self._read_dscp_value_from_packet(packet)
        key = self._build_quadruple(packet, True)
        self._update_quadruple_to_dscp_table(key, dscp)

    def _read_dscp_value_from_packet(self, packet):
        """Extract the upper six bits of the IP TOS byte."""
        tos = packet[IP].tos
        return (tos & 0xfc) >> 2

    def _build_quadruple(self, packet, is_s2c):
        """Build the connection key, always written client side first.

        ``is_s2c`` tells whether the packet travels server-to-client, in
        which case source and destination are swapped in the key.
        """
        source = f"{packet[IP].src}:{packet[TCP].sport}"
        destination = f"{packet[IP].dst}:{packet[TCP].dport}"
        if is_s2c:
            return f"{destination},{source}"
        return f"{source},{destination}"

    def _update_quadruple_to_dscp_table(self, quadruple, dscp_value):
        """Store the latest DSCP value for the connection."""
        self._quadruple_to_dscp_table[quadruple] = dscp_value

    def stop(self):
        """Stop the background sniffer and wait for it to finish."""
        sniffer = self._sniff_thread
        sniffer.stop()
        sniffer.join()

    def read_dscp_value_by_quadruple(self, quadruple):
        """Return the DSCP recorded for ``quadruple``, or None if none was seen."""
        return self._quadruple_to_dscp_table.get(quadruple)
class TcpPacketsCaptureAnalyzer:
    """Checks on values obtained from a packet capture."""

    def is_dscp_equal(self, present_dscp, desired_dscp):
        """Compare the captured DSCP with the expected one.

        Returns ``(True, None)`` on a match, otherwise ``(False, message)``;
        a missing capture (``None``) is reported as a failure.
        """
        if present_dscp is None:
            return False, "Error: Not read DSCP value."
        if present_dscp != desired_dscp:
            return False, "Error: Failed to verify DSCP value. Present DSCP: {}, desired DSCP: {}.".format(present_dscp, desired_dscp)
        return True, None
class URLTransferBuilder:
    """Perform one pycurl transfer and keep its outcome for later inspection.

    After ``connect()`` the response code, body, download size, connection
    endpoints and any ``pycurl.error`` are available through properties.
    """

    def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int):
        # Request parameters.
        self._url = url
        self._request_resolve = request_resolve
        self._conn_timeout = conn_timeout
        self._max_recv_speed_large = max_recv_speed_large
        # Transfer state, filled in by connect().
        self._conn = None
        self._response_buffer = BytesIO()
        self._response_code = None
        self._size_download = None
        self._error_info = None
        # Connection endpoints reported by curl.
        self._local_ip = None
        self._local_port = None
        self._remote_ip = None
        self._remote_port = None

    def _setup_connection(self):
        """Create a fresh curl handle and body buffer, then apply the request options."""
        self._response_buffer = BytesIO()
        handle = pycurl.Curl()
        self._conn = handle
        options = (
            (handle.WRITEFUNCTION, self._response_buffer.write),
            (handle.RESOLVE, self._request_resolve),
            (handle.URL, self._url),
            (handle.TIMEOUT, self._conn_timeout),
            (handle.MAX_RECV_SPEED_LARGE, self._max_recv_speed_large),
        )
        for option, value in options:
            handle.setopt(option, value)

    def _perform_connection(self):
        """Run the transfer and read the result metadata from the handle."""
        handle = self._conn
        handle.perform()
        read_info = handle.getinfo
        self._response_code = read_info(handle.RESPONSE_CODE)
        self._size_download = read_info(pycurl.SIZE_DOWNLOAD)
        self._local_ip = read_info(pycurl.LOCAL_IP)
        self._local_port = read_info(pycurl.LOCAL_PORT)
        self._remote_ip = read_info(pycurl.PRIMARY_IP)
        self._remote_port = read_info(pycurl.PRIMARY_PORT)

    def _close_connection(self):
        """Release the curl handle."""
        self._conn.close()

    def connect(self):
        """Set up, perform and close the transfer; a ``pycurl.error`` is stored, not raised."""
        try:
            self._setup_connection()
            self._perform_connection()
        except pycurl.error as curl_error:
            self._error_info = curl_error
        finally:
            self._close_connection()

    @property
    def response_code(self):
        """Response code reported by curl, or None if the transfer did not complete."""
        return self._response_code

    @property
    def response_body(self):
        """Bytes received so far."""
        return self._response_buffer.getvalue()

    @property
    def error_info(self):
        """The ``pycurl.error`` caught during ``connect()``, or None."""
        return self._error_info

    @property
    def size_download(self):
        """Download size reported by curl, or None if the transfer did not complete."""
        return self._size_download

    @property
    def quadruple(self):
        """Connection key ``local_ip:local_port,remote_ip:remote_port``."""
        local_end = f"{self._local_ip}:{self._local_port}"
        remote_end = f"{self._remote_ip}:{self._remote_port}"
        return f"{local_end},{remote_end}"
class HttpURLTransferBuilder(URLTransferBuilder):
    """Plain-HTTP transfer; adds nothing to the base transfer behaviour."""

    def _perform_connection(self):
        """Delegate unchanged to the base implementation."""
        URLTransferBuilder._perform_connection(self)
class HttpsURLTransferBuilder(URLTransferBuilder):
    """HTTPS transfer that also collects the certificate information of the peer.

    Peer verification is switched off, so the transfer completes whatever
    certificate is presented; callers inspect ``cert_issuer`` instead.
    """

    def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int):
        super().__init__(url, request_resolve, conn_timeout, max_recv_speed_large)
        # Raw INFO_CERTINFO result; stays None until a transfer completes.
        self._certs_info = None

    def _setup_connection(self):
        """Apply the base options, request cert info and disable peer verification."""
        super()._setup_connection()
        self._conn.setopt(self._conn.OPT_CERTINFO, 1)
        self._conn.setopt(self._conn.SSL_VERIFYPEER, False)

    def _perform_connection(self):
        """Run the transfer, then read the certificate chain information."""
        super()._perform_connection()
        self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO)

    @property
    def cert_issuer(self) -> str:
        """Issuer field of the first certificate, or "" when unavailable."""
        return self._get_cert_issuer(self._certs_info)

    def _get_cert_issuer(self, certs) -> str:
        """Return the ``Issuer`` value of the first entry in ``certs``.

        Returns "" when ``certs`` is None or empty, or when the first
        certificate carries no ``Issuer`` field, so the declared ``str``
        return type always holds and an empty list no longer raises
        ``IndexError``.
        """
        if not certs:
            return ""
        for field in certs[0]:
            if field[0] == "Issuer":
                return field[1]
        return ""
class DNSQueryBuilder:
    """Send one DNS query to fixed nameservers and keep the answer or the error.

    Subclasses choose the record type and call ``_query``.
    """

    def __init__(self, domain: str, namesevers: list, conn_timeout: int):
        self._domain = domain
        self._nameservers = namesevers
        self._conn_timeout = conn_timeout
        self._dns_resolver = None
        self._dns_answer = None
        self._error_info = None

    def _setup(self):
        """Create a resolver bound to the given nameservers, with search lists disabled."""
        self._dns_resolver = dns.resolver.Resolver()
        self._dns_resolver.nameservers = self._nameservers
        self._dns_resolver.search = []
        self._dns_resolver.use_search_by_default = False
        self._dns_resolver.timeout = float(self._conn_timeout)
        self._dns_resolver.lifetime = float(self._conn_timeout)

    def _query(self, record_type):
        """Resolve ``self._domain`` for ``record_type``.

        Any exception is stored in ``error_info`` instead of being raised,
        because callers assert on the error (for example a timeout) as part
        of the test verdict.
        """
        try:
            self._setup()
            resolver = self._dns_resolver
            # Resolver.query() is deprecated since dnspython 2.0 in favour of
            # resolve(); fall back to query() on older releases.
            lookup = getattr(resolver, "resolve", None) or resolver.query
            self._dns_answer = lookup(self._domain, record_type)
        except Exception as error_info:
            self._error_info = error_info

    @property
    def error_info(self):
        """Exception raised by the last query, or None."""
        return self._error_info

    @property
    def rrset_ttl(self):
        """TTL of the answer rrset, or None when there is no answer."""
        if self._dns_answer is not None:
            return self._dns_answer.rrset.ttl
        return None

    @property
    def response_answer(self):
        """Answer section of the response message, or None when there is no answer."""
        if self._dns_answer is not None:
            return self._dns_answer.response.answer
        return None
class DNSQueryTypeABuilder(DNSQueryBuilder):
    """DNS query for ``A`` records."""

    def query(self):
        """Run the lookup with record type ``A``."""
        record_type = "A"
        self._query(record_type)
class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
    """DNS query for ``AAAA`` records."""

    def query(self):
        """Run the lookup with record type ``AAAA``."""
        record_type = "AAAA"
        self._query(record_type)
class URLTransferResponseAnalyzer:
    """Checks on the outcome of a URL transfer.

    Every check returns ``(True, None)`` on success and
    ``(False, message)`` on failure.
    """

    @staticmethod
    def _decode_body(present_body):
        """Decode the body as UTF-8 for regex matching.

        Undecodable bytes are replaced instead of raising, so a binary or
        mis-encoded body produces a normal check result rather than a
        ``UnicodeDecodeError`` that aborts the run.
        """
        return present_body.decode('utf-8', errors='replace')

    def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern):
        """Check that the certificate issuer contains a match for ``desired_pattern``."""
        if present_cert_issuer is None:
            return False, f"Error: Failed to verify cert issuer. Present cert issuer is None."
        if re.search(desired_pattern, present_cert_issuer, 0):
            return True, None
        return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}."

    def is_response_code_equal(self, present_code, desired_code):
        """Check that the response code equals ``desired_code``."""
        if present_code == desired_code:
            return True, None
        return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}."

    def is_response_body_matched(self, present_body, desired_pattern):
        """Check that the body (bytes) contains a match for ``desired_pattern``."""
        if re.search(desired_pattern, self._decode_body(present_body), 0):
            return True, None
        return False, f"Error: The response body fail to match the desired content."

    def is_response_body_not_matched(self, present_body, desired_pattern):
        """Check that the body (bytes) contains NO match for ``desired_pattern``."""
        if not re.search(desired_pattern, self._decode_body(present_body), 0):
            return True, None
        return False, f"Error: The response body matched the desired content: {desired_pattern}."

    def is_response_body_md5_equal(self, present_body, disired_md5):
        """Check the MD5 hex digest of the body against ``disired_md5``.

        NOTE(review): the comparison is a regex search, not strict equality,
        so a partial digest or a pattern also passes. Kept as-is because
        callers outside this view may rely on it.
        """
        present_body_md5_value = hashlib.md5(present_body).hexdigest()
        if re.search(disired_md5, present_body_md5_value, 0):
            return True, None
        return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}."

    def is_download_size_equal(self, present_size, disired_size):
        """Check that the downloaded size equals ``disired_size``."""
        if present_size == disired_size:
            return True, None
        return False, f"Error: The response body download size fail to match. present_size: {present_size}."

    def is_pycurl_error_code_equal(self, present_error_info, disired_error_code):
        """Check that a pycurl error occurred and its first argument equals the expected code."""
        if present_error_info is None:
            return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
        if present_error_info.args[0] == disired_error_code:
            return True, None
        return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}."

    def is_pycurl_error_none(self, present_error_info):
        """Check that the transfer finished without a pycurl error."""
        if present_error_info is None:
            return True, None
        return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}."
class DNSResponseAnalyzer:
    """Checks on the outcome of a DNS query.

    Every check returns ``(True, None)`` on success and
    ``(False, message)`` on failure.
    """

    # DNS rdtype number for each supported address family.
    _RDTYPE_BY_ADDRESS_TYPE = {"ipv4": 1, "ipv6": 28}

    def is_error_info_none(self, present_error_info):
        """Check that the query finished without an error."""
        if present_error_info is None:
            return True, None
        return False, f"Error: The error info: {present_error_info}."

    def is_error_type_equal(self, present_error_info, desired_type):
        """Check that the query failed with an error of ``desired_type``.

        Subclasses of ``desired_type`` are accepted as well, so a more
        specific exception raised by the resolver still satisfies a check
        against its base class.
        """
        if present_error_info is None:
            return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
        if isinstance(present_error_info, desired_type):
            return True, None
        return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}."

    def is_ttl_equal(self, present_ttl, desired_ttl):
        """Check that the TTL equals ``desired_ttl``."""
        if present_ttl == desired_ttl:
            return True, None
        return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl)

    def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge):
        """Check that the TTL lies within the inclusive range.

        A missing TTL (``None``) is reported as a failure instead of
        raising ``TypeError`` on the comparison.
        """
        if present_ttl is not None and desired_left_edge <= present_ttl <= desired_right_edge:
            return True, None
        return False, f"Error: ttl(%s) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge)

    def is_address_equal(self, response_answer, desired_address, desired_address_type):
        """Check that the answer holds ``desired_address`` in a record of the right family.

        ``desired_address_type`` is ``"ipv4"`` or ``"ipv6"``. All records of
        all rrsets are examined, so an unrelated leading record does not
        decide the result. An unknown address type, or a missing/empty
        answer, yields a failure result rather than an exception.
        """
        rdtype = self._RDTYPE_BY_ADDRESS_TYPE.get(desired_address_type)
        if rdtype is None:
            return False, f"Error: Unsupported address type: {desired_address_type}."
        saw_desired_rdtype = False
        for rrset in response_answer or ():
            for rdata in rrset.items:
                if rdata.rdtype != rdtype:
                    continue
                saw_desired_rdtype = True
                if rdata.address == desired_address:
                    return True, None
        if saw_desired_rdtype:
            return False, f"Error: Present address error."
        return False, f"Error: Response rdtype error."
class ProxyCasesRunner:
    """Run the proxy test cases (redirect, block, replace, hijack, insert, deny).

    Every ``action_*`` method performs one transfer and returns
    ``(True, None)`` when all its checks pass, otherwise
    ``(False, message)`` taken from the first failing check.
    """

    # Issuer expected on the certificate when the proxy handles the TLS session.
    _PROXY_CERT_ISSUER_PATTERN = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b'

    def __init__(self) -> None:
        self._analyzer = URLTransferResponseAnalyzer()

    @staticmethod
    def _connect(builder_cls, url, resolves, conn_timeout, max_recv_speed_large):
        """Create a transfer of type ``builder_cls``, run it and return it."""
        conn = builder_cls(url, resolves, conn_timeout, max_recv_speed_large)
        conn.connect()
        return conn

    @staticmethod
    def _first_failure(checks):
        """Run zero-argument ``checks`` in order and stop at the first failure.

        Checks are callables so that later ones are not evaluated once an
        earlier one has failed.
        """
        for check in checks:
            passed, message = check()
            if not passed:
                return False, message
        return True, None

    def _no_error(self, conn):
        """Deferred check: the transfer raised no pycurl error."""
        return lambda: self._analyzer.is_pycurl_error_none(conn.error_info)

    def _proxy_cert(self, conn):
        """Deferred check: the certificate was issued by the proxy CA."""
        return lambda: self._analyzer.is_cert_issuer_matched(conn.cert_issuer, self._PROXY_CERT_ISSUER_PATTERN)

    def _code(self, conn, desired_code):
        """Deferred check: the response code equals ``desired_code``."""
        return lambda: self._analyzer.is_response_code_equal(conn.response_code, desired_code)

    def _body_has(self, conn, pattern):
        """Deferred check: the body contains a match for ``pattern``."""
        return lambda: self._analyzer.is_response_body_matched(conn.response_body, pattern)

    def _body_lacks(self, conn, pattern):
        """Deferred check: the body contains no match for ``pattern``."""
        return lambda: self._analyzer.is_response_body_not_matched(conn.response_body, pattern)

    def _body_md5(self, conn, desired_md5):
        """Deferred check: the body MD5 matches ``desired_md5``."""
        return lambda: self._analyzer.is_response_body_md5_equal(conn.response_body, desired_md5)

    def action_redirect_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTPS redirect: proxy certificate and a 302 response."""
        conn = self._connect(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._proxy_cert(conn),
            self._code(conn, 302),
        ])

    def action_redirect_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP redirect: a 302 response."""
        conn = self._connect(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._code(conn, 302),
        ])

    def action_block_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTPS block: proxy certificate, 404 and the block-page marker in the body."""
        conn = self._connect(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._proxy_cert(conn),
            self._code(conn, 404),
            self._body_has(conn, r'E33F01E50AFE043191931DD40190B09B'),
        ])

    def action_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP block: 404 and the block-page marker in the body."""
        # NOTE(review): uses the HTTPS builder although this is the HTTP case;
        # kept unchanged because switching it would alter TLS options if the
        # configured URL is https. Confirm the URL scheme before changing.
        conn = self._connect(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._code(conn, 404),
            self._body_has(conn, r'E33F01E50AFE043191931DD40190B09B'),
        ])

    def action_replace_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTPS replace: proxy certificate, replacement text present, original text absent."""
        conn = self._connect(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._proxy_cert(conn),
            self._body_has(conn, r'03C174CD9D809789CCEC18D6F585DF3E'),
            # Previously computed but never enforced (the wrong result was tested).
            self._body_lacks(conn, r'EnglishSearchShared'),
        ])

    def action_replace_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP replace: replacement text present, original text absent."""
        conn = self._connect(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._body_has(conn, r'03C174CD9D809789CCEC18D6F585DF3E'),
            # Previously computed but never enforced (the wrong result was tested).
            self._body_lacks(conn, r'EnglishSearchShared'),
        ])

    def action_hijack_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTPS hijack: proxy certificate and the expected body MD5."""
        conn = self._connect(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._proxy_cert(conn),
            self._body_md5(conn, "4bf06db1a228c5c8d978ebf9e1169d0d"),
        ])

    def action_hijack_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP hijack: the expected body MD5."""
        conn = self._connect(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._body_md5(conn, "4bf06db1a228c5c8d978ebf9e1169d0d"),
        ])

    def action_insert_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTPS insert: proxy certificate plus the inserted key and value in the body."""
        conn = self._connect(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._proxy_cert(conn),
            self._body_has(conn, r'httpSelfcheckInsert'),
            self._body_has(conn, r'5BE3754D1EA8D51E8D993060FA225330'),
        ])

    def action_insert_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP insert: the inserted key and value in the body."""
        conn = self._connect(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._body_has(conn, r'httpSelfcheckInsert'),
            # A failure here now reports this check's own message; it used to
            # return the (empty) message of the preceding key check.
            self._body_has(conn, r'5BE3754D1EA8D51E8D993060FA225330'),
        ])

    def action_deny_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP deny by host filter: 404 and the host-filter marker in the body."""
        conn = self._connect(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._code(conn, 404),
            self._body_has(conn, r'testing-proxy-filter-host'),
        ])

    def action_deny_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP deny by URL filter: 404 and the URL-filter marker in the body."""
        # NOTE(review): uses the HTTPS builder although this is an HTTP case;
        # kept unchanged for the same reason as action_block_protocol_http.
        conn = self._connect(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure([
            self._no_error(conn),
            self._code(conn, 404),
            self._body_has(conn, r'testing-proxy-filter-url'),
        ])
class ShapingCaseRunner:
    """Run the traffic-shaping test cases.

    Every case returns ``(True, None)`` on success, otherwise
    ``(False, message)`` from the first failing check.
    """

    def __init__(self) -> None:
        self._analyzer = URLTransferResponseAnalyzer()
        self._dns_analyzer = DNSResponseAnalyzer()
        self._capture_analyzer = TcpPacketsCaptureAnalyzer()

    def rate_limit_0bps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP at 0 bps: the transfer must fail with pycurl error code 28."""
        transfer = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
        transfer.connect()
        # 28 is libcurl's CURLE_OPERATION_TIMEDOUT.
        verdict = self._analyzer.is_pycurl_error_code_equal(transfer.error_info, 28)
        if verdict[0]:
            return True, None
        return False, verdict[1]

    def rate_limit_0bps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTPS at 0 bps: the transfer must fail with pycurl error code 28."""
        transfer = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
        transfer.connect()
        # 28 is libcurl's CURLE_OPERATION_TIMEDOUT.
        verdict = self._analyzer.is_pycurl_error_code_equal(transfer.error_info, 28)
        if verdict[0]:
            return True, None
        return False, verdict[1]

    def rate_limit_1000gbps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTP at 1000 Gbps: success, 200, and DSCP 8 on the server's packets."""
        server_ip, server_port = self._read_server_ip_and_port_from_resolve(resolves)
        sniffer = TcpPacketsCapture(server_ip, server_port)
        sniffer.start()
        transfer = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
        transfer.connect()
        sniffer.stop()

        verdict = self._analyzer.is_pycurl_error_none(transfer.error_info)
        if not verdict[0]:
            return False, verdict[1]
        verdict = self._analyzer.is_response_code_equal(transfer.response_code, 200)
        if not verdict[0]:
            return False, verdict[1]
        captured_dscp = sniffer.read_dscp_value_by_quadruple(transfer.quadruple)
        verdict = self._capture_analyzer.is_dscp_equal(captured_dscp, 8)
        if not verdict[0]:
            return False, verdict[1]
        return True, None

    def rate_limit_1000gbps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """HTTPS at 1000 Gbps: success, 200, origin certificate, and DSCP 8."""
        server_ip, server_port = self._read_server_ip_and_port_from_resolve(resolves)
        sniffer = TcpPacketsCapture(server_ip, server_port)
        sniffer.start()
        transfer = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
        transfer.connect()
        sniffer.stop()

        verdict = self._analyzer.is_pycurl_error_none(transfer.error_info)
        if not verdict[0]:
            return False, verdict[1]
        verdict = self._analyzer.is_response_code_equal(transfer.response_code, 200)
        if not verdict[0]:
            return False, verdict[1]
        verdict = self._analyzer.is_cert_issuer_matched(transfer.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
        if not verdict[0]:
            return False, verdict[1]
        captured_dscp = sniffer.read_dscp_value_by_quadruple(transfer.quadruple)
        verdict = self._capture_analyzer.is_dscp_equal(captured_dscp, 8)
        if not verdict[0]:
            return False, verdict[1]
        return True, None

    def _read_server_ip_and_port_from_resolve(self, resolves):
        """Return ``(ip, port)`` from the first ``host:port:ip`` resolve entry (both as strings)."""
        fields = resolves[0].split(":")
        server_port = fields[1]
        server_ip = fields[2]
        return server_ip, server_port
class FirewallCasesRunner:
    """Firewall test cases for HTTP, HTTPS and DNS traffic.

    Every public ``action_*`` method performs a single request and returns
    ``(True, None)`` when all of its checks pass, otherwise ``(False, detail)``
    where ``detail`` is the second element reported by the first failing check.
    Checks run strictly in order and later checks are not evaluated once one
    has failed.
    """

    # Certificate-issuer patterns handed to the response analyzer.
    _ISSUER_BADSSL = r'\bCN[\s]*=[\s]*BadSSL\b'
    _ISSUER_GATEWAY_CA = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b'
    _ISSUER_UNTRUSTED_CA = r'\bCN[\s]*=[\s]*TSG CA Untrusted\b'

    # Byte multipliers for the download-size cases.
    _KIB = 1024
    _MIB = 1024 * 1024

    def __init__(self) -> None:
        self._analyzer = URLTransferResponseAnalyzer()
        self._dns_analyzer = DNSResponseAnalyzer()

    # ------------------------------------------------------------------
    # Shared plumbing
    # ------------------------------------------------------------------
    @staticmethod
    def _first_failure(checks):
        """Call each zero-argument check in order; report the first failure."""
        for check in checks:
            outcome = check()
            if not outcome[0]:
                return False, outcome[1]
        return True, None

    @staticmethod
    def _connected(builder_cls, url, resolves, conn_timeout, max_recv_speed_large):
        """Build a transfer with ``builder_cls``, run it, and return it."""
        conn = builder_cls(url, resolves, conn_timeout, max_recv_speed_large)
        conn.connect()
        return conn

    def _no_error(self, conn):
        """Deferred check: the transfer reported no pycurl error."""
        return lambda: self._analyzer.is_pycurl_error_none(conn.error_info)

    def _error_code(self, conn, code):
        """Deferred check: the transfer's pycurl error code equals ``code``."""
        return lambda: self._analyzer.is_pycurl_error_code_equal(conn.error_info, code)

    def _status(self, conn, code):
        """Deferred check: the response code equals ``code``."""
        return lambda: self._analyzer.is_response_code_equal(conn.response_code, code)

    def _issuer(self, conn, pattern):
        """Deferred check: the certificate issuer matches ``pattern``."""
        return lambda: self._analyzer.is_cert_issuer_matched(conn.cert_issuer, pattern)

    def _redirect_verdict(self, request, ttl_check, address, family):
        """Run a DNS query and verify no error, the TTL check, then the answer address."""
        request.query()
        return self._first_failure((
            lambda: self._dns_analyzer.is_error_info_none(request.error_info),
            ttl_check,
            lambda: self._dns_analyzer.is_address_equal(request.response_answer, address, family),
        ))

    # ------------------------------------------------------------------
    # HTTPS: bypass / intercept
    # ------------------------------------------------------------------
    def action_bypass_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer succeeds and the issuer matches the BadSSL pattern."""
        conn = self._connected(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((
            self._no_error(conn),
            self._issuer(conn, self._ISSUER_BADSSL),
        ))

    def action_intercept_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer succeeds and the issuer matches the gateway CA pattern."""
        conn = self._connected(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((
            self._no_error(conn),
            self._issuer(conn, self._ISSUER_GATEWAY_CA),
        ))

    def action_intercept_protocol_https_cert_error(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer succeeds and the issuer matches the untrusted CA pattern."""
        conn = self._connected(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((
            self._no_error(conn),
            self._issuer(conn, self._ISSUER_UNTRUSTED_CA),
        ))

    # ------------------------------------------------------------------
    # HTTP: allow / deny
    # ------------------------------------------------------------------
    def action_allow_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer succeeds with response code 200."""
        conn = self._connected(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((
            self._no_error(conn),
            self._status(conn, 200),
        ))

    def action_deny_subaction_drop_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer fails with pycurl error code 28."""
        conn = self._connected(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((self._error_code(conn, 28),))

    def action_deny_subaction_reset_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer fails with pycurl error code 56."""
        conn = self._connected(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((self._error_code(conn, 56),))

    def action_deny_subaction_reset_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Same expectation as the plain HTTP reset case."""
        return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large)

    def action_deny_subaction_reset_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Same expectation as the plain HTTP reset case."""
        return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large)

    def action_deny_subaction_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer succeeds with code 403 and the block-page marker in the body."""
        conn = self._connected(HttpURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((
            self._no_error(conn),
            self._status(conn, 403),
            lambda: self._analyzer.is_response_body_matched(conn.response_body, r"dign-testing-deny-block"),
        ))

    # ------------------------------------------------------------------
    # HTTPS: allow / deny
    # ------------------------------------------------------------------
    def action_allow_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer succeeds, the issuer matches BadSSL, and the code is 200."""
        conn = self._connected(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((
            self._no_error(conn),
            self._issuer(conn, self._ISSUER_BADSSL),
            self._status(conn, 200),
        ))

    def action_deny_subaction_drop_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer fails with pycurl error code 28."""
        conn = self._connected(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((self._error_code(conn, 28),))

    def action_deny_subaction_reset_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Pass when the transfer fails with pycurl error code 35."""
        conn = self._connected(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((self._error_code(conn, 35),))

    # ------------------------------------------------------------------
    # HTTPS intercept: download-size variants
    # ------------------------------------------------------------------
    def action_intercept_protocol_https_download_size_1k(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 1 KiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, self._KIB)

    def action_intercept_protocol_https_download_size_4k(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 4 KiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, 4 * self._KIB)

    def action_intercept_protocol_https_download_size_16k(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 16 KiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, 16 * self._KIB)

    def action_intercept_protocol_https_download_size_64k(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 64 KiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, 64 * self._KIB)

    def action_intercept_protocol_https_download_size_256k(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 256 KiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, 256 * self._KIB)

    def action_intercept_protocol_https_download_size_1M(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 1 MiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, self._MIB)

    def action_intercept_protocol_https_download_size_4M(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 4 MiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, 4 * self._MIB)

    def action_intercept_protocol_https_download_size_16M(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 16 MiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, 16 * self._MIB)

    def action_intercept_protocol_https_download_size_64M(self, url, resolves, conn_timeout, max_recv_speed_large):
        """Intercept case expecting a 64 MiB download."""
        return self._action_intercept_protocol_ssl_by_download_size(
            url, resolves, conn_timeout, max_recv_speed_large, 64 * self._MIB)

    def _action_intercept_protocol_ssl_by_download_size(self, url, resolves, conn_timeout, max_recv_speed_large, download_size):
        """Pass when the transfer succeeds, the issuer is the gateway CA, and the size matches."""
        conn = self._connected(HttpsURLTransferBuilder, url, resolves, conn_timeout, max_recv_speed_large)
        return self._first_failure((
            self._no_error(conn),
            self._issuer(conn, self._ISSUER_GATEWAY_CA),
            lambda: self._analyzer.is_download_size_equal(conn.size_download, download_size),
        ))

    # ------------------------------------------------------------------
    # DNS: deny
    # ------------------------------------------------------------------
    def action_deny_subaction_drop_protocol_dns(self, domain, nameservers, conn_timeout):
        """Pass when the A query fails with ``dns.resolver.LifetimeTimeout``."""
        request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
        request.query()
        return self._first_failure((
            lambda: self._dns_analyzer.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout),
        ))

    def action_deny_subaction_redirect_protocol_dns_type_a(self, domain, nameservers, conn_timeout):
        """Pass when the A answer has TTL 333 and address 33.252.0.101."""
        request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
        return self._redirect_verdict(
            request,
            lambda: self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333),
            "33.252.0.101",
            "ipv4",
        )

    def action_deny_subaction_redirect_protocol_dns_type_aaaa(self, domain, nameservers, conn_timeout):
        """Pass when the AAAA answer has TTL 333 and address 2001:db8::1001."""
        request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
        return self._redirect_verdict(
            request,
            lambda: self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333),
            "2001:db8::1001",
            "ipv6",
        )

    def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(self, domain, nameservers, conn_timeout):
        """Pass when the A answer's TTL passes the 400/500 range check and the address is 33.252.0.101."""
        request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
        return self._redirect_verdict(
            request,
            lambda: self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500),
            "33.252.0.101",
            "ipv4",
        )

    def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(self, domain, nameservers, conn_timeout):
        """Pass when the AAAA answer's TTL passes the 400/500 range check and the address is 2001:db8::1001."""
        request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
        return self._redirect_verdict(
            request,
            lambda: self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500),
            "2001:db8::1001",
            "ipv6",
        )
class ResultExportBuilder:
    """Report case outcomes: one status line per case plus a table of failures.

    A case result is a two-item sequence ``(passed, description)``. Only
    failed cases are added to the table that ``export`` prints.
    """

    def __init__(self):
        self._exporter = None
        self._create_exporter()

    def _create_exporter(self):
        """Create the left-aligned three-column table used for failed cases."""
        table = PrettyTable()
        self._exporter = table
        table.vrules = NONE
        table.field_names = ["Case name", "Status", "Description"]
        for column in ("Case name", "Status", "Description"):
            table.align[column] = "l"

    def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True):
        """Record one case outcome.

        Prints the per-case status line when ``enable_ouptut_by_case`` is
        true, and adds a table row when the case did not pass.
        """
        status, description = self._convert_case_result(case_result)
        if enable_ouptut_by_case:
            self._output_by_case_name(case_name, status)
        if case_result[0]:
            return
        self._add_exporter_row(case_name, status, description)

    def _convert_case_result(self, case_result):
        """Map ``(passed, description)`` to display strings ``(status, description)``."""
        status = "FAIL" if not case_result[0] else "ok"
        detail = case_result[1]
        description = "-" if detail is None else detail
        return status, description

    def _output_by_case_name(self, case_name, case_status):
        """Print the ``<case> ... <status>`` line for a single case."""
        print("{} ... {}".format(case_name, case_status))

    def _add_exporter_row(self, case_name: str, status: str, description: str):
        """Append one row to the failure table."""
        self._exporter.add_row([case_name, status, description])

    def export(self):
        """Print the failure table, unless it has no rows."""
        if len(self._exporter._rows) == 0:
            return
        print(self._exporter)
class DiagnoseCasesRunner:
    """Select, execute and report the diagnose cases for each service function.

    Settings are read from the parsed command line. ``run_cases`` executes
    every case whose name fully matches the configured regular expression,
    once per service function returned by the service-function config
    loader, and repeats the whole pass according to ``loop`` / ``count``.
    """

    def __init__(self, cmd_parser: "CommandParser"):
        """Copy settings from ``cmd_parser`` and build the case table.

        Args:
            cmd_parser: Parsed command line. The attributes read are ``loop``,
                ``count``, ``interval_s``, ``case_names_regexp``,
                ``config_path``, ``service_function_names_regexp``,
                ``service_function_config_path`` and ``delay_seconds``.
        """
        self._loop = cmd_parser.loop
        self._count = cmd_parser.count
        self._interval_s = cmd_parser.interval_s
        self._case_names_regexp = cmd_parser.case_names_regexp
        self._config_path = cmd_parser.config_path
        self._service_function_names_regexp = cmd_parser.service_function_names_regexp
        self._service_function_config_path = cmd_parser.service_function_config_path
        self._random_delay_seconds = cmd_parser.delay_seconds
        self._config_loader = ConfigLoader(self._config_path)
        self._proxy_case = ProxyCasesRunner()
        self._firewall_case = FirewallCasesRunner()
        self._shaping_case = ShapingCaseRunner()
        self._cases_info = self._build_cases_info()

    def _build_cases_info(self):
        """Return the ordered case table as a list of dicts.

        Each dict has the keys ``name``, ``protocol_type`` (``"http"``,
        ``"https"`` or ``"dns"``), ``test_function`` (a bound case method) and
        ``request_content`` (a URL for http/https, a domain name for dns).
        """
        fw = self._firewall_case
        proxy = self._proxy_case
        shaping = self._shaping_case
        rows = [
            # (name, protocol_type, test_function, request_content)
            ("test_firewallBypass_ssl", "https", fw.action_bypass_protocol_https,
             "https://sha384.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallIntercept_ssl", "https", fw.action_intercept_protocol_https,
             "https://sha256.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallIntercept_sslCerterrExpired", "https", fw.action_intercept_protocol_https_cert_error,
             "https://expired.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallIntercept_sslCerterrSelfsigned", "https", fw.action_intercept_protocol_https_cert_error,
             "https://self-signed.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallIntercept_sslCerterrUntrustedroot", "https", fw.action_intercept_protocol_https_cert_error,
             "https://untrusted-root.badssl.selftest.gdnt-cloud.website"),
            ("test_proxyRedirect_ssl", "https", proxy.action_redirect_protocol_https,
             "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js"),
            ("test_proxyBlock_ssl", "https", proxy.action_block_protocol_https,
             "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js"),
            ("test_proxyReplace_ssl", "https", proxy.action_replace_protocol_https,
             "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js"),
            ("test_proxyHijack_ssl", "https", proxy.action_hijack_protocol_https,
             "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js"),
            ("test_proxyInsert_ssl", "https", proxy.action_insert_protocol_https,
             "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html"),
            ("test_proxyRedirect_http", "http", proxy.action_redirect_protocol_http,
             "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js"),
            ("test_proxyBlock_http", "http", proxy.action_block_protocol_http,
             "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js"),
            ("test_proxyReplace_http", "http", proxy.action_replace_protocol_http,
             "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js"),
            ("test_proxyHijack_http", "http", proxy.action_hijack_protocol_http,
             "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js"),
            ("test_proxyInsert_http", "http", proxy.action_insert_protocol_http,
             "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html"),
            ("test_firewallAllow_http", "http", fw.action_allow_protocol_http,
             "http://http.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallDenyDrop_http", "http", fw.action_deny_subaction_drop_protocol_http,
             "http://http-credit-card.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallDenyReset_http", "http", fw.action_deny_subaction_reset_protocol_http,
             "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallDenyBlock_http", "http", fw.action_deny_subaction_block_protocol_http,
             "http://http-login.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallAllow_ssl", "https", fw.action_allow_protocol_https,
             "https://sha512.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallDenyDrop_ssl", "https", fw.action_deny_subaction_drop_protocol_https,
             "https://rsa2048.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallDenyReset_ssl", "https", fw.action_deny_subaction_reset_protocol_https,
             "https://rsa4096.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallDenyDrop_dns", "dns", fw.action_deny_subaction_drop_protocol_dns,
             "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website"),
            ("test_firewallDenyRedirectA_dns", "dns", fw.action_deny_subaction_redirect_protocol_dns_type_a,
             "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website"),
            ("test_firewallDenyRedirectAAAA_dns", "dns", fw.action_deny_subaction_redirect_protocol_dns_type_aaaa,
             "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website"),
            ("test_firewallDenyRedirectARangeTTL_dns", "dns",
             fw.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl,
             "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website"),
            ("test_firewallDenyRedirectAAAARangeTTL_dns", "dns",
             fw.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl,
             "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website"),
            ("test_firewallIntercept_sslDownloadSize1k", "https", fw.action_intercept_protocol_https_download_size_1k,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k"),
            ("test_firewallIntercept_sslDownloadSize4k", "https", fw.action_intercept_protocol_https_download_size_4k,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k"),
            ("test_firewallIntercept_sslDownloadSize16k", "https", fw.action_intercept_protocol_https_download_size_16k,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k"),
            ("test_firewallIntercept_sslDownloadSize64k", "https", fw.action_intercept_protocol_https_download_size_64k,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k"),
            ("test_firewallIntercept_sslDownloadSize256k", "https", fw.action_intercept_protocol_https_download_size_256k,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k"),
            ("test_firewallIntercept_sslDownloadSize1M", "https", fw.action_intercept_protocol_https_download_size_1M,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M"),
            ("test_firewallIntercept_sslDownloadSize4M", "https", fw.action_intercept_protocol_https_download_size_4M,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M"),
            ("test_firewallIntercept_sslDownloadSize16M", "https", fw.action_intercept_protocol_https_download_size_16M,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M"),
            ("test_firewallIntercept_sslDownloadSize64M", "https", fw.action_intercept_protocol_https_download_size_64M,
             "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M"),
            ("test_firewallDenyResetFilterHost_http", "http", fw.action_deny_subaction_reset_protocol_http_filter_host,
             "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website"),
            ("test_firewallDenyResetFilterURL_http", "http", fw.action_deny_subaction_reset_protocol_http_filter_url,
             "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website"),
            ("test_proxyDenyFilterHost_http", "http", proxy.action_deny_protocol_http_filter_host,
             "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website"),
            ("test_proxyDenyFilterURL_http", "http", proxy.action_deny_protocol_http_filter_url,
             "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website"),
            ("test_shaping_ratelimit_0bps_http", "http", shaping.rate_limit_0bps_protocol_http,
             "http://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M"),
            ("test_shaping_ratelimit_0bps_https", "https", shaping.rate_limit_0bps_protocol_https,
             "https://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M"),
            ("test_shaping_ratelimit_1000gbps_http", "http", shaping.rate_limit_1000gbps_protocol_http,
             "http://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M"),
            ("test_shaping_ratelimit_1000gbps_https", "https", shaping.rate_limit_1000gbps_protocol_https,
             "https://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M"),
        ]
        return [
            {
                "name": name,
                "protocol_type": protocol_type,
                "test_function": test_function,
                "request_content": request_content,
            }
            for name, protocol_type, test_function, request_content in rows
        ]

    def run_cases(self):
        """Run the selected cases, repeating according to ``loop`` / ``count``.

        Sleeps for the configured start delay first (when one is set). At
        least one pass is always executed. Unless ``loop`` is ``True``, the
        method returns after ``count`` passes; ``interval_s`` seconds are
        slept between consecutive passes.
        """
        if self._random_delay_seconds is not None:
            time.sleep(self._random_delay_seconds)
        run_count = 1
        while True:
            print("\nRUN %d" % run_count)
            self.__run_cases_by_service_function_ids()
            # Stop once the requested number of passes has been completed.
            # (The comparison was previously reversed, which ended the loop
            # after the first pass for any count >= 1.)
            if self._loop is not True and run_count >= self._count:
                break
            time.sleep(self._interval_s)
            run_count += 1

    def __run_cases_by_service_function_ids(self):
        """Run one pass: every selected service function, with start/end banners."""
        sf_loader = ServiceFunctionConfigLoader(
            self._service_function_config_path, self._service_function_names_regexp)
        for sf_pair in sf_loader.name_id_pairs:
            sf_name = str(sf_pair["name"])
            start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print(format("Service function name: " + sf_name + ",Test start time: " + start_timestamp, '#^100s'))
            self._run_cases_in_one_service_function_id(sf_pair["id"])
            end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print(format("Service function name: " + sf_name + ",Test end time: " + end_timestamp, '=^100s'))

    def _run_cases_in_one_service_function_id(self, service_function_id):
        """Run every case whose name fully matches the case regexp, then print the failure table."""
        resolve_builder = ServerAddressBuilder(service_function_id)
        exporter = ResultExportBuilder()
        for case_info in self._cases_info:
            if re.fullmatch(self._case_names_regexp, case_info["name"]):
                self._run_one_case(case_info, resolve_builder, exporter)
        exporter.export()

    def _run_one_case(self, case_info, resolve_builder, exporter):
        """Execute a single case and hand its result to ``exporter``.

        http/https cases are called with the URL, the resolve entries for
        that URL, the connection timeout and the receive-speed cap; dns cases
        are called with the domain, the nameservers and the timeout. A case
        without a ``test_function`` is skipped. A case with any other
        ``protocol_type`` is reported as failed instead of reaching the
        reporting step with no result.
        """
        case_name = case_info["name"]
        conn_timeout = self._get_conn_timeout_from_configs(case_name)
        max_recv_speed_large = self._get_max_recv_speed_large_from_configs(case_name)
        test_func = case_info.get("test_function")
        if not test_func:
            return

        protocol_type = case_info["protocol_type"]
        request_content = case_info["request_content"]
        if protocol_type in ("http", "https"):
            resolve = resolve_builder.read_resolves_by_url(request_content)
            ret = test_func(request_content, resolve, conn_timeout, max_recv_speed_large)
        elif protocol_type == "dns":
            ret = test_func(request_content, resolve_builder.nameservers, conn_timeout)
        else:
            ret = (False, "unsupported protocol_type: %s" % protocol_type)
        exporter.append_case_result(case_name, ret)

    def _get_conn_timeout_from_configs(self, section) -> int:
        """Return the ``conn_timeout`` value configured for ``section``."""
        return self._config_loader.get_value_by_section_and_key(section, "conn_timeout")

    def _get_max_recv_speed_large_from_configs(self, section) -> int:
        """Return the ``max_recv_speed_large`` value configured for ``section``."""
        return self._config_loader.get_value_by_section_and_key(section, "max_recv_speed_large")
if __name__ == '__main__':
    # Script entry point: parse the command line, build the runner from it,
    # and execute the selected diagnose cases.
    DiagnoseCasesRunner(CommandParser()).run_cases()