This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/images_build/client/dign_client/bin/client.py

1619 lines
69 KiB
Python
Raw Normal View History

2020-05-28 19:30:31 +08:00
import sys
2019-12-20 15:38:14 +08:00
import unittest
import json
import pycurl
import os
import re
import time
import io
2019-12-20 15:38:14 +08:00
from io import BytesIO
2024-05-16 17:21:02 +08:00
# import getopt
# import ciunittest
import argparse
import hashlib
2020-09-15 13:55:08 +08:00
from configparser import ConfigParser
import random
import dns.exception
import dns.resolver
import sys
import logging
2024-05-16 17:21:02 +08:00
import copy
from prettytable import PrettyTable,NONE,HEADER
import yaml
from urllib.parse import urlparse
from scapy.all import *
from scapy.layers.inet import IP, TCP
from scapy.sendrecv import AsyncSniffer
from urllib.parse import urlparse, parse_qs
2024-05-16 17:21:02 +08:00
class ConfigLoader:
def __init__(self, config_path: str):
self.__config_path = config_path
self.__configs = self._load_configs()
2024-05-16 17:21:02 +08:00
def _load_configs(self):
2024-05-16 17:21:02 +08:00
config_parser = ConfigParser()
config_parser.read(self.__config_path, encoding='UTF-8')
return config_parser.sections()
2024-05-16 17:21:02 +08:00
def get_value_by_section_and_key(self, section, key):
if section in self.__configs and key in self.__configs[section]:
value = self.__configs[section][key]
if isinstance(value, str) and value.isdigit():
return int(value)
else:
return value
else:
return None
class ServiceFunctionConfigLoader:
TABLE_NAME = "service_function_maps"
SF_NAME_KEY = "service_function_name"
DEVICE_ID_KEY = "device_id"
def __init__(self, config_path, name_pattern):
self._config_path = config_path
self._name_pattern = name_pattern
self._configs = None
self._load_configs()
self._pairs = self._read_name_id_pairs_by_name_pattern(self._name_pattern)
def _load_configs(self):
try:
with open(self._config_path, 'r') as f:
load_configs = yaml.safe_load(f)
if load_configs is not None:
self._configs = load_configs
else:
print(f"Error: No config loaded from path: {self._config_path}")
except FileNotFoundError:
print(f"Error: File not found: {self._config_path}")
except yaml.YAMLError as e:
print(f"Error parsing YAML file: {e}")
def _read_name_id_pairs_by_name_pattern(self, name_pattern):
if (self._configs is None) or (self.TABLE_NAME not in self._configs):
return []
return [
{"name": item[self.SF_NAME_KEY], "id": int(item[self.DEVICE_ID_KEY])}
for item in self._configs[self.TABLE_NAME]
if re.match(name_pattern, item[self.SF_NAME_KEY])
]
@property
def name_id_pairs(self):
return self._pairs
2024-05-16 17:21:02 +08:00
class CommandParser:
COUNT_MIN = 1
COUNT_MAX = 65535
INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$'
#INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
2024-05-16 17:21:02 +08:00
def __init__(self):
self.__start_up()
def __start_up(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose")
parser.add_argument('-i','--interval', type = int, default = 30,
help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.')
parser.add_argument('-c','--count', type = int, default = 1,
help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].')
parser.add_argument('--periodical', action='store_true', default = False,
help='Enable TSG diagnose periodical, exit when recv a signal.')
parser.add_argument('--service_function_names_regexp', type = str, default = '.+',
help = "Regexp of the service function names to run. Example: .+")
parser.add_argument('--service_function_config_path', type = str, default = '/opt/dign_client/share/service_function_maps.yaml',
help = "Specifies the service function config file, default /opt/dign_client/share/service_function_maps.yaml")
2024-05-16 17:21:02 +08:00
parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf',
help='Specifies the config file, default /opt/dign_client/etc/client.conf')
parser.add_argument('--case_names_regexp', type = str, default = '.+',
help='Regexp of the case names to run. Example: .+')
parser.add_argument('--enable_delay', action='store_true', default = False,
help='Enable a random delay between 1 and 30 seconds before running a case.')
parser.add_argument('--delay_left_edge', type = int, default = 1,
help='The seconds of the delay left edge. Default: 1.')
parser.add_argument('--delay_right_edge', type = int, default = 30,
help='The seconds of the delay right edge. Default: 30.')
2024-05-16 17:21:02 +08:00
# parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running')
args = parser.parse_args()
if not self.__is_legal_args(args):
parser.print_help()
sys.exit(1)
2024-05-16 17:21:02 +08:00
self.__interval_s = args.interval
self.__count = args.count
self.__loop = args.periodical
2024-05-16 17:21:02 +08:00
self.__config_path = args.config_path
self.__case_names_regexp = args.case_names_regexp
self.__service_function_names_regexp = args.service_function_names_regexp
self.__service_function_config_path = args.service_function_config_path
self._enable_delay = args.enable_delay
self._delay_left_edge = args.delay_left_edge
self._delay_right_edge = args.delay_right_edge
2024-05-16 17:21:02 +08:00
def __is_legal_args(self, args) -> bool:
if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX:
print("Error: the number of TSG diagnoses must in [1,65535]")
return False
if args.delay_left_edge > args.delay_right_edge:
print("Error: The delay in seconds on the left edge is greater than on the right edge.")
2024-05-16 17:21:02 +08:00
return True
@property
def interval_s(self):
return self.__interval_s
@property
def count(self):
return self.__count
@property
def loop(self):
return self.__loop
@property
def config_path(self):
return self.__config_path
@property
def case_names_regexp(self):
return r"{}".format(self.__case_names_regexp)
@property
def service_function_names_regexp(self):
return r"{}".format(self.__service_function_names_regexp)
@property
def service_function_config_path(self):
return self.__service_function_config_path
@property
def delay_seconds(self):
if self._enable_delay:
return random.randint(self._delay_left_edge, self._delay_right_edge)
else:
return None
2024-05-16 17:21:02 +08:00
class ServerAddressBuilder:
IPv4_4TH_OCTET_LEFT_EDGE = 101
IPv4_1_TO_3TH_OCTET = "192.0.2"
2024-05-16 17:21:02 +08:00
def __init__(self, service_function_index: int):
self._service_function_index = service_function_index
self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index
def read_resolves_by_url(self, url):
parsed_url = urlparse(url)
port = parsed_url.port
if not port:
if parsed_url.scheme == 'https':
port = 443
elif parsed_url.scheme == 'http':
port = 80
return [f"{parsed_url.hostname}:{port}:{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}"]
# @property
# def resolves(self) -> list:
# return [f"{domain}:{port}:192.0.2.{self._ipv4_4th_octet}" for domain, port in self.DOMAIN_TO_PORT_LIST]
2024-05-16 17:21:02 +08:00
@property
def nameservers(self) -> list:
return [f'{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}']
class TcpPacketsCapture:
DSCP_KEY = ["DSCP"]
def __init__(self, server_ip, server_port):
self._server_ip = server_ip
self._server_port = server_port
self._quadruple_to_dscp_table = {}
self._build_filter()
2024-06-04 09:51:42 +08:00
self._iface_names = get_if_list()
# def _read_server_ip_and_port(self):
def _build_filter(self):
self._filter = f"tcp and src host {self._server_ip} and src port {self._server_port}"
def start(self):
2024-06-04 09:51:42 +08:00
self._sniff_thread = AsyncSniffer(iface=self._iface_names, prn=self._packet_callback, filter=self._filter)
self._sniff_thread.start()
def _packet_callback(self, packet):
if IP in packet and TCP in packet:
src_ip = packet[IP].src
if src_ip == self._server_ip:
dscp_value = self._read_dscp_value_from_packet(packet)
quadruple = self._build_quadruple(packet, True)
self._update_quadruple_to_dscp_table(quadruple, dscp_value)
def _read_dscp_value_from_packet(self, packet):
ip_header = packet[IP]
return (ip_header.tos & 0xfc) >> 2
def _build_quadruple(self, packet, is_s2c):
src_ip = packet[IP].src
dst_ip = packet[IP].dst
src_port = packet[TCP].sport
dst_port = packet[TCP].dport
if is_s2c:
return f"{dst_ip}:{dst_port},{src_ip}:{src_port}"
else:
return f"{src_ip}:{src_port},{dst_ip}:{dst_port}"
def _update_quadruple_to_dscp_table(self, quadruple, dscp_value):
self._quadruple_to_dscp_table[quadruple] = dscp_value
def stop(self):
self._sniff_thread.stop()
self._sniff_thread.join()
def read_dscp_value_by_quadruple(self, quadruple):
if quadruple in self._quadruple_to_dscp_table:
return self._quadruple_to_dscp_table[quadruple]
else:
return None
@staticmethod
def read_server_ip_and_port_from_resolve(resolves):
resolve = resolves[0]
resolve_split = resolve.split(":")
return resolve_split[2], resolve_split[1]
class TcpPacketsCaptureAssertion:
@staticmethod
def is_dscp_equal(actual_dscp, expected_dscp):
if actual_dscp is None:
return False, f"Error: Not read DSCP value."
if actual_dscp == expected_dscp:
return True, None
else:
return False, f"Error: Failed to verify DSCP value. Actual DSCP: {actual_dscp}, expected DSCP: {expected_dscp}."
2024-05-16 17:21:02 +08:00
class URLTransferBuilder:
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed: int):
2024-05-16 17:21:02 +08:00
self._url = url
self._request_resolve = request_resolve
self._conn_timeout = conn_timeout
self._max_recv_speed = max_recv_speed
2024-05-16 17:21:02 +08:00
self._conn = None
self._response_code = None
self._response_buffer = BytesIO()
self._error_info = None
self._size_download = None
self._local_ip = None
self._local_port = None
self._remote_ip = None
self._remote_port = None
2024-05-16 17:21:02 +08:00
def _setup_connection(self):
self._response_buffer = BytesIO()
self._conn = pycurl.Curl()
self._conn.setopt(self._conn.WRITEFUNCTION, self._response_buffer.write)
self._conn.setopt(self._conn.RESOLVE, self._request_resolve)
self._conn.setopt(self._conn.URL, self._url)
self._conn.setopt(self._conn.TIMEOUT, self._conn_timeout)
self._conn.setopt(self._conn.MAX_RECV_SPEED_LARGE, self._max_recv_speed)
2024-05-16 17:21:02 +08:00
def _perform_connection(self):
self._conn.perform()
self._response_code = self._conn.getinfo(self._conn.RESPONSE_CODE)
self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD)
self._local_ip = self._conn.getinfo(pycurl.LOCAL_IP)
self._local_port = self._conn.getinfo(pycurl.LOCAL_PORT)
self._remote_ip = self._conn.getinfo(pycurl.PRIMARY_IP)
self._remote_port = self._conn.getinfo(pycurl.PRIMARY_PORT)
2024-05-16 17:21:02 +08:00
def _close_connection(self):
self._conn.close()
def connect(self):
try:
2024-05-16 17:21:02 +08:00
self._setup_connection()
self._perform_connection()
except pycurl.error as error_info:
self._error_info = error_info
finally:
self._close_connection()
2024-05-16 17:21:02 +08:00
@property
def response_code(self):
return self._response_code
@property
def response_body(self):
return self._response_buffer.getvalue()
@property
def error_info(self):
return self._error_info
@property
def size_download(self):
return self._size_download
@property
def quadruple(self):
return f"{self._local_ip}:{self._local_port},{self._remote_ip}:{self._remote_port}"
2024-05-16 17:21:02 +08:00
class HttpURLTransferBuilder(URLTransferBuilder):
def _perform_connection(self):
super()._perform_connection()
class HttpsURLTransferBuilder(URLTransferBuilder):
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed: int):
super().__init__(url, request_resolve, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
self._certs_info = None
def _setup_connection(self):
super()._setup_connection()
self._conn.setopt(self._conn.OPT_CERTINFO, 1)
self._conn.setopt(self._conn.SSL_VERIFYPEER, False)
def _perform_connection(self):
super()._perform_connection()
self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO)
@property
def cert_issuer(self) -> str:
return self._get_cert_issuer(self._certs_info)
def _get_cert_issuer(self, certs) -> str:
if certs is None:
return ""
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info[1]
return issuer
class DNSQueryBuilder:
def __init__(self, domain: str, namesevers: list, conn_timeout: int):
self._domain = domain
self._nameservers = namesevers
self._conn_timeout = conn_timeout
self._dns_resolver = None
self._dns_answer = None
self._error_info = None
def _setup(self):
self._dns_resolver = dns.resolver.Resolver()
self._dns_resolver.nameservers = self._nameservers
self._dns_resolver.search = []
self._dns_resolver.use_search_by_default = False
self._dns_resolver.timeout = float(self._conn_timeout)
self._dns_resolver.lifetime = float(self._conn_timeout)
def _query(self, record_type):
try:
2024-05-16 17:21:02 +08:00
self._setup()
self._dns_answer = self._dns_resolver.query(self._domain, record_type)
except Exception as error_info:
self._error_info = error_info
@property
def error_info(self):
return self._error_info
@property
def rrset_ttl(self):
if self._dns_answer is not None:
return self._dns_answer.rrset.ttl
return None
@property
def response_answer(self):
if self._dns_answer is not None:
return self._dns_answer.response.answer
return None
2024-06-04 15:58:44 +08:00
@property
def rdtype_address_pairs(self):
rdtype_address_pairs = []
if self._dns_answer is not None:
for answer in self._dns_answer.response.answer:
for item in answer.items:
rdtype_address_pairs.append({"rdtype": item.rdtype, "address": item.address})
return rdtype_address_pairs
2024-05-16 17:21:02 +08:00
class DNSQueryTypeABuilder(DNSQueryBuilder):
def query(self):
self._query("A")
class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
def query(self):
self._query("AAAA")
class URLTransferResponseAssertion:
@staticmethod
def is_cert_issuer_matched(cert_issuer, regex):
2024-06-04 15:58:44 +08:00
if cert_issuer is None:
return False, f"Error: Failed to verify cert issuer. Actual cert issuer is None."
if re.search(regex, cert_issuer, 0):
2024-05-16 17:21:02 +08:00
return True, None
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: Failed to verify cert issuer. Actual cert issuer: {cert_issuer}."
@staticmethod
def is_response_code_equal(actual_code, expected_code):
2024-06-04 15:58:44 +08:00
if actual_code == expected_code:
2024-05-16 17:21:02 +08:00
return True, None
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: Failed to verfiy response code. Actual code: {actual_code}, expected code: {expected_code}."
@staticmethod
def is_response_body_matched(response_body, regex):
2024-06-04 15:58:44 +08:00
response_body_utf8 = response_body.decode('utf-8')
if re.search(regex, response_body_utf8, 0):
2024-05-16 17:21:02 +08:00
return True, None
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: The response body fail to match regex: {regex}."
@staticmethod
def is_response_body_not_matched(response_body, regex):
2024-06-04 15:58:44 +08:00
response_body_utf8 = response_body.decode('utf-8')
if not re.search(regex, response_body_utf8, 0):
2024-05-16 17:21:02 +08:00
return True, None
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: The response body matched the regex: {regex}."
@staticmethod
def is_response_body_md5_equal(response_body, expected_md5):
2024-06-04 15:58:44 +08:00
response_body_md5_value = hashlib.md5(response_body).hexdigest()
if expected_md5 == response_body_md5_value:
2024-05-16 17:21:02 +08:00
return True, None
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: The response body md5 fail to match. Actual md5 value: {response_body_md5_value}."
@staticmethod
def is_download_size_equal(actual_size, expected_size):
2024-06-04 15:58:44 +08:00
if actual_size == expected_size:
2024-05-16 17:21:02 +08:00
return True, None
2019-12-20 15:38:14 +08:00
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: The response body download size fail to match. Actual size: {actual_size}."
2019-12-20 15:38:14 +08:00
@staticmethod
def is_pycurl_error_code_equal(error_info, expected_error_code):
2024-06-04 15:58:44 +08:00
if error_info is None:
2024-05-16 17:21:02 +08:00
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
2024-06-04 15:58:44 +08:00
if error_info.args[0] == expected_error_code:
2024-05-16 17:21:02 +08:00
return True, None
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: The erro code not equal to desired. Actual error info: {error_info}."
2019-12-20 15:38:14 +08:00
@staticmethod
def is_pycurl_error_none(error_info):
2024-06-04 15:58:44 +08:00
if error_info is None:
2024-05-16 17:21:02 +08:00
return True, None
2019-12-20 15:38:14 +08:00
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: The pycurl error is not None. Actual error info: {error_info}."
2024-05-16 17:21:02 +08:00
class DNSResponseAssertion:
@staticmethod
def is_error_info_none(error_info):
2024-06-04 15:58:44 +08:00
if error_info is None:
2024-05-16 17:21:02 +08:00
return True, None
2024-06-04 15:58:44 +08:00
return False, f"Error: The error info: {error_info}."
2024-05-16 17:21:02 +08:00
@staticmethod
def is_error_type_equal(error_info, expected_type):
2024-06-04 15:58:44 +08:00
if error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
2024-05-16 17:21:02 +08:00
2024-06-04 15:58:44 +08:00
if type(error_info) == expected_type:
2024-05-16 17:21:02 +08:00
return True, None
else:
2024-06-04 15:58:44 +08:00
return False, f"Error: error type not equal to {expected_type}, error info: {error_info}."
2024-05-16 17:21:02 +08:00
@staticmethod
def is_ttl_equal(actual_ttl, expected_ttl):
2024-06-04 15:58:44 +08:00
if actual_ttl == expected_ttl:
2024-05-16 17:21:02 +08:00
return True, None
2024-06-04 15:58:44 +08:00
return False, f"Error: Actual ttl(%s) not equal to %d." %(actual_ttl, expected_ttl)
2024-05-16 17:21:02 +08:00
@staticmethod
def is_ttl_in_range(actual_ttl, expected_left_edge, expected_right_edge):
2024-06-04 15:58:44 +08:00
if actual_ttl >= expected_left_edge and actual_ttl <= expected_right_edge:
2024-05-16 17:21:02 +08:00
return True, None
2024-06-04 15:58:44 +08:00
return False, f"Error: ttl(%d) not in [%d-%d]." %(actual_ttl, expected_left_edge, expected_right_edge)
@staticmethod
def is_rdtype_address_pair_included(rdtype_address_pairs, expected_rdtype, expected_address):
2024-06-04 15:58:44 +08:00
#rdtype 1: ipv4, 28: ipv6.
for pair in rdtype_address_pairs:
if pair["address"] == expected_address and pair["rdtype"] == expected_rdtype:
return True, None
return False, f"Expected rdtype address pair[{expected_rdtype}, {expected_address}] not in {rdtype_address_pairs}."
2024-05-16 17:21:02 +08:00
class ProxyCasesRunner:
@staticmethod
def action_intercept_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-28 17:12:46 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
2024-05-28 17:12:46 +08:00
return True, None
@staticmethod
def action_intercept_protocol_https_cert_error(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-28 17:12:46 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b')
if not status:
return False, info
2024-05-28 17:12:46 +08:00
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_1k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_4k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 4)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_16k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 16)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_64k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 64)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_256k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 256)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_1M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_4M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024 * 4)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_16M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024 * 16)
if not status:
return False, info
return True, None
2024-05-28 17:12:46 +08:00
@staticmethod
def action_intercept_protocol_https_download_size_64M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-28 17:12:46 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024 * 64)
if not status:
return False, info
2024-05-28 17:12:46 +08:00
return True, None
@staticmethod
def action_redirect_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 302)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_redirect_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 302)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_block_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_block_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_replace_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_replace_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
status, info = URLTransferResponseAssertion.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_hijack_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_hijack_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_insert_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_insert_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_protocol_http_filter_host(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_protocol_http_filter_url(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
2024-05-17 11:52:49 +08:00
class ShapingCaseRunner:
@staticmethod
def rate_limit_0bps_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-17 11:52:49 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
2024-05-17 11:52:49 +08:00
return True, None
@staticmethod
def rate_limit_0bps_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-17 11:52:49 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
2024-05-17 11:52:49 +08:00
return True, None
@staticmethod
def rate_limit_1000gbps_protocol_http(url, resolves, conn_timeout, max_recv_speed):
server_ip, server_port = TcpPacketsCapture.read_server_ip_and_port_from_resolve(resolves)
capture = TcpPacketsCapture(server_ip, server_port)
capture.start()
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-17 11:52:49 +08:00
conn.connect()
capture.stop()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8)
if not status:
return False, info
2024-05-17 11:52:49 +08:00
return True, None
@staticmethod
def rate_limit_1000gbps_protocol_https(url, resolves, conn_timeout, max_recv_speed):
server_ip, server_port = TcpPacketsCapture.read_server_ip_and_port_from_resolve(resolves)
capture = TcpPacketsCapture(server_ip, server_port)
capture.start()
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-17 11:52:49 +08:00
conn.connect()
capture.stop()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8)
if not status:
return False, info
2024-05-17 11:52:49 +08:00
return True, None
2024-05-16 17:21:02 +08:00
class FirewallCasesRunner:
@staticmethod
def action_bypass_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_allow_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_drop_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_reset_protocol_http_filter_host(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56)
if not status:
return False, info
return True, None
2024-05-16 17:21:02 +08:00
@staticmethod
def action_deny_subaction_reset_protocol_http_filter_url(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56)
if not status:
return False, info
return True, None
2024-05-16 17:21:02 +08:00
@staticmethod
def action_deny_subaction_block_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 403)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r"dign-testing-deny-block")
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_allow_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_drop_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_reset_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 35)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_drop_protocol_dns(domain, nameservers, conn_timeout):
2024-05-16 17:21:02 +08:00
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_a(domain, nameservers, conn_timeout):
2024-05-16 17:21:02 +08:00
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_aaaa(domain, nameservers, conn_timeout):
2024-05-16 17:21:02 +08:00
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
2024-05-16 17:21:02 +08:00
status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333)
if not status:
return False, info
2019-12-20 15:38:14 +08:00
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not status:
return False, info
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
return True, None
2019-12-20 15:38:14 +08:00
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(domain, nameservers, conn_timeout):
2024-05-16 17:21:02 +08:00
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
2019-12-20 15:38:14 +08:00
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
2019-12-20 15:38:14 +08:00
status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not status:
return False, info
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not status:
return False, info
2019-12-20 15:38:14 +08:00
2024-05-16 17:21:02 +08:00
return True, None
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(domain, nameservers, conn_timeout):
2024-05-16 17:21:02 +08:00
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not status:
return False, info
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not status:
return False, info
2024-05-16 17:21:02 +08:00
return True, None
class ResultExportBuilder:
2024-06-04 09:51:42 +08:00
COLUMN_0 = "Test cases"
COLUMN_1 = "Result"
COLUMN_2 = "Fail Reason"
RESULT_OK = "ok"
RESULT_FAIL = "FAIL"
DEFAULT_INFO = "-"
def __init__(self):
2024-05-16 17:21:02 +08:00
self._exporter = None
self._create_exporter()
2024-05-16 17:21:02 +08:00
def _create_exporter(self):
self._exporter = PrettyTable()
self._exporter.vrules = NONE
2024-06-04 09:51:42 +08:00
self._exporter.field_names = [self.COLUMN_0, self.COLUMN_1, self.COLUMN_2]
self._exporter.align[self.COLUMN_0] = "l"
self._exporter.align[self.COLUMN_1] = "l"
self._exporter.align[self.COLUMN_2] = "l"
2024-05-16 17:21:02 +08:00
def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True):
status, description = self._convert_case_result(case_result)
if enable_ouptut_by_case:
self._output_by_case_name(case_name, status)
2024-06-04 09:51:42 +08:00
#if not case_result[0]:
self._add_exporter_row(case_name, status, description)
2024-05-16 17:21:02 +08:00
def _convert_case_result(self, case_result):
2024-06-04 09:51:42 +08:00
status = self.RESULT_OK
2024-05-16 17:21:02 +08:00
if not case_result[0]:
2024-06-04 09:51:42 +08:00
status = self.RESULT_FAIL
2024-06-04 09:51:42 +08:00
description = self.DEFAULT_INFO
2024-05-16 17:21:02 +08:00
if case_result[1] is not None:
description = case_result[1]
2024-05-16 17:21:02 +08:00
return status, description
2024-05-16 17:21:02 +08:00
def _output_by_case_name(self, case_name, case_status):
print(f"{case_name} ... {case_status}")
2024-06-04 09:51:42 +08:00
def _add_exporter_row(self, column0: str, column1: str, column2: str):
self._exporter.add_row([column0, column1, column2])
2024-05-16 17:21:02 +08:00
def export(self):
if len(self._exporter._rows) > 0:
2024-06-04 09:51:42 +08:00
print("\nSummary:")
2024-05-16 17:21:02 +08:00
print(self._exporter)
class DiagnoseCasesRunner:
def __init__(self, cmd_parser: CommandParser):
self._loop = cmd_parser.loop
self._count = cmd_parser.count
self._interval_s = cmd_parser.interval_s
self._case_names_regexp = cmd_parser.case_names_regexp
self._config_path = cmd_parser.config_path
self._service_function_names_regexp = cmd_parser.service_function_names_regexp
self._service_function_config_path = cmd_parser.service_function_config_path
self._random_delay_seconds = cmd_parser.delay_seconds
2024-05-16 17:21:02 +08:00
self._config_loader = ConfigLoader(self._config_path)
self._cases_info = [
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_Allow_HTTP",
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_allow_protocol_http,
"request_content": "http://http.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_Allow_HTTPS",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": FirewallCasesRunner.action_allow_protocol_https,
"request_content": "https://sha512.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyReset_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http,
"request_content": "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyReset_HTTPS",
"protocol_type": "https",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_https,
"request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyReset_FilterHost_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http_filter_host,
"request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyReset_FilterURL_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http_filter_url,
"request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyBlock_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_block_protocol_http,
"request_content": "http://http-login.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyDrop_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_http,
"request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 4,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyDrop_HTTPS",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_https,
"request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 4,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyDrop_DNS",
2024-05-16 17:21:02 +08:00
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_dns,
"request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website",
'conn_timeout': 3,
'max_recv_speed': 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyRedirect_A_DNS",
2024-05-16 17:21:02 +08:00
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_a,
"request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyRedirect_AAAA_DNS",
2024-05-16 17:21:02 +08:00
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_aaaa,
"request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyRedirect_ARangeTTL_DNS",
2024-05-16 17:21:02 +08:00
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl,
"request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Firewall_DenyRedirect_AAAARangeTTL_DNS",
2024-05-16 17:21:02 +08:00
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl,
"request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https,
"request_content": "https://sha256.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Intercept_HTTPS_CertExpired",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error,
"request_content": "https://expired.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Intercept_HTTPS_CertSelfSigned",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error,
"request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Intercept_HTTPS_CertUntrustedRoot",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error,
"request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Intercept_HTTPS_Response_1k",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_1k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_4k",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_4k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_16k",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_16k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_64k",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_64k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_256k",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_256k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_1M",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_1M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_4M",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_4M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_16M",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_16M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M",
"conn_timeout": 4,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Intercept_HTTPS_Response_64M",
2024-05-16 17:21:02 +08:00
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_64M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Manipulation_Redirect_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_redirect_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Manipulation_Redirect_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_redirect_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Manipulation_Replace_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_replace_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Manipulation_Replace_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_replace_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Manipulation_Deny_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_block_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Manipulation_Deny_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_block_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Manipulation_Deny_FilterHost_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_deny_protocol_http_filter_host,
"request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Manipulation_Deny_FilterURL_HTTP",
2024-05-16 17:21:02 +08:00
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_deny_protocol_http_filter_url,
"request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-17 11:52:49 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Proxy_Manipulation_Hijack_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_hijack_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Manipulation_Hijack_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_hijack_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Manipulation_Insert_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_insert_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Proxy_Manipulation_Insert_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_insert_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html",
"conn_timeout": 1,
"max_recv_speed": 6553600
2024-05-28 17:12:46 +08:00
},
{
"name": "Shaping_RateLimit0bps_HTTP",
2024-05-17 11:52:49 +08:00
"protocol_type": "http",
"test_function": ShapingCaseRunner.rate_limit_0bps_protocol_http,
"request_content": "http://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M",
"conn_timeout": 4,
"max_recv_speed": 6553600
2024-05-17 11:52:49 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Shaping_RateLimit0bps_HTTPS",
2024-05-17 11:52:49 +08:00
"protocol_type": "https",
"test_function": ShapingCaseRunner.rate_limit_0bps_protocol_https,
"request_content": "https://testing-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/16M",
"conn_timeout": 4,
"max_recv_speed": 6553600
2024-05-17 11:52:49 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Shaping_RateLimit1000gbps_HTTP",
2024-05-17 11:52:49 +08:00
"protocol_type": "http",
"test_function": ShapingCaseRunner.rate_limit_1000gbps_protocol_http,
"request_content": "http://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M",
"conn_timeout": 4,
"max_recv_speed": 6553600
2024-05-17 11:52:49 +08:00
},
{
2024-05-28 17:12:46 +08:00
"name": "Shaping_RateLimit1000gbps_HTTPS",
2024-05-17 11:52:49 +08:00
"protocol_type": "https",
"test_function": ShapingCaseRunner.rate_limit_1000gbps_protocol_https,
"request_content": "https://testing-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/16M",
"conn_timeout": 4,
"max_recv_speed": 6553600
2024-05-16 17:21:02 +08:00
}
]
def run_cases(self):
if self._random_delay_seconds is not None:
time.sleep(self._random_delay_seconds)
2024-05-16 17:21:02 +08:00
run_count = 1
while True:
print("\nRUN %d" % run_count)
self.__run_cases_by_service_function_ids()
if (self._loop is not True) and self._count >= run_count:
break
time.sleep(self._interval_s)
run_count = run_count + 1
2024-05-16 17:21:02 +08:00
def __run_cases_by_service_function_ids(self):
sf_loader = ServiceFunctionConfigLoader(self._service_function_config_path, self._service_function_names_regexp)
for sf_pair in sf_loader.name_id_pairs:
2024-05-16 17:21:02 +08:00
start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(format(("Service function name: " + str(sf_pair["name"]) + ",Test start time: " + start_timestamp),'#^100s'))
self._run_cases_in_one_service_function_id(sf_pair["id"])
2024-05-16 17:21:02 +08:00
end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(format(("Service function name: " + str(sf_pair["name"]) + ",Test end time: " + end_timestamp),'=^100s'))
2024-05-16 17:21:02 +08:00
def _run_cases_in_one_service_function_id(self, service_function_id):
resolve_builder = ServerAddressBuilder(service_function_id)
2024-05-16 17:21:02 +08:00
exporter = ResultExportBuilder()
for case_info in self._cases_info:
if re.fullmatch(self._case_names_regexp, case_info["name"]):
self._run_one_case(case_info, resolve_builder, exporter)
2024-05-16 17:21:02 +08:00
exporter.export()
def _run_one_case(self, case_info, resolve_builder, exporter):
conn_timeout = self._get_conn_timeout_from_configs(case_info)
max_recv_speed = self._get_max_recv_speed_from_configs(case_info)
2024-05-16 17:21:02 +08:00
test_func = case_info.get("test_function")
if test_func:
if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https":
resolve = resolve_builder.read_resolves_by_url(case_info["request_content"])
ret = test_func(case_info["request_content"], resolve, conn_timeout, max_recv_speed)
2024-05-16 17:21:02 +08:00
if case_info["protocol_type"] == "dns":
ret = test_func(case_info["request_content"], resolve_builder.nameservers, conn_timeout)
2024-05-16 17:21:02 +08:00
exporter.append_case_result(case_info["name"], ret)
def _get_conn_timeout_from_configs(self, case_info):
conn_timeout = self._config_loader.get_value_by_section_and_key(case_info["name"], "conn_timeout")
if conn_timeout is not None:
return conn_timeout
else:
return case_info["conn_timeout"]
2024-05-16 17:21:02 +08:00
def _get_max_recv_speed_from_configs(self, case_info):
max_recv_speed = self._config_loader.get_value_by_section_and_key(case_info["name"], "max_recv_speed")
if max_recv_speed is not None:
return max_recv_speed
else:
return case_info["max_recv_speed"]
2020-09-15 13:55:08 +08:00
if __name__ == '__main__':
2024-05-16 17:21:02 +08:00
cmd_parser = CommandParser()
dign = DiagnoseCasesRunner(cmd_parser)
dign.run_cases()