This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/images_build/client/dign_client/bin/client.py

1450 lines
66 KiB
Python
Raw Normal View History

2020-05-28 19:30:31 +08:00
import sys
2019-12-20 15:38:14 +08:00
import unittest
import json
import pycurl
import os
import re
import time
import io
2019-12-20 15:38:14 +08:00
from io import BytesIO
2024-05-16 17:21:02 +08:00
# import getopt
# import ciunittest
import argparse
import hashlib
2020-09-15 13:55:08 +08:00
from configparser import ConfigParser
import random
import dns.exception
import dns.resolver
import sys
import logging
2024-05-16 17:21:02 +08:00
import copy
from prettytable import PrettyTable,NONE,HEADER
import yaml
2024-05-16 17:21:02 +08:00
class ConfigLoader:
DEFAULT_CONFIGS = {
'test_firewallBypass_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallDenyDrop_dns': {
2024-05-17 11:52:49 +08:00
'conn_timeout': 3,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
},
'test_firewallDenyRedirectA_dns': {
2024-05-17 11:52:49 +08:00
'conn_timeout': 3,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
},
'test_firewallDenyRedirectAAAA_dns': {
2024-05-17 11:52:49 +08:00
'conn_timeout': 3,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
},
'test_firewallDenyRedirectARangeTTL_dns': {
2024-05-17 11:52:49 +08:00
'conn_timeout': 3,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
},
'test_firewallDenyRedirectAAAARangeTTL_dns': {
2024-05-17 11:52:49 +08:00
'conn_timeout': 3,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
},
'test_dnsRequest_allow_rdtype_a': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_dnsRequest_allow_rdtype_aaaa': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_dnsRequest_allow_rdtype_cname': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslCerterrExpired': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslCerterrSelfsigned': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslCerterrUntrustedroot': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyRedirect_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyBlock_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyReplace_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyHijack_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyInsert_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyRedirect_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyBlock_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyReplace_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyHijack_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyInsert_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize1k': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize4k': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize16k': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize64k': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize256k': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize1M': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize4M': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallIntercept_sslDownloadSize16M': {
2024-05-17 11:52:49 +08:00
'conn_timeout': 4,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
},
'test_firewallIntercept_sslDownloadSize64M': {
2024-05-17 11:52:49 +08:00
'conn_timeout': 12,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
},
'test_firewallAllow_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallDenyDrop_http': {
'conn_timeout': 4,'max_recv_speed_large': 6553600
},
'test_firewallDenyReset_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallDenyBlock_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallAllow_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallDenyDrop_ssl': {
'conn_timeout': 4,'max_recv_speed_large': 6553600
},
'test_firewallDenyReset_ssl': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallDenyResetFilterHost_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_firewallDenyResetFilterURL_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyDenyFilterHost_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
'test_proxyDenyFilterURL_http': {
'conn_timeout': 1,'max_recv_speed_large': 6553600
},
2024-05-17 11:52:49 +08:00
'test_shaping_ratelimit_0bps_http': {
'conn_timeout': 4,'max_recv_speed_large': 6553600
},
'test_shaping_ratelimit_0bps_https': {
'conn_timeout': 4,'max_recv_speed_large': 6553600
},
'test_shaping_ratelimit_1000gbps_http': {
'conn_timeout': 4,'max_recv_speed_large': 6553600
},
'test_shaping_ratelimit_1000gbps_https': {
'conn_timeout': 4,'max_recv_speed_large': 6553600
2024-05-16 17:21:02 +08:00
}
}
def __init__(self, config_path: str):
self.__config_path = config_path
self.__configs = copy.deepcopy(self.DEFAULT_CONFIGS)
self.__load_configs()
def __load_configs(self):
config_parser = ConfigParser()
config_parser.read(self.__config_path, encoding='UTF-8')
for section in config_parser.sections():
if section in self.__configs:
self.__configs[section].update(config_parser.items(section))
else:
self.__configs[section] = dict(config_parser.items(section))
2024-05-16 17:21:02 +08:00
def get_value_by_section_and_key(self, section, key):
if section in self.__configs and key in self.__configs[section]:
value = self.__configs[section][key]
if isinstance(value, str) and value.isdigit():
return int(value)
else:
return value
else:
2024-05-16 17:21:02 +08:00
raise KeyError("%s not in configs or %s.%s not in configs." % (section, section, key))
class ServiceFunctionConfigLoader:
TABLE_NAME = "service_function_maps"
SF_NAME_KEY = "service_function_name"
DEVICE_ID_KEY = "device_id"
def __init__(self, config_path, name_pattern):
self._config_path = config_path
self._name_pattern = name_pattern
self._configs = None
self._load_configs()
self._pairs = self._read_name_id_pairs_by_name_pattern(self._name_pattern)
def _load_configs(self):
try:
with open(self._config_path, 'r') as f:
load_configs = yaml.safe_load(f)
if load_configs is not None:
self._configs = load_configs
else:
print(f"Error: No config loaded from path: {self._config_path}")
except FileNotFoundError:
print(f"Error: File not found: {self._config_path}")
except yaml.YAMLError as e:
print(f"Error parsing YAML file: {e}")
def _read_name_id_pairs_by_name_pattern(self, name_pattern):
if (self._configs is None) or (self.TABLE_NAME not in self._configs):
return []
return [
{"name": item[self.SF_NAME_KEY], "id": int(item[self.DEVICE_ID_KEY])}
for item in self._configs[self.TABLE_NAME]
if re.match(name_pattern, item[self.SF_NAME_KEY])
]
@property
def name_id_pairs(self):
return self._pairs
2024-05-16 17:21:02 +08:00
class CommandParser:
COUNT_MIN = 1
COUNT_MAX = 65535
INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$'
#INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
2024-05-16 17:21:02 +08:00
def __init__(self):
self.__start_up()
def __start_up(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose")
parser.add_argument('-i','--interval', type = int, default = 30,
help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.')
parser.add_argument('-c','--count', type = int, default = 1,
help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].')
parser.add_argument('-l','--loop', action='store_true', default = False,
help='Enable TSG diagnose loop, exit when recv a signal.')
parser.add_argument('--service_function_names_regexp', type = str, default = '.+',
help = "Regexp of the case names to run. Example: .+")
parser.add_argument('--service_function_config_path', type = str, default = '/opt/dign_client/share/service_function_maps.yaml',
help = "Specifies the service function config file, default /opt/dign_client/share/service_function_maps.yaml")
2024-05-16 17:21:02 +08:00
parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf',
help='Specifies the config file, default /opt/dign_client/etc/client.conf')
parser.add_argument('--case_names_regexp', type = str, default = '.+',
help='Regexp of the case names to run. Example: .+')
parser.add_argument('--enable_delay', action='store_true', default = False,
help='Enable a random delay between 1 and 30 seconds before running a case.')
parser.add_argument('--delay_left_edge', type = int, default = 1,
help='The seconds of the delay left edge. Default: 1.')
parser.add_argument('--delay_right_edge', type = int, default = 30,
help='The seconds of the delay right edge. Default: 30.')
2024-05-16 17:21:02 +08:00
# parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running')
args = parser.parse_args()
if not self.__is_legal_args(args):
parser.print_help()
sys.exit(1)
2024-05-16 17:21:02 +08:00
self.__interval_s = args.interval
self.__count = args.count
self.__loop = args.loop
self.__config_path = args.config_path
self.__case_names_regexp = args.case_names_regexp
self.__service_function_names_regexp = args.service_function_names_regexp
self.__service_function_config_path = args.service_function_config_path
self._enable_delay = args.enable_delay
self._delay_left_edge = args.delay_left_edge
self._delay_right_edge = args.delay_right_edge
2024-05-16 17:21:02 +08:00
def __is_legal_args(self, args) -> bool:
if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX:
print("Error: the number of TSG diagnoses must in [1,65535]")
return False
if args.delay_left_edge > args.delay_right_edge:
print("Error: The delay in seconds on the left edge is greater than on the right edge.")
2024-05-16 17:21:02 +08:00
return True
@property
def interval_s(self):
return self.__interval_s
@property
def count(self):
return self.__count
@property
def loop(self):
return self.__loop
@property
def config_path(self):
return self.__config_path
@property
def case_names_regexp(self):
return r"{}".format(self.__case_names_regexp)
@property
def service_function_names_regexp(self):
return r"{}".format(self.__service_function_names_regexp)
@property
def service_function_config_path(self):
return self.__service_function_config_path
@property
def delay_seconds(self):
if self._enable_delay:
return random.randint(self._delay_left_edge, self._delay_right_edge)
else:
return None
2024-05-16 17:21:02 +08:00
class ServerAddressBuilder:
IPv4_4TH_OCTET_LEFT_EDGE = 101
DOMAIN_TO_PORT_LIST = [
('sha384.badssl.selftest.gdnt-cloud.website', 443),
('sha256.badssl.selftest.gdnt-cloud.website', 443),
('expired.badssl.selftest.gdnt-cloud.website', 443),
('self-signed.badssl.selftest.gdnt-cloud.website', 443),
('untrusted-root.badssl.selftest.gdnt-cloud.website', 443),
('web-replay.badssl.selftest.gdnt-cloud.website', 80),
('web-replay.badssl.selftest.gdnt-cloud.website', 443),
('testing-download.badssl.selftest.gdnt-cloud.website', 443),
('http.badssl.selftest.gdnt-cloud.website', 80),
('http-credit-card.badssl.selftest.gdnt-cloud.website', 80),
('http-dynamic-login.badssl.selftest.gdnt-cloud.website', 80),
('http-login.badssl.selftest.gdnt-cloud.website', 80),
('sha512.badssl.selftest.gdnt-cloud.website', 443),
('rsa2048.badssl.selftest.gdnt-cloud.website', 443),
('rsa4096.badssl.selftest.gdnt-cloud.website', 443),
('testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website', 80),
('testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website', 80),
('testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website', 80),
2024-05-17 11:52:49 +08:00
('testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website', 80),
('testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website', 80),
('testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website', 443),
('testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website', 80),
('testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website', 443)
2024-05-16 17:21:02 +08:00
]
def __init__(self, service_function_index: int):
self._service_function_index = service_function_index
self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index
@property
def resolves(self) -> list:
return [f"{domain}:{port}:192.0.2.{self._ipv4_4th_octet}" for domain, port in self.DOMAIN_TO_PORT_LIST]
@property
def nameservers(self) -> list:
return ['192.0.2.%d' % self._ipv4_4th_octet]
class URLTransferBuilder:
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int):
self._url = url
self._request_resolve = request_resolve
self._conn_timeout = conn_timeout
self._max_recv_speed_large = max_recv_speed_large
self._conn = None
self._response_code = None
self._response_buffer = BytesIO()
self._error_info = None
self._size_download = None
def _setup_connection(self):
self._response_buffer = BytesIO()
self._conn = pycurl.Curl()
self._conn.setopt(self._conn.WRITEFUNCTION, self._response_buffer.write)
self._conn.setopt(self._conn.RESOLVE, self._request_resolve)
self._conn.setopt(self._conn.URL, self._url)
self._conn.setopt(self._conn.TIMEOUT, self._conn_timeout)
self._conn.setopt(self._conn.MAX_RECV_SPEED_LARGE, self._max_recv_speed_large)
def _perform_connection(self):
self._conn.perform()
self._response_code = self._conn.getinfo(self._conn.RESPONSE_CODE)
self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD)
def _close_connection(self):
self._conn.close()
def connect(self):
try:
2024-05-16 17:21:02 +08:00
self._setup_connection()
self._perform_connection()
except pycurl.error as error_info:
self._error_info = error_info
finally:
self._close_connection()
2024-05-16 17:21:02 +08:00
@property
def response_code(self):
return self._response_code
@property
def response_body(self):
return self._response_buffer.getvalue()
@property
def error_info(self):
return self._error_info
@property
def size_download(self):
return self._size_download
class HttpURLTransferBuilder(URLTransferBuilder):
def _perform_connection(self):
super()._perform_connection()
class HttpsURLTransferBuilder(URLTransferBuilder):
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int):
super().__init__(url, request_resolve, conn_timeout, max_recv_speed_large)
self._certs_info = None
def _setup_connection(self):
super()._setup_connection()
self._conn.setopt(self._conn.OPT_CERTINFO, 1)
self._conn.setopt(self._conn.SSL_VERIFYPEER, False)
def _perform_connection(self):
super()._perform_connection()
self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO)
@property
def cert_issuer(self) -> str:
return self._get_cert_issuer(self._certs_info)
def _get_cert_issuer(self, certs) -> str:
if certs is None:
return ""
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info[1]
return issuer
class DNSQueryBuilder:
def __init__(self, domain: str, namesevers: list, conn_timeout: int):
self._domain = domain
self._nameservers = namesevers
self._conn_timeout = conn_timeout
self._dns_resolver = None
self._dns_answer = None
self._error_info = None
def _setup(self):
self._dns_resolver = dns.resolver.Resolver()
self._dns_resolver.nameservers = self._nameservers
self._dns_resolver.search = []
self._dns_resolver.use_search_by_default = False
self._dns_resolver.timeout = float(self._conn_timeout)
self._dns_resolver.lifetime = float(self._conn_timeout)
def _query(self, record_type):
try:
2024-05-16 17:21:02 +08:00
self._setup()
self._dns_answer = self._dns_resolver.query(self._domain, record_type)
except Exception as error_info:
self._error_info = error_info
@property
def error_info(self):
return self._error_info
@property
def rrset_ttl(self):
if self._dns_answer is not None:
return self._dns_answer.rrset.ttl
return None
@property
def response_answer(self):
if self._dns_answer is not None:
return self._dns_answer.response.answer
return None
class DNSQueryTypeABuilder(DNSQueryBuilder):
def query(self):
self._query("A")
class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
def query(self):
self._query("AAAA")
class URLTransferResponseAnalyzer:
def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern):
if present_cert_issuer is None:
return False, f"Error: Failed to verify cert issuer. Present cert issuer is None."
if re.search(desired_pattern, present_cert_issuer, 0):
return True, None
else:
return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}."
2024-05-16 17:21:02 +08:00
def is_response_code_equal(self, present_code, desired_code):
if present_code == desired_code:
return True, None
else:
2024-05-16 17:21:02 +08:00
return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}."
2024-05-16 17:21:02 +08:00
def is_response_body_matched(self, present_body, desired_pattern):
present_body_utf8 = present_body.decode('utf-8')
if re.search(desired_pattern, present_body_utf8, 0):
return True, None
else:
return False, f"Error: The response body fail to match the desired content."
2024-05-16 17:21:02 +08:00
def is_response_body_not_matched(self, present_body, desired_pattern):
present_body_utf8 = present_body.decode('utf-8')
if not re.search(desired_pattern, present_body_utf8, 0):
return True, None
else:
2024-05-16 17:21:02 +08:00
return False, f"Error: The response body matched the desired content: {desired_pattern}."
2024-05-16 17:21:02 +08:00
def is_response_body_md5_equal(self, present_body, disired_md5):
present_body_md5_value = hashlib.md5(present_body).hexdigest()
if re.search(disired_md5, present_body_md5_value, 0):
return True, None
else:
2024-05-16 17:21:02 +08:00
return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}."
2024-05-16 17:21:02 +08:00
def is_download_size_equal(self, present_size, disired_size):
if present_size == disired_size:
return True, None
2019-12-20 15:38:14 +08:00
else:
2024-05-16 17:21:02 +08:00
return False, f"Error: The response body download size fail to match. present_size: {present_size}."
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
def is_pycurl_error_code_equal(self, present_error_info, disired_error_code):
if present_error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
if present_error_info.args[0] == disired_error_code:
return True, None
else:
return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}."
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
def is_pycurl_error_none(self, present_error_info):
if present_error_info is None:
return True, None
2019-12-20 15:38:14 +08:00
else:
2024-05-16 17:21:02 +08:00
return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}."
class DNSResponseAnalyzer:
def is_error_info_none(self, present_error_info):
if present_error_info is None:
return True, None
return False, f"Error: The error info: {present_error_info}."
def is_error_type_equal(self, present_error_info, desired_type):
if present_error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
2024-05-16 17:21:02 +08:00
if type(present_error_info) == desired_type:
return True, None
else:
return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}."
def is_ttl_equal(self, present_ttl, desired_ttl):
if present_ttl == desired_ttl:
return True, None
return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl)
def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge):
if present_ttl >= desired_left_edge and present_ttl <= desired_right_edge:
return True, None
return False, f"Error: ttl(%d) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge)
def is_address_equal(self, response_answer, desired_address, desired_address_type):
if desired_address_type == "ipv4":
rdtype = 1
if desired_address_type == "ipv6":
rdtype = 28
for i in response_answer:
for j in i.items:
if j.rdtype == rdtype: # 1: ipv4, 28: ipv6
if j.address == desired_address:
return True, None
else:
return False, f"Error: Present address error."
else:
return False, f"Error: Response rdtype error."
class ProxyCasesRunner:
def __init__(self) -> None:
self._analyzer = URLTransferResponseAnalyzer()
def action_redirect_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
2024-05-16 17:21:02 +08:00
desired_cert_issuer_pattern = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b'
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302)
if not is_code_equal[0]:
return False, is_code_equal[1]
return True, None
def action_redirect_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302)
if not is_code_equal[0]:
return False, is_code_equal[1]
return True, None
def action_block_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not is_body_matched[0]:
return False, is_body_matched[1]
return True, None
def action_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not is_body_matched[0]:
return False, is_body_matched[1]
return True, None
def action_replace_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not is_body_matched[0]:
return False, is_body_matched[1]
is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not is_body_matched[0]:
return False, is_body_not_matched[1]
return True, None
def action_replace_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not is_body_matched[0]:
return False, is_body_matched[1]
is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not is_body_matched[0]:
return False, is_body_not_matched[1]
return True, None
def action_hijack_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not is_code_equal[0]:
return False, is_code_equal[1]
return True, None
def action_hijack_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not is_code_equal[0]:
return False, is_code_equal[1]
return True, None
def action_insert_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not is_json_key_matched[0]:
return False, is_json_key_matched[1]
is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not is_json_val_matched[0]:
return False, is_json_val_matched[1]
return True, None
def action_insert_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not is_json_key_matched[0]:
return False, is_json_key_matched[1]
is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not is_json_val_matched[0]:
return False, is_json_key_matched[1]
return True, None
def action_deny_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large):
2024-05-17 11:52:49 +08:00
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
2024-05-16 17:21:02 +08:00
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host')
if not is_body_matched[0]:
return False, is_body_matched[1]
return True, None
def action_deny_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url')
if not is_body_matched[0]:
return False, is_body_matched[1]
return True, None
2024-05-17 11:52:49 +08:00
class ShapingCaseRunner:
def __init__(self) -> None:
self._analyzer = URLTransferResponseAnalyzer()
self._dns_analyzer = DNSResponseAnalyzer()
def rate_limit_0bps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
return True, None
def rate_limit_0bps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
return True, None
def rate_limit_1000gbps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
2024-05-17 11:52:49 +08:00
return True, None
def rate_limit_1000gbps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
2024-05-17 11:52:49 +08:00
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
return True, None
2024-05-16 17:21:02 +08:00
class FirewallCasesRunner:
def __init__(self) -> None:
self._analyzer = URLTransferResponseAnalyzer()
self._dns_analyzer = DNSResponseAnalyzer()
def action_bypass_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
return True, None
def action_intercept_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
return True, None
def action_intercept_protocol_https_cert_error(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
return True, None
def action_allow_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
return True, None
def action_deny_subaction_drop_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
return True, None
def action_deny_subaction_reset_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 56)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
return True, None
def action_deny_subaction_reset_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large):
return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large)
def action_deny_subaction_reset_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large):
return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large)
def action_deny_subaction_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 403)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r"dign-testing-deny-block")
if not is_body_matched[0]:
return False, is_body_matched[1]
return True, None
def action_allow_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
return True, None
def action_deny_subaction_drop_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
return True, None
def action_deny_subaction_reset_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 35)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
return True, None
def action_intercept_protocol_https_download_size_1k(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024)
def action_intercept_protocol_https_download_size_4k(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 4)
def action_intercept_protocol_https_download_size_16k(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 16)
def action_intercept_protocol_https_download_size_64k(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 64)
def action_intercept_protocol_https_download_size_256k(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 256)
def action_intercept_protocol_https_download_size_1M(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024)
def action_intercept_protocol_https_download_size_4M(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 4)
def action_intercept_protocol_https_download_size_16M(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 16)
def action_intercept_protocol_https_download_size_64M(self, url, resolves, conn_timeout, max_recv_speed_large):
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 64)
def _action_intercept_protocol_ssl_by_download_size(self, url, resolves, conn_timeout, max_recv_speed_large, download_size):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_download_size_equal = self._analyzer.is_download_size_equal(conn.size_download, download_size)
if not is_download_size_equal[0]:
return False, is_download_size_equal[1]
return True, None
def action_deny_subaction_drop_protocol_dns(self, domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_type_equal = self._dns_analyzer.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
return True, None
def action_deny_subaction_redirect_protocol_dns_type_a(self, domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4")
if not is_address_equal[0]:
return False, is_address_equal[1]
return True, None
def action_deny_subaction_redirect_protocol_dns_type_aaaa(self, domain, nameservers, conn_timeout):
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6")
if not is_address_equal[0]:
return False, is_address_equal[1]
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
return True, None
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(self, domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
2024-05-16 17:21:02 +08:00
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4")
if not is_address_equal[0]:
return False, is_address_equal[1]
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
return True, None
2024-05-16 17:21:02 +08:00
def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(self, domain, nameservers, conn_timeout):
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
2024-05-16 17:21:02 +08:00
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
2024-05-16 17:21:02 +08:00
is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
2024-05-16 17:21:02 +08:00
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6")
if not is_address_equal[0]:
return False, is_address_equal[1]
return True, None
class ResultExportBuilder:
def __init__(self):
2024-05-16 17:21:02 +08:00
self._exporter = None
self._create_exporter()
2024-05-16 17:21:02 +08:00
def _create_exporter(self):
self._exporter = PrettyTable()
self._exporter.vrules = NONE
self._exporter.field_names = ["Case name", "Status", "Description"]
self._exporter.align["Case name"] = "l"
self._exporter.align["Status"] = "l"
self._exporter.align["Description"] = "l"
2024-05-16 17:21:02 +08:00
def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True):
status, description = self._convert_case_result(case_result)
if enable_ouptut_by_case:
self._output_by_case_name(case_name, status)
2024-05-16 17:21:02 +08:00
if not case_result[0]:
self._add_exporter_row(case_name, status, description)
2024-05-16 17:21:02 +08:00
def _convert_case_result(self, case_result):
status = "ok"
if not case_result[0]:
status = "FAIL"
2024-05-16 17:21:02 +08:00
description = "-"
if case_result[1] is not None:
description = case_result[1]
2024-05-16 17:21:02 +08:00
return status, description
2024-05-16 17:21:02 +08:00
def _output_by_case_name(self, case_name, case_status):
print(f"{case_name} ... {case_status}")
2024-05-16 17:21:02 +08:00
def _add_exporter_row(self, case_name: str, status: str, description: str):
self._exporter.add_row([case_name, status, description])
def export(self):
if len(self._exporter._rows) > 0:
print(self._exporter)
class DiagnoseCasesRunner:
def __init__(self, cmd_parser: CommandParser):
self._loop = cmd_parser.loop
self._count = cmd_parser.count
self._interval_s = cmd_parser.interval_s
self._case_names_regexp = cmd_parser.case_names_regexp
self._config_path = cmd_parser.config_path
self._service_function_names_regexp = cmd_parser.service_function_names_regexp
self._service_function_config_path = cmd_parser.service_function_config_path
self._random_delay_seconds = cmd_parser.delay_seconds
2024-05-16 17:21:02 +08:00
self._config_loader = ConfigLoader(self._config_path)
self._proxy_case = ProxyCasesRunner()
self._firewall_case = FirewallCasesRunner()
2024-05-17 11:52:49 +08:00
self._shaping_case = ShapingCaseRunner()
2024-05-16 17:21:02 +08:00
self._cases_info = [
{
"name": "test_firewallBypass_ssl",
"protocol_type": "https",
"test_function": self._firewall_case.action_bypass_protocol_https,
"request_content": "https://sha384.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallIntercept_ssl",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https,
"request_content": "https://sha256.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallIntercept_sslCerterrExpired",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_cert_error,
"request_content": "https://expired.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallIntercept_sslCerterrSelfsigned",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_cert_error,
"request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallIntercept_sslCerterrUntrustedroot",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_cert_error,
"request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_proxyRedirect_ssl",
"protocol_type": "https",
"test_function": self._proxy_case.action_redirect_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js"
},
{
"name": "test_proxyBlock_ssl",
"protocol_type": "https",
"test_function": self._proxy_case.action_block_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js"
},
{
"name": "test_proxyReplace_ssl",
"protocol_type": "https",
"test_function": self._proxy_case.action_replace_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js"
},
{
"name": "test_proxyHijack_ssl",
"protocol_type": "https",
"test_function": self._proxy_case.action_hijack_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js"
},
{
"name": "test_proxyInsert_ssl",
"protocol_type": "https",
"test_function": self._proxy_case.action_insert_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html"
},
{
"name": "test_proxyRedirect_http",
"protocol_type": "http",
"test_function": self._proxy_case.action_redirect_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js"
},
{
"name": "test_proxyBlock_http",
"protocol_type": "http",
"test_function": self._proxy_case.action_block_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js"
},
{
"name": "test_proxyReplace_http",
"protocol_type": "http",
"test_function": self._proxy_case.action_replace_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js"
},
{
"name": "test_proxyHijack_http",
"protocol_type": "http",
"test_function": self._proxy_case.action_hijack_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js"
},
{
"name": "test_proxyInsert_http",
"protocol_type": "http",
"test_function": self._proxy_case.action_insert_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html"
},
{
"name": "test_firewallAllow_http",
"protocol_type": "http",
"test_function": self._firewall_case.action_allow_protocol_http,
"request_content": "http://http.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyDrop_http",
"protocol_type": "http",
"test_function": self._firewall_case.action_deny_subaction_drop_protocol_http,
"request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyReset_http",
"protocol_type": "http",
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_http,
"request_content": "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyBlock_http",
"protocol_type": "http",
"test_function": self._firewall_case.action_deny_subaction_block_protocol_http,
"request_content": "http://http-login.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallAllow_ssl",
"protocol_type": "https",
"test_function": self._firewall_case.action_allow_protocol_https,
"request_content": "https://sha512.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyDrop_ssl",
"protocol_type": "https",
"test_function": self._firewall_case.action_deny_subaction_drop_protocol_https,
"request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyReset_ssl",
"protocol_type": "https",
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_https,
"request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyDrop_dns",
"protocol_type": "dns",
"test_function": self._firewall_case.action_deny_subaction_drop_protocol_dns,
"request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyRedirectA_dns",
"protocol_type": "dns",
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a,
"request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyRedirectAAAA_dns",
"protocol_type": "dns",
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa,
"request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyRedirectARangeTTL_dns",
"protocol_type": "dns",
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl,
"request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyRedirectAAAARangeTTL_dns",
"protocol_type": "dns",
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl,
"request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallIntercept_sslDownloadSize1k",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_1k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k"
},
{
"name": "test_firewallIntercept_sslDownloadSize4k",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_4k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k"
},
{
"name": "test_firewallIntercept_sslDownloadSize16k",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_16k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k"
},
{
"name": "test_firewallIntercept_sslDownloadSize64k",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_64k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k"
},
{
"name": "test_firewallIntercept_sslDownloadSize256k",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_256k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k"
},
{
"name": "test_firewallIntercept_sslDownloadSize1M",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_1M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M"
},
{
"name": "test_firewallIntercept_sslDownloadSize4M",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_4M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M"
},
{
"name": "test_firewallIntercept_sslDownloadSize16M",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_16M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M"
},
{
"name": "test_firewallIntercept_sslDownloadSize64M",
"protocol_type": "https",
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_64M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M"
},
{
"name": "test_firewallDenyResetFilterHost_http",
"protocol_type": "http",
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_host,
"request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_firewallDenyResetFilterURL_http",
"protocol_type": "http",
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_url,
"request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_proxyDenyFilterHost_http",
"protocol_type": "http",
"test_function": self._proxy_case.action_deny_protocol_http_filter_host,
"request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website"
},
{
"name": "test_proxyDenyFilterURL_http",
"protocol_type": "http",
"test_function": self._proxy_case.action_deny_protocol_http_filter_url,
"request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website"
2024-05-17 11:52:49 +08:00
},
{
"name": "test_shaping_ratelimit_0bps_http",
"protocol_type": "http",
"test_function": self._shaping_case.rate_limit_0bps_protocol_http,
"request_content": "http://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M"
},
{
"name": "test_shaping_ratelimit_0bps_https",
"protocol_type": "https",
"test_function": self._shaping_case.rate_limit_0bps_protocol_https,
"request_content": "https://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M"
},
{
"name": "test_shaping_ratelimit_1000gbps_http",
"protocol_type": "http",
"test_function": self._shaping_case.rate_limit_1000gbps_protocol_http,
"request_content": "http://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M"
},
{
"name": "test_shaping_ratelimit_1000gbps_https",
"protocol_type": "https",
"test_function": self._shaping_case.rate_limit_1000gbps_protocol_https,
"request_content": "https://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M"
2024-05-16 17:21:02 +08:00
}
]
def run_cases(self):
if self._random_delay_seconds is not None:
time.sleep(self._random_delay_seconds)
2024-05-16 17:21:02 +08:00
run_count = 1
while True:
print("\nRUN %d" % run_count)
self.__run_cases_by_service_function_ids()
if (self._loop is not True) and self._count >= run_count:
break
time.sleep(self._interval_s)
run_count = run_count + 1
2024-05-16 17:21:02 +08:00
def __run_cases_by_service_function_ids(self):
sf_loader = ServiceFunctionConfigLoader(self._service_function_config_path, self._service_function_names_regexp)
for sf_pair in sf_loader.name_id_pairs:
2024-05-16 17:21:02 +08:00
start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(format(("Service function name: " + str(sf_pair["name"]) + ",Test start time: " + start_timestamp),'#^100s'))
self._run_cases_in_one_service_function_id(sf_pair["id"])
2024-05-16 17:21:02 +08:00
end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(format(("Service function name: " + str(sf_pair["name"]) + ",Test end time: " + end_timestamp),'=^100s'))
2024-05-16 17:21:02 +08:00
def _run_cases_in_one_service_function_id(self, service_function_id):
resolve_Builder = ServerAddressBuilder(service_function_id)
exporter = ResultExportBuilder()
for case_info in self._cases_info:
if re.fullmatch(self._case_names_regexp, case_info["name"]):
self._run_one_case(case_info, resolve_Builder, exporter)
exporter.export()
def _run_one_case(self, case_info, resolve_Builder, exporter):
conn_timeout = self._get_conn_timeout_from_configs(case_info["name"])
max_recv_speed_large = self._get_max_recv_speed_large_from_configs(case_info["name"])
test_func = case_info.get("test_function")
if test_func:
if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https":
ret = test_func(case_info["request_content"], resolve_Builder.resolves, conn_timeout, max_recv_speed_large)
if case_info["protocol_type"] == "dns":
ret = test_func(case_info["request_content"], resolve_Builder.nameservers, conn_timeout)
exporter.append_case_result(case_info["name"], ret)
def _get_conn_timeout_from_configs(self, section) -> int:
return self._config_loader.get_value_by_section_and_key(section, "conn_timeout")
def _get_max_recv_speed_large_from_configs(self, section) -> int:
return self._config_loader.get_value_by_section_and_key(section, "max_recv_speed_large")
2020-09-15 13:55:08 +08:00
if __name__ == '__main__':
2024-05-16 17:21:02 +08:00
cmd_parser = CommandParser()
dign = DiagnoseCasesRunner(cmd_parser)
dign.run_cases()