feature:TSG-22325:Add Proxy Intercept chello fragment cases.
This commit is contained in:
@@ -284,11 +284,12 @@ class TcpPacketsCaptureAssertion:
|
||||
return False, f"Error: Failed to verify DSCP value. Actual DSCP: {actual_dscp}, expected DSCP: {expected_dscp}."
|
||||
|
||||
class URLTransferBuilder:
|
||||
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed):
|
||||
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed, tcp_mss=None):
|
||||
self._url = url
|
||||
self._request_resolve = request_resolve
|
||||
self._conn_timeout = conn_timeout
|
||||
self._max_recv_speed = max_recv_speed
|
||||
self._tcp_mss = tcp_mss
|
||||
self._conn = None
|
||||
self._response_code = None
|
||||
self._response_buffer = BytesIO()
|
||||
@@ -301,6 +302,11 @@ class URLTransferBuilder:
|
||||
self._total_time_s = None
|
||||
self._speed_download = None
|
||||
|
||||
def opensocket_callback(self, purpose, address):
|
||||
new_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
new_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_MAXSEG, self._tcp_mss)
|
||||
return new_socket
|
||||
|
||||
def _setup_connection(self):
|
||||
self._response_buffer = BytesIO()
|
||||
self._conn = pycurl.Curl()
|
||||
@@ -310,6 +316,8 @@ class URLTransferBuilder:
|
||||
self._conn.setopt(pycurl.TIMEOUT, self._conn_timeout)
|
||||
if self._max_recv_speed is not None:
|
||||
self._conn.setopt(pycurl.MAX_RECV_SPEED_LARGE, self._max_recv_speed)
|
||||
if self._tcp_mss is not None:
|
||||
self._conn.setopt(pycurl.OPENSOCKETFUNCTION, self.opensocket_callback)
|
||||
|
||||
def _perform_connection(self):
|
||||
self._conn.perform()
|
||||
@@ -369,8 +377,8 @@ class HttpURLTransferBuilder(URLTransferBuilder):
|
||||
super()._perform_connection()
|
||||
|
||||
class HttpsURLTransferBuilder(URLTransferBuilder):
|
||||
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed: int):
|
||||
super().__init__(url, request_resolve, conn_timeout, max_recv_speed)
|
||||
def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed: int, tcp_mss=None):
|
||||
super().__init__(url, request_resolve, conn_timeout, max_recv_speed, tcp_mss)
|
||||
self._certs_info = None
|
||||
|
||||
def _setup_connection(self):
|
||||
@@ -572,6 +580,45 @@ class ProxyCasesRunner:
|
||||
return False, info
|
||||
return True, None
|
||||
|
||||
@staticmethod
|
||||
def action_intercept_protocol_https_chello_fragment_mss_150(url, resolves, conn_timeout, max_recv_speed):
|
||||
tcp_mss = 150
|
||||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed, tcp_mss)
|
||||
conn.connect()
|
||||
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
|
||||
if not status:
|
||||
return False, info
|
||||
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||||
if not status:
|
||||
return False, info
|
||||
return True, None
|
||||
|
||||
@staticmethod
|
||||
def action_intercept_protocol_https_chello_fragment_mss_200(url, resolves, conn_timeout, max_recv_speed):
|
||||
tcp_mss = 200
|
||||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed, tcp_mss)
|
||||
conn.connect()
|
||||
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
|
||||
if not status:
|
||||
return False, info
|
||||
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||||
if not status:
|
||||
return False, info
|
||||
return True, None
|
||||
|
||||
@staticmethod
|
||||
def action_intercept_protocol_https_chello_fragment_mss_300(url, resolves, conn_timeout, max_recv_speed):
|
||||
tcp_mss = 300
|
||||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed, tcp_mss)
|
||||
conn.connect()
|
||||
status, info = URLTransferResponseAssertion.is_pycurl_error_none(conn.error_info)
|
||||
if not status:
|
||||
return False, info
|
||||
status, info = URLTransferResponseAssertion.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b')
|
||||
if not status:
|
||||
return False, info
|
||||
return True, None
|
||||
|
||||
@staticmethod
|
||||
def action_intercept_protocol_https_cert_error(url, resolves, conn_timeout, max_recv_speed):
|
||||
conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed)
|
||||
@@ -1383,6 +1430,30 @@ class DiagnoseCasesRunner:
|
||||
"conn_timeout": 1,
|
||||
"max_recv_speed": 6553600
|
||||
},
|
||||
{
|
||||
"name": "Proxy_Intercept_HTTPS_ChelloFragment_MSS_150",
|
||||
"protocol_type": "https",
|
||||
"test_function": ProxyCasesRunner.action_intercept_protocol_https_chello_fragment_mss_150,
|
||||
"request_content": "https://sha256.badssl.selftest.gdnt-cloud.website",
|
||||
"conn_timeout": 1,
|
||||
"max_recv_speed": 6553600
|
||||
},
|
||||
{
|
||||
"name": "Proxy_Intercept_HTTPS_ChelloFragment_MSS_200",
|
||||
"protocol_type": "https",
|
||||
"test_function": ProxyCasesRunner.action_intercept_protocol_https_chello_fragment_mss_200,
|
||||
"request_content": "https://sha256.badssl.selftest.gdnt-cloud.website",
|
||||
"conn_timeout": 1,
|
||||
"max_recv_speed": 6553600
|
||||
},
|
||||
{
|
||||
"name": "Proxy_Intercept_HTTPS_ChelloFragment_MSS_300",
|
||||
"protocol_type": "https",
|
||||
"test_function": ProxyCasesRunner.action_intercept_protocol_https_chello_fragment_mss_300,
|
||||
"request_content": "https://sha256.badssl.selftest.gdnt-cloud.website",
|
||||
"conn_timeout": 1,
|
||||
"max_recv_speed": 6553600
|
||||
},
|
||||
{
|
||||
"name": "Proxy_Intercept_HTTPS_CertExpired",
|
||||
"protocol_type": "https",
|
||||
|
||||
Reference in New Issue
Block a user