refactor: Change assert class function to staticmethod.

This commit is contained in:
fumingwei
2024-06-04 17:19:01 +08:00
parent e95831b44b
commit d3950a86ae

View File

@@ -262,15 +262,16 @@ class TcpPacketsCapture:
else:
return None
class TcpPacketsCaptureAnalyzer:
def is_dscp_equal(self, present_dscp, desired_dscp):
if present_dscp is None:
class TcpPacketsCaptureAssertion:
@staticmethod
def is_dscp_equal(actual_dscp, expected_dscp):
if actual_dscp is None:
return False, f"Error: Not read DSCP value."
if present_dscp == desired_dscp:
if actual_dscp == expected_dscp:
return True, None
else:
return False, f"Error: Failed to verify DSCP value. Present DSCP: {present_dscp}, desired DSCP: {desired_dscp}."
return False, f"Error: Failed to verify DSCP value. Actual DSCP: {actual_dscp}, expected DSCP: {expected_dscp}."
class URLTransferBuilder:
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed: int):
@@ -426,8 +427,9 @@ class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
self._query("AAAA")
class URLTransferResponseAnalyzer:
def is_cert_issuer_matched(self, cert_issuer, regex):
class URLTransferResponseAssertion:
@staticmethod
def is_cert_issuer_matched(cert_issuer, regex):
if cert_issuer is None:
return False, f"Error: Failed to verify cert issuer. Actual cert issuer is None."
if re.search(regex, cert_issuer, 0):
@@ -435,41 +437,46 @@ class URLTransferResponseAnalyzer:
else:
return False, f"Error: Failed to verify cert issuer. Actual cert issuer: {cert_issuer}."
def is_response_code_equal(self, actual_code, expected_code):
@staticmethod
def is_response_code_equal(actual_code, expected_code):
if actual_code == expected_code:
return True, None
else:
return False, f"Error: Failed to verfiy response code. Actual code: {actual_code}, expected code: {expected_code}."
def is_response_body_matched(self, response_body, regex):
@staticmethod
def is_response_body_matched(response_body, regex):
response_body_utf8 = response_body.decode('utf-8')
if re.search(regex, response_body_utf8, 0):
return True, None
else:
return False, f"Error: The response body fail to match regex: {regex}."
def is_response_body_not_matched(self, response_body, regex):
@staticmethod
def is_response_body_not_matched(response_body, regex):
response_body_utf8 = response_body.decode('utf-8')
if not re.search(regex, response_body_utf8, 0):
return True, None
else:
return False, f"Error: The response body matched the regex: {regex}."
def is_response_body_md5_equal(self, response_body, expected_md5):
@staticmethod
def is_response_body_md5_equal(response_body, expected_md5):
response_body_md5_value = hashlib.md5(response_body).hexdigest()
if expected_md5 == response_body_md5_value:
return True, None
else:
return False, f"Error: The response body md5 fail to match. Actual md5 value: {response_body_md5_value}."
def is_download_size_equal(self, actual_size, expected_size):
@staticmethod
def is_download_size_equal(actual_size, expected_size):
if actual_size == expected_size:
return True, None
else:
return False, f"Error: The response body download size fail to match. Actual size: {actual_size}."
def is_pycurl_error_code_equal(self, error_info, expected_error_code):
@staticmethod
def is_pycurl_error_code_equal(error_info, expected_error_code):
if error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
if error_info.args[0] == expected_error_code:
@@ -477,19 +484,22 @@ class URLTransferResponseAnalyzer:
else:
return False, f"Error: The erro code not equal to desired. Actual error info: {error_info}."
def is_pycurl_error_none(self, error_info):
@staticmethod
def is_pycurl_error_none(error_info):
if error_info is None:
return True, None
else:
return False, f"Error: The pycurl error is not None. Actual error info: {error_info}."
class DNSResponseAnalyzer:
def is_error_info_none(self, error_info):
class DNSResponseAssertion:
@staticmethod
def is_error_info_none(error_info):
if error_info is None:
return True, None
return False, f"Error: The error info: {error_info}."
def is_error_type_equal(self, error_info, expected_type):
@staticmethod
def is_error_type_equal(error_info, expected_type):
if error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
@@ -498,17 +508,20 @@ class DNSResponseAnalyzer:
else:
return False, f"Error: error type not equal to {expected_type}, error info: {error_info}."
def is_ttl_equal(self, actual_ttl, expected_ttl):
@staticmethod
def is_ttl_equal(actual_ttl, expected_ttl):
if actual_ttl == expected_ttl:
return True, None
return False, f"Error: Actual ttl(%s) not equal to %d." %(actual_ttl, expected_ttl)
def is_ttl_in_range(self, actual_ttl, expected_left_edge, expected_right_edge):
@staticmethod
def is_ttl_in_range(actual_ttl, expected_left_edge, expected_right_edge):
if actual_ttl >= expected_left_edge and actual_ttl <= expected_right_edge:
return True, None
return False, f"Error: ttl(%d) not in [%d-%d]." %(actual_ttl, expected_left_edge, expected_right_edge)
def is_rdtype_address_pair_included(self, rdtype_address_pairs, expected_rdtype, expected_address):
@staticmethod
def is_rdtype_address_pair_included(rdtype_address_pairs, expected_rdtype, expected_address):
#rdtype 1: ipv4, 28: ipv6.
for pair in rdtype_address_pairs:
if pair["address"] == expected_address and pair["rdtype"] == expected_rdtype:
@@ -516,29 +529,26 @@ class DNSResponseAnalyzer:
return False, f"Expected rdtype address pair[{expected_rdtype}, {expected_address}] not in {rdtype_address_pairs}."
class ProxyCasesRunner:
def __init__(self) -> None:
self._analyzer = URLTransferResponseAnalyzer()
def action_intercept_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
return True, None
def action_intercept_protocol_https_cert_error(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b')
if not status:
return False, info
return True, None
def action_intercept_protocol_https_download_size_1k(self, url, resolves, conn_timeout, max_recv_speed):
@@ -571,218 +581,211 @@ class ProxyCasesRunner:
def _action_intercept_protocol_ssl_by_download_size(self, url, resolves, conn_timeout, max_recv_speed, download_size):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_download_size_equal = self._analyzer.is_download_size_equal(conn.size_download, download_size)
if not is_download_size_equal[0]:
return False, is_download_size_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_download_size_equal(conn.size_download, download_size)
if not status:
return False, info
return True, None
def action_redirect_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
desired_cert_issuer_pattern = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b'
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302)
if not is_code_equal[0]:
return False, is_code_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 302)
if not status:
return False, info
return True, None
def action_redirect_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302)
if not is_code_equal[0]:
return False, is_code_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 302)
if not status:
return False, info
return True, None
def action_block_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not is_body_matched[0]:
return False, is_body_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not status:
return False, info
return True, None
def action_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not is_body_matched[0]:
return False, is_body_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B')
if not status:
return False, info
return True, None
def action_replace_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not is_body_matched[0]:
return False, is_body_matched[1]
is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not is_body_matched[0]:
return False, is_body_not_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not status:
return False, info
return True, None
def action_replace_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not is_body_matched[0]:
return False, is_body_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E')
if not status:
return False, info
is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not is_body_matched[0]:
return False, is_body_not_matched[1]
status, info = URLTransferResponseAssertion.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared')
if not status:
return False, info
return True, None
def action_hijack_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not is_code_equal[0]:
return False, is_code_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not status:
return False, info
return True, None
def action_hijack_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not is_code_equal[0]:
return False, is_code_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d")
if not status:
return False, info
return True, None
def action_insert_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
if not status:
return False, info
is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not is_json_key_matched[0]:
return False, is_json_key_matched[1]
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not status:
return False, info
is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not is_json_val_matched[0]:
return False, is_json_val_matched[1]
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not status:
return False, info
return True, None
def action_insert_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not is_json_key_matched[0]:
return False, is_json_key_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert')
if not status:
return False, info
is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not is_json_val_matched[0]:
return False, is_json_key_matched[1]
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330')
if not status:
return False, info
return True, None
def action_deny_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host')
if not is_body_matched[0]:
return False, is_body_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host')
if not status:
return False, info
return True, None
def action_deny_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url')
if not is_body_matched[0]:
return False, is_body_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 404)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url')
if not status:
return False, info
return True, None
class ShapingCaseRunner:
def __init__(self) -> None:
self._analyzer = URLTransferResponseAnalyzer()
self._dns_analyzer = DNSResponseAnalyzer()
self._capture_analyzer = TcpPacketsCaptureAnalyzer()
def rate_limit_0bps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
def rate_limit_0bps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
def rate_limit_1000gbps_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
@@ -792,16 +795,16 @@ class ShapingCaseRunner:
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
capture.stop()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
present_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
is_dscp_equal = self._capture_analyzer.is_dscp_equal(present_dscp, 8)
if not is_dscp_equal[0]:
return False, is_dscp_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8)
if not status:
return False, info
return True, None
def rate_limit_1000gbps_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
@@ -811,19 +814,19 @@ class ShapingCaseRunner:
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
capture.stop()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
present_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
is_dscp_equal = self._capture_analyzer.is_dscp_equal(present_dscp, 8)
if not is_dscp_equal[0]:
return False, is_dscp_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
actual_dscp = capture.read_dscp_value_by_quadruple(conn.quadruple)
status, info = TcpPacketsCaptureAssertion.is_dscp_equal(actual_dscp, 8)
if not status:
return False, info
return True, None
def _read_server_ip_and_port_from_resolve(self, resolves):
@@ -832,46 +835,42 @@ class ShapingCaseRunner:
return resolve_split[2], resolve_split[1]
class FirewallCasesRunner:
def __init__(self) -> None:
self._analyzer = URLTransferResponseAnalyzer()
self._dns_analyzer = DNSResponseAnalyzer()
def action_bypass_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
return True, None
def action_allow_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
return True, None
def action_deny_subaction_drop_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
def action_deny_subaction_reset_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 56)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 56)
if not status:
return False, info
return True, None
def action_deny_subaction_reset_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed):
@@ -883,71 +882,71 @@ class FirewallCasesRunner:
def action_deny_subaction_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 403)
if not is_code_equal[0]:
return False, is_code_equal[1]
is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r"dign-testing-deny-block")
if not is_body_matched[0]:
return False, is_body_matched[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 403)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_body_matched(conn.response_body, r"dign-testing-deny-block")
if not status:
return False, info
return True, None
def action_allow_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info)
if not is_error_none[0]:
return False, is_error_none[1]
is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not is_cert_matched[0]:
return False, is_cert_matched[1]
is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200)
if not is_code_equal[0]:
return False, is_code_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b')
if not status:
return False, info
status, info = URLTransferResponseAssertion.is_response_code_equal(conn.response_code, 200)
if not status:
return False, info
return True, None
def action_deny_subaction_drop_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 28)
if not status:
return False, info
return True, None
def action_deny_subaction_reset_protocol_https(self, url, resolves, conn_timeout, max_recv_speed):
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
conn.connect()
is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 35)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
status, info = URLTransferResponseAssertion.is_pycurl_error_code_equal(conn.error_info, 35)
if not status:
return False, info
return True, None
def action_deny_subaction_drop_protocol_dns(self, domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_type_equal = self._dns_analyzer.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout)
if not is_error_type_equal[0]:
return False, is_error_type_equal[1]
status, info = DNSResponseAssertion.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout)
if not status:
return False, info
return True, None
def action_deny_subaction_redirect_protocol_dns_type_a(self, domain, nameservers, conn_timeout):
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333)
if not status:
return False, info
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not status:
return False, info
return True, None
@@ -955,17 +954,17 @@ class FirewallCasesRunner:
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
status, info = DNSResponseAssertion.is_ttl_equal(request.rrset_ttl, 333)
if not status:
return False, info
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not status:
return False, info
return True, None
@@ -973,17 +972,17 @@ class FirewallCasesRunner:
request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not status:
return False, info
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not status:
return False, info
return True, None
@@ -991,17 +990,17 @@ class FirewallCasesRunner:
request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout)
request.query()
is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info)
if not is_error_info_none[0]:
return False, is_error_info_none[1]
status, info = DNSResponseAssertion.is_error_info_none(request.error_info)
if not status:
return False, info
is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
status, info = DNSResponseAssertion.is_ttl_in_range(request.rrset_ttl, 400, 500)
if not status:
return False, info
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
status, info = DNSResponseAssertion.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not status:
return False, info
return True, None