From e95831b44b4b49c8e172e37efe8cdf3d87d826b2 Mon Sep 17 00:00:00 2001 From: fumingwei Date: Tue, 4 Jun 2024 15:58:44 +0800 Subject: [PATCH] refactor: Rename assert function args. --- images_build/client/dign_client/bin/client.py | 140 +++++++++--------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/images_build/client/dign_client/bin/client.py b/images_build/client/dign_client/bin/client.py index ae1d912..47ce883 100644 --- a/images_build/client/dign_client/bin/client.py +++ b/images_build/client/dign_client/bin/client.py @@ -408,6 +408,15 @@ class DNSQueryBuilder: return self._dns_answer.response.answer return None + @property + def rdtype_address_pairs(self): + rdtype_address_pairs = [] + if self._dns_answer is not None: + for answer in self._dns_answer.response.answer: + for item in answer.items: + rdtype_address_pairs.append({"rdtype": item.rdtype, "address": item.address}) + return rdtype_address_pairs + class DNSQueryTypeABuilder(DNSQueryBuilder): def query(self): self._query("A") @@ -418,102 +427,93 @@ class DNSQueryTypeAAAABuilder(DNSQueryBuilder): class URLTransferResponseAnalyzer: - def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern): - if present_cert_issuer is None: - return False, f"Error: Failed to verify cert issuer. Present cert issuer is None." - if re.search(desired_pattern, present_cert_issuer, 0): + def is_cert_issuer_matched(self, cert_issuer, regex): + if cert_issuer is None: + return False, f"Error: Failed to verify cert issuer. Actual cert issuer is None." + if re.search(regex, cert_issuer, 0): return True, None else: - return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}." + return False, f"Error: Failed to verify cert issuer. Actual cert issuer: {cert_issuer}." - def is_response_code_equal(self, present_code, desired_code): - if present_code == desired_code: + def is_response_code_equal(self, actual_code, expected_code): + if actual_code == expected_code: return True, None else: - return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}." + return False, f"Error: Failed to verfiy response code. Actual code: {actual_code}, expected code: {expected_code}." - def is_response_body_matched(self, present_body, desired_pattern): - present_body_utf8 = present_body.decode('utf-8') - if re.search(desired_pattern, present_body_utf8, 0): + def is_response_body_matched(self, response_body, regex): + response_body_utf8 = response_body.decode('utf-8') + if re.search(regex, response_body_utf8, 0): return True, None else: - return False, f"Error: The response body fail to match the desired content." + return False, f"Error: The response body fail to match regex: {regex}." - def is_response_body_not_matched(self, present_body, desired_pattern): - present_body_utf8 = present_body.decode('utf-8') - if not re.search(desired_pattern, present_body_utf8, 0): + def is_response_body_not_matched(self, response_body, regex): + response_body_utf8 = response_body.decode('utf-8') + if not re.search(regex, response_body_utf8, 0): return True, None else: - return False, f"Error: The response body matched the desired content: {desired_pattern}." + return False, f"Error: The response body matched the regex: {regex}." - def is_response_body_md5_equal(self, present_body, disired_md5): - present_body_md5_value = hashlib.md5(present_body).hexdigest() - if re.search(disired_md5, present_body_md5_value, 0): + def is_response_body_md5_equal(self, response_body, expected_md5): + response_body_md5_value = hashlib.md5(response_body).hexdigest() + if expected_md5 == response_body_md5_value: return True, None else: - return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}." + return False, f"Error: The response body md5 fail to match. Actual md5 value: {response_body_md5_value}." - def is_download_size_equal(self, present_size, disired_size): - if present_size == disired_size: + def is_download_size_equal(self, actual_size, expected_size): + if actual_size == expected_size: return True, None else: - return False, f"Error: The response body download size fail to match. present_size: {present_size}." + return False, f"Error: The response body download size fail to match. Actual size: {actual_size}." - def is_pycurl_error_code_equal(self, present_error_info, disired_error_code): - if present_error_info is None: + def is_pycurl_error_code_equal(self, error_info, expected_error_code): + if error_info is None: return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect." - if present_error_info.args[0] == disired_error_code: + if error_info.args[0] == expected_error_code: return True, None else: - return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}." + return False, f"Error: The erro code not equal to desired. Actual error info: {error_info}." - def is_pycurl_error_none(self, present_error_info): - if present_error_info is None: + def is_pycurl_error_none(self, error_info): + if error_info is None: return True, None else: - return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}." + return False, f"Error: The pycurl error is not None. Actual error info: {error_info}." class DNSResponseAnalyzer: - def is_error_info_none(self, present_error_info): - if present_error_info is None: + def is_error_info_none(self, error_info): + if error_info is None: return True, None - return False, f"Error: The error info: {present_error_info}." + return False, f"Error: The error info: {error_info}." - def is_error_type_equal(self, present_error_info, desired_type): - if present_error_info is None: + def is_error_type_equal(self, error_info, expected_type): + if error_info is None: return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect." - if type(present_error_info) == desired_type: + if type(error_info) == expected_type: return True, None else: - return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}." + return False, f"Error: error type not equal to {expected_type}, error info: {error_info}." - def is_ttl_equal(self, present_ttl, desired_ttl): - if present_ttl == desired_ttl: + def is_ttl_equal(self, actual_ttl, expected_ttl): + if actual_ttl == expected_ttl: return True, None - return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl) + return False, f"Error: Actual ttl(%s) not equal to %d." %(actual_ttl, expected_ttl) - def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge): - if present_ttl >= desired_left_edge and present_ttl <= desired_right_edge: + def is_ttl_in_range(self, actual_ttl, expected_left_edge, expected_right_edge): + if actual_ttl >= expected_left_edge and actual_ttl <= expected_right_edge: return True, None - return False, f"Error: ttl(%d) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge) + return False, f"Error: ttl(%d) not in [%d-%d]." %(actual_ttl, expected_left_edge, expected_right_edge) - def is_address_equal(self, response_answer, desired_address, desired_address_type): - if desired_address_type == "ipv4": - rdtype = 1 - if desired_address_type == "ipv6": - rdtype = 28 - - for i in response_answer: - for j in i.items: - if j.rdtype == rdtype: # 1: ipv4, 28: ipv6 - if j.address == desired_address: - return True, None - else: - return False, f"Error: Present address error." - else: - return False, f"Error: Response rdtype error." + def is_rdtype_address_pair_included(self, rdtype_address_pairs, expected_rdtype, expected_address): + #rdtype 1: ipv4, 28: ipv6. + for pair in rdtype_address_pairs: + if pair["address"] == expected_address and pair["rdtype"] == expected_rdtype: + return True, None + return False, f"Expected rdtype address pair[{expected_rdtype}, {expected_address}] not in {rdtype_address_pairs}." class ProxyCasesRunner: def __init__(self) -> None: @@ -945,9 +945,9 @@ class FirewallCasesRunner: if not is_ttl_equal[0]: return False, is_ttl_equal[1] - is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") - if not is_address_equal[0]: - return False, is_address_equal[1] + is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101") + if not is_rdtype_address_pair_included[0]: + return False, is_rdtype_address_pair_included[1] return True, None @@ -963,9 +963,9 @@ class FirewallCasesRunner: if not is_ttl_equal[0]: return False, is_ttl_equal[1] - is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") - if not is_address_equal[0]: - return False, is_address_equal[1] + is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001") + if not is_rdtype_address_pair_included[0]: + return False, is_rdtype_address_pair_included[1] return True, None @@ -981,9 +981,9 @@ class FirewallCasesRunner: if not is_ttl_equal[0]: return False, is_ttl_equal[1] - is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") - if not is_address_equal[0]: - return False, is_address_equal[1] + is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 1, "33.252.0.101") + if not is_rdtype_address_pair_included[0]: + return False, is_rdtype_address_pair_included[1] return True, None @@ -999,9 +999,9 @@ class FirewallCasesRunner: if not is_ttl_equal[0]: return False, is_ttl_equal[1] - is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") - if not is_address_equal[0]: - return False, is_address_equal[1] + is_rdtype_address_pair_included = self._dns_analyzer.is_rdtype_address_pair_included(request.rdtype_address_pairs, 28, "2001:db8::1001") + if not is_rdtype_address_pair_included[0]: + return False, is_rdtype_address_pair_included[1] return True, None