From b60dfca349e1e06e191fb59b6c52bb6685b5e5a0 Mon Sep 17 00:00:00 2001 From: fumingwei Date: Thu, 16 May 2024 17:21:02 +0800 Subject: [PATCH] refactor: Rewrite client.py. --- images_build/client/dign_client/bin/client.py | 2645 ++++++++--------- 1 file changed, 1273 insertions(+), 1372 deletions(-) diff --git a/images_build/client/dign_client/bin/client.py b/images_build/client/dign_client/bin/client.py index f878ccc..be24212 100644 --- a/images_build/client/dign_client/bin/client.py +++ b/images_build/client/dign_client/bin/client.py @@ -7,8 +7,8 @@ import re import time import io from io import BytesIO -import getopt -import ciunittest +# import getopt +# import ciunittest import argparse import hashlib from configparser import ConfigParser @@ -17,1395 +17,1296 @@ import dns.exception import dns.resolver import sys import logging +import copy +from prettytable import PrettyTable,NONE,HEADER +class ConfigLoader: + DEFAULT_CONFIGS = { + 'test_firewallBypass_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyDrop_dns': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyRedirectA_dns': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyRedirectAAAA_dns': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyRedirectARangeTTL_dns': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyRedirectAAAARangeTTL_dns': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_dnsRequest_allow_rdtype_a': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_dnsRequest_allow_rdtype_aaaa': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_dnsRequest_allow_rdtype_cname': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslCerterrExpired': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslCerterrSelfsigned': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslCerterrUntrustedroot': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyRedirect_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyBlock_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyReplace_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyHijack_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyInsert_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyRedirect_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyBlock_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyReplace_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyHijack_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyInsert_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize1k': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize4k': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize16k': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize64k': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize256k': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize1M': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize4M': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize16M': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallIntercept_sslDownloadSize64M': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallAllow_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyDrop_http': { + 'conn_timeout': 4,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyReset_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyBlock_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallAllow_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyDrop_ssl': { + 'conn_timeout': 4,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyReset_ssl': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyResetFilterHost_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_firewallDenyResetFilterURL_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyDenyFilterHost_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'test_proxyDenyFilterURL_http': { + 'conn_timeout': 1,'max_recv_speed_large': 6553600 + }, + 'start_time_random_delay_range': { + 'left_edge': 1,'right_edge': 30 + } + } -suite_test_config_dict = {'test_firewallBypass_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyDrop_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyRedirectA_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyRedirectAAAA_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyRedirectARangeTTL_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyRedirectAAAARangeTTL_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_dnsRequest_allow_rdtype_a': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_dnsRequest_allow_rdtype_aaaa': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_dnsRequest_allow_rdtype_cname': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslCerterrExpired': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslCerterrSelfsigned': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslCerterrUntrustedroot': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyRedirect_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyBlock_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyReplace_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyHijack_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyInsert_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyRedirect_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyBlock_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyReplace_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyHijack_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyInsert_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize1k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize4k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize16k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize64k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize256k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize1M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize4M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize16M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallIntercept_sslDownloadSize64M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallAllow_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyDrop_http': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600}, - 'test_firewallDenyReset_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyBlock_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallAllow_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyDrop_ssl': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600}, - 'test_firewallDenyReset_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyResetFilterHost_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_firewallDenyResetFilterURL_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyDenyFilterHost_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'test_proxyDenyFilterURL_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600}, - 'start_time_random_delay_range': {'enabled':1,'left_edge':1,'right_edge':30}} + def __init__(self, config_path: str): + self.__config_path = config_path + self.__configs = copy.deepcopy(self.DEFAULT_CONFIGS) + self.__load_configs() + def __load_configs(self): + config_parser = ConfigParser() + config_parser.read(self.__config_path, encoding='UTF-8') + for section in config_parser.sections(): + if section in self.__configs: + self.__configs[section].update(config_parser.items(section)) + else: + self.__configs[section] = dict(config_parser.items(section)) -ssl_bypass_info_re = "Ssl connection bypass success" -ssl_intercept_info_re = "Ssl connection intercept success" -https_exprired_info_re = "Ssl exprired cert check success" -https_self_signed_info_re = "Ssl self signed cert check success" -https_untrusted_root_info_re = "Ssl untrusted_root cert check success" - -ssl_redirect_info_re = "Ssl connection redirect success" -ssl_replace_info_re = "Ssl connection replace success" -ssl_insert_info_re = "Ssl connection insert success" -ssl_hijack_info_re = "Ssl connection hijack success" -ssl_block_info_re = "Ssl connection block success" - -http_redirect_info_re = "http connection redirect success" -http_replace_info_re = "http connection replace success" -http_insert_info_re = "http connection insert success" -http_hijack_info_re = "http connection hijack success" -http_block_info_re = "http connection block success" - -https_conn_taffic_1k_re = 'https download file 1k success' -https_conn_taffic_4k_re = 'https download file 4k success' -https_conn_taffic_16k_re = 'https download file 16k success' -https_conn_taffic_64k_re = 'https download file 64k success' -https_conn_taffic_256k_re = 'https download file 256k success' -https_conn_taffic_1M_re = 'https download file 1M success' -https_conn_taffic_4M_re = 'https download file 4M success' -https_conn_taffic_16M_re = 'https download file 16M success' -https_conn_taffic_64M_re = 'https download file 64M success' - -http_firewall_allow_re = "http firewall action allow success" -http_firewall_deny_drop_re = "http firewall aciton deny subaction drop success" -http_firewall_deny_rst_re = "http firewall action deny subaction rst success" -http_firewall_deny_block_re = "http firewall aciton deny subaction block success" -ssl_firewall_allow_re = "ssl firewall action allow success" -ssl_firewall_deny_drop_re = "ssl firewall action deny subaction drop success" -ssl_firewall_deny_rst_re = "ssl firewall action deny subaction rst success" - - -URLBypass = 'https://sha384.badssl.selftest.gdnt-cloud.website' -URLIntercept = 'https://sha256.badssl.selftest.gdnt-cloud.website' -URLSslExpired = 'https://expired.badssl.selftest.gdnt-cloud.website' -URLSslSelfsigned = 'https://self-signed.badssl.selftest.gdnt-cloud.website' -URLSslSuntrustedroot = 'https://untrusted-root.badssl.selftest.gdnt-cloud.website' - -URLSslRedirect = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js' -URLSslReplace = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js' -URLSslInsert = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html' -URLSslHijack = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js' -URLSslBlock = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js' - -URLHttpRedirect = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js' -URLHttpReplace = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js' -URLHttpInsert = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html' -URLHttpHijack = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js' -URLHttpBlock = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js' - -URLConTraffic_1k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k" -URLConTraffic_4k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k" -URLConTraffic_16k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k" -URLConTraffic_64k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k" -URLConTraffic_256k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k" -URLConTraffic_1M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M" -URLConTraffic_4M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M" -URLConTraffic_16M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M" -URLConTraffic_64M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M" - -URLHttpFirewallAllow = "http://http.badssl.selftest.gdnt-cloud.website" -URLHttpFirewallDenyDrop = "http://http-credit-card.badssl.selftest.gdnt-cloud.website" -URLHttpFirewallDenyRst = "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website" -URLHttpFirewallDenyBlock = "http://http-login.badssl.selftest.gdnt-cloud.website" -URLSslFirewallAllow = "https://sha512.badssl.selftest.gdnt-cloud.website" -URLSslFirewallDenyDrop = "https://rsa2048.badssl.selftest.gdnt-cloud.website" -URLSslFirewallDenyRst = "https://rsa4096.badssl.selftest.gdnt-cloud.website" - -URLHttpFirewallDenyRstFilterHost = 'http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website' -URLHttpFirewallDenyRstFilterURL = 'http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website' -URLHttpProxyDenyFilterHost = 'http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website' -URLHttpProxyDenyFilterURL = 'http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website' - -http_firewall_deny_rst_filter_host_re = "testing firewall deny reset filter host ok" -http_firewall_deny_rst_filter_url_re = "testing firewall deny reset filter url ok" -http_proxy_deny_filter_host_re = "testing proxy deny filter host ok" -http_proxy_deny_filter_url_re = "testing proxy deny filter url ok" - -HOST_DNS_ALLOW_A = "dnstest.allow-a-ipv4.selftest.gdnt-cloud.website" -HOST_DNS_DENY_REDIRECT_A = "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website" -HOST_DNS_DENY_DORY = "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website" -HOST_DNS_DENY_REDIRECT_A_RTTL = "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website" -HOST_DNS_ALLOW_AAAA = "dnstest.allow-4a-ipv6.selftest.gdnt-cloud.website" -HOST_DNS_DENY_REDIRECT_AAAA = "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website" -HOST_DNS_DENY_REDIRECT_AAAA_RTTL = "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website" -HOST_DNS_CNAME_QUERY = "dnstest.test-cname.selftest.gdnt-cloud.website" -HOST_DNS_CNAME_ANSWER = "dnstest.testanswer-cname.selftest.gdnt-cloud.website" - -DNS_REDIRECT_IPV4_ADDR = "33.252.0.101" -DNS_REDIRECT_IPV6_ADDR = "2001:db8::1001" -DNS_ALLOW_A_ADDR = "233.252.0.1" -DNS_ALLOW_AAAA_ADDR = "2001:db8::1" - - - -DNS_SERVER_ALLOW_TTL = 60 -DNS_SERVER_REDIRECT_TTL = 333 -DNS_SERVER_REDIRECT_RANGE_LOW = 400 -DNS_SERVER_REDIRECT_RANGE_HIGH = 500 -DNS_SERVER_IP = ["192.0.2.101"] -DnsRequestFirewallDenyDrop = "Dns request timeout is deny drop sucess" -DnsARequestFireWallDenyRedirect = "Dns rdtype A request is deny reidrect sucess" -DnsAAAARequestFireWallDenyRedirect = "Dns rdtype AAAA request is deny redirect sucess" -DnsARequestFireWallDenyRedirectRangTTL = "Dns rdtype A request is deny reidrect and range ttl sucess" -DnsAAAARequestFireWallDenyRedirectRangTTL = "Dns rdtype AAAA request is deny redirect and range ttl sucess" -DnsARequestFirewallAllow = "Dns rdtype A request data is sucess" -DnsAAAARequestFirewallAllow = "Dns rdtype AAAA request data is sucess" -DnsCNAMERequestFirewallAllow = "Dns rdtype CNAME request data is sucess" - - -REQUEST_RESOLVE = ['sha384.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'sha256.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'expired.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'self-signed.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'untrusted-root.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'web-replay.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'web-replay.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'testing-download.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'http.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'http-credit-card.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'http-dynamic-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'http-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'sha512.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'rsa2048.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'rsa4096.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\ - 'testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\ - 'testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.101'] - -def set_http_request_resolve(id_service_function): - global REQUEST_RESOLVE - ip_left_edge = 100 - ip_http_server = ip_left_edge + id_service_function - REQUEST_RESOLVE = ['sha384.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'sha256.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'expired.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'self-signed.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'untrusted-root.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'web-replay.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'web-replay.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'testing-download.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'http.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'http-credit-card.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'http-dynamic-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'http-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'sha512.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'rsa2048.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'rsa4096.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\ - 'testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\ - 'testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server] - -def set_dns_server_ip(id_service_function): - global DNS_SERVER_IP - ip_left_edge = 100 - ip_dns_server = ip_left_edge + id_service_function - DNS_SERVER_IP = ['192.0.2.%d' % ip_dns_server] - -class _WritelnDecorator(object): - """Used to decorate file-like objects with a handy 'writeln' method""" - def __init__(self,stream): - self.stream = stream - - def __getattr__(self, attr): - if attr in ('stream', '__getstate__'): - raise AttributeError(attr) - return getattr(self.stream,attr) - - def writeln(self, arg=None): - if arg: - self.write(arg) - self.write('\n') # text-mode streams translate to \r\n if needed - - -class DignTextTestResult(unittest.result.TestResult): - """A test result class that can print formatted text results to a stream. - Used by TextTestRunner. - """ - separator1 = '=' * 70 - separator2 = '-' * 70 - - def __init__(self, stream, descriptions, verbosity): - super(DignTextTestResult, self).__init__(stream, descriptions, verbosity) - self.stream = stream - self.dignStream = _WritelnDecorator(sys.stderr) - self.showAll = verbosity > 1 - self.dots = verbosity == 1 - self.descriptions = descriptions - - def getDescription(self, test): - doc_first_line = test.shortDescription() - if self.descriptions and doc_first_line: - return '\n'.join((str(test), doc_first_line)) + def get_value_by_section_and_key(self, section, key): + if section in self.__configs and key in self.__configs[section]: + value = self.__configs[section][key] + if isinstance(value, str) and value.isdigit(): + return int(value) + else: + return value else: - return str(test).split(' ', 1 )[0] + raise KeyError("%s not in configs or %s.%s not in configs." % (section, section, key)) - def startTest(self, test): - super(DignTextTestResult, self).startTest(test) - if self.showAll: - self.stream.write(self.getDescription(test)) - self.dignStream.write(self.getDescription(test)) - self.stream.write(" ... ") - self.dignStream.write(" ... ") - self.stream.flush() - self.dignStream.flush() +class CommandParser: + COUNT_MIN = 1 + COUNT_MAX = 65535 + INDEX_PATTERN = r'^(6[0-3]|[0-5][0-9]|[0-9])$' + #INDEX_PATTERN = r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' - def addSuccess(self, test): - super(DignTextTestResult, self).addSuccess(test) - if self.showAll: - self.stream.writeln("ok") - self.dignStream.writeln("ok") - elif self.dots: - self.stream.write('.') - self.dignStream.write('.') - self.stream.flush() - self.dignStream.flush() - - def addError(self, test, err): - super(DignTextTestResult, self).addError(test, err) - if self.showAll: - self.stream.writeln("ERROR") - self.dignStream.writeln("ERROR") - elif self.dots: - self.stream.write('E') - self.dignStream.write('E') - self.stream.flush() - self.dignStream.flush() - - def addFailure(self, test, err): - super(DignTextTestResult, self).addFailure(test, err) - if self.showAll: - self.stream.writeln("FAIL") - self.dignStream.writeln("FAIL") - elif self.dots: - self.stream.write('F') - self.dignStream.write('F') - self.stream.flush() - self.dignStream.flush() - - def addSkip(self, test, reason): - super(DignTextTestResult, self).addSkip(test, reason) - if self.showAll: - self.stream.writeln("skipped {0!r}".format(reason)) - self.dignStream.writeln("skipped {0!r}".format(reason)) - elif self.dots: - self.stream.write("s") - self.dignStream.write("s") - self.stream.flush() - self.dignStream.flush() - - def addExpectedFailure(self, test, err): - super(DignTextTestResult, self).addExpectedFailure(test, err) - if self.showAll: - self.stream.writeln("expected failure") - self.dignStream.writeln("expected failure") - elif self.dots: - self.stream.write("x") - self.dignStream.write("x") - self.stream.flush() - self.dignStream.flush() - - def addUnexpectedSuccess(self, test): - super(DignTextTestResult, self).addUnexpectedSuccess(test) - if self.showAll: - self.stream.writeln("unexpected success") - self.dignStream.writeln("unexpected success") - elif self.dots: - self.stream.write("u") - self.dignStream.write("u") - self.stream.flush() - self.dignStream.flush() - - def printErrors(self): - if self.dots or self.showAll: - self.stream.writeln() - self.dignStream.writeln() - self.printErrorList('ERROR', self.errors) - self.printErrorList('FAIL', self.failures) - - def printErrorList(self, flavour, errors): - for test, err in errors: - self.stream.writeln(self.separator1) - self.dignStream.writeln(self.separator1) - self.stream.writeln("%s: %s" % (flavour,self.getDescription(test))) - self.dignStream.writeln("%s: %s" % (flavour,self.getDescription(test))) - self.stream.writeln(self.separator2) - self.dignStream.writeln(self.separator2) - self.stream.writeln("%s" % err) - self.dignStream.writeln("%s" % err) - -def get_logger(name,logPath,enableConsole=True): - logger = logging.getLogger(name) - fileHandler = logging.FileHandler(logPath) - fmt = '%(asctime)s, %(levelname)s, %(message)s' - formatter = logging.Formatter(fmt=fmt, datefmt='%a %b %d %H:%M:%S %Y') - fileHandler.setFormatter(formatter) - fileHandler.setLevel(logging.DEBUG) - logger.addHandler(fileHandler) - if enableConsole == True: - consoleHandler = logging.StreamHandler() - consoleHandler.setFormatter(formatter) - consoleHandler.setLevel(logging.DEBUG) - logger.addHandler(consoleHandler) - logger.setLevel(logging.DEBUG) - return logger - -class DNSCheckRequestBuild: def __init__(self): - self.dns_resolver=dns.resolver.Resolver() - self.dns_resolver.nameservers = DNS_SERVER_IP - self.dns_resolver.search = [] - self.dns_resolver.use_search_by_default = False - - def dns_action_deny_subaction_drop(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_DENY_DORY, 'A') - except dns.exception.DNSException as errorinfo: - if type(errorinfo) == dns.resolver.LifetimeTimeout: - raise Exception(DnsRequestFirewallDenyDrop) - else: - raise Exception("Error: The dns_action_deny_subaction_drop check failure, code: %s" % errorinfo) - else: - raise Exception("Error: The dns_action_deny_subaction_drop test deny drop failure" ) - - def dns_action_deny_subaction_redirect_a(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_A, 'A') - except dns.exception.DNSException as errorinfo: - raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure, code: %s" % errorinfo) - else: # drop-redirect and respond rdtype A ipv4 - if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL: - raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL)) - return - for i in dns_answer.response.answer: - for j in i.items: - if j.rdtype == 1: #'rdtype is A: ipv4' - if j.address == DNS_REDIRECT_IPV4_ADDR: - raise Exception(DnsARequestFireWallDenyRedirect) - else: - raise Exception("Error: The dns request rdtype A drop redirect check failure: respond value error") - else: - raise Exception("Error: The dns request rdtype A drop redirect check failure: respond rdtype error") - - def dns_action_deny_subaction_redirect_aaaa(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA, 'AAAA') - except dns.exception.DNSException as errorinfo: - raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure, code: %s" % errorinfo) - else: # drop-redirect and respond rdtype A ipv6 - if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL: - raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL)) - return - - for i in dns_answer.response.answer: - for j in i.items: - if j.rdtype == 28: #'rdtype is A: ipv6' - if j.address == DNS_REDIRECT_IPV6_ADDR: - raise Exception(DnsAAAARequestFireWallDenyRedirect) - else: - raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error") - else: - raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error") - - def dns_action_deny_subaction_redirect_a_rang_ttl(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_A_RTTL, 'A') - except dns.exception.DNSException as errorinfo: - raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure, code: %s" % errorinfo) - else: # drop-redirect and respond rdtype A ipv4 - if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH: - raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl,DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH)) - return - - for i in dns_answer.response.answer: - for j in i.items: - if j.rdtype == 1: #'rdtype is A: ipv4' - if j.address == DNS_REDIRECT_IPV4_ADDR: - raise Exception(DnsARequestFireWallDenyRedirectRangTTL) - else: - raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond value error") - else: - raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond rdtype error") - - def dns_action_deny_subaction_redirect_aaaa_rang_ttl(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA_RTTL, 'AAAA') - except dns.exception.DNSException as errorinfo: - raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa range ttl check failure, code: %s" % errorinfo) - else: # drop-redirect and respond rdtype A ipv6 - if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH: - raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH)) - return - - for i in dns_answer.response.answer: - for j in i.items: - if j.rdtype == 28: #'rdtype is A: ipv6' - if j.address == DNS_REDIRECT_IPV6_ADDR: - raise Exception(DnsAAAARequestFireWallDenyRedirectRangTTL) - else: - raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error") - else: - raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error") - - - - def dns_action_allow_rdtype_a(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_ALLOW_A, 'A') - except dns.exception.DNSException as errorinfo: - raise Exception("Error: The dns request rdtype A allow check failure, code: %s" % errorinfo) - else: - if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL: - raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL)) - return - - for i in dns_answer.response.answer: - for j in i.items: - if j.rdtype == 1: #'rdtype is A: ipv4' - if j.address == DNS_ALLOW_A_ADDR: - raise Exception(DnsARequestFirewallAllow) - else: - raise Exception("Error: The dns request rdtype A allow check failure: respond value error") - - else: - raise Exception("Error: The dns request rdtype A allow check failure: respond rdtype error") - - def dns_action_allow_rdtype_aaaa(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_ALLOW_AAAA, 'AAAA') - except dns.exception.DNSException as errorinfo: - raise Exception("Error: The dns request rdtype AAAA allow check failure, code: %s" % errorinfo) - else: - if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL: - raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL)) - return - - for i in dns_answer.response.answer: - for j in i.items: - if j.rdtype == 28: #'rdtype is AAAA: ipv6' - if j.address == DNS_ALLOW_AAAA_ADDR: - raise Exception(DnsAAAARequestFirewallAllow) - else: - raise Exception("Error: The dns request rdtype AAAA allow check failure: respond value error") - else: - raise Exception("Error: The dns request rdtype AAAA allow check failure: response rdtype error") - - def dns_action_allow_rdtype_cname(self,test_suite_name): - self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout']) - - try: - dns_answer = self.dns_resolver.query(HOST_DNS_CNAME_QUERY, 'CNAME') - except dns.exception.DNSException as errorinfo: - raise Exception("Error: The dns request rdtype CNAME allow check failure, code: %s" % errorinfo) - else: - if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL: - raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL)) - return - - for i in dns_answer.response.answer: - for j in i.items: - if j.rdtype == 5: #'CNAME: tag(www.xxx.com)' - m=str(j) - if m == (HOST_DNS_CNAME_ANSWER + '.'): - raise Exception(DnsCNAMERequestFirewallAllow) - else: - raise Exception("Error: The dns request rdtype CNAME allow check failure: respond value error") - else: - raise Exception("Error: The dns request rdtype CNAME allow check failure: respond rdtype error") - - -class SSLCheckRequestBuild: - def __init__(self): - self.conn = pycurl.Curl() - self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) - self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE) - - def _set_conn_opt(self,test_suite_name, url): - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) - self.conn.setopt(self.conn.URL,url) - self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) - - def _get_conn_issuer(self,certs): - issuer = () - for cert_info in certs[0]: - if cert_info[0] == "Issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - return issuer - - def ssl_bypass(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLBypass) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = self._get_conn_issuer(certs) - - if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0): - raise Exception(ssl_bypass_info_re) - elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0): - raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1]) - else: - raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1]) - - def ssl_intercept(self,test_suite_name): - self._set_conn_opt(test_suite_name, URLIntercept) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = self._get_conn_issuer(certs) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1]) - else: - raise Exception(ssl_intercept_info_re) - else: - raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) - - -class SslInterceptRequestBuild: - def __init__(self): - self.conn = pycurl.Curl() - self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) - self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE) - - def _set_conn_opt(self,test_suite_name,url): - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) - self.conn.setopt(self.conn.URL, url) - self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) - - def _conn_to_perform(self, test_suite_name, sec_info_re): - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception(sec_info_re) - else: - raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1]) - else: - if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0) or \ - re.search(r'\bCN[\s\S]*=[\s\S]*badssl[\s\S]*',issuer[1],0): - raise Exception("Error: Ssl connection intercept failed, cert info: %s" % issuer[1]) - else: - raise Exception(sec_info_re) - - - def ssl_intercept_certerrExpired(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLSslExpired) - self._conn_to_perform(test_suite_name,https_exprired_info_re) - - def ssl_intercept_certerrSelf_signed(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLSslSelfsigned) - self._conn_to_perform(test_suite_name,https_self_signed_info_re) - - def ssl_intercept_certerrUntrusted_root(self,test_suite_name,): - self._set_conn_opt(test_suite_name,URLSslSuntrustedroot) - self._conn_to_perform(test_suite_name,https_untrusted_root_info_re) - - -class ProxyRequestBuild: - def __init__(self): - self.bodyBuf = BytesIO() - self.conn = pycurl.Curl() - self.conn.setopt(self.conn.ENCODING, "gzip,deflate") - self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE) - - def _cert_verify(self, certs, isSsl): - if isSsl == True: - issuer = () - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1]) - else: - return - else: - raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1]) - - - def _set_conn_opt(self,test_suite_name, url,isSsl): - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) - self.conn.setopt(self.conn.URL,url) - self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) - self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf) - if isSsl == True: - self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - - def proxy_redirect(self,test_suite_name,proxy_url,isSsl): - certs = None - - self._set_conn_opt(test_suite_name,proxy_url,isSsl) - self.conn.perform() - if isSsl == True: - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) - self.conn.close() - self._cert_verify(certs, isSsl) - if rescode == 301 or rescode == 302: - if isSsl == True: - raise Exception(ssl_redirect_info_re) - else: - raise Exception(http_redirect_info_re) - else: - if isSsl == True: - raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode) - else: - raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode) - - def proxy_replace(self,test_suite_name, proxy_url,isSsl): - certs = None - self._set_conn_opt(test_suite_name, proxy_url,isSsl) - self.conn.perform() - if isSsl == True: - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - body = self.bodyBuf.getvalue().decode('utf-8') - self.conn.close() - self._cert_verify(certs, isSsl) - if not re.search(r'EnglishSearchShared', body, 0) and \ - re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0): - if isSsl == True: - raise Exception(ssl_replace_info_re) - else: - raise Exception(http_replace_info_re) - else: - if isSsl == True: - raise Exception("Error:Ssl connection replace fail") - else: - raise Exception("Error:Http connection replace fail") - - def proxy_insert(self,test_suite_name,proxy_url,isSsl): - certs = None - self._set_conn_opt(test_suite_name,proxy_url,isSsl) - self.conn.perform() - body = self.bodyBuf.getvalue().decode('utf-8') - if isSsl == True: - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - self._cert_verify(certs, isSsl) - if re.search(r'httpSelfcheckInsert', body, 0) and \ - re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0): - if isSsl == True: - raise Exception(ssl_insert_info_re) - else: - raise Exception(http_insert_info_re) - else: - if isSsl == True: - raise Exception("Error:Ssl connection insert fail") - else: - raise Exception("Error:Http connection insert fail") - - def proxy_block(self,test_suite_name,proxy_url,isSsl): - certs = None - self._set_conn_opt(test_suite_name,proxy_url,isSsl) - self.conn.perform() - if isSsl == True: - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) - body = self.bodyBuf.getvalue().decode('utf-8') - self.conn.close() - self._cert_verify(certs, isSsl) - if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451): - if isSsl == True: - raise Exception(ssl_block_info_re) - else: - raise Exception(http_block_info_re) - else: - if isSsl == True: - raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode) - else: - raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode) - - def proxy_hijack(self,test_suite_name,proxy_url,isSsl): - certs = None - self._set_conn_opt(test_suite_name,proxy_url,isSsl) - self.conn.perform() - if isSsl == True: - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - self.conn.close() - self._cert_verify(certs, isSsl) - hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue()) - if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0): - if isSsl == True: - raise Exception(ssl_hijack_info_re) - else: - raise Exception(http_hijack_info_re) - else: - if isSsl == True: - raise Exception("Error:Ssl connection hijack fail") - else: - raise Exception("Error:Http connection hijack fail") - - -class SSLFileDownloadBuild: - def __init__(self): - self.conn = pycurl.Curl() - self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) - self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE) - - def _get_conninfo(self,conn): - dictconninfo = {} - dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE) - dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME) - dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME) - dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME) - dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME) - dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME) - dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD) - dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD) - dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD) - dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD) - dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME) - return dictconninfo - - def _set_conn_opt(self,test_suite_name,url): - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) - self.conn.setopt(self.conn.URL,url) - self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) - - def _write_in_logfile(self, sizeStr, connInfoDict): - connInfoLogPath = "/opt/dign_client/log/download_conn_info.log" + '.' + time.strftime("%Y-%m-%d", time.localtime()) - #connInfoLogger = get_logger("download",connInfoLogPath,enableConsole=False) - connInfoStr = json.dumps(connInfoDict) - #connInfoLogger.debug("Fize Size:" + sizeStr + ";connection info:" + connInfoStr) - with open(connInfoLogPath,"a+") as f: - f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Fize Size:" + sizeStr + ";connection info:" + connInfoStr + '\n') - f.close() - - def conn_traffic(self,test_suite_name,url,conn_taffic_re,sizeStr, size): - self._set_conn_opt(test_suite_name, url) - self.conn.perform() - certs = self.conn.getinfo(self.conn.INFO_CERTINFO) - conninfo = self._get_conninfo(self.conn) - self.conn.close() - issuer = () - for cert_info in certs[0]: - if cert_info[0] == "Issuer": - issuer = cert_info - break - if len(issuer) <= 0: - raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer)) - - if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0): - if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0): - raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1]) - else: - raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1]) - - if int(conninfo["size_download"]) == size: - self._write_in_logfile(sizeStr,conninfo) - raise Exception(conn_taffic_re) - else: - raise Exception("Error: connection tarffic size error and is no equal", sizeStr) - - -class HttpFirewallActionBuild: - def __init__(self): - self.conn = pycurl.Curl() - self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) - self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE) - - def _set_conn_opt(self,test_suite_name, url): - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) - self.conn.setopt(self.conn.URL,url) - self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) - - def action_allow(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLHttpFirewallAllow) - self.conn.perform() - rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) - self.conn.close() - if rescode == 200: - raise Exception(http_firewall_allow_re) - else: - raise Exception("Error: The stream may be redirected, http code %s" % rescode) - - def action_deny_subaction_drop(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLHttpFirewallDenyDrop) - try: - self.conn.perform() - self.conn.close() - except pycurl.error as errorinfo: - errcode = errorinfo.args[0] - if(errcode == 28): - raise Exception(http_firewall_deny_drop_re) - else: - raise Exception("Error: The stream may be not dropped %s" % errorinfo) - - - def action_deny_subaction_rst(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLHttpFirewallDenyRst) - try: - self.conn.perform() - self.conn.close() - except pycurl.error as errorinfo: - errcode = errorinfo.args[0] - if(errcode == 56): - raise Exception(http_firewall_deny_rst_re) - else: - raise Exception("Error: The stream may be not rst %s" % errorinfo) - - def action_deny_subaction_block(self,test_suite_name): - bodyBuf = BytesIO() - self._set_conn_opt(test_suite_name,URLHttpFirewallDenyBlock) - self.conn.setopt(self.conn.WRITEDATA, bodyBuf) - self.conn.perform() - rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) - self.conn.close() - if rescode == 403: - if re.search(r'dign-testing-deny-block', bodyBuf.getvalue().decode('utf-8'), 0): - raise Exception(http_firewall_deny_block_re) - else: - raise Exception("Error: response content is not required, required \'dign-testing-deny-block\'") - - else: - raise Exception("Error: The stream may be not block, http code %s " % rescode) - -class SslFirewallActionBuild: - def __init__(self): - self.conn = pycurl.Curl() - self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) - self.conn.setopt(self.conn.OPT_CERTINFO, 1) - self.conn.setopt(self.conn.SSL_VERIFYPEER, False) - self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE) - - def _set_conn_opt(self,test_suite_name, url): - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) - self.conn.setopt(self.conn.URL,url) - self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) - - def action_allow(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLSslFirewallAllow) - self.conn.perform() - rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) - self.conn.close() - if rescode == 200: - raise Exception(ssl_firewall_allow_re) - else: - raise Exception("Error: The stream may be redirected, http code %s" % rescode) - - def action_deny_subaction_drop(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLSslFirewallDenyDrop) - try: - self.conn.perform() - self.conn.close() - except pycurl.error as errorinfo: - errcode = errorinfo.args[0] - if(errcode == 28): - raise Exception(ssl_firewall_deny_drop_re) - else: - raise Exception("Error: The stream may be not dropped %s" % errorinfo) - - def action_deny_subaction_rst(self,test_suite_name): - self._set_conn_opt(test_suite_name,URLSslFirewallDenyRst) - try: - self.conn.perform() - self.conn.close() - except pycurl.error as errorinfo: - errcode = errorinfo.args[0] - if(errcode == 35): - raise Exception(ssl_firewall_deny_rst_re) - else: - raise Exception("Error: The stream may be not rst %s" % errorinfo) - - -class FilterTestingBuild: - def __init__(self): - self.conn = pycurl.Curl() - self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write) - self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE) - - def _set_conn_opt(self,test_suite_name, url): - self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large'])) - self.conn.setopt(self.conn.URL,url) - self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout'])) - - def _firewall_deny_reset(self,test_suite_name, url, raise_re): - self._set_conn_opt(test_suite_name,url) - try: - self.conn.perform() - self.conn.close() - except pycurl.error as errorinfo: - errcode = errorinfo.args[0] - if(errcode == 56): - raise Exception(raise_re) - else: - raise Exception("Error: The stream may be not rst %s" % errorinfo) - - def _proxy_deny(self,test_suite_name,url,replaceStr,raise_re): - bodyBuf = BytesIO() - self._set_conn_opt(test_suite_name,url) - self.conn.setopt(self.conn.WRITEDATA, bodyBuf) - self.conn.perform() - rescode = self.conn.getinfo(self.conn.RESPONSE_CODE) - body = bodyBuf.getvalue().decode('utf-8') - self.conn.close() - if re.search(replaceStr, body, 0) and (rescode == 404 or rescode == 451): - raise Exception(raise_re) - else: - raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode) - - def firewall_http_deny_reset_filter_host(self,test_suite_name): - self._firewall_deny_reset(test_suite_name,URLHttpFirewallDenyRstFilterHost,http_firewall_deny_rst_filter_host_re) - - def firewall_http_deny_reset_filter_url(self,test_suite_name): - self._firewall_deny_reset(test_suite_name,URLHttpFirewallDenyRstFilterURL,http_firewall_deny_rst_filter_url_re) - - def proxy_http_deny_filter_host(self,test_suite_name): - self._proxy_deny(test_suite_name,URLHttpProxyDenyFilterHost, "testing-proxy-filter-host",http_proxy_deny_filter_host_re) - - def proxy_http_deny_filter_url(self,test_suite_name): - self._proxy_deny(test_suite_name,URLHttpProxyDenyFilterURL,"testing-proxy-filter-url",http_proxy_deny_filter_url_re) - -class TSGDiagnoseTest(unittest.TestCase): - - def test_firewallDenyDrop_dns(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsRequestFirewallDenyDrop): - dnsHandler.dns_action_deny_subaction_drop('test_firewallDenyDrop_dns') - - def test_firewallDenyRedirectA_dns(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirect): - dnsHandler.dns_action_deny_subaction_redirect_a('test_firewallDenyRedirectA_dns') - - def test_firewallDenyRedirectAAAA_dns(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirect): - dnsHandler.dns_action_deny_subaction_redirect_aaaa('test_firewallDenyRedirectARangeTTL_dns') - - def test_firewallDenyRedirectARangeTTL_dns(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirectRangTTL): - dnsHandler.dns_action_deny_subaction_redirect_a_rang_ttl('test_firewallDenyRedirectARangeTTL_dns') - - def test_firewallDenyRedirectAAAARangeTTL_dns(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirectRangTTL): - dnsHandler.dns_action_deny_subaction_redirect_aaaa_rang_ttl('test_firewallDenyRedirectAAAARangeTTL_dns') - - def test_dnsRequest_allow_rdtype_a(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsARequestFirewallAllow): - dnsHandler.dns_action_allow_rdtype_a('test_dnsRequest_allow_rdtype_a') - - def test_dnsRequest_allow_rdtype_aaaa(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsAAAARequestFirewallAllow): - dnsHandler.dns_action_allow_rdtype_aaaa('test_dnsRequest_allow_rdtype_aaaa') - - def test_dnsRequest_allow_rdtype_cname(self): - dnsHandler = DNSCheckRequestBuild() - with self.assertRaisesRegex(Exception, DnsCNAMERequestFirewallAllow): - dnsHandler.dns_action_allow_rdtype_cname('test_dnsRequest_allow_rdtype_cname') - - def test_firewallBypass_ssl(self): - sslHandler = SSLCheckRequestBuild() - with self.assertRaisesRegex(Exception, ssl_bypass_info_re): - sslHandler.ssl_bypass('test_firewallBypass_ssl') - - def test_firewallIntercept_ssl(self): - sslHandler = SSLCheckRequestBuild() - with self.assertRaisesRegex(Exception, ssl_intercept_info_re): - sslHandler.ssl_intercept('test_firewallIntercept_ssl') - - def test_firewallIntercept_sslCerterrExpired(self): - requestHandler = SslInterceptRequestBuild() - with self.assertRaisesRegex(Exception, https_exprired_info_re): - requestHandler.ssl_intercept_certerrExpired('test_firewallIntercept_sslCerterrExpired') - - def test_firewallIntercept_sslCerterrSelfsigned(self): - requestHandler = SslInterceptRequestBuild() - with self.assertRaisesRegex(Exception, https_self_signed_info_re): - requestHandler.ssl_intercept_certerrSelf_signed('test_firewallIntercept_sslCerterrSelfsigned') - - def test_firewallIntercept_sslCerterrUntrustedroot(self): - requestHandler = SslInterceptRequestBuild() - with self.assertRaisesRegex(Exception, https_untrusted_root_info_re): - requestHandler.ssl_intercept_certerrUntrusted_root('test_firewallIntercept_sslCerterrUntrustedroot') - - def test_proxyRedirect_ssl(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, ssl_redirect_info_re): - proxyHandler.proxy_redirect('test_proxyRedirect_ssl',URLSslRedirect,True) - - def test_proxyBlock_ssl(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, ssl_block_info_re): - proxyHandler.proxy_block('test_proxyBlock_ssl', URLSslBlock,True) - - def test_proxyReplace_ssl(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, ssl_replace_info_re): - proxyHandler.proxy_replace('test_proxyReplace_ssl',URLSslReplace, True) - - def test_proxyHijack_ssl(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, ssl_hijack_info_re): - proxyHandler.proxy_hijack('test_proxyHijack_ssl', URLSslHijack,True) - - def test_proxyInsert_ssl(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, ssl_insert_info_re): - proxyHandler.proxy_insert('test_proxyInsert_ssl',URLSslInsert,True) - - - def test_proxyRedirect_http(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, http_redirect_info_re): - proxyHandler.proxy_redirect('test_proxyRedirect_http',URLHttpRedirect, False) - - def test_proxyBlock_http(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, http_block_info_re): - proxyHandler.proxy_block('test_proxyBlock_http', URLHttpBlock,False) - - def test_proxyReplace_http(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, http_replace_info_re): - proxyHandler.proxy_replace('test_proxyReplace_http',URLHttpReplace, False) - - def test_proxyHijack_http(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, http_hijack_info_re): - proxyHandler.proxy_hijack('test_proxyHijack_http', URLHttpHijack,False) - - def test_proxyInsert_http(self): - proxyHandler = ProxyRequestBuild() - with self.assertRaisesRegex(Exception, http_insert_info_re): - proxyHandler.proxy_insert('test_proxyInsert_http',URLHttpInsert,False) - - def test_firewallIntercept_sslDownloadSize1k(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize1k', URLConTraffic_1k, https_conn_taffic_1k_re,'1k', 1024) - - def test_firewallIntercept_sslDownloadSize4k(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize4k',URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4*1024) - - def test_firewallIntercept_sslDownloadSize16k(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize16k', URLConTraffic_16k, https_conn_taffic_16k_re,'16k', 16*1024) - - def test_firewallIntercept_sslDownloadSize64k(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize64k',URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64*1024) - - def test_firewallIntercept_sslDownloadSize256k(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize256k', URLConTraffic_256k,https_conn_taffic_256k_re,'256k', 256*1024) - - def test_firewallIntercept_sslDownloadSize1M(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize1M', URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024) - - def test_firewallIntercept_sslDownloadSize4M(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize4M', URLConTraffic_4M, https_conn_taffic_4M_re,'4M', 4*1024*1024) - - def test_firewallIntercept_sslDownloadSize16M(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize16M', URLConTraffic_16M,https_conn_taffic_16M_re,'16M',16*1024*1024) - - def test_firewallIntercept_sslDownloadSize64M(self): - requestHandler = SSLFileDownloadBuild() - with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re): - requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize64M',URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024) - - def test_firewallAllow_http(self): - requestHandler = HttpFirewallActionBuild() - with self.assertRaisesRegex(Exception, http_firewall_allow_re): - requestHandler.action_allow('test_firewallAllow_http') - - def test_firewallDenyDrop_http(self): - requestHandler = HttpFirewallActionBuild() - with self.assertRaisesRegex(Exception, http_firewall_deny_drop_re): - requestHandler.action_deny_subaction_drop('test_firewallDenyDrop_http') - - def test_firewallDenyReset_http(self): - requestHandler = HttpFirewallActionBuild() - with self.assertRaisesRegex(Exception, http_firewall_deny_rst_re): - requestHandler.action_deny_subaction_rst('test_firewallDenyReset_http') - - def test_firewallDenyBlock_http(self): - requestHandler = HttpFirewallActionBuild() - with self.assertRaisesRegex(Exception, http_firewall_deny_block_re): - requestHandler.action_deny_subaction_block('test_firewallDenyBlock_http') - - def test_firewallAllow_ssl(self): - requestHandler = SslFirewallActionBuild() - with self.assertRaisesRegex(Exception, ssl_firewall_allow_re): - requestHandler.action_allow('test_firewallAllow_ssl') - - def test_firewallDenyDrop_ssl(self): - requestHandler = SslFirewallActionBuild() - with self.assertRaisesRegex(Exception, ssl_firewall_deny_drop_re): - requestHandler.action_deny_subaction_drop('test_firewallDenyDrop_ssl') - - def test_firewallDenyReset_ssl(self): - requestHandler = SslFirewallActionBuild() - with self.assertRaisesRegex(Exception, ssl_firewall_deny_rst_re): - requestHandler.action_deny_subaction_rst('test_firewallDenyReset_ssl') - - def test_firewallDenyResetFilterHost_http(self): - requestHandler = FilterTestingBuild() - with self.assertRaisesRegex(Exception, http_firewall_deny_rst_filter_host_re): - requestHandler.firewall_http_deny_reset_filter_host('test_firewallDenyResetFilterHost_http') - - def test_firewallDenyResetFilterURL_http(self): - requestHandler = FilterTestingBuild() - with self.assertRaisesRegex(Exception, http_firewall_deny_rst_filter_url_re): - requestHandler.firewall_http_deny_reset_filter_url('test_firewallDenyResetFilterURL_http') - - def test_proxyDenyFilterHost_http(self): - requestHandler = FilterTestingBuild() - with self.assertRaisesRegex(Exception, http_proxy_deny_filter_host_re): - requestHandler.proxy_http_deny_filter_host('test_proxyDenyFilterHost_http') - - def test_proxyDenyFilterURL_http(self): - requestHandler = FilterTestingBuild() - with self.assertRaisesRegex(Exception, http_proxy_deny_filter_url_re): - requestHandler.proxy_http_deny_filter_url('test_proxyDenyFilterURL_http') - -class TsgDiagnose: - def __init__(self): - self.interval = 1 - self.loop = False - self.count = 1 - self.config = None - self.client = None - self.config_dict = {} - self.dign_duration = 0 - self.max_service_function_index = 1 - self.list_service_function_index = [] - self.compatible_mode = False - - def _get_dign_option(self): - parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help") - parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.') - parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535') - parser.add_argument('-p','--configpath', type = str, default = '/opt/dign_client/etc/client.conf',help='Specifies the config file, default /opt/dign_client/etc/client.conf') - parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal') - parser.add_argument('-m','--max_service_function_index', type = int, default = 1, help='Specifies the max index of service_function,range:1-256') - parser.add_argument('-d','--list_service_function_index', nargs='+', type = int, help='Specifies the list of service function index') - parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running') + self.__start_up() + + def __start_up(self): + parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose") + parser.add_argument('-i','--interval', type = int, default = 30, + help='The time interval in seconds between consecutive TSG diagnose operations. Default: 30.') + parser.add_argument('-c','--count', type = int, default = 1, + help='Specifies the number of TSG diagnoses. Default: 1. Range: [1,65535].') + parser.add_argument('-l','--loop', action='store_true', default = False, + help='Enable TSG diagnose loop, exit when recv a signal.') + parser.add_argument('--service_function_indexs', type = str, default = "0", + help = "Specifies the service function indexs. Example: 0,2,4-6,7. Range: [0,63]") + parser.add_argument('--config_path', type = str, default = '/opt/dign_client/etc/client.conf', + help='Specifies the config file, default /opt/dign_client/etc/client.conf') + parser.add_argument('--case_names_regexp', type = str, default = '.+', + help='Regexp of the case names to run. Example: .+') + + # parser.add_argument('-o','--compatible_mode', action='store_true', default = False, help='Tsg diagnose compatible mode to running') args = parser.parse_args() - self.interval = args.interval - self.loop = args.loop - self.count = args.count - self.config = args.configpath - self.compatible_mode = args.compatible_mode - self.max_service_function_index = args.max_service_function_index - self.list_service_function_index = args.list_service_function_index - if self.count == 0: - print("Error: bad number of tsg diagnose and will exit") + + if not self.__is_legal_args(args): parser.print_help() sys.exit(1) - def _get_dign_config(self): - global suite_test_config_dict - config = ConfigParser() - config.read(self.config, encoding='UTF-8') - for section in config.sections(): - if section in suite_test_config_dict: - suite_test_config_dict[section].update(config.items(section)) + self.__interval_s = args.interval + self.__count = args.count + self.__loop = args.loop + self.__service_function_indexs = args.service_function_indexs + self.__config_path = args.config_path + self.__case_names_regexp = args.case_names_regexp + + def __is_legal_args(self, args) -> bool: + if args.count < self.COUNT_MIN or args.count > self.COUNT_MAX: + print("Error: the number of TSG diagnoses must in [1,65535]") + return False + + str_indexs = args.service_function_indexs.split(",") + for item in str_indexs: + if not re.match(self.INDEX_PATTERN, item): + sub_indexs = item.split("-") + if len(sub_indexs) != 2 or (not re.match(self.INDEX_PATTERN, sub_indexs[0])) or \ + (not re.match(self.INDEX_PATTERN, sub_indexs[1])) or (int(sub_indexs[1]) < int(sub_indexs[0])): + print("Error: The service function indexs must in [0,63]") + return False + + return True + + @property + def interval_s(self): + return self.__interval_s + + @property + def count(self): + return self.__count + + @property + def loop(self): + return self.__loop + + @property + def service_function_indexs(self): + return self.__parse_service_function_indexs_str(self.__service_function_indexs) + + def __parse_service_function_indexs_str(self, indexs_str: str) -> list: + indexs = [] + str_indexs = indexs_str.split(",") + for item in str_indexs: + if re.match(self.INDEX_PATTERN, item): + indexs.append(int(item)) else: - suite_test_config_dict[section] = dict(config.items(section)) - self.config_dict = suite_test_config_dict - for config_value in self.config_dict.values(): - if 'conn_timeout' not in config_value: - continue - if int(config_value['enabled']) != 1: - continue - self.dign_duration += int(config_value['conn_timeout']) + sub_str_indexs = item.split("-") + indexs.extend([i for i in range(int(sub_str_indexs[0]), int(sub_str_indexs[1]) + 1)]) + indexs = list(set(indexs)) + return indexs + @property + def config_path(self): + return self.__config_path - def _add_suite(self,test_suite_name): - if int(self.config_dict[test_suite_name]['enabled']) == 1: - self.suite.addTest(TSGDiagnoseTest(test_suite_name)) + @property + def case_names_regexp(self): + return r"{}".format(self.__case_names_regexp) - def _add_dign_case(self): - self.suite = unittest.TestSuite() - self.suite._cleanup = False - self._add_suite('test_firewallBypass_ssl') - self._add_suite('test_firewallIntercept_ssl') - self._add_suite('test_firewallIntercept_sslCerterrExpired') - self._add_suite('test_firewallIntercept_sslCerterrSelfsigned') - self._add_suite('test_firewallIntercept_sslCerterrUntrustedroot') - self._add_suite('test_proxyRedirect_ssl') - self._add_suite('test_proxyBlock_ssl') - self._add_suite('test_proxyReplace_ssl') - self._add_suite('test_proxyHijack_ssl') - self._add_suite('test_proxyInsert_ssl') - self._add_suite('test_proxyRedirect_http') - self._add_suite('test_proxyBlock_http') - self._add_suite('test_proxyReplace_http') - self._add_suite('test_proxyHijack_http') - self._add_suite('test_proxyInsert_http') - self._add_suite('test_firewallAllow_http') - self._add_suite('test_firewallDenyDrop_http') - self._add_suite('test_firewallDenyReset_http') - self._add_suite('test_firewallDenyBlock_http') - self._add_suite('test_firewallAllow_ssl') - self._add_suite('test_firewallDenyDrop_ssl') - self._add_suite('test_firewallDenyReset_ssl') - self._add_suite('test_firewallDenyDrop_dns') - self._add_suite('test_firewallDenyRedirectA_dns') - self._add_suite('test_firewallDenyRedirectAAAA_dns') - self._add_suite('test_firewallDenyRedirectARangeTTL_dns') - self._add_suite('test_firewallDenyRedirectAAAARangeTTL_dns') - self._add_suite('test_firewallIntercept_sslDownloadSize1k') - self._add_suite('test_firewallIntercept_sslDownloadSize4k') - self._add_suite('test_firewallIntercept_sslDownloadSize16k') - self._add_suite('test_firewallIntercept_sslDownloadSize64k') - self._add_suite('test_firewallIntercept_sslDownloadSize256k') - self._add_suite('test_firewallIntercept_sslDownloadSize1M') - self._add_suite('test_firewallIntercept_sslDownloadSize4M') - self._add_suite('test_firewallIntercept_sslDownloadSize16M') - self._add_suite('test_firewallIntercept_sslDownloadSize64M') - self._add_suite('test_firewallDenyResetFilterHost_http') - self._add_suite('test_firewallDenyResetFilterURL_http') - self._add_suite('test_proxyDenyFilterHost_http') - self._add_suite('test_proxyDenyFilterURL_http') +class ServerAddressBuilder: + IPv4_4TH_OCTET_LEFT_EDGE = 101 + DOMAIN_TO_PORT_LIST = [ + ('sha384.badssl.selftest.gdnt-cloud.website', 443), + ('sha256.badssl.selftest.gdnt-cloud.website', 443), + ('expired.badssl.selftest.gdnt-cloud.website', 443), + ('self-signed.badssl.selftest.gdnt-cloud.website', 443), + ('untrusted-root.badssl.selftest.gdnt-cloud.website', 443), + ('web-replay.badssl.selftest.gdnt-cloud.website', 80), + ('web-replay.badssl.selftest.gdnt-cloud.website', 443), + ('testing-download.badssl.selftest.gdnt-cloud.website', 443), + ('http.badssl.selftest.gdnt-cloud.website', 80), + ('http-credit-card.badssl.selftest.gdnt-cloud.website', 80), + ('http-dynamic-login.badssl.selftest.gdnt-cloud.website', 80), + ('http-login.badssl.selftest.gdnt-cloud.website', 80), + ('sha512.badssl.selftest.gdnt-cloud.website', 443), + ('rsa2048.badssl.selftest.gdnt-cloud.website', 443), + ('rsa4096.badssl.selftest.gdnt-cloud.website', 443), + ('testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website', 80), + ('testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website', 80), + ('testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website', 80), + ('testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website', 80) + ] - def _dign_running(self,id_service_function): - print(format(("Service function id:" + str(id_service_function) + ",Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^70s')) - runningLogPath = "/opt/dign_client/log/tsg-diagnose.log" + '.' + time.strftime("%Y-%m-%d", time.localtime()) - #runningLogger = get_logger("running",runningLogPath, False) - #runningLogger.debug("Diagnose Start,the It will take up to %d seconds" %(self.dign_duration)) - with open(runningLogPath,"a+") as f: - f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose Start,the It will take up to " + str(self.dign_duration) + " senconds") - f.write('\n') - f.close() - result_stream = io.StringIO() - runner = unittest.TextTestRunner(stream=result_stream,verbosity=2,resultclass=DignTextTestResult) - runner.run(self.suite) - with open(runningLogPath,"a+") as f: - f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose end, Testing results:" + "\n" + result_stream.getvalue()) - f.close() - #runningLogger.debug("Diagnose end, Testing results:" + "\n" + result_stream.getvalue()) - print(format(("Service function id:" + str(id_service_function) + ",Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^70s')) + def __init__(self, service_function_index: int): + self._service_function_index = service_function_index + self._ipv4_4th_octet = self.IPv4_4TH_OCTET_LEFT_EDGE + self._service_function_index - def _dign_service_function_running(self): - if self.compatible_mode == True: - set_http_request_resolve(30) - set_dns_server_ip(35) - self._dign_running(1) - else: - if self.list_service_function_index == None: - for id_service_function in range(1,self.max_service_function_index + 1): - set_http_request_resolve(id_service_function) - set_dns_server_ip(id_service_function) - #print(REQUEST_RESOLVE) - self._dign_running(id_service_function) - else: - self.list_service_function_index.sort() - for id_service_function in self.list_service_function_index: - set_http_request_resolve(id_service_function) - set_dns_server_ip(id_service_function) - #print(REQUEST_RESOLVE) - self._dign_running(id_service_function) + @property + def resolves(self) -> list: + return [f"{domain}:{port}:192.0.2.{self._ipv4_4th_octet}" for domain, port in self.DOMAIN_TO_PORT_LIST] - def dign_exec(self): - self._get_dign_option() - self._get_dign_config() - self._add_dign_case() - dign_counter = 0 + @property + def nameservers(self) -> list: + return ['192.0.2.%d' % self._ipv4_4th_octet] + +class URLTransferBuilder: + def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int): + self._url = url + self._request_resolve = request_resolve + self._conn_timeout = conn_timeout + self._max_recv_speed_large = max_recv_speed_large + self._conn = None + self._response_code = None + self._response_buffer = BytesIO() + self._error_info = None + self._size_download = None + + def _setup_connection(self): + self._response_buffer = BytesIO() + self._conn = pycurl.Curl() + self._conn.setopt(self._conn.WRITEFUNCTION, self._response_buffer.write) + self._conn.setopt(self._conn.RESOLVE, self._request_resolve) + self._conn.setopt(self._conn.URL, self._url) + self._conn.setopt(self._conn.TIMEOUT, self._conn_timeout) + self._conn.setopt(self._conn.MAX_RECV_SPEED_LARGE, self._max_recv_speed_large) + + def _perform_connection(self): + self._conn.perform() + self._response_code = self._conn.getinfo(self._conn.RESPONSE_CODE) + self._size_download = self._conn.getinfo(pycurl.SIZE_DOWNLOAD) + + def _close_connection(self): + self._conn.close() + + def connect(self): try: - if int(self.config_dict['start_time_random_delay_range']['enabled']) == 1: - time.sleep(random.randint( \ - int(self.config_dict['start_time_random_delay_range']['left_edge']),\ - int(self.config_dict['start_time_random_delay_range']['right_edge']))) - while True: - print("\nRUN %d" %(dign_counter + 1)) - self._dign_service_function_running() - dign_counter = dign_counter + 1 - if not self.loop: - if dign_counter >= self.count: - break - time.sleep(self.interval) - except Exception as ex: - print("Process get an exception, will exit, Exception info:", ex) - sys.exit(1) + self._setup_connection() + self._perform_connection() + self._close_connection() + except pycurl.error as error_info: + self._error_info = error_info + + @property + def response_code(self): + return self._response_code + + @property + def response_body(self): + return self._response_buffer.getvalue() + + @property + def error_info(self): + return self._error_info + + @property + def size_download(self): + return self._size_download + +class HttpURLTransferBuilder(URLTransferBuilder): + def _perform_connection(self): + super()._perform_connection() + +class HttpsURLTransferBuilder(URLTransferBuilder): + def __init__(self, url: str, request_resolve: list, conn_timeout: int, max_recv_speed_large: int): + super().__init__(url, request_resolve, conn_timeout, max_recv_speed_large) + self._certs_info = None + + def _setup_connection(self): + super()._setup_connection() + self._conn.setopt(self._conn.OPT_CERTINFO, 1) + self._conn.setopt(self._conn.SSL_VERIFYPEER, False) + + def _perform_connection(self): + super()._perform_connection() + self._certs_info = self._conn.getinfo(self._conn.INFO_CERTINFO) + + @property + def cert_issuer(self) -> str: + return self._get_cert_issuer(self._certs_info) + + def _get_cert_issuer(self, certs) -> str: + if certs is None: + return "" + for cert_info in certs[0]: + if cert_info[0] == "Issuer": + issuer = cert_info[1] + return issuer + +class DNSQueryBuilder: + def __init__(self, domain: str, namesevers: list, conn_timeout: int): + self._domain = domain + self._nameservers = namesevers + self._conn_timeout = conn_timeout + self._dns_resolver = None + self._dns_answer = None + self._error_info = None + + def _setup(self): + self._dns_resolver = dns.resolver.Resolver() + self._dns_resolver.nameservers = self._nameservers + self._dns_resolver.search = [] + self._dns_resolver.use_search_by_default = False + self._dns_resolver.timeout = float(self._conn_timeout) + self._dns_resolver.lifetime = float(self._conn_timeout) + + def _query(self, record_type): + try: + self._setup() + self._dns_answer = self._dns_resolver.query(self._domain, record_type) + except Exception as error_info: + self._error_info = error_info + + @property + def error_info(self): + return self._error_info + + @property + def rrset_ttl(self): + if self._dns_answer is not None: + return self._dns_answer.rrset.ttl + return None + + @property + def response_answer(self): + if self._dns_answer is not None: + return self._dns_answer.response.answer + return None + +class DNSQueryTypeABuilder(DNSQueryBuilder): + def query(self): + self._query("A") + +class DNSQueryTypeAAAABuilder(DNSQueryBuilder): + def query(self): + self._query("AAAA") + + +class URLTransferResponseAnalyzer: + def is_cert_issuer_matched(self, present_cert_issuer, desired_pattern): + if present_cert_issuer is None: + return False, f"Error: Failed to verify cert issuer. Present cert issuer is None." + if re.search(desired_pattern, present_cert_issuer, 0): + return True, None + else: + return False, f"Error: Failed to verify cert issuer. Present cert issuer: {present_cert_issuer}." + + def is_response_code_equal(self, present_code, desired_code): + if present_code == desired_code: + return True, None + else: + return False, f"Error: Failed to verfiy response code. Present code: {present_code}, desired code: {desired_code}." + + def is_response_body_matched(self, present_body, desired_pattern): + present_body_utf8 = present_body.decode('utf-8') + if re.search(desired_pattern, present_body_utf8, 0): + return True, None + else: + return False, f"Error: The response body fail to match the desired content." + + def is_response_body_not_matched(self, present_body, desired_pattern): + present_body_utf8 = present_body.decode('utf-8') + if not re.search(desired_pattern, present_body_utf8, 0): + return True, None + else: + return False, f"Error: The response body matched the desired content: {desired_pattern}." + + def is_response_body_md5_equal(self, present_body, disired_md5): + present_body_md5_value = hashlib.md5(present_body).hexdigest() + if re.search(disired_md5, present_body_md5_value, 0): + return True, None + else: + return False, f"Error: The response body md5 fail to match. Present md5 value: {present_body_md5_value}." + + + def is_download_size_equal(self, present_size, disired_size): + if present_size == disired_size: + return True, None + else: + return False, f"Error: The response body download size fail to match. present_size: {present_size}." + + def is_pycurl_error_code_equal(self, present_error_info, disired_error_code): + if present_error_info is None: + return False, f"Error: The error info is None. Maybe the relevant actions didn't take effect." + if present_error_info.args[0] == disired_error_code: + return True, None + else: + return False, f"Error: The erro code not equal to desired. Present error info: {present_error_info}." + + def is_pycurl_error_none(self, present_error_info): + if present_error_info is None: + return True, None + else: + return False, f"Error: The pycurl error is not None. Present error info: {present_error_info}." + +class DNSResponseAnalyzer: + def is_error_info_none(self, present_error_info): + if present_error_info is None: + return True, None + return False, f"Error: The error info: {present_error_info}." + + def is_error_type_equal(self, present_error_info, desired_type): + if present_error_info is None: + return False, f"Error: The error info is None. Maybe the relevant actions didn’t take effect." + + if type(present_error_info) == desired_type: + return True, None + else: + return False, f"Error: error type not equal to {desired_type}, error info: {present_error_info}." + + def is_ttl_equal(self, present_ttl, desired_ttl): + if present_ttl == desired_ttl: + return True, None + return False, f"Error: Present ttl(%s) not equal to %d." %(present_ttl, desired_ttl) + + def is_ttl_in_range(self, present_ttl, desired_left_edge, desired_right_edge): + if present_ttl >= desired_left_edge and present_ttl <= desired_right_edge: + return True, None + return False, f"Error: ttl(%d) not in [%d-%d]." %(present_ttl, desired_left_edge, desired_right_edge) + + def is_address_equal(self, response_answer, desired_address, desired_address_type): + if desired_address_type == "ipv4": + rdtype = 1 + if desired_address_type == "ipv6": + rdtype = 28 + + for i in response_answer: + for j in i.items: + if j.rdtype == rdtype: # 1: ipv4, 28: ipv6 + if j.address == desired_address: + return True, None + else: + return False, f"Error: Present address error." + else: + return False, f"Error: Response rdtype error." + +class ProxyCasesRunner: + def __init__(self) -> None: + self._analyzer = URLTransferResponseAnalyzer() + + def action_redirect_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + + desired_cert_issuer_pattern = r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b' + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302) + if not is_code_equal[0]: + return False, is_code_equal[1] + return True, None + + def action_redirect_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 302) + if not is_code_equal[0]: + return False, is_code_equal[1] + return True, None + + def action_block_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) + if not is_code_equal[0]: + return False, is_code_equal[1] + is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B') + if not is_body_matched[0]: + return False, is_body_matched[1] + return True, None + + def action_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) + if not is_code_equal[0]: + return False, is_code_equal[1] + is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'E33F01E50AFE043191931DD40190B09B') + if not is_body_matched[0]: + return False, is_body_matched[1] + return True, None + + def action_replace_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E') + if not is_body_matched[0]: + return False, is_body_matched[1] + is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared') + if not is_body_matched[0]: + return False, is_body_not_matched[1] + + return True, None + + def action_replace_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'03C174CD9D809789CCEC18D6F585DF3E') + if not is_body_matched[0]: + return False, is_body_matched[1] + + is_body_not_matched = self._analyzer.is_response_body_not_matched(conn.response_body, r'EnglishSearchShared') + if not is_body_matched[0]: + return False, is_body_not_matched[1] + + return True, None + + def action_hijack_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d") + if not is_code_equal[0]: + return False, is_code_equal[1] + return True, None + + def action_hijack_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_code_equal = self._analyzer.is_response_body_md5_equal(conn.response_body, "4bf06db1a228c5c8d978ebf9e1169d0d") + if not is_code_equal[0]: + return False, is_code_equal[1] + return True, None + + def action_insert_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + + is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert') + if not is_json_key_matched[0]: + return False, is_json_key_matched[1] + + is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330') + if not is_json_val_matched[0]: + return False, is_json_val_matched[1] + + return True, None + + def action_insert_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_json_key_matched = self._analyzer.is_response_body_matched(conn.response_body, r'httpSelfcheckInsert') + if not is_json_key_matched[0]: + return False, is_json_key_matched[1] + + is_json_val_matched = self._analyzer.is_response_body_matched(conn.response_body, r'5BE3754D1EA8D51E8D993060FA225330') + if not is_json_val_matched[0]: + return False, is_json_key_matched[1] + + return True, None + + def action_deny_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) + if not is_code_equal[0]: + return False, is_code_equal[1] + is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-host') + if not is_body_matched[0]: + return False, is_body_matched[1] + return True, None + + def action_deny_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 404) + if not is_code_equal[0]: + return False, is_code_equal[1] + is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r'testing-proxy-filter-url') + if not is_body_matched[0]: + return False, is_body_matched[1] + return True, None + +class FirewallCasesRunner: + def __init__(self) -> None: + self._analyzer = URLTransferResponseAnalyzer() + self._dns_analyzer = DNSResponseAnalyzer() + + def action_bypass_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + return True, None + + def action_intercept_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + return True, None + + def action_intercept_protocol_https_cert_error(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*TSG CA Untrusted\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + return True, None + + def action_allow_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) + if not is_code_equal[0]: + return False, is_code_equal[1] + return True, None + + def action_deny_subaction_drop_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) + if not is_error_type_equal[0]: + return False, is_error_type_equal[1] + return True, None + + def action_deny_subaction_reset_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 56) + if not is_error_type_equal[0]: + return False, is_error_type_equal[1] + return True, None + + def action_deny_subaction_reset_protocol_http_filter_host(self, url, resolves, conn_timeout, max_recv_speed_large): + return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large) + + def action_deny_subaction_reset_protocol_http_filter_url(self, url, resolves, conn_timeout, max_recv_speed_large): + return self.action_deny_subaction_reset_protocol_http(url, resolves, conn_timeout, max_recv_speed_large) + + def action_deny_subaction_block_protocol_http(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 403) + if not is_code_equal[0]: + return False, is_code_equal[1] + is_body_matched = self._analyzer.is_response_body_matched(conn.response_body, r"dign-testing-deny-block") + if not is_body_matched[0]: + return False, is_body_matched[1] + return True, None + + def action_allow_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*BadSSL\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + is_code_equal = self._analyzer.is_response_code_equal(conn.response_code, 200) + if not is_code_equal[0]: + return False, is_code_equal[1] + return True, None + + def action_deny_subaction_drop_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 28) + if not is_error_type_equal[0]: + return False, is_error_type_equal[1] + return True, None + + def action_deny_subaction_reset_protocol_https(self, url, resolves, conn_timeout, max_recv_speed_large): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_type_equal = self._analyzer.is_pycurl_error_code_equal(conn.error_info, 35) + if not is_error_type_equal[0]: + return False, is_error_type_equal[1] + return True, None + + + def action_intercept_protocol_https_download_size_1k(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024) + + def action_intercept_protocol_https_download_size_4k(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 4) + + def action_intercept_protocol_https_download_size_16k(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 16) + + def action_intercept_protocol_https_download_size_64k(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 64) + + def action_intercept_protocol_https_download_size_256k(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 256) + + def action_intercept_protocol_https_download_size_1M(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024) + + def action_intercept_protocol_https_download_size_4M(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 4) + + def action_intercept_protocol_https_download_size_16M(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 16) + + def action_intercept_protocol_https_download_size_64M(self, url, resolves, conn_timeout, max_recv_speed_large): + return self._action_intercept_protocol_ssl_by_download_size(url, resolves, conn_timeout, max_recv_speed_large, 1024 * 1024 * 64) + + def _action_intercept_protocol_ssl_by_download_size(self, url, resolves, conn_timeout, max_recv_speed_large, download_size): + conn = HttpsURLTransferBuilder(url, resolves, conn_timeout, max_recv_speed_large) + conn.connect() + is_error_none = self._analyzer.is_pycurl_error_none(conn.error_info) + if not is_error_none[0]: + return False, is_error_none[1] + is_cert_matched = self._analyzer.is_cert_issuer_matched(conn.cert_issuer, r'\bCN[\s]*=[\s]*Tango Secure Gateway CA\b') + if not is_cert_matched[0]: + return False, is_cert_matched[1] + is_download_size_equal = self._analyzer.is_download_size_equal(conn.size_download, download_size) + if not is_download_size_equal[0]: + return False, is_download_size_equal[1] + return True, None + + def action_deny_subaction_drop_protocol_dns(self, domain, nameservers, conn_timeout): + request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) + request.query() + + is_error_type_equal = self._dns_analyzer.is_error_type_equal(request.error_info, dns.resolver.LifetimeTimeout) + if not is_error_type_equal[0]: + return False, is_error_type_equal[1] + return True, None + + def action_deny_subaction_redirect_protocol_dns_type_a(self, domain, nameservers, conn_timeout): + request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) + request.query() + + is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) + if not is_error_info_none[0]: + return False, is_error_info_none[1] + + is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333) + if not is_ttl_equal[0]: + return False, is_ttl_equal[1] + + is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") + if not is_address_equal[0]: + return False, is_address_equal[1] + + return True, None + + def action_deny_subaction_redirect_protocol_dns_type_aaaa(self, domain, nameservers, conn_timeout): + request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) + request.query() + + is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) + if not is_error_info_none[0]: + return False, is_error_info_none[1] + + is_ttl_equal = self._dns_analyzer.is_ttl_equal(request.rrset_ttl, 333) + if not is_ttl_equal[0]: + return False, is_ttl_equal[1] + + is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") + if not is_address_equal[0]: + return False, is_address_equal[1] + + return True, None + + def action_deny_subaction_redirect_protocol_dns_type_a_range_ttl(self, domain, nameservers, conn_timeout): + request = DNSQueryTypeABuilder(domain, nameservers, conn_timeout) + request.query() + + is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) + if not is_error_info_none[0]: + return False, is_error_info_none[1] + + is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500) + if not is_ttl_equal[0]: + return False, is_ttl_equal[1] + + is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "33.252.0.101", "ipv4") + if not is_address_equal[0]: + return False, is_address_equal[1] + + return True, None + + def action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl(self, domain, nameservers, conn_timeout): + request = DNSQueryTypeAAAABuilder(domain, nameservers, conn_timeout) + request.query() + + is_error_info_none = self._dns_analyzer.is_error_info_none(request.error_info) + if not is_error_info_none[0]: + return False, is_error_info_none[1] + + is_ttl_equal = self._dns_analyzer.is_ttl_in_range(request.rrset_ttl, 400, 500) + if not is_ttl_equal[0]: + return False, is_ttl_equal[1] + + is_address_equal = self._dns_analyzer.is_address_equal(request.response_answer, "2001:db8::1001", "ipv6") + if not is_address_equal[0]: + return False, is_address_equal[1] + + return True, None + +class ResultExportBuilder: + def __init__(self): + self._exporter = None + self._create_exporter() + + def _create_exporter(self): + self._exporter = PrettyTable() + self._exporter.vrules = NONE + self._exporter.field_names = ["Case name", "Status", "Description"] + self._exporter.align["Case name"] = "l" + self._exporter.align["Status"] = "l" + self._exporter.align["Description"] = "l" + + def append_case_result(self, case_name, case_result, enable_ouptut_by_case=True): + status, description = self._convert_case_result(case_result) + if enable_ouptut_by_case: + self._output_by_case_name(case_name, status) + + if not case_result[0]: + self._add_exporter_row(case_name, status, description) + + def _convert_case_result(self, case_result): + status = "ok" + if not case_result[0]: + status = "FAIL" + + description = "-" + if case_result[1] is not None: + description = case_result[1] + + return status, description + + def _output_by_case_name(self, case_name, case_status): + print(f"{case_name} ... {case_status}") + + def _add_exporter_row(self, case_name: str, status: str, description: str): + self._exporter.add_row([case_name, status, description]) + + def export(self): + if len(self._exporter._rows) > 0: + print(self._exporter) + + +class DiagnoseCasesRunner: + def __init__(self, cmd_parser: CommandParser): + self._loop = cmd_parser.loop + self._count = cmd_parser.count + self._interval_s = cmd_parser.interval_s + self._service_function_indexs = cmd_parser.service_function_indexs + self._case_names_regexp = cmd_parser.case_names_regexp + self._config_path = cmd_parser.config_path + self._config_loader = ConfigLoader(self._config_path) + + self._proxy_case = ProxyCasesRunner() + self._firewall_case = FirewallCasesRunner() + self._cases_info = [ + { + "name": "test_firewallBypass_ssl", + "protocol_type": "https", + "test_function": self._firewall_case.action_bypass_protocol_https, + "request_content": "https://sha384.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallIntercept_ssl", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https, + "request_content": "https://sha256.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallIntercept_sslCerterrExpired", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_cert_error, + "request_content": "https://expired.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallIntercept_sslCerterrSelfsigned", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_cert_error, + "request_content": "https://self-signed.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallIntercept_sslCerterrUntrustedroot", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_cert_error, + "request_content": "https://untrusted-root.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_proxyRedirect_ssl", + "protocol_type": "https", + "test_function": self._proxy_case.action_redirect_protocol_https, + "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js" + }, + { + "name": "test_proxyBlock_ssl", + "protocol_type": "https", + "test_function": self._proxy_case.action_block_protocol_https, + "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js" + }, + { + "name": "test_proxyReplace_ssl", + "protocol_type": "https", + "test_function": self._proxy_case.action_replace_protocol_https, + "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js" + }, + { + "name": "test_proxyHijack_ssl", + "protocol_type": "https", + "test_function": self._proxy_case.action_hijack_protocol_https, + "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js" + }, + { + "name": "test_proxyInsert_ssl", + "protocol_type": "https", + "test_function": self._proxy_case.action_insert_protocol_https, + "request_content": "https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html" + }, + { + "name": "test_proxyRedirect_http", + "protocol_type": "http", + "test_function": self._proxy_case.action_redirect_protocol_http, + "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js" + }, + { + "name": "test_proxyBlock_http", + "protocol_type": "http", + "test_function": self._proxy_case.action_block_protocol_http, + "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js" + }, + { + "name": "test_proxyReplace_http", + "protocol_type": "http", + "test_function": self._proxy_case.action_replace_protocol_http, + "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js" + }, + { + "name": "test_proxyHijack_http", + "protocol_type": "http", + "test_function": self._proxy_case.action_hijack_protocol_http, + "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js" + }, + { + "name": "test_proxyInsert_http", + "protocol_type": "http", + "test_function": self._proxy_case.action_insert_protocol_http, + "request_content": "http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html" + }, + { + "name": "test_firewallAllow_http", + "protocol_type": "http", + "test_function": self._firewall_case.action_allow_protocol_http, + "request_content": "http://http.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyDrop_http", + "protocol_type": "http", + "test_function": self._firewall_case.action_deny_subaction_drop_protocol_http, + "request_content": "http://http-credit-card.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyReset_http", + "protocol_type": "http", + "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http, + "request_content": "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyBlock_http", + "protocol_type": "http", + "test_function": self._firewall_case.action_deny_subaction_block_protocol_http, + "request_content": "http://http-login.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallAllow_ssl", + "protocol_type": "https", + "test_function": self._firewall_case.action_allow_protocol_https, + "request_content": "https://sha512.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyDrop_ssl", + "protocol_type": "https", + "test_function": self._firewall_case.action_deny_subaction_drop_protocol_https, + "request_content": "https://rsa2048.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyReset_ssl", + "protocol_type": "https", + "test_function": self._firewall_case.action_deny_subaction_reset_protocol_https, + "request_content": "https://rsa4096.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyDrop_dns", + "protocol_type": "dns", + "test_function": self._firewall_case.action_deny_subaction_drop_protocol_dns, + "request_content": "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyRedirectA_dns", + "protocol_type": "dns", + "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a, + "request_content": "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyRedirectAAAA_dns", + "protocol_type": "dns", + "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa, + "request_content": "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyRedirectARangeTTL_dns", + "protocol_type": "dns", + "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_a_range_ttl, + "request_content": "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyRedirectAAAARangeTTL_dns", + "protocol_type": "dns", + "test_function": self._firewall_case.action_deny_subaction_redirect_protocol_dns_type_aaaa_range_ttl, + "request_content": "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallIntercept_sslDownloadSize1k", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_1k, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k" + }, + { + "name": "test_firewallIntercept_sslDownloadSize4k", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_4k, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k" + }, + { + "name": "test_firewallIntercept_sslDownloadSize16k", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_16k, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k" + }, + { + "name": "test_firewallIntercept_sslDownloadSize64k", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_64k, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k" + }, + { + "name": "test_firewallIntercept_sslDownloadSize256k", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_256k, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k" + }, + { + "name": "test_firewallIntercept_sslDownloadSize1M", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_1M, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M" + }, + { + "name": "test_firewallIntercept_sslDownloadSize4M", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_4M, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M" + }, + { + "name": "test_firewallIntercept_sslDownloadSize16M", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_16M, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M" + }, + { + "name": "test_firewallIntercept_sslDownloadSize64M", + "protocol_type": "https", + "test_function": self._firewall_case.action_intercept_protocol_https_download_size_64M, + "request_content": "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M" + }, + { + "name": "test_firewallDenyResetFilterHost_http", + "protocol_type": "http", + "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_host, + "request_content": "http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_firewallDenyResetFilterURL_http", + "protocol_type": "http", + "test_function": self._firewall_case.action_deny_subaction_reset_protocol_http_filter_url, + "request_content": "http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_proxyDenyFilterHost_http", + "protocol_type": "http", + "test_function": self._proxy_case.action_deny_protocol_http_filter_host, + "request_content": "http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website" + }, + { + "name": "test_proxyDenyFilterURL_http", + "protocol_type": "http", + "test_function": self._proxy_case.action_deny_protocol_http_filter_url, + "request_content": "http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website" + } + ] + + def run_cases(self): + run_count = 1 + while True: + print("\nRUN %d" % run_count) + self.__run_cases_by_service_function_ids() + if (self._loop is not True) and self._count >= run_count: + break + time.sleep(self._interval_s) + run_count = run_count + 1 + + def __run_cases_by_service_function_ids(self): + for sf_id in self._service_function_indexs: + start_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print(format(("Service function id:" + str(sf_id) + ",Test start time: " + start_timestamp),'#^70s')) + + self._run_cases_in_one_service_function_id(sf_id) + + end_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print(format(("Service function id:" + str(sf_id) + ",Test end time: " + end_timestamp),'=^70s')) + + def _run_cases_in_one_service_function_id(self, service_function_id): + resolve_Builder = ServerAddressBuilder(service_function_id) + exporter = ResultExportBuilder() + for case_info in self._cases_info: + if re.fullmatch(self._case_names_regexp, case_info["name"]): + self._run_one_case(case_info, resolve_Builder, exporter) + exporter.export() + + def _run_one_case(self, case_info, resolve_Builder, exporter): + conn_timeout = self._get_conn_timeout_from_configs(case_info["name"]) + max_recv_speed_large = self._get_max_recv_speed_large_from_configs(case_info["name"]) + + test_func = case_info.get("test_function") + if test_func: + if case_info["protocol_type"] == "http" or case_info["protocol_type"] == "https": + ret = test_func(case_info["request_content"], resolve_Builder.resolves, conn_timeout, max_recv_speed_large) + + if case_info["protocol_type"] == "dns": + ret = test_func(case_info["request_content"], resolve_Builder.nameservers, conn_timeout) + + exporter.append_case_result(case_info["name"], ret) + + def _get_conn_timeout_from_configs(self, section) -> int: + return self._config_loader.get_value_by_section_and_key(section, "conn_timeout") + + def _get_max_recv_speed_large_from_configs(self, section) -> int: + return self._config_loader.get_value_by_section_and_key(section, "max_recv_speed_large") + if __name__ == '__main__': - tsg_diagnose_running = TsgDiagnose() - tsg_diagnose_running.dign_exec() + cmd_parser = CommandParser() + dign = DiagnoseCasesRunner(cmd_parser) + dign.run_cases()