refactor: Rename assert function args.

This commit is contained in:
fumingwei
2024-06-04 15:58:44 +08:00
parent fa40b16818
commit e95831b44b

View File

@@ -408,6 +408,15 @@ class DNSQueryBuilder:
return self._dns_answer.response.answer
return None
@property
def rdtype_address_pairs(self):
rdtype_address_pairs = []
if self._dns_answer is not None:
for answer in self._dns_answer.response.answer:
for item in answer.items:
rdtype_address_pairs.append({"rdtype": item.rdtype, "address": item.address})
return rdtype_address_pairs
class DNSQueryTypeABuilder(DNSQueryBuilder):
def query(self):
self._query("A")
@@ -418,102 +427,93 @@ class DNSQueryTypeAAAABuilder(DNSQueryBuilder):
class URLTransferResponseAnalyzer:
def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern):
if present_cert_issuer is None:
return False, f"Error: Failed to verify cert issuer. Present cert issuer is None."
if re.search(desired_pattern, present_cert_issuer, 0):
def is_cert_issuer_matched(self, cert_issuer, regex):
if cert_issuer is None:
return False, f"Error: Failed to verify cert issuer. Actual cert issuer is None."
if re.search(regex, cert_issuer, 0):
return True, None
else:
return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}."
return False, f"Error: Failed to verify cert issuer. Actual cert issuer: {cert_issuer}."
def is_response_code_equal(self, present_code, desired_code):
if present_code == desired_code:
def is_response_code_equal(self, actual_code, expected_code):
if actual_code == expected_code:
return True, None
else:
return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}."
return False, f"Error: Failed to verfiy response code. Actual code: {actual_code}, expected code: {expected_code}."
def is_response_body_matched(self, present_body, desired_pattern):
present_body_utf8 = present_body.decode('utf-8')
if re.search(desired_pattern, present_body_utf8, 0):
def is_response_body_matched(self, response_body, regex):
response_body_utf8 = response_body.decode('utf-8')
if re.search(regex, response_body_utf8, 0):
return True, None
else:
return False, f"Error: The response body fail to match the desired content."
return False, f"Error: The response body fail to match regex: {regex}."
def is_response_body_not_matched(self, present_body, desired_pattern):
present_body_utf8 = present_body.decode('utf-8')
if not re.search(desired_pattern, present_body_utf8, 0):
def is_response_body_not_matched(self, response_body, regex):
response_body_utf8 = response_body.decode('utf-8')
if not re.search(regex, response_body_utf8, 0):
return True, None
else:
return False, f"Error: The response body matched the desired content: {desired_pattern}."
return False, f"Error: The response body matched the regex: {regex}."
def is_response_body_md5_equal(self, present_body, disired_md5):
present_body_md5_value = hashlib.md5(present_body).hexdigest()
if re.search(disired_md5, present_body_md5_value, 0):
def is_response_body_md5_equal(self, response_body, expected_md5):
response_body_md5_value = hashlib.md5(response_body).hexdigest()
if expected_md5 == response_body_md5_value:
return True, None
else:
return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}."
return False, f"Error: The response body md5 fail to match. Actual md5 value: {response_body_md5_value}."
def is_download_size_equal(self, present_size, disired_size):
if present_size == disired_size:
def is_download_size_equal(self, actual_size, expected_size):
if actual_size == expected_size:
return True, None
else:
return False, f"Error: The response body download size fail to match. present_size: {present_size}."
return False, f"Error: The response body download size fail to match. Actual size: {actual_size}."
def is_pycurl_error_code_equal(self, present_error_info, disired_error_code):
if present_error_info is None:
def is_pycurl_error_code_equal(self, error_info, expected_error_code):
if error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
if present_error_info.args[0] == disired_error_code:
if error_info.args[0] == expected_error_code:
return True, None
else:
return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}."
return False, f"Error: The erro code not equal to desired. Actual error info: {error_info}."
def is_pycurl_error_none(self, present_error_info):
if present_error_info is None:
def is_pycurl_error_none(self, error_info):
if error_info is None:
return True, None
else:
return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}."
return False, f"Error: The pycurl error is not None. Actual error info: {error_info}."
class DNSResponseAnalyzer:
def is_error_info_none(self, present_error_info):
if present_error_info is None:
def is_error_info_none(self, error_info):
if error_info is None:
return True, None
return False, f"Error: The error info: {present_error_info}."
return False, f"Error: The error info: {error_info}."
def is_error_type_equal(self, present_error_info, desired_type):
if present_error_info is None:
def is_error_type_equal(self, error_info, expected_type):
if error_info is None:
return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect."
if type(present_error_info) == desired_type:
if type(error_info) == expected_type:
return True, None
else:
return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}."
return False, f"Error: error type not equal to {expected_type}, error info: {error_info}."
def is_ttl_equal(self, present_ttl, desired_ttl):
if present_ttl == desired_ttl:
def is_ttl_equal(self, actual_ttl, expected_ttl):
if actual_ttl == expected_ttl:
return True, None
return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl)
return False, f"Error: Actual ttl(%s) not equal to %d." %(actual_ttl, expected_ttl)
def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge):
if present_ttl >= desired_left_edge and present_ttl <= desired_right_edge:
def is_ttl_in_range(self, actual_ttl, expected_left_edge, expected_right_edge):
if actual_ttl >= expected_left_edge and actual_ttl <= expected_right_edge:
return True, None
return False, f"Error: ttl(%d) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge)
return False, f"Error: ttl(%d) not in [%d-%d]." %(actual_ttl, expected_left_edge, expected_right_edge)
def is_address_equal(self, response_answer, desired_address, desired_address_type):
if desired_address_type == "ipv4":
rdtype = 1
if desired_address_type == "ipv6":
rdtype = 28
for i in response_answer:
for j in i.items:
if j.rdtype == rdtype: # 1: ipv4, 28: ipv6
if j.address == desired_address:
return True, None
else:
return False, f"Error: Present address error."
else:
return False, f"Error: Response rdtype error."
def is_rdtype_address_pair_included(self, rdtype_address_pairs, expected_rdtype, expected_address):
#rdtype 1: ipv4, 28: ipv6.
for pair in rdtype_address_pairs:
if pair["address"] == expected_address and pair["rdtype"] == expected_rdtype:
return True, None
return False, f"Expected rdtype address pair[{expected_rdtype}, {expected_address}] not in {rdtype_address_pairs}."
class ProxyCasesRunner:
def __init__(self) -> None:
@@ -945,9 +945,9 @@ class FirewallCasesRunner:
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4")
if not is_address_equal[0]:
return False, is_address_equal[1]
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
return True, None
@@ -963,9 +963,9 @@ class FirewallCasesRunner:
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6")
if not is_address_equal[0]:
return False, is_address_equal[1]
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
return True, None
@@ -981,9 +981,9 @@ class FirewallCasesRunner:
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4")
if not is_address_equal[0]:
return False, is_address_equal[1]
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
return True, None
@@ -999,9 +999,9 @@ class FirewallCasesRunner:
if not is_ttl_equal[0]:
return False, is_ttl_equal[1]
is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6")
if not is_address_equal[0]:
return False, is_address_equal[1]
is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001")
if not is_rdtype_address_pair_included[0]:
return False, is_rdtype_address_pair_included[1]
return True, None