"""Self-test probe for SSL interception.

Repeatedly downloads a fixed set of differently sized files over HTTPS,
checks the issuer of the certificate presented for each connection, and
writes the per-download results as JSON to a "newest" file (overwritten on
every round) and to a per-day log file (one JSON line appended per round).
"""
import sys
import json
import os
import re
import time
from io import BytesIO

import pycurl


class SSLBuild:
    """Run one round of HTTPS downloads and collect a result record per URL.

    Attributes:
        sizelist: Download size labels, in the order they are probed.
        urldict: Maps each size label to the URL that is fetched for it.
        resultlist: Result records (plain dicts) accumulated by ``ssl_conn``;
            this is what the script serialises to the log files.
    """

    def __init__(self):
        self.sizelist = ["0k", "1k", "2k", "4k", "8k", "16k", "32k", "64k",
                         "128k", "256k", "512k", "1M", "2M", "4M", "8M",
                         "16M", "32M", "64M"]
        # Every URL is the same base followed by the size label, so derive
        # the table instead of spelling out each entry.
        self.urldict = {
            size: "https://downloadfile.self-test.geedge.net/" + size
            for size in self.sizelist
        }
        self.resultlist = []

    def build_conninfo_json(self, conn):
        """Return a dict of transfer statistics read from a performed handle.

        Args:
            conn: A ``pycurl.Curl`` handle on which ``perform()`` has run.

        Returns:
            dict mapping result keys to the matching ``conn.getinfo`` value.
        """
        # Insertion order is kept as-is: the per-day log is dumped without
        # sort_keys, so reordering would change the emitted text.
        # "conn_time" and "time_connect" both report CONNECT_TIME; both keys
        # are kept so the output schema stays unchanged.
        fields = (
            ("status", pycurl.HTTP_CODE),
            ("dns_time", pycurl.NAMELOOKUP_TIME),
            ("conn_time", pycurl.CONNECT_TIME),
            ("app_time", pycurl.APPCONNECT_TIME),
            ("start_transfer_time", pycurl.STARTTRANSFER_TIME),
            ("total_time", pycurl.TOTAL_TIME),
            ("redirect_count", pycurl.REDIRECT_COUNT),
            ("size_upload", pycurl.SIZE_UPLOAD),
            ("size_download", pycurl.SIZE_DOWNLOAD),
            ("header_size", pycurl.HEADER_SIZE),
            ("request_size", pycurl.REQUEST_SIZE),
            ("speed_upload", pycurl.SPEED_UPLOAD),
            ("speed_download", pycurl.SPEED_DOWNLOAD),
            ("time_connect", pycurl.CONNECT_TIME),
            ("time_pretransfer", pycurl.PRETRANSFER_TIME),
        )
        dictconninfo = {}
        for key, info in fields:
            dictconninfo[key] = conn.getinfo(info)
        return dictconninfo

    def intecept_succ_get_value(self, urlkey, url, conn):
        """Append a success record for ``url`` to ``self.resultlist``.

        Args:
            urlkey: Size label of the download (stored as "downloadsize").
            url: The URL that was fetched.
            conn: The performed ``pycurl.Curl`` handle to read statistics from.
        """
        dictinfo = {}
        dictinfo["downloadsize"] = urlkey
        dictinfo["url"] = url
        dictinfo["time"] = self._timestamp()
        dictinfo["result"] = self.build_conninfo_json(conn)
        self.resultlist.append(dictinfo)

    @staticmethod
    def _timestamp():
        """Return the current local time formatted by ``time.asctime``."""
        return time.asctime(time.localtime(time.time()))

    def _record_error(self, url, errinfo):
        """Append an error record for ``url`` to ``self.resultlist``.

        Args:
            url: The URL that was being fetched.
            errinfo: Description of the failure; must be JSON-serialisable
                because ``resultlist`` is dumped with ``json.dumps``.
        """
        errdict = {}
        errdict["status"] = "error"
        errdict["errinfo"] = errinfo
        errdict["url"] = url
        errdict["time"] = self._timestamp()
        self.resultlist.append(errdict)

    def ssl_conn(self, urlkey, url):
        """Fetch ``url`` once and record either statistics or an error.

        Exactly one record is appended to ``self.resultlist``: an error
        record if the transfer fails, if no issuer field can be found in the
        returned certificate info, or if the issuer does not match the
        expected pattern; a success record otherwise.

        Args:
            urlkey: Size label of the download.
            url: The URL to fetch.
        """
        conn = pycurl.Curl()
        try:
            # The response body is received but never inspected.
            body = BytesIO()
            conn.setopt(conn.WRITEFUNCTION, body.write)
            # Peer verification is off so the handshake completes even when
            # the presented certificate is not trusted; the issuer is checked
            # explicitly below instead.
            conn.setopt(conn.SSL_VERIFYPEER, False)
            conn.setopt(conn.OPT_CERTINFO, 1)
            conn.setopt(conn.URL, url)

            try:
                conn.perform()
            except pycurl.error as e:
                # Store the text, not the exception object, so the record
                # can be serialised to JSON.
                self._record_error(url, str(e))
                return

            issuer = ()
            certs = conn.getinfo(conn.INFO_CERTINFO)
            # Guard against an empty result before looking at the first
            # certificate entry.
            if certs:
                for cert_info in certs[0]:
                    if cert_info[0].lower() == "issuer":
                        issuer = cert_info
                        break

            if not issuer:
                self._record_error(url, "Get certificate info error")
            elif not re.search(r'CN = Tango[\s\S]*UNTRUST', issuer[1], 0):
                self._record_error(url, "Intercept fail: no Tango cert")
            else:
                self.intecept_succ_get_value(urlkey, url, conn)
        finally:
            # Release the handle on every path, including transfer errors.
            conn.close()

    def ssl_intercept(self):
        """Probe every size in ``self.sizelist`` in order."""
        for sizefield in self.sizelist:
            self.ssl_conn(sizefield, self.urldict[sizefield])


if __name__ == '__main__':
    while True:
        try:
            probe = SSLBuild()
            probe.ssl_intercept()
            logpath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log." \
                + time.strftime("%Y-%m-%d", time.localtime())
            logNewestPath = "/root/result_self_test/con_traffic_inject/con_traffic_inject_result.log.newest"
            # Latest round only, pretty-printed.
            with open(logNewestPath, "w+") as f:
                f.write(json.dumps(probe.resultlist, sort_keys=True, indent=4, separators=(',', ': ')))
            # Per-day history, one compact JSON document per line.
            with open(logpath, "a+") as f:
                f.write(json.dumps(probe.resultlist))
                f.write("\n")
        except Exception as exc:
            # Keep the loop alive on ordinary failures, but let
            # KeyboardInterrupt/SystemExit stop the script.
            print("Exception:an exception occurred during the execution of the program", file=sys.stderr)
            print(repr(exc), file=sys.stderr)
        # Pause between rounds on success and failure alike, so a persistent
        # error does not turn into a tight retry loop.
        time.sleep(1)