1313 lines
60 KiB
Python
1313 lines
60 KiB
Python
import sys
|
||
import unittest
|
||
import json
|
||
import pycurl
|
||
import os
|
||
import re
|
||
import time
|
||
import io
|
||
from io import BytesIO
|
||
# import getopt
|
||
# import ciunittest
|
||
import argparse
|
||
import hashlib
|
||
from configparser import ConfigParser
|
||
import random
|
||
import dns.exception
|
||
import dns.resolver
|
||
import sys
|
||
import logging
|
||
import copy
|
||
from prettytable import PrettyTable,NONE,HEADER
|
||
|
||
class ConfigLoader:
|
||
DEFAULT_CONFIGS = {
|
||
'test_firewallBypass_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyDrop_dns': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyRedirectA_dns': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyRedirectAAAA_dns': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyRedirectARangeTTL_dns': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyRedirectAAAARangeTTL_dns': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_dnsRequest_allow_rdtype_a': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_dnsRequest_allow_rdtype_aaaa': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_dnsRequest_allow_rdtype_cname': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslCerterrExpired': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslCerterrSelfsigned': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslCerterrUntrustedroot': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyRedirect_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyBlock_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyReplace_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyHijack_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyInsert_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyRedirect_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyBlock_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyReplace_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyHijack_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyInsert_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize1k': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize4k': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize16k': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize64k': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize256k': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize1M': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize4M': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize16M': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallIntercept_sslDownloadSize64M': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallAllow_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyDrop_http': {
|
||
'conn_timeout': 4,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyReset_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyBlock_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallAllow_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyDrop_ssl': {
|
||
'conn_timeout': 4,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyReset_ssl': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyResetFilterHost_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_firewallDenyResetFilterURL_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyDenyFilterHost_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'test_proxyDenyFilterURL_http': {
|
||
'conn_timeout': 1,'max_recv_speed_large': 6553600
|
||
},
|
||
'start_time_random_delay_range': {
|
||
'left_edge': 1,'right_edge': 30
|
||
}
|
||
}
|
||
|
||
def __init__(self, config_path: str):
|
||
self.__config_path = config_path
|
||
self.__configs = copy.deepcopy(self.DEFAULT_CONFIGS)
|
||
self.__load_configs()
|
||
|
||
def __load_configs(self):
|
||
config_parser = ConfigParser()
|
||
config_parser.read(self.__config_path, encoding='UTF-8')
|
||
for section in config_parser.sections():
|
||
if section in self.__configs:
|
||
self.__configs[section].update(config_parser.items(section))
|
||
else:
|
||
self.__configs[section] = dict(config_parser.items(section))
|
||
|
||
def get_value_by_section_and_key(self, section, key):
|
||
if section in self.__configs and key in self.__configs[section]:
|
||
value = self.__configs[section][key]
|
||
if isinstance(value, str) and value.isdigit():
|
||
return int(value)
|
||
else:
|
||
return value
|
||
else:
|
||
raise KeyError("%s not in configs or %s.%s not in configs." % (section, section, key))
|
||
|
||
class CommandParser:
|
||
COUNT_MIN = 1
|
||
COUNT_MAX = 65535
|
||
INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$'
|
||
#INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
|
||
|
||
def __init__(self):
|
||
self.__start_up()
|
||
|
||
def __start_up(self):
|
||
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose")
|
||
parser.add_argument('-i','--interval', type = int, default = 30,
|
||
help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.')
|
||
parser.add_argument('-c','--count', type = int, default = 1,
|
||
help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].')
|
||
parser.add_argument('-l','--loop', action='store_true', default = False,
|
||
help='Enable TSG diagnose loop, exit when recv a signal.')
|
||
parser.add_argument('--service_function_indexs', type = str, default = "0",
|
||
help = "Specifies the service function indexs. Example: 0,2,4-6,7. Range: [0,63]")
|
||
parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf',
|
||
help='Specifies the config file, default /opt/dign_client/etc/client.conf')
|
||
parser.add_argument('--case_names_regexp', type = str, default = '.+',
|
||
help='Regexp of the case names to run. Example: .+')
|
||
|
||
# parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running')
|
||
args = parser.parse_args()
|
||
|
||
if not self.__is_legal_args(args):
|
||
parser.print_help()
|
||
sys.exit(1)
|
||
|
||
self.__interval_s = args.interval
|
||
self.__count = args.count
|
||
self.__loop = args.loop
|
||
self.__service_function_indexs = args.service_function_indexs
|
||
self.__config_path = args.config_path
|
||
self.__case_names_regexp = args.case_names_regexp
|
||
|
||
def __is_legal_args(self, args) -> bool:
|
||
if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX:
|
||
print("Error: the number of TSG diagnoses must in [1,65535]")
|
||
return False
|
||
|
||
str_indexs = args.service_function_indexs.split(",")
|
||
for item in str_indexs:
|
||
if not re.match(self.INDEX_PATTERN, item):
|
||
sub_indexs = item.split("-")
|
||
if len(sub_indexs) != 2 or (not re.match(self.INDEX_PATTERN, sub_indexs[0])) or \
|
||
(not re.match(self.INDEX_PATTERN, sub_indexs[1])) or (int(sub_indexs[1]) < int(sub_indexs[0])):
|
||
print("Error: The service function indexs must in [0,63]")
|
||
return False
|
||
|
||
return True
|
||
|
||
@property
|
||
def interval_s(self):
|
||
return self.__interval_s
|
||
|
||
@property
|
||
def count(self):
|
||
return self.__count
|
||
|
||
@property
|
||
def loop(self):
|
||
return self.__loop
|
||
|
||
@property
|
||
def service_function_indexs(self):
|
||
return self.__parse_service_function_indexs_str(self.__service_function_indexs)
|
||
|
||
def __parse_service_function_indexs_str(self, indexs_str: str) -> list:
|
||
indexs = []
|
||
str_indexs = indexs_str.split(",")
|
||
for item in str_indexs:
|
||
if re.match(self.INDEX_PATTERN, item):
|
||
indexs.append(int(item))
|
||
else:
|
||
sub_str_indexs = item.split("-")
|
||
indexs.extend([i for i in range(int(sub_str_indexs[0]), int(sub_str_indexs[1]) + 1)])
|
||
indexs = list(set(indexs))
|
||
return indexs
|
||
|
||
@property
|
||
def config_path(self):
|
||
return self.__config_path
|
||
|
||
@property
|
||
def case_names_regexp(self):
|
||
return r"{}".format(self.__case_names_regexp)
|
||
|
||
class ServerAddressBuilder:
|
||
IPv4_4TH_OCTET_LEFT_EDGE = 101
|
||
DOMAIN_TO_PORT_LIST = [
|
||
('sha384.badssl.selftest.gdnt-cloud.website', 443),
|
||
('sha256.badssl.selftest.gdnt-cloud.website', 443),
|
||
('expired.badssl.selftest.gdnt-cloud.website', 443),
|
||
('self-signed.badssl.selftest.gdnt-cloud.website', 443),
|
||
('untrusted-root.badssl.selftest.gdnt-cloud.website', 443),
|
||
('web-replay.badssl.selftest.gdnt-cloud.website', 80),
|
||
('web-replay.badssl.selftest.gdnt-cloud.website', 443),
|
||
('testing-download.badssl.selftest.gdnt-cloud.website', 443),
|
||
('http.badssl.selftest.gdnt-cloud.website', 80),
|
||
('http-credit-card.badssl.selftest.gdnt-cloud.website', 80),
|
||
('http-dynamic-login.badssl.selftest.gdnt-cloud.website', 80),
|
||
('http-login.badssl.selftest.gdnt-cloud.website', 80),
|
||
('sha512.badssl.selftest.gdnt-cloud.website', 443),
|
||
('rsa2048.badssl.selftest.gdnt-cloud.website', 443),
|
||
('rsa4096.badssl.selftest.gdnt-cloud.website', 443),
|
||
('testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website', 80),
|
||
('testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website', 80),
|
||
('testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website', 80),
|
||
('testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website', 80)
|
||
]
|
||
|
||
def __init__(self, service_function_index: int):
|
||
self._service_function_index = service_function_index
|
||
self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index
|
||
|
||
@property
|
||
def resolves(self) -> list:
|
||
return [f"{domain}:{port}:192.0.2.{self._ipv4_4th_octet}" for domain, port in self.DOMAIN_TO_PORT_LIST]
|
||
|
||
@property
|
||
def nameservers(self) -> list:
|
||
return ['192.0.2.%d' % self._ipv4_4th_octet]
|
||
|
||
class URLTransferBuilder:
|
||
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int):
|
||
self._url = url
|
||
self._request_resolve = request_resolve
|
||
self._conn_timeout = conn_timeout
|
||
self._max_recv_speed_large = max_recv_speed_large
|
||
self._conn = None
|
||
self._response_code = None
|
||
self._response_buffer = BytesIO()
|
||
self._error_info = None
|
||
self._size_download = None
|
||
|
||
def _setup_connection(self):
|
||
self._response_buffer = BytesIO()
|
||
self._conn = pycurl.Curl()
|
||
self._conn.setopt(self._conn.WRITEFUNCTION, self._response_buffer.write)
|
||
self._conn.setopt(self._conn.RESOLVE, self._request_resolve)
|
||
self._conn.setopt(self._conn.URL, self._url)
|
||
self._conn.setopt(self._conn.TIMEOUT, self._conn_timeout)
|
||
self._conn.setopt(self._conn.MAX_RECV_SPEED_LARGE, self._max_recv_speed_large)
|
||
|
||
def _perform_connection(self):
|
||
self._conn.perform()
|
||
self._response_code = self._conn.getinfo(self._conn.RESPONSE_CODE)
|
||
self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD)
|
||
|
||
def _close_connection(self):
|
||
self._conn.close()
|
||
|
||
def connect(self):
|
||
try:
|
||
self._setup_connection()
|
||
self._perform_connection()
|
||
self._close_connection()
|
||
except pycurl.error as error_info:
|
||
self._error_info = error_info
|
||
|
||
@property
|
||
def response_code(self):
|
||
return self._response_code
|
||
|
||
@property
|
||
def response_body(self):
|
||
return self._response_buffer.getvalue()
|
||
|
||
@property
|
||
def error_info(self):
|
||
return self._error_info
|
||
|
||
@property
|
||
def size_download(self):
|
||
return self._size_download
|
||
|
||
class HttpURLTransferBuilder(URLTransferBuilder):
|
||
def _perform_connection(self):
|
||
super()._perform_connection()
|
||
|
||
class HttpsURLTransferBuilder(URLTransferBuilder):
|
||
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int):
|
||
super().__init__(url, request_resolve, conn_timeout, max_recv_speed_large)
|
||
self._certs_info = None
|
||
|
||
def _setup_connection(self):
|
||
super()._setup_connection()
|
||
self._conn.setopt(self._conn.OPT_CERTINFO, 1)
|
||
self._conn.setopt(self._conn.SSL_VERIFYPEER, False)
|
||
|
||
def _perform_connection(self):
|
||
super()._perform_connection()
|
||
self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO)
|
||
|
||
@property
|
||
def cert_issuer(self) -> str:
|
||
return self._get_cert_issuer(self._certs_info)
|
||
|
||
def _get_cert_issuer(self, certs) -> str:
|
||
if certs is None:
|
||
return ""
|
||
for cert_info in certs[0]:
|
||
if cert_info[0] == "Issuer":
|
||
issuer = cert_info[1]
|
||
return issuer
|
||
|
||
class DNSQueryBuilder:
|
||
def __init__(self, domain: str, namesevers: list, conn_timeout: int):
|
||
self._domain = domain
|
||
self._nameservers = namesevers
|
||
self._conn_timeout = conn_timeout
|
||
self._dns_resolver = None
|
||
self._dns_answer = None
|
||
self._error_info = None
|
||
|
||
def _setup(self):
|
||
self._dns_resolver = dns.resolver.Resolver()
|
||
self._dns_resolver.nameservers = self._nameservers
|
||
self._dns_resolver.search = []
|
||
self._dns_resolver.use_search_by_default = False
|
||
self._dns_resolver.timeout = float(self._conn_timeout)
|
||
self._dns_resolver.lifetime = float(self._conn_timeout)
|
||
|
||
def _query(self, record_type):
|
||
try:
|
||
self._setup()
|
||
self._dns_answer = self._dns_resolver.query(self._domain, record_type)
|
||
except Exception as error_info:
|
||
self._error_info = error_info
|
||
|
||
@property
|
||
def error_info(self):
|
||
return self._error_info
|
||
|
||
@property
|
||
def rrset_ttl(self):
|
||
if self._dns_answer is not None:
|
||
return self._dns_answer.rrset.ttl
|
||
return None
|
||
|
||
@property
|
||
def response_answer(self):
|
||
if self._dns_answer is not None:
|
||
return self._dns_answer.response.answer
|
||
return None
|
||
|
||
class DNSQueryTypeABuilder(DNSQueryBuilder):
|
||
def query(self):
|
||
self._query("A")
|
||
|
||
class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
|
||
def query(self):
|
||
self._query("AAAA")
|
||
|
||
|
||
class URLTransferResponseAnalyzer:
|
||
def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern):
|
||
if present_cert_issuer is None:
|
||
return False, f"Error: Failed to verify cert issuer. Present cert issuer is None."
|
||
if re.search(desired_pattern, present_cert_issuer, 0):
|
||
return True, None
|
||
else:
|
||
return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}."
|
||
|
||
def is_response_code_equal(self, present_code, desired_code):
|
||
if present_code == desired_code:
|
||
return True, None
|
||
else:
|
||
return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}."
|
||
|
||
def is_response_body_matched(self, present_body, desired_pattern):
|
||
present_body_utf8 = present_body.decode('utf-8')
|
||
if re.search(desired_pattern, present_body_utf8, 0):
|
||
return True, None
|
||
else:
|
||
return False, f"Error: The response body fail to match the desired content."
|
||
|
||
def is_response_body_not_matched(self, present_body, desired_pattern):
|
||
present_body_utf8 = present_body.decode('utf-8')
|
||
if not re.search(desired_pattern, present_body_utf8, 0):
|
||
return True, None
|
||
else:
|
||
return False, f"Error: The response body matched the desired content: {desired_pattern}."
|
||
|
||
def is_response_body_md5_equal(self, present_body, disired_md5):
|
||
present_body_md5_value = hashlib.md5(present_body).hexdigest()
|
||
if re.search(disired_md5, present_body_md5_value, 0):
|
||
return True, None
|
||
else:
|
||
return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}."
|
||
|
||
|
||
def is_download_size_equal(self, present_size, disired_size):
|
||
if present_size == disired_size:
|
||
return True, None
|
||
else:
|
||
return False, f"Error: The response body download size fail to match. present_size: {present_size}."
|
||
|
||
def is_pycurl_error_code_equal(self, present_error_info, disired_error_code):
|
||
if present_error_info is None:
|
||
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
|
||
if present_error_info.args[0] == disired_error_code:
|
||
return True, None
|
||
else:
|
||
return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}."
|
||
|
||
def is_pycurl_error_none(self, present_error_info):
|
||
if present_error_info is None:
|
||
return True, None
|
||
else:
|
||
return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}."
|
||
|
||
class DNSResponseAnalyzer:
|
||
def is_error_info_none(self, present_error_info):
|
||
if present_error_info is None:
|
||
return True, None
|
||
return False, f"Error: The error info: {present_error_info}."
|
||
|
||
def is_error_type_equal(self, present_error_info, desired_type):
|
||
if present_error_info is None:
|
||
return False, f"Error: The error info is None. Maybe the relevant actions didn’t take effect."
|
||
|
||
if type(present_error_info) == desired_type:
|
||
return True, None
|
||
else:
|
||
return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}."
|
||
|
||
def is_ttl_equal(self, present_ttl, desired_ttl):
|
||
if present_ttl == desired_ttl:
|
||
return True, None
|
||
return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl)
|
||
|
||
def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge):
|
||
if present_ttl >= desired_left_edge and present_ttl <= desired_right_edge:
|
||
return True, None
|
||
return False, f"Error: ttl(%d) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge)
|
||
|
||
def is_address_equal(self, response_answer, desired_address, desired_address_type):
|
||
if desired_address_type == "ipv4":
|
||
rdtype = 1
|
||
if desired_address_type == "ipv6":
|
||
rdtype = 28
|
||
|
||
for i in response_answer:
|
||
for j in i.items:
|
||
if j.rdtype == rdtype: # 1: ipv4, 28: ipv6
|
||
if j.address == desired_address:
|
||
return True, None
|
||
else:
|
||
return False, f"Error: Present address error."
|
||
else:
|
||
return False, f"Error: Response rdtype error."
|
||
|
||
class ProxyCasesRunner:
|
||
def __init__(self) -> None:
|
||
self._analyzer = URLTransferResponseAnalyzer()
|
||
|
||
def action_redirect_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
|
||
desired_cert_issuer_pattern = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b'
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
return True, None
|
||
|
||
def action_redirect_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
return True, None
|
||
|
||
def action_block_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_matched[1]
|
||
return True, None
|
||
|
||
def action_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_matched[1]
|
||
return True, None
|
||
|
||
def action_replace_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_matched[1]
|
||
is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_not_matched[1]
|
||
|
||
return True, None
|
||
|
||
def action_replace_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_matched[1]
|
||
|
||
is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_not_matched[1]
|
||
|
||
return True, None
|
||
|
||
def action_hijack_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
return True, None
|
||
|
||
def action_hijack_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
return True, None
|
||
|
||
def action_insert_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
|
||
is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
|
||
if not is_json_key_matched[0]:
|
||
return False, is_json_key_matched[1]
|
||
|
||
is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
|
||
if not is_json_val_matched[0]:
|
||
return False, is_json_val_matched[1]
|
||
|
||
return True, None
|
||
|
||
def action_insert_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
|
||
if not is_json_key_matched[0]:
|
||
return False, is_json_key_matched[1]
|
||
|
||
is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
|
||
if not is_json_val_matched[0]:
|
||
return False, is_json_key_matched[1]
|
||
|
||
return True, None
|
||
|
||
def action_deny_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_matched[1]
|
||
return True, None
|
||
|
||
def action_deny_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url')
|
||
if not is_body_matched[0]:
|
||
return False, is_body_matched[1]
|
||
return True, None
|
||
|
||
class FirewallCasesRunner:
|
||
def __init__(self) -> None:
|
||
self._analyzer = URLTransferResponseAnalyzer()
|
||
self._dns_analyzer = DNSResponseAnalyzer()
|
||
|
||
def action_bypass_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
return True, None
|
||
|
||
def action_intercept_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
return True, None
|
||
|
||
def action_intercept_protocol_https_cert_error(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
return True, None
|
||
|
||
def action_allow_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
return True, None
|
||
|
||
def action_deny_subaction_drop_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
|
||
if not is_error_type_equal[0]:
|
||
return False, is_error_type_equal[1]
|
||
return True, None
|
||
|
||
def action_deny_subaction_reset_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 56)
|
||
if not is_error_type_equal[0]:
|
||
return False, is_error_type_equal[1]
|
||
return True, None
|
||
|
||
def action_deny_subaction_reset_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large)
|
||
|
||
def action_deny_subaction_reset_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large)
|
||
|
||
def action_deny_subaction_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 403)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r"dign-testing-deny-block")
|
||
if not is_body_matched[0]:
|
||
return False, is_body_matched[1]
|
||
return True, None
|
||
|
||
def action_allow_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
|
||
if not is_code_equal[0]:
|
||
return False, is_code_equal[1]
|
||
return True, None
|
||
|
||
def action_deny_subaction_drop_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
|
||
if not is_error_type_equal[0]:
|
||
return False, is_error_type_equal[1]
|
||
return True, None
|
||
|
||
def action_deny_subaction_reset_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 35)
|
||
if not is_error_type_equal[0]:
|
||
return False, is_error_type_equal[1]
|
||
return True, None
|
||
|
||
|
||
def action_intercept_protocol_https_download_size_1k(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024)
|
||
|
||
def action_intercept_protocol_https_download_size_4k(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 4)
|
||
|
||
def action_intercept_protocol_https_download_size_16k(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 16)
|
||
|
||
def action_intercept_protocol_https_download_size_64k(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 64)
|
||
|
||
def action_intercept_protocol_https_download_size_256k(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 256)
|
||
|
||
def action_intercept_protocol_https_download_size_1M(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024)
|
||
|
||
def action_intercept_protocol_https_download_size_4M(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 4)
|
||
|
||
def action_intercept_protocol_https_download_size_16M(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 16)
|
||
|
||
def action_intercept_protocol_https_download_size_64M(self, url, resolves, conn_timeout, max_recv_speed_large):
|
||
return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 64)
|
||
|
||
def _action_intercept_protocol_ssl_by_download_size(self, url, resolves, conn_timeout, max_recv_speed_large, download_size):
|
||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large)
|
||
conn.connect()
|
||
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
|
||
if not is_error_none[0]:
|
||
return False, is_error_none[1]
|
||
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||
if not is_cert_matched[0]:
|
||
return False, is_cert_matched[1]
|
||
is_download_size_equal = self._analyzer.is_download_size_equal(conn.size_download, download_size)
|
||
if not is_download_size_equal[0]:
|
||
return False, is_download_size_equal[1]
|
||
return True, None
|
||
|
||
def action_deny_subaction_drop_protocol_dns(self, domain, nameservers, conn_timeout):
|
||
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
|
||
request.query()
|
||
|
||
is_error_type_equal = self._dns_analyzer.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout)
|
||
if not is_error_type_equal[0]:
|
||
return False, is_error_type_equal[1]
|
||
return True, None
|
||
|
||
def action_deny_subaction_redirect_protocol_dns_type_a(self, domain, nameservers, conn_timeout):
|
||
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
|
||
request.query()
|
||
|
||
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
|
||
if not is_error_info_none[0]:
|
||
return False, is_error_info_none[1]
|
||
|
||
is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333)
|
||
if not is_ttl_equal[0]:
|
||
return False, is_ttl_equal[1]
|
||
|
||
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4")
|
||
if not is_address_equal[0]:
|
||
return False, is_address_equal[1]
|
||
|
||
return True, None
|
||
|
||
def action_deny_subaction_redirect_protocol_dns_type_aaaa(self, domain, nameservers, conn_timeout):
|
||
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
|
||
request.query()
|
||
|
||
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
|
||
if not is_error_info_none[0]:
|
||
return False, is_error_info_none[1]
|
||
|
||
is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333)
|
||
if not is_ttl_equal[0]:
|
||
return False, is_ttl_equal[1]
|
||
|
||
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6")
|
||
if not is_address_equal[0]:
|
||
return False, is_address_equal[1]
|
||
|
||
return True, None
|
||
|
||
def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(self, domain, nameservers, conn_timeout):
|
||
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
|
||
request.query()
|
||
|
||
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
|
||
if not is_error_info_none[0]:
|
||
return False, is_error_info_none[1]
|
||
|
||
is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500)
|
||
if not is_ttl_equal[0]:
|
||
return False, is_ttl_equal[1]
|
||
|
||
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4")
|
||
if not is_address_equal[0]:
|
||
return False, is_address_equal[1]
|
||
|
||
return True, None
|
||
|
||
def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(self, domain, nameservers, conn_timeout):
|
||
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
|
||
request.query()
|
||
|
||
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
|
||
if not is_error_info_none[0]:
|
||
return False, is_error_info_none[1]
|
||
|
||
is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500)
|
||
if not is_ttl_equal[0]:
|
||
return False, is_ttl_equal[1]
|
||
|
||
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6")
|
||
if not is_address_equal[0]:
|
||
return False, is_address_equal[1]
|
||
|
||
return True, None
|
||
|
||
class ResultExportBuilder:
|
||
def __init__(self):
|
||
self._exporter = None
|
||
self._create_exporter()
|
||
|
||
def _create_exporter(self):
|
||
self._exporter = PrettyTable()
|
||
self._exporter.vrules = NONE
|
||
self._exporter.field_names = ["Case name", "Status", "Description"]
|
||
self._exporter.align["Case name"] = "l"
|
||
self._exporter.align["Status"] = "l"
|
||
self._exporter.align["Description"] = "l"
|
||
|
||
def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True):
|
||
status, description = self._convert_case_result(case_result)
|
||
if enable_ouptut_by_case:
|
||
self._output_by_case_name(case_name, status)
|
||
|
||
if not case_result[0]:
|
||
self._add_exporter_row(case_name, status, description)
|
||
|
||
def _convert_case_result(self, case_result):
|
||
status = "ok"
|
||
if not case_result[0]:
|
||
status = "FAIL"
|
||
|
||
description = "-"
|
||
if case_result[1] is not None:
|
||
description = case_result[1]
|
||
|
||
return status, description
|
||
|
||
def _output_by_case_name(self, case_name, case_status):
|
||
print(f"{case_name} ... {case_status}")
|
||
|
||
def _add_exporter_row(self, case_name: str, status: str, description: str):
|
||
self._exporter.add_row([case_name, status, description])
|
||
|
||
def export(self):
|
||
if len(self._exporter._rows) > 0:
|
||
print(self._exporter)
|
||
|
||
|
||
class DiagnoseCasesRunner:
|
||
def __init__(self, cmd_parser: CommandParser):
|
||
self._loop = cmd_parser.loop
|
||
self._count = cmd_parser.count
|
||
self._interval_s = cmd_parser.interval_s
|
||
self._service_function_indexs = cmd_parser.service_function_indexs
|
||
self._case_names_regexp = cmd_parser.case_names_regexp
|
||
self._config_path = cmd_parser.config_path
|
||
self._config_loader = ConfigLoader(self._config_path)
|
||
|
||
self._proxy_case = ProxyCasesRunner()
|
||
self._firewall_case = FirewallCasesRunner()
|
||
self._cases_info = [
|
||
{
|
||
"name": "test_firewallBypass_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_bypass_protocol_https,
|
||
"request_content": "https://sha384.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https,
|
||
"request_content": "https://sha256.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslCerterrExpired",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_cert_error,
|
||
"request_content": "https://expired.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslCerterrSelfsigned",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_cert_error,
|
||
"request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslCerterrUntrustedroot",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_cert_error,
|
||
"request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_proxyRedirect_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._proxy_case.action_redirect_protocol_https,
|
||
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js"
|
||
},
|
||
{
|
||
"name": "test_proxyBlock_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._proxy_case.action_block_protocol_https,
|
||
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js"
|
||
},
|
||
{
|
||
"name": "test_proxyReplace_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._proxy_case.action_replace_protocol_https,
|
||
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js"
|
||
},
|
||
{
|
||
"name": "test_proxyHijack_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._proxy_case.action_hijack_protocol_https,
|
||
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js"
|
||
},
|
||
{
|
||
"name": "test_proxyInsert_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._proxy_case.action_insert_protocol_https,
|
||
"request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html"
|
||
},
|
||
{
|
||
"name": "test_proxyRedirect_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._proxy_case.action_redirect_protocol_http,
|
||
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js"
|
||
},
|
||
{
|
||
"name": "test_proxyBlock_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._proxy_case.action_block_protocol_http,
|
||
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js"
|
||
},
|
||
{
|
||
"name": "test_proxyReplace_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._proxy_case.action_replace_protocol_http,
|
||
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js"
|
||
},
|
||
{
|
||
"name": "test_proxyHijack_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._proxy_case.action_hijack_protocol_http,
|
||
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js"
|
||
},
|
||
{
|
||
"name": "test_proxyInsert_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._proxy_case.action_insert_protocol_http,
|
||
"request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html"
|
||
},
|
||
{
|
||
"name": "test_firewallAllow_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._firewall_case.action_allow_protocol_http,
|
||
"request_content": "http://http.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyDrop_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._firewall_case.action_deny_subaction_drop_protocol_http,
|
||
"request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyReset_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_http,
|
||
"request_content": "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyBlock_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._firewall_case.action_deny_subaction_block_protocol_http,
|
||
"request_content": "http://http-login.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallAllow_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_allow_protocol_https,
|
||
"request_content": "https://sha512.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyDrop_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_deny_subaction_drop_protocol_https,
|
||
"request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyReset_ssl",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_https,
|
||
"request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyDrop_dns",
|
||
"protocol_type": "dns",
|
||
"test_function": self._firewall_case.action_deny_subaction_drop_protocol_dns,
|
||
"request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyRedirectA_dns",
|
||
"protocol_type": "dns",
|
||
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a,
|
||
"request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyRedirectAAAA_dns",
|
||
"protocol_type": "dns",
|
||
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa,
|
||
"request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyRedirectARangeTTL_dns",
|
||
"protocol_type": "dns",
|
||
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl,
|
||
"request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyRedirectAAAARangeTTL_dns",
|
||
"protocol_type": "dns",
|
||
"test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl,
|
||
"request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize1k",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_1k,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize4k",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_4k,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize16k",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_16k,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize64k",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_64k,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize256k",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_256k,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize1M",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_1M,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize4M",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_4M,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize16M",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_16M,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M"
|
||
},
|
||
{
|
||
"name": "test_firewallIntercept_sslDownloadSize64M",
|
||
"protocol_type": "https",
|
||
"test_function": self._firewall_case.action_intercept_protocol_https_download_size_64M,
|
||
"request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyResetFilterHost_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_host,
|
||
"request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_firewallDenyResetFilterURL_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_url,
|
||
"request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_proxyDenyFilterHost_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._proxy_case.action_deny_protocol_http_filter_host,
|
||
"request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website"
|
||
},
|
||
{
|
||
"name": "test_proxyDenyFilterURL_http",
|
||
"protocol_type": "http",
|
||
"test_function": self._proxy_case.action_deny_protocol_http_filter_url,
|
||
"request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website"
|
||
}
|
||
]
|
||
|
||
def run_cases(self):
|
||
run_count = 1
|
||
while True:
|
||
print("\nRUN %d" % run_count)
|
||
self.__run_cases_by_service_function_ids()
|
||
if (self._loop is not True) and self._count >= run_count:
|
||
break
|
||
time.sleep(self._interval_s)
|
||
run_count = run_count + 1
|
||
|
||
def __run_cases_by_service_function_ids(self):
|
||
for sf_id in self._service_function_indexs:
|
||
start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
print(format(("Service function id:" + str(sf_id) + ",Test start time: " + start_timestamp),'#^70s'))
|
||
|
||
self._run_cases_in_one_service_function_id(sf_id)
|
||
|
||
end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
print(format(("Service function id:" + str(sf_id) + ",Test end time: " + end_timestamp),'=^70s'))
|
||
|
||
def _run_cases_in_one_service_function_id(self, service_function_id):
|
||
resolve_Builder = ServerAddressBuilder(service_function_id)
|
||
exporter = ResultExportBuilder()
|
||
for case_info in self._cases_info:
|
||
if re.fullmatch(self._case_names_regexp, case_info["name"]):
|
||
self._run_one_case(case_info, resolve_Builder, exporter)
|
||
exporter.export()
|
||
|
||
def _run_one_case(self, case_info, resolve_Builder, exporter):
|
||
conn_timeout = self._get_conn_timeout_from_configs(case_info["name"])
|
||
max_recv_speed_large = self._get_max_recv_speed_large_from_configs(case_info["name"])
|
||
|
||
test_func = case_info.get("test_function")
|
||
if test_func:
|
||
if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https":
|
||
ret = test_func(case_info["request_content"], resolve_Builder.resolves, conn_timeout, max_recv_speed_large)
|
||
|
||
if case_info["protocol_type"] == "dns":
|
||
ret = test_func(case_info["request_content"], resolve_Builder.nameservers, conn_timeout)
|
||
|
||
exporter.append_case_result(case_info["name"], ret)
|
||
|
||
def _get_conn_timeout_from_configs(self, section) -> int:
|
||
return self._config_loader.get_value_by_section_and_key(section, "conn_timeout")
|
||
|
||
def _get_max_recv_speed_large_from_configs(self, section) -> int:
|
||
return self._config_loader.get_value_by_section_and_key(section, "max_recv_speed_large")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
cmd_parser = CommandParser()
|
||
dign = DiagnoseCasesRunner(cmd_parser)
|
||
dign.run_cases()
|