1393 lines
76 KiB
Python
1393 lines
76 KiB
Python
import sys
|
|
import unittest
|
|
import json
|
|
import pycurl
|
|
import os
|
|
import re
|
|
import time
|
|
import io
|
|
from io import BytesIO
|
|
import getopt
|
|
import ciunittest
|
|
import argparse
|
|
import hashlib
|
|
from configparser import ConfigParser
|
|
import random
|
|
import dns.exception
|
|
import dns.resolver
|
|
import sys
|
|
import logging
|
|
|
|
|
|
suite_test_config_dict = {'test_firewallBypass_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyDrop_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyRedirectA_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyRedirectAAAA_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyRedirectARangeTTL_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyRedirectAAAARangeTTL_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_dnsRequest_allow_rdtype_a': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_dnsRequest_allow_rdtype_aaaa': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_dnsRequest_allow_rdtype_cname': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslCerterrExpired': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslCerterrSelfsigned': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslCerterrUntrustedroot': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyRedirect_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyBlock_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyReplace_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyHijack_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyInsert_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyRedirect_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyBlock_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyReplace_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyHijack_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyInsert_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize1k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize4k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize16k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize64k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize256k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize1M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize4M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize16M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallIntercept_sslDownloadSize64M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallAllow_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyDrop_http': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyReset_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyBlock_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallAllow_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyDrop_ssl': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyReset_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyResetFilterHost_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_firewallDenyResetFilterURL_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyDenyFilterHost_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'test_proxyDenyFilterURL_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
|
|
'start_time_random_delay_range': {'enabled':1,'left_edge':1,'right_edge':30}}
|
|
|
|
|
|
ssl_bypass_info_re = "Ssl connection bypass success"
|
|
ssl_intercept_info_re = "Ssl connection intercept success"
|
|
https_exprired_info_re = "Ssl exprired cert check success"
|
|
https_self_signed_info_re = "Ssl self signed cert check success"
|
|
https_untrusted_root_info_re = "Ssl untrusted_root cert check success"
|
|
|
|
ssl_redirect_info_re = "Ssl connection redirect success"
|
|
ssl_replace_info_re = "Ssl connection replace success"
|
|
ssl_insert_info_re = "Ssl connection insert success"
|
|
ssl_hijack_info_re = "Ssl connection hijack success"
|
|
ssl_block_info_re = "Ssl connection block success"
|
|
|
|
http_redirect_info_re = "http connection redirect success"
|
|
http_replace_info_re = "http connection replace success"
|
|
http_insert_info_re = "http connection insert success"
|
|
http_hijack_info_re = "http connection hijack success"
|
|
http_block_info_re = "http connection block success"
|
|
|
|
https_conn_taffic_1k_re = 'https download file 1k success'
|
|
https_conn_taffic_4k_re = 'https download file 4k success'
|
|
https_conn_taffic_16k_re = 'https download file 16k success'
|
|
https_conn_taffic_64k_re = 'https download file 64k success'
|
|
https_conn_taffic_256k_re = 'https download file 256k success'
|
|
https_conn_taffic_1M_re = 'https download file 1M success'
|
|
https_conn_taffic_4M_re = 'https download file 4M success'
|
|
https_conn_taffic_16M_re = 'https download file 16M success'
|
|
https_conn_taffic_64M_re = 'https download file 64M success'
|
|
|
|
http_firewall_allow_re = "http firewall action allow success"
|
|
http_firewall_deny_drop_re = "http firewall aciton deny subaction drop success"
|
|
http_firewall_deny_rst_re = "http firewall action deny subaction rst success"
|
|
http_firewall_deny_block_re = "http firewall aciton deny subaction block success"
|
|
ssl_firewall_allow_re = "ssl firewall action allow success"
|
|
ssl_firewall_deny_drop_re = "ssl firewall action deny subaction drop success"
|
|
ssl_firewall_deny_rst_re = "ssl firewall action deny subaction rst success"
|
|
|
|
|
|
URLBypass = 'https://sha384.badssl.selftest.gdnt-cloud.website'
|
|
URLIntercept = 'https://sha256.badssl.selftest.gdnt-cloud.website'
|
|
URLSslExpired = 'https://expired.badssl.selftest.gdnt-cloud.website'
|
|
URLSslSelfsigned = 'https://self-signed.badssl.selftest.gdnt-cloud.website'
|
|
URLSslSuntrustedroot = 'https://untrusted-root.badssl.selftest.gdnt-cloud.website'
|
|
|
|
URLSslRedirect = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js'
|
|
URLSslReplace = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js'
|
|
URLSslInsert = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html'
|
|
URLSslHijack = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js'
|
|
URLSslBlock = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js'
|
|
|
|
URLHttpRedirect = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js'
|
|
URLHttpReplace = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js'
|
|
URLHttpInsert = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html'
|
|
URLHttpHijack = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js'
|
|
URLHttpBlock = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js'
|
|
|
|
URLConTraffic_1k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k"
|
|
URLConTraffic_4k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k"
|
|
URLConTraffic_16k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k"
|
|
URLConTraffic_64k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k"
|
|
URLConTraffic_256k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k"
|
|
URLConTraffic_1M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M"
|
|
URLConTraffic_4M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M"
|
|
URLConTraffic_16M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M"
|
|
URLConTraffic_64M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M"
|
|
|
|
URLHttpFirewallAllow = "http://http.badssl.selftest.gdnt-cloud.website"
|
|
URLHttpFirewallDenyDrop = "http://http-credit-card.badssl.selftest.gdnt-cloud.website"
|
|
URLHttpFirewallDenyRst = "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website"
|
|
URLHttpFirewallDenyBlock = "http://http-login.badssl.selftest.gdnt-cloud.website"
|
|
URLSslFirewallAllow = "https://sha512.badssl.selftest.gdnt-cloud.website"
|
|
URLSslFirewallDenyDrop = "https://rsa2048.badssl.selftest.gdnt-cloud.website"
|
|
URLSslFirewallDenyRst = "https://rsa4096.badssl.selftest.gdnt-cloud.website"
|
|
|
|
URLHttpFirewallDenyRstFilterHost = 'http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website'
|
|
URLHttpFirewallDenyRstFilterURL = 'http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website'
|
|
URLHttpProxyDenyFilterHost = 'http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website'
|
|
URLHttpProxyDenyFilterURL = 'http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website'
|
|
|
|
http_firewall_deny_rst_filter_host_re = "testing firewall deny reset filter host ok"
|
|
http_firewall_deny_rst_filter_url_re = "testing firewall deny reset filter url ok"
|
|
http_proxy_deny_filter_host_re = "testing proxy deny filter host ok"
|
|
http_proxy_deny_filter_url_re = "testing proxy deny filter url ok"
|
|
|
|
HOST_DNS_ALLOW_A = "dnstest.allow-a-ipv4.selftest.gdnt-cloud.website"
|
|
HOST_DNS_DENY_REDIRECT_A = "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website"
|
|
HOST_DNS_DENY_DORY = "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website"
|
|
HOST_DNS_DENY_REDIRECT_A_RTTL = "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website"
|
|
HOST_DNS_ALLOW_AAAA = "dnstest.allow-4a-ipv6.selftest.gdnt-cloud.website"
|
|
HOST_DNS_DENY_REDIRECT_AAAA = "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website"
|
|
HOST_DNS_DENY_REDIRECT_AAAA_RTTL = "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website"
|
|
HOST_DNS_CNAME_QUERY = "dnstest.test-cname.selftest.gdnt-cloud.website"
|
|
HOST_DNS_CNAME_ANSWER = "dnstest.testanswer-cname.selftest.gdnt-cloud.website"
|
|
|
|
DNS_REDIRECT_IPV4_ADDR = "33.252.0.101"
|
|
DNS_REDIRECT_IPV6_ADDR = "2001:db8::1001"
|
|
DNS_ALLOW_A_ADDR = "233.252.0.1"
|
|
DNS_ALLOW_AAAA_ADDR = "2001:db8::1"
|
|
|
|
|
|
|
|
DNS_SERVER_ALLOW_TTL = 60
|
|
DNS_SERVER_REDIRECT_TTL = 333
|
|
DNS_SERVER_REDIRECT_RANGE_LOW = 400
|
|
DNS_SERVER_REDIRECT_RANGE_HIGH = 500
|
|
DNS_SERVER_IP = ["192.0.2.101"]
|
|
DnsRequestFirewallDenyDrop = "Dns request timeout is deny drop sucess"
|
|
DnsARequestFireWallDenyRedirect = "Dns rdtype A request is deny reidrect sucess"
|
|
DnsAAAARequestFireWallDenyRedirect = "Dns rdtype AAAA request is deny redirect sucess"
|
|
DnsARequestFireWallDenyRedirectRangTTL = "Dns rdtype A request is deny reidrect and range ttl sucess"
|
|
DnsAAAARequestFireWallDenyRedirectRangTTL = "Dns rdtype AAAA request is deny redirect and range ttl sucess"
|
|
DnsARequestFirewallAllow = "Dns rdtype A request data is sucess"
|
|
DnsAAAARequestFirewallAllow = "Dns rdtype AAAA request data is sucess"
|
|
DnsCNAMERequestFirewallAllow = "Dns rdtype CNAME request data is sucess"
|
|
|
|
|
|
REQUEST_RESOLVE = ['sha384.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'sha256.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'expired.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'self-signed.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'untrusted-root.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'web-replay.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'web-replay.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'testing-download.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'http.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'http-credit-card.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'http-dynamic-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'http-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'sha512.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'rsa2048.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'rsa4096.badssl.selftest.gdnt-cloud.website:443:192.0.2.101',\
|
|
'testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.101',\
|
|
'testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.101']
|
|
|
|
def set_http_request_resolve(id_service_function):
|
|
global REQUEST_RESOLVE
|
|
ip_left_edge = 100
|
|
ip_http_server = ip_left_edge + id_service_function
|
|
REQUEST_RESOLVE = ['sha384.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'sha256.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'expired.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'self-signed.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'untrusted-root.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'web-replay.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'web-replay.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'testing-download.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'http.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'http-credit-card.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'http-dynamic-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'http-login.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'sha512.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'rsa2048.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'rsa4096.badssl.selftest.gdnt-cloud.website:443:192.0.2.%d' % ip_http_server,\
|
|
'testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server,\
|
|
'testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website:80:192.0.2.%d' % ip_http_server]
|
|
|
|
def set_dns_server_ip(id_service_function):
|
|
global DNS_SERVER_IP
|
|
ip_left_edge = 100
|
|
ip_dns_server = ip_left_edge + id_service_function
|
|
DNS_SERVER_IP = ['192.0.2.%d' % ip_dns_server]
|
|
|
|
class _WritelnDecorator(object):
|
|
"""Used to decorate file-like objects with a handy 'writeln' method"""
|
|
def __init__(self,stream):
|
|
self.stream = stream
|
|
|
|
def __getattr__(self, attr):
|
|
if attr in ('stream', '__getstate__'):
|
|
raise AttributeError(attr)
|
|
return getattr(self.stream,attr)
|
|
|
|
def writeln(self, arg=None):
|
|
if arg:
|
|
self.write(arg)
|
|
self.write('\n') # text-mode streams translate to \r\n if needed
|
|
|
|
|
|
class DignTextTestResult(unittest.result.TestResult):
|
|
"""A test result class that can print formatted text results to a stream.
|
|
Used by TextTestRunner.
|
|
"""
|
|
separator1 = '=' * 70
|
|
separator2 = '-' * 70
|
|
|
|
def __init__(self, stream, descriptions, verbosity):
|
|
super(DignTextTestResult, self).__init__(stream, descriptions, verbosity)
|
|
self.stream = stream
|
|
self.dignStream = _WritelnDecorator(sys.stderr)
|
|
self.showAll = verbosity > 1
|
|
self.dots = verbosity == 1
|
|
self.descriptions = descriptions
|
|
|
|
def getDescription(self, test):
|
|
doc_first_line = test.shortDescription()
|
|
if self.descriptions and doc_first_line:
|
|
return '\n'.join((str(test), doc_first_line))
|
|
else:
|
|
return str(test).split(' ', 1 )[0]
|
|
|
|
def startTest(self, test):
|
|
super(DignTextTestResult, self).startTest(test)
|
|
if self.showAll:
|
|
self.stream.write(self.getDescription(test))
|
|
self.dignStream.write(self.getDescription(test))
|
|
self.stream.write(" ... ")
|
|
self.dignStream.write(" ... ")
|
|
self.stream.flush()
|
|
self.dignStream.flush()
|
|
|
|
def addSuccess(self, test):
|
|
super(DignTextTestResult, self).addSuccess(test)
|
|
if self.showAll:
|
|
self.stream.writeln("ok")
|
|
self.dignStream.writeln("ok")
|
|
elif self.dots:
|
|
self.stream.write('.')
|
|
self.dignStream.write('.')
|
|
self.stream.flush()
|
|
self.dignStream.flush()
|
|
|
|
def addError(self, test, err):
|
|
super(DignTextTestResult, self).addError(test, err)
|
|
if self.showAll:
|
|
self.stream.writeln("ERROR")
|
|
self.dignStream.writeln("ERROR")
|
|
elif self.dots:
|
|
self.stream.write('E')
|
|
self.dignStream.write('E')
|
|
self.stream.flush()
|
|
self.dignStream.flush()
|
|
|
|
def addFailure(self, test, err):
|
|
super(DignTextTestResult, self).addFailure(test, err)
|
|
if self.showAll:
|
|
self.stream.writeln("FAIL")
|
|
self.dignStream.writeln("FAIL")
|
|
elif self.dots:
|
|
self.stream.write('F')
|
|
self.dignStream.write('F')
|
|
self.stream.flush()
|
|
self.dignStream.flush()
|
|
|
|
def addSkip(self, test, reason):
|
|
super(DignTextTestResult, self).addSkip(test, reason)
|
|
if self.showAll:
|
|
self.stream.writeln("skipped {0!r}".format(reason))
|
|
self.dignStream.writeln("skipped {0!r}".format(reason))
|
|
elif self.dots:
|
|
self.stream.write("s")
|
|
self.dignStream.write("s")
|
|
self.stream.flush()
|
|
self.dignStream.flush()
|
|
|
|
def addExpectedFailure(self, test, err):
|
|
super(DignTextTestResult, self).addExpectedFailure(test, err)
|
|
if self.showAll:
|
|
self.stream.writeln("expected failure")
|
|
self.dignStream.writeln("expected failure")
|
|
elif self.dots:
|
|
self.stream.write("x")
|
|
self.dignStream.write("x")
|
|
self.stream.flush()
|
|
self.dignStream.flush()
|
|
|
|
def addUnexpectedSuccess(self, test):
|
|
super(DignTextTestResult, self).addUnexpectedSuccess(test)
|
|
if self.showAll:
|
|
self.stream.writeln("unexpected success")
|
|
self.dignStream.writeln("unexpected success")
|
|
elif self.dots:
|
|
self.stream.write("u")
|
|
self.dignStream.write("u")
|
|
self.stream.flush()
|
|
self.dignStream.flush()
|
|
|
|
def printErrors(self):
|
|
if self.dots or self.showAll:
|
|
self.stream.writeln()
|
|
self.dignStream.writeln()
|
|
self.printErrorList('ERROR', self.errors)
|
|
self.printErrorList('FAIL', self.failures)
|
|
|
|
def printErrorList(self, flavour, errors):
|
|
for test, err in errors:
|
|
self.stream.writeln(self.separator1)
|
|
self.dignStream.writeln(self.separator1)
|
|
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
|
|
self.dignStream.writeln("%s: %s" % (flavour,self.getDescription(test)))
|
|
self.stream.writeln(self.separator2)
|
|
self.dignStream.writeln(self.separator2)
|
|
self.stream.writeln("%s" % err)
|
|
self.dignStream.writeln("%s" % err)
|
|
|
|
def get_logger(name,logPath,enableConsole=True):
|
|
logger = logging.getLogger(name)
|
|
fileHandler = logging.FileHandler(logPath)
|
|
fmt = '%(asctime)s, %(levelname)s, %(message)s'
|
|
formatter = logging.Formatter(fmt=fmt, datefmt='%a %b %d %H:%M:%S %Y')
|
|
fileHandler.setFormatter(formatter)
|
|
fileHandler.setLevel(logging.DEBUG)
|
|
logger.addHandler(fileHandler)
|
|
if enableConsole == True:
|
|
consoleHandler = logging.StreamHandler()
|
|
consoleHandler.setFormatter(formatter)
|
|
consoleHandler.setLevel(logging.DEBUG)
|
|
logger.addHandler(consoleHandler)
|
|
logger.setLevel(logging.DEBUG)
|
|
return logger
|
|
|
|
class DNSCheckRequestBuild:
|
|
def __init__(self):
|
|
self.dns_resolver=dns.resolver.Resolver()
|
|
self.dns_resolver.nameservers = DNS_SERVER_IP
|
|
self.dns_resolver.search = []
|
|
self.dns_resolver.use_search_by_default = False
|
|
|
|
def dns_action_deny_subaction_drop(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_DENY_DORY, 'A')
|
|
except dns.exception.DNSException as errorinfo:
|
|
if type(errorinfo) == dns.resolver.LifetimeTimeout:
|
|
raise Exception(DnsRequestFirewallDenyDrop)
|
|
else:
|
|
raise Exception("Error: The dns_action_deny_subaction_drop check failure, code: %s" % errorinfo)
|
|
else:
|
|
raise Exception("Error: The dns_action_deny_subaction_drop test deny drop failure" )
|
|
|
|
def dns_action_deny_subaction_redirect_a(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_A, 'A')
|
|
except dns.exception.DNSException as errorinfo:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure, code: %s" % errorinfo)
|
|
else: # drop-redirect and respond rdtype A ipv4
|
|
if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL))
|
|
return
|
|
for i in dns_answer.response.answer:
|
|
for j in i.items:
|
|
if j.rdtype == 1: #'rdtype is A: ipv4'
|
|
if j.address == DNS_REDIRECT_IPV4_ADDR:
|
|
raise Exception(DnsARequestFireWallDenyRedirect)
|
|
else:
|
|
raise Exception("Error: The dns request rdtype A drop redirect check failure: respond value error")
|
|
else:
|
|
raise Exception("Error: The dns request rdtype A drop redirect check failure: respond rdtype error")
|
|
|
|
def dns_action_deny_subaction_redirect_aaaa(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA, 'AAAA')
|
|
except dns.exception.DNSException as errorinfo:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure, code: %s" % errorinfo)
|
|
else: # drop-redirect and respond rdtype A ipv6
|
|
if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL))
|
|
return
|
|
|
|
for i in dns_answer.response.answer:
|
|
for j in i.items:
|
|
if j.rdtype == 28: #'rdtype is A: ipv6'
|
|
if j.address == DNS_REDIRECT_IPV6_ADDR:
|
|
raise Exception(DnsAAAARequestFireWallDenyRedirect)
|
|
else:
|
|
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error")
|
|
else:
|
|
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error")
|
|
|
|
def dns_action_deny_subaction_redirect_a_rang_ttl(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_A_RTTL, 'A')
|
|
except dns.exception.DNSException as errorinfo:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure, code: %s" % errorinfo)
|
|
else: # drop-redirect and respond rdtype A ipv4
|
|
if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl,DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH))
|
|
return
|
|
|
|
for i in dns_answer.response.answer:
|
|
for j in i.items:
|
|
if j.rdtype == 1: #'rdtype is A: ipv4'
|
|
if j.address == DNS_REDIRECT_IPV4_ADDR:
|
|
raise Exception(DnsARequestFireWallDenyRedirectRangTTL)
|
|
else:
|
|
raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond value error")
|
|
else:
|
|
raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond rdtype error")
|
|
|
|
def dns_action_deny_subaction_redirect_aaaa_rang_ttl(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA_RTTL, 'AAAA')
|
|
except dns.exception.DNSException as errorinfo:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa range ttl check failure, code: %s" % errorinfo)
|
|
else: # drop-redirect and respond rdtype A ipv6
|
|
if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH:
|
|
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH))
|
|
return
|
|
|
|
for i in dns_answer.response.answer:
|
|
for j in i.items:
|
|
if j.rdtype == 28: #'rdtype is A: ipv6'
|
|
if j.address == DNS_REDIRECT_IPV6_ADDR:
|
|
raise Exception(DnsAAAARequestFireWallDenyRedirectRangTTL)
|
|
else:
|
|
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error")
|
|
else:
|
|
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error")
|
|
|
|
|
|
|
|
def dns_action_allow_rdtype_a(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_ALLOW_A, 'A')
|
|
except dns.exception.DNSException as errorinfo:
|
|
raise Exception("Error: The dns request rdtype A allow check failure, code: %s" % errorinfo)
|
|
else:
|
|
if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
|
|
raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
|
|
return
|
|
|
|
for i in dns_answer.response.answer:
|
|
for j in i.items:
|
|
if j.rdtype == 1: #'rdtype is A: ipv4'
|
|
if j.address == DNS_ALLOW_A_ADDR:
|
|
raise Exception(DnsARequestFirewallAllow)
|
|
else:
|
|
raise Exception("Error: The dns request rdtype A allow check failure: respond value error")
|
|
|
|
else:
|
|
raise Exception("Error: The dns request rdtype A allow check failure: respond rdtype error")
|
|
|
|
def dns_action_allow_rdtype_aaaa(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_ALLOW_AAAA, 'AAAA')
|
|
except dns.exception.DNSException as errorinfo:
|
|
raise Exception("Error: The dns request rdtype AAAA allow check failure, code: %s" % errorinfo)
|
|
else:
|
|
if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
|
|
raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
|
|
return
|
|
|
|
for i in dns_answer.response.answer:
|
|
for j in i.items:
|
|
if j.rdtype == 28: #'rdtype is AAAA: ipv6'
|
|
if j.address == DNS_ALLOW_AAAA_ADDR:
|
|
raise Exception(DnsAAAARequestFirewallAllow)
|
|
else:
|
|
raise Exception("Error: The dns request rdtype AAAA allow check failure: respond value error")
|
|
else:
|
|
raise Exception("Error: The dns request rdtype AAAA allow check failure: response rdtype error")
|
|
|
|
def dns_action_allow_rdtype_cname(self,test_suite_name):
|
|
self.dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
self.dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
|
|
|
|
try:
|
|
dns_answer = self.dns_resolver.query(HOST_DNS_CNAME_QUERY, 'CNAME')
|
|
except dns.exception.DNSException as errorinfo:
|
|
raise Exception("Error: The dns request rdtype CNAME allow check failure, code: %s" % errorinfo)
|
|
else:
|
|
if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
|
|
raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
|
|
return
|
|
|
|
for i in dns_answer.response.answer:
|
|
for j in i.items:
|
|
if j.rdtype == 5: #'CNAME: tag(www.xxx.com)'
|
|
m=str(j)
|
|
if m == (HOST_DNS_CNAME_ANSWER + '.'):
|
|
raise Exception(DnsCNAMERequestFirewallAllow)
|
|
else:
|
|
raise Exception("Error: The dns request rdtype CNAME allow check failure: respond value error")
|
|
else:
|
|
raise Exception("Error: The dns request rdtype CNAME allow check failure: respond rdtype error")
|
|
|
|
|
|
class SSLCheckRequestBuild:
|
|
def __init__(self):
|
|
self.conn = pycurl.Curl()
|
|
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
|
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
|
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
|
self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE)
|
|
|
|
def _set_conn_opt(self,test_suite_name, url):
|
|
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
|
|
self.conn.setopt(self.conn.URL,url)
|
|
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
|
|
|
|
def _get_conn_issuer(self,certs):
|
|
issuer = ()
|
|
for cert_info in certs[0]:
|
|
if cert_info[0] == "Issuer":
|
|
issuer = cert_info
|
|
break
|
|
if len(issuer) <= 0:
|
|
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
|
return issuer
|
|
|
|
def ssl_bypass(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLBypass)
|
|
self.conn.perform()
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
self.conn.close()
|
|
issuer = self._get_conn_issuer(certs)
|
|
|
|
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0):
|
|
raise Exception(ssl_bypass_info_re)
|
|
elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0):
|
|
raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1])
|
|
else:
|
|
raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1])
|
|
|
|
def ssl_intercept(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name, URLIntercept)
|
|
self.conn.perform()
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
self.conn.close()
|
|
issuer = self._get_conn_issuer(certs)
|
|
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
|
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
|
raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1])
|
|
else:
|
|
raise Exception(ssl_intercept_info_re)
|
|
else:
|
|
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
|
|
|
|
|
|
class SslInterceptRequestBuild:
|
|
def __init__(self):
|
|
self.conn = pycurl.Curl()
|
|
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
|
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
|
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
|
self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE)
|
|
|
|
def _set_conn_opt(self,test_suite_name,url):
|
|
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
|
|
self.conn.setopt(self.conn.URL, url)
|
|
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
|
|
|
|
def _conn_to_perform(self, test_suite_name, sec_info_re):
|
|
self.conn.perform()
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
self.conn.close()
|
|
issuer = ()
|
|
for cert_info in certs[0]:
|
|
if cert_info[0].lower() == "issuer":
|
|
issuer = cert_info
|
|
break
|
|
if len(issuer) <= 0:
|
|
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
|
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
|
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
|
raise Exception(sec_info_re)
|
|
else:
|
|
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
|
|
else:
|
|
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0) or \
|
|
re.search(r'\bCN[\s\S]*=[\s\S]*badssl[\s\S]*',issuer[1],0):
|
|
raise Exception("Error: Ssl connection intercept failed, cert info: %s" % issuer[1])
|
|
else:
|
|
raise Exception(sec_info_re)
|
|
|
|
|
|
def ssl_intercept_certerrExpired(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLSslExpired)
|
|
self._conn_to_perform(test_suite_name,https_exprired_info_re)
|
|
|
|
def ssl_intercept_certerrSelf_signed(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLSslSelfsigned)
|
|
self._conn_to_perform(test_suite_name,https_self_signed_info_re)
|
|
|
|
def ssl_intercept_certerrUntrusted_root(self,test_suite_name,):
|
|
self._set_conn_opt(test_suite_name,URLSslSuntrustedroot)
|
|
self._conn_to_perform(test_suite_name,https_untrusted_root_info_re)
|
|
|
|
|
|
class ProxyRequestBuild:
|
|
def __init__(self):
|
|
self.bodyBuf = BytesIO()
|
|
self.conn = pycurl.Curl()
|
|
self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
|
|
self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE)
|
|
|
|
def _cert_verify(self, certs, isSsl):
|
|
if isSsl == True:
|
|
issuer = ()
|
|
for cert_info in certs[0]:
|
|
if cert_info[0].lower() == "issuer":
|
|
issuer = cert_info
|
|
break
|
|
if len(issuer) <= 0:
|
|
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
|
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
|
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
|
raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])
|
|
else:
|
|
return
|
|
else:
|
|
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
|
|
|
|
|
|
def _set_conn_opt(self,test_suite_name, url,isSsl):
|
|
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
|
|
self.conn.setopt(self.conn.URL,url)
|
|
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
|
|
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
|
|
if isSsl == True:
|
|
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
|
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
|
|
|
def proxy_redirect(self,test_suite_name,proxy_url,isSsl):
|
|
certs = None
|
|
|
|
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
|
|
self.conn.perform()
|
|
if isSsl == True:
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
|
self.conn.close()
|
|
self._cert_verify(certs, isSsl)
|
|
if rescode == 301 or rescode == 302:
|
|
if isSsl == True:
|
|
raise Exception(ssl_redirect_info_re)
|
|
else:
|
|
raise Exception(http_redirect_info_re)
|
|
else:
|
|
if isSsl == True:
|
|
raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode)
|
|
else:
|
|
raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode)
|
|
|
|
def proxy_replace(self,test_suite_name, proxy_url,isSsl):
|
|
certs = None
|
|
self._set_conn_opt(test_suite_name, proxy_url,isSsl)
|
|
self.conn.perform()
|
|
if isSsl == True:
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
body = self.bodyBuf.getvalue().decode('utf-8')
|
|
self.conn.close()
|
|
self._cert_verify(certs, isSsl)
|
|
if not re.search(r'EnglishSearchShared', body, 0) and \
|
|
re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0):
|
|
if isSsl == True:
|
|
raise Exception(ssl_replace_info_re)
|
|
else:
|
|
raise Exception(http_replace_info_re)
|
|
else:
|
|
if isSsl == True:
|
|
raise Exception("Error:Ssl connection replace fail")
|
|
else:
|
|
raise Exception("Error:Http connection replace fail")
|
|
|
|
def proxy_insert(self,test_suite_name,proxy_url,isSsl):
|
|
certs = None
|
|
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
|
|
self.conn.perform()
|
|
body = self.bodyBuf.getvalue().decode('utf-8')
|
|
if isSsl == True:
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
self.conn.close()
|
|
self._cert_verify(certs, isSsl)
|
|
if re.search(r'httpSelfcheckInsert', body, 0) and \
|
|
re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0):
|
|
if isSsl == True:
|
|
raise Exception(ssl_insert_info_re)
|
|
else:
|
|
raise Exception(http_insert_info_re)
|
|
else:
|
|
if isSsl == True:
|
|
raise Exception("Error:Ssl connection insert fail")
|
|
else:
|
|
raise Exception("Error:Http connection insert fail")
|
|
|
|
def proxy_block(self,test_suite_name,proxy_url,isSsl):
|
|
certs = None
|
|
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
|
|
self.conn.perform()
|
|
if isSsl == True:
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
|
body = self.bodyBuf.getvalue().decode('utf-8')
|
|
self.conn.close()
|
|
self._cert_verify(certs, isSsl)
|
|
if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451):
|
|
if isSsl == True:
|
|
raise Exception(ssl_block_info_re)
|
|
else:
|
|
raise Exception(http_block_info_re)
|
|
else:
|
|
if isSsl == True:
|
|
raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode)
|
|
else:
|
|
raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)
|
|
|
|
def proxy_hijack(self,test_suite_name,proxy_url,isSsl):
|
|
certs = None
|
|
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
|
|
self.conn.perform()
|
|
if isSsl == True:
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
self.conn.close()
|
|
self._cert_verify(certs, isSsl)
|
|
hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue())
|
|
if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0):
|
|
if isSsl == True:
|
|
raise Exception(ssl_hijack_info_re)
|
|
else:
|
|
raise Exception(http_hijack_info_re)
|
|
else:
|
|
if isSsl == True:
|
|
raise Exception("Error:Ssl connection hijack fail")
|
|
else:
|
|
raise Exception("Error:Http connection hijack fail")
|
|
|
|
|
|
class SSLFileDownloadBuild:
|
|
def __init__(self):
|
|
self.conn = pycurl.Curl()
|
|
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
|
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
|
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
|
self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE)
|
|
|
|
def _get_conninfo(self,conn):
|
|
dictconninfo = {}
|
|
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
|
|
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
|
|
dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
|
|
dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
|
|
dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
|
|
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
|
|
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
|
|
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
|
|
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
|
|
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
|
|
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
|
|
return dictconninfo
|
|
|
|
def _set_conn_opt(self,test_suite_name,url):
|
|
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
|
|
self.conn.setopt(self.conn.URL,url)
|
|
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
|
|
|
|
def _write_in_logfile(self, sizeStr, connInfoDict):
|
|
connInfoLogPath = "/opt/dign_client/log/download_conn_info.log" + '.' + time.strftime("%Y-%m-%d", time.localtime())
|
|
#connInfoLogger = get_logger("download",connInfoLogPath,enableConsole=False)
|
|
connInfoStr = json.dumps(connInfoDict)
|
|
#connInfoLogger.debug("Fize Size:" + sizeStr + ";connection info:" + connInfoStr)
|
|
with open(connInfoLogPath,"a+") as f:
|
|
f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Fize Size:" + sizeStr + ";connection info:" + connInfoStr + '\n')
|
|
f.close()
|
|
|
|
def conn_traffic(self,test_suite_name,url,conn_taffic_re,sizeStr, size):
|
|
self._set_conn_opt(test_suite_name, url)
|
|
self.conn.perform()
|
|
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
|
|
conninfo = self._get_conninfo(self.conn)
|
|
self.conn.close()
|
|
issuer = ()
|
|
for cert_info in certs[0]:
|
|
if cert_info[0] == "Issuer":
|
|
issuer = cert_info
|
|
break
|
|
if len(issuer) <= 0:
|
|
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
|
|
|
|
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
|
|
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
|
|
raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])
|
|
else:
|
|
raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1])
|
|
|
|
if int(conninfo["size_download"]) == size:
|
|
self._write_in_logfile(sizeStr,conninfo)
|
|
raise Exception(conn_taffic_re)
|
|
else:
|
|
raise Exception("Error: connection tarffic size error and is no equal", sizeStr)
|
|
|
|
|
|
class HttpFirewallActionBuild:
|
|
def __init__(self):
|
|
self.conn = pycurl.Curl()
|
|
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
|
self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE)
|
|
|
|
def _set_conn_opt(self,test_suite_name, url):
|
|
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
|
|
self.conn.setopt(self.conn.URL,url)
|
|
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
|
|
|
|
def action_allow(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLHttpFirewallAllow)
|
|
self.conn.perform()
|
|
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
|
self.conn.close()
|
|
if rescode == 200:
|
|
raise Exception(http_firewall_allow_re)
|
|
else:
|
|
raise Exception("Error: The stream may be redirected, http code %s" % rescode)
|
|
|
|
def action_deny_subaction_drop(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyDrop)
|
|
try:
|
|
self.conn.perform()
|
|
self.conn.close()
|
|
except pycurl.error as errorinfo:
|
|
errcode = errorinfo.args[0]
|
|
if(errcode == 28):
|
|
raise Exception(http_firewall_deny_drop_re)
|
|
else:
|
|
raise Exception("Error: The stream may be not dropped %s" % errorinfo)
|
|
|
|
|
|
def action_deny_subaction_rst(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyRst)
|
|
try:
|
|
self.conn.perform()
|
|
self.conn.close()
|
|
except pycurl.error as errorinfo:
|
|
errcode = errorinfo.args[0]
|
|
if(errcode == 56):
|
|
raise Exception(http_firewall_deny_rst_re)
|
|
else:
|
|
raise Exception("Error: The stream may be not rst %s" % errorinfo)
|
|
|
|
def action_deny_subaction_block(self,test_suite_name):
|
|
bodyBuf = BytesIO()
|
|
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyBlock)
|
|
self.conn.setopt(self.conn.WRITEDATA, bodyBuf)
|
|
self.conn.perform()
|
|
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
|
self.conn.close()
|
|
if rescode == 403:
|
|
if re.search(r'dign-testing-deny-block', bodyBuf.getvalue().decode('utf-8'), 0):
|
|
raise Exception(http_firewall_deny_block_re)
|
|
else:
|
|
raise Exception("Error: response content is not required, required \'dign-testing-deny-block\'")
|
|
|
|
else:
|
|
raise Exception("Error: The stream may be not block, http code %s " % rescode)
|
|
|
|
class SslFirewallActionBuild:
|
|
def __init__(self):
|
|
self.conn = pycurl.Curl()
|
|
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
|
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
|
|
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
|
|
self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE)
|
|
|
|
def _set_conn_opt(self,test_suite_name, url):
|
|
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
|
|
self.conn.setopt(self.conn.URL,url)
|
|
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
|
|
|
|
def action_allow(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLSslFirewallAllow)
|
|
self.conn.perform()
|
|
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
|
self.conn.close()
|
|
if rescode == 200:
|
|
raise Exception(ssl_firewall_allow_re)
|
|
else:
|
|
raise Exception("Error: The stream may be redirected, http code %s" % rescode)
|
|
|
|
def action_deny_subaction_drop(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLSslFirewallDenyDrop)
|
|
try:
|
|
self.conn.perform()
|
|
self.conn.close()
|
|
except pycurl.error as errorinfo:
|
|
errcode = errorinfo.args[0]
|
|
if(errcode == 28):
|
|
raise Exception(ssl_firewall_deny_drop_re)
|
|
else:
|
|
raise Exception("Error: The stream may be not dropped %s" % errorinfo)
|
|
|
|
def action_deny_subaction_rst(self,test_suite_name):
|
|
self._set_conn_opt(test_suite_name,URLSslFirewallDenyRst)
|
|
try:
|
|
self.conn.perform()
|
|
self.conn.close()
|
|
except pycurl.error as errorinfo:
|
|
errcode = errorinfo.args[0]
|
|
if(errcode == 35):
|
|
raise Exception(ssl_firewall_deny_rst_re)
|
|
else:
|
|
raise Exception("Error: The stream may be not rst %s" % errorinfo)
|
|
|
|
|
|
class FilterTestingBuild:
|
|
def __init__(self):
|
|
self.conn = pycurl.Curl()
|
|
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
|
|
self.conn.setopt(self.conn.RESOLVE,REQUEST_RESOLVE)
|
|
|
|
def _set_conn_opt(self,test_suite_name, url):
|
|
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
|
|
self.conn.setopt(self.conn.URL,url)
|
|
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
|
|
|
|
def _firewall_deny_reset(self,test_suite_name, url, raise_re):
|
|
self._set_conn_opt(test_suite_name,url)
|
|
try:
|
|
self.conn.perform()
|
|
self.conn.close()
|
|
except pycurl.error as errorinfo:
|
|
errcode = errorinfo.args[0]
|
|
if(errcode == 56):
|
|
raise Exception(raise_re)
|
|
else:
|
|
raise Exception("Error: The stream may be not rst %s" % errorinfo)
|
|
|
|
def _proxy_deny(self,test_suite_name,url,replaceStr,raise_re):
|
|
bodyBuf = BytesIO()
|
|
self._set_conn_opt(test_suite_name,url)
|
|
self.conn.setopt(self.conn.WRITEDATA, bodyBuf)
|
|
self.conn.perform()
|
|
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
|
|
body = bodyBuf.getvalue().decode('utf-8')
|
|
self.conn.close()
|
|
if re.search(replaceStr, body, 0) and (rescode == 404 or rescode == 451):
|
|
raise Exception(raise_re)
|
|
else:
|
|
raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)
|
|
|
|
def firewall_http_deny_reset_filter_host(self,test_suite_name):
|
|
self._firewall_deny_reset(test_suite_name,URLHttpFirewallDenyRstFilterHost,http_firewall_deny_rst_filter_host_re)
|
|
|
|
def firewall_http_deny_reset_filter_url(self,test_suite_name):
|
|
self._firewall_deny_reset(test_suite_name,URLHttpFirewallDenyRstFilterURL,http_firewall_deny_rst_filter_url_re)
|
|
|
|
def proxy_http_deny_filter_host(self,test_suite_name):
|
|
self._proxy_deny(test_suite_name,URLHttpProxyDenyFilterHost, "testing-proxy-filter-host",http_proxy_deny_filter_host_re)
|
|
|
|
def proxy_http_deny_filter_url(self,test_suite_name):
|
|
self._proxy_deny(test_suite_name,URLHttpProxyDenyFilterURL,"testing-proxy-filter-url",http_proxy_deny_filter_url_re)
|
|
|
|
class TSGDiagnoseTest(unittest.TestCase):
|
|
|
|
def test_firewallDenyDrop_dns(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsRequestFirewallDenyDrop):
|
|
dnsHandler.dns_action_deny_subaction_drop('test_firewallDenyDrop_dns')
|
|
|
|
def test_firewallDenyRedirectA_dns(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirect):
|
|
dnsHandler.dns_action_deny_subaction_redirect_a('test_firewallDenyRedirectA_dns')
|
|
|
|
def test_firewallDenyRedirectAAAA_dns(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirect):
|
|
dnsHandler.dns_action_deny_subaction_redirect_aaaa('test_firewallDenyRedirectARangeTTL_dns')
|
|
|
|
def test_firewallDenyRedirectARangeTTL_dns(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirectRangTTL):
|
|
dnsHandler.dns_action_deny_subaction_redirect_a_rang_ttl('test_firewallDenyRedirectARangeTTL_dns')
|
|
|
|
def test_firewallDenyRedirectAAAARangeTTL_dns(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirectRangTTL):
|
|
dnsHandler.dns_action_deny_subaction_redirect_aaaa_rang_ttl('test_firewallDenyRedirectAAAARangeTTL_dns')
|
|
|
|
def test_dnsRequest_allow_rdtype_a(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsARequestFirewallAllow):
|
|
dnsHandler.dns_action_allow_rdtype_a('test_dnsRequest_allow_rdtype_a')
|
|
|
|
def test_dnsRequest_allow_rdtype_aaaa(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsAAAARequestFirewallAllow):
|
|
dnsHandler.dns_action_allow_rdtype_aaaa('test_dnsRequest_allow_rdtype_aaaa')
|
|
|
|
def test_dnsRequest_allow_rdtype_cname(self):
|
|
dnsHandler = DNSCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, DnsCNAMERequestFirewallAllow):
|
|
dnsHandler.dns_action_allow_rdtype_cname('test_dnsRequest_allow_rdtype_cname')
|
|
|
|
def test_firewallBypass_ssl(self):
|
|
sslHandler = SSLCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
|
|
sslHandler.ssl_bypass('test_firewallBypass_ssl')
|
|
|
|
def test_firewallIntercept_ssl(self):
|
|
sslHandler = SSLCheckRequestBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
|
|
sslHandler.ssl_intercept('test_firewallIntercept_ssl')
|
|
|
|
def test_firewallIntercept_sslCerterrExpired(self):
|
|
requestHandler = SslInterceptRequestBuild()
|
|
with self.assertRaisesRegex(Exception, https_exprired_info_re):
|
|
requestHandler.ssl_intercept_certerrExpired('test_firewallIntercept_sslCerterrExpired')
|
|
|
|
def test_firewallIntercept_sslCerterrSelfsigned(self):
|
|
requestHandler = SslInterceptRequestBuild()
|
|
with self.assertRaisesRegex(Exception, https_self_signed_info_re):
|
|
requestHandler.ssl_intercept_certerrSelf_signed('test_firewallIntercept_sslCerterrSelfsigned')
|
|
|
|
def test_firewallIntercept_sslCerterrUntrustedroot(self):
|
|
requestHandler = SslInterceptRequestBuild()
|
|
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
|
|
requestHandler.ssl_intercept_certerrUntrusted_root('test_firewallIntercept_sslCerterrUntrustedroot')
|
|
|
|
def test_proxyRedirect_ssl(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_redirect_info_re):
|
|
proxyHandler.proxy_redirect('test_proxyRedirect_ssl',URLSslRedirect,True)
|
|
|
|
def test_proxyBlock_ssl(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_block_info_re):
|
|
proxyHandler.proxy_block('test_proxyBlock_ssl', URLSslBlock,True)
|
|
|
|
def test_proxyReplace_ssl(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_replace_info_re):
|
|
proxyHandler.proxy_replace('test_proxyReplace_ssl',URLSslReplace, True)
|
|
|
|
def test_proxyHijack_ssl(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_hijack_info_re):
|
|
proxyHandler.proxy_hijack('test_proxyHijack_ssl', URLSslHijack,True)
|
|
|
|
def test_proxyInsert_ssl(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_insert_info_re):
|
|
proxyHandler.proxy_insert('test_proxyInsert_ssl',URLSslInsert,True)
|
|
|
|
|
|
def test_proxyRedirect_http(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, http_redirect_info_re):
|
|
proxyHandler.proxy_redirect('test_proxyRedirect_http',URLHttpRedirect, False)
|
|
|
|
def test_proxyBlock_http(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, http_block_info_re):
|
|
proxyHandler.proxy_block('test_proxyBlock_http', URLHttpBlock,False)
|
|
|
|
def test_proxyReplace_http(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, http_replace_info_re):
|
|
proxyHandler.proxy_replace('test_proxyReplace_http',URLHttpReplace, False)
|
|
|
|
def test_proxyHijack_http(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, http_hijack_info_re):
|
|
proxyHandler.proxy_hijack('test_proxyHijack_http', URLHttpHijack,False)
|
|
|
|
def test_proxyInsert_http(self):
|
|
proxyHandler = ProxyRequestBuild()
|
|
with self.assertRaisesRegex(Exception, http_insert_info_re):
|
|
proxyHandler.proxy_insert('test_proxyInsert_http',URLHttpInsert,False)
|
|
|
|
def test_firewallIntercept_sslDownloadSize1k(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize1k', URLConTraffic_1k, https_conn_taffic_1k_re,'1k', 1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize4k(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize4k',URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4*1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize16k(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize16k', URLConTraffic_16k, https_conn_taffic_16k_re,'16k', 16*1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize64k(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize64k',URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64*1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize256k(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize256k', URLConTraffic_256k,https_conn_taffic_256k_re,'256k', 256*1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize1M(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize1M', URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize4M(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize4M', URLConTraffic_4M, https_conn_taffic_4M_re,'4M', 4*1024*1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize16M(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize16M', URLConTraffic_16M,https_conn_taffic_16M_re,'16M',16*1024*1024)
|
|
|
|
def test_firewallIntercept_sslDownloadSize64M(self):
|
|
requestHandler = SSLFileDownloadBuild()
|
|
with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re):
|
|
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize64M',URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024)
|
|
|
|
def test_firewallAllow_http(self):
|
|
requestHandler = HttpFirewallActionBuild()
|
|
with self.assertRaisesRegex(Exception, http_firewall_allow_re):
|
|
requestHandler.action_allow('test_firewallAllow_http')
|
|
|
|
def test_firewallDenyDrop_http(self):
|
|
requestHandler = HttpFirewallActionBuild()
|
|
with self.assertRaisesRegex(Exception, http_firewall_deny_drop_re):
|
|
requestHandler.action_deny_subaction_drop('test_firewallDenyDrop_http')
|
|
|
|
def test_firewallDenyReset_http(self):
|
|
requestHandler = HttpFirewallActionBuild()
|
|
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_re):
|
|
requestHandler.action_deny_subaction_rst('test_firewallDenyReset_http')
|
|
|
|
def test_firewallDenyBlock_http(self):
|
|
requestHandler = HttpFirewallActionBuild()
|
|
with self.assertRaisesRegex(Exception, http_firewall_deny_block_re):
|
|
requestHandler.action_deny_subaction_block('test_firewallDenyBlock_http')
|
|
|
|
def test_firewallAllow_ssl(self):
|
|
requestHandler = SslFirewallActionBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_firewall_allow_re):
|
|
requestHandler.action_allow('test_firewallAllow_ssl')
|
|
|
|
def test_firewallDenyDrop_ssl(self):
|
|
requestHandler = SslFirewallActionBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_firewall_deny_drop_re):
|
|
requestHandler.action_deny_subaction_drop('test_firewallDenyDrop_ssl')
|
|
|
|
def test_firewallDenyReset_ssl(self):
|
|
requestHandler = SslFirewallActionBuild()
|
|
with self.assertRaisesRegex(Exception, ssl_firewall_deny_rst_re):
|
|
requestHandler.action_deny_subaction_rst('test_firewallDenyReset_ssl')
|
|
|
|
def test_firewallDenyResetFilterHost_http(self):
|
|
requestHandler = FilterTestingBuild()
|
|
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_filter_host_re):
|
|
requestHandler.firewall_http_deny_reset_filter_host('test_firewallDenyResetFilterHost_http')
|
|
|
|
def test_firewallDenyResetFilterURL_http(self):
|
|
requestHandler = FilterTestingBuild()
|
|
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_filter_url_re):
|
|
requestHandler.firewall_http_deny_reset_filter_url('test_firewallDenyResetFilterURL_http')
|
|
|
|
def test_proxyDenyFilterHost_http(self):
|
|
requestHandler = FilterTestingBuild()
|
|
with self.assertRaisesRegex(Exception, http_proxy_deny_filter_host_re):
|
|
requestHandler.proxy_http_deny_filter_host('test_proxyDenyFilterHost_http')
|
|
|
|
def test_proxyDenyFilterURL_http(self):
|
|
requestHandler = FilterTestingBuild()
|
|
with self.assertRaisesRegex(Exception, http_proxy_deny_filter_url_re):
|
|
requestHandler.proxy_http_deny_filter_url('test_proxyDenyFilterURL_http')
|
|
|
|
class TsgDiagnose:
|
|
def __init__(self):
|
|
self.interval = 1
|
|
self.loop = False
|
|
self.count = 1
|
|
self.config = None
|
|
self.client = None
|
|
self.config_dict = {}
|
|
self.dign_duration = 0
|
|
self.count_service_function = 1
|
|
|
|
def _get_dign_option(self):
|
|
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
|
|
parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.')
|
|
parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
|
|
parser.add_argument('-p','--configpath', type = str, default = '/opt/dign_client/etc/client.conf',help='Specifies the config file, default /opt/dign_client/etc/client.conf')
|
|
parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
|
|
parser.add_argument('-C','--count_service_function', type = int, default = 1, help='Specifies the counts of service_function ,range:1-256')
|
|
args = parser.parse_args()
|
|
self.interval = args.interval
|
|
self.loop = args.loop
|
|
self.count = args.count
|
|
self.config = args.configpath
|
|
self.count_service_function = args.count_service_function
|
|
if self.count == 0:
|
|
print("Error: bad number of tsg diagnose and will exit")
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
def _get_dign_config(self):
|
|
global suite_test_config_dict
|
|
config = ConfigParser()
|
|
config.read(self.config, encoding='UTF-8')
|
|
for section in config.sections():
|
|
if section in suite_test_config_dict:
|
|
suite_test_config_dict[section].update(config.items(section))
|
|
else:
|
|
suite_test_config_dict[section] = dict(config.items(section))
|
|
self.config_dict = suite_test_config_dict
|
|
for config_value in self.config_dict.values():
|
|
if 'conn_timeout' not in config_value:
|
|
continue
|
|
if int(config_value['enabled']) != 1:
|
|
continue
|
|
self.dign_duration += int(config_value['conn_timeout'])
|
|
|
|
|
|
def _add_suite(self,test_suite_name):
|
|
if int(self.config_dict[test_suite_name]['enabled']) == 1:
|
|
self.suite.addTest(TSGDiagnoseTest(test_suite_name))
|
|
|
|
def _add_dign_case(self):
|
|
self.suite = unittest.TestSuite()
|
|
self.suite._cleanup = False
|
|
self._add_suite('test_firewallBypass_ssl')
|
|
self._add_suite('test_firewallIntercept_ssl')
|
|
self._add_suite('test_firewallIntercept_sslCerterrExpired')
|
|
self._add_suite('test_firewallIntercept_sslCerterrSelfsigned')
|
|
self._add_suite('test_firewallIntercept_sslCerterrUntrustedroot')
|
|
self._add_suite('test_proxyRedirect_ssl')
|
|
self._add_suite('test_proxyBlock_ssl')
|
|
self._add_suite('test_proxyReplace_ssl')
|
|
self._add_suite('test_proxyHijack_ssl')
|
|
self._add_suite('test_proxyInsert_ssl')
|
|
self._add_suite('test_proxyRedirect_http')
|
|
self._add_suite('test_proxyBlock_http')
|
|
self._add_suite('test_proxyReplace_http')
|
|
self._add_suite('test_proxyHijack_http')
|
|
self._add_suite('test_proxyInsert_http')
|
|
self._add_suite('test_firewallAllow_http')
|
|
self._add_suite('test_firewallDenyDrop_http')
|
|
self._add_suite('test_firewallDenyReset_http')
|
|
self._add_suite('test_firewallDenyBlock_http')
|
|
self._add_suite('test_firewallAllow_ssl')
|
|
self._add_suite('test_firewallDenyDrop_ssl')
|
|
self._add_suite('test_firewallDenyReset_ssl')
|
|
self._add_suite('test_firewallDenyDrop_dns')
|
|
self._add_suite('test_firewallDenyRedirectA_dns')
|
|
self._add_suite('test_firewallDenyRedirectAAAA_dns')
|
|
self._add_suite('test_firewallDenyRedirectARangeTTL_dns')
|
|
self._add_suite('test_firewallDenyRedirectAAAARangeTTL_dns')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize1k')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize4k')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize16k')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize64k')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize256k')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize1M')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize4M')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize16M')
|
|
self._add_suite('test_firewallIntercept_sslDownloadSize64M')
|
|
self._add_suite('test_firewallDenyResetFilterHost_http')
|
|
self._add_suite('test_firewallDenyResetFilterURL_http')
|
|
self._add_suite('test_proxyDenyFilterHost_http')
|
|
self._add_suite('test_proxyDenyFilterURL_http')
|
|
|
|
def _dign_running(self,id_service_function):
|
|
print(format(("Service function id:" + str(id_service_function) + ",Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^70s'))
|
|
runningLogPath = "/opt/dign_client/log/tsg-diagnose.log" + '.' + time.strftime("%Y-%m-%d", time.localtime())
|
|
#runningLogger = get_logger("running",runningLogPath, False)
|
|
#runningLogger.debug("Diagnose Start,the It will take up to %d seconds" %(self.dign_duration))
|
|
with open(runningLogPath,"a+") as f:
|
|
f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose Start,the It will take up to " + str(self.dign_duration) + " senconds")
|
|
f.write('\n')
|
|
f.close()
|
|
result_stream = io.StringIO()
|
|
runner = unittest.TextTestRunner(stream=result_stream,verbosity=2,resultclass=DignTextTestResult)
|
|
runner.run(self.suite)
|
|
with open(runningLogPath,"a+") as f:
|
|
f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose end, Testing results:" + "\n" + result_stream.getvalue())
|
|
f.close()
|
|
#runningLogger.debug("Diagnose end, Testing results:" + "\n" + result_stream.getvalue())
|
|
print(format(("Service function id:" + str(id_service_function) + ",Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^70s'))
|
|
|
|
def _dign_service_function_running(self):
|
|
for id_service_function in range(1,self.count_service_function + 1):
|
|
set_http_request_resolve(id_service_function)
|
|
set_dns_server_ip(id_service_function)
|
|
#print(REQUEST_RESOLVE)
|
|
self._dign_running(id_service_function)
|
|
|
|
def dign_exec(self):
|
|
self._get_dign_option()
|
|
self._get_dign_config()
|
|
self._add_dign_case()
|
|
dign_counter = 0
|
|
try:
|
|
if int(self.config_dict['start_time_random_delay_range']['enabled']) == 1:
|
|
time.sleep(random.randint( \
|
|
int(self.config_dict['start_time_random_delay_range']['left_edge']),\
|
|
int(self.config_dict['start_time_random_delay_range']['right_edge'])))
|
|
while True:
|
|
print("\nRUN %d" %(dign_counter + 1))
|
|
self._dign_service_function_running()
|
|
dign_counter = dign_counter + 1
|
|
if not self.loop:
|
|
if dign_counter >= self.count:
|
|
break
|
|
time.sleep(self.interval)
|
|
except Exception as ex:
|
|
print("Process get an exception, will exit, Exception info:", ex)
|
|
sys.exit(1)
|
|
|
|
if __name__ == '__main__':
|
|
tsg_diagnose_running = TsgDiagnose()
|
|
tsg_diagnose_running.dign_exec()
|