This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/images_build/client/dign_client/bin/client.py
2024-06-11 18:48:02 +08:00

1694 lines
72 KiB
Python

import sys
import unittest
import json
import pycurl
import os
import re
import time
import io
from io import BytesIO
# import getopt
# import ciunittest
import argparse
import hashlib
from configparser import ConfigParser
import random
import dns.exception
import dns.resolver
import sys
import logging
import copy
from prettytable import PrettyTable,NONE,HEADER
import yaml
from urllib.parse import urlparse
from scapy.all import *
from scapy.layers.inet import IP, TCP
from scapy.sendrecv import AsyncSniffer
from urllib.parse import urlparse, parse_qs
class ConfigLoader:
def __init__(self, config_path: str):
self._config_path = config_path
self._configs = {}
if os.path.exists(self._config_path):
self._load_configs()
def _load_configs(self):
config_parser = ConfigParser()
config_parser.read(self._config_path, encoding='UTF-8')
return config_parser.sections()
def get_value_by_section_and_key(self, section, key):
if section in self._configs and key in self._configs[section]:
value = self._configs[section][key]
if isinstance(value, str) and value.isdigit():
return int(value)
else:
return value
else:
return None
class ServiceFunctionConfigLoader:
TABLE_NAME = "service_function_maps"
SF_NAME_KEY = "service_function_name"
DEVICE_ID_KEY = "device_id"
def __init__(self, config_path, name_pattern):
self._config_path = config_path
self._name_pattern = name_pattern
self._configs = None
self._load_configs()
self._pairs = self._read_name_id_pairs_by_name_pattern(self._name_pattern)
def _load_configs(self):
try:
with open(self._config_path, 'r') as f:
load_configs = yaml.safe_load(f)
if load_configs is not None:
self._configs = load_configs
else:
print(f"Error: No config loaded from path: {self._config_path}")
except FileNotFoundError:
print(f"Error: File not found: {self._config_path}")
except yaml.YAMLError as e:
print(f"Error parsing YAML file: {e}")
def _read_name_id_pairs_by_name_pattern(self, name_pattern):
if (self._configs is None) or (self.TABLE_NAME not in self._configs):
return []
return [
{"name": item[self.SF_NAME_KEY], "id": int(item[self.DEVICE_ID_KEY])}
for item in self._configs[self.TABLE_NAME]
if re.match(name_pattern, item[self.SF_NAME_KEY])
]
@property
def name_id_pairs(self):
return self._pairs
class CommandParser:
COUNT_MIN = 1
COUNT_MAX = 65535
INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$'
#INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
def __init__(self):
self.__start_up()
def __start_up(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose")
parser.add_argument('-i','--interval', type = int, default = 30,
help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.')
parser.add_argument('-c','--count', type = int, default = 1,
help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].')
parser.add_argument('--periodical', action='store_true', default = False,
help='Enable TSG diagnose periodical, exit when recv a signal.')
parser.add_argument('--service_function_names_regexp', type = str, default = '.+',
help = "Regexp of the service function names to run. Example: .+")
parser.add_argument('--service_function_config_path', type = str, default = '/opt/dign_client/share/service_function_maps.yaml',
help = "Specifies the service function config file, default /opt/dign_client/share/service_function_maps.yaml")
parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf',
help='Specifies the config file, default /opt/dign_client/etc/client.conf')
parser.add_argument('--case_names_regexp', type = str, default = '.+',
help='Regexp of the case names to run. Example: .+')
parser.add_argument('--enable_delay', action='store_true', default = False,
help='Enable a random delay between 1 and 30 seconds before running a case.')
parser.add_argument('--delay_left_edge', type = int, default = 1,
help='The seconds of the delay left edge. Default: 1.')
parser.add_argument('--delay_right_edge', type = int, default = 30,
help='The seconds of the delay right edge. Default: 30.')
# parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running')
args = parser.parse_args()
if not self.__is_legal_args(args):
parser.print_help()
sys.exit(1)
self.__interval_s = args.interval
self.__count = args.count
self.__loop = args.periodical
self.__config_path = args.config_path
self.__case_names_regexp = args.case_names_regexp
self.__service_function_names_regexp = args.service_function_names_regexp
self.__service_function_config_path = args.service_function_config_path
self._enable_delay = args.enable_delay
self._delay_left_edge = args.delay_left_edge
self._delay_right_edge = args.delay_right_edge
def __is_legal_args(self, args) -> bool:
if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX:
print("Error: the number of TSG diagnoses must in [1,65535]")
return False
if args.delay_left_edge > args.delay_right_edge:
print("Error: The delay in seconds on the left edge is greater than on the right edge.")
return True
@property
def interval_s(self):
return self.__interval_s
@property
def count(self):
return self.__count
@property
def loop(self):
return self.__loop
@property
def config_path(self):
return self.__config_path
@property
def case_names_regexp(self):
return r"{}".format(self.__case_names_regexp)
@property
def service_function_names_regexp(self):
return r"{}".format(self.__service_function_names_regexp)
@property
def service_function_config_path(self):
return self.__service_function_config_path
@property
def delay_seconds(self):
if self._enable_delay:
return random.randint(self._delay_left_edge, self._delay_right_edge)
else:
return None
class ServerAddressBuilder:
IPv4_4TH_OCTET_LEFT_EDGE = 101
IPv4_1_TO_3TH_OCTET = "192.0.2"
def __init__(self, service_function_index: int):
self._service_function_index = service_function_index
self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index
def read_resolves_by_url(self, url):
parsed_url = urlparse(url)
port = parsed_url.port
if not port:
if parsed_url.scheme == 'https':
port = 443
elif parsed_url.scheme == 'http':
port = 80
return [f"{parsed_url.hostname}:{port}:{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}"]
# @property
# def resolves(self) -> list:
# return [f"{domain}:{port}:192.0.2.{self._ipv4_4th_octet}" for domain, port in self.DOMAIN_TO_PORT_LIST]
@property
def nameservers(self) -> list:
return [f'{self.IPv4_1_TO_3TH_OCTET}.{self._ipv4_4th_octet}']
class TcpPacketsCapture:
DSCP_KEY = ["DSCP"]
def __init__(self, server_ip, server_port):
self._server_ip = server_ip
self._server_port = server_port
self._quadruple_to_dscp_table = {}
self._build_filter()
self._iface_names = get_if_list()
# def _read_server_ip_and_port(self):
def _build_filter(self):
self._filter = f"tcp and src host {self._server_ip} and src port {self._server_port}"
def start(self):
time.sleep(0.5)
self._sniff_thread = AsyncSniffer(iface=self._iface_names, prn=self._packet_callback, filter=self._filter)
self._sniff_thread.start()
def _packet_callback(self, packet):
if IP in packet and TCP in packet:
src_ip = packet[IP].src
if src_ip == self._server_ip:
dscp_value = self._read_dscp_value_from_packet(packet)
quadruple = self._build_quadruple(packet, True)
self._update_quadruple_to_dscp_table(quadruple, dscp_value)
def _read_dscp_value_from_packet(self, packet):
ip_header = packet[IP]
return (ip_header.tos & 0xfc) >> 2
def _build_quadruple(self, packet, is_s2c):
src_ip = packet[IP].src
dst_ip = packet[IP].dst
src_port = packet[TCP].sport
dst_port = packet[TCP].dport
if is_s2c:
return f"{dst_ip}:{dst_port},{src_ip}:{src_port}"
else:
return f"{src_ip}:{src_port},{dst_ip}:{dst_port}"
def _update_quadruple_to_dscp_table(self, quadruple, dscp_value):
self._quadruple_to_dscp_table[quadruple] = dscp_value
def stop(self):
time.sleep(0.5)
if self._sniff_thread is not None and self._sniff_thread.running:
self._sniff_thread.stop()
self._sniff_thread.join()
def read_dscp_value_by_quadruple(self, quadruple):
if quadruple in self._quadruple_to_dscp_table:
return self._quadruple_to_dscp_table[quadruple]
else:
return None
@staticmethod
def read_server_ip_and_port_from_resolve(resolves):
resolve = resolves[0]
resolve_split = resolve.split(":")
return resolve_split[2], resolve_split[1]
class TcpPacketsCaptureAssertion:
@staticmethod
def is_dscp_equal(actual_dscp, expected_dscp):
if actual_dscp is None:
return False, f"Error: Not read DSCP value."
if actual_dscp == expected_dscp:
return True, None
else:
return False, f"Error: Failed to verify DSCP value. Actual DSCP: {actual_dscp}, expected DSCP: {expected_dscp}."
class URLTransferBuilder:
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed):
self._url = url
self._request_resolve = request_resolve
self._conn_timeout = conn_timeout
self._max_recv_speed = max_recv_speed
self._conn = None
self._response_code = None
self._response_buffer = BytesIO()
self._error_info = None
self._size_download = None
self._local_ip = None
self._local_port = None
self._remote_ip = None
self._remote_port = None
self._total_time_s = None
self._speed_download = None
def _setup_connection(self):
self._response_buffer = BytesIO()
self._conn = pycurl.Curl()
self._conn.setopt(pycurl.WRITEFUNCTION, self._response_buffer.write)
self._conn.setopt(pycurl.RESOLVE, self._request_resolve)
self._conn.setopt(pycurl.URL, self._url)
self._conn.setopt(pycurl.TIMEOUT, self._conn_timeout)
if self._max_recv_speed is not None:
self._conn.setopt(pycurl.MAX_RECV_SPEED_LARGE, self._max_recv_speed)
def _perform_connection(self):
self._conn.perform()
self._response_code = self._conn.getinfo(pycurl.RESPONSE_CODE)
self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD)
self._local_ip = self._conn.getinfo(pycurl.LOCAL_IP)
self._local_port = self._conn.getinfo(pycurl.LOCAL_PORT)
self._remote_ip = self._conn.getinfo(pycurl.PRIMARY_IP)
self._remote_port = self._conn.getinfo(pycurl.PRIMARY_PORT)
self._total_time_s = self._conn.getinfo(pycurl.TOTAL_TIME)
self._speed_download = self._conn.getinfo(pycurl.SPEED_DOWNLOAD)
def _close_connection(self):
self._conn.close()
def connect(self):
try:
self._setup_connection()
self._perform_connection()
except pycurl.error as error_info:
self._error_info = error_info
finally:
self._close_connection()
@property
def response_code(self):
return self._response_code
@property
def response_body(self):
return self._response_buffer.getvalue()
@property
def error_info(self):
return self._error_info
@property
def size_download(self):
return self._size_download
@property
def speed_download(self):
return self._speed_download
@property
def quadruple(self):
return f"{self._local_ip}:{self._local_port},{self._remote_ip}:{self._remote_port}"
@property
def total_time_s(self):
if self._close_connection is None:
return None
return self._total_time_s
class HttpURLTransferBuilder(URLTransferBuilder):
def _perform_connection(self):
super()._perform_connection()
class HttpsURLTransferBuilder(URLTransferBuilder):
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed: int):
super().__init__(url, request_resolve, conn_timeout, max_recv_speed)
self._certs_info = None
def _setup_connection(self):
super()._setup_connection()
self._conn.setopt(self._conn.OPT_CERTINFO, 1)
self._conn.setopt(self._conn.SSL_VERIFYPEER, False)
def _perform_connection(self):
super()._perform_connection()
self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO)
@property
def cert_issuer(self) -> str:
return self._get_cert_issuer(self._certs_info)
def _get_cert_issuer(self, certs) -> str:
if certs is None:
return ""
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info[1]
return issuer
class DNSQueryBuilder:
def __init__(self, domain: str, namesevers: list, conn_timeout: int):
self._domain = domain
self._nameservers = namesevers
self._conn_timeout = conn_timeout
self._dns_resolver = None
self._dns_answer = None
self._error_info = None
def _setup(self):
self._dns_resolver = dns.resolver.Resolver()
self._dns_resolver.nameservers = self._nameservers
self._dns_resolver.search = []
self._dns_resolver.use_search_by_default = False
self._dns_resolver.timeout = float(self._conn_timeout)
self._dns_resolver.lifetime = float(self._conn_timeout)
def _query(self, record_type):
try:
self._setup()
self._dns_answer = self._dns_resolver.query(self._domain, record_type)
except Exception as error_info:
self._error_info = error_info
@property
def error_info(self):
return self._error_info
@property
def rrset_ttl(self):
if self._dns_answer is not None:
return self._dns_answer.rrset.ttl
return None
@property
def response_answer(self):
if self._dns_answer is not None:
return self._dns_answer.response.answer
return None
@property
def rdtype_address_pairs(self):
rdtype_address_pairs = []
if self._dns_answer is not None:
for answer in self._dns_answer.response.answer:
for item in answer.items:
rdtype_address_pairs.append({"rdtype": item.rdtype, "address": item.address})
return rdtype_address_pairs
class DNSQueryTypeABuilder(DNSQueryBuilder):
def query(self):
self._query("A")
class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
def query(self):
self._query("AAAA")
class URLTransferResponseAssertion:
@staticmethod
def is_cert_issuer_matched(cert_issuer, regex):
if cert_issuer is None:
return False, f"Error: Failed to verify cert issuer. Actual cert issuer is None."
if re.search(regex, cert_issuer, 0):
return True, None
else:
return False, f"Error: Failed to verify cert issuer. Actual cert issuer: {cert_issuer}."
@staticmethod
def is_response_code_equal(actual_code, expected_code):
if actual_code == expected_code:
return True, None
else:
return False, f"Error: Failed to verfiy response code. Actual code: {actual_code}, expected code: {expected_code}."
@staticmethod
def is_response_body_matched(response_body, regex):
response_body_utf8 = response_body.decode('utf-8')
if re.search(regex, response_body_utf8, 0):
return True, None
else:
return False, f"Error: The response body fail to match regex: {regex}."
@staticmethod
def is_response_body_not_matched(response_body, regex):
response_body_utf8 = response_body.decode('utf-8')
if not re.search(regex, response_body_utf8, 0):
return True, None
else:
return False, f"Error: The response body matched the regex: {regex}."
@staticmethod
def is_response_body_md5_equal(response_body, expected_md5):
response_body_md5_value = hashlib.md5(response_body).hexdigest()
if expected_md5 == response_body_md5_value:
return True, None
else:
return False, f"Error: The response body md5 fail to match. Actual md5 value: {response_body_md5_value}."
@staticmethod
def is_download_size_equal(actual_size, expected_size):
if actual_size == expected_size:
return True, None
else:
return False, f"Error: The response body download size fail to match. Actual size: {actual_size}."
@staticmethod
def is_pycurl_error_code_equal(error_info, expected_error_code):
if error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
if error_info.args[0] == expected_error_code:
return True, None
else:
return False, f"Error: The erro code not equal to desired. Actual error info: {error_info}."
@staticmethod
def is_pycurl_error_none(error_info):
if error_info is None:
return True, None
else:
return False, f"Error: The pycurl error is not None. Actual error info: {error_info}."
@staticmethod
def is_connect_time_less_than(actual_time_s, expected_time_s):
if actual_time_s < expected_time_s:
return True, None
return False, f"Error: The actual time is less than expected. Actual time:{actual_time_s}s, Expected time: {expected_time_s}s."
class DNSResponseAssertion:
@staticmethod
def is_error_info_none(error_info):
if error_info is None:
return True, None
return False, f"Error: The error info: {error_info}."
@staticmethod
def is_error_type_equal(error_info, expected_type):
if error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
if type(error_info) == expected_type:
return True, None
else:
return False, f"Error: error type not equal to {expected_type}, error info: {error_info}."
@staticmethod
def is_ttl_equal(actual_ttl, expected_ttl):
if actual_ttl == expected_ttl:
return True, None
return False, f"Error: Actual ttl(%s) not equal to %d." %(actual_ttl, expected_ttl)
@staticmethod
def is_ttl_in_range(actual_ttl, expected_left_edge, expected_right_edge):
if actual_ttl >= expected_left_edge and actual_ttl <= expected_right_edge:
return True, None
return False, f"Error: ttl(%d) not in [%d-%d]." %(actual_ttl, expected_left_edge, expected_right_edge)
@staticmethod
def is_rdtype_address_pair_included(rdtype_address_pairs, expected_rdtype, expected_address):
#rdtype 1: ipv4, 28: ipv6.
for pair in rdtype_address_pairs:
if pair["address"] == expected_address and pair["rdtype"] == expected_rdtype:
return True, None
return False, f"Expected rdtype address pair[{expected_rdtype}, {expected_address}] not in {rdtype_address_pairs}."
class ProxyCasesRunner:
@staticmethod
def action_intercept_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_cert_error(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b')
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_1k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_4k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 4)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_16k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 16)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_64k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 64)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_256k(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 256)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_1M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_4M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024 * 4)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_16M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024 * 16)
if not status:
return False, info
return True, None
@staticmethod
def action_intercept_protocol_https_download_size_64M(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, 1024 * 1024 * 64)
if not status:
return False, info
return True, None
@staticmethod
def action_redirect_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 302)
if not status:
return False, info
return True, None
@staticmethod
def action_redirect_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 302)
if not status:
return False, info
return True, None
@staticmethod
def action_block_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not status:
return False, info
return True, None
@staticmethod
def action_block_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not status:
return False, info
return True, None
@staticmethod
def action_replace_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not status:
return False, info
return True, None
@staticmethod
def action_replace_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not status:
return False, info
return True, None
@staticmethod
def action_hijack_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not status:
return False, info
return True, None
@staticmethod
def action_hijack_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not status:
return False, info
return True, None
@staticmethod
def action_insert_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not status:
return False, info
return True, None
@staticmethod
def action_insert_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not status:
return False, info
return True, None
@staticmethod
def action_deny_protocol_http_filter_host(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host')
if not status:
return False, info
return True, None
@staticmethod
def action_deny_protocol_http_filter_url(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url')
if not status:
return False, info
return True, None
class ShapingCaseRunner:
@staticmethod
def no_rate_limit_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_connect_time_less_than(conn.total_time_s, 5)
if not status:
return False, info
return True, f"The connect total time in seconds is: {conn.total_time_s}."
@staticmethod
def no_rate_limit_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_connect_time_less_than(conn.total_time_s, 5)
if not status:
return False, info
return True, f"The connect total time in seconds is: {conn.total_time_s}."
@staticmethod
def rate_limit_0bps_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
@staticmethod
def rate_limit_0bps_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
@staticmethod
def rate_limit_1000gbps_protocol_http(url, resolves, conn_timeout, max_recv_speed):
server_ip, server_port = TcpPacketsCapture.read_server_ip_and_port_from_resolve(resolves)
capture = TcpPacketsCapture(server_ip, server_port)
capture.start()
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
capture.stop()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8)
if not status:
return False, info
return True, None
@staticmethod
def rate_limit_1000gbps_protocol_https(url, resolves, conn_timeout, max_recv_speed):
server_ip, server_port = TcpPacketsCapture.read_server_ip_and_port_from_resolve(resolves)
capture = TcpPacketsCapture(server_ip, server_port)
capture.start()
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
capture.stop()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8)
if not status:
return False, info
return True, None
class FirewallCasesRunner:
@staticmethod
def action_bypass_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
return True, None
@staticmethod
def action_allow_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_drop_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_reset_protocol_http_filter_host(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_reset_protocol_http_filter_url(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_block_protocol_http(url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 403)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r"dign-testing-deny-block")
if not status:
return False, info
return True, None
@staticmethod
def action_allow_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_drop_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_reset_protocol_https(url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 35)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_drop_protocol_dns(domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout)
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_a(domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333)
if not status:
return False, info
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_aaaa(domain, nameservers, conn_timeout):
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333)
if not status:
return False, info
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not status:
return False, info
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not status:
return False, info
return True, None
@staticmethod
def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(domain, nameservers, conn_timeout):
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not status:
return False, info
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not status:
return False, info
return True, None
class ResultExportBuilder:
COLUMN_0 = "Test cases"
COLUMN_1 = "Result"
COLUMN_2 = "Description"
RESULT_OK = "ok"
RESULT_FAIL = "FAIL"
DEFAULT_INFO = "-"
def __init__(self):
self._exporter = None
self._create_exporter()
def _create_exporter(self):
self._exporter = PrettyTable()
self._exporter.vrules = NONE
self._exporter.field_names = [self.COLUMN_0, self.COLUMN_1, self.COLUMN_2]
self._exporter.align[self.COLUMN_0] = "l"
self._exporter.align[self.COLUMN_1] = "l"
self._exporter.align[self.COLUMN_2] = "l"
def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True):
status, description = self._convert_case_result(case_result)
if enable_ouptut_by_case:
self._output_by_case_name(case_name, status)
#if not case_result[0]:
self._add_exporter_row(case_name, status, description)
def _convert_case_result(self, case_result):
status = self.RESULT_OK
if not case_result[0]:
status = self.RESULT_FAIL
description = self.DEFAULT_INFO
if case_result[1] is not None:
description = case_result[1]
return status, description
def _output_by_case_name(self, case_name, case_status):
print(f"{case_name} ... {case_status}")
def _add_exporter_row(self, column0: str, column1: str, column2: str):
self._exporter.add_row([column0, column1, column2])
def export(self):
if len(self._exporter._rows) > 0:
print("\nSummary:")
print(self._exporter)
class DiagnoseCasesRunner:
def __init__(self, cmd_parser: CommandParser):
self._loop = cmd_parser.loop
self._count = cmd_parser.count
self._interval_s = cmd_parser.interval_s
self._case_names_regexp = cmd_parser.case_names_regexp
self._config_path = cmd_parser.config_path
self._service_function_names_regexp = cmd_parser.service_function_names_regexp
self._service_function_config_path = cmd_parser.service_function_config_path
self._random_delay_seconds = cmd_parser.delay_seconds
self._config_loader = ConfigLoader(self._config_path)
self._cases_info = [
{
"name": "Firewall_Allow_HTTP",
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_allow_protocol_http,
"request_content": "http://http.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Firewall_Allow_HTTPS",
"protocol_type": "https",
"test_function": FirewallCasesRunner.action_allow_protocol_https,
"request_content": "https://sha512.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyReset_HTTP",
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http,
"request_content": "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyReset_HTTPS",
"protocol_type": "https",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_https,
"request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyReset_FilterHost_HTTP",
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http_filter_host,
"request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyReset_FilterURL_HTTP",
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_reset_protocol_http_filter_url,
"request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyBlock_HTTP",
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_block_protocol_http,
"request_content": "http://http-login.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyDrop_HTTP",
"protocol_type": "http",
"test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_http,
"request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 4,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyDrop_HTTPS",
"protocol_type": "https",
"test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_https,
"request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 4,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyDrop_DNS",
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_drop_protocol_dns,
"request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website",
'conn_timeout': 3,
'max_recv_speed': 6553600
},
{
"name": "Firewall_DenyRedirect_A_DNS",
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_a,
"request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyRedirect_AAAA_DNS",
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_aaaa,
"request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyRedirect_ARangeTTL_DNS",
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl,
"request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
},
{
"name": "Firewall_DenyRedirect_AAAARangeTTL_DNS",
"protocol_type": "dns",
"test_function": FirewallCasesRunner.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl,
"request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website",
"conn_timeout": 3,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https,
"request_content": "https://sha256.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_CertExpired",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error,
"request_content": "https://expired.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_CertSelfSigned",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error,
"request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_CertUntrustedRoot",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_cert_error,
"request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_1k",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_1k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_4k",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_4k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_16k",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_16k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_64k",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_64k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_256k",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_256k,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_1M",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_1M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_4M",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_4M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_16M",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_16M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M",
"conn_timeout": 4,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Intercept_HTTPS_Response_64M",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_intercept_protocol_https_download_size_64M,
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Redirect_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_redirect_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Redirect_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_redirect_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Replace_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_replace_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Replace_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_replace_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Deny_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_block_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Deny_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_block_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Deny_FilterHost_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_deny_protocol_http_filter_host,
"request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Deny_FilterURL_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_deny_protocol_http_filter_url,
"request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Hijack_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_hijack_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Hijack_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_hijack_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Insert_HTTP",
"protocol_type": "http",
"test_function": ProxyCasesRunner.action_insert_protocol_http,
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Proxy_Manipulation_Insert_HTTPS",
"protocol_type": "https",
"test_function": ProxyCasesRunner.action_insert_protocol_https,
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html",
"conn_timeout": 1,
"max_recv_speed": 6553600
},
{
"name": "Shaping_NoRateLimit_HTTP",
"protocol_type": "http",
"test_function": ShapingCaseRunner.no_rate_limit_protocol_http,
"request_content": "http://testing-shaping-no-rate-limit.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": None
},
{
"name": "Shaping_NoRateLimit_HTTPS",
"protocol_type": "https",
"test_function": ShapingCaseRunner.no_rate_limit_protocol_https,
"request_content": "https://testing-shaping-no-rate-limit.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": None
},
{
"name": "Shaping_RateLimit0bps_HTTP",
"protocol_type": "http",
"test_function": ShapingCaseRunner.rate_limit_0bps_protocol_http,
"request_content": "http://testing-shaping-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": 6553600
},
{
"name": "Shaping_RateLimit0bps_HTTPS",
"protocol_type": "https",
"test_function": ShapingCaseRunner.rate_limit_0bps_protocol_https,
"request_content": "https://testing-shaping-rate-limit-0bps.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": 6553600
},
{
"name": "Shaping_RateLimit1000gbps_HTTP",
"protocol_type": "http",
"test_function": ShapingCaseRunner.rate_limit_1000gbps_protocol_http,
"request_content": "http://testing-shaping-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": 6553600
},
{
"name": "Shaping_RateLimit1000gbps_HTTPS",
"protocol_type": "https",
"test_function": ShapingCaseRunner.rate_limit_1000gbps_protocol_https,
"request_content": "https://testing-shaping-rate-limit-1000gbps.badssl.selftest.gdnt-cloud.website/resources/64M",
"conn_timeout": 12,
"max_recv_speed": 6553600
}
]
def run_cases(self):
if self._random_delay_seconds is not None:
time.sleep(self._random_delay_seconds)
run_count = 1
while True:
print("\nRUN %d" % run_count)
self.__run_cases_by_service_function_ids()
if (self._loop is not True) and self._count >= run_count:
break
time.sleep(self._interval_s)
run_count = run_count + 1
def __run_cases_by_service_function_ids(self):
sf_loader = ServiceFunctionConfigLoader(self._service_function_config_path, self._service_function_names_regexp)
for sf_pair in sf_loader.name_id_pairs:
start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(format(("Service function name: " + str(sf_pair["name"]) + ",Test start time: " + start_timestamp),'#^100s'))
self._run_cases_in_one_service_function_id(sf_pair["id"])
end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(format(("Service function name: " + str(sf_pair["name"]) + ",Test end time: " + end_timestamp),'=^100s'))
def _run_cases_in_one_service_function_id(self, service_function_id):
resolve_builder = ServerAddressBuilder(service_function_id)
exporter = ResultExportBuilder()
for case_info in self._cases_info:
if re.fullmatch(self._case_names_regexp, case_info["name"]):
self._run_one_case(case_info, resolve_builder, exporter)
exporter.export()
def _run_one_case(self, case_info, resolve_builder, exporter):
conn_timeout = self._get_conn_timeout_from_configs(case_info)
max_recv_speed = self._get_max_recv_speed_from_configs(case_info)
test_func = case_info.get("test_function")
if test_func:
if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https":
resolve = resolve_builder.read_resolves_by_url(case_info["request_content"])
ret = test_func(case_info["request_content"], resolve, conn_timeout, max_recv_speed)
if case_info["protocol_type"] == "dns":
ret = test_func(case_info["request_content"], resolve_builder.nameservers, conn_timeout)
exporter.append_case_result(case_info["name"], ret)
def _get_conn_timeout_from_configs(self, case_info):
conn_timeout = self._config_loader.get_value_by_section_and_key(case_info["name"], "conn_timeout")
if conn_timeout is not None:
return conn_timeout
else:
return case_info["conn_timeout"]
def _get_max_recv_speed_from_configs(self, case_info):
max_recv_speed = self._config_loader.get_value_by_section_and_key(case_info["name"], "max_recv_speed")
if max_recv_speed is not None:
return max_recv_speed
else:
return case_info["max_recv_speed"]
if __name__ == '__main__':
cmd_parser = CommandParser()
dign = DiagnoseCasesRunner(cmd_parser)
dign.run_cases()