This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tsg-tsg-diagnose/images_build/client/dign_client/bin/client.py

1338 lines
72 KiB
Python

import sys
import unittest
import json
import pycurl
import os
import re
import time
import io
from io import BytesIO
import getopt
import ciunittest
import argparse
import hashlib
from configparser import ConfigParser
import random
import dns.exception
import dns.resolver
import sys
import logging
suite_test_config_dict = {'test_firewallBypass_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyDrop_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyRedirectA_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyRedirectAAAA_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyRedirectARangeTTL_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyRedirectAAAARangeTTL_dns': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_dnsRequest_allow_rdtype_a': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_dnsRequest_allow_rdtype_aaaa': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_dnsRequest_allow_rdtype_cname': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslCerterrExpired': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslCerterrSelfsigned': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslCerterrUntrustedroot': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyRedirect_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyBlock_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyReplace_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyHijack_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyInsert_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyRedirect_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyBlock_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyReplace_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyHijack_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyInsert_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize1k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize4k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize16k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize64k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize256k': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize1M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize4M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize16M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallIntercept_sslDownloadSize64M': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallAllow_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyDrop_http': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600},
'test_firewallDenyReset_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyBlock_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallAllow_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyDrop_ssl': {'enabled':1,'conn_timeout':4,'max_recv_speed_large':6553600},
'test_firewallDenyReset_ssl': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyResetFilterHost_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_firewallDenyResetFilterURL_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyDenyFilterHost_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'test_proxyDenyFilterURL_http': {'enabled':1,'conn_timeout':1,'max_recv_speed_large':6553600},
'start_time_random_delay_range': {'enabled':1,'left_edge':1,'right_edge':30}}
ssl_bypass_info_re = "Ssl connection bypass success"
ssl_intercept_info_re = "Ssl connection intercept success"
https_exprired_info_re = "Ssl exprired cert check success"
https_self_signed_info_re = "Ssl self signed cert check success"
https_untrusted_root_info_re = "Ssl untrusted_root cert check success"
ssl_redirect_info_re = "Ssl connection redirect success"
ssl_replace_info_re = "Ssl connection replace success"
ssl_insert_info_re = "Ssl connection insert success"
ssl_hijack_info_re = "Ssl connection hijack success"
ssl_block_info_re = "Ssl connection block success"
http_redirect_info_re = "http connection redirect success"
http_replace_info_re = "http connection replace success"
http_insert_info_re = "http connection insert success"
http_hijack_info_re = "http connection hijack success"
http_block_info_re = "http connection block success"
https_conn_taffic_1k_re = 'https download file 1k success'
https_conn_taffic_4k_re = 'https download file 4k success'
https_conn_taffic_16k_re = 'https download file 16k success'
https_conn_taffic_64k_re = 'https download file 64k success'
https_conn_taffic_256k_re = 'https download file 256k success'
https_conn_taffic_1M_re = 'https download file 1M success'
https_conn_taffic_4M_re = 'https download file 4M success'
https_conn_taffic_16M_re = 'https download file 16M success'
https_conn_taffic_64M_re = 'https download file 64M success'
http_firewall_allow_re = "http firewall action allow success"
http_firewall_deny_drop_re = "http firewall aciton deny subaction drop success"
http_firewall_deny_rst_re = "http firewall action deny subaction rst success"
http_firewall_deny_block_re = "http firewall aciton deny subaction block success"
ssl_firewall_allow_re = "ssl firewall action allow success"
ssl_firewall_deny_drop_re = "ssl firewall action deny subaction drop success"
ssl_firewall_deny_rst_re = "ssl firewall action deny subaction rst success"
URLBypass = 'https://sha384.badssl.selftest.gdnt-cloud.website'
URLIntercept = 'https://sha256.badssl.selftest.gdnt-cloud.website'
URLSslExpired = 'https://expired.badssl.selftest.gdnt-cloud.website'
URLSslSelfsigned = 'https://self-signed.badssl.selftest.gdnt-cloud.website'
URLSslSuntrustedroot = 'https://untrusted-root.badssl.selftest.gdnt-cloud.website'
URLSslRedirect = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js'
URLSslReplace = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js'
URLSslInsert = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html'
URLSslHijack = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js'
URLSslBlock = 'https://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js'
URLHttpRedirect = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyRedirect.js'
URLHttpReplace = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyReplace.js'
URLHttpInsert = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyInsert.html'
URLHttpHijack = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyHijack.js'
URLHttpBlock = 'http://web-replay.badssl.selftest.gdnt-cloud.website/resources/proxyBlock.js'
URLConTraffic_1k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1k"
URLConTraffic_4k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4k"
URLConTraffic_16k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16k"
URLConTraffic_64k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64k"
URLConTraffic_256k = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/256k"
URLConTraffic_1M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/1M"
URLConTraffic_4M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/4M"
URLConTraffic_16M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/16M"
URLConTraffic_64M = "https://testing-download.badssl.selftest.gdnt-cloud.website/resources/64M"
URLHttpFirewallAllow = "http://http.badssl.selftest.gdnt-cloud.website"
URLHttpFirewallDenyDrop = "http://http-credit-card.badssl.selftest.gdnt-cloud.website"
URLHttpFirewallDenyRst = "http://http-dynamic-login.badssl.selftest.gdnt-cloud.website"
URLHttpFirewallDenyBlock = "http://http-login.badssl.selftest.gdnt-cloud.website"
URLSslFirewallAllow = "https://sha512.badssl.selftest.gdnt-cloud.website"
URLSslFirewallDenyDrop = "https://rsa2048.badssl.selftest.gdnt-cloud.website"
URLSslFirewallDenyRst = "https://rsa4096.badssl.selftest.gdnt-cloud.website"
URLHttpFirewallDenyRstFilterHost = 'http://testing-firewall-filter-host.badssl.selftest.gdnt-cloud.website'
URLHttpFirewallDenyRstFilterURL = 'http://testing-firewall-filter-url.badssl.selftest.gdnt-cloud.website'
URLHttpProxyDenyFilterHost = 'http://testing-proxy-filter-host.badssl.selftest.gdnt-cloud.website'
URLHttpProxyDenyFilterURL = 'http://testing-proxy-filter-url.badssl.selftest.gdnt-cloud.website'
http_firewall_deny_rst_filter_host_re = "testing firewall deny reset filter host ok"
http_firewall_deny_rst_filter_url_re = "testing firewall deny reset filter url ok"
http_proxy_deny_filter_host_re = "testing proxy deny filter host ok"
http_proxy_deny_filter_url_re = "testing proxy deny filter url ok"
HOST_DNS_ALLOW_A = "dnstest.allow-a-ipv4.selftest.gdnt-cloud.website"
HOST_DNS_DENY_REDIRECT_A = "dnstest.deny-redirect-a-ipv4.selftest.gdnt-cloud.website"
HOST_DNS_DENY_DORY = "dnstest.deny-drop-ipv4.selftest.gdnt-cloud.website"
HOST_DNS_DENY_REDIRECT_A_RTTL = "dnstest.deny-redirect-a-rttl-ipv4.selftest.gdnt-cloud.website"
HOST_DNS_ALLOW_AAAA = "dnstest.allow-4a-ipv6.selftest.gdnt-cloud.website"
HOST_DNS_DENY_REDIRECT_AAAA = "dnstest.deny-redirect-4a-ipv6.selftest.gdnt-cloud.website"
HOST_DNS_DENY_REDIRECT_AAAA_RTTL = "dnstest.deny-redirect-4a-rttl-ipv6.selftest.gdnt-cloud.website"
HOST_DNS_CNAME_QUERY = "dnstest.test-cname.selftest.gdnt-cloud.website"
HOST_DNS_CNAME_ANSWER = "dnstest.testanswer-cname.selftest.gdnt-cloud.website"
DNS_REDIRECT_IPV4_ADDR = "33.252.0.101"
DNS_REDIRECT_IPV6_ADDR = "2001:db8::1001"
DNS_ALLOW_A_ADDR = "233.252.0.1"
DNS_ALLOW_AAAA_ADDR = "2001:db8::1"
DNS_SERVER_ALLOW_TTL = 60
DNS_SERVER_REDIRECT_TTL = 333
DNS_SERVER_REDIRECT_RANGE_LOW = 400
DNS_SERVER_REDIRECT_RANGE_HIGH = 500
DNS_SERVER_IP = ["192.0.2.135"]
DnsRequestFirewallDenyDrop = "Dns request timeout is deny drop sucess"
DnsARequestFireWallDenyRedirect = "Dns rdtype A request is deny reidrect sucess"
DnsAAAARequestFireWallDenyRedirect = "Dns rdtype AAAA request is deny redirect sucess"
DnsARequestFireWallDenyRedirectRangTTL = "Dns rdtype A request is deny reidrect and range ttl sucess"
DnsAAAARequestFireWallDenyRedirectRangTTL = "Dns rdtype AAAA request is deny redirect and range ttl sucess"
DnsARequestFirewallAllow = "Dns rdtype A request data is sucess"
DnsAAAARequestFirewallAllow = "Dns rdtype AAAA request data is sucess"
DnsCNAMERequestFirewallAllow = "Dns rdtype CNAME request data is sucess"
class _WritelnDecorator(object):
"""Used to decorate file-like objects with a handy 'writeln' method"""
def __init__(self,stream):
self.stream = stream
def __getattr__(self, attr):
if attr in ('stream', '__getstate__'):
raise AttributeError(attr)
return getattr(self.stream,attr)
def writeln(self, arg=None):
if arg:
self.write(arg)
self.write('\n') # text-mode streams translate to \r\n if needed
class DignTextTestResult(unittest.result.TestResult):
"""A test result class that can print formatted text results to a stream.
Used by TextTestRunner.
"""
separator1 = '=' * 70
separator2 = '-' * 70
def __init__(self, stream, descriptions, verbosity):
super(DignTextTestResult, self).__init__(stream, descriptions, verbosity)
self.stream = stream
self.dignStream = _WritelnDecorator(sys.stderr)
self.showAll = verbosity > 1
self.dots = verbosity == 1
self.descriptions = descriptions
def getDescription(self, test):
doc_first_line = test.shortDescription()
if self.descriptions and doc_first_line:
return '\n'.join((str(test), doc_first_line))
else:
return str(test).split(' ', 1 )[0]
def startTest(self, test):
super(DignTextTestResult, self).startTest(test)
if self.showAll:
self.stream.write(self.getDescription(test))
self.dignStream.write(self.getDescription(test))
self.stream.write(" ... ")
self.dignStream.write(" ... ")
self.stream.flush()
self.dignStream.flush()
def addSuccess(self, test):
super(DignTextTestResult, self).addSuccess(test)
if self.showAll:
self.stream.writeln("ok")
self.dignStream.writeln("ok")
elif self.dots:
self.stream.write('.')
self.dignStream.write('.')
self.stream.flush()
self.dignStream.flush()
def addError(self, test, err):
super(DignTextTestResult, self).addError(test, err)
if self.showAll:
self.stream.writeln("ERROR")
self.dignStream.writeln("ERROR")
elif self.dots:
self.stream.write('E')
self.dignStream.write('E')
self.stream.flush()
self.dignStream.flush()
def addFailure(self, test, err):
super(DignTextTestResult, self).addFailure(test, err)
if self.showAll:
self.stream.writeln("FAIL")
self.dignStream.writeln("FAIL")
elif self.dots:
self.stream.write('F')
self.dignStream.write('F')
self.stream.flush()
self.dignStream.flush()
def addSkip(self, test, reason):
super(DignTextTestResult, self).addSkip(test, reason)
if self.showAll:
self.stream.writeln("skipped {0!r}".format(reason))
self.dignStream.writeln("skipped {0!r}".format(reason))
elif self.dots:
self.stream.write("s")
self.dignStream.write("s")
self.stream.flush()
self.dignStream.flush()
def addExpectedFailure(self, test, err):
super(DignTextTestResult, self).addExpectedFailure(test, err)
if self.showAll:
self.stream.writeln("expected failure")
self.dignStream.writeln("expected failure")
elif self.dots:
self.stream.write("x")
self.dignStream.write("x")
self.stream.flush()
self.dignStream.flush()
def addUnexpectedSuccess(self, test):
super(DignTextTestResult, self).addUnexpectedSuccess(test)
if self.showAll:
self.stream.writeln("unexpected success")
self.dignStream.writeln("unexpected success")
elif self.dots:
self.stream.write("u")
self.dignStream.write("u")
self.stream.flush()
self.dignStream.flush()
def printErrors(self):
if self.dots or self.showAll:
self.stream.writeln()
self.dignStream.writeln()
self.printErrorList('ERROR', self.errors)
self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavour, errors):
for test, err in errors:
self.stream.writeln(self.separator1)
self.dignStream.writeln(self.separator1)
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
self.dignStream.writeln("%s: %s" % (flavour,self.getDescription(test)))
self.stream.writeln(self.separator2)
self.dignStream.writeln(self.separator2)
self.stream.writeln("%s" % err)
self.dignStream.writeln("%s" % err)
def get_logger(name,logPath,enableConsole=True):
logger = logging.getLogger(name)
fileHandler = logging.FileHandler(logPath)
fmt = '%(asctime)s, %(levelname)s, %(message)s'
formatter = logging.Formatter(fmt=fmt, datefmt='%a %b %d %H:%M:%S %Y')
fileHandler.setFormatter(formatter)
fileHandler.setLevel(logging.DEBUG)
logger.addHandler(fileHandler)
if enableConsole == True:
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
consoleHandler.setLevel(logging.DEBUG)
logger.addHandler(consoleHandler)
logger.setLevel(logging.DEBUG)
return logger
class DNSCheckRequestBuild:
def dns_action_deny_subaction_drop(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_DENY_DORY, 'A')
except dns.exception.DNSException as errorinfo:
if type(errorinfo) == dns.resolver.LifetimeTimeout:
raise Exception(DnsRequestFirewallDenyDrop)
else:
raise Exception("Error: The dns_action_deny_subaction_drop check failure, code: %s" % errorinfo)
else:
raise Exception("Error: The dns_action_deny_subaction_drop test deny drop failure" )
def dns_action_deny_subaction_redirect_a(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_A, 'A')
except dns.exception.DNSException as errorinfo:
raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure, code: %s" % errorinfo)
else: # drop-redirect and respond rdtype A ipv4
if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL:
raise Exception("Error: The dns_action_deny_subaction_redirect_a check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL))
return
for i in dns_answer.response.answer:
for j in i.items:
if j.rdtype == 1: #'rdtype is A: ipv4'
if j.address == DNS_REDIRECT_IPV4_ADDR:
raise Exception(DnsARequestFireWallDenyRedirect)
else:
raise Exception("Error: The dns request rdtype A drop redirect check failure: respond value error")
else:
raise Exception("Error: The dns request rdtype A drop redirect check failure: respond rdtype error")
def dns_action_deny_subaction_redirect_aaaa(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA, 'AAAA')
except dns.exception.DNSException as errorinfo:
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure, code: %s" % errorinfo)
else: # drop-redirect and respond rdtype A ipv6
if dns_answer.rrset.ttl != DNS_SERVER_REDIRECT_TTL:
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa check failure: ttl(%s) is not DNS_SERVER_REDIRECT_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_TTL))
return
for i in dns_answer.response.answer:
for j in i.items:
if j.rdtype == 28: #'rdtype is A: ipv6'
if j.address == DNS_REDIRECT_IPV6_ADDR:
raise Exception(DnsAAAARequestFireWallDenyRedirect)
else:
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error")
else:
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error")
def dns_action_deny_subaction_redirect_a_rang_ttl(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_A_RTTL, 'A')
except dns.exception.DNSException as errorinfo:
raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure, code: %s" % errorinfo)
else: # drop-redirect and respond rdtype A ipv4
if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH:
raise Exception("Error: The dns_action_deny_subaction_redirect_a_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl,DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH))
return
for i in dns_answer.response.answer:
for j in i.items:
if j.rdtype == 1: #'rdtype is A: ipv4'
if j.address == DNS_REDIRECT_IPV4_ADDR:
raise Exception(DnsARequestFireWallDenyRedirectRangTTL)
else:
raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond value error")
else:
raise Exception("Error: The dns request rdtype A drop redirect range ttl check failure: respond rdtype error")
def dns_action_deny_subaction_redirect_aaaa_rang_ttl(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_DENY_REDIRECT_AAAA_RTTL, 'AAAA')
except dns.exception.DNSException as errorinfo:
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa range ttl check failure, code: %s" % errorinfo)
else: # drop-redirect and respond rdtype A ipv6
if DNS_SERVER_REDIRECT_RANGE_LOW > dns_answer.rrset.ttl or dns_answer.rrset.ttl > DNS_SERVER_REDIRECT_RANGE_HIGH:
raise Exception("Error: The dns_action_deny_subaction_redirect_aaaa_rang_ttl check failure: ttl(%d) is not DNS_SERVER_REDIRECT_RANG_TTL(%d-%d)"%(dns_answer.rrset.ttl, DNS_SERVER_REDIRECT_RANGE_LOW, DNS_SERVER_REDIRECT_RANGE_HIGH))
return
for i in dns_answer.response.answer:
for j in i.items:
if j.rdtype == 28: #'rdtype is A: ipv6'
if j.address == DNS_REDIRECT_IPV6_ADDR:
raise Exception(DnsAAAARequestFireWallDenyRedirectRangTTL)
else:
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond value error")
else:
raise Exception("Error: The dns request rdtype AAAA drop redirect check failure: respond rdtype error")
def dns_action_allow_rdtype_a(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_ALLOW_A, 'A')
except dns.exception.DNSException as errorinfo:
raise Exception("Error: The dns request rdtype A allow check failure, code: %s" % errorinfo)
else:
if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
return
for i in dns_answer.response.answer:
for j in i.items:
if j.rdtype == 1: #'rdtype is A: ipv4'
if j.address == DNS_ALLOW_A_ADDR:
raise Exception(DnsARequestFirewallAllow)
else:
raise Exception("Error: The dns request rdtype A allow check failure: respond value error")
else:
raise Exception("Error: The dns request rdtype A allow check failure: respond rdtype error")
def dns_action_allow_rdtype_aaaa(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_ALLOW_AAAA, 'AAAA')
except dns.exception.DNSException as errorinfo:
raise Exception("Error: The dns request rdtype AAAA allow check failure, code: %s" % errorinfo)
else:
if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
return
for i in dns_answer.response.answer:
for j in i.items:
if j.rdtype == 28: #'rdtype is AAAA: ipv6'
if j.address == DNS_ALLOW_AAAA_ADDR:
raise Exception(DnsAAAARequestFirewallAllow)
else:
raise Exception("Error: The dns request rdtype AAAA allow check failure: respond value error")
else:
raise Exception("Error: The dns request rdtype AAAA allow check failure: response rdtype error")
def dns_action_allow_rdtype_cname(self,test_suite_name):
dns_resolver=dns.resolver.Resolver()
dns_resolver.nameservers = DNS_SERVER_IP
dns_resolver.timeout = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
dns_resolver.lifetime = float(suite_test_config_dict[test_suite_name]['conn_timeout'])
try:
dns_answer = dns_resolver.query(HOST_DNS_CNAME_QUERY, 'CNAME')
except dns.exception.DNSException as errorinfo:
raise Exception("Error: The dns request rdtype CNAME allow check failure, code: %s" % errorinfo)
else:
if dns_answer.rrset.ttl != DNS_SERVER_ALLOW_TTL:
raise Exception("Error: The dns request rdtype A allow check failure: ttl(%d) is not DNS_SERVER_ALLOW_TTL(%d)"%(dns_answer.rrset.ttl, DNS_SERVER_ALLOW_TTL))
return
for i in dns_answer.response.answer:
for j in i.items:
if j.rdtype == 5: #'CNAME: tag(www.xxx.com)'
m=str(j)
if m == (HOST_DNS_CNAME_ANSWER + '.'):
raise Exception(DnsCNAMERequestFirewallAllow)
else:
raise Exception("Error: The dns request rdtype CNAME allow check failure: respond value error")
else:
raise Exception("Error: The dns request rdtype CNAME allow check failure: respond rdtype error")
class SSLCheckRequestBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name, url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _get_conn_issuer(self,certs):
issuer = ()
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
return issuer
def ssl_bypass(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLBypass)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = self._get_conn_issuer(certs)
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0):
raise Exception(ssl_bypass_info_re)
elif re.search(r'\bCN[\s]*=[\s]*Tango[\s]*Secure[\s]*Gateway[\s]*CA[\s\S]*\b',issuer[1],0):
raise Exception("Error:Ssl connection is intercepted, not bypass, cert info: %s" % issuer[1])
else:
raise Exception("Error:Got other error certificate information, cert info: %s" % issuer[1])
def ssl_intercept(self,test_suite_name):
self._set_conn_opt(test_suite_name, URLIntercept)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = self._get_conn_issuer(certs)
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Ssl connection is intercept, cert is untrust, cert info: %s" % issuer[1])
else:
raise Exception(ssl_intercept_info_re)
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
class SslInterceptRequestBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name,url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL, url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _conn_to_perform(self, test_suite_name, sec_info_re):
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception(sec_info_re)
else:
raise Exception("Error: Ssl connection is intercept, cert maybe trust, cert info: %s" % issuer[1])
else:
if re.search(r'\bCN[\s]*=[\s]*BadSSL\b',issuer[1],0) or \
re.search(r'\bCN[\s\S]*=[\s\S]*badssl[\s\S]*',issuer[1],0):
raise Exception("Error: Ssl connection intercept failed, cert info: %s" % issuer[1])
else:
raise Exception(sec_info_re)
def ssl_intercept_certerrExpired(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslExpired)
self._conn_to_perform(test_suite_name,https_exprired_info_re)
def ssl_intercept_certerrSelf_signed(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslSelfsigned)
self._conn_to_perform(test_suite_name,https_self_signed_info_re)
def ssl_intercept_certerrUntrusted_root(self,test_suite_name,):
self._set_conn_opt(test_suite_name,URLSslSuntrustedroot)
self._conn_to_perform(test_suite_name,https_untrusted_root_info_re)
class ProxyRequestBuild:
def __init__(self):
self.bodyBuf = BytesIO()
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.ENCODING, "gzip,deflate")
def _cert_verify(self, certs, isSsl):
if isSsl == True:
issuer = ()
for cert_info in certs[0]:
if cert_info[0].lower() == "issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])
else:
return
else:
raise Exception("Error: Got other error certificate information, cert info: %s" % issuer[1])
def _set_conn_opt(self,test_suite_name, url,isSsl):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
self.conn.setopt(self.conn.WRITEDATA, self.bodyBuf)
if isSsl == True:
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def proxy_redirect(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
self._cert_verify(certs, isSsl)
if rescode == 301 or rescode == 302:
if isSsl == True:
raise Exception(ssl_redirect_info_re)
else:
raise Exception(http_redirect_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection redirect fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http Connection redirect fail,RESPONSE_CODE = %d" % rescode)
def proxy_replace(self,test_suite_name, proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name, proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close()
self._cert_verify(certs, isSsl)
if not re.search(r'EnglishSearchShared', body, 0) and \
re.search(r'03C174CD9D809789CCEC18D6F585DF3E', body, 0):
if isSsl == True:
raise Exception(ssl_replace_info_re)
else:
raise Exception(http_replace_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection replace fail")
else:
raise Exception("Error:Http connection replace fail")
def proxy_insert(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
body = self.bodyBuf.getvalue().decode('utf-8')
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
self._cert_verify(certs, isSsl)
if re.search(r'httpSelfcheckInsert', body, 0) and \
re.search(r'5BE3754D1EA8D51E8D993060FA225330', body, 0):
if isSsl == True:
raise Exception(ssl_insert_info_re)
else:
raise Exception(http_insert_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection insert fail")
else:
raise Exception("Error:Http connection insert fail")
def proxy_block(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
body = self.bodyBuf.getvalue().decode('utf-8')
self.conn.close()
self._cert_verify(certs, isSsl)
if re.search(r'E33F01E50AFE043191931DD40190B09B', body, 0) and (rescode == 404 or rescode == 451):
if isSsl == True:
raise Exception(ssl_block_info_re)
else:
raise Exception(http_block_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection block fail, RESPONSE_CODE = %d" % rescode)
else:
raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)
def proxy_hijack(self,test_suite_name,proxy_url,isSsl):
certs = None
self._set_conn_opt(test_suite_name,proxy_url,isSsl)
self.conn.perform()
if isSsl == True:
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
self.conn.close()
self._cert_verify(certs, isSsl)
hijack_file_md5 = hashlib.md5(self.bodyBuf.getvalue())
if re.search("4bf06db1a228c5c8d978ebf9e1169d0d", hijack_file_md5.hexdigest(), 0):
if isSsl == True:
raise Exception(ssl_hijack_info_re)
else:
raise Exception(http_hijack_info_re)
else:
if isSsl == True:
raise Exception("Error:Ssl connection hijack fail")
else:
raise Exception("Error:Http connection hijack fail")
class SSLFileDownloadBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
def _get_conninfo(self,conn):
dictconninfo = {}
dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE)
dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME)
dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME)
dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME)
dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME)
dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME)
dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD)
dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD)
dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD)
dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD)
dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME)
return dictconninfo
def _set_conn_opt(self,test_suite_name,url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _write_in_logfile(self, sizeStr, connInfoDict):
connInfoLogPath = "/opt/dign_client/log/download_conn_info.log" + '.' + time.strftime("%Y-%m-%d", time.localtime())
#connInfoLogger = get_logger("download",connInfoLogPath,enableConsole=False)
connInfoStr = json.dumps(connInfoDict)
#connInfoLogger.debug("Fize Size:" + sizeStr + ";connection info:" + connInfoStr)
with open(connInfoLogPath,"a+") as f:
f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Fize Size:" + sizeStr + ";connection info:" + connInfoStr + '\n')
f.close()
def conn_traffic(self,test_suite_name,url,conn_taffic_re,sizeStr, size):
self._set_conn_opt(test_suite_name, url)
self.conn.perform()
certs = self.conn.getinfo(self.conn.INFO_CERTINFO)
conninfo = self._get_conninfo(self.conn)
self.conn.close()
issuer = ()
for cert_info in certs[0]:
if cert_info[0] == "Issuer":
issuer = cert_info
break
if len(issuer) <= 0:
raise Exception("Error: Get certificate info error, certificate's length is %s" % len(issuer))
if re.search(r'\bCN[\s]*=[\s]*Tango\b',issuer[1],0):
if re.search(r'\bCN = Tango[\s\S]*UNTRUST\b',issuer[1],0):
raise Exception("Error: Ssl connection is intercept, cert maybe untrust, cert info: %s" % issuer[1])
else:
raise Exception("Error: Intercept fail: no Tango cert,cert info:%s" % issuer[1])
if int(conninfo["size_download"]) == size:
self._write_in_logfile(sizeStr,conninfo)
raise Exception(conn_taffic_re)
else:
raise Exception("Error: connection tarffic size error and is no equal", sizeStr)
class HttpFirewallActionBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
def _set_conn_opt(self,test_suite_name, url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def action_allow(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLHttpFirewallAllow)
self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
if rescode == 200:
raise Exception(http_firewall_allow_re)
else:
raise Exception("Error: The stream may be redirected, http code %s" % rescode)
def action_deny_subaction_drop(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyDrop)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 28):
raise Exception(http_firewall_deny_drop_re)
else:
raise Exception("Error: The stream may be not dropped %s" % errorinfo)
def action_deny_subaction_rst(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyRst)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 56):
raise Exception(http_firewall_deny_rst_re)
else:
raise Exception("Error: The stream may be not rst %s" % errorinfo)
def action_deny_subaction_block(self,test_suite_name):
bodyBuf = BytesIO()
self._set_conn_opt(test_suite_name,URLHttpFirewallDenyBlock)
self.conn.setopt(self.conn.WRITEDATA, bodyBuf)
self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
if rescode == 403:
if re.search(r'dign-testing-deny-block', bodyBuf.getvalue().decode('utf-8'), 0):
raise Exception(http_firewall_deny_block_re)
else:
raise Exception("Error: response content is not required, required \'dign-testing-deny-block\'")
else:
raise Exception("Error: The stream may be not block, http code %s " % rescode)
class SslFirewallActionBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
self.conn.setopt(self.conn.OPT_CERTINFO, 1)
self.conn.setopt(self.conn.SSL_VERIFYPEER, False)
def _set_conn_opt(self,test_suite_name, url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def action_allow(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslFirewallAllow)
self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
self.conn.close()
if rescode == 200:
raise Exception(ssl_firewall_allow_re)
else:
raise Exception("Error: The stream may be redirected, http code %s" % rescode)
def action_deny_subaction_drop(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslFirewallDenyDrop)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 28):
raise Exception(ssl_firewall_deny_drop_re)
else:
raise Exception("Error: The stream may be not dropped %s" % errorinfo)
def action_deny_subaction_rst(self,test_suite_name):
self._set_conn_opt(test_suite_name,URLSslFirewallDenyRst)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 35):
raise Exception(ssl_firewall_deny_rst_re)
else:
raise Exception("Error: The stream may be not rst %s" % errorinfo)
class FilterTestingBuild:
def __init__(self):
self.conn = pycurl.Curl()
self.conn.setopt(self.conn.WRITEFUNCTION, BytesIO().write)
def _set_conn_opt(self,test_suite_name, url):
self.conn.setopt(self.conn.MAX_RECV_SPEED_LARGE, int(suite_test_config_dict[test_suite_name]['max_recv_speed_large']))
self.conn.setopt(self.conn.URL,url)
self.conn.setopt(self.conn.TIMEOUT, int(suite_test_config_dict[test_suite_name]['conn_timeout']))
def _firewall_deny_reset(self,test_suite_name, url, raise_re):
self._set_conn_opt(test_suite_name,url)
try:
self.conn.perform()
self.conn.close()
except pycurl.error as errorinfo:
errcode = errorinfo.args[0]
if(errcode == 56):
raise Exception(raise_re)
else:
raise Exception("Error: The stream may be not rst %s" % errorinfo)
def _proxy_deny(self,test_suite_name,url,replaceStr,raise_re):
bodyBuf = BytesIO()
self._set_conn_opt(test_suite_name,url)
self.conn.setopt(self.conn.WRITEDATA, bodyBuf)
self.conn.perform()
rescode = self.conn.getinfo(self.conn.RESPONSE_CODE)
body = bodyBuf.getvalue().decode('utf-8')
self.conn.close()
if re.search(replaceStr, body, 0) and (rescode == 404 or rescode == 451):
raise Exception(raise_re)
else:
raise Exception("Error:Http connection block fail, RESPONSE_CODE = %d" % rescode)
def firewall_http_deny_reset_filter_host(self,test_suite_name):
self._firewall_deny_reset(test_suite_name,URLHttpFirewallDenyRstFilterHost,http_firewall_deny_rst_filter_host_re)
def firewall_http_deny_reset_filter_url(self,test_suite_name):
self._firewall_deny_reset(test_suite_name,URLHttpFirewallDenyRstFilterURL,http_firewall_deny_rst_filter_url_re)
def proxy_http_deny_filter_host(self,test_suite_name):
self._proxy_deny(test_suite_name,URLHttpProxyDenyFilterHost, "testing-proxy-filter-host",http_proxy_deny_filter_host_re)
def proxy_http_deny_filter_url(self,test_suite_name):
self._proxy_deny(test_suite_name,URLHttpProxyDenyFilterURL,"testing-proxy-filter-url",http_proxy_deny_filter_url_re)
class TSGDiagnoseTest(unittest.TestCase):
def test_firewallDenyDrop_dns(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsRequestFirewallDenyDrop):
dnsHandler.dns_action_deny_subaction_drop('test_firewallDenyDrop_dns')
def test_firewallDenyRedirectA_dns(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirect):
dnsHandler.dns_action_deny_subaction_redirect_a('test_firewallDenyRedirectA_dns')
def test_firewallDenyRedirectAAAA_dns(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirect):
dnsHandler.dns_action_deny_subaction_redirect_aaaa('test_firewallDenyRedirectARangeTTL_dns')
def test_firewallDenyRedirectARangeTTL_dns(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsARequestFireWallDenyRedirectRangTTL):
dnsHandler.dns_action_deny_subaction_redirect_a_rang_ttl('test_firewallDenyRedirectARangeTTL_dns')
def test_firewallDenyRedirectAAAARangeTTL_dns(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsAAAARequestFireWallDenyRedirectRangTTL):
dnsHandler.dns_action_deny_subaction_redirect_aaaa_rang_ttl('test_firewallDenyRedirectAAAARangeTTL_dns')
def test_dnsRequest_allow_rdtype_a(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsARequestFirewallAllow):
dnsHandler.dns_action_allow_rdtype_a('test_dnsRequest_allow_rdtype_a')
def test_dnsRequest_allow_rdtype_aaaa(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsAAAARequestFirewallAllow):
dnsHandler.dns_action_allow_rdtype_aaaa('test_dnsRequest_allow_rdtype_aaaa')
def test_dnsRequest_allow_rdtype_cname(self):
dnsHandler = DNSCheckRequestBuild()
with self.assertRaisesRegex(Exception, DnsCNAMERequestFirewallAllow):
dnsHandler.dns_action_allow_rdtype_cname('test_dnsRequest_allow_rdtype_cname')
def test_firewallBypass_ssl(self):
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_bypass_info_re):
sslHandler.ssl_bypass('test_firewallBypass_ssl')
def test_firewallIntercept_ssl(self):
sslHandler = SSLCheckRequestBuild()
with self.assertRaisesRegex(Exception, ssl_intercept_info_re):
sslHandler.ssl_intercept('test_firewallIntercept_ssl')
def test_firewallIntercept_sslCerterrExpired(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_exprired_info_re):
requestHandler.ssl_intercept_certerrExpired('test_firewallIntercept_sslCerterrExpired')
def test_firewallIntercept_sslCerterrSelfsigned(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_self_signed_info_re):
requestHandler.ssl_intercept_certerrSelf_signed('test_firewallIntercept_sslCerterrSelfsigned')
def test_firewallIntercept_sslCerterrUntrustedroot(self):
requestHandler = SslInterceptRequestBuild()
with self.assertRaisesRegex(Exception, https_untrusted_root_info_re):
requestHandler.ssl_intercept_certerrUntrusted_root('test_firewallIntercept_sslCerterrUntrustedroot')
def test_proxyRedirect_ssl(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyRedirect_ssl',URLSslRedirect,True)
def test_proxyBlock_ssl(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_block_info_re):
proxyHandler.proxy_block('test_proxyBlock_ssl', URLSslBlock,True)
def test_proxyReplace_ssl(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_replace_info_re):
proxyHandler.proxy_replace('test_proxyReplace_ssl',URLSslReplace, True)
def test_proxyHijack_ssl(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyHijack_ssl', URLSslHijack,True)
def test_proxyInsert_ssl(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, ssl_insert_info_re):
proxyHandler.proxy_insert('test_proxyInsert_ssl',URLSslInsert,True)
def test_proxyRedirect_http(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_redirect_info_re):
proxyHandler.proxy_redirect('test_proxyRedirect_http',URLHttpRedirect, False)
def test_proxyBlock_http(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_block_info_re):
proxyHandler.proxy_block('test_proxyBlock_http', URLHttpBlock,False)
def test_proxyReplace_http(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_replace_info_re):
proxyHandler.proxy_replace('test_proxyReplace_http',URLHttpReplace, False)
def test_proxyHijack_http(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_hijack_info_re):
proxyHandler.proxy_hijack('test_proxyHijack_http', URLHttpHijack,False)
def test_proxyInsert_http(self):
proxyHandler = ProxyRequestBuild()
with self.assertRaisesRegex(Exception, http_insert_info_re):
proxyHandler.proxy_insert('test_proxyInsert_http',URLHttpInsert,False)
def test_firewallIntercept_sslDownloadSize1k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1k_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize1k', URLConTraffic_1k, https_conn_taffic_1k_re,'1k', 1024)
def test_firewallIntercept_sslDownloadSize4k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4k_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize4k',URLConTraffic_4k, https_conn_taffic_4k_re, '4k', 4*1024)
def test_firewallIntercept_sslDownloadSize16k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16k_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize16k', URLConTraffic_16k, https_conn_taffic_16k_re,'16k', 16*1024)
def test_firewallIntercept_sslDownloadSize64k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64k_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize64k',URLConTraffic_64k, https_conn_taffic_64k_re, '64k', 64*1024)
def test_firewallIntercept_sslDownloadSize256k(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_256k_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize256k', URLConTraffic_256k,https_conn_taffic_256k_re,'256k', 256*1024)
def test_firewallIntercept_sslDownloadSize1M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_1M_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize1M', URLConTraffic_1M, https_conn_taffic_1M_re, '1M', 1024 * 1024)
def test_firewallIntercept_sslDownloadSize4M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_4M_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize4M', URLConTraffic_4M, https_conn_taffic_4M_re,'4M', 4*1024*1024)
def test_firewallIntercept_sslDownloadSize16M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_16M_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize16M', URLConTraffic_16M,https_conn_taffic_16M_re,'16M',16*1024*1024)
def test_firewallIntercept_sslDownloadSize64M(self):
requestHandler = SSLFileDownloadBuild()
with self.assertRaisesRegex(Exception,https_conn_taffic_64M_re):
requestHandler.conn_traffic( 'test_firewallIntercept_sslDownloadSize64M',URLConTraffic_64M,https_conn_taffic_64M_re, '64M', 64*1024*1024)
def test_firewallAllow_http(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_allow_re):
requestHandler.action_allow('test_firewallAllow_http')
def test_firewallDenyDrop_http(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_drop_re):
requestHandler.action_deny_subaction_drop('test_firewallDenyDrop_http')
def test_firewallDenyReset_http(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_re):
requestHandler.action_deny_subaction_rst('test_firewallDenyReset_http')
def test_firewallDenyBlock_http(self):
requestHandler = HttpFirewallActionBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_block_re):
requestHandler.action_deny_subaction_block('test_firewallDenyBlock_http')
def test_firewallAllow_ssl(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_allow_re):
requestHandler.action_allow('test_firewallAllow_ssl')
def test_firewallDenyDrop_ssl(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_deny_drop_re):
requestHandler.action_deny_subaction_drop('test_firewallDenyDrop_ssl')
def test_firewallDenyReset_ssl(self):
requestHandler = SslFirewallActionBuild()
with self.assertRaisesRegex(Exception, ssl_firewall_deny_rst_re):
requestHandler.action_deny_subaction_rst('test_firewallDenyReset_ssl')
def test_firewallDenyResetFilterHost_http(self):
requestHandler = FilterTestingBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_filter_host_re):
requestHandler.firewall_http_deny_reset_filter_host('test_firewallDenyResetFilterHost_http')
def test_firewallDenyResetFilterURL_http(self):
requestHandler = FilterTestingBuild()
with self.assertRaisesRegex(Exception, http_firewall_deny_rst_filter_url_re):
requestHandler.firewall_http_deny_reset_filter_url('test_firewallDenyResetFilterURL_http')
def test_proxyDenyFilterHost_http(self):
requestHandler = FilterTestingBuild()
with self.assertRaisesRegex(Exception, http_proxy_deny_filter_host_re):
requestHandler.proxy_http_deny_filter_host('test_proxyDenyFilterHost_http')
def test_proxyDenyFilterURL_http(self):
requestHandler = FilterTestingBuild()
with self.assertRaisesRegex(Exception, http_proxy_deny_filter_url_re):
requestHandler.proxy_http_deny_filter_url('test_proxyDenyFilterURL_http')
class TsgDiagnose:
def __init__(self):
self.interval = 1
self.loop = False
self.count = 1
self.config = None
self.client = None
self.config_dict = {}
self.dign_duration = 0
def _get_dign_option(self):
parser = argparse.ArgumentParser(description="Tsg Tools - tsg diagnose", epilog = "Example:help")
parser.add_argument('-i','--interval', type = int, default = 30,help='Wait interval seconds between each tsg disagnose. The default is to wait for 30 seconds between each tsg diagnose.')
parser.add_argument('-c','--count', type = int, default = 1, help='Specifies the count of tsg diagnoses ,range:1-65535')
parser.add_argument('-p','--configpath', type = str, default = '/opt/dign_client/etc/client.conf',help='Specifies the config file, default /opt/dign_client/etc/client.conf')
parser.add_argument('-l','--loop', action='store_true', default = False, help='Tsg diagnose loop, exit when recv a signal')
args = parser.parse_args()
self.interval = args.interval
self.loop = args.loop
self.count = args.count
self.config = args.configpath
if self.count == 0:
print("Error: bad number of tsg diagnose and will exit")
parser.print_help()
sys.exit(1)
def _get_dign_config(self):
global suite_test_config_dict
config = ConfigParser()
config.read(self.config, encoding='UTF-8')
for section in config.sections():
if section in suite_test_config_dict:
suite_test_config_dict[section].update(config.items(section))
else:
suite_test_config_dict[section] = dict(config.items(section))
self.config_dict = suite_test_config_dict
for config_value in self.config_dict.values():
if 'conn_timeout' not in config_value:
continue
if int(config_value['enabled']) != 1:
continue
self.dign_duration += int(config_value['conn_timeout'])
def _add_suite(self,test_suite_name):
if int(self.config_dict[test_suite_name]['enabled']) == 1:
self.suite.addTest(TSGDiagnoseTest(test_suite_name))
def _add_dign_case(self):
self.suite = unittest.TestSuite()
self.suite._cleanup = False
self._add_suite('test_firewallBypass_ssl')
self._add_suite('test_firewallIntercept_ssl')
self._add_suite('test_firewallIntercept_sslCerterrExpired')
self._add_suite('test_firewallIntercept_sslCerterrSelfsigned')
self._add_suite('test_firewallIntercept_sslCerterrUntrustedroot')
self._add_suite('test_proxyRedirect_ssl')
self._add_suite('test_proxyBlock_ssl')
self._add_suite('test_proxyReplace_ssl')
self._add_suite('test_proxyHijack_ssl')
self._add_suite('test_proxyInsert_ssl')
self._add_suite('test_proxyRedirect_http')
self._add_suite('test_proxyBlock_http')
self._add_suite('test_proxyReplace_http')
self._add_suite('test_proxyHijack_http')
self._add_suite('test_proxyInsert_http')
self._add_suite('test_firewallAllow_http')
self._add_suite('test_firewallDenyDrop_http')
self._add_suite('test_firewallDenyReset_http')
self._add_suite('test_firewallDenyBlock_http')
self._add_suite('test_firewallAllow_ssl')
self._add_suite('test_firewallDenyDrop_ssl')
self._add_suite('test_firewallDenyReset_ssl')
self._add_suite('test_firewallDenyDrop_dns')
self._add_suite('test_firewallDenyRedirectA_dns')
self._add_suite('test_firewallDenyRedirectAAAA_dns')
self._add_suite('test_firewallDenyRedirectARangeTTL_dns')
self._add_suite('test_firewallDenyRedirectAAAARangeTTL_dns')
self._add_suite('test_firewallIntercept_sslDownloadSize1k')
self._add_suite('test_firewallIntercept_sslDownloadSize4k')
self._add_suite('test_firewallIntercept_sslDownloadSize16k')
self._add_suite('test_firewallIntercept_sslDownloadSize64k')
self._add_suite('test_firewallIntercept_sslDownloadSize256k')
self._add_suite('test_firewallIntercept_sslDownloadSize1M')
self._add_suite('test_firewallIntercept_sslDownloadSize4M')
self._add_suite('test_firewallIntercept_sslDownloadSize16M')
self._add_suite('test_firewallIntercept_sslDownloadSize64M')
self._add_suite('test_firewallDenyResetFilterHost_http')
self._add_suite('test_firewallDenyResetFilterURL_http')
self._add_suite('test_proxyDenyFilterHost_http')
self._add_suite('test_proxyDenyFilterURL_http')
def _dign_running(self):
print(format(("Test start time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'#^70s'))
runningLogPath = "/opt/dign_client/log/tsg-diagnose.log" + '.' + time.strftime("%Y-%m-%d", time.localtime())
#runningLogger = get_logger("running",runningLogPath, False)
#runningLogger.debug("Diagnose Start,the It will take up to %d seconds" %(self.dign_duration))
with open(runningLogPath,"a+") as f:
f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose Start,the It will take up to " + str(self.dign_duration) + " senconds")
f.write('\n')
f.close()
result_stream = io.StringIO()
runner = unittest.TextTestRunner(stream=result_stream,verbosity=2,resultclass=DignTextTestResult)
runner.run(self.suite)
with open(runningLogPath,"a+") as f:
f.write(time.strftime("%a %b %d %H:%M:%S %Y", time.localtime()) + "Diagnose end, Testing results:" + "\n" + result_stream.getvalue())
f.close()
#runningLogger.debug("Diagnose end, Testing results:" + "\n" + result_stream.getvalue())
print(format(("Test end time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())),'=^70s'))
def dign_exec(self):
self._get_dign_option()
self._get_dign_config()
self._add_dign_case()
dign_counter = 0
try:
if int(self.config_dict['start_time_random_delay_range']['enabled']) == 1:
time.sleep(random.randint( \
int(self.config_dict['start_time_random_delay_range']['left_edge']),\
int(self.config_dict['start_time_random_delay_range']['right_edge'])))
while True:
print("\nRUN %d" %(dign_counter + 1))
self._dign_running()
dign_counter = dign_counter + 1
if not self.loop:
if dign_counter >= self.count:
break
time.sleep(self.interval)
except Exception as ex:
print("Process get an exception, will exit, Exception info:", ex)
sys.exit(1)
if __name__ == '__main__':
tsg_diagnose_running = TsgDiagnose()
tsg_diagnose_running.dign_exec()