diff --git a/unittest_python/unittest/con_traffic_inject.py b/unittest_python/unittest/con_traffic_inject.py deleted file mode 100644 index 3626a0e..0000000 --- a/unittest_python/unittest/con_traffic_inject.py +++ /dev/null @@ -1,118 +0,0 @@ -import sys -import json -import pycurl -import os -import re -import time -from io import BytesIO - -class SSLBuild: - def __init__(self): - self.urldict = {"0k":"https://downloadfile.self-test.geedge.net/0k", \ - "1k":"https://downloadfile.self-test.geedge.net/1k", \ - "2k":"https://downloadfile.self-test.geedge.net/2k", \ - "4k":"https://downloadfile.self-test.geedge.net/4k", \ - "8k":"https://downloadfile.self-test.geedge.net/8k", \ - "16k":"https://downloadfile.self-test.geedge.net/16k", \ - "32k":"https://downloadfile.self-test.geedge.net/32k", \ - "64k":"https://downloadfile.self-test.geedge.net/64k", \ - "128k":"https://downloadfile.self-test.geedge.net/128k", \ - "256k":"https://downloadfile.self-test.geedge.net/256k", \ - "512k":"https://downloadfile.self-test.geedge.net/512k", \ - "1M":"https://downloadfile.self-test.geedge.net/1M", \ - "2M":"https://downloadfile.self-test.geedge.net/2M", \ - "4M":"https://downloadfile.self-test.geedge.net/4M", \ - "8M":"https://downloadfile.self-test.geedge.net/8M", \ - "16M":"https://downloadfile.self-test.geedge.net/16M", \ - "32M":"https://downloadfile.self-test.geedge.net/32M", \ - "64M":"https://downloadfile.self-test.geedge.net/64M"} - self.sizelist = ["0k","1k","2k","4k","8k","16k","32k","64k","128k","256k","512k","1M","2M","4M","8M","16M","32M","64M"] - self.resultlist = [] - - def build_conninfo_json(self,conn): - dictconninfo = {} - dictconninfo["status"] = conn.getinfo(pycurl.HTTP_CODE) - dictconninfo["dns_time"] = conn.getinfo(pycurl.NAMELOOKUP_TIME) - dictconninfo["conn_time"] = conn.getinfo(pycurl.CONNECT_TIME) - dictconninfo["app_time"] = conn.getinfo(pycurl.APPCONNECT_TIME) - dictconninfo["start_transfer_time"] = conn.getinfo(pycurl.STARTTRANSFER_TIME) - dictconninfo["total_time"] = conn.getinfo(pycurl.TOTAL_TIME) - dictconninfo["redirect_count"] = conn.getinfo(pycurl.REDIRECT_COUNT) - dictconninfo["size_upload"] = conn.getinfo(pycurl.SIZE_UPLOAD) - dictconninfo["size_download"] = conn.getinfo(pycurl.SIZE_DOWNLOAD) - dictconninfo["header_size"] = conn.getinfo(pycurl.HEADER_SIZE) - dictconninfo["request_size"] = conn.getinfo(pycurl.REQUEST_SIZE) - dictconninfo["speed_upload"] = conn.getinfo(pycurl.SPEED_UPLOAD) - dictconninfo["speed_download"] = conn.getinfo(pycurl.SPEED_DOWNLOAD) - dictconninfo["time_connect"] = conn.getinfo(pycurl.CONNECT_TIME) - dictconninfo["time_pretransfer"] =conn.getinfo(pycurl.PRETRANSFER_TIME) - return dictconninfo - - def intecept_succ_get_value(self,urlkey,url,conn): - dictinfo = {} - dictinfo["downloadsize"] = urlkey - dictinfo["url"] = url - dictinfo["time"] = time.asctime( time.localtime(time.time())) - dictinfo["result"] = self.build_conninfo_json(conn) - self.resultlist.append(dictinfo) - - - def ssl_conn(self,urlkey,url): - issuer = () - conn = pycurl.Curl() - errdict = {} - conn.setopt(conn.WRITEFUNCTION, BytesIO().write) - conn.setopt(conn.SSL_VERIFYPEER, False) - conn.setopt(conn.OPT_CERTINFO, 1) - conn.setopt(conn.URL,url) - try: - conn.perform() - except pycurl.error as e: - errdict["status"] = "error" - errdict["errinfo"] = e - errdict["url"] = url - errdict["time"] = time.asctime( time.localtime(time.time())) - return - certs = conn.getinfo(conn.INFO_CERTINFO) - for cert_info in certs[0]: - if cert_info[0].lower() == "issuer": - issuer = cert_info - break - if len(issuer) <= 0: - errdict["status"] = "error" - errdict["errinfo"] = "Get certificate info error" - errdict["url"] = url - errdict["time"] = time.asctime( time.localtime(time.time())) - self.resultlist.append(errdict) - elif not re.search(r'CN = Tango[\s\S]*UNTRUST',issuer[1],0): - errdict["status"] = "error" - errdict["errinfo"] = "Intercept fail: no Tango cert" - errdict["url"] = url - errdict["time"] = time.asctime( time.localtime(time.time())) - self.resultlist.append(errdict) - else: - self.intecept_succ_get_value(urlkey,url,conn) - conn.close() - def ssl_intercept(self): - - for sizefield in self.sizelist: - self.ssl_conn(sizefield,self.urldict[sizefield]) - - -if __name__ == '__main__': - while True: - try: - ssl = SSLBuild() - ssl.ssl_intercept() - logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." + time.strftime("%Y-%m-%d",time.localtime()) - logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest" - with open(logNewestPath,"w+") as f: - f.write(json.dumps(ssl.resultlist, sort_keys=True, indent=4, separators=(',', ': '))) - f.close() - with open(logpath,"a+") as f: - f.write(json.dumps(ssl.resultlist)) - f.write("\n") - f.close() - time.sleep(1) - except: - print("Exception:an exception occurred during the execution of the program",file=sys.stderr) \ No newline at end of file